airflow.providers.common.ai.example_dags.example_langchain_10k
SEC 10-K financial analysis – LangChain RAG with live SEC EDGAR data.
Two production-shaped Dags that demonstrate a multi-company financial research pipeline using real SEC 10-K filings fetched from the EDGAR public API: one fetches and indexes filings on a schedule, the other decomposes a comparison question at runtime and fans out retrieval via Dynamic Task Mapping.
Filings are fetched from the SEC EDGAR public API (free, no auth
required) using stock ticker symbols. Any US publicly-traded company
is supported – configure via the tickers Dag parameter.
This is the LangChain counterpart to example_llamaindex_10k.py.
Both share the same DAG shape (decompose -> fan-out retrieval -> collect
-> synthesize -> approve) and the same live SEC EDGAR data source,
demonstrating that the framework choice is a swappable implementation
detail while Airflow provides the orchestration.
example_langchain_10k_index (weekly schedule):
fetch_filings (@task, live from SEC EDGAR)
-> build_index (@task, mapped x N companies)
Uses LangChainHook + RecursiveCharacterTextSplitter + FAISS
example_langchain_10k_analysis (manual trigger):
analyst_question (HITLEntryOperator)
-> get_question (@task)
-> get_tickers (@task)
-> decompose_question (@task.llm, structured output)
-> extract_sub_questions (@task)
-> build_retrieval_kwargs (@task)
-> retrieve (@task, mapped x N sub-questions)
-> collect_results (@task)
-> synthesize_report (LLMOperator, UsageLimits + structured output)
-> format_report (@task, readable text for reviewer)
-> review_report (ApprovalOperator)
What this makes visible that a notebook hides:
The LLM decides how many sub-questions to create – N is unknown at parse time, determined at runtime via Dynamic Task Mapping.
Each retrieval is an independent task instance: if one company’s index is unavailable, only that instance retries.
The synthesis step’s token budget (UsageLimits) and output schema (AnalysisReport) are auditable in XCom.
An analyst reviews the report before it reaches the investment committee.
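The first two points can be illustrated without Airflow. The following is a plain-Python analogue, not the module's code: `run_mapped` and `fake_retrieve` are hypothetical names, and the loop only imitates what Dynamic Task Mapping and per-task-instance retries do inside the scheduler. It shows that the number of retrievals is fixed only once the decomposition result exists, and that a failure in one retrieval triggers a retry for that item alone.

```python
from typing import Callable


def run_mapped(
    items: list[str],
    task: Callable[[str], str],
    max_tries: int = 3,
) -> tuple[dict[str, str], dict[str, int]]:
    """Run `task` once per item, retrying each item independently."""
    results: dict[str, str] = {}
    attempts: dict[str, int] = {}
    for item in items:
        for attempt in range(1, max_tries + 1):
            attempts[item] = attempt
            try:
                results[item] = task(item)
                break  # this item succeeded; move on to the next one
            except RuntimeError:
                if attempt == max_tries:
                    raise  # give up on this item only after max_tries


    return results, attempts


# Hypothetical retrieval that fails once for one company, then succeeds.
_failures_left = {"UBER": 1}


def fake_retrieve(ticker: str) -> str:
    if _failures_left.get(ticker, 0) > 0:
        _failures_left[ticker] -= 1
        raise RuntimeError(f"index for {ticker} unavailable")
    return f"passages for {ticker}"


# In the real pipeline this list comes from the LLM at runtime;
# it is hard-coded here purely for illustration.
sub_question_targets = ["AAPL", "UBER", "LYFT"]
results, attempts = run_mapped(sub_question_targets, fake_retrieve)
```

Here only the `UBER` entry records a second attempt; the other two items run once and are not re-executed.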
LangChain-specific components used:
LangChainHook – vendor-agnostic model dispatch via init_embeddings
RecursiveCharacterTextSplitter – character-based chunking
FAISS – in-process vector store (no external server)
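To make "character-based chunking" concrete, here is a deliberately simplified stand-in written in plain Python. It is not LangChain's algorithm: RecursiveCharacterTextSplitter tries a hierarchy of separators before falling back to raw characters, whereas this sketch only slides a fixed-size window with overlap. The function name `chunk_text` and its parameters are hypothetical.

```python
def chunk_text(text: str, chunk_size: int, overlap: int) -> list[str]:
    """Split text into windows of chunk_size characters that share `overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    step = chunk_size - overlap  # how far the window advances each time
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


sample_chunks = chunk_text("abcdefghij", chunk_size=4, overlap=1)
```

The overlap means a sentence cut at a chunk boundary still appears whole in at least one neighbouring chunk, which tends to help retrieval over long filings.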
Pattern difference from the LlamaIndex counterpart:
The LlamaIndex example uses LlamaIndexEmbeddingOperator and
LlamaIndexRetrievalOperator (dedicated operators). Here, indexing
and retrieval are plain @task functions that call LangChain’s FAISS
and RecursiveCharacterTextSplitter directly via LangChainHook,
because LangChain does not yet have dedicated Airflow operators. The
DAG shape is identical; the operator-vs-task distinction is the only
structural difference.
Before running:
Install LangChain packages:
pip install langchain langchain-openai langchain-text-splitters langchain-community faiss-cpu
Create an LLM connection pydanticai_default for synthesis and decomposition, and langchain_default for embedding and retrieval.
Update EDGAR_USER_AGENT with your name and email (SEC requires a descriptive User-Agent header on all EDGAR API requests).
Run example_langchain_10k_index once to fetch filings and build the sample indexes.
Trigger example_langchain_10k_analysis to run the query pipeline.
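The User-Agent requirement can be sketched with the standard library alone. This is an illustration under assumptions, not the module's fetch code: the helper names are hypothetical, the contact string is a placeholder, and the choice of the EDGAR submissions endpoint is an assumption about which endpoint a fetch step might call. The block only builds the request; it does not contact SEC servers.

```python
import urllib.request

# Placeholder contact details; replace with your own name and email.
EDGAR_USER_AGENT = "Jane Analyst jane.analyst@example.com"


def submissions_url(cik: int) -> str:
    """Return the EDGAR submissions URL for a CIK, zero-padded to 10 digits."""
    return f"https://data.sec.gov/submissions/CIK{cik:010d}.json"


def build_edgar_request(
    url: str, user_agent: str = EDGAR_USER_AGENT
) -> urllib.request.Request:
    """Build a request that carries a descriptive User-Agent header."""
    if not user_agent.strip():
        raise ValueError("EDGAR requests need a descriptive User-Agent")
    return urllib.request.Request(url, headers={"User-Agent": user_agent})


# 320193 is Apple's CIK, used here as a sample value.
request = build_edgar_request(submissions_url(320193))

# To actually fetch, something like the following would be needed:
# with urllib.request.urlopen(request, timeout=30) as response:
#     payload = response.read()
```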
Attributes
Classes
SubQuestion | One sub-question targeting a specific company.
DecomposedQuestion | LLM-produced decomposition of the analyst's question.
AnalysisReport | Structured financial comparison report.
Functions
example_langchain_10k_index() | Fetch 10-K filings from SEC EDGAR and build per-company FAISS indexes.
example_langchain_10k_analysis() | Multi-company financial comparison via LLM-driven sub-question decomposition.
Module Contents
- airflow.providers.common.ai.example_dags.example_langchain_10k.LLM_CONN_ID = 'pydanticai_default'
- airflow.providers.common.ai.example_dags.example_langchain_10k.LANGCHAIN_CONN_ID = 'langchain_default'
- airflow.providers.common.ai.example_dags.example_langchain_10k.EMBEDDING_MODEL = 'openai:text-embedding-3-small'
- airflow.providers.common.ai.example_dags.example_langchain_10k.INDEX_BASE_DIR = '/opt/airflow/data/indexes/10k_langchain'
- airflow.providers.common.ai.example_dags.example_langchain_10k.DEFAULT_TICKERS = 'AAPL,MSFT,UBER,LYFT,AMZN'
- airflow.providers.common.ai.example_dags.example_langchain_10k.EDGAR_USER_AGENT = 'Apache Airflow Example dev@airflow.apache.org'
- class airflow.providers.common.ai.example_dags.example_langchain_10k.SubQuestion(/, **data)
Bases: pydantic.BaseModel
One sub-question targeting a specific company.
- class airflow.providers.common.ai.example_dags.example_langchain_10k.DecomposedQuestion(/, **data)
Bases: pydantic.BaseModel
LLM-produced decomposition of the analyst’s question.
- sub_questions: list[SubQuestion]
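The role of the structured output can be sketched as follows. This block uses standard-library dataclasses as a stand-in for the pydantic models so that it runs without extra packages. Only `sub_questions: list[SubQuestion]` comes from this page; the `ticker` and `question` fields, the `query` key, and the bodies of `parse_decomposition` and `build_retrieval_kwargs` are assumptions made for illustration.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class SubQuestion:
    ticker: str  # hypothetical field: which company's index to query
    question: str  # hypothetical field: the company-specific question


@dataclass(frozen=True)
class DecomposedQuestion:
    sub_questions: list[SubQuestion]


def parse_decomposition(raw_json: str, allowed_tickers: set[str]) -> DecomposedQuestion:
    """Validate an LLM's JSON reply before it drives the fan-out."""
    payload = json.loads(raw_json)
    subs: list[SubQuestion] = []
    for item in payload["sub_questions"]:
        sub = SubQuestion(
            ticker=item["ticker"].upper(),
            question=item["question"].strip(),
        )
        if sub.ticker not in allowed_tickers:
            raise ValueError(f"unexpected ticker: {sub.ticker}")
        if not sub.question:
            raise ValueError("empty sub-question")
        subs.append(sub)
    return DecomposedQuestion(sub_questions=subs)


def build_retrieval_kwargs(decomposed: DecomposedQuestion) -> list[dict[str, str]]:
    """One kwargs dict per sub-question; each dict would feed one mapped task."""
    return [{"ticker": s.ticker, "query": s.question} for s in decomposed.sub_questions]


raw_reply = json.dumps(
    {
        "sub_questions": [
            {"ticker": "aapl", "question": "What are the main risk factors?"},
            {"ticker": "MSFT", "question": "How did revenue change?"},
        ]
    }
)
retrieval_kwargs = build_retrieval_kwargs(
    parse_decomposition(raw_reply, allowed_tickers={"AAPL", "MSFT"})
)
```

Validating before the fan-out keeps a malformed or off-list sub-question from creating a retrieval task against an index that does not exist.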
- class airflow.providers.common.ai.example_dags.example_langchain_10k.AnalysisReport(/, **data)
Bases: pydantic.BaseModel
Structured financial comparison report.
- airflow.providers.common.ai.example_dags.example_langchain_10k.DECOMPOSE_SYSTEM_PROMPT = "You are a financial research assistant. Given a comparison question and a list of companies...
- airflow.providers.common.ai.example_dags.example_langchain_10k.SYNTHESIS_SYSTEM_PROMPT = "You are a senior financial analyst. Given retrieval results from multiple companies' 10-K...
- airflow.providers.common.ai.example_dags.example_langchain_10k.DEFAULT_QUESTION = 'Compare the risk factors and revenue trends across these companies. Which company faces the...
- airflow.providers.common.ai.example_dags.example_langchain_10k.example_langchain_10k_index()
Fetch 10-K filings from SEC EDGAR and build per-company FAISS indexes.
Runs weekly to refresh indexes when new filings arrive. Each company gets its own persisted FAISS index via Dynamic Task Mapping.
Task graph:
fetch_filings (@task, live from SEC EDGAR)
-> build_index (@task x N companies, LangChain FAISS)
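As a rough sketch of how the comma-separated ticker setting could become one index location per company: the two constants are copied from the module attributes on this page, while `parse_tickers`, `index_dir_for`, and the one-subdirectory-per-ticker layout are assumptions, since this page does not show how the module lays out its indexes on disk.

```python
from pathlib import PurePosixPath

# Values copied from the module attributes listed on this page.
INDEX_BASE_DIR = "/opt/airflow/data/indexes/10k_langchain"
DEFAULT_TICKERS = "AAPL,MSFT,UBER,LYFT,AMZN"


def parse_tickers(raw: str) -> list[str]:
    """Split a comma-separated string into unique, upper-cased tickers, keeping order."""
    tickers: list[str] = []
    for part in raw.split(","):
        ticker = part.strip().upper()
        if ticker and ticker not in tickers:
            tickers.append(ticker)
    return tickers


def index_dir_for(ticker: str, base_dir: str = INDEX_BASE_DIR) -> PurePosixPath:
    """Hypothetical layout: one subdirectory per ticker under the base directory."""
    return PurePosixPath(base_dir) / ticker.upper()


# One entry per company; each entry would correspond to one mapped build_index task.
index_dirs = {ticker: index_dir_for(ticker) for ticker in parse_tickers(DEFAULT_TICKERS)}
```

Keeping each company's index in its own directory lets a single company be rebuilt or retried without touching the others.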
- airflow.providers.common.ai.example_dags.example_langchain_10k.example_langchain_10k_analysis()
Multi-company financial comparison via LLM-driven sub-question decomposition.
An analyst submits a comparison question. The LLM decomposes it into company-specific sub-questions (N decided at runtime), each sub-question retrieves from the appropriate company’s FAISS index in parallel via Dynamic Task Mapping, and the results are synthesized into a structured report for human review.
Task graph:
analyst_input (HITLEntryOperator, tickers + question)
-> get_question (@task)
-> get_tickers (@task)
-> decompose_question (@task.llm, structured output)
-> extract_sub_questions (@task)
-> build_retrieval_kwargs (@task)
-> retrieve (@task x N, LangChain FAISS)
-> collect_results (@task)
-> synthesize_report (LLMOperator, UsageLimits + AnalysisReport)
-> format_report (@task, readable text for reviewer)
-> review_report (ApprovalOperator)