airflow.providers.common.ai.durable.caching_toolset

Caching toolset wrapper for durable execution.

Attributes

log

Classes

CachingToolset

Wraps a toolset to cache tool call results in ObjectStorage for durable execution.

Module Contents

airflow.providers.common.ai.durable.caching_toolset.log[source]
class airflow.providers.common.ai.durable.caching_toolset.CachingToolset[source]

Bases: pydantic_ai.toolsets.wrapper.WrapperToolset[Any]

Wraps a toolset to cache tool call results in ObjectStorage for durable execution.

On each call_tool() invocation, checks if a cached result exists for the current step index and was produced by the same call (same tool name, arguments, and model-issued tool_call_id – compared via fingerprint). If so, returns the cached result without executing the tool. Otherwise, executes the tool and caches the result. A fingerprint mismatch means the conversation diverged from the previous attempt; the stale entry is discarded and the tool runs live.

The step index is grabbed before the first await, so parallel tool calls via asyncio.gather get deterministic indices (tasks start executing their synchronous preamble in creation order).

storage: airflow.providers.common.ai.durable.storage.DurableStorage[source]
counter: airflow.providers.common.ai.durable.step_counter.DurableStepCounter[source]
async call_tool(name, tool_args, ctx, tool)[source]

Call a tool with the given arguments.

Args:

name: The name of the tool to call. tool_args: The arguments to pass to the tool. ctx: The run context. tool: The tool definition returned by [get_tools][pydantic_ai.toolsets.AbstractToolset.get_tools] that was called.

Was this entry helpful?