# Source code for airflow.contrib.hooks.openfaas_hook

# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.hooks.base_hook import BaseHook
import requests
from airflow import AirflowException

# HTTP status code (202 Accepted) that the deploy and update calls below
# treat as success; any other status is reported as a failure.
OK_STATUS_CODE = 202
class OpenFaasHook(BaseHook):
    """
    Interact with OpenFaaS to query, deploy, invoke and update a function.

    Every request URL is built by concatenating the ``host`` of the Airflow
    connection, one of the path constants below and (where relevant) the
    function name, so the connection host must be the gateway base URL.

    :param function_name: Name of the function, Defaults to None
    :type function_name: str
    :param conn_id: openfaas connection to use, Defaults to open_faas_default
        (a connection of Conn Type Http whose host is the gateway base URL)
    :type conn_id: str
    """

    # Gateway paths, appended directly to the connection host.
    GET_FUNCTION = "/system/function/"
    INVOKE_ASYNC_FUNCTION = "/async-function/"
    DEPLOY_FUNCTION = "/system/functions"
    UPDATE_FUNCTION = "/system/functions"

    def __init__(self, function_name=None, conn_id='open_faas_default', *args, **kwargs):
        self.function_name = function_name
        self.conn_id = conn_id
        # NOTE(review): ``super(BaseHook, self)`` starts the MRO lookup *after*
        # BaseHook, so BaseHook.__init__ itself is skipped. Left unchanged
        # because BaseHook's constructor signature is not visible here and
        # calling it with these arguments may not be valid -- confirm before
        # switching to ``super(OpenFaasHook, self)``.
        super(BaseHook, self).__init__(*args, **kwargs)

    def get_conn(self):
        """
        Return the Airflow connection object registered under ``conn_id``.
        """
        return self.get_connection(self.conn_id)

    def deploy_function(self, overwrite_function_if_exist, body):
        """
        Deploy the function, or update it when overwriting is requested.

        :param overwrite_function_if_exist: when truthy, no deploy request is
            sent and the call is delegated to :meth:`update_function`
        :param body: payload sent as the request body
        :raises AirflowException: if the deploy request does not answer with
            ``OK_STATUS_CODE``
        """
        if overwrite_function_if_exist:
            # Lazy %-style logging emits the same text as the concatenated
            # message but does not raise when function_name is None.
            self.log.info("Function already exist %s going to update", self.function_name)
            self.update_function(body)
            return

        url = self.get_conn().host + self.DEPLOY_FUNCTION
        self.log.info("Deploying function %s", url)
        response = requests.post(url, body)
        if response.status_code != OK_STATUS_CODE:
            self.log.error("Response status %s", response.status_code)
            self.log.error("Failed to deploy")
            raise AirflowException('failed to deploy')
        self.log.info("Function deployed %s", self.function_name)

    def invoke_async_function(self, body):
        """
        Invoke the function through the asynchronous endpoint.

        :param body: payload sent as the request body
        :raises AirflowException: if the response is not ``ok``
        """
        url = self.get_conn().host + self.INVOKE_ASYNC_FUNCTION + self.function_name
        self.log.info("Invoking function %s", url)
        response = requests.post(url, body)
        if not response.ok:
            self.log.error("Response status %s", response.status_code)
            raise AirflowException('failed to invoke function')
        self.log.info("Invoked %s", self.function_name)

    def update_function(self, body):
        """
        Update the function definition with a PUT request.

        :param body: payload sent as the request body
        :raises AirflowException: if the update request does not answer with
            ``OK_STATUS_CODE``
        """
        url = self.get_conn().host + self.UPDATE_FUNCTION
        self.log.info("Updating function %s", url)
        response = requests.put(url, body)
        if response.status_code != OK_STATUS_CODE:
            self.log.error("Response status %s", response.status_code)
            self.log.error("Failed to update response %s", response.content.decode("utf-8"))
            raise AirflowException('failed to update ' + self.function_name)
        self.log.info("Function was updated")

    def does_function_exist(self):
        """
        Check whether the function can be fetched from the gateway.

        :return: True if the GET request for the function is ``ok``,
            False otherwise (the failure is logged, not raised)
        :rtype: bool
        """
        url = self.get_conn().host + self.GET_FUNCTION + self.function_name
        response = requests.get(url)
        if response.ok:
            return True
        self.log.error("Failed to find function %s", self.function_name)
        return False

# Was this entry helpful?