# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportloggingimportosimportreimportuuidfromtypingimportTYPE_CHECKING,Anyfromurllib.parseimporturlparsefromdateutil.parserimportparsefromjinja2importEnvironmentfromairflow.models.operatorimportBaseOperatorfromairflow.models.variableimportVariablefromairflow.utils.sessionimportcreate_sessionifTYPE_CHECKING:try:fromairflow.sdk.definitions.contextimportContextexceptImportError:# TODO: Remove once provider drops support for Airflow 2fromairflow.utils.contextimportContext
def env_var(var: str, default: str | None = None) -> str:
    """
    Jinja helper returning the value of the environment variable named ``var``.

    :param var: name of the environment variable to look up
    :param default: value returned when the variable is not set; ``None`` means
        the variable is mandatory
    :return: the environment value if set, otherwise ``default``
    :raises ValueError: if the variable is not set and no default was given
    """
    # Environment values are always strings, so None can only come from a
    # missing variable combined with a missing default.
    value = os.environ.get(var, default)
    if value is None:
        raise ValueError(f"Env var required but not provided: '{var}'")
    return value
def not_match(result: str, pattern: str) -> str:
    """
    Jinja helper asserting that ``pattern`` does not occur in ``result``.

    :param result: value to inspect
    :param pattern: fragment that must be absent from ``result``
    :return: the string ``"true"`` when ``pattern`` is absent
    :raises ValueError: if ``pattern`` is found in ``result``
    """
    if pattern not in result:
        return "true"
    raise ValueError(f"Found {pattern} in {result}")
def match(expected, result, env: Environment) -> bool:
    """
    Check if result is "equal" to expected value.

    Omits keys not specified in expected value and resolves any jinja templates found.

    :param expected: expected template fragment: a dict, a list, a string
        (optionally containing a ``{{ ... }}`` jinja expression) or any other value
    :param result: the fragment of the received event to compare against
    :param env: jinja environment used to render string templates
    :return: True if ``result`` satisfies ``expected``; False otherwise, with the
        reason reported through ``log.error``
    """
    if isinstance(expected, dict):
        # Take a look only at keys present at expected dictionary
        if not isinstance(result, dict):
            log.error("Not a dict: %s\nExpected %s", result, expected)
            return False
        for k, v in expected.items():
            if k not in result:
                log.error("Key %s not in received event %s\nExpected %s", k, result, expected)
                return False
            if not match(v, result[k], env):
                log.error(
                    "For key %s, expected value %s not equals received %s\nExpected: %s, request: %s",
                    k,
                    v,
                    result[k],
                    expected,
                    result,
                )
                return False
    elif isinstance(expected, list):
        # Guard the type first: without it a string result would be compared
        # character by character (["a", "b"] would match "ab"), and None or a
        # scalar would raise TypeError from len() instead of reporting a mismatch.
        if not isinstance(result, (list, tuple)):
            log.error("Not a list: %s\nExpected %s", result, expected)
            return False
        if len(expected) != len(result):
            log.error("Length does not match: expected %d, result: %d", len(expected), len(result))
            return False
        for i, x in enumerate(expected):
            if not match(x, result[i], env):
                log.error(
                    "List not matched at %d\nexpected:\n%s\nresult: \n%s",
                    i,
                    json.dumps(x),
                    json.dumps(result[i]),
                )
                return False
    elif isinstance(expected, str):
        if "{{" in expected:
            # Evaluate jinja: in some cases, we want to check only if key exists, or if
            # value has the right type
            try:
                rendered = env.from_string(expected).render(result=result)
            except ValueError as e:
                log.error("Error rendering jinja template %s: %s", expected, e)
                return False
            # A template passes either by rendering to "true" (case-insensitive)
            # or by rendering to exactly the received value.
            if str(rendered).lower() == "true" or rendered == result:
                return True
            log.error("Rendered value %s does not equal 'true' or %s", rendered, result)
            return False
        elif expected != result:
            log.error("Expected value %s does not equal result %s", expected, result)
            return False
    elif expected != result:
        log.error("Object of type %s: %s does not match %s", type(expected), expected, result)
        return False
    return True
class OpenLineageTestOperator(BaseOperator):
    """
    This operator is added for system testing purposes.

    It compares expected event templates set on initialization with ones emitted by OpenLineage
    integration and stored in Variables by VariableTransport.

    :param event_templates: dictionary where key is the key used by VariableTransport in format of
        <DAG_ID>.<TASK_ID>.event.<EVENT_TYPE>, and value is event template (fragment) that need
        to be in received events.
    :param file_path: alternatively, file_path pointing to file with event templates will be used
    :param env: jinja environment used to render event templates
    :param allow_duplicate_events: if set to True, allows multiple events for the same key
    :param clear_variables: if set to True, clears all variables after checking events
    :raises: ValueError if the received events do not match with expected ones.
    """

    def __init__(
        self,
        event_templates: dict[str, dict] | None = None,
        file_path: str | None = None,
        # NOTE: the default is evaluated once, when the class body is executed,
        # so every instance relying on it shares the same Environment object.
        env: Environment = setup_jinja(),
        allow_duplicate_events: bool = False,
        clear_variables: bool = True,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Store the configuration; both the validation below and execute()
        # read these attributes.
        self.event_templates = event_templates
        self.file_path = file_path
        self.env = env
        self.multiple_events = allow_duplicate_events
        self.delete = clear_variables
        if self.event_templates and self.file_path:
            raise ValueError("Can't pass both event_templates and file_path")

    def execute(self, context: Context) -> None:
        """
        Compare every expected event template with the event stored under the same Variable key.

        When ``file_path`` is set, templates are loaded from that JSON file and keyed as
        ``<job name>.event.<lower-cased eventType>``, replacing any templates set before.

        :param context: task execution context (not used by this method)
        :raises ValueError: if no event is stored for a key, if more than one event is stored
            and duplicates are not allowed, or if the stored event does not match its template
        """
        if self.file_path is not None:
            self.event_templates = {}
            with open(self.file_path) as f:  # type: ignore[arg-type]
                events = json.load(f)
            for event in events:
                key = event["job"]["name"] + ".event." + event["eventType"].lower()
                self.event_templates[key] = event
        for key, template in self.event_templates.items():  # type: ignore[union-attr]
            send_event = Variable.get(key=key, deserialize_json=True)
            # Covers both an empty list and a missing (None) value.
            if not send_event:
                raise ValueError(f"No event for key {key}")
            if len(send_event) != 1 and not self.multiple_events:
                raise ValueError(f"Expected one event for key {key}, got {len(send_event)}")
            self.log.info("Events: %s, %s, %s", send_event, len(send_event), type(send_event))
            # Only the first stored event is compared, even when duplicates are allowed.
            if not match(template, json.loads(send_event[0]), self.env):
                raise ValueError("Event received does not match one specified in test")
        if self.delete:
            # Removes every stored Variable, not only the keys checked above.
            with create_session() as session:
                session.query(Variable).delete()