Source code for tests.system.cncf.kubernetes.example_kubernetes_async
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This is an example dag for using the KubernetesPodOperator."""from__future__importannotationsimportosfromdatetimeimportdatetimefromkubernetes.clientimportmodelsask8sfromairflowimportDAGfromairflow.providers.cncf.kubernetes.operators.podimportKubernetesPodOperatorfromairflow.providers.cncf.kubernetes.secretimportSecretfromairflow.providers.standard.operators.bashimportBashOperator# [START howto_operator_k8s_cluster_resources]
# [START howto_operator_async_log]kubernetes_task_async_log=KubernetesPodOperator(task_id="kubernetes_task_async_log",namespace="kubernetes_task_async_log",in_cluster=False,name="astro_k8s_test_pod",image="ubuntu",cmds=["bash","-cx",("i=0; ""while [ $i -ne 100 ]; ""do i=$(($i+1)); ""echo $i; ""sleep 1; ""done; ""mkdir -p /airflow/xcom/; "'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json'),],do_xcom_push=True,deferrable=True,get_logs=True,logging_interval=5,)# [END howto_operator_async_log]# [START howto_operator_k8s_private_image_async]quay_k8s_async=KubernetesPodOperator(task_id="kubernetes_private_img_task_async",namespace="default",image="quay.io/apache/bash",image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],cmds=["bash","-cx"],arguments=["echo","10","echo pwd"],labels={"foo":"bar"},name="airflow-private-image-pod",on_finish_action="delete_pod",in_cluster=True,get_logs=True,deferrable=True,)# [END howto_operator_k8s_private_image_async]# [START howto_operator_k8s_write_xcom_async]write_xcom_async=KubernetesPodOperator(task_id="kubernetes_write_xcom_task_async",namespace="default",image="alpine",cmds=["sh","-c","mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],name="write-xcom",do_xcom_push=True,on_finish_action="delete_pod",in_cluster=True,get_logs=True,deferrable=True,)pod_task_xcom_result_async=BashOperator(task_id="pod_task_xcom_result_async",bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",)write_xcom_async>>pod_task_xcom_result_async# [END howto_operator_k8s_write_xcom_async]fromtests_common.test_utils.watcherimportwatcher# This test needs watcher in order to properly mark success/failure# when "tearDown" task with trigger rule is part of the DAGlist(dag.tasks)>>watcher()fromtests_common.test_utils.system_testsimportget_test_run# noqa: E402# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)