Google Kubernetes Engine Operators
Google Kubernetes Engine (GKE) provides a managed environment for deploying, managing, and scaling your containerized applications using Google infrastructure. The GKE environment consists of multiple machines (specifically, Compute Engine instances) grouped together to form a cluster.
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
See Installation for detailed information.
Manage GKE cluster
A cluster is the foundation of GKE: all workloads run on top of the cluster. It is made up of a cluster master and worker nodes. GKE manages the lifecycle of the master when you create or delete a cluster. The worker nodes are Compute Engine VM instances that GKE creates on your behalf when it creates the cluster.
Create GKE cluster
Here is an example of a cluster definition:
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}
A dict object like this, or a Cluster definition, is required when creating a cluster with GKECreateClusterOperator.
create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
)
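If you create clusters in more than one mode, a small function can build the body for you. The sketch below uses a hypothetical helper, build_cluster_body (not part of Airflow), that returns a plain dict using field names from the GKE Cluster resource. The Autopilot branch mirrors the CLUSTER example; the Standard branch, which declares a single node pool named "default-pool", is an assumed layout, so check the Cluster reference for the fields your clusters actually need.

```python
def build_cluster_body(name: str, autopilot: bool = True, node_count: int = 1) -> dict:
    """Return a GKE cluster body as a plain dict (hypothetical helper)."""
    if autopilot:
        # Same shape as the CLUSTER example: GKE manages the nodes in Autopilot mode.
        return {"name": name, "initial_node_count": node_count, "autopilot": {"enabled": True}}
    # Standard mode (assumed layout): one explicitly declared node pool.
    return {
        "name": name,
        "node_pools": [{"name": "default-pool", "initial_node_count": node_count}],
    }


# Either dict can be passed as body= to GKECreateClusterOperator.
AUTOPILOT_BODY = build_cluster_body("example-autopilot")
STANDARD_BODY = build_cluster_body("example-standard", autopilot=False, node_count=3)
```

Keeping the body construction in one place makes it easier to reuse the same cluster name in the create and delete tasks.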
You can use deferrable mode for this action to run the operator asynchronously. In deferrable mode the operator releases its worker while it waits and hands off the job of resuming the operator to a Trigger. While the task is suspended (deferred), it does not take up a worker slot, so your Airflow deployment wastes far fewer resources on idle operators and sensors:
create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
    deferrable=True,
)
Install a specific version of Kueue inside a cluster
Kueue is a Cloud Native Job scheduler that works with the default Kubernetes scheduler, the Job controller,
and the cluster autoscaler to provide an end-to-end batch system. Kueue implements Job queueing, deciding when
Jobs should wait and when they should start, based on quotas and a hierarchy for sharing resources fairly among teams.
Kueue supports Autopilot clusters, Standard GKE with Node Auto-provisioning and regular autoscaled node pools.
To install Kueue on your cluster, use GKEStartKueueInsideClusterOperator, as shown in this example:
add_kueue_cluster = GKEStartKueueInsideClusterOperator(
    task_id="add_kueue_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    kueue_version="v0.6.2",
)
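The kueue_version parameter takes a Kueue release tag such as v0.6.2. Kueue publishes a manifests.yaml file with each GitHub release, and it is an assumption (not stated on this page) that the operator installs the manifest for the requested tag. The hypothetical kueue_manifest_url helper below only illustrates how a stable release tag maps to that URL and rejects tags that lack the leading "v".

```python
import re

# Assumed layout of Kueue release assets on GitHub.
KUEUE_RELEASE_URL = "https://github.com/kubernetes-sigs/kueue/releases/download/{version}/manifests.yaml"


def kueue_manifest_url(kueue_version: str) -> str:
    """Return the manifest URL for a stable tag like 'v0.6.2' (hypothetical helper)."""
    # Stable release tags look like "v" followed by MAJOR.MINOR.PATCH.
    if not re.fullmatch(r"v\d+\.\d+\.\d+", kueue_version):
        raise ValueError(f"expected a tag like 'v0.6.2', got {kueue_version!r}")
    return KUEUE_RELEASE_URL.format(version=kueue_version)
```

Passing "0.6.2" instead of "v0.6.2" is an easy mistake, which is why the sketch validates the tag before building the URL.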
Delete GKE cluster
To delete a cluster, use GKEDeleteClusterOperator. This also deletes all the nodes allocated to the cluster.
delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
)
You can use deferrable mode for this action to run the operator asynchronously. In deferrable mode the operator releases its worker while it waits and hands off the job of resuming the operator to a Trigger. While the task is suspended (deferred), it does not take up a worker slot, so your Airflow deployment wastes far fewer resources on idle operators and sensors:
delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    deferrable=True,
)
Manage workloads on a GKE cluster
GKE works with containerized applications, such as those built with Docker, and deploys them to run on the cluster. These applications are called workloads; once deployed, they use the cluster's CPU and memory resources to run.
Run a Pod on a GKE cluster
To run a pod on a GKE cluster, use GKEStartPodOperator, which extends KubernetesPodOperator to provide authorization using Google Cloud credentials. There is no need to manage the kube_config file, as it is generated automatically. All Kubernetes parameters (except config_file) are also valid for GKEStartPodOperator.
For more information on KubernetesPodOperator, see the KubernetesPodOperator guide.
Using with a private cluster
All clusters have a canonical endpoint. The endpoint is the IP address of the Kubernetes API server that Airflow uses to communicate with your cluster master. The endpoint is displayed in Cloud Console under the Endpoints field of the cluster’s Details tab, and in the endpoint field of the output of gcloud container clusters describe.
Private clusters have two unique endpoint values: privateEndpoint, which is an internal IP address, and publicEndpoint, which is an external one. When you run GKEStartPodOperator against a private cluster, the external IP address is used as the endpoint by default. To use the internal IP as the endpoint instead, set the use_internal_ip parameter to True.
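The choice between the two endpoints can be sketched as follows. The select_endpoint helper is hypothetical and is not Airflow's implementation; it reads the camelCase keys printed by gcloud container clusters describe --format=json (endpoint, plus privateEndpoint and publicEndpoint under privateClusterConfig) and returns the address a client would connect to. The IP addresses are placeholders.

```python
def select_endpoint(cluster: dict, use_internal_ip: bool = False) -> str:
    """Pick the API server address from a cluster description (hypothetical helper)."""
    if use_internal_ip:
        private = cluster.get("privateClusterConfig", {})
        if "privateEndpoint" not in private:
            raise ValueError("cluster has no privateEndpoint; is it a private cluster?")
        # Internal IP: reachable from within the cluster's VPC network.
        return private["privateEndpoint"]
    # Default: the canonical endpoint, which is the external IP for a private cluster.
    return cluster["endpoint"]


# Placeholder description of a private cluster.
cluster = {
    "endpoint": "203.0.113.10",
    "privateClusterConfig": {"privateEndpoint": "10.0.0.2", "publicEndpoint": "203.0.113.10"},
}
```

Using the internal IP only works if Airflow itself can reach the cluster's VPC network, for example when its workers run inside that network.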
Using with an Autopilot (serverless) cluster
When running on a serverless cluster such as GKE Autopilot, pod startup can take longer because of cold starts. During pod startup, the status is checked at regular short intervals, and warning messages are emitted if the pod has not started yet. You can lengthen this interval with the startup_check_interval_seconds parameter; 60 seconds is the recommended value.
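A rough way to see the effect of the interval: if the pod is still starting at every check, the number of checks (and potential warnings) during a cold start is about the startup time divided by the interval. The approx_startup_checks helper is a back-of-the-envelope estimate, not how the operator counts internally, and the 5-minute cold start is an assumed figure.

```python
import math


def approx_startup_checks(startup_seconds: float, interval_seconds: float) -> int:
    """Estimate how many status checks run before a pod starts (rough model)."""
    return math.ceil(startup_seconds / interval_seconds)


# Assumed 5-minute cold start on Autopilot.
frequent = approx_startup_checks(300, 5)
recommended = approx_startup_checks(300, 60)
```

With these assumptions, a 5-second interval yields about 60 checks and a 60-second interval about 5. Make sure startup_timeout_seconds, inherited from KubernetesPodOperator, is also long enough to cover the expected cold start.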
Use of XCom
You can enable XCom on the operator by setting do_xcom_push=True. This works by launching a sidecar container alongside the specified pod. The sidecar is mounted automatically when XCom is enabled, and its mount point is the path /airflow/xcom. To provide values to XCom, make sure your pod writes them to a file called return.json in that directory. Its contents can then be used downstream in your DAG.
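If your container image ships Python, the pod can write its result with the standard json module instead of a shell one-liner. The write_xcom_result helper below is hypothetical; its directory defaults to /airflow/xcom as described above. The content must be valid JSON, because the operator parses return.json before pushing the value to XCom.

```python
import json
import os


def write_xcom_result(value, xcom_dir: str = "/airflow/xcom") -> str:
    """Serialize value as JSON into <xcom_dir>/return.json (hypothetical helper)."""
    os.makedirs(xcom_dir, exist_ok=True)  # same effect as `mkdir -p`
    path = os.path.join(xcom_dir, "return.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(value, f)
    return path
```

Inside the pod, calling write_xcom_result([1, 2, 3, 4]) produces the same file as the shell command in the following example, so a downstream xcom_pull returns the list [1, 2, 3, 4].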
Here is an example of it being used:
pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    do_xcom_push=True,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    in_cluster=False,
    on_finish_action="delete_pod",
)
You can then use the result in other operators:
pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)
You can use deferrable mode for this action to run the operator asynchronously. In deferrable mode the operator releases its worker while it waits and hands off the job of resuming the operator to a Trigger. While the task is suspended (deferred), it does not take up a worker slot, so your Airflow deployment wastes far fewer resources on idle operators and sensors:
pod_task_xcom_async = GKEStartPodOperator(
    task_id="pod_task_xcom_async",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom-async",
    in_cluster=False,
    on_finish_action="delete_pod",
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
)
Run a Job on a GKE cluster
To run a job on a GKE cluster, use GKEStartJobOperator, which extends KubernetesJobOperator to provide authorization using Google Cloud credentials. There is no need to manage the kube_config file, as it is generated automatically. All Kubernetes parameters (except config_file) are also valid for GKEStartJobOperator.
job_task = GKEStartJobOperator(
    task_id="job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)
GKEStartJobOperator also supports deferrable mode. Deferrable mode only makes sense when the wait_until_job_complete parameter is set to True.
job_task_def = GKEStartJobOperator(
    task_id="job_task_def",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME_DEF,
    wait_until_job_complete=True,
    deferrable=True,
)
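With wait_until_job_complete=True the task waits until the Job finishes. In Kubernetes, a finished Job reports a condition of type Complete or Failed whose status is the string "True" in status.conditions. The sketch below applies that check to a plain dict shaped like a Job's status; job_outcome is a hypothetical helper, not the operator's code.

```python
from typing import Optional


def job_outcome(job_status: dict) -> Optional[str]:
    """Return "Complete", "Failed", or None while the Job is still running."""
    for condition in job_status.get("conditions") or []:
        # A condition is in effect only when its status is the string "True".
        if condition.get("type") in ("Complete", "Failed") and condition.get("status") == "True":
            return condition["type"]
    return None
```

Other condition types, such as Suspended, do not mean the Job has finished, so the helper keeps returning None for them.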
To run a Job on a GKE cluster with Kueue enabled, use GKEStartKueueJobOperator.
from kubernetes.client import models as k8s

kueue_job_task = GKEStartKueueJobOperator(
    task_id="kueue_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    queue_name=QUEUE_NAME,
    namespace="default",
    parallelism=3,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
)
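Kueue routes a Job to the LocalQueue named in its kueue.x-k8s.io/queue-name label, and Jobs are created with spec.suspend set to true so that Kueue, rather than the Job controller, decides when their pods start. That is why the example sets queue_name and suspend=True. The sketch below builds a comparable batch/v1 Job manifest as a plain dict to show where those values end up; kueue_job_manifest is hypothetical, and the exact object the operator generates may differ.

```python
def kueue_job_manifest(name, queue_name, image, cmds, parallelism=1,
                       namespace="default", requests=None):
    """Build a Job manifest aimed at a Kueue LocalQueue (hypothetical helper)."""
    container = {"name": name, "image": image, "command": cmds}
    if requests:
        # Kueue counts these requests against the queue's quota.
        container["resources"] = {"requests": requests}
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": name,
            "namespace": namespace,
            # Kueue uses this label to find the LocalQueue for the Job.
            "labels": {"kueue.x-k8s.io/queue-name": queue_name},
        },
        "spec": {
            "parallelism": parallelism,
            # Created suspended; Kueue unsuspends the Job once it is admitted.
            "suspend": True,
            "template": {"spec": {"containers": [container], "restartPolicy": "Never"}},
        },
    }
```

For example, kueue_job_manifest("test-pi", "local-queue", "perl:5.34.0", ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"], parallelism=3, requests={"cpu": "1", "memory": "200Mi"}) mirrors the operator call above, with "local-queue" standing in for QUEUE_NAME.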
Delete a Job on a GKE cluster
To delete a job on a GKE cluster, use GKEDeleteJobOperator, which extends KubernetesDeleteJobOperator to provide authorization using Google Cloud credentials. There is no need to manage the kube_config file, as it is generated automatically. All Kubernetes parameters (except config_file) are also valid for GKEDeleteJobOperator.
delete_job = GKEDeleteJobOperator(
    task_id="delete_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=JOB_NAME,
    namespace=JOB_NAMESPACE,
)
Retrieve information about a Job by name
You can use GKEDescribeJobOperator to retrieve a detailed description of an existing Job by providing its name and namespace.
describe_job_task = GKEDescribeJobOperator(
    task_id="describe_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    job_name=job_task.output["job_name"],
    namespace="default",
    cluster_name=CLUSTER_NAME,
)
Retrieve a list of Jobs
You can use GKEListJobsOperator to retrieve a list of existing Jobs. If the namespace parameter is provided, the output includes only the Jobs in that namespace. If the namespace parameter is not specified, Jobs from all namespaces are returned.
list_job_task = GKEListJobsOperator(
    task_id="list_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
)
Create a resource in a GKE cluster
You can use GKECreateCustomResourceOperator to create a resource in the specified Google Kubernetes Engine cluster.
create_resource_task = GKECreateCustomResourceOperator(
    task_id="create_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)
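PVC_CONF is not defined in these snippets; yaml_conf expects a YAML string describing the resource. The PersistentVolumeClaim below is an illustrative assumption: the claim name, access mode, and storage size are placeholders. Because the delete operator takes the same yaml_conf, reusing one constant keeps both tasks pointed at the same object.

```python
# Hypothetical manifest; replace the placeholders with your own values.
PVC_CONF = """
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: test-volume
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
"""
```
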
Delete a resource in a GKE cluster
You can use GKEDeleteCustomResourceOperator to delete a resource in the specified Google Kubernetes Engine cluster.
delete_resource_task = GKEDeleteCustomResourceOperator(
    task_id="delete_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)
Suspend a Job on a GKE cluster
You can use GKESuspendJobOperator to suspend a Job in the specified Google Kubernetes Engine cluster.
suspend_job = GKESuspendJobOperator(
    task_id="suspend_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)
Resume a Job on a GKE cluster
You can use GKEResumeJobOperator to resume a Job in the specified Google Kubernetes Engine cluster.
resume_job = GKEResumeJobOperator(
    task_id="resume_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)
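In Kubernetes, suspending and resuming a Job both come down to the Job's spec.suspend field: setting it to true makes the Job controller delete the Job's active pods, and setting it back to false lets the controller create pods again. The sketch builds the patch body for each direction as a plain dict; suspend_patch is hypothetical, and it is an assumption that the suspend and resume operators send an equivalent patch.

```python
import json


def suspend_patch(suspend: bool) -> dict:
    """Return a patch body that sets a Job's spec.suspend (hypothetical helper)."""
    return {"spec": {"suspend": suspend}}


# The same JSON can be used manually: kubectl patch job <name> -p '<json>'
print(json.dumps(suspend_patch(True)))  # {"spec": {"suspend": true}}
```
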
Reference
For further information, look at: