import csv
import os
import random

import gradio as gr
import pinecone
from openai import OpenAI

from services import audio

OPENAI_CLIENT = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
OPENAI_MODEL = os.getenv("OPENAI_MODEL")

pinecone.init(api_key=os.getenv("PINECONE_API_KEY"), environment=os.getenv("PINECONE_ENV"))
INDEX = pinecone.Index(os.getenv("PINECONE_INDEX"))


def start_chat(client: str) -> tuple[list[list[str]], dict, dict]:
    """
    Initializes the chat with a greeting, in Spanish, as both text and audio
    :param client: name of the client
    :return: chat history with the greeting, audio update pointing to the greeting file,
        and a gradio update with visible=True
    """
    client_name = client.lower().replace(' ', '-')

    # Get the greeting text (first row of the Spanish greetings file) and its audio
    with open(f'assets/{client_name}/greetings/es.csv', mode='r', encoding='utf-8') as infile:
        reader = csv.reader(infile)
        greeting = next(reader)[0]
    audio_name = f'assets/{client_name}/media/audio/greeting_es_0.wav'

    # Initialize the chat with an empty user turn and the greeting as the bot turn
    chat_history = [['', greeting]]

    return chat_history, gr.update(value=audio_name), gr.update(visible=True)


def get_random_data(client: str) -> dict:
    """
    Returns an audio update with a random data clip in Spanish
    :param client: name of the client for this chatbot
    :return: gradio audio update with a random data file from the client
    """
    client_name = client.lower().replace(' ', '-')
    path_audios = f'assets/{client_name}/media/audio'

    # Collect the Spanish "random" audio files available for this client
    random_options = [
        os.path.join(path_audios, random_audio)
        for random_audio in os.listdir(path_audios)
        if random_audio.startswith('random') and 'es' in random_audio
    ]

    # Pick any of the found files
    return gr.update(value=random.choice(random_options))


def get_answer(
        chat_history: list[list[str]],
        user_input: str,
        client: str,
        general_prompt: str,
        context_prompt: str
) -> tuple[list[list[str]], str, dict]:
    """
    Gets the chatbot's answer and returns it as both text and audio
    :param chat_history: previous chat history
    :param user_input: user question
    :param client: name of the client
    :param general_prompt: prompt used for answering the questions
    :param context_prompt: prompt used for finding the context in the vectorstore
    :return: updated chat history, an empty string to clear the input box,
        and a gradio update with the answer's audio file
    """
    client_name = client.lower().replace(' ', '-')

    # Format the chat history into the OpenAI message format
    msg_history = [{'role': 'system', 'content': general_prompt}]
    for i, (user, bot) in enumerate(chat_history):
        if i == 0:
            # The first turn only contains the greeting; its user part is empty
            msg_history.append({'role': 'assistant', 'content': bot})
        else:
            msg_history.append({'role': 'user', 'content': user})
            msg_history.append({'role': 'assistant', 'content': bot})

    # Rewrite the user input as a standalone question
    standalone_question = _get_standalone_question(user_input, msg_history, context_prompt)

    # Get the context from the vectorstore
    context = _get_context(standalone_question, client_name)

    # Get the answer from the chatbot
    response = _get_response(context, msg_history, user_input, general_prompt)

    # Generate the answer's audio (written to output.wav)
    audio.get_audio(response, 'es')

    # Update the chat history
    chat_history.append([user_input, response])

    return chat_history, "", gr.update(value='output.wav')
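
# Illustrative note (the example values below are placeholders, not taken from
# the real assets): for a chat consisting of the greeting plus one completed
# user/bot turn, get_answer() builds a message history shaped like this before
# the current question is appended:
#
#   [
#       {'role': 'system', 'content': general_prompt},
#       {'role': 'assistant', 'content': '<greeting>'},
#       {'role': 'user', 'content': '<previous user question>'},
#       {'role': 'assistant', 'content': '<previous bot answer>'},
#   ]
#
# _get_standalone_question() later flattens everything after the system prompt
# into "Assistant: ...\nHuman: ...\n" lines for the question-rewriting prompt.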

def _get_response(context: str, message_history: list[dict], question: str, prompt: str) -> str:
    """
    Gets the response from ChatGPT
    :param context: text obtained from the vectorstore
    :param message_history: chat history in the format used by OpenAI
    :param question: user question
    :param prompt: prompt used to answer the questions
    :return: response from ChatGPT
    """
    # Inject the retrieved context into the system prompt and append the question
    message_history[0]['content'] = prompt.replace('CONTEXT', context)
    message_history.append({'role': 'user', 'content': question})

    return _call_api(message_history, 0.7)


def _get_embedding(text: str) -> list[float]:
    """
    Gets the embedding of a given text
    :param text: input text
    :return: embedding of the text
    """
    response = OPENAI_CLIENT.embeddings.create(
        input=text,
        model='text-embedding-ada-002'
    )

    return response.data[0].embedding


def _call_api(message_history: list[dict], temperature: float) -> str:
    """
    Gets a response from the OpenAI API
    :param message_history: chat history in the format used by OpenAI
    :param temperature: randomness of the output
    :return: ChatGPT answer
    """
    response = OPENAI_CLIENT.chat.completions.create(
        model=OPENAI_MODEL,
        temperature=temperature,
        messages=message_history
    )

    return response.choices[0].message.content


def _get_standalone_question(question: str, message_history: list[dict], prompt_q: str) -> str:
    """
    Gets a standalone question/phrase based on the user's question and the previous
    messages. Needed because some inputs, like "yes, please", are meaningless on their own
    :param question: user question
    :param message_history: msg history in the format used by OpenAI
    :param prompt_q: prompt used to get a text that will be used in the vectorstore
    :return: string with the standalone phrase
    """
    # Format the message history like: Human: ...\nAssistant: ...
    history = ''
    for i, msg in enumerate(message_history):
        if i == 0:
            continue  # Omit the system prompt
        if i % 2 == 0:
            history += f'Human: {msg["content"]}\n'
        else:
            history += f'Assistant: {msg["content"]}\n'

    # Add the history and the question to the prompt and call ChatGPT
    content = prompt_q.replace('HISTORY', history).replace('QUESTION', question)
    prompt = [{'role': 'system', 'content': content}]

    return _call_api(prompt, 0.01)


def _get_context(question: str, client_name: str) -> str:
    """
    Gets the 10 nearest vectors to the given question
    :param question: standalone text
    :param client_name: name of the client, used as the namespace in the vectorstore
    :return: formatted text with the nearest vectors
    """
    q_embedding = _get_embedding(question)

    # Get the most similar vectors
    result = INDEX.query(
        vector=q_embedding,
        top_k=10,
        include_metadata=True,
        namespace=f'{client_name}-context'
    )['matches']

    # Create a single string from the text of each vector
    context = ''
    for r in result:
        context += r['metadata']['Text'] + '\n'

    return context
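
# --- Usage sketch (an assumption, not part of the original app) ---
# A minimal Gradio wiring for the functions above. The layout, the client name
# and both prompt texts are placeholders; the real app presumably defines its
# own UI and prompts elsewhere.
if __name__ == '__main__':
    with gr.Blocks() as demo:
        client_box = gr.Textbox(value='Demo Client', visible=False)              # hypothetical client
        general_box = gr.Textbox(value='Answer using: CONTEXT', visible=False)   # placeholder prompt
        context_box = gr.Textbox(value='HISTORY\nQUESTION', visible=False)       # placeholder prompt
        chatbot = gr.Chatbot()
        answer_audio = gr.Audio()
        user_box = gr.Textbox(visible=False)  # revealed by start_chat's visible=True update

        start_btn = gr.Button('Start chat')
        start_btn.click(start_chat, inputs=client_box, outputs=[chatbot, answer_audio, user_box])
        user_box.submit(
            get_answer,
            inputs=[chatbot, user_box, client_box, general_box, context_box],
            outputs=[chatbot, user_box, answer_audio],
        )

    demo.launch()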