Source code for airflow.providers.docker.hooks.docker

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, Optional

from docker import APIClient
from docker.constants import DEFAULT_TIMEOUT_SECONDS
from docker.errors import APIError

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin


[docs]class DockerHook(BaseHook, LoggingMixin): """ Interact with a Docker Daemon or Registry. :param docker_conn_id: The :ref:`Docker connection id <howto/connection:docker>` where credentials and extra configuration are stored """
[docs] conn_name_attr = 'docker_conn_id'
[docs] default_conn_name = 'docker_default'
[docs] conn_type = 'docker'
[docs] hook_name = 'Docker'
@staticmethod
[docs] def get_ui_field_behaviour() -> Dict[str, Any]: """Returns custom field behaviour""" return { "hidden_fields": ['schema'], "relabeling": { 'host': 'Registry URL', 'login': 'Username',
}, } def __init__( self, docker_conn_id: Optional[str] = default_conn_name, base_url: Optional[str] = None, version: Optional[str] = None, tls: Optional[str] = None, timeout: int = DEFAULT_TIMEOUT_SECONDS, ) -> None: super().__init__() if not base_url: raise AirflowException('No Docker base URL provided') if not version: raise AirflowException('No Docker API version provided') if not docker_conn_id: raise AirflowException('No Docker connection id provided') conn = self.get_connection(docker_conn_id) if not conn.host: raise AirflowException('No Docker URL provided') if not conn.login: raise AirflowException('No username provided') extra_options = conn.extra_dejson self.__base_url = base_url self.__version = version self.__tls = tls self.__timeout = timeout if conn.port: self.__registry = f"{conn.host}:{conn.port}" else: self.__registry = conn.host self.__username = conn.login self.__password = conn.password self.__email = extra_options.get('email') self.__reauth = extra_options.get('reauth') != 'no'
[docs] def get_conn(self) -> APIClient: client = APIClient( base_url=self.__base_url, version=self.__version, tls=self.__tls, timeout=self.__timeout ) self.__login(client) return client
def __login(self, client) -> None: self.log.debug('Logging into Docker') try: client.login( username=self.__username, password=self.__password, registry=self.__registry, email=self.__email, reauth=self.__reauth, ) self.log.debug('Login successful') except APIError as docker_error: self.log.error('Docker login failed: %s', str(docker_error)) raise AirflowException(f'Docker login failed: {docker_error}')

Was this entry helpful?