Source code for airflow.example_dags.example_setup_teardown_taskflow
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of setup and teardown tasks."""
from __future__ import annotations
import pendulum
from airflow.decorators import setup, task, task_group, teardown
from airflow.models.dag import DAG
with DAG(
    dag_id="example_setup_teardown_taskflow",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    @task
[docs]    def my_first_task():
        print("Hello 1") 
    @task
    def my_second_task():
        print("Hello 2")
    @task
    def my_third_task():
        print("Hello 3")
    # you can set setup / teardown relationships with the `as_teardown` method.
    task_1 = my_first_task()
    task_2 = my_second_task()
    task_3 = my_third_task()
    task_1 >> task_2 >> task_3.as_teardown(setups=task_1)
    # The method `as_teardown` will mark task_3 as teardown, task_1 as setup, and
    # arrow task_1 >> task_3.
    # Now if you clear task_2, then its setup task, task_1, will be cleared in
    # addition to its teardown task, task_3
    # it's also possible to use a decorator to mark a task as setup or
    # teardown when you define it. see below.
    @setup
    def outer_setup():
        print("I am outer_setup")
        return "some cluster id"
    @teardown
    def outer_teardown(cluster_id):
        print("I am outer_teardown")
        print(f"Tearing down cluster: {cluster_id}")
    @task
    def outer_work():
        print("I am just a normal task")
    @task_group
    def section_1():
        @setup
        def inner_setup():
            print("I set up")
            return "some_cluster_id"
        @task
        def inner_work(cluster_id):
            print(f"doing some work with {cluster_id=}")
        @teardown
        def inner_teardown(cluster_id):
            print(f"tearing down {cluster_id=}")
        # this passes the return value of `inner_setup` to both `inner_work` and `inner_teardown`
        inner_setup_task = inner_setup()
        inner_work(inner_setup_task) >> inner_teardown(inner_setup_task)
    # by using the decorators, outer_setup and outer_teardown are already marked as setup / teardown
    # now we just need to make sure they are linked directly.  At a low level, what we need
    # to do so is the following::
    #     s = outer_setup()
    #     t = outer_teardown()
    #     s >> t
    #     s >> outer_work() >> t
    # Thus, s and t are linked directly, and outer_work runs in between.  We can take advantage of
    # the fact that we are in taskflow, along with the context manager on teardowns, as follows:
    with outer_teardown(outer_setup()):
        outer_work()
        # and let's put section 1 inside the outer setup and teardown tasks
        section_1()