dbt Cloud Operators
These operators can trigger dbt Cloud jobs, poll for the status of a currently executing job run, and download run artifacts locally.
Each of the operators can be tied to a specific dbt Cloud account in one of two ways:
Explicitly provide the account ID to the operator via the account_id parameter.
Or, specify the dbt Cloud account in the Airflow connection. The operators fall back to this account automatically if the account ID is not passed to the operator.
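The precedence rule above can be sketched in plain Python. This is a hypothetical helper for illustration only, not the provider's code: the name `resolve_account_id` and the choice to raise an error when neither value is available are assumptions of this sketch.

```python
from typing import Optional


def resolve_account_id(
    explicit_account_id: Optional[int], connection_account_id: Optional[int]
) -> int:
    """Hypothetical helper illustrating the account ID fallback rule.

    An account ID passed directly to the operator wins; otherwise the
    account configured on the Airflow connection is used.
    """
    if explicit_account_id is not None:
        return explicit_account_id
    if connection_account_id is not None:
        return connection_account_id
    # Assumption of this sketch: with no account anywhere, fail loudly.
    raise ValueError("No dbt Cloud account ID was passed or set on the connection.")


# The operator-level value takes precedence over the connection's value.
print(resolve_account_id(12345, 67890))  # 12345
# Without an operator-level value, the connection's account is used.
print(resolve_account_id(None, 67890))  # 67890
```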
Trigger a dbt Cloud Job
Use the DbtCloudRunJobOperator to trigger a run of a dbt Cloud job. By default, the operator checks the status of the triggered run every check_interval seconds until the run terminates or until timeout seconds of execution time have elapsed; the task succeeds only if the run terminates with a successful status. This functionality is controlled by the wait_for_termination parameter. Alternatively, wait_for_termination can be set to False to perform an asynchronous wait (typically paired with the DbtCloudJobRunSensor). Setting wait_for_termination to False is a good approach for long-running dbt Cloud jobs.
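The synchronous wait behaves like a simple polling loop. The sketch below is a minimal, hypothetical illustration rather than the provider's implementation: `wait_for_run`, the `get_status` callback, the injectable `clock`/`sleep` functions, and the numeric status codes (assumed to follow the dbt Cloud API v2 run statuses: 3 running, 10 success, 20 error, 30 cancelled) are all assumptions. Injecting the clock lets the example simulate `check_interval=10` and `timeout=300` without actually sleeping.

```python
import time
from typing import Callable

# Assumed dbt Cloud API v2 run status codes (not taken from the provider).
SUCCESS, ERROR, CANCELLED = 10, 20, 30
TERMINAL_STATUSES = {SUCCESS, ERROR, CANCELLED}


def wait_for_run(
    get_status: Callable[[], int],
    check_interval: float,
    timeout: float,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll get_status() every check_interval seconds until the run ends.

    Returns True if the run succeeded, False if it ended in error or was
    cancelled, and raises TimeoutError once `timeout` seconds have elapsed.
    """
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status == SUCCESS
        if clock() >= deadline:
            raise TimeoutError(f"Run did not finish within {timeout} seconds.")
        sleep(check_interval)


# Simulated run: running (3) for two checks, then success (10).
statuses = iter([3, 3, 10])
fake_now = [0.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds  # advance the simulated clock instead of blocking


succeeded = wait_for_run(
    get_status=lambda: next(statuses),
    check_interval=10,
    timeout=300,
    clock=lambda: fake_now[0],
    sleep=fake_sleep,
)
print(succeeded, fake_now[0])  # True 20.0
```

Setting wait_for_termination to False corresponds to skipping this loop entirely in the triggering task and leaving the waiting to a sensor.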
While schema_override and steps_override are explicit, optional parameters for the DbtCloudRunJobOperator, custom run configurations can also be passed to the operator using the additional_run_config dictionary. This parameter can be used to initialize additional runtime configurations or overrides for the job run, such as threads_override, generate_docs_override, git_branch, etc. For a complete list of the other configurations that can be used at runtime, refer to the API documentation.
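Conceptually, the explicit overrides and the contents of additional_run_config end up together in the body of the dbt Cloud "trigger job run" API request. The helper below is a hypothetical sketch of that merge, not the provider's code: the function name, the `cause` text, the `my_model` selector, and the `main` branch are illustrative assumptions. In this sketch, keys from additional_run_config are applied last, so they win if they duplicate an explicit override.

```python
from typing import Any, Dict, List, Optional


def build_trigger_payload(
    cause: str,
    schema_override: Optional[str] = None,
    steps_override: Optional[List[str]] = None,
    additional_run_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical sketch: assemble a trigger-run request body.

    Explicit overrides are included only when set; any extra keys from
    additional_run_config are merged in on top.
    """
    payload: Dict[str, Any] = {"cause": cause}
    if schema_override is not None:
        payload["schema_override"] = schema_override
    if steps_override is not None:
        payload["steps_override"] = steps_override
    # Extra runtime overrides such as threads_override or git_branch.
    payload.update(additional_run_config or {})
    return payload


payload = build_trigger_payload(
    cause="Triggered via Apache Airflow",
    steps_override=["dbt run --select my_model"],
    additional_run_config={"threads_override": 8, "git_branch": "main"},
)
print(payload)
```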
The examples below demonstrate how to instantiate DbtCloudRunJobOperator tasks with synchronous and asynchronous waiting for run termination, respectively. Note that the account_id for the operators is referenced within the default_args of the example DAG.
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)
This next example also shows how to pass in custom runtime configuration (in this case for threads_override) via the additional_run_config dictionary.
trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)
Poll for status of a dbt Cloud Job run
Use the DbtCloudJobRunSensor to periodically retrieve the status of a dbt Cloud job run and check whether the run has succeeded. This sensor provides all of the same functionality available with the BaseSensorOperator.
In the example below, the run_id value comes from the output of a previous DbtCloudRunJobOperator task, retrieved through the .output property exposed for all operators. Also note that the account_id for the task is referenced within the default_args of the example DAG.
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)
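Each time the sensor pokes, the run's current status reduces to one of three outcomes: keep waiting, succeed, or fail. The following is a minimal, hypothetical sketch of that decision, assuming statuses follow the dbt Cloud API v2 codes (10 success, 20 error, 30 cancelled); `poke_outcome` and `RunFailedError` are illustrative names, not part of the provider.

```python
class RunFailedError(Exception):
    """Hypothetical error raised when the run ends unsuccessfully."""


# Assumed dbt Cloud API v2 run status codes.
SUCCESS, ERROR, CANCELLED = 10, 20, 30


def poke_outcome(status: int) -> bool:
    """Return True when the sensor should succeed, False to keep poking.

    Terminal failure states raise instead of returning, so the sensor
    task fails right away rather than waiting for its timeout.
    """
    if status == ERROR:
        raise RunFailedError("dbt Cloud job run failed.")
    if status == CANCELLED:
        raise RunFailedError("dbt Cloud job run was cancelled.")
    return status == SUCCESS


print(poke_outcome(3))   # False: still running, poke again later
print(poke_outcome(10))  # True: run succeeded
```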
Download run artifacts
Use the DbtCloudGetJobRunArtifactOperator to download dbt-generated artifacts for a dbt Cloud job run. The specified path value should be rooted at the target/ directory. Typical artifacts include manifest.json, catalog.json, and run_results.json, but other artifacts, such as the raw SQL of models or sources.json, can also be downloaded.
For more information on dbt Cloud artifacts, refer to the dbt Cloud documentation on artifacts.
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)
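Because path is relative to target/, it corresponds to the trailing segment of the dbt Cloud API v2 endpoint for retrieving a run artifact. The sketch below assumes the multi-tenant host https://cloud.getdbt.com and the endpoint layout /api/v2/accounts/{account_id}/runs/{run_id}/artifacts/{path}; verify both against the dbt Cloud API documentation for your deployment. `artifact_url` and the IDs used are hypothetical.

```python
from urllib.parse import quote


def artifact_url(
    account_id: int, run_id: int, path: str, host: str = "https://cloud.getdbt.com"
) -> str:
    """Hypothetical helper: build the API URL for one run artifact.

    `path` is relative to the run's target/ directory, so a leading
    "target/" is not included. Slashes are kept unescaped so nested paths,
    such as compiled SQL files, stay intact.
    """
    return f"{host}/api/v2/accounts/{account_id}/runs/{run_id}/artifacts/{quote(path, safe='/')}"


print(artifact_url(12345, 67890, "run_results.json"))
# https://cloud.getdbt.com/api/v2/accounts/12345/runs/67890/artifacts/run_results.json
```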