Spaces:
Runtime error
Runtime error
File size: 6,470 Bytes
e0d9c8e |
|
import os
import csv
from services import audio
import random
import pinecone
import gradio as gr
from openai import OpenAI
# Shared OpenAI client used by _get_embedding and _call_api; the key is read from OPENAI_API_KEY.
OPENAI_CLIENT = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Chat model name passed to chat.completions.create in _call_api (os.getenv yields None when unset).
OPENAI_MODEL = os.getenv("OPENAI_MODEL")
# Pinecone must be configured before the index handle below is created.
# NOTE(review): pinecone.init looks like the pre-3.0 client API -- confirm the pinned pinecone version.
pinecone.init(api_key=os.getenv("PINECONE_API_KEY"), environment=os.getenv("PINECONE_ENV"))
# Index handle queried by _get_context; the index name comes from PINECONE_INDEX.
INDEX = pinecone.Index(os.getenv("PINECONE_INDEX"))
def start_chat(client_name: str) -> tuple[list[list[str | None]], gr.helpers, gr.helpers]:
    """
    Initialize the chat with the Spanish greeting text and its audio file.

    :param client_name: name of the client; selects the ``assets/<client_name>`` folder
    :return: (chat history containing only the greeting, gradio update whose value is the greeting
        audio path, gradio update with visible=True)
    :raises ValueError: if the greetings CSV has no first row or the first row is empty
    """
    greetings_path = f'assets/{client_name}/greetings/es.csv'
    # newline='' lets the csv module handle line endings itself (required by the csv docs so that
    # quoted fields containing newlines are parsed correctly)
    with open(greetings_path, mode='r', encoding='utf-8', newline='') as infile:
        # Only the first greeting in the file is used
        first_row = next(csv.reader(infile), None)
    if not first_row:
        # Fail with a clear message instead of leaking StopIteration / IndexError
        raise ValueError(f'No greeting found in {greetings_path}')
    greeting = first_row[0]
    audio_name = f'assets/{client_name}/media/audio/greeting_es_0.wav'
    # Initialize chat: the first entry has an empty user message and the greeting as the bot message
    chat_history = [['', greeting]]
    return chat_history, gr.update(value=audio_name), gr.update(visible=True)
def get_random_data(client_name: str) -> gr.helpers:
    """
    Return a gradio update pointing at one randomly chosen "random data" audio file in Spanish.

    :param client_name: name of the client for this chatbot; selects ``assets/<client_name>/media/audio``
    :return: gradio update whose value is the path of the chosen audio file
    :raises ValueError: if the audio folder contains no matching file
    """
    path_audios = f'assets/{client_name}/media/audio'
    # Candidates are files whose name starts with 'random' and contains 'es'.
    # NOTE(review): 'es' is matched as a plain substring anywhere in the file name, so a name such as
    # 'random_test_en.wav' would also match -- confirm against the asset naming convention.
    random_options = [
        os.path.join(path_audios, file_name)
        for file_name in os.listdir(path_audios)
        if file_name.startswith('random') and 'es' in file_name
    ]
    if not random_options:
        # Same exception type the original randint(0, -1) call produced, but with a useful message
        raise ValueError(f'No Spanish random audio files found in {path_audios}')
    return gr.update(value=random.choice(random_options))
def get_answer(
        chat_history: list[tuple[str, str]], user_input: str, client_name: str, general_prompt: str, context_prompt: str
) -> tuple[list[tuple[str, str]], str, gr.helpers]:
    """
    Answer the user's message and return the reply both as text and as audio.

    :param chat_history: previous chat history; it is extended in place with the new exchange
    :param user_input: user question
    :param client_name: name of the client
    :param general_prompt: prompt used for answering the questions
    :param context_prompt: prompt used for finding the context in the vectorstore
    :return: (updated chat history, empty string, gradio update whose value is 'output.wav')
    """
    # Convert the chat history into the OpenAI message format, led by the system prompt
    messages = [{'role': 'system', 'content': general_prompt}]
    for position, (user_text, bot_text) in enumerate(chat_history):
        # The first entry only contributes its bot message (the greeting); its user part is skipped
        if position:
            messages.append({'role': 'user', 'content': user_text})
        messages.append({'role': 'assistant', 'content': bot_text})

    # Rewrite the user input as a standalone question and look up its context
    search_query = _get_standalone_question(user_input, messages, context_prompt)
    retrieved_context = _get_context(search_query, client_name)

    # Ask the chatbot for the answer
    answer = _get_response(retrieved_context, messages, user_input, general_prompt)

    # Generate the Spanish audio for the answer
    # (presumably written to 'output.wav', the path returned below -- confirm in services.audio)
    audio.get_audio(answer, 'es')

    # Record the new exchange
    chat_history.append((user_input, answer))
    return chat_history, "", gr.update(value='output.wav')
