Source code for airflow.providers.openlineage.conf
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
from typing import Any
from airflow.compat.functools import cache
from airflow.configuration import conf
_CONFIG_SECTION = "openlineage"
@cache
[docs]def config_path(check_legacy_env_var: bool = True) -> str:
"""[openlineage] config_path."""
option = conf.get(_CONFIG_SECTION, "config_path", fallback="")
if check_legacy_env_var and not option:
option = os.getenv("OPENLINEAGE_CONFIG", "")
return option
@cache
[docs]def is_source_enabled() -> bool:
"""[openlineage] disable_source_code."""
option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
return option.lower() not in ("true", "1", "t")
@cache
[docs]def disabled_operators() -> set[str]:
"""[openlineage] disabled_for_operators."""
option = conf.get(_CONFIG_SECTION, "disabled_for_operators", fallback="")
return set(operator.strip() for operator in option.split(";") if operator.strip())
@cache
[docs]def selective_enable() -> bool:
return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)
@cache
@cache
[docs]def namespace() -> str:
"""[openlineage] namespace."""
option = conf.get(_CONFIG_SECTION, "namespace", fallback="")
if not option:
option = os.getenv("OPENLINEAGE_NAMESPACE", "default")
return option
@cache
[docs]def transport() -> dict[str, Any]:
"""[openlineage] transport."""
option = conf.getjson(_CONFIG_SECTION, "transport", fallback={})
if not isinstance(option, dict):
raise ValueError(f"OpenLineage transport `{option}` is not a dict")
return option
@cache
[docs]def is_disabled() -> bool:
"""[openlineage] disabled + some extra checks."""
def _is_true(val):
return str(val).lower().strip() in ("true", "1", "t")
option = conf.get(_CONFIG_SECTION, "disabled", fallback="")
if _is_true(option):
return True
option = os.getenv("OPENLINEAGE_DISABLED", "")
if _is_true(option):
return True
# Check if both 'transport' and 'config_path' are not present and also
# if legacy 'OPENLINEAGE_URL' environment variables is not set
return transport() == {} and config_path(True) == "" and os.getenv("OPENLINEAGE_URL", "") == ""