Source code for airflow.providers.cncf.kubernetes.cli.kubernetes_command
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Kubernetes sub-commands."""from__future__importannotationsimportosimportsysfromdatetimeimportdatetime,timedeltafromkubernetesimportclientfromkubernetes.client.api_clientimportApiClientfromkubernetes.client.restimportApiExceptionfromairflow.modelsimportDagRun,TaskInstancefromairflow.providers.cncf.kubernetesimportpod_generatorfromairflow.providers.cncf.kubernetes.executors.kubernetes_executorimportKubeConfigfromairflow.providers.cncf.kubernetes.kube_clientimportget_kube_clientfromairflow.providers.cncf.kubernetes.kubernetes_helper_functionsimportcreate_unique_idfromairflow.providers.cncf.kubernetes.pod_generatorimportPodGeneratorfromairflow.providers.cncf.kubernetes.version_compatimportAIRFLOW_V_3_0_PLUSfromairflow.utilsimportcliascli_utils,yamlfromairflow.utils.cliimportget_dagfromairflow.utils.providers_configuration_loaderimportproviders_configuration_loaded@cli_utils.action_cli@providers_configuration_loaded
def generate_pod_yaml(args):
    """
    Generate yaml files for each task in the DAG. Used for testing output of KubernetesExecutor.

    For every task of the DAG a worker pod is constructed and its sanitized
    representation is dumped to
    ``<args.output_path>/airflow_yaml_output/<dag_id>_<task_id>_<date>.yml``.

    :param args: parsed CLI arguments. Reads ``dag_id``, ``subdir``, ``output_path`` and
        ``logical_date`` (``execution_date`` when ``AIRFLOW_V_3_0_PLUS`` is false).
    """
    # The attribute and DagRun keyword are named differently before Airflow 3.0.
    logical_date = args.logical_date if AIRFLOW_V_3_0_PLUS else args.execution_date
    dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
    yaml_output_path = args.output_path
    if AIRFLOW_V_3_0_PLUS:
        dr = DagRun(dag.dag_id, logical_date=logical_date)
    else:
        dr = DagRun(dag.dag_id, execution_date=logical_date)
    kube_config = KubeConfig()

    # Everything below is identical for every task, so compute it once up front
    # instead of on each loop iteration.
    api_client = ApiClient()
    date_string = pod_generator.datetime_to_label_safe_datestring(logical_date)
    output_dir = yaml_output_path + "/airflow_yaml_output/"
    os.makedirs(os.path.dirname(output_dir), exist_ok=True)

    for task in dag.tasks:
        ti = TaskInstance(task, None)
        ti.dag_run = dr
        pod = PodGenerator.construct_pod(
            dag_id=args.dag_id,
            task_id=ti.task_id,
            pod_id=create_unique_id(args.dag_id, ti.task_id),
            try_number=ti.try_number,
            kube_image=kube_config.kube_image,
            date=ti.logical_date if AIRFLOW_V_3_0_PLUS else ti.execution_date,
            args=ti.command_as_list(),
            pod_override_object=PodGenerator.from_obj(ti.executor_config),
            scheduler_job_id="worker-config",
            namespace=kube_config.executor_namespace,
            base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
            with_mutation_hook=True,
        )
        # Serialize before opening the file so a serialization error does not
        # leave an empty (truncated) YAML file behind.
        sanitized_pod = api_client.sanitize_for_serialization(pod)
        yaml_text = yaml.dump(sanitized_pod)
        yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
        with open(output_dir + yaml_file_name, "w") as output:
            output.write(yaml_text)
    print(f"YAML output can be found at {yaml_output_path}/airflow_yaml_output/")
def cleanup_pods(args):
    """
    Delete Airflow pods that are succeeded, failed, evicted, or pending for too long.

    Pods in ``args.namespace`` that carry the Airflow labels are listed page by page.
    A pod is deleted when any of these holds:

    * its phase is ``succeeded``;
    * its phase is ``failed`` and its restart policy is ``never``;
    * its status reason is ``evicted``;
    * its phase is ``pending`` and it was created more than ``args.min_pending_minutes``
      minutes ago (values below 5 are raised to 5).

    An ``ApiException`` raised while deleting a pod is reported on stderr and the
    sweep continues with the next pod.

    :param args: parsed CLI arguments. Reads ``namespace`` and ``min_pending_minutes``.
    """
    namespace = args.namespace
    # Enforce a five minute floor so that newly created pods are never deleted.
    min_pending_minutes = max(args.min_pending_minutes, 5)

    # Pod phases, see https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
    # succeeded: every container terminated successfully and will not be restarted.
    phase_succeeded = "succeeded"
    # pending: accepted by the cluster, but one or more containers are not ready to run yet.
    phase_pending = "pending"
    # failed: every container terminated and at least one of them terminated in failure
    # (non-zero exit status, or terminated by the system).
    phase_failed = "failed"
    # Eviction reason, see https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
    reason_evicted = "evicted"
    # With restartPolicy Always or OnFailure a failing container is restarted and the
    # pod phase stays Running; only with Never does the pod phase become Failed.
    restart_never = "never"

    print("Loading Kubernetes configuration")
    kube_client = get_kube_client()
    print(f"Listing pods in namespace {namespace}")
    airflow_pod_labels = ["dag_id", "task_id", "try_number", "airflow_version"]
    list_kwargs = {
        "namespace": namespace,
        "limit": 500,
        "label_selector": ",".join(airflow_pod_labels),
    }

    while True:
        page = kube_client.list_namespaced_pod(**list_kwargs)
        for pod in page.items:
            pod_name = pod.metadata.name
            print(f"Inspecting pod {pod_name}")
            status = pod.status
            pod_phase = status.phase.lower()
            pod_reason = status.reason.lower() if status.reason else ""
            pod_restart_policy = pod.spec.restart_policy.lower()
            created_at = pod.metadata.creation_timestamp
            # "now" is taken in the timezone of the creation timestamp.
            pod_age = datetime.now(created_at.tzinfo) - created_at

            should_delete = (
                pod_phase == phase_succeeded
                or (pod_phase == phase_failed and pod_restart_policy == restart_never)
                or pod_reason == reason_evicted
                or (pod_phase == phase_pending and pod_age > timedelta(minutes=min_pending_minutes))
            )
            if not should_delete:
                print(f"No action taken on pod {pod_name}")
                continue

            print(
                f'Deleting pod "{pod_name}" phase "{pod_phase}" and reason "{pod_reason}", '
                f'restart policy "{pod_restart_policy}"'
            )
            try:
                _delete_pod(pod_name, namespace)
            except ApiException as e:
                print(f"Can't remove POD: {e}", file=sys.stderr)

        # Keep paging for as long as the API hands back a continuation token.
        next_token = page.metadata._continue
        if next_token:
            list_kwargs["_continue"] = next_token
        else:
            break
def _delete_pod(name, namespace):
    """
    Delete a single pod from a namespace and print the API response.

    Helper function for cleanup_pods. Exceptions raised by the delete call are not
    handled here and propagate to the caller.

    :param name: name of the pod to delete.
    :param namespace: namespace the pod is deleted from.
    """
    api = get_kube_client()
    options = client.V1DeleteOptions()
    print(f'Deleting POD "{name}" from "{namespace}" namespace')
    print(api.delete_namespaced_pod(name=name, namespace=namespace, body=options))