"""Gradio chatbot backend: retrieval-augmented answers via OpenAI + Pinecone.

Flow per user turn: condense the chat into a standalone question, embed it,
fetch the closest context chunks from Pinecone, then ask the chat model to
answer using that context. Audio for the answer is synthesized by the
project-local ``audio`` module.
"""
import os
import random

import gradio as gr
import pinecone
from openai import OpenAI

import audio

OPENAI_CLIENT = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
pinecone.init(
    api_key=os.getenv("PINECONE_API_TOKEN"),
    environment=os.getenv("PINECONE_ENVIRONMENT"),
)


def start_chat(chat_history: list[list[str | None]]):
    """Seed the chat with a greeting row and return the initial UI state.

    Args:
        chat_history: Gradio chatbot history — a list of [user, bot] pairs.

    Returns:
        (chat_history, audio-player update, visibility update for some widget).
    """
    # NOTE(review): greeting text and audio file name are empty placeholders —
    # confirm where the real greeting content is meant to come from.
    greeting = ''
    audio_name = ''
    chat_history.append(['', greeting])
    return (
        chat_history,
        gr.update(value=f'{audio_name}.wav'),
        gr.update(visible=False),
    )


def get_random_data(client_name: str):
    """Pick a random Spanish filler audio file for the given client.

    Scans ``assets/<client_name>/media/audio`` for files named ``random*``
    that contain ``es``. Raises if the directory is missing or no file
    matches.
    """
    path_audios = f'assets/{client_name}/media/audio'
    random_options = [
        name
        for name in os.listdir(path_audios)
        if name.startswith('random') and 'es' in name
    ]
    # random.choice replaces the manual randint(0, len-1) indexing.
    return gr.update(value=random.choice(random_options))


def get_answer(
    chat_history: list[tuple[str, str]],
    user_input: str,
    client_name: str,
    general_prompt: str,
    context_prompt: str,
):
    """Answer ``user_input`` using retrieved context and update the chat.

    Args:
        chat_history: Gradio history of (user, assistant) pairs; the first
            entry is the greeting row added by :func:`start_chat`.
        user_input: the new user message.
        client_name: selects the Pinecone index / asset folder.
        general_prompt: system prompt template containing a ``CONTEXT`` slot.
        context_prompt: prompt template with ``HISTORY``/``QUESTION`` slots
            used to build a standalone question.

    Returns:
        (updated chat_history, cleared textbox value, audio-player update).
    """
    # Convert Gradio [user, assistant] pairs into OpenAI message dicts.
    # BUGFIX: the previous code iterated the pairs as if they were a flat
    # alternating list and passed a whole pair as a message's content.
    msg_history = [{'role': 'system', 'content': general_prompt}]
    for user_msg, bot_msg in chat_history[1:]:  # skip the greeting row
        msg_history.append({'role': 'user', 'content': user_msg})
        msg_history.append({'role': 'assistant', 'content': bot_msg})

    # Condense history + new input into a self-contained question.
    standalone_question = _get_standalone_question(
        user_input, msg_history, context_prompt
    )
    # Retrieve supporting context for that question.
    context = _get_context(standalone_question, client_name)
    # Ask the chat model, grounding it on the retrieved context.
    response = _get_response(context, msg_history, user_input, general_prompt)
    # Synthesize the spoken answer (writes output.wav as a side effect).
    audio.get_audio(response, 'es')

    chat_history.append((user_input, response))
    return chat_history, "", gr.update(value='output.wav')


def _get_response(
    context: str, message_history: list[dict], question: str, prompt: str
) -> str:
    """Call the chat model with ``context`` injected into the system prompt.

    Mutates ``message_history`` in place: rewrites the system message and
    appends the user question before the API call.
    """
    message_history[0]['content'] = prompt.replace('CONTEXT', context)
    message_history.append({'role': 'user', 'content': question})
    return _call_api(message_history)


def _get_embedding(text: str) -> list[float]:
    """Embed ``text`` with OpenAI's ada-002 model."""
    response = OPENAI_CLIENT.embeddings.create(
        input=text,
        model='text-embedding-ada-002',
    )
    return response.data[0].embedding


def _call_api(message_history: list[dict]) -> str:
    """Run a chat completion and return the assistant's reply text."""
    response = OPENAI_CLIENT.chat.completions.create(
        model='gpt-4-turbo-preview',
        temperature=0.7,
        messages=message_history,
    )
    return response.choices[0].message.content


def _get_standalone_question(
    question: str, message_history: list[dict], prompt_q: str
) -> str:
    """Rewrite ``question`` as a standalone question given the chat history.

    Formats the history as ``Human: ...\\nAssistant: ...`` lines, fills the
    ``HISTORY`` and ``QUESTION`` slots of ``prompt_q``, and asks the model.
    """
    # BUGFIX: turns were previously labeled by index parity, which inverted
    # Human/Assistant once the system prompt occupied index 0. Use the
    # message's actual role instead.
    lines = []
    for msg in message_history[1:]:  # skip the system prompt
        speaker = 'Human' if msg['role'] == 'user' else 'Assistant'
        lines.append(f'{speaker}: {msg["content"]}\n')
    history = ''.join(lines)

    content = prompt_q.replace('HISTORY', history).replace('QUESTION', question)
    return _call_api([{'role': 'system', 'content': content}])


def _get_context(question: str, client_name: str) -> str:
    """Return the concatenated text of the top-10 Pinecone matches.

    Queries the ``client_name`` index under the ``<client_name>-context``
    namespace with the question's embedding.
    """
    q_embedding = _get_embedding(question)
    index = pinecone.Index(client_name)
    result = index.query(
        vector=q_embedding,
        top_k=10,
        include_metadata=True,
        namespace=f'{client_name}-context',
    )['matches']
    # Join the stored text of each match into a single context string.
    return ''.join(r['metadata']['Text'] + '\n' for r in result)