Source code for

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Objects relating to sourcing connections from Google Cloud Secrets Manager."""

from __future__ import annotations

import logging
from typing import Sequence

from deprecated import deprecated
from google.auth.exceptions import DefaultCredentialsError

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from import _SecretManagerClient
from import (
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin

[docs]log = logging.getLogger(__name__)
[docs]SECRET_ID_PATTERN = r"^[a-zA-Z0-9-_]*$"
[docs]class CloudSecretManagerBackend(BaseSecretsBackend, LoggingMixin): """ Retrieves Connection object from Google Cloud Secrets Manager. Configurable via ``airflow.cfg`` as follows: .. code-block:: ini [secrets] backend = backend_kwargs = {"connections_prefix": "airflow-connections", "sep": "-"} For example, if the Secrets Manager secret id is ``airflow-connections-smtp_default``, this would be accessible if you provide ``{"connections_prefix": "airflow-connections", "sep": "-"}`` and request conn_id ``smtp_default``. If the Secrets Manager secret id is ``airflow-variables-hello``, this would be accessible if you provide ``{"variables_prefix": "airflow-variables", "sep": "-"}`` and request Variable Key ``hello``. The full secret id should follow the pattern "[a-zA-Z0-9-_]". :param connections_prefix: Specifies the prefix of the secret to read to get Connections. If set to None (null), requests for connections will not be sent to GCP Secrets Manager :param variables_prefix: Specifies the prefix of the secret to read to get Variables. If set to None (null), requests for variables will not be sent to GCP Secrets Manager :param config_prefix: Specifies the prefix of the secret to read to get Airflow Configurations containing secrets. If set to None (null), requests for configurations will not be sent to GCP Secrets Manager :param gcp_key_path: Path to Google Cloud Service Account key file (JSON). Mutually exclusive with gcp_keyfile_dict. use default credentials in the current environment if not provided. :param gcp_keyfile_dict: Dictionary of keyfile parameters. Mutually exclusive with gcp_key_path. :param gcp_credential_config_file: File path to or content of a GCP credential configuration file. :param gcp_scopes: Comma-separated string containing OAuth2 scopes :param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials will be used. :param sep: Separator used to concatenate connections_prefix and conn_id. Default: "-" :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. """ def __init__( self, connections_prefix: str = "airflow-connections", variables_prefix: str = "airflow-variables", config_prefix: str = "airflow-config", gcp_keyfile_dict: dict | None = None, gcp_key_path: str | None = None, gcp_credential_config_file: dict[str, str] | str | None = None, gcp_scopes: str | None = None, project_id: str = PROVIDE_PROJECT_ID, sep: str = "-", impersonation_chain: str | Sequence[str] | None = None, **kwargs, ) -> None: super().__init__(**kwargs) self.connections_prefix = connections_prefix self.variables_prefix = variables_prefix self.config_prefix = config_prefix self.sep = sep if connections_prefix is not None: if not self._is_valid_prefix_and_sep(): raise AirflowException( "`connections_prefix`, `variables_prefix` and `sep` should " f"follows that pattern {SECRET_ID_PATTERN}" ) try: if impersonation_chain: target_principal, delegates = _get_target_principal_and_delegates(impersonation_chain) else: target_principal = None delegates = None self.credentials, self.project_id = get_credentials_and_project_id( keyfile_dict=gcp_keyfile_dict, key_path=gcp_key_path, credential_config_file=gcp_credential_config_file, scopes=gcp_scopes, target_principal=target_principal, delegates=delegates, ) except (DefaultCredentialsError, FileNotFoundError): log.exception( "Unable to load credentials for GCP Secret Manager. " "Make sure that the keyfile path or dictionary, credential configuration file, " "or GOOGLE_APPLICATION_CREDENTIALS environment variable is correct and properly configured." ) # In case project id provided if project_id: self.project_id = project_id @property
[docs] def client(self) -> _SecretManagerClient: """ Property returning secret client. :return: Secrets client """ return _SecretManagerClient(credentials=self.credentials)
def _is_valid_prefix_and_sep(self) -> bool: prefix = self.connections_prefix + self.sep return _SecretManagerClient.is_valid_secret_name(prefix)
[docs] def get_conn_value(self, conn_id: str) -> str | None: """ Get serialized representation of Connection. :param conn_id: connection id """ if self.connections_prefix is None: return None return self._get_secret(self.connections_prefix, conn_id)
@deprecated( reason=( "Method `CloudSecretManagerBackend.get_conn_uri` is deprecated and will be removed " "in a future release. Please use method `get_conn_value` instead." ), category=AirflowProviderDeprecationWarning, )
[docs] def get_conn_uri(self, conn_id: str) -> str | None: """ Return URI representation of Connection conn_id. As of Airflow version 2.3.0 this method is deprecated. :param conn_id: the connection id :return: deserialized Connection """ return self.get_conn_value(conn_id)
[docs] def get_variable(self, key: str) -> str | None: """ Get Airflow Variable from Environment Variable. :param key: Variable Key :return: Variable Value """ if self.variables_prefix is None: return None return self._get_secret(self.variables_prefix, key)
[docs] def get_config(self, key: str) -> str | None: """ Get Airflow Configuration. :param key: Configuration Option Key :return: Configuration Option Value """ if self.config_prefix is None: return None return self._get_secret(self.config_prefix, key)
def _get_secret(self, path_prefix: str, secret_id: str) -> str | None: """ Get secret value from the SecretManager based on prefix. :param path_prefix: Prefix for the Path to get Secret :param secret_id: Secret Key """ secret_id = self.build_path(path_prefix, secret_id, self.sep) return self.client.get_secret(secret_id=secret_id, project_id=self.project_id)

Was this entry helpful?