Listeners¶
You can write listeners to enable Airflow to notify you when events happen. Pluggy powers these listeners.
Warning
Listeners are an advanced feature of Airflow. They are not isolated from the Airflow components they run in, and can slow down or in some cases take down your Airflow instance. Take extra care when writing listeners.
Airflow supports notifications for the following events:
Lifecycle Events¶
on_starting
before_stopping
Lifecycle events allow you to react to start and stop events for an Airflow Job, like SchedulerJob or BackfillJob.
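As a minimal sketch of the two lifecycle hooks, assuming both receive a single `component` argument (the job being started or stopped): the `describe` helper and the `ImportError` fallback are hypothetical additions so the snippet can also run where Airflow is not installed.

```python
import logging

try:
    from airflow.listeners import hookimpl
except ImportError:
    # Assumption: no-op stand-in for the real pluggy marker, used only so
    # this sketch can run without Airflow installed.
    def hookimpl(func):
        return func


log = logging.getLogger(__name__)


def describe(component) -> str:
    """Hypothetical helper: return the class name of the component."""
    return type(component).__name__


@hookimpl
def on_starting(component):
    """Called before the Airflow component (for example a SchedulerJob) starts."""
    log.info("Component starting: %s", describe(component))


@hookimpl
def before_stopping(component):
    """Called before the Airflow component stops."""
    log.info("Component stopping: %s", describe(component))
```

Keeping the hook bodies this small limits how much work runs inside the Airflow component that hosts the listener.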
DagRun State Change Events¶
DagRun state change events occur when a DagRun changes state.
on_dag_run_running
@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to RUNNING.
    """
    print("Dag run in running state")
    queued_at = dag_run.queued_at
    dag_hash_info = dag_run.dag_hash
    print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")
on_dag_run_success
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to SUCCESS.
    """
    print("Dag run in success state")
    start_date = dag_run.start_date
    end_date = dag_run.end_date
    print(f"Dag run start:{start_date} end:{end_date}")
on_dag_run_failed
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    external_trigger = dag_run.external_trigger
    print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
    print(f"Failed with message: {msg}")
TaskInstance State Change Events¶
TaskInstance state change events occur when a TaskInstance changes state.
You can use these events to react to LocalTaskJob state changes.
on_task_instance_running
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to RUNNING.
    The callback receives the previous state and the task_instance object.
    The task_instance gives access to the running task and to its dag_run,
    task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)
    state: TaskInstanceState = task_instance.state
    name: str = task_instance.task_id
    start_date = task_instance.start_date
    dagrun = task_instance.dag_run
    dagrun_status = dagrun.state
    task = task_instance.task
    if TYPE_CHECKING:
        assert task
    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name} state:{state} start_date:{start_date}")
    print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
on_task_instance_success
@hookimpl
def on_task_instance_success(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to SUCCESS.
    The callback receives the previous state and the task_instance object.
    The task_instance gives access to the succeeded task and to its dag_run,
    task and dag information.
    """
    print("Task instance in success state")
    print(" Previous state of the Task instance:", previous_state)
    dag_id = task_instance.dag_id
    hostname = task_instance.hostname
    operator = task_instance.operator
    dagrun = task_instance.dag_run
    queued_at = dagrun.queued_at
    print(f"Dag name:{dag_id} queued_at:{queued_at}")
    print(f"Task hostname:{hostname} operator:{operator}")
on_task_instance_failed
@hookimpl
def on_task_instance_failed(
    previous_state: TaskInstanceState, task_instance: TaskInstance, error: None | str | BaseException, session
):
    """
    This method is called when task state changes to FAILED.
    The callback receives the previous state, the task_instance object and the error.
    The task_instance gives access to the failed task and to its dag_run,
    task and dag information.
    """
    print("Task instance in failure state")
    start_date = task_instance.start_date
    end_date = task_instance.end_date
    duration = task_instance.duration
    dagrun = task_instance.dag_run
    task = task_instance.task
    if TYPE_CHECKING:
        assert task
    dag = task.dag
    print(f"Task start:{start_date} end:{end_date} duration:{duration}")
    print(f"Task:{task} dag:{dag} dagrun:{dagrun}")
    if error:
        print(f"Failure caused by {error}")
Dataset Events¶
on_dataset_created
on_dataset_changed
Dataset events occur when Dataset management operations are run.
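A minimal sketch of dataset listeners, assuming both hooks receive a single `dataset` argument that exposes a `uri` attribute: the `seen_datasets` list and the `ImportError` fallback are hypothetical and exist only to keep the example observable and runnable without Airflow.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Assumption: no-op stand-in for the real pluggy marker, used only so
    # this sketch can run without Airflow installed.
    def hookimpl(func):
        return func


# Hypothetical in-memory record of the events this listener has seen.
seen_datasets = []


@hookimpl
def on_dataset_created(dataset):
    """Record that a dataset was created."""
    seen_datasets.append(("created", getattr(dataset, "uri", None)))


@hookimpl
def on_dataset_changed(dataset):
    """Record that a dataset was changed."""
    seen_datasets.append(("changed", getattr(dataset, "uri", None)))
```

A real listener would more likely forward the URI to an external system than keep it in a module-level list.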
Dag Import Error Events¶
on_new_dag_import_error
on_existing_dag_import_error
Dag import error events occur when the dag processor finds an import error in the Dag code and updates the metadata database table.
This is an experimental feature.
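A minimal sketch of import error listeners, assuming both hooks receive `filename` and `stacktrace` arguments: the `summarize` helper and the `ImportError` fallback are hypothetical and not part of Airflow.

```python
try:
    from airflow.listeners import hookimpl
except ImportError:
    # Assumption: no-op stand-in for the real pluggy marker, used only so
    # this sketch can run without Airflow installed.
    def hookimpl(func):
        return func


def summarize(filename, stacktrace) -> str:
    """Hypothetical helper: pair the file name with the last stacktrace line."""
    lines = (stacktrace or "").strip().splitlines()
    last_line = lines[-1] if lines else "<no stacktrace>"
    return f"{filename}: {last_line}"


@hookimpl
def on_new_dag_import_error(filename, stacktrace):
    """Called when an import error is recorded for a file for the first time."""
    print("New import error in", summarize(filename, stacktrace))


@hookimpl
def on_existing_dag_import_error(filename, stacktrace):
    """Called when a file that already had an import error fails to import again."""
    print("Import error still present in", summarize(filename, stacktrace))
```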
Usage¶
To create a listener:
import airflow.listeners.hookimpl
implement the hookimpls for events that you’d like to generate notifications for
Airflow defines the specification as hookspec. Your implementation must accept the same named parameters as defined in hookspec. If you don’t use the same parameters as hookspec, Pluggy throws an error when you try to use your plugin. But you don’t need to implement every method. Many listeners only implement one method, or a subset of methods.
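Pluggy performs this validation itself when the plugin is registered. The sketch below is only a hypothetical local pre-check built on the standard library's `inspect` module: it reports parameters that an implementation declares but the specification lacks. `spec_on_task_instance_failed` is a stand-in that mirrors the parameter names shown in the `on_task_instance_failed` example, not the real hookspec object.

```python
import inspect


def spec_on_task_instance_failed(previous_state, task_instance, error, session):
    """Hypothetical stand-in that mirrors the hookspec parameter names."""


def unexpected_params(impl, spec):
    """Return parameter names declared by impl that spec does not define."""
    spec_names = set(inspect.signature(spec).parameters)
    impl_names = [name for name in inspect.signature(impl).parameters if name != "self"]
    return [name for name in impl_names if name not in spec_names]


def good_impl(previous_state, task_instance, error, session):
    """Uses exactly the names from the specification."""


def bad_impl(previous_state, ti, session):
    """Uses 'ti', which the specification does not define."""


print(unexpected_params(good_impl, spec_on_task_instance_failed))  # []
print(unexpected_params(bad_impl, spec_on_task_instance_failed))   # ['ti']
```

A check like this could run in a unit test so that a renamed parameter is caught before the plugin is loaded.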
To include the listener in your Airflow installation, include it as a part of an Airflow Plugin.
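A minimal sketch of such a plugin, assuming the plugin class lists listener objects in a `listeners` attribute: the names `TaskEventListener` and `TaskEventListenerPlugin` are hypothetical, and the `ImportError` fallback exists only so the snippet can run without Airflow installed.

```python
try:
    from airflow.listeners import hookimpl
    from airflow.plugins_manager import AirflowPlugin
except ImportError:
    # Assumption: minimal stand-ins used only so this sketch can run
    # without Airflow installed.
    def hookimpl(func):
        return func

    class AirflowPlugin:
        name = None
        listeners = []


class TaskEventListener:
    """Hypothetical class-based listener that implements a single hook."""

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        print(f"Task {task_instance.task_id} succeeded (was {previous_state})")


class TaskEventListenerPlugin(AirflowPlugin):
    """Hypothetical plugin that registers the listener with Airflow."""

    name = "task_event_listener_plugin"
    listeners = [TaskEventListener()]
```

The file containing the plugin class would then be placed wherever your installation loads plugins from.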
The Listener API is meant to be called across all DAGs and all operators. You can’t listen only to events generated by specific DAGs. For that behavior, try methods like on_success_callback and pre_execute. These provide callbacks for particular DAG authors or operator creators. The logs and print() calls will be handled as part of the listeners.
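For comparison, a per-DAG callback can be sketched as below, assuming the callback receives a context dictionary that contains a `dag_run` entry. The function name `notify_on_success` and the DAG id in the comment are hypothetical.

```python
def notify_on_success(context):
    """Hypothetical callback for one DAG; Airflow passes a context dictionary."""
    dag_run = context.get("dag_run")
    message = f"DAG run {getattr(dag_run, 'run_id', None)} succeeded"
    print(message)
    return message


# Attach the callback to a single DAG rather than to every DAG, for example:
#
#     with DAG(dag_id="example_dag", on_success_callback=notify_on_success):
#         ...
```

Unlike a listener, this callback only fires for the DAG it is attached to.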
Compatibility note¶
The listener interface might change over time. Airflow uses pluggy specifications, which
means that listener implementations written for older versions of the interface should be
forward-compatible with future versions of Airflow.
However, the opposite is not guaranteed: a listener implemented against a newer version of the interface might not work with older versions of Airflow. This is not a problem if you target a single version of Airflow, because you can adjust your implementation to that version, but it matters if you are writing plugins or extensions that could be used with different versions of Airflow.
For example, if a new field is added to the interface (like the error field in the
on_task_instance_failed method in 2.10.0), a listener implementation that accepts the field cannot handle
the case where the field is not present in the event object, so such listeners only work with Airflow 2.10.0 and later.
To implement a listener that is compatible with multiple versions of Airflow while still using features and fields added in newer versions, check the version of Airflow in use: implement the newer version of the interface for newer Airflow versions and the older version of the interface for older ones.
For example, if you want to implement a listener that uses the error field in
on_task_instance_failed, you should use code like this:
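# Keeps the "None | str | BaseException" annotation valid on Python versions before 3.10.
from __future__ import annotations
from importlib.metadata import version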
from packaging.version import Version
from airflow.listeners import hookimpl
airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):
    class ClassBasedListener:
        ...
        @hookimpl
        def on_task_instance_failed(
            self, previous_state, task_instance, error: None | str | BaseException, session
        ):
            # Handle error case here
            pass
else:
    class ClassBasedListener:  # type: ignore[no-redef]
        ...
        @hookimpl
        def on_task_instance_failed(self, previous_state, task_instance, session):
            # Handle no error case here
            pass
List of changes in the listener interfaces since 2.8.0 when they were introduced:
| Airflow Version | Affected method | Change |
|---|---|---|
| 2.10.0 | on_task_instance_failed | An error field added to the interface |