Google Cloud Dataflow Operators¶
Dataflow is a managed service for executing a wide variety of data processing patterns. These pipelines are created using the Apache Beam programming model, which supports both batch and streaming processing.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Ways to run a data pipeline¶
There are several ways to run a Dataflow pipeline, depending on your environment and source files:
- Non-templated pipeline: You can run the pipeline as a local process on the Airflow worker if you have a *.jar file for Java or a *.py file for Python. This means that the necessary system dependencies must be installed on the worker: for Java, the worker must have a JRE installed; for Python, a Python interpreter. The runtime versions must be compatible with the pipeline versions. This is the fastest way to start a pipeline, but it is prone to problems caused by system dependencies. See: Java SDK pipelines, Python SDK pipelines for more detailed information.
- Templated pipeline: You can make the pipeline independent of the environment by preparing a template that is then run on a machine managed by Google. This way, changes to the environment won’t affect your pipeline. There are two types of templates:
  - Classic templates. Developers run the pipeline and create a template. The Apache Beam SDK stages files in Cloud Storage, creates a template file (similar to a job request), and saves the template file in Cloud Storage. See: Templated jobs
  - Flex Templates. Developers package the pipeline into a Docker image and then use the gcloud command-line tool to build and save the Flex Template spec file in Cloud Storage. See: Templated jobs
 
- SQL pipeline: You can write the pipeline as a SQL statement and then execute it in Dataflow. See: Dataflow SQL
It is a good idea to test your pipeline using the non-templated pipeline, and then run the pipeline in production using the templates.
For details on the differences between the pipeline types, see Dataflow templates in the Google Cloud documentation.
Starting Non-templated pipeline¶
To create a new pipeline from a source file (a JAR for Java or a Python file), use one of the create job operators,
BeamRunJavaPipelineOperator
or
BeamRunPythonPipelineOperator.
The source file can be located on GCS or on the local filesystem.
Java SDK pipelines¶
For a Java pipeline, the jar argument must be specified for
BeamRunJavaPipelineOperator
as it contains the pipeline to be executed on Dataflow. The JAR can be stored on GCS, from which Airflow
can download it, or on the local filesystem (provide the absolute path to it).
Here is an example of creating and running a pipeline in Java with the JAR stored on GCS:
start_java_job = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start-java-job",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.IgnoreJob,
        "location": LOCATION,
        "poll_sleep": 10,
    },
)
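The pipeline_options dictionary is ultimately handed to the pipeline as command-line flags. The following is a minimal sketch of such a mapping, assuming the common convention that True becomes a bare flag, False is dropped, and lists repeat the flag. The helper name options_to_args is hypothetical and is not part of the provider's API; the operator's real conversion may differ in details.

```python
from typing import Any


def options_to_args(options: dict[str, Any]) -> list[str]:
    """Turn a pipeline_options-style dict into ``--key=value`` flags.

    Hypothetical helper for illustration only.
    """
    args: list[str] = []
    for key, value in options.items():
        if value is None or value is True:
            # Flags without a value, e.g. {"streaming": True} -> --streaming
            args.append(f"--{key}")
        elif value is False:
            # Disabled boolean flags are omitted entirely.
            continue
        elif isinstance(value, list):
            # A list repeats the flag once per item.
            args.extend(f"--{key}={item}" for item in value)
        else:
            args.append(f"--{key}={value}")
    return args


# Example input mirroring the shape of the operator call above.
example_args = options_to_args({"output": "gs://my-bucket/output", "streaming": True})
```

Seen this way, keys in pipeline_options should match the option names your pipeline's main class or script actually parses.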
Here is an example of creating and running a pipeline in Java, in deferrable mode, with the JAR stored on GCS:
start_java_deferrable = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start-java-job-deferrable",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
    deferrable=True,
)
Here is an example of creating and running a pipeline in Java with the JAR stored on the local filesystem:
start_java_job_local = BeamRunJavaPipelineOperator(
    task_id="start_java_job_local",
    jar=JAR_FILE_NAME,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
    },
)
Python SDK pipelines¶
The py_file argument must be specified for
BeamRunPythonPipelineOperator
as it contains the pipeline to be executed on Dataflow. The Python file can be stored on GCS, from which Airflow
can download it, or on the local filesystem (provide the absolute path to it).
The py_interpreter argument specifies the Python version to be used when executing the pipeline; the default
is python3. If your Airflow instance is running on Python 2, specify python2 and ensure your py_file is
written in Python 2. For best results, use Python 3.
If the py_requirements argument is specified, a temporary Python virtual environment with the specified requirements is created
and the pipeline runs within it.
The py_system_site_packages argument specifies whether all the Python packages from your Airflow instance
are accessible within the virtual environment (if the py_requirements argument is specified).
Avoid enabling it unless the Dataflow job requires it.
start_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.47.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)
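To make the effect of py_requirements and py_system_site_packages more concrete, here is a rough stand-in built on the standard-library venv module. It is only a sketch under stated assumptions: the helper name create_temp_venv is hypothetical, the provider may create its environment differently, and installing the listed requirements with pip is deliberately left out so the example runs offline.

```python
import tempfile
import venv
from pathlib import Path


def create_temp_venv(target_dir: str, system_site_packages: bool = False) -> Path:
    """Create a virtual environment in ``target_dir`` and return its config file.

    Hypothetical helper; it mirrors the idea behind py_system_site_packages only.
    """
    builder = venv.EnvBuilder(
        system_site_packages=system_site_packages,
        with_pip=False,  # skip the pip bootstrap to keep the sketch fast and offline
    )
    builder.create(target_dir)
    return Path(target_dir) / "pyvenv.cfg"


# The environment lives only as long as the temporary directory does.
with tempfile.TemporaryDirectory(prefix="beam-venv-") as tmp_dir:
    cfg_path = create_temp_venv(tmp_dir, system_site_packages=False)
    venv_config = cfg_path.read_text()
```

With system_site_packages=False the environment is isolated from the packages of the interpreter that created it, which is why the default is usually the safer choice for reproducible pipelines.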
Execution models¶
Dataflow has multiple options for executing pipelines. It can be done in the following modes: batch asynchronously (fire and forget), batch blocking (wait until completion), or streaming (run indefinitely). In Airflow, it is best practice to use asynchronous batch pipelines or streams and use sensors to listen for the expected job state.
By default, BeamRunJavaPipelineOperator,
BeamRunPythonPipelineOperator,
DataflowTemplatedJobStartOperator and
DataflowStartFlexTemplateOperator
have the argument wait_until_finished set to None, which causes different behaviour depending on the type of pipeline:
- for the streaming pipeline, wait for jobs to start, 
- for the batch pipeline, wait for the jobs to complete. 
If wait_until_finished is set to True, the operator always waits for the end of pipeline execution.
If it is set to False, the operator only submits the jobs.
See: Configuring PipelineOptions for execution on the Cloud Dataflow service
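The three values of wait_until_finished described above can be summarized as a small decision function. This is only a sketch of the documented behaviour, not the operators' code; the function name waits_for_completion and the is_streaming flag are hypothetical.

```python
from typing import Optional


def waits_for_completion(wait_until_finished: Optional[bool], is_streaming: bool) -> bool:
    """Return True when the operator would block until the job has finished.

    Hypothetical helper modelling the rules described in the text above.
    """
    if wait_until_finished is None:
        # Default: batch jobs are awaited, streaming jobs only need to start.
        return not is_streaming
    # An explicit True always waits; an explicit False only submits the job.
    return wait_until_finished


# Enumerate every combination to make the table of outcomes explicit.
outcomes = {
    (setting, streaming): waits_for_completion(setting, streaming)
    for setting in (None, True, False)
    for streaming in (False, True)
}
```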
Asynchronous execution¶
Dataflow batch jobs are by default asynchronous; however, this is dependent on the application code (contained in the JAR
or Python file) and how it is written. In order for the Dataflow job to execute asynchronously, ensure the
pipeline objects are not being waited upon (not calling waitUntilFinish or wait_until_finish on the
PipelineResult in your application code).
start_python_job_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_async",
    runner=BeamRunnerType.DataflowRunner,
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.47.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={
        "job_name": "start_python_job_async",
        "location": LOCATION,
        "wait_until_finished": False,
    },
)
Blocking execution¶
In order for a Dataflow job to execute and wait until completion, ensure the pipeline objects are waited upon
in the application code. This can be done for the Java SDK by calling waitUntilFinish on the PipelineResult
returned from pipeline.run() or for the Python SDK by calling wait_until_finish on the PipelineResult
returned from pipeline.run().
Blocking jobs should be avoided because, when they run on Airflow, a background process runs continuously to wait for the Dataflow job to complete, which increases Airflow's resource consumption.
Streaming execution¶
To execute a streaming Dataflow job, ensure the streaming option is set (for Python) or read from an unbounded data source, such as Pub/Sub, in your pipeline (for Java).
start_streaming_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_streaming_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "temp_location": GCS_TMP,
        "input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
        "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
        "streaming": True,
    },
    py_requirements=["apache-beam[gcp]==2.47.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)
Setting the drain_pipeline argument to True stops a streaming job by draining it
instead of canceling it when the task instance is killed.
See Stopping a running pipeline.
Templated jobs¶
Templates give the ability to stage a pipeline on Cloud Storage and run it from there. This provides flexibility in the development workflow as it separates the development of a pipeline from the staging and execution steps. There are two types of templates for Dataflow: Classic and Flex. See the official documentation for Dataflow templates for more information.
Here is an example of running a Dataflow job using a Classic Template with
DataflowTemplatedJobStartOperator:
start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start_template_job",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    wait_until_finished=True,
)
You can also use the operator in deferrable mode for this action:
start_template_job_deferrable = DataflowTemplatedJobStartOperator(
    task_id="start_template_job_deferrable",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    deferrable=True,
)
See the list of Google-provided templates that can be used with this operator.
Here is an example of running a Dataflow job using a Flex Template with
DataflowStartFlexTemplateOperator:
start_flex_template_job = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    wait_until_finished=True,
)
You can also use the operator in deferrable mode for this action:
start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job_deferrable",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    deferrable=True,
)
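The examples above pass a BODY value that is not shown in this section. As a minimal sketch, assuming the request shape of the Dataflow Flex Template launch API (a launchParameter object with jobName, containerSpecGcsPath and parameters), it could be built as follows. The helper name, bucket paths, job name and parameter names are placeholders for illustration, not values from this guide.

```python
from typing import Any


def build_flex_template_body(
    job_name: str,
    container_spec_gcs_path: str,
    parameters: dict[str, str],
) -> dict[str, Any]:
    """Assemble a launch request body for a Flex Template job.

    Hypothetical helper; the accepted parameters depend on the template itself.
    """
    return {
        "launchParameter": {
            "jobName": job_name,
            "containerSpecGcsPath": container_spec_gcs_path,
            "parameters": dict(parameters),  # copy so the caller's dict is untouched
        }
    }


# Placeholder values; replace them with your own template spec and parameters.
BODY = build_flex_template_body(
    job_name="example-flex-template-job",
    container_spec_gcs_path="gs://my-bucket/templates/my-template.json",
    parameters={"output": "gs://my-bucket/output"},
)
```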
Dataflow SQL¶
Dataflow SQL supports a variant of the ZetaSQL query syntax and includes additional streaming extensions for running Dataflow streaming jobs.
Here is an example of running a Dataflow SQL job with
DataflowStartSqlJobOperator:
start_sql = DataflowStartSqlJobOperator(
    task_id="start_sql_query",
    job_name=DATAFLOW_SQL_JOB_NAME,
    query=f"""
        SELECT
            emp_name as employee,
            salary as employee_salary
        FROM
            bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
        WHERE salary >= 1000;
    """,
    options={
        "bigquery-project": PROJECT_ID,
        "bigquery-dataset": BQ_SQL_DATASET,
        "bigquery-table": BQ_SQL_TABLE_OUTPUT,
    },
    location=LOCATION,
    do_xcom_push=True,
)
Warning
This operator requires the gcloud command (Google Cloud SDK) to be installed on the Airflow worker:
https://cloud.google.com/sdk/docs/install
See the Dataflow SQL reference.
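Because the SQL operator depends on the gcloud command being present on the worker, it can be useful to verify that before scheduling the task. The snippet below is a small sketch using only the standard library; the function name find_gcloud is hypothetical, and where you run the check (for example in a preliminary task) is up to you.

```python
import shutil
from typing import Optional


def find_gcloud() -> Optional[str]:
    """Return the path of the gcloud executable on PATH, or None if it is missing."""
    return shutil.which("gcloud")


gcloud_path = find_gcloud()
if gcloud_path is None:
    print("gcloud was not found on PATH; install the Google Cloud SDK on this worker.")
else:
    print(f"gcloud found at {gcloud_path}")
```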
Stopping a pipeline¶
To stop one or more Dataflow pipelines you can use
DataflowStopJobOperator.
Streaming pipelines are drained by default; setting drain_pipeline to False cancels them instead.
Provide job_id to stop a specific job, or job_name_prefix to stop all jobs with the provided name prefix.
stop_dataflow_job = DataflowStopJobOperator(
    task_id="stop_dataflow_job",
    location=LOCATION,
    job_name_prefix="start-python-pipeline",
)
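The difference between job_id and job_name_prefix is that a prefix can match several jobs at once. The following sketch illustrates that selection on a hand-written list of job records; the function name, the record layout and the sample jobs are hypothetical and only illustrate prefix matching, not how the operator queries Dataflow.

```python
def select_jobs_by_prefix(jobs: list[dict], job_name_prefix: str) -> list[str]:
    """Return the ids of all jobs whose name starts with ``job_name_prefix``."""
    return [
        job["id"]
        for job in jobs
        if job.get("name", "").startswith(job_name_prefix)
    ]


# Made-up job records for illustration.
sample_jobs = [
    {"id": "job-1", "name": "start-python-pipeline-a1b2"},
    {"id": "job-2", "name": "start-python-pipeline-c3d4"},
    {"id": "job-3", "name": "start-java-job-e5f6"},
]
matching_ids = select_jobs_by_prefix(sample_jobs, "start-python-pipeline")
```

A short or generic prefix can therefore stop more jobs than intended, so prefer job_id when exactly one job should be stopped.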
Sensors¶
When a job is triggered asynchronously, sensors can be used to check specific job properties.
wait_for_python_job_async_done = DataflowJobStatusSensor(
    task_id="wait_for_python_job_async_done",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    location=LOCATION,
)
This operator can be run in deferrable mode by passing deferrable=True as a parameter.
wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
    task_id="wait_for_beam_python_pipeline_job_status_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
    location=LOCATION,
    deferrable=True,
)
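Conceptually, a status sensor keeps polling until the job reaches one of the expected statuses. The sketch below models a single poll under the assumption that reaching a terminal state other than the expected one is treated as a failure rather than polled forever. The function name and the use of RuntimeError are hypothetical, and the set of terminal states reflects the Dataflow job states as I understand them.

```python
# Job states after which a Dataflow job no longer changes (assumed set).
TERMINAL_STATES = {
    "JOB_STATE_DONE",
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
    "JOB_STATE_UPDATED",
    "JOB_STATE_DRAINED",
}


def job_reached_expected_status(current_status: str, expected_statuses: set[str]) -> bool:
    """Evaluate one poll: True means stop waiting, False means poll again.

    Hypothetical helper for illustration only.
    """
    if current_status in expected_statuses:
        return True
    if current_status in TERMINAL_STATES:
        # The job ended in a state we were not waiting for; polling further is pointless.
        raise RuntimeError(f"Job ended in unexpected terminal state: {current_status}")
    return False


still_running = job_reached_expected_status("JOB_STATE_RUNNING", {"JOB_STATE_DONE"})
```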
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check if the metric is greater than or equal to the given value."""
    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == metric_name and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")
    return callback
wait_for_python_job_async_metric = DataflowJobMetricsSensor(
    task_id="wait_for_python_job_async_metric",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
)
This operator can be run in deferrable mode by passing deferrable=True as a parameter.
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check if the metric is greater than or equal to the given value."""
    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == metric_name and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")
    return callback
wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
    task_id="wait_for_beam_python_pipeline_job_metric_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
    deferrable=True,
)
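A metrics callback is plain Python, so it can be exercised without Airflow or a running job. The sketch below is a standalone variant of the callback that compares against the metric_name argument, logs through the standard logging module instead of dag.log, and raises ValueError instead of AirflowException. The sample payload is hand-written and only mimics the fields the callback reads; real metric entries contain more data.

```python
import logging
from typing import Callable

log = logging.getLogger(__name__)


def check_metric_scalar_gte(metric_name: str, value: int) -> Callable[[list[dict]], bool]:
    """Build a callback that checks whether a metric is >= ``value``."""

    def callback(metrics: list[dict]) -> bool:
        log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            # Skip tentative values and only accept the committed metric.
            if context.get("original_name", "") == metric_name and not context.get("tentative", ""):
                return metric["scalar"] >= value
        raise ValueError(f"Metric '{metric_name}' not found in metrics")

    return callback


# Hand-written sample: one tentative entry (ignored) and one committed entry.
sample_metrics = [
    {"name": {"context": {"original_name": "Service-cpu_num_seconds", "tentative": "true"}}, "scalar": 500},
    {"name": {"context": {"original_name": "Service-cpu_num_seconds"}}, "scalar": 150},
]
metric_reached = check_metric_scalar_gte("Service-cpu_num_seconds", 100)(sample_metrics)
```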
def check_message(messages: list[dict]) -> bool:
    """Check message"""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False
wait_for_python_job_async_message = DataflowJobMessagesSensor(
    task_id="wait_for_python_job_async_message",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_message,
    fail_on_terminal_state=False,
)
This operator can be run in deferrable mode by passing deferrable=True as a parameter.
def check_job_message(messages: list[dict]) -> bool:
    """Check job message."""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False
wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
    task_id="wait_for_beam_python_pipeline_job_message_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_job_message,
    fail_on_terminal_state=False,
    deferrable=True,
)
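If several sensors wait for different log lines, the message callback can be produced by a small factory instead of being written once per message. This is a sketch of that idea; the factory name message_contains is hypothetical, and the sample messages are made up and carry only the messageText field that the callback inspects.

```python
from typing import Callable


def message_contains(expected_text: str) -> Callable[[list[dict]], bool]:
    """Build a callback that returns True once any message contains ``expected_text``."""

    def callback(messages: list[dict]) -> bool:
        return any(expected_text in message.get("messageText", "") for message in messages)

    return callback


# Made-up job messages for illustration.
sample_messages = [
    {"messageText": "Worker configuration: default machine type."},
    {"messageText": "Adding workflow start and stop steps."},
]
check_start_stop = message_contains("Adding workflow start and stop steps.")
found = check_start_stop(sample_messages)
```

The resulting function can be passed as the callback argument in the same way as check_job_message above.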
Use DataflowJobAutoScalingEventsSensor to wait for a specific autoscaling event:
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event"""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_python_job_async_autoscaling_event",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
)
This operator can be run in deferrable mode by passing deferrable=True as a parameter.
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event."""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False
wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
    deferrable=True,
)
Reference¶
For further information, look at: