airflow.contrib.operators.dataflow_operator¶
Module Contents¶
- class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(jar, job_name='{{task.task_id}}', dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, job_class=None, *args, **kwargs)[source]¶
- Bases: airflow.models.BaseOperator

Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

Example:

    default_args = {
        'owner': 'Airflow',
        'depends_on_past': False,
        'start_date': datetime(2016, 8, 1),
        'email': ['alex@vanboxel.be'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=30),
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'zone': 'us-central1-f',
            'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
        }
    }

    dag = DAG('test-dag', default_args=default_args)

    task = DataFlowJavaOperator(
        gcp_conn_id='gcp_default',
        task_id='normalize-cal',
        jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
        options={
            'autoscalingAlgorithm': 'BASIC',
            'maxNumWorkers': '50',
            'start': '{{ds}}',
            'partitionType': 'DAY'
        },
        dag=dag)

See also

For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params

Parameters
- jar (str) – The reference to a self executing DataFlow jar (templated). 
- job_name (str) – The ‘jobName’ to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' in options will be overwritten.
- dataflow_default_options (dict) – Map of default job options. 
- options (dict) – Map of job specific options. 
- gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform. 
- delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. 
- poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state. 
- job_class (str) – The name of the dataflow job class to be executed, it is often not the main class configured in the dataflow jar file. 
 
jar, options, and job_name are templated so you can use variables in them.

Note that both dataflow_default_options and options will be merged to specify pipeline execution parameters, and dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all Dataflow operators in the DAG.

It’s a good practice to define dataflow_* parameters in the default_args of the DAG, like the project, zone and staging location.

    default_args = {
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'zone': 'europe-west1-d',
            'stagingLocation': 'gs://my-staging-bucket/staging/'
        }
    }

You need to pass the path to your dataflow jar as a file reference with the jar parameter; the jar needs to be a self-executing jar (see the documentation here: https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar). Use options to pass on options to your job.

    t1 = DataFlowJavaOperator(
        task_id='dataflow_example',
        jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
        options={
            'autoscalingAlgorithm': 'BASIC',
            'maxNumWorkers': '50',
            'start': '{{ds}}',
            'partitionType': 'DAY',
            'labels': {'foo': 'bar'}
        },
        gcp_conn_id='gcp-airflow-service-account',
        dag=my_dag)
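As an illustration of the merge described above, here is a minimal sketch (not the operator’s internal code; the option values are placeholders): the effective pipeline options are the DAG-level defaults updated with the task-level options, with the task-level values taking precedence when the same key appears in both.

    # Illustration only: how DAG-level 'dataflow_default_options' and
    # task-level 'options' combine; task-level values win on conflict.
    dataflow_default_options = {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
    options = {
        'zone': 'us-central1-b',   # overrides the DAG-level zone
        'maxNumWorkers': '50',
    }

    effective_options = dict(dataflow_default_options)
    effective_options.update(options)
    print(effective_options['zone'])  # us-central1-b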
- class airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator(template, job_name='{{task.task_id}}', dataflow_default_options=None, parameters=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)[source]¶
- Bases: airflow.models.BaseOperator

Start a Templated Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

Parameters
- template (str) – The reference to the DataFlow template. 
- job_name – The ‘jobName’ to use when executing the DataFlow template (templated). 
- dataflow_default_options (dict) – Map of default job environment options. 
- parameters (dict) – Map of job specific parameters for the template. 
- gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform. 
- delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. 
- poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state. 
 
It’s a good practice to define dataflow_* parameters in the default_args of the DAG, like the project, zone and staging location.

See also

https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment

    default_args = {
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'region': 'europe-west1',
            'zone': 'europe-west1-d',
            'tempLocation': 'gs://my-staging-bucket/staging/',
        }
    }

You need to pass the path to your dataflow template as a file reference with the template parameter. Use parameters to pass on parameters to your job. Use environment to pass on runtime environment variables to your job.

    t1 = DataflowTemplateOperator(
        task_id='dataflow_example',
        template='{{var.value.gcp_dataflow_base}}',
        parameters={
            'inputFile': "gs://bucket/input/my_input.txt",
            'outputFile': "gs://bucket/output/my_output.txt"
        },
        gcp_conn_id='gcp-airflow-service-account',
        dag=my_dag)

template, dataflow_default_options, parameters, and job_name are templated so you can use variables in them.

Note that dataflow_default_options is expected to hold high-level options for project information, which apply to all Dataflow operators in the DAG.

See also

https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment

For more detail on job template execution have a look at the reference: https://cloud.google.com/dataflow/docs/templates/executing-templates
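For context, a minimal end-to-end sketch combining default_args with this operator; the project, buckets, and the Google-provided Word_Count template path are assumed placeholders, not values taken from this page.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

    default_args = {
        'start_date': datetime(2016, 8, 1),
        'dataflow_default_options': {
            'project': 'my-gcp-project',                      # placeholder project
            'region': 'europe-west1',
            'zone': 'europe-west1-d',
            'tempLocation': 'gs://my-staging-bucket/staging/',
        }
    }

    dag = DAG('dataflow-template-example', default_args=default_args,
              schedule_interval=None)

    # 'parameters' are the template's own runtime parameters; pipeline-level
    # settings such as project, region and tempLocation come from
    # dataflow_default_options.
    wordcount = DataflowTemplateOperator(
        task_id='wordcount_from_template',
        template='gs://dataflow-templates/latest/Word_Count',  # assumed example template
        parameters={
            'inputFile': 'gs://my-bucket/input/my_input.txt',
            'output': 'gs://my-bucket/output/my_output',
        },
        gcp_conn_id='google_cloud_default',
        dag=dag)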
- class airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator(py_file, job_name='{{task.task_id}}', py_options=None, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)[source]¶
- Bases: airflow.models.BaseOperator

Launches a Cloud Dataflow job written in Python. Note that both dataflow_default_options and options will be merged to specify pipeline execution parameters, and dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all Dataflow operators in the DAG. A usage sketch follows the parameter list below.

See also

For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params

Parameters
- py_file (str) – Reference to the python dataflow pipeline file.py, e.g., /some/local/file/path/to/your/python/pipeline/file. (templated) 
- job_name (str) – The ‘job_name’ to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' or 'job_name' in options will be overwritten.
- py_options – Additional python options, e.g., [“-m”, “-v”]. 
- dataflow_default_options (dict) – Map of default job options. 
- options (dict) – Map of job specific options. 
- gcp_conn_id (str) – The connection ID to use connecting to Google Cloud Platform. 
- delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. 
- poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state. 
 
 
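As referenced above, a minimal usage sketch for this operator; the pipeline file path, bucket names, project and option values are placeholder assumptions, not values from this documentation.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

    default_args = {
        'start_date': datetime(2016, 8, 1),
        'dataflow_default_options': {
            'project': 'my-gcp-project',                        # placeholder project
            'zone': 'us-central1-f',
            'temp_location': 'gs://my-staging-bucket/temp/',
            'staging_location': 'gs://my-staging-bucket/staging/',
        }
    }

    dag = DAG('dataflow-python-example', default_args=default_args,
              schedule_interval=None)

    # py_file points at the Beam pipeline script; py_options are extra
    # interpreter arguments (e.g. ['-m']); options become pipeline arguments,
    # merged with dataflow_default_options as described above.
    task = DataFlowPythonOperator(
        task_id='dataflow_python_example',
        py_file='/home/airflow/dags/pipeline/wordcount.py',     # placeholder path
        py_options=[],
        options={
            'max_num_workers': '50',
            'output': 'gs://my-bucket/output/wordcount',
        },
        gcp_conn_id='google_cloud_default',
        dag=dag)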
- class airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper(gcp_conn_id='google_cloud_default', delegate_to=None)[source]¶
- Bases: object

GoogleCloudStorageHook helper class to download a GCS object.
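Only the constructor is documented on this page. Assuming the helper exposes a google_cloud_to_local method as in the corresponding contrib source (an assumption, not confirmed here), usage would look roughly like this:

    from airflow.contrib.operators.dataflow_operator import GoogleCloudBucketHelper

    # Assumption: google_cloud_to_local(file_name) downloads a gs:// object to
    # a local temporary file and returns its local path, while a plain local
    # path is returned unchanged.
    helper = GoogleCloudBucketHelper(gcp_conn_id='google_cloud_default')
    local_path = helper.google_cloud_to_local('gs://my-bucket/pipeline/wordcount.py')
    print(local_path)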