# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import uuid
from typing import TYPE_CHECKING
import yaml
from openlineage.client import OpenLineageClient, set_producer
from openlineage.client.facet import (
BaseFacet,
DocumentationJobFacet,
ErrorMessageRunFacet,
NominalTimeRunFacet,
OwnershipJobFacet,
OwnershipJobFacetOwners,
ParentRunFacet,
ProcessingEngineRunFacet,
SourceCodeLocationJobFacet,
)
from openlineage.client.run import Job, Run, RunEvent, RunState
from airflow.configuration import conf
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION
from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker
# Namespace used when neither the Airflow configuration nor the environment supplies one.
_DAG_DEFAULT_NAMESPACE = "default"

# Resolved once, at import time. ``conf.get`` is handed the value of the
# ``OPENLINEAGE_NAMESPACE`` environment variable (or the default above) as its fallback.
_DAG_NAMESPACE = conf.get(
    "openlineage",
    "namespace",
    fallback=os.environ.get("OPENLINEAGE_NAMESPACE", _DAG_DEFAULT_NAMESPACE),
)

# Producer URI attached to every event built in this module; it embeds the provider version.
_PRODUCER = f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"

# Module-level side effect: registers the producer with the OpenLineage client library.
set_producer(_PRODUCER)
class OpenLineageAdapter(LoggingMixin):
    """
    Translate Airflow metadata to OpenLineage events instead of creating them from Airflow code.

    The OpenLineage client is created lazily on first use unless one is passed to the
    constructor, and every event is passed through a redactor before it is emitted.
    """

    def __init__(self, client: OpenLineageClient | None = None, secrets_masker: SecretsMasker | None = None):
        """
        Store the optional client and build the redactor applied to outgoing events.

        :param client: pre-built OpenLineage client; when omitted, one is created on demand
            by :meth:`get_or_create_openlineage_client`
        :param secrets_masker: masker the redactor is derived from; when omitted, the result
            of Airflow's ``_secrets_masker()`` is used
        """
        super().__init__()
        self._client = client
        if not secrets_masker:
            # Imported only when the caller did not supply a masker.
            from airflow.utils.log.secrets_masker import _secrets_masker

            secrets_masker = _secrets_masker()
        self._redacter = OpenLineageRedactor.from_masker(secrets_masker)

    def get_or_create_openlineage_client(self) -> OpenLineageClient:
        """
        Return the cached client, creating and caching it first if necessary.

        A client is built from :meth:`get_openlineage_config` when that returns a
        configuration, and from the environment otherwise.

        :return: the OpenLineage client used by this adapter
        """
        if not self._client:
            config = self.get_openlineage_config()
            if config:
                self._client = OpenLineageClient.from_dict(config=config)
            else:
                self._client = OpenLineageClient.from_environment()
        return self._client

    def get_openlineage_config(self) -> dict | None:
        """
        Return the transport configuration for the OpenLineage client, if any.

        The YAML file named by ``[openlineage] config_path`` is consulted first: when it
        yields a non-empty document, that document's ``transport`` entry is returned (which
        is ``None`` if the key is absent). Otherwise the JSON value of
        ``[openlineage] transport`` is used.

        :return: transport configuration, or ``None`` when nothing is configured
        :raises ValueError: if the YAML document or the ``transport`` option is not a dict
        """
        # First, try to read from YAML file
        openlineage_config_path = conf.get("openlineage", "config_path")
        if openlineage_config_path:
            config = self._read_yaml_config(openlineage_config_path)
            if config:
                # Validate the same way the ``transport`` option is validated below, instead
                # of failing with an AttributeError on ``.get`` for list/scalar documents.
                if not isinstance(config, dict):
                    raise ValueError(
                        f"OpenLineage config file {openlineage_config_path} must contain a mapping, "
                        f"got {type(config).__name__}"
                    )
                return config.get("transport", None)
        # Second, try to get transport config
        transport = conf.getjson("openlineage", "transport")
        if not transport:
            return None
        elif not isinstance(transport, dict):
            raise ValueError(f"{transport} is not a dict")
        return transport

    def _read_yaml_config(self, path: str) -> dict | None:
        """
        Parse the YAML file at ``path`` with ``yaml.safe_load`` and return the result.

        :param path: location of the YAML file
        :return: the parsed document; ``None`` for an empty file
        """
        with open(path) as config_file:
            return yaml.safe_load(config_file)

    @staticmethod
    def build_dag_run_id(dag_id, dag_run_id) -> str:
        """
        Build a deterministic run id for a DAG run.

        The id is a version-3 UUID in the URL namespace computed from the configured
        OpenLineage namespace, ``dag_id`` and ``dag_run_id``.

        :param dag_id: identifier of the DAG
        :param dag_run_id: identifier of the DAG run
        :return: the UUID rendered as a string
        """
        return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{_DAG_NAMESPACE}.{dag_id}.{dag_run_id}"))

    @staticmethod
    def build_task_instance_run_id(task_id, execution_date, try_number) -> str:
        """
        Build a deterministic run id for a task instance attempt.

        The id is a version-3 UUID in the URL namespace computed from the configured
        OpenLineage namespace, ``task_id``, ``execution_date`` and ``try_number``.

        :param task_id: identifier of the task
        :param execution_date: execution date of the task instance
        :param try_number: attempt number of the task instance
        :return: the UUID rendered as a string
        """
        # NOTE(review): the DAG id is not part of the hashed name, so equal ``task_id`` values
        # with the same execution date and try number produce the same run id -- confirm that
        # callers pass a task id that is already unique across DAGs.
        return str(
            uuid.uuid3(
                uuid.NAMESPACE_URL,
                f"{_DAG_NAMESPACE}.{task_id}.{execution_date}.{try_number}",
            )
        )

    def emit(self, event: RunEvent):
        """
        Redact ``event`` and send it through the OpenLineage client.

        Exceptions raised while sending are not propagated: the failure is counted in
        ``ol.emit.failed`` and logged instead. Redaction happens before the guarded
        section, so an error raised by the redactor does propagate.

        :param event: event to send
        :return: whatever the client's ``emit`` returns, or ``None`` when sending failed
        """
        if not self._client:
            self._client = self.get_or_create_openlineage_client()
        redacted_event: RunEvent = self._redacter.redact(event, max_depth=20)  # type: ignore[assignment]
        try:
            with Stats.timer("ol.emit.attempts"):
                return self._client.emit(redacted_event)
        except Exception as e:
            Stats.incr("ol.emit.failed")
            self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
            self.log.debug("OpenLineage emission failure: %s", e)
        return None

    def start_task(
        self,
        run_id: str,
        job_name: str,
        job_description: str,
        event_time: str,
        parent_job_name: str | None,
        parent_run_id: str | None,
        code_location: str | None,
        nominal_start_time: str | None,
        nominal_end_time: str | None,
        owners: list[str],
        task: OperatorLineage | None,
        run_facets: dict[str, BaseFacet] | None = None,  # Custom run facets
    ) -> None:
        """
        Emits openlineage event of type START.

        :param run_id: globally unique identifier of task in dag run
        :param job_name: globally unique identifier of task in dag
        :param job_description: user provided description of job
        :param event_time: time of the event, used as the event's ``eventTime``
        :param parent_job_name: the name of the parent job (typically the DAG,
            but possibly a task group)
        :param parent_run_id: identifier of job spawning this task
        :param code_location: file path or URL of DAG file
        :param nominal_start_time: scheduled time of dag run
        :param nominal_end_time: following schedule of dag run
        :param owners: list of owners of DAG
        :param task: metadata container with information extracted from operator
        :param run_facets: custom run facets; they override same-named facets from ``task``
            and the passed dict is not modified
        """
        from airflow.version import version as AIRFLOW_VERSION

        processing_engine_version_facet = ProcessingEngineRunFacet(
            version=AIRFLOW_VERSION,
            name="Airflow",
            openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,
        )

        # Always merge into a fresh dict so that adding "processing_engine" below never
        # leaks into the dict object owned by the caller.
        merged_run_facets: dict[str, BaseFacet] = {
            **(task.run_facets if task else {}),
            **(run_facets or {}),
        }
        merged_run_facets["processing_engine"] = processing_engine_version_facet  # type: ignore

        event = RunEvent(
            eventType=RunState.START,
            eventTime=event_time,
            run=self._build_run(
                run_id=run_id,
                job_name=job_name,
                parent_job_name=parent_job_name,
                parent_run_id=parent_run_id,
                nominal_start_time=nominal_start_time,
                nominal_end_time=nominal_end_time,
                run_facets=merged_run_facets,
            ),
            job=self._build_job(
                job_name=job_name,
                job_description=job_description,
                code_location=code_location,
                owners=owners,
                job_facets=task.job_facets if task else None,
            ),
            inputs=task.inputs if task else [],
            outputs=task.outputs if task else [],
            producer=_PRODUCER,
        )
        self.emit(event)

    def complete_task(
        self,
        run_id: str,
        job_name: str,
        parent_job_name: str | None,
        parent_run_id: str | None,
        end_time: str,
        task: OperatorLineage,
    ) -> None:
        """
        Emits openlineage event of type COMPLETE.

        :param run_id: globally unique identifier of task in dag run
        :param job_name: globally unique identifier of task between dags
        :param parent_job_name: the name of the parent job (typically the DAG,
            but possibly a task group)
        :param parent_run_id: identifier of job spawning this task
        :param end_time: time of task completion
        :param task: metadata container with information extracted from operator
        """
        self._emit_task_end(
            RunState.COMPLETE,
            run_id=run_id,
            job_name=job_name,
            parent_job_name=parent_job_name,
            parent_run_id=parent_run_id,
            end_time=end_time,
            task=task,
        )

    def fail_task(
        self,
        run_id: str,
        job_name: str,
        parent_job_name: str | None,
        parent_run_id: str | None,
        end_time: str,
        task: OperatorLineage,
    ) -> None:
        """
        Emits openlineage event of type FAIL.

        :param run_id: globally unique identifier of task in dag run
        :param job_name: globally unique identifier of task between dags
        :param parent_job_name: the name of the parent job (typically the DAG,
            but possibly a task group)
        :param parent_run_id: identifier of job spawning this task
        :param end_time: time of task completion
        :param task: metadata container with information extracted from operator
        """
        self._emit_task_end(
            RunState.FAIL,
            run_id=run_id,
            job_name=job_name,
            parent_job_name=parent_job_name,
            parent_run_id=parent_run_id,
            end_time=end_time,
            task=task,
        )

    def _emit_task_end(
        self,
        event_type: RunState,
        run_id: str,
        job_name: str,
        parent_job_name: str | None,
        parent_run_id: str | None,
        end_time: str,
        task: OperatorLineage,
    ) -> None:
        """Build and emit the terminal (COMPLETE or FAIL) event shared by the task end methods."""
        event = RunEvent(
            eventType=event_type,
            eventTime=end_time,
            run=self._build_run(
                run_id=run_id,
                job_name=job_name,
                parent_job_name=parent_job_name,
                parent_run_id=parent_run_id,
                run_facets=task.run_facets,
            ),
            job=self._build_job(job_name, job_facets=task.job_facets),
            inputs=task.inputs,
            outputs=task.outputs,
            producer=_PRODUCER,
        )
        self.emit(event)

    def dag_started(
        self,
        dag_run: DagRun,
        msg: str,
        nominal_start_time: str,
        nominal_end_time: str,
    ) -> None:
        """
        Emit an event of type START for a DAG run.

        :param dag_run: DAG run being started; its ``dag_id``, ``run_id`` and ``start_date``
            are read, and ``start_date`` must be set because its ``isoformat()`` is the event time
        :param msg: not used by this method
        :param nominal_start_time: start of the nominal time reported for the run
        :param nominal_end_time: end of the nominal time reported for the run
        """
        event = RunEvent(
            eventType=RunState.START,
            eventTime=dag_run.start_date.isoformat(),
            job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE),
            run=self._build_run(
                run_id=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
                job_name=dag_run.dag_id,
                nominal_start_time=nominal_start_time,
                nominal_end_time=nominal_end_time,
            ),
            inputs=[],
            outputs=[],
            producer=_PRODUCER,
        )
        self.emit(event)

    def dag_success(self, dag_run: DagRun, msg: str) -> None:
        """
        Emit an event of type COMPLETE for a DAG run.

        :param dag_run: finished DAG run; its ``dag_id``, ``run_id`` and ``end_date`` are
            read, and ``end_date`` must be set because its ``isoformat()`` is the event time
        :param msg: not used by this method
        """
        event = RunEvent(
            eventType=RunState.COMPLETE,
            eventTime=dag_run.end_date.isoformat(),
            job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE),
            run=Run(runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id)),
            inputs=[],
            outputs=[],
            producer=_PRODUCER,
        )
        self.emit(event)

    def dag_failed(self, dag_run: DagRun, msg: str) -> None:
        """
        Emit an event of type FAIL for a DAG run, carrying ``msg`` as the error message.

        :param dag_run: failed DAG run; its ``dag_id``, ``run_id`` and ``end_date`` are
            read, and ``end_date`` must be set because its ``isoformat()`` is the event time
        :param msg: text placed in the run's ``errorMessage`` facet
        """
        event = RunEvent(
            eventType=RunState.FAIL,
            eventTime=dag_run.end_date.isoformat(),
            job=Job(name=dag_run.dag_id, namespace=_DAG_NAMESPACE),
            run=Run(
                runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
                facets={"errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python")},
            ),
            inputs=[],
            outputs=[],
            producer=_PRODUCER,
        )
        self.emit(event)

    @staticmethod
    def _build_run(
        run_id: str,
        job_name: str,
        parent_job_name: str | None = None,
        parent_run_id: str | None = None,
        nominal_start_time: str | None = None,
        nominal_end_time: str | None = None,
        run_facets: dict[str, BaseFacet] | None = None,
    ) -> Run:
        """
        Assemble the ``Run`` part of an event.

        :param run_id: identifier of the run
        :param job_name: name of the job; used as the parent name when ``parent_job_name``
            is not given
        :param parent_job_name: name reported in the parent facet
        :param parent_run_id: run id of the parent; the parent facets are only added when set
        :param nominal_start_time: nominal start; the nominal time facet is only added when set
        :param nominal_end_time: nominal end passed to the nominal time facet
        :param run_facets: extra facets, applied last so they override the built-in ones
        :return: the run with its collected facets
        """
        facets: dict[str, BaseFacet] = {}
        if nominal_start_time:
            facets["nominalTime"] = NominalTimeRunFacet(nominal_start_time, nominal_end_time)
        if parent_run_id:
            parent_run_facet = ParentRunFacet.create(
                runId=parent_run_id,
                namespace=_DAG_NAMESPACE,
                name=parent_job_name or job_name,
            )
            facets["parent"] = parent_run_facet
            # Keep sending this for the backward compatibility
            facets["parentRun"] = parent_run_facet
        if run_facets:
            facets.update(run_facets)
        return Run(run_id, facets)

    @staticmethod
    def _build_job(
        job_name: str,
        job_description: str | None = None,
        code_location: str | None = None,
        owners: list[str] | None = None,
        job_facets: dict[str, BaseFacet] | None = None,
    ) -> Job:
        """
        Assemble the ``Job`` part of an event in the configured namespace.

        :param job_name: name of the job
        :param job_description: text for the documentation facet; facet omitted when empty
        :param code_location: URL for the source code location facet; facet omitted when empty
        :param owners: owner names for the ownership facet; facet omitted when empty
        :param job_facets: extra facets, applied last so they override the built-in ones
        :return: the job with its collected facets
        """
        facets: dict[str, BaseFacet] = {}
        if job_description:
            facets["documentation"] = DocumentationJobFacet(description=job_description)
        if code_location:
            facets["sourceCodeLocation"] = SourceCodeLocationJobFacet("", url=code_location)
        if owners:
            facets["ownership"] = OwnershipJobFacet(
                owners=[OwnershipJobFacetOwners(name=owner) for owner in owners]
            )
        if job_facets:
            facets.update(job_facets)
        return Job(_DAG_NAMESPACE, job_name, facets)