OpenLineage Airflow integration

OpenLineage is an open framework for data lineage collection and analysis. At its core it is an extensible specification that systems can use to interoperate with lineage metadata. Check out the OpenLineage docs.

Quickstart

To instrument your Airflow instance with OpenLineage, see Using OpenLineage integration.

To implement OpenLineage support for Airflow Operators, see Implementing OpenLineage in Operators.

What’s in it for me?

The metadata collected can answer questions like:

  • Why did a specific data transformation fail?

  • What are the upstream sources feeding into a certain dataset?

  • What downstream processes rely on this specific dataset?

  • Is my data fresh?

  • Can I identify the bottleneck in my data processing pipeline?

  • How did the latest code change affect data processing times?

  • How can I trace the cause of data inaccuracies in my report?

  • How are data privacy and compliance requirements being managed through the data’s lifecycle?

  • Are there redundant data processes that can be optimized or removed?

  • What data dependencies exist for this critical report?

Understanding complex inter-DAG dependencies and providing up-to-date runtime visibility into DAG execution can be challenging. OpenLineage integrates with Airflow to collect DAG lineage metadata so that inter-DAG dependencies are easily maintained and viewable via a lineage graph, while also keeping a catalog of historical runs of DAGs.

As an OpenLineage backend that will receive events, you can use Marquez.
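For example, a local Marquez instance can be targeted through the integration’s standard environment variables (the URL below assumes Marquez’s default development setup; the namespace value is illustrative):

```
OPENLINEAGE_URL=http://localhost:5000    # Marquez API, default dev port
OPENLINEAGE_NAMESPACE=my_namespace       # optional logical namespace for jobs and datasets
```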

How it works under the hood?

The OpenLineage integration implements an AirflowPlugin. This allows it to be discovered when Airflow starts and to register an Airflow Listener.
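Schematically, the registration looks like the sketch below (module and class names here are illustrative, not the actual provider code; only the AirflowPlugin API is Airflow’s own):

```python
# plugin.py -- a minimal sketch of listener registration via a plugin
from airflow.plugins_manager import AirflowPlugin

# A module of hookimpl functions, like the one sketched after the next
# paragraph; the name "my_lineage" is hypothetical.
from my_lineage import listener

class LineagePlugin(AirflowPlugin):
    name = "lineage_plugin"
    # Airflow discovers this attribute at startup and registers the
    # module with its pluggy-based listener manager.
    listeners = [listener]
```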

The OpenLineageListener is then called by Airflow when certain events happen - when DAGs or TaskInstances start, complete or fail. For DAGs, the listener runs in the Airflow Scheduler. For TaskInstances, the listener runs on the Airflow Worker.
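The hooks follow Airflow’s listener specification. A sketch of what TaskInstance hook implementations look like (the hook names and signatures are Airflow’s real listener spec; the bodies are placeholders standing in for the OpenLineage logic):

```python
# listener.py -- illustrative hook implementations
from airflow.listeners import hookimpl

@hookimpl
def on_task_instance_running(previous_state, task_instance, session):
    # Called on the worker when a TaskInstance transitions to RUNNING;
    # this is where a START lineage event would be emitted.
    ...

@hookimpl
def on_task_instance_success(previous_state, task_instance, session):
    ...  # emit a COMPLETE event

@hookimpl
def on_task_instance_failed(previous_state, task_instance, session):
    ...  # emit a FAIL event
```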

When a TaskInstance listener method is called, the OpenLineageListener constructs metadata such as the event’s unique run_id and the event time. Then it tries to extract metadata from Airflow Operators, as described in Extraction precedence.
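For illustration, the kind of event the listener assembles can be approximated with the openlineage-python client (a sketch: the job name and namespace are made up here, while the real listener derives them from the DAG and task):

```python
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState

event = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),  # event time
    run=Run(runId=str(uuid4())),                       # unique run_id
    job=Job(namespace="my_namespace", name="my_dag.my_task"),
    producer="https://github.com/OpenLineage/OpenLineage",
)

# Builds a client from OPENLINEAGE_* environment variables and sends
# the event to the configured backend (e.g. Marquez).
client = OpenLineageClient.from_environment()
client.emit(event)
```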
