Google Cloud Dataflow Operators

Dataflow is a managed service for executing a wide variety of data processing patterns. These pipelines are created using the Apache Beam programming model which allows for both batch and streaming processing.

Prerequisite Tasks

To use these operators, you must do a few things:

Ways to run a data pipeline

There are several ways to run a Dataflow pipeline depending on your environment, source files:

  • Non-templated pipeline: Developer can run the pipeline as a local process on the Airflow worker if you have a *.jar file for Java or a *.py file for Python. This also means that the necessary system dependencies must be installed on the worker. For Java, worker must have the JRE Runtime installed. For Python, the Python interpreter. The runtime versions must be compatible with the pipeline versions. This is the fastest way to start a pipeline, but because of its frequent problems with system dependencies, it may cause problems. See: Java SDK pipelines, Python SDK pipelines for more detailed information. Developer can also create a pipeline by passing its structure in a JSON format and then run it to create a Job. See: JSON-formatted pipelines and JSON-formatted pipelines for more detailed information.

  • Templated pipeline: The programmer can make the pipeline independent of the environment by preparing a template that will then be run on a machine managed by Google. This way, changes to the environment won’t affect your pipeline. There are two types of the templates:

    • Classic templates. Developers run the pipeline and create a template. The Apache Beam SDK stages files in Cloud Storage, creates a template file (similar to job request), and saves the template file in Cloud Storage. See: Templated jobs

    • Flex Templates. Developers package the pipeline into a Docker image and then use the gcloud command-line tool to build and save the Flex Template spec file in Cloud Storage. See: Templated jobs

  • SQL pipeline: Developer can write pipeline as SQL statement and then execute it in Dataflow. See: Dataflow SQL

It is a good idea to test your pipeline using the non-templated pipeline, and then run the pipeline in production using the templates.

For details on the differences between the pipeline types, see Dataflow templates in the Google Cloud documentation.

Starting Non-templated pipeline

JSON-formatted pipelines

A new pipeline can be created by passing its structure in a JSON format. See DataflowCreatePipelineOperator This will create a new pipeline that will be visible on Dataflow Pipelines UI.

Here is an example of how you can create a Dataflow Pipeline by running DataflowCreatePipelineOperator:

tests/system/providers/google/cloud/dataflow/example_dataflow_pipeline.py[source]

create_pipeline = DataflowCreatePipelineOperator(
    task_id="create_pipeline",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body={
        "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
        "type": PIPELINE_TYPE,
        "workload": {
            "dataflowFlexTemplateRequest": {
                "launchParameter": {
                    "containerSpecGcsPath": GCS_PATH,
                    "jobName": PIPELINE_JOB_NAME,
                    "environment": {"tempLocation": TEMP_LOCATION},
                    "parameters": {
                        "inputFile": INPUT_FILE,
                        "output": OUTPUT,
                    },
                },
                "projectId": GCP_PROJECT_ID,
                "location": GCP_LOCATION,
            }
        },
    },
)

To run a newly created pipeline you can use DataflowRunPipelineOperator

tests/system/providers/google/cloud/dataflow/example_dataflow_pipeline.py[source]

run_pipeline = DataflowRunPipelineOperator(
    task_id="run_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
)

Once called, the DataflowRunPipelineOperator will return the Google Cloud Dataflow Job created by running the given pipeline.

For further information regarding the API usage, see Data Pipelines API REST Resource in the Google Cloud documentation.

To create a new pipeline using the source file (JAR in Java or Python file) use the create job operators. The source file can be located on GCS or on the local filesystem. BeamRunJavaPipelineOperator or BeamRunPythonPipelineOperator

Java SDK pipelines

For Java pipeline the jar argument must be specified for BeamRunJavaPipelineOperator as it contains the pipeline to be executed on Dataflow. The JAR can be available on GCS that Airflow has the ability to download or available on the local filesystem (provide the absolute path to it).

Here is an example of creating and running a pipeline in Java with jar stored on GCS:

tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py[source]

start_java_job = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start-java-job",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.IgnoreJob,
        "location": LOCATION,
        "poll_sleep": 10,
    },
)

Here is an example of creating and running a pipeline in Java with jar stored on GCS in deferrable mode:

tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py[source]

start_java_deferrable = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start-java-job-deferrable",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "job_name": "test-java-pipeline-job",
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
    deferrable=True,
)

Here is an example of creating and running a pipeline in Java with jar stored on local file system:

tests/system/providers/google/cloud/dataflow/example_dataflow_native_java.py[source]

start_java_job_local = BeamRunJavaPipelineOperator(
    task_id="start_java_job_local",
    jar=JAR_FILE_NAME,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
    },
)

Python SDK pipelines

The py_file argument must be specified for BeamRunPythonPipelineOperator as it contains the pipeline to be executed on Dataflow. The Python file can be available on GCS that Airflow has the ability to download or available on the local filesystem (provide the absolute path to it).

The py_interpreter argument specifies the Python version to be used when executing the pipeline, the default is python3. If your Airflow instance is running on Python 2 - specify python2 and ensure your py_file is in Python 2. For best results, use Python 3.

If py_requirements argument is specified a temporary Python virtual environment with specified requirements will be created and within it pipeline will run.

The py_system_site_packages argument specifies whether or not all the Python packages from your Airflow instance, will be accessible within virtual environment (if py_requirements argument is specified), recommend avoiding unless the Dataflow job requires it.

tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py[source]

start_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.47.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)

Execution models

Dataflow has multiple options of executing pipelines. It can be done in the following modes: batch asynchronously (fire and forget), batch blocking (wait until completion), or streaming (run indefinitely). In Airflow it is best practice to use asynchronous batch pipelines or streams and use sensors to listen for expected job state.

By default BeamRunJavaPipelineOperator, BeamRunPythonPipelineOperator, DataflowTemplatedJobStartOperator and DataflowStartFlexTemplateOperator have argument wait_until_finished set to None which cause different behaviour depends on the type of pipeline:

  • for the streaming pipeline, wait for jobs to start,

  • for the batch pipeline, wait for the jobs to complete.

If wait_until_finished is set to True operator will always wait for end of pipeline execution. If set to False only submits the jobs.

See: Configuring PipelineOptions for execution on the Cloud Dataflow service

Asynchronous execution

Dataflow batch jobs are by default asynchronous; however, this is dependent on the application code (contained in the JAR or Python file) and how it is written. In order for the Dataflow job to execute asynchronously, ensure the pipeline objects are not being waited upon (not calling waitUntilFinish or wait_until_finish on the PipelineResult in your application code).

tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py[source]

start_python_job_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_async",
    runner=BeamRunnerType.DataflowRunner,
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.47.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={
        "job_name": "start_python_job_async",
        "location": LOCATION,
        "wait_until_finished": False,
    },
)

Blocking execution

In order for a Dataflow job to execute and wait until completion, ensure the pipeline objects are waited upon in the application code. This can be done for the Java SDK by calling waitUntilFinish on the PipelineResult returned from pipeline.run() or for the Python SDK by calling wait_until_finish on the PipelineResult returned from pipeline.run().

Blocking jobs should be avoided as there is a background process that occurs when run on Airflow. This process is continuously being run to wait for the Dataflow job to be completed and increases the consumption of resources by Airflow in doing so.

Streaming execution

To execute a streaming Dataflow job, ensure the streaming option is set (for Python) or read from an unbounded data source, such as Pub/Sub, in your pipeline (for Java).

tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py[source]

start_streaming_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_streaming_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "temp_location": GCS_TMP,
        "input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
        "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
        "streaming": True,
    },
    py_requirements=["apache-beam[gcp]==2.47.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)

Setting argument drain_pipeline to True allows to stop streaming job by draining it instead of canceling during killing task instance.

See the Stopping a running pipeline.

Templated jobs

Templates give the ability to stage a pipeline on Cloud Storage and run it from there. This provides flexibility in the development workflow as it separates the development of a pipeline from the staging and execution steps. There are two types of templates for Dataflow: Classic and Flex. See the official documentation for Dataflow templates for more information.

Here is an example of running a Dataflow job using a Classic Template with DataflowTemplatedJobStartOperator:

tests/system/providers/google/cloud/dataflow/example_dataflow_template.py[source]

start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start_template_job",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    wait_until_finished=True,
)

Also for this action you can use the operator in the deferrable mode:

tests/system/providers/google/cloud/dataflow/example_dataflow_template.py[source]

start_template_job_deferrable = DataflowTemplatedJobStartOperator(
    task_id="start_template_job_deferrable",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    deferrable=True,
)

