# Source code for airflow.models.taskfail
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Taskfail tracks the failed run durations of each task instance"""
from __future__ import annotations
from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, text
from sqlalchemy.orm import relationship
from airflow.models.base import Base, StringID
from airflow.utils.sqlalchemy import UtcDateTime
class TaskFail(Base):
    """TaskFail tracks the failed run durations of each task instance.

    Every row points at a ``task_instance`` row through the composite key
    ``(dag_id, task_id, run_id, map_index)``; the foreign key is declared with
    ``ON DELETE CASCADE``.
    """

    __tablename__ = "task_fail"

    id = Column(Integer, primary_key=True)
    task_id = Column(StringID(), nullable=False)
    dag_id = Column(StringID(), nullable=False)
    run_id = Column(StringID(), nullable=False)
    # The database supplies -1 when no map_index is given on insert.
    map_index = Column(Integer, nullable=False, server_default=text("-1"))
    start_date = Column(UtcDateTime)
    end_date = Column(UtcDateTime)
    # Whole seconds between start_date and end_date (filled in by __init__).
    duration = Column(Integer)

    __table_args__ = (
        Index("idx_task_fail_task_instance", dag_id, task_id, run_id, map_index),
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="task_fail_ti_fkey",
            ondelete="CASCADE",
        ),
    )

    # We don't need a DB level FK here, as we already have that to TI (which has one to DR) but by defining
    # the relationship we can more easily find the execution date for these rows
    dag_run = relationship(
        "DagRun",
        primaryjoin="""and_(
            TaskFail.dag_id == foreign(DagRun.dag_id),
            TaskFail.run_id == foreign(DagRun.run_id),
        )""",
        viewonly=True,
    )

    def __init__(self, ti) -> None:
        """Build a failure record from the keys and timestamps of ``ti``.

        :param ti: object exposing ``dag_id``, ``task_id``, ``run_id``,
            ``map_index``, ``start_date`` and ``end_date`` (presumably a
            TaskInstance -- the type is not visible in this module).
        """
        self.dag_id = ti.dag_id
        self.task_id = ti.task_id
        self.run_id = ti.run_id
        self.map_index = ti.map_index
        self.start_date = ti.start_date
        self.end_date = ti.end_date
        if self.end_date and self.start_date:
            # int() drops the fractional part, so duration is stored in whole seconds.
            self.duration = int((self.end_date - self.start_date).total_seconds())
        else:
            # Without both timestamps there is nothing to measure.
            self.duration = None

    def __repr__(self) -> str:
        """Return ``<TaskFail: dag_id.task_id run_id>``, adding ``map_index`` when it is not -1."""
        prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}"
        if self.map_index != -1:
            prefix += f" map_index={self.map_index}"
        return prefix + ">"