# Source code for airflow.providers.microsoft.azure.utils
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import warnings
from functools import wraps
def _ensure_prefixes(conn_type):
"""
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
def dec(func):
@wraps(func)
def inner():
field_behaviors = func()
conn_attrs = {"host", "schema", "login", "password", "port", "extra"}
def _ensure_prefix(field):
if field not in conn_attrs and not field.startswith("extra__"):
return f"extra__{conn_type}__{field}"
else:
return field
if "placeholders" in field_behaviors:
placeholders = field_behaviors["placeholders"]
field_behaviors["placeholders"] = {_ensure_prefix(k): v for k, v in placeholders.items()}
return field_behaviors
return inner
return dec
def get_field(*, conn_id: str, conn_type: str, extras: dict, field_name: str):
    """
    Get field from extra, first checking short name, then for backcompat we check for prefixed name.

    :param conn_id: connection id; only used in the conflict warning message.
    :param conn_type: connection type, used to build the ``extra__<conn_type>__``
        backcompat prefix.
    :param extras: mapping of the connection's extra fields.
    :param field_name: short (unprefixed) name of the field to look up.
    :return: the value stored under ``field_name`` if present, otherwise the value
        stored under the prefixed backcompat key; ``None`` when neither key is
        present or when the selected value is an empty string.
    :raises ValueError: if ``field_name`` itself starts with ``extra__``.
    """
    backcompat_prefix = f"extra__{conn_type}__"
    backcompat_key = f"{backcompat_prefix}{field_name}"
    if field_name.startswith("extra__"):
        raise ValueError(
            f"Got prefixed name {field_name}; please remove the '{backcompat_prefix}' prefix "
            "when using this method."
        )
    ret = None
    if field_name in extras:
        if backcompat_key in extras:
            # stacklevel=2 attributes the warning to the code that called
            # get_field rather than to this utility module.
            warnings.warn(
                f"Conflicting params `{field_name}` and `{backcompat_key}` found in extras for conn "
                f"{conn_id}. Using value for `{field_name}`. Please ensure this is the correct "
                f"value and remove the backcompat key `{backcompat_key}`.",
                UserWarning,
                stacklevel=2,
            )
        # The short name always wins over the backcompat key.
        ret = extras[field_name]
    elif backcompat_key in extras:
        ret = extras[backcompat_key]
    # An empty string is treated the same as a missing value.
    if ret == "":
        return None
    return ret