Spaces:
Sleeping
Sleeping
File size: 6,335 Bytes
6e2fd22 1c0b271 aa938da 6e2fd22 aa938da 6e2fd22 1c0b271 6e2fd22 1c0b271 6e2fd22 1c0b271 6e2fd22 aa938da 6e2fd22 aa938da 04c896b aa938da 6e2fd22 04c896b 6e2fd22 04c896b 6e2fd22 aa938da 6e2fd22 aa938da 04c896b aa938da 6e2fd22 04c896b 6e2fd22 04c896b 6e2fd22 aa938da 6e2fd22 1c0b271 6e2fd22 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
import os
import json
import base64
import shutil
import requests
import gradio as gr
from datetime import datetime
from huggingface_hub import hf_hub_download, HfApi
def get_main_data() -> tuple[list[str], list[str]]:
    """
    Returns the score parameters and the authors.

    Returns:
        A ``(scores_parameters, authors)`` pair. Both are lists of strings
        and are freshly built on every call, so callers may mutate them.
    """
    # The accented labels were stored mis-encoded (UTF-8 bytes decoded with
    # the wrong codec); they are written here as the intended Spanish text.
    scores_parameters = [
        'Personalidad', 'Intereses', 'Lenguaje/Estilo', 'Autenticidad', 'Habilidad de conversación',
        'Marca/Producto', 'Identificación', 'Experiencia de uso', 'Recomendación', 'Conversación orgánica',
    ]
    authors = ['Sofia', 'Eliza', 'Sindy', 'Carlos', 'Andres', 'Adriana', 'Carolina', 'Valeria']
    return scores_parameters, authors
def make_invisible():
    """
    Builds the update that hides a row (``visible=False``)
    """
    hidden_row = gr.Row.update(visible=False)
    return hidden_row
def make_visible():
    """
    Builds two updates, each one showing a row (``visible=True``)
    """
    first_row = gr.Row.update(visible=True)
    second_row = gr.Row.update(visible=True)
    return first_row, second_row
def play_searching(language: str) -> str:
    """
    Returns the path of the "searching" video for the selected language.

    Args:
        language: Language name. 'Español' maps to ``es`` and 'English' to
            ``en``; any other value falls back to ``pt``.

    Returns:
        The relative path ``videos/searching_<code>.mp4``.
    """
    # 'Espa帽ol' is the mis-encoded spelling this comparison used before; it
    # is still accepted so existing callers keep getting the Spanish video.
    if language in ('Español', 'Espa帽ol'):
        language_code = 'es'
    elif language == 'English':
        language_code = 'en'
    else:
        language_code = 'pt'
    return f'videos/searching_{language_code}.mp4'
def _query(payload, size_gpu: str):
    """
    Returns the json from a post request. It is done to the BellaAPI

    Args:
        payload: JSON-serialisable body sent with the request.
        size_gpu: Suffix selecting the endpoint; the URL is read from the
            ``API_URL_<size_gpu>`` environment variable.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If ``API_URL_<size_gpu>`` is not set.
        requests.HTTPError: If the API answers with an error status code.
    """
    api_url = os.getenv(f'API_URL_{size_gpu}')
    if not api_url:
        # Fail with an actionable message instead of requests' opaque
        # "Invalid URL 'None'" error.
        raise RuntimeError(f'Environment variable API_URL_{size_gpu} is not set')

    api_token = os.getenv('API_TOKEN')
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }
    # NOTE(review): no timeout is passed, so a stalled endpoint blocks this
    # call indefinitely; pick one once the expected response time is known.
    response = requests.post(api_url, headers=headers, json=payload)
    # Surface HTTP errors here rather than as a KeyError in the callers.
    response.raise_for_status()
    return response.json()
def _download_media(url: str, type_media: str) -> None:
    """
    Downloads a video or audio (depending on the type_media) that can be
    used inside a gr.Video or gr.Audio

    Args:
        url: Location of the media file.
        type_media: ``'video'`` saves to ``video.mp4``; any other value
            saves to ``audio.wav``.

    Raises:
        requests.HTTPError: If the server answers with an error status code.
    """
    name = 'video.mp4' if type_media == 'video' else 'audio.wav'
    with requests.get(url, stream=True) as response:
        # Check the status before opening the target so an error page is
        # never written out as media and the previous file is left intact.
        response.raise_for_status()
        with open(name, 'wb') as media_file:
            # iter_content applies any Content-Encoding (gzip/deflate),
            # which copying response.raw directly would not.
            for chunk in response.iter_content(chunk_size=64 * 1024):
                media_file.write(chunk)
def init_chatbot(chatbot: list[tuple[str, str]], language: str):
    """
    Returns a greeting video, with its transcription and the user_id that
    will be used later in the other requests

    Args:
        chatbot: Chat history; the greeting is appended to it in place as
            ``('', answer)``.
        language: Language name. 'Español' maps to ``es`` and 'English' to
            ``en``; any other value falls back to ``pt``.

    Returns:
        A ``('video.mp4', chatbot, user_id)`` tuple, where ``user_id`` is
        taken from the API response.
    """
    # Select language. 'Espa帽ol' is the mis-encoded spelling this comparison
    # used before; it is still accepted so existing callers keep working.
    if language in ('Español', 'Espa帽ol'):
        language_code = 'es'
    elif language == 'English':
        language_code = 'en'
    else:
        language_code = 'pt'

    # Call API with the following json
    inputs = {"inputs": {
        "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'language': language_code,
        'get_video': True
    }}
    output = _query(inputs, 'small')

    chatbot.append(('', output['answer']))
    _download_media(output['link_media'], 'video')
    return 'video.mp4', chatbot, output['user_id']
