Google Cloud Storage Operators

Cloud Storage allows world-wide storage and retrieval of any amount of data at any time. You can use Cloud Storage for a range of scenarios including serving website content, storing data for archival and disaster recovery, or distributing large data objects to users via direct download.

Prerequisite Tasks

Operators

GCSToBigQueryOperator

Use the GCSToBigQueryOperator to execute a BigQuery load job to load existing dataset from Google Cloud Storage to BigQuery table.

tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery.py[source]

load_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
)

Also you can use GCSToBigQueryOperator in the deferrable mode:

tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py[source]

load_string_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_str_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

load_date_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states-by-date.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_DATE,
    deferrable=True,
)

GCSTimeSpanFileTransformOperator

Use the GCSTimeSpanFileTransformOperator to transform files that were modified in a specific time span (the data interval). The time span is defined by the time span’s start and end timestamps. If a DAG does not have a next DAG instance scheduled, the time span end infinite, meaning the operator processes all files older than data_interval_start.

tests/system/providers/google/cloud/gcs/example_gcs_transform_timespan.py[source]

gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
    task_id="gcs_timespan_transform_files",
    source_bucket=BUCKET_NAME_SRC,
    source_prefix=SOURCE_PREFIX,
    source_gcp_conn_id=SOURCE_GCP_CONN_ID,
    destination_bucket=BUCKET_NAME_DST,
    destination_prefix=DESTINATION_PREFIX,
    destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
    transform_script=["python", TRANSFORM_SCRIPT_PATH],
)

GCSBucketCreateAclEntryOperator

Creates a new ACL entry on the specified bucket.

For parameter definition, take a look at GCSBucketCreateAclEntryOperator

Using the operator

tests/system/providers/google/cloud/gcs/example_gcs_acl.py[source]

gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_BUCKET_ROLE,
    task_id="gcs_bucket_create_acl_entry_task",
)

Templating

template_fields: Sequence[str] = (
    "bucket",
    "entity",
    "role",
    "user_project",
    "impersonation_chain",
)

More information

See Google Cloud Storage Documentation to create a new ACL entry for a bucket.

GCSObjectCreateAclEntryOperator

Creates a new ACL entry on the specified object.

For parameter definition, take a look at GCSObjectCreateAclEntryOperator

Using the operator

tests/system/providers/google/cloud/gcs/example_gcs_acl.py[source]

gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    object_name=FILE_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_OBJECT_ROLE,
    task_id="gcs_object_create_acl_entry_task",
)

Templating

template_fields: Sequence[str] = (
    "bucket",
    "object_name",
    "entity",
    "generation",
    "role",
    "user_project",
    "impersonation_chain",
)

More information

See Google Cloud Storage insert documentation to create a ACL entry for ObjectAccess.

Deleting Bucket

Deleting Bucket allows you to remove bucket object from the Google Cloud Storage. It is performed through the GCSDeleteBucketOperator operator.

tests/system/providers/google/cloud/gcs/example_gcs_upload_download.py[source]

delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)

You can use Jinja templating with bucket_name, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Reference

For further information, look at:

Sensors

GCSObjectExistenceSensor

Use the GCSObjectExistenceSensor to wait (poll) for the existence of a file in Google Cloud Storage.

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_object_exists = GCSObjectExistenceSensor(
    bucket=BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_task",
)

GCSObjectsWithPrefixExistenceSensor

Use the GCSObjectsWithPrefixExistenceSensor to wait (poll) for the existence of a file with a specified prefix in Google Cloud Storage.

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
    bucket=BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task",
)

GCSUploadSessionCompleteSensor

Use the GCSUploadSessionCompleteSensor to check for a change in the number of files with a specified prefix in Google Cloud Storage.

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
    bucket=BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_complete_task",
)

GCSObjectUpdateSensor

Use the GCSObjectUpdateSensor to check if an object is updated in Google Cloud Storage.

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[source]

gcs_update_object_exists = GCSObjectUpdateSensor(
    bucket=BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task",
)

More information

Sensors have different modes that determine the behaviour of resources while the task is executing. See Airflow sensors documentation for best practices when using sensors.

Reference

For further information, look at:

Was this entry helpful?