#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Cloud Functions Hook."""
from __future__ import annotations
import time
from typing import Sequence
import requests
from googleapiclient.discovery import build
from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook
# Time to sleep (in seconds) between successive status checks of a long-running operation.
TIME_TO_SLEEP_IN_SECONDS = 1
class CloudFunctionsHook(GoogleBaseHook):
    """
    Hook for the Google Cloud Functions APIs.

    All the methods in the hook where project_id is used must be called with
    keyword arguments rather than positional.

    :param api_version: API version used when building the ``cloudfunctions``
        discovery client.
    :param gcp_conn_id: The Airflow connection used to obtain Google Cloud credentials.
    :param impersonation_chain: Optional service account (or chain of accounts) to
        impersonate; passed through to :class:`GoogleBaseHook`.
    :raises RuntimeError: If the removed ``delegate_to`` keyword argument is supplied.
    """

    # Discovery client cached after the first call to ``get_conn``.
    _conn: build | None = None

    def __init__(
        self,
        api_version: str,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        # ``delegate_to`` is no longer supported; fail loudly instead of silently ignoring it.
        if kwargs.get("delegate_to") is not None:
            raise RuntimeError(
                "The `delegate_to` parameter has been deprecated before and finally removed in this version"
                " of Google Provider. You MUST convert it to `impersonation_chain`"
            )
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            impersonation_chain=impersonation_chain,
        )
        self.api_version = api_version

    @staticmethod
    def _full_location(project_id: str, location: str) -> str:
        """
        Retrieve full location of the function in the form of
        ``projects/<GCP_PROJECT_ID>/locations/<GCP_LOCATION>``.

        :param project_id: The Google Cloud Project project_id where the function belongs.
        :param location: The location where the function is created.
        :return: The fully-qualified location resource name.
        """
        return f"projects/{project_id}/locations/{location}"

    def get_conn(self) -> build:
        """
        Retrieve the connection to Cloud Functions.

        The discovery client is built on the first call and cached on the hook
        for subsequent calls.

        :return: Google Cloud Function services object.
        """
        if not self._conn:
            http_authorized = self._authorize()
            self._conn = build(
                "cloudfunctions", self.api_version, http=http_authorized, cache_discovery=False
            )
        return self._conn

    def get_function(self, name: str) -> dict:
        """
        Return the Cloud Function with the given name.

        :param name: Full resource name of the function.
        :return: A Cloud Functions object representing the function.
        """
        # fmt: off
        return self.get_conn().projects().locations().functions().get(
            name=name).execute(num_retries=self.num_retries)
        # fmt: on

    @GoogleBaseHook.fallback_to_default_project_id
    def create_new_function(
        self,
        location: str,
        body: dict,
        project_id: str = PROVIDE_PROJECT_ID,
    ) -> None:
        """
        Create a new function in Cloud Function in the location specified in the body.

        Blocks until the resulting long-running operation completes.

        :param location: The location of the function.
        :param body: The body required by the Cloud Functions insert API.
        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
            If set to None or missing, the default project_id from the Google Cloud connection is used.
        :raises AirflowException: If the create operation finishes with an error.
        :return: None
        """
        # fmt: off
        response = self.get_conn().projects().locations().functions().create(
            location=self._full_location(project_id, location),
            body=body
        ).execute(num_retries=self.num_retries)
        # fmt: on
        operation_name = response["name"]
        self._wait_for_operation_to_complete(operation_name=operation_name)

    def update_function(self, name: str, body: dict, update_mask: list[str]) -> None:
        """
        Update Cloud Functions according to the specified update mask.

        Blocks until the resulting long-running operation completes.

        :param name: The name of the function.
        :param body: The body required by the cloud function patch API.
        :param update_mask: The update mask - array of fields that should be patched.
        :raises AirflowException: If the patch operation finishes with an error.
        :return: None
        """
        # fmt: off
        response = self.get_conn().projects().locations().functions().patch(
            updateMask=",".join(update_mask),
            name=name,
            body=body
        ).execute(num_retries=self.num_retries)
        # fmt: on
        operation_name = response["name"]
        self._wait_for_operation_to_complete(operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def upload_function_zip(self, location: str, zip_path: str, project_id: str) -> str:
        """
        Upload zip file with sources.

        :param location: The location where the function is created.
        :param zip_path: The path of the valid .zip file to upload.
        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
            If set to None or missing, the default project_id from the Google Cloud connection is used.
        :raises AirflowException: If the upload request returns a non-success HTTP status.
        :return: The upload URL that was returned by generateUploadUrl method.
        """
        # fmt: off
        response = \
            self.get_conn().projects().locations().functions().generateUploadUrl(
                parent=self._full_location(project_id, location)
            ).execute(num_retries=self.num_retries)
        # fmt: on
        upload_url = response.get("uploadUrl")
        with open(zip_path, "rb") as file:
            upload_response = requests.put(
                url=upload_url,
                data=file,
                # Those two headers needs to be specified according to:
                # https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/generateUploadUrl
                # nopep8
                headers={
                    "Content-type": "application/zip",
                    "x-goog-content-length-range": "0,104857600",
                },
            )
        # Previously a failed upload was silently ignored, which only surfaced later
        # as an opaque deployment failure; report it here instead.
        if not upload_response.ok:
            raise AirflowException(
                f"Failed to upload {zip_path} to the function upload URL: "
                f"HTTP {upload_response.status_code} {upload_response.text}"
            )
        return upload_url

    def delete_function(self, name: str) -> None:
        """
        Delete the specified Cloud Function.

        Blocks until the resulting long-running operation completes.

        :param name: The name of the function.
        :raises AirflowException: If the delete operation finishes with an error.
        :return: None
        """
        # fmt: off
        response = self.get_conn().projects().locations().functions().delete(
            name=name).execute(num_retries=self.num_retries)
        # fmt: on
        operation_name = response["name"]
        self._wait_for_operation_to_complete(operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def call_function(
        self,
        function_id: str,
        input_data: dict,
        location: str,
        project_id: str = PROVIDE_PROJECT_ID,
    ) -> dict:
        """
        Synchronously invoke a deployed Cloud Function.

        To be used for testing purposes as very limited traffic is allowed.

        :param function_id: ID of the function to be called
        :param input_data: Input to be passed to the function
        :param location: The location where the function is located.
        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
            If set to None or missing, the default project_id from the Google Cloud connection is used.
        :raises AirflowException: If the response contains an ``error`` entry.
        :return: The response returned by the call API.
        """
        name = f"{self._full_location(project_id, location)}/functions/{function_id}"
        # fmt: off
        response = self.get_conn().projects().locations().functions().call(
            name=name,
            body=input_data
        ).execute(num_retries=self.num_retries)
        # fmt: on
        if "error" in response:
            raise AirflowException(response["error"])
        return response

    def _wait_for_operation_to_complete(self, operation_name: str) -> dict:
        """
        Wait for the named operation to complete by polling its status.

        Polls every ``TIME_TO_SLEEP_IN_SECONDS`` seconds with no overall timeout,
        so it blocks until the operation reports ``done``.

        :param operation_name: The name of the operation.
        :return: The response returned by the operation.
        :raises AirflowException: In case an error is returned.
        """
        service = self.get_conn()
        while True:
            # fmt: off
            operation_response = service.operations().get(
                name=operation_name,
            ).execute(num_retries=self.num_retries)
            # fmt: on
            if operation_response.get("done"):
                response = operation_response.get("response")
                error = operation_response.get("error")
                # Note, according to documentation always either response or error is
                # set when "done" == True
                if error:
                    raise AirflowException(str(error))
                return response
            time.sleep(TIME_TO_SLEEP_IN_SECONDS)