# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
LocalExecutor
.. seealso::
For more information on how the LocalExecutor works, take a look at the guide:
:ref:`executor:LocalExecutor`
"""
import multiprocessing
import subprocess
from builtins import range
from queue import Empty
from airflow.executors.base_executor import BaseExecutor
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
[docs]class LocalWorker(multiprocessing.Process, LoggingMixin):
"""LocalWorker Process implementation to run airflow commands. Executes the given
command and puts the result into a result queue when done, terminating execution."""
def __init__(self, result_queue):
"""
:param result_queue: the queue to store result states tuples (key, State)
:type result_queue: multiprocessing.Queue
"""
super(LocalWorker, self).__init__()
self.daemon = True
self.result_queue = result_queue
self.key = None
self.command = None
[docs] def execute_work(self, key, command):
"""
Executes command received and stores result state in queue.
:param key: the key to identify the TI
:type key: tuple(dag_id, task_id, execution_date)
:param command: the command to execute
:type command: str
"""
if key is None:
return
self.log.info("%s running %s", self.__class__.__name__, command)
try:
subprocess.check_call(command, close_fds=True)
state = State.SUCCESS
except subprocess.CalledProcessError as e:
state = State.FAILED
self.log.error("Failed to execute task %s.", str(e))
# TODO: Why is this commented out?
# raise e
self.result_queue.put((key, state))
[docs] def run(self):
self.execute_work(self.key, self.command)
[docs]class QueuedLocalWorker(LocalWorker):
"""LocalWorker implementation that is waiting for tasks from a queue and will
continue executing commands as they become available in the queue. It will terminate
execution once the poison token is found."""
def __init__(self, task_queue, result_queue):
super(QueuedLocalWorker, self).__init__(result_queue=result_queue)
self.task_queue = task_queue
[docs] def run(self):
while True:
key, command = self.task_queue.get()
try:
if key is None:
# Received poison pill, no more tasks to run
break
self.execute_work(key, command)
finally:
self.task_queue.task_done()
[docs]class LocalExecutor(BaseExecutor):
"""
LocalExecutor executes tasks locally in parallel. It uses the
multiprocessing Python library and queues to parallelize the execution
of tasks.
"""
[docs] class _UnlimitedParallelism(object):
"""Implements LocalExecutor with unlimited parallelism, starting one process
per each command to execute."""
def __init__(self, executor):
"""
:param executor: the executor instance to implement.
:type executor: LocalExecutor
"""
self.executor = executor
[docs] def start(self):
self.executor.workers_used = 0
self.executor.workers_active = 0
[docs] def execute_async(self, key, command):
"""
:param key: the key to identify the TI
:type key: tuple(dag_id, task_id, execution_date)
:param command: the command to execute
:type command: str
"""
local_worker = LocalWorker(self.executor.result_queue)
local_worker.key = key
local_worker.command = command
self.executor.workers_used += 1
self.executor.workers_active += 1
local_worker.start()
[docs] def sync(self):
while not self.executor.result_queue.empty():
results = self.executor.result_queue.get()
self.executor.change_state(*results)
self.executor.workers_active -= 1
[docs] def end(self):
while self.executor.workers_active > 0:
self.executor.sync()
[docs] class _LimitedParallelism(object):
"""Implements LocalExecutor with limited parallelism using a task queue to
coordinate work distribution."""
def __init__(self, executor):
self.executor = executor
[docs] def start(self):
self.queue = self.executor.manager.Queue()
self.executor.workers = [
QueuedLocalWorker(self.queue, self.executor.result_queue)
for _ in range(self.executor.parallelism)
]
self.executor.workers_used = len(self.executor.workers)
for w in self.executor.workers:
w.start()
[docs] def execute_async(self, key, command):
"""
:param key: the key to identify the TI
:type key: tuple(dag_id, task_id, execution_date)
:param command: the command to execute
:type command: str
"""
self.queue.put((key, command))
[docs] def sync(self):
while True:
try:
results = self.executor.result_queue.get_nowait()
try:
self.executor.change_state(*results)
finally:
self.executor.result_queue.task_done()
except Empty:
break
[docs] def end(self):
# Sending poison pill to all worker
for _ in self.executor.workers:
self.queue.put((None, None))
# Wait for commands to finish
self.queue.join()
self.executor.sync()
[docs] def start(self):
self.manager = multiprocessing.Manager()
self.result_queue = self.manager.Queue()
self.workers = []
self.workers_used = 0
self.workers_active = 0
self.impl = (LocalExecutor._UnlimitedParallelism(self) if self.parallelism == 0
else LocalExecutor._LimitedParallelism(self))
self.impl.start()
[docs] def execute_async(self, key, command, queue=None, executor_config=None):
if command[0:2] != ["airflow", "run"]:
raise ValueError('The command must start with ["airflow", "run"].')
self.impl.execute_async(key=key, command=command)
[docs] def sync(self):
self.impl.sync()
[docs] def end(self):
self.impl.end()
self.manager.shutdown()