Source code for airflow.providers.cncf.kubernetes.operators.custom_object_launcher
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Launches Custom object."""

from __future__ import annotations

import time
from copy import deepcopy
from datetime import datetime as dt
from functools import cached_property

import tenacity

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.resource_convert.configmap import (
    convert_configmap,
    convert_configmap_to_volume,
)
from airflow.providers.cncf.kubernetes.resource_convert.env_variable import convert_env_vars
from airflow.providers.cncf.kubernetes.resource_convert.secret import (
    convert_image_pull_secrets,
    convert_secret,
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager
from airflow.utils.log.logging_mixin import LoggingMixin
from kubernetes.client import CoreV1Api, CustomObjectsApi, models as k8s
from kubernetes.client.rest import ApiException
[docs]def should_retry_start_spark_job(exception: BaseException) -> bool:
    """Check if an Exception indicates a transient error and warrants retrying."""
    if isinstance(exception, ApiException):
        return str(exception.status) == "409"
    return False
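# Illustrative usage sketch (not part of the module source): this predicate is the kind of
# check that can be handed to a tenacity retry decorator so that a 409 Conflict from the
# Kubernetes API triggers another submission attempt. The helper below is hypothetical and
# only demonstrates the wiring.
#
#     @tenacity.retry(
#         stop=tenacity.stop_after_attempt(3),
#         wait=tenacity.wait_exponential(multiplier=1, max=10),
#         retry=tenacity.retry_if_exception(should_retry_start_spark_job),
#         reraise=True,
#     )
#     def submit_with_retries(launcher):
#         return launcher.start_spark_job()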
[docs]    def validate(self):
        if self.spec.get("dynamicAllocation", {}).get("enabled"):
            if not all(
                [
                    self.spec["dynamicAllocation"].get("minExecutors"),
                    self.spec["dynamicAllocation"].get("maxExecutors"),
                ]
            ):
                raise AirflowException("Make sure min/max value for dynamic allocation is passed")
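# Illustrative sketch (not part of the module source): validate() only inspects the
# dynamicAllocation block, so a hypothetical spec such as
#
#     {"dynamicAllocation": {"enabled": True, "minExecutors": 1, "maxExecutors": 5}}
#
# passes, while enabled=True without minExecutors or maxExecutors raises AirflowException.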
[docs]    def driver_resources(self):
        """Return resources to use."""
        driver = {}
        if self.driver["cpu"].get("request"):
            driver["cores"] = self.driver["cpu"]["request"]
        if self.driver["cpu"].get("limit"):
            driver["coreLimit"] = self.driver["cpu"]["limit"]
        if self.driver["memory"].get("limit"):
            driver["memory"] = self.driver["memory"]["limit"]
        if self.driver["gpu"].get("name") and self.driver["gpu"].get("quantity"):
            driver["gpu"] = {"name": self.driver["gpu"]["name"], "quantity": self.driver["gpu"]["quantity"]}
        return driver
    @property
[docs]    def executor_resources(self):
        """Return resources to use."""
        executor = {}
        if self.executor["cpu"].get("request"):
            executor["cores"] = self.executor["cpu"]["request"]
        if self.executor["cpu"].get("limit"):
            executor["coreLimit"] = self.executor["cpu"]["limit"]
        if self.executor["memory"].get("limit"):
            executor["memory"] = self.executor["memory"]["limit"]
        if self.executor["gpu"].get("name") and self.executor["gpu"].get("quantity"):
            executor["gpu"] = {
                "name": self.executor["gpu"]["name"],
                "quantity": self.executor["gpu"]["quantity"],
            }
        return executor
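# Illustrative sketch (not part of the module source): with a hypothetical executor
# configuration of {"cpu": {"request": 2, "limit": "2"}, "memory": {"limit": "1462m"}, "gpu": {}},
# executor_resources evaluates to {"cores": 2, "coreLimit": "2", "memory": "1462m"}.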
[docs]    def convert_resources(self):
        if isinstance(self.driver["memory"].get("limit"), str):
            if "G" in self.driver["memory"]["limit"] or "Gi" in self.driver["memory"]["limit"]:
                self.driver["memory"]["limit"] = float(self.driver["memory"]["limit"].rstrip("Gi G")) * 1024
            elif "m" in self.driver["memory"]["limit"]:
                self.driver["memory"]["limit"] = float(self.driver["memory"]["limit"].rstrip("m"))
            # Adjusting the memory value as operator adds 40% to the given value
            self.driver["memory"]["limit"] = str(int(self.driver["memory"]["limit"] / 1.4)) + "m"

        if isinstance(self.executor["memory"].get("limit"), str):
            if "G" in self.executor["memory"]["limit"] or "Gi" in self.executor["memory"]["limit"]:
                self.executor["memory"]["limit"] = (
                    float(self.executor["memory"]["limit"].rstrip("Gi G")) * 1024
                )
            elif "m" in self.executor["memory"]["limit"]:
                self.executor["memory"]["limit"] = float(self.executor["memory"]["limit"].rstrip("m"))
            # Adjusting the memory value as operator adds 40% to the given value
            self.executor["memory"]["limit"] = str(int(self.executor["memory"]["limit"] / 1.4)) + "m"

        if self.driver["cpu"].get("request"):
            self.driver["cpu"]["request"] = int(float(self.driver["cpu"]["request"]))
        if self.driver["cpu"].get("limit"):
            self.driver["cpu"]["limit"] = str(self.driver["cpu"]["limit"])
        if self.executor["cpu"].get("request"):
            self.executor["cpu"]["request"] = int(float(self.executor["cpu"]["request"]))
        if self.executor["cpu"].get("limit"):
            self.executor["cpu"]["limit"] = str(self.executor["cpu"]["limit"])
        if self.driver["gpu"].get("quantity"):
            self.driver["gpu"]["quantity"] = int(float(self.driver["gpu"]["quantity"]))
        if self.executor["gpu"].get("quantity"):
            self.executor["gpu"]["quantity"] = int(float(self.executor["gpu"]["quantity"]))
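# Worked example (not part of the module source): a driver memory limit of "2Gi" is first
# normalised to mebibytes (2 * 1024 = 2048) and then reduced to compensate for the ~40%
# overhead the Spark operator adds on top of the requested value: int(2048 / 1.4) = 1462,
# so the limit passed on to the operator becomes "1462m".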
[docs]class CustomObjectStatus:
    """Status of the PODs."""

    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    SUCCEEDED = "SUCCEEDED"
[docs]    def start_spark_job(self, image=None, code_path=None, startup_timeout: int = 600):
        """
        Launch the pod synchronously and wait for completion.

        :param image: image name
        :param code_path: path to the .py file for python and jar file for scala
        :param startup_timeout: Timeout for startup of the pod (if pod is pending for too long, fails task)
        :return:
        """
        try:
            if image:
                self.body["spec"]["image"] = image
            if code_path:
                self.body["spec"]["mainApplicationFile"] = code_path
            self.log.debug("Spark Job Creation Request Submitted")
            self.spark_obj_spec = self.custom_obj_api.create_namespaced_custom_object(
                group=self.api_group,
                version=self.api_version,
                namespace=self.namespace,
                plural=self.plural,
                body=self.body,
            )
            self.log.debug("Spark Job Creation Response: %s", self.spark_obj_spec)

            # Wait for the driver pod to come alive
            self.pod_spec = k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(
                    labels=self.spark_obj_spec["spec"]["driver"].get("labels"),
                    name=self.spark_obj_spec["metadata"]["name"] + "-driver",
                    namespace=self.namespace,
                )
            )
            curr_time = dt.now()
            while self.spark_job_not_running(self.spark_obj_spec):
                self.log.warning(
                    "Spark job submitted but not yet started. job_id: %s",
                    self.spark_obj_spec["metadata"]["name"],
                )
                self.check_pod_start_failure()
                delta = dt.now() - curr_time
                if delta.total_seconds() >= startup_timeout:
                    pod_status = self.pod_manager.read_pod(self.pod_spec).status.container_statuses
                    raise AirflowException(f"Job took too long to start. pod status: {pod_status}")
                time.sleep(10)
        except Exception as e:
            self.log.exception("Exception when attempting to create spark job")
            raise e

        return self.pod_spec, self.spark_obj_spec
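# Note (not part of the module source): the loop above re-checks the SparkApplication state
# every 10 seconds, so the default startup_timeout of 600 seconds allows roughly 60 status
# checks before the task fails with "Job took too long to start".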
[docs]    def spark_job_not_running(self, spark_obj_spec):
        """Test if spark_obj_spec has not started."""
        spark_job_info = self.custom_obj_api.get_namespaced_custom_object_status(
            group=self.api_group,
            version=self.api_version,
            namespace=self.namespace,
            name=spark_obj_spec["metadata"]["name"],
            plural=self.plural,
        )
        driver_state = spark_job_info.get("status", {}).get("applicationState", {}).get("state", "SUBMITTED")
        if driver_state == CustomObjectStatus.FAILED:
            err = spark_job_info.get("status", {}).get("applicationState", {}).get("errorMessage", "N/A")
            try:
                self.pod_manager.fetch_container_logs(
                    pod=self.pod_spec, container_name="spark-kubernetes-driver"
                )
            except Exception:
                pass
            raise AirflowException(f"Spark Job Failed. Error stack: {err}")
        return driver_state == CustomObjectStatus.SUBMITTED
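# Illustrative sketch (not part of the module source): the status object read above follows the
# SparkApplication CRD layout, e.g. (values hypothetical):
#
#     {"status": {"applicationState": {"state": "RUNNING", "errorMessage": ""}}}
#
# The method returns True only while the state is still SUBMITTED and raises AirflowException
# as soon as the state becomes FAILED.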
[docs]    def delete_spark_job(self, spark_job_name=None):
        """Delete spark job."""
        spark_job_name = spark_job_name or self.spark_obj_spec.get("metadata", {}).get("name")
        if not spark_job_name:
            self.log.warning("Spark job not found: %s", spark_job_name)
            return
        try:
            self.custom_obj_api.delete_namespaced_custom_object(
                group=self.api_group,
                version=self.api_version,
                namespace=self.namespace,
                plural=self.plural,
                name=spark_job_name,
            )
        except ApiException as e:
            # If the pod is already deleted
            if str(e.status) != "404":
                raise