Source code for airflow.providers.cncf.kubernetes.kube_config
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromairflow.configurationimportconffromairflow.exceptionsimportAirflowConfigExceptionfromairflow.settingsimportAIRFLOW_HOME
[docs]classKubeConfig:"""Configuration for Kubernetes."""
def__init__(self):configuration_dict=conf.as_dict(display_sensitive=True)self.core_configuration=configuration_dict[self.core_section]self.airflow_home=AIRFLOW_HOMEself.dags_folder=conf.get(self.core_section,"dags_folder")self.parallelism=conf.getint(self.core_section,"parallelism")self.pod_template_file=conf.get(self.kubernetes_section,"pod_template_file",fallback=None)self.delete_worker_pods=conf.getboolean(self.kubernetes_section,"delete_worker_pods")self.delete_worker_pods_on_failure=conf.getboolean(self.kubernetes_section,"delete_worker_pods_on_failure")self.worker_pods_creation_batch_size=conf.getint(self.kubernetes_section,"worker_pods_creation_batch_size")self.worker_container_repository=conf.get(self.kubernetes_section,"worker_container_repository")self.worker_container_tag=conf.get(self.kubernetes_section,"worker_container_tag")ifself.worker_container_repositoryandself.worker_container_tag:self.kube_image=f"{self.worker_container_repository}:{self.worker_container_tag}"else:self.kube_image=None# The Kubernetes Namespace in which the Scheduler and Webserver reside. Note# that if your# cluster has RBAC enabled, your scheduler may need service account permissions to# create, watch, get, and delete pods in this namespace.self.kube_namespace=conf.get(self.kubernetes_section,"namespace")self.multi_namespace_mode=conf.getboolean(self.kubernetes_section,"multi_namespace_mode")ifself.multi_namespace_modeandconf.get(self.kubernetes_section,"multi_namespace_mode_namespace_list"):self.multi_namespace_mode_namespace_list=conf.get(self.kubernetes_section,"multi_namespace_mode_namespace_list").split(",")else:self.multi_namespace_mode_namespace_list=None# The Kubernetes Namespace in which pods will be created by the executor. Note# that if your# cluster has RBAC enabled, your workers may need service account permissions to# interact with cluster components.self.executor_namespace=conf.get(self.kubernetes_section,"namespace")self.worker_pods_queued_check_interval=conf.getint(self.kubernetes_section,"worker_pods_queued_check_interval")self.kube_client_request_args=conf.getjson(self.kubernetes_section,"kube_client_request_args",fallback={})ifnotisinstance(self.kube_client_request_args,dict):raiseAirflowConfigException(f"[{self.kubernetes_section}] 'kube_client_request_args' expected a JSON dict, got "+type(self.kube_client_request_args).__name__)ifself.kube_client_request_args:if"_request_timeout"inself.kube_client_request_argsandisinstance(self.kube_client_request_args["_request_timeout"],list):self.kube_client_request_args["_request_timeout"]=tuple(self.kube_client_request_args["_request_timeout"])self.delete_option_kwargs=conf.getjson(self.kubernetes_section,"delete_option_kwargs",fallback={})ifnotisinstance(self.delete_option_kwargs,dict):raiseAirflowConfigException(f"[{self.kubernetes_section}] 'delete_option_kwargs' expected a JSON dict, got "+type(self.delete_option_kwargs).__name__)