Source code for airflow.providers.qdrant.hooks.qdrant
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import Any

from grpc import RpcError
from qdrant_client import QdrantClient
from qdrant_client.http.exceptions import UnexpectedResponse

from airflow.hooks.base import BaseHook
class QdrantHook(BaseHook):
    """
    Hook for interfacing with a Qdrant instance.

    :param conn_id: The connection id to use when connecting to Qdrant. Defaults to `qdrant_default`.
    """

    # NOTE(review): the constructor and any connection-metadata class attributes
    # are not visible in this excerpt. `self.conn_id` (read in `get_conn`) is
    # presumably assigned there -- confirm against the full source.

    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """
        Return connection widgets to add to connection form.

        :return: Mapping of extra-field name to the form field that renders it
            (``url``, ``grpc_port``, ``prefer_gprc``, ``https`` and ``prefix``).
        """
        # Imported lazily so the web UI dependencies are only needed when the
        # connection form is actually built.
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import BooleanField, IntegerField, StringField

        return {
            "url": StringField(
                lazy_gettext("URL"),
                widget=BS3TextFieldWidget(),
                # Adjacent literals are concatenated verbatim, so the separating
                # space has to be part of the first literal.
                description="Optional. Qualified URL of the Qdrant instance. "
                "Example: https://xyz-example.eu-central.aws.cloud.qdrant.io:6333",
            ),
            "grpc_port": IntegerField(
                lazy_gettext("gRPC Port"),
                widget=BS3TextFieldWidget(),
                description="Optional. Port of the gRPC interface.",
                default=6334,
            ),
            # The key keeps its historical "gprc" spelling: it is the name under
            # which the value is stored in the connection extras and read back
            # in `get_conn`, so renaming it would break existing connections.
            "prefer_gprc": BooleanField(
                lazy_gettext("Prefer GRPC"),
                widget=BS3TextFieldWidget(),
                description="Optional. Whether to use gRPC interface whenever possible in custom methods.",
                default=False,
            ),
            "https": BooleanField(
                lazy_gettext("HTTPS"),
                widget=BS3TextFieldWidget(),
                description="Optional. Whether to use HTTPS(SSL) protocol.",
            ),
            "prefix": StringField(
                lazy_gettext("Prefix"),
                widget=BS3TextFieldWidget(),
                description="Optional. Prefix to the REST URL path. "
                "Example: `service/v1` will result in "
                "http://localhost:6333/service/v1/{qdrant-endpoint} for REST API.",
            ),
        }

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return custom field behaviour.

        :return: A dict that hides the ``schema``, ``login`` and ``extra`` fields
            and relabels ``password`` as "API Key".
        """
        return {
            "hidden_fields": ["schema", "login", "extra"],
            "relabeling": {"password": "API Key"},
        }

    def get_conn(self) -> QdrantClient:
        """
        Get a Qdrant client instance for interfacing with the database.

        Host, port and password (used as the API key) come from the Airflow
        connection identified by ``self.conn_id``; ``url``, ``grpc_port``,
        ``prefer_gprc``, ``https`` and ``prefix`` come from its extra JSON.

        :return: A newly constructed ``QdrantClient``.
        """
        connection = self.get_connection(self.conn_id)
        extra = connection.extra_dejson

        # Empty values are normalised to None so that a blank host or URL is
        # treated as "not provided" rather than as a competing location.
        host = connection.host or None
        url = extra.get("url") or None

        return QdrantClient(
            host=host,
            port=connection.port or 6333,
            url=url,
            api_key=connection.password,
            grpc_port=extra.get("grpc_port", 6334),
            # Stored under the legacy (misspelled) extras key; see the form widgets.
            prefer_grpc=extra.get("prefer_gprc", False),
            https=extra.get("https", None),
            prefix=extra.get("prefix", None),
        )

    @cached_property
    def conn(self) -> QdrantClient:
        """Get a Qdrant client instance for interfacing with the database, created once per hook instance."""
        return self.get_conn()

    def verify_connection(self) -> tuple[bool, str]:
        """
        Check the connection to the Qdrant instance.

        :return: ``(True, "Connection established!")`` when listing collections
            succeeds, otherwise ``(False, <error text>)``.
        """
        try:
            # `self.conn` is inside the try on purpose: building the client can
            # itself raise ValueError for an invalid configuration.
            self.conn.get_collections()
        except (UnexpectedResponse, RpcError, ValueError) as e:
            return False, str(e)
        return True, "Connection established!"

    def test_connection(self) -> tuple[bool, str]:
        """Test the connection to the Qdrant instance by delegating to ``verify_connection``."""
        return self.verify_connection()