Amazon Elastic Container Service (ECS) Operators

Amazon Elastic Container Service (Amazon ECS) is a fully managed container orchestration service that makes it easy for you to deploy, manage, and scale containerized applications.

Airflow provides operators to run Task Definitions on an ECS cluster.

Prerequisite Tasks

Before you can use these operators, you must complete the following:

  • Create your ECS cluster and a Task Definition. The Task Definition describes the containerized application you want to run.

Overview

To run a Task Definition on an Amazon ECS cluster, use EcsOperator.

This operator supports running your containers in ECS clusters that use serverless capacity (FARGATE), EC2 instances (EC2), or external resources (EXTERNAL). The parameters you need to configure depend on which launch_type you use.

Launch Types

launch_type="EC2" | "FARGATE" | "EXTERNAL" (pass exactly one value)
  • If you are using AWS Fargate as the compute resource in your ECS cluster, set launch_type to FARGATE. When using the FARGATE launch type, you must also provide the network_configuration parameter.

  • If you are using EC2 as the compute resources in your ECS Cluster, set the parameter to EC2.

  • If you have integrated external resources in your ECS Cluster, for example using ECS Anywhere, and want to run your containers on those external resources, set the parameter to EXTERNAL.
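The rules above (exactly one launch type, and network_configuration required for FARGATE) can be checked before a DAG is deployed. The following is a minimal sketch; check_launch_options is a hypothetical helper, not part of the Amazon provider, and it only encodes the two rules stated in this section.

```python
from typing import Any, Dict, Optional

# The three launch types described in this section.
VALID_LAUNCH_TYPES = {"EC2", "FARGATE", "EXTERNAL"}


def check_launch_options(
    launch_type: str,
    network_configuration: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: validate launch options for EcsOperator.

    Returns the keyword arguments to pass on, or raises ValueError.
    """
    # A combined string such as "EXTERNAL|EC2" is not a valid launch type.
    if launch_type not in VALID_LAUNCH_TYPES:
        raise ValueError(
            f"launch_type must be one of {sorted(VALID_LAUNCH_TYPES)}, got {launch_type!r}"
        )
    options: Dict[str, Any] = {"launch_type": launch_type}
    if launch_type == "FARGATE" and not network_configuration:
        # Fargate tasks use the awsvpc network mode, so subnets must be supplied.
        raise ValueError("launch_type FARGATE requires network_configuration")
    if network_configuration:
        # Optional for the other launch types in this sketch; passed through as-is.
        options["network_configuration"] = network_configuration
    return options
```

For example, check_launch_options("FARGATE") raises ValueError, while check_launch_options("EC2") returns {"launch_type": "EC2"}.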

airflow/providers/amazon/aws/example_dags/example_ecs_ec2.py

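    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.ecs import EcsOperator

    with DAG(
        dag_id="example_ecs_ec2",
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        hello_world = EcsOperator(
            task_id="hello_world",
            cluster=os.environ.get("CLUSTER_NAME", "existing_cluster_name"),
            task_definition=os.environ.get("TASK_DEFINITION", "existing_task_definition_name"),
            # Pass exactly one launch type: "EC2", or "EXTERNAL" for ECS Anywhere.
            launch_type="EC2",
            aws_conn_id="aws_ecs",
            # Override the command of the named container in the Task Definition.
            overrides={
                "containerOverrides": [
                    {
                        "name": "hello-world-container",
                        "command": ["echo", "hello", "world"],
                    },
                ],
            },
            tags={
                "Customer": "X",
                "Project": "Y",
                "Application": "Z",
                "Version": "0.0.1",
                "Environment": "Development",
            },
            awslogs_group="/ecs/hello-world",
            awslogs_region="aws-region",  # replace with the region of your log group
            awslogs_stream_prefix="ecs/hello-world-container",
        )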

airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py

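    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.ecs import EcsOperator

    with DAG(
        dag_id="example_ecs_fargate",
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        hello_world = EcsOperator(
            task_id="hello_world",
            cluster=os.environ.get("CLUSTER_NAME", "existing_cluster_name"),
            task_definition=os.environ.get("TASK_DEFINITION", "existing_task_definition_name"),
            launch_type="FARGATE",
            aws_conn_id="aws_ecs",
            # Override the command of the named container in the Task Definition.
            overrides={
                "containerOverrides": [
                    {
                        "name": "hello-world-container",
                        "command": ["echo", "hello", "world"],
                    },
                ],
            },
            # Required for FARGATE: the subnets and security groups the task runs in.
            network_configuration={
                "awsvpcConfiguration": {
                    "securityGroups": [os.environ.get("SECURITY_GROUP_ID", "sg-123abc")],
                    "subnets": [os.environ.get("SUBNET_ID", "subnet-123456ab")],
                },
            },
            tags={
                "Customer": "X",
                "Project": "Y",
                "Application": "Z",
                "Version": "0.0.1",
                "Environment": "Development",
            },
            awslogs_group="/ecs/hello-world",
            awslogs_stream_prefix="prefix_b/hello-world-container",
        )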
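Both examples pass tags as a plain dict, whereas the ECS RunTask API expects its tags field as a list of objects with lowercase key and value fields. The sketch below shows that conversion; to_ecs_tags is a hypothetical name, not a provider function, and is mainly useful if you call RunTask yourself or want to see what the tags look like on the API side.

```python
from typing import Dict, List


def to_ecs_tags(tags: Dict[str, str]) -> List[Dict[str, str]]:
    """Hypothetical helper: convert {"Name": "Value"} into the
    [{"key": ..., "value": ...}] shape used by the RunTask tags field."""
    # Dicts preserve insertion order, so tag order is kept.
    return [{"key": key, "value": value} for key, value in tags.items()]


print(to_ecs_tags({"Customer": "X", "Project": "Y"}))
# → [{'key': 'Customer', 'value': 'X'}, {'key': 'Project', 'value': 'Y'}]
```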

CloudWatch Logging

To have the operator retrieve your container's logs from AWS CloudWatch and write them to the Airflow task log, define the awslogs_group, awslogs_region, and awslogs_stream_prefix parameters. Using the example operators above, add the following parameters to enable CloudWatch logging. You will also need the appropriate permissions (see the next section).

airflow/providers/amazon/aws/example_dags/example_ecs_ec2.py

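        awslogs_group="/ecs/hello-world",  # log group set in the task definition's log configuration
        awslogs_region="aws-region",  # region of that log group
        awslogs_stream_prefix="ecs/hello-world-container",  # "<stream prefix>/<container name>"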
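The ECS awslogs log driver names each log stream prefix-name/container-name/ecs-task-id, which is why awslogs_stream_prefix above combines a prefix ("ecs") with the container name. The sketch below builds the full stream name for one task, assuming the task definition uses the awslogs driver with awslogs-stream-prefix set to "ecs"; cloudwatch_stream_name and the sample ARN are hypothetical.

```python
def cloudwatch_stream_name(awslogs_stream_prefix: str, task_arn: str) -> str:
    """Hypothetical helper: CloudWatch log stream name for one ECS task.

    awslogs_stream_prefix is "<awslogs-stream-prefix>/<container name>".
    """
    # The task ID is the last path segment of the task ARN, in both the
    # short (task/<id>) and long (task/<cluster>/<id>) ARN formats.
    task_id = task_arn.split("/")[-1]
    return f"{awslogs_stream_prefix}/{task_id}"


print(cloudwatch_stream_name(
    "ecs/hello-world-container",
    "arn:aws:ecs:us-east-1:123456789012:task/existing_cluster_name/0123456789abcdef",
))
# → ecs/hello-world-container/0123456789abcdef
```

If the prefix does not match the task definition's log configuration, the stream will not be found and no container logs will appear in the Airflow task log.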

IAM Permissions

You need the following IAM permissions to run tasks with this operator. In this example, the policy allows running tasks on an ECS cluster named "cluster a" in a specific AWS region and account.

{
    "Effect": "Allow",
    "Action": [
        "ecs:RunTask",
        "ecs:DescribeTasks"
    ],
    "Resource": ["arn:aws:ecs:{aws region}:{aws account number}:cluster/{cluster a}"]
}

If you set reattach=True (the default is False), you need additional permissions. Add the following Actions to the IAM policy:

"ecs:DescribeTaskDefinition",
"ecs:ListTasks"
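The statement above and the extra reattach actions can be combined into a complete policy document. This is a minimal sketch; ecs_run_task_policy and the sample region, account, and cluster values are hypothetical, and the resource mirrors the cluster ARN used in the statement above.

```python
import json
from typing import Any, Dict


def ecs_run_task_policy(
    region: str, account_id: str, cluster_name: str, reattach: bool = False
) -> Dict[str, Any]:
    """Hypothetical helper: IAM policy document for running ECS tasks."""
    actions = ["ecs:RunTask", "ecs:DescribeTasks"]
    if reattach:
        # Extra actions needed when the operator is used with reattach=True.
        actions += ["ecs:DescribeTaskDefinition", "ecs:ListTasks"]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": actions,
                "Resource": [f"arn:aws:ecs:{region}:{account_id}:cluster/{cluster_name}"],
            }
        ],
    }


policy = ecs_run_task_policy("us-east-1", "123456789012", "cluster-a", reattach=True)
print(json.dumps(policy, indent=4))
```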

CloudWatch Permissions

If you plan to stream Apache Airflow logs to AWS CloudWatch, make sure you have configured the appropriate set of permissions, for example with the following AWS CDK (Python) policy statement:

from aws_cdk import aws_iam as iam

# Allow creating, writing, and reading the CloudWatch log streams under the given prefix.
cloudwatch_logs_statement = iam.PolicyStatement(
    actions=[
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents",
        "logs:GetLogEvents",
        "logs:GetLogRecord",
        "logs:GetLogGroupFields",
        "logs:GetQueryResults",
    ],
    effect=iam.Effect.ALLOW,
    resources=[
        "arn:aws:logs:{aws region}:{aws account number}:log-group:{aws-log-group-name}:log-stream:{aws-log-stream-name}/*",
    ],
)
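If you manage IAM with JSON policies rather than AWS CDK, the same statement can be written as plain IAM JSON. The sketch below fills in the resource ARN placeholders; cloudwatch_logs_json_statement and the sample region, account, and log names are hypothetical, chosen to match the log group and stream prefix from the EC2 example.

```python
import json
from typing import Any, Dict

# Same actions as the iam.PolicyStatement above.
CLOUDWATCH_LOG_ACTIONS = [
    "logs:CreateLogStream",
    "logs:CreateLogGroup",
    "logs:PutLogEvents",
    "logs:GetLogEvents",
    "logs:GetLogRecord",
    "logs:GetLogGroupFields",
    "logs:GetQueryResults",
]


def cloudwatch_logs_json_statement(
    region: str, account_id: str, log_group: str, log_stream_prefix: str
) -> Dict[str, Any]:
    """Hypothetical helper: the CDK statement above as an IAM JSON statement."""
    # Matches every log stream whose name starts with log_stream_prefix/.
    resource = (
        f"arn:aws:logs:{region}:{account_id}:log-group:{log_group}"
        f":log-stream:{log_stream_prefix}/*"
    )
    return {
        "Effect": "Allow",
        "Action": list(CLOUDWATCH_LOG_ACTIONS),
        "Resource": [resource],
    }


statement = cloudwatch_logs_json_statement(
    "us-east-1", "123456789012", "/ecs/hello-world", "ecs/hello-world-container"
)
print(json.dumps(statement, indent=4))
```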

More information

For further information, look at:
