Source code for airflow.utils.dag_processing

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import logging
import multiprocessing
import os
import re
import signal
import sys
import time
import zipfile
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from collections import namedtuple
from datetime import timedelta
from importlib import import_module
import enum

import psutil
from six.moves import range, reload_module
from sqlalchemy import or_
from tabulate import tabulate

# To avoid circular imports
import airflow.models
from airflow import configuration as conf
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.exceptions import AirflowException
from airflow.settings import Stats
from airflow.models import errors
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State


class SimpleDag(BaseDag):
    """
    A simplified representation of a DAG that contains all attributes
    required for instantiating and scheduling its associated tasks.
    """

    def __init__(self, dag, pickle_id=None):
        """
        :param dag: the DAG
        :type dag: airflow.models.DAG
        :param pickle_id: ID associated with the pickled version of this DAG.
        :type pickle_id: unicode
        """
        self._dag_id = dag.dag_id
        self._task_ids = [task.task_id for task in dag.tasks]
        self._full_filepath = dag.full_filepath
        self._is_paused = dag.is_paused
        self._concurrency = dag.concurrency
        self._pickle_id = pickle_id
        self._task_special_args = {}
        for task in dag.tasks:
            special_args = {}
            if task.task_concurrency is not None:
                special_args['task_concurrency'] = task.task_concurrency
            if len(special_args) > 0:
                self._task_special_args[task.task_id] = special_args

    @property
    def dag_id(self):
        """
        :return: the DAG ID
        :rtype: unicode
        """
        return self._dag_id

    @property
    def task_ids(self):
        """
        :return: A list of task IDs that are in this DAG
        :rtype: list[unicode]
        """
        return self._task_ids

    @property
    def full_filepath(self):
        """
        :return: The absolute path to the file that contains this DAG's definition
        :rtype: unicode
        """
        return self._full_filepath

    @property
    def concurrency(self):
        """
        :return: maximum number of tasks that can run simultaneously from this DAG
        :rtype: int
        """
        return self._concurrency

    @property
    def is_paused(self):
        """
        :return: whether this DAG is paused or not
        :rtype: bool
        """
        return self._is_paused

    @property
    def pickle_id(self):
        """
        :return: The pickle ID for this DAG, if it has one. Otherwise None.
        :rtype: unicode
        """
        return self._pickle_id

    @property
    def task_special_args(self):
        return self._task_special_args

    def get_task_special_arg(self, task_id, special_arg_name):
        if task_id in self._task_special_args and special_arg_name in self._task_special_args[task_id]:
            return self._task_special_args[task_id][special_arg_name]
        else:
            return None


class SimpleTaskInstance(object):
    def __init__(self, ti):
        self._dag_id = ti.dag_id
        self._task_id = ti.task_id
        self._execution_date = ti.execution_date
        self._start_date = ti.start_date
        self._end_date = ti.end_date
        self._try_number = ti.try_number
        self._state = ti.state
        self._executor_config = ti.executor_config
        if hasattr(ti, 'run_as_user'):
            self._run_as_user = ti.run_as_user
        else:
            self._run_as_user = None
        if hasattr(ti, 'pool'):
            self._pool = ti.pool
        else:
            self._pool = None
        if hasattr(ti, 'priority_weight'):
            self._priority_weight = ti.priority_weight
        else:
            self._priority_weight = None
        self._queue = ti.queue
        self._key = ti.key

    @property
    def dag_id(self):
        return self._dag_id

    @property
    def task_id(self):
        return self._task_id

    @property
    def execution_date(self):
        return self._execution_date

    @property
    def start_date(self):
        return self._start_date

    @property
    def end_date(self):
        return self._end_date

    @property
    def try_number(self):
        return self._try_number

    @property
    def state(self):
        return self._state

    @property
    def pool(self):
        return self._pool

    @property
    def priority_weight(self):
        return self._priority_weight

    @property
    def queue(self):
        return self._queue

    @property
    def key(self):
        return self._key

    @property
    def executor_config(self):
        return self._executor_config

    @provide_session
    def construct_task_instance(self, session=None, lock_for_update=False):
        """
        Construct a TaskInstance from the database based on the primary key

        :param session: DB session.
        :param lock_for_update: if True, indicates that the database should
            lock the TaskInstance (issuing a FOR UPDATE clause) until the
            session is committed.
        """
        TI = airflow.models.TaskInstance

        qry = session.query(TI).filter(
            TI.dag_id == self._dag_id,
            TI.task_id == self._task_id,
            TI.execution_date == self._execution_date)

        if lock_for_update:
            ti = qry.with_for_update().first()
        else:
            ti = qry.first()
        return ti


