LLM Retry Policies
Added in version 3.3.0.
The LLMRetryPolicy uses an LLM to classify task errors and make intelligent
retry decisions. It works with any LLM provider supported by pydantic-ai
(OpenAI, Anthropic, Bedrock, Vertex, Ollama, etc.).
For the core retry policy concepts, see Tasks.
Setup
Install the provider with the LLM backend you need:
pip install 'apache-airflow-providers-common-ai[anthropic]'
Create a connection (Admin > Connections):
- Connection Id: pydanticai_default
- Connection Type: Pydantic AI
- Password: Your API key
- Extra: {"model": "anthropic:claude-haiku-4-5-20251001"}
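Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID> whose value is a JSON document. The sketch below builds such a value for the connection described above. The conn_type string "pydanticai" is an assumption and is not confirmed by this page, so check the identifier the provider actually registers. The API key is a placeholder.

```python
import json

# Hypothetical values: "pydanticai" as conn_type is an assumption,
# and the password is a placeholder for your real API key.
connection = {
    "conn_type": "pydanticai",
    "password": "<your-api-key>",
    "extra": {"model": "anthropic:claude-haiku-4-5-20251001"},
}

# Airflow looks up AIRFLOW_CONN_<CONN_ID in upper case>.
env_name = "AIRFLOW_CONN_" + "pydanticai_default".upper()
env_value = json.dumps(connection)

# Print a shell line that can be pasted into the scheduler/worker environment.
print(f"export {env_name}='{env_value}'")
```

Storing the key in a secrets backend instead of a plain environment variable is usually preferable outside local testing.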
Usage
from datetime import timedelta

from airflow.providers.common.ai.policies.retry import LLMRetryPolicy
from airflow.sdk import task
from airflow.sdk.definitions.retry_policy import RetryAction, RetryRule

llm_policy = LLMRetryPolicy(
    llm_conn_id="pydanticai_default",
    timeout=30.0,  # max seconds to wait for LLM response
    fallback_rules=[  # used when LLM call fails
        RetryRule(exception=ConnectionError, action=RetryAction.RETRY, retry_delay=timedelta(seconds=10)),
        RetryRule(exception=PermissionError, action=RetryAction.FAIL),
    ],
)


@task(retries=5, retry_policy=llm_policy)
def call_external_api(): ...
How it works
When a task fails, LLMRetryPolicy:
1. Sends the exception message to the configured LLM.
2. Receives the LLM's classification of the error into a category (rate_limit, auth, network, data, transient, permanent).
3. Returns RETRY (with a suggested delay) or FAIL, based on the classification.
4. Logs the classification reason in the task logs.
If the LLM call fails (provider down, timeout, bad credentials), the policy
falls back to fallback_rules if configured, or to the task’s standard
retry behaviour.
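The decision flow described in this section can be sketched in plain Python. This is an illustration under stated assumptions, not the provider's implementation: Decision, FallbackRule, decide, and the classify callable are hypothetical names, and matching fallback rules with isinstance is an assumption. Only the ErrorClassification field names come from this page.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Callable, Optional, Sequence


@dataclass
class ErrorClassification:
    # Field names taken from this page; the types are assumptions.
    category: str
    should_retry: bool
    suggested_delay_seconds: int
    reasoning: str


@dataclass
class FallbackRule:
    # Hypothetical stand-in for a retry rule.
    exception: type
    action: str  # "RETRY" or "FAIL"
    retry_delay: Optional[timedelta] = None


@dataclass
class Decision:
    action: str  # "RETRY", "FAIL", or "DEFAULT" (task's standard retries)
    delay: Optional[timedelta] = None
    reason: str = ""


def decide(
    exc: BaseException,
    classify: Callable[[str], ErrorClassification],
    fallback_rules: Sequence[FallbackRule] = (),
) -> Decision:
    try:
        result = classify(str(exc))  # the LLM call
    except Exception as llm_error:
        # LLM unavailable: use the first matching fallback rule, if any.
        for rule in fallback_rules:
            if isinstance(exc, rule.exception):
                return Decision(rule.action, rule.retry_delay, "fallback rule")
        return Decision("DEFAULT", None, f"LLM unavailable: {llm_error}")
    if result.should_retry:
        delay = timedelta(seconds=result.suggested_delay_seconds)
        return Decision("RETRY", delay, result.reasoning)
    return Decision("FAIL", None, result.reasoning)
```

Passing the classifier in as a callable keeps the sketch testable without a live LLM: a stub that raises simulates a provider outage, and a stub that returns a fixed classification simulates a normal response.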
Custom instructions
The default classifier handles generic categories. For domain-specific
behaviour, override instructions to inject your own taxonomy. The LLM still
returns an ErrorClassification (category, should_retry, suggested_delay_seconds, reasoning); only the prompt changes.
SNOWFLAKE_INSTRUCTIONS = (
    "You are an error classifier for Snowflake-backed data pipelines. "
    "Classify the error into one of: rate_limit, auth, network, data, "
    "transient, permanent.\n\n"
    "Snowflake-specific guidance:\n"
    "- 'Statement queued' or 'concurrency limit' -> rate_limit, retry after 120s\n"
    "- 'JWT token expired' -> transient (token rotates), retry after 30s\n"
    "- 'Authentication token has expired' AFTER multiple retries -> auth, do NOT retry\n"
    "- 'Column does not exist' -> data, do NOT retry (schema drift needs human fix)\n"
    "- 'Warehouse suspended' -> transient, retry after 30s (auto-resume)\n\n"
    "Set suggested_delay_seconds based on the error type. "
    "Set 0 for errors that should not retry."
)

snowflake_policy = LLMRetryPolicy(
    llm_conn_id="pydanticai_default",
    instructions=SNOWFLAKE_INSTRUCTIONS,
    fallback_rules=[
        RetryRule(
            exception=ConnectionError,
            action=RetryAction.RETRY,
            retry_delay=timedelta(seconds=30),
        ),
    ],
)


@task(retries=5, retry_policy=snowflake_policy)
def query_snowflake(): ...
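A prompt like the Snowflake one can also be generated from a small table of rules, which keeps the category names and output fields consistent across domains. The helper below is a hypothetical sketch: build_instructions, FIELDS, and CATEGORIES are illustrative names, not part of the provider.

```python
from typing import List, Optional, Tuple

# Output fields and categories as listed on this page.
FIELDS = ("category", "should_retry", "suggested_delay_seconds", "reasoning")
CATEGORIES = ("rate_limit", "auth", "network", "data", "transient", "permanent")

# Each rule: (error text pattern, category, retry delay in seconds or None for no retry).
Rule = Tuple[str, str, Optional[int]]


def build_instructions(domain: str, rules: List[Rule]) -> str:
    """Build a classifier prompt that names the output fields explicitly."""
    lines = [
        f"You are an error classifier for {domain}.",
        "Classify the error into one of: " + ", ".join(CATEGORIES) + ".",
        "Fill in these fields: " + ", ".join(FIELDS) + ".",
        "Keep reasoning short (well under 500 characters).",
        "Set suggested_delay_seconds to 0 for errors that should not retry.",
        "",
        "Guidance:",
    ]
    for pattern, category, delay in rules:
        if category not in CATEGORIES:
            raise ValueError(f"unknown category: {category}")
        outcome = f"retry after {delay}s" if delay is not None else "do NOT retry"
        lines.append(f"- '{pattern}' -> {category}, {outcome}")
    return "\n".join(lines)


instructions = build_instructions(
    "Snowflake-backed data pipelines",
    [
        ("Warehouse suspended", "transient", 30),
        ("Column does not exist", "data", None),
    ],
)
```

The resulting string could then be passed as the instructions argument of LLMRetryPolicy.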
When writing custom instructions:
- The LLM must return the same ErrorClassification schema (category, should_retry, suggested_delay_seconds, reasoning). Mention the fields explicitly so the model fills them.
- Be concrete with examples ("'Warehouse suspended' -> transient") rather than vague rules ("treat warehouse issues as recoverable").
- retry_reason is truncated to 500 chars in the audit log, so keep reasoning outputs concise.
Parameters
| Parameter | Default | Description |
|---|---|---|
| llm_conn_id | (required) | Airflow connection ID for the LLM provider. |
| model_id | None | Override the model from the connection. |
| instructions | (built-in) | Custom system prompt for error classification. |
| fallback_rules | None | List of RetryRule objects used when the LLM call fails. |
| timeout | 30.0 | Max seconds to wait for the LLM response before falling back. |
Local LLM support
For environments where exception data must not leave the infrastructure, point to a local model via Ollama or vLLM:
LLMRetryPolicy(
    llm_conn_id="ollama_local",  # host=http://localhost:11434
    model_id="ollama:llama3.2",
)