# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Operators that integrat with Google Cloud Build service."""
from copy import deepcopy
import re
from urllib.parse import urlparse, unquote
import six
from airflow import AirflowException
from airflow.contrib.hooks.gcp_cloud_build_hook import CloudBuildHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
[docs]REGEX_REPO_PATH = re.compile(r"^/p/(?P<project_id>[^/]+)/r/(?P<repo_name>[^/]+)")
[docs]class BuildProcessor:
"""
Processes build configurations to add additional functionality to support the use of operators.
The following improvements are made:
* It is required to provide the source and only one type can be given,
* It is possible to provide the source as the URL address instead dict.
:param body: The request body.
See: https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build
:type body: dict
"""
def __init__(self, body):
self.body = deepcopy(body)
[docs] def _verify_source(self):
is_storage = "storageSource" in self.body["source"]
is_repo = "repoSource" in self.body["source"]
sources_count = sum([is_storage, is_repo])
if sources_count != 1:
raise AirflowException(
"The source could not be determined. Please choose one data source from: "
"storageSource and repoSource."
)
[docs] def process_body(self):
"""
Processes the body passed in the constructor
:return: the body.
:type: dict
"""
self._verify_source()
self._reformat_source()
return self.body
@staticmethod
[docs] def _convert_repo_url_to_dict(source):
"""
Convert url to repository in Google Cloud Source to a format supported by the API
Example valid input:
.. code-block:: none
https://source.developers.google.com/p/airflow-project/r/airflow-repo#branch-name
"""
url_parts = urlparse(source)
match = REGEX_REPO_PATH.search(url_parts.path)
if url_parts.scheme != "https" or url_parts.hostname != "source.developers.google.com" or not match:
raise AirflowException(
"Invalid URL. You must pass the URL in the format: "
"https://source.developers.google.com/p/airflow-project/r/airflow-repo#branch-name"
)
project_id = unquote(match.group("project_id"))
repo_name = unquote(match.group("repo_name"))
source_dict = {"projectId": project_id, "repoName": repo_name, "branchName": "master"}
if url_parts.fragment:
source_dict["branchName"] = url_parts.fragment
return source_dict
@staticmethod
[docs] def _convert_storage_url_to_dict(storage_url):
"""
Convert url to object in Google Cloud Storage to a format supported by the API
Example valid input:
.. code-block:: none
gs://bucket-name/object-name.tar.gz
"""
url_parts = urlparse(storage_url)
if url_parts.scheme != "gs" or not url_parts.hostname or not url_parts.path or url_parts.path == "/":
raise AirflowException(
"Invalid URL. You must pass the URL in the format: "
"gs://bucket-name/object-name.tar.gz#24565443"
)
source_dict = {"bucket": url_parts.hostname, "object": url_parts.path[1:]}
if url_parts.fragment:
source_dict["generation"] = url_parts.fragment
return source_dict
[docs]class CloudBuildCreateBuildOperator(BaseOperator):
"""
Starts a build with the specified configuration.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudBuildCreateBuildOperator`
:param body: The request body.
See: https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build
:type body: dict
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: API version used (for example v1 or v1beta1).
:type api_version: str
"""
[docs] template_fields = ("body", "gcp_conn_id", "api_version")
@apply_defaults
def __init__(
self, body, project_id=None, gcp_conn_id="google_cloud_default", api_version="v1", *args, **kwargs
):
super(CloudBuildCreateBuildOperator, self).__init__(*args, **kwargs)
self.body = body
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.api_version = api_version
self._validate_inputs()
[docs] def execute(self, context):
hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, api_version=self.api_version)
body = BuildProcessor(body=self.body).process_body()
return hook.create_build(body=body, project_id=self.project_id)