def get_answer_text(
    question: str, chatbot: list[tuple[str, str]], user_id: str, checkbox: bool, size_gpu: str,
    detect_language: bool
):
    """
    Sends a written question to the API and returns the refreshed UI values
    produced by ``_update_elements`` (the text input is reset to '')
    """
    # Only a literal False disables detection; it is then sent as None
    language_flag = True if detect_language is not False else None

    # Assemble the request body and post it
    request_body = {
        'text': question,
        'user_id': user_id,
        'get_video': checkbox,
        'get_language': language_flag,
    }
    api_output = _query({'inputs': request_body}, size_gpu)

    return _update_elements(question, chatbot, api_output, checkbox, '')
def get_answer_audio(
    audio_path, chatbot: list[tuple[str, str]], user_id: str, checkbox: bool, size_gpu: str,
    detect_language: bool
):
    """
    Sends a recorded question to the API and returns the refreshed UI values
    produced by ``_update_elements`` (the audio input is reset to None)
    """
    # Read the recording and turn it into a Base64 string
    with open(audio_path, 'rb') as recording:
        encoded_audio = base64.b64encode(recording.read()).decode('utf-8')

    # Only a literal False disables detection; it is then sent as None
    language_flag = True if detect_language is not False else None

    # Assemble the request body and post it
    request_body = {
        'is_audio': True,
        'audio': encoded_audio,
        'user_id': user_id,
        'get_video': checkbox,
        'get_language': language_flag,
    }
    api_output = _query({'inputs': request_body}, size_gpu)

    # The API sends back the transcription of the audio as the question
    transcription = api_output['question']
    return _update_elements(transcription, chatbot, api_output, checkbox, None)
def _update_elements(question, chatbot, output, checkbox, clean):
    """
    Records the interaction in the chat, downloads the answer media and
    returns (video path, audio path or None, chatbot, clean)
    """
    interaction = (question, output['answer'])
    chatbot.append(interaction)
    media_url = output['link_media']

    # Audio-only answer: the waiting video is shown next to the audio
    if not checkbox:
        _download_media(media_url, 'audio')
        return 'videos/waiting.mp4', 'audio.wav', chatbot, clean

    # Video answer: there is no separate audio
    _download_media(media_url, 'video')
    return 'video.mp4', None, chatbot, clean
def save_scores(author: gr.Dropdown, history: gr.Chatbot, opinion: gr.Textbox, *score_values):
    """
    Saves the scores and chat's info into the json file

    The current ``data.json`` is downloaded from the dataset repo named by
    the ``DATASET_NAME`` environment variable, the new session is appended
    to its ``sessions`` list and the file is uploaded back.

    Args:
        author: Author of the evaluation, stored as given.
        history: Chat history; the first two items of each entry are stored
            as ``message`` and ``answer``.
        opinion: Free-text opinion, stored as given.
        *score_values: One score per parameter, in the order returned by
            ``get_main_data``.

    Returns:
        The confirmation message ``'Done'``.

    Raises:
        gr.Error: If any of the scores is ``None``.
    """
    # Get the parameters for each score
    score_parameters, _ = get_main_data()

    # Get the score of each parameter, validating before any network access
    scores = {}
    for parameter, score in zip(score_parameters, score_values):
        # Check the score is a valid value if not, raise Error
        if score is None:
            raise gr.Error('Asegúrese de haber seleccionado al menos 1 opción en cada categoría')
        scores[parameter] = score

    # Get all the messages of the conversation
    chat = [
        {'message': conversation[0], 'answer': conversation[1]}
        for conversation in history
    ]

    # Save the info
    session = {
        'opinion': opinion,
        'scores': scores,
        'chat': chat,
        'author': author,
        'date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }

    # Open the file, add the new info and save it
    # NOTE(review): download/modify/upload is not atomic, so two sessions
    # saved at the same time can overwrite each other.
    hf_hub_download(
        repo_id=os.environ.get('DATASET_NAME'),
        repo_type='dataset',
        filename="data.json",
        token=os.environ.get('HUB_TOKEN'),
        local_dir="./"
    )

    # Read with the same encoding used for writing so non-ASCII text
    # round-trips regardless of the platform's default encoding
    with open('data.json', 'r', encoding='utf-8') as infile:
        past_sessions = json.load(infile)

    # Add the new info, creating the list if the file does not have it yet
    past_sessions.setdefault('sessions', []).append(session)

    with open('data.json', 'w', encoding='utf-8') as outfile:
        json.dump(past_sessions, outfile, indent=4, ensure_ascii=False)

    # Save the updated file
    api = HfApi(token=os.environ.get('HUB_TOKEN'))
    api.upload_file(
        path_or_fileobj="data.json",
        path_in_repo="data.json",
        repo_id=os.environ.get('DATASET_NAME'),
        repo_type='dataset'
    )

    # Return a confirmation message
    return 'Done'
|