Source code for airflow.providers.openai.hooks.openai
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Any,BinaryIO,LiteralfromopenaiimportOpenAIifTYPE_CHECKING:fromopenai.typesimportFileDeleted,FileObjectfromopenai.types.betaimport(Assistant,AssistantDeleted,Thread,ThreadDeleted,VectorStore,VectorStoreDeleted,)fromopenai.types.beta.threadsimportMessage,Runfromopenai.types.beta.vector_storesimportVectorStoreFile,VectorStoreFileBatch,VectorStoreFileDeletedfromopenai.types.chatimport(ChatCompletionAssistantMessageParam,ChatCompletionFunctionMessageParam,ChatCompletionMessage,ChatCompletionSystemMessageParam,ChatCompletionToolMessageParam,ChatCompletionUserMessageParam,)fromairflow.hooks.baseimportBaseHook
[docs]classOpenAIHook(BaseHook):""" Use OpenAI SDK to interact with OpenAI APIs. .. seealso:: https://platform.openai.com/docs/introduction/overview :param conn_id: :ref:`OpenAI connection id <howto/connection:openai>` """
[docs]defget_ui_field_behaviour(cls)->dict[str,Any]:"""Return custom field behaviour."""return{"hidden_fields":["schema","port","login"],"relabeling":{"password":"API Key"},"placeholders":{},}
[docs]defconn(self)->OpenAI:"""Return an OpenAI connection object."""returnself.get_conn()
[docs]defget_conn(self)->OpenAI:"""Return an OpenAI connection object."""conn=self.get_connection(self.conn_id)extras=conn.extra_dejsonopenai_client_kwargs=extras.get("openai_client_kwargs",{})api_key=openai_client_kwargs.pop("api_key",None)orconn.passwordbase_url=openai_client_kwargs.pop("base_url",None)orconn.hostorNonereturnOpenAI(api_key=api_key,base_url=base_url,**openai_client_kwargs,)
[docs]defcreate_chat_completion(self,messages:list[ChatCompletionSystemMessageParam|ChatCompletionUserMessageParam|ChatCompletionAssistantMessageParam|ChatCompletionToolMessageParam|ChatCompletionFunctionMessageParam],model:str="gpt-3.5-turbo",**kwargs:Any,)->list[ChatCompletionMessage]:""" Create a model response for the given chat conversation and returns a list of chat completions. :param messages: A list of messages comprising the conversation so far :param model: ID of the model to use """response=self.conn.chat.completions.create(model=model,messages=messages,**kwargs)returnresponse.choices
[docs]defcreate_assistant(self,model:str="gpt-3.5-turbo",**kwargs:Any)->Assistant:""" Create an OpenAI assistant using the given model. :param model: The OpenAI model for the assistant to use. """assistant=self.conn.beta.assistants.create(model=model,**kwargs)returnassistant
[docs]defget_assistant(self,assistant_id:str)->Assistant:""" Get an OpenAI assistant. :param assistant_id: The ID of the assistant to retrieve. """assistant=self.conn.beta.assistants.retrieve(assistant_id=assistant_id)returnassistant
[docs]defget_assistants(self,**kwargs:Any)->list[Assistant]:"""Get a list of Assistant objects."""assistants=self.conn.beta.assistants.list(**kwargs)returnassistants.data
[docs]defmodify_assistant(self,assistant_id:str,**kwargs:Any)->Assistant:""" Modify an existing Assistant object. :param assistant_id: The ID of the assistant to be modified. """assistant=self.conn.beta.assistants.update(assistant_id=assistant_id,**kwargs)returnassistant
[docs]defdelete_assistant(self,assistant_id:str)->AssistantDeleted:""" Delete an OpenAI Assistant for a given ID. :param assistant_id: The ID of the assistant to delete. """response=self.conn.beta.assistants.delete(assistant_id=assistant_id)returnresponse
[docs]defcreate_thread(self,**kwargs:Any)->Thread:"""Create an OpenAI thread."""thread=self.conn.beta.threads.create(**kwargs)returnthread
[docs]defmodify_thread(self,thread_id:str,metadata:dict[str,Any])->Thread:""" Modify an existing Thread object. :param thread_id: The ID of the thread to modify. Only the metadata can be modified. :param metadata: Set of 16 key-value pairs that can be attached to an object. """thread=self.conn.beta.threads.update(thread_id=thread_id,metadata=metadata)returnthread
[docs]defdelete_thread(self,thread_id:str)->ThreadDeleted:""" Delete an OpenAI thread for a given thread_id. :param thread_id: The ID of the thread to delete. """response=self.conn.beta.threads.delete(thread_id=thread_id)returnresponse
[docs]defcreate_message(self,thread_id:str,role:Literal["user","assistant"],content:str,**kwargs:Any)->Message:""" Create a message for a given Thread. :param thread_id: The ID of the thread to create a message for. :param role: The role of the entity that is creating the message. Allowed values include: 'user', 'assistant'. :param content: The content of the message. """thread_message=self.conn.beta.threads.messages.create(thread_id=thread_id,role=role,content=content,**kwargs)returnthread_message
[docs]defget_messages(self,thread_id:str,**kwargs:Any)->list[Message]:""" Return a list of messages for a given Thread. :param thread_id: The ID of the thread the messages belong to. """messages=self.conn.beta.threads.messages.list(thread_id=thread_id,**kwargs)returnmessages.data
[docs]defmodify_message(self,thread_id:str,message_id,**kwargs:Any)->Message:""" Modify an existing message for a given Thread. :param thread_id: The ID of the thread to which this message belongs. :param message_id: The ID of the message to modify. """thread_message=self.conn.beta.threads.messages.update(thread_id=thread_id,message_id=message_id,**kwargs)returnthread_message
[docs]defcreate_run(self,thread_id:str,assistant_id:str,**kwargs:Any)->Run:""" Create a run for a given thread and assistant. :param thread_id: The ID of the thread to run. :param assistant_id: The ID of the assistant to use to execute this run. """run=self.conn.beta.threads.runs.create(thread_id=thread_id,assistant_id=assistant_id,**kwargs)returnrun
[docs]defcreate_run_and_poll(self,thread_id:str,assistant_id:str,**kwargs:Any)->Run:""" Create a run for a given thread and assistant and then polls until completion. :param thread_id: The ID of the thread to run. :param assistant_id: The ID of the assistant to use to execute this run. :return: An OpenAI Run object """run=self.conn.beta.threads.runs.create_and_poll(thread_id=thread_id,assistant_id=assistant_id,**kwargs)returnrun
[docs]defget_run(self,thread_id:str,run_id:str)->Run:""" Retrieve a run for a given thread and run. :param thread_id: The ID of the thread that was run. :param run_id: The ID of the run to retrieve. """run=self.conn.beta.threads.runs.retrieve(thread_id=thread_id,run_id=run_id)returnrun
[docs]defget_runs(self,thread_id:str,**kwargs:Any)->list[Run]:""" Return a list of runs belonging to a thread. :param thread_id: The ID of the thread the run belongs to. """runs=self.conn.beta.threads.runs.list(thread_id=thread_id,**kwargs)returnruns.data
[docs]defmodify_run(self,thread_id:str,run_id:str,**kwargs:Any)->Run:""" Modify a run on a given thread. :param thread_id: The ID of the thread that was run. :param run_id: The ID of the run to modify. """run=self.conn.beta.threads.runs.update(thread_id=thread_id,run_id=run_id,**kwargs)returnrun
[docs]defcreate_embeddings(self,text:str|list[str]|list[int]|list[list[int]],model:str="text-embedding-ada-002",**kwargs:Any,)->list[float]:""" Generate embeddings for the given text using the given model. :param text: The text to generate embeddings for. :param model: The model to use for generating embeddings. """response=self.conn.embeddings.create(model=model,input=text,**kwargs)embeddings:list[float]=response.data[0].embeddingreturnembeddings
[docs]defupload_file(self,file:str,purpose:Literal["fine-tune","assistants"])->FileObject:""" Upload a file that can be used across various endpoints. The size of all the files uploaded by one organization can be up to 100 GB. :param file: The File object (not file name) to be uploaded. :param purpose: The intended purpose of the uploaded file. Use "fine-tune" for Fine-tuning and "assistants" for Assistants and Messages. """withopen(file,"rb")asfile_stream:file_object=self.conn.files.create(file=file_stream,purpose=purpose)returnfile_object
[docs]defget_file(self,file_id:str)->FileObject:""" Return information about a specific file. :param file_id: The ID of the file to use for this request. """file=self.conn.files.retrieve(file_id=file_id)returnfile
[docs]defget_files(self)->list[FileObject]:"""Return a list of files that belong to the user's organization."""files=self.conn.files.list()returnfiles.data
[docs]defdelete_file(self,file_id:str)->FileDeleted:""" Delete a file. :param file_id: The ID of the file to be deleted. """response=self.conn.files.delete(file_id=file_id)returnresponse
[docs]defcreate_vector_store(self,**kwargs:Any)->VectorStore:"""Create a vector store."""vector_store=self.conn.beta.vector_stores.create(**kwargs)returnvector_store
[docs]defget_vector_stores(self,**kwargs:Any)->list[VectorStore]:"""Return a list of vector stores."""vector_stores=self.conn.beta.vector_stores.list(**kwargs)returnvector_stores.data
[docs]defget_vector_store(self,vector_store_id:str)->VectorStore:""" Retrieve a vector store. :param vector_store_id: The ID of the vector store to retrieve. """vector_store=self.conn.beta.vector_stores.retrieve(vector_store_id=vector_store_id)returnvector_store
[docs]defmodify_vector_store(self,vector_store_id:str,**kwargs:Any)->VectorStore:""" Modify a vector store. :param vector_store_id: The ID of the vector store to modify. """vector_store=self.conn.beta.vector_stores.update(vector_store_id=vector_store_id,**kwargs)returnvector_store
[docs]defdelete_vector_store(self,vector_store_id:str)->VectorStoreDeleted:""" Delete a vector store. :param vector_store_id: The ID of the vector store to delete. """response=self.conn.beta.vector_stores.delete(vector_store_id=vector_store_id)returnresponse
[docs]defupload_files_to_vector_store(self,vector_store_id:str,files:list[BinaryIO])->VectorStoreFileBatch:""" Upload files to a vector store and poll until completion. :param vector_store_id: The ID of the vector store the files are to be uploaded to. :param files: A list of binary files to upload. """file_batch=self.conn.beta.vector_stores.file_batches.upload_and_poll(vector_store_id=vector_store_id,files=files)returnfile_batch
[docs]defget_vector_store_files(self,vector_store_id:str)->list[VectorStoreFile]:""" Return a list of vector store files. :param vector_store_id: """vector_store_files=self.conn.beta.vector_stores.files.list(vector_store_id=vector_store_id)returnvector_store_files.data
[docs]defdelete_vector_store_file(self,vector_store_id:str,file_id:str)->VectorStoreFileDeleted:""" Delete a vector store file. This will remove the file from the vector store but the file itself will not be deleted. To delete the file, use delete_file. :param vector_store_id: The ID of the vector store that the file belongs to. :param file_id: The ID of the file to delete. """response=self.conn.beta.vector_stores.files.delete(vector_store_id=vector_store_id,file_id=file_id)returnresponse