# Source code for airflow.providers.google.cloud.example_dags.example_dataflow
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example Airflow DAG for Google Cloud Dataflow service"""
import os
from datetime import datetime
from typing import Callable, Dict, List
from urllib.parse import urlparse

from airflow import models
from airflow.exceptions import AirflowException
from airflow.providers.apache.beam.operators.beam import (
    BeamRunJavaPipelineOperator,
    BeamRunPythonPipelineOperator,
)
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.operators.dataflow import (
    CheckJobRunning,
    DataflowTemplatedJobStartOperator,
)
from airflow.providers.google.cloud.sensors.dataflow import (
    DataflowJobAutoScalingEventsSensor,
    DataflowJobMessagesSensor,
    DataflowJobMetricsSensor,
    DataflowJobStatusSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
}}withmodels.DAG("example_gcp_dataflow_native_java",schedule_interval='@once',# Override to match your needsstart_date=START_DATE,catchup=False,tags=['example'],)asdag_native_java:# [START howto_operator_start_java_job_jar_on_gcs]
)# [END howto_operator_start_python_job]start_python_job_local=BeamRunPythonPipelineOperator(task_id="start-python-job-local",py_file='apache_beam.examples.wordcount',py_options=['-m'],pipeline_options={'output':GCS_OUTPUT,},py_requirements=['apache-beam[gcp]==2.14.0'],py_interpreter='python3',py_system_site_packages=False,)withmodels.DAG("example_gcp_dataflow_native_python_async",default_args=default_args,start_date=START_DATE,catchup=False,schedule_interval='@once',# Override to match your needstags=['example'],)asdag_native_python_async:# [START howto_operator_start_python_job_async]
},)# [END howto_operator_start_python_job_async]# [START howto_sensor_wait_for_job_status]wait_for_python_job_async_done=DataflowJobStatusSensor(task_id="wait-for-python-job-async-done",job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",expected_statuses={DataflowJobStatus.JOB_STATE_DONE},location='europe-west3',)# [END howto_sensor_wait_for_job_status]# [START howto_sensor_wait_for_job_metric]defcheck_metric_scalar_gte(metric_name:str,value:int)->Callable:"""Check is metric greater than equals to given value."""defcallback(metrics:List[Dict])->bool:dag_native_python_async.log.info("Looking for '%s' >= %d",metric_name,value)formetricinmetrics:context=metric.get("name",{}).get("context",{})original_name=context.get("original_name","")tentative=context.get("tentative","")iforiginal_name=="Service-cpu_num_seconds"andnottentative:returnmetric["scalar"]>=valueraiseAirflowException(f"Metric '{metric_name}' not found in metrics")returncallbackwait_for_python_job_async_metric=DataflowJobMetricsSensor(task_id="wait-for-python-job-async-metric",job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",location='europe-west3',callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds",value=100),fail_on_terminal_state=False,)# [END howto_sensor_wait_for_job_metric]# [START howto_sensor_wait_for_job_message]defcheck_message(messages:List[dict])->bool:"""Check message"""formessageinmessages:if"Adding workflow start and stop steps."inmessage.get("messageText",""):returnTruereturnFalsewait_for_python_job_async_message=DataflowJobMessagesSensor(task_id="wait-for-python-job-async-message",job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",location='europe-west3',callback=check_message,fail_on_terminal_state=False,)# [END howto_sensor_wait_for_job_message]# [START howto_sensor_wait_for_job_autoscaling_event]defcheck_autoscaling_event(autoscaling_events:List[dict])->bool:"""Check 
autoscaling event"""forautoscaling_eventinautoscaling_events:if"Worker pool started."inautoscaling_event.get("description",{}).get("messageText",""):returnTruereturnFalsewait_for_python_job_async_autoscaling_event=DataflowJobAutoScalingEventsSensor(task_id="wait-for-python-job-async-autoscaling-event",job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",location='europe-west3',callback=check_autoscaling_event,fail_on_terminal_state=False,)# [END howto_sensor_wait_for_job_autoscaling_event]start_python_job_async>>wait_for_python_job_async_donestart_python_job_async>>wait_for_python_job_async_metricstart_python_job_async>>wait_for_python_job_async_messagestart_python_job_async>>wait_for_python_job_async_autoscaling_eventwithmodels.DAG("example_gcp_dataflow_template",default_args=default_args,start_date=START_DATE,catchup=False,schedule_interval='@once',# Override to match your needstags=['example'],)asdag_template:# [START howto_operator_start_template_job]