airflow.providers.common.ai.example_dags.example_langchain_10k

SEC 10-K financial analysis – LangChain RAG with live SEC EDGAR data.

This module contains two production-shaped Dags that demonstrate a multi-company financial research pipeline using real SEC 10-K filings fetched from the EDGAR public API: one fetches and indexes filings on a schedule, and the other decomposes a comparison question at runtime and fans out retrieval via Dynamic Task Mapping.

Filings are fetched from the SEC EDGAR public API (free, no authentication required) using stock ticker symbols. Any US publicly traded company is supported; configure the companies via the tickers Dag parameter.
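The tickers parameter arrives as one comma-separated string (the module default is AAPL,MSFT,UBER,LYFT,AMZN). The sketch below shows one way to normalize it and resolve each symbol to the CIK number that EDGAR uses to index filings. It assumes the lookup table is SEC's company_tickers.json file, whose entries carry cik_str, ticker and title fields; parse_tickers, resolve_ciks and the two-entry sample table are illustrative, not the example Dag's own code.

```python
def parse_tickers(raw: str) -> list[str]:
    """Split a comma-separated tickers parameter into unique, upper-cased symbols."""
    symbols: list[str] = []
    for part in raw.split(","):
        symbol = part.strip().upper()
        if symbol and symbol not in symbols:  # drop blanks and duplicates, keep order
            symbols.append(symbol)
    return symbols


def resolve_ciks(tickers: list[str], company_tickers: dict) -> dict[str, int]:
    """Map each ticker to its CIK using a company_tickers.json-shaped dict."""
    by_ticker = {
        entry["ticker"].upper(): int(entry["cik_str"])
        for entry in company_tickers.values()
    }
    missing = [t for t in tickers if t not in by_ticker]
    if missing:
        raise ValueError(f"Unknown tickers: {missing}")
    return {t: by_ticker[t] for t in tickers}


# Hand-written excerpt in the shape of company_tickers.json (illustrative only).
SAMPLE_COMPANY_TICKERS = {
    "0": {"cik_str": 320193, "ticker": "AAPL", "title": "Apple Inc."},
    "1": {"cik_str": 789019, "ticker": "MSFT", "title": "MICROSOFT CORP"},
}

if __name__ == "__main__":
    print(resolve_ciks(parse_tickers(" aapl, MSFT,aapl "), SAMPLE_COMPANY_TICKERS))
```

Failing fast on an unknown symbol keeps a typo in the Dag parameter from silently shrinking the comparison.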

This is the LangChain counterpart to example_llamaindex_10k.py. Both share the same DAG shape (decompose -> fan-out retrieval -> collect -> synthesize -> approve) and the same live SEC EDGAR data source, demonstrating that the framework choice is a swappable implementation detail while Airflow provides the orchestration.

example_langchain_10k_index (weekly schedule):

fetch_filings (@task, live from SEC EDGAR)
    -> build_index (@task, mapped x N companies)
        Uses LangChainHook + RecursiveCharacterTextSplitter + FAISS
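RecursiveCharacterTextSplitter tries coarse separators first (paragraphs, then lines, then words) and falls back to finer ones only for pieces that are still too long. The sketch below is a simplified, dependency-free illustration of that idea, not LangChain's implementation: it omits chunk_overlap and custom length functions, and recursive_split is a hypothetical name.

```python
def recursive_split(
    text: str,
    chunk_size: int,
    separators: tuple[str, ...] = ("\n\n", "\n", " ", ""),
) -> list[str]:
    """Split text into chunks of at most chunk_size characters.

    Separators are tried coarsest first. The tuple must end with "" so that
    any remaining oversized piece can be cut at fixed character offsets.
    """
    if len(text) <= chunk_size:
        return [text] if text else []
    sep, rest = separators[0], separators[1:]
    if sep == "":
        # Last resort: hard cut at fixed character offsets.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    parts = text.split(sep)
    # Keep each separator attached to the piece it ended, so joining the
    # chunks reproduces the original text exactly.
    pieces = [p + sep for p in parts[:-1]] + [parts[-1]]
    chunks: list[str] = []
    current = ""
    for piece in pieces:
        if len(piece) > chunk_size:
            # Flush the pending chunk, then split the oversized piece more finely.
            if current:
                chunks.append(current)
                current = ""
            chunks.extend(recursive_split(piece, chunk_size, rest))
        elif len(current) + len(piece) <= chunk_size:
            current += piece  # greedily pack small pieces together
        else:
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return chunks
```

Packing whole paragraphs where possible keeps each chunk closer to one topic, which tends to help retrieval over long 10-K sections.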

example_langchain_10k_analysis (manual trigger):

analyst_question  (HITLEntryOperator)
    -> get_question        (@task)
    -> get_tickers         (@task)
    -> decompose_question  (@task.llm, structured output)
    -> extract_sub_questions (@task)
    -> build_retrieval_kwargs (@task)
    -> retrieve             (@task, mapped x N sub-questions)
    -> collect_results      (@task)
    -> synthesize_report    (LLMOperator, UsageLimits + structured output)
    -> format_report        (@task, readable text for reviewer)
    -> review_report        (ApprovalOperator)
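Because decompose_question returns structured output, the number of mapped retrieve instances is simply the length of its sub_questions list. The sketch below mirrors the field names of SubQuestion and DecomposedQuestion with stdlib dataclasses (the example itself uses pydantic models, and @task.llm already returns parsed output, so parse_decomposition is only a stand-in). It shows one plausible shape for build_retrieval_kwargs: one keyword-argument dict per sub-question. The index_dir key and the per-ticker directory layout are assumptions.

```python
import json
from dataclasses import dataclass

INDEX_BASE_DIR = "/opt/airflow/data/indexes/10k_langchain"


@dataclass
class SubQuestion:  # stdlib stand-in for the pydantic model of the same name
    sub_question: str
    ticker: str


@dataclass
class DecomposedQuestion:  # stdlib stand-in for the pydantic model of the same name
    sub_questions: list[SubQuestion]


def parse_decomposition(llm_json: str) -> DecomposedQuestion:
    """Turn the LLM's JSON reply into typed objects; unexpected keys raise TypeError."""
    payload = json.loads(llm_json)
    return DecomposedQuestion([SubQuestion(**item) for item in payload["sub_questions"]])


def build_retrieval_kwargs(decomposed: DecomposedQuestion, tickers: list[str]) -> list[dict]:
    """Return one kwargs dict per usable sub-question; the list length is the runtime N."""
    allowed = {t.upper() for t in tickers}
    kwargs = []
    for sq in decomposed.sub_questions:
        ticker = sq.ticker.strip().upper()
        if ticker not in allowed:
            continue  # skip sub-questions about companies the analyst did not select
        kwargs.append({
            "question": sq.sub_question,
            "ticker": ticker,
            "index_dir": f"{INDEX_BASE_DIR}/{ticker}",  # hypothetical per-ticker layout
        })
    return kwargs
```

In a Dag, a list like this could be handed to the mapped task, for example with retrieve.expand_kwargs(...), so that Airflow creates one retrieve instance per dict.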

What this makes visible that a notebook hides:

  • The LLM decides how many sub-questions to create, so N is unknown at parse time; Dynamic Task Mapping expands retrieve into N task instances at runtime.

  • Each retrieval is an independent task instance: if one company’s index is unavailable, only that instance retries.

  • The synthesis step’s token budget (UsageLimits) and output schema (AnalysisReport) are auditable in XCom.

  • An analyst reviews the report before it reaches the investment committee.

LangChain-specific components used:

  • LangChainHook – vendor-agnostic model dispatch via init_embeddings

  • RecursiveCharacterTextSplitter – character-based chunking

  • FAISS – in-process vector store (no external server)

Pattern difference from the LlamaIndex counterpart:

The LlamaIndex example uses LlamaIndexEmbeddingOperator and LlamaIndexRetrievalOperator (dedicated operators). Here, indexing and retrieval are plain @task functions that call LangChain’s FAISS and RecursiveCharacterTextSplitter directly via LangChainHook, because LangChain does not yet have dedicated Airflow operators. The DAG shape is identical; the operator-vs-task distinction is the only structural difference.
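Retrieval from a flat FAISS index is an exact nearest-neighbour search over stored embedding vectors. The dependency-free sketch below illustrates the per-company routing the analysis Dag relies on: one in-memory store per ticker, queried with the sub-question's ticker. It is not LangChain's FAISS wrapper: toy_embed is a hypothetical bag-of-words stand-in for a real embedding model such as openai:text-embedding-3-small, and ranking here uses cosine similarity over sparse word counts.

```python
import math
import re
from collections import Counter


def toy_embed(text: str) -> Counter:
    """Hypothetical stand-in for an embedding model: a sparse word-count vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(count * b[word] for word, count in a.items())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


class PerCompanyStore:
    """One brute-force vector index per ticker, mirroring one FAISS index per company."""

    def __init__(self) -> None:
        self._indexes: dict[str, list[tuple[str, Counter]]] = {}

    def add(self, ticker: str, chunks: list[str]) -> None:
        index = self._indexes.setdefault(ticker, [])
        index.extend((chunk, toy_embed(chunk)) for chunk in chunks)

    def retrieve(self, ticker: str, question: str, k: int = 2) -> list[str]:
        if ticker not in self._indexes:
            # In the Dag, this failure would affect only the mapped instance for this ticker.
            raise KeyError(f"No index for {ticker}")
        query = toy_embed(question)
        # Exact search: score every stored chunk, then keep the k best.
        ranked = sorted(self._indexes[ticker], key=lambda item: cosine(query, item[1]), reverse=True)
        return [chunk for chunk, _ in ranked[:k]]
```

Keeping one index per company means a sub-question can only ever retrieve text from the filing it is about.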

Before running:

  1. Install LangChain packages:

    pip install langchain langchain-openai langchain-text-splitters \
                langchain-community faiss-cpu
    
  2. Create two connections: pydanticai_default, an LLM connection used for decomposition and synthesis, and langchain_default, used for embedding and retrieval.

  3. Update EDGAR_USER_AGENT with your name and email (SEC requires a descriptive User-Agent header on all EDGAR API requests).
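A minimal sketch of how such a request might be built, assuming the public submissions endpoint (https://data.sec.gov/submissions/CIK##########.json, with the CIK zero-padded to ten digits) and the EDGAR Archives URL layout. The helper names are hypothetical, the code only builds requests and URLs without touching the network, and it assumes the filings["recent"] arrays are ordered newest first.

```python
import urllib.request
from typing import Optional

# Replace with your own name and email, as SEC requires.
EDGAR_USER_AGENT = "Apache Airflow Example dev@airflow.apache.org"


def submissions_request(cik: int, user_agent: str = EDGAR_USER_AGENT) -> urllib.request.Request:
    """Build (but do not send) a request for a company's filing history."""
    url = f"https://data.sec.gov/submissions/CIK{cik:010d}.json"
    return urllib.request.Request(url, headers={"User-Agent": user_agent})


def latest_10k_url(cik: int, submissions: dict) -> Optional[str]:
    """Return the primary-document URL of the most recent 10-K, if any."""
    recent = submissions["filings"]["recent"]
    for form, accession, document in zip(
        recent["form"], recent["accessionNumber"], recent["primaryDocument"]
    ):
        if form == "10-K":
            folder = accession.replace("-", "")  # Archives folders drop the dashes
            return f"https://www.sec.gov/Archives/edgar/data/{cik}/{folder}/{document}"
    return None
```

To actually fetch, pass the request to urllib.request.urlopen and json.load the response; setting the header on every request is what keeps EDGAR from rejecting the client.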

  4. Run example_langchain_10k_index once to fetch filings and build the sample indexes.

  5. Trigger example_langchain_10k_analysis to run the query pipeline.

Attributes

LLM_CONN_ID

LANGCHAIN_CONN_ID

EMBEDDING_MODEL

INDEX_BASE_DIR

DEFAULT_TICKERS

EDGAR_USER_AGENT

DECOMPOSE_SYSTEM_PROMPT

SYNTHESIS_SYSTEM_PROMPT

DEFAULT_QUESTION

Classes

SubQuestion

One sub-question targeting a specific company.

DecomposedQuestion

LLM-produced decomposition of the analyst's question.

AnalysisReport

Structured financial comparison report.

Functions

example_langchain_10k_index()

Fetch 10-K filings from SEC EDGAR and build per-company FAISS indexes.

example_langchain_10k_analysis()

Multi-company financial comparison via LLM-driven sub-question decomposition.

Module Contents

airflow.providers.common.ai.example_dags.example_langchain_10k.LLM_CONN_ID = 'pydanticai_default'
airflow.providers.common.ai.example_dags.example_langchain_10k.LANGCHAIN_CONN_ID = 'langchain_default'
airflow.providers.common.ai.example_dags.example_langchain_10k.EMBEDDING_MODEL = 'openai:text-embedding-3-small'
airflow.providers.common.ai.example_dags.example_langchain_10k.INDEX_BASE_DIR = '/opt/airflow/data/indexes/10k_langchain'
airflow.providers.common.ai.example_dags.example_langchain_10k.DEFAULT_TICKERS = 'AAPL,MSFT,UBER,LYFT,AMZN'
airflow.providers.common.ai.example_dags.example_langchain_10k.EDGAR_USER_AGENT = 'Apache Airflow Example dev@airflow.apache.org'
class airflow.providers.common.ai.example_dags.example_langchain_10k.SubQuestion(/, **data)

Bases: pydantic.BaseModel

One sub-question targeting a specific company.

sub_question: str
ticker: str
class airflow.providers.common.ai.example_dags.example_langchain_10k.DecomposedQuestion(/, **data)

Bases: pydantic.BaseModel

LLM-produced decomposition of the analyst’s question.

sub_questions: list[SubQuestion]
class airflow.providers.common.ai.example_dags.example_langchain_10k.AnalysisReport(/, **data)

Bases: pydantic.BaseModel

Structured financial comparison report.

executive_summary: str
company_findings: list[dict]
key_risks: list[str]
recommendations: list[str]
airflow.providers.common.ai.example_dags.example_langchain_10k.DECOMPOSE_SYSTEM_PROMPT = "You are a financial research assistant. Given a comparison question and a list of companies...
airflow.providers.common.ai.example_dags.example_langchain_10k.SYNTHESIS_SYSTEM_PROMPT = "You are a senior financial analyst. Given retrieval results from multiple companies' 10-K...
airflow.providers.common.ai.example_dags.example_langchain_10k.DEFAULT_QUESTION = 'Compare the risk factors and revenue trends across these companies. Which company faces the...
airflow.providers.common.ai.example_dags.example_langchain_10k.example_langchain_10k_index()
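The format_report task turns the structured AnalysisReport into plain text for the reviewer at the ApprovalOperator step. A minimal sketch, assuming the report reaches the task as a plain dict (for example the output of AnalysisReport.model_dump() under pydantic v2). The section layout and the ticker and finding keys inside company_findings are assumptions, since the schema types that field only as list[dict].

```python
def format_report(report: dict) -> str:
    """Render an AnalysisReport-shaped dict as reviewer-friendly text."""
    lines = ["EXECUTIVE SUMMARY", report["executive_summary"], "", "COMPANY FINDINGS"]
    for finding in report["company_findings"]:
        # Hypothetical keys; company_findings is only typed as list[dict].
        lines.append(f"- {finding.get('ticker', '?')}: {finding.get('finding', '')}")
    lines += ["", "KEY RISKS"] + [f"- {risk}" for risk in report["key_risks"]]
    lines += ["", "RECOMMENDATIONS"] + [
        f"{i}. {rec}" for i, rec in enumerate(report["recommendations"], start=1)
    ]
    return "\n".join(lines)
```

Using .get() for the finding keys keeps the formatter from failing on a finding whose keys differ from the assumed ones.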

Fetch 10-K filings from SEC EDGAR and build per-company FAISS indexes.

Runs weekly to refresh indexes when new filings arrive. Each company gets its own persisted FAISS index via Dynamic Task Mapping.

Task graph:

fetch_filings (@task, live from SEC EDGAR)
    -> build_index (@task x N companies, LangChain FAISS)
airflow.providers.common.ai.example_dags.example_langchain_10k.example_langchain_10k_analysis()

Multi-company financial comparison via LLM-driven sub-question decomposition.

An analyst submits a comparison question. The LLM decomposes it into company-specific sub-questions (N decided at runtime), each sub-question retrieves from the appropriate company’s FAISS index in parallel via Dynamic Task Mapping, and the results are synthesized into a structured report for human review.
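The collect_results step receives the N mapped retrieve outputs as a single list. A minimal sketch, assuming each retrieve instance returns a dict with ticker, question and chunks keys (hypothetical names, not the example's schema), that merges them into one prompt section per sub-question for synthesize_report.

```python
def collect_results(results: list[dict]) -> str:
    """Merge mapped retrieval outputs into one synthesis prompt body."""
    sections = []
    # Sort by ticker so sections for the same company sit together in the prompt.
    for result in sorted(results, key=lambda r: (r["ticker"], r["question"])):
        excerpts = "\n".join(f"- {chunk}" for chunk in result["chunks"])
        sections.append(f"## {result['ticker']}: {result['question']}\n{excerpts}")
    return "\n\n".join(sections)
```

Labelling every excerpt with its ticker and sub-question lets the synthesis model attribute each finding to the right company.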

Task graph:

analyst_input     (HITLEntryOperator, tickers + question)
    -> get_question        (@task)
    -> get_tickers         (@task)
    -> decompose_question  (@task.llm, structured output)
    -> extract_sub_questions (@task)
    -> build_retrieval_kwargs (@task)
    -> retrieve             (@task x N, LangChain FAISS)
    -> collect_results      (@task)
    -> synthesize_report    (LLMOperator, UsageLimits + AnalysisReport)
    -> format_report        (@task, readable text for reviewer)
    -> review_report        (ApprovalOperator)
