# Gradio chatbot app: answers client questions using OpenAI completions with
# Pinecone vector search for context, and replies with Spanish text + audio.
import os
import csv
from services import audio
import random
import pinecone
import gradio as gr
from openai import OpenAI
OPENAI_CLIENT = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
OPENAI_MODEL = os.getenv("OPENAI_MODEL")
pinecone.init(api_key=os.getenv("PINECONE_API_KEY"), environment=os.getenv("PINECONE_ENV"))
INDEX = pinecone.Index(os.getenv("PINECONE_INDEX"))
def start_chat(client_name: str) -> tuple[list[list[str | None]], gr.helpers, gr.helpers]:
    """
    Set up the conversation with the client's Spanish greeting (text + audio).

    :param client_name: name of the client
    :return: (chat history seeded with the greeting, audio update pointing at
        the greeting file, gradio update making the component visible)
    """
    # The first row of the client's Spanish greetings CSV holds the greeting text
    greetings_file = f'assets/{client_name}/greetings/es.csv'
    with open(greetings_file, mode='r', encoding='utf-8') as handle:
        greeting_text = next(csv.reader(handle))[0]
    greeting_audio = f'assets/{client_name}/media/audio/greeting_es_0.wav'
    # Seed the chat with a single turn: empty user side, bot greeting
    return [['', greeting_text]], gr.update(value=greeting_audio), gr.update(visible=True)
def get_random_data(client_name: str) -> gr.helpers:
    """
    Return a gradio audio update loaded with a random Spanish "random data" clip.

    :param client_name: name of the client for this chatbot
    :return: gradio audio update whose value is a randomly chosen clip path
    :raises FileNotFoundError: if the client has no matching audio files
    """
    audio_dir = f'assets/{client_name}/media/audio'
    # Candidate clips are the files named like "random...es..." in the audio dir
    candidates = [
        os.path.join(audio_dir, name)
        for name in os.listdir(audio_dir)
        if name.startswith('random') and 'es' in name
    ]
    if not candidates:
        # Fail with a clear message instead of an opaque error from random
        raise FileNotFoundError(f'No Spanish random-data audio found in {audio_dir}')
    # random.choice replaces the manual randint-index pattern
    return gr.update(value=random.choice(candidates))
def get_answer(
    chat_history: list[tuple[str, str]], user_input: str, client_name: str, general_prompt: str, context_prompt: str
) -> tuple[list[tuple[str, str]], str, gr.helpers]:
    """
    Answer the user's question, returning it both as text and as an audio file.

    :param chat_history: previous chat history
    :param user_input: user question
    :param client_name: name of the client
    :param general_prompt: prompt used for answering the questions
    :param context_prompt: prompt used for finding the context in the vectorstore
    :return: (updated chat history, empty string to clear the input box,
        gradio update pointing at the generated audio file)
    """
    # Rebuild the history in OpenAI message format. The first chat entry is the
    # greeting, which only has a bot side; later entries carry both sides.
    msg_history = [{'role': 'system', 'content': general_prompt}]
    for turn_index, (user_msg, bot_msg) in enumerate(chat_history):
        if turn_index > 0:
            msg_history.append({'role': 'user', 'content': user_msg})
        msg_history.append({'role': 'assistant', 'content': bot_msg})
    # Condense the question into a standalone phrase for the vectorstore lookup
    standalone_question = _get_standalone_question(user_input, msg_history, context_prompt)
    # Retrieve the relevant context and generate the answer
    context = _get_context(standalone_question, client_name)
    response = _get_response(context, msg_history, user_input, general_prompt)
    # Synthesize the Spanish audio; the resulting file is expected at output.wav
    audio.get_audio(response, 'es')
    chat_history.append((user_input, response))
    return chat_history, "", gr.update(value='output.wav')
def _get_response(context: str, message_history: list[dict], question: str, prompt: str) -> str:
    """
    Ask ChatGPT for an answer, injecting the vectorstore context into the prompt.

    NOTE: mutates *message_history* in place — rewrites the system entry and
    appends the user question.

    :param context: text obtained from the vectorstore
    :param message_history: chat history in the format used by OpenAI
    :param question: user question
    :param prompt: prompt used to answer the questions; its 'CONTEXT'
        placeholder is replaced with the retrieved context
    :return: response from ChatGPT
    """
    system_prompt = prompt.replace('CONTEXT', context)
    message_history[0]['content'] = system_prompt
    message_history.append({'role': 'user', 'content': question})
    return _call_api(message_history, 0.7)
def _get_embedding(text: str) -> list[float]:
    """
    Compute the OpenAI embedding vector for a given text.

    :param text: input text
    :return: embedding of the text as a list of floats
    """
    # Single-input request; the API returns one embedding in data[0]
    result = OPENAI_CLIENT.embeddings.create(model='text-embedding-ada-002', input=text)
    return result.data[0].embedding
def _call_api(message_history: list[dict], temperature: float) -> str:
    """
    Send the message history to the OpenAI chat completions API.

    :param message_history: chat history in the format used by OpenAI
    :param temperature: randomness of the output
    :return: content of the first answer choice returned by ChatGPT
    """
    completion = OPENAI_CLIENT.chat.completions.create(
        messages=message_history,
        model=OPENAI_MODEL,
        temperature=temperature,
    )
    return completion.choices[0].message.content
def _get_standalone_question(question: str, message_history: list[dict], prompt_q: str) -> str:
    """
    Build a standalone question/phrase from the user's question plus the prior
    messages. Needed because some questions are too terse on their own,
    like "yes, please".

    :param question: user question
    :param message_history: msg history in the format used by OpenAI
    :param prompt_q: prompt used to get a text that will be used in the vectorstore
    :return: string with the standalone phrase
    """
    # Render the history as "Human: ...\nAssistant: ...\n" lines, skipping the
    # system prompt at index 0. Odd indices are assistant turns, even are human.
    lines = []
    for position, msg in enumerate(message_history[1:], start=1):
        speaker = 'Assistant' if position % 2 else 'Human'
        lines.append(f'{speaker}: {msg["content"]}\n')
    history = ''.join(lines)
    # Fill the placeholders and query ChatGPT at near-deterministic temperature
    filled = prompt_q.replace('HISTORY', history).replace('QUESTION', question)
    return _call_api([{'role': 'system', 'content': filled}], 0.01)
def _get_context(question: str, client_name: str) -> str:
    """
    Fetch the 10 nearest vectors to the given question from the vectorstore.

    :param question: standalone text
    :param client_name: name of the client, used as namespace in the vectorstore
    :return: newline-joined text of the nearest vectors
    """
    question_vector = _get_embedding(question)
    # Query the client-specific namespace for the closest matches
    matches = INDEX.query(
        namespace=f'{client_name}-context',
        vector=question_vector,
        top_k=10,
        include_metadata=True,
    )['matches']
    # Create a string from each match's text, one per line (trailing newline kept)
    return ''.join(match['metadata']['Text'] + '\n' for match in matches)