# Source code for tests.system.google.cloud.gcs.example_gcs_sensor
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example Airflow DAG for Google Cloud Storage sensors."""

from __future__ import annotations

import os
from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.gcs import (
    GCSObjectExistenceSensor,
    GCSObjectsWithPrefixExistenceSensor,
    GCSObjectUpdateSensor,
    GCSUploadSessionCompleteSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.trigger_rule import TriggerRule

from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
def workaround_in_debug_executor(cls):
    """
    Patch a sensor class so it works under DebugExecutor.

    DebugExecutor changes sensor mode from poke to reschedule. Some sensors
    don't work correctly in reschedule mode: they are decorated with the
    ``poke_mode_only`` decorator to fail when the mode is changed. This
    function creates a dummy ``mode`` property to overwrite that guard and
    forces the ``poke`` method to always return True.

    :param cls: sensor class to patch in place (class object, not instance).
    """
    # ``dummy_mode_property`` is defined elsewhere in this file — it supplies
    # a plain read/write property so the poke_mode_only check cannot trigger.
    cls.mode = dummy_mode_property()
    cls.poke = lambda self, context: True
# NOTE(review): in the upstream example these task definitions live inside a
# ``with DAG(...) as dag:`` block whose header (and the bucket/file constants
# plus ``create_bucket``) was lost in extraction — confirm indentation and
# context against the original file before running.

workaround_in_debug_executor(GCSUploadSessionCompleteSensor)

# [START howto_sensor_gcs_upload_session_complete_task]
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_complete_task",
)
# [END howto_sensor_gcs_upload_session_complete_task]

# [START howto_sensor_gcs_upload_session_async_task]
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_async_complete",
    deferrable=True,
)
# [END howto_sensor_gcs_upload_session_async_task]

# [START howto_sensor_object_update_exists_task]
gcs_update_object_exists = GCSObjectUpdateSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task",
)
# [END howto_sensor_object_update_exists_task]

# [START howto_sensor_object_update_exists_task_async]
gcs_update_object_exists_async = GCSObjectUpdateSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task_async",
    deferrable=True,
)
# [END howto_sensor_object_update_exists_task_async]

copy_file = GCSToGCSOperator(
    task_id="copy_example_gcs_file",
    source_bucket=RESOURCES_BUCKET_NAME,
    source_object=UPLOAD_FILE_PATH,
    destination_bucket=DESTINATION_BUCKET_NAME,
    destination_object=FILE_NAME,
    exact_match=True,
)

# [START howto_sensor_object_exists_task]
gcs_object_exists = GCSObjectExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_task",
)
# [END howto_sensor_object_exists_task]

# [START howto_sensor_object_exists_task_defered]
gcs_object_exists_defered = GCSObjectExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_defered",
    deferrable=True,
)
# [END howto_sensor_object_exists_task_defered]

# [START howto_sensor_object_with_prefix_exists_task]
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task",
)
# [END howto_sensor_object_with_prefix_exists_task]

# [START howto_sensor_object_with_prefix_exists_task_async]
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task_async",
    deferrable=True,
)
# [END howto_sensor_object_with_prefix_exists_task_async]

delete_bucket = GCSDeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name=DESTINATION_BUCKET_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

chain(
    # TEST SETUP
    create_bucket,
    copy_file,
    # TEST BODY
    [
        gcs_object_exists,
        gcs_object_exists_defered,
        gcs_object_with_prefix_exists,
        gcs_object_with_prefix_exists_async,
    ],
    # TEST TEARDOWN
    delete_bucket,
)
chain(
    create_bucket,
    # TEST BODY
    gcs_upload_session_complete,
    gcs_update_object_exists,
    gcs_update_object_exists_async,
    delete_bucket,
)

from tests_common.test_utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)