Priority Weights
priority_weight defines the priority of a task in the executor queue. The default priority_weight is 1, and it can be set to any integer. In addition, each task has an effective priority_weight, calculated according to its weight_rule, which defines the weighting method used to compute the task's effective total priority weight.
The weighting methods are listed below. By default, Airflow's weighting method is downstream.
downstream
The effective weight of the task is the aggregate sum of all downstream descendants. As a result, upstream tasks have a higher weight and are scheduled more aggressively when positive weight values are used. This is useful when you have multiple DAG run instances and want all upstream tasks to complete for all runs before each DAG run continues processing downstream tasks.
upstream
The effective weight is the aggregate sum of all upstream ancestors. This is the opposite of downstream: downstream tasks have a higher weight and are scheduled more aggressively when positive weight values are used. This is useful when you have multiple DAG run instances and prefer each DAG run to complete before the upstream tasks of other DAG runs start.
absolute
The effective weight is the exact priority_weight specified, without additional weighting. You may want to do this when you know exactly what priority weight each task should have. Additionally, setting the rule to absolute has the bonus effect of significantly speeding up task creation for very large DAGs.
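The three rules can be compared on a toy graph. The following is a minimal pure-Python sketch, not Airflow code: the task names, the DOWNSTREAM mapping, and the helper functions are hypothetical, and it assumes that the aggregate sum includes the task's own priority_weight along with the weights of all its transitive descendants (or ancestors).

```python
from typing import Dict, List, Set

# Hypothetical toy DAG: extract -> transform -> load, and extract -> audit.
DOWNSTREAM: Dict[str, List[str]] = {
    "extract": ["transform", "audit"],
    "transform": ["load"],
    "audit": [],
    "load": [],
}
# Every task keeps the default priority_weight of 1.
PRIORITY_WEIGHT: Dict[str, int] = {task: 1 for task in DOWNSTREAM}


def invert(edges: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Turn a downstream adjacency map into an upstream one."""
    inverted: Dict[str, List[str]] = {task: [] for task in edges}
    for source, targets in edges.items():
        for target in targets:
            inverted[target].append(source)
    return inverted


def relatives(task: str, edges: Dict[str, List[str]]) -> Set[str]:
    """Collect every task reachable from `task`, each counted once."""
    seen: Set[str] = set()
    stack = list(edges[task])
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges[node])
    return seen


def effective_weight(task: str, weight_rule: str) -> int:
    """Effective weight of `task` under the assumed reading of each rule."""
    if weight_rule == "absolute":
        return PRIORITY_WEIGHT[task]
    if weight_rule == "downstream":
        edges = DOWNSTREAM
    elif weight_rule == "upstream":
        edges = invert(DOWNSTREAM)
    else:
        raise ValueError(f"unknown weight_rule: {weight_rule!r}")
    # Own weight plus the weights of all descendants (or ancestors).
    return PRIORITY_WEIGHT[task] + sum(
        PRIORITY_WEIGHT[other] for other in relatives(task, edges)
    )


for rule in ("downstream", "upstream", "absolute"):
    print(rule, {task: effective_weight(task, rule) for task in DOWNSTREAM})
```

With every priority_weight left at 1, extract gets an effective weight of 4 under downstream but 1 under upstream, while load gets 1 and 3 respectively. Under absolute, every task stays at 1.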
The priority_weight parameter can be used in conjunction with Pools.
Custom Weight Rule
New in version 2.9.0.
You can implement your own custom weighting method by extending the PriorityWeightStrategy class and registering it in a plugin:
from airflow.models.taskinstance import TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.task.priority_strategy import PriorityWeightStrategy

class DecreasingPriorityStrategy(PriorityWeightStrategy):
    """A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
    def get_weight(self, ti: TaskInstance):
        return max(3 - ti._try_number + 1, 1)

class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
    name = "decreasing_priority_weight_strategy_plugin"
    priority_weight_strategies = [DecreasingPriorityStrategy]
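To see what the get_weight formula in DecreasingPriorityStrategy produces, the arithmetic can be reproduced without Airflow. This is a minimal sketch: decreasing_weight is a hypothetical stand-in that takes the try number directly rather than a TaskInstance, and it assumes try numbers start at 1.

```python
def decreasing_weight(try_number: int) -> int:
    """Hypothetical stand-in mirroring max(3 - try_number + 1, 1)."""
    return max(3 - try_number + 1, 1)


# Weights for the first five attempts, assuming try numbers start at 1.
weights = [decreasing_weight(n) for n in range(1, 6)]
print(weights)
```

Under that assumption the weight drops by one per attempt and is clamped at 1, so later retries never fall below the default priority.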
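To use it, either pass an instance of the custom class as the task's weight_rule parameter or pass the import path of the class as a string. In the example below, custom_weight_rule_module and CustomPriorityWeightStrategy appear to be placeholders for your own module and class: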
from airflow.operators.bash import BashOperator

from custom_weight_rule_module import CustomPriorityWeightStrategy

# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())

# or provide the path of the class
task1 = BashOperator(
    task_id="task",
    bash_command="echo 1",
    weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)
This is an experimental feature.