Source code for airflow.providers.opensearch.operators.opensearch
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportSequencefromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,AnyfromopensearchpyimportRequestsHttpConnectionfromopensearchpy.exceptionsimportOpenSearchExceptionfromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.opensearch.hooks.opensearchimportOpenSearchHookifTYPE_CHECKING:fromopensearchpyimportConnectionasOpenSearchConnectionClassfromairflow.utils.contextimportContext
class OpenSearchQueryOperator(BaseOperator):
    """
    Run a query search against a given index on an OpenSearch cluster and return the results.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:OpenSearchQueryOperator`

    Exactly one of ``query`` or ``search_object`` is normally supplied; if both are
    given, ``query`` takes precedence and ``search_object`` is ignored.

    :param query: A Dictionary OpenSearch DSL query. It must contain a ``"query"`` key,
        and ``index_name`` is required when this input is used.
    :param search_object: A Search object from opensearch-dsl.
    :param index_name: The name of the index to search for documents.
    :param opensearch_conn_id: opensearch connection to use
    :param opensearch_conn_class: opensearch connection class to use
        (defaults to ``RequestsHttpConnection``).
    :param log_query: Whether to log the query used. Defaults to True and logs query used.
    """

    def __init__(
        self,
        *,
        query: dict[str, Any] | None = None,
        search_object: Any | None = None,
        index_name: str | None = None,
        opensearch_conn_id: str = "opensearch_default",
        opensearch_conn_class: type[OpenSearchConnectionClass] | None = RequestsHttpConnection,
        log_query: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.query = query
        self.index_name = index_name
        self.opensearch_conn_id = opensearch_conn_id
        self.opensearch_conn_class = opensearch_conn_class
        self.log_query = log_query
        self.search_object = search_object

    # ``execute`` accesses ``self.hook.search`` / ``self.hook.client`` as attributes,
    # so ``hook`` must be a (cached) property rather than a plain method.
    @cached_property
    def hook(self) -> OpenSearchHook:
        """Get an instance of an OpenSearchHook, created once per operator instance."""
        return OpenSearchHook(
            open_search_conn_id=self.opensearch_conn_id,
            open_search_conn_class=self.opensearch_conn_class,
            log_query=self.log_query,
        )

    def execute(self, context: Context) -> Any:
        """
        Execute a search against a given index or a Search object on an OpenSearch Cluster.

        :param context: The task execution context (not used by this operator).
        :return: The response of ``OpenSearchHook.search`` when ``query`` is used, or the
            result of ``search_object.execute()`` when ``search_object`` is used.
        :raises AirflowException: if neither ``query`` nor ``search_object`` is set, if
            ``query`` lacks a ``"query"`` entry, if ``index_name`` is missing while using
            ``query``, or (wrapping the original error) if OpenSearch raises an
            ``OpenSearchException``.
        """
        if self.query is not None:
            if not self.query.get("query"):
                raise AirflowException("Query input is missing required field Query in dictionary")
            if self.index_name is None:
                raise AirflowException("Index name is required when using the query input.")
            try:
                return self.hook.search(index_name=self.index_name, query=self.query)
            except OpenSearchException as e:
                raise AirflowException(e) from e

        if self.search_object is not None:
            try:
                # Bind the Search object to this hook's client before executing it.
                return self.search_object.using(self.hook.client).execute()
            except OpenSearchException as e:
                raise AirflowException(e) from e

        raise AirflowException(
            "Input missing required input of query or search_object. "
            "Either query or search_object is required."
        )
class OpenSearchCreateIndexOperator(BaseOperator):
    """
    Create a new index on an OpenSearch cluster with a given index name.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:OpenSearchCreateIndexOperator`

    :param index_name: The name of the index to be created.
    :param index_body: A dictionary that defines index settings
    :param opensearch_conn_id: opensearch connection to use
    """

    def __init__(
        self,
        *,
        index_name: str,
        index_body: dict[str, Any],
        opensearch_conn_id: str = "opensearch_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.index_name = index_name
        self.index_body = index_body
        self.opensearch_conn_id = opensearch_conn_id

    @cached_property
    def hook(self) -> OpenSearchHook:
        """Get an instance of an OpenSearchHook (with query logging disabled), created once per instance."""
        return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False)

    def execute(self, context: Context) -> Any:
        """
        Create an index on an OpenSearch cluster.

        The response of the index-creation call is not returned, so this task
        produces no return value.

        :param context: The task execution context (not used by this operator).
        :raises AirflowException: wrapping any ``OpenSearchException`` raised while
            creating the index.
        """
        try:
            self.hook.client.indices.create(index=self.index_name, body=self.index_body)
        except OpenSearchException as e:
            # Chain explicitly so the original OpenSearch error is reported as the cause.
            raise AirflowException(e) from e
class OpenSearchAddDocumentOperator(BaseOperator):
    """
    Add a new document to a given Index or overwrite an existing one.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:OpenSearchAddDocumentOperator`

    Either ``doc_class`` or all of ``index_name``, ``document`` and ``doc_id`` must be
    provided; when ``doc_class`` is set it takes precedence over the other inputs.

    :param index_name: The name of the index to put the document.
    :param document: A dictionary representation of the document.
    :param doc_id: The id for the document in the index.
    :param doc_class: A Document subclassed object using opensearch-dsl
    :param opensearch_conn_id: opensearch connection to use
    """

    def __init__(
        self,
        *,
        index_name: str | None = None,
        document: dict[str, Any] | None = None,
        doc_id: int | None = None,
        doc_class: Any | None = None,
        opensearch_conn_id: str = "opensearch_default",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.index_name = index_name
        self.document = document
        self.doc_id = doc_id
        self.doc_class = doc_class
        self.opensearch_conn_id = opensearch_conn_id

    @cached_property
    def hook(self) -> OpenSearchHook:
        """Get an instance of an OpenSearchHook (with query logging disabled), created once per instance."""
        return OpenSearchHook(open_search_conn_id=self.opensearch_conn_id, log_query=False)

    def execute(self, context: Context) -> Any:
        """
        Save a document to a given index on an OpenSearch cluster.

        :param context: The task execution context (not used by this operator).
        :return: The result of ``doc_class.save(...)`` when ``doc_class`` is used, otherwise
            the response of ``OpenSearchHook.index``.
        :raises AirflowException: if neither ``doc_class`` nor the full set of
            ``index_name``/``document``/``doc_id`` is provided, or (wrapping the original
            error) if OpenSearch raises an ``OpenSearchException``.
        """
        if self.doc_class is not None:
            try:
                client = self.hook.client
                # ``Document.init`` creates the index and its mappings and returns None,
                # so the document must be saved through ``doc_class`` itself rather than
                # through the return value of ``init``.
                self.doc_class.init(using=client)
                return self.doc_class.save(using=client)
            except OpenSearchException as e:
                raise AirflowException(e) from e

        if self.index_name is not None and self.document is not None and self.doc_id is not None:
            try:
                return self.hook.index(index_name=self.index_name, document=self.document, doc_id=self.doc_id)
            except OpenSearchException as e:
                raise AirflowException(e) from e

        raise AirflowException(
            "Index name, document dictionary and doc_id or a Document subclassed object is required."
        )