# Source code for airflow.configuration

# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from builtins import str
from collections import OrderedDict
import copy
import errno
from future import standard_library
import multiprocessing
import logging
import os
import shlex
import six
from six import iteritems
import subprocess
import sys
import warnings

from backports.configparser import ConfigParser, _UNSET, NoOptionError, NoSectionError
import yaml
from zope.deprecation import deprecated

from airflow.exceptions import AirflowConfigException
from airflow.utils.module_loading import import_string


log = logging.getLogger(__name__)

# show Airflow's deprecation warnings
if not sys.warnoptions:
    # Only install the filters when no explicit -W option was given, so a
    # warning configuration chosen on the command line is left untouched.
    warnings.filterwarnings(
        action='default', category=DeprecationWarning, module='airflow')
    warnings.filterwarnings(
        action='default', category=PendingDeprecationWarning, module='airflow')

def generate_fernet_key():
    """
    Generate a new Fernet key.

    :return: the generated key decoded to text, or an empty string when
        the ``cryptography`` package cannot be imported
    """
    try:
        from cryptography.fernet import Fernet
    except ImportError:
        return ''
    else:
        return Fernet.generate_key().decode()

def expand_env_var(env_var):
    """
    Expands (potentially nested) env vars by repeatedly applying
    `expandvars` and `expanduser` until interpolation stops having
    any effect.

    :param env_var: the value to expand; falsy values are returned unchanged
    :return: the fully expanded string
    """
    if not env_var:
        return env_var
    while True:
        interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
        if interpolated == env_var:
            return interpolated
        else:
            # Something was substituted; run another pass in case the
            # substituted text itself contains references.
            env_var = interpolated

