Source code for tests.system.cncf.kubernetes.example_kubernetes_kueue
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example Airflow DAG for Kubernetes Kueue operators."""

from __future__ import annotations

import os
from datetime import datetime

from kubernetes.client import models as k8s

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kueue import (
    KubernetesInstallKueueOperator,
    KubernetesStartKueueJobOperator,
)
from airflow.providers.cncf.kubernetes.operators.resource import KubernetesCreateResourceOperator
# YAML manifest for the Kueue LocalQueue created by the DAG.
# The queue lives in the "default" namespace, takes its name from QUEUE_NAME,
# and points at the ClusterQueue named "cluster-queue".
# The manifest must keep its line breaks and two-space nesting: without them
# the document no longer parses into the metadata/spec mappings.
local_conf = f"""
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  namespace: default # LocalQueue under team-a namespace
  name: {QUEUE_NAME}
spec:
  clusterQueue: cluster-queue # Point to the ClusterQueue
"""
# Example DAG: install Kueue, create the resource-flavor, cluster-queue and
# local-queue custom resources from their YAML manifests, then submit a
# suspended job to the local queue and wait for it to complete.
with DAG(
    DAG_ID,
    schedule="@once",  # Override to match your needs
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "kubernetes", "kueue"],
) as dag:
    # NOTE(review): the Kueue release pinned in the install task below is an
    # assumption -- confirm the version that should be installed.
    # [START howto_operator_k8s_kueue_install]
    install_kueue = KubernetesInstallKueueOperator(
        task_id="install_kueue",
        kueue_version="v0.9.1",
    )
    # [END howto_operator_k8s_kueue_install]

    # The flavor and the cluster queue are created with namespaced=False.
    create_resource_flavor = KubernetesCreateResourceOperator(
        task_id="create_resource_flavor",
        yaml_conf=flavor_conf,
        custom_resource_definition=True,
        namespaced=False,
    )

    create_cluster_queue = KubernetesCreateResourceOperator(
        task_id="create_cluster_queue",
        yaml_conf=cluster_conf,
        custom_resource_definition=True,
        namespaced=False,
    )

    # The local queue relies on the operator's default for `namespaced`.
    create_local_queue = KubernetesCreateResourceOperator(
        task_id="create_local_queue",
        yaml_conf=local_conf,
        custom_resource_definition=True,
    )

    # [START howto_operator_k8s_install_kueue]
    start_kueue_job = KubernetesStartKueueJobOperator(
        task_id="kueue_job",
        queue_name=QUEUE_NAME,
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
        name="test-pi",
        suspend=True,
        container_resources=k8s.V1ResourceRequirements(
            requests={
                "cpu": 1,
                "memory": "200Mi",
            },
        ),
        wait_until_job_complete=True,
    )
    # [END howto_operator_k8s_install_kueue]

    # Strictly linear ordering: Kueue is installed first, then each queue
    # resource is created before the job that is submitted to the queue.
    (
        install_kueue
        >> create_resource_flavor
        >> create_cluster_queue
        >> create_local_queue
        >> start_kueue_job
    )

    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "teardown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)