airflow.providers.amazon.aws.executors.ecs.ecs_executor

AWS ECS Executor.

Each Airflow task gets delegated out to an Amazon ECS Task.

Attributes

WorkloadKey

INVALID_CREDENTIALS_EXCEPTIONS

Classes

AwsEcsExecutor

Executes the provided Airflow command on an ECS instance.

Module Contents

type airflow.providers.amazon.aws.executors.ecs.ecs_executor.WorkloadKey = _EcsWorkloadKey[source]
airflow.providers.amazon.aws.executors.ecs.ecs_executor.INVALID_CREDENTIALS_EXCEPTIONS = ['ExpiredTokenException', 'InvalidClientTokenId', 'UnrecognizedClientException'][source]
class airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor(*args, **kwargs)[source]

Bases: airflow.executors.base_executor.BaseExecutor

Executes the provided Airflow command on an ECS instance.

The Airflow Scheduler creates a shell command, and passes it to the executor. This ECS Executor runs said Airflow command on a remote Amazon ECS Cluster with a task-definition configured to launch the same containers as the Scheduler. It then periodically checks in with the launched tasks (via task ARNs) to determine the status.

This allows individual tasks to specify CPU, memory, GPU, environment variables, etc. When initializing a task, the optional executor_config argument should be a dictionary whose keys match the ContainerOverride definition in the AWS documentation (see the link below).

Prerequisite: the Boto3 library must be properly configured. See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html for authentication and access-key management. You can set an environment variable, set up the AWS config from the console, or use IAM roles.

See also

https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html for an Airflow TaskInstance’s executor_config.
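As an illustration of the executor_config described above, the following minimal sketch builds a dictionary from a few ContainerOverride fields and checks its keys against the field names listed in the ContainerOverride API reference. The helper unknown_override_keys and the values shown are hypothetical and are not part of the provider.

```python
# Field names as listed in the ECS ContainerOverride API reference.
CONTAINER_OVERRIDE_FIELDS = {
    "name",
    "command",
    "environment",
    "environmentFiles",
    "cpu",
    "memory",
    "memoryReservation",
    "resourceRequirements",
}


def unknown_override_keys(executor_config: dict) -> set:
    """Return keys that are not ContainerOverride fields (hypothetical helper)."""
    return set(executor_config) - CONTAINER_OVERRIDE_FIELDS


# Example values are assumptions chosen for illustration only.
executor_config = {
    "cpu": 512,
    "memory": 1024,
    "environment": [{"name": "STAGE", "value": "dev"}],
}

unknown = unknown_override_keys(executor_config)
```

A dictionary like this would typically be passed through the executor_config argument when a task is defined.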

supports_multi_team: bool = True[source]
supports_callbacks: bool = True[source]
DESCRIBE_TASKS_BATCH_SIZE = 99[source]
queued_tasks: dict[airflow.models.taskinstance.TaskInstanceKey, airflow.executors.workloads.All][source]
active_workers: airflow.providers.amazon.aws.executors.ecs.utils.EcsTaskCollection[source]
pending_workloads: collections.deque[source]
cluster[source]
container_name[source]
attempts_since_last_successful_connection = 0[source]
IS_BOTO_CONNECTION_HEALTHY = False[source]
run_task_kwargs[source]
max_run_task_attempts[source]
queue_workload(workload, session)[source]
start()[source]

Call this when the Executor is run for the first time by the scheduler.

check_health()[source]

Make a test API call to check the health of the ECS Executor.

The call deliberately uses an invalid task ID. The potential outcomes, in order, are:
  1. AccessDeniedException is raised if there are insufficient permissions.

  2. ClusterNotFoundException is raised if permissions exist but the cluster does not.

  3. The API responds with a failure message if the cluster is found and there are permissions, but the cluster itself has issues.

  4. InvalidParameterException is raised if the permissions and cluster exist but the task does not.

The last one is considered a success state for the purposes of this check.
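The four outcomes above can be sketched as a small classification function. This is an illustrative sketch only: classify_health_check and the returned labels are hypothetical, and the assumption that "no exception" corresponds to outcome 3 is a simplification of how the API response would actually be inspected.

```python
from typing import Optional

# Maps the error codes named in the list above to a short label.
HEALTH_CHECK_OUTCOMES = {
    "AccessDeniedException": "insufficient permissions",
    "ClusterNotFoundException": "cluster not found",
    "InvalidParameterException": "healthy",
}


def classify_health_check(error_code: Optional[str]) -> str:
    """Classify the result of the test API call (hypothetical helper)."""
    if error_code is None:
        # Assumption: no exception means the API returned a failure message
        # because the cluster itself has issues (outcome 3).
        return "cluster has issues"
    return HEALTH_CHECK_OUTCOMES.get(error_code, "unexpected error")
```

Only the InvalidParameterException case maps to a healthy result, matching the note that the last outcome is the success state.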

load_ecs_connection(check_connection=True)[source]
sync()[source]

Sync will get called periodically by the heartbeat method.

Executors should override this to gather statuses.

sync_running_workloads()[source]

Check and update state on all running workloads (tasks and callbacks).
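One plausible use of DESCRIBE_TASKS_BATCH_SIZE is to describe running tasks in batches rather than in a single call. This page does not confirm that, so treat the following as a sketch under that assumption; the batched helper and the placeholder ARNs are hypothetical.

```python
DESCRIBE_TASKS_BATCH_SIZE = 99  # value shown for the class attribute above


def batched(items, batch_size=DESCRIBE_TASKS_BATCH_SIZE):
    """Yield consecutive slices of at most batch_size items (hypothetical helper)."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Placeholder identifiers, not real ECS task ARNs.
task_arns = [f"example-task-arn-{i}" for i in range(250)]
batch_sizes = [len(batch) for batch in batched(task_arns)]
```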

attempt_workload_runs()[source]

Take tasks from the pending_workloads queue and attempt to find an instance to run each one on.

If the launch type is EC2, this will attempt to place tasks on empty EC2 instances. If there are no EC2 instances available, no task is placed and this function will be called again in the next heartbeat.

If the launch type is FARGATE, this will run the tasks on new AWS Fargate instances.
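The retry-on-next-heartbeat behaviour described above can be sketched with a deque. This is a simplified illustration, not the executor's implementation: attempt_runs, the launch callable, and the (key, attempts) tuple layout are all hypothetical, and max_attempts stands in for max_run_task_attempts.

```python
from collections import deque


def attempt_runs(pending, launch, max_attempts):
    """Make one pass over the queue; return keys that ran out of attempts."""
    failed = []
    for _ in range(len(pending)):
        key, attempts = pending.popleft()
        if launch(key):
            continue  # placed successfully, nothing more to do
        if attempts + 1 < max_attempts:
            # Not placed: re-queue so the next heartbeat tries again.
            pending.append((key, attempts + 1))
        else:
            failed.append(key)
    return failed


pending = deque([("task_a", 0), ("task_b", 0)])


def only_a(key):
    # Stand-in for a launch attempt: only "task_a" can be placed.
    return key == "task_a"


first_pass = attempt_runs(pending, only_a, max_attempts=2)
second_pass = attempt_runs(pending, only_a, max_attempts=2)
```

Here the first pass places task_a and re-queues task_b, and the second pass reports task_b as failed once its attempts are exhausted.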

execute_async(key, command, queue=None, executor_config=None)[source]

Save the workload to be executed in the next sync by inserting the commands into a queue.

end(heartbeat_interval=10)[source]

Wait for all currently running tasks to end, and don’t launch any tasks.

terminate()[source]

Kill all ECS processes by calling Boto3’s StopTask API.

get_container(container_list)[source]

Search task list for core Airflow container.

try_adopt_task_instances(tis)[source]

Adopt task instances which have an external_executor_id (the ECS task ARN).

Anything that is not adopted will be cleared by the scheduler and becomes eligible for re-scheduling.
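The split between adoptable and non-adoptable task instances can be sketched as follows. FakeTaskInstance and split_adoptable are hypothetical stand-ins; this sketch only checks whether external_executor_id is set and does not verify that the ECS task still exists.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTaskInstance:
    """Minimal stand-in for a task instance (hypothetical)."""
    task_id: str
    external_executor_id: Optional[str] = None  # would hold the ECS task ARN


def split_adoptable(tis):
    """Separate instances that carry an external_executor_id from the rest."""
    adoptable = [ti for ti in tis if ti.external_executor_id]
    not_adopted = [ti for ti in tis if not ti.external_executor_id]
    return adoptable, not_adopted


tis = [
    FakeTaskInstance("extract", external_executor_id="example-task-arn"),
    FakeTaskInstance("load"),
]
adoptable, not_adopted = split_adoptable(tis)
```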

property pending_tasks: collections.deque[source]

Deprecated: use pending_workloads instead of pending_tasks.

sync_running_tasks()[source]

Deprecated: use sync_running_workloads instead of sync_running_tasks.

attempt_task_runs()[source]

Deprecated: use attempt_workload_runs instead of attempt_task_runs.
