Source code for airflow.providers.hashicorp.secrets.vault

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Objects relating to sourcing connections & variables from Hashicorp Vault"""
from typing import Optional

from airflow.providers.hashicorp._internal_client.vault_client import _VaultClient  # noqa
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin


# pylint: disable=too-many-instance-attributes,too-many-locals
[docs]class VaultBackend(BaseSecretsBackend, LoggingMixin): """ Retrieves Connections and Variables from Hashicorp Vault. Configurable via ``airflow.cfg`` as follows: .. code-block:: ini [secrets] backend = airflow.providers.hashicorp.secrets.vault.VaultBackend backend_kwargs = { "connections_path": "connections", "url": "http://127.0.0.1:8200", "mount_point": "airflow" } For example, if your keys are under ``connections`` path in ``airflow`` mount_point, this would be accessible if you provide ``{"connections_path": "connections"}`` and request conn_id ``smtp_default``. :param connections_path: Specifies the path of the secret to read to get Connections. (default: 'connections'). If set to None (null), requests for connections will not be sent to Vault. :type connections_path: str :param variables_path: Specifies the path of the secret to read to get Variable. (default: 'variables'). If set to None (null), requests for variables will not be sent to Vault. :type variables_path: str :param config_path: Specifies the path of the secret to read Airflow Configurations (default: 'config'). If set to None (null), requests for configurations will not be sent to Vault. :type config_path: str :param url: Base URL for the Vault instance being addressed. :type url: str :param auth_type: Authentication Type for Vault. Default is ``token``. Available values are: ('approle', 'aws_iam', 'azure', 'github', 'gcp', 'kubernetes', 'ldap', 'radius', 'token', 'userpass') :type auth_type: str :param auth_mount_point: It can be used to define mount_point for authentication chosen Default depends on the authentication method used. :type auth_mount_point: str :param mount_point: The "path" the secret engine was mounted on. Default is "secret". Note that this mount_point is not used for authentication if authentication is done via a different engine. For authentication mount_points see, auth_mount_point. :type mount_point: str :param kv_engine_version: Select the version of the engine to run (``1`` or ``2``, default: ``2``). :type kv_engine_version: int :param token: Authentication token to include in requests sent to Vault. (for ``token`` and ``github`` auth_type) :type token: str :param token_path: path to file containing authentication token to include in requests sent to Vault (for ``token`` and ``github`` auth_type). :type token_path: str :param username: Username for Authentication (for ``ldap`` and ``userpass`` auth_type). :type username: str :param password: Password for Authentication (for ``ldap`` and ``userpass`` auth_type). :type password: str :param key_id: Key ID for Authentication (for ``aws_iam`` and ''azure`` auth_type). :type key_id: str :param secret_id: Secret ID for Authentication (for ``approle``, ``aws_iam`` and ``azure`` auth_types). :type secret_id: str :param role_id: Role ID for Authentication (for ``approle``, ``aws_iam`` auth_types). :type role_id: str :param kubernetes_role: Role for Authentication (for ``kubernetes`` auth_type). :type kubernetes_role: str :param kubernetes_jwt_path: Path for kubernetes jwt token (for ``kubernetes`` auth_type, default: ``/var/run/secrets/kubernetes.io/serviceaccount/token``). :type kubernetes_jwt_path: str :param gcp_key_path: Path to Google Cloud Service Account key file (JSON) (for ``gcp`` auth_type). Mutually exclusive with gcp_keyfile_dict. :type gcp_key_path: str :param gcp_keyfile_dict: Dictionary of keyfile parameters. (for ``gcp`` auth_type). Mutually exclusive with gcp_key_path. :type gcp_keyfile_dict: dict :param gcp_scopes: Comma-separated string containing OAuth2 scopes (for ``gcp`` auth_type). :type gcp_scopes: str :param azure_tenant_id: The tenant id for the Azure Active Directory (for ``azure`` auth_type). :type azure_tenant_id: str :param azure_resource: The configured URL for the application registered in Azure Active Directory (for ``azure`` auth_type). :type azure_resource: str :param radius_host: Host for radius (for ``radius`` auth_type). :type radius_host: str :param radius_secret: Secret for radius (for ``radius`` auth_type). :type radius_secret: str :param radius_port: Port for radius (for ``radius`` auth_type). :type radius_port: str """ def __init__( # pylint: disable=too-many-arguments self, connections_path: str = 'connections', variables_path: str = 'variables', config_path: str = 'config', url: Optional[str] = None, auth_type: str = 'token', auth_mount_point: Optional[str] = None, mount_point: str = 'secret', kv_engine_version: int = 2, token: Optional[str] = None, token_path: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, key_id: Optional[str] = None, secret_id: Optional[str] = None, role_id: Optional[str] = None, kubernetes_role: Optional[str] = None, kubernetes_jwt_path: str = '/var/run/secrets/kubernetes.io/serviceaccount/token', gcp_key_path: Optional[str] = None, gcp_keyfile_dict: Optional[dict] = None, gcp_scopes: Optional[str] = None, azure_tenant_id: Optional[str] = None, azure_resource: Optional[str] = None, radius_host: Optional[str] = None, radius_secret: Optional[str] = None, radius_port: Optional[int] = None, **kwargs, ): super().__init__() if connections_path is not None: self.connections_path = connections_path.rstrip('/') else: self.connections_path = connections_path if variables_path is not None: self.variables_path = variables_path.rstrip('/') else: self.variables_path = variables_path if config_path is not None: self.config_path = config_path.rstrip('/') else: self.config_path = config_path self.mount_point = mount_point self.kv_engine_version = kv_engine_version self.vault_client = _VaultClient( url=url, auth_type=auth_type, auth_mount_point=auth_mount_point, mount_point=mount_point, kv_engine_version=kv_engine_version, token=token, token_path=token_path, username=username, password=password, key_id=key_id, secret_id=secret_id, role_id=role_id, kubernetes_role=kubernetes_role, kubernetes_jwt_path=kubernetes_jwt_path, gcp_key_path=gcp_key_path, gcp_keyfile_dict=gcp_keyfile_dict, gcp_scopes=gcp_scopes, azure_tenant_id=azure_tenant_id, azure_resource=azure_resource, radius_host=radius_host, radius_secret=radius_secret, radius_port=radius_port, **kwargs, )
[docs] def get_conn_uri(self, conn_id: str) -> Optional[str]: """ Get secret value from Vault. Store the secret in the form of URI :param conn_id: The connection id :type conn_id: str :rtype: str :return: The connection uri retrieved from the secret """ if self.connections_path is None: return None else: secret_path = self.build_path(self.connections_path, conn_id) response = self.vault_client.get_secret(secret_path=secret_path) return response.get("conn_uri") if response else None
[docs] def get_variable(self, key: str) -> Optional[str]: """ Get Airflow Variable :param key: Variable Key :type key: str :rtype: str :return: Variable Value retrieved from the vault """ if self.variables_path is None: return None else: secret_path = self.build_path(self.variables_path, key) response = self.vault_client.get_secret(secret_path=secret_path) return response.get("value") if response else None
[docs] def get_config(self, key: str) -> Optional[str]: """ Get Airflow Configuration :param key: Configuration Option Key :type key: str :rtype: str :return: Configuration Option Value retrieved from the vault """ if self.config_path is None: return None else: secret_path = self.build_path(self.config_path, key) response = self.vault_client.get_secret(secret_path=secret_path) return response.get("value") if response else None

Was this entry helpful?