def _get_response(context: str, message_history: list[dict], question: str, prompt: str) -> str:
    """
    Ask ChatGPT for the answer to the user's question.

    Note that ``message_history`` is modified in place: its first message is overwritten with the
    prompt (with CONTEXT filled in) and the question is appended as a new user message.

    :param context: text obtained from the vectorstore
    :param message_history: chat history in the format used by OpenAI
    :param question: user question
    :param prompt: prompt used to answer the questions; every 'CONTEXT' occurrence is replaced by ``context``
    :return: response from ChatGPT
    """
    system_message = message_history[0]
    system_message['content'] = prompt.replace('CONTEXT', context)
    user_message = {'role': 'user', 'content': question}
    message_history.append(user_message)
    return _call_api(message_history, 0.7)
def _get_embedding(text: str) -> list[float]:
    """
    Compute the embedding of a text with the 'text-embedding-ada-002' model.

    :param text: input text
    :return: embedding of the text
    """
    embeddings = OPENAI_CLIENT.embeddings.create(model='text-embedding-ada-002', input=text)
    # A single input was sent, so the first entry of the response is used
    first_result = embeddings.data[0]
    return first_result.embedding
def _call_api(message_history: list[dict], temperature: float) -> str:
    """
    Send the messages to the OpenAI chat completions API and return the reply text.

    :param message_history: chat history in the format used by OpenAI
    :param temperature: randomness of the output
    :return: ChatGPT answer (content of the first returned choice)
    """
    completion = OPENAI_CLIENT.chat.completions.create(
        messages=message_history,
        model=OPENAI_MODEL,
        temperature=temperature,
    )
    top_choice = completion.choices[0]
    return top_choice.message.content
def _get_standalone_question(question: str, message_history: list[dict], prompt_q: str) -> str:
    """
    Get a standalone question/phrase based on the user's question and the previous messages. Used since
    some questions are too simple on their own, like "yes, please".

    :param question: user question
    :param message_history: msg history in the format used by OpenAI; the first message (the prompt) is skipped
    :param prompt_q: prompt template used to get the text that will be searched in the vectorstore; its
        'HISTORY' and 'QUESTION' placeholders are filled with the formatted history and the question
    :return: string with the standalone phrase
    """
    import re  # local import so this block does not depend on the file's import section

    # Format the message history like: Human: blablabla \nAssistant: blablabla
    # Speakers are derived from the position: get_answer builds the list as system prompt, assistant
    # greeting, then user/assistant pairs, so even indices are the human and odd ones the assistant.
    history = ''.join(
        f'{"Human" if index % 2 == 0 else "Assistant"}: {msg["content"]}\n'
        for index, msg in enumerate(message_history)
        if index != 0  # Omit the prompt
    )

    # Fill both placeholders in a single pass over the template. Chaining two str.replace calls would
    # also rewrite any literal 'QUESTION' that appears inside the inserted history text.
    substitutions = {'HISTORY': history, 'QUESTION': question}
    content = re.sub('HISTORY|QUESTION', lambda match: substitutions[match.group(0)], prompt_q)

    # Call chatgpt with a near-zero temperature
    return _call_api([{'role': 'system', 'content': content}], 0.01)
def _get_context(question: str, client_name: str) -> str:
    """
    Fetch the 10 nearest vectors to the given question and join their texts.

    :param question: standalone text
    :param client_name: name of the client; the namespace queried is '<client_name>-context'
    :return: the 'Text' metadata of each match, one per line
    """
    # Query the most similar vectors to the question's embedding
    query_response = INDEX.query(
        vector=_get_embedding(question),
        top_k=10,
        include_metadata=True,
        namespace=f'{client_name}-context'
    )
    matches = query_response['matches']
    # Build a single string with the text of each vector, each followed by a newline
    return ''.join(match['metadata']['Text'] + '\n' for match in matches)
|