# Source code for airflow.example_dags.example_kubernetes_executor
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This is an example dag for using a Kubernetes Executor Configuration."""
from __future__ import annotations

import logging
import os

import pendulum

from airflow import DAG
from airflow.configuration import conf
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
# Module-level logger used to report that the optional kubernetes dependency
# is missing.
log = logging.getLogger(__name__)

# The kubernetes client models are an optional dependency: import them if they
# are available, otherwise warn and fall back to a falsy placeholder.
try:
    from kubernetes.client import models as k8s
except ImportError:
    log.warning(
        "The example_kubernetes_executor example DAG requires the kubernetes provider."
        " Please install it with: pip install apache-airflow[cncf.kubernetes]"
    )
    # Without this binding the ``if k8s:`` guard that follows would raise
    # NameError instead of simply skipping the DAG definition.
    k8s = None
if k8s:
    # The DAG is only defined when the kubernetes client models are importable,
    # because every executor_config below is built from ``k8s`` model classes.
    with DAG(
        dag_id="example_kubernetes_executor",
        schedule=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["example3"],
    ) as dag:
        # You can use annotations on your kubernetes pods!
        # NOTE(review): the definition of ``start_task_executor_config`` was
        # missing from this copy of the file; the annotation key/value below is
        # a restored placeholder -- confirm against the intended configuration.
        start_task_executor_config = {
            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
        }

        @task(executor_config=start_task_executor_config)
        def start_task():
            """Run the shared ``print_stuff`` helper in a pod using the start-task override."""
            print_stuff()

        # [START task_with_volume]
        executor_config_volume_mount = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[
                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
                            ],
                        )
                    ],
                    volumes=[
                        k8s.V1Volume(
                            name="example-kubernetes-test-volume",
                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                        )
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_volume_mount)
        def test_volume_mount():
            """
            Test whether the volume has been mounted.

            Writes a file under ``/foo/`` and reads it back with ``cat``.

            :raises ValueError: if ``cat`` exits with a non-zero return code.
            """
            with open("/foo/volume_mount_test.txt", "w") as foo:
                foo.write("Hello")

            return_code = os.system("cat /foo/volume_mount_test.txt")
            if return_code != 0:
                raise ValueError(f"Error when checking volume mount. Return code {return_code}")

        volume_task = test_volume_mount()
        # [END task_with_volume]

        # [START task_with_sidecar]
        executor_config_sidecar = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                        k8s.V1Container(
                            name="sidecar",
                            image="ubuntu",
                            args=['echo "retrieved from mount" > /shared/test.txt'],
                            command=["bash", "-cx"],
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                    ],
                    volumes=[
                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_sidecar)
        def test_sharedvolume_mount():
            """
            Test whether the volume shared with the sidecar has been mounted.

            Tries up to five times to ``cat`` the file the sidecar container
            writes to ``/shared/test.txt`` and stops at the first success.

            :raises ValueError: if the file still cannot be read on the last attempt.
            """
            import time

            max_attempts = 5
            for attempt in range(1, max_attempts + 1):
                return_code = os.system("cat /shared/test.txt")
                if return_code == 0:
                    return
                # The previous ``if i > 4`` check could never be true inside
                # ``range(5)``, so a missing file was silently ignored; fail
                # explicitly once the attempts are exhausted.
                if attempt == max_attempts:
                    raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                # Pause briefly before the next attempt so the retries are not
                # all spent within a few milliseconds.
                time.sleep(1)

        sidecar_task = test_sharedvolume_mount()
        # [END task_with_sidecar]

        # You can add labels to pods
        executor_config_non_root = {
            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
        }

        @task(executor_config=executor_config_non_root)
        def non_root_task():
            """Run ``print_stuff`` in a pod carrying the ``release: stable`` label."""
            print_stuff()

        third_task = non_root_task()

        executor_config_other_ns = {
            "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(namespace="test-namespace", labels={"release": "stable"})
            )
        }

        @task(executor_config=executor_config_other_ns)
        def other_namespace_task():
            """Run ``print_stuff`` in a pod whose metadata sets namespace ``test-namespace``."""
            print_stuff()

        other_ns_task = other_namespace_task()

        worker_container_repository = conf.get("kubernetes_executor", "worker_container_repository")
        worker_container_tag = conf.get("kubernetes_executor", "worker_container_tag")

        # You can also change the base image, here we used the worker image for demonstration.
        # Note that the image must have the same configuration as the
        # worker image. Could be that you want to run this task in a special docker image that has a zip
        # library built-in. You build the special docker image on top of your worker image.
        kube_exec_config_special = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base", image=f"{worker_container_repository}:{worker_container_tag}"
                        ),
                    ]
                )
            )
        }

        @task(executor_config=kube_exec_config_special)
        def base_image_override_task():
            """Run ``print_stuff`` in a pod whose base container image is overridden."""
            print_stuff()

        base_image_task = base_image_override_task()

        # Use k8s_client.V1Affinity to define node affinity
        k8s_affinity = k8s.V1Affinity(
            pod_anti_affinity=k8s.V1PodAntiAffinity(
                required_during_scheduling_ignored_during_execution=[
                    k8s.V1PodAffinityTerm(
                        label_selector=k8s.V1LabelSelector(
                            match_expressions=[
                                k8s.V1LabelSelectorRequirement(key="app", operator="In", values=["airflow"])
                            ]
                        ),
                        topology_key="kubernetes.io/hostname",
                    )
                ]
            )
        )

        # Use k8s_client.V1Toleration to define node tolerations
        k8s_tolerations = [k8s.V1Toleration(key="dedicated", operator="Equal", value="airflow")]

        # Use k8s_client.V1ResourceRequirements to define resource limits
        k8s_resource_requirements = k8s.V1ResourceRequirements(
            requests={"memory": "512Mi"}, limits={"memory": "512Mi"}
        )

        kube_exec_config_resource_limits = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s_resource_requirements,
                        )
                    ],
                    affinity=k8s_affinity,
                    tolerations=k8s_tolerations,
                )
            )
        }

        @task(executor_config=kube_exec_config_resource_limits)
        def task_with_resource_limits():
            """Run ``print_stuff`` in a pod with memory limits, anti-affinity and tolerations set."""
            print_stuff()

        four_task = task_with_resource_limits()

        # Wire the tasks: start -> (volume, other-namespace, sidecar) -> labelled
        # task -> (base-image override, resource-limited task).
        (
            start_task()
            >> [volume_task, other_ns_task, sidecar_task]
            >> third_task
            >> [base_image_task, four_task]
        )