Source code for airflow.providers.openlineage.conf
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module provides functions for safely retrieving and handling OpenLineage configurations.
For the legacy boolean env variables `OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE` and `OPENLINEAGE_DISABLED`,
any string not equal to "true", "1", or "t" should be treated as False, to maintain backward compatibility.
Support for legacy variables will be removed in Airflow 3.
"""
from __future__ import annotations
import os
from typing import Any
# Disable caching if we're inside tests - this makes config easier to mock.
if os.getenv("PYTEST_VERSION"):
[docs] def decorator(func):
return func
cache = decorator
else:
from airflow.compat.functools import cache
from airflow.configuration import conf
_CONFIG_SECTION = "openlineage"
def _is_true(arg: Any) -> bool:
return str(arg).lower().strip() in ("true", "1", "t")
@cache
[docs]def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
option = conf.get(_CONFIG_SECTION, "config_path", fallback="")
if check_legacy_env_var and not option:
option = os.getenv("OPENLINEAGE_CONFIG", "")
return option
@cache
[docs]def is_source_enabled() -> bool:
"""[openlineage] disable_source_code."""
option = conf.getboolean(_CONFIG_SECTION, "disable_source_code", fallback="False")
if option is False: # Check legacy variable
option = _is_true(os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", ""))
# when disable_source_code is True, is_source_enabled() should be False; hence the "not"
return not option
@cache
[docs]def disabled_operators() -> set[str]:
"""[openlineage] disabled_for_operators."""
option = conf.get(_CONFIG_SECTION, "disabled_for_operators", fallback="")
return set(operator.strip() for operator in option.split(";") if operator.strip())
@cache
[docs]def selective_enable() -> bool:
"""[openlineage] selective_enable."""
return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback="False")
@cache
@cache
[docs]def custom_run_facets() -> set[str]:
"""[openlineage] custom_run_facets."""
option = conf.get(_CONFIG_SECTION, "custom_run_facets", fallback="")
return set(
custom_facet_function.strip()
for custom_facet_function in option.split(";")
if custom_facet_function.strip()
)
@cache
[docs]def namespace() -> str:
"""[openlineage] namespace."""
option = conf.get(_CONFIG_SECTION, "namespace", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_NAMESPACE", "default")
return option
@cache
[docs]def transport() -> dict[str, Any]:
"""[openlineage] transport."""
option = conf.getjson(_CONFIG_SECTION, "transport", fallback={})
if not isinstance(option, dict):
raise ValueError(f"OpenLineage transport `{option}` is not a dict")
return option
@cache
[docs]def is_disabled() -> bool:
"""[openlineage] disabled + check if any configuration is present."""
if conf.getboolean(_CONFIG_SECTION, "disabled", fallback="False"):
return True
if _is_true(os.getenv("OPENLINEAGE_DISABLED", "")): # Check legacy variable
return True
# Check if both 'transport' and 'config_path' are not present and also
# if legacy 'OPENLINEAGE_URL' environment variables is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""
@cache
[docs]def dag_state_change_process_pool_size() -> int:
"""[openlineage] dag_state_change_process_pool_size."""
return conf.getint(_CONFIG_SECTION, "dag_state_change_process_pool_size", fallback="1")
@cache
[docs]def execution_timeout() -> int:
"""[openlineage] execution_timeout."""
return conf.getint(_CONFIG_SECTION, "execution_timeout", fallback="10")
@cache
[docs]def include_full_task_info() -> bool:
"""[openlineage] include_full_task_info."""
return conf.getboolean(_CONFIG_SECTION, "include_full_task_info", fallback="False")
@cache
[docs]def debug_mode() -> bool:
"""[openlineage] debug_mode."""
return conf.getboolean(_CONFIG_SECTION, "debug_mode", fallback="False")