dbt Cloud Operators¶
These operators can execute dbt Cloud jobs, poll for status of a currently-executing job, and download run artifacts locally.
Each of the operators can be tied to a specific dbt Cloud account in two ways:

- Explicitly provide the account ID (via the account_id parameter) to the operator.
- Specify the dbt Cloud account in the Airflow connection. The operators will fall back to this value automatically if the account ID is not passed to the operator.
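As a minimal sketch of the first approach, the account ID can be passed directly to the operator (the connection ID, account ID, and job ID below are hypothetical placeholders):

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Passing account_id explicitly means the operator will not fall back to
# the account configured on the Airflow connection. All IDs here are
# illustrative placeholders.
trigger_with_account = DbtCloudRunJobOperator(
    task_id="trigger_with_account",
    dbt_cloud_conn_id="dbt_cloud_default",  # assumed connection ID
    account_id=12345,
    job_id=48617,
)
```

If account_id is omitted here, the operator resolves the account from the dbt Cloud connection instead.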
Trigger a dbt Cloud Job¶
Use the DbtCloudRunJobOperator to trigger a run of a dbt
Cloud job. By default, the operator polls the status of the triggered job every check_interval
seconds, terminating with a successful status when the job completes or failing once the job's
execution time exceeds timeout. This functionality is controlled by the wait_for_termination parameter. Alternatively,
wait_for_termination can be set to False to perform an asynchronous wait (typically paired with the
DbtCloudJobRunSensor). Setting wait_for_termination to
False is a good approach for long-running dbt Cloud jobs.
While schema_override and steps_override are explicit, optional parameters for the
DbtCloudRunJobOperator, custom run configurations can also be passed to the operator using the
additional_run_config dictionary. This parameter can be used to initialize additional runtime
configurations or overrides for the job run such as threads_override, generate_docs_override,
git_branch, etc. For a complete list of the other configurations that can be used at runtime, reference the
API documentation.
The examples below demonstrate how to instantiate DbtCloudRunJobOperator tasks with synchronous and
asynchronous waiting for run termination, respectively. Note that the account_id for the operators is
referenced within the default_args of the example DAG.
trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)
This next example also shows how to pass custom runtime configuration (in this case for threads_override)
via the additional_run_config dictionary.
trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)
Poll for status of a dbt Cloud Job run¶
Use the DbtCloudJobRunSensor to periodically retrieve the
status of a dbt Cloud job run and check whether the run has succeeded. This sensor provides all of the
functionality available with the BaseSensorOperator.
In the example below, the run_id value comes from the output of a previous
DbtCloudRunJobOperator task by utilizing the .output property exposed for all operators. Note also that
the account_id for the task is referenced within the default_args of the example DAG.
job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
Download run artifacts¶
Use the DbtCloudGetJobRunArtifactOperator to download
dbt-generated artifacts for a dbt Cloud job run. The specified path value should be rooted at the
target/ directory. Typical artifacts include manifest.json, catalog.json, and
run_results.json, but other artifacts such as raw SQL of models or sources.json can also be
downloaded.
For more information on dbt Cloud artifacts, reference this documentation.
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)