Cluster Policies¶
If you want to check or mutate DAGs or Tasks on a cluster-wide level, Cluster Policies let you do that. They have three main purposes:
Checking that DAGs/Tasks meet a certain standard
Setting default arguments on DAGs/Tasks
Performing custom routing logic
There are three types of cluster policy:
dag_policy
: Takes a DAG parameter called dag. Runs at load time.

task_policy
: Takes a BaseOperator parameter called task. Runs at load time.

task_instance_mutation_hook
: Takes a TaskInstance parameter called task_instance. Called right before task execution.
The DAG and Task cluster policies can raise the AirflowClusterPolicyViolation
exception to indicate that the dag/task they were passed is not compliant and should not be loaded.
Any extra attributes set by a cluster policy take priority over those defined in your DAG file; for example, if you set an sla
on your Task in the DAG file, and then your cluster policy also sets an sla
, the cluster policy's value will take precedence.
To configure cluster policies, create an airflow_local_settings.py file, either in the config folder under your $AIRFLOW_HOME or anywhere on your $PYTHONPATH, and then define callables in that file matching one or more of the cluster policy names above (e.g. dag_policy).
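For instance, a minimal airflow_local_settings.py that declares all three hooks as no-ops (a sketch to build on, not required boilerplate) could look like this:

from airflow.models import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.models.taskinstance import TaskInstance


def dag_policy(dag: DAG):
    """Runs against every DAG at load time."""


def task_policy(task: BaseOperator):
    """Runs against every task at load time."""


def task_instance_mutation_hook(task_instance: TaskInstance):
    """Runs right before each task instance is executed."""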
Examples¶
DAG policies¶
This policy checks if each DAG has at least one tag defined:
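A sketch of such a policy (the exact error message is illustrative):

from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models import DAG


def dag_policy(dag: DAG):
    """Ensure that every DAG has at least one tag."""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag is required."
        )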
Note
To avoid import cycles, if you use DAG in type annotations in your cluster policy, be sure to import it from airflow.models and not from airflow.
Task policies¶
Here's an example of enforcing a maximum timeout policy on every task:
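A minimal sketch, assuming a 48-hour cap is your chosen limit and using BaseOperator's execution_timeout attribute:

import datetime

from airflow.models.baseoperator import BaseOperator


def task_policy(task: BaseOperator):
    """Cap every task's execution timeout at 48 hours."""
    limit = datetime.timedelta(hours=48)
    # execution_timeout may be None (no limit), which we also cap.
    if task.execution_timeout is None or task.execution_timeout > limit:
        task.execution_timeout = limit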
You could also implement checks to protect against common errors, rather than as technical security controls. For example, don't run tasks without airflow owners:
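A sketch of such a check, comparing the task's owner against the configured [operators] default_owner:

from airflow.configuration import conf
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.baseoperator import BaseOperator


def task_must_have_owners(task: BaseOperator):
    """Reject tasks whose owner is unset or left at the default."""
    if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
        raise AirflowClusterPolicyViolation(
            f"Task must have non-None non-default owner. Current value: {task.owner}"
        )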
If you have multiple checks to apply, it is best practice to curate these rules in a separate Python module and have a single policy / task mutation hook that performs all of these custom checks and aggregates the various error messages, so that a single AirflowClusterPolicyViolation can be reported in the UI (and import errors table in the database).
For example, your airflow_local_settings.py
might follow this pattern:
from typing import Callable, List

from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.baseoperator import BaseOperator

# Rules to apply to every task; task_must_have_owners is the check
# defined in the example above.
TASK_RULES: List[Callable[[BaseOperator], None]] = [
    task_must_have_owners,
]


def _check_task_rules(current_task: BaseOperator):
    """Check task rules for given task, aggregating all violations."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
            f"Notices:\n"
            f"{notices_list}"
        )


# The hook must be named task_policy for Airflow to pick it up.
def task_policy(task: BaseOperator):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)
Task instance mutation¶
Here's an example of re-routing tasks that are on their second (or greater) attempt, i.e. are being retried, to a different queue:
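A minimal sketch, assuming your workers listen on a queue named retry_queue (the queue name is an assumption, not an Airflow built-in):

from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance):
    # try_number counts attempts starting at 1, so a value of 2 or more
    # means this task instance is being retried.
    if task_instance.try_number >= 2:
        task_instance.queue = "retry_queue"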