Amazon Web Services Connection

The Amazon Web Services connection type enables the AWS Integrations.

Authenticating to AWS

Authentication may be performed using any of the boto3 options. Alternatively, one can pass credentials in as a Connection initialisation parameter.

To use IAM instance profile, create an “empty” connection (i.e. one with no AWS Access Key ID or AWS Secret Access Key specified, or aws://).

Default Connection IDs

The default connection ID is aws_default. If the environment/machine where you are running Airflow has the file credentials in /home/.aws/, and the default connection has user and pass fields empty, it will take automatically the credentials from there.

Note

Previously, the aws_default connection had the “extras” field set to {"region_name": "us-east-1"} on install. This means that by default the aws_default connection used the us-east-1 region. This is no longer the case and the region needs to be set manually, either in the connection screens in Airflow, or via the AWS_DEFAULT_REGION environment variable.

Configuring the Connection

AWS Access Key ID (optional)

Specify the AWS access key ID used for the initial connection. If you do an assume role by specifying a role_arn in the Extra field, then temporary credentials will be used for subsequent calls to AWS.

AWS Secret Access Key (optional)

Specify the AWS secret access key used for the initial connection. If you do an assume role by specifying a role_arn in the Extra field, then temporary credentials will be used for subsequent calls to AWS.

Extra (optional)

Specify the extra parameters (as json dictionary) that can be used in AWS connection. All parameters are optional.

The following extra parameters used to create an initial boto3.session.Session:

  • aws_access_key_id: AWS access key ID used for the initial connection.

  • aws_secret_access_key: AWS secret access key used for the initial connection

  • aws_session_token: AWS session token used for the initial connection if you use external credentials. You are responsible for renewing these.

  • region_name: AWS Region for the connection.

  • profile_name: The name of a profile to use listed in configuration and credential file settings.

The following extra parameters used for assume role:

  • role_arn: If specified, then assume this role, obtaining a set of temporary security credentials using the assume_role_method.

  • assume_role_method: AWS STS client method, one of assume_role, assume_role_with_saml or assume_role_with_web_identity if not specified then assume_role is used.

  • assume_role_kwargs: Additional kwargs passed to assume_role_method.

The following extra parameters used for boto3.client and boto3.resource:

  • config_kwargs: Additional kwargs used to construct a botocore.config.Config.

  • endpoint_url: Endpoint URL for the connection.

  • verify: Whether or not to verify SSL certificates.

Warning

Extra parameters below are deprecated and will be removed in a future version of this provider.

  • aws_account_id: Used to construct role_arn if it was not specified.

  • aws_iam_role: Used to construct role_arn if it was not specified.

  • external_id: A unique identifier that might be required when you assume a role in another account. Used if ExternalId in assume_role_kwargs was not specified.

  • s3_config_file: Path to local credentials file.

  • s3_config_format: s3_config_file format, one of aws, boto or s3cmd if not specified then boto is used.

  • profile: If you are getting your credentials from the s3_config_file you can specify the profile with this parameter.

  • host: Used as connection’s URL. Use endpoint_url instead.

  • session_kwargs: Additional kwargs passed to boto3.session.Session.

If you are configuring the connection via a URI, ensure that all components of the URI are URL-encoded.

Examples

Snippet to create Connection and convert to URI

import os
from airflow.models.connection import Connection


conn = Connection(
    conn_id="sample_aws_connection",
    conn_type="aws",
    login="AKIAIOSFODNN7EXAMPLE",  # Reference to AWS Access Key ID
    password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",  # Reference to AWS Secret Access Key
    extra={
        # Specify extra parameters here
        "region_name": "eu-central-1",
    },
)

# Generate Environment Variable Name and Connection URI
env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}"
conn_uri = conn.get_uri()
print(f"{env_key}={conn_uri}")
# AIRFLOW_CONN_SAMPLE_AWS_CONNECTION=aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?region_name=eu-central-1

