Google Cloud Storage Operators
Cloud Storage allows world-wide storage and retrieval of any amount of data at any time. You can use Cloud Storage for a range of scenarios including serving website content, storing data for archival and disaster recovery, or distributing large data objects to users via direct download.
See Google Transfer Operators for a list of specialized transfer operators to and from Google Cloud Storage.
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Operators
GCSTimeSpanFileTransformOperator
Use the GCSTimeSpanFileTransformOperator to transform files that were modified in a specific time span (the data interval).
The time span is defined by its start and end timestamps. If a DAG does not have a next DAG run scheduled, the end of the time span is infinite, meaning the operator processes all files modified since data_interval_start.
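The selection rule can be sketched in plain Python. This is a simplified model, not the provider's code. It assumes each object exposes a last-updated timestamp and treats the window as half-open, `[start, end)`, with `end=None` standing in for the unbounded end described above. The function `select_in_timespan` and the sample object names are hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timezone


def select_in_timespan(
    updated_times: dict[str, datetime],
    start: datetime,
    end: datetime | None = None,
) -> list[str]:
    """Return the names whose last-updated time falls in the half-open window [start, end).

    `updated_times` maps a hypothetical object name to its last-updated time.
    `end=None` models a DAG with no next run scheduled: the window has no upper bound.
    """
    upper = end if end is not None else datetime.max.replace(tzinfo=timezone.utc)
    return sorted(name for name, updated in updated_times.items() if start <= updated < upper)


utc = timezone.utc
objects = {
    "data/a.csv": datetime(2024, 1, 1, 0, 30, tzinfo=utc),
    "data/b.csv": datetime(2024, 1, 1, 1, 30, tzinfo=utc),
    "data/c.csv": datetime(2023, 12, 31, 23, 0, tzinfo=utc),
}
start = datetime(2024, 1, 1, 0, 0, tzinfo=utc)
end = datetime(2024, 1, 1, 1, 0, tzinfo=utc)
print(select_in_timespan(objects, start, end))  # ['data/a.csv']
print(select_in_timespan(objects, start))  # ['data/a.csv', 'data/b.csv']
```

With a bounded window only `data/a.csv` qualifies. Without an end, every object updated since `start` is selected.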
from airflow.providers.google.cloud.operators.gcs import GCSTimeSpanFileTransformOperator
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
    task_id="gcs_timespan_transform_files",
    source_bucket=BUCKET_NAME_SRC,
    source_prefix=SOURCE_PREFIX,
    source_gcp_conn_id=SOURCE_GCP_CONN_ID,
    destination_bucket=BUCKET_NAME_DST,
    destination_prefix=DESTINATION_PREFIX,
    destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
    transform_script=["python", TRANSFORM_SCRIPT_PATH],
)
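The operator runs `transform_script` as a subprocess. As we understand the provider's behavior, the script receives four positional arguments:

- a local directory containing the downloaded source files,
- a local directory whose contents are uploaded under `destination_prefix`,
- the time span's start as an ISO 8601 string,
- the time span's end as an ISO 8601 string.

Treat that argument order as an assumption and check it against your provider version. The sketch below upper-cases every text file. The names `transform_dir` and `main` are hypothetical.

```python
from __future__ import annotations

import sys
from pathlib import Path

USAGE = "usage: transform.py SOURCE_DIR DEST_DIR TIMESPAN_START TIMESPAN_END"


def transform_dir(source: Path, destination: Path) -> list[str]:
    """Upper-case every file under `source`, mirroring its layout under `destination`.

    Returns the relative paths written, so the behaviour is easy to check.
    """
    written = []
    for src in sorted(source.rglob("*")):
        if not src.is_file():
            continue
        rel = src.relative_to(source)
        dst = destination / rel
        dst.parent.mkdir(parents=True, exist_ok=True)
        dst.write_text(src.read_text().upper())
        written.append(rel.as_posix())
    return written


def main(argv: list[str]) -> int:
    # Assumed argument order: source dir, destination dir, timespan start, timespan end.
    if len(argv) != 5:
        print(USAGE, file=sys.stderr)
        return 2
    source, destination, timespan_start, timespan_end = argv[1:]
    print(f"Transforming files updated between {timespan_start} and {timespan_end}")
    transform_dir(Path(source), Path(destination))
    return 0


if __name__ == "__main__" and len(sys.argv) > 1:
    # Run only when called with arguments, as the operator is assumed to do.
    sys.exit(main(sys.argv))
```

A non-zero exit status makes the subprocess fail, which should in turn fail the task.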
GCSBucketCreateAclEntryOperator
Creates a new ACL entry on the specified bucket.
For parameter definitions, see GCSBucketCreateAclEntryOperator.
Using the operator
from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator
gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_BUCKET_ROLE,
    task_id="gcs_bucket_create_acl_entry_task",
)
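`GCS_ACL_ENTITY` and `GCS_ACL_BUCKET_ROLE` are placeholders in the example. In the Cloud Storage ACL model, an entity is a string such as `user-<email>`, `group-<email>`, `domain-<domain>`, `allUsers`, or `allAuthenticatedUsers`, and a bucket role is `OWNER`, `READER`, or `WRITER`. The hypothetical helpers below build and check those values before they reach the operator. They are a sketch, not part of the provider.

```python
from __future__ import annotations

BUCKET_ROLES = frozenset({"OWNER", "READER", "WRITER"})
SPECIAL_ENTITIES = frozenset({"allUsers", "allAuthenticatedUsers"})
PREFIXED_KINDS = frozenset({"user", "group", "domain"})


def acl_entity(kind: str, value: str | None = None) -> str:
    """Build an ACL entity string such as "user-alice@example.com" or "allUsers"."""
    if kind in SPECIAL_ENTITIES:
        return kind
    if kind not in PREFIXED_KINDS or not value:
        raise ValueError(f"unsupported entity: kind={kind!r}, value={value!r}")
    return f"{kind}-{value}"


def check_bucket_role(role: str) -> str:
    """Return `role` unchanged if it is a valid bucket ACL role, else raise ValueError."""
    if role not in BUCKET_ROLES:
        raise ValueError(f"invalid bucket role {role!r}; expected one of {sorted(BUCKET_ROLES)}")
    return role


# Hypothetical values for the placeholders used in the operator example.
GCS_ACL_ENTITY = acl_entity("group", "data-team@example.com")
GCS_ACL_BUCKET_ROLE = check_bucket_role("READER")
```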
Templating
template_fields: Sequence[str] = (
    "bucket",
    "entity",
    "role",
    "user_project",
    "impersonation_chain",
)
More information
See the Google Cloud Storage documentation on creating a new ACL entry for a bucket.
GCSObjectCreateAclEntryOperator
Creates a new ACL entry on the specified object.
For parameter definitions, see GCSObjectCreateAclEntryOperator.
Using the operator
from airflow.providers.google.cloud.operators.gcs import GCSObjectCreateAclEntryOperator
gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    object_name=FILE_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_OBJECT_ROLE,
    task_id="gcs_object_create_acl_entry_task",
)
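Object ACLs accept a narrower set of roles than bucket ACLs. Cloud Storage allows `OWNER` and `READER` on objects, but not `WRITER`. A small hypothetical check for the `GCS_ACL_OBJECT_ROLE` placeholder could reject invalid roles before the task runs.

```python
OBJECT_ROLES = frozenset({"OWNER", "READER"})


def check_object_role(role: str) -> str:
    """Reject roles that Cloud Storage does not allow on object ACLs (for example WRITER)."""
    if role not in OBJECT_ROLES:
        raise ValueError(f"invalid object role {role!r}; expected one of {sorted(OBJECT_ROLES)}")
    return role


# Hypothetical value for the placeholder used in the operator example.
GCS_ACL_OBJECT_ROLE = check_object_role("READER")
```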
Templating
template_fields: Sequence[str] = (
    "bucket",
    "object_name",
    "entity",
    "generation",
    "role",
    "user_project",
    "impersonation_chain",
)
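More information
See the Google Cloud Storage ObjectAccessControls insert documentation for creating an ACL entry on an object.
Deleting Bucket
Deleting a bucket removes it from Google Cloud Storage. By default (force=True), the operator also deletes any objects still in the bucket. With force=False, deleting a non-empty bucket fails.
Use the GCSDeleteBucketOperator operator to delete a bucket.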
from airflow.providers.google.cloud.operators.gcs import GCSDeleteBucketOperator
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)
You can use Jinja templating with the bucket_name, gcp_conn_id, impersonation_chain, and user_project parameters, which allows you to determine their values dynamically.
Sensors
GCSObjectExistenceSensor
Use the GCSObjectExistenceSensor to wait (poll) for the existence of a file in Google Cloud Storage.
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
gcs_object_exists = GCSObjectExistenceSensor(
    bucket=BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_task",
)
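Conceptually, a sensor in the default poke mode calls a check repeatedly and sleeps `poke_interval` seconds between calls. It stops when the check succeeds or when `timeout` expires. The defaults below mirror Airflow's sensor defaults of 60 seconds and 7 days. The loop is a simplified sketch, not Airflow's implementation. Its `exists` callable is a hypothetical stand-in for the GCS lookup.

```python
import time
from typing import Callable


def wait_for(
    exists: Callable[[], bool],
    poke_interval: float = 60.0,
    timeout: float = 7 * 24 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Call `exists()` until it returns True and return how many pokes it took.

    Raises TimeoutError once `timeout` seconds have elapsed without success.
    `sleep` and `clock` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout
    pokes = 0
    while True:
        pokes += 1
        if exists():
            return pokes
        if clock() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        sleep(poke_interval)
```

In poke mode, every sleep in this loop keeps a worker slot occupied.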
You can also use deferrable mode in this sensor to free up the worker slot while it is waiting. Deferrable mode requires the triggerer component to be running.
gcs_object_exists_deferred = GCSObjectExistenceSensor(
    bucket=BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_deferred",
    deferrable=True,
)
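Deferrable mode moves the waiting out of the worker. The task defers itself, and the triggerer awaits the condition asynchronously, so one event loop can watch many conditions at once. The asyncio sketch below illustrates only that concurrency idea. `poll_until` and the in-memory "bucket" are hypothetical, and the sketch does not reproduce Airflow's trigger API.

```python
from __future__ import annotations

import asyncio


async def poll_until(name: str, store: set[str], poke_interval: float) -> str:
    """Wait until `name` appears in `store`, yielding to the event loop between checks."""
    while name not in store:
        await asyncio.sleep(poke_interval)
    return name


async def main() -> list[str]:
    store: set[str] = set()

    async def upload_later() -> None:
        # Simulate objects arriving while the waits are in progress.
        for name in ("a.csv", "b.csv", "c.csv"):
            await asyncio.sleep(0.01)
            store.add(name)

    uploader = asyncio.create_task(upload_later())
    # Three waits share one event loop, much as a triggerer can watch many deferred tasks.
    results = await asyncio.gather(
        *(poll_until(n, store, poke_interval=0.005) for n in ("a.csv", "b.csv", "c.csv"))
    )
    await uploader
    return list(results)


results = asyncio.run(main())
print(results)  # ['a.csv', 'b.csv', 'c.csv']
```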
GCSObjectExistenceAsyncSensor
GCSObjectExistenceAsyncSensor is deprecated and will be removed in a future release. Use GCSObjectExistenceSensor with deferrable=True instead.
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceAsyncSensor
gcs_object_exists_async = GCSObjectExistenceAsyncSensor(
    bucket=BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_task_async",
)
GCSObjectsWithPrefixExistenceSensor
Use the GCSObjectsWithPrefixExistenceSensor to wait (poll) for the existence of at least one file with a specified prefix in Google Cloud Storage.
from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
    bucket=BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task",
)
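The prefix check amounts to listing object names and keeping those that start with `prefix`. The sensor succeeds as soon as at least one name matches. As we understand it, the provider also returns the matching names from the task, which makes them available through XCom. Verify this for your provider version. A hypothetical pure-Python version of the check:

```python
from __future__ import annotations


def objects_with_prefix(names: list[str], prefix: str) -> list[str]:
    """Return the object names that start with `prefix` (GCS "folders" are just name prefixes)."""
    return [name for name in names if name.startswith(prefix)]


def poke(names: list[str], prefix: str) -> bool:
    """Sensor-style check: succeed when at least one object matches."""
    return bool(objects_with_prefix(names, prefix))


listing = ["reports/2024-01.csv", "reports/2024-02.csv", "raw/2024-01.json"]
print(objects_with_prefix(listing, "reports/"))  # ['reports/2024-01.csv', 'reports/2024-02.csv']
print(poke(listing, "archive/"))  # False
```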
Set the deferrable parameter to True to run this sensor asynchronously. This frees up the worker slot while the sensor waits and uses the resources of your Airflow deployment more efficiently. The triggerer component must be running for this to work.
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
    bucket=BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task_async",
    deferrable=True,
)
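GCSUploadSessionCompleteSensor
Use the GCSUploadSessionCompleteSensor to wait until the number of objects with a specified prefix in Google Cloud Storage stops changing. The sensor succeeds once no new objects have appeared for inactivity_period seconds and at least min_objects objects are present. The allow_delete parameter controls whether objects deleted between pokes are tolerated (True) or cause the task to fail (False). The previous_objects parameter seeds the set of objects considered already seen.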
from airflow.providers.google.cloud.sensors.gcs import GCSUploadSessionCompleteSensor
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
    bucket=BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_complete_task",
)
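The sensor's decision rule can be sketched as a small state machine:

- A growing object set resets the inactivity clock.
- A shrinking set is tolerated only when `allow_delete=True`.
- The sensor succeeds once `inactivity_period` seconds pass without new objects and at least `min_objects` are present.

The class below is a simplified model of that rule, written for illustration. `UploadSessionTracker` and its explicit `now` argument are hypothetical, the sketch raises `RuntimeError` where the provider raises its own exception, and the provider's implementation differs in detail.

```python
from __future__ import annotations


class UploadSessionTracker:
    """Simplified model of the upload-session completion rule."""

    def __init__(
        self,
        inactivity_period: float,
        min_objects: int = 1,
        allow_delete: bool = True,
        previous_objects: set[str] | None = None,
    ) -> None:
        self.inactivity_period = inactivity_period
        self.min_objects = min_objects
        self.allow_delete = allow_delete
        self.previous_objects = set(previous_objects or ())
        self.last_activity: float | None = None

    def poke(self, current: set[str], now: float) -> bool:
        """Return True when the upload session looks complete at time `now` (seconds)."""
        if current > self.previous_objects:
            # New objects arrived: remember them and restart the inactivity clock.
            self.previous_objects = set(current)
            self.last_activity = now
            return False
        if self.previous_objects - current:
            # Some objects disappeared since the last poke.
            if not self.allow_delete:
                raise RuntimeError("objects were deleted during the upload session")
            self.previous_objects = set(current)
            self.last_activity = now
            return False
        if self.last_activity is None:
            # First poke with no change: start measuring inactivity from now.
            self.last_activity = now
        inactive_for = now - self.last_activity
        return inactive_for >= self.inactivity_period and len(current) >= self.min_objects
```

For example, with `inactivity_period=15`, pokes at t=0 with `{"a"}` and at t=10 with `{"a", "b"}` both return False because objects are still arriving. A poke at t=25 with `{"a", "b"}` returns True.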
Set the deferrable parameter to True to free up the worker slot while the sensor is waiting.
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
    bucket=BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_async_complete",
    deferrable=True,
)
GCSObjectUpdateSensor
Use the GCSObjectUpdateSensor to check whether an object in Google Cloud Storage has been updated after a reference timestamp, which by default is the end of the data interval.
from airflow.providers.google.cloud.sensors.gcs import GCSObjectUpdateSensor
gcs_update_object_exists = GCSObjectUpdateSensor(
    bucket=BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task",
)
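The update check compares the object's last-modified time with a reference timestamp. The sensor's `ts_func` parameter lets you supply a different reference. The hypothetical function below models only the comparison. It assumes a strict "later than" test and treats a timezone-naive reference as UTC.

```python
from __future__ import annotations

from datetime import datetime, timezone


def is_updated_after(blob_updated: datetime | None, ts: datetime) -> bool:
    """Return True if the object was modified strictly after `ts`.

    `blob_updated` is the object's last-modified time, or None if the object is missing.
    A naive `ts` is treated as UTC so it can be compared with the aware timestamp.
    """
    if blob_updated is None:
        return False
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return blob_updated > ts


updated = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
print(is_updated_after(updated, datetime(2024, 1, 2, 0, 0)))  # True
print(is_updated_after(updated, datetime(2024, 1, 3, 0, 0)))  # False
```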
Set the deferrable parameter to True to run this sensor asynchronously. This frees up the worker slot while the sensor waits and uses the resources of your Airflow deployment more efficiently. The triggerer component must be running for this to work.
gcs_update_object_exists_async = GCSObjectUpdateSensor(
    bucket=BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task_async",
    deferrable=True,
)
More information
Sensors have different modes, such as poke and reschedule, that determine how they use worker resources while the task is executing. See the Airflow sensors documentation for best practices when using sensors.