Source code for airflow.providers.celery.cli.celery_command
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Celery command."""

from __future__ import annotations

import logging
import sys
from contextlib import contextmanager
from multiprocessing import Process

import psutil
import sqlalchemy.exc
from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile

from airflow import settings
from airflow.configuration import conf
from airflow.providers.celery.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
from airflow.utils.serve_logs import serve_logs
from celery import maybe_patch_concurrency  # type: ignore[attr-defined]
from celery.app.defaults import DEFAULT_TASK_LOG_FMT
from celery.signals import after_setup_logger
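
# Note: worker() below references WORKER_PROCESS_NAME, whose definition does not
# appear in this listing and was presumably lost in extraction. Restored here to
# keep the module self-consistent; the value is an assumption based on the
# upstream source.
WORKER_PROCESS_NAME = "worker"
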
def _run_command_with_daemon_option(*args, **kwargs):
    try:
        if AIRFLOW_V_3_0_PLUS:
            from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option
        else:
            from airflow.cli.commands.daemon_utils import run_command_with_daemon_option

        run_command_with_daemon_option(*args, **kwargs)
    except ImportError:
        from airflow.exceptions import AirflowOptionalProviderFeatureException

        raise AirflowOptionalProviderFeatureException(
            "Failed to import run_command_with_daemon_option. "
            "This feature is only available in Airflow versions >= 2.8.0"
        )


def _providers_configuration_loaded(func):
    def wrapper(*args, **kwargs):
        try:
            from airflow.utils.providers_configuration_loader import providers_configuration_loaded

            providers_configuration_loaded(func)(*args, **kwargs)
        except ImportError as e:
            from airflow.exceptions import AirflowOptionalProviderFeatureException

            raise AirflowOptionalProviderFeatureException(
                "Failed to import providers_configuration_loaded. "
                "This feature is only available in Airflow versions >= 2.8.0"
            ) from e

    return wrapper


@cli_utils.action_cli
@_providers_configuration_loaded
def flower(args):
    """Start Flower, Celery monitoring tool."""
    # This needs to be imported locally to not trigger Providers Manager initialization
    from airflow.providers.celery.executors.celery_executor import app as celery_app

    options = [
        "flower",
        conf.get("celery", "BROKER_URL"),
        f"--address={args.hostname}",
        f"--port={args.port}",
    ]

    if args.broker_api:
        options.append(f"--broker-api={args.broker_api}")

    if args.url_prefix:
        options.append(f"--url-prefix={args.url_prefix}")

    if args.basic_auth:
        options.append(f"--basic-auth={args.basic_auth}")

    if args.flower_conf:
        options.append(f"--conf={args.flower_conf}")

    _run_command_with_daemon_option(
        args=args, process_name="flower", callback=lambda: celery_app.start(options)
    )
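
# run_celery_worker() inside worker() below enters a _serve_logs context manager
# that does not appear in this listing; the imports of contextmanager, Process,
# and serve_logs above suggest it was dropped in extraction. A minimal
# reconstruction under that assumption:
@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
    """Start serve_logs sub-process."""
    sub_proc = None
    if skip_serve_logs is False:
        # Serve task logs from a side process while the worker runs
        sub_proc = Process(target=serve_logs)
        sub_proc.start()
    try:
        yield
    finally:
        if sub_proc:
            sub_proc.terminate()
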
@after_setup_logger.connect()
@_providers_configuration_loaded
def logger_setup_handler(logger, **kwargs):
    """
    Reconfigure the logger.

    * remove any previously configured handlers
    * logs of severity ERROR and above go to stderr,
    * logs of severity lower than ERROR go to stdout.
    """
    if conf.getboolean("logging", "celery_stdout_stderr_separation", fallback=False):
        celery_formatter = logging.Formatter(DEFAULT_TASK_LOG_FMT)

        class NoErrorOrAboveFilter(logging.Filter):
            """Allow only logs with level *lower* than ERROR to be reported."""

            def filter(self, record):
                return record.levelno < logging.ERROR

        below_error_handler = logging.StreamHandler(sys.stdout)
        below_error_handler.addFilter(NoErrorOrAboveFilter())
        below_error_handler.setFormatter(celery_formatter)

        from_error_handler = logging.StreamHandler(sys.stderr)
        from_error_handler.setLevel(logging.ERROR)
        from_error_handler.setFormatter(celery_formatter)

        logger.handlers[:] = [below_error_handler, from_error_handler]
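
# Usage note (based on the conf.getboolean() call above): the stdout/stderr split
# is opt-in. It can be enabled in airflow.cfg under the [logging] section:
#
#     [logging]
#     celery_stdout_stderr_separation = True
#
# or, equivalently, via Airflow's standard environment-variable form:
#
#     AIRFLOW__LOGGING__CELERY_STDOUT_STDERR_SEPARATION=True
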
@cli_utils.action_cli
@_providers_configuration_loaded
def worker(args):
    """Start Airflow Celery worker."""
    # This needs to be imported locally to not trigger Providers Manager initialization
    from airflow.providers.celery.executors.celery_executor import app as celery_app

    # Disable connection pool so that celery worker does not hold an unnecessary db connection
    settings.reconfigure_orm(disable_connection_pool=True)
    if not settings.validate_session():
        raise SystemExit("Worker exiting, database connection precheck failed.")

    autoscale = args.autoscale
    skip_serve_logs = args.skip_serve_logs

    if autoscale is None and conf.has_option("celery", "worker_autoscale"):
        autoscale = conf.get("celery", "worker_autoscale")

    if hasattr(celery_app.backend, "ResultSession"):
        # Pre-create the database tables now, otherwise SQLA via Celery has a
        # race condition where one of the subprocesses can die with "Table
        # already exists" error, because SQLA checks for which tables exist,
        # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
        # EXISTS
        try:
            session = celery_app.backend.ResultSession()
            session.close()
        except sqlalchemy.exc.IntegrityError:
            # At least on postgres, trying to create a table that already
            # exists gives a unique constraint violation on the
            # "pg_type_typname_nsp_index" index. If this happens we can ignore
            # it: we raced to create the tables and lost.
            pass

    # backwards-compatible: https://github.com/apache/airflow/pull/21506#pullrequestreview-879893763
    celery_log_level = conf.get("logging", "CELERY_LOGGING_LEVEL")
    if not celery_log_level:
        celery_log_level = conf.get("logging", "LOGGING_LEVEL")

    # Setup Celery worker
    options = [
        "worker",
        "-O",
        "fair",
        "--queues",
        args.queues,
        "--concurrency",
        args.concurrency,
        "--hostname",
        args.celery_hostname,
        "--loglevel",
        celery_log_level,
    ]
    if autoscale:
        options.extend(["--autoscale", autoscale])
    if args.without_mingle:
        options.append("--without-mingle")
    if args.without_gossip:
        options.append("--without-gossip")

    if conf.has_option("celery", "pool"):
        pool = conf.get("celery", "pool")
        options.extend(["--pool", pool])
        # Celery pools of type eventlet and gevent use greenlets, which
        # requires monkey-patching the app:
        # https://eventlet.net/doc/patching.html#monkey-patch
        # Otherwise task instances hang on the workers and are never
        # executed.
        maybe_patch_concurrency(["-P", pool])

    worker_pid_file_path, stdout, stderr, log_file = setup_locations(
        process=WORKER_PROCESS_NAME,
        stdout=args.stdout,
        stderr=args.stderr,
        log=args.log_file,
        pid=args.pid,
    )

    def run_celery_worker():
        with _serve_logs(skip_serve_logs):
            celery_app.worker_main(options)

    if args.umask:
        umask = args.umask
    else:
        umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK)

    _run_command_with_daemon_option(
        args=args,
        process_name=WORKER_PROCESS_NAME,
        callback=run_celery_worker,
        should_setup_logging=True,
        umask=umask,
        pid_file=worker_pid_file_path,
    )
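
# Usage note (a sketch, not exhaustive): this function backs the
# "airflow celery worker" CLI command, and the args it reads (args.queues,
# args.concurrency, etc.) map onto the options list built above. A typical
# foreground invocation might look like:
#
#     airflow celery worker --queues default --concurrency 8
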