# Spaces:
# Runtime error
# Runtime error
import os | |
import csv | |
import uuid | |
import json | |
import logging | |
import pinecone | |
import gradio as gr | |
from PIL import Image | |
from typing import Union | |
from openai import Client | |
from pinecone import Index | |
from services import audio_model, gcp | |
# Download the TTS model when the `tts_model` path does not exist yet.
# NOTE(review): this runs before the `services.audio` import below, presumably because that
# module needs the model at import time -- confirm before reordering these statements.
if not os.path.exists('tts_model'):
    audio_model.download_model()

from services.audio import *
from services.video import *

# Pinecone connection and index handle, both configured through environment variables
pinecone.init(
    api_key=os.environ.get('PINECONE_API_KEY'),
    environment=os.environ.get('PINECONE_ENV'),
)
INDEX = Index(os.environ.get('PINECONE_INDEX'))

# OpenAI client (created without explicit arguments)
OPENAI_CLIENT = Client()

# Labels shown in the UI -> short codes used for file names and dictionary keys
TRANSLATE_LANGUAGES = {
    'español': 'es',
    'ingles': 'en',
    'portugués': 'pt',
}
TRANSLATE_GREET = {
    'Saludo': 'greeting',
    'Despedida': 'goodbye',
    'Error': 'error',
}
def add_data_table(table: list[list[str]], *data: str) -> tuple:
    """
    Adds a row to the table and returns the values to write back into the input widgets.

    The returned tuple is flat: the (mutated) table first, then one value per input widget.
    The first input is always blanked; what follows depends on the shape of ``data``:

    * three values (greet tab: message, type, language): the type and language are echoed back;
    * last value is one of the known languages: that language is echoed back;
    * anything else: a second blank value is returned.

    :param table: table to add the data to (modified in place)
    :param data: new row to be added to the table
    :return: ``(table, '', ...)`` -- the updated table followed by the values for the inputs
    """
    if len(data) == 3:  # It is the greet tab
        new_value = ('', *data[1:])
    elif data[-1] in ('español', 'ingles', 'portugués'):
        new_value = ('', data[-1])
    else:
        new_value = ('', '')

    row = ['❌', *data]
    # A first row made only of blanks is the "empty table" placeholder: overwrite it.
    # An actually empty list (no rows at all) has no placeholder, so the row is appended.
    if table and all(column == '' for column in table[0]):
        table[0] = row
    else:
        table.append(row)
    return (table, *new_value)
def remove_data_table(table: list[list[str]], evt: gr.SelectData) -> list[list[str]]:
    """
    Removes the clicked row, but only when the click landed on the first column (the ❌ one).

    :param table: table that received the click (modified in place)
    :param evt: select event; ``evt.index[0]`` is used as the row and ``evt.index[1]`` as the column
    :return: the updated table
    """
    if evt.index[1] == 0:
        if len(table) != 1:
            # Several rows: drop the clicked one
            del table[evt.index[0]]
        else:
            # Last remaining row: keep it, but blank every cell (the default/empty row)
            table[0] = [''] * len(table[0])
    # Clicks on any other column leave the table untouched
    return table
def add_language(languages: list[str]) -> Union[gr.Error, tuple[gr.helpers, gr.helpers, gr.helpers]]:
    """
    Refreshes three dropdowns with the languages selected by the user.

    :param languages: list of selected languages
    :return: three identical updates (choices = the selected languages, value = the first one,
        interactive enabled)
    :raises gr.Error: when no language was selected
    """
    if not len(languages):
        raise gr.Error('Debe seleccionar al menos 1 idioma')

    def _dropdown_update():
        # Every dropdown gets its own copy of the choices list
        return gr.update(choices=list(languages), value=languages[0], interactive=True)

    return _dropdown_update(), _dropdown_update(), _dropdown_update()
