Source code for airflow.ti_deps.deps.prev_dagrun_dep

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.db import provide_session
from airflow.utils.state import State


[docs]class PrevDagrunDep(BaseTIDep): """ Is the past dagrun in a state that allows this task instance to run, e.g. did this task instance's task in the previous dagrun complete if we are depending on past. """ NAME = "Previous Dagrun State" IGNOREABLE = True IS_TASK_DEP = True @provide_session def _get_dep_statuses(self, ti, session, dep_context): if dep_context.ignore_depends_on_past: yield self._passing_status( reason="The context specified that the state of past DAGs could be " "ignored.") return if not ti.task.depends_on_past: yield self._passing_status( reason="The task did not have depends_on_past set.") return # Don't depend on the previous task instance if we are the first task dag = ti.task.dag if dag.catchup: if dag.previous_schedule(ti.execution_date) is None: yield self._passing_status( reason="This task does not have a schedule or is @once" ) return if dag.previous_schedule(ti.execution_date) < ti.task.start_date: yield self._passing_status( reason="This task instance was the first task instance for its task.") return else: dr = ti.get_dagrun() last_dagrun = dr.get_previous_dagrun() if dr else None if not last_dagrun: yield self._passing_status( reason="This task instance was the first task instance for its task.") return previous_ti = ti.previous_ti if not previous_ti: yield self._failing_status( reason="depends_on_past is true for this task's DAG, but the previous " "task instance has not run yet.") return if previous_ti.state not in {State.SKIPPED, State.SUCCESS}: yield self._failing_status( reason="depends_on_past is true for this task, but the previous task " "instance {0} is in the state '{1}' which is not a successful " "state.".format(previous_ti, previous_ti.state)) previous_ti.task = ti.task if (ti.task.wait_for_downstream and not previous_ti.are_dependents_done(session=session)): yield self._failing_status( reason="The tasks downstream of the previous task instance {0} haven't " "completed.".format(previous_ti))