Priority Weights¶
priority_weight defines priorities in the executor queue. The default priority_weight is 1, and can be
bumped to any integer. Moreover, each task has a true priority_weight that is calculated based on its
weight_rule which defines the weighting method used for the effective total priority weight of the task.
Below are the weighting methods. By default, Airflow’s weighting method is downstream.
downstreamThe effective weight of the task is the aggregate sum of all downstream descendants. As a result, upstream tasks will have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple DAG run instances and desire to have all upstream tasks to complete for all runs before each DAG can continue processing downstream tasks.
upstreamThe effective weight is the aggregate sum of all upstream ancestors. This is the opposite where downstream tasks have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple DAG run instances and prefer to have each DAG complete before starting upstream tasks of other DAG runs.
absoluteThe effective weight is the exact priority_weight specified
without additional weighting. You may want to do this when you
know exactly what priority weight each task should have.
Additionally, when set to absolute, there is bonus effect of
significantly speeding up the task creation process as for very
large DAGs.
The priority_weight parameter can be used in conjunction with Pools.
Custom Weight Rule¶
New in version 2.9.0.
You can implement your own custom weighting method by extending the PriorityWeightStrategy class and
registering it in a plugin.
class DecreasingPriorityStrategy(PriorityWeightStrategy):
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
def get_weight(self, ti: TaskInstance):
return max(3 - ti._try_number + 1, 1)
class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
name = "decreasing_priority_weight_strategy_plugin"
priority_weight_strategies = [DecreasingPriorityStrategy]
Then to use it, you can create an instance of the custom class and provide it in the weight_rule parameter
of the task or provide the path of the custom class:
from custom_weight_rule_module import CustomPriorityWeightStrategy
# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())
# or provide the path of the class
task1 = BashOperator(
task_id="task",
bash_command="echo 1",
weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)
This is an experimental feature.