Source code for tests.system.providers.google.cloud.dataflow.example_dataflow_go
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Apache Beam operators.

Requirements: This test requires the gcloud and go commands to run.
"""
from __future__ import annotations

import os
from datetime import datetime
from pathlib import Path

from airflow.models.dag import DAG
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.dataflow import (
    DataflowJobAutoScalingEventsSensor,
    DataflowJobMessagesSensor,
    DataflowJobStatusSensor,
)
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
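# Environment-specific constants and the DAG declaration used by the tasks
# below. The concrete values here (bucket naming scheme, Go file name, DAG id,
# Dataflow region, schedule) are representative placeholders, not prescribed
# by the providers package; adjust them for your environment.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "dataflow_native_go"

BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
GO_FILE_NAME = "wordcount.go"  # assumed name of the local Beam Go pipeline source
GO_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / GO_FILE_NAME)
GCS_GO = f"gs://{BUCKET_NAME}/{GO_FILE_NAME}"
LOCATION = "europe-west3"  # assumed Dataflow region

with DAG(
    DAG_ID,
    schedule="@once",  # system tests run once per trigger
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "dataflow"],
) as dag:
    # Create the GCS bucket that receives the Go file and the job artifacts;
    # referenced as create_bucket in the task chain at the end of the DAG.
    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)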
    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file_to_bucket",
        src=GO_FILE_LOCAL_PATH,
        dst=GO_FILE_NAME,
        bucket=BUCKET_NAME,
    )

    start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
        task_id="start_go_pipeline_dataflow_runner",
        runner=BeamRunnerType.DataflowRunner,
        go_file=GCS_GO,
        pipeline_options={
            "tempLocation": GCS_TMP,
            "stagingLocation": GCS_STAGING,
            "output": GCS_OUTPUT,
            "WorkerHarnessContainerImage": "apache/beam_go_sdk:2.46.0",
        },
        dataflow_config=DataflowConfiguration(job_name="start_go_job", location=LOCATION),
    )

    wait_for_go_job_async_done = DataflowJobStatusSensor(
        task_id="wait_for_go_job_async_done",
        job_id="{{task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']}}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        location=LOCATION,
    )

    def check_message(messages: list[dict]) -> bool:
        """Check whether the expected text appears in the job messages."""
        for message in messages:
            if "Adding workflow start and stop steps." in message.get("messageText", ""):
                return True
        return False

    wait_for_go_job_async_message = DataflowJobMessagesSensor(
        task_id="wait_for_go_job_async_message",
        job_id="{{task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']}}",
        location=LOCATION,
        callback=check_message,
        fail_on_terminal_state=False,
    )

    def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
        """Check whether the worker pool has started, based on autoscaling events."""
        for autoscaling_event in autoscaling_events:
            if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
                return True
        return False

    wait_for_go_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
        task_id="wait_for_go_job_async_autoscaling_event",
        job_id="{{task_instance.xcom_pull('start_go_pipeline_dataflow_runner')['dataflow_job_id']}}",
        location=LOCATION,
        callback=check_autoscaling_event,
        fail_on_terminal_state=False,
    )

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
    )

    (
        # TEST SETUP
        create_bucket
        >> upload_file
        # TEST BODY
        >> start_go_pipeline_dataflow_runner
        >> [
            wait_for_go_job_async_done,
            wait_for_go_job_async_message,
            wait_for_go_job_async_autoscaling_event,
        ]
        # TEST TEARDOWN
        >> delete_bucket
    )

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
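# Example invocation via pytest (path derived from this module's dotted name):
#   pytest tests/system/providers/google/cloud/dataflow/example_dataflow_go.py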