airflow.providers.weaviate.hooks.weaviate

Module Contents

Classes

WeaviateHook

Interact with a Weaviate database to store vectors. This hook uses the connection identified by conn_id.

Attributes

ExitingSchemaOptions

HTTP_RETRY_STATUS_CODE

REQUESTS_EXCEPTIONS_TYPES

airflow.providers.weaviate.hooks.weaviate.ExitingSchemaOptions[source]
airflow.providers.weaviate.hooks.weaviate.HTTP_RETRY_STATUS_CODE = [429, 500, 503, 504][source]
airflow.providers.weaviate.hooks.weaviate.REQUESTS_EXCEPTIONS_TYPES = ()[source]
airflow.providers.weaviate.hooks.weaviate.check_http_error_is_retryable(exc)[source]
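
check_http_error_is_retryable is listed without a description. As a rough illustration of how a predicate over HTTP_RETRY_STATUS_CODE could behave, the sketch below uses a hypothetical FakeHTTPError class and a hypothetical is_retryable function; it is an assumption for illustration, not the provider's implementation.

```python
# Status codes copied from HTTP_RETRY_STATUS_CODE above.
HTTP_RETRY_STATUS_CODE = [429, 500, 503, 504]


class FakeHTTPError(Exception):
    """Hypothetical stand-in for an HTTP error that carries a status code."""

    def __init__(self, status_code):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def is_retryable(exc):
    """Return True when the error carries one of the retryable status codes."""
    # Errors without a status_code attribute are treated as not retryable.
    status = getattr(exc, "status_code", None)
    return status in HTTP_RETRY_STATUS_CODE
```

Under this sketch, a 429 or 503 would be retried, while a 404 or a non-HTTP error would be raised immediately.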
class airflow.providers.weaviate.hooks.weaviate.WeaviateHook(conn_id=default_conn_name, *args, **kwargs)[source]

Bases: airflow.hooks.base.BaseHook

Interact with a Weaviate database to store vectors. This hook uses the connection identified by conn_id.

Parameters

conn_id (str) – The connection id to use when connecting to Weaviate (see howto/connection:weaviate).

conn_name_attr = 'conn_id'[source]
default_conn_name = 'weaviate_default'[source]
conn_type = 'weaviate'[source]
hook_name = 'Weaviate'[source]
classmethod get_connection_form_widgets()[source]

Return connection widgets to add to connection form.

classmethod get_ui_field_behaviour()[source]

Return custom field behaviour.

get_conn()[source]

Return connection for the hook.

conn()[source]

Returns a Weaviate client.

test_connection()[source]
create_collection(name, **kwargs)[source]

Create a new collection.

get_collection(name)[source]

Get a collection by name.

Parameters

name (str) – The name of the collection to get.

delete_collections(collection_names, if_error='stop')[source]

Delete all or specific collections if collection_names are provided.

Parameters
  • collection_names (list[str] | str) – list of collection names to be deleted.

  • if_error (str) – defines the action to take if an error occurs while deleting a collection. Possible options are stop and continue.

Returns

If if_error=continue, returns the list of collections that could not be deleted. If if_error=stop, returns None.

Return type

list[str] | None
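
The return behaviour described above can be wrapped in a small helper. This is a minimal sketch: delete_collections_best_effort is a hypothetical name, and hook is assumed to be a WeaviateHook (or any object exposing the same delete_collections method).

```python
def delete_collections_best_effort(hook, collection_names):
    """Delete collections, continuing past failures, and report what is left.

    `hook` is assumed to be a WeaviateHook instance.
    """
    # With if_error="continue" the hook returns the names it failed to delete.
    failed = hook.delete_collections(collection_names, if_error="continue")
    # Normalise a None result to an empty list so callers can always iterate.
    return list(failed or [])
```

With the provider installed, this might be called as delete_collections_best_effort(WeaviateHook(conn_id="weaviate_default"), ["Articles", "Authors"]), where the collection names are placeholders.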

get_collection_configuration(collection_name)[source]

Get the collection configuration from Weaviate.

Parameters

collection_name (str) – The collection for which to return the collection configuration.

update_collection_configuration(collection_name, **kwargs)[source]

Update the collection configuration.

batch_data(collection_name, data, vector_col='Vector', uuid_col='id', retry_attempts_per_object=5, references=None)[source]

Add multiple objects or object references at once into weaviate.

Parameters
  • collection_name (str) – The name of the collection that the objects belong to.

  • data (list[dict[str, Any]] | pandas.DataFrame | None) – list or dataframe of objects we want to add.

  • vector_col (str) – name of the column containing the vector.

  • uuid_col (str) – Name of the column containing the UUID.

  • retry_attempts_per_object (int) – number of times to try in case of failure before giving up.

  • references (weaviate.collections.classes.internal.ReferenceInputs | None) – The references of the object to be added as a dictionary. Use wvc.Reference.to to create the correct values in the dict.
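
The parameters above suggest that each row carries its vector and UUID under the configured column names. The sketch below prepares such rows as a list of dicts and passes them to batch_data. The helper names build_batch_rows and load_rows, the "text" property, and the use of uuid5 for deterministic ids are illustrative assumptions; hook is assumed to be a WeaviateHook.

```python
import uuid


def build_batch_rows(texts, vectors):
    """Pair each text with its vector under the default column names."""
    if len(texts) != len(vectors):
        raise ValueError("texts and vectors must have the same length")
    rows = []
    for text, vector in zip(texts, vectors):
        rows.append(
            {
                # Matches the default uuid_col="id"; uuid5 is an assumed choice
                # that gives the same id for the same text on every run.
                "id": str(uuid.uuid5(uuid.NAMESPACE_URL, text)),
                # Matches the default vector_col="Vector".
                "Vector": list(vector),
                # Hypothetical property name, not defined by the hook.
                "text": text,
            }
        )
    return rows


def load_rows(hook, collection_name, rows):
    """Send the prepared rows through the hook's batch_data method."""
    return hook.batch_data(
        collection_name=collection_name,
        data=rows,
        vector_col="Vector",
        uuid_col="id",
    )
```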

query_with_vector(embeddings, collection_name, properties, certainty=0.7, limit=1, **kwargs)[source]

Query weaviate database with near vectors.

This method runs a vector search using a Get query. The with_near_vector operator passes the query vector itself to Weaviate, which is needed to query a Weaviate class that uses a custom, external vectorizer. Weaviate uses the supplied vector as the basis for the vector search.

query_with_text(search_text, collection_name, properties, limit=1, **kwargs)[source]

Query using near text.

This method runs a vector search using a Get query. The nearText operator passes search_text to Weaviate as the query. Weaviate then converts it into a vector through the inference API (OpenAI in this particular example) and uses that vector as the basis for a vector search.
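
The two query methods differ only in what is supplied as the query: raw text or a precomputed vector. A minimal sketch of a dispatcher follows. The function name search is hypothetical, and hook is assumed to be a WeaviateHook; the positional argument order follows the signatures listed above.

```python
def search(hook, collection_name, properties, *, text=None, embeddings=None, limit=3):
    """Dispatch to query_with_text or query_with_vector depending on the input."""
    # Exactly one of text / embeddings must be given.
    if (text is None) == (embeddings is None):
        raise ValueError("pass exactly one of text or embeddings")
    if text is not None:
        # Weaviate vectorizes the text itself.
        return hook.query_with_text(text, collection_name, properties, limit=limit)
    # The caller already has a vector, e.g. from an external embedding model.
    return hook.query_with_vector(embeddings, collection_name, properties, limit=limit)
```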

create_object(data_object, collection_name, **kwargs)[source]

Create a new object.

Parameters
  • data_object (dict) – Object to be added. If type is str it should be either a URL or a file.

  • collection_name (str) – Collection name associated with the object given.

  • kwargs – Additional parameters to be passed to weaviate_client.data_object.create()

get_or_create_object(collection_name, data_object, vector=None, **kwargs)[source]

Get or Create a new object.

Returns the object if it already exists; otherwise creates it and returns its UUID.

Parameters
  • collection_name – Collection name associated with the object given.

  • data_object (dict) – Object to be added.

  • vector (Sequence | None) – Vector associated with the object given. This argument is only used when creating object.

  • kwargs – parameters to be passed to collection.data.fetch_object_by_id() or collection.data.fetch_objects()

get_object(collection_name, **kwargs)[source]

Get objects or an object from weaviate.

Parameters

kwargs – parameters to be passed to collection.query.fetch_objects()

get_all_objects(collection_name, after=None, as_dataframe=False, **kwargs)[source]

Get all objects from weaviate.

If after is provided, it is used as the starting point for the listing.

Parameters
  • after (str | weaviate.types.UUID | None) – uuid of the object to start listing from

  • as_dataframe (bool) – if True, returns a pandas dataframe

  • kwargs – parameters to be passed to weaviate_client.data_object.get()

delete_object(collection_name, uuid)[source]

Delete an object from weaviate.

Parameters
  • collection_name (str) – Collection name associated with the object given.

  • uuid (weaviate.types.UUID | str) – uuid of the object to be deleted

update_object(collection_name, uuid, properties=None, **kwargs)[source]

Update an object in weaviate.

Parameters
  • collection_name (str) – Collection name associated with the object given.

  • uuid (weaviate.types.UUID | str) – uuid of the object to be updated

  • properties (weaviate.collections.classes.types.Properties | None) – The properties of the object.

  • kwargs – Optional parameters to be passed to collection.data.update()

replace_object(collection_name, uuid, properties, references=None, **kwargs)[source]

Replace an object in weaviate.

Parameters
  • collection_name (str) – Collection name associated with the object given.

  • uuid (weaviate.types.UUID | str) – uuid of the object to be replaced

  • properties (weaviate.collections.classes.types.Properties) – The properties of the object.

  • references (weaviate.collections.classes.internal.ReferenceInputs | None) – Any references to other objects in Weaviate.

  • kwargs – Optional parameters to be passed to collection.data.replace()

object_exists(collection_name, uuid)[source]

Check if an object exists in weaviate.

Parameters
  • collection_name (str) – Collection name associated with the object given.

  • uuid (str | weaviate.types.UUID) – The UUID of the object that may or may not exist within Weaviate.
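
object_exists can be combined with update_object and create_object to build a simple upsert. The sketch below is illustrative only: upsert_object is a hypothetical helper, hook is assumed to be a WeaviateHook, and passing uuid through create_object's **kwargs is an assumption about what the underlying client call accepts.

```python
def upsert_object(hook, collection_name, uuid, properties):
    """Update the object when it exists, otherwise create it.

    Returns "updated" or "created" so the caller can log what happened.
    """
    if hook.object_exists(collection_name, uuid):
        hook.update_object(collection_name, uuid, properties=properties)
        return "updated"
    # Assumption: the uuid keyword is forwarded to the client via **kwargs.
    hook.create_object(properties, collection_name, uuid=uuid)
    return "created"
```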

create_or_replace_document_objects(data, collection_name, document_column, existing='skip', uuid_column=None, vector_column='Vector', verbose=False)[source]

Create or replace objects belonging to documents.

In real-world scenarios, information sources like Airflow docs, Stack Overflow, or other issues are considered ‘documents’ here. It’s crucial to keep the database objects in sync with these sources. If any changes occur in these documents, this function aims to reflect those changes in the database.

Note

This function assumes responsibility for identifying changes in documents, dropping relevant database objects, and recreating them based on updated information. It’s crucial to handle this process with care, ensuring backups and validation are in place to prevent data loss or inconsistencies.

Provides users with multiple ways of dealing with existing values:
  • replace: replace the existing objects with new objects. This option requires identifying the objects that belong to a document, which by default is done using the document_column field.

  • skip: skip the existing objects and only add the missing objects of a document.

  • error: raise an error if an attempt is made to create an object that belongs to an existing document.

Parameters
  • data (pandas.DataFrame | list[dict[str, Any]] | list[pandas.DataFrame]) – A single pandas DataFrame or a list of dicts to be ingested.

  • collection_name – Name of the collection in the Weaviate schema where data is to be ingested.

  • existing (str) – Strategy for handling existing data: ‘skip’, ‘replace’, or ‘error’. Default is ‘skip’.

  • document_column (str) – Column in the DataFrame that identifies the source document.

  • uuid_column (str | None) – Column with pre-generated UUIDs. If not provided, UUIDs will be generated.

  • vector_column (str) – Column with embedding vectors for pre-embedded data.

  • verbose (bool) – Flag to enable verbose output during the ingestion process.

Returns

List of UUIDs of objects that failed to be created.

Return type

Sequence[dict[str, weaviate.types.UUID | str] | None]
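
To illustrate the document-oriented workflow described above, the sketch below splits source documents into chunks tagged with their origin and hands them to create_or_replace_document_objects with existing='replace'. The helpers chunk_documents and sync_documents, the "docLink" and "content" field names, and the input dict keys "url" and "content" are all hypothetical; hook is assumed to be a WeaviateHook. The rows carry no vector column, which assumes the collection handles vectorization itself.

```python
def chunk_documents(documents, chunk_size=200):
    """Split each document into fixed-size chunks tagged with its source URL."""
    rows = []
    for doc in documents:
        text = doc["content"]
        for start in range(0, len(text), chunk_size):
            rows.append(
                {
                    # Identifies which source document the chunk came from.
                    "docLink": doc["url"],
                    "content": text[start : start + chunk_size],
                }
            )
    return rows


def sync_documents(hook, collection_name, rows):
    """Replace all objects of any document that appears in rows."""
    return hook.create_or_replace_document_objects(
        data=rows,
        collection_name=collection_name,
        document_column="docLink",
        existing="replace",
    )
```

The return value of sync_documents is whatever the hook returns, which per the description above lists the objects that failed to be created.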
