import os import base64 import shutil import requests import gradio as gr from datetime import datetime def make_invisible(): """ Makes visible a row """ return gr.Row.update(visible=False) def make_visible(): """ Makes visibles a rows """ return gr.Row.update(visible=True) def _query(payload, backend_location: str, type_query: str): """ Returns the json from a post request. It is done to the BellaAPI """ API_TOKEN = os.getenv(f'API_TOKEN') API_URL = os.getenv(f'API_URL_{backend_location}') + f'{type_query}/' headers = { "Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json" } response = requests.post(API_URL, headers=headers, json=payload) return response.json() def _download_media(url: str, type_media: str) -> None: """ Downloads a video or audio (depending on the type_media) that can be used inside a gr.Video or gr.Audio """ name = 'video.mp4' if type_media == 'video' else 'audio.wav' with requests.get(url, stream=True) as r, open(name, "wb") as f: shutil.copyfileobj(r.raw, f) def init_chatbot(chatbot: list[tuple[str, str]], backend_location: str): """ Returns a greeting video, with its transcription and the user_id that will be used later in the other requests """ l = 'es' # Call API with the following json inputs = { "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'language': l, 'get_video': True } output = _query(inputs, backend_location, 'start_chat') chatbot.append(('', output['answer'])) _download_media(output['link_media'], 'video') return 'video.mp4', chatbot, output['user_id'] def get_answer_text( question: str, chatbot: list[tuple[str, str]], user_id: str, checkbox_video: bool, checkbox_audio: bool, backend_location: str ): """ Gets the answer of the chatbot """ # Create json and send it to the API inputs = { 'text': question, 'user_id': user_id, 'get_video': checkbox_video, 'get_audio': checkbox_audio, } output = _query(inputs, backend_location, 'get_answer') return _update_elements(question, chatbot, output, checkbox_video, checkbox_audio, '') def get_answer_audio( audio_path, chatbot: list[tuple[str, str]], user_id: str, checkbox_video: bool, checkbox_audio: bool, backend_location: str ): """ Gets the answer of the chatbot """ # Encode audio data to Base64 with open(audio_path, 'rb') as audio_file: audio_data = audio_file.read() encoded_audio = base64.b64encode(audio_data).decode('utf-8') # Create json and send it to the API inputs = { 'is_audio': True, 'audio': encoded_audio, 'user_id': user_id, 'get_video': checkbox_video, 'get_audio': checkbox_audio } output = _query(inputs, backend_location, 'get_answer') # Transcription of the audio question = output['question'] return _update_elements(question, chatbot, output, checkbox_video, checkbox_audio, None) def _update_elements(question, chatbot, output, checkbox_video, checkbox_audio, clean): """ Adds the video, output audio, interaction and cleans the text or audio """ chatbot.append((question, output['answer'])) link_media = output['link_media'] if checkbox_video: _download_media(link_media, 'video') return 'video.mp4', None, chatbot, clean elif checkbox_audio: _download_media(link_media, 'audio') return None, 'audio.wav', chatbot, clean else: return None, None, chatbot, clean