Spaces:
Runtime error
Runtime error
import os | |
import base64 | |
import shutil | |
import requests | |
import gradio as gr | |
from datetime import datetime | |
def make_invisible(): | |
""" | |
Makes visible a row | |
""" | |
return gr.Row.update(visible=False) | |
def make_visible(): | |
""" | |
Makes visibles a rows | |
""" | |
return gr.Row.update(visible=True) | |
def _query(payload, backend_location: str, type_query: str): | |
""" | |
Returns the json from a post request. It is done to the BellaAPI | |
""" | |
API_TOKEN = os.getenv(f'API_TOKEN') | |
API_URL = os.getenv(f'API_URL_{backend_location}') + f'{type_query}/' | |
headers = { | |
"Authorization": f"Bearer {API_TOKEN}", | |
"Content-Type": "application/json" | |
} | |
response = requests.post(API_URL, headers=headers, json=payload) | |
return response.json() | |
def _download_media(url: str, type_media: str) -> None: | |
""" | |
Downloads a video or audio (depending on the type_media) that can be | |
used inside a gr.Video or gr.Audio | |
""" | |
name = 'video.mp4' if type_media == 'video' else 'audio.wav' | |
with requests.get(url, stream=True) as r, open(name, "wb") as f: | |
shutil.copyfileobj(r.raw, f) | |
def init_chatbot(chatbot: list[tuple[str, str]], backend_location: str): | |
""" | |
Returns a greeting video, with its transcription and the user_id that | |
will be used later in the other requests | |
""" | |
l = 'es' | |
# Call API with the following json | |
inputs = { | |
"date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), | |
'language': l, | |
'get_video': True | |
} | |
output = _query(inputs, backend_location, 'start_chat') | |
chatbot.append(('', output['answer'])) | |
_download_media(output['link_media'], 'video') | |
return 'video.mp4', chatbot, output['user_id'] | |
def get_answer_text( | |
question: str, chatbot: list[tuple[str, str]], user_id: str, checkbox_video: bool, checkbox_audio: bool, | |
backend_location: str | |
): | |
""" | |
Gets the answer of the chatbot | |
""" | |
# Create json and send it to the API | |
inputs = { | |
'text': question, 'user_id': user_id, 'get_video': checkbox_video, 'get_audio': checkbox_audio, | |
} | |
output = _query(inputs, backend_location, 'get_answer') | |
return _update_elements(question, chatbot, output, checkbox_video, checkbox_audio, '') | |
def get_answer_audio( | |
audio_path, chatbot: list[tuple[str, str]], user_id: str, checkbox_video: bool, checkbox_audio: bool, | |
backend_location: str | |
): | |
""" | |
Gets the answer of the chatbot | |
""" | |
# Encode audio data to Base64 | |
with open(audio_path, 'rb') as audio_file: | |
audio_data = audio_file.read() | |
encoded_audio = base64.b64encode(audio_data).decode('utf-8') | |
# Create json and send it to the API | |
inputs = { | |
'is_audio': True, 'audio': encoded_audio, 'user_id': user_id, 'get_video': checkbox_video, | |
'get_audio': checkbox_audio | |
} | |
output = _query(inputs, backend_location, 'get_answer') | |
# Transcription of the audio | |
question = output['question'] | |
return _update_elements(question, chatbot, output, checkbox_video, checkbox_audio, None) | |
def _update_elements(question, chatbot, output, checkbox_video, checkbox_audio, clean): | |
""" | |
Adds the video, output audio, interaction and cleans the text or audio | |
""" | |
chatbot.append((question, output['answer'])) | |
link_media = output['link_media'] | |
if checkbox_video: | |
_download_media(link_media, 'video') | |
return 'video.mp4', None, chatbot, clean | |
elif checkbox_audio: | |
_download_media(link_media, 'audio') | |
return None, 'audio.wav', chatbot, clean | |
else: | |
return None, None, chatbot, clean | |