Source code for airflow.providers.google.cloud.operators.kubernetes_engine
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains Google Kubernetes Engine operators."""from__future__importannotationsimportwarningsfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Any,Sequenceimportrequestsimportyamlfromdeprecatedimportdeprecatedfromgoogle.api_core.exceptionsimportAlreadyExistsfromgoogle.cloud.container_v1.typesimportClusterfromkubernetes.clientimportV1JobList,modelsask8sfromkubernetes.utils.create_from_yamlimportFailToCreateErrorfrompackaging.versionimportparseasparse_versionfromairflow.configurationimportconffromairflow.exceptionsimportAirflowException,AirflowProviderDeprecationWarningfromairflow.providers.cncf.kubernetes.operators.jobimportKubernetesJobOperatorfromairflow.providers.cncf.kubernetes.operators.podimportKubernetesPodOperatorfromairflow.providers.cncf.kubernetes.operators.resourceimport(KubernetesCreateResourceOperator,KubernetesDeleteResourceOperator,)fromairflow.providers.cncf.kubernetes.utils.pod_managerimportOnFinishActionfromairflow.providers.google.cloud.hooks.kubernetes_engineimport(GKEHook,GKEKubernetesHook,)fromairflow.providers.google.cloud.links.kubernetes_engineimport(KubernetesEngineClusterLink,KubernetesEngineJobLink,KubernetesEnginePodLink,KubernetesEngineWorkloadsLink,)fromairfl
ow.providers.google.cloud.operators.cloud_baseimportGoogleCloudBaseOperatorfromairflow.providers.google.cloud.triggers.kubernetes_engineimport(GKEJobTrigger,GKEOperationTrigger,GKEStartPodTrigger,)fromairflow.providers.google.common.hooks.base_googleimportPROVIDE_PROJECT_IDfromairflow.providers_managerimportProvidersManagerfromairflow.utils.timezoneimportutcnowtry:fromairflow.providers.cncf.kubernetes.operators.jobimportKubernetesDeleteJobOperatorexceptImportError:fromairflow.exceptionsimportAirflowOptionalProviderFeatureExceptionraiseAirflowOptionalProviderFeatureException("Failed to import KubernetesDeleteJobOperator. This operator is only available in cncf-kubernetes ""provider version >=8.1.0")ifTYPE_CHECKING:fromkubernetes.client.modelsimportV1Job,V1Podfromairflow.utils.contextimportContext
class GKEClusterAuthDetails:
    """
    Helper that resolves the endpoint URL and CA certificate of a GKE cluster.

    :param cluster_name: The name of the Google Kubernetes Engine cluster.
    :param project_id: The Google Developers Console project id.
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param cluster_hook: airflow hook for working with kubernetes cluster.
    """

    def __init__(self, cluster_name, project_id, use_internal_ip, cluster_hook):
        self.cluster_name = cluster_name
        self.project_id = project_id
        self.use_internal_ip = use_internal_ip
        self.cluster_hook = cluster_hook
        # Populated lazily by fetch_cluster_info().
        self._cluster_url = None
        self._ssl_ca_cert = None

    def fetch_cluster_info(self) -> tuple[str, str]:
        """Fetch cluster info for connecting to it."""
        cluster = self.cluster_hook.get_cluster(
            name=self.cluster_name,
            project_id=self.project_id,
        )
        # Private clusters are reached through the internal endpoint instead of
        # the public one.
        endpoint = (
            cluster.private_cluster_config.private_endpoint
            if self.use_internal_ip
            else cluster.endpoint
        )
        self._cluster_url = f"https://{endpoint}"
        self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
        return self._cluster_url, self._ssl_ca_cert
class GKEDeleteClusterOperator(GoogleCloudBaseOperator):
    """
    Deletes the cluster, including the Kubernetes endpoint and all worker nodes.

    To delete a certain cluster, you must specify the ``project_id``, the ``name``
    of the cluster, the ``location`` that the cluster is in, and the ``task_id``.

    **Operator Creation**: ::

        operator = GKEDeleteClusterOperator(
            task_id='cluster_delete',
            project_id='my-project',
            location='cluster-location',
            name='cluster-name')

    .. seealso::
        For more detail about deleting clusters have a look at the reference:
        https://google-cloud-python.readthedocs.io/en/latest/container/gapic/v1/api.html#google.cloud.container_v1.ClusterManagerClient.delete_cluster

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEDeleteClusterOperator`

    :param project_id: The Google Developers Console [project ID or project number]
    :param name: The name of the resource to delete, in this case cluster name
    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param api_version: The api version to use
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param deferrable: Run operator in the deferrable mode.
    :param poll_interval: Interval size which defines how often operation status is checked.
    """

    # NOTE(review): additional members of this class (e.g. ``execute`` and the
    # ``_get_hook`` helper used below) were not captured by this extraction —
    # restore them from the upstream source.

    def __init__(
        self,
        *,
        name: str,
        location: str,
        project_id: str = PROVIDE_PROJECT_ID,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        impersonation_chain: str | Sequence[str] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        poll_interval: int = 10,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.location = location
        self.api_version = api_version
        self.name = name
        self.impersonation_chain = impersonation_chain
        self.deferrable = deferrable
        self.poll_interval = poll_interval
        # Fail fast on missing required inputs before the task ever runs.
        self._check_input()
        self._hook: GKEHook | None = None

    def _check_input(self) -> None:
        """Raise if any of the required identifying inputs is missing."""
        if not (self.project_id and self.name and self.location):
            self.log.error("One of (project_id, name, location) is missing or incorrect")
            raise AirflowException("Operator has incorrect or missing input.")

    def execute_complete(self, context: Context, event: dict) -> str:
        """Execute after trigger job is done."""
        status = event["status"]
        message = event["message"]
        if status in {"failed", "error"}:
            self.log.exception("Trigger ended with one of the failed statuses.")
            raise AirflowException(message)

        self.log.info(message)
        operation = self._get_hook().get_operation(
            operation_name=event["operation_name"],
        )
        return operation.self_link
class GKECreateClusterOperator(GoogleCloudBaseOperator):
    """
    Create a Google Kubernetes Engine Cluster of specified dimensions and wait until the cluster is created.

    The **minimum** required to define a cluster to create is:

    ``dict()`` ::

        cluster_def = {"name": "my-cluster-name", "initial_node_count": 1}

    or ``Cluster`` proto ::

        from google.cloud.container_v1.types import Cluster

        cluster_def = Cluster(name="my-cluster-name", initial_node_count=1)

    **Operator Creation**: ::

        operator = GKECreateClusterOperator(
            task_id='cluster_create',
            project_id='my-project',
            location='my-location',
            body=cluster_def)

    .. seealso::
        For more detail on about creating clusters have a look at the reference:
        :class:`google.cloud.container_v1.types.Cluster`

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKECreateClusterOperator`

    :param project_id: The Google Developers Console [project ID or project number]
    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides.
    :param body: The Cluster definition to create, can be protobuf or python dict, if dict it
        must match protobuf message Cluster
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param api_version: The api version to use
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param deferrable: Run operator in the deferrable mode.
    :param poll_interval: Interval size which defines how often operation status is checked.
    """

    # NOTE(review): additional members of this class (e.g. ``execute`` and the
    # ``_get_hook`` helper used below) were not captured by this extraction —
    # restore them from the upstream source.

    def __init__(
        self,
        *,
        location: str,
        body: dict | Cluster,
        project_id: str = PROVIDE_PROJECT_ID,
        gcp_conn_id: str = "google_cloud_default",
        api_version: str = "v2",
        impersonation_chain: str | Sequence[str] | None = None,
        poll_interval: int = 10,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.gcp_conn_id = gcp_conn_id
        self.location = location
        self.api_version = api_version
        self.body = body
        self.impersonation_chain = impersonation_chain
        self.poll_interval = poll_interval
        self.deferrable = deferrable
        self._validate_input()
        self._hook: GKEHook | None = None

    def _validate_input(self) -> None:
        """Primary validation of the input body."""
        self._alert_deprecated_body_fields()

        error_messages: list[str] = []
        if not self._body_field("name"):
            error_messages.append("Field body['name'] is missing or incorrect")

        if self._body_field("initial_node_count"):
            if self._body_field("node_pools"):
                # FIX: message said "filed" instead of "field".
                error_messages.append(
                    "Do not use field body['initial_node_count'] and body['node_pools'] at the same time."
                )

        if self._body_field("node_config"):
            if self._body_field("node_pools"):
                # FIX: message said "filed" instead of "field".
                error_messages.append(
                    "Do not use field body['node_config'] and body['node_pools'] at the same time."
                )

        if self._body_field("node_pools"):
            if any([self._body_field("node_config"), self._body_field("initial_node_count")]):
                # FIX: message referred to the non-existent field
                # 'initial_code_count' instead of 'initial_node_count'.
                error_messages.append(
                    "The field body['node_pools'] should not be set if "
                    "body['node_config'] or body['initial_node_count'] are specified."
                )

        if not any([self._body_field("node_config"), self._body_field("initial_node_count")]):
            if not self._body_field("node_pools"):
                # FIX: message listed body['node_pools'] twice; the alternative
                # fields checked above are initial_node_count and node_config.
                error_messages.append(
                    "Field body['node_pools'] is required if none of fields "
                    "body['initial_node_count'] or body['node_config'] are specified."
                )

        for message in error_messages:
            self.log.error(message)

        if error_messages:
            raise AirflowException("Operator has incorrect or missing input.")

    def _body_field(self, field_name: str, default_value: Any = None) -> Any:
        """Extract the value of the given field name from a dict or proto body."""
        if isinstance(self.body, dict):
            return self.body.get(field_name, default_value)
        return getattr(self.body, field_name, default_value)

    def _alert_deprecated_body_fields(self) -> None:
        """Generate warning messages if deprecated fields were used in the body."""
        deprecated_body_fields_with_replacement = [
            ("initial_node_count", "node_pool.initial_node_count"),
            ("node_config", "node_pool.config"),
            ("zone", "location"),
            ("instance_group_urls", "node_pools.instance_group_urls"),
        ]
        for deprecated_field, replacement in deprecated_body_fields_with_replacement:
            if self._body_field(deprecated_field):
                warnings.warn(
                    f"The body field '{deprecated_field}' is deprecated. Use '{replacement}' instead.",
                    AirflowProviderDeprecationWarning,
                    stacklevel=2,
                )

    def execute_complete(self, context: Context, event: dict) -> str:
        """Finalize after the create-cluster trigger completes; return the target link."""
        status = event["status"]
        message = event["message"]
        if status in ("failed", "error"):
            self.log.exception("Trigger ended with one of the failed statuses.")
            raise AirflowException(message)

        self.log.info(message)
        operation = self._get_hook().get_operation(
            operation_name=event["operation_name"],
        )
        return operation.target_link
class GKEStartKueueInsideClusterOperator(GoogleCloudBaseOperator):
    """
    Installs Kueue of specific version inside Cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEStartKueueInsideClusterOperator`

    .. seealso::
        For more details about Kueue have a look at the reference:
        https://kueue.sigs.k8s.io/docs/overview/

    :param project_id: The Google Developers Console [project ID or project number].
    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides.
    :param cluster_name: The Cluster name in which to install Kueue.
    :param kueue_version: Version of Kueue to install.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    # NOTE(review): ``__init__`` and the ``cluster_hook`` / ``_kueue_yaml_url``
    # members referenced below were not captured by this extraction — restore
    # them from the upstream source.

    @cached_property
    def deployment_hook(self) -> GKEKubernetesHook:
        # Accessed as an attribute in execute(), so this must be a property.
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.deployment_hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEKubernetesHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
        )

    @cached_property
    def pod_hook(self) -> GKEKubernetesHook:
        # Same credentials as deployment_hook, but with TCP keepalive enabled
        # because applying the manifests can be a long-running request.
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.pod_hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEKubernetesHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
            enable_tcp_keepalive=True,
        )

    @staticmethod
    def _get_yaml_content_from_file(kueue_yaml_url) -> list[dict]:
        """Download content of YAML file and separate it into several dictionaries."""
        response = requests.get(kueue_yaml_url, allow_redirects=True)
        if response.status_code != 200:
            raise AirflowException("Was not able to read the yaml file from given URL")
        return list(yaml.safe_load_all(response.text))

    def execute(self, context: Context):
        """Resolve cluster credentials and install Kueue if the cluster can autoscale."""
        auth_details = GKEClusterAuthDetails(
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_internal_ip=self.use_internal_ip,
            cluster_hook=self.cluster_hook,
        )
        self._cluster_url, self._ssl_ca_cert = auth_details.fetch_cluster_info()

        cluster = self.cluster_hook.get_cluster(
            name=self.cluster_name,
            project_id=self.project_id,
        )
        KubernetesEngineClusterLink.persist(context=context, task_instance=self, cluster=cluster)

        yaml_objects = self._get_yaml_content_from_file(kueue_yaml_url=self._kueue_yaml_url)

        if not self.cluster_hook.check_cluster_autoscaling_ability(cluster=cluster):
            self.log.info(
                "Cluster doesn't have ability to autoscale, will not install Kueue inside. Aborting"
            )
            return

        try:
            self.pod_hook.apply_from_yaml_file(yaml_objects=yaml_objects)
            self.deployment_hook.check_kueue_deployment_running(
                name="kueue-controller-manager", namespace="kueue-system"
            )
            self.log.info("Kueue installed successfully!")
        except FailToCreateError:
            # The manifests already exist — Kueue was installed previously.
            self.log.info("Kueue is already enabled for the cluster")
class GKEStartPodOperator(KubernetesPodOperator):
    """
    Executes a task in a Kubernetes pod in the specified Google Kubernetes Engine cluster.

    This Operator assumes that the system has gcloud installed and has configured a
    connection id with a service account.

    The **minimum** required to define a cluster to create are the variables
    ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
    ``namespace``, and ``image``

    .. seealso::
        For more detail about Kubernetes Engine authentication have a look at the reference:
        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEStartPodOperator`

    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides, e.g. 'us-central1-a'
    :param cluster_name: The name of the Google Kubernetes Engine cluster the pod
        should be spawned in
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param project_id: The Google Developers Console project id
    :param gcp_conn_id: The Google cloud connection id to use. This allows for
        users to specify a service account.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param regional: The location param is region name.
    :param deferrable: Run operator in the deferrable mode.
    :param on_finish_action: What to do when the pod reaches its final state, or the execution
        is interrupted. If "delete_pod", the pod will be deleted regardless its state; if
        "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to
        keep the pod. Current default is `keep_pod`, but this will be changed in the next major
        release of this provider.
    :param is_delete_operator_pod: What to do when the pod reaches its final
        state, or the execution is interrupted. If True, delete the
        pod; if False, leave the pod. Current default is False, but this will be
        changed in the next major release of this provider.
        Deprecated - use `on_finish_action` instead.
    """

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool = False,
        project_id: str = PROVIDE_PROJECT_ID,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        regional: bool | None = None,
        on_finish_action: str | None = None,
        is_delete_operator_pod: bool | None = None,
        **kwargs,
    ) -> None:
        # Translate the deprecated is_delete_operator_pod flag (and the current
        # on_finish_action default) into the on_finish_action kwarg that the
        # parent KubernetesPodOperator understands.
        if is_delete_operator_pod is not None:
            warnings.warn(
                "`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            kwargs["on_finish_action"] = (
                OnFinishAction.DELETE_POD if is_delete_operator_pod else OnFinishAction.KEEP_POD
            )
        elif on_finish_action is not None:
            kwargs["on_finish_action"] = OnFinishAction(on_finish_action)
        else:
            warnings.warn(
                f"You have not set parameter `on_finish_action` in class {self.__class__.__name__}. "
                "Currently the default for this parameter is `keep_pod` but in a future release"
                " the default will be changed to `delete_pod`. To ensure pods are not deleted in"
                " the future you will need to set `on_finish_action=keep_pod` explicitly.",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            kwargs["on_finish_action"] = OnFinishAction.KEEP_POD

        if regional is not None:
            # Accepted for backwards compatibility only; the value is ignored.
            warnings.warn(
                f"You have set parameter regional in class {self.__class__.__name__}. "
                "In current implementation of the operator the parameter is not used and will "
                "be deleted in future.",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )

        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.cluster_name = cluster_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.use_internal_ip = use_internal_ip

        self.pod: V1Pod | None = None
        self._ssl_ca_cert: str | None = None
        self._cluster_url: str | None = None

        if self.gcp_conn_id is None:
            raise AirflowException(
                "The gcp_conn_id parameter has become required. If you want to use Application Default "
                "Credentials (ADC) strategy for authorization, create an empty connection "
                "called `google_cloud_default`.",
            )
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for the GKEStartPodOperator.
        if self.config_file:
            raise AirflowException("config_file is not an allowed parameter for the GKEStartPodOperator.")

    # NOTE(review): the original rendering shows a @staticmethod/@deprecated
    # decorator pair here (reason: "Please use `fetch_cluster_info` instead to
    # get the cluster info for connecting to it.") whose decorated helper's def
    # was lost in extraction — restore it from the upstream source.

    @cached_property
    def hook(self) -> GKEKubernetesHook:
        # NOTE(review): decorator restored by analogy with GKEStartJobOperator.hook
        # (the original decorator line was lost in extraction) — confirm upstream.
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEKubernetesHook(
            gcp_conn_id=self.gcp_conn_id,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
            impersonation_chain=self.impersonation_chain,
            enable_tcp_keepalive=True,
        )

    def execute(self, context: Context):
        """Execute process of creating pod and executing provided command inside it."""
        self.fetch_cluster_info()
        return super().execute(context)

    def fetch_cluster_info(self) -> tuple[str, str | None]:
        """Fetch cluster info for connecting to it."""
        cluster = self.cluster_hook.get_cluster(
            name=self.cluster_name,
            project_id=self.project_id,
        )
        # Private clusters are reached through the internal endpoint instead of
        # the public one.
        endpoint = (
            cluster.private_cluster_config.private_endpoint
            if self.use_internal_ip
            else cluster.endpoint
        )
        self._cluster_url = f"https://{endpoint}"
        self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
        return self._cluster_url, self._ssl_ca_cert

    def invoke_defer_method(self):
        """Redefine triggers which are being used in child classes."""
        trigger_start_time = utcnow()
        self.defer(
            trigger=GKEStartPodTrigger(
                pod_name=self.pod.metadata.name,
                pod_namespace=self.pod.metadata.namespace,
                trigger_start_time=trigger_start_time,
                cluster_url=self._cluster_url,
                ssl_ca_cert=self._ssl_ca_cert,
                get_logs=self.get_logs,
                startup_timeout=self.startup_timeout_seconds,
                cluster_context=self.cluster_context,
                poll_interval=self.poll_interval,
                in_cluster=self.in_cluster,
                base_container_name=self.base_container_name,
                on_finish_action=self.on_finish_action,
                gcp_conn_id=self.gcp_conn_id,
                impersonation_chain=self.impersonation_chain,
            ),
            method_name="execute_complete",
            # Pass the credentials through so the resumed task can rebuild the hook.
            kwargs={"cluster_url": self._cluster_url, "ssl_ca_cert": self._ssl_ca_cert},
        )

    def execute_complete(self, context: Context, event: dict, **kwargs):
        """Resume after deferral; restore hook credentials then delegate to the parent."""
        # It is required for hook to be initialized
        self._cluster_url = kwargs["cluster_url"]
        self._ssl_ca_cert = kwargs["ssl_ca_cert"]
        return super().execute_complete(context, event, **kwargs)
class GKEStartJobOperator(KubernetesJobOperator):
    """
    Executes a Kubernetes job in the specified Google Kubernetes Engine cluster.

    This Operator assumes that the system has gcloud installed and has configured a
    connection id with a service account.

    The **minimum** required to define a cluster to create are the variables
    ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
    ``namespace``, and ``image``

    .. seealso::
        For more detail about Kubernetes Engine authentication have a look at the reference:
        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEStartJobOperator`

    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides, e.g. 'us-central1-a'
    :param cluster_name: The name of the Google Kubernetes Engine cluster
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param project_id: The Google Developers Console project id
    :param gcp_conn_id: The Google cloud connection id to use. This allows for
        users to specify a service account.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param deferrable: Run operator in the deferrable mode.
    :param job_poll_interval: (Deferrable mode only) polling period in seconds to
        check for the status of job.
    """

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool = False,
        project_id: str = PROVIDE_PROJECT_ID,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        job_poll_interval: float = 10.0,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.cluster_name = cluster_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.use_internal_ip = use_internal_ip
        self.deferrable = deferrable
        self.job_poll_interval = job_poll_interval

        self.job: V1Job | None = None
        self._ssl_ca_cert: str | None = None
        self._cluster_url: str | None = None

        if self.gcp_conn_id is None:
            raise AirflowException(
                "The gcp_conn_id parameter has become required. If you want to use Application Default "
                "Credentials (ADC) strategy for authorization, create an empty connection "
                "called `google_cloud_default`.",
            )
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for the GKEStartJobOperator.
        if self.config_file:
            raise AirflowException("config_file is not an allowed parameter for the GKEStartJobOperator.")

    @cached_property
    def hook(self) -> GKEKubernetesHook:
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEKubernetesHook(
            gcp_conn_id=self.gcp_conn_id,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
        )

    def _check_deferrable_support(self) -> None:
        """Fail early if the installed cncf-kubernetes provider cannot run deferrable jobs."""
        kubernetes_provider = ProvidersManager().providers["apache-airflow-providers-cncf-kubernetes"]
        kubernetes_provider_name = kubernetes_provider.data["package-name"]
        kubernetes_provider_version = kubernetes_provider.version
        min_version = "8.0.1"
        if parse_version(kubernetes_provider_version) <= parse_version(min_version):
            raise AirflowException(
                "You are trying to use `GKEStartJobOperator` in deferrable mode with the provider "
                f"package {kubernetes_provider_name}=={kubernetes_provider_version} which doesn't "
                f"support this feature. Please upgrade it to version higher than {min_version}."
            )

    def execute(self, context: Context):
        """Execute process of creating Job."""
        if self.deferrable:
            self._check_deferrable_support()

        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_internal_ip=self.use_internal_ip,
            cluster_hook=self.cluster_hook,
        ).fetch_cluster_info()

        return super().execute(context)
class GKEDescribeJobOperator(GoogleCloudBaseOperator):
    """
    Retrieve information about Job by given name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEDescribeJobOperator`

    :param job_name: The name of the Job to delete
    :param project_id: The Google Developers Console project id.
    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides.
    :param cluster_name: The name of the Google Kubernetes Engine cluster.
    :param namespace: The name of the Google Kubernetes Engine namespace.
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    # NOTE(review): __init__ and the hook property used below were not captured
    # by this extraction — restore them from the upstream source.

    def execute(self, context: Context) -> None:
        """Look up the Job and log its full description."""
        self.job = self.hook.get_job(job_name=self.job_name, namespace=self.namespace)
        self.log.info(
            "Retrieved description of Job %s from cluster %s:\n%s",
            self.job_name,
            self.cluster_name,
            self.job,
        )
        KubernetesEngineJobLink.persist(context=context, task_instance=self)
        return None
