# Source code for tests.system.openlineage.example_openlineage_manual_lineage_dag

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
DAG exercising the public OpenLineage manual lineage API: emit_dataset_lineage and emit_query_lineage.

It checks:
    - emit_dataset_lineage produces a RUNNING event with inputs/outputs
    - emit_dataset_lineage accepts additional_run_facets and additional_job_facets
    - datasets can carry dataset-level facets (DataQualityAssertionsDatasetFacet)
    - emit_query_lineage produces a START + COMPLETE event pair
    - query_text attaches a sql job facet
    - query_id + query_source_namespace attaches external query run facet
    - multiple emit_query_lineage calls increment the job name counter (.1, .2, .3)
    - failed query (is_successful=False) produces a FAIL event with error message run facet
    - counter resets to .1 in a new task (no cross-task spill)
    - explicit job_name= bypasses the counter
    - explicit task_instance= bypasses context resolution inside the helper
"""

from __future__ import annotations

import datetime as dt

from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import data_quality_assertions_dataset, source_code_location_job

from airflow.providers.openlineage.api import emit_dataset_lineage, emit_query_lineage

try:
    from airflow.sdk import dag, get_current_context, task
except ImportError:
    from airflow.decorators import dag, task  # type: ignore[no-redef, attr-defined]
    from airflow.operators.python import get_current_context  # type: ignore[no-redef]

from system.openlineage.expected_events import get_expected_event_file_path
from system.openlineage.operator import OpenLineageTestOperator


def _dataset_with_assertions(namespace: str, name: str) -> Dataset:
    """Build a ``Dataset`` that carries a ``dataQualityAssertions`` facet.

    The facet holds two passing assertions on the ``id`` column, in this
    order: ``not_null`` and ``unique``.

    :param namespace: Namespace of the dataset.
    :param name: Name of the dataset within ``namespace``.
    :return: The dataset with the data-quality facet attached.
    """
    facet_module = data_quality_assertions_dataset
    id_checks = [
        facet_module.Assertion(assertion=check, success=True, column="id")
        for check in ("not_null", "unique")
    ]
    quality_facet = facet_module.DataQualityAssertionsDatasetFacet(assertions=id_checks)
    return Dataset(
        namespace=namespace,
        name=name,
        facets={"dataQualityAssertions": quality_facet},  # type: ignore[dict-item]
    )


# Identifier of the DAG; also used as the prefix of the event keys asserted in
# `check_events` and to locate the expected-events file.
DAG_ID = "openlineage_manual_lineage_dag"


# DAG definition: a linear chain of tasks, each calling emit_dataset_lineage or
# emit_query_lineage with a different combination of arguments, followed by an
# OpenLineageTestOperator that asserts how many events of each kind were seen.
# (Documented with comments so the DAG function's `__doc__` stays unset, as in
# the original.)
@dag(
    dag_id=DAG_ID,
    start_date=dt.datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
    default_args={"retries": 0},
)
def openlineage_manual_lineage_dag():
    @task
    def datasets_minimal() -> None:
        """Minimal emit_dataset_lineage — just one input dataset."""
        emit_dataset_lineage(
            inputs=[Dataset(namespace="s3://example-bucket", name="raw/orders.csv")],
        )

    @task
    def datasets_maximal() -> None:
        """Maximal emit_dataset_lineage — every kwarg set, datasets with facets."""
        ctx = get_current_context()
        ti = ctx["task_instance"]
        emit_dataset_lineage(
            inputs=[_dataset_with_assertions("s3://example-bucket", "raw/2024/01/01/orders.csv")],
            outputs=[
                _dataset_with_assertions("snowflake://example-acct", "analytics.public.orders_enriched")
            ],
            task_instance=ti,
            additional_run_facets={"my_custom_run_facet": {"key": "value"}},  # type: ignore[dict-item]
            additional_job_facets={
                "sourceCodeLocation": source_code_location_job.SourceCodeLocationJobFacet(
                    type="git",
                    url="https://github.com/apache/airflow",
                    repoUrl="https://github.com/apache/airflow",
                    path="providers/openlineage/tests/system/openlineage/example_openlineage_manual_lineage_dag.py",
                    version="main",
                    branch="main",
                ),
            },
            raise_on_error=True,
        )

    @task
    def query_minimal() -> None:
        """Minimal emit_query_lineage — query_id + source only."""
        emit_query_lineage(
            query_id="qid-min-1",
            query_source_namespace="snowflake://example-acct",
        )

    @task
    def query_maximal() -> None:
        """Maximal emit_query_lineage — every kwarg set."""
        ctx = get_current_context()
        ti = ctx["task_instance"]
        # Fixed, timezone-aware timestamps five seconds apart.
        start = dt.datetime(2024, 5, 1, 10, 0, 0, tzinfo=dt.timezone.utc)
        end = dt.datetime(2024, 5, 1, 10, 0, 5, tzinfo=dt.timezone.utc)
        emit_query_lineage(
            query_id="qid-max-1",
            query_source_namespace="snowflake://example-acct",
            query_text=(
                "INSERT INTO analytics.public.user_events_summary "
                "SELECT user_id, COUNT(*) FROM analytics.public.user_events GROUP BY user_id"
            ),
            inputs=[
                _dataset_with_assertions("snowflake://example-acct", "analytics.public.user_events_extra")
            ],
            outputs=[
                _dataset_with_assertions("snowflake://example-acct", "analytics.public.user_events_summary")
            ],
            start_time=start,
            end_time=end,
            is_successful=True,
            default_database="ANALYTICS",
            default_schema="public",
            task_instance=ti,
            raise_on_error=True,
        )

    @task
    def query_multiple_in_one_task() -> None:
        """Three emit_query_lineage calls — counter goes .1, .2, .3; third call is FAIL."""
        emit_query_lineage(
            query_id="qid-multi-1",
            query_source_namespace="snowflake://example-acct",
            query_text="SELECT id, email FROM analytics.public.users WHERE active = true",
        )
        emit_query_lineage(
            query_id="qid-multi-2",
            query_source_namespace="snowflake://example-acct",
            query_text="SELECT * FROM analytics.public.orders WHERE created_at > '2024-01-01'",
        )
        emit_query_lineage(
            query_id="qid-multi-3",
            query_source_namespace="snowflake://example-acct",
            query_text="SELECT broken(",
            is_successful=False,
            error_message="syntax error at or near 'broken'",
        )

    @task
    def query_isolated_task() -> None:
        """Single call in a new task — counter resets to .1, proving no cross-task spill."""
        emit_query_lineage(
            query_id="qid-isolated-1",
            query_source_namespace="snowflake://example-acct",
            query_text="SELECT 1",
        )

    @task
    def query_with_explicit_job_name() -> None:
        """Explicit job_name= bypasses the counter; counter stays unset after the call."""
        emit_query_lineage(
            query_id="qid-explicit-name",
            query_source_namespace="snowflake://example-acct",
            query_text="SELECT version()",
            job_name="custom_job_name_set_by_caller",
        )

    @task
    def query_with_explicit_task_instance() -> None:
        """Explicit task_instance= bypasses context resolution inside the helper."""
        ti = get_current_context()["task_instance"]
        emit_query_lineage(
            query_id="qid-explicit-ti",
            query_source_namespace="snowflake://example-acct",
            query_text="SELECT now()",
            task_instance=ti,
        )

    # Final task: asserts the exact number of events seen under each key.
    check_events = OpenLineageTestOperator(
        task_id="check_events",
        file_path=get_expected_event_file_path(DAG_ID),
        event_count_assertions={
            # emit_dataset_lineage produces a RUNNING event
            f"{DAG_ID}.datasets_minimal.event.running": "==1",
            f"{DAG_ID}.datasets_maximal.event.running": "==1",
            # minimal query: 1 start + 1 complete
            f"{DAG_ID}.query_minimal.manual_query.1.event.start": "==1",
            f"{DAG_ID}.query_minimal.manual_query.1.event.complete": "==1",
            # maximal query
            f"{DAG_ID}.query_maximal.manual_query.1.event.start": "==1",
            f"{DAG_ID}.query_maximal.manual_query.1.event.complete": "==1",
            # three calls in one task: .1 and .2 complete, .3 fail
            f"{DAG_ID}.query_multiple_in_one_task.manual_query.1.event.start": "==1",
            f"{DAG_ID}.query_multiple_in_one_task.manual_query.1.event.complete": "==1",
            f"{DAG_ID}.query_multiple_in_one_task.manual_query.2.event.start": "==1",
            f"{DAG_ID}.query_multiple_in_one_task.manual_query.2.event.complete": "==1",
            f"{DAG_ID}.query_multiple_in_one_task.manual_query.3.event.start": "==1",
            f"{DAG_ID}.query_multiple_in_one_task.manual_query.3.event.fail": "==1",
            # isolated task resets counter to .1
            f"{DAG_ID}.query_isolated_task.manual_query.1.event.start": "==1",
            f"{DAG_ID}.query_isolated_task.manual_query.1.event.complete": "==1",
            # explicit job_name bypasses counter
            "custom_job_name_set_by_caller.event.start": "==1",
            "custom_job_name_set_by_caller.event.complete": "==1",
            # explicit task_instance
            f"{DAG_ID}.query_with_explicit_task_instance.manual_query.1.event.start": "==1",
            f"{DAG_ID}.query_with_explicit_task_instance.manual_query.1.event.complete": "==1",
        },
    )

    # Run the tasks strictly one after another; the event check goes last.
    (
        datasets_minimal()
        >> datasets_maximal()
        >> query_minimal()
        >> query_maximal()
        >> query_multiple_in_one_task()
        >> query_isolated_task()
        >> query_with_explicit_job_name()
        >> query_with_explicit_task_instance()
        >> check_events
    )
# Instantiate the DAG and keep the returned object. At module level the name
# `dag` is the imported decorator, not a DAG, so it must not be handed to
# get_test_run.
dag_instance = openlineage_manual_lineage_dag()


from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: contributing-docs/testing/system_tests.rst)
test_run = get_test_run(dag_instance)

Was this entry helpful?