class SimpleDagBag(BaseDagBag):
    """
    A collection of SimpleDag objects with some convenience methods.
    """

    def __init__(self, simple_dags):
        """
        Constructor.

        :param simple_dags: SimpleDag objects that should be in this
        :type list(airflow.utils.dag_processing.SimpleDagBag)
        """
        self.simple_dags = simple_dags
        self.dag_id_to_simple_dag = {}

        for simple_dag in simple_dags:
            self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag

    @property
    def dag_ids(self):
        """
        :return: IDs of all the DAGs in this
        :rtype: list[unicode]
        """
        return self.dag_id_to_simple_dag.keys()

    def get_dag(self, dag_id):
        """
        :param dag_id: DAG ID
        :type dag_id: unicode
        :return: if the given DAG ID exists in the bag, return the BaseDag
        corresponding to that ID. Otherwise, throw an Exception
        :rtype: airflow.utils.dag_processing.SimpleDag
        """
        if dag_id not in self.dag_id_to_simple_dag:
            raise AirflowException("Unknown DAG ID {}".format(dag_id))
        return self.dag_id_to_simple_dag[dag_id]


[docs]def list_py_file_paths(directory, safe_mode=True, include_examples=None): """ Traverse a directory and look for Python files. :param directory: the directory to traverse :type directory: unicode :param safe_mode: whether to use a heuristic to determine whether a file contains Airflow DAG definitions :return: a list of paths to Python files in the specified directory :rtype: list[unicode] """ if include_examples is None: include_examples = conf.getboolean('core', 'LOAD_EXAMPLES') file_paths = [] if directory is None: return [] elif os.path.isfile(directory): return [directory] elif os.path.isdir(directory): patterns_by_dir = {} for root, dirs, files in os.walk(directory, followlinks=True): patterns = patterns_by_dir.get(root, []) ignore_file = os.path.join(root, '.airflowignore') if os.path.isfile(ignore_file): with open(ignore_file, 'r') as f: # If we have new patterns create a copy so we don't change # the previous list (which would affect other subdirs) patterns = patterns + [p for p in f.read().split('\n') if p] # If we can ignore any subdirs entirely we should - fewer paths # to walk is better. We have to modify the ``dirs`` array in # place for this to affect os.walk dirs[:] = [ d for d in dirs if not any(re.search(p, os.path.join(root, d)) for p in patterns) ] # We want patterns defined in a parent folder's .airflowignore to # apply to subdirs too for d in dirs: patterns_by_dir[os.path.join(root, d)] = patterns for f in files: try: file_path = os.path.join(root, f) if not os.path.isfile(file_path): continue mod_name, file_ext = os.path.splitext( os.path.split(file_path)[-1]) if file_ext != '.py' and not zipfile.is_zipfile(file_path): continue if any([re.findall(p, file_path) for p in patterns]): continue # Heuristic that guesses whether a Python file contains an # Airflow DAG definition. might_contain_dag = True if safe_mode and not zipfile.is_zipfile(file_path): with open(file_path, 'rb') as f: content = f.read() might_contain_dag = all( [s in content for s in (b'DAG', b'airflow')]) if not might_contain_dag: continue file_paths.append(file_path) except Exception: log = LoggingMixin().log log.exception("Error while examining %s", f) if include_examples: import airflow.example_dags example_dag_folder = airflow.example_dags.__path__[0] file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, False)) return file_paths
class AbstractDagFileProcessor(object): """ Processes a DAG file. See SchedulerJob.process_file() for more details. """ __metaclass__ = ABCMeta @abstractmethod def start(self): """ Launch the process to process the file """ raise NotImplementedError() @abstractmethod def terminate(self, sigkill=False): """ Terminate (and then kill) the process launched to process the file """ raise NotImplementedError() @property @abstractmethod def pid(self): """ :return: the PID of the process launched to process the given file """ raise NotImplementedError() @property @abstractmethod def exit_code(self): """ After the process is finished, this can be called to get the return code :return: the exit code of the process :rtype: int """ raise NotImplementedError() @property @abstractmethod def done(self): """ Check if the process launched to process this file is done. :return: whether the process is finished running :rtype: bool """ raise NotImplementedError() @property @abstractmethod def result(self): """ :return: result of running SchedulerJob.process_file() :rtype: list[airflow.utils.dag_processing.SimpleDag] """ raise NotImplementedError() @property @abstractmethod def start_time(self): """ :return: When this started to process the file :rtype: datetime """ raise NotImplementedError() @property @abstractmethod def file_path(self): """ :return: the path to the file that this is processing :rtype: unicode """ raise NotImplementedError() DagParsingStat = namedtuple('DagParsingStat', ['file_paths', 'all_pids', 'done', 'all_files_processed', 'result_count']) class DagParsingSignal(enum.Enum): AGENT_HEARTBEAT = 'agent_heartbeat' MANAGER_DONE = 'manager_done' TERMINATE_MANAGER = 'terminate_manager' END_MANAGER = 'end_manager' class DagFileProcessorAgent(LoggingMixin): """ Agent for DAG file processing. It is responsible for all DAG parsing related jobs in scheduler process. Mainly it can spin up DagFileProcessorManager in a subprocess, collect DAG parsing results from it and communicate signal/DAG parsing stat with it. """ def __init__(self, dag_directory, file_paths, max_runs, processor_factory, async_mode): """ :param dag_directory: Directory where DAG definitions are kept. All files in file_paths should be under this directory :type dag_directory: unicode :param file_paths: list of file paths that contain DAG definitions :type file_paths: list[unicode] :param max_runs: The number of times to parse and schedule each file. -1 for unlimited. :type max_runs: int :param processor_factory: function that creates processors for DAG definition files. Arguments are (dag_definition_path, log_file_path) :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor) :param async_mode: Whether to start agent in async mode :type async_mode: bool """ self._file_paths = file_paths self._file_path_queue = [] self._dag_directory = dag_directory self._max_runs = max_runs self._processor_factory = processor_factory self._async_mode = async_mode # Map from file path to the processor self._processors = {} # Map from file path to the last runtime self._last_runtime = {} # Map from file path to the last finish time self._last_finish_time = {} # Map from file path to the number of runs self._run_count = defaultdict(int) # Pids of DAG parse self._all_pids = [] # Pipe for communicating signals self._parent_signal_conn, self._child_signal_conn = multiprocessing.Pipe() # Pipe for communicating DagParsingStat self._stat_queue = multiprocessing.Queue() self._result_queue = multiprocessing.Queue() self._process = None self._done = False # Initialized as true so we do not deactivate w/o any actual DAG parsing. self._all_files_processed = True self._result_count = 0 def start(self): """ Launch DagFileProcessorManager processor and start DAG parsing loop in manager. """ self._process = self._launch_process(self._dag_directory, self._file_paths, self._max_runs, self._processor_factory, self._child_signal_conn, self._stat_queue, self._result_queue, self._async_mode) self.log.info("Launched DagFileProcessorManager with pid: %s", self._process.pid) def heartbeat(self): """ Should only be used when launched DAG file processor manager in sync mode. Send agent heartbeat signal to the manager. """ self._parent_signal_conn.send(DagParsingSignal.AGENT_HEARTBEAT) def wait_until_finished(self): """ Should only be used when launched DAG file processor manager in sync mode. Wait for done signal from the manager. """ while True: if self._parent_signal_conn.recv() == DagParsingSignal.MANAGER_DONE: break @staticmethod def _launch_process(dag_directory, file_paths, max_runs, processor_factory, signal_conn, _stat_queue, result_queue, async_mode): def helper(): # Reload configurations and settings to avoid collision with parent process. # Because this process may need custom configurations that cannot be shared, # e.g. RotatingFileHandler. And it can cause connection corruption if we # do not recreate the SQLA connection pool. os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True' # Replicating the behavior of how logging module was loaded # in logging_config.py reload_module(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit('.', 1)[0])) reload_module(airflow.settings) airflow.settings.initialize() del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] processor_manager = DagFileProcessorManager(dag_directory, file_paths, max_runs, processor_factory, signal_conn, _stat_queue, result_queue, async_mode) processor_manager.start() p = multiprocessing.Process(target=helper, args=(), name="DagFileProcessorManager") p.start() return p def harvest_simple_dags(self): """ Harvest DAG parsing results from result queue and sync metadata from stat queue. :return: List of parsing result in SimpleDag format. """ # Metadata and results to be harvested can be inconsistent, # but it should not be a big problem. self._sync_metadata() # Heartbeating after syncing metadata so we do not restart manager # if it processed all files for max_run times and exit normally. self._heartbeat_manager() simple_dags = [] # multiprocessing.Queue().qsize will not work on MacOS. if sys.platform == "darwin": qsize = self._result_count else: qsize = self._result_queue.qsize() for _ in range(qsize): simple_dags.append(self._result_queue.get()) self._result_count = 0 return simple_dags def _heartbeat_manager(self): """ Heartbeat DAG file processor and start it if it is not alive. :return: """ if self._process and not self._process.is_alive() and not self.done: self.start() def _sync_metadata(self): """ Sync metadata from stat queue and only keep the latest stat. :return: """ while not self._stat_queue.empty(): stat = self._stat_queue.get() self._file_paths = stat.file_paths self._all_pids = stat.all_pids self._done = stat.done self._all_files_processed = stat.all_files_processed self._result_count += stat.result_count @property def file_paths(self): return self._file_paths @property def done(self): return self._done @property def all_files_processed(self): return self._all_files_processed def terminate(self): """ Send termination signal to DAG parsing processor manager and expect it to terminate all DAG file processors. """ self.log.info("Sending termination message to manager.") self._child_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER) def end(self): """ Terminate (and then kill) the manager process launched. :return: """ if not self._process: self.log.warn('Ending without manager process.') return this_process = psutil.Process(os.getpid()) try: manager_process = psutil.Process(self._process.pid) except psutil.NoSuchProcess: self.log.info("Manager process not running.") return # First try SIGTERM if manager_process.is_running() \ and manager_process.pid in [x.pid for x in this_process.children()]: self.log.info("Terminating manager process: %s", manager_process.pid) manager_process.terminate() # TODO: Remove magic number timeout = 5 self.log.info("Waiting up to %ss for manager process to exit...", timeout) try: psutil.wait_procs({manager_process}, timeout) except psutil.TimeoutExpired: self.log.debug("Ran out of time while waiting for " "processes to exit") # Then SIGKILL if manager_process.is_running() \ and manager_process.pid in [x.pid for x in this_process.children()]: self.log.info("Killing manager process: %s", manager_process.pid) manager_process.kill() manager_process.wait() class DagFileProcessorManager(LoggingMixin): """ Given a list of DAG definition files, this kicks off several processors in parallel to process them and put the results to a multiprocessing.Queue for DagFileProcessorAgent to harvest. The parallelism is limited and as the processors finish, more are launched. The files are processed over and over again, but no more often than the specified interval. :type _file_path_queue: list[unicode] :type _processors: dict[unicode, AbstractDagFileProcessor] :type _last_runtime: dict[unicode, float] :type _last_finish_time: dict[unicode, datetime.datetime] """ def __init__(self, dag_directory, file_paths, max_runs, processor_factory, signal_conn, stat_queue, result_queue, async_mode=True): """ :param dag_directory: Directory where DAG definitions are kept. All files in file_paths should be under this directory :type dag_directory: unicode :param file_paths: list of file paths that contain DAG definitions :type file_paths: list[unicode] :param max_runs: The number of times to parse and schedule each file. -1 for unlimited. :type max_runs: int :param processor_factory: function that creates processors for DAG definition files. Arguments are (dag_definition_path) :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor) :param signal_conn: connection to communicate signal with processor agent. :type signal_conn: airflow.models.connection.Connection :param stat_queue: the queue to use for passing back parsing stat to agent. :type stat_queue: multiprocessing.Queue :param result_queue: the queue to use for passing back the result to agent. :type result_queue: multiprocessing.Queue :param async_mode: whether to start the manager in async mode :type async_mode: bool """ self._file_paths = file_paths self._file_path_queue = [] self._dag_directory = dag_directory self._max_runs = max_runs self._processor_factory = processor_factory self._signal_conn = signal_conn self._stat_queue = stat_queue self._result_queue = result_queue self._async_mode = async_mode self._parallelism = conf.getint('scheduler', 'max_threads') if 'sqlite' in conf.get('core', 'sql_alchemy_conn') and self._parallelism > 1: self.log.error("Cannot use more than 1 thread when using sqlite. " "Setting parallelism to 1") self._parallelism = 1 # Parse and schedule each file no faster than this interval. self._file_process_interval = conf.getint('scheduler', 'min_file_process_interval') # How often to print out DAG file processing stats to the log. Default to # 30 seconds. self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval') # How many seconds do we wait for tasks to heartbeat before mark them as zombies. self._zombie_threshold_secs = ( conf.getint('scheduler', 'scheduler_zombie_task_threshold')) # Map from file path to the processor self._processors = {} # Map from file path to the last runtime self._last_runtime = {} # Map from file path to the last finish time self._last_finish_time = {} self._last_zombie_query_time = timezone.utcnow() # Last time that the DAG dir was traversed to look for files self.last_dag_dir_refresh_time = timezone.utcnow() # Last time stats were printed self.last_stat_print_time = timezone.datetime(2000, 1, 1) # TODO: Remove magic number self._zombie_query_interval = 10 # Map from file path to the number of runs self._run_count = defaultdict(int) # Manager heartbeat key. self._heart_beat_key = 'heart-beat' # How often to scan the DAGs directory for new files. Default to 5 minutes. self.dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval') self._log = logging.getLogger('airflow.processor_manager') signal.signal(signal.SIGINT, self._exit_gracefully) signal.signal(signal.SIGTERM, self._exit_gracefully) def _exit_gracefully(self, signum, frame): """ Helper method to clean up DAG file processors to avoid leaving orphan processes. """ self.log.info("Exiting gracefully upon receiving signal %s", signum) self.terminate() self.end() self.log.debug("Finished terminating DAG processors.") sys.exit(os.EX_OK) def start(self): """ Use multiple processes to parse and generate tasks for the DAGs in parallel. By processing them in separate processes, we can get parallelism and isolation from potentially harmful user code. """ self.log.info("Processing files using up to %s processes at a time ", self._parallelism) self.log.info("Process each file at most once every %s seconds", self._file_process_interval) self.log.info( "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval ) if self._async_mode: self.log.debug("Starting DagFileProcessorManager in async mode") self.start_in_async() else: self.log.debug("Starting DagFileProcessorManager in sync mode") self.start_in_sync() def start_in_async(self): """ Parse DAG files repeatedly in a standalone loop. """ while True: loop_start_time = time.time() if self._signal_conn.poll(): agent_signal = self._signal_conn.recv() if agent_signal == DagParsingSignal.TERMINATE_MANAGER: self.terminate() break elif agent_signal == DagParsingSignal.END_MANAGER: self.end() sys.exit(os.EX_OK) self._refresh_dag_dir() simple_dags = self.heartbeat() for simple_dag in simple_dags: self._result_queue.put(simple_dag) self._print_stat() all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths) max_runs_reached = self.max_runs_reached() dag_parsing_stat = DagParsingStat(self._file_paths, self.get_all_pids(), max_runs_reached, all_files_processed, len(simple_dags)) self._stat_queue.put(dag_parsing_stat) if max_runs_reached: self.log.info("Exiting dag parsing loop as all files " "have been processed %s times", self._max_runs) break loop_duration = time.time() - loop_start_time if loop_duration < 1: sleep_length = 1 - loop_duration self.log.debug("Sleeping for %.2f seconds to prevent excessive logging", sleep_length) time.sleep(sleep_length) def start_in_sync(self): """ Parse DAG files in a loop controlled by DagParsingSignal. Actual DAG parsing loop will run once upon receiving one agent heartbeat message and will report done when finished the loop. """ while True: agent_signal = self._signal_conn.recv() if agent_signal == DagParsingSignal.TERMINATE_MANAGER: self.terminate() break elif agent_signal == DagParsingSignal.END_MANAGER: self.end() sys.exit(os.EX_OK) elif agent_signal == DagParsingSignal.AGENT_HEARTBEAT: self._refresh_dag_dir() simple_dags = self.heartbeat() for simple_dag in simple_dags: self._result_queue.put(simple_dag) self._print_stat() all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths) max_runs_reached = self.max_runs_reached() dag_parsing_stat = DagParsingStat(self._file_paths, self.get_all_pids(), self.max_runs_reached(), all_files_processed, len(simple_dags)) self._stat_queue.put(dag_parsing_stat) self.wait_until_finished() self._signal_conn.send(DagParsingSignal.MANAGER_DONE) if max_runs_reached: self.log.info("Exiting dag parsing loop as all files " "have been processed %s times", self._max_runs) self._signal_conn.send(DagParsingSignal.MANAGER_DONE) break def _refresh_dag_dir(self): """ Refresh file paths from dag dir if we haven't done it for too long. """ elapsed_time_since_refresh = (timezone.utcnow() - self.last_dag_dir_refresh_time).total_seconds() if elapsed_time_since_refresh > self.dag_dir_list_interval: # Build up a list of Python files that could contain DAGs self.log.info("Searching for files in %s", self._dag_directory) self._file_paths = list_py_file_paths(self._dag_directory) self.last_dag_dir_refresh_time = timezone.utcnow() self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory) self.set_file_paths(self._file_paths) try: self.log.debug("Removing old import errors") self.clear_nonexistent_import_errors() except Exception: self.log.exception("Error removing old import errors") def _print_stat(self): """ Occasionally print out stats about how fast the files are getting processed """ if ((timezone.utcnow() - self.last_stat_print_time).total_seconds() > self.print_stats_interval): if len(self._file_paths) > 0: self._log_file_processing_stats(self._file_paths) self.last_stat_print_time = timezone.utcnow() @provide_session def clear_nonexistent_import_errors(self, session): """ Clears import errors for files that no longer exist. :param session: session for ORM operations :type session: sqlalchemy.orm.session.Session """ query = session.query(errors.ImportError) if self._file_paths: query = query.filter( ~errors.ImportError.filename.in_(self._file_paths) ) query.delete(synchronize_session='fetch') session.commit() def _log_file_processing_stats(self, known_file_paths): """ Print out stats about how files are getting processed. :param known_file_paths: a list of file paths that may contain Airflow DAG definitions :type known_file_paths: list[unicode] :return: None """ # File Path: Path to the file containing the DAG definition # PID: PID associated with the process that's processing the file. May # be empty. # Runtime: If the process is currently running, how long it's been # running for in seconds. # Last Runtime: If the process ran before, how long did it take to # finish in seconds # Last Run: When the file finished processing in the previous run. headers = ["File Path", "PID", "Runtime", "Last Runtime", "Last Run"] rows = [] for file_path in known_file_paths: last_runtime = self.get_last_runtime(file_path) file_name = os.path.basename(file_path) file_name = os.path.splitext(file_name)[0].replace(os.sep, '.') if last_runtime: Stats.gauge( 'dag_processing.last_runtime.{}'.format(file_name), last_runtime ) processor_pid = self.get_pid(file_path) processor_start_time = self.get_start_time(file_path) runtime = ((timezone.utcnow() - processor_start_time).total_seconds() if processor_start_time else None) last_run = self.get_last_finish_time(file_path) if last_run: seconds_ago = (timezone.utcnow() - last_run).total_seconds() Stats.gauge( 'dag_processing.last_run.seconds_ago.{}'.format(file_name), seconds_ago ) rows.append((file_path, processor_pid, runtime, last_runtime, last_run)) # Sort by longest last runtime. (Can't sort None values in python3) rows = sorted(rows, key=lambda x: x[3] or 0.0) formatted_rows = [] for file_path, pid, runtime, last_runtime, last_run in rows: formatted_rows.append((file_path, pid, "{:.2f}s".format(runtime) if runtime else None, "{:.2f}s".format(last_runtime) if last_runtime else None, last_run.strftime("%Y-%m-%dT%H:%M:%S") if last_run else None)) log_str = ("\n" + "=" * 80 + "\n" + "DAG File Processing Stats\n\n" + tabulate(formatted_rows, headers=headers) + "\n" + "=" * 80) self.log.info(log_str) @property def file_paths(self): return self._file_paths def get_pid(self, file_path): """ :param file_path: the path to the file that's being processed :type file_path: unicode :return: the PID of the process processing the given file or None if the specified file is not being processed :rtype: int """ if file_path in self._processors: return self._processors[file_path].pid return None def get_all_pids(self): """ :return: a list of the PIDs for the processors that are running :rtype: List[int] """ return [x.pid for x in self._processors.values()] def get_runtime(self, file_path): """ :param file_path: the path to the file that's being processed :type file_path: unicode :return: the current runtime (in seconds) of the process that's processing the specified file or None if the file is not currently being processed """ if file_path in self._processors: return (timezone.utcnow() - self._processors[file_path].start_time)\ .total_seconds() return None def get_last_runtime(self, file_path): """ :param file_path: the path to the file that was processed :type file_path: unicode :return: the runtime (in seconds) of the process of the last run, or None if the file was never processed. :rtype: float """ return self._last_runtime.get(file_path) def get_last_finish_time(self, file_path): """ :param file_path: the path to the file that was processed :type file_path: unicode :return: the finish time of the process of the last run, or None if the file was never processed. :rtype: datetime """ return self._last_finish_time.get(file_path) def get_start_time(self, file_path): """ :param file_path: the path to the file that's being processed :type file_path: unicode :return: the start time of the process that's processing the specified file or None if the file is not currently being processed :rtype: datetime """ if file_path in self._processors: return self._processors[file_path].start_time return None def set_file_paths(self, new_file_paths): """ Update this with a new set of paths to DAG definition files. :param new_file_paths: list of paths to DAG definition files :type new_file_paths: list[unicode] :return: None """ self._file_paths = new_file_paths self._file_path_queue = [x for x in self._file_path_queue if x in new_file_paths] # Stop processors that are working on deleted files filtered_processors = {} for file_path, processor in self._processors.items(): if file_path in new_file_paths: filtered_processors[file_path] = processor else: self.log.warning("Stopping processor for %s", file_path) processor.terminate() self._processors = filtered_processors def processing_count(self): """ :return: the number of files currently being processed :rtype: int """ return len(self._processors) def wait_until_finished(self): """ Sleeps until all the processors are done. """ for file_path, processor in self._processors.items(): while not processor.done: time.sleep(0.1) def heartbeat(self): """ This should be periodically called by the manager loop. This method will kick off new processes to process DAG definition files and read the results from the finished processors. :return: a list of SimpleDags that were produced by processors that have finished since the last time this was called :rtype: list[airflow.utils.dag_processing.SimpleDag] """ finished_processors = {} """:type : dict[unicode, AbstractDagFileProcessor]""" running_processors = {} """:type : dict[unicode, AbstractDagFileProcessor]""" for file_path, processor in self._processors.items(): if processor.done: self.log.debug("Processor for %s finished", file_path) now = timezone.utcnow() finished_processors[file_path] = processor self._last_runtime[file_path] = (now - processor.start_time).total_seconds() self._last_finish_time[file_path] = now self._run_count[file_path] += 1 else: running_processors[file_path] = processor self._processors = running_processors self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism) self.log.debug("%s file paths queued for processing", len(self._file_path_queue)) # Collect all the DAGs that were found in the processed files simple_dags = [] for file_path, processor in finished_processors.items(): if processor.result is None: self.log.warning( "Processor for %s exited with return code %s.", processor.file_path, processor.exit_code ) else: for simple_dag in processor.result: simple_dags.append(simple_dag) # Generate more file paths to process if we processed all the files # already. if len(self._file_path_queue) == 0: # If the file path is already being processed, or if a file was # processed recently, wait until the next batch file_paths_in_progress = self._processors.keys() now = timezone.utcnow() file_paths_recently_processed = [] for file_path in self._file_paths: last_finish_time = self.get_last_finish_time(file_path) if (last_finish_time is not None and (now - last_finish_time).total_seconds() < self._file_process_interval): file_paths_recently_processed.append(file_path) files_paths_at_run_limit = [file_path for file_path, num_runs in self._run_count.items() if num_runs == self._max_runs] files_paths_to_queue = list(set(self._file_paths) - set(file_paths_in_progress) - set(file_paths_recently_processed) - set(files_paths_at_run_limit)) for file_path, processor in self._processors.items(): self.log.debug( "File path %s is still being processed (started: %s)", processor.file_path, processor.start_time.isoformat() ) self.log.debug( "Queuing the following files for processing:\n\t%s", "\n\t".join(files_paths_to_queue) ) self._file_path_queue.extend(files_paths_to_queue) zombies = self._find_zombies() # Start more processors if we have enough slots and files to process while (self._parallelism - len(self._processors) > 0 and len(self._file_path_queue) > 0): file_path = self._file_path_queue.pop(0) processor = self._processor_factory(file_path, zombies) processor.start() self.log.debug( "Started a process (PID: %s) to generate tasks for %s", processor.pid, file_path ) self._processors[file_path] = processor # Update heartbeat count. self._run_count[self._heart_beat_key] += 1 return simple_dags @provide_session def _find_zombies(self, session): """ Find zombie task instances, which are tasks haven't heartbeated for too long. :return: Zombie task instances in SimpleTaskInstance format. """ now = timezone.utcnow() zombies = [] if (now - self._last_zombie_query_time).total_seconds() \ > self._zombie_query_interval: # to avoid circular imports from airflow.jobs import LocalTaskJob as LJ self.log.info("Finding 'running' jobs without a recent heartbeat") TI = airflow.models.TaskInstance limit_dttm = timezone.utcnow() - timedelta( seconds=self._zombie_threshold_secs) self.log.info("Failing jobs without heartbeat after %s", limit_dttm) tis = ( session.query(TI) .join(LJ, TI.job_id == LJ.id) .filter(TI.state == State.RUNNING) .filter( or_( LJ.state != State.RUNNING, LJ.latest_heartbeat < limit_dttm, ) ).all() ) self._last_zombie_query_time = timezone.utcnow() for ti in tis: zombies.append(SimpleTaskInstance(ti)) return zombies def max_runs_reached(self): """ :return: whether all file paths have been processed max_runs times """ if self._max_runs == -1: # Unlimited runs. return False for file_path in self._file_paths: if self._run_count[file_path] < self._max_runs: return False if self._run_count[self._heart_beat_key] < self._max_runs: return False return True def terminate(self): """ Stops all running processors :return: None """ for processor in self._processors.values(): processor.terminate() def end(self): """ Kill all child processes on exit since we don't want to leave them as orphaned. """ pids_to_kill = self.get_all_pids() if len(pids_to_kill) > 0: # First try SIGTERM this_process = psutil.Process(os.getpid()) # Only check child processes to ensure that we don't have a case # where we kill the wrong process because a child process died # but the PID got reused. child_processes = [x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill] for child in child_processes: self.log.info("Terminating child PID: %s", child.pid) child.terminate() # TODO: Remove magic number timeout = 5 self.log.info("Waiting up to %s seconds for processes to exit...", timeout) try: psutil.wait_procs( child_processes, timeout=timeout, callback=lambda x: self.log.info('Terminated PID %s', x.pid)) except psutil.TimeoutExpired: self.log.debug("Ran out of time while waiting for processes to exit") # Then SIGKILL child_processes = [x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill] if len(child_processes) > 0: self.log.info("SIGKILL processes that did not terminate gracefully") for child in child_processes: self.log.info("Killing child PID: %s", child.pid) child.kill() child.wait()