def run_command(command):
    """
    Runs command and returns stdout

    :param command: command line as a single string; it is tokenised with
        ``shlex.split`` and executed without a shell
    :return: the decoded standard output of the command
    :raises AirflowConfigException: if the command exits with a non-zero code
    """
    process = subprocess.Popen(
        shlex.split(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True)
    # Undecodable bytes are dropped ('ignore') rather than raising.
    output, stderr = [stream.decode(sys.getdefaultencoding(), 'ignore')
                      for stream in process.communicate()]

    if process.returncode != 0:
        raise AirflowConfigException(
            "Cannot execute {}. Error code is: {}. Output: {}, Stderr: {}"
            .format(command, process.returncode, output, stderr)
        )

    return output

def _get_config_value_from_secret_backend(config_key):
    """
    Look up a config option value in the custom secrets backend.

    :param config_key: key passed to the backend's ``get_config``
    :return: the backend's answer, or None when no custom backend is available
    """
    # Imported lazily inside the function, as in the original code.
    from airflow import secrets
    backend = secrets.get_custom_secret_backend()
    return backend.get_config(config_key) if backend else None

def _read_default_config_file(file_name):
    """
    Read a bundled template from the ``config_templates`` directory that
    sits next to this module.

    :param file_name: name of the file inside ``config_templates``
    :return: tuple of (file content as text, full path of the file)
    """
    templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
    file_path = os.path.join(templates_dir, file_name)
    if six.PY2:
        # Python 2 ``open`` has no ``encoding`` argument; decode by hand.
        with open(file_path) as file_handle:
            config = file_handle.read()
            return config.decode('utf-8'), file_path
    else:
        with open(file_path, encoding='utf-8') as file_handle:
            return file_handle.read(), file_path

# Content and path of the bundled default and unit-test config templates,
# read once at import time.
DEFAULT_CONFIG, DEFAULT_CONFIG_FILE_PATH = _read_default_config_file('default_airflow.cfg')
TEST_CONFIG, TEST_CONFIG_FILE_PATH = _read_default_config_file('default_test.cfg')

def default_config_yaml():
    """
    Read Airflow configs from YAML file

    :return: Python dictionary containing configs & their info
    """
    templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
    file_path = os.path.join(templates_dir, "config.yml")

    with open(file_path) as config_file:
        return yaml.safe_load(config_file)

class AirflowConfigParser(ConfigParser):
    """
    ConfigParser that keeps a second parser (``airflow_defaults``) with
    default values and preserves the case of option names.
    """

    # These configuration elements can be fetched as the stdout of commands
    # following the "{section}__{name}__cmd" pattern, the idea behind this
    # is to not store password on boxes in text files.
    # These configs can also be fetched from Secrets backend
    # following the "{section}__{name}__secret" pattern
    sensitive_config_values = {
        ('core', 'sql_alchemy_conn'),
        ('core', 'fernet_key'),
        ('celery', 'broker_url'),
        ('celery', 'flower_basic_auth'),
        ('celery', 'result_backend'),
        # Todo: remove this in Airflow 1.11
        ('celery', 'celery_result_backend'),
        ('atlas', 'password'),
        ('smtp', 'smtp_password'),
        ('ldap', 'bind_password'),
        ('kubernetes', 'git_password'),
    }

    # A two-level mapping of (section -> new_name -> old_name). When reading
    # new_name, the old_name will be checked to see if it exists. If it does a
    # DeprecationWarning will be issued and the old name will be used instead
    deprecated_options = {
        'celery': {
            # Remove these keys in Airflow 1.11
            'worker_concurrency': 'celeryd_concurrency',
            'result_backend': 'celery_result_backend',
            'broker_url': 'celery_broker_url',
            'ssl_active': 'celery_ssl_active',
            'ssl_cert': 'celery_ssl_cert',
            'ssl_key': 'celery_ssl_key',
        },
        'elasticsearch': {
            'host': 'elasticsearch_host',
            'log_id_template': 'elasticsearch_log_id_template',
            'end_of_log_mark': 'elasticsearch_end_of_log_mark',
            'frontend': 'elasticsearch_frontend',
            'write_stdout': 'elasticsearch_write_stdout',
            'json_format': 'elasticsearch_json_format',
            'json_fields': 'elasticsearch_json_fields'
        }
    }


    # A mapping of old default values that we want to change and warn the user
    # about. Mapping of section -> setting -> { old, replace, by_version }
    deprecated_values = {
        'core': {
            'task_runner': ('BashTaskRunner', 'StandardTaskRunner', '2.0'),
        },
    }

    def optionxform(self, optionstr):
        """
        Return the option name unchanged.

        ConfigParser applies this hook to option names on every read, get
        or set and lowercases them by default; returning the name as-is
        makes this parser case-preserving instead.
        """
        return optionstr

    def __init__(self, default_config=None, *args, **kwargs):
        """
        :param default_config: optional config text loaded into the separate
            ``airflow_defaults`` parser
        :param args: forwarded to both underlying ``ConfigParser`` constructors
        :param kwargs: forwarded to both underlying ``ConfigParser`` constructors
        """
        super(AirflowConfigParser, self).__init__(*args, **kwargs)

        # Defaults live in their own parser so they stay distinguishable
        # from values that came from the user's config.
        self.airflow_defaults = ConfigParser(*args, **kwargs)
        if default_config is not None:
            self.airflow_defaults.read_string(default_config)

        self.is_validated = False

    def _validate(self):
        """
        Check cross-option constraints and upgrade deprecated default values.

        For every entry of ``deprecated_values`` whose current value still
        equals the old default, the new value is set in the running config
        and a warning is emitted. Sets ``is_validated`` to True when done.
        """
        # This helper has no other caller in this class, so validation of
        # inter-option constraints has to be triggered from here.
        self._validate_config_dependencies()

        for section, replacement in self.deprecated_values.items():
            for name, info in replacement.items():
                old, new, version = info
                if self.get(section, name, fallback=None) == old:
                    # Make sure the env var option is removed, otherwise it
                    # would be read and used instead of the value we set
                    env_var = self._env_var_name(section, name)
                    os.environ.pop(env_var, None)

                    self.set(section, name, new)
                    warnings.warn(
                        'The {name} setting in [{section}] has the old default value '
                        'of {old!r}. This value has been changed to {new!r} in the '
                        'running config, but please update your config before Apache '
                        'Airflow {version}.'.format(
                            name=name, section=section, old=old, new=new, version=version
                        ),
                        FutureWarning
                    )

        self.is_validated = True

    def _validate_config_dependencies(self):
        """
        Validate that config values aren't invalid given other config values
        or system-level limitations and requirements.

        :raises AirflowConfigException: when an invalid combination is found
        """

        if (
                self.get("core", "executor") not in ('DebugExecutor', 'SequentialExecutor') and
                "sqlite" in self.get('core', 'sql_alchemy_conn')):
            raise AirflowConfigException(
                "error: cannot use sqlite with the {}".format(
                    self.get('core', 'executor')))

        elif (
            self.getboolean("webserver", "authenticate") and
            self.get("webserver", "owner_mode") not in ['user', 'ldapgroup']
        ):
            raise AirflowConfigException(
                "error: owner_mode option should be either "
                "'user' or 'ldapgroup' when filtering by owner is set")

        elif (
            self.getboolean("webserver", "authenticate") and
            self.get("webserver", "owner_mode").lower() == 'ldapgroup' and
            self.get("webserver", "auth_backend") != (
                'airflow.contrib.auth.backends.ldap_auth')
        ):
            raise AirflowConfigException(
                "error: attempt at using ldapgroup "
                "filtering without using the Ldap backend")

        if self.has_option('core', 'mp_start_method'):
            mp_start_method = self.get('core', 'mp_start_method')
            # Only start methods supported on this platform are accepted.
            start_method_options = multiprocessing.get_all_start_methods()

            if mp_start_method not in start_method_options:
                raise AirflowConfigException(
                    "mp_start_method should not be " + mp_start_method +
                    ". Possible values are " + ", ".join(start_method_options))

    @staticmethod
    def _env_var_name(section, key):
        """
        Build the environment variable name for an option.

        Declared static because it takes no ``self`` yet is invoked as
        ``self._env_var_name(section, key)``.

        :return: ``AIRFLOW__{SECTION}__{KEY}`` with both parts upper-cased
        """
        return 'AIRFLOW__{S}__{K}'.format(S=section.upper(), K=key.upper())

    def _get_env_var_option(self, section, key):
        """
        Resolve an option from the process environment.

        Checks, in order, ``AIRFLOW__{SECTION}__{KEY}`` (note double
        underscore), then the ``_CMD`` variant (a command to run) and the
        ``_SECRET`` variant (a secrets backend path). The last two are only
        honoured for entries of ``sensitive_config_values``.

        :return: the resolved value, or None when nothing applies
        """
        base_name = self._env_var_name(section, key)
        if base_name in os.environ:
            return expand_env_var(os.environ[base_name])

        # The command / secret forms are valid for sensitive options only.
        if (section, key) not in self.sensitive_config_values:
            return None

        command_name = base_name + '_CMD'
        if command_name in os.environ:
            return run_command(os.environ[command_name])

        secret_name = base_name + '_SECRET'
        if secret_name in os.environ:
            return _get_config_value_from_secret_backend(os.environ[secret_name])

        return None

    def _get_cmd_option(self, section, key):
        """
        Resolve a sensitive option by running its ``{key}_cmd`` command.

        :return: stdout of the command, or None when the option is not
            sensitive or no ``{key}_cmd`` entry exists in the config
        """
        command_key = key + '_cmd'
        if (section, key) not in self.sensitive_config_values:
            return None
        # Go through the parent class so only the real config is consulted.
        parent = super(AirflowConfigParser, self)
        if not parent.has_option(section, command_key):
            return None
        return run_command(parent.get(section, command_key))

    def _get_secret_option(self, section, key):
        """
        Resolve a sensitive option through its ``{key}_secret`` entry.

        :return: value fetched from the secrets backend, or None when the
            option is not sensitive or no ``{key}_secret`` entry exists
        """
        secret_key = key + '_secret'
        if (section, key) not in self.sensitive_config_values:
            return None
        # Go through the parent class so only the real config is consulted.
        parent = super(AirflowConfigParser, self)
        if not parent.has_option(section, secret_key):
            return None
        return _get_config_value_from_secret_backend(parent.get(section, secret_key))

    def get(self, section, key, **kwargs):
        """
        Return the value of an option.

        Sources are consulted in this order: environment variables, the
        config file, ``*_cmd`` commands, the secrets backend and finally the
        default config. At each step the deprecated name of the option (see
        ``deprecated_options``) is tried after the current one, with a
        deprecation warning when it is the one that matches.

        :param section: section name (lower-cased before lookup)
        :param key: option name (lower-cased before lookup)
        :param kwargs: forwarded to the underlying ``ConfigParser.get``
        :raises AirflowConfigException: if the option is found nowhere and
            no ``fallback`` was supplied
        """
        section = str(section).lower()
        key = str(key).lower()

        deprecated_name = self.deprecated_options.get(section, {}).get(key, None)

        # first check environment variables
        option = self._get_env_var_option(section, key)
        if option is not None:
            return option
        if deprecated_name:
            option = self._get_env_var_option(section, deprecated_name)
            if option is not None:
                self._warn_deprecate(section, key, deprecated_name)
                return option

        # ...then the config file
        if super(AirflowConfigParser, self).has_option(section, key):
            # Use the parent's methods to get the actual config here to be able to
            # separate the config from default config.
            return expand_env_var(
                super(AirflowConfigParser, self).get(section, key, **kwargs))
        if deprecated_name:
            if super(AirflowConfigParser, self).has_option(section, deprecated_name):
                self._warn_deprecate(section, key, deprecated_name)
                return expand_env_var(super(AirflowConfigParser, self).get(
                    section,
                    deprecated_name,
                    **kwargs
                ))

        # ...then commands
        option = self._get_cmd_option(section, key)
        if option:
            return option
        if deprecated_name:
            option = self._get_cmd_option(section, deprecated_name)
            if option:
                self._warn_deprecate(section, key, deprecated_name)
                return option

        # ...then from secret backends
        option = self._get_secret_option(section, key)
        if option:
            return option
        if deprecated_name:
            option = self._get_secret_option(section, deprecated_name)
            if option:
                self._warn_deprecate(section, key, deprecated_name)
                return option

        # ...then the default config
        if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
            return expand_env_var(
                self.airflow_defaults.get(section, key, **kwargs))

        else:
            log.warning(
                "section/key [%s/%s] not found in config", section, key
            )

            raise AirflowConfigException(
                "section/key [{section}/{key}] not found "
                "in config".format(section=section, key=key))

    def getimport(self, section, key, **kwargs):
        """
        Reads options, imports the full qualified name, and returns the object.

        In case of failure, it throws an exception with a clear message
        naming the key and the section.

        :return: The object or None, if the option is empty
        :raises AirflowConfigException: if the configured path cannot be imported
        """
        # Read through this instance rather than the module-level ``conf``
        # so the method also works on parsers other than the global one.
        full_qualified_path = self.get(section=section, key=key, **kwargs)
        if not full_qualified_path:
            return None

        try:
            return import_string(full_qualified_path)
        except ImportError as e:
            log.error(e)
            raise AirflowConfigException(
                'The object could not be loaded. Please check "{key}" key in "{section}" section. '
                'Current value: "{full_qualified_path}".'.format(
                    key=key, section=section, full_qualified_path=full_qualified_path)
            )

    def getboolean(self, section, key, **kwargs):
        """
        Return an option interpreted as a boolean.

        Accepts 't', 'true', '1' and 'f', 'false', '0' (case-insensitive);
        anything after a '#' in the value is ignored.

        :raises ValueError: if the value is not one of the accepted spellings
        """
        val = str(self.get(section, key, **kwargs)).lower().strip()
        if '#' in val:
            # Drop a trailing inline comment before interpreting the value.
            val = val.split('#')[0].strip()
        if val in ('t', 'true', '1'):
            return True
        elif val in ('f', 'false', '0'):
            return False
        else:
            raise ValueError(
                'The value for configuration option "{}:{}" is not a '
                'boolean (received "{}").'.format(section, key, val))

    def getint(self, section, key, **kwargs):
        """Return the option resolved by ``get``, converted with ``int``."""
        raw_value = self.get(section, key, **kwargs)
        return int(raw_value)

    def getfloat(self, section, key, **kwargs):
        """Return the option resolved by ``get``, converted with ``float``."""
        raw_value = self.get(section, key, **kwargs)
        return float(raw_value)

    def read(self, filenames, **kwargs):
        """
        Read config file(s) and validate the resulting configuration.

        :param filenames: forwarded to ``ConfigParser.read``
        :param kwargs: forwarded to ``ConfigParser.read``
        :return: whatever ``ConfigParser.read`` returns
        """
        read_ok = super(AirflowConfigParser, self).read(filenames, **kwargs)
        # Validation has no other trigger; without this call
        # ``is_validated`` would never become True.
        self._validate()
        return read_ok

    def read_dict(self, *args, **kwargs):
        """
        Load configuration from a dictionary and validate the result.

        All arguments are forwarded to ``ConfigParser.read_dict``.
        """
        super(AirflowConfigParser, self).read_dict(*args, **kwargs)
        # Re-validate after loading so deprecated values set through the
        # dictionary are handled like those read from a file.
        self._validate()

    def has_option(self, section, option):
        """
        Tell whether an option can be resolved from any config source.

        :return: True if ``get`` can produce a value, False otherwise
        """
        try:
            # Using self.get() to avoid reimplementing the priority order
            # of config variables (env, config, cmd, defaults)
            # UNSET to avoid logging a warning about missing values
            self.get(section, option, fallback=_UNSET)
            return True
        except (NoOptionError, NoSectionError):
            return False

    def remove_option(self, section, option, remove_default=True):
        """
        Remove an option if it exists in config from a file or
        default config. If both of config have the same option, this removes
        the option in both configs unless remove_default=False.

        :param section: section containing the option
        :param option: name of the option to remove
        :param remove_default: whether to remove it from the defaults as well
        """
        if super(AirflowConfigParser, self).has_option(section, option):
            super(AirflowConfigParser, self).remove_option(section, option)

        if self.airflow_defaults.has_option(section, option) and remove_default:
            self.airflow_defaults.remove_option(section, option)

    def getsection(self, section):
        """
        Returns the section as a dict. Values are converted to int, float, bool
        as required.

        Values from the defaults are overlaid with values from the config
        and then with ``AIRFLOW__{SECTION}__*`` environment variables.

        :param section: section from the config
        :rtype: dict
        :return: the merged section, or None if the section is unknown to
            both the config and the defaults
        """
        if (section not in self.airflow_defaults._sections and
                section not in self._sections):
            return None

        # A section may exist only in the user's config; fall back to an
        # empty mapping instead of raising KeyError on the defaults lookup.
        _section = copy.deepcopy(
            self.airflow_defaults._sections.get(section, OrderedDict()))

        if section in self._sections:
            _section.update(copy.deepcopy(self._sections[section]))

        section_prefix = 'AIRFLOW__{S}__'.format(S=section.upper())
        for env_var in sorted(os.environ.keys()):
            if env_var.startswith(section_prefix):
                key = env_var.replace(section_prefix, '')
                if key.endswith("_CMD"):
                    key = key[:-4]
                key = key.lower()
                _section[key] = self._get_env_var_option(section, key)

        for key, val in iteritems(_section):
            if val is None:
                # Nothing to convert, e.g. a ``_CMD`` variable set for an
                # option that is not allowed to use commands.
                continue
            try:
                val = int(val)
            except ValueError:
                try:
                    val = float(val)
                except ValueError:
                    if val.lower() in ('t', 'true'):
                        val = True
                    elif val.lower() in ('f', 'false'):
                        val = False
            _section[key] = val
        return _section

    def write(self, fp, space_around_delimiters=True):
        """
        Write the configuration to a file-like object.

        :param fp: object the sections are written to
        :param space_around_delimiters: if True, surround the key/value
            delimiter with single spaces
        """
        # This is based on the configparser.RawConfigParser.write method code to add support for
        # reading options from environment variables.
        if space_around_delimiters:
            d = " {} ".format(self._delimiters[0])  # type: ignore
        else:
            d = self._delimiters[0]  # type: ignore
        if self._defaults:
            self._write_section(fp, self.default_section, self._defaults.items(), d)  # type: ignore
        for section in self._sections:
            # getsection() is used so environment overrides are written too.
            self._write_section(fp, section, self.getsection(section).items(), d)  # type: ignore

    def as_dict(
            self, display_source=False, display_sensitive=False, raw=False,
            include_env=True, include_cmds=True, include_secret=True
    ):
        """
        Returns the current configuration as an OrderedDict of OrderedDicts.

        :param display_source: If False, the option value is returned. If True,
            a tuple of (option_value, source) is returned. Source is either
            'airflow.cfg', 'default', 'env var', 'cmd' or 'secret'.
        :type display_source: bool
        :param display_sensitive: If True, the values of options set by env
            vars and bash commands will be displayed. If False, those options
            are shown as '< hidden >'
        :type display_sensitive: bool
        :param raw: Should the values be output as interpolated values, or the
            "raw" form that can be fed back in to ConfigParser
        :type raw: bool
        :param include_env: Should the value of configuration from AIRFLOW__
            environment variables be included or not
        :type include_env: bool
        :param include_cmds: Should the result of calling any *_cmd config be
            set (True, default), or should the _cmd options be left as the
            command to run (False)
        :type include_cmds: bool
        :param include_secret: Should the result of calling any *_secret config be
            set (True, default), or should the _secret options be left as the
            path to get the secret from (False)
        :type include_secret: bool
        :return: Dictionary, where the key is the name of the section and the content is
            the dictionary with the name of the parameter and its value.
        """
        cfg = {}
        # Later entries overwrite earlier ones, so the config file wins
        # over the defaults.
        configs = [
            ('default', self.airflow_defaults),
            ('airflow.cfg', self),
        ]

        for (source_name, config) in configs:
            for section in config.sections():
                sect = cfg.setdefault(section, OrderedDict())
                for (k, val) in config.items(section=section, raw=raw):
                    if display_source:
                        val = (val, source_name)
                    sect[k] = val

        # add env vars and overwrite because they have priority
        if include_env:
            for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
                try:
                    _, section, key = ev.split('__', 2)
                    opt = self._get_env_var_option(section, key)
                except ValueError:
                    # Not of the AIRFLOW__{SECTION}__{KEY} shape; skip it.
                    continue
                if not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE':
                    opt = '< hidden >'
                elif raw:
                    opt = opt.replace('%', '%%')
                if display_source:
                    opt = (opt, 'env var')

                section = section.lower()
                # if we lower key for kubernetes_environment_variables section,
                # then we won't be able to set any Airflow environment
                # variables. Airflow only parse environment variables starts
                # with AIRFLOW_. Therefore, we need to make it a special case.
                if section != 'kubernetes_environment_variables':
                    key = key.lower()
                cfg.setdefault(section, OrderedDict()).update({key: opt})

        # add bash commands
        if include_cmds:
            for (section, key) in self.sensitive_config_values:
                opt = self._get_cmd_option(section, key)
                if opt:
                    if not display_sensitive:
                        opt = '< hidden >'
                    if display_source:
                        opt = (opt, 'cmd')
                    elif raw:
                        opt = opt.replace('%', '%%')
                    cfg.setdefault(section, OrderedDict()).update({key: opt})
                    # The resolved value replaces the command entry.
                    del cfg[section][key + '_cmd']

        # add config from secret backends
        if include_secret:
            for (section, key) in self.sensitive_config_values:
                opt = self._get_secret_option(section, key)
                if opt:
                    if not display_sensitive:
                        opt = '< hidden >'
                    if display_source:
                        opt = (opt, 'secret')
                    elif raw:
                        opt = opt.replace('%', '%%')
                    cfg.setdefault(section, OrderedDict()).update({key: opt})
                    # The resolved value replaces the secret path entry.
                    del cfg[section][key + '_secret']

        return cfg

    def load_test_config(self):
        """
        Load the unit test configuration.

        Note: this is not reversible.
        """
        # override any custom settings with defaults
        log.info("Overriding settings with defaults from %s", DEFAULT_CONFIG_FILE_PATH)
        self.read_string(parameterized_config(DEFAULT_CONFIG))
        # then read test config
        log.info("Reading default test configuration from %s", TEST_CONFIG_FILE_PATH)
        self.read_string(parameterized_config(TEST_CONFIG))
        # then read any "custom" test settings
        log.info("Reading test configuration from %s", TEST_CONFIG_FILE)
        self.read(TEST_CONFIG_FILE)

    def _warn_deprecate(self, section, key, deprecated_name):
        """
        Warn that an option was read through its deprecated name.

        :param section: section the option belongs to
        :param key: the current name of the option
        :param deprecated_name: the old name that was actually found
        """
        warnings.warn(
            'The {old} option in [{section}] has been renamed to {new} - the old '
            'setting has been used, but please update your config.'.format(
                old=deprecated_name,
                new=key,
                section=section,
            ),
            DeprecationWarning,
            # Point past this helper and ``get`` at the code asking for the option.
            stacklevel=3,
        )

def mkdir_p(path):
    """
    Create a directory and any missing parents, tolerating an existing one.

    :param path: directory to create
    :raises AirflowConfigException: if creation fails for any reason other
        than the directory already existing
    """
    try:
        os.makedirs(path)
    except OSError as exc:  # Python >2.5
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            # Already present as a directory: nothing to do.
            pass
        else:
            raise AirflowConfigException(
                'Error creating {}: {}'.format(path, exc.strerror))

def get_airflow_home():
    """
    Return the Airflow home directory.

    Taken from the ``AIRFLOW_HOME`` environment variable, defaulting to
    ``~/airflow``, and passed through ``expand_env_var``.
    """
    configured_home = os.environ.get('AIRFLOW_HOME', '~/airflow')
    return expand_env_var(configured_home)

def get_airflow_config(airflow_home):
    """
    Return the path of the main config file.

    :param airflow_home: directory used when ``AIRFLOW_CONFIG`` is not set
    :return: the expanded ``AIRFLOW_CONFIG`` environment variable if it is
        set, else ``airflow.cfg`` inside ``airflow_home``
    """
    try:
        override = os.environ['AIRFLOW_CONFIG']
    except KeyError:
        return os.path.join(airflow_home, 'airflow.cfg')
    return expand_env_var(override)

# Setting AIRFLOW_HOME and AIRFLOW_CONFIG from environment variables, using
# "~/airflow" and "$AIRFLOW_HOME/airflow.cfg" respectively as defaults.

AIRFLOW_HOME = get_airflow_home()
AIRFLOW_CONFIG = get_airflow_config(AIRFLOW_HOME)
# Config files created further down default to paths inside AIRFLOW_HOME,
# so make sure the directory exists before anything is written there.
mkdir_p(AIRFLOW_HOME)

# Set up dags folder for unit tests
# this directory won't exist if users install via pip
_TEST_DAGS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
    'tests',
    'dags')
