Source code for airflow.providers.openlineage.utils.utils
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportdatetimeimportjsonimportloggingfromcontextlibimportsuppressfromfunctoolsimportwrapsfromtypingimportTYPE_CHECKING,Any,Iterableimportattrsfromopenlineage.client.utilsimportRedactMixin# TODO: move this maybe to Airflow's logic?fromairflow.modelsimportDAG,BaseOperator,MappedOperatorfromairflow.providers.openlineageimportconffromairflow.providers.openlineage.plugins.facetsimport(AirflowMappedTaskRunFacet,AirflowRunFacet,UnknownOperatorAttributeRunFacet,UnknownOperatorInstance,)fromairflow.providers.openlineage.utils.selective_enableimport(is_dag_lineage_enabled,is_task_lineage_enabled,)fromairflow.utils.contextimportAirflowContextDeprecationWarningfromairflow.utils.log.secrets_maskerimportRedactable,Redacted,SecretsMasker,should_hide_value_for_keyifTYPE_CHECKING:fromairflow.modelsimportDagRun,TaskInstance
[docs]defget_custom_facets(task_instance:TaskInstance|None=None)->dict[str,Any]:custom_facets={}# check for -1 comes from SmartSensor compatibility with dynamic task mapping# this comes from Airflow codeifhasattr(task_instance,"map_index")andgetattr(task_instance,"map_index")!=-1:custom_facets["airflow_mappedTask"]=AirflowMappedTaskRunFacet.from_task_instance(task_instance)returncustom_facets
[docs]defis_selective_lineage_enabled(obj:DAG|BaseOperator|MappedOperator)->bool:"""If selective enable is active check if DAG or Task is enabled to emit events."""ifnotconf.selective_enable():returnTrueifisinstance(obj,DAG):returnis_dag_lineage_enabled(obj)elifisinstance(obj,(BaseOperator,MappedOperator)):returnis_task_lineage_enabled(obj)else:raiseTypeError("is_selective_lineage_enabled can only be used on DAG or Operator objects")
[docs]classInfoJsonEncodable(dict):""" Airflow objects might not be json-encodable overall. The class provides additional attributes to control what and how is encoded: * renames: a dictionary of attribute name changes * | casts: a dictionary consisting of attribute names | and corresponding methods that should change | object value * includes: list of attributes to be included in encoding * excludes: list of attributes to be excluded from encoding Don't use both includes and excludes. """
def__init__(self,obj):self.obj=objself._fields=[]self._cast_fields()self._rename_fields()self._include_fields()dict.__init__(self,**{field:InfoJsonEncodable._cast_basic_types(getattr(self,field))forfieldinself._fields},)@staticmethoddef_cast_basic_types(value):ifisinstance(value,datetime.datetime):returnvalue.isoformat()ifisinstance(value,(set,list,tuple)):returnstr(list(value))returnvaluedef_rename_fields(self):forfield,renamedinself.renames.items():ifhasattr(self.obj,field):setattr(self,renamed,getattr(self.obj,field))self._fields.append(renamed)def_cast_fields(self):forfield,funcinself.casts.items():setattr(self,field,func(self.obj))self._fields.append(field)def_include_fields(self):ifself.includesandself.excludes:raiseValueError("Don't use both includes and excludes.")ifself.includes:forfieldinself.includes:iffieldnotinself._fieldsandhasattr(self.obj,field):setattr(self,field,getattr(self.obj,field))self._fields.append(field)else:forfield,valinself.obj.__dict__.items():iffieldnotinself._fieldsandfieldnotinself.excludesandfieldnotinself.renames:setattr(self,field,val)self._fields.append(field)
[docs]classDagInfo(InfoJsonEncodable):"""Defines encoding DAG object to JSON."""
[docs]classOpenLineageRedactor(SecretsMasker):""" This class redacts sensitive data similar to SecretsMasker in Airflow logs. The difference is that our default max recursion depth is way higher - due to the structure of OL events we need more depth. Additionally, we allow data structures to specify data that needs not to be redacted by specifying _skip_redact list by deriving RedactMixin. """@classmethod
def_redact(self,item:Redactable,name:str|None,depth:int,max_depth:int)->Redacted:ifdepth>max_depth:returnitemtry:# It's impossible to check the type of variable in a dict without accessing it, and# this already causes warning - so suppress itwithsuppress(AirflowContextDeprecationWarning):iftype(item).__name__=="Proxy":# Those are deprecated values in _DEPRECATION_REPLACEMENTS# in airflow.utils.context.Contextreturn"<<non-redactable: Proxy>>"ifnameandshould_hide_value_for_key(name):returnself._redact_all(item,depth,max_depth)ifattrs.has(type(item)):# TODO: FIXME when mypy gets compatible with new attrsfordict_key,subvalinattrs.asdict(item,# type: ignore[arg-type]recurse=False,).items():if_is_name_redactable(dict_key,item):setattr(item,dict_key,self._redact(subval,name=dict_key,depth=(depth+1),max_depth=max_depth),)returnitemelifis_json_serializable(item)andhasattr(item,"__dict__"):fordict_key,subvalinitem.__dict__.items():iftype(subval).__name__=="Proxy":return"<<non-redactable: Proxy>>"if_is_name_redactable(dict_key,item):setattr(item,dict_key,self._redact(subval,name=dict_key,depth=(depth+1),max_depth=max_depth),)returnitemelse:returnsuper()._redact(item,name,depth,max_depth)exceptExceptionasexc:log.warning("Unable to redact %r. Error was: %s: %s",item,type(exc).__name__,exc)returnitem