# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""All executors."""importloggingfromcontextlibimportsuppressfromtypingimportOptionalfromairflow.exceptionsimportAirflowConfigExceptionfromairflow.executors.base_executorimportBaseExecutorfromairflow.executors.executor_constantsimport(CELERY_EXECUTOR,CELERY_KUBERNETES_EXECUTOR,DASK_EXECUTOR,DEBUG_EXECUTOR,KUBERNETES_EXECUTOR,LOCAL_EXECUTOR,SEQUENTIAL_EXECUTOR,)fromairflow.utils.module_loadingimportimport_string
# NOTE(review): this takes ``cls`` like its sibling loaders, so it is presumably
# decorated with ``@classmethod`` on a line outside this view -- confirm.
def get_default_executor(cls) -> BaseExecutor:
    """
    Return the default executor, building and caching it on first use.

    The instance is cached on ``cls._default_executor``. When nothing is cached
    yet, the executor name is read from the ``EXECUTOR`` option of the ``core``
    configuration section and resolved through ``cls.load_executor``.

    :return: the cached (possibly freshly created) executor instance
    """
    if cls._default_executor is None:
        # Deferred import: the configuration module is only pulled in when an
        # executor actually has to be created.
        from airflow.configuration import conf

        cls._default_executor = cls.load_executor(conf.get('core', 'EXECUTOR'))
    return cls._default_executor
@classmethod
def load_executor(cls, executor_name: str) -> BaseExecutor:
    """
    Loads the executor.

    This supports the following formats:

    * by executor name for core executor
    * by ``{plugin_name}.{class_name}`` for executor from plugins
    * by import path.

    The formats are tried in that order. A name containing exactly one dot is
    first tried as a plugin executor; if that attempt fails with
    ``ImportError`` or ``AttributeError`` the name is then treated as a plain
    import path.

    :param executor_name: core executor name, plugin path or import path
    :return: an instance of executor class via executor_name
    :raises AirflowConfigException: when loading ``executor_name`` as an import
        path raises ``ImportError``; the original error is attached as the cause
    """
    # The combined Celery/Kubernetes executor needs two executor instances as
    # constructor arguments, so it has a dedicated loader.
    if executor_name == CELERY_KUBERNETES_EXECUTOR:
        return cls.__load_celery_kubernetes_executor()

    if executor_name in cls.executors:
        log.debug("Loading core executor: %s", executor_name)
        return import_string(cls.executors[executor_name])()

    # If the executor name looks like "plugin executor path" then try to load plugins.
    if executor_name.count(".") == 1:
        log.debug(
            "The executor name looks like the plugin path (executor_name=%s). Trying to load a "
            "executor from a plugin",
            executor_name,
        )
        # A failed plugin lookup is not fatal: fall through to the import-path branch.
        with suppress(ImportError, AttributeError):
            # Load plugins here for executors as at that time the plugins might not have been
            # initialized yet
            from airflow import plugins_manager

            plugins_manager.integrate_executor_plugins()
            return import_string(f"airflow.executors.{executor_name}")()

    log.debug("Loading executor from custom path: %s", executor_name)
    try:
        executor = import_string(executor_name)()
    except ImportError as e:
        log.error(e)
        # Chain explicitly so the underlying import failure is kept as __cause__.
        raise AirflowConfigException(
            f'The module/attribute could not be loaded. Please check "executor" key in "core" section. '
            f'Current value: "{executor_name}".'
        ) from e
    log.info("Loaded executor: %s", executor_name)
    return executor
@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
    """
    Build the combined Celery/Kubernetes executor.

    The Celery executor and then the Kubernetes executor are imported and
    instantiated (in that order) from the paths registered in
    ``cls.executors``; both instances are passed to the combined executor
    class, which is imported last.

    :return: an instance of CeleryKubernetesExecutor
    """
    # The generator is consumed left to right by the unpacking, so Celery is
    # imported and instantiated before Kubernetes.
    celery, kubernetes = (
        import_string(cls.executors[name])() for name in (CELERY_EXECUTOR, KUBERNETES_EXECUTOR)
    )
    combined_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
    return combined_cls(celery, kubernetes)