# Test connection
os.environ[env_key] = conn_uri
print(conn.test_connection())

Using instance profile

This will use boto’s default credential look-up chain (the profile named “default” from the ~/.boto/ config files, and instance profile when running inside AWS)

URI format example

export AIRFLOW_CONN_AWS_DEFAULT=aws://

JSON format example

export AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws"}'

With a AWS IAM key pair

URI format example

export AIRFLOW_CONN_AWS_DEFAULT=aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@

Note here, that the secret access key has been URL-encoded (changing / to %2F), and also the trailing @ (without which, it is treated as <host>:<port> and will not work)

JSON format example

export AIRFLOW_CONN_AWS_DEFAULT='{
  "conn_type": "aws",
  "login": "AKIAIOSFODNN7EXAMPLE",
  "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}'

Examples for the Extra field

  1. Using ~/.aws/credentials and ~/.aws/config file, with a profile.

This assumes all other Connection fields eg AWS Access Key ID or AWS Secret Access Key are empty.

{
  "profile_name": "my_profile"
}
  1. Specifying a role_arn to assume and a region_name

{
  "role_arn": "arn:aws:iam::112223334444:role/my_role",
  "region_name": "ap-southeast-2"
}
  1. Configuring an outbound HTTP proxy

{
  "config_kwargs": {
    "proxies": {
      "http": "http://myproxy.mycompany.local:8080",
      "https": "http://myproxy.mycompany.local:8080"
    }
  }
}
  1. Using AssumeRoleWithSAML

{
  "region_name":"eu-west-1",
  "role_arn":"arn:aws:iam::112223334444:role/my_role",
  "assume_role_method":"assume_role_with_saml",
  "assume_role_with_saml":{
    "principal_arn":"arn:aws:iam::112223334444:saml-provider/my_saml_provider",
    "idp_url":"https://idp.mycompany.local/.../saml/clients/amazon-aws",
    "idp_auth_method":"http_spegno_auth",
    "mutual_authentication":"OPTIONAL",
    "idp_request_kwargs":{
      "headers":{"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"},
      "verify":false
    },
    "idp_request_retry_kwargs": {
      "total": 10,
      "backoff_factor":1,
      "status":10,
      "status_forcelist": [400, 429, 500, 502, 503, 504]
    },
    "log_idp_response":false,
    "saml_response_xpath":"////INPUT[@NAME='SAMLResponse']/@VALUE",
  },
  "assume_role_kwargs": { "something":"something" }
}

The following settings may be used within the assume_role_with_saml container in Extra.

  • principal_arn: The ARN of the SAML provider created in IAM that describes the identity provider.

  • idp_url: The URL to your IDP endpoint, which provides SAML Assertions.

  • idp_auth_method: Specify “http_spegno_auth” to use the Python requests_gssapi library. This library is more up to date than requests_kerberos and is backward compatible. See requests_gssapi documentation on PyPI.

  • mutual_authentication: Can be “REQUIRED”, “OPTIONAL” or “DISABLED”. See requests_gssapi documentation on PyPI.

  • idp_request_kwargs: Additional kwargs passed to requests when requesting from the IDP (over HTTP/S).

  • idp_request_retry_kwargs: Additional kwargs to construct a urllib3.util.Retry used as a retry strategy when requesting from the IDP.

  • log_idp_response: Useful for debugging - if specified, print the IDP response content to the log. Note that a successful response will contain sensitive information!

  • saml_response_xpath: How to query the IDP response using XML / HTML xpath.

  • assume_role_kwargs: Additional kwargs passed to sts_client.assume_role_with_saml.

Note

The requests_gssapi library is used to obtain a SAML response from your IDP. You may need to pip uninstall python-gssapi and pip install gssapi instead for this to work. The python-gssapi library is outdated, and conflicts with some versions of paramiko which Airflow uses elsewhere.

Avoid Throttling exceptions

