# Source code for airflow.providers.google.cloud.example_dags.example_dataflow
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example Airflow DAG for Google Cloud Dataflow service"""
import os
from typing import Callable, Dict, List
from urllib.parse import urlparse

from airflow import models
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.operators.dataflow import (
    CheckJobRunning,
    DataflowCreateJavaJobOperator,
    DataflowCreatePythonJobOperator,
    DataflowTemplatedJobStartOperator,
)
from airflow.providers.google.cloud.sensors.dataflow import (
    DataflowJobAutoScalingEventsSensor,
    DataflowJobMessagesSensor,
    DataflowJobMetricsSensor,
    DataflowJobStatusSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.utils.dates import days_ago

# GCS locations used by the example jobs. The "INVALID BUCKET NAME" defaults
# are deliberate placeholders that fail fast unless the environment overrides them.
GCS_TMP = os.environ.get('GCP_DATAFLOW_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
GCS_STAGING = os.environ.get('GCP_DATAFLOW_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
GCS_OUTPUT = os.environ.get('GCP_DATAFLOW_GCS_OUTPUT', 'gs://INVALID BUCKET NAME/output')
GCS_JAR = os.environ.get('GCP_DATAFLOW_JAR', 'gs://INVALID BUCKET NAME/word-count-beam-bundled-0.1.jar')
GCS_PYTHON = os.environ.get('GCP_DATAFLOW_PYTHON', 'gs://INVALID BUCKET NAME/wordcount_debugging.py')

# Split the jar URI into bucket/object so it can also be staged locally.
GCS_JAR_PARTS = urlparse(GCS_JAR)
GCS_JAR_BUCKET_NAME = GCS_JAR_PARTS.netloc
GCS_JAR_OBJECT_NAME = GCS_JAR_PARTS.path[1:]

default_args = {
    'dataflow_default_options': {
        'tempLocation': GCS_TMP,
        'stagingLocation': GCS_STAGING,
    }
}

with models.DAG(
    "example_gcp_dataflow_native_java",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=['example'],
) as dag_native_java:

    # [START howto_operator_start_java_job_jar_on_gcs]
    start_java_job = DataflowCreateJavaJobOperator(
        task_id="start-java-job",
        jar=GCS_JAR,
        job_name='{{task.task_id}}',
        options={
            'output': GCS_OUTPUT,
        },
        poll_sleep=10,
        job_class='org.apache.beam.examples.WordCount',
        check_if_running=CheckJobRunning.IgnoreJob,
        location='europe-west3',
    )
    # [END howto_operator_start_java_job_jar_on_gcs]

    # [START howto_operator_start_java_job_local_jar]
    jar_to_local = GCSToLocalFilesystemOperator(
        task_id="jar-to-local",
        bucket=GCS_JAR_BUCKET_NAME,
        object_name=GCS_JAR_OBJECT_NAME,
        filename="/tmp/dataflow-{{ ds_nodash }}.jar",
    )

    start_java_job_local = DataflowCreateJavaJobOperator(
        task_id="start-java-job-local",
        jar="/tmp/dataflow-{{ ds_nodash }}.jar",
        job_name='{{task.task_id}}',
        options={
            'output': GCS_OUTPUT,
        },
        poll_sleep=10,
        job_class='org.apache.beam.examples.WordCount',
        check_if_running=CheckJobRunning.WaitForRun,
    )

    jar_to_local >> start_java_job_local
    # [END howto_operator_start_java_job_local_jar]

with models.DAG(
    "example_gcp_dataflow_native_python",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval=None,  # Override to match your needs
    tags=['example'],
) as dag_native_python:

    # [START howto_operator_start_python_job]
    start_python_job = DataflowCreatePythonJobOperator(
        task_id="start-python-job",
        py_file=GCS_PYTHON,
        py_options=[],
        job_name='{{task.task_id}}',
        options={
            'output': GCS_OUTPUT,
        },
        py_requirements=['apache-beam[gcp]==2.21.0'],
        py_interpreter='python3',
        py_system_site_packages=False,
        location='europe-west3',
    )
    # [END howto_operator_start_python_job]

    # Runs the wordcount module shipped with apache-beam itself (note the
    # '-m' py_option), rather than a file staged on GCS.
    start_python_job_local = DataflowCreatePythonJobOperator(
        task_id="start-python-job-local",
        py_file='apache_beam.examples.wordcount',
        py_options=['-m'],
        job_name='{{task.task_id}}',
        options={
            'output': GCS_OUTPUT,
        },
        py_requirements=['apache-beam[gcp]==2.14.0'],
        py_interpreter='python3',
        py_system_site_packages=False,
    )

with models.DAG(
    "example_gcp_dataflow_native_python_async",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval=None,  # Override to match your needs
    tags=['example'],
) as dag_native_python_async:

    # [START howto_operator_start_python_job_async]
    start_python_job_async = DataflowCreatePythonJobOperator(
        task_id="start-python-job-async",
        py_file=GCS_PYTHON,
        py_options=[],
        job_name='{{task.task_id}}',
        options={
            'output': GCS_OUTPUT,
        },
        py_requirements=['apache-beam[gcp]==2.25.0'],
        py_interpreter='python3',
        py_system_site_packages=False,
        location='europe-west3',
        # Return as soon as the job is submitted; the sensors below poll it.
        wait_until_finished=False,
    )
    # [END howto_operator_start_python_job_async]

    # [START howto_sensor_wait_for_job_status]
    wait_for_python_job_async_done = DataflowJobStatusSensor(
        task_id="wait-for-python-job-async-done",
        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        location='europe-west3',
    )
    # [END howto_sensor_wait_for_job_status]

    # [START howto_sensor_wait_for_job_metric]
    def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
        """Build a sensor callback that checks whether the named scalar metric is >= ``value``.

        :param metric_name: original (non-tentative) name of the Dataflow metric to look for
        :param value: threshold the metric's scalar value must reach
        :raises AirflowException: from the callback, if the metric is absent
        """

        def callback(metrics: List[Dict]) -> bool:
            dag_native_python_async.log.info("Looking for '%s' >= %d", metric_name, value)
            for metric in metrics:
                context = metric.get("name", {}).get("context", {})
                original_name = context.get("original_name", "")
                tentative = context.get("tentative", "")
                # Compare against the metric_name parameter (previously this was
                # hard-coded to "Service-cpu_num_seconds", silently ignoring the
                # argument); skip tentative (in-progress) metric values.
                if original_name == metric_name and not tentative:
                    return metric["scalar"] >= value
            raise AirflowException(f"Metric '{metric_name}' not found in metrics")

        return callback

    wait_for_python_job_async_metric = DataflowJobMetricsSensor(
        task_id="wait-for-python-job-async-metric",
        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
        location='europe-west3',
        callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    )
    # [END howto_sensor_wait_for_job_metric]

    # [START howto_sensor_wait_for_job_message]
    def check_message(messages: List[dict]) -> bool:
        """Return True if any job message marks the workflow start/stop steps being added."""
        for message in messages:
            if "Adding workflow start and stop steps." in message.get("messageText", ""):
                return True
        return False

    wait_for_python_job_async_message = DataflowJobMessagesSensor(
        task_id="wait-for-python-job-async-message",
        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
        location='europe-west3',
        callback=check_message,
    )
    # [END howto_sensor_wait_for_job_message]

    # [START howto_sensor_wait_for_job_autoscaling_event]
    def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:
        """Return True if any autoscaling event reports that the worker pool started."""
        for autoscaling_event in autoscaling_events:
            if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
                return True
        return False

    wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
        task_id="wait-for-python-job-async-autoscaling-event",
        job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
        location='europe-west3',
        callback=check_autoscaling_event,
    )
    # [END howto_sensor_wait_for_job_autoscaling_event]

    start_python_job_async >> wait_for_python_job_async_done
    start_python_job_async >> wait_for_python_job_async_metric
    start_python_job_async >> wait_for_python_job_async_message
    start_python_job_async >> wait_for_python_job_async_autoscaling_event

with models.DAG(
    "example_gcp_dataflow_template",
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval=None,  # Override to match your needs
    tags=['example'],
) as dag_template:

    # [START howto_operator_start_template_job]
    start_template_job = DataflowTemplatedJobStartOperator(
        task_id="start-template-job",
        template='gs://dataflow-templates/latest/Word_Count',
        parameters={'inputFile': "gs://dataflow-samples/shakespeare/kinglear.txt", 'output': GCS_OUTPUT},
        location='europe-west3',
    )
    # [END howto_operator_start_template_job]