class GKEListJobsOperator(GoogleCloudBaseOperator):
    """
    Retrieve list of Jobs.

    If namespace parameter is specified, the list of Jobs from dedicated
    namespace will be retrieved. If no namespace specified, it will output Jobs from all namespaces.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEListJobsOperator`

    :param project_id: The Google Developers Console project id.
    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides.
    :param cluster_name: The name of the Google Kubernetes Engine cluster.
    :param namespace: The name of the Google Kubernetes Engine namespace.
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param do_xcom_push: If set to True the result list of Jobs will be pushed to the task result.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    # NOTE(review): __init__ and the hook property used below were not captured
    # by this extraction — restore them from the upstream source.

    def execute(self, context: Context) -> dict:
        """List Jobs (optionally namespace-scoped), log them, and return them serialized."""
        if self.namespace:
            jobs = self.hook.list_jobs_from_namespace(namespace=self.namespace)
        else:
            jobs = self.hook.list_jobs_all_namespaces()
        for job in jobs.items:
            self.log.info("Retrieved description of Job:\n%s", job)
        # FIX: serialize the job list once instead of calling V1JobList.to_dict
        # twice (once for the XCom push and again for the return value).
        jobs_dict = V1JobList.to_dict(jobs)
        if self.do_xcom_push:
            ti = context["ti"]
            ti.xcom_push(key="jobs_list", value=jobs_dict)
        KubernetesEngineWorkloadsLink.persist(context=context, task_instance=self)
        return jobs_dict
class GKECreateCustomResourceOperator(KubernetesCreateResourceOperator):
    """
    Create a resource in the specified Google Kubernetes Engine cluster.

    This Operator assumes that the system has gcloud installed and has configured a
    connection id with a service account.

    .. seealso::
        For more detail about Kubernetes Engine authentication have a look at the reference:
        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKECreateCustomResourceOperator`

    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides, e.g. 'us-central1-a'
    :param cluster_name: The name of the Google Kubernetes Engine cluster.
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param project_id: The Google Developers Console project id
    :param gcp_conn_id: The Google cloud connection id to use. This allows for
        users to specify a service account.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool = False,
        project_id: str = PROVIDE_PROJECT_ID,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.cluster_name = cluster_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.use_internal_ip = use_internal_ip
        self._ssl_ca_cert: str | None = None
        self._cluster_url: str | None = None

        if self.gcp_conn_id is None:
            raise AirflowException(
                "The gcp_conn_id parameter has become required. If you want to use Application Default "
                "Credentials (ADC) strategy for authorization, create an empty connection "
                "called `google_cloud_default`.",
            )
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for the
        # GKECreateCustomResourceOperator.
        # FIX: the error message (and comment) referred to GKEStartPodOperator —
        # a copy-paste leftover that misleads users of this operator.
        if self.config_file:
            raise AirflowException(
                "config_file is not an allowed parameter for the GKECreateCustomResourceOperator."
            )

    @cached_property
    def hook(self) -> GKEKubernetesHook:
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEKubernetesHook(
            gcp_conn_id=self.gcp_conn_id,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
            impersonation_chain=self.impersonation_chain,
        )

    def execute(self, context: Context):
        """Execute process of creating Custom Resource."""
        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_internal_ip=self.use_internal_ip,
            cluster_hook=self.cluster_hook,
        ).fetch_cluster_info()
        return super().execute(context)
class GKEDeleteCustomResourceOperator(KubernetesDeleteResourceOperator):
    """
    Delete a resource in the specified Google Kubernetes Engine cluster.

    This Operator assumes that the system has gcloud installed and has configured a
    connection id with a service account.

    .. seealso::
        For more detail about Kubernetes Engine authentication have a look at the reference:
        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEDeleteCustomResourceOperator`

    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides, e.g. 'us-central1-a'
    :param cluster_name: The name of the Google Kubernetes Engine cluster.
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param project_id: The Google Developers Console project id
    :param gcp_conn_id: The Google cloud connection id to use. This allows for users to specify a
        service account.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool = False,
        project_id: str = PROVIDE_PROJECT_ID,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.cluster_name = cluster_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.use_internal_ip = use_internal_ip
        # Populated by execute() from GKEClusterAuthDetails before self.hook is first accessed.
        self._ssl_ca_cert: str | None = None
        self._cluster_url: str | None = None

        if self.gcp_conn_id is None:
            raise AirflowException(
                "The gcp_conn_id parameter has become required. If you want to use Application Default "
                "Credentials (ADC) strategy for authorization, create an empty connection "
                "called `google_cloud_default`.",
            )
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for this operator.
        # Fixed copy-paste: the error previously named GKEStartPodOperator.
        if self.config_file:
            raise AirflowException(
                "config_file is not an allowed parameter for the GKEDeleteCustomResourceOperator."
            )

    @cached_property
    def hook(self) -> GKEKubernetesHook:
        """Return a Kubernetes hook bound to the cluster resolved in execute().

        :raises AttributeError: if called before the cluster credentials were fetched.
        """
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEKubernetesHook(
            gcp_conn_id=self.gcp_conn_id,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
            impersonation_chain=self.impersonation_chain,
        )

    def execute(self, context: Context):
        """Execute process of deleting Custom Resource."""
        # Resolve the cluster endpoint and CA cert, then delegate the actual
        # resource deletion to KubernetesDeleteResourceOperator.
        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_internal_ip=self.use_internal_ip,
            cluster_hook=self.cluster_hook,
        ).fetch_cluster_info()
        return super().execute(context)
