Source code for airflow.providers.amazon.aws.secrets.systems_manager
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Objects relating to sourcing connections from AWS SSM Parameter Store"""fromtypingimportOptionalimportboto3try:fromfunctoolsimportcached_propertyexceptImportError:fromcached_propertyimportcached_propertyfromairflow.secretsimportBaseSecretsBackendfromairflow.utils.log.logging_mixinimportLoggingMixin
class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connections, Variables or Configurations from AWS SSM Parameter Store

    Configurable via ``airflow.cfg`` like so:

    .. code-block:: ini

        [secrets]
        backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
        backend_kwargs = {"connections_prefix": "/airflow/connections", "profile_name": null}

    For example, if ssm path is ``/airflow/connections/smtp_default``, this would be accessible
    if you provide ``{"connections_prefix": "/airflow/connections"}`` and request conn_id ``smtp_default``.
    And if ssm path is ``/airflow/variables/hello``, this would be accessible
    if you provide ``{"variables_prefix": "/airflow/variables"}`` and request variable key ``hello``.

    :param connections_prefix: Specifies the prefix of the secret to read to get Connections.
        If set to None (null), requests for connections will not be sent to AWS SSM Parameter Store.
    :type connections_prefix: Optional[str]
    :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
        If set to None (null), requests for variables will not be sent to AWS SSM Parameter Store.
    :type variables_prefix: Optional[str]
    :param config_prefix: Specifies the prefix of the secret to read to get Configurations.
        If set to None (null), requests for configurations will not be sent to AWS SSM Parameter Store.
    :type config_prefix: Optional[str]
    :param profile_name: The name of a profile to use. If not given, then the default profile is used.
    :type profile_name: Optional[str]
    :param kwargs: Extra keyword arguments, forwarded unchanged to ``session.client("ssm", ...)``
        when the SSM client is created.
    """

    def __init__(
        self,
        connections_prefix: Optional[str] = '/airflow/connections',
        variables_prefix: Optional[str] = '/airflow/variables',
        config_prefix: Optional[str] = '/airflow/config',
        profile_name: Optional[str] = None,
        **kwargs,
    ):
        super().__init__()
        # A prefix of None disables the corresponding lookup (see the get_* methods),
        # so None must be preserved rather than normalized.
        self.connections_prefix = self._strip_trailing_slash(connections_prefix)
        self.variables_prefix = self._strip_trailing_slash(variables_prefix)
        self.config_prefix = self._strip_trailing_slash(config_prefix)
        self.profile_name = profile_name
        self.kwargs = kwargs

    @staticmethod
    def _strip_trailing_slash(prefix: Optional[str]) -> Optional[str]:
        """Return ``prefix`` without trailing ``/`` characters; ``None`` is passed through unchanged."""
        return None if prefix is None else prefix.rstrip('/')

    @cached_property
    def client(self):
        """
        Create a SSM client

        The client is built from a ``boto3.Session`` using ``profile_name`` and the extra
        ``kwargs`` given to the constructor. Being a cached property, it is created on
        first access and reused afterwards.
        """
        session = boto3.Session(profile_name=self.profile_name)
        return session.client("ssm", **self.kwargs)

    def get_conn_uri(self, conn_id: str) -> Optional[str]:
        """
        Get Connection URI from Parameter Store

        :param conn_id: connection id
        :type conn_id: str
        :return: the parameter value, or None if ``connections_prefix`` is None
            or the parameter does not exist
        """
        if self.connections_prefix is None:
            return None

        return self._get_secret(self.connections_prefix, conn_id)

    def get_variable(self, key: str) -> Optional[str]:
        """
        Get Airflow Variable from Parameter Store

        :param key: Variable Key
        :type key: str
        :return: Variable Value, or None if ``variables_prefix`` is None
            or the parameter does not exist
        """
        if self.variables_prefix is None:
            return None

        return self._get_secret(self.variables_prefix, key)

    def get_config(self, key: str) -> Optional[str]:
        """
        Get Airflow Configuration from Parameter Store

        :param key: Configuration Option Key
        :type key: str
        :return: Configuration Option Value, or None if ``config_prefix`` is None
            or the parameter does not exist
        """
        if self.config_prefix is None:
            return None

        return self._get_secret(self.config_prefix, key)

    def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]:
        """
        Get secret value from Parameter Store.

        :param path_prefix: Prefix for the Path to get Secret
        :type path_prefix: str
        :param secret_id: Secret Key
        :type secret_id: str
        :return: the decrypted parameter value, or None when the parameter is not found
        """
        # NOTE(review): build_path is not defined in this class; presumably inherited
        # from BaseSecretsBackend and joins prefix and id into the SSM path - confirm.
        ssm_path = self.build_path(path_prefix, secret_id)
        try:
            # Only the API call is guarded: it is the only statement expected to
            # raise ParameterNotFound.
            response = self.client.get_parameter(Name=ssm_path, WithDecryption=True)
        except self.client.exceptions.ParameterNotFound:
            # A missing parameter is not an error for a secrets backend lookup;
            # report "no value" to the caller.
            self.log.debug("Parameter %s not found.", ssm_path)
            return None
        else:
            return response["Parameter"]["Value"]