Source code for airflow.providers.cncf.kubernetes.kube_client
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Client for kubernetes communication."""from__future__importannotationsimportloggingimporturllib3.utilfromairflow.configurationimportconf
def_get_default_configuration()->Configuration:ifhasattr(Configuration,"get_default_copy"):returnConfiguration.get_default_copy()returnConfiguration()def_disable_verify_ssl()->None:configuration=_get_default_configuration()configuration.verify_ssl=FalseConfiguration.set_default(configuration)exceptImportErrorase:# We need an exception class to be able to use it in ``except`` elsewhere# in the code baseApiException=BaseExceptionhas_kubernetes=False_import_err=edef_enable_tcp_keepalive()->None:""" This function enables TCP keepalive mechanism. This prevents urllib3 connection to hang indefinitely when idle connection is time-outed on services like cloud load balancers or firewalls. See https://github.com/apache/airflow/pull/11406 for detailed explanation. Please ping @michalmisiewicz or @dimberman in the PR if you want to modify this function. """importsocketfromurllib3.connectionimportHTTPConnection,HTTPSConnectiontcp_keep_idle=conf.getint("kubernetes_executor","tcp_keep_idle")tcp_keep_intvl=conf.getint("kubernetes_executor","tcp_keep_intvl")tcp_keep_cnt=conf.getint("kubernetes_executor","tcp_keep_cnt")socket_options=[(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1)]ifhasattr(socket,"TCP_KEEPIDLE"):socket_options.append((socket.IPPROTO_TCP,socket.TCP_KEEPIDLE,tcp_keep_idle))else:log.debug("Unable to set TCP_KEEPIDLE on this platform")ifhasattr(socket,"TCP_KEEPINTVL"):socket_options.append((socket.IPPROTO_TCP,socket.TCP_KEEPINTVL,tcp_keep_intvl))else:log.debug("Unable to set TCP_KEEPINTVL on this platform")ifhasattr(socket,"TCP_KEEPCNT"):socket_options.append((socket.IPPROTO_TCP,socket.TCP_KEEPCNT,tcp_keep_cnt))else:log.debug("Unable to set TCP_KEEPCNT on this platform")HTTPSConnection.default_socket_options=HTTPSConnection.default_socket_options+socket_optionsHTTPConnection.default_socket_options=HTTPConnection.default_socket_options+socket_options
[docs]defget_kube_client(in_cluster:bool|None=None,cluster_context:str|None=None,config_file:str|None=None,)->client.CoreV1Api:""" Retrieves Kubernetes client. :param in_cluster: whether we are in cluster :param cluster_context: context of the cluster :param config_file: configuration file :return: kubernetes client """ifin_clusterisNone:in_cluster=conf.getboolean("kubernetes_executor","in_cluster")ifnothas_kubernetes:raise_import_errifconf.getboolean("kubernetes_executor","enable_tcp_keepalive"):_enable_tcp_keepalive()configuration=_get_default_configuration()api_client_retry_configuration=conf.getjson("kubernetes_executor","api_client_retry_configuration",fallback={})ifnotconf.getboolean("kubernetes_executor","verify_ssl"):_disable_verify_ssl()ifisinstance(api_client_retry_configuration,dict):configuration.retries=urllib3.util.Retry(**api_client_retry_configuration)else:raiseValueError("api_client_retry_configuration should be a dictionary")ifin_cluster:config.load_incluster_config(client_configuration=configuration)else:ifcluster_contextisNone:cluster_context=conf.get("kubernetes_executor","cluster_context",fallback=None)ifconfig_fileisNone:config_file=conf.get("kubernetes_executor","config_file",fallback=None)config.load_kube_config(config_file=config_file,context=cluster_context,client_configuration=configuration)ifnotconf.getboolean("kubernetes_executor","verify_ssl"):configuration.verify_ssl=Falsessl_ca_cert=conf.get("kubernetes_executor","ssl_ca_cert")ifssl_ca_cert:configuration.ssl_ca_cert=ssl_ca_certapi_client=client.ApiClient(configuration=configuration)returnclient.CoreV1Api(api_client)