Source code for tests.system.google.cloud.gcs.example_gcs_to_gcs
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google Cloud Storage GCSSynchronizeBucketsOperator and
GCSToGCSOperator operators.
"""

from __future__ import annotations

import os
import shutil
from datetime import datetime

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
    GCSListObjectsOperator,
    GCSSynchronizeBucketsOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
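
# The task definitions below reference module-level configuration (bucket names, a local
# file prefix, object names, a project id), an assertion helper and a surrounding DAG
# context that the full example file defines at this point. A minimal, illustrative
# sketch is given here; the concrete values and DAG arguments are assumptions, not the
# original definitions.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID

DAG_ID = "gcs_to_gcs"

BUCKET_NAME_SRC = f"bucket_{DAG_ID}_{ENV_ID}"  # assumed naming scheme
BUCKET_NAME_DST = f"bucket_dst_{DAG_ID}_{ENV_ID}"  # assumed naming scheme
RANDOM_FILE_NAME = OBJECT_1 = OBJECT_2 = "random.bin"  # assumed object name
HOME = "/home/airflow/gcs"  # Composer-mounted directory, see create_workdir() below
PREFIX = f"{HOME}/data/{DAG_ID}_{ENV_ID}/"  # assumed working-directory prefix


def _assert_copied_files_exist(ti):
    """Illustrative check that copied and backed-up objects exist in the destination bucket."""
    objects = ti.xcom_pull(task_ids="list_objects", key="return_value")
    assert PREFIX + OBJECT_1 in objects
    assert f"backup_{OBJECT_1}" in objects
    assert f"backup/{OBJECT_1}" in objects


with DAG(
    DAG_ID,
    schedule="@once",  # assumed DAG arguments
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["gcs", "example"],
) as dag: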
    @task
    def create_workdir() -> str:
        """
        Task creates working directory.

        The logic behind this task is a workaround that provides sustainable execution in the
        Composer environment: local files can be safely shared among tasks if they are located
        within the '/home/airflow/gcs/data/' folder, which is mounted to a GCS bucket under
        the hood (https://cloud.google.com/composer/docs/composer-2/cloud-storage).
        Otherwise, worker nodes don't share a local path, and thus files created by one task
        aren't guaranteed to be accessible by others.
        """
        workdir = PREFIX if os.path.exists(HOME) else HOME
        os.makedirs(PREFIX)
        return workdir
    create_workdir_task = create_workdir()

    generate_random_file = BashOperator(
        task_id="generate_random_file",
        bash_command=f"cat /dev/urandom | head -c $((1 * 1024 * 1024)) > {PREFIX + RANDOM_FILE_NAME}",
    )

    create_bucket_src = GCSCreateBucketOperator(
        task_id="create_bucket_src",
        bucket_name=BUCKET_NAME_SRC,
        project_id=PROJECT_ID,
    )

    create_bucket_dst = GCSCreateBucketOperator(
        task_id="create_bucket_dst",
        bucket_name=BUCKET_NAME_DST,
        project_id=PROJECT_ID,
    )

    upload_file_src = LocalFilesystemToGCSOperator(
        task_id="upload_file_src",
        src=PREFIX + RANDOM_FILE_NAME,
        dst=PREFIX + RANDOM_FILE_NAME,
        bucket=BUCKET_NAME_SRC,
    )

    upload_file_src_sub = LocalFilesystemToGCSOperator(
        task_id="upload_file_src_sub",
        src=PREFIX + RANDOM_FILE_NAME,
        dst=f"{PREFIX}subdir/{RANDOM_FILE_NAME}",
        bucket=BUCKET_NAME_SRC,
    )

    upload_file_dst = LocalFilesystemToGCSOperator(
        task_id="upload_file_dst",
        src=PREFIX + RANDOM_FILE_NAME,
        dst=PREFIX + RANDOM_FILE_NAME,
        bucket=BUCKET_NAME_DST,
    )

    upload_file_dst_sub = LocalFilesystemToGCSOperator(
        task_id="upload_file_dst_sub",
        src=PREFIX + RANDOM_FILE_NAME,
        dst=f"{PREFIX}subdir/{RANDOM_FILE_NAME}",
        bucket=BUCKET_NAME_DST,
    )

    # [START howto_synch_bucket]
    sync_bucket = GCSSynchronizeBucketsOperator(
        task_id="sync_bucket",
        source_bucket=BUCKET_NAME_SRC,
        destination_bucket=BUCKET_NAME_DST,
    )
    # [END howto_synch_bucket]

    # [START howto_synch_full_bucket]
    sync_full_bucket = GCSSynchronizeBucketsOperator(
        task_id="sync_full_bucket",
        source_bucket=BUCKET_NAME_SRC,
        destination_bucket=BUCKET_NAME_DST,
        delete_extra_files=True,
        allow_overwrite=True,
    )
    # [END howto_synch_full_bucket]

    # [START howto_synch_to_subdir]
    sync_to_subdirectory = GCSSynchronizeBucketsOperator(
        task_id="sync_to_subdirectory",
        source_bucket=BUCKET_NAME_SRC,
        destination_bucket=BUCKET_NAME_DST,
        destination_object="subdir/",
    )
    # [END howto_synch_to_subdir]

    # [START howto_sync_from_subdir]
    sync_from_subdirectory = GCSSynchronizeBucketsOperator(
        task_id="sync_from_subdirectory",
        source_bucket=BUCKET_NAME_SRC,
        source_object="subdir/",
        destination_bucket=BUCKET_NAME_DST,
    )
    # [END howto_sync_from_subdir]

    # [START howto_operator_gcs_to_gcs_single_file]
    copy_single_file = GCSToGCSOperator(
        task_id="copy_single_gcs_file",
        source_bucket=BUCKET_NAME_SRC,
        source_object=PREFIX + OBJECT_1,
        destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
        destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
        exact_match=True,
    )
    # [END howto_operator_gcs_to_gcs_single_file]

    # [START howto_operator_gcs_to_gcs_without_wildcard]
    copy_files = GCSToGCSOperator(
        task_id="copy_files",
        source_bucket=BUCKET_NAME_SRC,
        source_object=PREFIX + "subdir/",
        destination_bucket=BUCKET_NAME_DST,
        destination_object="backup/",
    )
    # [END howto_operator_gcs_to_gcs_without_wildcard]

    # [START howto_operator_gcs_to_gcs_match_glob]
    copy_files_with_match_glob = GCSToGCSOperator(
        task_id="copy_files_with_match_glob",
        source_bucket=BUCKET_NAME_SRC,
        source_object="data/",
        destination_bucket=BUCKET_NAME_DST,
        destination_object="backup/",
        match_glob="**/*.txt",
    )
    # [END howto_operator_gcs_to_gcs_match_glob]

    # [START howto_operator_gcs_to_gcs_list]
    copy_files_with_list = GCSToGCSOperator(
        task_id="copy_files_with_list",
        source_bucket=BUCKET_NAME_SRC,
        source_objects=[
            PREFIX + OBJECT_1,
            PREFIX + OBJECT_2,
        ],  # Instead of files each element could be a wildcard expression
        destination_bucket=BUCKET_NAME_DST,
        destination_object="backup/",
    )
    # [END howto_operator_gcs_to_gcs_list]

    # [START howto_operator_gcs_to_gcs_single_file_move]
    move_single_file = GCSToGCSOperator(
        task_id="move_single_file",
        source_bucket=BUCKET_NAME_SRC,
        source_object=PREFIX + OBJECT_1,
        destination_bucket=BUCKET_NAME_DST,
        destination_object="backup_" + OBJECT_1,
        exact_match=True,
        move_object=True,
    )
    # [END howto_operator_gcs_to_gcs_single_file_move]

    # [START howto_operator_gcs_to_gcs_list_move]
    move_files_with_list = GCSToGCSOperator(
        task_id="move_files_with_list",
        source_bucket=BUCKET_NAME_SRC,
        source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
        destination_bucket=BUCKET_NAME_DST,
        destination_object="backup/",
    )
    # [END howto_operator_gcs_to_gcs_list_move]

    list_objects = GCSListObjectsOperator(task_id="list_objects", bucket=BUCKET_NAME_DST)

    assert_copied_files_exist = PythonOperator(
        task_id="assert_copied_files_exist",
        python_callable=_assert_copied_files_exist,
    )

    delete_bucket_src = GCSDeleteBucketOperator(
        task_id="delete_bucket_src",
        bucket_name=BUCKET_NAME_SRC,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    delete_bucket_dst = GCSDeleteBucketOperator(
        task_id="delete_bucket_dst",
        bucket_name=BUCKET_NAME_DST,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    @task(trigger_rule=TriggerRule.ALL_DONE)
    def delete_work_dir(create_workdir_result: str) -> None:
        shutil.rmtree(create_workdir_result)

    chain(
        # TEST SETUP
        create_workdir_task,
        generate_random_file,
        [create_bucket_src, create_bucket_dst],
        [upload_file_src, upload_file_src_sub],
        [upload_file_dst, upload_file_dst_sub],
        # TEST BODY
        sync_bucket,
        sync_full_bucket,
        sync_to_subdirectory,
        sync_from_subdirectory,
        copy_single_file,
        copy_files,
        copy_files_with_match_glob,
        copy_files_with_list,
        move_single_file,
        move_files_with_list,
        list_objects,
        assert_copied_files_exist,
        # TEST TEARDOWN
        [delete_bucket_src, delete_bucket_dst, delete_work_dir(create_workdir_task)],
    )

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
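# Assumed completion of the standard Airflow system-test pattern implied by the import
# and comment above: expose the DAG as a pytest-runnable test.
test_run = get_test_run(dag)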