import os
import json
import base64
import shutil
import requests
import gradio as gr
from datetime import datetime
from huggingface_hub import hf_hub_download, HfApi


def get_main_data():
    """
    Return the score parameters and the list of authors used by the UI.

    Returns:
        tuple[list[str], list[str]]: (score parameter names, author names).
    """
    scores_parameters = [
        'Personalidad', 'Intereses', 'Lenguaje/Estilo', 'Autenticidad',
        'Habilidad de conversación', 'Marca/Producto', 'Identificación',
        'Experiencia de uso', 'Recomendación', 'Conversación orgánica'
    ]
    authors = [
        'Sofia', 'Eliza', 'Sindy', 'Carlos', 'Andres', 'Adriana',
        'Carolina', 'Valeria'
    ]
    return scores_parameters, authors


def make_invisible():
    """Hide a gr.Row."""
    return gr.Row.update(visible=False)


def make_visible():
    """Show two gr.Rows."""
    return gr.Row.update(visible=True), gr.Row.update(visible=True)


def _language_code(language: str) -> str:
    """
    Map a UI language name to its two-letter code.

    Anything that is neither 'Español' nor 'English' falls back to 'pt',
    matching the original per-function branching.
    """
    if language == 'Español':
        return 'es'
    if language == 'English':
        return 'en'
    return 'pt'


def play_searching(language: str) -> str:
    """Return the path of the 'searching' video for the given language."""
    return f'videos/searching_{_language_code(language)}.mp4'


def _query(payload: dict, size_gpu: str) -> dict:
    """
    POST `payload` to the BellaAPI and return the decoded JSON response.

    The token and endpoint are read from the environment: API_TOKEN and
    API_URL_<size_gpu>.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    api_token = os.getenv('API_TOKEN')  # was os.getenv(f'API_TOKEN'): f-string had no placeholder
    api_url = os.getenv(f'API_URL_{size_gpu}')
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }
    response = requests.post(api_url, headers=headers, json=payload)
    # Fail loudly on an HTTP error instead of handing an error page to .json()
    response.raise_for_status()
    return response.json()


def _download_media(url: str, type_media: str) -> None:
    """
    Download a video or audio file (depending on `type_media`) so it can be
    used inside a gr.Video or gr.Audio component.

    The file is saved as 'video.mp4' when type_media == 'video', otherwise
    as 'audio.wav'.
    """
    name = 'video.mp4' if type_media == 'video' else 'audio.wav'
    with requests.get(url, stream=True) as r, open(name, "wb") as f:
        shutil.copyfileobj(r.raw, f)


def init_chatbot(chatbot: list[tuple[str, str]], language: str):
    """
    Start a session: request a greeting video, append its transcription to
    the chat history and return the user_id used by later requests.

    Returns:
        tuple: (greeting video path, updated chatbot history, user_id).
    """
    inputs = {"inputs": {
        "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'language': _language_code(language),
        'get_video': True
    }}
    output = _query(inputs, 'small')
    chatbot.append(('', output['answer']))
    _download_media(output['link_media'], 'video')
    return 'video.mp4', chatbot, output['user_id']


def get_answer_text(
        question: str, chatbot: list[tuple[str, str]], user_id: str,
        checkbox: bool, size_gpu: str, detect_language: bool
):
    """
    Send a text question to the chatbot API and return the updated UI state.

    `detect_language` is forwarded as True, or as None when it is exactly
    False (the API presumably treats None as "don't detect" — preserved from
    the original logic).
    """
    get_language = None if detect_language is False else True
    inputs = {'inputs': {
        'text': question,
        'user_id': user_id,
        'get_video': checkbox,
        'get_language': get_language
    }}
    output = _query(inputs, size_gpu)
    return _update_elements(question, chatbot, output, checkbox, '')


def get_answer_audio(
        audio_path, chatbot: list[tuple[str, str]], user_id: str,
        checkbox: bool, size_gpu: str, detect_language: bool
):
    """
    Send a recorded audio question to the chatbot API and return the updated
    UI state.

    The audio file is sent Base64-encoded; the API returns the transcription
    (used as the displayed question) together with the answer.
    """
    with open(audio_path, 'rb') as audio_file:
        encoded_audio = base64.b64encode(audio_file.read()).decode('utf-8')

    get_language = None if detect_language is False else True
    inputs = {'inputs': {
        'is_audio': True,
        'audio': encoded_audio,
        'user_id': user_id,
        'get_video': checkbox,
        'get_language': get_language
    }}
    output = _query(inputs, size_gpu)

    # Transcription of the audio
    question = output['question']
    return _update_elements(question, chatbot, output, checkbox, None)


def _update_elements(question, chatbot, output, checkbox, clean):
    """
    Append the new interaction to the chat, download the answer media and
    return the values the UI components expect.

    `clean` is echoed back to reset the input widget ('' for the textbox,
    None for the audio recorder).
    """
    chatbot.append((question, output['answer']))
    link_media = output['link_media']
    if checkbox:
        _download_media(link_media, 'video')
        return 'video.mp4', None, chatbot, clean
    _download_media(link_media, 'audio')
    return 'videos/waiting.mp4', 'audio.wav', chatbot, clean


def save_scores(author: gr.Dropdown, history: gr.Chatbot,
                opinion: gr.Textbox, *score_values):
    """
    Save the scores and the chat's info into the dataset's data.json file.

    Downloads the current data.json from the HF dataset repo, appends the
    session and uploads it back.

    Raises:
        gr.Error: if any score parameter was left unanswered.
    """
    score_parameters, _ = get_main_data()

    # Validate and collect the score of each parameter
    scores = {}
    for parameter, score in zip(score_parameters, score_values):
        if score is None:
            raise gr.Error('Asegúrese de haber seleccionado al menos 1 opción en cada categoría')
        scores[parameter] = score

    # All the messages of the conversation
    chat = [
        {'message': message, 'answer': answer}
        for message, answer in history
    ]

    session = dict(
        opinion=opinion,
        scores=scores,
        chat=chat,
        author=author,
        date=datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    )

    # Fetch the current file, append the new session and save it
    hf_hub_download(
        repo_id=os.environ.get('DATASET_NAME'), repo_type='dataset',
        filename="data.json", token=os.environ.get('HUB_TOKEN'),
        local_dir="./"
    )
    # The file is written below with ensure_ascii=False, so read it as UTF-8
    # explicitly instead of relying on the locale default encoding.
    with open('data.json', 'r', encoding='utf-8') as infile:
        past_sessions = json.load(infile)

    past_sessions['sessions'].append(session)
    with open('data.json', 'w', encoding='utf-8') as outfile:
        json.dump(past_sessions, outfile, indent=4, ensure_ascii=False)

    # Upload the updated file back to the dataset repo
    api = HfApi(token=os.environ.get('HUB_TOKEN'))
    api.upload_file(
        path_or_fileobj="data.json", path_in_repo="data.json",
        repo_id=os.environ.get('DATASET_NAME'), repo_type='dataset'
    )

    # Confirmation message for the UI
    return 'Done'