Priority Weights

priority_weight defines priorities in the executor queue. The default priority_weight is 1, and can be bumped to any integer. Moreover, each task has a true priority_weight that is calculated based on its weight_rule which defines the weighting method used for the effective total priority weight of the task.

Below are the weighting methods. By default, Airflow’s weighting method is downstream.

downstream

The effective weight of the task is the aggregate sum of all downstream descendants. As a result, upstream tasks will have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple DAG run instances and desire to have all upstream tasks to complete for all runs before each DAG can continue processing downstream tasks.

upstream

The effective weight is the aggregate sum of all upstream ancestors. This is the opposite where downstream tasks have higher weight and will be scheduled more aggressively when using positive weight values. This is useful when you have multiple DAG run instances and prefer to have each DAG complete before starting upstream tasks of other DAG runs.

absolute

The effective weight is the exact priority_weight specified without additional weighting. You may want to do this when you know exactly what priority weight each task should have. Additionally, when set to absolute, there is bonus effect of significantly speeding up the task creation process as for very large DAGs.

The priority_weight parameter can be used in conjunction with Pools.

Custom Weight Rule

New in version 2.9.0.

You can implement your own custom weighting method by extending the PriorityWeightStrategy class and registering it in a plugin.

airflow/example_dags/plugins/decreasing_priority_weight_strategy.py[source]

class DecreasingPriorityStrategy(PriorityWeightStrategy):
    """A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""

    def get_weight(self, ti: TaskInstance):
        return max(3 - ti._try_number + 1, 1)


class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
    name = "decreasing_priority_weight_strategy_plugin"
    priority_weight_strategies = [DecreasingPriorityStrategy]


Then to use it, you can create an instance of the custom class and provide it in the weight_rule parameter of the task or provide the path of the custom class:

from custom_weight_rule_module import CustomPriorityWeightStrategy

# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())

# or provide the path of the class
task1 = BashOperator(
    task_id="task",
    bash_command="echo 1",
    weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)

This is an experimental feature.

Was this entry helpful?