Spaces:
Runtime error
Runtime error
import os | |
import csv | |
from services import audio | |
import random | |
import pinecone | |
import gradio as gr | |
from openai import OpenAI | |
# Shared OpenAI client and chat model name, both read from environment variables.
OPENAI_CLIENT = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
OPENAI_MODEL = os.environ.get("OPENAI_MODEL")

# Initialise Pinecone and open the index named by PINECONE_INDEX.
pinecone.init(
    api_key=os.environ.get("PINECONE_API_KEY"),
    environment=os.environ.get("PINECONE_ENV"),
)
INDEX = pinecone.Index(os.environ.get("PINECONE_INDEX"))
def start_chat(client_name: str) -> tuple[list[list[str | None]], gr.helpers, gr.helpers]:
    """
    Initialize the chat with the Spanish greeting text and its audio.

    The greeting is the first column of the first row of
    ``assets/<client_name>/greetings/es.csv``. The audio path is built from
    ``client_name`` and is not checked for existence here.

    :param client_name: name of the client, used to build the asset paths
    :return: (chat history holding only the greeting, gradio update whose value is the greeting audio path,
        gradio update with visible=True)
    :raises ValueError: if the greetings file has no rows or its first row is empty
    """
    greetings_path = f'assets/{client_name}/greetings/es.csv'

    # newline='' lets the csv module do its own newline handling, so a quoted
    # greeting that spans several lines is read back intact.
    with open(greetings_path, mode='r', encoding='utf-8', newline='') as infile:
        first_row = next(csv.reader(infile), None)

    # Fail with a readable message instead of leaking StopIteration/IndexError.
    if not first_row:
        raise ValueError(f'No greeting found in {greetings_path}')
    greeting = first_row[0]

    audio_name = f'assets/{client_name}/media/audio/greeting_es_0.wav'

    # The chat starts with a single turn: empty user message, greeting as the bot message.
    chat_history = [['', greeting]]
    return chat_history, gr.update(value=audio_name), gr.update(visible=True)
def get_random_data(client_name: str) -> gr.helpers:
    """
    Return a gradio update pointing at a randomly chosen "random data" audio in Spanish.

    Candidates are the files in ``assets/<client_name>/media/audio`` whose name
    starts with ``random`` and contains ``es``.

    :param client_name: name of the client for this chatbot
    :return: gradio update whose value is the path of one of the matching audio files
    :raises ValueError: if no matching audio file exists in the directory
    """
    path_audios = f'assets/{client_name}/media/audio'

    # NOTE(review): 'es' is matched as a plain substring, so any file name that
    # merely contains "es" also qualifies. Confirm the naming convention
    # before tightening this filter.
    random_options = [
        os.path.join(path_audios, file_name)
        for file_name in os.listdir(path_audios)
        if file_name.startswith('random') and 'es' in file_name
    ]

    # Same exception type the old randint(0, -1) call produced, but with a useful message.
    if not random_options:
        raise ValueError(f'No random audio files in spanish found in {path_audios}')

    return gr.update(value=random.choice(random_options))
def get_answer(
        chat_history: list[tuple[str, str]], user_input: str, client_name: str, general_prompt: str, context_prompt: str
) -> tuple[list[tuple[str, str]], str, gr.helpers]:
    """
    Answer the user's question and return the reply as text and audio.

    :param chat_history: previous chat history as (user, bot) pairs; it is extended in place with the new turn
    :param user_input: user question
    :param client_name: name of the client
    :param general_prompt: prompt used for answering the questions
    :param context_prompt: prompt used for finding the context in the vectorstore
    :return: (updated chat history, empty string, gradio update whose value is 'output.wav')
    """
    # Convert the (user, bot) pairs into OpenAI-style messages, system prompt first.
    messages = [{'role': 'system', 'content': general_prompt}]
    for turn_index, (user_text, bot_text) in enumerate(chat_history):
        # The user half of the first turn is skipped; only its bot message is kept.
        if turn_index != 0:
            messages.append({'role': 'user', 'content': user_text})
        messages.append({'role': 'assistant', 'content': bot_text})

    # Rewrite the question so it can be searched on its own, then fetch context for it.
    search_text = _get_standalone_question(user_input, messages, context_prompt)
    retrieved_context = _get_context(search_text, client_name)

    # Ask the chatbot for the final answer.
    answer = _get_response(retrieved_context, messages, user_input, general_prompt)

    # Generate the spoken answer; presumably this writes 'output.wav' (verify in services.audio).
    audio.get_audio(answer, 'es')

    chat_history.append((user_input, answer))
    return chat_history, "", gr.update(value='output.wav')
