# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Base executor - this is the base class for all the implemented executors."""from__future__importannotationsimportsysimportwarningsfromcollectionsimportOrderedDictfromtypingimportAny,Counter,List,Optional,Sequence,Tuplefromairflow.callbacks.base_callback_sinkimportBaseCallbackSinkfromairflow.callbacks.callback_requestsimportCallbackRequestfromairflow.configurationimportconffromairflow.exceptionsimportRemovedInAirflow3Warningfromairflow.models.taskinstanceimportTaskInstance,TaskInstanceKeyfromairflow.statsimportStatsfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.stateimportState
[docs]classBaseExecutor(LoggingMixin):""" Class to derive in order to interface with executor-type systems like Celery, Kubernetes, Local, Sequential and the likes. :param parallelism: how many jobs should run at one time. Set to ``0`` for infinity """
def start(self):  # pragma: no cover
    """
    Hook for executors that need to perform start-up work.

    The base implementation is intentionally empty.
    """
def queue_command(
    self,
    task_instance: TaskInstance,
    command: CommandType,
    priority: int = 1,
    queue: str | None = None,
):
    """
    Store a command in ``queued_tasks`` under the task instance's key.

    When the key is already present the existing entry is left untouched
    and an error is logged instead.

    :param task_instance: task instance the command belongs to
    :param command: command to keep for that task instance
    :param priority: priority recorded alongside the command
    :param queue: queue name recorded alongside the command
    """
    ti_key = task_instance.key
    if ti_key in self.queued_tasks:
        # Never overwrite an entry that is already waiting.
        self.log.error("could not queue task %s", ti_key)
        return

    self.log.info("Adding to queue: %s", command)
    self.queued_tasks[ti_key] = (command, priority, queue, task_instance)
def queue_task_instance(
    self,
    task_instance: TaskInstance,
    mark_success: bool = False,
    pickle_id: str | None = None,
    ignore_all_deps: bool = False,
    ignore_depends_on_past: bool = False,
    ignore_task_deps: bool = False,
    ignore_ti_state: bool = False,
    pool: str | None = None,
    cfg_path: str | None = None,
) -> None:
    """
    Build the run command for a task instance and queue it.

    The command comes from ``task_instance.command_as_list`` and is handed
    to ``queue_command`` together with the task's ``priority_weight_total``
    and ``queue``.

    :param task_instance: task instance to queue
    :param mark_success: forwarded to ``command_as_list``
    :param pickle_id: forwarded to ``command_as_list``
    :param ignore_all_deps: forwarded to ``command_as_list``
    :param ignore_depends_on_past: forwarded to ``command_as_list``
    :param ignore_task_deps: forwarded to ``command_as_list``
    :param ignore_ti_state: forwarded to ``command_as_list``
    :param pool: pool to use; falls back to ``task_instance.pool`` when falsy
    :param cfg_path: forwarded to ``command_as_list``
    """
    effective_pool = pool or task_instance.pool

    # TODO (edgarRd): AIRFLOW-1985:
    # cfg_path is needed to propagate the config values if using impersonation
    # (run_as_user), given that there are different code paths running tasks.
    # For a long term solution we need to address AIRFLOW-1986
    command = task_instance.command_as_list(
        local=True,
        mark_success=mark_success,
        ignore_all_deps=ignore_all_deps,
        ignore_depends_on_past=ignore_depends_on_past,
        ignore_task_deps=ignore_task_deps,
        ignore_ti_state=ignore_ti_state,
        pool=effective_pool,
        pickle_id=pickle_id,
        cfg_path=cfg_path,
    )
    self.log.debug("created command %s", command)

    task = task_instance.task
    self.queue_command(
        task_instance,
        command,
        priority=task.priority_weight_total,
        queue=task.queue,
    )
def has_task(self, task_instance: TaskInstance) -> bool:
    """
    Tell whether this executor currently knows about a task instance.

    :param task_instance: TaskInstance
    :return: True if the task instance's key is in ``queued_tasks`` or in ``running``
    """
    ti_key = task_instance.key
    if ti_key in self.queued_tasks:
        return True
    return ti_key in self.running
def sync(self) -> None:
    """
    Periodic hook called from ``heartbeat``.

    The base implementation does nothing; executors should override it to
    gather task statuses.
    """
