PydanticAIHook

Use PydanticAIHook to interact with LLM providers via pydantic-ai.

The hook manages API credentials from an Airflow connection and creates pydantic-ai Model and Agent objects. It supports any provider that pydantic-ai supports.

Basic Usage

Use the hook in a @task function to call an LLM:

airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py

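# Import paths are assumed from the provider's package layout; verify them against your installed version.
from airflow.providers.common.ai.hooks.pydantic_ai import PydanticAIHook
from airflow.sdk import dag, task


@dag(schedule=None, tags=["example"])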
def example_pydantic_ai_hook():
    @task
    def generate_summary(text: str) -> str:
        hook = PydanticAIHook(llm_conn_id="pydanticai_default")
        agent = hook.create_agent(output_type=str, instructions="Summarize concisely.")
        result = agent.run_sync(text)
        return result.output

    generate_summary("Apache Airflow is a platform for programmatically authoring...")


Overriding the Model

The hook resolves the model in the following order (highest priority first):

  1. model_id parameter on the hook

  2. model key in the connection’s extra JSON

  3. No default: if neither is set, an error is raised

# Use model from the connection's extra JSON
hook = PydanticAIHook(llm_conn_id="my_llm")

# Override with a specific model
hook = PydanticAIHook(llm_conn_id="my_llm", model_id="anthropic:claude-opus-4-6")
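
The precedence above can be sketched as a small standalone function. This is only an illustration of the documented order, not the hook's actual implementation: resolve_model is a hypothetical helper, conn_extra stands in for the connection's parsed extra JSON, and the ValueError type is an assumption (the documentation only says an error is raised).

```python
from __future__ import annotations


def resolve_model(model_id: str | None, conn_extra: dict) -> str:
    """Hypothetical helper that mirrors the documented precedence."""
    # 1. An explicit model_id on the hook wins.
    if model_id:
        return model_id
    # 2. Otherwise fall back to the "model" key in the connection's extra JSON.
    model = conn_extra.get("model")
    if model:
        return model
    # 3. No default exists. The exception type here is an assumption.
    raise ValueError(
        "No model configured: set model_id on the hook or 'model' in the connection extra."
    )


# The hook-level value overrides the connection-level value.
chosen = resolve_model("anthropic:claude-opus-4-6", {"model": "openai:gpt-4o-mini"})
```

Treating an empty string the same as a missing value is a design choice of this sketch; the real hook may handle that case differently.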

Structured Output

The hook supports pydantic-ai’s structured output. Define a Pydantic model for the expected output shape, then pass it to create_agent as output_type:

airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py

from pydantic import BaseModel


@dag(schedule=None, tags=["example"])
def example_pydantic_ai_structured_output():
    @task
    def generate_sql(prompt: str) -> dict:
        class SQLResult(BaseModel):
            query: str
            explanation: str

        hook = PydanticAIHook(llm_conn_id="pydanticai_default")
        agent = hook.create_agent(
            output_type=SQLResult,
            instructions="Generate a SQL query and explain it.",
        )
        result = agent.run_sync(prompt)
        return result.output.model_dump()

    generate_sql("Find the top 10 customers by revenue")
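
To see what the output model contributes on its own, the following sketch exercises only the Pydantic side, with no LLM call and no hook involved. The JSON strings are made-up stand-ins for a model response. It shows that a well-formed payload becomes a typed object whose model_dump() yields a plain dict (the form the task above returns), and that a payload missing a required field is rejected.

```python
from pydantic import BaseModel, ValidationError


class SQLResult(BaseModel):
    query: str
    explanation: str


# A well-formed payload validates into a typed object.
raw = '{"query": "SELECT 1", "explanation": "Returns the constant 1."}'
result = SQLResult.model_validate_json(raw)

# model_dump() converts the object into a plain dict of built-in types.
as_dict = result.model_dump()

# A payload that lacks a required field fails validation.
try:
    SQLResult.model_validate_json('{"query": "SELECT 1"}')
    missing_field_rejected = False
except ValidationError:
    missing_field_rejected = True
```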


Loading Agent Config from a Spec File

Instead of hard-coding model name, instructions, and settings in Python, you can store them in a YAML or JSON AgentSpec file and pass its path via spec_file. This keeps prompt engineering separate from Dag logic and lets you version-control agent configs independently.

agent_spec.yaml
model: openai:gpt-4o-mini
instructions: >
  You are a concise summarizer. Given any text, respond with a single
  paragraph that captures the key points.
model_settings:
  temperature: 0.3
retries: 2

airflow/providers/common/ai/example_dags/example_pydantic_ai_hook.py

from pathlib import Path


@dag(schedule=None, tags=["example"])
def example_pydantic_ai_spec_file():
    """Load agent settings from a YAML spec file instead of inline code.

    The spec file (``example_agent_spec.yaml``) declares model, instructions,
    model_settings, retries, etc. If ``model_id`` or the connection's ``model``
    extra is set, that hook model takes precedence over the file's model.
    """

    @task
    def summarize_from_spec(text: str) -> str:
        spec_path = Path(__file__).parent / "example_agent_spec.yaml"
        hook = PydanticAIHook(llm_conn_id="pydanticai_default")
        # Model, instructions, temperature, and retries all come from the YAML file.
        agent = hook.create_agent(spec_file=spec_path)
        result = agent.run_sync(text)
        return result.output

    @task
    def summarize_with_additional_instructions(text: str) -> str:
        """Add call-time instructions alongside the spec file instructions."""
        spec_path = Path(__file__).parent / "example_agent_spec.yaml"
        hook = PydanticAIHook(llm_conn_id="pydanticai_default")
        agent = hook.create_agent(
            spec_file=spec_path,
            instructions="Summarize in exactly one sentence.",
        )
        result = agent.run_sync(text)
        return result.output

    body = "Apache Airflow is an open-source platform for authoring, scheduling..."
    summarize_from_spec(body)
    summarize_with_additional_instructions(body)


The model declared in the spec file is used unless model_id or the connection’s model extra is set, in which case the hook model takes precedence. If instructions is passed to create_agent together with spec_file, those instructions are added to the ones in the file rather than replacing them.
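
The two rules above can be sketched as a standalone merge function. This is an illustration under stated assumptions, not the hook's code: effective_config is a hypothetical helper, the spec is given as JSON text so that only the standard library is needed, and representing the combined instructions as a list (file value first) is an assumption about how the appending works.

```python
from __future__ import annotations

import json


def effective_config(
    spec_text: str,
    hook_model: str | None = None,
    extra_instructions: str | None = None,
) -> dict:
    """Hypothetical helper: combine a JSON spec with hook-level overrides."""
    spec = json.loads(spec_text)
    config = dict(spec)
    # A model set on the hook (model_id or the connection extra) wins over the file.
    if hook_model:
        config["model"] = hook_model
    # Call-time instructions are added after the file's instructions, not substituted.
    instructions = [spec["instructions"]] if spec.get("instructions") else []
    if extra_instructions:
        instructions.append(extra_instructions)
    config["instructions"] = instructions
    return config


spec_text = json.dumps(
    {
        "model": "openai:gpt-4o-mini",
        "instructions": "You are a concise summarizer.",
        "retries": 2,
    }
)

# With no overrides, every value comes from the spec.
from_file = effective_config(spec_text)

# With overrides, the hook model replaces the file's model and the instructions accumulate.
overridden = effective_config(
    spec_text,
    hook_model="anthropic:claude-opus-4-6",
    extra_instructions="Summarize in exactly one sentence.",
)
```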
