OpenSearch¶
Operators¶
Create an Index in OpenSearch¶
Use OpenSearchCreateIndexOperator
to create a new index in an OpenSearch domain.
create_index = OpenSearchCreateIndexOperator(
task_id="create_index",
index_name=INDEX_NAME,
index_body={"settings": {"index": {"number_of_shards": 1}}},
)
Add a Document to an Index on OpenSearch¶
Use OpenSearchAddDocumentOperator
to add single documents to an OpenSearch Index
add_document_by_args = OpenSearchAddDocumentOperator(
task_id="add_document_with_args",
index_name=INDEX_NAME,
doc_id=1,
document={"log_group_id": 1, "logger": "python", "message": "hello world"},
)
add_document_by_class = OpenSearchAddDocumentOperator(
task_id="add_document_by_class",
doc_class=LogDocument(log_group_id=2, logger="airflow", message="hello airflow"),
)
Run a query against an OpenSearch Index¶
Use OpenSearchQueryOperator
to run a query against an OpenSearch index.
search_low_level = OpenSearchQueryOperator(
task_id="low_level_query",
index_name="system_test",
query={"query": {"bool": {"must": {"match": {"message": "hello world"}}}}},
)
search = Search()
search._index = [INDEX_NAME]
search_object = search.filter("term", logger="airflow").query("match", message="hello airflow")
search_high_level = OpenSearchQueryOperator(task_id="high_level_query", search_object=search_object)