Source code for airflow.models.taskfail

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Taskfail tracks the failed run durations of each task instance"""
from __future__ import annotations

from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, text
from sqlalchemy.orm import relationship

from airflow.models.base import Base, StringID
from airflow.utils.sqlalchemy import UtcDateTime

class TaskFail(Base):
    """TaskFail tracks the failed run durations of each task instance."""

    __tablename__ = "task_fail"

    # Surrogate primary key for the row.
    id = Column(Integer, primary_key=True)

    # Columns that together reference a row of ``task_instance``
    # (see the foreign key in ``__table_args__``).
    task_id = Column(StringID(), nullable=False)
    dag_id = Column(StringID(), nullable=False)
    run_id = Column(StringID(), nullable=False)
    # Defaults to -1 at the database level; ``__repr__`` omits the value when
    # it is -1 (presumably the marker for a non-mapped task -- confirm).
    map_index = Column(Integer, nullable=False, server_default=text('-1'))

    # Timing of the failed run; ``duration`` is filled in by ``__init__``.
    start_date = Column(UtcDateTime)
    end_date = Column(UtcDateTime)
    duration = Column(Integer)

    __table_args__ = (
        Index("idx_task_fail_task_instance", dag_id, task_id, run_id, map_index),
        # Rows are removed together with the task instance they point at.
        ForeignKeyConstraint(
            [dag_id, task_id, run_id, map_index],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name='task_fail_ti_fkey',
            ondelete="CASCADE",
        ),
    )

    # We don't need a DB level FK here, as we already have that to TI (which has one to DR) but by defining
    # the relationship we can more easily find the execution date for these rows
    dag_run = relationship(
        "DagRun",
        primaryjoin="""and_( TaskFail.dag_id == foreign(DagRun.dag_id), TaskFail.run_id == foreign(DagRun.run_id), )""",
        viewonly=True,
    )

    def __init__(self, ti):
        """Build a failure record from ``ti``.

        :param ti: object exposing ``dag_id``, ``task_id``, ``run_id``,
            ``map_index``, ``start_date`` and ``end_date`` attributes; each
            is copied onto this row.

        ``duration`` is the number of seconds between ``start_date`` and
        ``end_date`` truncated to an ``int``, or ``None`` when either date
        is missing.
        """
        copied_fields = (
            "dag_id",
            "task_id",
            "run_id",
            "map_index",
            "start_date",
            "end_date",
        )
        for field_name in copied_fields:
            setattr(self, field_name, getattr(ti, field_name))

        # Without both endpoints there is nothing to measure.
        if not self.end_date or not self.start_date:
            self.duration = None
            return

        elapsed = self.end_date - self.start_date
        self.duration = int(elapsed.total_seconds())

    def __repr__(self):
        """Return ``<ClassName: dag_id.task_id run_id[ map_index=N]>``."""
        pieces = [f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}"]
        # The map index is only shown when it differs from the -1 default.
        if self.map_index != -1:
            pieces.append(f" map_index={self.map_index}")
        pieces.append('>')
        return "".join(pieces)

Was this entry helpful?