# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Utilities for processing hook-level lineage into OpenLineage events."""
from __future__ import annotations
import datetime as dt
import logging
from openlineage.client.event_v2 import Job, Run, RunEvent, RunState
from openlineage.client.facet_v2 import external_query_run, job_type_job, sql_job
from openlineage.client.uuid import generate_new_uuid
from airflow.providers.common.compat.sdk import timezone
from airflow.providers.common.sql.hooks.lineage import SqlJobHookLineageExtra
from airflow.providers.openlineage.extractors.base import OperatorLineage
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
from airflow.providers.openlineage.plugins.macros import (
_get_logical_date,
lineage_job_name,
lineage_job_namespace,
lineage_root_job_name,
lineage_root_job_namespace,
lineage_root_run_id,
lineage_run_id,
)
from airflow.providers.openlineage.sqlparser import SQLParser, get_openlineage_facets_with_sql
from airflow.providers.openlineage.utils.utils import _get_parent_run_facet
# Module-level logger; the stray Sphinx "[docs]" link residue that preceded this
# line evaluated an undefined name (``docs``) and broke module import.
log = logging.getLogger(__name__)
def _resolve_namespace(hook, conn_id: str | None) -> str | None:
"""
Resolve the OpenLineage namespace from a hook.
Tries ``hook.get_openlineage_database_info`` to build the namespace.
Returns ``None`` when the hook does not expose this method.
"""
if conn_id:
try:
connection = hook.get_connection(conn_id)
database_info = hook.get_openlineage_database_info(connection)
except Exception as e:
log.debug("Failed to get OpenLineage database info: %s", e)
database_info = None
if database_info is not None:
return SQLParser.create_namespace(database_info)
return None
def _get_hook_conn_id(hook) -> str | None:
"""
Try to extract the connection ID from a hook instance.
Checks for ``get_conn_id()`` first, then falls back to the attribute
named by ``hook.conn_name_attr``.
"""
if callable(getattr(hook, "get_conn_id", None)):
return hook.get_conn_id()
conn_name_attr = getattr(hook, "conn_name_attr", None)
if conn_name_attr:
return getattr(hook, conn_name_attr, None)
return None
def _create_ol_event_pair(
    task_instance,
    job_name: str,
    is_successful: bool,
    inputs: list | None = None,
    outputs: list | None = None,
    run_facets: dict | None = None,
    job_facets: dict | None = None,
    event_time: dt.datetime | None = None,
) -> tuple[RunEvent, RunEvent]:
    """
    Build a child START event plus a matching COMPLETE/FAIL event for a task instance.

    Both events share one run (with a newly generated run ID) and one job, and the
    run is linked to the task instance through parent / root-parent run facets, so
    callers only need to supply the query-specific facets and datasets.

    :param task_instance: Task instance the child run is attached to as its parent.
    :param job_name: Name of the child job; its namespace is ``lineage_job_namespace()``.
    :param is_successful: ``True`` selects ``RunState.COMPLETE`` for the second event,
        ``False`` selects ``RunState.FAIL``.
    :param inputs: Input datasets attached to both events (``[]`` when falsy).
    :param outputs: Output datasets attached to both events (``[]`` when falsy).
    :param run_facets: Extra run facets; on key collision they override the parent facets.
    :param job_facets: Facets for the child job (``{}`` when falsy).
    :param event_time: Timestamp used for both events; defaults to ``timezone.utcnow()``.
    :return: A ``(start_event, end_event)`` tuple.
    """
    parent_links = dict(
        parent_run_id=lineage_run_id(task_instance),
        parent_job_name=lineage_job_name(task_instance),
        parent_job_namespace=lineage_job_namespace(),
        root_parent_run_id=lineage_root_run_id(task_instance),
        root_parent_job_name=lineage_root_job_name(task_instance),
        root_parent_job_namespace=lineage_root_job_namespace(task_instance),
    )
    merged_run_facets = dict(_get_parent_run_facet(**parent_links))
    # Caller-supplied facets are applied last so they win over the parent facets.
    merged_run_facets.update(run_facets or {})

    child_run = Run(
        runId=str(generate_new_uuid(instant=_get_logical_date(task_instance))),
        facets=merged_run_facets,
    )
    child_job = Job(namespace=lineage_job_namespace(), name=job_name, facets=job_facets or {})

    # Both events deliberately carry the same timestamp.
    timestamp = (event_time or timezone.utcnow()).isoformat()
    final_state = RunState.COMPLETE if is_successful else RunState.FAIL

    start_event, end_event = [
        RunEvent(
            eventType=state,
            eventTime=timestamp,
            run=child_run,
            job=child_job,
            inputs=inputs or [],
            outputs=outputs or [],
        )
        for state in (RunState.START, final_state)
    ]
    return start_event, end_event