if os.path.exists(_TEST_DAGS_FOLDER):
    TEST_DAGS_FOLDER = _TEST_DAGS_FOLDER
else:
    TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')

# Set up plugins folder for unit tests
_TEST_PLUGINS_FOLDER = os.path.join(
    os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
    'tests',
    'plugins')
if os.path.exists(_TEST_PLUGINS_FOLDER):
    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
else:
    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, 'plugins')

def parameterized_config(template):
    """
    Generates a configuration from the provided template + variables defined in
    current scope

    :param template: a config content templated with {{variables}}
    :return: the template formatted with this module's globals and the
        function's locals
    """
    # Locals come last, so they win over globals of the same name.
    all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
    return template.format(**all_vars)

def get_airflow_test_config(airflow_home):
    """
    Return the path of the unit-test config file.

    :param airflow_home: directory used when ``AIRFLOW_TEST_CONFIG`` is not set
    :return: the expanded ``AIRFLOW_TEST_CONFIG`` environment variable if it
        is set, else ``unittests.cfg`` inside ``airflow_home``
    """
    try:
        override = os.environ['AIRFLOW_TEST_CONFIG']
    except KeyError:
        return os.path.join(airflow_home, 'unittests.cfg')
    return expand_env_var(override)

TEST_CONFIG_FILE = get_airflow_test_config(AIRFLOW_HOME)

