#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
LocalExecutor
.. seealso::
For more information on how the LocalExecutor works, take a look at the guide:
:ref:`executor:LocalExecutor`
"""
import logging
import os
import subprocess
from abc import abstractmethod
from multiprocessing import Manager, Process
from multiprocessing.managers import SyncManager
from queue import Empty, Queue # pylint: disable=unused-import # noqa: F401
from typing import Any, List, Optional, Tuple, Union # pylint: disable=unused-import # noqa: F401

from setproctitle import setproctitle  # pylint: disable=no-name-in-module

from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import NOT_STARTED_MESSAGE, PARALLELISM, BaseExecutor, CommandType
from airflow.models.taskinstance import ( # pylint: disable=unused-import # noqa: F401
TaskInstanceKey,
TaskInstanceStateType,
)
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State

# This is a work item to be executed by a worker.
# It can hold a Key and a Command - but it can also be (None, None), which is actually a
# "Poison Pill" - a worker seeing the Poison Pill should take the pill and ... die instantly.
ExecutorWorkType = Tuple[Optional[TaskInstanceKey], Optional[CommandType]]
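
# For illustration only -- a regular work item and the poison pill might look like
# this (the key and command values here are hypothetical placeholders):
#
#   work: ExecutorWorkType = (some_task_instance_key, ["airflow", "tasks", "run", ...])
#   poison_pill: ExecutorWorkType = (None, None)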


class LocalWorkerBase(Process, LoggingMixin):
    """
    LocalWorkerBase implementation to run airflow commands. Executes the given
    command and puts the result into a result queue when done, terminating execution.

    :param result_queue: the queue to store result state
    """

def __init__(self, result_queue: 'Queue[TaskInstanceStateType]'):
super().__init__(target=self.do_work)
self.daemon: bool = True
self.result_queue: 'Queue[TaskInstanceStateType]' = result_queue

    def run(self):
        # We know we've just started a new process, so let's disconnect from the metadata db now
settings.engine.pool.dispose()
settings.engine.dispose()
return super().run()

    def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
        """
        Executes command received and stores result state in queue.

        :param key: the key to identify the task instance
        :param command: the command to execute
        """
if key is None:
return
self.log.info("%s running %s", self.__class__.__name__, command)
if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
state = self._execute_work_in_subprocess(command)
else:
state = self._execute_work_in_fork(command)
self.result_queue.put((key, state))

    def _execute_work_in_subprocess(self, command: CommandType) -> str:
try:
subprocess.check_call(command, close_fds=True)
return State.SUCCESS
except subprocess.CalledProcessError as e:
self.log.error("Failed to execute task %s.", str(e))
return State.FAILED

    def _execute_work_in_fork(self, command: CommandType) -> str:
pid = os.fork()
if pid:
# In parent, wait for the child
pid, ret = os.waitpid(pid, 0)
return State.SUCCESS if ret == 0 else State.FAILED
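
        # From here on we are in the forked child: run the task through Airflow's
        # CLI entrypoint, then hard-exit via os._exit() so control never returns
        # to the parent's code path.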
from airflow.sentry import Sentry
ret = 1
try:
import signal
from airflow.cli.cli_parser import get_parser
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGUSR2, signal.SIG_DFL)
parser = get_parser()
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(command[1:])
args.shut_down_logging = False
setproctitle(f"airflow task supervisor: {command}")
args.func(args)
ret = 0
return State.SUCCESS
except Exception as e: # pylint: disable=broad-except
self.log.error("Failed to execute task %s.", str(e))
finally:
Sentry.flush()
logging.shutdown()
os._exit(ret) # pylint: disable=protected-access
raise RuntimeError('unreachable -- keep mypy happy')

    @abstractmethod
    def do_work(self):
        """Called in the subprocess and should then execute tasks"""
        raise NotImplementedError()


class LocalWorker(LocalWorkerBase):
    """
    Local worker that executes the task.

    :param result_queue: queue where results of the tasks are put.
    :param key: key identifying task instance
    :param command: Command to execute
    """

def __init__(
self, result_queue: 'Queue[TaskInstanceStateType]', key: TaskInstanceKey, command: CommandType
):
super().__init__(result_queue)
self.key: TaskInstanceKey = key
self.command: CommandType = command

    def do_work(self) -> None:
self.execute_work(key=self.key, command=self.command)
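

# A minimal usage sketch (illustrative only; in practice LocalExecutor below drives
# these workers, and ti_key plus the command list are hypothetical placeholders):
#
#   result_queue = multiprocessing.Manager().Queue()
#   worker = LocalWorker(result_queue, key=ti_key, command=["airflow", "tasks", "run", ...])
#   worker.start()
#   worker.join()
#   key, state = result_queue.get()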


class QueuedLocalWorker(LocalWorkerBase):
    """
    LocalWorker implementation that waits for tasks from a queue and keeps
    executing commands as they become available in the queue.
    It will terminate execution once the poison token is found.

    :param task_queue: queue from which worker reads tasks
    :param result_queue: queue where worker puts results after finishing tasks
    """

def __init__(self, task_queue: 'Queue[ExecutorWorkType]', result_queue: 'Queue[TaskInstanceStateType]'):
super().__init__(result_queue=result_queue)
self.task_queue = task_queue

    def do_work(self) -> None:
while True:
try:
key, command = self.task_queue.get()
except EOFError:
self.log.info(
"Failed to read tasks from the task queue because the other "
"end has closed the connection. Terminating worker %s.",
self.name,
)
break
try:
if key is None or command is None:
# Received poison pill, no more tasks to run
break
self.execute_work(key=key, command=command)
finally:
self.task_queue.task_done()
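

# Queue protocol sketch (hypothetical values): the executor enqueues work items and,
# on shutdown, one poison pill per worker so that each worker's loop terminates:
#
#   task_queue.put((ti_key, ["airflow", "tasks", "run", ...]))  # regular work item
#   task_queue.put((None, None))                                # poison pill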


class LocalExecutor(BaseExecutor):
    """
    LocalExecutor executes tasks locally in parallel.
    It uses the multiprocessing Python library and queues to parallelize the execution
    of tasks.

    :param parallelism: how many parallel processes are run in the executor
    """

def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=parallelism)
self.manager: Optional[SyncManager] = None
self.result_queue: Optional['Queue[TaskInstanceStateType]'] = None
self.workers: List[QueuedLocalWorker] = []
self.workers_used: int = 0
self.workers_active: int = 0
self.impl: Optional[
Union['LocalExecutor.UnlimitedParallelism', 'LocalExecutor.LimitedParallelism']
] = None

    class UnlimitedParallelism:
        """
        Implements LocalExecutor with unlimited parallelism, starting one process
        per command to execute.

        :param executor: the executor instance to implement.
        """

def __init__(self, executor: 'LocalExecutor'):
self.executor: 'LocalExecutor' = executor

        def start(self) -> None:
"""Starts the executor."""
self.executor.workers_used = 0
self.executor.workers_active = 0

        # pylint: disable=unused-argument # pragma: no cover
        def execute_async(
            self,
            key: TaskInstanceKey,
            command: CommandType,
            queue: Optional[str] = None,
            executor_config: Optional[Any] = None,
        ) -> None:
            """
            Executes task asynchronously.

            :param key: the key to identify the task instance
            :param command: the command to execute
            :param queue: Name of the queue
            :param executor_config: configuration for the executor
            """
if not self.executor.result_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
self.executor.workers_used += 1
self.executor.workers_active += 1
local_worker.start()
# pylint: enable=unused-argument # pragma: no cover

        def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
if not self.executor.result_queue:
raise AirflowException("Executor should be started first")
while not self.executor.result_queue.empty():
results = self.executor.result_queue.get()
self.executor.change_state(*results)
self.executor.workers_active -= 1

        def end(self) -> None:
            """
            This method is called when the caller is done submitting jobs and
            wants to wait synchronously until all previously submitted jobs are
            done.
            """
while self.executor.workers_active > 0:
self.executor.sync()

    class LimitedParallelism:
        """
        Implements LocalExecutor with limited parallelism using a task queue to
        coordinate work distribution.

        :param executor: the executor instance to implement.
        """

def __init__(self, executor: 'LocalExecutor'):
self.executor: 'LocalExecutor' = executor
self.queue: Optional['Queue[ExecutorWorkType]'] = None

        def start(self) -> None:
"""Starts limited parallelism implementation."""
if not self.executor.manager:
raise AirflowException(NOT_STARTED_MESSAGE)
self.queue = self.executor.manager.Queue()
if not self.executor.result_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
self.executor.workers = [
QueuedLocalWorker(self.queue, self.executor.result_queue)
for _ in range(self.executor.parallelism)
]
self.executor.workers_used = len(self.executor.workers)
for worker in self.executor.workers:
worker.start()

        def execute_async(
            self,
            key: TaskInstanceKey,
            command: CommandType,
            queue: Optional[str] = None,  # pylint: disable=unused-argument
            executor_config: Optional[Any] = None,  # pylint: disable=unused-argument
        ) -> None:
            """
            Executes task asynchronously.

            :param key: the key to identify the task instance
            :param command: the command to execute
            :param queue: name of the queue
            :param executor_config: configuration for the executor
            """
if not self.queue:
raise AirflowException(NOT_STARTED_MESSAGE)
self.queue.put((key, command))

        def sync(self):
"""Sync will get called periodically by the heartbeat method."""
while True:
try:
results = self.executor.result_queue.get_nowait()
try:
self.executor.change_state(*results)
finally:
self.executor.result_queue.task_done()
except Empty:
break

        def end(self):
"""Ends the executor. Sends the poison pill to all workers."""
for _ in self.executor.workers:
self.queue.put((None, None))
# Wait for commands to finish
self.queue.join()
self.executor.sync()

    def start(self) -> None:
        """Starts the executor."""
self.manager = Manager()
self.result_queue = self.manager.Queue()
self.workers = []
self.workers_used = 0
self.workers_active = 0
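        # A parallelism of 0 is treated as unlimited: one new process per task;
        # otherwise a fixed pool of QueuedLocalWorker processes is used.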
self.impl = (
LocalExecutor.UnlimitedParallelism(self)
if self.parallelism == 0
else LocalExecutor.LimitedParallelism(self)
)
self.impl.start()

    def execute_async(
self,
key: TaskInstanceKey,
command: CommandType,
queue: Optional[str] = None,
executor_config: Optional[Any] = None,
) -> None:
"""Execute asynchronously."""
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)
self.validate_command(command)
self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)

    def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)
self.impl.sync()

    def end(self) -> None:
        """Ends the executor, waiting for any running tasks to finish first."""
if not self.impl:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.manager:
raise AirflowException(NOT_STARTED_MESSAGE)
self.log.info(
"Shutting down LocalExecutor"
"; waiting for running tasks to finish. Signal again if you don't want to wait."
)
self.impl.end()
self.manager.shutdown()
"""Terminate the executor is not doing anything."""