Transfer data in Google Cloud Storage¶
Google Cloud Storage (GCS) is used to store large data from various applications. Files are called objects in GCS terminology, so the terms “object” and “file” are used interchangeably in this guide. Several operators copy data as part of Google Cloud services. This page shows how to use these operators.
Cloud Storage Transfer Service¶
There are many operators that manage the Google Cloud Data Transfer service. If you want to create a new data transfer
task, use the operator
CloudDataTransferServiceCreateJobOperator
You can also use the older operator for this service:
CloudDataTransferServiceGCSToGCSOperator
These operators do not control the copying process locally but use Google resources, which allows them to perform the task faster and more economically. The savings are especially noticeable when Airflow is not hosted in Google Cloud, because these operators reduce egress traffic.
These operators modify source objects if the option that specifies whether objects should be deleted from the source after they are transferred to the sink is enabled.
When you use the Google Cloud Data Transfer service, you can specify whether overwriting objects that already exist in the sink is allowed, whether objects that exist only in the sink should be deleted, or whether objects should be deleted from the source after they are transferred to the sink.
Source objects can be specified using inclusion and exclusion prefixes, as well as by file modification date.
For information on how to use these operators, see the guide: Google Cloud Transfer Service Operators
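The options described above map onto fields of a Storage Transfer Service job description. The following is a minimal sketch of such a description built as a plain dictionary. The helper name build_transfer_job_body and its parameters are hypothetical and exist only for illustration; the nested keys follow the Storage Transfer Service REST resource as best understood here, so verify them against the linked guide before relying on them.

```python
def build_transfer_job_body(
    project_id,
    source_bucket,
    sink_bucket,
    include_prefixes=(),
    exclude_prefixes=(),
    overwrite=False,
    delete_extra_in_sink=False,
    delete_from_source=False,
):
    """Build a GCS-to-GCS transfer job description (hypothetical helper)."""
    # The service documents these two delete options as mutually exclusive,
    # so the sketch rejects the combination up front.
    if delete_extra_in_sink and delete_from_source:
        raise ValueError(
            "delete_extra_in_sink and delete_from_source cannot both be enabled"
        )
    return {
        "description": f"Copy {source_bucket} to {sink_bucket}",
        "status": "ENABLED",
        "projectId": project_id,
        "transferSpec": {
            "gcsDataSource": {"bucketName": source_bucket},
            "gcsDataSink": {"bucketName": sink_bucket},
            # Which source objects take part in the transfer.
            "objectConditions": {
                "includePrefixes": list(include_prefixes),
                "excludePrefixes": list(exclude_prefixes),
            },
            # What may be overwritten or deleted during the transfer.
            "transferOptions": {
                "overwriteObjectsAlreadyExistingInSink": overwrite,
                "deleteObjectsUniqueInSink": delete_extra_in_sink,
                "deleteObjectsFromSourceAfterTransfer": delete_from_source,
            },
        },
    }


body = build_transfer_job_body(
    "my-project", "src-bucket", "dst-bucket",
    include_prefixes=["logs/"], overwrite=True,
)
print(body["transferSpec"]["transferOptions"])
```

A dictionary of this shape is the kind of value that would be supplied as the body argument of CloudDataTransferServiceCreateJobOperator; the project and bucket names here are placeholders.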
Local transfer¶
Two operators copy data with the entire process controlled locally.
They are described in the sections below.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Operators¶
GCSToGCSOperator¶
GCSToGCSOperator allows you to copy
one or more files within GCS. The files may be copied between two different buckets or within one bucket.
The copying always takes place without taking into account the initial state of the destination bucket.
This operator only deletes objects in the source bucket if the file move option is active. When copying files between two different buckets, this operator never deletes data in the destination bucket.
When you use this operator, you can specify whether objects should be deleted from the source after they are transferred to the sink. Source objects can be specified using a single wildcard, as well as based on the file modification date.
Objects can be filtered by path using the match_glob field.
Avoid using the delimiter field or a wildcard in the path of the source object(s), as both practices are deprecated.
Additionally, objects can be filtered by creation date (is_older_than) or modification date (last_modified_time and maximum_modified_time).
The way this operator works by default can be compared to the cp command. When the file move option is active, this
operator functions like the mv command.
Below are examples of using the GCSToGCSOperator to copy a single file, to copy multiple files with a wild card, to copy multiple files, to move a single file, and to move multiple files.
Copy single file¶
The following example would copy a single file, OBJECT_1 from the BUCKET_1_SRC GCS bucket to the BUCKET_1_DST bucket.
Note that if exact_match=False, source_object is treated as a prefix when searching for objects
in the BUCKET_1_SRC GCS bucket, so any other objects whose names start with that prefix are copied as well.
To prevent this, use exact_match=True.
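from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

# BUCKET_NAME_SRC, BUCKET_NAME_DST and OBJECT_1 are defined elsewhere in the example DAG.
copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
    exact_match=True,  # Copy only the object named exactly OBJECT_1
)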
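The difference between prefix matching and exact matching can be illustrated with a small, self-contained model. This is a sketch of the behaviour described above and not the operator's implementation; select_source_objects and the object names are hypothetical.

```python
def select_source_objects(object_names, source_object, exact_match):
    """Return the names that would be selected for copying (simplified model)."""
    if exact_match:
        # Only the object whose full name equals source_object is selected.
        return [name for name in object_names if name == source_object]
    # Otherwise source_object acts as a prefix.
    return [name for name in object_names if name.startswith(source_object)]


names = ["report.csv", "report.csv.bak", "report.csv/part-0", "other.csv"]

print(select_source_objects(names, "report.csv", exact_match=False))
# ['report.csv', 'report.csv.bak', 'report.csv/part-0']

print(select_source_objects(names, "report.csv", exact_match=True))
# ['report.csv']
```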
Copy multiple files¶
There are several ways to copy multiple files; examples of each are presented below.
copy_files_with_wildcard = GCSToGCSOperator(
    task_id="copy_files_with_wildcard",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/*.txt",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)
The source_object value may contain one wildcard, denoted as “*”. All files matching the wildcard expression will
be copied. In this example, all files in the data folder of BUCKET_1_SRC whose names end with .txt will be copied to
the backup folder in BUCKET_1_DST, with file names unchanged.
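One way to picture the single-wildcard rule is to split source_object at the “*” into a prefix and a suffix, select the objects that start with the prefix and end with the suffix, and replace the prefix with destination_object. The sketch below models this under that assumption; plan_wildcard_copy is a hypothetical helper and the real operator may differ in details.

```python
def plan_wildcard_copy(object_names, source_object, destination_object):
    """Map each matching source name to its destination name (simplified model)."""
    if source_object.count("*") != 1:
        raise ValueError("source_object must contain exactly one wildcard")
    prefix, suffix = source_object.split("*", 1)
    plan = {}
    for name in object_names:
        # The length check stops prefix and suffix from overlapping in short names.
        long_enough = len(name) >= len(prefix) + len(suffix)
        if long_enough and name.startswith(prefix) and name.endswith(suffix):
            # Replace the part before the wildcard with destination_object.
            plan[name] = destination_object + name[len(prefix):]
    return plan


names = ["data/a.txt", "data/sub/b.txt", "data/c.csv", "root.txt"]
print(plan_wildcard_copy(names, "data/*.txt", "backup/"))
# {'data/a.txt': 'backup/a.txt', 'data/sub/b.txt': 'backup/sub/b.txt'}
```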
copy_files_with_delimiter = GCSToGCSOperator(
    task_id="copy_files_with_delimiter",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    delimiter=".txt",
)
When source_object contains no wildcard, all files under source_object are listed, filtered by the provided delimiter if any. Each listed file is then copied to destination_object and renamed accordingly.
As previously stated, both the delimiter field and the use of a wildcard (*) in the source object(s)
are deprecated. Using them is therefore not recommended; use match_glob instead, as follows:
copy_files_with_match_glob = GCSToGCSOperator(
    task_id="copy_files_with_match_glob",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    match_glob="**/*.txt",
)
The following example would copy all the files in the subdir/ folder (for example subdir/a.csv, subdir/b.csv, subdir/c.csv) from
the BUCKET_1_SRC GCS bucket to the backup/ folder in the BUCKET_1_DST bucket (for example backup/a.csv, backup/b.csv, backup/c.csv).
copy_files_without_wildcard = GCSToGCSOperator(
    task_id="copy_files_without_wildcard",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)
The delimiter field may be specified to select any source files starting with source_object and ending with the
value supplied to delimiter. The copy_files_with_delimiter example above uses the delimiter value to implement the
same functionality as the wildcard example.
copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[OBJECT_1, OBJECT_2],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)
Lastly, files may be copied by omitting the source_object argument and instead supplying a list to the source_objects
argument. In this example, OBJECT_1 and OBJECT_2 will be copied from BUCKET_1_SRC to BUCKET_1_DST. Instead
of specific file names, the list can contain one or more wildcard expressions, each with no more than one wildcard.
Supplying a list of size 1 functions the same as supplying a value to the source_object argument.
Move single file¶
Supplying True to the move_object argument causes the operator to delete source_object once the copy is complete.
Note that if exact_match=False, source_object is treated as a prefix when searching for objects
in the BUCKET_1_SRC GCS bucket, so any other objects whose names start with that prefix are moved as well.
To prevent this, use exact_match=True.
move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup_" + OBJECT_1,
    exact_match=True,
    move_object=True,
)
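The cp versus mv analogy can be made concrete with an in-memory model in which each bucket is a dictionary of object names to contents. This is only a sketch of the described semantics: transfer_object is a hypothetical function, and the guard against moving an object onto itself is a choice made for this sketch, not documented operator behaviour.

```python
def transfer_object(buckets, src_bucket, src_name, dst_bucket, dst_name, move_object=False):
    """Copy one object between in-memory buckets; delete the source if moving."""
    # Copy step: the destination entry is written regardless of its previous state.
    buckets[dst_bucket][dst_name] = buckets[src_bucket][src_name]
    # Move step: remove the source, unless source and destination are the same object.
    if move_object and (src_bucket, src_name) != (dst_bucket, dst_name):
        del buckets[src_bucket][src_name]


buckets = {"src": {"a.txt": b"alpha", "b.txt": b"beta"}, "dst": {}}

transfer_object(buckets, "src", "a.txt", "dst", "backup_a.txt")  # like cp
transfer_object(buckets, "src", "b.txt", "dst", "backup_b.txt", move_object=True)  # like mv

print(sorted(buckets["src"]))  # ['a.txt']
print(sorted(buckets["dst"]))  # ['backup_a.txt', 'backup_b.txt']
```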
Move multiple files¶
Multiple files may be moved by supplying True to the move_object argument. The same rules concerning wildcards and
the delimiter argument apply to moves as to copies.
move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[OBJECT_1, OBJECT_2],
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    move_object=True,  # Delete the source objects once they are copied
)
GCSSynchronizeBuckets¶
The GCSSynchronizeBucketsOperator
operator checks the initial state of the destination bucket, and then compares it with the source bucket.
Based on this, it creates an operation plan that describes which objects should be deleted from
the destination bucket, which should be overwritten, and which should be copied.
This operator never modifies data in the source bucket.
When you use this operator, you can specify whether overwriting objects that already exist in the sink is allowed, whether objects that exist only in the sink should be deleted, whether subdirectories are to be processed, and which subdirectory is to be processed.
The way this operator works can be compared to the rsync command.
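The operation plan described above can be sketched as a comparison of two listings. In this simplified model each bucket is a dictionary mapping object names to a checksum. Deciding that an object needs overwriting when its checksum differs is an assumption of this sketch, and plan_sync is a hypothetical function rather than part of the provider.

```python
def plan_sync(source, destination, allow_overwrite=False, delete_extra_files=False):
    """Return which objects to copy, overwrite and delete (simplified model)."""
    # Objects missing from the destination are always copied.
    to_copy = sorted(name for name in source if name not in destination)
    # Objects present on both sides are overwritten only if allowed and different.
    to_overwrite = []
    if allow_overwrite:
        to_overwrite = sorted(
            name for name in source
            if name in destination and source[name] != destination[name]
        )
    # Objects that exist only in the destination are deleted only on request.
    to_delete = []
    if delete_extra_files:
        to_delete = sorted(name for name in destination if name not in source)
    return {"copy": to_copy, "overwrite": to_overwrite, "delete": to_delete}


source = {"a.txt": "c1", "b.txt": "c2", "dir/c.txt": "c3"}
destination = {"b.txt": "old", "stale.txt": "c9"}

print(plan_sync(source, destination))
# {'copy': ['a.txt', 'dir/c.txt'], 'overwrite': [], 'delete': []}

print(plan_sync(source, destination, allow_overwrite=True, delete_extra_files=True))
# {'copy': ['a.txt', 'dir/c.txt'], 'overwrite': ['b.txt'], 'delete': ['stale.txt']}
```

In this model the source dictionary is only read and never changed, which mirrors the statement that the operator never modifies data in the source bucket.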
Basic Synchronization¶
The following example will ensure all files in BUCKET_1_SRC, including any in subdirectories, are also in
BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST if they already exist. It will not
delete any files in BUCKET_1_DST not in BUCKET_1_SRC.
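from airflow.providers.google.cloud.operators.gcs import GCSSynchronizeBucketsOperator

# With the default options, nothing in the destination is overwritten or deleted.
sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
)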
Full Bucket Synchronization¶
This example will ensure all files in BUCKET_1_SRC, including any in subdirectories, are also in
BUCKET_1_DST. It will overwrite identically named files in BUCKET_1_DST if they already exist. It will
delete any files in BUCKET_1_DST not in BUCKET_1_SRC.
sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)
Synchronize to a Subdirectory¶
The following example will ensure all files in BUCKET_1_SRC, including any in subdirectories, are also in the
subdir folder in BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST/subdir if they
already exist and it will not delete any files in BUCKET_1_DST/subdir not in BUCKET_1_SRC.
sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="subdir/",
)
Synchronize from a Subdirectory¶
This example will ensure all files in BUCKET_1_SRC/subdir, including any in subdirectories, are also in
BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST if they
already exist and it will not delete any files in BUCKET_1_DST not in BUCKET_1_SRC/subdir.
sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
)
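The two subdirectory examples differ only in how object names are mapped between the buckets. A minimal sketch of that mapping, assuming source_object is stripped from the front of each source name and destination_object is prepended to the result; map_destination_name is a hypothetical helper used only for illustration.

```python
def map_destination_name(source_name, source_object=None, destination_object=None):
    """Return the destination name for a source object, or None if out of scope."""
    relative = source_name
    if source_object:
        # Only objects under the source subdirectory take part in the sync.
        if not source_name.startswith(source_object):
            return None
        relative = source_name[len(source_object):]
    return (destination_object or "") + relative


# Synchronize to a subdirectory: everything lands under subdir/ in the destination.
print(map_destination_name("reports/a.txt", destination_object="subdir/"))
# subdir/reports/a.txt

# Synchronize from a subdirectory: the subdir/ prefix is dropped in the destination.
print(map_destination_name("subdir/reports/a.txt", source_object="subdir/"))
# reports/a.txt

# Objects outside the source subdirectory are ignored.
print(map_destination_name("other/b.txt", source_object="subdir/"))
# None
```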
Reference¶
For further information, look at:
