Source code for airflow.providers.openlineage.utils.utils
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportdatetimeimportjsonimportloggingfromcontextlibimportsuppressfromfunctoolsimportwrapsfromimportlibimportmetadatafromtypingimportTYPE_CHECKING,Any,Callableimportattrsfromairflowimport__version__asAIRFLOW_VERSION# TODO: move this maybe to Airflow's logic?fromairflow.modelsimportBaseOperator,DagRun,TaskReschedulefromairflow.providers.openlineageimport(__version__asOPENLINEAGE_PROVIDER_VERSION,conf,)fromairflow.providers.openlineage.plugins.facetsimport(AirflowDagRunFacet,AirflowDebugRunFacet,AirflowJobFacet,AirflowMappedTaskRunFacet,AirflowRunFacet,AirflowStateRunFacet,UnknownOperatorAttributeRunFacet,UnknownOperatorInstance,)fromairflow.providers.openlineage.utils.selective_enableimport(is_dag_lineage_enabled,is_task_lineage_enabled,)fromairflow.providers.openlineage.version_compatimportAIRFLOW_V_2_10_PLUS,AIRFLOW_V_3_0_PLUSfromairflow.sensors.baseimportBaseSensorOperatorfromairflow.serialization.serialized_objectsimportSerializedBaseOperatorfromairflow.utils.module_loadingimportimport_stringfromairflow.utils.sessionimportNEW_SESSION,provide_sessionfromopenlineage.client.utilsimportRedactMixintry:fromairflow.sdkimportBaseOperatorasSdkBaseOperatorexceptImportError:
ifTYPE_CHECKING:fromairflow.modelsimportTaskInstancefromairflow.providers.common.compat.assetsimportAssetfromairflow.sdkimportDAG,MappedOperatorfromairflow.sdk.execution_time.secrets_maskerimport(Redactable,Redacted,SecretsMasker,should_hide_value_for_key,)fromairflow.utils.stateimportDagRunState,TaskInstanceStatefromopenlineage.client.event_v2importDatasetasOpenLineageDatasetfromopenlineage.client.facet_v2importRunFacet,processing_engine_runelse:try:fromairflow.sdkimportDAG,MappedOperatorexceptImportError:fromairflow.modelsimportDAG,MappedOperatortry:fromairflow.providers.common.compat.assetsimportAssetexceptImportError:ifAIRFLOW_V_3_0_PLUS:fromairflow.sdkimportAssetelse:# dataset is renamed to asset since Airflow 3.0fromairflow.datasetsimportDatasetasAssettry:fromairflow.sdk.execution_time.secrets_maskerimport(Redactable,Redacted,SecretsMasker,should_hide_value_for_key,)exceptImportError:fromairflow.utils.log.secrets_maskerimport(Redactable,Redacted,SecretsMasker,should_hide_value_for_key,)
def get_airflow_mapped_task_facet(task_instance: TaskInstance) -> dict[str, Any]:
    """
    Build the deprecated ``airflow_mappedTask`` run facet for a task instance.

    :param task_instance: the task instance to inspect.
    :return: ``{"airflow_mappedTask": <facet>}`` when the task instance has a ``map_index``
        attribute whose value is not ``-1``; otherwise an empty dict.
    """
    log.debug(
        "AirflowMappedTaskRunFacet is deprecated and will be removed. "
        "Use information from AirflowRunFacet instead."
    )
    # check for -1 comes from SmartSensor compatibility with dynamic task mapping
    # this comes from Airflow code
    is_mapped = getattr(task_instance, "map_index", -1) != -1
    if not is_mapped:
        return {}
    return {"airflow_mappedTask": AirflowMappedTaskRunFacet.from_task_instance(task_instance)}
def get_user_provided_run_facets(ti: TaskInstance, ti_state: TaskInstanceState) -> dict[str, RunFacet]:
    """
    Collect run facets produced by the user-configured custom facet functions.

    Every import path from ``conf.custom_run_facets()`` is imported and called with
    ``(ti, ti_state)``. Non-empty dict results are merged in configuration order; a key
    produced by a later function overwrites an earlier one (a warning is logged). Functions
    that cannot be imported, or that raise, are logged and skipped.

    :param ti: the task instance passed to each custom facet function.
    :param ti_state: the task instance state passed to each custom facet function.
    :return: the merged mapping of facet name to facet.
    """
    collected: dict[str, RunFacet] = {}
    for func_path in conf.custom_run_facets():
        try:
            facet_func: Callable[[TaskInstance, TaskInstanceState], dict[str, RunFacet]] | None = (
                try_import_from_string(func_path)
            )
            if not facet_func:
                log.warning(
                    "OpenLineage is unable to import custom facet function `%s`; will ignore it.",
                    func_path,
                )
                continue

            produced: dict[str, RunFacet] | None = facet_func(ti, ti_state)
            # Empty results and non-dict results are ignored silently.
            if not produced or not isinstance(produced, dict):
                continue

            overlapping = [key for key in produced if key in collected]
            if overlapping:
                log.warning(
                    "Duplicate OpenLineage custom facets key(s) found: `%s` from function `%s`; "
                    "this will overwrite the previous value.",
                    ", ".join(overlapping),
                    func_path,
                )
            log.debug(
                "Adding OpenLineage custom facet with key(s): `%s` from function `%s`.",
                tuple(produced),
                func_path,
            )
            collected.update(produced)
        except Exception as exc:
            # A misbehaving user function must never break lineage collection.
            log.warning(
                "Error processing custom facet function `%s`; will ignore it. Error was: %s: %s",
                func_path,
                type(exc).__name__,
                exc,
            )
    return collected
def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator | SdkBaseOperator) -> str:
    """
    Return the dotted ``module.ClassName`` path of an operator's class.

    Mapped and serialized operators record their original class location in
    ``_task_module`` / ``_task_type`` (as in
    airflow.api_connexion.schemas.common_schema.ClassReferenceSchema). For any other
    operator the class is resolved through ``get_operator_class``.
    """
    if isinstance(operator, (MappedOperator, SerializedBaseOperator)):
        module_name = operator._task_module  # type: ignore
        class_name = operator._task_type  # type: ignore
    else:
        op_class = get_operator_class(operator)
        module_name = op_class.__module__
        class_name = op_class.__name__
    return module_name + "." + class_name
def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator | SdkBaseOperator) -> bool:
    """
    If selective enable is active, check whether the DAG or task is enabled to emit events.

    When selective enable is switched off in the OpenLineage config, every object counts as
    enabled.

    :param obj: a DAG or an operator (classic, mapped or Task SDK).
    :return: whether lineage events may be emitted for *obj*.
    :raises TypeError: if *obj* is neither a DAG nor an operator.
    """
    if not conf.selective_enable():
        return True
    if isinstance(obj, DAG):
        return is_dag_lineage_enabled(obj)
    if isinstance(obj, (BaseOperator, MappedOperator, SdkBaseOperator)):
        return is_task_lineage_enabled(obj)
    raise TypeError("is_selective_lineage_enabled can only be used on DAG or Operator objects")
class InfoJsonEncodable(dict):
    """
    Airflow objects might not be json-encodable overall.

    A subclass wraps such an object and exposes a selected, JSON-friendly subset of its
    attributes as the items of this dict. Which attributes are taken, and how, is
    controlled by these class attributes:

    * renames: a dictionary mapping a source attribute name to the name under which its
      value is stored; applied only when the source attribute exists on the object
    * casts: a dictionary mapping attribute names to callables; each callable receives the
      whole wrapped object, and its return value becomes the attribute's value
    * includes: list of attributes to be included in encoding (missing ones are skipped)
    * excludes: list of attributes to be excluded from encoding; used only when
      ``includes`` is empty, in which case every other field of the object is taken

    Casts are processed first, then renames, then includes/excludes. A field already set by
    an earlier step is not overwritten by includes/excludes. Finally, datetimes, timedeltas
    and sets/lists/tuples are converted to strings.

    Don't use both includes and excludes.
    """

    # NOTE(review): the class-level ``renames``/``casts``/``includes``/``excludes``
    # attributes and the opening of ``__init__`` are not visible in this excerpt. The
    # statements below are the tail of ``__init__`` and presumably run after the wrapped
    # object has been stored on ``self.obj`` -- confirm against the full file.
        self._fields = []

        # Order matters: casts, then renames, then includes/excludes (which skip any field
        # already recorded in ``self._fields``).
        self._cast_fields()
        self._rename_fields()
        self._include_fields()

        # Fill the dict itself with the collected fields, converting datetimes, timedeltas
        # and sets/lists/tuples to strings.
        dict.__init__(
            self,
            **{field: InfoJsonEncodable._cast_basic_types(getattr(self, field)) for field in self._fields},
        )
        # Drop the reference to the wrapped object once its fields have been copied.
        del self.obj

    @staticmethod
    def _cast_basic_types(value: Any) -> Any:
        """
        Convert a few common non-string types to strings.

        A datetime becomes its ISO-8601 string, and a timedelta becomes
        ``"<total_seconds> seconds"``. A set, list or tuple becomes ``str(list(value))``.
        Any other value is returned unchanged.
        """
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        if isinstance(value, datetime.timedelta):
            return f"{value.total_seconds()} seconds"
        if isinstance(value, (set, list, tuple)):
            return str(list(value))
        return value

    def _rename_fields(self) -> None:
        """Copy ``obj.<field>`` to ``self.<renamed>`` for every rename whose source exists."""
        for field, renamed in self.renames.items():
            if hasattr(self.obj, field):
                setattr(self, renamed, getattr(self.obj, field))
                self._fields.append(renamed)

    def _cast_fields(self) -> None:
        """Set every cast field to the result of calling its callable with the wrapped object."""
        for field, func in self.casts.items():
            # The callable receives the whole object, not a single attribute value.
            setattr(self, field, func(self.obj))
            self._fields.append(field)

    def _include_fields(self) -> None:
        """
        Copy the remaining attributes selected by ``includes`` or ``excludes``.

        :raises ValueError: if both ``includes`` and ``excludes`` are set. Also raised when,
            in the ``excludes`` mode, the object has neither a ``__dict__`` nor an attrs
            class definition to enumerate its fields from.
        """
        if self.includes and self.excludes:
            raise ValueError("Don't use both includes and excludes.")
        if self.includes:
            # Whitelist mode: only listed attributes that exist and were not set earlier.
            for field in self.includes:
                if field not in self._fields and hasattr(self.obj, field):
                    setattr(self, field, getattr(self.obj, field))
                    self._fields.append(field)
        else:
            # Blacklist mode: enumerate every field of the object.
            if hasattr(self.obj, "__dict__"):
                obj_fields = self.obj.__dict__
            elif attrs.has(self.obj.__class__):  # e.g. attrs.define class with slots=True has no __dict__
                obj_fields = {
                    field.name: getattr(self.obj, field.name) for field in attrs.fields(self.obj.__class__)
                }
            else:
                raise ValueError(
                    "Cannot iterate over fields: "
                    f"The object of type {type(self.obj).__name__} neither has a __dict__ attribute "
                    "nor is defined as an attrs class."
                )
            for field, val in obj_fields.items():
                # Skip fields already set, explicitly excluded ones, and rename sources (their
                # values were already stored under the new name).
                if field not in self._fields and field not in self.excludes and field not in self.renames:
                    setattr(self, field, val)
                    self._fields.append(field)
class DagInfo(InfoJsonEncodable):
    """Defines encoding DAG object to JSON."""

    # DAG attributes taken through the base class's ``includes`` handling; a name that does
    # not exist on the DAG object (e.g. one attribute per Airflow major version) is skipped.
    includes = [
        "dag_id",
        "description",
        "fileloc",
        "owner",
        "schedule_interval",  # For Airflow 2.
        "timetable_summary",  # For Airflow 3.
        "start_date",
        "tags",
    ]
@conf.cache
def _get_all_packages_installed() -> dict[str, str]:
    """
    Map the name of every installed distribution to its version string.

    Scanning all installed distributions can be a heavy operation, so this function is
    wrapped with ``conf.cache`` to avoid repeating the expensive lookup.
    """
    installed: dict[str, str] = {}
    for dist in metadata.distributions():
        package_name = dist.metadata["Name"]
        installed[package_name] = dist.version
    return installed
def get_airflow_debug_facet() -> dict[str, AirflowDebugRunFacet]:
    """
    Return the ``debug`` run facet listing installed packages when debug mode is on.

    :return: ``{"debug": AirflowDebugRunFacet(...)}`` if ``conf.debug_mode()`` is truthy,
        otherwise an empty dict.
    """
    if conf.debug_mode():
        log.warning("OpenLineage debug_mode is enabled. Be aware that this may log and emit extensive details.")
        return {"debug": AirflowDebugRunFacet(packages=_get_all_packages_installed())}
    return {}
def get_airflow_job_facet(dag_run: DagRun) -> dict[str, AirflowJobFacet]:
    """
    Build the ``airflow`` job facet describing the tasks and task groups of a DAG run's DAG.

    :param dag_run: the DAG run whose ``dag`` is described.
    :return: ``{"airflow": AirflowJobFacet(...)}``, or an empty dict when the DAG run has no
        DAG attached.
    """
    dag = dag_run.dag
    if not dag:
        return {}
    job_facet = AirflowJobFacet(
        taskTree={},  # caused OOM errors, to be removed, see #41587
        taskGroups=_get_task_groups_details(dag),
        tasks=_get_tasks_details(dag),
    )
    return {"airflow": job_facet}
def _get_tasks_details(dag: DAG) -> dict:
    """Describe every task of *dag*, keyed by task_id and inserted in task_id order."""
    details = {}
    for task in sorted(dag.tasks, key=lambda t: t.task_id):
        task_id = task.task_id
        group = task.task_group
        details[task_id] = {
            "operator": get_fully_qualified_class_name(task),
            "task_group": group.group_id if group else None,
            "emits_ol_events": _emits_ol_events(task),
            "ui_color": task.ui_color,
            "ui_fgcolor": task.ui_fgcolor,
            "ui_label": task.label,
            "is_setup": task.is_setup,
            "is_teardown": task.is_teardown,
            "downstream_task_ids": sorted(task.downstream_task_ids),
        }
    return details


def _get_task_groups_details(dag: DAG) -> dict:
    """Describe every task group in ``dag.task_group_dict``, keyed by group id."""
    groups = {}
    for group_id, group in dag.task_group_dict.items():
        groups[group_id] = {
            "parent_group": group.parent_group.group_id,
            "ui_color": group.ui_color,
            "ui_fgcolor": group.ui_fgcolor,
            "ui_label": group.label,
        }
    return groups


def _emits_ol_events(task: BaseOperator | MappedOperator) -> bool:
    """
    Predict whether *task* will emit OpenLineage events.

    That requires three things. Selective enable must allow the task, its operator must
    not be disabled in config, and it must not be an empty operator that Airflow skips
    without running.
    """
    selectively_enabled = is_selective_lineage_enabled(task)
    operator_disabled = is_operator_disabled(task)

    # empty operators without callbacks/outlets are skipped for optimization by Airflow
    # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks
    is_empty_operator = task.inherits_from_empty_operator
    execute_callback = getattr(task, "on_execute_callback", None)
    success_callback = getattr(task, "on_success_callback", None)
    outlets = task.outlets
    skipped_as_empty = bool(is_empty_operator) and not execute_callback and not success_callback and not outlets

    return bool(selectively_enabled) and not operator_disabled and not skipped_as_empty
def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str | None = None):
    """
    Build the deprecated ``unknownSourceAttribute`` run facet for an operator.

    :param task: the operator to describe; its attributes are encoded via ``TaskInfo``.
    :param name: the name to report; when empty, the operator's class name is used.
    :return: ``{"unknownSourceAttribute": <facet as a plain dict>}``.
    """
    operator_name = name or get_operator_class(task).__name__
    log.debug(
        "UnknownOperatorAttributeRunFacet is deprecated and will be removed. "
        "Use information from AirflowRunFacet instead."
    )
    instance = UnknownOperatorInstance(name=operator_name, properties=TaskInfo(task))
    facet = UnknownOperatorAttributeRunFacet(unknownItems=[instance])
    return {"unknownSourceAttribute": attrs.asdict(facet)}
class OpenLineageRedactor(SecretsMasker):
    """
    This class redacts sensitive data similar to SecretsMasker in Airflow logs.

    The difference is that our default max recursion depth is way higher - due to
    the structure of OL events we need more depth.

    Additionally, we allow data structures to specify data that needs not to be
    redacted by specifying _skip_redact list by deriving RedactMixin.
    """

    # NOTE(review): the method this decorator belongs to is not visible in this excerpt.
    # As the excerpt stands, it would apply to ``_redact`` below -- confirm against the
    # full file.
    @classmethod
    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        """
        Redact *item* recursively, descending into attrs instances and plain objects.

        :param item: the value to redact. Attrs instances, and JSON-serializable objects
            with a ``__dict__``, are redacted in place attribute by attribute. Everything
            else is delegated to ``SecretsMasker._redact``.
        :param name: the key/attribute name under which *item* was found, if any. When
            ``should_hide_value_for_key(name)`` is true, the whole value is redacted.
        :param depth: current recursion depth.
        :param max_depth: recursion limit; items deeper than this are returned unchanged.
        :return: the redacted value. For objects handled here, this is the same (mutated)
            object, or ``"<<non-redactable: Proxy>>"`` for Proxy values. If redaction fails,
            *item* is returned unchanged.
        """
        if AIRFLOW_V_3_0_PLUS:
            # Keep compatibility for Airflow 2.x, remove when Airflow 3.0 is the minimum version
            class AirflowContextDeprecationWarning(UserWarning):
                pass

        else:
            from airflow.utils.context import (  # type: ignore[attr-defined,no-redef]
                AirflowContextDeprecationWarning,
            )

        if depth > max_depth:
            return item
        try:
            # It's impossible to check the type of variable in a dict without accessing it, and
            # this already causes warning - so suppress it.
            # NOTE: ``suppress`` only acts if the warning is raised as an exception (e.g. a
            # warnings filter set to "error"). In that case the ``with`` block is left early
            # and the unchanged item is returned below.
            with suppress(AirflowContextDeprecationWarning):
                if type(item).__name__ == "Proxy":
                    # Those are deprecated values in _DEPRECATION_REPLACEMENTS
                    # in airflow.utils.context.Context
                    return "<<non-redactable: Proxy>>"
                if name and should_hide_value_for_key(name):
                    return self._redact_all(item, depth, max_depth)
                if attrs.has(type(item)):
                    # TODO: FIXME when mypy gets compatible with new attrs
                    for dict_key, subval in attrs.asdict(
                        item,  # type: ignore[arg-type]
                        recurse=False,
                    ).items():
                        if _is_name_redactable(dict_key, item):
                            setattr(
                                item,
                                dict_key,
                                self._redact(
                                    subval,
                                    name=dict_key,
                                    depth=(depth + 1),
                                    max_depth=max_depth,
                                ),
                            )
                    return item
                elif is_json_serializable(item) and hasattr(item, "__dict__"):
                    for dict_key, subval in item.__dict__.items():
                        if type(subval).__name__ == "Proxy":
                            return "<<non-redactable: Proxy>>"
                        if _is_name_redactable(dict_key, item):
                            setattr(
                                item,
                                dict_key,
                                self._redact(
                                    subval,
                                    name=dict_key,
                                    depth=(depth + 1),
                                    max_depth=max_depth,
                                ),
                            )
                    return item
                else:
                    return super()._redact(item, name, depth, max_depth)
        except Exception as exc:
            # Log only the value's type (as the base SecretsMasker does). The item failed
            # redaction, so its repr may still contain the secrets we were meant to hide.
            log.warning(
                "Unable to redact value of type %s. Error was: %s: %s",
                type(item),
                type(exc).__name__,
                exc,
            )
        return item
