Source code for airflow.providers.google.cloud.example_dags.example_cloud_build

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that displays interactions with Google Cloud Build.

This DAG relies on the following OS environment variables:

* GCP_PROJECT_ID - Google Cloud Project to use for Cloud Build.
* GCP_CLOUD_BUILD_ARCHIVE_URL - Path to the gzipped source archive in Google Cloud Storage.
    This object must be a gzipped archive file (.tar.gz) containing the source to build.
* GCP_CLOUD_BUILD_REPOSITORY_NAME - Name of the Cloud Source Repository.

"""

import os
from pathlib import Path

from urllib.parse import urlparse

from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.cloud_build import (
    CloudBuildCancelBuildOperator,
    CloudBuildCreateBuildOperator,
    CloudBuildCreateBuildTriggerOperator,
    CloudBuildDeleteBuildTriggerOperator,
    CloudBuildGetBuildOperator,
    CloudBuildGetBuildTriggerOperator,
    CloudBuildListBuildsOperator,
    CloudBuildListBuildTriggersOperator,
    CloudBuildRetryBuildOperator,
    CloudBuildRunBuildTriggerOperator,
    CloudBuildUpdateBuildTriggerOperator,
)
from airflow.utils import dates

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "airflow-test-project")

GCP_SOURCE_ARCHIVE_URL = os.environ.get("GCP_CLOUD_BUILD_ARCHIVE_URL", "gs://airflow-test-bucket/file.tar.gz")
GCP_SOURCE_REPOSITORY_NAME = os.environ.get("GCP_CLOUD_BUILD_REPOSITORY_NAME", "airflow-test-repo")

GCP_SOURCE_ARCHIVE_URL_PARTS = urlparse(GCP_SOURCE_ARCHIVE_URL)
GCP_SOURCE_BUCKET_NAME = GCP_SOURCE_ARCHIVE_URL_PARTS.netloc

CURRENT_FOLDER = Path(__file__).parent

# [START howto_operator_gcp_create_build_trigger_body]
create_build_trigger_body = {
    "name": "test-cloud-build-trigger",
    "trigger_template": {
        "project_id": GCP_PROJECT_ID,
        "repo_name": GCP_SOURCE_REPOSITORY_NAME,
        "branch_name": "master",
    },
    "filename": "cloudbuild.yaml",
}
# [END howto_operator_gcp_create_build_trigger_body]
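
# ``trigger_template`` binds the trigger to a branch of a Cloud Source Repository,
# while ``filename`` points at the Cloud Build config file stored in that repository.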

update_build_trigger_body = {
    "name": "test-cloud-build-trigger",
    "trigger_template": {
        "project_id": GCP_PROJECT_ID,
        "repo_name": GCP_SOURCE_REPOSITORY_NAME,
        "branch_name": "dev",
    },
    "filename": "cloudbuild.yaml",
}

# [START howto_operator_gcp_create_build_from_storage_body]
create_build_from_storage_body = {
    "source": {"storage_source": GCP_SOURCE_ARCHIVE_URL},
    "steps": [
        {
            "name": "gcr.io/cloud-builders/docker",
            "args": ["build", "-t", f"gcr.io/$PROJECT_ID/{GCP_SOURCE_BUCKET_NAME}", "."],
        }
    ],
    "images": [f"gcr.io/$PROJECT_ID/{GCP_SOURCE_BUCKET_NAME}"],
}
# [END howto_operator_gcp_create_build_from_storage_body]

# [START howto_operator_create_build_from_repo_body]
create_build_from_repo_body = {
    "source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "main"}},
    "steps": [
        {
            "name": "gcr.io/cloud-builders/docker",
            "args": ["build", "-t", "gcr.io/$PROJECT_ID/$REPO_NAME", "."],
        }
    ],
    "images": ["gcr.io/$PROJECT_ID/$REPO_NAME"],
}
# [END howto_operator_create_build_from_repo_body]
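
# ``$PROJECT_ID`` and ``$REPO_NAME`` in the bodies above are Cloud Build substitution
# variables, expanded by the Cloud Build service when the build runs; they are not
# resolved by Python or by Airflow templating.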


with models.DAG(
    "example_gcp_cloud_build",
    default_args=dict(start_date=dates.days_ago(1)),
    schedule_interval='@once',
    tags=["example"],
) as build_dag:

    # [START howto_operator_create_build_from_storage]
    create_build_from_storage = CloudBuildCreateBuildOperator(
        task_id="create_build_from_storage", project_id=GCP_PROJECT_ID, build=create_build_from_storage_body
    )
    # [END howto_operator_create_build_from_storage]

    # [START howto_operator_create_build_from_storage_result]
    create_build_from_storage_result = BashOperator(
        bash_command=f"echo { create_build_from_storage.output['results'] }",
        task_id="create_build_from_storage_result",
    )
    # [END howto_operator_create_build_from_storage_result]

    # [START howto_operator_create_build_from_repo]
    create_build_from_repo = CloudBuildCreateBuildOperator(
        task_id="create_build_from_repo", project_id=GCP_PROJECT_ID, build=create_build_from_repo_body
    )
    # [END howto_operator_create_build_from_repo]

    # [START howto_operator_create_build_from_repo_result]
    create_build_from_repo_result = BashOperator(
        bash_command=f"echo { create_build_from_repo.output['results'] }",
        task_id="create_build_from_repo_result",
    )
    # [END howto_operator_create_build_from_repo_result]

    # [START howto_operator_list_builds]
    list_builds = CloudBuildListBuildsOperator(
        task_id="list_builds", project_id=GCP_PROJECT_ID, location="global"
    )
    # [END howto_operator_list_builds]

    # [START howto_operator_create_build_without_wait]
    create_build_without_wait = CloudBuildCreateBuildOperator(
        task_id="create_build_without_wait",
        project_id=GCP_PROJECT_ID,
        build=create_build_from_repo_body,
        wait=False,
    )
    # [END howto_operator_create_build_without_wait]
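
    # The tasks below chain on XCom: ``create_build_without_wait.output['id']`` is an
    # XComArg reference resolved at runtime, pulling the build id that the upstream
    # task pushed to XCom, so cancel/retry/get always operate on the build created above.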

    # [START howto_operator_cancel_build]
    cancel_build = CloudBuildCancelBuildOperator(
        task_id="cancel_build",
        id_=create_build_without_wait.output['id'],
        project_id=GCP_PROJECT_ID,
    )
    # [END howto_operator_cancel_build]

    # [START howto_operator_retry_build]
    retry_build = CloudBuildRetryBuildOperator(
        task_id="retry_build",
        id_=cancel_build.output['id'],
        project_id=GCP_PROJECT_ID,
    )
    # [END howto_operator_retry_build]

    # [START howto_operator_get_build]
    get_build = CloudBuildGetBuildOperator(
        task_id="get_build",
        id_=retry_build.output['id'],
        project_id=GCP_PROJECT_ID,
    )
    # [END howto_operator_get_build]

    # [START howto_operator_gcp_create_build_from_yaml_body]
    create_build_from_file = CloudBuildCreateBuildOperator(
        task_id="create_build_from_file",
        project_id=GCP_PROJECT_ID,
        build=str(CURRENT_FOLDER.joinpath('example_cloud_build.yaml')),
        params={'name': 'Airflow'},
    )
    # [END howto_operator_gcp_create_build_from_yaml_body]
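
    # When ``build`` is given as a file path, the operator loads the Cloud Build config
    # from that file; the ``params`` above suggest the file is rendered with Airflow
    # templating before submission. An illustrative (not verbatim) config could be:
    #
    #   steps:
    #     - name: "ubuntu"
    #       args: ["echo", "Hello {{ params.name }}!"]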

    create_build_from_storage >> create_build_from_storage_result
    create_build_from_storage_result >> list_builds
    create_build_from_repo >> create_build_from_repo_result
    create_build_from_repo_result >> list_builds
    list_builds >> create_build_without_wait >> cancel_build
    cancel_build >> retry_build >> get_build

with models.DAG(
    "example_gcp_cloud_build_trigger",
    default_args=dict(start_date=dates.days_ago(1)),
    schedule_interval='@once',
    tags=["example"],
) as build_trigger_dag:

    # [START howto_operator_create_build_trigger]
    create_build_trigger = CloudBuildCreateBuildTriggerOperator(
        task_id="create_build_trigger", project_id=GCP_PROJECT_ID, trigger=create_build_trigger_body
    )
    # [END howto_operator_create_build_trigger]

    # [START howto_operator_run_build_trigger]
    run_build_trigger = CloudBuildRunBuildTriggerOperator(
        task_id="run_build_trigger",
        project_id=GCP_PROJECT_ID,
        trigger_id=create_build_trigger.output['id'],
        source=create_build_from_repo_body['source']['repo_source'],
    )
    # [END howto_operator_run_build_trigger]
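
    # ``source`` selects the repository and branch this trigger run builds against;
    # here it reuses the ``repo_source`` dict defined for the build DAG above.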

    # [START howto_operator_update_build_trigger]
    update_build_trigger = CloudBuildUpdateBuildTriggerOperator(
        task_id="update_build_trigger",
        project_id=GCP_PROJECT_ID,
        trigger_id=create_build_trigger.output['id'],
        trigger=update_build_trigger_body,
    )
    # [END howto_operator_update_build_trigger]

    # [START howto_operator_get_build_trigger]
    get_build_trigger = CloudBuildGetBuildTriggerOperator(
        task_id="get_build_trigger",
        project_id=GCP_PROJECT_ID,
        trigger_id=create_build_trigger.output['id'],
    )
    # [END howto_operator_get_build_trigger]

    # [START howto_operator_delete_build_trigger]
    delete_build_trigger = CloudBuildDeleteBuildTriggerOperator(
        task_id="delete_build_trigger",
        project_id=GCP_PROJECT_ID,
        trigger_id=create_build_trigger.output['id'],
    )
    # [END howto_operator_delete_build_trigger]

    # [START howto_operator_list_build_triggers]
    list_build_triggers = CloudBuildListBuildTriggersOperator(
        task_id="list_build_triggers", project_id=GCP_PROJECT_ID, location="global", page_size=5
    )
    # [END howto_operator_list_build_triggers]

    create_build_trigger >> run_build_trigger >> update_build_trigger  # pylint: disable=pointless-statement
    update_build_trigger >> get_build_trigger >> delete_build_trigger  # pylint: disable=pointless-statement
    delete_build_trigger >> list_build_triggers  # pylint: disable=pointless-statement
