Source code for airflow.example_dags.example_setup_teardown_taskflow

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of setup and teardown tasks."""

from __future__ import annotations

import pendulum

from airflow.decorators import setup, task, task_group, teardown
from airflow.models.dag import DAG

with DAG(
    dag_id="example_setup_teardown_taskflow",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task
[docs] def my_first_task(): print("Hello 1")
@task def my_second_task(): print("Hello 2") @task def my_third_task(): print("Hello 3") # you can set setup / teardown relationships with the `as_teardown` method. task_1 = my_first_task() task_2 = my_second_task() task_3 = my_third_task() task_1 >> task_2 >> task_3.as_teardown(setups=task_1) # The method `as_teardown` will mark task_3 as teardown, task_1 as setup, and # arrow task_1 >> task_3. # Now if you clear task_2, then its setup task, task_1, will be cleared in # addition to its teardown task, task_3 # it's also possible to use a decorator to mark a task as setup or # teardown when you define it. see below. @setup def outer_setup(): print("I am outer_setup") return "some cluster id" @teardown def outer_teardown(cluster_id): print("I am outer_teardown") print(f"Tearing down cluster: {cluster_id}") @task def outer_work(): print("I am just a normal task") @task_group def section_1(): @setup def inner_setup(): print("I set up") return "some_cluster_id" @task def inner_work(cluster_id): print(f"doing some work with {cluster_id=}") @teardown def inner_teardown(cluster_id): print(f"tearing down {cluster_id=}") # this passes the return value of `inner_setup` to both `inner_work` and `inner_teardown` inner_setup_task = inner_setup() inner_work(inner_setup_task) >> inner_teardown(inner_setup_task) # by using the decorators, outer_setup and outer_teardown are already marked as setup / teardown # now we just need to make sure they are linked directly. At a low level, what we need # to do so is the following:: # s = outer_setup() # t = outer_teardown() # s >> t # s >> outer_work() >> t # Thus, s and t are linked directly, and outer_work runs in between. We can take advantage of # the fact that we are in taskflow, along with the context manager on teardowns, as follows: with outer_teardown(outer_setup()): outer_work() # and let's put section 1 inside the outer setup and teardown tasks section_1()

Was this entry helpful?