Source code for airflow.providers.celery.executors.celery_executor
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""CeleryExecutor... seealso:: For more information on how the CeleryExecutor works, take a look at the guide: :doc:`/celery_executor`"""from__future__importannotationsimportloggingimportmathimportoperatorimporttimefromcollectionsimportCounterfromcollections.abcimportSequencefromconcurrent.futuresimportProcessPoolExecutorfrommultiprocessingimportcpu_countfromtypingimportTYPE_CHECKING,Anyfromdeprecatedimportdeprecatedfromairflow.cli.cli_configimport(ARG_DAEMON,ARG_LOG_FILE,ARG_PID,ARG_SKIP_SERVE_LOGS,ARG_STDERR,ARG_STDOUT,ARG_VERBOSE,ActionCommand,Arg,GroupCommand,lazy_load_command,)fromairflow.configurationimportconffromairflow.exceptionsimportAirflowProviderDeprecationWarning,AirflowTaskTimeoutfromairflow.executors.base_executorimportBaseExecutorfromairflow.providers.celery.version_compatimportAIRFLOW_V_3_0_PLUSfromairflow.statsimportStatsfromairflow.utils.stateimportTaskInstanceStatefromceleryimportstatesascelery_states
def __getattr__(name):
    """
    Resolve module attributes that are not defined eagerly.

    Only ``app`` is supported: the Celery application is imported from
    ``celery_executor_utils`` on first access, so importing this module does not
    pay the cost of importing and constructing it. Any other name raises
    ``AttributeError``.
    """
    if name != "app":
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    # Deferred on purpose: importing/constructing the Celery app is expensive.
    from airflow.providers.celery.executors.celery_executor_utils import app

    return app
"""To start the celery worker, run the command:airflow celery worker"""# flower cli args
# Sub-commands exposed under the ``celery`` CLI group (see ``get_cli_commands``).
# Each ``func`` is resolved lazily from ``CELERY_CLI_COMMAND_PATH`` so the command
# module is only imported when the command actually runs.
CELERY_COMMANDS = (
    # airflow celery worker
    ActionCommand(
        name="worker",
        help="Start a Celery worker node",
        func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.worker"),
        args=(
            ARG_QUEUES,
            ARG_CONCURRENCY,
            ARG_CELERY_HOSTNAME,
            ARG_PID,
            ARG_DAEMON,
            ARG_UMASK,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_AUTOSCALE,
            ARG_SKIP_SERVE_LOGS,
            ARG_WITHOUT_MINGLE,
            ARG_WITHOUT_GOSSIP,
            ARG_VERBOSE,
        ),
    ),
    # airflow celery flower
    ActionCommand(
        name="flower",
        help="Start a Celery Flower",
        func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.flower"),
        args=(
            ARG_FLOWER_HOSTNAME,
            ARG_FLOWER_PORT,
            ARG_FLOWER_CONF,
            ARG_FLOWER_URL_PREFIX,
            ARG_FLOWER_BASIC_AUTH,
            ARG_BROKER_API,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_VERBOSE,
        ),
    ),
    # airflow celery stop
    ActionCommand(
        name="stop",
        help="Stop the Celery worker gracefully",
        func=lazy_load_command(f"{CELERY_CLI_COMMAND_PATH}.stop_worker"),
        args=(ARG_PID, ARG_VERBOSE),
    ),
)
class CeleryExecutor(BaseExecutor):
    """
    Executor that hands task instances to Celery; recommended for production use of Airflow.

    Sending task instances through Celery lets their execution be distributed across
    multiple worker nodes. Celery is a simple, flexible and reliable distributed system
    for processing vast amounts of messages, and it gives operators the tools required
    to maintain such a system.
    """
if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
    # In the v3 path, we store workloads, not commands as strings
    # (``queue_workload`` stores the workload itself under ``ti.key``).
    # This block previously held only comments, which is not a valid ``if`` body.
    # TODO: TaskSDK: move this type change into BaseExecutor
    queued_tasks: dict[TaskInstanceKey, workloads.All]  # type: ignore[assignment]
def __init__(self):
    """
    Set up sync parallelism and the per-task bookkeeping used by the other methods.

    ``[celery] SYNC_PARALLELISM`` sets how many processes are used to send tasks
    and check their state. A value of ``0`` means "CPU count minus one", with a
    floor of 1.
    """
    super().__init__()

    # Celery doesn't support bulk sending the tasks (which can become a bottleneck on bigger clusters)
    # so we use a multiprocessing pool to speed this up.
    # How many worker processes are created for checking celery task state.
    self._sync_parallelism = conf.getint("celery", "SYNC_PARALLELISM")
    if self._sync_parallelism == 0:
        self._sync_parallelism = max(1, cpu_count() - 1)
    from airflow.providers.celery.executors.celery_executor_utils import BulkStateFetcher

    # Used by update_all_task_states / try_adopt_task_instances via ``get_many``.
    # NOTE(review): constructor argument assumed to be the sync parallelism; confirm
    # against BulkStateFetcher's signature.
    self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism)
    # Celery AsyncResult for every task key we sent (or adopted) and still track.
    self.tasks = {}
    # Publish timeouts seen per task key; a Counter so unseen keys read as 0.
    self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
    # NOTE(review): config key and fallback of 3 should be confirmed against the
    # provider's config reference.
    self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries", fallback=3)
def start(self) -> None:
    """Log how many processes the executor will use when syncing with Celery."""
    message = "Starting Celery Executor using %s processes for syncing"
    self.log.debug(message, self._sync_parallelism)
def_num_tasks_per_send_process(self,to_send_count:int)->int:""" How many Celery tasks should each worker process send. :return: Number of tasks that should be sent per process """returnmax(1,math.ceil(to_send_count/self._sync_parallelism))def_process_tasks(self,task_tuples:list[TaskTuple])->None:# Airflow V2 versionfromairflow.providers.celery.executors.celery_executor_utilsimportexecute_commandtask_tuples_to_send=[task_tuple[:3]+(execute_command,)fortask_tupleintask_tuples]self._send_tasks(task_tuples_to_send)def_process_workloads(self,input:list[workloads.All])->None:# Airflow V3 version -- have to delay imports until we know we are on v3fromairflow.executorsimportworkloadsfromairflow.providers.celery.executors.celery_executor_utilsimportexecute_workloadtasks=[(workload.ti.key,workload,workload.ti.queue,execute_workload)forworkloadininputifisinstance(workload,workloads.ExecuteTask)]iflen(tasks)!=len(input):invalid=list(workloadforworkloadininputifnotisinstance(workload,workloads.ExecuteTask))raiseValueError(f"{type(self)}._process_workloads cannot handle {invalid}")self._send_tasks(tasks)def_send_tasks(self,task_tuples_to_send:Sequence[TaskInstanceInCelery]):first_task=next(t[-1]fortintask_tuples_to_send)# Celery state queries will stuck if we do not use one same backend# for all tasks.cached_celery_backend=first_task.backendkey_and_async_results=self._send_tasks_to_celery(task_tuples_to_send)self.log.debug("Sent all tasks.")fromairflow.providers.celery.executors.celery_executor_utilsimportExceptionWithTracebackforkey,_,resultinkey_and_async_results:ifisinstance(result,ExceptionWithTraceback)andisinstance(result.exception,AirflowTaskTimeout):retries=self.task_publish_retries[key]ifretries<self.task_publish_max_retries:Stats.incr("celery.task_timeout_error")self.log.info("[Try %s of %s] Task Timeout Error for Task: 
(%s).",self.task_publish_retries[key]+1,self.task_publish_max_retries,tuple(key),)self.task_publish_retries[key]=retries+1continueself.queued_tasks.pop(key)self.task_publish_retries.pop(key,None)ifisinstance(result,ExceptionWithTraceback):self.log.error("%s: %s\n%s\n",CELERY_SEND_ERR_MSG_HEADER,result.exception,result.traceback)self.event_buffer[key]=(TaskInstanceState.FAILED,None)elifresultisnotNone:result.backend=cached_celery_backendself.running.add(key)self.tasks[key]=result# Store the Celery task_id in the event buffer. This will get "overwritten" if the task# has another event, but that is fine, because the only other events are success/failed at# which point we don't need the ID anymore anywayself.event_buffer[key]=(TaskInstanceState.QUEUED,result.task_id)def_send_tasks_to_celery(self,task_tuples_to_send:Sequence[TaskInstanceInCelery]):fromairflow.providers.celery.executors.celery_executor_utilsimportsend_task_to_executoriflen(task_tuples_to_send)==1orself._sync_parallelism==1:# One tuple, or max one process -> send it in the main thread.returnlist(map(send_task_to_executor,task_tuples_to_send))# Use chunks instead of a work queue to reduce context switching# since tasks are roughly uniform in sizechunksize=self._num_tasks_per_send_process(len(task_tuples_to_send))num_processes=min(len(task_tuples_to_send),self._sync_parallelism)withProcessPoolExecutor(max_workers=num_processes)assend_pool:key_and_async_results=list(send_pool.map(send_task_to_executor,task_tuples_to_send,chunksize=chunksize))returnkey_and_async_results
def sync(self) -> None:
    """Refresh the Celery state of all tracked tasks; skip the query when none are tracked."""
    if self.tasks:
        self.update_all_task_states()
        return
    self.log.debug("No task to query celery, skipping sync")
def debug_dump(self) -> None:
    """Debug dump; called in response to SIGUSR2 by the scheduler."""
    super().debug_dump()
    # One "key -> AsyncResult" pair per line, tab-indented under the header.
    rendered_tasks = "\n\t".join(repr(entry) for entry in self.tasks.items())
    self.log.info("executor.tasks (%d)\n\t%s", len(self.tasks), rendered_tasks)
def update_all_task_states(self) -> None:
    """
    Update states of the tasks.

    Fetches the Celery state of every tracked task with one ``get_many`` call, then
    applies each truthy state through ``update_task_state``. A task whose id is
    missing from the fetcher's result is skipped instead of aborting the whole sync.
    """
    self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
    state_and_info_by_celery_task_id = self.bulk_state_fetcher.get_many(self.tasks.values())

    self.log.debug("Inquiries completed.")
    # Iterate over a snapshot: update_task_state may end up in change_state, which
    # pops entries from self.tasks.
    for key, async_result in list(self.tasks.items()):
        state, info = state_and_info_by_celery_task_id.get(async_result.task_id, (None, None))
        if state:
            self.update_task_state(key, state, info)
def change_state(
    self, key: TaskInstanceKey, state: TaskInstanceState, info=None, remove_running=True
) -> None:
    """
    Change a task's state through ``BaseExecutor`` and stop tracking its Celery result.

    If the call with ``remove_running`` raises ``AttributeError`` or ``TypeError``, it is
    retried without that keyword (older BaseExecutor versions do not accept it).
    The key is then removed from ``self.tasks`` in either case.
    """
    try:
        super().change_state(key, state, info, remove_running=remove_running)
    except (AttributeError, TypeError):
        # Earlier versions of the BaseExecutor don't accept the remove_running parameter for this method
        # TODO: Remove when min airflow version >= 2.9.2
        # NOTE(review): this also catches AttributeError/TypeError raised from *inside*
        # a newer change_state, in which case the state change is attempted twice.
        super().change_state(key, state, info)
    self.tasks.pop(key, None)
def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None:
    """
    Update state of a single task from its Celery state.

    SUCCESS marks the task succeeded; FAILURE and REVOKED mark it failed;
    STARTED, PENDING and RETRY are still in flight and ignored; any other state is
    logged. Exceptions raised while applying the change are logged and swallowed.
    """
    in_flight = (celery_states.STARTED, celery_states.PENDING, celery_states.RETRY)
    failed = (celery_states.FAILURE, celery_states.REVOKED)
    try:
        if state in in_flight:
            return
        if state == celery_states.SUCCESS:
            self.success(key, info)
        elif state in failed:
            self.fail(key, info)
        else:
            self.log.info("Unexpected state for %s: %s", key, state)
    except Exception:
        self.log.exception("Error syncing the Celery executor, ignoring it.")
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
    """
    Adopt task instances whose Celery task id was recorded by a previous executor.

    TIs without an ``external_executor_id`` cannot be adopted and are returned.
    For the rest, states are fetched in bulk and each returned task is registered
    in ``tasks``/``running`` and passed to ``update_task_state``.

    :param tis: candidate task instances
    :return: ``tis`` unchanged when none has an ``external_executor_id``, otherwise
        the task instances that lacked one
    """
    # See which of the TIs are still alive (or have finished even!)
    #
    # Since Celery doesn't store "SENT" state for queued commands (if we create an AsyncResult with a made
    # up id it just returns PENDING state for it), we have to store Celery's task_id against the TI row to
    # look at in future.
    #
    # This process is not perfect -- we could have sent the task to celery, and crashed before we were
    # able to record the AsyncResult.task_id in the TaskInstance table, in which case we won't adopt the
    # task (it'll either run and update the TI state, or the scheduler will clear and re-queue it. Either
    # way it won't get executed more than once)
    #
    # (If we swapped it around, and generated a task_id for Celery, stored that in TI and enqueued that
    # there is also still a race condition where we could generate and store the task_id, but die before
    # we managed to enqueue the command. Since neither way is perfect we always have to deal with this
    # process not being perfect.)
    from celery.result import AsyncResult

    tracked = {}
    orphans = []
    for ti in tis:
        celery_id = ti.external_executor_id
        if celery_id is None:
            orphans.append(ti)
            continue
        tracked[celery_id] = (AsyncResult(celery_id), ti)

    if not tracked:
        # Nothing to adopt
        return tis

    async_results = [pair[0] for pair in tracked.values()]
    states_by_celery_task_id = self.bulk_state_fetcher.get_many(async_results)

    adopted_descriptions = []
    # Reuse one backend object for every result (see _send_tasks for the same trick).
    shared_backend = next(iter(tracked.values()))[0].backend

    for celery_id, (state, info) in states_by_celery_task_id.items():
        result, ti = tracked[celery_id]
        result.backend = shared_backend

        if isinstance(result.result, BaseException):
            remote_error = result.result
            # Log the exception we got from the remote end
            self.log.warning("Task %s failed with error", ti.key, exc_info=remote_error)

        # Set the correct elements of the state dicts, then update this
        # like we just queried it.
        self.tasks[ti.key] = result
        self.running.add(ti.key)
        self.update_task_state(ti.key, state, info)
        adopted_descriptions.append(f"{ti} in state {state}")

    if adopted_descriptions:
        self.log.info(
            "Adopted the following %d tasks from a dead executor\n\t%s",
            len(adopted_descriptions),
            "\n\t".join(adopted_descriptions),
        )
    return orphans
@deprecated(
    reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.",
    category=AirflowProviderDeprecationWarning,
)
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
    """
    Remove tasks stuck in queued from executor and fail them.

    This method is deprecated. Use ``revoke_task`` instead, which is the
    replacement named in the deprecation reason.

    :param tis: task instances stuck in queued
    :return: ``repr`` of each task instance, in input order, after it was revoked and failed
    """
    reprs = []
    for ti in tis:
        reprs.append(repr(ti))
        self.revoke_task(ti=ti)
        self.fail(ti.key)
    return reprs
def revoke_task(self, *, ti: TaskInstance):
    """
    Revoke the Celery task for ``ti`` (if one is tracked) and stop tracking ``ti``.

    A failure to revoke is logged, not raised; ``ti.key`` is removed from
    ``running`` and ``queued_tasks`` either way.
    """
    from airflow.providers.celery.executors.celery_executor_utils import app

    key = ti.key
    tracked_result = self.tasks.pop(key, None)
    if tracked_result:
        try:
            app.control.revoke(tracked_result.task_id)
        except Exception:
            self.log.exception("Error revoking task instance %s from celery", key)
    self.running.discard(key)
    self.queued_tasks.pop(key, None)
@staticmethod
def get_cli_commands() -> list[GroupCommand]:
    """Return the single ``celery`` CLI group, whose sub-commands are ``CELERY_COMMANDS``."""
    celery_group = GroupCommand(
        name="celery",
        help="Celery components",
        description=(
            "Start celery components. Works only when using CeleryExecutor. For more information, "
            "see https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html"
        ),
        subcommands=CELERY_COMMANDS,
    )
    return [celery_group]
def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
    """
    Queue an ``ExecuteTask`` workload under its task instance key.

    ``session`` is accepted for interface compatibility and not used here.

    :raises RuntimeError: for any workload that is not an ``ExecuteTask``
    """
    from airflow.executors import workloads

    if isinstance(workload, workloads.ExecuteTask):
        self.queued_tasks[workload.ti.key] = workload
        return
    raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
def _get_parser() -> argparse.ArgumentParser:
    """
    Return the CeleryExecutor CLI argument parser; used by Sphinx to generate documentation.

    :meta private:
    """
    parser = CeleryExecutor._get_parser()
    return parser