Source code for airflow.providers.openlineage.utils.utils
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import datetime
import json
import logging
import re
from contextlib import redirect_stdout, suppress
from functools import wraps
from io import StringIO
from typing import TYPE_CHECKING, Any, Iterable

import attrs
from deprecated import deprecated
from openlineage.client.utils import RedactMixin
from packaging.version import Version

from airflow import __version__ as AIRFLOW_VERSION
from airflow.exceptions import AirflowProviderDeprecationWarning

# TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
    AirflowJobFacet,
    AirflowMappedTaskRunFacet,
    AirflowRunFacet,
    AirflowStateRunFacet,
    BaseFacet,
    UnknownOperatorAttributeRunFacet,
    UnknownOperatorInstance,
)
from airflow.providers.openlineage.utils.selective_enable import (
    is_dag_lineage_enabled,
    is_task_lineage_enabled,
)
from airflow.serialization.serialized_objects import SerializedBaseOperator
from airflow.utils.context import AirflowContextDeprecationWarning
from airflow.utils.log.secrets_masker import (
    Redactable,
    Redacted,
    SecretsMasker,
    should_hide_value_for_key,
)
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
    from airflow.models import DagRun, TaskInstance

log = logging.getLogger(__name__)

# Module-level version gate used by should_use_external_connection() below. Its exact
# definition is not shown on this page, so this line is a reconstruction assumed from
# the Version/AIRFLOW_VERSION imports above.
_IS_AIRFLOW_2_10_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0")
[docs]
def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
    custom_facets = {}
    # check for -1 comes from SmartSensor compatibility with dynamic task mapping
    # this comes from Airflow code
    if hasattr(task_instance, "map_index") and getattr(task_instance, "map_index") != -1:
        custom_facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
    return custom_facets
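# A minimal usage sketch (not part of the module): the function only inspects
# `map_index` before building the facet, so a SimpleNamespace stands in for a
# TaskInstance here. A real mapped instance (map_index >= 0) would also need the
# attributes AirflowMappedTaskRunFacet.from_task_instance() reads.
from types import SimpleNamespace

assert get_custom_facets(None) == {}  # hasattr(None, "map_index") is False
assert get_custom_facets(SimpleNamespace(map_index=-1)) == {}  # unmapped task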
[docs]
def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str:
    if isinstance(operator, (MappedOperator, SerializedBaseOperator)):
        # as in airflow.api_connexion.schemas.common_schema.ClassReferenceSchema
        return operator._task_module + "." + operator._task_type  # type: ignore
    op_class = get_operator_class(operator)
    return op_class.__module__ + "." + op_class.__name__
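# Hedged example of the expected output shape for a plain (non-mapped) operator;
# BashOperator is just a convenient concrete class, and get_operator_class is
# assumed to be defined elsewhere in this module:
from airflow.operators.bash import BashOperator

op = BashOperator(task_id="example", bash_command="echo 1")
assert get_fully_qualified_class_name(op) == "airflow.operators.bash.BashOperator"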
[docs]
def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bool:
    """If selective enable is active, check if the DAG or task is enabled to emit events."""
    if not conf.selective_enable():
        return True
    if isinstance(obj, DAG):
        return is_dag_lineage_enabled(obj)
    elif isinstance(obj, (BaseOperator, MappedOperator)):
        return is_task_lineage_enabled(obj)
    else:
        raise TypeError("is_selective_lineage_enabled can only be used on DAG or Operator objects")
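# Hedged sketch of the workflow this check supports: with `[openlineage]
# selective_enable = True` in the Airflow config, only explicitly marked objects
# emit events. enable_lineage is assumed to live in the same selective_enable
# module whose is_*_lineage_enabled helpers are imported above; the dag_id is
# illustrative.
from airflow.providers.openlineage.utils.selective_enable import enable_lineage

with DAG(dag_id="lineage_enabled_dag", schedule=None) as dag:
    ...

enable_lineage(dag)  # mark this DAG for lineage emission
# is_selective_lineage_enabled(dag) now returns True even with selective enable on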
[docs]
class InfoJsonEncodable(dict):
    """
    Airflow objects might not be json-encodable overall.

    The class provides additional attributes to control what is encoded and how:

    * renames: a dictionary of attribute name changes
    * | casts: a dictionary consisting of attribute names
      | and corresponding methods that should change
      | object value
    * includes: list of attributes to be included in encoding
    * excludes: list of attributes to be excluded from encoding

    Don't use both includes and excludes.
    """
    # The four control hooks described in the docstring (declarations reconstructed
    # here; subclasses override them):
    renames: dict[str, str] = {}
    casts: dict[str, Any] = {}
    includes: list[str] = []
    excludes: list[str] = []

    def __init__(self, obj):
        self.obj = obj
        self._fields = []

        self._cast_fields()
        self._rename_fields()
        self._include_fields()
        dict.__init__(
            self,
            **{field: InfoJsonEncodable._cast_basic_types(getattr(self, field)) for field in self._fields},
        )

    @staticmethod
    def _cast_basic_types(value):
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        if isinstance(value, datetime.timedelta):
            return f"{value.total_seconds()} seconds"
        if isinstance(value, (set, tuple)):
            return str(list(value))
        return value

    def _rename_fields(self):
        for field, renamed in self.renames.items():
            if hasattr(self.obj, field):
                setattr(self, renamed, getattr(self.obj, field))
                self._fields.append(renamed)

    def _cast_fields(self):
        for field, func in self.casts.items():
            setattr(self, field, func(self.obj))
            self._fields.append(field)

    def _include_fields(self):
        if self.includes and self.excludes:
            raise ValueError("Don't use both includes and excludes.")
        if self.includes:
            for field in self.includes:
                if field not in self._fields and hasattr(self.obj, field):
                    setattr(self, field, getattr(self.obj, field))
                    self._fields.append(field)
        else:
            for field, val in self.obj.__dict__.items():
                if field not in self._fields and field not in self.excludes and field not in self.renames:
                    setattr(self, field, val)
                    self._fields.append(field)
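# A minimal sketch (hypothetical names, not part of the module) showing how the
# renames/casts/includes hooks compose over an arbitrary object:

class _Job:
    def __init__(self):
        self._job_name = "nightly_build"
        self.retries = 3
        self.created = datetime.datetime(2024, 1, 1)
        self.secret_token = "hidden"  # never copied: `includes` is non-empty

class _JobInfo(InfoJsonEncodable):
    renames = {"_job_name": "name"}  # expose the private attribute as "name"
    casts = {"retry_info": lambda job: f"{job.retries} retries"}  # computed field
    includes = ["created"]  # the only attribute copied verbatim

info = _JobInfo(_Job())
assert info == {
    "name": "nightly_build",
    "retry_info": "3 retries",
    "created": "2024-01-01T00:00:00",  # datetimes are cast to ISO strings
}
json.dumps(info)  # the result is a plain, json-encodable dict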
[docs]
class DagInfo(InfoJsonEncodable):
    """Defines encoding DAG object to JSON."""
def _safe_get_dag_tree_view(dag: DAG) -> list[str]:
    # get_tree_view() has been added in Airflow 2.8.2
    if hasattr(dag, "get_tree_view"):
        return dag.get_tree_view().splitlines()
    with redirect_stdout(StringIO()) as stdout:
        dag.tree_view()
        return stdout.getvalue().splitlines()


def _get_parsed_dag_tree(dag: DAG) -> dict:
    """
    Get DAG's tasks hierarchy representation.

    While the task dependencies are defined as follows:

        task >> [task_2, task_4] >> task_7
        task_3 >> task_5
        task_6  # has no dependencies, it's a root and a leaf

    the result of this function will look like:

        {
            "task": {
                "task_2": {
                    "task_7": {}
                },
                "task_4": {
                    "task_7": {}
                }
            },
            "task_3": {
                "task_5": {}
            },
            "task_6": {}
        }
    """
    lines = _safe_get_dag_tree_view(dag)
    task_dict: dict[str, dict] = {}
    parent_map: dict[int, tuple[str, dict]] = {}

    for line in lines:
        stripped_line = line.strip()
        if not stripped_line:
            continue

        # Determine the level by counting the leading spaces, assuming 4 spaces per level
        # as defined in airflow.models.dag.DAG._generate_tree_view()
        level = (len(line) - len(stripped_line)) // 4
        # airflow.models.baseoperator.BaseOperator.__repr__ is used in the DAG tree view:
        # <Task({op_class}): {task_id}>
        match = re.match(r"^<Task\((.+)\): (.*?)>$", stripped_line)
        if not match:
            return {}
        current_task_id = match[2]

        if level == 0:  # It's a root task
            task_dict[current_task_id] = {}
            parent_map[level] = (current_task_id, task_dict[current_task_id])
        else:
            # Find the immediate parent task
            parent_task, parent_dict = parent_map[(level - 1)]
            # Create new dict for the current task
            parent_dict[current_task_id] = {}
            # Update this task in the parent map
            parent_map[level] = (current_task_id, parent_dict[current_task_id])

    return task_dict


def _get_tasks_details(dag: DAG) -> dict:
    tasks = {
        single_task.task_id: {
            "operator": get_fully_qualified_class_name(single_task),
            "task_group": single_task.task_group.group_id if single_task.task_group else None,
            "emits_ol_events": _emits_ol_events(single_task),
            "ui_color": single_task.ui_color,
            "ui_fgcolor": single_task.ui_fgcolor,
            "ui_label": single_task.label,
            "is_setup": single_task.is_setup,
            "is_teardown": single_task.is_teardown,
        }
        for single_task in dag.tasks
    }
    return tasks


def _get_task_groups_details(dag: DAG) -> dict:
    return {
        tg_id: {
            "parent_group": tg.parent_group.group_id,
            "tooltip": tg.tooltip,
            "ui_color": tg.ui_color,
            "ui_fgcolor": tg.ui_fgcolor,
            "ui_label": tg.label,
        }
        for tg_id, tg in dag.task_group_dict.items()
    }


def _emits_ol_events(task: BaseOperator | MappedOperator) -> bool:
    config_selective_enabled = is_selective_lineage_enabled(task)
    config_disabled_for_operators = is_operator_disabled(task)
    # empty operators without callbacks/outlets are skipped for optimization by Airflow
    # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks
    is_skipped_as_empty_operator = all(
        (
            task.inherits_from_empty_operator,
            not task.on_execute_callback,
            not task.on_success_callback,
            not task.outlets,
        )
    )

    emits_ol_events = all(
        (
            config_selective_enabled,
            not config_disabled_for_operators,
            not is_skipped_as_empty_operator,
        )
    )
    return emits_ol_events
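# A hedged self-check of the parser on a synthetic tree view matching the docstring
# example. `_FakeDag` is a hypothetical stand-in exposing only get_tree_view(),
# which is all _safe_get_dag_tree_view() touches on Airflow 2.8.2+:

class _FakeDag:
    def get_tree_view(self) -> str:
        return (
            "<Task(BashOperator): task>\n"
            "    <Task(BashOperator): task_2>\n"
            "        <Task(BashOperator): task_7>\n"
            "    <Task(BashOperator): task_4>\n"
            "        <Task(BashOperator): task_7>\n"
            "<Task(BashOperator): task_3>\n"
            "    <Task(BashOperator): task_5>\n"
            "<Task(EmptyOperator): task_6>\n"
        )

assert _get_parsed_dag_tree(_FakeDag()) == {
    "task": {"task_2": {"task_7": {}}, "task_4": {"task_7": {}}},
    "task_3": {"task_5": {}},
    "task_6": {},
}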
[docs]
class OpenLineageRedactor(SecretsMasker):
    """
    This class redacts sensitive data in a way similar to SecretsMasker in Airflow logs.

    The difference is that our default max recursion depth is way higher - due to
    the structure of OL events we need more depth.
    Additionally, data structures can mark fields that must not be redacted by
    declaring a _skip_redact list via RedactMixin.
    """

    @classmethod
    def from_masker(cls, other: SecretsMasker) -> OpenLineageRedactor:
        # The body of this classmethod is cut off on this page; the reconstruction
        # below is assumed from its name and signature - it copies the masker's state.
        instance = cls()
        instance.patterns = other.patterns
        instance.replacer = other.replacer
        return instance
    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        if depth > max_depth:
            return item
        try:
            # It's impossible to check the type of variable in a dict without accessing it, and
            # this already causes a warning - so suppress it
            with suppress(AirflowContextDeprecationWarning):
                if type(item).__name__ == "Proxy":
                    # Those are deprecated values in _DEPRECATION_REPLACEMENTS
                    # in airflow.utils.context.Context
                    return "<<non-redactable: Proxy>>"
                if name and should_hide_value_for_key(name):
                    return self._redact_all(item, depth, max_depth)
                if attrs.has(type(item)):
                    # TODO: FIXME when mypy gets compatible with new attrs
                    for dict_key, subval in attrs.asdict(
                        item,  # type: ignore[arg-type]
                        recurse=False,
                    ).items():
                        if _is_name_redactable(dict_key, item):
                            setattr(
                                item,
                                dict_key,
                                self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth),
                            )
                    return item
                elif is_json_serializable(item) and hasattr(item, "__dict__"):
                    for dict_key, subval in item.__dict__.items():
                        if type(subval).__name__ == "Proxy":
                            return "<<non-redactable: Proxy>>"
                        if _is_name_redactable(dict_key, item):
                            setattr(
                                item,
                                dict_key,
                                self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth),
                            )
                    return item
                else:
                    return super()._redact(item, name, depth, max_depth)
        except Exception as exc:
            log.warning("Unable to redact %r. Error was: %s: %s", item, type(exc).__name__, exc)
        return item
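# Hedged sketch of the _skip_redact escape hatch (all names here are illustrative,
# and the RedactMixin behaviour - a skip_redact property backed by _skip_redact -
# is assumed from openlineage-python):

@attrs.define
class _ConnectionInfo(RedactMixin):
    _skip_redact = ["conn_id"]  # plain class attribute, not an attrs field

    conn_id: str = "my_conn"
    password: str = "secret"

redactor = OpenLineageRedactor()
info = redactor.redact(_ConnectionInfo(), max_depth=10)
assert info.password == "***"  # "password" is a sensitive key name
assert info.conn_id == "my_conn"  # left intact thanks to _skip_redact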
[docs]
def print_warning(log):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception as e:
                log.warning(
                    "Note: the exception below is being caught and is printed only for "
                    "visibility. However, OpenLineage events aren't being emitted. If you "
                    "see this, the task has completed successfully despite not emitting "
                    "OL events."
                )
                log.warning(e)

        return wrapper

    return decorator
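# Hedged usage sketch: the decorator wraps lineage-emission helpers so an
# OpenLineage failure is logged but never fails the task. `_emit` is hypothetical.

logger = logging.getLogger(__name__)

@print_warning(logger)
def _emit():
    raise RuntimeError("transport unreachable")

_emit()  # logs the two warnings and returns None instead of raising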
@deprecated(
    reason=(
        "`airflow.providers.openlineage.utils.utils.normalize_sql` "
        "has been deprecated and will be removed in future"
    ),
    category=AirflowProviderDeprecationWarning,
)
def normalize_sql(sql: str | Iterable[str]):
    # The decorated function is cut off on this page; the body below is a
    # reconstruction matching the decorator's target and the Iterable import above.
    if isinstance(sql, str):
        sql = [stmt for stmt in sql.split(";") if stmt != ""]
    sql = [obj for stmt in sql for obj in stmt.split(";") if obj != ""]
    return ";\n".join(sql)
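# Usage sketch for the reconstructed helper above: statements are split on ";"
# and re-joined one per line (whitespace around statements is preserved).
assert normalize_sql("SELECT a FROM t; SELECT b FROM t;") == "SELECT a FROM t;\n SELECT b FROM t"
assert normalize_sql(["SELECT a FROM t", "SELECT b FROM t"]) == "SELECT a FROM t;\nSELECT b FROM t"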
[docs]
def should_use_external_connection(hook) -> bool:
    # From Airflow 2.10 on, the execution is process-isolated, so we can safely run those again.
    if not _IS_AIRFLOW_2_10_OR_HIGHER:
        return hook.__class__.__name__ not in ["SnowflakeHook", "SnowflakeSqlApiHook", "RedshiftSQLHook"]
    return True
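# Hedged usage sketch: only the hook's class name is inspected, so bare stand-in
# classes (hypothetical, defined here just for the check) are enough to show the
# pre-2.10 behaviour:

class SnowflakeHook:
    pass

class PostgresHook:
    pass

if not _IS_AIRFLOW_2_10_OR_HIGHER:
    assert should_use_external_connection(SnowflakeHook()) is False
assert should_use_external_connection(PostgresHook()) is True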