Source code for airflow.contrib.hooks.aws_hook

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import boto3
import configparser
import logging

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


def _parse_s3_config(config_file_name, config_format='boto', profile=None):
    """
    Parses a config file for s3 credentials. Can currently
    parse boto, s3cmd.conf and AWS SDK config formats

    :param config_file_name: path to the config file
    :type config_file_name: str
    :param config_format: config type. One of "boto", "s3cmd" or "aws".
        Defaults to "boto"
    :type config_format: str
    :param profile: profile name in AWS type config file
    :type profile: str
    """
    config = configparser.ConfigParser()
    if config.read(config_file_name):  # pragma: no cover
        sections = config.sections()
    else:
        raise AirflowException("Couldn't read {0}".format(config_file_name))
    # Setting option names depending on file format
    if config_format is None:
        config_format = 'boto'
    conf_format = config_format.lower()
    if conf_format == 'boto':  # pragma: no cover
        if profile is not None and 'profile ' + profile in sections:
            cred_section = 'profile ' + profile
        else:
            cred_section = 'Credentials'
    elif conf_format == 'aws' and profile is not None:
        cred_section = profile
    else:
        cred_section = 'default'
    # Option names
    if conf_format in ('boto', 'aws'):  # pragma: no cover
        key_id_option = 'aws_access_key_id'
        secret_key_option = 'aws_secret_access_key'
        # security_token_option = 'aws_security_token'
    else:
        key_id_option = 'access_key'
        secret_key_option = 'secret_key'
    # Actual Parsing
    if cred_section not in sections:
        raise AirflowException("This config file format is not recognized")
    else:
        try:
            access_key = config.get(cred_section, key_id_option)
            secret_key = config.get(cred_section, secret_key_option)
        except Exception:
            logging.warning("Option Error in parsing s3 config file")
            raise
        return access_key, secret_key


[docs]class AwsHook(BaseHook): """ Interact with AWS. This class is a thin wrapper around the boto3 python library. """ def __init__(self, aws_conn_id='aws_default'): self.aws_conn_id = aws_conn_id def _get_credentials(self, region_name): aws_access_key_id = None aws_secret_access_key = None aws_session_token = None endpoint_url = None if self.aws_conn_id: try: connection_object = self.get_connection(self.aws_conn_id) extra_config = connection_object.extra_dejson if connection_object.login: aws_access_key_id = connection_object.login aws_secret_access_key = connection_object.password elif 'aws_secret_access_key' in extra_config: aws_access_key_id = extra_config[ 'aws_access_key_id'] aws_secret_access_key = extra_config[ 'aws_secret_access_key'] elif 's3_config_file' in extra_config: aws_access_key_id, aws_secret_access_key = \ _parse_s3_config( extra_config['s3_config_file'], extra_config.get('s3_config_format'), extra_config.get('profile')) if region_name is None: region_name = extra_config.get('region_name') role_arn = extra_config.get('role_arn') external_id = extra_config.get('external_id') aws_account_id = extra_config.get('aws_account_id') aws_iam_role = extra_config.get('aws_iam_role') if role_arn is None and aws_account_id is not None and \ aws_iam_role is not None: role_arn = "arn:aws:iam::{}:role/{}" \ .format(aws_account_id, aws_iam_role) if role_arn is not None: sts_session = boto3.session.Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name) sts_client = sts_session.client('sts') if external_id is None: sts_response = sts_client.assume_role( RoleArn=role_arn, RoleSessionName='Airflow_' + self.aws_conn_id) else: sts_response = sts_client.assume_role( RoleArn=role_arn, RoleSessionName='Airflow_' + self.aws_conn_id, ExternalId=external_id) credentials = sts_response['Credentials'] aws_access_key_id = credentials['AccessKeyId'] aws_secret_access_key = credentials['SecretAccessKey'] aws_session_token = credentials['SessionToken'] endpoint_url = extra_config.get('host') except AirflowException: # No connection found: fallback on boto3 credential strategy # http://boto3.readthedocs.io/en/latest/guide/configuration.html pass return boto3.session.Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token, region_name=region_name), endpoint_url def get_client_type(self, client_type, region_name=None, config=None): session, endpoint_url = self._get_credentials(region_name) return session.client(client_type, endpoint_url=endpoint_url, config=config) def get_resource_type(self, resource_type, region_name=None, config=None): session, endpoint_url = self._get_credentials(region_name) return session.resource(resource_type, endpoint_url=endpoint_url, config=config)
[docs] def get_session(self, region_name=None): """Get the underlying boto3.session.""" session, _ = self._get_credentials(region_name) return session
[docs] def get_credentials(self, region_name=None): """Get the underlying `botocore.Credentials` object. This contains the following authentication attributes: access_key, secret_key and token. """ session, _ = self._get_credentials(region_name) # Credentials are refreshable, so accessing your access key and # secret key separately can lead to a race condition. # See https://stackoverflow.com/a/36291428/8283373 return session.get_credentials().get_frozen_credentials()
[docs] def expand_role(self, role): """ If the IAM role is a role name, get the Amazon Resource Name (ARN) for the role. If IAM role is already an IAM role ARN, no change is made. :param role: IAM role name or ARN :return: IAM role ARN """ if '/' in role: return role else: return self.get_client_type('iam').get_role(RoleName=role)['Role']['Arn']