Source code for airflow.providers.google.cloud.example_dags.example_gcs_timespan_file_transform

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG for Google Cloud Storage time-span file transform operator.
"""

import os

from airflow import models
from airflow.providers.google.cloud.operators.gcs import GCSTimeSpanFileTransformOperator
from airflow.utils.dates import days_ago
from airflow.utils.state import State

# Source side of the transform: the bucket may be overridden via the
# GCP_GCS_BUCKET_1 environment variable; objects are read from SOURCE_PREFIX.
SOURCE_BUCKET = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
SOURCE_PREFIX = "gcs_timespan_file_transform_source"
SOURCE_GCP_CONN_ID = "google_cloud_default"
# Destination side: reuses the source bucket, but writes under its own prefix.
DESTINATION_BUCKET = SOURCE_BUCKET
DESTINATION_PREFIX = "gcs_timespan_file_transform_destination"
DESTINATION_GCP_CONN_ID = "google_cloud_default"
# Script handed to the operator's transform_script; overridable through the
# GCP_GCS_PATH_TO_TRANSFORM_SCRIPT environment variable.
PATH_TO_TRANSFORM_SCRIPT = os.getenv(
    "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT",
    "test_gcs_timespan_transform_script.py",
)


# One-shot example DAG, starting one day ago, containing a single
# GCSTimeSpanFileTransformOperator task.
with models.DAG(
    dag_id="example_gcs_timespan_file_transform",
    schedule_interval="@once",
    start_date=days_ago(1),
    tags=["example"],
) as dag:

    # [START howto_operator_gcs_timespan_file_transform_operator_Task]
    gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
        task_id="gcs_timespan_transform_files",
        source_bucket=SOURCE_BUCKET,
        source_prefix=SOURCE_PREFIX,
        source_gcp_conn_id=SOURCE_GCP_CONN_ID,
        destination_bucket=DESTINATION_BUCKET,
        destination_prefix=DESTINATION_PREFIX,
        destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
        transform_script=["python", PATH_TO_TRANSFORM_SCRIPT],
    )
    # [END howto_operator_gcs_timespan_file_transform_operator_Task]


if __name__ == '__main__':
    # Running the file directly: first clear the DAG, passing
    # dag_run_state=State.NONE, then execute it locally via dag.run().
    dag.clear(dag_run_state=State.NONE)
    dag.run()

Was this entry helpful?