Amazon Web Services have quota limits for simultaneous API call as result with frequent calls apache-airflow-providers-amazon components might fail during execution with a throttling exception, e.g. ThrottlingException, ProvisionedThroughputExceededException.

botocore.config.Config supports different exponential backoff modes out of the box: legacy, standard, adaptive

By default, botocore.config.Config uses legacy mode with 5 maximum retry attempts, which may not be enough in some cases.

If you encounter throttling exceptions, you may change the mode to standard with more retry attempts.

Set in Connection

Connection extra field:
{
  "config_kwargs": {
    "retries": {
      "mode": "standard",
      "max_attempts": 10
    }
  }
}

Set in AWS Config File

~/.aws/config:
[profile awesome_aws_profile]
retry_mode = standard
max_attempts = 10
Connection extra field:
{
  "profile_name": "awesome_aws_profile"
}

Set by Environment Variables

Note

This sets the retry mode on all connections, unless another retry config is explicitly set on a specific connection.

export AWS_RETRY_MODE=standard
export AWS_MAX_ATTEMPTS=10

Session Factory

The default BaseSessionFactory for the connection can handle most of the authentication methods for AWS. In the case that you would like to have full control of boto3 session creation or you are using custom federation that requires external process to source the credentials, you can subclass BaseSessionFactory and override create_session and/or _create_basic_session method depending on your needs.

You will also need to add configuration for AwsBaseHook to use the custom implementation by their full path.

Example

Configuration:
[aws]
session_factory = my_company.aws.MyCustomSessionFactory
Connection extra field:
{
  "federation": {
    "username": "my_username",
    "password": "my_password"
  }
}
Custom Session Factory:
def get_federated_aws_credentials(username: str, password: str):
    """
    Mock interaction with federation endpoint/process and returns AWS credentials.
    """
    return {
        "Version": 1,
        "AccessKeyId": "key",
        "SecretAccessKey": "secret",
        "SessionToken": "token",
        "Expiration": "2050-12-31T00:00:00.000Z",
    }


class MyCustomSessionFactory(BaseSessionFactory):
    @property
    def federated(self):
        return "federation" in self.extra_config

    def _create_basic_session(self, session_kwargs: dict[str, Any]) -> boto3.session.Session:
        if self.federated:
            return self._create_federated_session(session_kwargs)
        else:
            return super()._create_basic_session(session_kwargs)

    def _create_federated_session(self, session_kwargs: dict[str, Any]) -> boto3.session.Session:
        username = self.extra_config["federation"]["username"]
        region_name = self._get_region_name()
        self.log.debug(
            f"Creating federated session with username={username} region_name={region_name} for "
            f"connection {self.conn.conn_id}"
        )
        credentials = RefreshableCredentials.create_from_metadata(
            metadata=self._refresh_federated_credentials(),
            refresh_using=self._refresh_federated_credentials,
            method="custom-federation",
        )
        session = botocore.session.get_session()
        session._credentials = credentials
        session.set_config_variable("region", region_name)
        return boto3.session.Session(botocore_session=session, **session_kwargs)

    def _refresh_federated_credentials(self) -> dict[str, str]:
        self.log.debug("Refreshing federated AWS credentials")
        credentials = get_federated_aws_credentials(**self.extra_config["federation"])
        access_key_id = credentials["AccessKeyId"]
        expiry_time = credentials["Expiration"]
        self.log.info(
            f"New federated AWS credentials received with aws_access_key_id={access_key_id} and "
            f"expiry_time={expiry_time} for connection {self.conn.conn_id}"
        )
        return {
            "access_key": access_key_id,
            "secret_key": credentials["SecretAccessKey"],
            "token": credentials["SessionToken"],
            "expiry_time": expiry_time,
        }

Google Cloud to AWS authentication using Web Identity Federation

Thanks to Web Identity Federation, you can use the credentials from the Google Cloud platform to authorize access in the Amazon Web Service platform. If you additionally use authorizations with access token obtained from metadata server or Workload Identity, you can improve the security of your environment by eliminating long-lived credentials.