def _get_response(context: str, message_history: list[dict], question: str, prompt: str) -> str:
    """
    Ask ChatGPT for the answer to the user's question.

    Note that ``message_history`` is modified in place: its first message is
    overwritten with the context-filled prompt and the question is appended.

    :param context: text obtained from the vectorstore
    :param message_history: chat history in the format used by OpenAI
    :param question: user question
    :param prompt: prompt used to answer the questions; every 'CONTEXT' occurrence is replaced by ``context``
    :return: response from ChatGPT
    """
    system_content = prompt.replace('CONTEXT', context)
    message_history[0]['content'] = system_content

    user_message = {'role': 'user', 'content': question}
    message_history.append(user_message)

    return _call_api(message_history, 0.7)
def _get_embedding(text: str) -> list[float]:
    """
    Compute the embedding of a text with the 'text-embedding-ada-002' model.

    :param text: input text
    :return: embedding of the text
    """
    result = OPENAI_CLIENT.embeddings.create(model='text-embedding-ada-002', input=text)
    # A single input was sent, so only the first entry of the result is read.
    first_item = result.data[0]
    return first_item.embedding
def _call_api(message_history: list[dict], temperature: float) -> str:
    """
    Send a chat completion request to the OpenAI API and return the reply text.

    :param message_history: chat history in the format used by OpenAI
    :param temperature: randomness of the output
    :return: content of the first choice returned by the model
    """
    completion = OPENAI_CLIENT.chat.completions.create(
        messages=message_history,
        model=OPENAI_MODEL,
        temperature=temperature,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content
def _get_standalone_question(question: str, message_history: list[dict], prompt_q: str) -> str:
    """
    Build a standalone question/phrase from the user's question and the previous messages.

    Needed because some inputs, such as "yes, please", carry no meaning on their own.

    :param question: user question
    :param message_history: msg history in the format used by OpenAI
    :param prompt_q: prompt template; 'HISTORY' and then 'QUESTION' are replaced in it
    :return: string with the standalone phrase
    """
    # Speaker is decided by position: even index -> Human, odd index -> Assistant.
    speakers = ('Human', 'Assistant')

    # Render the history as "Human: ...\nAssistant: ...\n", leaving out index 0 (the prompt).
    transcript = ''.join(
        f'{speakers[position % 2]}: {message["content"]}\n'
        for position, message in enumerate(message_history)
        if position != 0
    )

    # Fill the template and send it to ChatGPT as the only (system) message.
    filled_prompt = prompt_q.replace('HISTORY', transcript).replace('QUESTION', question)
    return _call_api([{'role': 'system', 'content': filled_prompt}], 0.01)
def _get_context(question: str, client_name: str) -> str:
    """
    Fetch the 10 vectors nearest to the question and join their texts.

    :param question: standalone text
    :param client_name: name of the client; the namespace queried is '<client_name>-context'
    :return: the 'Text' metadata of each match, one per line
    """
    query_result = INDEX.query(
        vector=_get_embedding(question),
        top_k=10,
        include_metadata=True,
        namespace=f'{client_name}-context',
    )
    matches = query_result['matches']

    # Concatenate the text of every match, each followed by a newline.
    return ''.join(match['metadata']['Text'] + '\n' for match in matches)