Source code for airflow.providers.amazon.aws.secrets.systems_manager
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Objects relating to sourcing connections from AWS SSM Parameter Store"""
import re
import warnings
from typing import Optional, Tuple

import boto3

from airflow.compat.functools import cached_property
from airflow.providers.amazon.aws.utils import get_airflow_version
from airflow.secrets import BaseSecretsBackend
from airflow.utils.log.logging_mixin import LoggingMixin


def _parse_version(val: str) -> Tuple[int, ...]:
    """
    Convert a dotted version string into a tuple of integers.

    Everything following the first ``<digits>.<digits>.<digits>`` triple is
    discarded (``"2.3.0.dev0"`` becomes ``(2, 3, 0)``).  A string that contains
    no such triple is split on ``.`` unchanged (``"2.3"`` becomes ``(2, 3)``).

    :param val: Version string
    :return: Tuple with one integer per dot-separated component
    :raises ValueError: if a remaining component is not a valid integer
    """
    # The pattern is unanchored: only the matched triple and what follows it
    # are rewritten; any text before the match is kept as-is.
    trimmed = re.sub(r'(\d+\.\d+\.\d+).*', r'\1', val)
    return tuple(int(part) for part in trimmed.split('.'))
class SystemsManagerParameterStoreBackend(BaseSecretsBackend, LoggingMixin):
    """
    Retrieves Connection or Variables from AWS SSM Parameter Store

    Configurable via ``airflow.cfg`` like so:

    .. code-block:: ini

        [secrets]
        backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
        backend_kwargs = {"connections_prefix": "/airflow/connections", "profile_name": null}

    For example, if ssm path is ``/airflow/connections/smtp_default``, this would be accessible
    if you provide ``{"connections_prefix": "/airflow/connections"}`` and request conn_id ``smtp_default``.
    And if ssm path is ``/airflow/variables/hello``, this would be accessible
    if you provide ``{"variables_prefix": "/airflow/variables"}`` and request variable key ``hello``.

    :param connections_prefix: Specifies the prefix of the secret to read to get Connections.
        If set to None (null), requests for connections will not be sent to AWS SSM Parameter Store.
    :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
        If set to None (null), requests for variables will not be sent to AWS SSM Parameter Store.
    :param config_prefix: Specifies the prefix of the secret to read to get Configurations.
        If set to None (null), requests for configurations will not be sent to AWS SSM Parameter Store.
    :param profile_name: The name of a profile to use. If not given, then the default profile is used.
    :param kwargs: Extra keyword arguments, forwarded unchanged to ``session.client("ssm", ...)``.
    """

    def __init__(
        self,
        connections_prefix: Optional[str] = '/airflow/connections',
        variables_prefix: Optional[str] = '/airflow/variables',
        config_prefix: Optional[str] = '/airflow/config',
        profile_name: Optional[str] = None,
        **kwargs,
    ):
        super().__init__()
        # A prefix of None disables the corresponding lookup; any other value is
        # stored without trailing slashes.
        self.connections_prefix = self._strip_trailing_slash(connections_prefix)
        self.variables_prefix = self._strip_trailing_slash(variables_prefix)
        self.config_prefix = self._strip_trailing_slash(config_prefix)
        self.profile_name = profile_name
        self.kwargs = kwargs

    @staticmethod
    def _strip_trailing_slash(prefix: Optional[str]) -> Optional[str]:
        """Return ``prefix`` without trailing ``/`` characters; ``None`` is passed through."""
        if prefix is None:
            return None
        return prefix.rstrip('/')

    @staticmethod
    def _ensure_leading_slash(ssm_path: str) -> str:
        """
        Return ``ssm_path`` with a leading ``/`` added when it is missing.

        AWS Systems Manager requires hierarchical parameter names to be fully
        qualified, i.e. to start with ``/``.
        """
        if ssm_path.startswith('/'):
            return ssm_path
        return f"/{ssm_path}"

    @cached_property
    def client(self):
        """Create a SSM client"""
        session = boto3.Session(profile_name=self.profile_name)
        return session.client("ssm", **self.kwargs)

    def get_conn_value(self, conn_id: str) -> Optional[str]:
        """
        Get the stored parameter value for a connection.

        :param conn_id: connection id
        :return: the parameter value, or None when ``connections_prefix`` is None
            or the parameter does not exist
        """
        if self.connections_prefix is None:
            return None
        return self._get_secret(self.connections_prefix, conn_id)

    def get_conn_uri(self, conn_id: str) -> Optional[str]:
        """
        Return URI representation of Connection conn_id.

        As of Airflow version 2.3.0 this method is deprecated.

        :param conn_id: the connection id
        :return: the stored value, exactly as returned by ``get_conn_value``
        """
        # NOTE(review): get_airflow_version is imported from the provider utils;
        # it is compared against a tuple, so presumably it returns one - confirm.
        if get_airflow_version() >= (2, 3):
            warnings.warn(
                f"Method `{self.__class__.__name__}.get_conn_uri` is deprecated and will be removed "
                "in a future release. Please use method `get_conn_value` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        return self.get_conn_value(conn_id)

    def get_variable(self, key: str) -> Optional[str]:
        """
        Get Airflow Variable from AWS SSM Parameter Store

        :param key: Variable Key
        :return: Variable Value, or None when ``variables_prefix`` is None
            or the parameter does not exist
        """
        if self.variables_prefix is None:
            return None
        return self._get_secret(self.variables_prefix, key)

    def get_config(self, key: str) -> Optional[str]:
        """
        Get Airflow Configuration

        :param key: Configuration Option Key
        :return: Configuration Option Value, or None when ``config_prefix`` is None
            or the parameter does not exist
        """
        if self.config_prefix is None:
            return None
        return self._get_secret(self.config_prefix, key)

    def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]:
        """
        Get secret value from Parameter Store.

        :param path_prefix: Prefix for the Path to get Secret
        :param secret_id: Secret Key
        :return: the decrypted parameter value, or None if the parameter is not found
        """
        # build_path is not defined in this class; presumably it is inherited
        # from BaseSecretsBackend - confirm.
        ssm_path = self._ensure_leading_slash(self.build_path(path_prefix, secret_id))
        try:
            response = self.client.get_parameter(Name=ssm_path, WithDecryption=True)
        except self.client.exceptions.ParameterNotFound:
            # A missing parameter is not an error here: log it and return None.
            self.log.debug("Parameter %s not found.", ssm_path)
            return None
        return response["Parameter"]["Value"]