def create_chatbot(
        client: str, name: str, messages_table: list[list[str]], random_table: list[list[str]],
        questions_table: list[list[str]], image: Image
) -> gr.helpers:
    """
    Creation of the chatbot. It creates the CSV files, audios and videos for the given tables
    (greetings, goodbyes, errors and random), uploads them to GCP, and fills the vectorstore
    with the given questions and answers.

    Rows made only of blanks (the placeholder row of an empty table) are ignored in every table.

    :param client: name of the client (Nosotras, Visit Orlando, etc.)
    :param name: name of the chatbot (Bella, Roomie, etc.)
    :param messages_table: table with the greetings, goodbyes and errors messages;
        each row is read as [marker, message, type, language]
    :param random_table: table with the random data about the client; rows are [marker, message, language]
    :param questions_table: table with the questions and answers; rows are [marker, question, context]
    :param image: image used as base for the videos
    :return: update for a button: 'Chatbot created!!!' on success, 'ERROR!' if a video could not be created
    """
    # Set up general info
    client_name = client.lower().replace(' ', '-')
    _ = name.lower()  # TODO: use it

    # Group messages by their type (greeting, goodbye or error) and language
    messages = {}
    for message in messages_table:
        if _is_blank_row(message):
            continue
        msg = message[1]
        type_msg = TRANSLATE_GREET[message[2]]
        language_msg = TRANSLATE_LANGUAGES[message[-1]]
        os.makedirs(f'assets/{client_name}/{type_msg}s', exist_ok=True)
        messages.setdefault(type_msg, {}).setdefault(language_msg, []).append(msg)

    # Create CSV files (greeting, goodbye and error): one file per type and language, one message per line
    for type_msg, by_language in messages.items():
        for language, msgs in by_language.items():
            csv_path = f'assets/{client_name}/{type_msg}s/{language}.csv'
            with open(csv_path, mode='w', encoding='utf-8', newline='') as outfile:
                csv.writer(outfile).writerows([msg] for msg in msgs)

    # Create the audios (greeting, goodbye and error)
    path_audios = f'assets/{client_name}/media/audio'
    os.makedirs(path_audios, exist_ok=True)
    for type_msg, by_language in messages.items():
        for language, msgs in by_language.items():
            for i, msg in enumerate(msgs):
                get_audio(msg, language, f'{path_audios}/{type_msg}_{language}_{i}')

    # Group random messages by their language
    random_messages = {}
    for row in random_table:
        if _is_blank_row(row):
            continue
        _, msg, language = row
        random_messages.setdefault(TRANSLATE_LANGUAGES[language], []).append(msg)

    # Create the random audios
    for language, msgs in random_messages.items():
        for i, msg in enumerate(msgs):
            get_audio(msg, language, f'{path_audios}/random_{language}_{i}')

    # Save image
    os.makedirs(f'assets/{client_name}/media/image', exist_ok=True)
    image.save(f'assets/{client_name}/media/image/base.png')

    # Upload files and audios to bucket in GCP
    gcp.upload_folder(client_name, f'assets/{client_name}')

    # Create videos for the generated audios and the waiting video (it is muted).
    # NOTE(review): 'waiting.wav' is not produced here; presumably it already exists in the bucket -- confirm.
    path_videos = f'assets/{client_name}/media/video'
    os.makedirs(path_videos, exist_ok=True)
    list_audios = os.listdir(path_audios) + ['waiting.wav']
    for audio_file in list_audios:
        name_file = audio_file.split('.')[0]
        link_audio = gcp.get_link_file(client_name, 'audio', audio_file)
        link_image = gcp.get_link_file(client_name, 'image', 'base.png')
        try:
            get_video(link_audio, link_image, f'{path_videos}/{name_file}')
        except Exception as e:
            # Keep the traceback in the logs and tell the user. A gr.Error only shows up when it is
            # raised, which would prevent returning the button update, so a warning is used instead.
            logging.exception('Video creation failed for %s', audio_file)
            gr.Warning(f'Problema con la creación del video, hable con el administrador. Error: {e}')
            return gr.update(value='ERROR!', interactive=False)

    # Upload videos to GCP
    gcp.upload_folder(client_name, path_videos)

    # Set up vectorstore: the question is embedded, its context is stored as metadata
    vectors = []
    for row in questions_table:
        if _is_blank_row(row):
            continue
        _, question, context = row
        vectors.append({
            'id': str(uuid.uuid4()),
            'values': _get_embedding(question),
            'metadata': {'Text': context},
        })
    if vectors:  # Nothing to upsert when no question was given
        INDEX.upsert(vectors=vectors, namespace=f'{client_name}-context')

    # Change text in the button
    return gr.update(value='Chatbot created!!!', interactive=False)


