# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""All executors."""from__future__importannotationsimportloggingfromcontextlibimportsuppressfromenumimportEnum,uniquefromtypingimportTYPE_CHECKINGfromairflow.exceptionsimportAirflowConfigExceptionfromairflow.executors.executor_constantsimport(CELERY_EXECUTOR,CELERY_KUBERNETES_EXECUTOR,DASK_EXECUTOR,DEBUG_EXECUTOR,KUBERNETES_EXECUTOR,LOCAL_EXECUTOR,LOCAL_KUBERNETES_EXECUTOR,SEQUENTIAL_EXECUTOR,)fromairflow.utils.module_loadingimportimport_string
[docs]defget_default_executor(cls)->BaseExecutor:"""Creates a new instance of the configured executor if none exists and returns it."""ifcls._default_executorisnotNone:returncls._default_executorfromairflow.configurationimportconfexecutor_name=conf.get_mandatory_value("core","EXECUTOR")cls._default_executor=cls.load_executor(executor_name)returncls._default_executor
@classmethod
[docs]defload_executor(cls,executor_name:str)->BaseExecutor:""" Loads the executor. This supports the following formats: * by executor name for core executor * by ``{plugin_name}.{class_name}`` for executor from plugins * by import path. :return: an instance of executor class via executor_name """ifexecutor_name==CELERY_KUBERNETES_EXECUTOR:returncls.__load_celery_kubernetes_executor()elifexecutor_name==LOCAL_KUBERNETES_EXECUTOR:returncls.__load_local_kubernetes_executor()try:executor_cls,import_source=cls.import_executor_cls(executor_name)log.debug("Loading executor %s from %s",executor_name,import_source.value)exceptImportErrorase:log.error(e)raiseAirflowConfigException(f'The module/attribute could not be loaded. Please check "executor" key in "core" section. 'f'Current value: "{executor_name}".')log.info("Loaded executor: %s",executor_name)returnexecutor_cls()
@classmethod
[docs]defimport_executor_cls(cls,executor_name:str)->tuple[type[BaseExecutor],ConnectorSource]:""" Imports the executor class. Supports the same formats as ExecutorLoader.load_executor. :return: executor class via executor_name and executor import source """ifexecutor_nameincls.executors:returnimport_string(cls.executors[executor_name]),ConnectorSource.COREifexecutor_name.count(".")==1:log.debug("The executor name looks like the plugin path (executor_name=%s). Trying to import a ""executor from a plugin",executor_name,)withsuppress(ImportError,AttributeError):# Load plugins here for executors as at that time the plugins might not have been# initialized yetfromairflowimportplugins_managerplugins_manager.integrate_executor_plugins()returnimport_string(f"airflow.executors.{executor_name}"),ConnectorSource.PLUGINreturnimport_string(executor_name),ConnectorSource.CUSTOM_PATH