airflow.providers.common.ai.operators.llm_schema_compare

Operator for cross-system schema drift detection powered by LLM reasoning.

Attributes

DEFAULT_SYSTEM_PROMPT

Classes

SchemaMismatch

A single schema mismatch between data sources.

SchemaCompareResult

Structured output from schema comparison.

LLMSchemaCompareOperator

Compare schemas across different database systems and detect drift using LLM reasoning.

Module Contents

class airflow.providers.common.ai.operators.llm_schema_compare.SchemaMismatch(/, **data)[source]

Bases: pydantic.BaseModel

A single schema mismatch between data sources.

source: str = None[source]
target: str = None[source]
column: str = None[source]
source_type: str = None[source]
target_type: str = None[source]
severity: Literal['critical', 'warning', 'info'] = None[source]
description: str = None[source]
suggested_action: str = None[source]
migration_query: str = None[source]
class airflow.providers.common.ai.operators.llm_schema_compare.SchemaCompareResult(/, **data)[source]

Bases: pydantic.BaseModel

Structured output from schema comparison.

compatible: bool = None[source]
mismatches: list[SchemaMismatch] = None[source]
summary: str = None[source]
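The result shape above can be illustrated with a minimal stdlib sketch. The real classes are `pydantic.BaseModel` subclasses; the dataclass stand-ins below only mirror the fields, and the sample values (system names, column, types) are illustrative, not from the operator:

```python
from dataclasses import dataclass, field
from typing import Literal

# Stdlib stand-ins mirroring the pydantic models documented above.
@dataclass
class SchemaMismatch:
    source: str
    target: str
    column: str
    source_type: str
    target_type: str
    severity: Literal["critical", "warning", "info"]
    description: str
    suggested_action: str
    migration_query: str

@dataclass
class SchemaCompareResult:
    compatible: bool
    mismatches: list[SchemaMismatch] = field(default_factory=list)
    summary: str = ""

# A hypothetical comparison outcome: one warning-level timezone mismatch.
result = SchemaCompareResult(
    compatible=False,
    mismatches=[
        SchemaMismatch(
            source="postgres",
            target="snowflake",
            column="created_at",
            source_type="timestamptz",
            target_type="TIMESTAMP_NTZ",
            severity="warning",
            description="Timezone information is dropped on load.",
            suggested_action="Convert to UTC before loading.",
            migration_query="-- illustrative only",
        )
    ],
    summary="1 warning: timezone handling differs for created_at.",
)

# Downstream code can filter by severity.
critical = [m for m in result.mismatches if m.severity == "critical"]
```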
airflow.providers.common.ai.operators.llm_schema_compare.DEFAULT_SYSTEM_PROMPT = Multiline-String[source]
"""Consider cross-system type equivalences:
- varchar(n) / text / string / TEXT may be compatible
- int / integer / int4 / INT32 are equivalent
- bigint / int8 / int64 / BIGINT are equivalent
- timestamp / timestamptz / TIMESTAMP_NTZ / datetime may differ in timezone handling
- numeric(p,s) / decimal(p,s) / NUMBER — check precision and scale
- boolean / bool / BOOLEAN / tinyint(1) — check semantic equivalence

Severity levels:
- critical: Will cause data loading failures or data loss (e.g., column missing in target, incompatible types)
- warning: May cause data quality issues (e.g., precision loss, timezone mismatch)
- info: Cosmetic differences that won't affect data loading (e.g., varchar length differences within safe range)

"""
class airflow.providers.common.ai.operators.llm_schema_compare.LLMSchemaCompareOperator(*, data_sources=None, db_conn_ids=None, table_names=None, context_strategy='full', system_prompt=DEFAULT_SYSTEM_PROMPT, **kwargs)[source]

Bases: airflow.providers.common.ai.operators.llm.LLMOperator

Compare schemas across different database systems and detect drift using LLM reasoning.

The LLM handles complex cross-system type mapping that simple equality checks miss (e.g., varchar(255) vs string, timestamp vs timestamptz).

Accepts data sources via two patterns:

  1. data_sources — a list of DataSourceConfig for each system. If the connection resolves to a DbApiHook, schema is introspected via SQLAlchemy; otherwise DataFusion is used.

  2. db_conn_ids + table_names — shorthand for comparing the same table across multiple database connections (all must resolve to DbApiHook).
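The shorthand pattern (2) might look like the sketch below. The connection IDs, model, and table name are assumptions for illustration; the kwargs are built as a plain dict so the operator call itself stays commented out:

```python
# Hypothetical task comparing the same table across two connections.
# from airflow.providers.common.ai.operators.llm_schema_compare import LLMSchemaCompareOperator

compare_kwargs = dict(
    task_id="compare_orders_schema",
    llm_conn_id="openai_default",          # assumed LLM connection ID
    model_id="openai:gpt-5",
    prompt="Flag any mismatch that could break the nightly load of orders.",
    db_conn_ids=["pg_prod", "snowflake_dw"],  # assumed DB connection IDs
    table_names=["orders"],
    context_strategy="basic",  # names and types only; omit for the "full" default
)
# compare = LLMSchemaCompareOperator(**compare_kwargs)
```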

Parameters:
  • prompt – Instructions for the LLM on what to compare and flag.

  • llm_conn_id – Connection ID for the LLM provider.

  • model_id – Model identifier (e.g. "openai:gpt-5").

  • system_prompt (str) – Instructions included in the LLM system prompt. Defaults to DEFAULT_SYSTEM_PROMPT, which contains cross-system type equivalences and severity definitions. Passing a value replaces the default system prompt.

  • agent_params – Extra keyword arguments for the pydantic-ai Agent.

  • data_sources (list[airflow.providers.common.sql.config.DataSourceConfig] | None) – List of DataSourceConfig objects, one per system.

  • db_conn_ids (list[str] | None) – Connection IDs for databases to compare (used with table_names).

  • table_names (list[str] | None) – Tables to introspect from each db_conn_id.

  • context_strategy (Literal['basic', 'full']) – "basic" for column names and types only; "full" to include primary keys, foreign keys, and indexes. Default "full".
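A downstream task might triage the operator's result by severity. The payload below is illustrative and its dict shape is assumed to follow SchemaCompareResult; the `triage` helper is hypothetical, not part of the provider:

```python
SEVERITY_ORDER = {"critical": 0, "warning": 1, "info": 2}

def triage(result: dict) -> list[dict]:
    """Return mismatches sorted most severe first; fail fast on critical drift."""
    ranked = sorted(result["mismatches"], key=lambda m: SEVERITY_ORDER[m["severity"]])
    if any(m["severity"] == "critical" for m in ranked):
        raise ValueError("Schema drift would break the load: " + result["summary"])
    return ranked

# Illustrative payload with no critical findings.
payload = {
    "compatible": True,
    "summary": "Minor differences only.",
    "mismatches": [
        {"severity": "info", "column": "name"},
        {"severity": "warning", "column": "created_at"},
    ],
}
ranked = triage(payload)
```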

template_fields: collections.abc.Sequence[str] = ('prompt', 'llm_conn_id', 'model_id', 'system_prompt', 'agent_params', 'data_sources',...[source]
data_sources = [][source]
db_conn_ids = [][source]
table_names = [][source]
context_strategy = 'full'[source]
system_prompt = DEFAULT_SYSTEM_PROMPT[source]
execute(context)[source]

The main method to override when creating an operator; it executes the task.

context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.
