# Source code for airflow.providers.openlineage.conf
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module provides functions for safely retrieving and handling OpenLineage configurations.

To prevent errors caused by invalid user-provided configuration values, we use ``conf.get()``
to fetch values as strings and perform safe conversions using custom functions.

Any invalid configuration values should be treated as incorrect and replaced with default values.
For example, if the default for boolean ``custom_ol_var`` is False, any non-true value provided:
``"asdf"``, ``12345``, ``{"key": 1}`` or empty string, will result in False being used.

By using default values for invalid configuration values, we ensure that the configurations are handled
safely, preventing potential runtime errors due to conversion issues.
"""

from __future__ import annotations

import os
from typing import Any

from airflow.compat.functools import cache
from airflow.configuration import conf

_CONFIG_SECTION = "openlineage"


def _is_true(arg: Any) -> bool:
    """
    Tell whether ``arg`` spells a true value.

    The argument is stringified, lower-cased and stripped of surrounding whitespace;
    only ``"true"``, ``"1"`` and ``"t"`` are accepted as true.

    :param arg: value to interpret; any object is accepted because it goes through ``str()``.
    :return: True for one of the accepted spellings, False for everything else.
    """
    return str(arg).lower().strip() in ("true", "1", "t")


def _safe_int_convert(arg: Any, default: int) -> int:
    """
    Convert ``arg`` to ``int``, falling back to ``default`` when the conversion fails.

    :param arg: value to convert.
    :param default: value returned when ``int(arg)`` raises.
    :return: ``int(arg)`` on success, ``default`` otherwise.
    """
    try:
        return int(arg)
    # OverflowError is what ``int(float("inf"))`` raises; it is neither a ValueError
    # nor a TypeError, so it has to be listed for the fallback to apply there as well.
    except (ValueError, TypeError, OverflowError):
        return default


@cache
def is_source_enabled() -> bool:
    """
    [openlineage] disable_source_code.

    Reads the ``disable_source_code`` option of the ``openlineage`` section and, when that
    value is empty, the ``OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`` environment variable.

    :return: False when the resolved value is true-like (see ``_is_true``), True otherwise.
    """
    option = conf.get(_CONFIG_SECTION, "disable_source_code", fallback="")
    if not option:
        # Nothing set in the Airflow configuration: consult the environment variable instead.
        option = os.getenv("OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE", "")
    # when disable_source_code is True, is_source_enabled() should be False
    return not _is_true(option)


def transport() -> dict[str, Any]:
    """
    [openlineage] transport.

    Fetches the ``transport`` option of the ``openlineage`` section through ``conf.getjson``,
    using an empty dict as the fallback.

    :return: the transport configuration as a dict.
    :raises ValueError: when the retrieved value is not a dict.
    """
    option = conf.getjson(_CONFIG_SECTION, "transport", fallback={})
    # Happy path first: anything that is already a dict is handed back untouched.
    if isinstance(option, dict):
        return option
    raise ValueError(f"OpenLineage transport `{option}` is not a dict")


@cache
def is_disabled() -> bool:
    """
    [openlineage] disabled + check if any configuration is present.

    The integration is reported as disabled when either the ``disabled`` option of the
    ``openlineage`` section or the ``OPENLINEAGE_DISABLED`` environment variable is
    true-like, or when no configuration source is found at all.

    :return: True when OpenLineage should be treated as disabled, False otherwise.
    """
    # Explicit switch in the Airflow configuration.
    if _is_true(conf.get(_CONFIG_SECTION, "disabled", fallback="")):
        return True

    # Explicit switch in the environment; only consulted when the option above is not true.
    if _is_true(os.getenv("OPENLINEAGE_DISABLED", "")):
        return True

    # No explicit switch: disabled only if 'transport' and 'config_path' are both absent
    # and the legacy 'OPENLINEAGE_URL' environment variable is not set either.
    # The checks run in this order and stop at the first source that is present.
    if transport() != {}:
        return False
    if config_path(True) != "":
        return False
    return os.getenv("OPENLINEAGE_URL", "") == ""