airflow.providers.common.ai.example_dags.example_llamaindex_10k
SEC 10-K financial analysis – LlamaIndex RAG with live SEC EDGAR data.
Two production-shaped Dags that demonstrate a multi-company financial research pipeline using real SEC 10-K filings fetched from the EDGAR public API: one fetches and indexes filings on a schedule, the other decomposes a comparison question at runtime and fans out retrieval via Dynamic Task Mapping.
Filings are fetched from the SEC EDGAR public API (free, no auth
required) using stock ticker symbols. Any US publicly-traded company
is supported – configure via the tickers Dag parameter.
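As a rough illustration of what fetching by ticker involves, the sketch below parses a comma-separated ticker string, builds the EDGAR submissions URL for a company's CIK, and picks the first 10-K out of a submissions payload. The helper names (parse_tickers, submissions_url, latest_10k) are hypothetical and not taken from this module, and the payload layout (parallel lists under filings.recent, assumed newest first) is an assumption about the public EDGAR submissions API. No network call is made.

```python
from __future__ import annotations

# Hypothetical helpers for illustration; the module's own fetch_filings task may differ.
EDGAR_USER_AGENT = "Your Name you@example.com"  # replace with your own contact details


def parse_tickers(raw: str) -> list[str]:
    """Split a comma-separated ticker string, upper-casing and dropping blanks."""
    return [t.strip().upper() for t in raw.split(",") if t.strip()]


def submissions_url(cik: int) -> str:
    """Build the EDGAR submissions URL; the CIK is zero-padded to 10 digits."""
    return f"https://data.sec.gov/submissions/CIK{cik:010d}.json"


def latest_10k(submissions: dict) -> dict | None:
    """Return the first 10-K in the 'recent' filings (assumed newest first), or None."""
    recent = submissions["filings"]["recent"]
    for form, accession, doc, filed in zip(
        recent["form"],
        recent["accessionNumber"],
        recent["primaryDocument"],
        recent["filingDate"],
    ):
        if form == "10-K":
            return {"accession": accession, "document": doc, "filed": filed}
    return None
```

A real request would pass headers={"User-Agent": EDGAR_USER_AGENT} to whichever HTTP client is in use, since the SEC expects a descriptive User-Agent on EDGAR API requests.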
example_llamaindex_10k_index (weekly schedule):
fetch_filings (@task, live from SEC EDGAR)
-> build_index (LlamaIndexEmbeddingOperator, mapped x N companies)
example_llamaindex_10k_analysis (manual trigger):
analyst_question (HITLEntryOperator)
-> get_question (@task)
-> get_tickers (@task)
-> decompose_question (@task.llm, structured output)
-> extract_sub_questions (@task)
-> build_retrieval_kwargs (@task)
-> retrieve (LlamaIndexRetrievalOperator x N, DTM)
-> collect_results (@task)
-> synthesize_report (LLMOperator, UsageLimits + structured output)
-> format_report (@task, readable text for reviewer)
-> review_report (ApprovalOperator)
What this makes visible that a notebook hides:
The LLM decides how many sub-questions to create – N is unknown at parse time, determined at runtime via Dynamic Task Mapping.
Each retrieval is an independent task instance: if one company’s index is unavailable, only that instance retries.
The synthesis step’s token budget (UsageLimits) and output schema (AnalysisReport) are auditable in XCom.
An analyst reviews the report before it reaches the investment committee.
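The runtime fan-out described above can be pictured as a plain function that turns the LLM's sub-questions into one keyword-argument dict per mapped task instance. This is a minimal sketch under stated assumptions: the sub-question keys (ticker, question), the operator argument names (query, persist_dir), and the one-directory-per-ticker layout under INDEX_BASE_DIR are all hypothetical and not confirmed by this page.

```python
from __future__ import annotations

from pathlib import PurePosixPath

INDEX_BASE_DIR = "/opt/airflow/data/indexes/10k"  # value documented for this module


def build_retrieval_kwargs(sub_questions: list[dict], known_tickers: list[str]) -> list[dict]:
    """Turn LLM-produced sub-questions into one kwargs dict per mapped task instance."""
    known = {t.upper() for t in known_tickers}
    kwargs = []
    for sq in sub_questions:
        ticker = sq["ticker"].upper()
        if ticker not in known:
            # The LLM may name a company that was never indexed; skip it here.
            continue
        kwargs.append(
            {
                "query": sq["question"],  # hypothetical operator argument
                "persist_dir": str(PurePosixPath(INDEX_BASE_DIR) / ticker),  # hypothetical
            }
        )
    return kwargs
```

In Airflow, a list of this shape is what an operator's expand_kwargs() consumes, so the number of retrieve task instances equals the length of the list produced at runtime.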
Before running:
Create an LLM connection pydanticai_default for synthesis and decomposition, and llamaindex_default for embedding and retrieval.
Update EDGAR_USER_AGENT with your name and email (SEC requires a descriptive User-Agent header on all EDGAR API requests).
Run example_llamaindex_10k_index once to fetch filings and build the vector indexes.
Trigger example_llamaindex_10k_analysis to run the query pipeline.
Attributes
Classes
SubQuestion | One sub-question targeting a specific company.
DecomposedQuestion | LLM-produced decomposition of the analyst's question.
AnalysisReport | Structured financial comparison report.
Functions
example_llamaindex_10k_index() | Fetch 10-K filings from SEC EDGAR and build per-company vector indexes.
example_llamaindex_10k_analysis() | Multi-company financial comparison via LLM-driven sub-question decomposition.
Module Contents
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.LLM_CONN_ID = 'pydanticai_default'
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.LLAMAINDEX_CONN_ID = 'llamaindex_default'
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.INDEX_BASE_DIR = '/opt/airflow/data/indexes/10k'
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.DEFAULT_TICKERS = 'AAPL,MSFT,UBER,LYFT,AMZN'
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.EDGAR_USER_AGENT = 'Apache Airflow Example dev@airflow.apache.org'
- class airflow.providers.common.ai.example_dags.example_llamaindex_10k.SubQuestion(/, **data)
Bases: pydantic.BaseModel
One sub-question targeting a specific company.
- class airflow.providers.common.ai.example_dags.example_llamaindex_10k.DecomposedQuestion(/, **data)
Bases: pydantic.BaseModel
LLM-produced decomposition of the analyst’s question.
- sub_questions: list[SubQuestion]
- class airflow.providers.common.ai.example_dags.example_llamaindex_10k.AnalysisReport(/, **data)
Bases: pydantic.BaseModel
Structured financial comparison report.
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.DECOMPOSE_SYSTEM_PROMPT = "You are a financial research assistant. Given a comparison question and a list of companies...
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.SYNTHESIS_SYSTEM_PROMPT = "You are a senior financial analyst. Given retrieval results from multiple companies' 10-K...
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.DEFAULT_QUESTION = 'Compare the risk factors and revenue trends across these companies. Which company faces the...
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.example_llamaindex_10k_index()
Fetch 10-K filings from SEC EDGAR and build per-company vector indexes.
Runs weekly to refresh indexes when new filings arrive. Each company gets its own persisted index via Dynamic Task Mapping.
Task graph:
fetch_filings (@task, live from SEC EDGAR)
-> build_index (LlamaIndexEmbeddingOperator x N companies)
- airflow.providers.common.ai.example_dags.example_llamaindex_10k.example_llamaindex_10k_analysis()
Multi-company financial comparison via LLM-driven sub-question decomposition.
An analyst submits a comparison question. The LLM decomposes it into company-specific sub-questions (N decided at runtime), each sub-question retrieves from the appropriate company’s vector index in parallel via Dynamic Task Mapping, and the results are synthesized into a structured report for human review.
Task graph:
analyst_input (HITLEntryOperator, tickers + question)
-> get_question (@task)
-> get_tickers (@task)
-> decompose_question (@task.llm, structured output)
-> extract_sub_questions (@task)
-> build_retrieval_kwargs (@task)
-> retrieve (LlamaIndexRetrievalOperator x N, DTM)
-> collect_results (@task)
-> synthesize_report (LLMOperator, UsageLimits + AnalysisReport)
-> format_report (@task, readable text for reviewer)
-> review_report (ApprovalOperator)
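To make the collect step between the mapped retrievals and the synthesis call more concrete, here is a minimal sketch that groups retrieved chunks by company and renders them as a single text block for the synthesis prompt. The result shape (dicts with ticker and chunks keys) and the function body are assumptions for illustration; the module's actual collect_results task may look different.

```python
from __future__ import annotations

from collections import defaultdict


def collect_results(results: list[dict]) -> str:
    """Group retrieved chunks by ticker and render one prompt-ready text block."""
    by_ticker: dict[str, list[str]] = defaultdict(list)
    for r in results:
        # Several sub-questions may target the same company; merge their chunks.
        by_ticker[r["ticker"]].extend(r["chunks"])

    sections = []
    for ticker in sorted(by_ticker):  # sorted for a stable, reviewable ordering
        body = "\n".join(f"- {chunk}" for chunk in by_ticker[ticker])
        sections.append(f"## {ticker}\n{body}")
    return "\n\n".join(sections)
```

Sorting by ticker keeps the prompt stable across runs even though mapped task instances may finish in any order, which makes the synthesis input easier to compare between runs.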