The Google Cloud credentials is exchanged for the Amazon Web Service temporary credentials by AWS Security Token Service.

The following diagram illustrates a typical communication flow used to obtain the AWS credentials.

../_images/aws-web-identity-federation-gcp.png

Communication Flow Diagram

Role setup

In order for a Google identity to be recognized by AWS, you must configure roles in AWS.

You can do it by using the role wizard or by using the Terraform.

Role wizard

To create an IAM role for web identity federation:

  1. Sign in to the AWS Management Console and open the IAM console at https://console.aws.amazon.com/iam/.

  2. In the navigation pane, choose Roles and then choose Create role.

  3. Choose the Web identity role type.

  4. For Identity provider, choose the Google.

  5. Type the service account email address (in the form <NAME>@<PROJECT_ID>.iam.gserviceaccount.com) into the Audience box.

  6. Review your web identity information and then choose Next: Permissions.

  7. Select the policy to use for the permissions policy or choose Create policy to open a new browser tab and create a new policy from scratch. For more information, see Creating IAM Policy.

  8. Choose Next: Tags.

  9. (Optional) Add metadata to the role by attaching tags as key–value pairs. For more information about using tags in IAM, see Tagging IAM users and roles.

  10. Choose Next: Review.

  11. For Role name, type a role name. Role names must be unique within your AWS account.

  12. (Optional) For Role description, type a description for the new role.

  13. Review the role and then choose Create role.

For more information, see: Creating a role for web identity or OpenID connect federation (console)

Finally, you should get a role that has a similar policy to the one below:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "accounts.google.com"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "accounts.google.com:aud": "<NAME>@<PROJECT_ID>.iam.gserviceaccount.com"
        }
      }
    }
  ]
}

In order to protect against the misuse of the Google OpenID token, you can also limit the scope of use by configuring restrictions per audience. You will need to configure the same value for the connection, and then this value also included in the ID Token. AWS will test if this value matches. For that, you can add a new condition to the policy.

{
  "Condition": {
    "StringEquals": {
      "accounts.google.com:aud": "<NAME>@<PROJECT_ID>.iam.gserviceaccount.com",
      "accounts.google.com:oaud": "service-amp.my-company.com"
    }
  }
}

After creating the role, you should configure the connection in Airflow.

Terraform

In order to quickly configure a new role, you can use the following Terraform script, which configures AWS roles along with the assigned policy. Before using it, you need correct the variables in the locals section to suit your environment:

  • google_service_account - The email address of the service account that will have permission to use this role

  • google_openid_audience - Constant value that is configured in the Airflow role and connection. It prevents misuse of the Google ID token.

  • aws_role_name - The name of the new AWS role.

  • aws_policy_name - The name of the new AWS policy.

For more information on using Terraform scripts, see: Terraform docs - Get started - AWS

After executing the plan, you should configure the connection in Airflow.

Connection setup

In order to use a Google identity, field "assume_role_method" must be "assume_role_with_web_identity" and field "assume_role_with_web_identity_federation" must be "google" in the extra section of the connection setup. It also requires that you set up roles in the "role_arn" field. Optionally, you can limit the use of the Google Open ID token by configuring the "assume_role_with_web_identity_federation_audience" field. The value of these fields must match the value configured in the role.

Airflow will establish Google’s credentials based on the Application Default Credentials.

Below is an example connection configuration.

{
  "role_arn": "arn:aws:iam::240057002457:role/WebIdentity-Role",
  "assume_role_method": "assume_role_with_web_identity",
  "assume_role_with_web_identity_federation": "google",
  "assume_role_with_web_identity_federation_audience": "service_a.apache.com"
}

You can configure connection, also using environmental variable AIRFLOW_CONN_{CONN_ID}.

