LlamaIndex LlamaIndexEmbeddingOperator
Chunk a list[dict] of documents and produce embedding vectors using LlamaIndex. The operator is designed to consume the output of DocumentLoaderOperator and hand the resulting vectors on to vector storage (pgvector, Pinecone, Weaviate, …).
The operator passes the embedding model directly to VectorStoreIndex(..., embed_model=...). It does not mutate LlamaIndex’s global Settings singleton, so concurrent tasks in the same worker process don’t race on shared model state.
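The design choice above can be illustrated with a minimal sketch. FakeEmbedModel and build_index are hypothetical stand-ins, not LlamaIndex classes: each call receives its model as an argument, so two tasks running in parallel threads each embed with their own model. With a shared global instead, one thread could overwrite the model between another thread setting it and using it.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeEmbedModel:
    """Hypothetical stand-in for an embedding model (not a LlamaIndex class)."""

    name: str

    def embed(self, text: str) -> list[float]:
        # Toy deterministic "embedding": model-name length and text length.
        return [float(len(self.name)), float(len(text))]


def build_index(texts: list[str], embed_model: FakeEmbedModel) -> dict:
    """Embed using only the model passed in; no module-level state is read or written."""
    return {"model": embed_model.name, "vectors": [embed_model.embed(t) for t in texts]}


results: dict[str, dict] = {}


def run_task(task_id: str, model_name: str) -> None:
    # Each simulated task builds its own model and passes it explicitly.
    results[task_id] = build_index(["hello", "world"], FakeEmbedModel(model_name))


threads = [
    threading.Thread(target=run_task, args=("task_a", "model-a")),
    threading.Thread(target=run_task, args=("task_b", "model-bb")),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Each entry in results records the model its own task supplied, regardless of thread scheduling.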
Basic usage
@dag(schedule=None, tags=["example"])
def example_llamaindex_embed():
    """Chunk + embed a directory of documents and persist the index locally."""
    load = DocumentLoaderOperator(
        task_id="load",
        source_path="/opt/airflow/data/library/**/*",
        file_extensions=[".pdf", ".md", ".txt"],
    )
    embed = LlamaIndexEmbeddingOperator(
        task_id="embed",
        documents=load.output,  # XCom direct -- never via Jinja (list[dict])
        embed_model="text-embedding-3-small",
        llm_conn_id="llamaindex_default",
        chunk_size=512,
        chunk_overlap=50,
        persist_dir="/opt/airflow/data/library_index",
    )
    load >> embed


example_llamaindex_embed()
documents is a templated field, so load.output (an XComArg passed directly rather than through a Jinja string) is resolved to a native list[dict] before execute() runs.
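If you build the documents list in your own upstream task, a pre-flight check can catch the most common mistake: by default, Airflow renders Jinja templates to strings, so a value routed through "{{ ... }}" arrives as the text of a list rather than a list. The sketch below is hypothetical (check_documents is not part of the provider) and assumes each document has the {"text": str, "metadata": dict} shape used in the examples on this page.

```python
from typing import Any


def check_documents(documents: Any) -> list[dict]:
    """Hypothetical validator for the operator's list[dict] input.

    Assumes each document looks like {"text": str, "metadata": dict},
    the shape used in this page's examples.
    """
    if isinstance(documents, str):
        # A Jinja-rendered value is a string such as "[{'text': ...}]", not a list.
        raise TypeError("documents is a string; pass the XComArg directly, not via Jinja")
    if not isinstance(documents, list):
        raise TypeError(f"expected list[dict], got {type(documents).__name__}")
    for i, doc in enumerate(documents):
        if not isinstance(doc, dict) or not isinstance(doc.get("text"), str):
            raise ValueError(f"document {i} has no string 'text' field")
        if not isinstance(doc.get("metadata", {}), dict):
            raise ValueError(f"document {i} has a non-dict 'metadata' field")
    return documents
```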
Bring-your-own embedding model
LlamaIndex doesn’t ship a universal embedding-model initializer, so the operator’s embed_model parameter accepts either:
- a string model name (e.g. "text-embedding-3-small"): the operator constructs an OpenAIEmbedding via LlamaIndexHook using llm_conn_id / embed_conn_id; or
- a pre-built BaseEmbedding instance: the operator uses it as-is and bypasses the hook entirely. Use this for Cohere, Bedrock, Vertex, HuggingFace, etc.:
@dag(schedule=None, tags=["example"])
def example_llamaindex_byo_embed_model():
    """Use a non-OpenAI embedding by instantiating the LlamaIndex class directly.

    LlamaIndex doesn't ship a universal init helper, so the operator accepts
    a pre-built ``BaseEmbedding`` instance and bypasses the hook entirely.
    Install the matching integration package:
    ``pip install llama-index-embeddings-cohere``.
    """

    @task
    def build_cohere_embedder():
        # Imported inside the task so the DAG file parses without the package installed.
        from llama_index.embeddings.cohere import CohereEmbedding

        from airflow.providers.common.compat.sdk import BaseHook

        conn = BaseHook.get_connection("cohere_default")
        return CohereEmbedding(model_name="embed-english-v3.0", cohere_api_key=conn.password)

    @task
    def demo_doc_list() -> list[dict]:
        # One hard-coded document standing in for DocumentLoaderOperator output.
        return [{"text": "Cohere demo content", "metadata": {}}]

    embed = LlamaIndexEmbeddingOperator(
        task_id="embed",
        documents=demo_doc_list(),
        embed_model=build_cohere_embedder(),
        persist_dir="/opt/airflow/data/cohere_index",
    )


example_llamaindex_byo_embed_model()
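The two accepted forms of embed_model imply a simple dispatch. The sketch below is an assumption about that logic, not the operator's source: BaseEmbeddingStub, OpenAIEmbeddingStub and resolve_embed_model are hypothetical stand-ins, and it assumes embed_conn_id, when set, takes precedence over llm_conn_id.

```python
from dataclasses import dataclass
from typing import Optional, Union


class BaseEmbeddingStub:
    """Hypothetical stand-in for LlamaIndex's BaseEmbedding."""


@dataclass
class OpenAIEmbeddingStub(BaseEmbeddingStub):
    """Hypothetical stand-in for the OpenAIEmbedding the hook would build."""

    model: str
    conn_id: str


def resolve_embed_model(
    embed_model: Union[str, BaseEmbeddingStub],
    llm_conn_id: Optional[str] = None,
    embed_conn_id: Optional[str] = None,
) -> BaseEmbeddingStub:
    if isinstance(embed_model, BaseEmbeddingStub):
        # Pre-built instance: used as-is, no hook and no connection lookup.
        return embed_model
    if isinstance(embed_model, str):
        # String name: prefer the dedicated embedding connection, else the LLM one.
        conn_id = embed_conn_id or llm_conn_id
        if conn_id is None:
            # Sketch behaviour only; the real operator may fall back to a hook default.
            raise ValueError("a string embed_model needs llm_conn_id or embed_conn_id")
        return OpenAIEmbeddingStub(model=embed_model, conn_id=conn_id)
    raise TypeError(f"unsupported embed_model type: {type(embed_model).__name__}")
```

Accepting an instance keeps the operator free of provider-specific constructors: anything LlamaIndex can build, you can pass in.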
Persisting to cloud storage
persist_dir accepts local paths and storage URIs (s3://, gs://, azure://, file://), resolved via ObjectStoragePath. Pass persist_conn_id to name the Airflow connection that holds the cloud credentials:
@dag(schedule=None, tags=["example"])
def example_llamaindex_cloud_persist():
    """Persist the index directly to S3 -- no separate upload step."""
    load = DocumentLoaderOperator(
        task_id="load",
        source_path="s3://my-bucket/library/",
        source_conn_id="aws_default",
        file_extensions=[".pdf"],
    )
    embed = LlamaIndexEmbeddingOperator(
        task_id="embed",
        documents=load.output,
        embed_model="text-embedding-3-small",
        llm_conn_id="llamaindex_default",
        persist_dir="s3://my-bucket/indexes/library/",
        persist_conn_id="aws_default",
    )
    load >> embed


example_llamaindex_cloud_persist()
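Which persist_dir values need persist_conn_id depends on the URI scheme. The helper below is a hypothetical sketch for reasoning about that split; the operator itself hands the value to ObjectStoragePath. It assumes that only the remote schemes listed above (s3, gs, azure) need cloud credentials, while bare paths and file:// URIs are local.

```python
from urllib.parse import urlparse

# Remote schemes named on this page; assumed to require persist_conn_id.
REMOTE_SCHEMES = {"s3", "gs", "azure"}


def describe_persist_target(persist_dir: str) -> tuple[str, str]:
    """Hypothetical helper: classify persist_dir as ("remote" | "local", location)."""
    parsed = urlparse(persist_dir)
    if parsed.scheme in REMOTE_SCHEMES:
        return "remote", f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    if parsed.scheme == "file":
        return "local", parsed.path
    if parsed.scheme == "":
        # Plain POSIX path such as /opt/airflow/data/library_index.
        return "local", persist_dir
    raise ValueError(f"unsupported persist_dir scheme: {parsed.scheme!r}")
```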
Parameters
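| Parameter | Description |
|---|---|
| documents | list[dict] of documents to chunk and embed (templated). |
| embed_model | String model name OR pre-built BaseEmbedding instance. |
| llm_conn_id | Airflow connection ID used when embed_model is a string. |
| embed_conn_id | Optional separate connection ID for the embedding provider. Falls back to llm_conn_id. |
| chunk_size | Sentence-splitter chunk size (default 512). |
| chunk_overlap | Overlap between chunks (default 50). |
| persist_dir | Local path or storage URI to persist the LlamaIndex index. |
| persist_conn_id | Cloud credentials connection ID for persist_dir. |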
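chunk_size and chunk_overlap describe a sliding window: each new chunk starts chunk_size - chunk_overlap units after the previous one, so consecutive chunks share chunk_overlap units. The sketch below is a simplification that assumes whitespace-separated words as the unit. LlamaIndex's SentenceSplitter counts tokens and prefers to break on sentence boundaries, so real chunk counts will differ.

```python
def chunk_words(text: str, chunk_size: int = 512, chunk_overlap: int = 50) -> list[str]:
    """Split text into windows of up to chunk_size words, each sharing
    chunk_overlap words with the previous window (word-based simplification)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    words = text.split()
    step = chunk_size - chunk_overlap  # distance between window starts
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks
```

With the defaults, window starts are 462 words apart. A larger overlap preserves more context across chunk boundaries at the cost of more chunks, and therefore more embedding calls.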
Output
Returns a dict with:
{
"document_count": int,
"chunk_count": int,
"persist_dir": str | None,
"chunks": [
{"text": str, "metadata": dict, "vector": list[float]},
...
],
}
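A downstream task can flatten the chunks list into rows for a vector store. The sketch below targets pgvector, which accepts a text literal such as '[0.1,0.2,0.3]' for a vector column. to_pgvector_rows is a hypothetical helper and does not issue any SQL itself. It also checks that every vector has the same dimension, which a fixed-size vector(n) column requires.

```python
import json


def to_pgvector_rows(result: dict) -> list[tuple[str, str, str]]:
    """Hypothetical helper: turn the operator's output dict into
    (text, metadata_json, vector_literal) tuples for a pgvector INSERT."""
    rows: list[tuple[str, str, str]] = []
    dims: set[int] = set()
    for chunk in result["chunks"]:
        vector = chunk["vector"]
        dims.add(len(vector))
        # pgvector text input format: square brackets, comma-separated floats.
        literal = "[" + ",".join(repr(float(x)) for x in vector) + "]"
        rows.append((chunk["text"], json.dumps(chunk["metadata"]), literal))
    if len(dims) > 1:
        raise ValueError(f"mixed vector dimensions: {sorted(dims)}")
    return rows
```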