Source code for airflow.providers.http.hooks.http

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Callable, Dict, Optional, Union

import requests
import tenacity
from requests.auth import HTTPBasicAuth

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook


class HttpHook(BaseHook):
    """
    Interact with HTTP servers.

    :param method: the API method to be called
    :type method: str
    :param http_conn_id: :ref:`http connection<howto/connection:http>` that has the base
        API url i.e https://www.google.com/ and optional authentication credentials. Default
        headers can also be specified in the Extra field in json format.
    :type http_conn_id: str
    :param auth_type: The auth type for the service
    :type auth_type: AuthBase of python requests lib
    """

    conn_name_attr = 'http_conn_id'
    default_conn_name = 'http_default'
    conn_type = 'http'
    hook_name = 'HTTP'

    def __init__(
        self,
        method: str = 'POST',
        http_conn_id: str = default_conn_name,
        auth_type: Any = HTTPBasicAuth,
    ) -> None:
        super().__init__()
        self.http_conn_id = http_conn_id
        # The verb is normalised once here; ``run`` compares it against 'GET' / 'HEAD'.
        self.method = method.upper()
        # Filled in by ``get_conn`` from the connection definition.
        self.base_url: str = ""
        # Declared only; assigned by ``run_with_advanced_retry``.
        self._retry_obj: Callable[..., Any]
        self.auth_type: Any = auth_type

    # headers may be passed through directly or in the "extra" field in the connection
    # definition
    def get_conn(self, headers: Optional[Dict[Any, Any]] = None) -> requests.Session:
        """
        Returns http session for use with requests

        As a side effect, ``self.base_url`` is rebuilt from the connection
        (host, schema and port) whenever ``http_conn_id`` is set.

        :param headers: additional headers to be passed through as a dictionary
        :type headers: dict
        :return: a new session carrying the connection's auth and headers
        :rtype: requests.Session
        """
        session = requests.Session()

        if self.http_conn_id:
            conn = self.get_connection(self.http_conn_id)

            if conn.host and "://" in conn.host:
                # The host already carries its own scheme; use it verbatim.
                self.base_url = conn.host
            else:
                # schema defaults to HTTP
                schema = conn.schema if conn.schema else "http"
                host = conn.host if conn.host else ""
                self.base_url = schema + "://" + host

            if conn.port:
                self.base_url = self.base_url + ":" + str(conn.port)
            if conn.login:
                session.auth = self.auth_type(conn.login, conn.password)
            if conn.extra:
                try:
                    session.headers.update(conn.extra_dejson)
                except TypeError:
                    # Best effort: a malformed Extra field must not prevent the request.
                    self.log.warning('Connection to %s has invalid extra field.', conn.host)

        # Explicit headers are applied last so they override connection-level ones.
        if headers:
            session.headers.update(headers)

        return session

    def run(
        self,
        endpoint: Optional[str] = None,
        data: Optional[Union[Dict[str, Any], str]] = None,
        headers: Optional[Dict[str, Any]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
        **request_kwargs: Any,
    ) -> Any:
        r"""
        Performs the request

        :param endpoint: the endpoint to be called i.e. resource/v1/query?
        :type endpoint: str
        :param data: payload to be uploaded or request parameters
        :type data: dict
        :param headers: additional headers to be passed through as a dictionary
        :type headers: dict
        :param extra_options: additional options to be used when executing the request
            i.e. {'check_response': False} to avoid checking raising exceptions on non
            2XX or 3XX status codes
        :type extra_options: dict
        :param request_kwargs: Additional kwargs to pass when creating a request.
            For example, ``run(json=obj)`` is passed as ``requests.Request(json=obj)``
        :return: whatever ``run_and_check`` returns for the prepared request
        """
        extra_options = extra_options or {}

        session = self.get_conn(headers)

        # Insert exactly one '/' between base url and endpoint when neither side has it.
        if self.base_url and not self.base_url.endswith('/') and endpoint and not endpoint.startswith('/'):
            url = self.base_url + '/' + endpoint
        else:
            url = (self.base_url or '') + (endpoint or '')

        if self.method == 'GET':
            # GET uses params
            req = requests.Request(self.method, url, params=data, headers=headers, **request_kwargs)
        elif self.method == 'HEAD':
            # HEAD doesn't use params
            req = requests.Request(self.method, url, headers=headers, **request_kwargs)
        else:
            # Others use data
            req = requests.Request(self.method, url, data=data, headers=headers, **request_kwargs)

        prepped_request = session.prepare_request(req)
        self.log.info("Sending '%s' to url: %s", self.method, url)
        return self.run_and_check(session, prepped_request, extra_options)

    def check_response(self, response: requests.Response) -> None:
        """
        Checks the status code and raise an AirflowException exception on non 2XX or 3XX
        status codes

        :param response: A requests response object
        :type response: requests.response
        :raises AirflowException: with the message ``<status_code>:<reason>`` when
            ``response.raise_for_status()`` raises an ``HTTPError``
        """
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            self.log.error("HTTP error: %s", response.reason)
            self.log.error(response.text)
            # An f-string keeps the message identical for normal responses while not
            # failing with TypeError should ``reason`` be None.
            raise AirflowException(f"{response.status_code}:{response.reason}") from err

    def run_and_check(
        self,
        session: requests.Session,
        prepped_request: requests.PreparedRequest,
        extra_options: Dict[Any, Any],
    ) -> Any:
        """
        Grabs extra options like timeout and actually runs the request,
        checking for the result

        :param session: the session to be used to execute the request
        :type session: requests.Session
        :param prepped_request: the prepared request generated in run()
        :type prepped_request: session.prepare_request
        :param extra_options: additional options to be used when executing the request
            i.e. ``{'check_response': False}`` to avoid checking raising exceptions on non 2XX
            or 3XX status codes
        :type extra_options: dict
        :return: the response returned by ``session.send``
        :raises requests.exceptions.ConnectionError: re-raised after being logged
        """
        extra_options = extra_options or {}

        settings = session.merge_environment_settings(
            prepped_request.url,
            proxies=extra_options.get("proxies", {}),
            stream=extra_options.get("stream", False),
            verify=extra_options.get("verify"),
            cert=extra_options.get("cert"),
        )

        # Send the request.
        send_kwargs = {
            "timeout": extra_options.get("timeout"),
            "allow_redirects": extra_options.get("allow_redirects", True),
        }
        send_kwargs.update(settings)

        try:
            response = session.send(prepped_request, **send_kwargs)

            if extra_options.get('check_response', True):
                self.check_response(response)
            return response

        except requests.exceptions.ConnectionError as ex:
            self.log.warning('%s Tenacity will retry to execute the operation', ex)
            # Bare ``raise`` re-raises the active exception unchanged.
            raise

    def run_with_advanced_retry(self, _retry_args: Dict[Any, Any], *args: Any, **kwargs: Any) -> Any:
        """
        Runs Hook.run() with a Tenacity decorator attached to it. This is useful for
        connectors which might be disturbed by intermittent issues and should not
        instantly fail.

        :param _retry_args: Arguments which define the retry behaviour.
            See Tenacity documentation at https://github.com/jd/tenacity
        :type _retry_args: dict
        :return: the result of ``run(*args, **kwargs)`` once it succeeds


        .. code-block:: python

            hook = HttpHook(http_conn_id="my_conn", method="GET")
            retry_args = dict(
                wait=tenacity.wait_exponential(),
                stop=tenacity.stop_after_attempt(10),
                retry=tenacity.retry_if_exception_type(requests.exceptions.ConnectionError),
            )
            hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args)

        """
        self._retry_obj = tenacity.Retrying(**_retry_args)

        return self._retry_obj(self.run, *args, **kwargs)

    def test_connection(self):
        """
        Test HTTP Connection

        :return: ``(True, 'Connection successfully tested')`` when ``run()`` completes,
            otherwise ``(False, <exception text>)``
        :rtype: tuple
        """
        try:
            self.run()
            return True, 'Connection successfully tested'
        except Exception as e:
            # Deliberately broad: any failure is reported back as a message, not raised.
            return False, str(e)

Was this entry helpful?