airflow.providers.common.ai.example_dags.example_llamaindex_10k

SEC 10-K financial analysis – LlamaIndex RAG with live SEC EDGAR data.

Two production-shaped Dags demonstrate a multi-company financial research pipeline over real SEC 10-K filings from the EDGAR public API. One fetches and indexes filings on a schedule; the other decomposes a comparison question at runtime and fans out retrieval with Dynamic Task Mapping.

Filings are fetched from the SEC EDGAR public API (free, no authentication required) by stock ticker symbol. Any US publicly traded company is supported; set the list with the tickers Dag parameter.
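A minimal sketch of the EDGAR lookup a task like fetch_filings could perform, assuming it resolves tickers through the SEC's public company_tickers.json mapping, reads the data.sec.gov submissions endpoint, and builds the filing URL under the EDGAR Archives path. The helper names (edgar_headers, resolve_cik, submissions_url, latest_10k_url) are hypothetical and not the example's actual code. No network call is made, so the sample payloads are made-up values in the same shape as the real responses, and filings.recent is assumed to list the newest filing first.

```python
from __future__ import annotations

EDGAR_USER_AGENT = "Apache Airflow Example dev@airflow.apache.org"

# SEC's public ticker-to-CIK mapping; download it with the same User-Agent header.
TICKER_MAP_URL = "https://www.sec.gov/files/company_tickers.json"


def edgar_headers(user_agent: str = EDGAR_USER_AGENT) -> dict[str, str]:
    # The SEC asks automated clients for a descriptive User-Agent (name plus contact email).
    return {"User-Agent": user_agent}


def resolve_cik(ticker_map: dict, ticker: str) -> str:
    """Return the 10-digit, zero-padded CIK for a ticker from company_tickers.json."""
    for entry in ticker_map.values():
        if entry["ticker"].upper() == ticker.upper():
            return str(entry["cik_str"]).zfill(10)
    raise KeyError(f"Ticker {ticker!r} not found in EDGAR ticker map")


def submissions_url(cik: str) -> str:
    """Per-company filing history on data.sec.gov."""
    return f"https://data.sec.gov/submissions/CIK{cik}.json"


def latest_10k_url(cik: str, submissions: dict) -> str | None:
    """URL of the first 10-K in filings.recent (assumed to be ordered newest first)."""
    recent = submissions["filings"]["recent"]
    for i, form in enumerate(recent["form"]):
        if form == "10-K":
            accession = recent["accessionNumber"][i].replace("-", "")
            document = recent["primaryDocument"][i]
            # Archive paths use the CIK without leading zeros.
            return f"https://www.sec.gov/Archives/edgar/data/{int(cik)}/{accession}/{document}"
    return None


# Made-up sample payloads in the same shape as the real EDGAR responses.
ticker_map = {"0": {"cik_str": 320193, "ticker": "AAPL", "title": "Apple Inc."}}
submissions = {
    "filings": {
        "recent": {
            "form": ["8-K", "10-K"],
            "accessionNumber": ["0000320193-00-000001", "0000320193-00-000002"],
            "primaryDocument": ["sample-8k.htm", "sample-10k.htm"],
        }
    }
}

cik = resolve_cik(ticker_map, "aapl")
print(submissions_url(cik))
print(latest_10k_url(cik, submissions))
```

In a real task the two payloads would be downloaded with an HTTP client, for example requests.get(url, headers=edgar_headers()).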

example_llamaindex_10k_index (weekly schedule):

fetch_filings (@task, live from SEC EDGAR)
    -> build_index (LlamaIndexEmbeddingOperator, mapped x N companies)

example_llamaindex_10k_analysis (manual trigger):

analyst_question  (HITLEntryOperator)
    -> get_question        (@task)
    -> get_tickers         (@task)
    -> decompose_question  (@task.llm, structured output)
    -> extract_sub_questions (@task)
    -> build_retrieval_kwargs (@task)
    -> retrieve             (LlamaIndexRetrievalOperator x N, DTM)
    -> collect_results      (@task)
    -> synthesize_report    (LLMOperator, UsageLimits + structured output)
    -> format_report        (@task, readable text for reviewer)
    -> review_report        (ApprovalOperator)

What this makes visible that a notebook hides:

  • The LLM decides how many sub-questions to create, so N is unknown at parse time; Dynamic Task Mapping expands the retrieval step into N task instances at runtime.

  • Each retrieval is an independent task instance: if one company’s index is unavailable, only that instance retries.

  • The synthesis step’s token budget (UsageLimits) and output schema (AnalysisReport) are auditable in XCom.

  • An analyst reviews the report before it reaches the investment committee.

Before running:

  1. Create two connections: pydanticai_default, the LLM connection used for question decomposition and report synthesis, and llamaindex_default, used for embedding and retrieval.

  2. Update EDGAR_USER_AGENT with your own name and email address (the SEC requires a descriptive User-Agent header on all EDGAR API requests).

  3. Run example_llamaindex_10k_index once to fetch filings and build the vector indexes.

  4. Trigger example_llamaindex_10k_analysis to run the query pipeline.

Attributes

LLM_CONN_ID

LLAMAINDEX_CONN_ID

INDEX_BASE_DIR

DEFAULT_TICKERS

EDGAR_USER_AGENT

DECOMPOSE_SYSTEM_PROMPT

SYNTHESIS_SYSTEM_PROMPT

DEFAULT_QUESTION

Classes

SubQuestion

One sub-question targeting a specific company.

DecomposedQuestion

LLM-produced decomposition of the analyst's question.

AnalysisReport

Structured financial comparison report.

Functions

example_llamaindex_10k_index()

Fetch 10-K filings from SEC EDGAR and build per-company vector indexes.

example_llamaindex_10k_analysis()

Multi-company financial comparison via LLM-driven sub-question decomposition.

Module Contents

airflow.providers.common.ai.example_dags.example_llamaindex_10k.LLM_CONN_ID = 'pydanticai_default'
airflow.providers.common.ai.example_dags.example_llamaindex_10k.LLAMAINDEX_CONN_ID = 'llamaindex_default'
airflow.providers.common.ai.example_dags.example_llamaindex_10k.INDEX_BASE_DIR = '/opt/airflow/data/indexes/10k'
airflow.providers.common.ai.example_dags.example_llamaindex_10k.DEFAULT_TICKERS = 'AAPL,MSFT,UBER,LYFT,AMZN'
airflow.providers.common.ai.example_dags.example_llamaindex_10k.EDGAR_USER_AGENT = 'Apache Airflow Example dev@airflow.apache.org'
class airflow.providers.common.ai.example_dags.example_llamaindex_10k.SubQuestion(/, **data)

Bases: pydantic.BaseModel

One sub-question targeting a specific company.

sub_question: str
ticker: str
class airflow.providers.common.ai.example_dags.example_llamaindex_10k.DecomposedQuestion(/, **data)

Bases: pydantic.BaseModel

LLM-produced decomposition of the analyst’s question.

sub_questions: list[SubQuestion]
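The decompose_question step relies on structured output: the LLM's reply must fit DecomposedQuestion, a list of SubQuestion objects with sub_question and ticker fields. The sketch below mirrors those two models with stdlib dataclasses (the real classes are pydantic.BaseModel subclasses) and adds a hypothetical guard that every ticker was among those requested. The function parse_decomposition is illustrative only; it shows the shape of the data, not the provider's validation logic.

```python
from __future__ import annotations

import json
from dataclasses import dataclass


@dataclass
class SubQuestion:
    sub_question: str
    ticker: str


@dataclass
class DecomposedQuestion:
    sub_questions: list[SubQuestion]


def parse_decomposition(raw_json: str, allowed_tickers: set[str]) -> DecomposedQuestion:
    """Parse an LLM reply shaped like DecomposedQuestion and reject unknown tickers."""
    payload = json.loads(raw_json)
    subs = [SubQuestion(**item) for item in payload["sub_questions"]]
    unknown = {s.ticker for s in subs} - allowed_tickers
    if unknown:  # hypothetical guard: the LLM named a company nobody asked about
        raise ValueError(f"Unexpected tickers in decomposition: {sorted(unknown)}")
    return DecomposedQuestion(sub_questions=subs)


reply = json.dumps({
    "sub_questions": [
        {"sub_question": "What risk factors does Uber report?", "ticker": "UBER"},
        {"sub_question": "What risk factors does Lyft report?", "ticker": "LYFT"},
    ]
})
decomposed = parse_decomposition(reply, {"UBER", "LYFT"})
print(len(decomposed.sub_questions))
```

The number of SubQuestion entries in the reply is what later sets N for the mapped retrieval step.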
class airflow.providers.common.ai.example_dags.example_llamaindex_10k.AnalysisReport(/, **data)

Bases: pydantic.BaseModel

Structured financial comparison report.

executive_summary: str
company_findings: list[dict]
key_risks: list[str]
recommendations: list[str]
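The format_report task turns the structured AnalysisReport into readable text for the reviewer. A minimal sketch over the four documented fields, using a stdlib dataclass in place of the pydantic model. The section headings and the assumption that each company_findings dict carries ticker and finding keys are hypothetical, since the schema only declares list[dict]; the sketch falls back to the raw dict when those keys are absent.

```python
from dataclasses import dataclass, field


@dataclass
class AnalysisReport:
    executive_summary: str
    company_findings: list[dict] = field(default_factory=list)
    key_risks: list[str] = field(default_factory=list)
    recommendations: list[str] = field(default_factory=list)


def format_report(report: AnalysisReport) -> str:
    """Render the report as plain text sections for a human reviewer."""
    lines = ["EXECUTIVE SUMMARY", report.executive_summary, "", "COMPANY FINDINGS"]
    for finding in report.company_findings:
        # company_findings is an untyped list[dict]; these keys are assumptions.
        ticker = finding.get("ticker", "?")
        text = finding.get("finding", str(finding))
        lines.append(f"- {ticker}: {text}")
    lines += ["", "KEY RISKS", *(f"- {risk}" for risk in report.key_risks)]
    lines += ["", "RECOMMENDATIONS", *(f"- {rec}" for rec in report.recommendations)]
    return "\n".join(lines)


report = AnalysisReport(
    executive_summary="Both companies face regulatory risk.",
    company_findings=[{"ticker": "UBER", "finding": "Revenue grew."}],
    key_risks=["Regulation"],
    recommendations=["Monitor filings"],
)
print(format_report(report))
```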
airflow.providers.common.ai.example_dags.example_llamaindex_10k.DECOMPOSE_SYSTEM_PROMPT = "You are a financial research assistant. Given a comparison question and a list of companies...
airflow.providers.common.ai.example_dags.example_llamaindex_10k.SYNTHESIS_SYSTEM_PROMPT = "You are a senior financial analyst. Given retrieval results from multiple companies' 10-K...
airflow.providers.common.ai.example_dags.example_llamaindex_10k.DEFAULT_QUESTION = 'Compare the risk factors and revenue trends across these companies. Which company faces the...
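One plausible role for DEFAULT_QUESTION and DEFAULT_TICKERS is as fallbacks in get_question and get_tickers when the analyst leaves a field empty. The sketch below is hypothetical: it assumes the HITL entry's answers arrive as a plain dict with question and tickers keys, which may not match the real HITLEntryOperator output. DEFAULT_QUESTION is truncated in the listing above, so a placeholder stands in for it.

```python
DEFAULT_TICKERS = "AAPL,MSFT,UBER,LYFT,AMZN"
# Placeholder only: the real DEFAULT_QUESTION is truncated in the documentation.
DEFAULT_QUESTION = "Compare the risk factors and revenue trends across these companies. ..."


def get_question(answers: dict) -> str:
    """Use the analyst's question if given, otherwise the default (assumed behavior)."""
    question = (answers.get("question") or "").strip()
    return question or DEFAULT_QUESTION


def get_tickers(answers: dict) -> list[str]:
    """Parse a comma-separated ticker string, falling back to DEFAULT_TICKERS."""
    raw = (answers.get("tickers") or "").strip() or DEFAULT_TICKERS
    # Upper-case, drop blanks, and de-duplicate while keeping the analyst's order.
    return list(dict.fromkeys(t.strip().upper() for t in raw.split(",") if t.strip()))


print(get_tickers({"tickers": "uber, lyft, UBER"}))
```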
airflow.providers.common.ai.example_dags.example_llamaindex_10k.example_llamaindex_10k_index()

Fetch 10-K filings from SEC EDGAR and build per-company vector indexes.

Runs weekly to refresh indexes when new filings arrive. Each company gets its own persisted index via Dynamic Task Mapping.
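One way the per-company fan-out could be shaped, assuming fetch_filings returns one record per company with ticker and text keys, and that build_index is mapped over a list of keyword-argument dicts (Airflow's expand_kwargs() maps over exactly such a list). The directory layout, one subdirectory of INDEX_BASE_DIR per ticker, and the keys documents and persist_dir are hypothetical placeholders, not confirmed LlamaIndexEmbeddingOperator arguments.

```python
from pathlib import PurePosixPath

INDEX_BASE_DIR = "/opt/airflow/data/indexes/10k"


def index_dir(ticker: str, base_dir: str = INDEX_BASE_DIR) -> str:
    """Hypothetical layout: one persisted vector index per ticker under INDEX_BASE_DIR."""
    return str(PurePosixPath(base_dir) / ticker.upper())


def build_index_kwargs(filings: list[dict]) -> list[dict]:
    """Return one kwargs dict per company; each dict becomes one mapped build_index instance.

    The keys "documents" and "persist_dir" are placeholders, not confirmed
    LlamaIndexEmbeddingOperator argument names.
    """
    kwargs = []
    for filing in filings:
        if not filing.get("text"):
            continue  # nothing was fetched for this company, so there is no index to build
        kwargs.append({"documents": [filing["text"]], "persist_dir": index_dir(filing["ticker"])})
    return kwargs


print(build_index_kwargs([{"ticker": "aapl", "text": "Risk factors..."}]))
```

Because each dict becomes its own task instance, a failed rebuild for one company retries on its own without touching the other indexes.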

Task graph:

fetch_filings (@task, live from SEC EDGAR)
    -> build_index (LlamaIndexEmbeddingOperator x N companies)
airflow.providers.common.ai.example_dags.example_llamaindex_10k.example_llamaindex_10k_analysis()

Multi-company financial comparison via LLM-driven sub-question decomposition.

An analyst submits a comparison question together with the tickers to compare. The LLM decomposes the question into company-specific sub-questions, with N decided at runtime. Each sub-question is answered by retrieving from that company's vector index, the retrievals run in parallel via Dynamic Task Mapping, and the results are synthesized into a structured report for human review.
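The fan-out hinges on build_retrieval_kwargs turning the runtime list of sub-questions into one kwargs dict per mapped retrieve instance. A sketch assuming each sub-question arrives as a dict with sub_question and ticker keys (the SubQuestion fields) and that each company's index lives in a per-ticker subdirectory of INDEX_BASE_DIR. The output keys query and persist_dir are hypothetical placeholders, not confirmed LlamaIndexRetrievalOperator arguments.

```python
INDEX_BASE_DIR = "/opt/airflow/data/indexes/10k"


def build_retrieval_kwargs(sub_questions: list[dict]) -> list[dict]:
    """One dict per sub-question; Airflow creates one mapped task instance per dict.

    len(sub_questions) is only known after decompose_question runs, which is why
    the retrieve step is mapped rather than declared N times in the Dag file.
    """
    return [
        {
            "query": sq["sub_question"],
            # Hypothetical per-ticker index location.
            "persist_dir": f"{INDEX_BASE_DIR}/{sq['ticker'].upper()}",
        }
        for sq in sub_questions
    ]


print(build_retrieval_kwargs([{"sub_question": "Uber risks?", "ticker": "uber"}]))
```

The resulting list would be handed to the mapped retrieval operator with expand_kwargs(), so two sub-questions about the same company simply produce two instances reading the same index.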

Task graph:

analyst_input     (HITLEntryOperator, tickers + question)
    -> get_question        (@task)
    -> get_tickers         (@task)
    -> decompose_question  (@task.llm, structured output)
    -> extract_sub_questions (@task)
    -> build_retrieval_kwargs (@task)
    -> retrieve             (LlamaIndexRetrievalOperator x N, DTM)
    -> collect_results      (@task)
    -> synthesize_report    (LLMOperator, UsageLimits + AnalysisReport)
    -> format_report        (@task, readable text for reviewer)
    -> review_report        (ApprovalOperator)
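After the mapped retrievals finish, Airflow passes their return values downstream as a sequence with one entry per task instance, and collect_results gathers them for synthesize_report. A hypothetical sketch, assuming each retrieval result is a dict with ticker, sub_question, and a list of retrieved text chunks; it groups the chunks by company into a single prompt body for the synthesis LLM. The result shape and the prompt layout are assumptions, not the example's actual format.

```python
from collections import defaultdict


def collect_results(results: list[dict]) -> str:
    """Group mapped retrieval outputs by ticker into one synthesis prompt."""
    by_ticker: dict[str, list[str]] = defaultdict(list)
    for result in results:
        header = f"Q: {result['sub_question']}"
        by_ticker[result["ticker"]].append("\n".join([header, *result["chunks"]]))
    # Sort by ticker so the prompt is stable regardless of task completion order.
    sections = [f"## {ticker}\n" + "\n\n".join(parts) for ticker, parts in sorted(by_ticker.items())]
    return "\n\n".join(sections)


print(collect_results([
    {"ticker": "UBER", "sub_question": "Risks?", "chunks": ["Regulatory risk."]},
    {"ticker": "LYFT", "sub_question": "Risks?", "chunks": ["Insurance costs."]},
]))
```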