def _is_blank_row(row: list) -> bool:
    """Returns True when every cell of the row is blank (the placeholder row of an empty table)."""
    return all(column in ('', None) for column in row)
def save_prompts(client: str, context_prompt: str, prompts_table: list[list[str]]) -> None:
    """
    Saves all the prompts (standalone and one for each language) under ``assets/<client>/prompts``
    and uploads that folder through ``gcp.upload_folder``.

    :param client: name of the client (lower-cased, spaces replaced by '-' to build the paths)
    :param context_prompt: standalone prompt used to search into the vectorstore
    :param prompts_table: table with the prompt of each language; rows are [marker, prompt, language].
        Rows made only of blanks (placeholder of an empty table) are skipped.
    :return: None
    """
    client_name = client.lower().replace(' ', '-')
    path_prompts = f'assets/{client_name}/prompts'
    os.makedirs(path_prompts, exist_ok=True)

    # Save standalone prompt. It is the same for all languages
    with open(f'{path_prompts}/prompt_standalone_q.txt', mode='w', encoding='utf-8') as outfile:
        outfile.write(context_prompt)

    # Save the prompt of each language
    for row in prompts_table:
        # The placeholder row has no language, looking it up would raise a KeyError
        if all(column in ('', None) for column in row):
            continue
        _, prompt, language = row
        language_prompt = TRANSLATE_LANGUAGES[language]
        with open(f'{path_prompts}/prompt_{language_prompt}.txt', mode='w', encoding='utf-8') as outfile:
            outfile.write(prompt)

    gcp.upload_folder(client_name, path_prompts)
def generate_json(client: str, languages: list[str], max_num_questions: int, chatbot_name: str) -> gr.helpers:
    """
    Creates a json file with the environment variables used in the API

    :param client: name of the client (lower-cased, spaces replaced by '-'); stored as CLIENT_NAME
    :param languages: selected languages (keys of TRANSLATE_LANGUAGES); stored as a comma separated
        string of their short codes in LANGUAGES
    :param max_num_questions: stored as MAX_NUM_QUESTIONS
    :param chatbot_name: stored as CHATBOT_NAME
    :return: gradio file with the value as the path of the json file
    """
    # Format the name and the languages
    short_languages = ','.join(TRANSLATE_LANGUAGES[language] for language in languages)
    client_name = client.lower().replace(' ', '-')

    variables = {
        'CLIENT_NAME': client_name,
        'MODEL_OPENAI': os.getenv('OPENAI_MODEL'),  # None (null in the file) when the variable is unset
        'LANGUAGES': short_languages,
        'MAX_NUM_QUESTIONS': max_num_questions,
        'NUM_VECTORS_CONTEXT': 10,
        'THRESHOLD_RECYCLE': 0.97,
        'OPENAI_API_KEY': 'Check OpenAI for this',  # Placeholder text, not a real key
        'CHATBOT_NAME': chatbot_name,
        'HAS_ROADMAP': 0,
        'SAVE_ANSWERS': 0,
        'USE_RECYCLED_DATA': 1,
    }

    # The client folder may not exist yet if this is the first thing generated for the client
    os.makedirs(f'assets/{client_name}', exist_ok=True)
    path_json = f"assets/{client_name}/chatbot_variables.json"
    with open(path_json, mode='w', encoding='utf-8') as outfile:
        outfile.write(json.dumps(variables, indent=4))
    return gr.update(value=path_json, label='Output file', interactive=True)
def _get_embedding(sentence: str) -> list[float]:
    """
    Requests the embedding of a word/sentence/paragraph from the OpenAI client.

    :param sentence: text sent as input to the 'text-embedding-ada-002' model
    :return: embedding of the first (only) item in the response
    """
    request = {'input': sentence, 'model': 'text-embedding-ada-002'}
    result = OPENAI_CLIENT.embeddings.create(**request)
    first_item = result.data[0]
    return first_item.embedding