# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import json
import logging
import traceback
from logging.config import DictConfigurator  # type: ignore
from time import sleep

from sqlalchemy import and_, or_, tuple_

from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.models import BaseOperator, DagRun, SensorInstance, SkipMixin, TaskInstance
from airflow.settings import LOGGING_CLASS_PATH
from airflow.stats import Stats
from airflow.utils import helpers, timezone
from airflow.utils.email import send_email
from airflow.utils.log.logging_mixin import set_context
from airflow.utils.module_loading import import_string
from airflow.utils.net import get_hostname
from airflow.utils.session import provide_session
from airflow.utils.state import PokeState, State
from airflow.utils.timeout import timeout

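# Module-level task-log configuration shared by every SensorWork. This block is an
# assumption reconstructed from the references to `handler_config`, `formatter_config`
# and `dictConfigurator` in SensorWork.create_new_task_handler() below: it loads the
# configured logging dict and keeps a DictConfigurator around so a dedicated log
# handler can be built for each sensor work.
config = import_string(LOGGING_CLASS_PATH)
handler_config = config['handlers']['task']
formatter_config = config['formatters'][handler_config['formatter']]

dictConfigurator = DictConfigurator(config)

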
class SensorWork:
    """
    This class stores a sensor work with decoded context value. It is only used
    inside of smart sensor. Create a sensor work based on sensor instance record.

    A sensor work object has the following attributes:
    `dag_id`: sensor_instance dag_id.
    `task_id`: sensor_instance task_id.
    `execution_date`: sensor_instance execution_date.
    `try_number`: sensor_instance try_number.
    `poke_context`: Decoded poke_context for the sensor task.
    `execution_context`: Decoded execution_context.
    `hashcode`: This is the signature of poking job.
    `operator`: The sensor operator class.
    `op_classpath`: The sensor operator class path.
    `encoded_poke_context`: The raw data from sensor_instance poke_context column.
    `log`: The sensor work logger which will mock the corresponding task instance log.

    :param si: The sensor_instance ORM object.
    """

    def __init__(self, si):
        self.dag_id = si.dag_id
        self.task_id = si.task_id
        self.execution_date = si.execution_date
        self.try_number = si.try_number

        self.poke_context = json.loads(si.poke_context) if si.poke_context else {}
        self.execution_context = json.loads(si.execution_context) if si.execution_context else {}
        try:
            self.log = self._get_sensor_logger(si)
        except Exception as e:
            self.log = None
            print(e)
        self.hashcode = si.hashcode
        self.start_date = si.start_date
        self.operator = si.operator
        self.op_classpath = si.op_classpath
        self.encoded_poke_context = si.poke_context

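    # Equality hook assumed from usage in SmartSensorOperator._load_sensor_works, which
    # relies on `x not in self.sensor_works` to detect newly appearing sensor tasks;
    # compare sensor works by task identity rather than by object identity. A minimal sketch:
    def __eq__(self, other):
        if not isinstance(other, SensorWork):
            return NotImplemented
        return (
            self.dag_id == other.dag_id
            and self.task_id == other.task_id
            and self.execution_date == other.execution_date
            and self.try_number == other.try_number
        )
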
    @staticmethod
    def create_new_task_handler():
        """
        Create task log handler for a sensor work.

        :return: log handler
        """
        from airflow.utils.log.secrets_masker import _secrets_masker

        handler_config_copy = {k: handler_config[k] for k in handler_config}
        del handler_config_copy['filters']

        formatter_config_copy = {k: formatter_config[k] for k in formatter_config}
        handler = dictConfigurator.configure_handler(handler_config_copy)
        formatter = dictConfigurator.configure_formatter(formatter_config_copy)
        handler.setFormatter(formatter)

        # We want to share the _global_ filterer instance, not create a new one
        handler.addFilter(_secrets_masker())
        return handler

    def _get_sensor_logger(self, si):
        """Return logger for a sensor instance object."""
        # The created log_id is used inside of smart sensor as the key to fetch
        # the corresponding in memory log handler.
        si.raw = False  # Otherwise set_context will fail
        log_id = "-".join(
            [si.dag_id, si.task_id, si.execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f"), str(si.try_number)]
        )
        logger = logging.getLogger('airflow.task' + '.' + log_id)

        if len(logger.handlers) == 0:
            handler = self.create_new_task_handler()
            logger.addHandler(handler)
            set_context(logger, si)

            line_break = "-" * 120
            logger.info(line_break)
            logger.info(
                "Processing sensor task %s in smart sensor service on host: %s", self.ti_key, get_hostname()
            )
            logger.info(line_break)
        return logger

    def close_sensor_logger(self):
        """Close log handler for a sensor work."""
        for handler in self.log.handlers:
            try:
                handler.close()
            except Exception as e:
                print(e)

    @property
    def ti_key(self):
        """Key for the task instance that maps to the sensor work."""
        return self.dag_id, self.task_id, self.execution_date

    @property
    def cache_key(self):
        """Key used to query in smart sensor for cached sensor work."""
        return self.operator, self.encoded_poke_context


class CachedPokeWork:
    """
    Wrapper class for the poke work inside smart sensor. It saves the sensor_task
    used to poke and the most recent poke result state.

    state: poke state.
    sensor_task: The cached object for executing the poke function.
    last_poke_time: The latest time this cached work was called.
    to_flush: If we should flush the cached work.
    """

    def __init__(self):
        self.state = None
        self.sensor_task = None
        self.last_poke_time = None
        self.to_flush = False

    def set_state(self, state):
        """
        Set state for cached poke work.

        :param state: The sensor_instance state.
        """
        self.state = state
        self.last_poke_time = timezone.utcnow()

    def clear_state(self):
        """Clear state for cached poke work."""
        self.state = None

    def set_to_flush(self):
        """Mark this poke work to be popped from cached dict after current loop."""
        self.to_flush = True

    def is_expired(self):
        """
        The cached task object expires if there is no poke for 20 minutes.

        :return: Boolean
        """
        return self.to_flush or (timezone.utcnow() - self.last_poke_time).total_seconds() > 1200


class SensorExceptionInfo:
    """
    Hold sensor exception information and the type of exception. For possible transient
    infra failures, give the task more chances to retry before failing it.
    """

    def __init__(
        self,
        exception_info,
        is_infra_failure=False,
        infra_failure_retry_window=datetime.timedelta(minutes=130),
    ):
        self._exception_info = exception_info
        self._is_infra_failure = is_infra_failure
        self._infra_failure_retry_window = infra_failure_retry_window

        self._infra_failure_timeout = None
        self.set_infra_failure_timeout()
        self.fail_current_run = self.should_fail_current_run()

    def set_latest_exception(self, exception_info, is_infra_failure=False):
        """
        Set the latest exception information for this sensor exception.

        If the exception implies an infra failure, this function checks the infra failure
        timeout that was recorded when the first infra failure exception arrived. The poke
        is retried without failing the current run until that timeout is reached.

        :param exception_info: Details of the exception information.
        :param is_infra_failure: If current exception was caused by transient infra failure.
            There is a retry window _infra_failure_retry_window within which the smart sensor
            retries the poke function without failing the current task run.
        """
        self._exception_info = exception_info
        self._is_infra_failure = is_infra_failure

        self.set_infra_failure_timeout()
        self.fail_current_run = self.should_fail_current_run()

    def set_infra_failure_timeout(self):
        """
        Set the time point after which the sensor should be failed if it keeps getting
        infra failures.

        :return:
        """
        # Only set the infra_failure_timeout if there is no existing one
        if not self._is_infra_failure:
            self._infra_failure_timeout = None
        elif self._infra_failure_timeout is None:
            self._infra_failure_timeout = timezone.utcnow() + self._infra_failure_retry_window

    def should_fail_current_run(self):
        """
        :return: Should the sensor fail
        :type: boolean
        """
        return not self.is_infra_failure or timezone.utcnow() > self._infra_failure_timeout

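    # Read-only accessor assumed from usage elsewhere in this module (e.g.
    # SmartSensorOperator._handle_poke_exception reads `sensor_exception.exception_info`),
    # exposing the stored exception message.
    @property
    def exception_info(self):
        """:return: exception msg."""
        return self._exception_info
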
    @property
    def is_infra_failure(self):
        """
        :return: If the exception is an infra failure
        :type: boolean
        """
        return self._is_infra_failure

    def is_expired(self):
        """
        :return: Whether the cached exception has expired and can be dropped.
        :type: boolean
        """
        if not self._is_infra_failure:
            return True
        return timezone.utcnow() > self._infra_failure_timeout + datetime.timedelta(minutes=30)


class SmartSensorOperator(BaseOperator, SkipMixin):
    """
    Smart sensor operators are derived from this class.

    A smart sensor operator keeps a dictionary of qualified active sensor tasks refreshed by
    querying the database. Unlike a regular sensor operator, it pokes all sensor tasks in
    the dictionary at a time interval. When a poke criterion is met, or a task fails by
    timing out, it updates the state of all matching sensor tasks in the task_instance table.

    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each try.
    :type poke_interval: int
    :param smart_sensor_timeout: Time, in seconds before the internal sensor
        job times out if poke_timeout is not defined.
    :type smart_sensor_timeout: float
    :param shard_min: shard code lower bound (inclusive)
    :type shard_min: int
    :param shard_max: shard code upper bound (exclusive)
    :type shard_max: int
    :param poke_timeout: Time, in seconds before the task times out and fails.
    :type poke_timeout: float
    """

    def __init__(
        self,
        poke_interval=180,
        smart_sensor_timeout=60 * 60 * 24 * 7,
        soft_fail=False,
        shard_min=0,
        shard_max=100000,
        poke_timeout=6.0,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # super(SmartSensorOperator, self).__init__(*args, **kwargs)
        self.poke_interval = poke_interval
        self.soft_fail = soft_fail
        self.timeout = smart_sensor_timeout
        self._validate_input_values()
        self.hostname = ""

        self.sensor_works = []
        self.cached_dedup_works = {}
        self.cached_sensor_exceptions = {}

        self.max_tis_per_query = 50
        self.shard_min = shard_min
        self.shard_max = shard_max
        self.poke_timeout = poke_timeout

    def _validate_input_values(self):
        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
            raise AirflowException("The poke_interval must be a non-negative number")
        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
            raise AirflowException("The timeout must be a non-negative number")

    @provide_session
    def _load_sensor_works(self, session=None):
        """
        Refresh the sensor instances that need to be handled by this operator. Create smart
        sensor internal objects based on the information persisted in the sensor_instance table.
        """
        SI = SensorInstance
        with Stats.timer() as timer:
            query = (
                session.query(SI)
                .filter(SI.state == State.SENSING)
                .filter(SI.shardcode < self.shard_max, SI.shardcode >= self.shard_min)
            )
            tis = query.all()

        self.log.info("Performance query %s tis, time: %.3f", len(tis), timer.duration)

        # Querying without checking the dagrun state might keep some failed dag_run tasks alive.
        # Joining with the DagRun table would be very slow given the number of sensor tasks we
        # need to handle, so we query all smart tasks in this operator and expect the scheduler
        # to correct the states in _change_state_for_tis_without_dagrun().
        sensor_works = []
        for ti in tis:
            try:
                sensor_works.append(SensorWork(ti))
            except Exception:
                self.log.exception("Exception at creating sensor work for ti %s", ti.key)

        self.log.info("%d tasks detected.", len(sensor_works))

        new_sensor_works = [x for x in sensor_works if x not in self.sensor_works]
        self._update_ti_hostname(new_sensor_works)

        self.sensor_works = sensor_works

    @provide_session
    def _update_ti_hostname(self, sensor_works, session=None):
        """
        Update task instance hostname for new sensor works.

        :param sensor_works: Smart sensor internal objects for sensor tasks.
        :param session: The sqlalchemy session.
        """
        DR = DagRun
        TI = TaskInstance

        def update_ti_hostname_with_count(count, sensor_works):
            # Using or_ instead of in_ here to prevent a full table scan.
            if session.bind.dialect.name == 'mssql':
                ti_filter = or_(
                    and_(
                        TI.dag_id == ti_key.dag_id,
                        TI.task_id == ti_key.task_id,
                        DR.execution_date == ti_key.execution_date,
                    )
                    for ti_key in sensor_works
                )
            else:
                ti_keys = [(x.dag_id, x.task_id, x.execution_date) for x in sensor_works]
                ti_filter = or_(
                    tuple_(TI.dag_id, TI.task_id, DR.execution_date) == ti_key for ti_key in ti_keys
                )

            for ti in session.query(TI).join(TI.dag_run).filter(ti_filter):
                ti.hostname = self.hostname
            session.commit()

            return count + len(sensor_works)

        count = helpers.reduce_in_chunks(
            update_ti_hostname_with_count, sensor_works, 0, self.max_tis_per_query
        )
        if count:
            self.log.info("Updated hostname on %s tis.", count)

    @provide_session
    def _mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None):
        """
        Mark state for multiple tasks in the task_instance table to a new state if they have
        the same signature as the poke_hash.

        :param operator: The sensor's operator class name.
        :param poke_hash: The hash code generated from the sensor's poke context.
        :param encoded_poke_context: The raw encoded poke_context.
        :param state: Set multiple sensor tasks to this state.
        :param session: The sqlalchemy session.
        """

        def mark_state(ti, sensor_instance):
            ti.state = state
            sensor_instance.state = state
            if state in State.finished:
                ti.end_date = end_date
                ti.set_duration()

        SI = SensorInstance
        TI = TaskInstance

        count_marked = 0
        query_result = []
        try:
            query_result = (
                session.query(TI, SI)
                .join(
                    TI,
                    and_(
                        TI.dag_id == SI.dag_id,
                        TI.task_id == SI.task_id,
                        TI.execution_date == SI.execution_date,
                    ),
                )
                .filter(SI.state == State.SENSING)
                .filter(SI.hashcode == poke_hash)
                .filter(SI.operator == operator)
                .with_for_update()
                .all()
            )

            end_date = timezone.utcnow()
            for ti, sensor_instance in query_result:
                if sensor_instance.poke_context != encoded_poke_context:
                    continue

                ti.hostname = self.hostname
                if ti.state == State.SENSING:
                    mark_state(ti=ti, sensor_instance=sensor_instance)
                    count_marked += 1
                else:
                    # ti.state != State.SENSING
                    sensor_instance.state = ti.state

            session.commit()
        except Exception:
            self.log.warning(
                "Exception _mark_multi_state in smart sensor for hashcode %s",
                str(poke_hash),  # cast to str in advance for highlighting
                exc_info=True,
            )
        self.log.info("Marked %s tasks out of %s to state %s", count_marked, len(query_result), state)

    @provide_session
    def _retry_or_fail_task(self, sensor_work, error, session=None):
        """
        Change the state of a single sensor task. For a final state, also set the end_date.
        Since the smart sensor handles all retries in one process, a failed sensor task has
        logically exhausted all retries and its try_number should be set to max_tries.

        :param sensor_work: The sensor_work with exception.
        :type sensor_work: SensorWork
        :param error: The error message for this sensor_work.
        :type error: str.
        :param session: The sqlalchemy session.
        """

        def email_alert(task_instance, error_info):
            try:
                subject, html_content, _ = task_instance.get_email_subject_content(error_info)
                email = sensor_work.execution_context.get('email')

                send_email(email, subject, html_content)
            except Exception:
                sensor_work.log.warning("Exception alerting email.", exc_info=True)

        def handle_failure(sensor_work, ti):
            if sensor_work.execution_context.get('retries') and ti.try_number <= ti.max_tries:
                # retry
                ti.state = State.UP_FOR_RETRY
                if sensor_work.execution_context.get('email_on_retry') and sensor_work.execution_context.get(
                    'email'
                ):
                    sensor_work.log.info("%s sending email alert for retry", sensor_work.ti_key)
                    email_alert(ti, error)
            else:
                ti.state = State.FAILED
                if sensor_work.execution_context.get(
                    'email_on_failure'
                ) and sensor_work.execution_context.get('email'):
                    sensor_work.log.info("%s sending email alert for failure", sensor_work.ti_key)
                    email_alert(ti, error)

        try:
            dag_id, task_id, execution_date = sensor_work.ti_key
            TI = TaskInstance
            SI = SensorInstance
            sensor_instance = (
                session.query(SI)
                .filter(SI.dag_id == dag_id, SI.task_id == task_id, SI.execution_date == execution_date)
                .with_for_update()
                .first()
            )

            if sensor_instance.hashcode != sensor_work.hashcode:
                # Return without setting state
                return

            ti = (
                session.query(TI)
                .filter(TI.dag_id == dag_id, TI.task_id == task_id, TI.execution_date == execution_date)
                .with_for_update()
                .first()
            )

            if ti:
                if ti.state == State.SENSING:
                    ti.hostname = self.hostname
                    handle_failure(sensor_work, ti)

                    sensor_instance.state = State.FAILED
                    ti.end_date = timezone.utcnow()
                    ti.set_duration()
                else:
                    sensor_instance.state = ti.state
                session.merge(sensor_instance)
                session.merge(ti)
                session.commit()

                sensor_work.log.info(
                    "Task %s got an error: %s. Set the state to failed. Exit.",
                    str(sensor_work.ti_key),
                    error,
                )
                sensor_work.close_sensor_logger()

        except AirflowException:
            sensor_work.log.warning("Exception on failing %s", sensor_work.ti_key, exc_info=True)

    def _check_and_handle_ti_timeout(self, sensor_work):
        """
        Check whether a sensor task in the smart sensor has timed out. This could be either
        the sensor operator timeout or the general operator execution_timeout.
        :param sensor_work: SensorWork
        """
        task_timeout = sensor_work.execution_context.get('timeout', self.timeout)
        task_execution_timeout = sensor_work.execution_context.get('execution_timeout')
        if task_execution_timeout:
            task_timeout = min(task_timeout, task_execution_timeout)

        if (timezone.utcnow() - sensor_work.start_date).total_seconds() > task_timeout:
            error = "Sensor Timeout"
            sensor_work.log.exception(error)
            self._retry_or_fail_task(sensor_work, error)

    def _handle_poke_exception(self, sensor_work):
        """
        Fail the task if the accumulated exceptions exceed the retry budget.

        :param sensor_work: SensorWork
        """
        sensor_exception = self.cached_sensor_exceptions.get(sensor_work.cache_key)
        error = sensor_exception.exception_info
        sensor_work.log.exception("Handling poke exception: %s", error)

        if sensor_exception.fail_current_run:
            if sensor_exception.is_infra_failure:
                sensor_work.log.exception(
                    "Task %s failed by infra failure in smart sensor.", sensor_work.ti_key
                )
                # There is a risk that a sensor object cached in the smart sensor keeps throwing
                # exceptions and causes an infra failure. To make sure the sensor tasks after
                # retry do not fall back onto the same object and hit an endless infra failure,
                # we mark the sensor task after an infra failure so that it can be popped
                # before the next poke loop.
                cache_key = sensor_work.cache_key
                self.cached_dedup_works[cache_key].set_to_flush()
            else:
                sensor_work.log.exception("Task %s failed by exceptions.", sensor_work.ti_key)
            self._retry_or_fail_task(sensor_work, error)
        else:
            sensor_work.log.info("Exception detected, retrying without failing current run.")
            self._check_and_handle_ti_timeout(sensor_work)

    def _process_sensor_work_with_cached_state(self, sensor_work, state):
        if state == PokeState.LANDED:
            sensor_work.log.info("Task %s succeeded", str(sensor_work.ti_key))
            sensor_work.close_sensor_logger()

        if state == PokeState.NOT_LANDED:
            # Handle timeout if connection valid but not landed yet
            self._check_and_handle_ti_timeout(sensor_work)
        elif state == PokeState.POKE_EXCEPTION:
            self._handle_poke_exception(sensor_work)

    def _execute_sensor_work(self, sensor_work):
        ti_key = sensor_work.ti_key
        log = sensor_work.log or self.log
        log.info("Sensing ti: %s", str(ti_key))
        log.info("Poking with arguments: %s", sensor_work.encoded_poke_context)

        cache_key = sensor_work.cache_key
        if cache_key not in self.cached_dedup_works:
            # create an empty cached_work for a new cache_key
            self.cached_dedup_works[cache_key] = CachedPokeWork()

        cached_work = self.cached_dedup_works[cache_key]

        if cached_work.state is not None:
            # Have a valid cached state, don't poke twice within a certain time interval
            self._process_sensor_work_with_cached_state(sensor_work, cached_work.state)
            return

        try:
            with timeout(seconds=self.poke_timeout):
                if self.poke(sensor_work):
                    # Got a landed signal, mark all tasks waiting for this partition
                    cached_work.set_state(PokeState.LANDED)

                    self._mark_multi_state(
                        sensor_work.operator,
                        sensor_work.hashcode,
                        sensor_work.encoded_poke_context,
                        State.SUCCESS,
                    )

                    log.info("Task %s succeeded", str(ti_key))
                    sensor_work.close_sensor_logger()
                else:
                    # Not landed yet. Handle possible timeout
                    cached_work.set_state(PokeState.NOT_LANDED)
                    self._check_and_handle_ti_timeout(sensor_work)

                self.cached_sensor_exceptions.pop(cache_key, None)
        except Exception as e:
            # The retry_infra_failure decorator inside hive_hooks will raise an exception with
            # is_infra_failure == True. A long poking timeout here is also considered an infra
            # failure. Other exceptions should fail the task.
            is_infra_failure = getattr(e, 'is_infra_failure', False) or isinstance(e, AirflowTaskTimeout)
            exception_info = traceback.format_exc()
            cached_work.set_state(PokeState.POKE_EXCEPTION)

            if cache_key in self.cached_sensor_exceptions:
                self.cached_sensor_exceptions[cache_key].set_latest_exception(
                    exception_info, is_infra_failure=is_infra_failure
                )
            else:
                self.cached_sensor_exceptions[cache_key] = SensorExceptionInfo(
                    exception_info, is_infra_failure=is_infra_failure
                )

            self._handle_poke_exception(sensor_work)

    def flush_cached_sensor_poke_results(self):
        """Flush outdated cached sensor states saved in previous loop."""
        for key, cached_work in self.cached_dedup_works.copy().items():
            if cached_work.is_expired():
                self.cached_dedup_works.pop(key, None)
            else:
                cached_work.state = None

        for ti_key, sensor_exception in self.cached_sensor_exceptions.copy().items():
            if sensor_exception.fail_current_run or sensor_exception.is_expired():
                self.cached_sensor_exceptions.pop(ti_key, None)

    def poke(self, sensor_work):
        """
        Function that the sensors defined while deriving this class should override.
        """
        cached_work = self.cached_dedup_works[sensor_work.cache_key]
        if not cached_work.sensor_task:
            init_args = dict(list(sensor_work.poke_context.items()) + [('task_id', sensor_work.task_id)])
            operator_class = import_string(sensor_work.op_classpath)
            cached_work.sensor_task = operator_class(**init_args)

        return cached_work.sensor_task.poke(sensor_work.poke_context)

    def _emit_loop_stats(self):
        try:
            count_poke = 0
            count_poke_success = 0
            count_poke_exception = 0
            count_exception_failures = 0
            count_infra_failure = 0
            for cached_work in self.cached_dedup_works.values():
                if cached_work.state is None:
                    continue
                count_poke += 1
                if cached_work.state == PokeState.LANDED:
                    count_poke_success += 1
                elif cached_work.state == PokeState.POKE_EXCEPTION:
                    count_poke_exception += 1
            for cached_exception in self.cached_sensor_exceptions.values():
                if cached_exception.is_infra_failure and cached_exception.fail_current_run:
                    count_infra_failure += 1
                if cached_exception.fail_current_run:
                    count_exception_failures += 1

            Stats.gauge("smart_sensor_operator.poked_tasks", count_poke)
            Stats.gauge("smart_sensor_operator.poked_success", count_poke_success)
            Stats.gauge("smart_sensor_operator.poked_exception", count_poke_exception)
            Stats.gauge("smart_sensor_operator.exception_failures", count_exception_failures)
            Stats.gauge("smart_sensor_operator.infra_failures", count_infra_failure)
        except Exception:
            self.log.exception("Exception at getting loop stats.")

    def execute(self, context):
        started_at = timezone.utcnow()

        self.hostname = get_hostname()
        while True:
            poke_start_time = timezone.utcnow()

            self.flush_cached_sensor_poke_results()

            self._load_sensor_works()
            self.log.info("Loaded %s sensor_works", len(self.sensor_works))
            Stats.gauge("smart_sensor_operator.loaded_tasks", len(self.sensor_works))

            for sensor_work in self.sensor_works:
                self._execute_sensor_work(sensor_work)

            duration = (timezone.utcnow() - poke_start_time).total_seconds()

            self.log.info("Taking %s to execute %s tasks.", duration, len(self.sensor_works))

            Stats.timing("smart_sensor_operator.loop_duration", duration)
            Stats.gauge("smart_sensor_operator.executed_tasks", len(self.sensor_works))
            self._emit_loop_stats()

            if duration < self.poke_interval:
                sleep(self.poke_interval - duration)
            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                self.log.info("Time is out for smart sensor.")
                return
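

# ---------------------------------------------------------------------------
# Usage sketch (an assumption for illustration, not part of this module): the smart
# sensor service only picks up sensors that register themselves as smart sensor
# compatible. In Airflow 2.x this is done by declaring the poke kwargs a sensor needs
# in a `poke_context_fields` class attribute and enabling the `[smart_sensor]` section
# in airflow.cfg. The config keys and the `MyPartitionSensor` class below are examples
# based on the Airflow 2.x smart sensor documentation; verify them against your Airflow
# version before relying on them.
#
#   [smart_sensor]
#   use_smart_sensor = true
#   shard_code_upper_limit = 10000
#   shards = 5
#   sensors_enabled = NamedHivePartitionSensor
#
# A compatible sensor roughly looks like this; its poke kwargs are serialized into
# sensor_instance.poke_context and replayed by SmartSensorOperator.poke() above:
#
#   from airflow.sensors.base import BaseSensorOperator
#
#   class MyPartitionSensor(BaseSensorOperator):
#       poke_context_fields = ('partition_name', 'metastore_conn_id')
#
#       def __init__(self, partition_name, metastore_conn_id='metastore_default', **kwargs):
#           super().__init__(**kwargs)
#           self.partition_name = partition_name
#           self.metastore_conn_id = metastore_conn_id
#
#       def poke(self, context):
#           # check whether the partition exists
#           ...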