# only generate a Fernet key if we need to create a new config file
if not os.path.isfile(TEST_CONFIG_FILE) or not os.path.isfile(AIRFLOW_CONFIG):
    FERNET_KEY = generate_fernet_key()
else:
    FERNET_KEY = ''

    '# ----------------------- TEMPLATE BEGINS HERE -----------------------')
if not os.path.isfile(TEST_CONFIG_FILE):
        'Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE
    with open(TEST_CONFIG_FILE, 'w') as f:
        cfg = parameterized_config(TEST_CONFIG)
if not os.path.isfile(AIRFLOW_CONFIG):
        'Creating new Airflow config file in: %s',
    with open(AIRFLOW_CONFIG, 'w') as f:
        cfg = parameterized_config(DEFAULT_CONFIG)
        cfg = cfg.split(TEMPLATE_START)[-1].strip()
        if six.PY2:
            cfg = cfg.encode('utf8')
        f.write(cfg)"Reading the config from %s", AIRFLOW_CONFIG)

if conf.has_option('core', 'AIRFLOW_HOME'):
    msg = (
        'Specifying both AIRFLOW_HOME environment variable and airflow_home '
        'in the config file is deprecated. Please use only the AIRFLOW_HOME '
        'environment variable and remove the config file entry.'
    )
    if 'AIRFLOW_HOME' in os.environ:
        warnings.warn(msg, category=DeprecationWarning)
    elif conf.get('core', 'airflow_home') == AIRFLOW_HOME:
        warnings.warn(
            'Specifying airflow_home in the config file is deprecated. As you '
            'have left it at the default value you should remove the setting '
            'from your airflow.cfg and suffer no change in behaviour.',
            category=DeprecationWarning,
        )
    else:
        # Only the config file sets a non-default home: honour it.
        AIRFLOW_HOME = conf.get('core', 'airflow_home')
        warnings.warn(msg, category=DeprecationWarning)

