dbt Cloud Operators

These operators can execute dbt Cloud jobs, poll for the status of a currently executing job, and download run artifacts locally.

Each of the operators can be tied to a specific dbt Cloud Account in two ways:

  • Explicitly provide the Account ID (via the account_id parameter) to the operator.

  • Or, specify the dbt Cloud Account in the Airflow Connection. The operators will fall back to this value automatically if the Account ID is not passed to the operator.
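The precedence between these two options can be sketched as follows. This is only an illustration of the fallback rule described above, not the provider's actual code; resolve_account_id and both of its parameters are hypothetical names.

```python
from typing import Optional


def resolve_account_id(
    operator_account_id: Optional[int],
    connection_account_id: Optional[int],
) -> int:
    """Pick the account ID an operator would use (hypothetical helper)."""
    # An ID passed directly to the operator always wins.
    if operator_account_id is not None:
        return operator_account_id
    # Otherwise fall back to the ID stored on the Airflow Connection.
    if connection_account_id is not None:
        return connection_account_id
    # Assumption: with neither source set, there is nothing to fall back to.
    raise ValueError("No dbt Cloud account ID on the operator or the connection.")
```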

Trigger a dbt Cloud Job

Use the DbtCloudRunJobOperator to trigger a run of a dbt Cloud job. By default, the operator waits for the triggered run to finish: it checks the run's status every check_interval seconds until the run terminates with a successful status or exceeds the timeout length of execution time. This behavior is controlled by the wait_for_termination parameter. Setting wait_for_termination to False lets the task finish without waiting for the run, so the wait can be performed asynchronously (typically by pairing the operator with the DbtCloudJobRunSensor). This is a good approach for long-running dbt Cloud jobs.
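The synchronous wait can be pictured as a simple polling loop. The sketch below is a minimal illustration of the check_interval and timeout behavior, not the operator's implementation; wait_for_run, get_status, sleep, and clock are hypothetical names, and the numeric status codes are an assumption that should be verified against the dbt Cloud API documentation.

```python
import time
from typing import Callable

# Assumption: numeric codes the dbt Cloud API uses for terminal run states.
SUCCESS, ERROR, CANCELLED = 10, 20, 30


def wait_for_run(
    get_status: Callable[[], int],
    check_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Block until the run succeeds; raise if it fails or times out."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == SUCCESS:
            return
        if status in (ERROR, CANCELLED):
            raise RuntimeError(f"Run ended with status code {status}.")
        # Give up once the allowed waiting time has been used.
        if clock() >= deadline:
            raise TimeoutError(f"Run did not finish within {timeout} seconds.")
        # Wait before asking for the status again.
        sleep(check_interval)
```

The sleep and clock arguments exist only so the loop can be exercised without real waiting; in practice get_status would wrap a call to the dbt Cloud API.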

While schema_override and steps_override are explicit, optional parameters for the DbtCloudRunJobOperator, custom run configurations can also be passed to the operator using the additional_run_config dictionary. This parameter can be used to set additional runtime configurations or overrides for the job run, such as threads_override, generate_docs_override, git_branch, etc. For a complete list of the other configurations that can be used at runtime, reference the API documentation.
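One way to picture how the explicit parameters and additional_run_config combine is as a single request body. The sketch below is an assumption about the merge order, not the provider's code: build_run_payload is a hypothetical helper, the cause field and its value are illustrative, and the step and branch values are made up.

```python
from typing import Any, Dict, List, Optional


def build_run_payload(
    cause: str,
    schema_override: Optional[str] = None,
    steps_override: Optional[List[str]] = None,
    additional_run_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble a trigger-run request body (hypothetical helper)."""
    payload: Dict[str, Any] = {
        "cause": cause,
        "schema_override": schema_override,
        "steps_override": steps_override,
    }
    # Assumption: extra keys are merged last, so they can add to
    # (or override) the explicit parameters above.
    payload.update(additional_run_config or {})
    return payload


payload = build_run_payload(
    cause="Triggered via Airflow",
    steps_override=["dbt run --select my_model"],
    additional_run_config={"threads_override": 8, "git_branch": "main"},
)
```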

The examples below demonstrate how to instantiate DbtCloudRunJobOperator tasks with synchronous and asynchronous waiting for run termination, respectively. Note that the account_id for the operators is set in the default_args of the example DAG.

tests/system/providers/dbt/cloud/example_dbt_cloud.py

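from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Synchronous wait: check the run every 10 seconds, for up to 300 seconds.
trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)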

This next example also shows how to pass in custom runtime configuration (in this case for threads_override) via the additional_run_config dictionary.

tests/system/providers/dbt/cloud/example_dbt_cloud.py

# Asynchronous wait: trigger the run and let a downstream sensor track it.
trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)

Poll for status of a dbt Cloud Job run

Use the DbtCloudJobRunSensor to periodically retrieve the status of a dbt Cloud job run and check whether the run has succeeded. This sensor provides all of the same functionality available with the BaseSensorOperator.

In the example below, the run_id value comes from the output of a previous DbtCloudRunJobOperator task, accessed through the .output property exposed by all operators. Note also that the account_id for the task is set in the default_args of the example DAG.

tests/system/providers/dbt/cloud/example_dbt_cloud.py

from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
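Unlike a blocking wait, a sensor performs one status check per invocation and reports whether it should be checked again. The sketch below illustrates that decision only; it is not the sensor's source. RunStatus and poke are names chosen for this example, and the numeric codes are an assumption to verify against the dbt Cloud API documentation.

```python
from enum import Enum


class RunStatus(Enum):
    """Assumed dbt Cloud run-status codes."""

    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30


def poke(status_code: int) -> bool:
    """Return True when the run succeeded, False to be checked again later."""
    status = RunStatus(status_code)
    # A failed or cancelled run can never succeed, so stop checking.
    if status in (RunStatus.ERROR, RunStatus.CANCELLED):
        raise RuntimeError(f"Job run finished with status {status.name}.")
    return status is RunStatus.SUCCESS
```

Returning False rather than sleeping leaves the scheduling of the next check to the sensor framework.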

Download run artifacts

Use the DbtCloudGetJobRunArtifactOperator to download dbt-generated artifacts for a dbt Cloud job run. The specified path value should be rooted at the target/ directory. Typical artifacts include manifest.json, catalog.json, and run_results.json, but other artifacts such as raw SQL of models or sources.json can also be downloaded.

For more information on dbt Cloud artifacts, reference this documentation.

tests/system/providers/dbt/cloud/example_dbt_cloud.py

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
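Because the path is rooted at target/, a path copied from a local dbt project usually needs its leading target/ component removed before it is passed to the operator. The helper below is a hypothetical convenience for that conversion and is not part of the provider; the example file paths are made up.

```python
from pathlib import PurePosixPath


def artifact_path(local_path: str) -> str:
    """Convert a project-relative path into one rooted at target/ (hypothetical helper)."""
    parts = PurePosixPath(local_path).parts
    # Drop a leading "target" component: the operator's path is already relative to it.
    if parts and parts[0] == "target":
        parts = parts[1:]
    if not parts:
        raise ValueError("Path must name a file inside target/.")
    return str(PurePosixPath(*parts))


run_results = artifact_path("target/run_results.json")
compiled_model = artifact_path("target/compiled/my_project/models/orders.sql")
```

Here run_results is "run_results.json", matching the path used in the example task above.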
