airflow.providers.common.ai.example_dags.example_llm_survey_agentic
Multi-query synthesis – an agentic survey analysis pattern.
Demonstrates how Dynamic Task Mapping turns a multi-dimensional research question into a fan-out / fan-in pipeline that is observable, retryable, and auditable at each step.
Question: “What does a typical Airflow deployment look like for practitioners who actively use AI tools in their workflow?”
This question cannot be answered with a single SQL query. It requires querying four independent dimensions – executor type, deployment method, cloud provider, and Airflow version – all filtered to respondents who use AI tools to write Airflow code. The results are then synthesized by a second LLM call into a single narrative characterization.
example_llm_survey_agentic (manual trigger):
decompose_question (@task)
→ generate_sql (LLMSQLQueryOperator, mapped ×4)
→ wrap_query (@task, mapped ×4)
→ run_query (AnalyticsOperator, mapped ×4)
→ collect_results (@task)
→ synthesize_answer (LLMOperator)
→ result_confirmation (ApprovalOperator)
What this makes visible that an agent harness hides:
Each sub-query is a named, logged task instance – not a hidden tool call.
If the cloud-provider query fails, only that mapped instance retries; the other three results are preserved in XCom.
The synthesis step’s inputs are fully auditable XCom values – not an opaque continuation of an LLM reasoning loop.
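The retry isolation described above can be illustrated without Airflow. The following is a minimal pure-Python sketch, not the DAG's actual code: `run_mapped` and `flaky_query` are hypothetical stand-ins, and the loop runs sequentially, whereas Airflow schedules each mapped instance as its own task instance. It shows only that a failure in one dimension triggers a retry for that dimension alone, while results already stored for the others are untouched.

```python
from typing import Callable

DIMENSIONS = ["executor", "deployment", "cloud", "airflow_version"]


def run_mapped(
    keys: list[str],
    work: Callable[[str], str],
    max_tries: int = 2,
) -> tuple[dict[str, str], dict[str, int]]:
    """Run `work` once per key, retrying only the keys that raise."""
    results: dict[str, str] = {}
    attempts: dict[str, int] = {}
    for key in keys:
        for attempt in range(1, max_tries + 1):
            attempts[key] = attempt
            try:
                # One stored result per key, loosely analogous to one XCom
                # value per mapped task instance.
                results[key] = work(key)
                break
            except RuntimeError:
                if attempt == max_tries:
                    raise
    return results, attempts


_cloud_calls = {"count": 0}


def flaky_query(dimension: str) -> str:
    """Hypothetical sub-query: the cloud dimension fails on its first call."""
    if dimension == "cloud":
        _cloud_calls["count"] += 1
        if _cloud_calls["count"] == 1:
            raise RuntimeError("transient failure")
    return f"rows for {dimension}"


results, attempts = run_mapped(DIMENSIONS, flaky_query)
```

Here only the `cloud` key records a second attempt; the other three dimensions run exactly once.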
Before running:
Create an LLM connection named pydanticai_default (or the value of LLM_CONN_ID) for your chosen model provider.
Place the cleaned survey CSV at the path set by SURVEY_CSV_PATH.
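Attributes
LLM_CONN_ID
SURVEY_SCHEMA
DIMENSION_KEYS
SQL_SYSTEM_PROMPT
SYNTHESIS_SYSTEM_PROMPT
Functions
example_llm_survey_agentic() | Fan-out across four survey dimensions, then synthesize into a single narrative.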
Module Contents
- airflow.providers.common.ai.example_dags.example_llm_survey_agentic.LLM_CONN_ID = 'pydanticai_default'
- airflow.providers.common.ai.example_dags.example_llm_survey_agentic.SURVEY_SCHEMA = Multiline-String
"""
Table: survey
Key columns (quote all names in SQL):
  "How important is Airflow to your business?" TEXT
  "Which version of Airflow do you currently use?" TEXT
  "CeleryExecutor" TEXT
  "KubernetesExecutor" TEXT
  "LocalExecutor" TEXT
  "How do you deploy Airflow?" TEXT
  "What best describes your current occupation?" TEXT
  "What industry do you currently work in?" TEXT
  "How many years of experience do you have with Airflow?" TEXT
  "Which of the following is your company's primary cloud provider for Airflow?" TEXT
  "How many people work at your company?" TEXT
  "How many people at your company directly work on data?" TEXT
  "How many people at your company use Airflow?" TEXT
  "How likely are you to recommend Apache Airflow?" TEXT
  "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" TEXT
"""
- airflow.providers.common.ai.example_dags.example_llm_survey_agentic.DIMENSION_KEYS = ['executor', 'deployment', 'cloud', 'airflow_version']
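As an illustration of how the four dimension keys could drive the fan-out, the sketch below turns each key into one sub-question and later re-attaches the keys to the mapped results. This is an assumption-laden sketch, not the DAG's code: the wording in `SUB_QUESTIONS`, the `AI_FILTER` sentence, and the helper `label_results` are all hypothetical, and it assumes mapped results come back in the same order as the prompts.

```python
DIMENSION_KEYS = ["executor", "deployment", "cloud", "airflow_version"]

# Hypothetical wording; the prompts used by the example DAG are not shown here.
SUB_QUESTIONS = {
    "executor": "Which executors do respondents use?",
    "deployment": "How do respondents deploy Airflow?",
    "cloud": "Which primary cloud provider do respondents use?",
    "airflow_version": "Which Airflow version do respondents run?",
}
AI_FILTER = "Only include respondents who use AI tools to write Airflow code."


def decompose_question(keys: list[str]) -> list[str]:
    """Return one sub-question per dimension, each carrying the shared filter."""
    return [f"{SUB_QUESTIONS[key]} {AI_FILTER}" for key in keys]


def label_results(keys: list[str], results: list[str]) -> dict[str, str]:
    """Pair each mapped result with its dimension key (order is assumed stable)."""
    if len(keys) != len(results):
        raise ValueError("expected exactly one result per dimension key")
    return dict(zip(keys, results))


prompts = decompose_question(DIMENSION_KEYS)
```

Keeping the keys in a single list means the same ordering can be reused at both ends of the pipeline, so the synthesis step knows which result belongs to which dimension.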
- airflow.providers.common.ai.example_dags.example_llm_survey_agentic.SQL_SYSTEM_PROMPT = Multiline-String
"""You are a SQL analysis agent working on the table "survey".
Always quote all table and column names with double quotes.
For the AI usage filter column "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?":
- Treat affirmative free text (yes, sometimes, occasionally, rarely, often, regularly) as AI users.
- Treat explicit negatives (no, never) as non-users.
- Exclude blank, NULL, and ambiguous responses from the filtered set."""
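To make the filter rules in this prompt concrete, here is a minimal sketch of the kind of query they imply. It runs against an in-memory SQLite table as a stand-in; the example DAG itself runs queries through AnalyticsOperator, whose SQL engine is not assumed here. The rows are invented for illustration, and the exact-match test on normalized text is a simplification, since real free-text answers may need looser matching.

```python
import sqlite3

AI_COL = "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?"
DEPLOY_COL = "How do you deploy Airflow?"

conn = sqlite3.connect(":memory:")
# Column names contain spaces and punctuation, so every identifier is double-quoted.
conn.execute(f'CREATE TABLE "survey" ("{AI_COL}" TEXT, "{DEPLOY_COL}" TEXT)')

# Hypothetical rows, not real survey data.
rows = [
    ("Yes", "Helm chart"),
    ("sometimes", "Helm chart"),
    ("No", "Docker Compose"),
    (None, "Helm chart"),        # NULL answer: excluded
    ("", "Managed service"),     # blank answer: excluded
    ("Often", "Managed service"),
    ("never", "Helm chart"),
]
conn.executemany('INSERT INTO "survey" VALUES (?, ?)', rows)

AFFIRMATIVE = ("yes", "sometimes", "occasionally", "rarely", "often", "regularly")
placeholders = ", ".join("?" for _ in AFFIRMATIVE)

query = f'''
SELECT "{DEPLOY_COL}" AS deployment, COUNT(*) AS n
FROM "survey"
WHERE LOWER(TRIM("{AI_COL}")) IN ({placeholders})
GROUP BY "{DEPLOY_COL}"
ORDER BY n DESC, deployment
'''
ai_user_counts = conn.execute(query, AFFIRMATIVE).fetchall()
```

Because the `WHERE` clause lists only the affirmative answers, explicit negatives, blanks, and NULLs all fall outside the filtered set without needing separate conditions.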
- airflow.providers.common.ai.example_dags.example_llm_survey_agentic.SYNTHESIS_SYSTEM_PROMPT = Multiline-String
"""You are a data analyst summarizing survey results about Apache Airflow practitioners. Write in plain, concise language suitable for a technical audience. Focus on patterns and proportions rather than raw counts."""
- airflow.providers.common.ai.example_dags.example_llm_survey_agentic.example_llm_survey_agentic()
Fan-out across four survey dimensions, then synthesize into a single narrative.
Task graph:
decompose_question (@task)
→ generate_sql (LLMSQLQueryOperator ×4, via Dynamic Task Mapping)
→ wrap_query (@task ×4)
→ run_query (AnalyticsOperator ×4, via Dynamic Task Mapping)
→ collect_results (@task)
→ synthesize_answer (LLMOperator)
→ result_confirmation (ApprovalOperator)
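The fan-in end of this graph can be sketched as a function that takes per-dimension results and builds the text handed to the synthesis step. This is a hypothetical illustration, not the DAG's implementation: `to_shares`, `build_synthesis_prompt`, the prompt layout, and the sample counts are all assumptions. Converting counts to percentages up front is one way to support the synthesis prompt's instruction to focus on proportions.

```python
def to_shares(counts: dict[str, int]) -> dict[str, float]:
    """Convert raw counts to percentages of the dimension total."""
    total = sum(counts.values())
    if total == 0:
        return {label: 0.0 for label in counts}
    return {label: round(100 * n / total, 1) for label, n in counts.items()}


def build_synthesis_prompt(
    question: str,
    results_by_dimension: dict[str, dict[str, int]],
) -> str:
    """Render all dimension results into one plain-text prompt."""
    lines = [
        f"Question: {question}",
        "",
        "Results by dimension (percent of AI-using respondents):",
    ]
    for dimension, counts in results_by_dimension.items():
        shares = to_shares(counts)
        formatted = ", ".join(f"{label}: {pct}%" for label, pct in shares.items())
        lines.append(f"- {dimension}: {formatted}")
    return "\n".join(lines)


# Hypothetical counts, not real survey results.
example_results = {
    "executor": {"CeleryExecutor": 3, "KubernetesExecutor": 1},
    "cloud": {"AWS": 1, "GCP": 1},
}
prompt = build_synthesis_prompt(
    "What does a typical Airflow deployment look like for AI tool users?",
    example_results,
)
```

Since the prompt is built from plain values rather than from an ongoing model conversation, the exact text the synthesis step receives can be inspected after the run.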