Source code for airflow.providers.cncf.kubernetes.cli.definition

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.cli.cli_config import (
    ARG_DAG_ID,
    ARG_OUTPUT_PATH,
    ARG_VERBOSE,
    ActionCommand,
    Arg,
    GroupCommand,
    lazy_load_command,
    positive_int,
)
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
    import argparse


try:
    from airflow.cli.cli_config import ARG_LOGICAL_DATE
except ImportError:  # 2.x compatibility.
    from airflow.cli.cli_config import (  # type: ignore[attr-defined, no-redef]
        ARG_EXECUTION_DATE as ARG_LOGICAL_DATE,
    )

# Bind ``ARG_COMPAT`` to the version-dependent CLI argument: Airflow 3.0+ provides
# ``ARG_BUNDLE_NAME``, while older releases provide ``ARG_SUBDIR`` instead.
# ``ARG_COMPAT`` is consumed by the ``generate-dag-yaml`` command definition below.
if AIRFLOW_V_3_0_PLUS:
    from airflow.cli.cli_config import ARG_BUNDLE_NAME

    ARG_COMPAT = ARG_BUNDLE_NAME
else:
    from airflow.cli.cli_config import ARG_SUBDIR  # type: ignore[attr-defined]

    ARG_COMPAT = ARG_SUBDIR

# CLI Args
# ``--namespace`` option (used by the ``cleanup-pods`` command). The default is read
# from the ``[kubernetes_executor] namespace`` configuration option at import time,
# so the help text names that same section.
ARG_NAMESPACE = Arg(
    ("--namespace",),
    default=conf.get("kubernetes_executor", "namespace"),
    help="Kubernetes Namespace. Default value is `[kubernetes_executor] namespace` in configuration.",
)
# ``--min-pending-minutes`` option (used by the ``cleanup-pods`` command); defaults to 30.
# NOTE(review): the help text advertises a 5-minute minimum, but the validator configured
# here is only ``positive_int(allow_zero=False)``; presumably the 5-minute floor is
# enforced by the command implementation -- confirm against ``cleanup_pods``.
ARG_MIN_PENDING_MINUTES = Arg(
    ("--min-pending-minutes",),
    default=30,
    type=positive_int(allow_zero=False),
    help=(
        "Pending pods created before the time interval are to be cleaned up, "
        "measured in minutes. Default value is 30(m). The minimum value is 5(m)."
    ),
)
# CLI Commands
# Sub-commands exposed under the ``kubernetes`` CLI group. Each handler is referenced by
# dotted path through ``lazy_load_command`` rather than imported directly in this module.
KUBERNETES_COMMANDS = (
    # ``cleanup-pods``: takes the namespace, pending-age threshold and verbosity options.
    ActionCommand(
        name="cleanup-pods",
        help=(
            "Clean up Kubernetes pods "
            "(created by KubernetesExecutor/KubernetesPodOperator) "
            "in evicted/failed/succeeded/pending states"
        ),
        func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.cleanup_pods"),
        args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE),
    ),
    # ``generate-dag-yaml``: ``ARG_LOGICAL_DATE`` and ``ARG_COMPAT`` are the
    # version-compatibility aliases resolved near the top of this module.
    ActionCommand(
        name="generate-dag-yaml",
        help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
        "launching into a cluster",
        func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.generate_pod_yaml"),
        args=(ARG_DAG_ID, ARG_LOGICAL_DATE, ARG_COMPAT, ARG_OUTPUT_PATH, ARG_VERBOSE),
    ),
)
def get_kubernetes_cli_commands() -> list[GroupCommand]:
    """
    Return the CLI command groups contributed by this module.

    :return: a one-element list holding the ``kubernetes`` group, whose
        sub-commands are ``KUBERNETES_COMMANDS``.
    """
    return [
        GroupCommand(
            name="kubernetes",
            help="Tools to help run the KubernetesExecutor",
            subcommands=KUBERNETES_COMMANDS,
        )
    ]
def get_parser() -> argparse.ArgumentParser:
    """
    Generate documentation; used by Sphinx.

    Builds a stand-alone ``airflow`` parser and registers on it every command
    group returned by :func:`get_kubernetes_cli_commands`.

    :return: the parser with the Kubernetes command groups attached.

    :meta private:
    """
    # Imported inside the function rather than at module level.
    # NOTE(review): presumably to keep the parser machinery off the module import
    # path -- confirm before moving this import.
    from airflow.cli.cli_parser import AirflowHelpFormatter, DefaultHelpParser, _add_command

    parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
    subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
    for group_command in get_kubernetes_cli_commands():
        _add_command(subparsers, group_command)
    return parser

Was this entry helpful?