Transfer data in Google Cloud Storage

Google Cloud Storage (GCS) is used to store large amounts of data from many applications. Note that files are called objects in GCS terminology, so the terms "object" and "file" are used interchangeably in this guide. There are several operators whose purpose is to copy data within Google Cloud Storage. This page shows how to use these operators.

Overview

Cloud Storage Transfer Service

Several operators manage the Google Cloud Data Transfer service. If you want to create a new data transfer task, use the CloudDataTransferServiceCreateJobOperator. You can also use the dedicated operator for this service, CloudDataTransferServiceGCSToGCSOperator, which copies objects between GCS buckets using the same service.

These operators do not control the copying process locally, but use Google resources, which allows them to perform the task faster and more economically. The economic benefits are especially prominent when Airflow is not hosted in Google Cloud, because these operators reduce egress traffic.

These operators modify source objects only if the option to delete objects from the source after they are transferred to the sink is enabled.

When you use the Google Cloud Data Transfer service, you can specify whether overwriting objects that already exist in the sink is allowed, whether objects that exist only in the sink should be deleted, or whether objects should be deleted from the source after they are transferred to the sink.

Source objects can be specified using inclusion and exclusion prefixes, as well as based on the file modification date.
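
For illustration, below is a minimal sketch of creating such a transfer job with CloudDataTransferServiceCreateJobOperator. The project ID, bucket names, schedule, and prefixes are placeholder assumptions; the body follows the Storage Transfer Service TransferJob resource format.

from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCreateJobOperator,
)

create_transfer_job = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job",
    body={
        "description": "Example GCS-to-GCS transfer",
        "status": "ENABLED",
        "projectId": "my-project-id",  # placeholder
        "schedule": {"scheduleStartDate": {"day": 1, "month": 1, "year": 2022}},
        "transferSpec": {
            "gcsDataSource": {"bucketName": "my-source-bucket"},  # placeholder
            "gcsDataSink": {"bucketName": "my-sink-bucket"},  # placeholder
            # Select source objects by inclusion and exclusion prefixes.
            "objectConditions": {
                "includePrefixes": ["data/"],
                "excludePrefixes": ["data/tmp/"],
            },
            # The options described above: overwrite objects already in the
            # sink, delete sink-only objects, delete source objects after
            # the transfer.
            "transferOptions": {
                "overwriteObjectsAlreadyExistingInSink": True,
                "deleteObjectsUniqueInSink": False,
                "deleteObjectsFromSourceAfterTransfer": False,
            },
        },
    },
)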

For information on how to use these operators, see the guide: Google Cloud Transfer Service Operators

Local transfer

There are two operators that copy data where the entire process is controlled locally. They are described in the sections that follow.

Prerequisite Tasks

Operators

GCSToGCSOperator

GCSToGCSOperator allows you to copy one or more files within GCS. The files may be copied between two different buckets or within one bucket. The copying always takes place without taking into account the initial state of the destination bucket.

This operator only deletes objects in the source bucket if the file move option is active. When copying files between two different buckets, this operator never deletes data in the destination bucket.

When you use this operator, you can specify whether objects should be deleted from the source after they are transferred to the sink. Source objects can be specified using a single wildcard, as well as based on the file modification date.
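
As a minimal sketch of the modification-date filter (the bucket and object names below are placeholders, and the filter is expressed through the operator's last_modified_time argument), copying only recently changed objects might look like this:

from datetime import datetime, timedelta, timezone

from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

copy_recent_files = GCSToGCSOperator(
    task_id="copy_recent_files",
    source_bucket="my-source-bucket",  # placeholder
    source_object="data/*.csv",  # placeholder; at most one wildcard
    destination_bucket="my-backup-bucket",  # placeholder
    destination_object="backup/",
    # Copy only objects modified within the last day.
    last_modified_time=datetime.now(tz=timezone.utc) - timedelta(days=1),
)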

The way this operator works by default can be compared to the cp command. When the file move option is active, this operator functions like the mv command.

Below are examples of using the GCSToGCSOperator to copy a single file, to copy multiple files with a wild card, to copy multiple files, to move a single file, and to move multiple files.

Copy single file

The following example would copy a single file, OBJECT_1, from the BUCKET_1_SRC GCS bucket to the BUCKET_1_DST bucket.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_1_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_1_DST,  # If not supplied, the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied, the source_object value will be used
)

Copy multiple files

There are several ways to copy multiple files; various examples are presented below.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

copy_files_with_wildcard = GCSToGCSOperator(
    task_id="copy_files_with_wildcard",
    source_bucket=BUCKET_1_SRC,
    source_object="data/*.txt",
    destination_bucket=BUCKET_1_DST,
    destination_object="backup/",
)

The source_object value may contain one wildcard, denoted as "*". All files matching the wildcard expression will be copied. In this example, all files ending with .txt in the data folder of BUCKET_1_SRC will be copied to the backup folder in BUCKET_1_DST, with file names unchanged.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

copy_files_with_delimiter = GCSToGCSOperator(
    task_id="copy_files_with_delimiter",
    source_bucket=BUCKET_1_SRC,
    source_object="data/",
    destination_bucket=BUCKET_1_DST,
    destination_object="backup/",
    delimiter=".txt",
)

For a source_object with no wildcard, all files whose names start with the source_object prefix are listed, filtered by the delimiter if one is provided. Each matching file is then copied to destination_object and renamed accordingly. The delimiter field selects source files starting with source_object and ending with the value supplied to delimiter, so this example implements the same functionality as the prior wildcard example.

The following example would copy all the files in the subdir/ folder (i.e. subdir/a.csv, subdir/b.csv, subdir/c.csv) from the BUCKET_1_SRC GCS bucket to the backup/ folder in the BUCKET_1_DST bucket (i.e. backup/a.csv, backup/b.csv, backup/c.csv).

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

copy_files_without_wildcard = GCSToGCSOperator(
    task_id="copy_files_without_wildcard",
    source_bucket=BUCKET_1_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_1_DST,
    destination_object="backup/",
)

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_1_SRC,
    source_objects=[OBJECT_1, OBJECT_2],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_1_DST,
    destination_object="backup/",
)

Lastly, files may be copied by omitting the source_object argument and instead supplying a list to the source_objects argument. In this example, OBJECT_1 and OBJECT_2 will be copied from BUCKET_1_SRC to BUCKET_1_DST. Instead of specific file names, the list can contain one or more wildcard expressions, each with no more than one wildcard. Supplying a list of size one functions the same as supplying a value to the source_object argument.
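
For example, a hypothetical variant of the task above could mix wildcard expressions in the list; the prefixes below are illustrative only.

copy_list_with_wildcards = GCSToGCSOperator(
    task_id="copy_list_with_wildcards",
    source_bucket=BUCKET_1_SRC,
    # Each list element may contain at most one wildcard.
    source_objects=["data/*.txt", "reports/*.csv"],  # illustrative names
    destination_bucket=BUCKET_1_DST,
    destination_object="backup/",
)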

Move single file

Supplying True to the move_object argument causes the operator to delete source_object once the copy is complete.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_1_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_1_DST,
    destination_object="backup_" + OBJECT_1,
    move_object=True,
)

Move multiple files

Multiple files may be moved by supplying True to the move_object argument. The same rules concerning wildcards and the delimiter argument apply to moves as well as copies.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_1_SRC,
    source_objects=[OBJECT_1, OBJECT_2],
    destination_bucket=BUCKET_1_DST,
    destination_object="backup/",
    move_object=True,
)

GCSSynchronizeBucketsOperator

The GCSSynchronizeBucketsOperator checks the initial state of the destination bucket, and then compares it with the source bucket. Based on this, it creates an operation plan that describes which objects should be deleted from the destination bucket, which should be overwritten, and which should be copied.

This operator never modifies data in the source bucket.

When you use this operator, you can specify whether overwriting objects that already exist in the sink is allowed, whether objects that exist only in the sink should be deleted, and whether all subdirectories or only a particular subdirectory should be processed.

The way this operator works can be compared to the rsync command.
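
Subdirectory handling is controlled by the operator's recursive argument. As a minimal sketch (reusing the buckets from the examples below), synchronizing only top-level objects might look like this:

sync_top_level_only = GCSSynchronizeBucketsOperator(
    task_id="sync_top_level_only",
    source_bucket=BUCKET_1_SRC,
    destination_bucket=BUCKET_1_DST,
    # Skip subdirectories; synchronize only top-level objects.
    recursive=False,
)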

Basic Synchronization

The following example will ensure all files in BUCKET_1_SRC, including any in subdirectories, are also in BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST if they already exist. It will not delete any files in BUCKET_1_DST not in BUCKET_1_SRC.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket", source_bucket=BUCKET_1_SRC, destination_bucket=BUCKET_1_DST
)

Full Bucket Synchronization

This example will ensure all files in BUCKET_1_SRC, including any in subdirectories, are also in BUCKET_1_DST. It will overwrite identically named files in BUCKET_1_DST if they already exist. It will delete any files in BUCKET_1_DST not in BUCKET_1_SRC.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_1_SRC,
    destination_bucket=BUCKET_1_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)

Synchronize to a Subdirectory

The following example will ensure all files in BUCKET_1_SRC, including any in subdirectories, are also in the subdir folder in BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST/subdir if they already exist, and it will not delete any files in BUCKET_1_DST/subdir not in BUCKET_1_SRC.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_1_SRC,
    destination_bucket=BUCKET_1_DST,
    destination_object="subdir/",
)

Synchronize from a Subdirectory

This example will ensure all files in BUCKET_1_SRC/subdir, including any in subdirectories, are also in BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST if they already exist, and it will not delete any files in BUCKET_1_DST not in BUCKET_1_SRC/subdir.

airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py

sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_1_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_1_DST,
)
