# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttypesfromtypingimportTYPE_CHECKING,Callablefromairflow.decoratorsimportpython_taskfromairflow.decorators.task_groupimport_TaskGroupFactoryfromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.utils.setup_teardownimportSetupTeardownContextifTYPE_CHECKING:fromairflowimportXComArg
def setup_task(func: Callable) -> Callable:
    """Mark a callable as a setup task.

    A plain Python function is first wrapped with ``python_task``; any other
    object is used as given. The resulting object gets ``is_setup = True``
    and is returned.

    :param func: the function or already-decorated task to mark.
    :return: the (possibly wrapped) task with ``is_setup`` set to ``True``.
    :raises AirflowException: if the object is a ``_TaskGroupFactory``.
    """
    # The check is against FunctionType rather than "callable" because
    # _TaskDecorator is also a callable and must not be wrapped a second time.
    decorated = python_task(func) if isinstance(func, types.FunctionType) else func
    if isinstance(decorated, _TaskGroupFactory):
        raise AirflowException("Task groups cannot be marked as setup or teardown.")
    decorated.is_setup = True  # type: ignore[attr-defined]
    return decorated
def teardown_task(_func=None, *, on_failure_fail_dagrun: bool = False) -> Callable:
    """Mark a callable as a teardown task.

    Works both as a bare decorator (``@teardown_task``) and as a decorator
    factory (``@teardown_task(on_failure_fail_dagrun=True)``): when ``_func``
    is ``None`` the inner decorator is returned, otherwise it is applied to
    ``_func`` immediately.

    :param _func: the function or already-decorated task to mark, or ``None``
        when called with keyword arguments only.
    :param on_failure_fail_dagrun: value stored on the task as the attribute
        of the same name.
    :return: the marked task, or the decorator that will mark one.
    :raises AirflowException: if the object being marked is a
        ``_TaskGroupFactory``.
    """

    def teardown(func: Callable) -> Callable:
        # The check is against FunctionType rather than "callable" because
        # _TaskDecorator is also a callable and must not be wrapped again.
        decorated = python_task(func) if isinstance(func, types.FunctionType) else func
        if isinstance(decorated, _TaskGroupFactory):
            raise AirflowException("Task groups cannot be marked as setup or teardown.")
        decorated.is_teardown = True  # type: ignore[attr-defined]
        decorated.on_failure_fail_dagrun = on_failure_fail_dagrun  # type: ignore[attr-defined]
        return decorated

    return teardown if _func is None else teardown(_func)
class ContextWrapper(list):
    """A list subclass that has a context manager that pushes setup/teardown tasks to the context."""

    def __init__(self, tasks: list[BaseOperator | XComArg]):
        """Keep a reference to ``tasks`` and initialise the underlying list with them.

        :param tasks: the operators and/or XComArgs to use together as a
            context manager.
        """
        self.tasks = tasks
        super().__init__(tasks)

    def __enter__(self):
        """Validate the wrapped tasks and push their operators to ``SetupTeardownContext``.

        Each entry is resolved to an operator: a ``BaseOperator`` is used as
        is, any other entry contributes its ``.operator`` attribute. All
        resolved operators are pushed, in their original order, so a list
        mixing operators and XComArgs no longer loses the XComArg entries.

        :return: the ``SetupTeardownContext`` class.
        :raises AirflowException: if any resolved operator is neither a
            setup nor a teardown task.
        """
        operators = []
        for task in self.tasks:
            # Non-operator entries (XComArgs) expose their task as ``.operator``.
            operator = task if isinstance(task, BaseOperator) else task.operator
            if not operator.is_setup and not operator.is_teardown:
                raise AirflowException("Only setup/teardown tasks can be used as context managers.")
            operators.append(operator)
        SetupTeardownContext.push_setup_teardown_task(operators)
        return SetupTeardownContext