Apache Beam Operators

Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.

Run Python Pipelines in Apache Beam

The py_file argument is required for BeamRunPythonPipelineOperator because it identifies the pipeline that Beam executes. The Python file can be stored on GCS, in which case Airflow downloads it, or on the local filesystem, in which case you provide its absolute path.
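
To make the two supported locations concrete, here is a minimal sketch that classifies a py_file value as a GCS object or a local path. The helper name describe_py_file and the example bucket and paths are hypothetical, and this is not the operator's own code; it only assumes that GCS locations use the gs:// scheme.

```python
from urllib.parse import urlsplit


def describe_py_file(py_file: str) -> dict:
    """Classify a py_file value (hypothetical helper, for illustration only)."""
    parts = urlsplit(py_file)
    if parts.scheme == "gs":
        # Assumed form: gs://<bucket>/<object path>
        return {
            "location": "gcs",
            "bucket": parts.netloc,
            "object_name": parts.path.lstrip("/"),
        }
    # Anything else is treated as a path on the local filesystem.
    return {"location": "local", "path": py_file}


print(describe_py_file("gs://example-bucket/pipelines/wordcount.py"))
print(describe_py_file("/opt/airflow/pipelines/wordcount.py"))
```

In a real DAG you pass the string straight to py_file; the split into bucket and object name is shown only to illustrate what a GCS location consists of.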

The py_interpreter argument specifies the Python interpreter used to execute the pipeline; the default is python3. If your Airflow instance runs on Python 2, specify python2 and ensure your py_file is written for Python 2. For best results, use Python 3.

If the py_requirements argument is specified, a temporary Python virtual environment with the specified requirements is created and the pipeline runs inside it.

The py_system_site_packages argument specifies whether all Python packages from your Airflow instance are accessible within the virtual environment. It applies only if py_requirements is specified. Avoid enabling it unless the Dataflow job requires it.
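
The effect of py_requirements and py_system_site_packages can be pictured with the standard-library venv module. The sketch below only builds the commands that would create such an environment and install the requirements; it does not run them. The function name build_venv_commands and the /tmp/beam-venv directory are hypothetical, and the operator may use a different mechanism internally.

```python
from typing import List


def build_venv_commands(
    venv_dir: str,
    py_requirements: List[str],
    py_interpreter: str = "python3",
    py_system_site_packages: bool = False,
) -> List[List[str]]:
    """Return the commands that create a virtual environment and install requirements."""
    create = [py_interpreter, "-m", "venv", venv_dir]
    if py_system_site_packages:
        # Expose the packages of the parent interpreter inside the environment.
        create.append("--system-site-packages")
    # Interpreter inside the new environment (POSIX layout assumed).
    venv_python = f"{venv_dir}/bin/python"
    install = [venv_python, "-m", "pip", "install", *py_requirements]
    return [create, install]


for command in build_venv_commands("/tmp/beam-venv", ["apache-beam[gcp]==2.26.0"]):
    print(" ".join(command))
```

Leaving py_system_site_packages at False keeps the environment isolated, so the pipeline sees only the packages listed in py_requirements.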

Python Pipelines with DirectRunner

airflow/providers/apache/beam/example_dags/example_beam.py

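from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Run the wordcount example as a module (python3 -m ...) with the default DirectRunner.
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(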
    task_id="start_python_pipeline_local_direct_runner",
    py_file='apache_beam.examples.wordcount',
    py_options=['-m'],
    py_requirements=['apache-beam[gcp]==2.26.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
)

airflow/providers/apache/beam/example_dags/example_beam.py

start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_direct_runner",
    py_file=GCS_PYTHON,
    py_options=[],
    pipeline_options={"output": GCS_OUTPUT},
    py_requirements=['apache-beam[gcp]==2.26.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
)
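
The pipeline_options dictionary in the example above reaches the pipeline as command-line flags. As a rough illustration, assuming each entry is rendered as a --key=value flag, the conversion could look like the following. The helper options_to_args is hypothetical and is not the provider's function, and the bucket path is made up.

```python
from typing import Any, Dict, List


def options_to_args(options: Dict[str, Any]) -> List[str]:
    """Render an options dictionary as command-line flags (illustrative only)."""
    args: List[str] = []
    for key, value in options.items():
        if value is None or value is True:
            # No value to attach: emit a bare flag.
            args.append(f"--{key}")
        elif value is False:
            # A disabled boolean option is left out entirely.
            continue
        elif isinstance(value, list):
            # Repeat the flag once per list item.
            args.extend(f"--{key}={item}" for item in value)
        else:
            args.append(f"--{key}={value}")
    return args


print(options_to_args({"output": "gs://example-bucket/output", "streaming": True}))
```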

Python Pipelines with DataflowRunner

airflow/providers/apache/beam/example_dags/example_beam.py

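from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# Submit the pipeline to Dataflow and wait for the job to finish.
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(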
    task_id="start_python_pipeline_dataflow_runner",
    runner="DataflowRunner",
    py_file=GCS_PYTHON,
    pipeline_options={
        'tempLocation': GCS_TMP,
        'stagingLocation': GCS_STAGING,
        'output': GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=['apache-beam[gcp]==2.26.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
    ),
)

airflow/providers/apache/beam/example_dags/example_beam.py

# Submit the job without waiting for it to finish (wait_until_finished=False).
start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_dataflow_runner_async",
    runner="DataflowRunner",
    py_file=GCS_PYTHON_DATAFLOW_ASYNC,
    pipeline_options={
        'tempLocation': GCS_TMP,
        'stagingLocation': GCS_STAGING,
        'output': GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=['apache-beam[gcp]==2.26.0'],
    py_interpreter='python3',
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name='{{task.task_id}}',
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

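from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

# Poll the job started above until it reaches the JOB_STATE_DONE status.
wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(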
    task_id="wait-for-python-job-async-done",
    job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location='us-central1',
)

start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done

Run Java Pipelines in Apache Beam

For a Java pipeline, the jar argument is required for BeamRunJavaPipelineOperator because it contains the pipeline that Apache Beam executes. The JAR can be stored on GCS, in which case Airflow downloads it, or on the local filesystem, in which case you provide its absolute path.

Java Pipelines with DirectRunner

airflow/providers/apache/beam/example_dags/example_beam.py

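from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

# Download the JAR from GCS to a local path before running it.
jar_to_local_direct_runner = GCSToLocalFilesystemOperator(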
    task_id="jar_to_local_direct_runner",
    bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_direct_runner",
    jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        'output': '/tmp/start_java_pipeline_direct_runner',
        'inputFile': GCS_INPUT,
    },
    job_class='org.apache.beam.examples.WordCount',
)

jar_to_local_direct_runner >> start_java_pipeline_direct_runner

Java Pipelines with DataflowRunner

airflow/providers/apache/beam/example_dags/example_beam.py

jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
    task_id="jar_to_local_dataflow_runner",
    bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_dataflow",
    runner="DataflowRunner",
    jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        'tempLocation': GCS_TMP,
        'stagingLocation': GCS_STAGING,
        'output': GCS_OUTPUT,
    },
    job_class='org.apache.beam.examples.WordCount',
    dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
)

jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
