Source code for airflow.providers.openlineage.utils.utils
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportdatetimeimportjsonimportloggingimportosfromcontextlibimportsuppressfromfunctoolsimportwrapsfromtypingimportTYPE_CHECKING,Any,Iterablefromurllib.parseimportparse_qsl,urlencode,urlparse,urlunparseimportattrsfromattrsimportasdict# TODO: move this maybe to Airflow's logic?fromopenlineage.client.utilsimportRedactMixinfromairflow.compat.functoolsimportcachefromairflow.configurationimportconffromairflow.providers.openlineage.plugins.facetsimport(AirflowMappedTaskRunFacet,AirflowRunFacet,)fromairflow.utils.contextimportAirflowContextDeprecationWarningfromairflow.utils.log.secrets_maskerimportRedactable,Redacted,SecretsMasker,should_hide_value_for_keyifTYPE_CHECKING:fromairflow.modelsimportDAG,BaseOperator,Connection,DagRun,TaskInstance
def url_to_https(url) -> str | None:
    """
    Convert a git remote URL into its ``https://`` form.

    ``git@host:org/repo.git`` becomes ``https://host/org/repo``; a URL that
    already starts with ``https://`` is kept. A trailing ``.git`` suffix is
    removed in both cases.

    :param url: remote URL; may be ``None`` or empty.
    :return: the ``https://`` URL, or ``None`` when ``url`` is falsy.
    :raises ValueError: if ``url`` uses neither the ``git@`` nor the
        ``https://`` form, or if nothing follows the ``git@`` prefix.
    """
    # Ensure URL exists
    if not url:
        return None

    ssh_prefix = "git@"
    base_url = None
    if url.startswith(ssh_prefix):
        # Strip only the leading prefix. Splitting on "git@" would also cut the
        # URL at any later occurrence of that text (e.g. inside a repo name).
        location = url[len(ssh_prefix):]
        if location:
            # Only the first ":" separates host from path in the scp-like form.
            base_url = f'https://{location.replace(":", "/", 1)}'
    elif url.startswith("https://"):
        base_url = url

    if not base_url:
        raise ValueError(f"Unable to extract location from: {url}")

    if base_url.endswith(".git"):
        base_url = base_url[:-4]
    return base_url
def redacted_connection_uri(conn: Connection, filtered_params=None, filtered_prefixes=None):
    """
    Return the connection URI for the given Connection with credentials removed.

    The username and password are always dropped from the network location.
    A query parameter is dropped when its key contains any entry of
    ``filtered_prefixes`` as a substring, unless that key is listed in
    ``filtered_params``.

    :param conn: connection providing ``get_uri()`` and ``EXTRA_KEY``.
    :param filtered_params: query keys that are kept even when they match one
        of ``filtered_prefixes``. Defaults to an empty list.
    :param filtered_prefixes: substrings that mark a query key as sensitive.
        Defaults to an empty list.
    :return: the redacted URI as a string.
    """
    if filtered_prefixes is None:
        filtered_prefixes = []
    if filtered_params is None:
        filtered_params = []

    def filter_key_params(k: str):
        # True means "drop this key": it looks sensitive and is not exempted.
        return k not in filtered_params and any(substr in k for substr in filtered_prefixes)

    conn_uri = conn.get_uri()
    parsed = urlparse(conn_uri)

    # Remove username and password by rebuilding netloc from host and port only.
    # ``hostname`` is None for host-less URIs; formatting it directly would
    # put the literal text "None" into the result.
    hostname = parsed.hostname or ""
    if ":" in hostname:
        # urlparse strips the brackets from IPv6 literals; restore them so the
        # host stays distinguishable from the port.
        hostname = f"[{hostname}]"
    netloc = hostname + (f":{parsed.port}" if parsed.port else "")
    parsed = parsed._replace(netloc=netloc)

    if parsed.query:
        query_dict = dict(parse_qsl(parsed.query))
        if conn.EXTRA_KEY in query_dict:
            # The whole extras payload is carried as JSON under a single key;
            # it replaces the query dict entirely.
            query_dict = json.loads(query_dict[conn.EXTRA_KEY])
        filtered_qs = {k: v for k, v in query_dict.items() if not filter_key_params(k)}
        parsed = parsed._replace(query=urlencode(filtered_qs))
    return urlunparse(parsed)
def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, Any]:
    """
    Collect the custom run facets for a task instance.

    :param task_instance: task instance to inspect; may be ``None``.
    :return: a dict holding an ``"airflow_mappedTask"`` facet when the task
        instance has a ``map_index`` other than ``-1``; otherwise empty.
    """
    facets: dict[str, Any] = {}

    # The -1 check comes from SmartSensor compatibility with dynamic task
    # mapping (mirrors Airflow's own code). A missing attribute is treated
    # the same way as -1.
    if getattr(task_instance, "map_index", -1) == -1:
        return facets

    facets["airflow_mappedTask"] = AirflowMappedTaskRunFacet.from_task_instance(task_instance)
    return facets