def print_warning(log):
    """
    Build a decorator that turns any ``Exception`` from the wrapped callable into a warning.

    The exception is logged on *log* at WARNING level with its traceback and then
    swallowed. The decorated call returns ``None`` instead of raising.

    :param log: logger used to report the swallowed exception.
    """

    def _decorate(func):
        @wraps(func)
        def _guarded(*args, **kwargs):
            result = None
            try:
                result = func(*args, **kwargs)
            except Exception:
                log.warning(
                    "OpenLineage event emission failed. Exception below is being caught: it's printed for visibility. This has no impact on actual task execution status.",
                    exc_info=True,
                )
            return result

        return _guarded

    return _decorate
def should_use_external_connection(hook) -> bool:
    """
    Decide whether an external connection should be used for *hook*.

    On Airflow 2.10+ the answer is always ``True``. On older versions it is ``False`` for
    ``SnowflakeHook``, ``SnowflakeSqlApiHook`` and ``RedshiftSQLHook`` (matched by class
    name), and ``True`` for any other hook.
    """
    # From Airflow 2.10 the execution is process-isolated, so we can safely run those again.
    if AIRFLOW_V_2_10_PLUS:
        return True
    return hook.__class__.__name__ not in ("SnowflakeHook", "SnowflakeSqlApiHook", "RedshiftSQLHook")
def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset | None:
    """
    Convert an Asset with an AIP-60 compliant URI to an OpenLineageDataset.

    The asset's normalized URI is mapped to a scheme, and the provider-registered converter
    for that scheme is applied to a fresh ``Asset`` built from the normalized URI.

    :param asset: the Airflow asset (dataset on Airflow 2) to translate.
    :param lineage_context: passed through unchanged to the converter.
    :return: the converted dataset, or ``None`` in several cases. These are: no URI
        normalizer is defined, no asset converter is found, or some core Airflow changes
        are missing (ImportError/AttributeError).
    """
    if AIRFLOW_V_3_0_PLUS:
        from airflow.sdk.definitions.asset import _get_normalized_scheme
    else:
        try:
            from airflow.datasets import _get_normalized_scheme  # type: ignore[no-redef, attr-defined]
        except ImportError:
            return None

    try:
        from airflow.providers_manager import ProvidersManager

        # Newer Airflow exposes asset converters; fall back to the older dataset naming.
        converters = getattr(ProvidersManager(), "asset_to_openlineage_converters", None) or (
            ProvidersManager().dataset_to_openlineage_converters  # type: ignore[attr-defined]
        )
        normalized_uri = asset.normalized_uri
    except (ImportError, AttributeError):
        return None

    if normalized_uri is None:
        return None

    normalized_scheme = _get_normalized_scheme(normalized_uri)
    if not normalized_scheme:
        return None

    converter = converters.get(normalized_scheme)
    if converter is None:
        return None
    return converter(Asset(uri=normalized_uri, extra=asset.extra), lineage_context)