Google Kubernetes Engine Operators¶
Google Kubernetes Engine (GKE) provides a managed environment for deploying, managing, and scaling your containerized applications using Google infrastructure. The GKE environment consists of multiple machines (specifically, Compute Engine instances) grouped together to form a cluster.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'Detailed information is available for Installation.
Manage GKE cluster¶
A cluster is the foundation of GKE - all workloads run on top of the cluster. It is made up on a cluster master and worker nodes. The lifecycle of the master is managed by GKE when creating or deleting a cluster. The worker nodes are represented as Compute Engine VM instances that GKE creates on your behalf when creating a cluster.
Create GKE cluster¶
Here is an example of a cluster definition:
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
A dict object like this, or a
Cluster
definition, is required when creating a cluster with
GKECreateClusterOperator
.
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
)
You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
deferrable=True,
)
Delete GKE cluster¶
To delete a cluster, use
GKEDeleteClusterOperator
.
This would also delete all the nodes allocated to the cluster.
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
)
You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
deferrable=True,
)
Manage workloads on a GKE cluster¶
GKE works with containerized applications, such as those created on Docker, and deploys them to run on the cluster. These are called workloads, and when deployed on the cluster they leverage the CPU and memory resources of the cluster to run effectively.
Run a Pod on a GKE cluster¶
There are two operators available in order to run a pod on a GKE cluster:
GKEStartPodOperator
extends KubernetesPodOperator
to provide authorization using Google Cloud credentials.
There is no need to manage the kube_config
file, as it will be generated automatically.
All Kubernetes parameters (except config_file
) are also valid for the GKEStartPodOperator
.
For more information on KubernetesPodOperator
, please look at: KubernetesPodOperator guide.
Using with Private cluster¶
All clusters have a canonical endpoint. The endpoint is the IP address of the Kubernetes API server that
Airflow use to communicate with your cluster master. The endpoint is displayed in Cloud Console under the Endpoints field of the cluster’s Details tab, and in the
output of gcloud container clusters describe
in the endpoint field.
Private clusters have two unique endpoint values: privateEndpoint
, which is an internal IP address, and
publicEndpoint
, which is an external one. Running GKEStartPodOperator
against a private cluster
sets the external IP address as the endpoint by default. If you prefer to use the internal IP as the
endpoint, you need to set use_internal_ip
parameter to True
.
Using with Autopilot (serverless) cluster¶
When running on serverless cluster like GKE Autopilot, the pod startup can sometimes take longer due to cold start.
During the pod startup, the status is checked in regular short intervals and warning messages are emitted if the pod
has not yet started. You can increase this interval length via the startup_check_interval_seconds
parameter, with
recommendation of 60 seconds.
Use of XCom¶
We can enable the usage of XCom on the operator. This works by launching a sidecar container
with the pod specified. The sidecar is automatically mounted when the XCom usage is specified and its mount point
is the path /airflow/xcom
. To provide values to the XCom, ensure your Pod writes it into a file called
return.json
in the sidecar. The contents of this can then be used downstream in your DAG.
Here is an example of it being used:
pod_task_xcom = GKEStartPodOperator(
task_id="pod_task_xcom",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
do_xcom_push=True,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom",
in_cluster=False,
on_finish_action="delete_pod",
)
And then use it in other operators:
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:
pod_task_xcom_async = GKEStartPodOperator(
task_id="pod_task_xcom_async",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom-async",
in_cluster=False,
on_finish_action="delete_pod",
do_xcom_push=True,
deferrable=True,
get_logs=True,
)
Reference¶
For further information, look at: