airflow.providers.openlineage.api.datasets
Public helpers for emitting OpenLineage events carrying dataset lineage.
Functions
- emit_dataset_lineage() – Emit an OpenLineage RUNNING event attributing datasets to the current task run.
Module Contents
- airflow.providers.openlineage.api.datasets.emit_dataset_lineage(*, inputs=None, outputs=None, task_instance=None, additional_run_facets=None, additional_job_facets=None, raise_on_error=False)
Emit an OpenLineage RUNNING event attributing datasets to the current task run.
This helper lets DAG authors supplement automatic, extractor-based lineage with datasets of their own. It constructs and emits a RUNNING RunEvent whose run and job identifiers match the currently executing Airflow task, and attaches the same facets that the provider adds to the task's START/COMPLETE events. At least one of inputs or outputs must be non-empty.
Helpful References:
Dataset naming convention: https://openlineage.io/docs/spec/naming
Dataset naming helpers: https://openlineage.io/docs/client/python/best-practices#dataset-naming-helpers
Available facets: https://openlineage.io/docs/spec/facets
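As a rough illustration of the naming convention linked above, the hypothetical helpers below build the (namespace, name) pair for three common sources. The formats follow the OpenLineage naming spec as we understand it (S3: s3://{bucket} plus the object key; Snowflake: snowflake://{organization}-{account} plus {database}.{schema}.{table}; PostgreSQL: postgres://{host}:{port} plus {database}.{schema}.{table}), so check the spec page before relying on them. These are not the OpenLineage client's own naming helpers, and the function names are invented for this sketch.

```python
from typing import Tuple

# Hypothetical helpers: each returns the (namespace, name) pair that an
# OpenLineage Dataset carries, following the naming spec's formats.


def s3_dataset_id(bucket: str, key: str) -> Tuple[str, str]:
    # S3: namespace is the bucket URI, name is the object key.
    # Dropping a leading "/" is this sketch's own normalization choice.
    return f"s3://{bucket}", key.lstrip("/")


def snowflake_dataset_id(
    organization: str, account: str, database: str, schema: str, table: str
) -> Tuple[str, str]:
    # Snowflake: namespace combines organization and account names,
    # name is the fully qualified table.
    return f"snowflake://{organization}-{account}", f"{database}.{schema}.{table}"


def postgres_dataset_id(
    host: str, port: int, database: str, schema: str, table: str
) -> Tuple[str, str]:
    # PostgreSQL: namespace is host and port, name is the qualified table.
    return f"postgres://{host}:{port}", f"{database}.{schema}.{table}"


if __name__ == "__main__":
    print(s3_dataset_id("bucket", "raw/2024/01/01/data.csv"))
```

Unpacking the result as namespace, name = ... and passing Dataset(namespace=namespace, name=name) in inputs or outputs keeps dataset identities consistent across tasks, which is what lets lineage backends join them.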
- Parameters:
inputs (list[openlineage.client.event_v2.InputDataset] | None) – Input datasets consumed by the task.
outputs (list[openlineage.client.event_v2.OutputDataset] | None) – Output datasets produced by the task.
task_instance (airflow.sdk.execution_time.task_runner.RuntimeTaskInstance | airflow.models.taskinstance.TaskInstance | None) – The Airflow task instance to attribute lineage to. Defaults to the currently executing task instance obtained from the execution context.
additional_run_facets (dict[str, openlineage.client.facet_v2.RunFacet] | None) – Extra run facets to attach.
additional_job_facets (dict[str, openlineage.client.facet_v2.JobFacet] | None) – Extra job facets to attach.
raise_on_error (bool) – When False (default), any exception raised while building or emitting the event is logged at WARNING level and the function returns silently, so a broken lineage helper never breaks the user's task. Set to True to opt into normal exception propagation.
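To make the mapping from parameters to event concrete, here is a plain-dict sketch of a RUNNING event in the OpenLineage JSON shape. It is not the provider's code: the real helper builds openlineage-client objects and takes the run ID and job identity from the task instance, whereas this sketch receives them as arguments. The producer URL is a placeholder, the schemaURL is one published core-spec version that may differ from what the provider emits, and facets are simplified to plain dicts without the _producer/_schemaURL keys the spec requires. The intent is only to show that reusing the task's runId is what lets a consumer associate this event with the task's START/COMPLETE events, and that additional_run_facets and additional_job_facets end up under run.facets and job.facets.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Core-spec RunEvent schema URL; PRODUCER is a placeholder, not the provider's value.
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
PRODUCER = "https://example.com/hypothetical-producer"


def build_running_event(
    run_id: str,
    job_namespace: str,
    job_name: str,
    inputs: Optional[List[Dict[str, Any]]] = None,
    outputs: Optional[List[Dict[str, Any]]] = None,
    additional_run_facets: Optional[Dict[str, Any]] = None,
    additional_job_facets: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return a RUNNING event as a plain dict in OpenLineage JSON shape."""
    return {
        "eventType": "RUNNING",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
        # Same runId as the task's START/COMPLETE events, so consumers can correlate them.
        "run": {"runId": run_id, "facets": dict(additional_run_facets or {})},
        "job": {
            "namespace": job_namespace,
            "name": job_name,
            "facets": dict(additional_job_facets or {}),
        },
        "inputs": list(inputs or []),
        "outputs": list(outputs or []),
    }


if __name__ == "__main__":
    event = build_running_event(
        run_id=str(uuid.uuid4()),  # in the provider this comes from the task instance
        job_namespace="default",  # assumed namespace
        job_name="my_dag.my_task",  # assumed "{dag_id}.{task_id}" job name
        inputs=[{"namespace": "s3://bucket", "name": "raw/2024/01/01/data.csv"}],
        additional_job_facets={"documentation": {"description": "Loads raw users"}},
    )
    print(json.dumps(event, indent=2))
```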
- Raises:
ValueError – When raise_on_error=True, if both inputs and outputs are empty or None.
TypeError – When raise_on_error=True, if any item in inputs or outputs is not an OpenLineage Dataset.
RuntimeError – When raise_on_error=True, if task_instance is not provided and cannot be resolved from the current execution context.
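The documented error behaviour can be summarised in a small stdlib-only sketch. Everything in it is hypothetical: Dataset is a stand-in dataclass rather than the openlineage-client class, resolve_current_ti and send are injected callables standing in for the context lookup and the actual emission, and the order of the checks is an assumption, since the reference lists the errors but not the order in which they are raised.

```python
import logging
from dataclasses import dataclass
from typing import Any, Callable, List, Optional

log = logging.getLogger(__name__)


@dataclass
class Dataset:
    """Hypothetical stand-in for openlineage.client.event_v2.Dataset."""

    namespace: str
    name: str


def _validate(
    inputs: Optional[List[Any]],
    outputs: Optional[List[Any]],
    task_instance: Any,
    resolve_current_ti: Callable[[], Any],
) -> Any:
    # ValueError: nothing to report.
    if not inputs and not outputs:
        raise ValueError("At least one of inputs or outputs must be non-empty.")
    # TypeError: every item must be a Dataset.
    for item in [*(inputs or []), *(outputs or [])]:
        if not isinstance(item, Dataset):
            raise TypeError(f"Expected a Dataset, got {type(item).__name__}")
    # RuntimeError: no task instance given and none in the current context.
    ti = task_instance if task_instance is not None else resolve_current_ti()
    if ti is None:
        raise RuntimeError("task_instance not given and not found in the execution context.")
    return ti


def emit_lineage_sketch(
    inputs: Optional[List[Any]] = None,
    outputs: Optional[List[Any]] = None,
    task_instance: Any = None,
    raise_on_error: bool = False,
    resolve_current_ti: Callable[[], Any] = lambda: None,
    send: Callable[[Any, List[Any], List[Any]], None] = lambda ti, i, o: None,
) -> None:
    try:
        ti = _validate(inputs, outputs, task_instance, resolve_current_ti)
        send(ti, list(inputs or []), list(outputs or []))
    except Exception:
        if raise_on_error:
            raise  # opt-in: propagate the original exception
        # Default: log at WARNING with the traceback and return None.
        log.warning("Failed to emit dataset lineage.", exc_info=True)
```

Wrapping both validation and sending in one try block, rather than only the send step, mirrors the documented promise that failures while either building or emitting the event are logged instead of propagated.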
Example:
    from airflow.sdk import task  # Airflow 3; on Airflow 2 use: from airflow.decorators import task
    from openlineage.client.event_v2 import Dataset

    from airflow.providers.openlineage.api import emit_dataset_lineage

    @task
    def my_task():
        emit_dataset_lineage(
            inputs=[Dataset(namespace="s3://bucket", name="raw/2024/01/01/data.csv")],
            outputs=[Dataset(namespace="snowflake://account", name="analytics.public.users")],
        )