Source code for airflow.providers.celery.executors.default_celery
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Default celery configuration."""from__future__importannotationsimportjsonimportloggingimportsslimportre2fromairflow.configurationimportconffromairflow.exceptionsimportAirflowConfigException,AirflowExceptionfromairflow.providers.celery.version_compatimportAIRFLOW_V_3_0_PLUSdef_broker_supports_visibility_timeout(url):returnurl.startswith(("redis://","rediss://","sqs://","sentinel://"))
# IMPORTANT NOTE! Celery Executor has initialization done dynamically and it performs initialization when# it is imported, so we need fallbacks here in order to be able to import the class directly without# having configuration initialized before. Do not remove those fallbacks!## This is not strictly needed for production:## * for Airflow 2.6 and before the defaults will come from the core defaults# * for Airflow 2.7+ the defaults will be loaded via ProvidersManager## But it helps in our tests to import the executor class and validate if the celery code can be imported# in the current and older versions of Airflow.
ifnotisinstance(sentinel_kwargs,dict):raiseValueErrorbroker_transport_options["sentinel_kwargs"]=sentinel_kwargsexceptException:raiseAirflowException("sentinel_kwargs should be written in the correct dictionary format.")ifconf.has_option("celery","RESULT_BACKEND"):
else:log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.")result_backend=f"db+{conf.get('database','SQL_ALCHEMY_CONN')}"
# In order to not change anything pre Task Execution API, we leave this setting as it was (unset) in Airflow2ifAIRFLOW_V_3_0_PLUS:DEFAULT_CELERY_CONFIG.setdefault("worker_redirect_stdouts",False)DEFAULT_CELERY_CONFIG.setdefault("worker_hijack_root_logger",False)def_get_celery_ssl_active()->bool:try:returnconf.getboolean("celery","SSL_ACTIVE")exceptAirflowConfigException:log.warning("Celery Executor will run without SSL")returnFalse
elifbroker_urlandre2.search("rediss?://|sentinel://",broker_url):broker_use_ssl={"ssl_keyfile":conf.get("celery","SSL_KEY"),"ssl_certfile":conf.get("celery","SSL_CERT"),"ssl_ca_certs":conf.get("celery","SSL_CACERT"),"ssl_cert_reqs":ssl.CERT_REQUIRED,}else:raiseAirflowException("The broker you configured does not support SSL_ACTIVE to be True. ""Please use RabbitMQ or Redis if you would like to use SSL for broker.")DEFAULT_CELERY_CONFIG["broker_use_ssl"]=broker_use_sslexceptAirflowConfigException:raiseAirflowException("AirflowConfigException: SSL_ACTIVE is True, please ensure SSL_KEY, SSL_CERT and SSL_CACERT are set")exceptExceptionase:raiseAirflowException(f"Exception: There was an unknown Celery SSL Error. Please ensure you want to use SSL and/or have "f"all necessary certs and key ({e}).")
ifmatch_not_recommended_backend:log.warning("You have configured a result_backend using the protocol `%s`,"" it is highly recommended to use an alternative result_backend (i.e. a database).",match_not_recommended_backend.group(0).strip("://"),)