def heartbeat(self) -> None:
    """
    Heartbeat sent to trigger new jobs.

    Computes the number of open slots, publishes the slot/queue/running
    gauges, triggers queued tasks and finally calls ``sync``.
    """
    running_count = len(self.running)
    queued_count = len(self.queued_tasks)

    # A falsy parallelism means "no limit": every queued task gets a slot.
    open_slots = self.parallelism - running_count if self.parallelism else queued_count

    self.log.debug("%s running task instances", running_count)
    self.log.debug("%s in queue", queued_count)
    self.log.debug("%s open slots", open_slots)

    Stats.gauge("executor.open_slots", open_slots)
    Stats.gauge("executor.queued_tasks", queued_count)
    Stats.gauge("executor.running_tasks", running_count)

    self.trigger_tasks(open_slots)

    # Calling child class sync method
    self.log.debug("Calling the %s sync method", self.__class__)
    self.sync()
def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, QueuedTaskInstanceType]]:
    """
    Return the queued tasks ordered from highest to lowest priority.

    :return: list of ``(key, value)`` pairs taken from ``queued_tasks``;
        entries with equal priority keep their queue order.
    """
    entries = list(self.queued_tasks.items())
    # entry is (key, value); the priority is the second element of value.
    entries.sort(key=lambda entry: entry[1][1], reverse=True)
    return entries
def trigger_tasks(self, open_slots: int) -> None:
    """
    Initiates async execution of the queued tasks, up to the number of available slots.

    Tasks are taken in the order returned by ``order_queued_tasks_by_priority``.
    A task whose key is still in ``running`` is left in the queue and retried
    on a later call, until ``QUEUEING_ATTEMPTS`` is exhausted, at which point
    it is dropped from the queue. All other selected tasks are handed to
    ``_process_tasks`` in one batch.

    :param open_slots: Number of open slots
    """
    sorted_queue = self.order_queued_tasks_by_priority()
    # Clamp at zero: a negative slot count must select nothing, whereas a
    # negative slice bound would select from the end of the list.
    limit = max(0, min(open_slots, len(sorted_queue)))
    task_tuples = []

    # Iterate a slice instead of repeatedly calling ``pop(0)``, which shifts
    # the whole list on every iteration.
    for key, (command, _priority, queue, ti) in sorted_queue[:limit]:
        if key not in self.running:
            # Forget retry counts from earlier "still running" rounds so a
            # later re-queue of the same key starts with a full retry budget.
            # Assumes ``self.attempts`` is a dict-like counter (missing keys
            # read as 0) -- TODO confirm against ``__init__``.
            self.attempts.pop(key, None)
            task_tuples.append((key, command, queue, ti.executor_config))
            continue

        # If a task makes it here but is still understood by the executor
        # to be running, it generally means that the task has been killed
        # externally and not yet been marked as failed.
        #
        # However, when a task is deferred, there is also a possibility of
        # a race condition where a task might be scheduled again during
        # trigger processing, even before we are able to register that the
        # deferred task has completed. In this case and for this reason,
        # we make a small number of attempts to see if the task has been
        # removed from the running set in the meantime.
        attempt = self.attempts[key]
        if attempt < QUEUEING_ATTEMPTS - 1:
            self.attempts[key] = attempt + 1
            self.log.info("task %s is still running", key)
            continue

        # We give up and remove the task from the queue.
        self.log.error("could not queue task %s (still running after %d attempts)", key, attempt)
        self.attempts.pop(key, None)
        del self.queued_tasks[key]

    if task_tuples:
        self._process_tasks(task_tuples)
def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
    """
    Record a new state for a task and stop tracking it as running.

    :param info: Executor information for the task instance
    :param key: Unique key for the task instance
    :param state: State to set for the task.
    """
    self.log.debug("Changing state: %s", key)

    if key in self.running:
        self.running.remove(key)
    else:
        # Not an error: the key may simply not be tracked as running.
        self.log.debug("Could not find key: %s", str(key))

    self.event_buffer[key] = (state, info)
def fail(self, key: TaskInstanceKey, info=None) -> None:
    """
    Mark the task identified by ``key`` as failed.

    Thin wrapper that calls ``change_state`` with ``State.FAILED``.

    :param info: Executor information for the task instance
    :param key: Unique key for the task instance
    """
    failed_state = State.FAILED
    self.change_state(key, failed_state, info)
def success(self, key: TaskInstanceKey, info=None) -> None:
    """
    Mark the task identified by ``key`` as succeeded.

    Thin wrapper that calls ``change_state`` with ``State.SUCCESS``.

    :param info: Executor information for the task instance
    :param key: Unique key for the task instance
    """
    success_state = State.SUCCESS
    self.change_state(key, success_state, info)
