airflow.contrib.operators.dataflow_operator

Module Contents

class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(jar, job_name='{{task.task_id}}', dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, job_class=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

Example:

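from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 8, 1),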
    'email': ['alex@vanboxel.be'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
}

dag = DAG('test-dag', default_args=default_args)

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'
    },
    dag=dag)

See also

For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params

Parameters
  • jar (str) – The reference to a self-executing DataFlow jar (templated).

  • job_name (str) – The ‘jobName’ to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' in options will be overwritten.

  • dataflow_default_options (dict) – Map of default job options.

  • options (dict) – Map of job specific options.

  • gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud Platform.

  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.

  • job_class (str) – The name of the dataflow job class to be executed. It is often not the main class configured in the dataflow jar file.

jar, options, and job_name are templated so you can use variables in them.

Note that dataflow_default_options and options are merged to specify the pipeline execution parameters. dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all dataflow operators in the DAG.
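
The merge described above can be pictured with a minimal sketch. It does not use Airflow; merge_pipeline_options is a hypothetical helper, and the rule that task-specific options win over defaults on a key collision is an assumption made for illustration. The only behaviour taken from this page is that job_name overwrites any 'jobName' entry.

```python
import copy


def merge_pipeline_options(default_options, options, job_name):
    """Combine DAG-wide defaults with task-specific options (hypothetical helper)."""
    # Start from a copy so the shared defaults are never mutated.
    merged = copy.copy(default_options or {})
    # Assumption: task-specific options take precedence over the defaults.
    merged.update(options or {})
    # As documented for job_name, any 'jobName' entry in options is overwritten.
    merged['jobName'] = job_name
    return merged


defaults = {'project': 'my-gcp-project', 'zone': 'us-central1-f'}
task_options = {'maxNumWorkers': '50', 'zone': 'europe-west1-d', 'jobName': 'ignored'}
merged = merge_pipeline_options(defaults, task_options, 'normalize-cal')
print(merged)
```

Keeping project and zone in the defaults means each task only has to list the options that differ from the rest of the DAG.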

It is good practice to define dataflow_* parameters, such as the project, zone and staging location, in the default_args of the DAG.

default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}

Pass the path to your Dataflow pipeline as a file reference with the jar parameter. The jar must be a self-executing jar (see the documentation: https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar). Use options to pass options to your job.

t1 = DataFlowJavaOperator(
    task_id='dataflow_example',
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY',
        'labels': {'foo' : 'bar'}
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my_dag)

template_fields = ['options', 'jar', 'job_name'][source]
ui_color = #0273d4[source]
execute(self, context)[source]
class airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator(template, job_name='{{task.task_id}}', dataflow_default_options=None, parameters=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Templated Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

Parameters
  • template (str) – The reference to the DataFlow template (templated).

  • job_name (str) – The ‘jobName’ to use when executing the DataFlow template (templated).

  • dataflow_default_options (dict) – Map of default job environment options.

  • parameters (dict) – Map of job specific parameters for the template.

  • gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud Platform.

  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
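
The role of poll_sleep can be illustrated with a small polling loop. This is a sketch under stated assumptions, not the operator's code: wait_for_job and get_state are hypothetical names, the set of terminal states is an assumed subset of the Dataflow job states, and the sleep function is injectable so the example runs without waiting.

```python
import time

# Assumption: these states end the wait; any other state keeps polling.
TERMINAL_STATES = {'JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'}


def wait_for_job(get_state, poll_sleep=10, sleep=time.sleep):
    """Poll get_state() until a terminal state, pausing poll_sleep seconds between polls."""
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        # The job is still in progress (e.g. JOB_STATE_RUNNING): wait, then poll again.
        sleep(poll_sleep)


# Simulated job that reports RUNNING twice before finishing.
states = iter(['JOB_STATE_RUNNING', 'JOB_STATE_RUNNING', 'JOB_STATE_DONE'])
naps = []  # records each requested sleep instead of actually sleeping
final_state = wait_for_job(lambda: next(states), poll_sleep=10, sleep=naps.append)
print(final_state, naps)
```

A larger poll_sleep reduces the number of status requests at the cost of noticing job completion later.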

It is good practice to define dataflow_* parameters, such as the project, zone and staging location, in the default_args of the DAG.

default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'region': 'europe-west1',
        'zone': 'europe-west1-d',
        'tempLocation': 'gs://my-staging-bucket/staging/',
    }
}

Pass the path to your Dataflow template as a file reference with the template parameter. Use parameters to pass parameters to your job. Use dataflow_default_options to pass runtime environment options to your job.

t1 = DataflowTemplateOperator(
    task_id='dataflow_example',
    template='{{var.value.gcp_dataflow_base}}',
    parameters={
        'inputFile': "gs://bucket/input/my_input.txt",
        'outputFile': "gs://bucket/output/my_output.txt"
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my_dag)

template, dataflow_default_options, parameters, and job_name are templated so you can use variables in them.

Note that dataflow_default_options is expected to hold high-level options, such as project information, which apply to all dataflow operators in the DAG.
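
One way to picture how template, parameters and the job environment options fit together is a launch request with three parts: a job name, the template parameters, and an environment. The sketch below is illustrative only. build_launch_body is a hypothetical helper, ENVIRONMENT_KEYS is an assumed subset of environment settings, and the way the operator actually splits dataflow_default_options is not stated on this page.

```python
# Assumption: only these default options describe the runtime environment.
ENVIRONMENT_KEYS = ('zone', 'tempLocation', 'maxWorkers')


def build_launch_body(job_name, default_options, parameters):
    """Assemble a template launch request body (hypothetical helper)."""
    environment = {
        key: default_options[key]
        for key in ENVIRONMENT_KEYS
        if key in default_options
    }
    return {
        'jobName': job_name,
        'parameters': dict(parameters or {}),
        'environment': environment,
    }


body = build_launch_body(
    'dataflow_example',
    {
        'project': 'my-gcp-project',
        'region': 'europe-west1',
        'zone': 'europe-west1-d',
        'tempLocation': 'gs://my-staging-bucket/staging/',
    },
    {'inputFile': 'gs://bucket/input/my_input.txt'},
)
print(body)
```

In this sketch the project and region are left out of the body on the assumption that they identify where the request is sent rather than how the job runs.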

template_fields = ['parameters', 'dataflow_default_options', 'template', 'job_name'][source]
ui_color = #0273d4[source]
execute(self, context)[source]
class airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator(py_file, job_name='{{task.task_id}}', py_options=None, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Launch Cloud Dataflow jobs written in Python. Note that dataflow_default_options and options are merged to specify the pipeline execution parameters. dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all dataflow operators in the DAG.

See also

For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params

Parameters
  • py_file (str) – Reference to the python dataflow pipeline file.py, e.g., /some/local/file/path/to/your/python/pipeline/file. (templated)

  • job_name (str) – The ‘job_name’ to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' or 'job_name' in options will be overwritten.

  • py_options (list) – Additional Python interpreter options, e.g., ["-m", "-v"].

  • dataflow_default_options (dict) – Map of default job options.

  • options (dict) – Map of job specific options.

  • gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud Platform.

  • delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

  • poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
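
To make the relationship between py_file, py_options and options concrete, here is a hedged sketch of how a command line for a Python pipeline could be assembled. It is not taken from the operator's source: build_command and camel_to_snake are hypothetical helpers, the 'python' interpreter name is an assumption, and the camelCase to snake_case conversion is only suggested by the note above that both 'jobName' and 'job_name' are overwritten.

```python
import re


def camel_to_snake(name):
    """Convert a camelCase option name such as 'jobName' to 'job_name'."""
    return re.sub(r'[A-Z]', lambda match: '_' + match.group(0).lower(), name)


def build_command(py_file, py_options, options, interpreter='python'):
    """Build an argument list: interpreter, its options, the file, then --key=value flags."""
    command = [interpreter] + list(py_options or []) + [py_file]
    # Sort keys so the resulting command is deterministic.
    for key, value in sorted(options.items()):
        command.append('--{}={}'.format(camel_to_snake(key), value))
    return command


command = build_command(
    '/path/to/pipeline.py',
    ['-v'],
    {'project': 'my-gcp-project', 'jobName': 'wordcount', 'maxNumWorkers': 5},
)
print(command)
```

Note that py_options are placed before the file because they are meant for the interpreter, while the merged pipeline options follow it as flags for the pipeline itself.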

template_fields = ['options', 'dataflow_default_options', 'job_name', 'py_file'][source]
execute(self, context)[source]

Execute the python dataflow job.

class airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper(gcp_conn_id='google_cloud_default', delegate_to=None)[source]

Bases: object

Helper class that uses GoogleCloudStorageHook to download a GCS object.

GCS_PREFIX_LENGTH = 5[source]
google_cloud_to_local(self, file_name)[source]

Checks whether the file specified by file_name is stored in Google Cloud Storage (GCS), if so, downloads the file and saves it locally. The full path of the saved file will be returned. Otherwise the local file_name will be returned immediately.

Parameters

file_name (str) – The full path of input file.

Returns

The full path of local file.

Return type

str
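
The check described for google_cloud_to_local can be sketched without any Google client library. The code below is an illustration under assumptions: it treats a 'gs://' prefix (five characters, which matches GCS_PREFIX_LENGTH = 5) as the marker for a GCS path, split_gcs_path and google_cloud_to_local_sketch are hypothetical names, and the download step is passed in as a callable instead of using a real hook.

```python
import os
import tempfile

GCS_PREFIX = 'gs://'
GCS_PREFIX_LENGTH = len(GCS_PREFIX)  # 5


def split_gcs_path(file_name):
    """Return (bucket, object_name) for a gs:// path, or None for a local path."""
    if not file_name.startswith(GCS_PREFIX):
        return None
    bucket, _, object_name = file_name[GCS_PREFIX_LENGTH:].partition('/')
    if not bucket or not object_name:
        raise ValueError('Invalid GCS path: {}'.format(file_name))
    return bucket, object_name


def google_cloud_to_local_sketch(file_name, download):
    """Download a GCS file via download(bucket, object_name, local_path); pass local paths through."""
    parts = split_gcs_path(file_name)
    if parts is None:
        # Not stored in GCS: return the local file name immediately.
        return file_name
    bucket, object_name = parts
    local_path = os.path.join(tempfile.gettempdir(), os.path.basename(object_name))
    download(bucket, object_name, local_path)
    return local_path


calls = []  # stand-in for a real download; records what would be fetched
remote_result = google_cloud_to_local_sketch(
    'gs://bucket/tmp/pipeline.py',
    lambda bucket, name, path: calls.append((bucket, name)),
)
local_result = google_cloud_to_local_sketch('/home/user/pipeline.py', None)
print(remote_result, local_result, calls)
```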