class GKEStartKueueJobOperator(GKEStartJobOperator):
    """
    Executes a Kubernetes Job in Kueue in the specified Google Kubernetes Engine cluster.

    Kueue requires Jobs to be created in a suspended state; this operator therefore
    enforces ``suspend=True`` and attaches the queue name as the
    ``kueue.x-k8s.io/queue-name`` label and annotation.

    :param queue_name: The name of the Queue in the cluster
    :raises AirflowException: if ``suspend`` was explicitly set to ``False``.
    """

    def __init__(
        self,
        *,
        queue_name: str,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.queue_name = queue_name

        if self.suspend is False:
            raise AirflowException(
                "The `suspend` parameter can't be False. If you want to use Kueue for running Job"
                " in a Kubernetes cluster, set the `suspend` parameter to True.",
            )
        elif self.suspend is None:
            # Unset: default to suspended and tell the user why.
            # (Fixed grammar: "should set to" -> "should be set to".)
            warnings.warn(
                f"You have not set parameter `suspend` in class {self.__class__.__name__}. "
                "For running a Job in Kueue the `suspend` parameter should be set to True.",
                UserWarning,
                stacklevel=2,
            )
            self.suspend = True
        # Kueue routes Jobs to a local queue via this label/annotation.
        self.labels.update({"kueue.x-k8s.io/queue-name": queue_name})
        self.annotations.update({"kueue.x-k8s.io/queue-name": queue_name})
class GKEDeleteJobOperator(KubernetesDeleteJobOperator):
    """
    Delete a Kubernetes job in the specified Google Kubernetes Engine cluster.

    This Operator assumes that the system has gcloud installed and has configured a
    connection id with a service account.

    The **minimum** required to define a cluster to create are the variables
    ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
    ``namespace``

    .. seealso::
        For more detail about Kubernetes Engine authentication have a look at the reference:
        https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl#internal_ip

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEDeleteJobOperator`

    :param location: The name of the Google Kubernetes Engine zone or region in which the
        cluster resides, e.g. 'us-central1-a'
    :param cluster_name: The name of the Google Kubernetes Engine cluster
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param project_id: The Google Developers Console project id
    :param gcp_conn_id: The Google cloud connection id to use. This allows for users to specify a
        service account.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool = False,
        project_id: str = PROVIDE_PROJECT_ID,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: str | Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.project_id = project_id
        self.location = location
        self.cluster_name = cluster_name
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
        self.use_internal_ip = use_internal_ip
        # Populated by execute() from GKEClusterAuthDetails before self.hook is first accessed.
        self._ssl_ca_cert: str | None = None
        self._cluster_url: str | None = None

        if self.gcp_conn_id is None:
            raise AirflowException(
                "The gcp_conn_id parameter has become required. If you want to use Application Default "
                "Credentials (ADC) strategy for authorization, create an empty connection "
                "called `google_cloud_default`.",
            )
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for the GKEDeleteJobOperator.
        if self.config_file:
            raise AirflowException("config_file is not an allowed parameter for the GKEDeleteJobOperator.")

    @cached_property
    def hook(self) -> GKEKubernetesHook:
        """Return a Kubernetes hook bound to the cluster resolved in execute().

        :raises AttributeError: if called before the cluster credentials were fetched.
        """
        if self._cluster_url is None or self._ssl_ca_cert is None:
            raise AttributeError(
                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
                "Try to use self.get_kube_creds method",
            )
        return GKEKubernetesHook(
            gcp_conn_id=self.gcp_conn_id,
            cluster_url=self._cluster_url,
            ssl_ca_cert=self._ssl_ca_cert,
            # Fix: previously omitted, so the accepted impersonation_chain parameter
            # was silently ignored for cluster API calls; siblings (e.g. the custom
            # resource operators above) do pass it through.
            impersonation_chain=self.impersonation_chain,
        )

    def execute(self, context: Context):
        """Execute process of deleting Job."""
        # Resolve the cluster endpoint and CA cert, then delegate the actual
        # Job deletion to KubernetesDeleteJobOperator.
        self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_internal_ip=self.use_internal_ip,
            cluster_hook=self.cluster_hook,
        ).fetch_cluster_info()
        return super().execute(context)
class GKESuspendJobOperator(GoogleCloudBaseOperator):
    """
    Suspend Job by given name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKESuspendJobOperator`

    :param name: The name of the Job to suspend
    :param project_id: The Google Developers Console project id.
    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
        resides.
    :param cluster_name: The name of the Google Kubernetes Engine cluster.
    :param namespace: The name of the Google Kubernetes Engine namespace.
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """
def execute(self, context: Context) -> dict:
    """Suspend the Job and return its serialized representation.

    Patches the Job with ``spec.suspend=True`` and persists a link to it.

    :param context: Airflow task context.
    :return: The patched Job serialized via ``k8s.V1Job.to_dict``.
    """
    # Fix: the annotation said `-> None`, but the method returns the Job dict.
    self.job = self.hook.patch_namespaced_job(
        job_name=self.name,
        namespace=self.namespace,
        body={"spec": {"suspend": True}},
    )
    self.log.info(
        "Job %s from cluster %s was suspended.",
        self.name,
        self.cluster_name,
    )
    KubernetesEngineJobLink.persist(context=context, task_instance=self)

    return k8s.V1Job.to_dict(self.job)
class GKEResumeJobOperator(GoogleCloudBaseOperator):
    """
    Resume Job by given name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GKEResumeJobOperator`

    :param name: The name of the Job to resume
    :param project_id: The Google Developers Console project id.
    :param location: The name of the Google Kubernetes Engine zone or region in which the cluster
        resides.
    :param cluster_name: The name of the Google Kubernetes Engine cluster.
    :param namespace: The name of the Google Kubernetes Engine namespace.
    :param use_internal_ip: Use the internal IP address as the endpoint.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """
def execute(self, context: Context) -> dict:
    """Resume the Job and return its serialized representation.

    Patches the Job with ``spec.suspend=False`` and persists a link to it.

    :param context: Airflow task context.
    :return: The patched Job serialized via ``k8s.V1Job.to_dict``.
    """
    # Fix: the annotation said `-> None`, but the method returns the Job dict.
    self.job = self.hook.patch_namespaced_job(
        job_name=self.name,
        namespace=self.namespace,
        body={"spec": {"suspend": False}},
    )
    self.log.info(
        "Job %s from cluster %s was resumed.",
        self.name,
        self.cluster_name,
    )
    KubernetesEngineJobLink.persist(context=context, task_instance=self)

    return k8s.V1Job.to_dict(self.job)