DocumentLoaderOperator
Use DocumentLoaderOperator
to parse files into list[dict(text, metadata)] for downstream embedding
pipelines. The operator bridges Airflow’s connectivity layer (hooks that
produce bytes or local files) and the AI embedding layer (operators that
need structured text with metadata).
The operator is framework-agnostic – it has no dependency on LlamaIndex, LangChain, or any other AI framework.
Basic usage
.txt, .md, .csv, and .json are handled with zero extra
dependencies:
@dag(schedule=None, tags=["example"])
def example_document_loader_basic():
    """Parse a single local file -- the operator infers the format from the suffix."""
    load_docs = DocumentLoaderOperator(
        task_id="load_docs",
        source_path="/opt/airflow/data/articles/sample.md",
    )

    @task
    def count_chunks(docs: list[dict]) -> int:
        return len(docs)

    count_chunks(load_docs.output)
CSV files produce one document per row, with empty cells skipped. JSON files
with a top-level array produce one document per element; a single JSON object
produces one document. By default each dict is flattened into "key: value,
key: value" text so the embedding sees content tokens rather than JSON
syntax (see the json_text_field section below for the structured variant).
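The row-per-document and flattening behavior described above can be pictured with a small standalone sketch. It is not the operator's implementation: the helper names flatten and csv_to_docs are hypothetical, and numbering row_index from zero is an assumption made for illustration.

```python
import csv
import io


def flatten(record: dict) -> str:
    """Join non-empty fields as "key: value, key: value"."""
    return ", ".join(
        f"{key}: {value}" for key, value in record.items() if value not in ("", None)
    )


def csv_to_docs(raw: str, file_name: str) -> list[dict]:
    """One document per CSV row; empty cells are left out of the text."""
    docs = []
    # Assumption: row_index starts at 0 for the first data row.
    for row_index, row in enumerate(csv.DictReader(io.StringIO(raw))):
        docs.append(
            {
                "text": flatten(row),
                "metadata": {"file_name": file_name, "row_index": row_index},
            }
        )
    return docs


docs = csv_to_docs("title,author,year\nDune,Herbert,1965\nEmma,,1815\n", "books.csv")
print(docs[1]["text"])  # title: Emma, year: 1815
```

The second row has an empty author cell, so that key is absent from the flattened text instead of appearing as "author: ".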
PDF parsing
Install the pdf extra to parse PDF files via
pypdf:
pip install "apache-airflow-providers-common-ai[pdf]"
Each page with extractable text becomes a separate document. Empty pages are
skipped. page_number is included in the document metadata.
DOCX parsing
Install the docx extra to parse Word documents via
python-docx:
pip install "apache-airflow-providers-common-ai[docx]"
All non-empty paragraphs are concatenated into a single document per file.
Note
DOCX extraction reads paragraph text only. Tables, headers, footers, and
footnotes are not included. For richer DOCX parsing, use a dedicated
extraction tool (Unstructured, docling) as a custom parser
backend.
Directory mode and filtering
Point source_path at a directory or pass a glob pattern (** enables
recursive matching). Combine with file_extensions to scope which files
are processed:
@dag(schedule=None, tags=["example"])
def example_document_loader_directory():
    """Walk a directory recursively, only picking up PDFs and Markdown."""
    load_docs = DocumentLoaderOperator(
        task_id="load_docs",
        # `**` matches across subdirectories thanks to glob's recursive mode.
        source_path="/opt/airflow/data/library/**/*",
        file_extensions=[".pdf", ".md"],
        metadata_fields={"corpus": "library_v3"},
    )

    @task
    def summarise(docs: list[dict]) -> dict:
        return {
            "files": len({d["metadata"]["file_path"] for d in docs}),
            "chunks": len(docs),
        }

    summarise(load_docs.output)
Directory-mode behavior when file_extensions is omitted:
- Files whose name starts with a . (.DS_Store, editor swap files, .gitkeep, …) are silently ignored.
- Files whose extension is not in the built-in dispatch map are skipped with a warning rather than crashing the operator. A glob pattern that matches an unknown extension is treated as intentional and parsed via the explicit parser argument.
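As a rough illustration of the two directory-mode rules above, the following sketch walks a temporary directory with pathlib. The select_files helper and the KNOWN_EXTENSIONS set are hypothetical stand-ins, not the operator's code, and the sketch does not model the glob-pattern case.

```python
import logging
import tempfile
from pathlib import Path

log = logging.getLogger(__name__)

# Assumed stand-in for the built-in dispatch map listed in this document.
KNOWN_EXTENSIONS = {".txt", ".md", ".csv", ".json", ".pdf", ".docx"}


def select_files(root: Path) -> list[Path]:
    """Return parseable files under root, following the rules described above."""
    selected = []
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if path.name.startswith("."):
            continue  # hidden files are dropped silently
        if path.suffix not in KNOWN_EXTENSIONS:
            log.warning("Skipping %s: unsupported extension", path)
            continue
        selected.append(path)
    return selected


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ("notes.md", ".DS_Store", "slides.pptx", "data.csv"):
        (root / name).write_text("x")
    kept = [p.name for p in select_files(root)]

print(kept)  # ['data.csv', 'notes.md']
```

Here .DS_Store disappears without any log output, while slides.pptx produces a warning and is skipped.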
Loading from bytes
When upstream tasks produce file content as bytes (S3, GCS, HTTP, etc.),
pass them via source_bytes and tell the operator how to interpret them
with file_type. source_bytes is not a template field because Jinja
would render bytes as their repr text, which would break binary
parsing:
@dag(schedule=None, tags=["example"])
def example_document_loader_bytes():
    """Feed raw bytes from an upstream hook (e.g. an S3 download) into the parser."""

    @task
    def fetch_pdf_bytes() -> bytes:
        # In real use this would be an S3Hook.read_key, a GCSHook.download_as_bytes,
        # or any other byte-producing call.
        return b"%PDF-1.4 ..."

    load_docs = DocumentLoaderOperator(
        task_id="load_docs",
        source_bytes=fetch_pdf_bytes(),
        file_type=".pdf",
        metadata_fields={"corpus": "uploads"},
    )
    load_docs
PDF and DOCX bytes are parsed via an in-memory stream – no temporary files on disk.
Structured JSON ingestion
For arrays of records where one field is the body and the rest are metadata
(article ingestion, ticket exports, …), set json_text_field to the key
that holds the text. Every other key on the same item lands in metadata:
@dag(schedule=None, tags=["example"])
def example_document_loader_json_field():
    """Read an array of records, embedding only the ``body`` field per item.

    Every other key (``title``, ``author``, ``published_at``, ...) lands in
    ``metadata`` so it stays available for filtering or display.
    """
    load_docs = DocumentLoaderOperator(
        task_id="load_docs",
        source_path="/opt/airflow/data/articles.json",
        json_text_field="body",
    )
    load_docs
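To make the json_text_field split concrete, here is a minimal sketch of the shape each record is expected to take. The split_record helper is hypothetical, and the sample records and zero-based item_index are assumptions for illustration. How the operator treats an item that lacks the text key is not stated here; this sketch would simply raise KeyError.

```python
import json


def split_record(item: dict, text_field: str, item_index: int) -> dict:
    """Use one key as the text and carry every other key as metadata."""
    metadata = {key: value for key, value in item.items() if key != text_field}
    metadata["item_index"] = item_index  # assumption: numbered from 0
    return {"text": str(item[text_field]), "metadata": metadata}


raw = json.dumps(
    [
        {"title": "Intro", "author": "Ana", "body": "Airflow schedules workflows."},
        {"title": "Hooks", "author": "Raj", "body": "Hooks wrap external systems."},
    ]
)
docs = [split_record(item, "body", i) for i, item in enumerate(json.loads(raw))]
print(docs[0]["text"])  # Airflow schedules workflows.
print(docs[0]["metadata"])  # {'title': 'Intro', 'author': 'Ana', 'item_index': 0}
```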
For arbitrary API data (Salesforce SOQL results, database query exports),
a @task that maps fields to text and metadata is still appropriate when
the field shape is more complex than what json_text_field covers:
@task
def transform_cases(records: list[dict]) -> list[dict]:
    return [
        {
            "text": f"{r['Subject']}\n\n{r['Description']}",
            "metadata": {"case_id": r["Id"], "source": "salesforce"},
        }
        for r in records
    ]
No chunking
The operator parses files into documents; it does not split them into
fixed-size chunks. The right chunking strategy depends on the embedding
model and is intentionally left to a downstream text-splitter or embedding
operator (LlamaIndex’s LlamaIndexEmbeddingOperator, LangChain’s text splitters,
…).
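For readers who want to see what a downstream splitting step might look like, the sketch below applies a naive fixed-size character window with overlap to the operator's output shape. It is an assumption-laden illustration, not the strategy used by LlamaIndex or LangChain; split_documents and the chunk_index key are hypothetical names.

```python
def split_documents(docs: list[dict], size: int = 200, overlap: int = 20) -> list[dict]:
    """Split each document's text into overlapping character windows."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be non-negative and smaller than size")
    step = size - overlap
    chunks = []
    for doc in docs:
        text = doc["text"]
        for chunk_index, start in enumerate(range(0, len(text), step)):
            chunks.append(
                {
                    "text": text[start : start + size],
                    # Carry the document metadata forward onto every chunk.
                    "metadata": {**doc["metadata"], "chunk_index": chunk_index},
                }
            )
    return chunks


docs = [{"text": "a" * 450, "metadata": {"file_name": "sample.md"}}]
chunks = split_documents(docs, size=200, overlap=20)
print([len(c["text"]) for c in chunks])  # [200, 200, 90]
```

A production splitter would usually break on sentence or token boundaries rather than raw characters; the point here is only that metadata travels with each chunk.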
Format coverage roadmap
The current built-in dispatch covers .txt, .md, .csv, .json,
.pdf, .docx. Additional formats are deferred to follow-ups, each
gated behind its own extra so users only install what they need:
- .pptx via python-pptx
- .epub via ebooklib
- .xlsx via openpyxl
- .html / .htm via beautifulsoup4
- Image OCR (.png / .jpg) via pytesseract
- Audio transcription via a model call (LLMOperator or AgentOperator is a better fit for transcription than this parser)
For anything not in the dispatch map, set parser explicitly ("text"
to read as plain text) or write the parser inline in a @task that calls
DocumentLoaderOperator with source_bytes for known formats.
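As one possible shape for an inline parser covering a format outside the dispatch map, the sketch below turns HTML into the same list[dict(text, metadata)] structure. It uses the standard library's html.parser as a stand-in (the roadmap names beautifulsoup4 for the eventual built-in), and html_to_docs and _TextExtractor are hypothetical names.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect visible text, ignoring <script> and <style> contents."""

    def __init__(self) -> None:
        super().__init__()
        self._skip_depth = 0
        self.parts: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())


def html_to_docs(html: str, file_name: str) -> list[dict]:
    """Return a single document in the list[dict(text, metadata)] shape."""
    extractor = _TextExtractor()
    extractor.feed(html)
    extractor.close()
    return [{"text": " ".join(extractor.parts), "metadata": {"file_name": file_name}}]


docs = html_to_docs(
    "<html><style>p {color: red}</style><body><h1>Title</h1><p>Hello world.</p></body></html>",
    "page.html",
)
print(docs[0]["text"])  # Title Hello world.
```

In a DAG, the body of html_to_docs would sit inside a @task, and its return value could feed the same downstream consumers as the operator's output.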
Composing with downstream embedding operators
The output format (list[dict(text, metadata)]) is designed to feed
directly into embedding operators. With LlamaIndex’s LlamaIndexEmbeddingOperator:
load = DocumentLoaderOperator(
    task_id="load",
    source_path="/data/docs/*.pdf",
)
embed = LlamaIndexEmbeddingOperator(
    task_id="embed",
    documents="{{ ti.xcom_pull(task_ids='load') }}",
    llm_conn_id="openai_default",
)
load >> embed
Cloud storage URIs
source_path accepts any URI that
ObjectStoragePath resolves via fsspec
(s3://, gs://, azure://, file://, …). Point it at a
single object or a directory; cross-directory globs in cloud URIs are not
supported in this version.
@dag(schedule=None, tags=["example"])
def example_document_loader_cloud_uri():
    """Read PDFs directly from S3 -- no separate download step."""
    load_docs = DocumentLoaderOperator(
        task_id="load_docs",
        source_path="s3://my-bucket/reports/",
        source_conn_id="aws_default",
        file_extensions=[".pdf"],
    )
    load_docs
Use source_conn_id to point at the Airflow connection that holds the
cloud credentials (aws_default, google_cloud_default, …). For
single-file URIs, source_conn_id works the same way.
If you’d rather download the file with a dedicated provider operator first (e.g. to get retry semantics specific to that storage), the download-then-parse pattern still works:
from airflow.providers.amazon.aws.transfers.s3_to_local import S3ToLocalFilesystemOperator

download = S3ToLocalFilesystemOperator(
    task_id="download",
    bucket_name="my-bucket",
    key="documents/report.pdf",
    local_path="/tmp/report.pdf",
)
load = DocumentLoaderOperator(
    task_id="load",
    source_path="/tmp/report.pdf",
)
download >> load
Non-UTF-8 inputs
The text parsers (.txt / .md / .csv / .json) and the bytes
path default to UTF-8. To handle Windows-1252 CSVs, UTF-8 files with a
leading byte-order mark (encoding="utf-8-sig"), or any other encoding, set
the encoding parameter on the operator, and optionally
encoding_errors="replace" to tolerate mixed-encoding sources at the cost of
some character loss. When a decode fails, the error message includes the
offending file path, so directory-mode runs are easy to diagnose.
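The effect of these settings can be checked with plain Python decoding, independent of the operator. The sketch below uses only bytes.decode; the sample byte strings are made up for illustration, and the assumption is that the operator's encoding and encoding_errors parameters map onto the same codec names and error handlers.

```python
# Windows-1252 bytes: 0xE9 is "é", which is not valid on its own in UTF-8.
cp1252_bytes = "café,3.50\n".encode("cp1252")

try:
    cp1252_bytes.decode("utf-8")
except UnicodeDecodeError as exc:
    print(f"strict UTF-8 decode failed: {exc.reason}")

# Decoding with the right codec recovers the text.
print(cp1252_bytes.decode("cp1252"))

# errors="replace" substitutes U+FFFD for the bytes it cannot decode.
lossy = cp1252_bytes.decode("utf-8", errors="replace")

# "utf-8-sig" strips a leading byte-order mark; plain "utf-8" keeps it as U+FEFF.
bom_bytes = b"\xef\xbb\xbfname,price\n"
with_bom = bom_bytes.decode("utf-8")
without_bom = bom_bytes.decode("utf-8-sig")
print(repr(with_bom), repr(without_bom))
```

A stray U+FEFF at the start of a CSV header is a common reason the first column name fails to match, which is the case utf-8-sig addresses.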
Metadata precedence
Auto-extracted metadata keys – file_name, file_path, row_index,
item_index, page_number – take precedence over keys with the same
name in metadata_fields. metadata_fields fills gaps; it never
overwrites the auto-extracted shape.
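The precedence rule amounts to a dictionary merge in which the auto-extracted keys are applied last. A minimal sketch, with merge_metadata as a hypothetical helper rather than the operator's code:

```python
def merge_metadata(auto_extracted: dict, metadata_fields: dict) -> dict:
    """Auto-extracted keys win; metadata_fields only adds keys that are missing."""
    return {**metadata_fields, **auto_extracted}


merged = merge_metadata(
    auto_extracted={"file_name": "report.pdf", "page_number": 3},
    metadata_fields={"corpus": "library_v3", "file_name": "override-attempt.pdf"},
)
print(merged)
# {'corpus': 'library_v3', 'file_name': 'report.pdf', 'page_number': 3}
```

The user-supplied file_name is discarded, while corpus, which has no auto-extracted counterpart, is kept.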
Parameters
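| Parameter | Description |
|---|---|
| source_path | Local file, directory, or glob pattern, or a storage URI (s3://, gs://, azure://, file://, …). |
| source_conn_id | Airflow connection ID for the cloud-storage credentials used by source_path. |
| source_bytes | Raw file bytes from XCom. Requires file_type. |
| file_type | File extension hint (e.g. ".pdf"). |
| parser | Parsing backend; "text" reads the file as plain text. |
| file_extensions | Filter for directory and glob sources (e.g. [".pdf", ".md"]). |
| metadata_fields | Extra key-value pairs merged into every document’s metadata. Does not override auto-extracted keys. |
| encoding | Text encoding for the bytes path and the text parsers. Defaults to UTF-8. |
| encoding_errors | How decode errors are handled (e.g. "replace"). |
| json_text_field | When parsing JSON, treat this key as the embedding text; every other key on the same item lands in metadata. |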