Source code for airflow.providers.openlineage.plugins.adapter
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportosimportuuidfromtypingimportTYPE_CHECKINGimportyamlfromopenlineage.clientimportOpenLineageClient,set_producerfromopenlineage.client.facetimport(BaseFacet,DocumentationJobFacet,ErrorMessageRunFacet,NominalTimeRunFacet,OwnershipJobFacet,OwnershipJobFacetOwners,ParentRunFacet,ProcessingEngineRunFacet,SourceCodeLocationJobFacet,)fromopenlineage.client.runimportJob,Run,RunEvent,RunStatefromairflow.configurationimportconffromairflow.providers.openlineageimport__version__asOPENLINEAGE_PROVIDER_VERSIONfromairflow.providers.openlineage.utils.utilsimportOpenLineageRedactorfromairflow.statsimportStatsfromairflow.utils.log.logging_mixinimportLoggingMixinifTYPE_CHECKING:fromairflow.models.dagrunimportDagRunfromairflow.providers.openlineage.extractorsimportOperatorLineagefromairflow.utils.log.secrets_maskerimportSecretsMasker_DAG_DEFAULT_NAMESPACE="default"_DAG_NAMESPACE=conf.get("openlineage","namespace",fallback=os.getenv("OPENLINEAGE_NAMESPACE",_DAG_DEFAULT_NAMESPACE))_PRODUCER=f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"set_producer(_PRODUCER)
[docs]classOpenLineageAdapter(LoggingMixin):"""Translate Airflow metadata to OpenLineage events instead of creating them from Airflow code."""def__init__(self,client:OpenLineageClient|None=None,secrets_masker:SecretsMasker|None=None):super().__init__()self._client=clientifnotsecrets_masker:fromairflow.utils.log.secrets_maskerimport_secrets_maskersecrets_masker=_secrets_masker()self._redacter=OpenLineageRedactor.from_masker(secrets_masker)
[docs]defget_openlineage_config(self)->dict|None:# First, try to read from YAML fileopenlineage_config_path=conf.get("openlineage","config_path")ifopenlineage_config_path:config=self._read_yaml_config(openlineage_config_path)ifconfig:returnconfig.get("transport",None)# Second, try to get transport configtransport=conf.getjson("openlineage","transport")ifnottransport:returnNoneelifnotisinstance(transport,dict):raiseValueError(f"{transport} is not a dict")returntransport
[docs]defemit(self,event:RunEvent):ifnotself._client:self._client=self.get_or_create_openlineage_client()redacted_event:RunEvent=self._redacter.redact(event,max_depth=20)# type: ignore[assignment]try:withStats.timer("ol.emit.attempts"):returnself._client.emit(redacted_event)exceptExceptionase:Stats.incr("ol.emit.failed")self.log.warning("Failed to emit OpenLineage event of id %s",event.run.runId)self.log.debug("OpenLineage emission failure: %s",e)
[docs]defstart_task(self,run_id:str,job_name:str,job_description:str,event_time:str,parent_job_name:str|None,parent_run_id:str|None,code_location:str|None,nominal_start_time:str|None,nominal_end_time:str|None,owners:list[str],task:OperatorLineage|None,run_facets:dict[str,BaseFacet]|None=None,# Custom run facets):""" Emits openlineage event of type START. :param run_id: globally unique identifier of task in dag run :param job_name: globally unique identifier of task in dag :param job_description: user provided description of job :param event_time: :param parent_job_name: the name of the parent job (typically the DAG, but possibly a task group) :param parent_run_id: identifier of job spawning this task :param code_location: file path or URL of DAG file :param nominal_start_time: scheduled time of dag run :param nominal_end_time: following schedule of dag run :param owners: list of owners of DAG :param task: metadata container with information extracted from operator :param run_facets: custom run facets """fromairflow.versionimportversionasAIRFLOW_VERSIONprocessing_engine_version_facet=ProcessingEngineRunFacet(version=AIRFLOW_VERSION,name="Airflow",openlineageAdapterVersion=OPENLINEAGE_PROVIDER_VERSION,)ifnotrun_facets:run_facets={}iftask:run_facets={**task.run_facets,**run_facets}run_facets["processing_engine"]=processing_engine_version_facet# type: ignoreevent=RunEvent(eventType=RunState.START,eventTime=event_time,run=self._build_run(run_id=run_id,job_name=job_name,parent_job_name=parent_job_name,parent_run_id=parent_run_id,nominal_start_time=nominal_start_time,nominal_end_time=nominal_end_time,run_facets=run_facets,),job=self._build_job(job_name=job_name,job_description=job_description,code_location=code_location,owners=owners,job_facets=task.job_facetsiftaskelseNone,),inputs=task.inputsiftaskelse[],outputs=task.outputsiftaskelse[],producer=_PRODUCER,)self.emit(event)
[docs]defcomplete_task(self,run_id:str,job_name:str,parent_job_name:str|None,parent_run_id:str|None,end_time:str,task:OperatorLineage,):""" Emits openlineage event of type COMPLETE. :param run_id: globally unique identifier of task in dag run :param job_name: globally unique identifier of task between dags :param parent_job_name: the name of the parent job (typically the DAG, but possibly a task group) :param parent_run_id: identifier of job spawning this task :param end_time: time of task completion :param task: metadata container with information extracted from operator """event=RunEvent(eventType=RunState.COMPLETE,eventTime=end_time,run=self._build_run(run_id=run_id,job_name=job_name,parent_job_name=parent_job_name,parent_run_id=parent_run_id,run_facets=task.run_facets,),job=self._build_job(job_name,job_facets=task.job_facets),inputs=task.inputs,outputs=task.outputs,producer=_PRODUCER,)self.emit(event)
[docs]deffail_task(self,run_id:str,job_name:str,parent_job_name:str|None,parent_run_id:str|None,end_time:str,task:OperatorLineage,):""" Emits openlineage event of type FAIL. :param run_id: globally unique identifier of task in dag run :param job_name: globally unique identifier of task between dags :param parent_job_name: the name of the parent job (typically the DAG, but possibly a task group) :param parent_run_id: identifier of job spawning this task :param end_time: time of task completion :param task: metadata container with information extracted from operator """event=RunEvent(eventType=RunState.FAIL,eventTime=end_time,run=self._build_run(run_id=run_id,job_name=job_name,parent_job_name=parent_job_name,parent_run_id=parent_run_id,run_facets=task.run_facets,),job=self._build_job(job_name,job_facets=task.job_facets),inputs=task.inputs,outputs=task.outputs,producer=_PRODUCER,)self.emit(event)
@staticmethoddef_build_run(run_id:str,job_name:str,parent_job_name:str|None=None,parent_run_id:str|None=None,nominal_start_time:str|None=None,nominal_end_time:str|None=None,run_facets:dict[str,BaseFacet]|None=None,)->Run:facets:dict[str,BaseFacet]={}ifnominal_start_time:facets.update({"nominalTime":NominalTimeRunFacet(nominal_start_time,nominal_end_time)})ifparent_run_id:parent_run_facet=ParentRunFacet.create(runId=parent_run_id,namespace=_DAG_NAMESPACE,name=parent_job_nameorjob_name,)facets.update({"parent":parent_run_facet,"parentRun":parent_run_facet,# Keep sending this for the backward compatibility})ifrun_facets:facets.update(run_facets)returnRun(run_id,facets)@staticmethoddef_build_job(job_name:str,job_description:str|None=None,code_location:str|None=None,owners:list[str]|None=None,job_facets:dict[str,BaseFacet]|None=None,):facets:dict[str,BaseFacet]={}ifjob_description:facets.update({"documentation":DocumentationJobFacet(description=job_description)})ifcode_location:facets.update({"sourceCodeLocation":SourceCodeLocationJobFacet("",url=code_location)})ifowners:facets.update({"ownership":OwnershipJobFacet(owners=[OwnershipJobFacetOwners(name=owner)forownerinowners])})ifjob_facets:facets={**facets,**job_facets}returnJob(_DAG_NAMESPACE,job_name,facets)