See the list of Google-provided templates that can be used with this operator.

Here is an example of running a Dataflow job using a Flex Template with DataflowStartFlexTemplateOperator:

tests/system/providers/google/cloud/dataflow/example_dataflow_template.py[source]

start_flex_template_job = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    wait_until_finished=True,
)

Also for this action you can use the operator in the deferrable mode:

tests/system/providers/google/cloud/dataflow/example_dataflow_template.py[source]

start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job_deferrable",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    deferrable=True,
)

Dataflow SQL

Dataflow SQL supports a variant of the ZetaSQL query syntax and includes additional streaming extensions for running Dataflow streaming jobs.

Here is an example of running Dataflow SQL job with DataflowStartSqlJobOperator:

tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py[source]

start_sql = DataflowStartSqlJobOperator(
    task_id="start_sql_query",
    job_name=DATAFLOW_SQL_JOB_NAME,
    query=f"""
        SELECT
            emp_name as employee,
            salary as employee_salary
        FROM
            bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
        WHERE salary >= 1000;
    """,
    options={
        "bigquery-project": PROJECT_ID,
        "bigquery-dataset": BQ_SQL_DATASET,
        "bigquery-table": BQ_SQL_TABLE_OUTPUT,
    },
    location=LOCATION,
    do_xcom_push=True,
)

Warning

This operator requires gcloud command (Google Cloud SDK) must be installed on the Airflow worker <https://cloud.google.com/sdk/docs/install>`__

See the Dataflow SQL reference.

Stopping a pipeline

To stop one or more Dataflow pipelines you can use DataflowStopJobOperator. Streaming pipelines are drained by default, setting drain_pipeline to False will cancel them instead. Provide job_id to stop a specific job, or job_name_prefix to stop all jobs with provided name prefix.

tests/system/providers/google/cloud/dataflow/example_dataflow_native_python.py[source]

stop_dataflow_job = DataflowStopJobOperator(
    task_id="stop_dataflow_job",
    location=LOCATION,
    job_name_prefix="start-python-pipeline",
)

See: Stopping a running pipeline.

Deleting a pipeline

To delete a Dataflow pipeline you can use DataflowDeletePipelineOperator. Here is an example how you can use this operator:

tests/system/providers/google/cloud/dataflow/example_dataflow_pipeline.py[source]

delete_pipeline = DataflowDeletePipelineOperator(
    task_id="delete_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Sensors

When job is triggered asynchronously sensors may be used to run checks for specific job properties.

DataflowJobStatusSensor.

tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py[source]

wait_for_python_job_async_done = DataflowJobStatusSensor(
    task_id="wait_for_python_job_async_done",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    location=LOCATION,
)

This operator can be run in deferrable mode by passing deferrable=True as a parameter.

tests/system/providers/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[source]

wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
    task_id="wait_for_beam_python_pipeline_job_status_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
    location=LOCATION,
    deferrable=True,
)

DataflowJobMetricsSensor.

tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py[source]

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_python_job_async_metric = DataflowJobMetricsSensor(
    task_id="wait_for_python_job_async_metric",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
)

This operator can be run in deferrable mode by passing deferrable=True as a parameter.

tests/system/providers/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[source]

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
    task_id="wait_for_beam_python_pipeline_job_metric_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobMessagesSensor.

tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py[source]

def check_message(messages: list[dict]) -> bool:
    """Check message"""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_python_job_async_message = DataflowJobMessagesSensor(
    task_id="wait_for_python_job_async_message",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_message,
    fail_on_terminal_state=False,
)

This operator can be run in deferrable mode by passing deferrable=True as a parameter.

tests/system/providers/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[source]

def check_job_message(messages: list[dict]) -> bool:
    """Check job message."""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
    task_id="wait_for_beam_python_pipeline_job_message_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_job_message,
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobAutoScalingEventsSensor.

tests/system/providers/google/cloud/dataflow/example_dataflow_native_python_async.py[source]

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event"""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_python_job_async_autoscaling_event",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
)

This operator can be run in deferrable mode by passing deferrable=True as a parameter.

tests/system/providers/google/cloud/dataflow/example_dataflow_sensors_deferrable.py[source]

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event."""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
    deferrable=True,
)

Was this entry helpful?