Google Cloud Storage Operators¶
Cloud Storage allows world-wide storage and retrieval of any amount of data at any time. You can use Cloud Storage for a range of scenarios including serving website content, storing data for archival and disaster recovery, or distributing large data objects to users via direct download.
Operators¶
GCSToBigQueryOperator¶
Use the GCSToBigQueryOperator to execute a BigQuery load job.
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Load a CSV file from a bucket into a BigQuery table, overwriting existing rows.
load_csv = GCSToBigQueryOperator(
    task_id='gcs_to_bigquery_example',
    bucket='cloud-samples-data',
    source_objects=['bigquery/us-states/us-states.csv'],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)
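To make the parameters above more concrete, the following is a minimal sketch of how they might map onto a BigQuery-style load configuration. It is an illustration only and not the operator's actual implementation; the helper name build_load_config and the dataset and table names are hypothetical.

```python
def build_load_config(bucket, source_objects, dataset, table,
                      schema_fields, write_disposition):
    """Assemble a BigQuery-style load configuration (illustrative only)."""
    return {
        "load": {
            # Each source object becomes a gs:// URI inside the bucket.
            "sourceUris": [f"gs://{bucket}/{obj}" for obj in source_objects],
            "destinationTable": {"datasetId": dataset, "tableId": table},
            "schema": {"fields": list(schema_fields)},
            "writeDisposition": write_disposition,
        }
    }


config = build_load_config(
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    dataset="my_dataset",   # hypothetical dataset name
    table="us_states",      # hypothetical table name
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
)
print(config["load"]["sourceUris"])
```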
GCSTimeSpanFileTransformOperator¶
Use the GCSTimeSpanFileTransformOperator to transform files that were modified in a specific time span. The time span starts at the logical execution timestamp of the DAG instance (execution_date) and ends at the timestamp for which the next DAG instance is scheduled. If the DAG has no next instance scheduled, the end of the time span is infinite, meaning the operator processes all files modified at or after execution_date.
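from airflow.providers.google.cloud.operators.gcs import GCSTimeSpanFileTransformOperator

# Transform the files under SOURCE_PREFIX that were modified within the time span
# and write the results under DESTINATION_PREFIX.
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
    task_id="gcs_timespan_transform_files",
    source_bucket=SOURCE_BUCKET,
    source_prefix=SOURCE_PREFIX,
    source_gcp_conn_id=SOURCE_GCP_CONN_ID,
    destination_bucket=DESTINATION_BUCKET,
    destination_prefix=DESTINATION_PREFIX,
    destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
    transform_script=["python", PATH_TO_TRANSFORM_SCRIPT],
)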
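The time-span selection can be pictured with a small standalone sketch. It assumes a half-open interval (start inclusive, end exclusive) and treats a missing end as unbounded; the function name in_time_span and the sample file names and timestamps are hypothetical, and the operator's real selection logic may differ in detail.

```python
from datetime import datetime, timezone


def in_time_span(updated, start, end=None):
    """Return True if start <= updated < end; end=None means no upper bound."""
    if end is None:
        return updated >= start
    return start <= updated < end


# Hypothetical last-modified timestamps of three objects.
files = {
    "a.csv": datetime(2024, 1, 1, 6, tzinfo=timezone.utc),
    "b.csv": datetime(2024, 1, 2, 12, tzinfo=timezone.utc),
    "c.csv": datetime(2024, 1, 3, 9, tzinfo=timezone.utc),
}
start = datetime(2024, 1, 2, tzinfo=timezone.utc)
end = datetime(2024, 1, 3, tzinfo=timezone.utc)

# A run that has a next scheduled run: only files inside [start, end).
selected = sorted(n for n, ts in files.items() if in_time_span(ts, start, end))
# A run without a next scheduled run: the span is open-ended.
open_ended = sorted(n for n, ts in files.items() if in_time_span(ts, start))
print(selected, open_ended)
```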
GCSBucketCreateAclEntryOperator¶
Creates a new ACL entry on the specified bucket.
For parameter definitions, see GCSBucketCreateAclEntryOperator.
Using the operator¶
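from airflow.providers.google.cloud.operators.gcs import GCSBucketCreateAclEntryOperator

# Grant GCS_ACL_ENTITY the role GCS_ACL_BUCKET_ROLE on the bucket.
gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
    bucket=BUCKET_1,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_BUCKET_ROLE,
    task_id="gcs_bucket_create_acl_entry_task",
)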
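As a rough guide to the values that entity and role can take, the sketch below validates them before building an ACL entry. The helper make_bucket_acl_entry is hypothetical and not part of Airflow, and the entity forms it accepts are only a subset of those Cloud Storage supports.

```python
# Roles that can be granted on a bucket.
BUCKET_ROLES = {"OWNER", "READER", "WRITER"}
# Entity forms covered by this sketch (Cloud Storage supports more).
ENTITY_PREFIXES = ("user-", "group-")
SPECIAL_ENTITIES = {"allUsers", "allAuthenticatedUsers"}


def make_bucket_acl_entry(entity, role):
    """Validate entity and role, then return an ACL entry as a dict."""
    if entity not in SPECIAL_ENTITIES and not entity.startswith(ENTITY_PREFIXES):
        raise ValueError(f"Unsupported entity: {entity!r}")
    if role not in BUCKET_ROLES:
        raise ValueError(f"Unsupported bucket role: {role!r}")
    return {"entity": entity, "role": role}


entry = make_bucket_acl_entry("user-jane@example.com", "READER")
print(entry)
```

Object ACL entries follow the same shape, although objects accept only the OWNER and READER roles.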
Templating¶
template_fields = (
    'bucket',
    'entity',
    'role',
    'user_project',
    'impersonation_chain',
)
More information¶
See Google Cloud Storage Documentation to create a new ACL entry for a bucket.
GCSObjectCreateAclEntryOperator¶
Creates a new ACL entry on the specified object.
For parameter definitions, see GCSObjectCreateAclEntryOperator.
Using the operator¶
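from airflow.providers.google.cloud.operators.gcs import GCSObjectCreateAclEntryOperator

# Grant GCS_ACL_ENTITY the role GCS_ACL_OBJECT_ROLE on a single object.
gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
    bucket=BUCKET_1,
    object_name=BUCKET_FILE_LOCATION,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_OBJECT_ROLE,
    task_id="gcs_object_create_acl_entry_task",
)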
Templating¶
template_fields = (
    'bucket',
    'object_name',
    'entity',
    'generation',
    'role',
    'user_project',
    'impersonation_chain',
)
More information¶
See Google Cloud Storage insert documentation to create an ACL entry for ObjectAccess.
Deleting Bucket¶
Deleting a bucket removes it from Google Cloud Storage. Use the GCSDeleteBucketOperator to perform the deletion.
delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket_1", bucket_name=BUCKET_1)
delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket_2", bucket_name=BUCKET_2)
You can use Jinja templating with the bucket_name, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.
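The idea behind templated parameters can be sketched without Airflow. The snippet below uses a small regular-expression substitution as a stand-in for Jinja rendering, so it handles only plain {{ name }} placeholders; the render function, the bucket name pattern, and the context value are assumptions made for illustration.

```python
import re


def render(template, context):
    """Replace {{ name }} placeholders; a tiny stand-in for Jinja rendering."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


# Only parameters listed as template fields are rendered.
template_fields = ("bucket_name",)
params = {"bucket_name": "staging-{{ ds_nodash }}", "task_id": "delete_bucket"}
context = {"ds_nodash": "20240102"}  # hypothetical run date

rendered = {
    key: render(value, context) if key in template_fields else value
    for key, value in params.items()
}
print(rendered["bucket_name"])
```

In a real DAG, Airflow performs the rendering with Jinja just before the task executes, so the same operator definition can target a different bucket on each run.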
Sensors¶
GCSObjectExistenceSensor¶
Use the GCSObjectExistenceSensor to wait (poll) for the existence of a file in Google Cloud Storage.
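from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

# Poll until the object PATH_TO_UPLOAD_FILE exists in the bucket.
gcs_object_exists = GCSObjectExistenceSensor(
    bucket=BUCKET_1,
    object=PATH_TO_UPLOAD_FILE,
    mode='poke',
    task_id="gcs_object_exists_task",
)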
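Conceptually, a sensor in poke mode repeats a check at a fixed interval until it succeeds or a timeout expires. The sketch below imitates that loop against an in-memory set instead of a real bucket; wait_for, fake_bucket, and the object name are hypothetical, and unlike this sketch a real sensor fails the task on timeout rather than returning False.

```python
import time


def wait_for(check, poke_interval=0.01, timeout=1.0):
    """Call check() repeatedly until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


fake_bucket = set()   # stands in for the bucket's object listing
calls = {"n": 0}      # counts how many times the check has run


def object_exists():
    calls["n"] += 1
    if calls["n"] == 3:
        # Simulate the file being uploaded just before the third check.
        fake_bucket.add("data/file.csv")
    return "data/file.csv" in fake_bucket


found = wait_for(object_exists)
print(found, calls["n"])
```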
GCSObjectsWithPrefixExistenceSensor¶
Use the GCSObjectsWithPrefixExistenceSensor to wait (poll) for the existence of a file with a specified prefix in Google Cloud Storage.
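from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor

# Poll until at least one object whose name starts with the prefix exists.
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
    bucket=BUCKET_1,
    prefix=PATH_TO_UPLOAD_FILE_PREFIX,
    mode='poke',
    task_id="gcs_object_with_prefix_exists_task",
)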
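The prefix check itself amounts to asking whether any object name starts with the given string. A minimal sketch, using a hypothetical helper objects_with_prefix and made-up object names:

```python
def objects_with_prefix(object_names, prefix):
    """Return the sorted object names that start with the given prefix."""
    return sorted(name for name in object_names if name.startswith(prefix))


names = ["uploads/a.csv", "uploads/b.csv", "archive/a.csv"]
matches = objects_with_prefix(names, "uploads/")
# The condition is met as soon as at least one object matches.
condition_met = bool(matches)
print(matches, condition_met)
```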
More information¶
Sensors have different modes that determine the behaviour of resources while the task is executing. See Airflow sensors documentation for best practices when using sensors.