# Source code for airflow.providers.google.cloud.example_dags.example_gcs_to_gcs

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google Cloud Storage to Google Cloud Storage transfer operators.
"""

import os

from airflow import models
from airflow.providers.google.cloud.operators.gcs import GCSSynchronizeBucketsOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.dates import days_ago

# Bucket and object names used by the example tasks. Every value can be
# overridden by exporting the environment variable of the same name; otherwise
# the "test-..." default is used.

# Source/destination pair used by all tasks in this DAG.
BUCKET_1_SRC = os.getenv("GCP_GCS_BUCKET_1_SRC", "test-gcs-sync-1-src")
BUCKET_1_DST = os.getenv("GCP_GCS_BUCKET_1_DST", "test-gcs-sync-1-dst")

# Additional pairs; not referenced by any task defined in this file.
BUCKET_2_SRC = os.getenv("GCP_GCS_BUCKET_2_SRC", "test-gcs-sync-2-src")
BUCKET_2_DST = os.getenv("GCP_GCS_BUCKET_2_DST", "test-gcs-sync-2-dst")
BUCKET_3_SRC = os.getenv("GCP_GCS_BUCKET_3_SRC", "test-gcs-sync-3-src")
BUCKET_3_DST = os.getenv("GCP_GCS_BUCKET_3_DST", "test-gcs-sync-3-dst")

# Object names copied/moved by the single-file and list examples.
OBJECT_1 = os.getenv("GCP_GCS_OBJECT_1", "test-gcs-to-gcs-1")
OBJECT_2 = os.getenv("GCP_GCS_OBJECT_2", "test-gcs-to-gcs-2")

with models.DAG(
    "example_gcs_to_gcs", schedule_interval='@once', start_date=days_ago(1), tags=['example']
) as dag:
    # The tasks below are standalone usage examples; the [START]/[END] markers
    # delimit the snippets that are embedded in the operator documentation.
    # NOTE(review): no dependencies are declared between these tasks, so the
    # scheduler may run them in any order or concurrently. The move_* tasks
    # (move_object=True) operate on OBJECT_1/OBJECT_2 in BUCKET_1_SRC, which
    # the copy tasks also read -- confirm ordering before running end to end.

    # Synchronize BUCKET_1_SRC into BUCKET_1_DST using the operator defaults.
    # [START howto_synch_bucket]
    sync_bucket = GCSSynchronizeBucketsOperator(
        task_id="sync_bucket", source_bucket=BUCKET_1_SRC, destination_bucket=BUCKET_1_DST
    )
    # [END howto_synch_bucket]

    # Same synchronization, with delete_extra_files and allow_overwrite enabled.
    # [START howto_synch_full_bucket]
    sync_full_bucket = GCSSynchronizeBucketsOperator(
        task_id="sync_full_bucket",
        source_bucket=BUCKET_1_SRC,
        destination_bucket=BUCKET_1_DST,
        delete_extra_files=True,
        allow_overwrite=True,
    )
    # [END howto_synch_full_bucket]

    # Synchronize into the "subdir/" prefix of the destination bucket.
    # [START howto_synch_to_subdir]
    sync_to_subdirectory = GCSSynchronizeBucketsOperator(
        task_id="sync_to_subdirectory",
        source_bucket=BUCKET_1_SRC,
        destination_bucket=BUCKET_1_DST,
        destination_object="subdir/",
    )
    # [END howto_synch_to_subdir]

    # Synchronize only the "subdir/" prefix of the source bucket.
    # [START howto_sync_from_subdir]
    sync_from_subdirectory = GCSSynchronizeBucketsOperator(
        task_id="sync_from_subdirectory",
        source_bucket=BUCKET_1_SRC,
        source_object="subdir/",
        destination_bucket=BUCKET_1_DST,
    )
    # [END howto_sync_from_subdir]

    # Copy one object to a new name in the destination bucket.
    # [START howto_operator_gcs_to_gcs_single_file]
    copy_single_file = GCSToGCSOperator(
        task_id="copy_single_gcs_file",
        source_bucket=BUCKET_1_SRC,
        source_object=OBJECT_1,
        destination_bucket=BUCKET_1_DST,  # If not supplied the source_bucket value will be used
        destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
    )
    # [END howto_operator_gcs_to_gcs_single_file]

    # Copy every object matching a wildcard pattern under "backup/".
    # [START howto_operator_gcs_to_gcs_wildcard]
    copy_files_with_wildcard = GCSToGCSOperator(
        task_id="copy_files_with_wildcard",
        source_bucket=BUCKET_1_SRC,
        source_object="data/*.txt",
        destination_bucket=BUCKET_1_DST,
        destination_object="backup/",
    )
    # [END howto_operator_gcs_to_gcs_wildcard]

    # Copy objects selected by a plain prefix (no wildcard) under "backup/".
    # [START howto_operator_gcs_to_gcs_without_wildcard]
    copy_files_without_wildcard = GCSToGCSOperator(
        task_id="copy_files_without_wildcard",
        source_bucket=BUCKET_1_SRC,
        source_object="subdir/",
        destination_bucket=BUCKET_1_DST,
        destination_object="backup/",
    )
    # [END howto_operator_gcs_to_gcs_without_wildcard]

    # Copy objects under the "data/" prefix, filtered with the '.txt' delimiter.
    # [START howto_operator_gcs_to_gcs_delimiter]
    copy_files_with_delimiter = GCSToGCSOperator(
        task_id="copy_files_with_delimiter",
        source_bucket=BUCKET_1_SRC,
        source_object="data/",
        destination_bucket=BUCKET_1_DST,
        destination_object="backup/",
        delimiter='.txt',
    )
    # [END howto_operator_gcs_to_gcs_delimiter]

    # Copy an explicit list of objects under "backup/".
    # [START howto_operator_gcs_to_gcs_list]
    copy_files_with_list = GCSToGCSOperator(
        task_id="copy_files_with_list",
        source_bucket=BUCKET_1_SRC,
        source_objects=[OBJECT_1, OBJECT_2],  # Instead of files each element could be a wildcard expression
        destination_bucket=BUCKET_1_DST,
        destination_object="backup/",
    )
    # [END howto_operator_gcs_to_gcs_list]

    # Move (rather than copy) one object, via move_object=True.
    # [START howto_operator_gcs_to_gcs_single_file_move]
    move_single_file = GCSToGCSOperator(
        task_id="move_single_file",
        source_bucket=BUCKET_1_SRC,
        source_object=OBJECT_1,
        destination_bucket=BUCKET_1_DST,
        destination_object="backup_" + OBJECT_1,
        move_object=True,
    )
    # [END howto_operator_gcs_to_gcs_single_file_move]

    # Move (rather than copy) an explicit list of objects. Without
    # move_object=True this would be identical to copy_files_with_list.
    # [START howto_operator_gcs_to_gcs_list_move]
    move_files_with_list = GCSToGCSOperator(
        task_id="move_files_with_list",
        source_bucket=BUCKET_1_SRC,
        source_objects=[OBJECT_1, OBJECT_2],
        destination_bucket=BUCKET_1_DST,
        destination_object="backup/",
        move_object=True,
    )
    # [END howto_operator_gcs_to_gcs_list_move]
