Source code for airflow.providers.amazon.aws.executors.ecs.boto_schema
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
AWS ECS Executor Boto Schema.
Schemas for easily and consistently parsing boto responses.
"""
from __future__ import annotations
from marshmallow import EXCLUDE, Schema, fields, post_load
[docs]class BotoContainerSchema(Schema):
"""
Botocore Serialization Object for ECS ``Container`` shape.
Note that there are many more parameters, but the executor only needs the members listed below.
"""
[docs] exit_code = fields.Integer(data_key="exitCode")
[docs] last_status = fields.String(data_key="lastStatus")
[docs] name = fields.String(required=True)
[docs] reason = fields.String()
[docs] container_arn = fields.String(data_key="containerArn")
[docs]class BotoTaskSchema(Schema):
"""
Botocore Serialization Object for ECS ``Task`` shape.
Note that there are many more parameters, but the executor only needs the members listed below.
"""
[docs] task_arn = fields.String(data_key="taskArn", required=True)
[docs] last_status = fields.String(data_key="lastStatus", required=True)
[docs] desired_status = fields.String(data_key="desiredStatus", required=True)
[docs] containers = fields.List(fields.Nested(BotoContainerSchema), required=True)
[docs] started_at = fields.Field(data_key="startedAt")
[docs] stopped_reason = fields.String(data_key="stoppedReason")
@post_load
[docs] def make_task(self, data, **kwargs):
"""Overwrite marshmallow load() to return an EcsExecutorTask instance instead of a dictionary."""
# Imported here to avoid circular import.
from airflow.providers.amazon.aws.executors.ecs.utils import EcsExecutorTask
return EcsExecutorTask(**data)
[docs]class BotoFailureSchema(Schema):
"""Botocore Serialization Object for ECS ``Failure`` Shape."""
[docs] reason = fields.String()
[docs]class BotoRunTaskSchema(Schema):
"""Botocore Serialization Object for ECS ``RunTask`` Operation output."""
[docs] tasks = fields.List(fields.Nested(BotoTaskSchema), required=True)
[docs] failures = fields.List(fields.Nested(BotoFailureSchema), required=True)
[docs]class BotoDescribeTasksSchema(Schema):
"""Botocore Serialization Object for ECS ``DescribeTask`` Operation output."""
[docs] tasks = fields.List(fields.Nested(BotoTaskSchema), required=True)
[docs] failures = fields.List(fields.Nested(BotoFailureSchema), required=True)