#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google Cloud Storage operators.
"""
import os

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.gcs import (
    GCSBucketCreateAclEntryOperator,
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
    GCSDeleteObjectsOperator,
    GCSFileTransformOperator,
    GCSListObjectsOperator,
    GCSObjectCreateAclEntryOperator,
)
from airflow.providers.google.cloud.sensors.gcs import (
    GCSObjectExistenceSensor,
    GCSObjectsWithPrefixExistenceSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")

GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
GCS_ACL_BUCKET_ROLE = "OWNER"
GCS_ACL_OBJECT_ROLE = "OWNER"

PATH_TO_TRANSFORM_SCRIPT = os.environ.get("GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", "test.py")
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")

BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]

with models.DAG(
    "example_gcs",
    start_date=days_ago(1),
    schedule_interval=None,
    tags=["example"],
) as dag:
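    # Set-up: create the two buckets the rest of the DAG operates on.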
    create_bucket1 = GCSCreateBucketOperator(
        task_id="create_bucket1", bucket_name=BUCKET_1, project_id=PROJECT_ID
    )
    create_bucket2 = GCSCreateBucketOperator(
        task_id="create_bucket2", bucket_name=BUCKET_2, project_id=PROJECT_ID
    )
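
    # List the objects in BUCKET_1, then echo the listing pulled from XCom.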
    list_buckets = GCSListObjectsOperator(task_id="list_buckets", bucket=BUCKET_1)

    list_buckets_result = BashOperator(
        task_id="list_buckets_result",
        bash_command="echo \"{{ task_instance.xcom_pull('list_buckets') }}\"",
    )
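
    # Upload the local file; only its basename (BUCKET_FILE_LOCATION) is used
    # as the object name.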
    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
    )
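
    # GCSFileTransformOperator downloads the source object to a temporary file,
    # runs the script with the source and destination paths appended as
    # arguments, then uploads the result. A minimal identity transform (the
    # assumed contents of test.py, which is not shown here) could be:
    #
    #   import shutil
    #   import sys
    #
    #   shutil.copy(sys.argv[1], sys.argv[2])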
    transform_file = GCSFileTransformOperator(
        task_id="transform_file",
        source_bucket=BUCKET_1,
        source_object=BUCKET_FILE_LOCATION,
        transform_script=["python", PATH_TO_TRANSFORM_SCRIPT],
    )
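
    # Grant GCS_ACL_ENTITY (by default, all users) the OWNER role, first on
    # the bucket and then on the uploaded object.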
    # [START howto_operator_gcs_bucket_create_acl_entry_task]
    gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
        bucket=BUCKET_1,
        entity=GCS_ACL_ENTITY,
        role=GCS_ACL_BUCKET_ROLE,
        task_id="gcs_bucket_create_acl_entry_task",
    )
    # [END howto_operator_gcs_bucket_create_acl_entry_task]
    # [START howto_operator_gcs_object_create_acl_entry_task]
    gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
        bucket=BUCKET_1,
        object_name=BUCKET_FILE_LOCATION,
        entity=GCS_ACL_ENTITY,
        role=GCS_ACL_OBJECT_ROLE,
        task_id="gcs_object_create_acl_entry_task",
    )
    # [END howto_operator_gcs_object_create_acl_entry_task]
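
    # Download the object back to the local filesystem.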
    # [START howto_operator_gcs_download_file_task]
    download_file = GCSToLocalFilesystemOperator(
        task_id="download_file",
        object_name=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
        filename=PATH_TO_SAVED_FILE,
    )
    # [END howto_operator_gcs_download_file_task]
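
    # Server-side copy of the object from BUCKET_1 to BUCKET_2.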
    copy_file = GCSToGCSOperator(
        task_id="copy_file",
        source_bucket=BUCKET_1,
        source_object=BUCKET_FILE_LOCATION,
        destination_bucket=BUCKET_2,
        destination_object=BUCKET_FILE_LOCATION,
    )
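
    # Clean up the uploaded object from BUCKET_1.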
    delete_files = GCSDeleteObjectsOperator(
        task_id="delete_files", bucket_name=BUCKET_1, objects=[BUCKET_FILE_LOCATION]
    )
    # [START howto_operator_gcs_delete_bucket]
    delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket_1", bucket_name=BUCKET_1)
    delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket_2", bucket_name=BUCKET_2)
    # [END howto_operator_gcs_delete_bucket]
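
    # Main flow: set-up, listing, upload, and the tasks that consume the upload.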
    [create_bucket1, create_bucket2] >> list_buckets >> list_buckets_result
    [create_bucket1, create_bucket2] >> upload_file
    upload_file >> [download_file, copy_file, transform_file]
    upload_file >> gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task >> delete_files
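
    # Tear-down: delete each bucket only after every task touching it has run.
    # The remaining upstream edges from the original wiring are already covered
    # transitively by the dependencies above.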
    [list_buckets, transform_file, download_file, delete_files] >> delete_bucket_1
    copy_file >> [delete_bucket_1, delete_bucket_2]

with models.DAG(
    "example_gcs_sensors",
    start_date=days_ago(1),
    schedule_interval=None,
    tags=["example"],
) as dag2:
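    # Set-up: create the bucket and upload the object the sensors will watch.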
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket", bucket_name=BUCKET_1, project_id=PROJECT_ID
    )

    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
    )
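
    # Both sensors run in "poke" mode, re-checking GCS at every poke interval
    # until the object (or any object with the given prefix) exists.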
    # [START howto_sensor_object_exists_task]
    gcs_object_exists = GCSObjectExistenceSensor(
        bucket=BUCKET_1,
        # Watch the object actually written by upload_file (its basename),
        # not the local path, which may contain directory components.
        object=BUCKET_FILE_LOCATION,
        mode="poke",
        task_id="gcs_object_exists_task",
    )
    # [END howto_sensor_object_exists_task]
    # [START howto_sensor_object_with_prefix_exists_task]
    gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
        bucket=BUCKET_1,
        prefix=PATH_TO_UPLOAD_FILE_PREFIX,
        mode="poke",
        task_id="gcs_object_with_prefix_exists_task",
    )
    # [END howto_sensor_object_with_prefix_exists_task]
    delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)

    create_bucket >> upload_file >> [gcs_object_exists, gcs_object_with_prefix_exists] >> delete_bucket

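# Running this file directly clears any previous state and triggers a local
# (backfill-style) run of the first example DAG; note that `dag2` is not run here.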
if __name__ == "__main__":
    dag.clear(dag_run_state=State.NONE)
    dag.run()