AgentOperator & @task.agent

Use AgentOperator or the @task.agent decorator to run an LLM agent with tools — the agent reasons about the prompt, calls tools (database queries, API calls, etc.) in a multi-turn loop, and returns a final answer.

This is different from LLMOperator, which sends a single prompt and returns the output. AgentOperator manages a stateful tool-call loop where the LLM decides which tools to call and when to stop.

SQL Agent

The most common pattern: give an agent access to a database so it can answer questions by writing and executing SQL.

airflow/providers/common/ai/example_dags/example_agent.py

if SQLToolset is not None:

    @dag(tags=["example"])
    def example_agent_operator_sql():
        AgentOperator(
            task_id="analyst",
            prompt="What are the top 5 customers by order count?",
            llm_conn_id="pydanticai_default",
            system_prompt=(
                "You are a SQL analyst. Use the available tools to explore "
                "the schema and answer the question with data."
            ),
            toolsets=[
                SQLToolset(
                    db_conn_id="postgres_default",
                    allowed_tables=["customers", "orders"],
                    max_rows=20,
                )
            ],
        )

The SQLToolset provides four tools to the agent:

Tool          Description
list_tables   Lists available table names (filtered by allowed_tables if set)
get_schema    Returns column names and types for a table
query         Executes a SQL query and returns rows as JSON
check_query   Validates SQL syntax without executing it
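To make the four tools concrete, here is a minimal sketch of equivalent behaviour built on the standard-library sqlite3 module. MiniSQLTools and everything in it are hypothetical illustrations, not the provider's implementation; in particular, this sketch applies allowed_tables only to list_tables and get_schema, and how the real toolset treats queries is covered in the Toolsets security guide.

```python
import json
import sqlite3


class MiniSQLTools:
    """Toy stand-in for the four tools (hypothetical, not the provider's code)."""

    def __init__(self, conn, allowed_tables=None, max_rows=50):
        self.conn = conn
        self.allowed_tables = allowed_tables
        self.max_rows = max_rows

    def list_tables(self):
        rows = self.conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
        names = [row[0] for row in rows]
        if self.allowed_tables is not None:
            names = [name for name in names if name in self.allowed_tables]
        return names

    def get_schema(self, table):
        if table not in self.list_tables():
            raise ValueError(f"table {table!r} is not available")
        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
        info = self.conn.execute(f'PRAGMA table_info("{table}")').fetchall()
        return [{"name": col[1], "type": col[2]} for col in info]

    def check_query(self, sql):
        # EXPLAIN compiles the statement without running it.
        try:
            self.conn.execute(f"EXPLAIN {sql}")
        except sqlite3.Error as exc:
            return f"invalid: {exc}"
        return "ok"

    def query(self, sql):
        cursor = self.conn.execute(sql)
        columns = [desc[0] for desc in cursor.description]
        rows = cursor.fetchmany(self.max_rows)  # cap the result size
        return json.dumps([dict(zip(columns, row)) for row in rows])


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER);
    CREATE TABLE secrets (token TEXT);
    INSERT INTO customers VALUES (1, 'Ada'), (2, 'Grace');
    INSERT INTO orders VALUES (1, 1), (2, 1), (3, 2);
    """
)
tools = MiniSQLTools(conn, allowed_tables=["customers", "orders"], max_rows=1)
```

With this setup the secrets table is hidden from list_tables, and query returns at most one row because max_rows=1.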

Hook-based Tools

Wrap any Airflow Hook’s methods as agent tools using HookToolset. Only methods you explicitly list are exposed — there is no auto-discovery.

airflow/providers/common/ai/example_dags/example_agent.py

@dag(tags=["example"])
def example_agent_operator_hook():
    from airflow.providers.http.hooks.http import HttpHook

    http_hook = HttpHook(http_conn_id="my_api")

    AgentOperator(
        task_id="api_explorer",
        prompt="What endpoints are available and what does /status return?",
        llm_conn_id="pydanticai_default",
        system_prompt="You are an API explorer. Use the tools to discover and call endpoints.",
        toolsets=[
            HookToolset(
                http_hook,
                allowed_methods=["run"],
                tool_name_prefix="http_",
            )
        ],
    )
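The allow-list idea can be sketched in plain Python. FakeHook and expose_methods are hypothetical names used only for illustration, and the assumption that a tool name is the prefix followed by the method name is ours, not something this guide states.

```python
class FakeHook:
    """Hypothetical stand-in for an Airflow hook."""

    def run(self, endpoint):
        return f"GET {endpoint} -> 200"

    def get_conn(self):
        return "raw connection object"


def expose_methods(hook, allowed_methods, tool_name_prefix=""):
    """Return {tool_name: bound_method} for the explicitly listed methods only."""
    tools = {}
    for name in allowed_methods:
        method = getattr(hook, name, None)
        if not callable(method):
            raise ValueError(f"{type(hook).__name__} has no callable method {name!r}")
        # Assumed naming scheme: prefix + method name.
        tools[f"{tool_name_prefix}{name}"] = method
    return tools


tools = expose_methods(FakeHook(), allowed_methods=["run"], tool_name_prefix="http_")
```

Because get_conn is not listed, it never becomes a tool, which is the property the allow-list is meant to guarantee.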


TaskFlow Decorator

The @task.agent decorator wraps AgentOperator. The function returns the prompt string; all other parameters are passed to the operator.

airflow/providers/common/ai/example_dags/example_agent.py

if SQLToolset is not None:

    @dag(tags=["example"])
    def example_agent_decorator():
        @task.agent(
            llm_conn_id="pydanticai_default",
            system_prompt="You are a data analyst. Use tools to answer questions.",
            toolsets=[
                SQLToolset(
                    db_conn_id="postgres_default",
                    allowed_tables=["orders"],
                )
            ],
        )
        def analyze(question: str):
            return f"Answer this question about our orders data: {question}"

        analyze("What was our total revenue last month?")

Multimodal prompts

The decorated callable may also return a Sequence[UserContent] – for example, a list mixing strings with ImageUrl, BinaryContent, or other pydantic-ai user-content types – to send vision, audio, or document inputs to the model. This mirrors the input types accepted by pydantic-ai’s Agent.run_sync.

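# Assumes the Airflow 3 Task SDK import path; adjust for your Airflow version.
from airflow.sdk import task
from pydantic_ai.messages import ImageUrl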


@task.agent(llm_conn_id="pydanticai_default", system_prompt="You are an image analyst.")
def analyze_review(image_url: str):
    return ["Describe what you see:", ImageUrl(url=image_url)]

Note

Combining a non-string prompt with enable_hitl_review=True is not currently supported – the HITL session model stores the prompt as a string, so a Sequence prompt will raise at the review boundary. Widening HITL review to multimodal prompts is tracked as a follow-up.

Structured Output

Set output_type to a Pydantic BaseModel subclass to get structured data back. The model instance is pushed to XCom unchanged so downstream tasks can type-hint the class directly (def downstream(result: MyModel)) and use attribute access (result.field).

The declared output_type (and any BaseModel reachable from Union/Optional/list shapes) is registered for XCom deserialization by the worker when it loads the DAG, before any task runs. The Pydantic class must be defined at module scope and bound to an attribute matching its __name__. Same-DAG downstream tasks need no configuration. The UI’s XCom viewer renders the value via the stringify path (no configuration needed; see the LLMOperator guide for the exact representation). Cross-DAG xcom_pull consumers still need the class qualname added to [core] allowed_deserialization_classes.
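The module-scope requirement follows from how a class is found again by name. The sketch below is a simplified illustration using plain classes and a dictionary standing in for a module's attributes; it is not Airflow's deserializer, and lookup, is_resolvable, and MODULE_ATTRS are hypothetical names.

```python
class Analysis:
    """Defined at module scope, so its qualified name is just 'Analysis'."""


def build_local_class():
    class LocalAnalysis:
        """Defined inside a function, so its qualified name contains '<locals>'."""

    return LocalAnalysis


# Stand-in for a module's attribute table: what a loader can reach by name.
MODULE_ATTRS = {"Analysis": Analysis, "build_local_class": build_local_class}


def lookup(qualname):
    """Walk a dotted qualified name starting from the module attributes."""
    first, *rest = qualname.split(".")
    obj = MODULE_ATTRS[first]
    for part in rest:
        obj = getattr(obj, part)
    return obj


def is_resolvable(cls):
    """True if the class can be found again from its qualified name alone."""
    try:
        return lookup(cls.__qualname__) is cls
    except (KeyError, AttributeError):
        return False


module_scope_ok = is_resolvable(Analysis)
local_scope_ok = is_resolvable(build_local_class())
```

The function-local class cannot be resolved because a function object has no "<locals>" attribute to walk through, so only the module-scope class can be re-imported by name.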

airflow/providers/common/ai/example_dags/example_agent.py

# Pydantic output classes must be defined at module scope so downstream
# tasks can re-import them when deserializing the XCom payload.
class Analysis(BaseModel):
    """Structured analysis output for the agent example."""

    summary: str
    top_items: list[str]
    row_count: int


airflow/providers/common/ai/example_dags/example_agent.py

if SQLToolset is not None:

    @dag(tags=["example"])
    def example_agent_structured_output():
        @task.agent(
            llm_conn_id="pydanticai_default",
            system_prompt="You are a data analyst. Return structured results.",
            output_type=Analysis,
            toolsets=[SQLToolset(db_conn_id="postgres_default")],
        )
        def analyze(question: str):
            return f"Analyze: {question}"

        analyze("What are the trending products this week?")

Chaining with Downstream Tasks

The agent’s output is pushed to XCom like any other operator, so downstream tasks can consume it.

airflow/providers/common/ai/example_dags/example_agent.py

if SQLToolset is not None:

    @dag(tags=["example"])
    def example_agent_chain():
        @task.agent(
            llm_conn_id="pydanticai_default",
            system_prompt="You are a SQL analyst.",
            toolsets=[SQLToolset(db_conn_id="postgres_default", allowed_tables=["orders"])],
        )
        def investigate(question: str):
            return f"Investigate: {question}"

        @task
        def send_report(analysis: str):
            """Send the agent's analysis to a downstream system."""
            print(f"Report: {analysis}")
            return analysis

        result = investigate("Summarize order trends for last quarter")
        send_report(result)

Multi-turn Sessions

By default each agent run is a cold, single-turn conversation. To carry a conversation across runs – a chat or iterative agent where “and the third one?” must resolve against an earlier answer – pass message_history.

When message_history is set, the operator seeds the run with those prior turns and, after the run, pushes the full updated transcript (result.all_messages()) to XCom under the key message_history. The next run reads it back to resume the conversation. None (the default) keeps the single-turn behavior unchanged.

The operator does not decide where a session is stored – that keying is deployment-specific. The pattern is three tasks: load the prior transcript for the session, run the agent, store the updated transcript. The example keys a JSON file in object storage by session_id (use s3:// / gs:// in a deployment); the first run starts from an empty "[]".

airflow/providers/common/ai/example_dags/example_agent.py

@dag(tags=["example"], params={"session_id": "demo-session"})
def example_agent_session():
    """Resume a conversation across runs via ``message_history``.

    The agent step seeds itself with the prior transcript and re-emits the
    updated transcript to XCom (key ``message_history``). Loading and storing
    that transcript under a session key is the DAG's job -- here, a JSON file in
    object storage keyed by ``session_id``. Swap the path for ``s3://`` /
    ``gs://`` in a deployment.
    """
    sessions_root = ObjectStoragePath("file:///tmp/airflow_agent_sessions")

    @task
    def load_history(session_id: str) -> str:
        path = sessions_root / f"{session_id}.json"
        # First turn: no file yet -> start a fresh session (empty transcript).
        return path.read_text() if path.exists() else "[]"

    @task.agent(
        llm_conn_id="pydanticai_default",
        system_prompt="You are a helpful assistant. Use the earlier turns for context.",
        # The XComArg both wires the dependency and resolves to the JSON transcript.
        message_history=load_history("{{ params.session_id }}"),
    )
    def ask(question: str) -> str:
        return question

    @task
    def save_history(session_id: str, transcript: str) -> None:
        # Local/fsspec object storage does not auto-create parent dirs on write.
        sessions_root.mkdir(parents=True, exist_ok=True)
        (sessions_root / f"{session_id}.json").write_text(transcript)

    answer = ask("And what did I ask you a moment ago?")
    saved = save_history(
        "{{ params.session_id }}",
        # The agent step pushes the post-run transcript under this XCom key.
        "{{ ti.xcom_pull(task_ids='ask', key='message_history') }}",
    )
    # save runs after the agent so the pulled transcript is the fresh one.
    answer >> saved


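message_history accepts a list of pydantic-ai ModelMessage objects or their JSON form (str / bytes), so the value emitted to XCom feeds straight back in on the next run. If you pull the transcript via a template in a task that can run before any transcript exists, pass default='[]' to xcom_pull so the first run, which has no XCom yet, starts a fresh session instead of trying to parse the string "None". The example above does not need this: load_history returns "[]" when no session file exists, and the template in save_history is rendered only after the agent has pushed its transcript.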

The transcript is cumulative: each turn appends to it, so it grows for the life of the session. For long sessions, configure an object-storage XCom backend or trim older turns before the next run rather than feeding the whole history back unbounded.
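One way to trim older turns is sketched below. trim_transcript is a hypothetical helper that treats the transcript as an opaque JSON array; it keeps the first message on the assumption that it carries the system prompt, and it does not check that tool calls stay paired with their results, which a real trimming step would need to handle.

```python
import json


def trim_transcript(transcript_json, keep_last, keep_first=1):
    """Keep the first `keep_first` and the last `keep_last` messages."""
    messages = json.loads(transcript_json)
    if len(messages) <= keep_first + keep_last:
        return transcript_json  # already short enough
    head = messages[:keep_first]
    tail = messages[len(messages) - keep_last:]
    return json.dumps(head + tail)


# Placeholder messages; real entries would be serialized ModelMessage objects.
transcript = json.dumps([{"turn": i} for i in range(6)])
trimmed = json.loads(trim_transcript(transcript, keep_last=2))
```

A task like this would sit between load_history and the agent step, so that the agent is seeded with the shortened transcript.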

Note

message_history cannot be combined with enable_hitl_review – the operator raises at construction. The post-review (human-approved) transcript is not recoverable today, so emitting the pre-review transcript would silently drop the reviewed turns.

Durable Execution

Agent tasks can involve multiple LLM calls and tool invocations. If a task fails mid-run (network error, timeout, transient API failure), a plain retry re-executes every LLM call and tool call from scratch – repeating work that already succeeded and incurring additional cost.

Setting durable=True caches each LLM response and tool result to ObjectStorage as it completes. On retry, completed steps are replayed from the cache and only the remaining steps run against the live model and tools. The cache is deleted after successful completion.

Durable execution only helps when the task has retries configured. Without retries there is nothing to replay.

Configuration

Set the cache location in airflow.cfg. The task raises ValueError at runtime if durable=True and the option is missing.

[common.ai]
# Local filesystem -- suitable for development
durable_cache_path = file:///tmp/airflow_durable_cache

The value is an ObjectStorage URI, so any supported backend works. For production, use a shared store so retries on a different worker can read the cache:

[common.ai]
durable_cache_path = s3://my-bucket/airflow/durable-cache

Operator example

airflow/providers/common/ai/example_dags/example_agent_durable.py

if SQLToolset is not None:

    @dag(default_args={"retries": 3, "retry_delay": timedelta(seconds=30)}, tags=["example"])
    def example_agent_durable_operator():
        """Agent with durable execution -- resumes from the last model call on retry."""
        AgentOperator(
            task_id="durable_analyst",
            prompt="What are the top 5 customers by order count?",
            llm_conn_id="pydanticai_default",
            system_prompt=(
                "You are a SQL analyst. Use the available tools to explore "
                "the schema and answer the question with data."
            ),
            durable=True,
            toolsets=[
                SQLToolset(
                    db_conn_id="postgres_default",
                    allowed_tables=["customers", "orders"],
                    max_rows=20,
                )
            ],
        )

Decorator example

airflow/providers/common/ai/example_dags/example_agent_durable.py

if SQLToolset is not None:

    @dag(default_args={"retries": 3, "retry_delay": timedelta(seconds=30)}, tags=["example"])
    def example_agent_durable_decorator():
        @task.agent(
            llm_conn_id="pydanticai_default",
            system_prompt="You are a data analyst. Use tools to answer questions.",
            durable=True,
            toolsets=[
                SQLToolset(
                    db_conn_id="postgres_default",
                    allowed_tables=["orders"],
                )
            ],
        )
        def analyze(question: str):
            return f"Answer this question about our orders data: {question}"

        analyze("What was our total revenue last month?")

How it works

  1. On first execution, each LLM response and tool result is saved to a JSON file as the agent progresses, together with a fingerprint of the request that produced it (model, message history, settings, and tools for LLM steps; tool name, arguments, and call id for tool steps).

  2. If the task fails and Airflow retries it, completed steps are loaded from the cache and returned without calling the model or tool. Steps not yet in the cache proceed normally.

  3. Before a step is replayed, its stored fingerprint is compared against the current request. If anything changed between attempts – the system prompt, the model, the toolset, model settings, or the conversation so far – the stale entry is discarded, a warning is logged, and the step re-runs live. A divergence also invalidates the steps after it: re-running an LLM step produces fresh tool call ids, so tool results recorded under the old conversation no longer match. A changed agent costs a re-run; it never replays responses that belong to a different conversation.

  4. After successful completion, the cache file is deleted.
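The steps above can be sketched as a small fingerprinted step cache. This is an illustration under simplifying assumptions, not the provider's code: StepCache and fingerprint are hypothetical names, the cache lives in memory rather than in a JSON file, and later entries are dropped explicitly on divergence instead of failing their own fingerprint checks.

```python
import hashlib
import json


def fingerprint(request):
    """Stable hash of a JSON-serializable request description."""
    payload = json.dumps(request, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


class StepCache:
    """In-memory stand-in for the per-task cache file (hypothetical)."""

    def __init__(self, entries=None):
        self.entries = list(entries or [])  # [{"fingerprint": ..., "result": ...}]
        self.replayed = 0
        self.executed = 0

    def run_step(self, index, request, execute):
        fp = fingerprint(request)
        if index < len(self.entries):
            if self.entries[index]["fingerprint"] == fp:
                self.replayed += 1
                return self.entries[index]["result"]  # replay, no live call
            # Divergence: drop this entry and everything recorded after it.
            del self.entries[index:]
        result = execute()
        self.entries.append({"fingerprint": fp, "result": result})
        self.executed += 1
        return result


calls = []


def live(name):
    """Build a fake model or tool call that records when it really runs."""
    def _run():
        calls.append(name)
        return f"{name}-result"
    return _run


llm_request = {"model": "m", "messages": ["question"]}
tool_request = {"tool": "query", "args": {"sql": "SELECT 1"}, "call_id": "c1"}

# First attempt completes two steps, then the task fails.
first = StepCache()
first.run_step(0, llm_request, live("llm"))
first.run_step(1, tool_request, live("tool"))

# Retry: the two cached steps are replayed, only the third runs live.
retry = StepCache(first.entries)
retry.run_step(0, llm_request, live("llm"))
retry.run_step(1, tool_request, live("tool"))
retry.run_step(2, {"model": "m", "messages": ["question", "rows"]}, live("final"))

# Retry with a changed model: the first step diverges and the cache is discarded.
changed = StepCache(first.entries)
changed.run_step(0, {"model": "other", "messages": ["question"]}, live("llm-again"))
```

In the retry, the model and tool are called once each across both attempts, which is the cost saving durable execution is after.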

Replay verification compares the requests sent to models and tools, not the code behind them. Editing a tool’s implementation between attempts does not invalidate an already-cached result for an identical call, and pointing llm_conn_id at a different endpoint serving the same model name does not invalidate cached responses – delete the cache file to force a fully fresh run.

After the run, a single INFO summary line reports how many steps were replayed vs executed fresh. Per-step detail is available at DEBUG level.

The cache file is named {dag_id}_{task_id}_{run_id}.json (with _{map_index} appended for mapped tasks) and stored under the configured durable_cache_path. To force a completely fresh run, delete the cache file for that task.
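A small helper can rebuild the file name when you need to locate a cache file for deletion. durable_cache_filename is a hypothetical function; it assumes the Airflow convention that map_index is -1 for unmapped tasks and does not escape characters in run_id, which the provider may or may not do.

```python
def durable_cache_filename(dag_id, task_id, run_id, map_index=-1):
    """Build '{dag_id}_{task_id}_{run_id}.json', adding '_{map_index}' if mapped."""
    name = f"{dag_id}_{task_id}_{run_id}"
    if map_index >= 0:  # assumed: -1 means the task is not mapped
        name += f"_{map_index}"
    return f"{name}.json"


unmapped = durable_cache_filename("sales", "durable_analyst", "manual_1")
mapped = durable_cache_filename("sales", "durable_analyst", "manual_1", map_index=2)
```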

Note

Runs that fail permanently (exhaust all retries) leave their cache file behind. These orphaned files do not affect future DAG runs (each run gets its own file) but will consume storage. Clean them up periodically or add a lifecycle policy to the storage backend.

Side effects and idempotency

Durable execution caches return values, not side effects. When a step is replayed, the tool’s code does not run – only the stored return value is returned. Two things follow from this:

  • If a tool completed successfully and its result was cached, the tool will not run again on retry. Any side effect it produced (writing a file, sending a message) already happened during the original run and is not repeated.

  • If a tool fails before its result is cached, it will run again on retry. A tool that partially completed (e.g. sent an email then raised an exception) may produce the side effect a second time.

All built-in toolsets (SQLToolset with allow_writes=False, HookToolset in read-only mode) are read-only and replay safely. For custom tools with non-idempotent side effects, design the tool to be idempotent. For example, check whether the operation already completed before acting, or use database constraints to prevent duplicate writes.
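A minimal sketch of the "check whether the operation already completed" approach, using an idempotency key. Outbox and notify_tool are hypothetical names, and the in-memory dictionary stands in for whatever durable store (for example a table with a unique constraint) a real tool would use.

```python
class Outbox:
    """Records sent messages by idempotency key (stand-in for a durable store)."""

    def __init__(self):
        self.sent = {}

    def send_once(self, key, message):
        if key in self.sent:
            return "already sent"  # side effect happened on an earlier attempt
        self.sent[key] = message
        return "sent"


def notify_tool(outbox, run_id, recipient, body, fail_after_send=False):
    """Tool with a side effect that is safe to run again on retry."""
    key = f"{run_id}:{recipient}"
    status = outbox.send_once(key, body)
    if fail_after_send:
        # Simulates a crash after the side effect but before the result is cached.
        raise RuntimeError("crashed before the result was cached")
    return status


outbox = Outbox()
try:
    notify_tool(outbox, "run-1", "ops@example.com", "report ready", fail_after_send=True)
except RuntimeError:
    pass  # the task would be retried here

retry_status = notify_tool(outbox, "run-1", "ops@example.com", "report ready")
```

The second attempt runs the tool again because nothing was cached, but the key check prevents a duplicate message.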

Tool results must be JSON-serializable to be cached. If a tool returns a non-serializable value (e.g. BinaryContent from MCP tools), that step is skipped with a warning and will re-execute on retry instead of replaying from cache. The task itself still succeeds.
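The serializability rule can be pictured as a guarded write. try_cache is a hypothetical helper shown only to illustrate the behaviour described above; the provider's own check may differ.

```python
import json
import logging

log = logging.getLogger("durable.sketch")


def try_cache(cache, key, value):
    """Store the JSON form of `value`; skip with a warning if it cannot be encoded."""
    try:
        encoded = json.dumps(value)
    except (TypeError, ValueError):
        log.warning("Step %s is not JSON-serializable; it will re-run on retry", key)
        return False
    cache[key] = encoded
    return True


cache = {}
stored_rows = try_cache(cache, "step-1", {"rows": [1, 2]})
stored_bytes = try_cache(cache, "step-2", b"\x89PNG")  # bytes are not JSON-serializable
```

The second step is simply left out of the cache, so the run continues and only that step is executed again after a failure.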

Capabilities (pydantic-ai)

pydantic-ai capabilities bundle tools, lifecycle hooks, instructions, and model settings into composable units. Common ones include Thinking (reasoning at a configurable effort level), WebSearch, WebFetch, ImageGeneration, and MCP.

AgentOperator does not yet expose a first-class capabilities= kwarg, but anything passed through agent_params is forwarded to the underlying Agent(...) constructor.

airflow/providers/common/ai/example_dags/example_agent_capabilities.py

@dag(tags=["example"])
def example_agent_capabilities_thinking():
    AgentOperator(
        task_id="reasoner",
        prompt="Walk through the steps to compute the 10th Fibonacci number, then give the answer.",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a careful mathematician. Think before answering.",
        agent_params={
            "capabilities": [Thinking(effort="high")],
        },
    )


Capabilities compose with toolsets – pydantic-ai merges tools from both.

airflow/providers/common/ai/example_dags/example_agent_capabilities.py

if SQLToolset is not None:

    @dag(tags=["example"])
    def example_agent_capabilities_composed():
        AgentOperator(
            task_id="analyst",
            prompt="Cross-reference our top customers with their recent public news. Think first.",
            llm_conn_id="pydanticai_default",
            system_prompt=(
                "You are a sales analyst. Query the database for customers, then search the web "
                "for recent news. Reason carefully about which leads to surface."
            ),
            toolsets=[
                SQLToolset(
                    db_conn_id="postgres_default",
                    allowed_tables=["customers", "orders"],
                    max_rows=20,
                ),
            ],
            agent_params={
                "capabilities": [Thinking(effort="medium"), WebSearch()],
            },
        )

Warning

agent_params is a templated field, which Airflow serializes by calling str() on values it doesn’t natively understand. Capability instances are not yet round-trip-safe through DAG serialization, so the examples above construct them inside the @dag function, not at module level. First-class capabilities= support on AgentOperator (with proper serializer hooks) is tracked as a follow-up.

Code Mode (Monty sandbox)

Set code_mode=True to collapse the agent’s tools into a single run_code tool powered by the Monty sandbox (via pydantic-ai-harness). Instead of one model round-trip per tool call, the model writes a single Python snippet that calls the tools as functions – with loops, conditionals, and asyncio.gather – in one turn. For multi-tool workflows this cuts round-trips and token use.
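To give a feel for the difference, the sketch below imitates the kind of snippet a model might write in code mode. The tool functions top_customers and total_spend and the ORDERS data are hypothetical stand-ins defined here so the example runs on its own; in a real run the tools would be the ones you registered, and the snippet would execute inside the sandbox.

```python
import asyncio

# Hypothetical data behind the stand-in tools.
ORDERS = {"acme": [120.0, 80.0], "globex": [45.5], "initech": [10.0, 15.0, 5.0]}


async def top_customers(limit):
    """Stand-in tool: customers ranked by order count."""
    ranked = sorted(ORDERS, key=lambda name: len(ORDERS[name]), reverse=True)
    return ranked[:limit]


async def total_spend(customer):
    """Stand-in tool: total spend for one customer."""
    return sum(ORDERS[customer])


async def generated_snippet():
    """The kind of glue code a model could submit in one turn."""
    customers = await top_customers(limit=2)
    # One gather replaces one model round-trip per customer.
    totals = await asyncio.gather(*(total_spend(c) for c in customers))
    return dict(zip(customers, totals))


result = asyncio.run(generated_snippet())
```

Without code mode, the same question would take one tool-call turn to list the customers and then a further turn for each customer's total.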

The generated code runs in Monty’s deny-by-default sandbox: it cannot access the filesystem, the network, or environment variables, and it can only call the tools you registered. Code mode therefore does not widen what the agent can reach (the tools it calls still run in the worker); it only changes how the model invokes them. See Toolsets security for the tool boundary.

When to use it

Code mode pays off for orchestration-heavy, computation-light workflows: calling several tools, looping over their results, filtering, and combining them. Collapsing many sequential tool calls into one turn is where the round-trip and token savings come from: the example below answers a per-customer question in a single run_code block instead of one model round-trip per customer.

It is not a general-purpose code runtime. The generated code is only the glue between tool calls; every real capability must come from a tool. Monty runs a subset of Python, cannot import third-party libraries (pandas, numpy, requests, boto3, …), and has no filesystem or network access. If a task needs to crunch data inline with a library, you have two options, both better suited to that than code mode:

  • Push the work into a tool. Do the aggregation in SQL (SQLToolset), or expose a hook method that returns the processed result (HookToolset). The tool runs in the full worker environment with all its dependencies, and code mode just orchestrates it.

  • Use a container-based execution environment (e.g. Docker or E2B via pydantic-ai-harness) instead of the in-process Monty sandbox. These support third-party packages but pay a per-run container cost and a larger security surface, so reach for them only when inline library code is genuinely required.

Requires the code-mode extra:

pip install "apache-airflow-providers-common-ai[code-mode]"

airflow/providers/common/ai/example_dags/example_agent.py

@dag(tags=["example"])
def example_agent_operator_code_mode():
    AgentOperator(
        task_id="code_mode_analyst",
        prompt="For the top 3 customers by order count, what was each one's total spend?",
        llm_conn_id="pydanticai_default",
        system_prompt="You are a SQL analyst. Write Python that calls the tools to answer.",
        toolsets=[SQLToolset(db_conn_id="postgres_default", allowed_tables=["customers", "orders"])],
        # Requires the `code-mode` extra:
        #   pip install "apache-airflow-providers-common-ai[code-mode]"
        code_mode=True,
    )


Unlike passing a capability through agent_params (see Capabilities (pydantic-ai)), code_mode is a plain boolean and is serialization-safe: the CodeMode capability is built at execution time, not stored on the serialized operator.

Note

Monty is pre-1.0. The code-mode extra is opt-in so its dependency churn never affects the base provider install.

Parameters

  • prompt: The prompt to send to the agent (operator) or the return value of the decorated function (decorator).

  • llm_conn_id: Airflow connection ID for the LLM provider.

  • model_id: Model identifier (e.g. "openai:gpt-5"). Overrides the connection’s extra field.

  • system_prompt: System-level instructions for the agent. Supports Jinja templating.

  • output_type: Expected output type (default: str). Set to a Pydantic BaseModel for structured output.

  • toolsets: List of pydantic-ai toolsets (SQLToolset, HookToolset, AgentSkillsToolset, etc.).

  • enable_tool_logging: Wrap each toolset in LoggingToolset so that every tool call is logged in real time. Default True.

  • agent_params: Additional keyword arguments passed to the pydantic-ai Agent constructor (e.g. retries, model_settings, capabilities). See Capabilities (pydantic-ai) for how to enable pydantic-ai capabilities such as Thinking, WebSearch, and ImageGeneration.

  • usage_limits: Optional pydantic-ai UsageLimits enforced on every agent run (initial run, durable replay, and HITL regeneration). Use it to cap requests, tokens, or tool calls per task – agents are particularly prone to runaway tool loops, so tool_calls_limit is a useful guardrail. See LLMOperator for an example. Default None.

  • durable: When True, enables step-level caching of model responses and tool results via ObjectStorage. On retry, cached steps are replayed instead of re-executing expensive LLM calls. Requires the [common.ai] durable_cache_path config option to be set. Default False.

  • code_mode: When True, wraps the agent’s tools in a single run_code tool that the model drives by writing Python, executed in the Monty sandbox. Requires the code-mode extra. Default False. See Code Mode (Monty sandbox).

  • message_history: Prior conversation to seed a multi-turn session, as a list of pydantic-ai ModelMessage objects or their JSON form (str / bytes). When set, the post-run transcript is pushed to XCom under the key message_history for the next run to resume. Default None (single-turn). See Multi-turn Sessions.

Logging

All AI operators automatically log a post-run summary after run_sync() completes. AgentOperator additionally wraps toolsets for real-time per-tool-call logging (controlled by enable_tool_logging).

Real-time tool call logging (AgentOperator only) — each tool call is logged as it happens:

INFO - Tool call: list_tables
INFO - Tool list_tables returned in 0.12s
INFO - Tool call: get_schema
INFO - Tool get_schema returned in 0.08s
INFO - Tool call: query
INFO - Tool query returned in 0.34s

Tool arguments are logged at DEBUG level to avoid leaking sensitive data at the default log level.
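The two log levels can be illustrated with a plain wrapper. logged_tool and ListHandler are hypothetical names used for this sketch; it mimics the log lines shown above but is not how LoggingToolset is implemented.

```python
import logging
import time

log = logging.getLogger("agent.tools.sketch")


def logged_tool(name, func):
    """Wrap a tool so each call is logged: name at INFO, arguments at DEBUG."""
    def wrapper(**kwargs):
        log.info("Tool call: %s", name)
        log.debug("Tool args for %s: %r", name, kwargs)  # hidden at default level
        start = time.perf_counter()
        result = func(**kwargs)
        log.info("Tool %s returned in %.2fs", name, time.perf_counter() - start)
        return result
    return wrapper


class ListHandler(logging.Handler):
    """Collects formatted log lines so the output can be inspected."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(f"{record.levelname} - {record.getMessage()}")


handler = ListHandler()
log.addHandler(handler)
log.setLevel(logging.INFO)  # the default level in this sketch

query = logged_tool("query", lambda sql: [{"n": 1}])
rows = query(sql="SELECT 1 AS n")
```

At INFO level only the tool name and timing appear; the SQL text passed as an argument is emitted only if the logger is switched to DEBUG.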

Post-run summary (all operators) — after the LLM run finishes, a summary is logged with model name, token usage, and the full tool call sequence:

INFO - LLM run complete: model=gpt-5, requests=4, tool_calls=3, input_tokens=2847, output_tokens=512, total_tokens=3359
INFO - Tool call sequence: list_tables -> get_schema -> query

At DEBUG level, the LLM output is also logged (truncated to 500 characters).

Both layers use Airflow’s ::group:: / ::endgroup:: log markers, which render as collapsible sections in the Airflow UI task log viewer.

To disable real-time tool logging while keeping the post-run summary:

AgentOperator(
    task_id="my_agent",
    prompt="...",
    llm_conn_id="my_llm",
    toolsets=[SQLToolset(db_conn_id="my_db")],
    enable_tool_logging=False,
)

Security

See also

Toolsets — Security for defense layers, allowed_tables limitations, HookToolset guidelines, recommended configurations, and the production checklist.