def get_event_buffer(self, dag_ids=None) -> dict[TaskInstanceKey, EventBufferValueType]:
    """
    Return events from the event buffer and remove them from it.

    With ``dag_ids`` set to ``None`` the whole buffer is returned and replaced
    by a fresh empty dict. Otherwise only the events whose key's ``dag_id`` is
    in ``dag_ids`` are removed and returned; the rest stay buffered.

    :param dag_ids: the dag_ids to return events for; returns all if given ``None``.
    :return: a dict of events
    """
    if dag_ids is None:
        # Hand over the existing dict itself and start a new buffer.
        flushed, self.event_buffer = self.event_buffer, {}
        return flushed

    # Collect the keys first so the buffer is not mutated while iterating it.
    matching_keys = [ti_key for ti_key in self.event_buffer if ti_key.dag_id in dag_ids]
    return {ti_key: self.event_buffer.pop(ti_key) for ti_key in matching_keys}
def execute_async(
    self,
    key: TaskInstanceKey,
    command: CommandType,
    queue: str | None = None,
    executor_config: Any | None = None,
) -> None:  # pragma: no cover
    """
    Execute the command asynchronously.

    The base class provides no implementation and always raises.

    :param key: Unique key for the task instance
    :param command: Command to run
    :param queue: name of the queue
    :param executor_config: Configuration passed to the executor.
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError()
def end(self) -> None:  # pragma: no cover
    """
    Wait synchronously for all previously submitted jobs to finish.

    Called when the caller is done submitting jobs. The base class provides
    no implementation and always raises.

    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError()
def terminate(self):
    """
    Handle termination; called when the daemon receives a SIGTERM.

    The base class provides no implementation and always raises.

    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError()
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
    """
    Try to adopt running task instances abandoned by a dying SchedulerJob.

    Anything that is not adopted will be cleared by the scheduler (and then
    become eligible for re-scheduling).

    :param tis: task instances that are candidates for adoption
    :return: any TaskInstances that were unable to be adopted
    """
    # The base executor adopts nothing, so every candidate is reported back
    # as not adopted. Subclasses can do better!
    not_adopted = tis
    return not_adopted
@property
[docs]defslots_available(self):"""Number of new tasks this executor instance can accept"""ifself.parallelism:returnself.parallelism-len(self.running)-len(self.queued_tasks)else:returnsys.maxsize
@staticmethod
def validate_command(command: list[str]) -> None:
    """
    Deprecated back-compat wrapper around ``validate_airflow_tasks_run_command``.

    Emits a ``RemovedInAirflow3Warning`` and then runs the check; the result
    of the check is discarded.

    :param command: command to check
    :return: None
    """
    deprecation_message = """ The `validate_command` method is deprecated. Please use ``validate_airflow_tasks_run_command`` """
    # stacklevel=2 attributes the warning to the caller of this method.
    warnings.warn(deprecation_message, RemovedInAirflow3Warning, stacklevel=2)
    BaseExecutor.validate_airflow_tasks_run_command(command)
@staticmethod
[docs]defvalidate_airflow_tasks_run_command(command:list[str])->tuple[str|None,str|None]:""" Check if the command to execute is airflow command Returns tuple (dag_id,task_id) retrieved from the command (replaced with None values if missing) """ifcommand[0:3]!=["airflow","tasks","run"]:raiseValueError('The command must start with ["airflow", "tasks", "run"].')iflen(command)>3and"--help"notincommand:dag_id:str|None=Nonetask_id:str|None=Noneforargincommand[3:]:ifnotarg.startswith("--"):ifdag_idisNone:dag_id=argelse:task_id=argbreakreturndag_id,task_idreturnNone,None
def debug_dump(self):
    """
    Log the executor's queued tasks, running set and event buffer.

    Called in response to SIGUSR2 by the scheduler. Each collection is
    logged at INFO level with its size and one ``repr`` per entry.
    """
    separator = "\n\t"

    queued_entries = [repr(item) for item in self.queued_tasks.items()]
    self.log.info("executor.queued (%d)\n\t%s", len(self.queued_tasks), separator.join(queued_entries))

    running_entries = [repr(key) for key in self.running]
    self.log.info("executor.running (%d)\n\t%s", len(self.running), separator.join(running_entries))

    buffered_entries = [repr(item) for item in self.event_buffer.items()]
    self.log.info(
        "executor.event_buffer (%d)\n\t%s",
        len(self.event_buffer),
        separator.join(buffered_entries),
    )
def send_callback(self, request: CallbackRequest) -> None:
    """
    Send a callback request for execution.

    The default implementation forwards the request to the ``callback_sink``
    object.

    :param request: Callback request to be executed.
    :raises ValueError: if ``callback_sink`` is not set
    """
    sink = self.callback_sink
    if sink:
        sink.send(request)
        return
    raise ValueError("Callback sink is not ready.")