# Source code for airflow.providers.google.cloud.example_dags.example_gcs
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example Airflow DAG for Google Cloud Storage operators."""
import os

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import (
    GCSBucketCreateAclEntryOperator,
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
    GCSDeleteObjectsOperator,
    GCSFileTransformOperator,
    GCSListObjectsOperator,
    GCSObjectCreateAclEntryOperator,
)
from airflow.providers.google.cloud.sensors.gcs import (
    GCSObjectExistenceSensor,
    GCSObjectsWithPrefixExistenceSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State

# Configuration is read from the environment so the example can run against
# real resources; every variable has a safe placeholder default.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
GCS_ACL_BUCKET_ROLE = "OWNER"
GCS_ACL_OBJECT_ROLE = "OWNER"
BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
PATH_TO_TRANSFORM_SCRIPT = os.environ.get('GCP_GCS_PATH_TO_TRANSFORM_SCRIPT', 'test.py')
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")

# Object name inside the bucket: the basename of the local upload path.
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]

with models.DAG(
    "example_gcs",
    start_date=days_ago(1),
    schedule_interval='@once',
    tags=['example'],
) as dag:
    create_bucket1 = GCSCreateBucketOperator(
        task_id="create_bucket1", bucket_name=BUCKET_1, project_id=PROJECT_ID
    )

    create_bucket2 = GCSCreateBucketOperator(
        task_id="create_bucket2", bucket_name=BUCKET_2, project_id=PROJECT_ID
    )

    list_buckets = GCSListObjectsOperator(task_id="list_buckets", bucket=BUCKET_1)

    list_buckets_result = BashOperator(
        task_id="list_buckets_result",
        bash_command=f"echo {list_buckets.output}",
    )

    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
    )

    transform_file = GCSFileTransformOperator(
        task_id="transform_file",
        source_bucket=BUCKET_1,
        source_object=BUCKET_FILE_LOCATION,
        transform_script=["python", PATH_TO_TRANSFORM_SCRIPT],
    )

    # [START howto_operator_gcs_bucket_create_acl_entry_task]
    gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
        bucket=BUCKET_1,
        entity=GCS_ACL_ENTITY,
        role=GCS_ACL_BUCKET_ROLE,
        task_id="gcs_bucket_create_acl_entry_task",
    )
    # [END howto_operator_gcs_bucket_create_acl_entry_task]

    # [START howto_operator_gcs_object_create_acl_entry_task]
    gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
        bucket=BUCKET_1,
        object_name=BUCKET_FILE_LOCATION,
        entity=GCS_ACL_ENTITY,
        role=GCS_ACL_OBJECT_ROLE,
        task_id="gcs_object_create_acl_entry_task",
    )
    # [END howto_operator_gcs_object_create_acl_entry_task]

    # [START howto_operator_gcs_download_file_task]
    download_file = GCSToLocalFilesystemOperator(
        task_id="download_file",
        object_name=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
        filename=PATH_TO_SAVED_FILE,
    )
    # [END howto_operator_gcs_download_file_task]

    copy_file = GCSToGCSOperator(
        task_id="copy_file",
        source_bucket=BUCKET_1,
        source_object=BUCKET_FILE_LOCATION,
        destination_bucket=BUCKET_2,
        destination_object=BUCKET_FILE_LOCATION,
    )

    delete_files = GCSDeleteObjectsOperator(
        task_id="delete_files", bucket_name=BUCKET_1, objects=[BUCKET_FILE_LOCATION]
    )

    # [START howto_operator_gcs_delete_bucket]
    delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket_1", bucket_name=BUCKET_1)
    delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket_2", bucket_name=BUCKET_2)
    # [END howto_operator_gcs_delete_bucket]

    # Task dependencies: both buckets must exist before anything runs against
    # them; the delete_bucket_* tasks run last, after every consumer finishes.
    [create_bucket1, create_bucket2] >> list_buckets >> list_buckets_result
    [create_bucket1, create_bucket2] >> upload_file
    upload_file >> [download_file, copy_file]
    upload_file >> gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task >> delete_files

    create_bucket1 >> delete_bucket_1
    create_bucket2 >> delete_bucket_2
    create_bucket2 >> copy_file
    create_bucket1 >> copy_file
    list_buckets >> delete_bucket_1
    upload_file >> delete_bucket_1
    create_bucket1 >> upload_file >> delete_bucket_1
    upload_file >> transform_file >> delete_bucket_1
    gcs_bucket_create_acl_entry_task >> delete_bucket_1
    gcs_object_create_acl_entry_task >> delete_bucket_1
    download_file >> delete_bucket_1
    copy_file >> delete_bucket_1
    copy_file >> delete_bucket_2
    delete_files >> delete_bucket_1


with models.DAG(
    "example_gcs_sensors",
    start_date=days_ago(1),
    schedule_interval='@once',
    tags=['example'],
) as dag2:
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket", bucket_name=BUCKET_1, project_id=PROJECT_ID
    )

    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
    )

    # [START howto_sensor_object_exists_task]
    gcs_object_exists = GCSObjectExistenceSensor(
        bucket=BUCKET_1,
        object=PATH_TO_UPLOAD_FILE,
        mode='poke',
        task_id="gcs_object_exists_task",
    )
    # [END howto_sensor_object_exists_task]

    # [START howto_sensor_object_with_prefix_exists_task]
    gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
        bucket=BUCKET_1,
        prefix=PATH_TO_UPLOAD_FILE_PREFIX,
        mode='poke',
        task_id="gcs_object_with_prefix_exists_task",
    )
    # [END howto_sensor_object_with_prefix_exists_task]

    delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)

    create_bucket >> upload_file >> [gcs_object_exists, gcs_object_with_prefix_exists] >> delete_bucket


if __name__ == '__main__':
    # Local debug entry point: reset any prior state for the first DAG and
    # run it once end-to-end.
    dag.clear(dag_run_state=State.NONE)
    dag.run()