airflow.providers.openlineage.api.datasets

Public helpers for emitting OpenLineage events carrying dataset lineage.

Functions

emit_dataset_lineage(*[, inputs, outputs, ...])

Emit an OpenLineage RUNNING event attributing datasets to the current task run.

Module Contents

airflow.providers.openlineage.api.datasets.emit_dataset_lineage(*, inputs=None, outputs=None, task_instance=None, additional_run_facets=None, additional_job_facets=None, raise_on_error=False)

Emit an OpenLineage RUNNING event attributing datasets to the current task run.

This helper lets DAG authors supplement automatic, extractor-based lineage with their own datasets. It constructs and emits a RUNNING RunEvent whose run and job identifiers match the currently executing Airflow task, and it attaches the same facets the provider adds to that task’s START and COMPLETE events.

At least one of inputs or outputs must be non-empty.
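The identifier matching described above is what makes the extra datasets useful: because the RUNNING event carries the same run and job identifiers as the task's own START/COMPLETE events, a consumer can attribute all of their datasets to one run. The stdlib-only sketch below illustrates that idea with plain dicts. Everything in it is hypothetical: the `<dag_id>.<task_id>` job name, the `uuid5`-based `run_id`, the `"default"` job namespace, and the `merge_by_run` consumer are assumptions for illustration, not the provider's real identifier scheme or any backend's real merge logic.

```python
import uuid
from collections import defaultdict

# Hypothetical job namespace; the real provider takes its namespace from configuration.
JOB_NAMESPACE = "default"


def job_name(dag_id: str, task_id: str) -> str:
    # Assumed naming convention for illustration: "<dag_id>.<task_id>".
    return f"{dag_id}.{task_id}"


def run_id(dag_id: str, task_id: str, try_number: int, logical_date: str) -> str:
    # Hypothetical deterministic run id: the same task attributes always yield
    # the same UUID, so separately emitted events land on the same run.
    key = f"{dag_id}.{task_id}.{try_number}.{logical_date}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))


def make_event(event_type: str, ti: dict, inputs=(), outputs=()) -> dict:
    # Datasets are modelled as (namespace, name) tuples in this sketch.
    return {
        "eventType": event_type,
        "run": {"runId": run_id(**ti)},
        "job": {"namespace": JOB_NAMESPACE, "name": job_name(ti["dag_id"], ti["task_id"])},
        "inputs": list(inputs),
        "outputs": list(outputs),
    }


def merge_by_run(events: list) -> dict:
    # Hypothetical consumer view: union the datasets reported for each run id.
    merged = defaultdict(lambda: {"inputs": set(), "outputs": set()})
    for event in events:
        entry = merged[event["run"]["runId"]]
        entry["inputs"].update(event["inputs"])
        entry["outputs"].update(event["outputs"])
    return dict(merged)


# Usage: an extractor-style START event plus a helper-style RUNNING event.
ti = {"dag_id": "etl", "task_id": "load", "try_number": 1,
      "logical_date": "2024-01-01T00:00:00+00:00"}
start = make_event("START", ti, inputs=[("s3://bucket", "raw/2024/01/01/data.csv")])
extra = make_event("RUNNING", ti, outputs=[("snowflake://account", "analytics.public.users")])
merged = merge_by_run([start, extra])
```

Because both events resolve to the same run id, the merged view holds the S3 input and the Snowflake output under a single run.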


Parameters:
  • inputs (list[openlineage.client.event_v2.InputDataset] | None) – Input datasets consumed by the task.

  • outputs (list[openlineage.client.event_v2.OutputDataset] | None) – Output datasets produced by the task.

  • task_instance (airflow.sdk.execution_time.task_runner.RuntimeTaskInstance | airflow.models.taskinstance.TaskInstance | None) – The Airflow task instance to attribute lineage to. Defaults to the currently executing task instance obtained from the execution context.

  • additional_run_facets (dict[str, openlineage.client.facet_v2.RunFacet] | None) – Extra run facets to attach.

  • additional_job_facets (dict[str, openlineage.client.facet_v2.JobFacet] | None) – Extra job facets to attach.

  • raise_on_error (bool) – When False (default), any exception raised while building or emitting the event is logged at WARNING level and the function returns silently — so a broken lineage helper never breaks a user’s task. Set to True to opt into normal exception propagation.

Raises:
  • ValueError – When raise_on_error=True, if both inputs and outputs are empty or None.

  • TypeError – When raise_on_error=True, if any item in inputs or outputs is not an OpenLineage Dataset.

  • RuntimeError – When raise_on_error=True, if task_instance is not provided and cannot be resolved from the current execution context.
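The raise_on_error contract above (validate, resolve the task instance, emit, and either swallow failures with a WARNING log or let them propagate) can be sketched with the standard library alone. This is a hypothetical model, not the provider's code: `Dataset` is a stand-in for the OpenLineage class, `_current_task_instance` stands in for the execution-context lookup, `emit` replaces the real transport, and the sketch returns a bool only so its behaviour is easy to check, whereas the documented function returns nothing.

```python
import logging
from dataclasses import dataclass
from typing import Any, Callable, Optional

log = logging.getLogger("lineage_sketch")  # hypothetical logger name


@dataclass
class Dataset:
    """Stand-in for openlineage.client.event_v2.Dataset (namespace + name)."""
    namespace: str
    name: str


def _current_task_instance() -> Optional[Any]:
    # Hypothetical context lookup; outside a running task it finds nothing.
    return None


def _validate(inputs, outputs) -> None:
    if not inputs and not outputs:
        raise ValueError("At least one of inputs or outputs must be non-empty.")
    for item in [*(inputs or []), *(outputs or [])]:
        if not isinstance(item, Dataset):
            raise TypeError(f"Expected a Dataset, got {type(item).__name__}")


def emit_lineage_sketch(
    emit: Callable[[dict], None],
    inputs=None,
    outputs=None,
    task_instance=None,
    raise_on_error: bool = False,
) -> bool:
    """Return True if an event was emitted, False if a failure was swallowed."""
    try:
        _validate(inputs, outputs)
        ti = task_instance if task_instance is not None else _current_task_instance()
        if ti is None:
            raise RuntimeError("task_instance not given and not found in the execution context")
        # Failures inside emit() (e.g. transport errors) are handled the same way.
        emit({"eventType": "RUNNING", "task_instance": ti,
              "inputs": list(inputs or []), "outputs": list(outputs or [])})
        return True
    except Exception:
        if raise_on_error:
            raise  # opt-in: propagate ValueError / TypeError / RuntimeError as-is
        # Default: log at WARNING and carry on so the user's task never fails.
        log.warning("Failed to emit dataset lineage", exc_info=True)
        return False
```

Catching Exception around both building and emitting is what lets the default mode guarantee that a lineage problem never surfaces as a task failure.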

Example:

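from airflow.sdk import task  # Airflow 3; on Airflow 2 use: from airflow.decorators import task
from openlineage.client.event_v2 import InputDataset, OutputDataset

from airflow.providers.openlineage.api import emit_dataset_lineage


@task
def my_task():
    # Report a source file and a target table for this task run, in addition
    # to whatever lineage the extractors detect automatically.
    emit_dataset_lineage(
        inputs=[InputDataset(namespace="s3://bucket", name="raw/2024/01/01/data.csv")],
        outputs=[OutputDataset(namespace="snowflake://account", name="analytics.public.users")],
    )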
