airflow.providers.common.ai.toolsets.sql

Curated SQL toolset wrapping DbApiHook for agentic database workflows.

Classes

SQLToolset

Curated toolset that gives an LLM agent safe access to a SQL database.

Module Contents

class airflow.providers.common.ai.toolsets.sql.SQLToolset(db_conn_id, *, allowed_tables=None, schema=None, allow_writes=False, max_rows=50)

Bases: pydantic_ai.toolsets.abstract.AbstractToolset[Any]

Curated toolset that gives an LLM agent safe access to a SQL database.

Provides four tools — list_tables, get_schema, query, and check_query — inspired by LangChain’s SQLDatabaseToolkit pattern.

Uses a DbApiHook resolved lazily from the given db_conn_id.

When a tool fails, the database’s error message is returned to the agent as a retry (pydantic_ai.ModelRetry) so the model can correct its SQL within the run instead of failing the task. pydantic-ai bounds this by the tool’s max_retries, so an unrecoverable error – a bad connection or an auth failure – exhausts the retries and fails the task for Airflow to retry. The toolset does not inspect the error type or message.
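The retry flow described above can be illustrated with a minimal, stdlib-only sketch. It is not the toolset's code: `ToolRetry` is a hypothetical stand-in for `pydantic_ai.ModelRetry`, `run_query_tool` and `run_with_retries` are made-up helpers that imitate the tool and pydantic-ai's bounded retry loop, and sqlite3 replaces the DbApiHook so the example runs anywhere.

```python
import sqlite3


class ToolRetry(Exception):
    """Hypothetical stand-in for pydantic_ai.ModelRetry."""


def run_query_tool(conn: sqlite3.Connection, sql: str) -> list[tuple]:
    """Run SQL and surface any database error as a retry request."""
    try:
        return conn.execute(sql).fetchall()
    except sqlite3.Error as exc:
        # The error text is passed through as-is; its type is not inspected.
        raise ToolRetry(str(exc)) from exc


def run_with_retries(conn, attempts: list[str], max_retries: int = 1):
    """Try successive SQL attempts, allowing at most max_retries retries.

    `attempts` stands in for the model's successive corrections.
    """
    feedback = []
    for sql in attempts[: max_retries + 1]:
        try:
            return run_query_tool(conn, sql), feedback
        except ToolRetry as retry:
            feedback.append(str(retry))  # what the model would be shown
    # Retries exhausted: the run fails and the task-level retry takes over.
    raise RuntimeError(f"retries exhausted: {feedback[-1]}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, total REAL)")
conn.execute("INSERT INTO orders VALUES (1, 9.5)")

# The first attempt names a missing column; the second is the correction.
rows, feedback = run_with_retries(
    conn, ["SELECT amount FROM orders", "SELECT total FROM orders"]
)
```

Here the first statement fails, its error message is recorded as feedback, and the corrected statement succeeds within the retry budget. A statement that can never succeed, such as one against a missing table, uses up the budget and raises instead.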

Parameters:
  • db_conn_id (str) – Airflow connection ID for the database.

  • allowed_tables (list[str] | None) –

    Restrict the agent to a fixed set of tables. None (default) exposes every table in schema. Entries may be schema-qualified ("SCHEMA.TABLE") to span multiple schemas in one database – common on warehouses such as Snowflake. list_tables introspects each referenced schema and returns the matching tables fully qualified, and get_schema routes to the table’s own schema. Unqualified entries use schema. Matching is case-insensitive, since databases reflect identifiers in their own case.

    When set, the list is enforced on the query and check_query tools as well as on discovery: every table a query reaches – through subqueries, CTEs, JOINs, set operations, DESCRIBE, catalog views such as information_schema, or DML – must be on the list, resolved with its database/catalog, or the query is rejected before it runs. CTE references are excluded by lexical scope (a same-named CTE in another scope never hides a real table). Constructs the list cannot describe are rejected outright while it is active: table-valued functions (dblink), TABLE('name') row sources, the TABLE <name> shorthand, SHOW, dynamic SQL, and inline comments (where parser-vs-engine differences such as MySQL /*! ... */ executable comments hide).

    Note

    This is an application-level guardrail, enforced by parsing the SQL with sqlglot. It is strong defense-in-depth but not a substitute for database permissions: it cannot police data reached through a function whose argument is itself SQL or a path – pg_read_file('...') (a file) or query_to_xml('SELECT ... FROM other_table', ...) and dblink in scalar position (a table, read through a string the parser cannot inspect) – and any query the engine parses differently from sqlglot is a residual gap. For a hard guarantee, also point db_conn_id at a least-privilege role whose SELECT grants are limited to the same tables.

  • schema (str | None) – Default schema/namespace for table listing and introspection, used for unqualified allowed_tables entries and unqualified get_schema calls. Schema-qualified allowed_tables entries override it per table.

  • allow_writes (bool) – Allow data-modifying SQL (INSERT, UPDATE, DELETE, etc.). Default False — only SELECT-family statements are permitted.

  • max_rows (int) – Maximum number of rows returned from the query tool. Default 50.

property id: str

An ID for the toolset that is unique among all toolsets registered with the same agent.

If you’re writing a concrete implementation that users can instantiate more than once, let them optionally pass a custom ID to the constructor and return it here.

A toolset needs to have an ID in order to be used in a durable execution environment like Temporal, in which case the ID will be used to identify the toolset’s activities within the workflow.
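The advice about custom IDs can be sketched with a small hypothetical class. `ExampleToolset`, its `id` keyword, and the `sql-<conn_id>` default are assumptions made for illustration; the SQLToolset signature above takes no `id` argument, and how it derives its ID is not stated here.

```python
from __future__ import annotations


class ExampleToolset:
    """Hypothetical toolset showing an optional, user-supplied ID."""

    def __init__(self, db_conn_id: str, *, id: str | None = None) -> None:
        self.db_conn_id = db_conn_id
        # Assumed default: derive the ID from the connection ID.
        self._id = id if id is not None else f"sql-{db_conn_id}"

    @property
    def id(self) -> str:
        return self._id


# Two toolsets on the same connection need distinct IDs on one agent.
readers = ExampleToolset("warehouse", id="sql-readers")
writers = ExampleToolset("warehouse", id="sql-writers")
default = ExampleToolset("warehouse")
```

Without the override, both instances on `warehouse` would share the derived ID, which is the collision the optional argument is meant to avoid.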

async get_tools(ctx)

The tools that are available in this toolset.

async call_tool(name, tool_args, ctx, tool)

Call a tool with the given arguments.

Parameters:
  • name – The name of the tool to call.

  • tool_args – The arguments to pass to the tool.

  • ctx – The run context.

  • tool – The tool definition returned by get_tools() that was called.
