airflow.providers.common.ai.policies.retry

LLM-powered retry policy using pydantic-ai for error classification.

Requires Airflow 3.3+ (RetryPolicy was added in AIP-105).

Classes

ErrorClassification

Structured LLM output for error classification.

LLMRetryPolicy

Retry policy that uses an LLM to classify errors and decide retry behaviour.

Module Contents

class airflow.providers.common.ai.policies.retry.ErrorClassification(/, **data)

Bases: pydantic.BaseModel

Structured LLM output for error classification.

category: str

One of: rate_limit, auth, network, data, resource, transient, permanent.

should_retry: bool

Whether the operation should be retried.

suggested_delay_seconds: int = 0

Seconds to wait before retrying (0 if should_retry is False).

reasoning: str

Brief explanation of the classification decision.
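To make the structured-output contract concrete, here is a minimal stdlib sketch that mirrors the four fields of ErrorClassification as a dataclass and parses a JSON reply into it. It is not the provider's model (which is a pydantic.BaseModel). The category check and the zero-delay rule are assumptions drawn from the field descriptions above, and the names ClassificationSketch and parse_classification are hypothetical.

```python
import json
from dataclasses import dataclass

# The seven categories listed in the "category" field description.
CATEGORIES = frozenset(
    {"rate_limit", "auth", "network", "data", "resource", "transient", "permanent"}
)


@dataclass(frozen=True)
class ClassificationSketch:
    """Hypothetical stdlib mirror of ErrorClassification (not the real pydantic model)."""

    category: str
    should_retry: bool
    reasoning: str
    suggested_delay_seconds: int = 0

    def __post_init__(self) -> None:
        # Assumed check: only the documented categories are accepted.
        if self.category not in CATEGORIES:
            raise ValueError(f"unknown category: {self.category!r}")
        # Assumed invariants from the field docs: the delay is never negative,
        # and it is 0 whenever should_retry is False.
        if self.suggested_delay_seconds < 0:
            raise ValueError("suggested_delay_seconds must be >= 0")
        if not self.should_retry and self.suggested_delay_seconds != 0:
            raise ValueError("suggested_delay_seconds must be 0 when should_retry is False")


def parse_classification(raw: str) -> ClassificationSketch:
    """Parse a JSON object returned by the model into the sketch dataclass."""
    data = json.loads(raw)
    should_retry = data["should_retry"]
    if not isinstance(should_retry, bool):
        raise ValueError("should_retry must be a JSON boolean")
    return ClassificationSketch(
        category=str(data["category"]),
        should_retry=should_retry,
        reasoning=str(data["reasoning"]),
        suggested_delay_seconds=int(data.get("suggested_delay_seconds", 0)),
    )


reply = '{"category": "rate_limit", "should_retry": true, "suggested_delay_seconds": 60, "reasoning": "HTTP 429 from API"}'
result = parse_classification(reply)
print(result.category, result.should_retry, result.suggested_delay_seconds)  # rate_limit True 60
```

Omitting suggested_delay_seconds falls back to 0, matching the field's documented default.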

class airflow.providers.common.ai.policies.retry.LLMRetryPolicy(llm_conn_id, model_id=None, instructions=None, fallback_rules=None, timeout=30.0)

Bases: airflow.sdk.definitions.retry_policy.RetryPolicy

Retry policy that uses an LLM to classify errors and decide retry behaviour.

Uses PydanticAIHook to call any configured LLM provider (OpenAI, Anthropic, Bedrock, Vertex, Ollama, etc.) and obtains the error classification as structured output (an ErrorClassification).

If the LLM call itself fails or does not answer within timeout, the policy applies fallback_rules if they were provided, or returns DEFAULT so that the task’s standard retry logic applies.
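The fallback order described above (LLM first, then fallback_rules, then DEFAULT) can be sketched in plain Python. Everything here is hypothetical: Decision stands in for whatever action type the Airflow RetryPolicy API actually returns, a fallback rule is modelled as an (exception type, decision) pair rather than a real RetryRule, and classify stands in for the PydanticAIHook call, simplified to return only should_retry.

```python
from enum import Enum
from typing import Callable, Optional, Sequence, Tuple, Type


class Decision(Enum):
    """Hypothetical stand-in for the value a retry policy returns."""

    RETRY = "retry"
    FAIL = "fail"
    DEFAULT = "default"  # defer to the task's standard retry logic


# Hypothetical rule shape: (exception type to match, decision to return).
Rule = Tuple[Type[BaseException], Decision]


def decide(
    exception: BaseException,
    classify: Callable[[BaseException], bool],
    fallback_rules: Optional[Sequence[Rule]] = None,
) -> Decision:
    """Ask the classifier first; if the classifier call itself fails, fall back."""
    try:
        should_retry = classify(exception)
    except Exception:
        # LLM call failed: the first matching fallback rule wins.
        for exc_type, decision in fallback_rules or ():
            if isinstance(exception, exc_type):
                return decision
        # No rules, or none matched (an assumption): use standard retry logic.
        return Decision.DEFAULT
    return Decision.RETRY if should_retry else Decision.FAIL


def provider_down(exc: BaseException) -> bool:
    """Simulates an LLM call that fails."""
    raise ConnectionError("LLM provider unreachable")


rules = [(TimeoutError, Decision.RETRY), (PermissionError, Decision.FAIL)]
print(decide(TimeoutError("read timed out"), provider_down, rules))  # Decision.RETRY
print(decide(KeyError("missing"), provider_down, rules))  # Decision.DEFAULT
print(decide(KeyError("missing"), lambda exc: False, rules))  # Decision.FAIL
```

The deterministic rules only matter when the classifier is unavailable; a healthy classifier's answer always takes precedence.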

Parameters:
  • llm_conn_id (str) – Airflow connection ID for the LLM provider.

  • model_id (str | None) – Model identifier that overrides the connection’s model, e.g. "openai:gpt-4o-mini" to keep classification cheap. If not set, the model configured on the connection is used.

  • instructions (str | None) – Custom system prompt for classification. Defaults to a general-purpose error classifier.

  • fallback_rules (list[airflow.sdk.definitions.retry_policy.RetryRule] | None) – Optional list of RetryRule applied when the LLM call fails. Provides a deterministic safety net.

  • timeout (float) – Maximum seconds to wait for the LLM response before falling back. Defaults to 30s. The LLM provider’s own timeout (e.g. 600s for Anthropic) is much longer; this keeps the retry decision path fast even when the provider is degraded.
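One way to enforce a short decision deadline like timeout, independent of the provider's much longer timeout, is to run the call in a worker thread and wait on the future with a bound. This is a sketch under that assumption, not how the provider implements it; call_with_deadline and slow_llm are hypothetical. Only the wait is bounded: the abandoned call keeps running in its thread.

```python
import concurrent.futures
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_deadline(fn: Callable[[], T], timeout: float, fallback: T) -> T:
    """Return fn()'s result, or fallback if it raises or takes longer than timeout seconds."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Deadline reached: stop waiting. The worker thread is NOT interrupted.
        return fallback
    except Exception:
        # The call itself raised: treat it like any other LLM failure.
        return fallback
    finally:
        # Release the executor without blocking on a call that is still running.
        executor.shutdown(wait=False)


def slow_llm() -> str:
    """Simulates a degraded provider that answers long after the deadline."""
    time.sleep(0.5)
    return "retry"


print(call_with_deadline(slow_llm, timeout=0.05, fallback="default"))  # default
print(call_with_deadline(lambda: "retry", timeout=1.0, fallback="default"))  # retry
```

If the LLM call is a coroutine rather than a blocking function, asyncio.wait_for plays the same role without a thread.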

llm_conn_id
model_id = None
instructions (default value):
"""You are an error classifier for a data pipeline system. Given an error message from a failed task, classify it into one of these categories:

- rate_limit: API throttling or quota exceeded. Should retry after a delay.
- auth: Credentials invalid, expired, or missing permissions. Should NOT retry.
- network: Transient connectivity issue. Should retry quickly.
- data: Schema validation, type mismatch, or bad input data. Should NOT retry.
- resource: Resource not found or unavailable (e.g., missing table, bucket). Should NOT retry.
- transient: Temporary issue likely to resolve on its own. Should retry.
- permanent: Problem that won't resolve without code or config changes. Should NOT retry.

Set suggested_delay_seconds based on the error type: 60 for rate limits, 10 for network, 30 for transient. Set 0 for errors that should not retry."""
fallback_rules = None
timeout = 30.0
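The default instructions prompt shown above reduces to a small lookup table. A deterministic version of it, for example as a reference when writing fallback_rules or when spot-checking LLM answers, might look like the sketch below. DEFAULT_GUIDANCE and agrees_with_guidance are hypothetical helpers; the numbers are copied from the prompt text.

```python
from typing import Dict, Tuple

# (should_retry, suggested_delay_seconds) per category, copied from the default prompt.
DEFAULT_GUIDANCE: Dict[str, Tuple[bool, int]] = {
    "rate_limit": (True, 60),
    "network": (True, 10),
    "transient": (True, 30),
    "auth": (False, 0),
    "data": (False, 0),
    "resource": (False, 0),
    "permanent": (False, 0),
}


def agrees_with_guidance(category: str, should_retry: bool, delay_seconds: int) -> bool:
    """True if an LLM answer matches the default prompt's guidance exactly."""
    # Unknown categories map to None, which never equals a tuple.
    return DEFAULT_GUIDANCE.get(category) == (should_retry, delay_seconds)


print(agrees_with_guidance("rate_limit", True, 60))  # True
print(agrees_with_guidance("auth", True, 30))  # False
```

Exact matching is deliberately strict. The prompt only says to set delays "based on the error type", so a mismatch is a signal worth inspecting rather than proof that the classification is wrong.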
evaluate(exception, try_number, max_tries, context=None)

Decide whether and how to retry given the failure.

Note

AirflowFailException and AirflowSensorTimeout always fail the task immediately. The retry policy is never consulted for these exceptions.

Parameters:
  • exception (BaseException) – The exception that caused the task failure.

  • try_number (int) – Current try number (1-based).

  • max_tries (int) – Maximum tries configured on the task.

  • context (airflow.sdk.definitions.context.Context | None) – Airflow task context (may be None).
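The Note above implies an ordering: the exception type is checked before the policy ever runs. The sketch below illustrates that ordering and the shape of the evaluate call using hypothetical names (run_policy, FailNow, SensorTimedOut, recording_evaluate). The stand-in classes replace AirflowFailException and AirflowSensorTimeout so the snippet runs without Airflow, and returning plain strings is an assumption, not the real return type.

```python
from typing import Any, Callable, List, Optional, Tuple


class FailNow(Exception):
    """Hypothetical stand-in for AirflowFailException."""


class SensorTimedOut(Exception):
    """Hypothetical stand-in for AirflowSensorTimeout."""


# Exceptions that fail the task at once; the retry policy is never consulted.
NEVER_CONSULT: Tuple[type, ...] = (FailNow, SensorTimedOut)


def run_policy(
    evaluate: Callable[[BaseException, int, int, Optional[dict]], Any],
    exception: BaseException,
    try_number: int,
    max_tries: int,
    context: Optional[dict] = None,
) -> Any:
    """Return "fail" for short-circuited exceptions, otherwise the policy's decision."""
    if isinstance(exception, NEVER_CONSULT):
        return "fail"
    # try_number is 1-based; context may be None, so evaluate must handle that.
    return evaluate(exception, try_number, max_tries, context)


calls: List[Tuple[str, int, int, Optional[dict]]] = []


def recording_evaluate(exception, try_number, max_tries, context):
    """A fake policy that records its arguments and always asks for a retry."""
    calls.append((type(exception).__name__, try_number, max_tries, context))
    return "retry"


print(run_policy(recording_evaluate, FailNow("bad config"), 1, 3))  # fail
print(run_policy(recording_evaluate, ConnectionError("reset"), 1, 3))  # retry
print(calls)  # [('ConnectionError', 1, 3, None)]
```

Because the short-circuit happens first, a policy never needs its own handling for these two exception types.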
