# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import boto3
import configparser
import logging
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
def _parse_s3_config(config_file_name, config_format='boto', profile=None):
"""
Parses a config file for s3 credentials. Can currently
parse boto, s3cmd.conf and AWS SDK config formats
:param config_file_name: path to the config file
:type config_file_name: str
:param config_format: config type. One of "boto", "s3cmd" or "aws".
Defaults to "boto"
:type config_format: str
:param profile: profile name in AWS type config file
:type profile: str
"""
config = configparser.ConfigParser()
if config.read(config_file_name): # pragma: no cover
sections = config.sections()
else:
raise AirflowException("Couldn't read {0}".format(config_file_name))
# Setting option names depending on file format
if config_format is None:
config_format = 'boto'
conf_format = config_format.lower()
if conf_format == 'boto': # pragma: no cover
if profile is not None and 'profile ' + profile in sections:
cred_section = 'profile ' + profile
else:
cred_section = 'Credentials'
elif conf_format == 'aws' and profile is not None:
cred_section = profile
else:
cred_section = 'default'
# Option names
if conf_format in ('boto', 'aws'): # pragma: no cover
key_id_option = 'aws_access_key_id'
secret_key_option = 'aws_secret_access_key'
# security_token_option = 'aws_security_token'
else:
key_id_option = 'access_key'
secret_key_option = 'secret_key'
# Actual Parsing
if cred_section not in sections:
raise AirflowException("This config file format is not recognized")
else:
try:
access_key = config.get(cred_section, key_id_option)
secret_key = config.get(cred_section, secret_key_option)
except Exception:
logging.warning("Option Error in parsing s3 config file")
raise
return access_key, secret_key
class AwsHook(BaseHook):
    """
    Interact with AWS.

    This class is a thin wrapper around the boto3 python library.

    :param aws_conn_id: id of the Airflow connection to read credentials and
        settings from. A falsy value skips the connection lookup entirely,
        leaving credential resolution to boto3.
    :type aws_conn_id: str
    :param verify: forwarded unchanged as the ``verify`` argument of the
        boto3 client / resource factories.
    """

    def __init__(self, aws_conn_id='aws_default', verify=None):
        self.aws_conn_id = aws_conn_id
        self.verify = verify

    def _get_credentials(self, region_name):
        """
        Build a boto3 session from the Airflow connection settings.

        Static keys are taken from the first available source, in this
        order: connection login/password, ``aws_access_key_id`` /
        ``aws_secret_access_key`` in the connection extras, then the file
        named by ``s3_config_file`` in the extras. When a role ARN is
        configured (``role_arn``, or ``aws_account_id`` plus
        ``aws_iam_role``), that role is assumed through STS and the
        temporary credentials replace the static ones.

        :param region_name: region for the session; when ``None`` the
            ``region_name`` entry of the connection extras is used
        :return: tuple ``(boto3 session, endpoint_url)`` where
            ``endpoint_url`` is the ``host`` entry of the extras or ``None``
        """
        aws_access_key_id = None
        aws_secret_access_key = None
        aws_session_token = None
        endpoint_url = None

        if self.aws_conn_id:
            try:
                connection_object = self.get_connection(self.aws_conn_id)
                extra_config = connection_object.extra_dejson

                if connection_object.login:
                    aws_access_key_id = connection_object.login
                    aws_secret_access_key = connection_object.password
                elif 'aws_secret_access_key' in extra_config:
                    aws_access_key_id = extra_config['aws_access_key_id']
                    aws_secret_access_key = extra_config[
                        'aws_secret_access_key']
                elif 's3_config_file' in extra_config:
                    aws_access_key_id, aws_secret_access_key = \
                        _parse_s3_config(
                            extra_config['s3_config_file'],
                            extra_config.get('s3_config_format'),
                            extra_config.get('profile'))

                if region_name is None:
                    region_name = extra_config.get('region_name')

                role_arn = extra_config.get('role_arn')
                external_id = extra_config.get('external_id')
                aws_account_id = extra_config.get('aws_account_id')
                aws_iam_role = extra_config.get('aws_iam_role')

                # An explicit role_arn wins; otherwise build one from the
                # account id and role name when both are given.
                if role_arn is None and aws_account_id is not None and \
                        aws_iam_role is not None:
                    role_arn = "arn:aws:iam::{}:role/{}" \
                        .format(aws_account_id, aws_iam_role)

                if role_arn is not None:
                    sts_session = boto3.session.Session(
                        aws_access_key_id=aws_access_key_id,
                        aws_secret_access_key=aws_secret_access_key,
                        region_name=region_name)
                    sts_client = sts_session.client('sts')

                    # ExternalId is only sent when configured.
                    assume_role_kwargs = {
                        'RoleArn': role_arn,
                        'RoleSessionName': 'Airflow_' + self.aws_conn_id,
                    }
                    if external_id is not None:
                        assume_role_kwargs['ExternalId'] = external_id
                    sts_response = sts_client.assume_role(
                        **assume_role_kwargs)

                    credentials = sts_response['Credentials']
                    aws_access_key_id = credentials['AccessKeyId']
                    aws_secret_access_key = credentials['SecretAccessKey']
                    aws_session_token = credentials['SessionToken']

                endpoint_url = extra_config.get('host')

            except AirflowException as err:
                # Fallback on boto3 credential strategy
                # http://boto3.readthedocs.io/en/latest/guide/configuration.html
                # The fallback is kept, but it is reported: besides a missing
                # connection this also covers an unreadable or unrecognized
                # s3_config_file, which would otherwise go unnoticed.
                logging.getLogger(__name__).warning(
                    "Unable to use Airflow connection '%s' for AWS "
                    "credentials (%s); falling back on boto3 credential "
                    "strategy", self.aws_conn_id, err)

        return boto3.session.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            aws_session_token=aws_session_token,
            region_name=region_name), endpoint_url

    def get_client_type(self, client_type, region_name=None, config=None):
        """
        Create a boto3 client from the resolved session.

        :param client_type: service name passed to ``session.client``
        :param region_name: region passed to the credential lookup
        :param config: forwarded as the ``config`` argument of
            ``session.client``
        :return: the object returned by ``session.client``
        """
        session, endpoint_url = self._get_credentials(region_name)

        return session.client(client_type, endpoint_url=endpoint_url,
                              config=config, verify=self.verify)

    def get_resource_type(self, resource_type, region_name=None, config=None):
        """
        Create a boto3 resource from the resolved session.

        :param resource_type: service name passed to ``session.resource``
        :param region_name: region passed to the credential lookup
        :param config: forwarded as the ``config`` argument of
            ``session.resource``
        :return: the object returned by ``session.resource``
        """
        session, endpoint_url = self._get_credentials(region_name)

        return session.resource(resource_type, endpoint_url=endpoint_url,
                                config=config, verify=self.verify)

    def get_session(self, region_name=None):
        """Get the underlying boto3.session."""
        session, _ = self._get_credentials(region_name)
        return session

    def get_credentials(self, region_name=None):
        """Get the underlying `botocore.Credentials` object.

        This contains the following authentication attributes: access_key,
        secret_key and token.
        """
        session, _ = self._get_credentials(region_name)
        # Credentials are refreshable, so accessing your access key and
        # secret key separately can lead to a race condition.
        # See https://stackoverflow.com/a/36291428/8283373
        return session.get_credentials().get_frozen_credentials()

    def expand_role(self, role):
        """
        If the IAM role is a role name, get the Amazon Resource Name (ARN)
        for the role. If IAM role is already an IAM role ARN, no change is
        made.

        :param role: IAM role name or ARN
        :return: IAM role ARN
        """
        # Any value containing '/' is treated as already being an ARN.
        if '/' in role:
            return role
        iam_client = self.get_client_type('iam')
        return iam_client.get_role(RoleName=role)['Role']['Arn']