Azure Data Factory Operators

Azure Data Factory is Azure’s cloud ETL service for scale-out serverless data integration and data transformation. It offers a code-free UI for intuitive authoring and single-pane-of-glass monitoring and management.

AzureDataFactoryRunPipelineOperator

Use the AzureDataFactoryRunPipelineOperator to execute a pipeline within a data factory. By default, the operator periodically checks the status of the executed pipeline run until it terminates with a "Succeeded" status. This polling can be disabled in favor of an asynchronous wait – typically implemented with the AzureDataFactoryPipelineRunStatusSensor – by setting wait_for_termination to False.

Below is an example of using this operator to execute an Azure Data Factory pipeline.

tests/system/providers/microsoft/azure/example_adf_run_pipeline.py[source]

    run_pipeline1 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline1",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
    )
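Conceptually, the default wait_for_termination behavior is a blocking poll loop on the worker. Below is a minimal plain-Python sketch of that pattern; the `client` object and its `get_run_status` method are hypothetical stand-ins (the real operator uses the provider's hook internally), and the terminal statuses are those reported by Azure Data Factory:

```python
import time

# Terminal pipeline-run statuses reported by Azure Data Factory.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Cancelled"}


def wait_for_pipeline_run(client, run_id, check_interval=60, timeout=3600):
    """Block until the pipeline run reaches a terminal status or the timeout expires.

    ``client`` is a hypothetical object exposing ``get_run_status(run_id) -> str``;
    it stands in for the provider's hook, which the real operator uses internally.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get_run_status(run_id)
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(check_interval)
    raise TimeoutError(f"Pipeline run {run_id} did not finish within {timeout}s")
```

With wait_for_termination left enabled, a loop of this kind occupies a worker slot for the lifetime of the pipeline run, which is why the sensor-based and deferrable alternatives below exist.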

Below is an example of using this operator to execute an Azure Data Factory pipeline with the deferrable flag set, so that polling for the status of the pipeline run occurs on the Airflow Triggerer.

tests/system/providers/microsoft/azure/example_adf_run_pipeline.py[source]

    run_pipeline3 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline3",
        pipeline_name="pipeline1",
        parameters={"myParam": "value"},
        deferrable=True,
    )

Here is a different example of using this operator to execute a pipeline, coupled with the AzureDataFactoryPipelineRunStatusSensor to perform an asynchronous wait.

tests/system/providers/microsoft/azure/example_adf_run_pipeline.py[source]

    run_pipeline2 = AzureDataFactoryRunPipelineOperator(
        task_id="run_pipeline2",
        pipeline_name="pipeline2",
        wait_for_termination=False,
    )

    pipeline_run_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
    )

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

You can also use deferrable mode in the AzureDataFactoryPipelineRunStatusSensor if you would like to free up worker slots while the sensor is waiting.

tests/system/providers/microsoft/azure/example_adf_run_pipeline.py[source]

    # Performs polling on the Airflow Triggerer thus freeing up resources on Airflow Worker
    pipeline_run_sensor_deferred = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_sensor_defered",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )

Poll for status of a data factory pipeline run asynchronously

Use the AzureDataFactoryPipelineRunStatusAsyncSensor (deferrable version) to periodically retrieve the status of a data factory pipeline run asynchronously. This sensor will free up the worker slots since polling for job status happens on the Airflow triggerer, leading to efficient utilization of resources within Airflow.

tests/system/providers/microsoft/azure/example_adf_run_pipeline.py[source]

    pipeline_run_async_sensor = AzureDataFactoryPipelineRunStatusSensor(
        task_id="pipeline_run_async_sensor",
        run_id=cast(str, XComArg(run_pipeline2, key="run_id")),
        deferrable=True,
    )
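In deferrable mode, the equivalent poll loop runs as an async coroutine on the Triggerer, so a single triggerer process can multiplex many waiting runs without tying up worker slots. A rough asyncio sketch of that trigger-side loop (illustrative only, not the provider's actual trigger code; `get_status` is a hypothetical async callable):

```python
import asyncio

# Terminal pipeline-run statuses reported by Azure Data Factory.
TERMINAL_STATUSES = {"Succeeded", "Failed", "Cancelled"}


async def poll_pipeline_run(get_status, run_id, poll_interval=1.0):
    """Async poll loop of the kind a deferrable sensor's trigger runs on the Triggerer.

    ``get_status`` is a hypothetical async callable returning the run's status.
    Awaiting between checks yields control to the event loop, which is what lets
    one triggerer service many waiting tasks concurrently.
    """
    while True:
        status = await get_status(run_id)
        if status in TERMINAL_STATUSES:
            return status
        await asyncio.sleep(poll_interval)
```

When the run reaches a terminal status, the real trigger fires an event and the deferred task resumes on a worker only to process the result.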

Reference

For further information, please refer to the Microsoft documentation.