export AIRFLOW_CONN_AWS_DEFAULT="aws://\
?role_arn=arn%3Aaws%3Aiam%3A%3A240057002457%3Arole%2FWebIdentity-Role&\
assume_role_method=assume_role_with_web_identity&\
assume_role_with_web_identity_federation=google&\
assume_role_with_web_identity_federation_audience=aaa.polidea.com"

Using IAM Roles for Service Accounts (IRSA) on EKS

If you are running Airflow on Amazon EKS, you can grant AWS related permission (such as S3 Read/Write for remote logging) to the Airflow service by granting the IAM role to it’s service account. IRSA provides fine-grained permission management for apps(e.g., pods) that run on EKS and use other AWS services. These could be apps that use S3, any other AWS services like Secrets Manager, CloudWatch, DynamoDB etc.

To activate this, the following steps must be followed:

  1. Create an IAM OIDC Provider on EKS cluster.

  2. Create an IAM Role and Policy to attach to the Airflow service account with web identity provider created at 1.

  3. Add the corresponding IAM Role to the Airflow service account as an annotation.

Then you can find AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE in environment variables of appropriate pods that Amazon EKS Pod Identity Web Hook added. Then boto3 will configure credentials using those variables. In order to use IRSA in Airflow, you have to create an aws connection with all fields empty. If a field such as role-arn is set, Airflow does not follow the boto3 default flow because it manually create a session using connection fields. If you did not change the default connection ID, an empty AWS connection named aws_default would be enough.

Create IAM Role for Service Account(IRSA) using eksctl

eksctl is a simple CLI tool for creating and managing clusters on EKS. Follow the steps to create IRSA for Airflow.

  1. Install eksctl in your local machine.

  2. Setup AWS credentials in your terminal to run eksctl commands.

  3. The IAM OIDC Provider is not enabled by default, you can use the following command to enable.

eksctl utils associate-iam-oidc-provider --cluster="<EKS_CLUSTER_ID>" --approve
  1. Replace EKS_CLUSTER_ID, SERVICE_ACCOUNT_NAME and NAMESPACE and execute the the following command. This command will use an existing EKS Cluster ID and create an IAM role, service account and namespace.

eksctl create iamserviceaccount --cluster="<EKS_CLUSTER_ID>" --name="<SERVICE_ACCOUNT_NAME>" --namespace="<NAMESPACE>" --attach-policy-arn="<IAM_POLICY_ARN>" --approve``

This is an example command with values. This example is using managed policy with full S3 permissions attached to the IAM role. We highly recommend you to create a restricted IAM policy with necessary permissions to S3, Secrets Manager, CloudWatch etc. and use it with --attach-policy-arn.

eksctl create iamserviceaccount --cluster=airflow-eks-cluster --name=airflow-sa --namespace=airflow --attach-policy-arn=arn:aws:iam::aws:policy/AmazonS3FullAccess --approve
  1. Use the service account name in Airflow Helm chart deployment or with Kubernetes Pod Operator.

Create IAM Role for Service Account(IRSA) using Terraform

For Terraform users, IRSA roles can be created using Amazon EKS Blueprints for Terraform module.

This module creates a new IAM Role, service account and namespace. This will associate IAM role with the service account and adds the annotation to the service account. You need to create an IAM policy with the required permissions that you would like the containers in your pods to have. Replace IAM_POLICY_ARN with your IAM policy ARN, other required inputs as shown below and run terraform apply.

module "airflow_irsa" {
  source = "github.com/aws-ia/terraform-aws-eks-blueprints//modules/irsa"

  eks_cluster_id             = "<EKS_CLUSTER_ID>"
  eks_oidc_provider_arn      = "<EKS_CLUSTER_OIDC_PROVIDER_ARN>"
  irsa_iam_policies          = ["<IAM_POLICY_ARN>"]
  kubernetes_namespace       = "<NAMESPACE>"
  kubernetes_service_account = "<SERVICE_ACCOUNT_NAME>"
}

Once the Terraform module is applied then you can use the service account in your Airflow deployments or with Kubernetes Pod Operator.

Was this entry helpful?