class InfoJsonEncodable(dict):
    """
    Airflow objects might not be json-encodable overall.

    The class provides additional attributes to control what and how is encoded:

    * renames: a dictionary of attribute name changes
    * | casts: a dictionary consisting of attribute names
      | and corresponding methods that should change
      | object value
    * includes: list of attributes to be included in encoding
    * excludes: list of attributes to be excluded from encoding

    Don't use both includes and excludes.

    Processing order is casts, then renames, then includes/excludes; a field
    produced by an earlier step is not overwritten by a later one.
    """

    # Empty defaults so the base class, and subclasses that override only some
    # of these, can be instantiated. They are only read here; subclasses should
    # rebind them rather than mutate the shared objects.
    renames: dict[str, str] = {}
    casts: dict[str, Any] = {}
    includes: list[str] = []
    excludes: list[str] = []

    def __init__(self, obj):
        """
        Build the dict representation of ``obj``.

        :param obj: object whose attributes are encoded.
        :raises ValueError: if both ``includes`` and ``excludes`` are set.
        """
        self.obj = obj
        self._fields = []

        self._cast_fields()
        self._rename_fields()
        self._include_fields()
        dict.__init__(
            self,
            **{field: InfoJsonEncodable._cast_basic_types(getattr(self, field)) for field in self._fields},
        )

    @staticmethod
    def _cast_basic_types(value):
        """Convert datetimes to ISO strings and set/list/tuple to a list's string form."""
        if isinstance(value, datetime.datetime):
            return value.isoformat()
        if isinstance(value, (set, list, tuple)):
            return str(list(value))
        return value

    def _rename_fields(self):
        """Copy each attribute named in ``renames`` from ``obj`` under its new name."""
        for field, renamed in self.renames.items():
            if hasattr(self.obj, field):
                setattr(self, renamed, getattr(self.obj, field))
                self._fields.append(renamed)

    def _cast_fields(self):
        """Compute each field in ``casts`` by calling its function with ``obj``."""
        for field, func in self.casts.items():
            setattr(self, field, func(self.obj))
            self._fields.append(field)

    def _include_fields(self):
        """Add the remaining fields, driven by either ``includes`` or ``excludes``."""
        if self.includes and self.excludes:
            raise ValueError("Don't use both includes and excludes.")
        if self.includes:
            for field in self.includes:
                if field not in self._fields and hasattr(self.obj, field):
                    setattr(self, field, getattr(self.obj, field))
                    self._fields.append(field)
        else:
            # No explicit include list: take every instance attribute of obj
            # that is not already present, excluded, or the source of a rename.
            for field, val in self.obj.__dict__.items():
                if field not in self._fields and field not in self.excludes and field not in self.renames:
                    setattr(self, field, val)
                    self._fields.append(field)
class DagInfo(InfoJsonEncodable):
    """
    Defines how a DAG object is encoded to JSON.

    Encoding is inherited from ``InfoJsonEncodable``.

    NOTE(review): no ``renames``/``casts``/``includes``/``excludes`` overrides
    are visible in this excerpt -- confirm the field selection against the
    full source.
    """
class OpenLineageRedactor(SecretsMasker):
    """
    This class redacts sensitive data similar to SecretsMasker in Airflow logs.

    The difference is that our default max recursion depth is way higher - due to
    the structure of OL events we need more depth.
    Additionally, we allow data structures to specify data that does not need to be
    redacted, by deriving from RedactMixin and specifying a _skip_redact list.
    """

    # NOTE: ``_redact`` must be an instance method. It calls ``self._redact_all``
    # and ``super()._redact``, both of which need an instance.
    def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
        """
        Redact ``item`` recursively, modifying attrs and plain objects in place.

        :param item: value to redact.
        :param name: key or attribute name ``item`` was found under, if any.
        :param depth: current recursion depth.
        :param max_depth: depth past which ``item`` is returned untouched.
        :return: the redacted value. On any exception a warning is logged and
            ``item`` is returned as is.
        """
        if depth > max_depth:
            return item
        try:
            # It's impossible to check the type of variable in a dict without accessing it, and
            # this already causes warning - so suppress it
            with suppress(AirflowContextDeprecationWarning):
                if type(item).__name__ == "Proxy":
                    # Those are deprecated values in _DEPRECATION_REPLACEMENTS
                    # in airflow.utils.context.Context
                    return "<<non-redactable: Proxy>>"
            if name and should_hide_value_for_key(name):
                return self._redact_all(item, depth, max_depth)
            if attrs.has(type(item)):
                # TODO: fixme when mypy gets compatible with new attrs
                for dict_key, subval in attrs.asdict(
                    item, recurse=False  # type: ignore[arg-type]
                ).items():
                    if _is_name_redactable(dict_key, item):
                        setattr(
                            item,
                            dict_key,
                            self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth),
                        )
                return item
            elif is_json_serializable(item) and hasattr(item, "__dict__"):
                # Only existing attributes are reassigned, so the dict does not
                # change size while it is being iterated.
                for dict_key, subval in item.__dict__.items():
                    if type(subval).__name__ == "Proxy":
                        return "<<non-redactable: Proxy>>"
                    if _is_name_redactable(dict_key, item):
                        setattr(
                            item,
                            dict_key,
                            self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth),
                        )
                return item
            else:
                return super()._redact(item, name, depth, max_depth)
        except Exception as exc:
            # Best effort: a redaction failure must not break event emission.
            log.warning(
                "Unable to redact %r. Error was: %s: %s",
                item,
                type(exc).__name__,
                exc,
            )
            return item