# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromfunctoolsimportwrapsfromtypingimportTYPE_CHECKING,Any,Callable,TypeVarfromairflow.decorators.baseimportTask,_TaskDecoratorfromairflow.exceptionsimportAirflowSkipExceptionifTYPE_CHECKING:fromtyping_extensionsimportTypeAliasfromairflow.models.baseoperatorimportTaskPreExecuteHookfromairflow.utils.contextimportContextBoolConditionFunc:TypeAlias=Callable[[Context],bool]MsgConditionFunc:TypeAlias="Callable[[Context], tuple[bool, str | None]]"AnyConditionFunc:TypeAlias="BoolConditionFunc | MsgConditionFunc"__all__=["run_if","skip_if"]_T=TypeVar("_T",bound="Task[..., Any] | _TaskDecorator[..., Any, Any]")
[docs]defrun_if(condition:AnyConditionFunc,skip_message:str|None=None)->Callable[[_T],_T]:""" Decorate a task to run only if a condition is met. :param condition: A function that takes a context and returns a boolean. :param skip_message: The message to log if the task is skipped. If None, a default message is used. """wrapped_condition=wrap_skip(condition,skip_messageor"Task was skipped due to condition.",reverse=True)defdecorator(task:_T)->_T:ifnotisinstance(task,_TaskDecorator):error_msg="run_if can only be used with task. decorate with @task before @run_if."raiseTypeError(error_msg)pre_execute:TaskPreExecuteHook|None=task.kwargs.get("pre_execute")new_pre_execute=combine_hooks(pre_execute,wrapped_condition)task.kwargs["pre_execute"]=new_pre_executereturntask# type: ignore[return-value]returndecorator
[docs]defskip_if(condition:AnyConditionFunc,skip_message:str|None=None)->Callable[[_T],_T]:""" Decorate a task to skip if a condition is met. :param condition: A function that takes a context and returns a boolean. :param skip_message: The message to log if the task is skipped. If None, a default message is used. """wrapped_condition=wrap_skip(condition,skip_messageor"Task was skipped due to condition.",reverse=False)defdecorator(task:_T)->_T:ifnotisinstance(task,_TaskDecorator):error_msg="skip_if can only be used with task. decorate with @task before @skip_if."raiseTypeError(error_msg)pre_execute:TaskPreExecuteHook|None=task.kwargs.get("pre_execute")new_pre_execute=combine_hooks(pre_execute,wrapped_condition)task.kwargs["pre_execute"]=new_pre_executereturntask# type: ignore[return-value]returndecorator