Azure Data Factory Operators¶
Azure Data Factory is Azure’s cloud ETL service for scale-out serverless data integration and data transformation. It offers a code-free UI for intuitive authoring and single-pane-of-glass monitoring and management.
AzureDataFactoryRunPipelineOperator¶
Use the AzureDataFactoryRunPipelineOperator to execute a pipeline within a data factory.
By default, the operator periodically checks the status of the triggered pipeline run and expects it to terminate with a "Succeeded" status.
This waiting can be disabled in favor of an asynchronous wait – typically with the AzureDataFactoryPipelineRunStatusSensor – by setting wait_for_termination to False.
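The examples on this page assume the operator and sensor classes have already been imported from the Microsoft Azure provider package. A minimal sketch of those imports (assuming apache-airflow-providers-microsoft-azure is installed):

from typing import cast

from airflow.models.xcom_arg import XComArg
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)
from airflow.providers.microsoft.azure.sensors.data_factory import (
    AzureDataFactoryPipelineRunStatusSensor,
)

# The deprecated AzureDataFactoryPipelineRunStatusAsyncSensor, shown further below,
# is importable from the same sensors module.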
Below is an example of using this operator to execute an Azure Data Factory pipeline.
run_pipeline1 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline1",
    pipeline_name="pipeline1",
    parameters={"myParam": "value"},
)
Below is an example of using this operator to execute an Azure Data Factory pipeline with the deferrable flag enabled, so that polling for the status of the pipeline run occurs on the Airflow Triggerer.
run_pipeline3 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline3",
    pipeline_name="pipeline1",
    parameters={"myParam": "value"},
    deferrable=True,
)
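If you need to tune how the run is monitored, the operator also accepts check_interval and timeout arguments (argument names as in recent versions of the Microsoft Azure provider; the task below is purely illustrative, so treat this as a sketch rather than required settings):

# Sketch only: poll every two minutes and fail if the run has not finished within two hours.
run_pipeline_tuned = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline_tuned",
    pipeline_name="pipeline1",
    parameters={"myParam": "value"},
    deferrable=True,       # polling happens on the Airflow Triggerer
    check_interval=120,    # seconds between status checks
    timeout=60 * 60 * 2,   # overall wait budget for the pipeline run, in seconds
)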
Here is a different example of using this operator to execute a pipeline, coupled with the AzureDataFactoryPipelineRunStatusSensor to perform an asynchronous wait.
run_pipeline2 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline2",
    pipeline_name="pipeline2",
    wait_for_termination=False,
)

pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor",
    run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
)
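Because wait_for_termination is disabled, the operator completes as soon as the run is submitted and pushes the run_id to XCom; the sensor therefore needs to run downstream of the operator, for example:

# Make sure the sensor only starts once run_pipeline2 has pushed its run_id to XCom.
run_pipeline2 >> pipeline_run_sensor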
You can also use deferrable mode in the AzureDataFactoryPipelineRunStatusSensor if you would like to free up worker slots while the sensor is running.
# Performs polling on the Airflow Triggerer, thus freeing up resources on the Airflow Worker
pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor_defered",
    run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)
Poll for status of a data factory pipeline run asynchronously¶
Use the AzureDataFactoryPipelineRunStatusAsyncSensor (the deferrable version) to periodically retrieve the
status of a data factory pipeline run asynchronously. This sensor frees up worker slots, since
polling for the run status happens on the Airflow triggerer, leading to more efficient utilization
of resources within Airflow. Note that this sensor is deprecated in favor of the
AzureDataFactoryPipelineRunStatusSensor with deferrable=True, as the example below points out.
run_pipeline2 = AzureDataFactoryRunPipelineOperator(
    task_id="run_pipeline2",
    pipeline_name="pipeline2",
    wait_for_termination=False,
)

pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor",
    run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
)

# Performs polling on the Airflow Triggerer, thus freeing up resources on the Airflow Worker
pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
    task_id="pipeline_run_sensor_defered",
    run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
    deferrable=True,
)

# The following sensor is deprecated.
# Please use the AzureDataFactoryPipelineRunStatusSensor and set deferrable to True.
pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusAsyncSensor(
    task_id="pipeline_run_async_sensor",
    run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
)
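For orientation, here is a minimal, self-contained sketch of how these pieces might be wired together in a DAG. The DAG id, connection id, resource group, and factory name are illustrative placeholders (in some provider versions resource_group_name and factory_name can instead come from the connection's extras), so treat this as a sketch rather than a drop-in example.

from datetime import datetime
from typing import cast

from airflow import DAG
from airflow.models.xcom_arg import XComArg
from airflow.providers.microsoft.azure.operators.data_factory import (
    AzureDataFactoryRunPipelineOperator,
)
from airflow.providers.microsoft.azure.sensors.data_factory import (
    AzureDataFactoryPipelineRunStatusSensor,
)

with DAG(
    dag_id="example_adf_run_pipeline",  # placeholder DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,  # use schedule_interval=None on Airflow < 2.4
    catchup=False,
) as dag:
    # Submit the pipeline run and return immediately; the operator pushes the run_id to XCom.
    run_pipeline = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline",
        azure_data_factory_conn_id="azure_data_factory_default",  # placeholder connection id
        resource_group_name="my-resource-group",  # placeholder
        factory_name="my-data-factory",  # placeholder
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
        wait_for_termination=False,
    )

    # Wait for the run to reach a terminal state, deferring to the triggerer while polling.
    pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor",
        run_id=cast(str, XComArg(run_pipeline, key="run_id")),
        resource_group_name="my-resource-group",  # placeholder
        factory_name="my-data-factory",  # placeholder
        deferrable=True,
    )

    run_pipeline >> pipeline_run_sensor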