# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
LocalExecutor.

.. seealso::
    For more information on how the LocalExecutor works, take a look at the guide:
    :ref:`executor:LocalExecutor`
"""
from __future__ import annotations

import logging
import os
import subprocess
from abc import abstractmethod
from multiprocessing import Manager, Process
from multiprocessing.managers import SyncManager
from queue import Empty, Queue
from typing import Any, Optional, Tuple

from setproctitle import getproctitle, setproctitle

from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
from airflow.models.taskinstance import TaskInstanceKey, TaskInstanceStateType
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State

# This is a unit of work to be executed by a worker.
# It is usually a (Key, Command) pair, but it can also be (None, None), which acts as a
# "Poison Pill": a worker that receives the Poison Pill should take the pill and ... die instantly.
ExecutorWorkType = Tuple[Optional[TaskInstanceKey], Optional[CommandType]]

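# Illustrative sketch of the poison-pill protocol (the queue and values here are hypothetical;
# see QueuedLocalWorker.do_work and LimitedParallelism.end below for the real usage):
#
#     task_queue.put((key, command))   # a normal work item
#     task_queue.put((None, None))     # poison pill: tells exactly one worker to exit its loop
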
class LocalWorkerBase(Process, LoggingMixin):
    """
    LocalWorkerBase implementation to run airflow commands.

    Executes the given command and puts the result into a result queue when done, terminating execution.

    :param result_queue: the queue to store result state
    """

    def __init__(self, result_queue: Queue[TaskInstanceStateType]):
        super().__init__(target=self.do_work)
        self.daemon: bool = True
        self.result_queue: Queue[TaskInstanceStateType] = result_queue

    def run(self):
        # We know we've just started a new process, so let's disconnect from the metadata db now
        settings.engine.pool.dispose()
        settings.engine.dispose()
        setproctitle("airflow worker -- LocalExecutor")
        return super().run()

    def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
        """
        Executes command received and stores result state in queue.

        :param key: the key to identify the task instance
        :param command: the command to execute
        """
        if key is None:
            return

        self.log.info("%s running %s", self.__class__.__name__, command)
        setproctitle(f"airflow worker -- LocalExecutor: {command}")
        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
            state = self._execute_work_in_subprocess(command)
        else:
            state = self._execute_work_in_fork(command)

        self.result_queue.put((key, state))
        # Remove the command since the worker is done executing the task
        setproctitle("airflow worker -- LocalExecutor")

    def _execute_work_in_subprocess(self, command: CommandType) -> str:
        try:
            subprocess.check_call(command, close_fds=True)
            return State.SUCCESS
        except subprocess.CalledProcessError as e:
            self.log.error("Failed to execute task %s.", str(e))
            return State.FAILED

    def _execute_work_in_fork(self, command: CommandType) -> str:
        pid = os.fork()
        if pid:
            # In parent, wait for the child
            pid, ret = os.waitpid(pid, 0)
            return State.SUCCESS if ret == 0 else State.FAILED

        from airflow.sentry import Sentry

        ret = 1
        try:
            import signal

            from airflow.cli.cli_parser import get_parser

            signal.signal(signal.SIGINT, signal.SIG_DFL)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            signal.signal(signal.SIGUSR2, signal.SIG_DFL)

            parser = get_parser()
            # [1:] - remove "airflow" from the start of the command
            args = parser.parse_args(command[1:])
            args.shut_down_logging = False

            setproctitle(f"airflow task supervisor: {command}")

            args.func(args)
            ret = 0
            return State.SUCCESS
        except Exception as e:
            self.log.exception("Failed to execute task %s.", e)
            return State.FAILED
        finally:
            Sentry.flush()
            logging.shutdown()
            os._exit(ret)

    @abstractmethod
    def do_work(self):
        """Called in the subprocess and should then execute tasks."""
        raise NotImplementedError()

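# Sketch of the command shape the worker receives (values below are illustrative only; the real
# list is produced by the scheduler from the task instance). _execute_work_in_fork strips the
# leading "airflow" before handing the rest to the CLI parser:
#
#     command = ["airflow", "tasks", "run", "example_dag", "example_task", "manual__2021-01-01T00:00:00"]
#     # args = parser.parse_args(command[1:])  -> equivalent of running "airflow tasks run ..." in-process
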
class LocalWorker(LocalWorkerBase):
    """
    Local worker that executes the task.

    :param result_queue: queue where results of the tasks are put.
    :param key: key identifying task instance
    :param command: Command to execute
    """

    def __init__(
        self, result_queue: Queue[TaskInstanceStateType], key: TaskInstanceKey, command: CommandType
    ):
        super().__init__(result_queue)
        self.key: TaskInstanceKey = key
        self.command: CommandType = command

    def do_work(self) -> None:
        self.execute_work(key=self.key, command=self.command)

class QueuedLocalWorker(LocalWorkerBase):
    """
    LocalWorker implementation that waits for tasks from a queue and continues executing commands
    as they become available in the queue. It terminates execution once the poison pill is found.

    :param task_queue: queue from which worker reads tasks
    :param result_queue: queue where worker puts results after finishing tasks
    """

    def __init__(self, task_queue: Queue[ExecutorWorkType], result_queue: Queue[TaskInstanceStateType]):
        super().__init__(result_queue=result_queue)
        self.task_queue = task_queue

    def do_work(self) -> None:
        while True:
            try:
                key, command = self.task_queue.get()
            except EOFError:
                self.log.info(
                    "Failed to read tasks from the task queue because the other "
                    "end has closed the connection. Terminating worker %s.",
                    self.name,
                )
                break
            try:
                if key is None or command is None:
                    # Received poison pill, no more tasks to run
                    break
                self.execute_work(key=key, command=command)
            finally:
                self.task_queue.task_done()

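# Minimal wiring sketch for a pool of queued workers (illustrative only; LimitedParallelism below
# performs the equivalent setup with the executor's own Manager and result queue):
#
#     manager = Manager()
#     task_queue, result_queue = manager.Queue(), manager.Queue()
#     workers = [QueuedLocalWorker(task_queue, result_queue) for _ in range(4)]
#     for worker in workers:
#         worker.start()
#     # put (key, command) items on task_queue, then one (None, None) pill per worker to stop the pool
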
class LocalExecutor(BaseExecutor):
    """
    LocalExecutor executes tasks locally in parallel.
    It uses the multiprocessing Python library and queues to parallelize the execution of tasks.

    :param parallelism: how many parallel processes are run in the executor
    """

    def __init__(self, parallelism: int = PARALLELISM):
        super().__init__(parallelism=parallelism)
        if self.parallelism < 0:
            raise AirflowException("parallelism must be bigger than or equal to 0")
        self.manager: SyncManager | None = None
        self.result_queue: Queue[TaskInstanceStateType] | None = None
        self.workers: list[QueuedLocalWorker] = []
        self.workers_used: int = 0
        self.workers_active: int = 0
        self.impl: None | (LocalExecutor.UnlimitedParallelism | LocalExecutor.LimitedParallelism) = None

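    # Configuration sketch: the scheduler normally constructs this executor from airflow.cfg rather
    # than instantiating it directly. Roughly (values illustrative; parallelism = 0 selects the
    # UnlimitedParallelism implementation below, any positive value the LimitedParallelism one):
    #
    #     [core]
    #     executor = LocalExecutor
    #     parallelism = 32
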
    class UnlimitedParallelism:
        """
        Implements LocalExecutor with unlimited parallelism, starting one process per command to execute.

        :param executor: the executor instance to implement.
        """

        def __init__(self, executor: LocalExecutor):
            self.executor: LocalExecutor = executor

        def start(self) -> None:
            """Starts the executor."""
            self.executor.workers_used = 0
            self.executor.workers_active = 0

        def execute_async(
            self,
            key: TaskInstanceKey,
            command: CommandType,
            queue: str | None = None,
            executor_config: Any | None = None,
        ) -> None:
            """
            Executes task asynchronously.

            :param key: the key to identify the task instance
            :param command: the command to execute
            :param queue: Name of the queue
            :param executor_config: configuration for the executor
            """
            if not self.executor.result_queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
            self.executor.workers_used += 1
            self.executor.workers_active += 1
            local_worker.start()

        def sync(self) -> None:
            """Sync will get called periodically by the heartbeat method."""
            if not self.executor.result_queue:
                raise AirflowException("Executor should be started first")
            while not self.executor.result_queue.empty():
                results = self.executor.result_queue.get()
                self.executor.change_state(*results)
                self.executor.workers_active -= 1

        def end(self) -> None:
            """
            Called when the caller is done submitting jobs and wants to wait synchronously
            for all previously submitted jobs to finish.
            """
            while self.executor.workers_active > 0:
                self.executor.sync()

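    # UnlimitedParallelism above spawns one short-lived LocalWorker process per task, while
    # LimitedParallelism below keeps a fixed pool of QueuedLocalWorker processes fed from a
    # shared task queue.
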
    class LimitedParallelism:
        """
        Implements LocalExecutor with limited parallelism using a task queue to coordinate work
        distribution.

        :param executor: the executor instance to implement.
        """

        def __init__(self, executor: LocalExecutor):
            self.executor: LocalExecutor = executor
            self.queue: Queue[ExecutorWorkType] | None = None

        def start(self) -> None:
            """Starts limited parallelism implementation."""
            if not self.executor.manager:
                raise AirflowException(NOT_STARTED_MESSAGE)
            self.queue = self.executor.manager.Queue()
            if not self.executor.result_queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            self.executor.workers = [
                QueuedLocalWorker(self.queue, self.executor.result_queue)
                for _ in range(self.executor.parallelism)
            ]
            self.executor.workers_used = len(self.executor.workers)
            for worker in self.executor.workers:
                worker.start()

        def execute_async(
            self,
            key: TaskInstanceKey,
            command: CommandType,
            queue: str | None = None,
            executor_config: Any | None = None,
        ) -> None:
            """
            Executes task asynchronously.

            :param key: the key to identify the task instance
            :param command: the command to execute
            :param queue: name of the queue
            :param executor_config: configuration for the executor
            """
            if not self.queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            self.queue.put((key, command))

        def sync(self):
            """Sync will get called periodically by the heartbeat method."""
            while True:
                try:
                    results = self.executor.result_queue.get_nowait()
                    try:
                        self.executor.change_state(*results)
                    finally:
                        self.executor.result_queue.task_done()
                except Empty:
                    break

        def end(self):
            """Ends the executor. Sends the poison pill to all workers."""
            for _ in self.executor.workers:
                self.queue.put((None, None))

            # Wait for commands to finish
            self.queue.join()
            self.executor.sync()

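    # Shutdown note: LimitedParallelism.end() enqueues exactly one (None, None) poison pill per
    # worker; queue.join() then blocks until every item (tasks and pills alike) has been marked
    # done via task_done() in QueuedLocalWorker.do_work().
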
    def start(self) -> None:
        """Starts the executor."""
        old_proctitle = getproctitle()
        # Temporarily adopt the executor proctitle so the helper process forked by Manager()
        # inherits a recognizable name, then restore the original title for this process.
        setproctitle("airflow executor -- LocalExecutor")
        self.manager = Manager()
        setproctitle(old_proctitle)
        self.result_queue = self.manager.Queue()
        self.workers = []
        self.workers_used = 0
        self.workers_active = 0
        self.impl = (
            LocalExecutor.UnlimitedParallelism(self)
            if self.parallelism == 0
            else LocalExecutor.LimitedParallelism(self)
        )

        self.impl.start()

    def sync(self) -> None:
        """Sync will get called periodically by the heartbeat method."""
        if not self.impl:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.impl.sync()

    def end(self) -> None:
        """Ends the executor: waits for running tasks to finish, then shuts down the manager."""
        if not self.impl:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.manager:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.log.info(
            "Shutting down LocalExecutor"
            "; waiting for running tasks to finish.  Signal again if you don't want to wait."
        )
        self.impl.end()
        self.manager.shutdown()

    def terminate(self):
        """Terminate the executor; this is a no-op for the LocalExecutor."""
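
# Lifecycle sketch (illustrative only; in a running deployment the scheduler job drives the
# executor through the BaseExecutor interface rather than calling these methods directly):
#
#     executor = LocalExecutor(parallelism=2)
#     executor.start()
#     # queue work via the BaseExecutor API, e.g. executor.queue_command(...), then let
#     # executor.heartbeat() trigger execution and collect results via sync()
#     executor.end()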