Transfer data in Google Cloud Storage¶
Google Cloud Storage (GCS) is used to store large data from various applications. Files are called objects in GCS terminology, so the terms "object" and "file" are used interchangeably in this guide. Several operators exist whose purpose is to copy data as part of Google Cloud services. This page shows how to use these operators.
Overview¶
Cloud Storage Transfer Service¶
Several operators manage the Google Cloud Data Transfer service. To create a new data transfer job, use the operator CloudDataTransferServiceCreateJobOperator.
You can also use an older operator for this service: CloudDataTransferServiceGCSToGCSOperator
These operators do not control the copying process locally but use Google resources, which allows them to perform this task faster and more economically. The savings are especially noticeable when Airflow is not hosted in Google Cloud, because these operators reduce egress traffic.
These operators modify source objects only if the option to delete objects from the source after they are transferred to the sink is enabled.
When you use the Google Cloud Data Transfer service, you can specify whether overwriting objects that already exist in the sink is allowed, whether objects that exist only in the sink should be deleted, or whether objects should be deleted from the source after they are transferred to the sink.
Source objects can be selected using inclusion and exclusion prefixes, as well as by file modification date.
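The options described above correspond to fields of a transfer job definition. The following is a minimal sketch of such a definition as a plain Python dictionary, of the kind that could be passed as the body of CloudDataTransferServiceCreateJobOperator. The bucket names, project ID, and prefixes are hypothetical placeholders, and scheduling details that a real job needs are omitted here.

```python
import json

# Hypothetical names used only for illustration.
SOURCE_BUCKET = "example-source-bucket"
SINK_BUCKET = "example-sink-bucket"

transfer_job_body = {
    "description": "Example GCS to GCS transfer (illustrative only)",
    "status": "ENABLED",
    "projectId": "example-project",  # hypothetical project ID
    "transferSpec": {
        "gcsDataSource": {"bucketName": SOURCE_BUCKET},
        "gcsDataSink": {"bucketName": SINK_BUCKET},
        # Select source objects by prefix and by modification time.
        "objectConditions": {
            "includePrefixes": ["logs/2024/"],
            "excludePrefixes": ["logs/2024/tmp/"],
            "minTimeElapsedSinceLastModification": "3600s",
        },
        # The three behaviours described in the text above.
        "transferOptions": {
            "overwriteObjectsAlreadyExistingInSink": True,
            "deleteObjectsUniqueInSink": False,
            "deleteObjectsFromSourceAfterTransfer": False,
        },
    },
}

print(json.dumps(transfer_job_body, indent=2))
```

In this sketch both delete options are left disabled, so neither the source nor the sink loses objects; only overwriting in the sink is allowed.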
For information on how to use these operators, see the guide: Google Cloud Transfer Service Operators
Local transfer¶
Two operators copy data with the entire process controlled locally.
They are described in the following sections.
Prerequisite Tasks¶
Operators¶
GCSToGCSOperator¶
GCSToGCSOperator allows you to copy one or more files within GCS. The files may be copied between two different buckets or within one bucket.
The copying always takes place without taking into account the initial state of the destination bucket.
This operator only deletes objects in the source bucket if the file move option is active. When copying files between two different buckets, this operator never deletes data in the destination bucket.
When you use this operator, you can specify whether objects should be deleted from the source after they are transferred to the sink. Source objects can be specified using a single wildcard, as well as based on the file modification date.
By default, this operator works like the cp command. When the file move option is active, it works like the mv command.
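The cp and mv comparison can be illustrated with a small simulation that uses plain dictionaries in place of buckets. This is only a sketch: transfer_object and the in-memory buckets are hypothetical names made up for illustration, and nothing here calls GCS or Airflow.

```python
from typing import Dict, Optional


def transfer_object(
    source_bucket: Dict[str, bytes],
    destination_bucket: Dict[str, bytes],
    source_object: str,
    destination_object: Optional[str] = None,
    move_object: bool = False,
) -> str:
    """Copy one object between in-memory buckets; delete the source if moving."""
    # Fall back to the source name when no destination name is given.
    target = destination_object if destination_object is not None else source_object
    destination_bucket[target] = source_bucket[source_object]
    if move_object:
        # mv-like behaviour: the source object is removed once the copy exists.
        del source_bucket[source_object]
    return target


src = {"report.csv": b"a,b\n1,2\n"}
dst: Dict[str, bytes] = {}

# cp-like: the source object is left in place.
transfer_object(src, dst, "report.csv", "backup_report.csv")
copied_source_still_present = "report.csv" in src

# mv-like: the source object is deleted after the copy.
transfer_object(src, dst, "report.csv", "moved_report.csv", move_object=True)
moved_source_still_present = "report.csv" in src

print(copied_source_still_present, moved_source_still_present, sorted(dst))
```

The only difference between the two calls is the move_object flag, which mirrors how the file move option changes the operator from copying to moving.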
Below are examples of using the GCSToGCSOperator to copy a single file, to copy multiple files with a wildcard, a delimiter, or a list, to move a single file, and to move multiple files.
Copy single file¶
The following example copies a single file, OBJECT_1, from the BUCKET_1_SRC GCS bucket to the BUCKET_1_DST bucket, prefixing the destination file name with backup_.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
)
Copy multiple files¶
There are several ways to copy multiple files. Examples of each are presented below.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
copy_files_with_wildcard = GCSToGCSOperator(
    task_id="copy_files_with_wildcard",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/*.txt",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)
The source_object value may contain one wildcard, denoted as "*". All files matching the wildcard expression will be copied. In the example above, all files in the data folder of BUCKET_1_SRC that end with .txt will be copied to the backup folder in BUCKET_1_DST, with file names unchanged.
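The wildcard rule can be sketched in plain Python. The helper below, plan_wildcard_copy, is a hypothetical function written for illustration and is not part of Airflow; it assumes that the text before the "*" acts as a prefix, the text after it acts as a required suffix, and the prefix is replaced by destination_object in the copied name.

```python
from typing import Dict, List, Optional


def plan_wildcard_copy(
    object_names: List[str],
    source_object: str,
    destination_object: Optional[str] = None,
) -> Dict[str, str]:
    """Map each matching source name to the name it would get at the destination."""
    if source_object.count("*") != 1:
        raise ValueError("this sketch expects exactly one '*' in source_object")
    prefix, suffix = source_object.split("*", 1)
    plan: Dict[str, str] = {}
    for name in object_names:
        # Guard against the prefix and suffix overlapping in a short name.
        long_enough = len(name) >= len(prefix) + len(suffix)
        if long_enough and name.startswith(prefix) and name.endswith(suffix):
            if destination_object is None:
                plan[name] = name  # keep the source name
            else:
                # Assumption: the prefix is swapped for destination_object.
                plan[name] = destination_object + name[len(prefix):]
    return plan


listing = ["data/a.txt", "data/b.txt", "data/c.csv", "other/d.txt"]
plan = plan_wildcard_copy(listing, "data/*.txt", "backup/")
print(plan)
# {'data/a.txt': 'backup/a.txt', 'data/b.txt': 'backup/b.txt'}
```

Only the two .txt objects under data/ are selected, and each keeps its file name under backup/.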
tests/system/providers/google/gcs/example_gcs_to_gcs.py
copy_files_with_delimiter = GCSToGCSOperator(
    task_id="copy_files_with_delimiter",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    delimiter=".txt",
)
For entries in source_objects that contain no wildcard, all files whose names start with the entry are listed, filtered by the delimiter if one is provided. Each listed file is then copied to destination_object and renamed accordingly.
The following example copies all the files in the subdir/ folder (for example subdir/a.csv, subdir/b.csv, subdir/c.csv) from the BUCKET_1_SRC GCS bucket to the backup/ folder in the BUCKET_1_DST bucket (that is, backup/a.csv, backup/b.csv, backup/c.csv).
tests/system/providers/google/gcs/example_gcs_to_gcs.py
copy_files_without_wildcard = GCSToGCSOperator(
    task_id="copy_files_without_wildcard",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)
The delimiter field may be specified to select any source files starting with source_object and ending with the value supplied to delimiter. The copy_files_with_delimiter example above uses the delimiter value to select the same .txt files under data/ as the wildcard example.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[OBJECT_1, OBJECT_2],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)
Lastly, files may be copied by omitting the source_object argument and instead supplying a list to the source_objects argument. In the example above, OBJECT_1 and OBJECT_2 will be copied from BUCKET_1_SRC to BUCKET_1_DST. Instead of specific file names, the list can contain one or more wildcard expressions, each with no more than one wildcard. Supplying a list of size 1 works the same as supplying a value to the source_object argument.
Move single file¶
Supplying True to the move_object argument causes the operator to delete source_object once the copy is complete.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup_" + OBJECT_1,
    move_object=True,
)
Move multiple files¶
Multiple files may be moved by supplying True to the move_object argument. The same rules concerning wildcards and the delimiter argument apply to moves as well as copies.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[OBJECT_1, OBJECT_2],
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    move_object=True,  # Required for a move; without it the files are only copied
)
GCSSynchronizeBucketsOperator¶
The GCSSynchronizeBucketsOperator operator checks the initial state of the destination bucket and compares it with the source bucket. Based on this comparison, it creates an operation plan that describes which objects should be deleted from the destination bucket, which should be overwritten, and which should be copied.
This operator never modifies data in the source bucket.
When you use this operator, you can specify whether overwriting objects that already exist in the sink is allowed, whether objects that exist only in the sink should be deleted, whether subdirectories are processed, and which subdirectory is processed.
The way this operator works can be compared to the rsync command.
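The operation plan described above can be sketched as a pure function over two bucket listings. This is an illustration only, not the operator's implementation: build_sync_plan and SyncPlan are hypothetical names, each listing maps an object name to a checksum string, and the sketch assumes that an object counts as a candidate for overwriting when its name exists on both sides but the checksums differ.

```python
from typing import Dict, List, NamedTuple


class SyncPlan(NamedTuple):
    to_copy: List[str]
    to_overwrite: List[str]
    to_delete: List[str]


def build_sync_plan(
    source: Dict[str, str],
    destination: Dict[str, str],
    allow_overwrite: bool = False,
    delete_extra_files: bool = False,
) -> SyncPlan:
    """Compare two name -> checksum listings and decide what to do with each object."""
    # Objects missing from the destination are always copied.
    to_copy = sorted(name for name in source if name not in destination)
    # Objects present on both sides are touched only when overwriting is allowed.
    to_overwrite = []
    if allow_overwrite:
        to_overwrite = sorted(
            name
            for name in source
            if name in destination and source[name] != destination[name]
        )
    # Objects that exist only in the destination are removed only on request.
    to_delete = []
    if delete_extra_files:
        to_delete = sorted(name for name in destination if name not in source)
    return SyncPlan(to_copy, to_overwrite, to_delete)


source_listing = {"a.txt": "c1", "b.txt": "c2", "dir/c.txt": "c3"}
destination_listing = {"b.txt": "old", "stale.txt": "c9"}

basic_plan = build_sync_plan(source_listing, destination_listing)
full_plan = build_sync_plan(
    source_listing,
    destination_listing,
    allow_overwrite=True,
    delete_extra_files=True,
)
print(basic_plan)
print(full_plan)
```

With both flags left at False the plan only adds missing objects; enabling both flags also overwrites b.txt and removes stale.txt. The source listing is never changed in either case.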
Basic Synchronization¶
The following example ensures that all files in BUCKET_1_SRC, including any in subdirectories, are also in BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST if they already exist, and it will not delete any files in BUCKET_1_DST that are not in BUCKET_1_SRC.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)
Full Bucket Synchronization¶
This example ensures that all files in BUCKET_1_SRC, including any in subdirectories, are also in BUCKET_1_DST. It will overwrite identically named files in BUCKET_1_DST if they already exist, and it will delete any files in BUCKET_1_DST that are not in BUCKET_1_SRC.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)
Synchronize to a Subdirectory¶
The following example ensures that all files in BUCKET_1_SRC, including any in subdirectories, are also in the subdir folder in BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST/subdir if they already exist, and it will not delete any files in BUCKET_1_DST/subdir that are not in BUCKET_1_SRC.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="subdir/",
)
Synchronize from a Subdirectory¶
This example ensures that all files in BUCKET_1_SRC/subdir, including any in subdirectories, are also in BUCKET_1_DST. It will not overwrite identically named files in BUCKET_1_DST if they already exist, and it will not delete any files in BUCKET_1_DST that are not in BUCKET_1_SRC/subdir.
tests/system/providers/google/gcs/example_gcs_to_gcs.py
sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
)
Reference¶
For further information, look at: