This module contains an operator that runs downstream tasks only for the latest scheduled DagRun.

Module Contents



class airflow.operators.latest_only.LatestOnlyOperator(task_id: str, owner: str = conf.get('operators', 'DEFAULT_OWNER'), email: Optional[Union[str, Iterable[str]]] = None, email_on_retry: bool = conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure: bool = conf.getboolean('email', 'default_email_on_failure', fallback=True), retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0), retry_delay: datetime.timedelta = timedelta(seconds=300), retry_exponential_backoff: bool = False, max_retry_delay: Optional[datetime.timedelta] = None, start_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None, depends_on_past: bool = False, wait_for_downstream: bool = False, dag=None, params: Optional[Dict] = None, default_args: Optional[Dict] = None, priority_weight: int = 1, weight_rule: str = conf.get('core', 'default_task_weight_rule', fallback=WeightRule.DOWNSTREAM), queue: str = conf.get('operators', 'default_queue'), pool: Optional[str] = None, pool_slots: int = 1, sla: Optional[datetime.timedelta] = None, execution_timeout: Optional[datetime.timedelta] = None, on_execute_callback: Optional[TaskStateChangeCallback] = None, on_failure_callback: Optional[TaskStateChangeCallback] = None, on_success_callback: Optional[TaskStateChangeCallback] = None, on_retry_callback: Optional[TaskStateChangeCallback] = None, pre_execute: Optional[TaskPreExecuteHook] = None, post_execute: Optional[TaskPostExecuteHook] = None, trigger_rule: str = TriggerRule.ALL_SUCCESS, resources: Optional[Dict] = None, run_as_user: Optional[str] = None, task_concurrency: Optional[int] = None, max_active_tis_per_dag: Optional[int] = None, executor_config: Optional[Dict] = None, do_xcom_push: bool = True, inlets: Optional[Any] = None, outlets: Optional[Any] = None, task_group: Optional[airflow.utils.task_group.TaskGroup] = None, doc: Optional[str] = None, doc_md: Optional[str] = None, doc_json: Optional[str] = None, doc_yaml: Optional[str] = None, doc_rst: Optional[str] = None, **kwargs)

Bases: airflow.operators.branch.BaseBranchOperator

Allows a workflow to skip tasks that are not running during the most recent schedule interval.

If the task runs outside the latest schedule interval, all directly downstream tasks are skipped.

The one exception is an externally triggered run: downstream tasks are never skipped if the given DagRun is marked as externally triggered.
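The rule described above can be sketched in plain Python, without Airflow. This is a simplified model, not the operator's actual code. It assumes a fixed timedelta schedule and treats a run as "latest" while the current time falls in the interval right after the run's own. The function name should_run_downstream and its parameters are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def should_run_downstream(logical_date, interval, now, externally_triggered=False):
    """Return True if downstream tasks should run, False if they should be skipped.

    Assumption: a run dated `logical_date` is the latest one while `now` lies in
    (logical_date + interval, logical_date + 2 * interval].
    """
    if externally_triggered:
        # Externally triggered runs never skip their downstream tasks.
        return True
    left_window = logical_date + interval
    right_window = left_window + interval
    return left_window < now <= right_window


daily = timedelta(days=1)
now = datetime(2024, 1, 10, 6, 0, tzinfo=timezone.utc)

# Most recent scheduled run: downstream tasks run.
latest = should_run_downstream(datetime(2024, 1, 9, tzinfo=timezone.utc), daily, now)
# Old run being caught up: downstream tasks are skipped.
backfill = should_run_downstream(datetime(2024, 1, 2, tzinfo=timezone.utc), daily, now)
# Same old date, but externally triggered: downstream tasks still run.
manual = should_run_downstream(
    datetime(2024, 1, 2, tzinfo=timezone.utc), daily, now, externally_triggered=True
)
```

With these inputs, only the backfill run falls outside the window, so only its downstream tasks would be skipped.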

ui_color = #e9ffdb

choose_branch(self, context: Dict) -> Union[str, Iterable[str]]

Subclasses should implement this, running whatever logic is necessary to choose a branch and returning a task_id or list of task_ids.


Parameters: context (dict) – Context dictionary as passed to execute()
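As a rough illustration of how the return value of choose_branch relates to skipping, the sketch below assumes that every directly downstream task not named in the return value is skipped. The helper tasks_to_skip and the task ids "load" and "notify" are hypothetical and not part of Airflow.

```python
def tasks_to_skip(direct_downstream_ids, chosen):
    """Return the direct downstream task ids that are not in the chosen branch."""
    # choose_branch may return a single task_id or an iterable of task_ids.
    if isinstance(chosen, str):
        chosen = [chosen]
    keep = set(chosen)
    return [task_id for task_id in direct_downstream_ids if task_id not in keep]


downstream = ["load", "notify"]

# Nothing chosen (for example, a run that is not the latest): everything is skipped.
skipped_when_stale = tasks_to_skip(downstream, [])
# All direct downstream tasks chosen: nothing is skipped.
skipped_when_latest = tasks_to_skip(downstream, downstream)
# A single task_id returned as a plain string.
skipped_single = tasks_to_skip(downstream, "load")
```

Under this assumption, the "skip all directly downstream tasks" behavior described for this operator corresponds to choosing an empty branch.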
