Callbacks¶
A valuable component of logging and monitoring is the use of task callbacks to act upon changes in state of a given task, or across all tasks in a given DAG. For example, you may wish to alert when certain tasks have failed, or have the last task in your DAG invoke a callback when it succeeds.
Note
Callback functions are only invoked when the task state changes due to execution by a worker. As such, task changes set by the command line interface (CLI) or user interface (UI) do not execute callback functions.
Callback Types¶
There are four types of task events that can trigger a callback:
| Name | Description | 
|---|---|
| 
 | Invoked when the task succeeds | 
| 
 | Invoked when the task fails | 
| 
 | Invoked when a task misses its defined SLA | 
| 
 | Invoked when the task is up for retry | 
Example¶
In the following example, failures in any task call the task_failure_alert function, and success in the last task calls the dag_success_alert function:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")
with DAG(
    dag_id="example_callback",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = DummyOperator(task_id="task2")
    task3 = DummyOperator(task_id="task3", on_success_callback=dag_success_alert)
    task1 >> task2 >> task3