Source code for airflow.providers.amazon.aws.executors.ecs.boto_schema

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
AWS ECS Executor Boto Schema.

Schemas for easily and consistently parsing boto responses.
"""

from __future__ import annotations

from marshmallow import EXCLUDE, Schema, fields, post_load


[docs]class BotoContainerSchema(Schema): """ Botocore Serialization Object for ECS ``Container`` shape. Note that there are many more parameters, but the executor only needs the members listed below. """
[docs] exit_code = fields.Integer(data_key="exitCode")
[docs] last_status = fields.String(data_key="lastStatus")
[docs] name = fields.String(required=True)
[docs] reason = fields.String()
[docs] container_arn = fields.String(data_key="containerArn")
[docs] class Meta: """Options object for a Schema. See Schema.Meta for more details and valid values."""
[docs] unknown = EXCLUDE
[docs]class BotoTaskSchema(Schema): """ Botocore Serialization Object for ECS ``Task`` shape. Note that there are many more parameters, but the executor only needs the members listed below. """
[docs] task_arn = fields.String(data_key="taskArn", required=True)
[docs] last_status = fields.String(data_key="lastStatus", required=True)
[docs] desired_status = fields.String(data_key="desiredStatus", required=True)
[docs] containers = fields.List(fields.Nested(BotoContainerSchema), required=True)
[docs] started_at = fields.Field(data_key="startedAt")
[docs] stopped_reason = fields.String(data_key="stoppedReason")
@post_load
[docs] def make_task(self, data, **kwargs): """Overwrite marshmallow load() to return an EcsExecutorTask instance instead of a dictionary.""" # Imported here to avoid circular import. from airflow.providers.amazon.aws.executors.ecs.utils import EcsExecutorTask return EcsExecutorTask(**data)
[docs] class Meta: """Options object for a Schema. See Schema.Meta for more details and valid values."""
[docs] unknown = EXCLUDE
[docs]class BotoFailureSchema(Schema): """Botocore Serialization Object for ECS ``Failure`` Shape."""
[docs] arn = fields.String()
[docs] reason = fields.String()
[docs] class Meta: """Options object for a Schema. See Schema.Meta for more details and valid values."""
[docs] unknown = EXCLUDE
[docs]class BotoRunTaskSchema(Schema): """Botocore Serialization Object for ECS ``RunTask`` Operation output."""
[docs] tasks = fields.List(fields.Nested(BotoTaskSchema), required=True)
[docs] failures = fields.List(fields.Nested(BotoFailureSchema), required=True)
[docs] class Meta: """Options object for a Schema. See Schema.Meta for more details and valid values."""
[docs] unknown = EXCLUDE
[docs]class BotoDescribeTasksSchema(Schema): """Botocore Serialization Object for ECS ``DescribeTask`` Operation output."""
[docs] tasks = fields.List(fields.Nested(BotoTaskSchema), required=True)
[docs] failures = fields.List(fields.Nested(BotoFailureSchema), required=True)
[docs] class Meta: """Options object for a Schema. See Schema.Meta for more details and valid values."""
[docs] unknown = EXCLUDE

Was this entry helpful?