Cluster Policies¶
If you want to check or mutate DAGs or Tasks on a cluster-wide level, then a Cluster Policy will let you do that. They have three main purposes:
Checking that DAGs/Tasks meet a certain standard
Setting default arguments on DAGs/Tasks
Performing custom routing logic
There are three types of cluster policy:
dag_policy
: Takes aDAG
parameter calleddag
. Runs at load time of the DAG from DagBagDagBag
.task_policy
: Takes a parameter calledtask
that is of type eitherBaseOperator
orMappedOperator
(for dynamically expanded tasks). The policy gets executed when the task is created during parsing of the task from DagBag at load time. This means that the whole task definition can be altered in the task policy. It does not relate to a specific task running in a DagRun. Thetask_policy
defined is applied to all the task instances that will be executed in the future.task_instance_mutation_hook
: Takes aTaskInstance
parameter calledtask_instance
. Thetask_instance_mutation
applies not to a task but to the instance of a task that relates to a particular DagRun. It is executed in a "worker", not in the DAG file processor, just before the task instance is executed. The policy is only applied to the currently executed run (i.e. instance) of that task.
The DAG and Task cluster policies can raise the AirflowClusterPolicyViolation
exception to indicate that the DAG/task they were passed is not compliant and should not be loaded.
Any extra attributes set by a cluster policy take priority over those defined in your DAG file; for example, if you set an sla
on your Task in the DAG file, and then your cluster policy also sets an sla
, the cluster policy's value will take precedence.
To configure cluster policies, you should create an airflow_local_settings.py
file in either the config
folder under your $AIRFLOW_HOME
, or place it on the $PYTHONPATH
, and then add callables to the file matching one or more of the cluster policy names above (e.g. dag_policy
)
Examples¶
DAG policies¶
This policy checks if each DAG has at least one tag defined:
def dag_policy(dag: DAG):
"""Ensure that DAG has at least one tag"""
if not dag.tags:
raise AirflowClusterPolicyViolation(
f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
)
Note
To avoid import cycles, if you use DAG
in type annotations in your cluster policy, be sure to import from airflow.models
and not from airflow
.
Note
DAG policies are applied after the DAG has been completely loaded, so overriding the default_args
parameter has no effect. If you want to override the default operator settings, use task policies instead.
Task policies¶
Here's an example of enforcing a maximum timeout policy on every task:
class TimedOperator(BaseOperator, ABC):
timeout: timedelta
def task_policy(task: TimedOperator):
if task.task_type == "HivePartitionSensor":
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)
You could also implement to protect against common errors, rather than as technical security controls. For example, don't run tasks without airflow owners:
def task_must_have_owners(task: BaseOperator):
if task.owner and not isinstance(task.owner, str):
raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")
if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
raise AirflowClusterPolicyViolation(
f"""Task must have non-None non-default owner. Current value: {task.owner}"""
)
If you have multiple checks to apply, it is best practice to curate these rules in a separate python module and have a single policy / task mutation hook that performs multiple of these custom checks and aggregates the various error messages so that a single AirflowClusterPolicyViolation
can be reported in the UI (and import errors table in the database).
For example, your airflow_local_settings.py
might follow this pattern:
TASK_RULES: list[Callable[[BaseOperator], None]] = [
task_must_have_owners,
]
def _check_task_rules(current_task: BaseOperator):
"""Check task rules for given task."""
notices = []
for rule in TASK_RULES:
try:
rule(current_task)
except AirflowClusterPolicyViolation as ex:
notices.append(str(ex))
if notices:
notices_list = " * " + "\n * ".join(notices)
raise AirflowClusterPolicyViolation(
f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
f"Notices:\n"
f"{notices_list}"
)
def example_task_policy(task: BaseOperator):
"""Ensure Tasks have non-default owners."""
_check_task_rules(task)
Task instance mutation¶
Here's an example of re-routing tasks that are on their second (or greater) retry to a different queue:
def task_instance_mutation_hook(task_instance: TaskInstance):
if task_instance.try_number >= 1:
task_instance.queue = "retry_queue"
Note that since priority weight is determined dynamically using weight rules, you cannot alter the priority_weight
of a task instance within the mutation hook.