Source code for airflow.example_dags.example_kubernetes_executor_config
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using a Kubernetes Executor Configuration.
"""
import logging
import os
from datetime import datetime

from airflow import DAG
from airflow.example_dags.libs.helper import print_stuff
from airflow.operators.python import PythonOperator
from airflow.settings import AIRFLOW_HOME

log = logging.getLogger(__name__)

# The whole DAG definition lives inside the ``try`` so that a missing
# ``kubernetes`` package only produces the warnings at the bottom of the file
# instead of an import failure.
try:
    from kubernetes.client import models as k8s

    with DAG(
        dag_id='example_kubernetes_executor_config',
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['example3'],
    ) as dag:

        def test_sharedvolume_mount():
            """
            Tests whether the volume shared with the sidecar has been mounted.

            Runs ``cat /shared/test.txt`` up to five times and returns as soon
            as one attempt exits with status 0. The file is the one the
            ``sidecar`` container of ``task_with_sidecar`` writes.

            :raises ValueError: if the last attempt still returns a non-zero code
            """
            max_attempts = 5
            for attempt in range(1, max_attempts + 1):
                return_code = os.system("cat /shared/test.txt")
                if return_code == 0:
                    return
                # Only the final failure is fatal; earlier ones are retried.
                # (The previous ``i > 4`` check could never be true inside
                # ``range(5)``, so failures were silently swallowed.)
                # NOTE(review): attempts run back-to-back with no delay;
                # consider a short sleep if the sidecar needs time to write.
                if attempt == max_attempts:
                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")

        def test_volume_mount():
            """
            Tests whether the volume has been mounted.

            Writes ``Hello`` to ``/foo/volume_mount_test.txt`` and reads it back
            with ``cat``.

            :raises ValueError: if ``cat`` returns a non-zero code
            """
            with open('/foo/volume_mount_test.txt', 'w') as foo:
                foo.write('Hello')

            return_code = os.system("cat /foo/volume_mount_test.txt")
            if return_code != 0:
                raise ValueError(f"Error when checking volume mount. Return code {return_code}")

        # You can use annotations on your kubernetes pods!
        start_task = PythonOperator(
            task_id="start_task",
            python_callable=print_stuff,
            executor_config={
                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
            },
        )

        # [START task_with_volume]
        volume_task = PythonOperator(
            task_id="task_with_volume",
            python_callable=test_volume_mount,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(
                                        mount_path="/foo/", name="example-kubernetes-test-volume"
                                    )
                                ],
                            )
                        ],
                        volumes=[
                            k8s.V1Volume(
                                name="example-kubernetes-test-volume",
                                host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                            )
                        ],
                    )
                ),
            },
        )
        # [END task_with_volume]

        # [START task_with_template]
        task_with_template = PythonOperator(
            task_id="task_with_template",
            python_callable=print_stuff,
            executor_config={
                "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
            },
        )
        # [END task_with_template]

        # [START task_with_sidecar]
        sidecar_task = PythonOperator(
            task_id="task_with_sidecar",
            python_callable=test_sharedvolume_mount,
            executor_config={
                "pod_override": k8s.V1Pod(
                    spec=k8s.V1PodSpec(
                        containers=[
                            k8s.V1Container(
                                name="base",
                                volume_mounts=[
                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
                                ],
                            ),
                            k8s.V1Container(
                                name="sidecar",
                                image="ubuntu",
                                args=["echo \"retrieved from mount\" > /shared/test.txt"],
                                command=["bash", "-cx"],
                                volume_mounts=[
                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
                                ],
                            ),
                        ],
                        volumes=[
                            k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                        ],
                    )
                ),
            },
        )
        # [END task_with_sidecar]

        # Test that we can add labels to pods
        third_task = PythonOperator(
            task_id="non_root_task",
            python_callable=print_stuff,
            executor_config={
                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
            },
        )

        other_ns_task = PythonOperator(
            task_id="other_namespace_task",
            python_callable=print_stuff,
            executor_config={
                "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
            },
        )

        start_task >> volume_task >> third_task
        start_task >> other_ns_task
        start_task >> sidecar_task
        start_task >> task_with_template
except ImportError as e:
    log.warning("Could not import DAGs in example_kubernetes_executor_config.py: %s", str(e))
    log.warning("Install kubernetes dependencies with: pip install apache-airflow['cncf.kubernetes']")