Amazon Elastic Kubernetes Service (EKS)¶
Amazon Elastic Kubernetes Service (Amazon EKS) is a managed service that makes it easy for you to run Kubernetes on AWS without needing to stand up or maintain your own Kubernetes control plane. Kubernetes is an open-source system for automating the deployment, scaling, and management of containerized applications.
Airflow provides operators to create and interact with the EKS clusters and compute infrastructure.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'Detailed information is available Installation of Airflow®
Operators¶
Create an Amazon EKS cluster¶
To create an Amazon EKS Cluster you can use
EksCreateClusterOperator.
- Note: An AWS IAM role with the following permissions is required:
- eks.amazonaws.commust be added to the Trusted Relationships- AmazonEKSClusterPolicyIAM Policy must be attached
tests/system/amazon/aws/example_eks_with_nodegroups.py
# Create an Amazon EKS Cluster control plane without attaching compute service.
create_cluster = EksCreateClusterOperator(
    task_id="create_cluster",
    cluster_name=cluster_name,
    cluster_role_arn=test_context[ROLE_ARN_KEY],
    resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
    compute=None,
)
Create an Amazon EKS cluster and node group in one step¶
To create an Amazon EKS cluster and an EKS managed node group in one command, you can use
EksCreateClusterOperator.
- Note: An AWS IAM role with the following permissions is required:
- ec2.amazon.aws.commust be in the Trusted Relationships- eks.amazonaws.commust be added to the Trusted Relationships- AmazonEC2ContainerRegistryReadOnlyIAM Policy must be attached- AmazonEKSClusterPolicyIAM Policy must be attached- AmazonEKSWorkerNodePolicyIAM Policy must be attached
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
    task_id="create_cluster_and_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    cluster_role_arn=test_context[ROLE_ARN_KEY],
    # Opting to use the same ARN for the cluster and the nodegroup here,
    # but a different ARN could be configured and passed if desired.
    nodegroup_role_arn=test_context[ROLE_ARN_KEY],
    resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
    # ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
    compute="nodegroup",
    # The launch template enforces IMDSv2 and is required for internal
    # compliance when running these system tests on AWS infrastructure.
    create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}},
)
Create an Amazon EKS cluster and AWS Fargate profile in one step¶
To create an Amazon EKS cluster and an AWS Fargate profile in one command, you can use
EksCreateClusterOperator.
You can also run this operator in deferrable mode by setting deferrable param to True.
- Note: An AWS IAM role with the following permissions is required:
- ec2.amazon.aws.commust be in the Trusted Relationships- eks.amazonaws.commust be added to the Trusted Relationships- AmazonEC2ContainerRegistryReadOnlyIAM Policy must be attached- AmazonEKSClusterPolicyIAM Policy must be attached- AmazonEKSWorkerNodePolicyIAM Policy must be attached
tests/system/amazon/aws/example_eks_with_fargate_in_one_step.py
# Create an Amazon EKS cluster control plane and an AWS Fargate compute platform in one step.
create_cluster_and_fargate_profile = EksCreateClusterOperator(
    task_id="create_eks_cluster_and_fargate_profile",
    cluster_name=cluster_name,
    cluster_role_arn=cluster_role_arn,
    resources_vpc_config={
        "subnetIds": subnets,
        "endpointPublicAccess": True,
        "endpointPrivateAccess": False,
    },
    compute="fargate",
    fargate_profile_name=fargate_profile_name,
    # Opting to use the same ARN for the cluster and the pod here,
    # but a different ARN could be configured and passed if desired.
    fargate_pod_execution_role_arn=fargate_pod_role_arn,
)
Delete an Amazon EKS Cluster¶
To delete an existing Amazon EKS Cluster you can use
EksDeleteClusterOperator.
You can also run this operator in deferrable mode by setting deferrable param to True.
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_cluster = EksDeleteClusterOperator(
    task_id="delete_cluster",
    cluster_name=cluster_name,
)
- Note: If the cluster has any attached resources, such as an Amazon EKS Nodegroup or AWS
- Fargate profile, the cluster can not be deleted. Using the - forceparameter will attempt to delete any attached resources first.
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_nodegroup_and_cluster = EksDeleteClusterOperator(
    task_id="delete_nodegroup_and_cluster",
    cluster_name=cluster_name,
    force_delete_compute=True,
)
Create an Amazon EKS managed node group¶
To create an Amazon EKS managed node group you can use
EksCreateNodegroupOperator.
You can also run this operator in deferrable mode by setting deferrable param to True.
- Note: An AWS IAM role with the following permissions is required:
- ec2.amazon.aws.commust be in the Trusted Relationships- AmazonEC2ContainerRegistryReadOnlyIAM Policy must be attached- AmazonEKSWorkerNodePolicyIAM Policy must be attached
tests/system/amazon/aws/example_eks_with_nodegroups.py
create_nodegroup = EksCreateNodegroupOperator(
    task_id="create_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    nodegroup_subnets=test_context[SUBNETS_KEY],
    nodegroup_role_arn=test_context[ROLE_ARN_KEY],
)
Delete an Amazon EKS managed node group¶
To delete an existing Amazon EKS managed node group you can use
EksDeleteNodegroupOperator.
You can also run this operator in deferrable mode by setting deferrable param to True.
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_nodegroup = EksDeleteNodegroupOperator(
    task_id="delete_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
)
Create an AWS Fargate Profile¶
To create an AWS Fargate Profile you can use
EksCreateFargateProfileOperator.
- Note: An AWS IAM role with the following permissions is required:
- ec2.amazon.aws.commust be in the Trusted Relationships- AmazonEC2ContainerRegistryReadOnlyIAM Policy must be attached- AmazonEKSWorkerNodePolicyIAM Policy must be attached
tests/system/amazon/aws/example_eks_with_fargate_profile.py
create_fargate_profile = EksCreateFargateProfileOperator(
    task_id="create_eks_fargate_profile",
    cluster_name=cluster_name,
    pod_execution_role_arn=fargate_pod_role_arn,
    fargate_profile_name=fargate_profile_name,
    selectors=SELECTORS,
)
Delete an AWS Fargate Profile¶
To delete an existing AWS Fargate Profile you can use
EksDeleteFargateProfileOperator.
tests/system/amazon/aws/example_eks_with_fargate_profile.py
delete_fargate_profile = EksDeleteFargateProfileOperator(
    task_id="delete_eks_fargate_profile",
    cluster_name=cluster_name,
    fargate_profile_name=fargate_profile_name,
)
Perform a Task on an Amazon EKS Cluster¶
To run a pod on an existing Amazon EKS Cluster, you can use
EksPodOperator.
Note: An Amazon EKS Cluster with underlying compute infrastructure is required.
tests/system/amazon/aws/example_eks_with_nodegroups.py
start_pod = EksPodOperator(
    task_id="start_pod",
    pod_name="test_pod",
    cluster_name=cluster_name,
    image="amazon/aws-cli:latest",
    cmds=["sh", "-c", "echo Test Airflow; date"],
    labels={"demo": "hello_world"},
    get_logs=True,
    on_finish_action="keep_pod",
)
Sensors¶
Wait on an Amazon EKS cluster state¶
To check the state of an Amazon EKS Cluster until it reaches the target state or another terminal
state you can use EksClusterStateSensor.
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_cluster = EksClusterStateSensor(
    task_id="await_create_cluster",
    cluster_name=cluster_name,
    target_state=ClusterStates.ACTIVE,
)
Wait on an Amazon EKS managed node group state¶
To check the state of an Amazon EKS managed node group until it reaches the target state or another terminal
state you can use EksNodegroupStateSensor.
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_nodegroup = EksNodegroupStateSensor(
    task_id="await_create_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    target_state=NodegroupStates.ACTIVE,
)
Wait on an AWS Fargate profile state¶
To check the state of an AWS Fargate profile until it reaches the target state or another terminal
state you can use EksFargateProfileSensor.
tests/system/amazon/aws/example_eks_with_fargate_profile.py
await_create_fargate_profile = EksFargateProfileStateSensor(
    task_id="wait_for_create_fargate_profile",
    cluster_name=cluster_name,
    fargate_profile_name=fargate_profile_name,
    target_state=FargateProfileStates.ACTIVE,
)