DatabricksCreateJobsOperator

Use the DatabricksCreateJobsOperator to create (or reset) a Databricks job. The operator relies on past XComs to remember the job_id it created, so repeated calls update the existing job rather than creating a new one. When paired with the DatabricksRunNowOperator, all runs fall under the same job in the Databricks UI.
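The create-or-reset behaviour described above can be pictured with a small, self-contained sketch. It is not the operator's implementation: FakeJobsApi and create_or_reset are hypothetical names, the in-memory dictionary stands in for the Databricks Jobs API, and the known_job_id argument stands in for whatever job_id was remembered from an earlier run.

```python
from typing import Dict, Optional


class FakeJobsApi:
    """Hypothetical in-memory stand-in for the Databricks Jobs API."""

    def __init__(self) -> None:
        self.jobs: Dict[int, dict] = {}
        self._next_id = 1

    def create(self, settings: dict) -> int:
        # Register a new job and hand back its ID.
        job_id = self._next_id
        self._next_id += 1
        self.jobs[job_id] = dict(settings)
        return job_id

    def reset(self, job_id: int, settings: dict) -> None:
        # Overwrite the settings of an existing job in place.
        self.jobs[job_id] = dict(settings)


def create_or_reset(api: FakeJobsApi, settings: dict, known_job_id: Optional[int]) -> int:
    """Reset the job if its ID is already known, otherwise create it."""
    if known_job_id is not None and known_job_id in api.jobs:
        api.reset(known_job_id, settings)
        return known_job_id
    return api.create(settings)


api = FakeJobsApi()
# First call: no job_id is remembered yet, so a job is created.
first_id = create_or_reset(api, {"name": "nightly"}, known_job_id=None)
# Second call: the remembered job_id is reused and the job is updated.
second_id = create_or_reset(
    api, {"name": "nightly", "timeout_seconds": 600}, known_job_id=first_id
)
```

In this sketch both calls return the same ID and only one job exists afterwards, which is the property that keeps all runs grouped under a single job.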

Using the Operator

There are three ways to instantiate this operator. First, you can take the JSON payload that you typically use to call the api/2.1/jobs/create endpoint and pass it directly to the DatabricksCreateJobsOperator through the json parameter. This approach gives you full control over the underlying payload sent to the Jobs REST API, including Databricks jobs with multiple tasks, but errors are harder to detect because the payload is not type checked.

The second way to accomplish the same thing is to use the named parameters of the DatabricksCreateJobsOperator directly. Note that there is exactly one named parameter for each top level parameter in the api/2.1/jobs/create endpoint.

The third way is to use both the json parameter AND the named parameters. They will be merged together. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys.
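The precedence rule can be illustrated with plain dictionaries. This is a minimal sketch of a top-level merge, assuming that named parameters left unset are passed as None and are skipped; merge_job_spec is a hypothetical helper, not part of the provider.

```python
from typing import Any, Dict, Optional


def merge_job_spec(json_payload: Optional[Dict[str, Any]], **named_params: Any) -> Dict[str, Any]:
    """Merge a JSON payload with named parameters; named parameters win."""
    merged = dict(json_payload or {})
    # Assumption: a named parameter left as None was not set by the caller,
    # so it must not overwrite the corresponding JSON key.
    merged.update({key: value for key, value in named_params.items() if value is not None})
    return merged


payload = {"name": "from-json", "max_concurrent_runs": 1}
merged = merge_job_spec(payload, name="from-named", timeout_seconds=3600, tags=None)
# merged == {"name": "from-named", "max_concurrent_runs": 1, "timeout_seconds": 3600}
```

Note that the sketch merges only at the top level: a named parameter replaces the whole value under the same key rather than being combined with it.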

Currently the named parameters that DatabricksCreateJobsOperator supports are:
  • name
  • description
  • tags
  • tasks
  • job_clusters
  • email_notifications
  • webhook_notifications
  • notification_settings
  • timeout_seconds
  • schedule
  • max_concurrent_runs
  • git_source
  • access_control_list
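Because a payload passed through the json parameter is not type checked, a light sanity check before handing it to the operator can catch misspelled keys early. The sketch below compares a payload's top-level keys with the named parameters listed above; unknown_top_level_keys is a hypothetical helper, and a flagged key is only a hint to double-check the spelling against the Jobs API reference.

```python
from typing import Any, Dict, List

# The named parameters listed above.
SUPPORTED_NAMED_PARAMETERS = frozenset(
    {
        "name",
        "description",
        "tags",
        "tasks",
        "job_clusters",
        "email_notifications",
        "webhook_notifications",
        "notification_settings",
        "timeout_seconds",
        "schedule",
        "max_concurrent_runs",
        "git_source",
        "access_control_list",
    }
)


def unknown_top_level_keys(payload: Dict[str, Any]) -> List[str]:
    """Return the top-level keys of payload that are not in the list above."""
    return sorted(set(payload) - SUPPORTED_NAMED_PARAMETERS)


# "taks" is a deliberate typo for "tasks".
payload = {"name": "nightly", "taks": [], "max_concurrent_runs": 1}
unexpected = unknown_top_level_keys(payload)
# unexpected == ["taks"]
```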

Examples

Specifying parameters as JSON

An example usage of the DatabricksCreateJobsOperator is as follows:

tests/system/databricks/example_databricks.py

    from airflow.providers.databricks.operators.databricks import DatabricksCreateJobsOperator

    # Example of using the JSON parameter to initialize the operator.
    job = {
        "tasks": [
            {
                "task_key": "test",
                "job_cluster_key": "job_cluster",
                "notebook_task": {
                    "notebook_path": "/Shared/test",
                },
            },
        ],
        "job_clusters": [
            {
                "job_cluster_key": "job_cluster",
                "new_cluster": {
                    "spark_version": "7.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                },
            },
        ],
    }

    jobs_create_json = DatabricksCreateJobsOperator(task_id="jobs_create_json", json=job)

Using named parameters

You can also use named parameters to initialize the operator and create the job.

tests/system/databricks/example_databricks.py

    # Example of using the named parameters to initialize the operator.
    tasks = [
        {
            "task_key": "test",
            "job_cluster_key": "job_cluster",
            "notebook_task": {
                "notebook_path": "/Shared/test",
            },
        },
    ]
    job_clusters = [
        {
            "job_cluster_key": "job_cluster",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
        },
    ]

    jobs_create_named = DatabricksCreateJobsOperator(
        task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
    )

Pairing with DatabricksRunNowOperator

You can use the job_id that is returned by the DatabricksCreateJobsOperator in the return_value XCom as an argument to the DatabricksRunNowOperator to run the job.

tests/system/databricks/example_databricks.py

    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

    # Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
    run_now = DatabricksRunNowOperator(
        task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
    )

    jobs_create_named >> run_now
