Source code for airflow.providers.openlineage.api.datasets

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Public helpers for emitting OpenLineage events carrying dataset lineage."""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from openlineage.client.event_v2 import Dataset, Job, Run, RunEvent, RunState

from airflow.providers.openlineage.api.core import emit, is_openlineage_active
from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter
from airflow.providers.openlineage.plugins.macros import (
    _get_dag_run_clear_number,
    _get_dag_run_conf,
    _get_logical_date,
    lineage_job_name,
    lineage_job_namespace,
    lineage_run_id,
)
from airflow.providers.openlineage.utils.utils import (
    build_task_event_job_facets,
    build_task_event_run_facets,
    get_dag_run_dag_and_task_from_ti,
    get_task_instance_from_context,
)
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
    from openlineage.client.event_v2 import InputDataset, OutputDataset
    from openlineage.client.facet_v2 import JobFacet, RunFacet

    from airflow.models.taskinstance import TaskInstance
    from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance

log = logging.getLogger(__name__)

__all__ = ["emit_dataset_lineage"]


[docs] def emit_dataset_lineage( *, inputs: list[InputDataset] | None = None, outputs: list[OutputDataset] | None = None, task_instance: RuntimeTaskInstance | TaskInstance | None = None, additional_run_facets: dict[str, RunFacet] | None = None, additional_job_facets: dict[str, JobFacet] | None = None, raise_on_error: bool = False, ) -> None: """ Emit an OpenLineage RUNNING event attributing datasets to the current task run. This helper lets DAG authors supplement automatic extractor-based lineage with their own datasets. It constructs and emits a RUNNING ``RunEvent`` whose run/job identifiers match the currently executing Airflow task, attaches the same facets the provider adds to a task's START/COMPLETE events. At least one of ``inputs`` or ``outputs`` must be non-empty. Helpful References: - Dataset naming convention: https://openlineage.io/docs/spec/naming - Dataset naming helpers: https://openlineage.io/docs/client/python/best-practices#dataset-naming-helpers - Available facets: https://openlineage.io/docs/spec/facets :param inputs: Input datasets consumed by the task. :param outputs: Output datasets produced by the task. :param task_instance: The Airflow task instance to attribute lineage to. Defaults to the currently executing task instance obtained from the execution context. :param additional_run_facets: Extra run facets to attach. :param additional_job_facets: Extra job facets to attach. :param raise_on_error: When ``False`` (default), any exception raised while building or emitting the event is logged at WARNING level and the function returns silently — so a broken lineage helper never breaks a user's task. Set to ``True`` to opt into normal exception propagation. :raises ValueError: When ``raise_on_error=True``, if both ``inputs`` and ``outputs`` are empty or ``None``. :raises TypeError: When ``raise_on_error=True``, if any item in ``inputs`` or ``outputs`` is not an OpenLineage ``Dataset``. :raises RuntimeError: When ``raise_on_error=True``, if ``task_instance`` is not provided and cannot be resolved from the current execution context. Example: .. code-block:: python from openlineage.client.event_v2 import Dataset from airflow.providers.openlineage.api import emit_dataset_lineage @task def my_task(): emit_dataset_lineage( inputs=[Dataset(namespace="s3://bucket", name="raw/2024/01/01/data.csv")], outputs=[Dataset(namespace="snowflake://account", name="analytics.public.users")], ) """ if not is_openlineage_active(): log.info("OpenLineage is not active - emit_dataset_lineage will have no effect.") return try: if not inputs and not outputs: raise ValueError("At least one of `inputs` or `outputs` must be provided.") inputs = inputs or [] outputs = outputs or [] if not all(isinstance(d, Dataset) for d in (*inputs, *outputs)): raise TypeError("`inputs` and `outputs` must contain only OpenLineage Dataset objects.") if task_instance is None: log.debug("TaskInstance not provided, retrieving it from context.") task_instance = get_task_instance_from_context() dag_run, dag, task = get_dag_run_dag_and_task_from_ti(task_instance) task_uuid = lineage_run_id(task_instance) run_facets = build_task_event_run_facets( task_instance=task_instance, dag_run=dag_run, dag=dag, task=task, task_uuid=task_uuid, ti_state=TaskInstanceState.RUNNING, parent_run_id=OpenLineageAdapter.build_dag_run_id( dag_id=dag.dag_id, logical_date=_get_logical_date(task_instance), clear_number=_get_dag_run_clear_number(task_instance), ), parent_job_name=dag.dag_id, dr_conf=_get_dag_run_conf(task_instance), additional_run_facets=additional_run_facets, ) job_facets = build_task_event_job_facets( task=task, dag=dag, additional_job_facets=additional_job_facets ) event = RunEvent( eventType=RunState.RUNNING, eventTime=datetime.now(tz=timezone.utc).isoformat(), run=Run(runId=task_uuid, facets=run_facets), job=Job( namespace=lineage_job_namespace(), name=lineage_job_name(task_instance), facets=job_facets, ), inputs=inputs, outputs=outputs, producer=_PRODUCER, ) log.info("emit_dataset_lineage will emit a RUNNING OpenLineage event.") emit(event) except Exception as err: if raise_on_error: raise log.warning("emit_dataset_lineage raised an error `%s`", err) log.debug("Exception details:", exc_info=True)

Was this entry helpful?