# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
import secrets
import string
from typing import TYPE_CHECKING
import pendulum
from deprecated import deprecated
from kubernetes.client.rest import ApiException
from slugify import slugify
from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
if TYPE_CHECKING:
    from airflow.models.taskinstancekey import TaskInstanceKey
[docs]log = logging.getLogger(__name__) 
[docs]alphanum_lower = string.ascii_lowercase + string.digits 
[docs]POD_NAME_MAX_LENGTH = 63  # Matches Linux kernel's HOST_NAME_MAX default value minus 1. 
def rand_str(num):
    """Generate random lowercase alphanumeric string of length num.
    :meta private:
    """
    return "".join(secrets.choice(alphanum_lower) for _ in range(num))
def add_unique_suffix(*, name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
    """Add random string to pod or job name while staying under max length.
    :param name: name of the pod or job
    :param rand_len: length of the random string to append
    :param max_len: maximum length of the pod name
    :meta private:
    """
    suffix = "-" + rand_str(rand_len)
    return name[: max_len - len(suffix)].strip("-.") + suffix
@deprecated(
    reason="This function is deprecated. Please use `add_unique_suffix`",
    category=AirflowProviderDeprecationWarning,
)
def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
    """Add random string to pod name while staying under max length.
    :param pod_name: name of the pod
    :param rand_len: length of the random string to append
    :param max_len: maximum length of the pod name
    :meta private:
    """
    return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len)
[docs]def create_unique_id(
    dag_id: str | None = None,
    task_id: str | None = None,
    *,
    max_length: int = POD_NAME_MAX_LENGTH,
    unique: bool = True,
) -> str:
    """
    Generate unique pod or job ID given a dag_id and / or task_id.
    :param dag_id: DAG ID
    :param task_id: Task ID
    :param max_length: max number of characters
    :param unique: whether a random string suffix should be added
    :return: A valid identifier for a kubernetes pod name
    """
    if not (dag_id or task_id):
        raise ValueError("Must supply either dag_id or task_id.")
    name = ""
    if dag_id:
        name += dag_id
    if task_id:
        if name:
            name += "-"
        name += task_id
    base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
    if unique:
        return add_unique_suffix(name=base_name, rand_len=8, max_len=max_length)
    else:
        return base_name 
@deprecated(
    reason="This function is deprecated. Please use `create_unique_id`.",
    category=AirflowProviderDeprecationWarning,
)
[docs]def create_pod_id(
    dag_id: str | None = None,
    task_id: str | None = None,
    *,
    max_length: int = POD_NAME_MAX_LENGTH,
    unique: bool = True,
) -> str:
    """
    Generate unique pod ID given a dag_id and / or task_id.
    :param dag_id: DAG ID
    :param task_id: Task ID
    :param max_length: max number of characters
    :param unique: whether a random string suffix should be added
    :return: A valid identifier for a kubernetes pod name
    """
    return create_unique_id(dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique) 
[docs]def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
    """Build a TaskInstanceKey based on pod annotations."""
    log.debug("Creating task key for annotations %s", annotations)
    dag_id = annotations["dag_id"]
    task_id = annotations["task_id"]
    try_number = int(annotations["try_number"])
    annotation_run_id = annotations.get("run_id")
    map_index = int(annotations.get("map_index", -1))
    # Compat: Look up the run_id from the TI table!
    from airflow.models.dagrun import DagRun
    from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
    from airflow.settings import Session
    if not annotation_run_id and "execution_date" in annotations:
        execution_date = pendulum.parse(annotations["execution_date"])
        # Do _not_ use create-session, we don't want to expunge
        session = Session()
        task_instance_run_id = (
            session.query(TaskInstance.run_id)
            .join(TaskInstance.dag_run)
            .filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.task_id == task_id,
                DagRun.execution_date == execution_date,
            )
            .scalar()
        )
    else:
        task_instance_run_id = annotation_run_id
    return TaskInstanceKey(
        dag_id=dag_id,
        task_id=task_id,
        run_id=task_instance_run_id,
        try_number=try_number,
        map_index=map_index,
    ) 
@cache
[docs]def should_retry_creation(exception: BaseException) -> bool:
    """
    Check if an Exception indicates a transient error and warrants retrying.
    This function is needed for preventing 'No agent available' error. The error appears time to time
    when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment
    has no solution. Like a temporary solution we decided to retry Job or Resource creation request each
    time when this error appears.
    More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457
    """
    if isinstance(exception, ApiException):
        return str(exception.status) == "500"
    return False