airflow.contrib.operators.dataflow_operator
Module Contents
class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(jar, job_name='{{task.task_id}}', dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, job_class=None, *args, **kwargs)
Bases: airflow.models.BaseOperator
Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.
Example:
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        # start_date must be a datetime, not a bare tuple
        'start_date': datetime(2016, 8, 1),
        'email': ['alex@vanboxel.be'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=30),
        # Options shared by every Dataflow operator in the DAG
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'zone': 'us-central1-f',
            'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
        }
    }

    dag = DAG('test-dag', default_args=default_args)

    task = DataFlowJavaOperator(
        gcp_conn_id='gcp_default',
        task_id='normalize-cal',
        jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
        # Job-specific pipeline options
        options={
            'autoscalingAlgorithm': 'BASIC',
            'maxNumWorkers': '50',
            'start': '{{ds}}',
            'partitionType': 'DAY'
        },
        dag=dag)
See also
For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- Parameters
jar (str) – The reference to a self executing DataFlow jar (templated).
job_name (str) – The 'jobName' to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' in options will be overwritten.
dataflow_default_options (dict) – Map of default job options.
options (dict) – Map of job specific options.
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud Platform.
delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
job_class (str) – The name of the dataflow job class to be executed; it is often not the main class configured in the dataflow jar file.
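The poll_sleep behaviour described above can be pictured with a small polling loop. This is only a sketch under stated assumptions: wait_for_job and get_state are hypothetical names, not part of the operator's API, and the real hook may handle more states than the two used here.

```python
import time


def wait_for_job(get_state, poll_sleep=10, sleep=time.sleep):
    """Call get_state() repeatedly until the job leaves JOB_STATE_RUNNING.

    get_state is a hypothetical callable returning the current job state;
    sleep is injectable so the loop can be exercised without real delays.
    """
    while True:
        state = get_state()
        if state != "JOB_STATE_RUNNING":
            return state
        # Still running: wait poll_sleep seconds before asking again.
        sleep(poll_sleep)


# Simulated job that reports RUNNING twice and then finishes.
states = iter(["JOB_STATE_RUNNING", "JOB_STATE_RUNNING", "JOB_STATE_DONE"])
naps = []
final = wait_for_job(lambda: next(states), poll_sleep=10, sleep=naps.append)
```

A larger poll_sleep means fewer status requests at the cost of noticing completion later.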
jar, options, and job_name are templated so you can use variables in them.
Note that both dataflow_default_options and options will be merged to specify pipeline execution parameters, and dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all dataflow operators in the DAG.
It's a good practice to define dataflow_* parameters, such as the project, zone and staging location, in the default_args of the DAG.
    default_args = {
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'zone': 'europe-west1-d',
            'stagingLocation': 'gs://my-staging-bucket/staging/'
        }
    }
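As a rough illustration of the merge described above, the sketch below combines the two maps and then applies job_name. It assumes that job-specific options take precedence over the defaults; merge_options is a hypothetical helper written for this example, not the operator's actual code.

```python
import copy


def merge_options(dataflow_default_options, options, job_name):
    """Combine DAG-level defaults with job-specific options (illustrative only)."""
    # Copy first so the shared defaults dict is never mutated.
    merged = copy.copy(dataflow_default_options or {})
    # Assumption: job-specific options override the defaults.
    merged.update(options or {})
    # Per the job_name parameter docs, any 'jobName' entry in options is overwritten.
    merged["jobName"] = job_name
    return merged


defaults = {
    "project": "my-gcp-project",
    "zone": "europe-west1-d",
    "stagingLocation": "gs://my-staging-bucket/staging/",
}
options = {"maxNumWorkers": "50", "zone": "us-central1-f", "jobName": "ignored"}
merged = merge_options(defaults, options, "normalize-cal")
```

Keeping project-wide settings in default_args means each task only needs to list what differs for that job.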
Pass the path to your Dataflow jar as a file reference with the jar parameter. The jar needs to be a self-executing jar (see the documentation: https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar). Use options to pass options to your job.
    # my_dag is a DAG object defined elsewhere
    t1 = DataFlowJavaOperator(
        task_id='dataflow_example',
        jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
        options={
            'autoscalingAlgorithm': 'BASIC',
            'maxNumWorkers': '50',
            'start': '{{ds}}',
            'partitionType': 'DAY',
            'labels': {'foo': 'bar'}
        },
        gcp_conn_id='gcp-airflow-service-account',
        dag=my_dag)
class airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator(template, job_name='{{task.task_id}}', dataflow_default_options=None, parameters=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)
Bases: airflow.models.BaseOperator
Start a Templated Cloud DataFlow batch job. The parameters of the operation will be passed to the job.
- Parameters
template (str) – The reference to the DataFlow template.
job_name – The ‘jobName’ to use when executing the DataFlow template (templated).
dataflow_default_options (dict) – Map of default job environment options.
parameters (dict) – Map of job specific parameters for the template.
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud Platform.
delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
It's a good practice to define dataflow_* parameters, such as the project, zone and staging location, in the default_args of the DAG.
See also
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment
    default_args = {
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'region': 'europe-west1',
            'zone': 'europe-west1-d',
            'tempLocation': 'gs://my-staging-bucket/staging/',
        }
    }
Pass the path to your Dataflow template as a file reference with the template parameter. Use parameters to pass parameters to your job. Use dataflow_default_options to pass runtime environment options to your job.
    # my_dag is a DAG object defined elsewhere
    t1 = DataflowTemplateOperator(
        task_id='dataflow_example',
        template='{{var.value.gcp_dataflow_base}}',
        parameters={
            'inputFile': "gs://bucket/input/my_input.txt",
            'outputFile': "gs://bucket/output/my_output.txt"
        },
        gcp_conn_id='gcp-airflow-service-account',
        dag=my_dag)
template, dataflow_default_options, parameters, and job_name are templated so you can use variables in them.
Note that dataflow_default_options is expected to hold high-level options for project information, which apply to all dataflow operators in the DAG.
See also
For more detail on job template execution have a look at the reference: https://cloud.google.com/dataflow/docs/templates/executing-templates
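To make the relationship between parameters, job_name and the environment options more concrete, here is a hedged sketch of a request body shaped like the LaunchTemplateParameters resource referenced above (fields jobName, parameters, environment). build_launch_body and the choice of which keys count as environment options are assumptions made for this example; the operator's real implementation may select a different set.

```python
def build_launch_body(job_name, parameters, dataflow_default_options):
    """Assemble a LaunchTemplateParameters-style dict (illustrative only)."""
    # Hypothetical subset of RuntimeEnvironment fields picked for this sketch.
    environment_keys = ("zone", "tempLocation")
    environment = {
        key: value
        for key, value in dataflow_default_options.items()
        if key in environment_keys
    }
    return {
        "jobName": job_name,
        "parameters": parameters,
        "environment": environment,
    }


body = build_launch_body(
    job_name="dataflow_example",
    parameters={
        "inputFile": "gs://bucket/input/my_input.txt",
        "outputFile": "gs://bucket/output/my_output.txt",
    },
    dataflow_default_options={
        "project": "my-gcp-project",
        "zone": "europe-west1-d",
        "tempLocation": "gs://my-staging-bucket/staging/",
    },
)
```

In this sketch the project is left out of the body because it identifies where the job is launched rather than how the job runs.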
class airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator(py_file, job_name='{{task.task_id}}', py_options=None, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)
Bases: airflow.models.BaseOperator
Launch Cloud Dataflow jobs written in Python. Note that both dataflow_default_options and options will be merged to specify pipeline execution parameters, and dataflow_default_options is expected to hold high-level options, for instance project and zone information, which apply to all dataflow operators in the DAG.
See also
For more detail on job submission have a look at the reference: https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- Parameters
py_file (str) – Reference to the python dataflow pipeline file.py, e.g., /some/local/file/path/to/your/python/pipeline/file. (templated)
job_name (str) – The 'job_name' to use when executing the DataFlow job (templated). This ends up being set in the pipeline options, so any entry with key 'jobName' or 'job_name' in options will be overwritten.
py_options – Additional python options, e.g., ["-m", "-v"].
dataflow_default_options (dict) – Map of default job options.
options (dict) – Map of job specific options.
gcp_conn_id (str) – The connection ID to use when connecting to Google Cloud Platform.
delegate_to (str) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
poll_sleep (int) – The time in seconds to sleep between polling Google Cloud Platform for the dataflow job status while the job is in the JOB_STATE_RUNNING state.
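One way to picture how py_file, py_options, options and job_name fit together is as a command line. The sketch below is an assumption-laden illustration: build_command is a hypothetical helper, and the --key=value flag format and argument order are guesses at a plausible layout rather than the operator's confirmed behaviour.

```python
def build_command(py_file, py_options, options, job_name):
    """Build an argument list for running a pipeline file (illustrative only)."""
    merged = dict(options)
    # Per the job_name docs, 'jobName' and 'job_name' entries in options are overwritten.
    merged.pop("jobName", None)
    merged["job_name"] = job_name
    # Assumption: interpreter options come before the pipeline file.
    cmd = ["python"] + list(py_options) + [py_file]
    # Assumption: each pipeline option becomes a --key=value flag (sorted for stable output).
    cmd += ["--{}={}".format(key, value) for key, value in sorted(merged.items())]
    return cmd


cmd = build_command(
    py_file="/some/local/pipeline.py",
    py_options=["-v"],
    options={"project": "my-gcp-project", "job_name": "old-name"},
    job_name="wordcount",
)
```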
class airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper(gcp_conn_id='google_cloud_default', delegate_to=None)
Bases: object
Helper class around GoogleCloudStorageHook for downloading a GCS object.
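The reference above does not describe how the helper works. As a loosely related sketch, downloading a GCS object generally starts with splitting a gs:// path into a bucket and an object name. split_gcs_path below is a hypothetical function written for illustration; it is not a method of GoogleCloudBucketHelper.

```python
def split_gcs_path(path):
    """Split 'gs://bucket/object' into (bucket, object_name), or return None."""
    prefix = "gs://"
    if not path.startswith(prefix):
        # Not a GCS reference, e.g. a local file path.
        return None
    # Everything up to the first '/' is the bucket; the rest is the object name.
    bucket, _, object_name = path[len(prefix):].partition("/")
    return bucket, object_name


remote = split_gcs_path("gs://bucket/tmp/dataflow/pipeline.jar")
local = split_gcs_path("/local/file.jar")
```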