# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Bridge pydantic-ai toolsets into LangChain tools.
This is the reverse of pydantic-ai's upstream ``pydantic_ai.ext.langchain``
bridge. Upstream turns LangChain tools *into* a pydantic-ai toolset
(:class:`~pydantic_ai.ext.langchain.LangChainToolset`) so they can be used with
common.ai's ``AgentOperator``. This module goes the other way: it turns a
pydantic-ai :class:`~pydantic_ai.toolsets.abstract.AbstractToolset` -- such as
common.ai's :class:`~airflow.providers.common.ai.toolsets.sql.SQLToolset`,
:class:`~airflow.providers.common.ai.toolsets.hook.HookToolset`, or
:class:`~airflow.providers.common.ai.toolsets.mcp.MCPToolset` -- into a list of
LangChain ``StructuredTool`` objects, so Airflow's curated tools can be handed
to a LangChain agent or chain.
"""
from __future__ import annotations
import asyncio
import concurrent.futures
from typing import TYPE_CHECKING, Any
from pydantic_ai import RunContext
from pydantic_ai.exceptions import ModelRetry
from pydantic_ai.models.test import TestModel
from pydantic_ai.usage import RunUsage
if TYPE_CHECKING:
from collections.abc import Coroutine
from langchain_core.tools import StructuredTool
from pydantic_ai.toolsets.abstract import AbstractToolset, ToolsetTool
def _run_coro_sync(coro: Coroutine[Any, Any, Any]) -> Any:
    """
    Run an awaitable to completion from synchronous code.

    LangChain's ``StructuredTool.func`` is synchronous and is what an Airflow
    ``@task`` calls, but a pydantic-ai toolset's ``get_tools`` / ``call_tool``
    are coroutines.

    When no event loop is running in this thread, the coroutine is driven with
    :func:`asyncio.run`. If one is already running (an async caller), the
    coroutine is instead run with :func:`asyncio.run` inside a single worker
    thread, because :func:`asyncio.run` cannot be called while another loop is
    running in the same thread. In both cases the calling thread blocks until
    the coroutine finishes.

    :param coro: the coroutine to run; it is awaited to completion by this call.
    :return: the coroutine's return value.
    :raises: whatever the coroutine raises, propagated to the caller.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False
    else:
        loop_running = True

    if not loop_running:
        # Deliberately outside the ``except`` block above: running the
        # coroutine inside the handler would chain every exception it raises to
        # the unrelated "no running event loop" RuntimeError via ``__context__``.
        return asyncio.run(coro)

    # A loop is already running here; run the coroutine on a fresh loop in a
    # worker thread rather than nesting loops, and wait for its result.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
def _build_structured_tool(
    toolset: AbstractToolset[Any],
    name: str,
    toolset_tool: ToolsetTool[Any],
    ctx: RunContext[Any],
    structured_tool_cls: type[StructuredTool],
) -> StructuredTool:
    """
    Wrap a single pydantic-ai tool as a LangChain ``StructuredTool``.

    The returned tool has both a synchronous entry point (``func``) and an
    asynchronous one (``coroutine``). Each one:

    1. validates the incoming keyword arguments with the tool's
       ``args_validator``;
    2. dispatches to ``toolset.call_tool(name, args, ctx, toolset_tool)``.
       The sync path drives that coroutine via ``_run_coro_sync``.

    A :class:`~pydantic_ai.exceptions.ModelRetry` raised by the tool is not
    propagated. Its message is returned as the tool output instead, so the
    model sees the retry hint and can correct itself.

    :param toolset: the toolset that owns the tool and executes the call.
    :param name: the tool's name, used both for dispatch and as the LangChain
        tool name.
    :param toolset_tool: the pydantic-ai tool, providing ``tool_def`` (description
        and JSON parameter schema) and ``args_validator``.
    :param ctx: the run context forwarded to every ``call_tool`` invocation.
    :param structured_tool_cls: the LangChain ``StructuredTool`` class whose
        ``from_function`` builds the result. It is passed in rather than
        imported here, because this module only imports ``StructuredTool``
        under ``TYPE_CHECKING``.
    :return: the constructed ``StructuredTool``. Its description falls back to
        the tool name when the pydantic-ai definition has none.
    """
    definition = toolset_tool.tool_def

    def _coerce_args(raw_args: dict[str, Any]) -> dict[str, Any]:
        # pydantic-ai's ToolManager validates arguments before dispatching a
        # call. This bridge calls the toolset directly, so it validates here.
        # A passthrough validator (the bundled toolsets) hands the args back
        # untouched; a typed one coerces them (e.g. "5" -> 5).
        return toolset_tool.args_validator.validate_python(raw_args)

    async def _call_async(**kwargs: Any) -> Any:
        try:
            return await toolset.call_tool(name, _coerce_args(kwargs), ctx, toolset_tool)
        except ModelRetry as retry:
            # A "try again" signal for the model, not a failure: return the
            # message as the tool output so the model can self-correct.
            return str(retry)

    def _call_sync(**kwargs: Any) -> Any:
        try:
            # Validate in the calling thread first, then drive the coroutine.
            pending = toolset.call_tool(name, _coerce_args(kwargs), ctx, toolset_tool)
            return _run_coro_sync(pending)
        except ModelRetry as retry:
            # Same handling as the async path. Raising instead would abort the
            # run under create_agent's default error handling.
            return str(retry)

    return structured_tool_cls.from_function(
        func=_call_sync,
        coroutine=_call_async,
        name=name,
        description=definition.description or name,
        args_schema=definition.parameters_json_schema,
    )