# test-bella-nosotras / functions.py
# (commit 4cc32d2 by vmoras: "Add video and audio")
import os
import base64
import shutil
import requests
import gradio as gr
from datetime import datetime
def make_invisible():
    """
    Hides a row: returns a gr.Row update with visible=False.

    (The original docstring said "makes visible", which contradicted the code.)
    """
    return gr.Row.update(visible=False)
def make_visible():
    """
    Shows a row: returns a gr.Row update with visible=True.
    """
    return gr.Row.update(visible=True)
def _query(payload, backend_location: str, type_query: str):
    """
    POSTs ``payload`` to the Bella API and returns the decoded JSON response.

    :param payload: JSON-serializable dict sent as the request body
    :param backend_location: suffix selecting the ``API_URL_<location>`` env var
    :param type_query: endpoint name appended to the base URL (e.g. 'get_answer')
    """
    api_token = os.getenv('API_TOKEN')  # constant name: no f-string needed
    # NOTE(review): assumes both env vars are set; getenv returning None would
    # raise a TypeError on concatenation below.
    api_url = os.getenv(f'API_URL_{backend_location}') + f'{type_query}/'
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json"
    }
    # A timeout prevents the UI from hanging forever if the backend is down.
    response = requests.post(api_url, headers=headers, json=payload, timeout=60)
    return response.json()
def _download_media(url: str, type_media: str) -> None:
    """
    Downloads media from ``url`` into the working directory so it can be fed
    to gr.Video / gr.Audio: 'video' -> video.mp4, anything else -> audio.wav.
    """
    name = 'video.mp4' if type_media == 'video' else 'audio.wav'
    # iter_content (unlike the raw socket via shutil.copyfileobj on r.raw)
    # transparently handles gzip/deflate transfer decoding; timeout avoids
    # blocking the UI on an unreachable host.
    with requests.get(url, stream=True, timeout=60) as r, open(name, "wb") as f:
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
def init_chatbot(chatbot: list[tuple[str, str]], backend_location: str):
    """
    Starts a chat session: requests the greeting video from the API, appends
    its transcription to the chatbot history (mutated in place) and returns
    ``(video path, updated history, user_id)`` — the user_id is reused by the
    follow-up requests.
    """
    language = 'es'  # was named `l` (ambiguous, PEP 8 / E741); greeting is Spanish
    # Call API with the following json
    inputs = {
        "date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'language': language,
        'get_video': True
    }
    output = _query(inputs, backend_location, 'start_chat')
    # The greeting has no user question, hence the empty string.
    chatbot.append(('', output['answer']))
    _download_media(output['link_media'], 'video')
    return 'video.mp4', chatbot, output['user_id']
def get_answer_text(
        question: str, chatbot: list[tuple[str, str]], user_id: str, checkbox_video: bool, checkbox_audio: bool,
        backend_location: str
):
    """
    Sends a typed question to the API and returns the updated UI elements
    (video, audio, chat history, cleared textbox).
    """
    payload = {
        'text': question,
        'user_id': user_id,
        'get_video': checkbox_video,
        'get_audio': checkbox_audio,
    }
    answer = _query(payload, backend_location, 'get_answer')
    # '' clears the textbox once the answer is displayed
    return _update_elements(question, chatbot, answer, checkbox_video, checkbox_audio, '')
def get_answer_audio(
        audio_path, chatbot: list[tuple[str, str]], user_id: str, checkbox_video: bool, checkbox_audio: bool,
        backend_location: str
):
    """
    Sends a recorded question to the API and returns the updated UI elements
    (video, audio, chat history, cleared audio input).
    """
    # The API expects the recording as a Base64 string inside the JSON body.
    with open(audio_path, 'rb') as audio_file:
        encoded_audio = base64.b64encode(audio_file.read()).decode('utf-8')
    payload = {
        'is_audio': True,
        'audio': encoded_audio,
        'user_id': user_id,
        'get_video': checkbox_video,
        'get_audio': checkbox_audio
    }
    answer = _query(payload, backend_location, 'get_answer')
    # The backend echoes back the transcription of the spoken question.
    question = answer['question']
    return _update_elements(question, chatbot, answer, checkbox_video, checkbox_audio, None)
def _update_elements(question, chatbot, output, checkbox_video, checkbox_audio, clean):
"""
Adds the video, output audio, interaction and cleans the text or audio
"""
chatbot.append((question, output['answer']))
link_media = output['link_media']
if checkbox_video:
_download_media(link_media, 'video')
return 'video.mp4', None, chatbot, clean
elif checkbox_audio:
_download_media(link_media, 'audio')
return None, 'audio.wav', chatbot, clean
else:
return None, None, chatbot, clean