# Warn about old config file. We used to read ~/airflow/airflow.cfg even if
# that AIRFLOW_HOME was set to something else
_old_config_file = os.path.expanduser("~/airflow/airflow.cfg")
if _old_config_file != AIRFLOW_CONFIG and os.path.isfile(_old_config_file):
    warnings.warn(
        'You have two airflow.cfg files: {old} and {new}. Airflow used to look '
        'at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a different '
        'value. Airflow will now only read {new}, and you should remove the '
        'other file'.format(old=_old_config_file, new=AIRFLOW_CONFIG),
        category=DeprecationWarning,
    )


# Location of the FAB webserver config file inside AIRFLOW_HOME.
WEBSERVER_CONFIG = AIRFLOW_HOME + '/webserver_config.py'

if conf.getboolean('webserver', 'rbac'):
    if not os.path.isfile(WEBSERVER_CONFIG):
        log.info('Creating new FAB webserver config file in: %s', WEBSERVER_CONFIG)
        DEFAULT_WEBSERVER_CONFIG, _ = _read_default_config_file('default_webserver_config.py')
        with open(WEBSERVER_CONFIG, 'w') as file:
            file.write(DEFAULT_WEBSERVER_CONFIG)

if conf.getboolean('core', 'unit_test_mode'):
    conf.load_test_config()

# Historical convenience functions to access config entries
# Each name below is a bound method of the module-level ``conf`` object,
# re-exported at module level.

load_test_config = conf.load_test_config
get = conf.get
getboolean = conf.getboolean
getfloat = conf.getfloat
getint = conf.getint
getsection = conf.getsection
has_option = conf.has_option
remove_option = conf.remove_option
as_dict = conf.as_dict
set = conf.set # noqa

# Mark the module-level aliases as deprecated so that accessing them
# through the module emits a warning pointing at ``conf``.
for func in [load_test_config, get, getboolean, getfloat, getint, has_option,
             remove_option, as_dict, set]:
    deprecated(
        func.__name__,
        "Accessing configuration method '{f.__name__}' directly from "
        "the configuration module is deprecated. Please access the "
        "configuration from the 'configuration.conf' object via "
        "'conf.{f.__name__}'".format(f=func))

# Was this entry helpful?