Spaces:
Build error
Build error
File size: 5,990 Bytes
b21ecef 3f43bed b21ecef 7ce50df b21ecef 7db5fdc b21ecef 7ce50df 7db5fdc 2e0131e 3f43bed 7ce50df 8d6dff7 b21ecef 5a55679 adf2674 aa5411c adf2674 a8f0ac1 cfaa24c adf2674 3f43bed eaf44d3 3f43bed b21ecef 3f43bed b21ecef 3f43bed b21ecef 3f43bed fa979ce b21ecef a72265c 7db5fdc d7ce0e1 2e0131e 7db5fdc 31bc5a1 3bf79f7 31bc5a1 7db5fdc 749c554 b21ecef fed3bd3 fa979ce b21ecef fa979ce 7db5fdc 2e0131e 7db5fdc be70964 37e01c8 be70964 b21ecef 7db5fdc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import json
import os
import pathlib
import time
from pathlib import Path
from shutil import rmtree
import anvil.server
import anvil.media
import dotenv
import gradio as gr
import requests
from download import download_generator, caption_generator
dotenv.load_dotenv()
@anvil.server.background_task
@anvil.server.callable
def call_gradio_api(api_name='test_api', data=()):
port = os.environ.get('SERVER_PORT', 8111)
gradio_base_url = os.environ.get('GRADIO_URL', f'http://127.0.0.1:{port}/api/')
gradio_url = f"{gradio_base_url}{api_name}"
payload = json.dumps({"data": data})
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
print(f"Calling {gradio_url} with {payload}")
resp = requests.request("POST", gradio_url, headers=headers, data=payload)
print(f"Finished calling gradio API with result")
print(resp.text)
if api_name == 'caption':
return json.loads(resp.text)
if api_name == 'transcribe_translate':
return json.loads(resp.text)
# data = resp.json()['data'][0]
# data = json.loads(data)
# return {"status": data['status'], "captions": data['captions'], "language": data['language']}
data = resp.json()['data'][0]
if data:
video_path = data[0]
translated_video = anvil.media.from_file(video_path)
return {'translated_video': translated_video,
'log': data[1],
'text': data[2],
'language': data[3],
'duration': resp.json()['duration']}
def remote_download(url):
print(f"remote_download: Downloading {url}")
final_response = ''
subbed_video_media = None
whisper_result = None
for response in download_generator(url):
if 'whisper_result' in response:
whisper_result = response.get('whisper_result')
final_response = response['message']
print(final_response)
if 'sub_video' in response:
subbed_video_media = response['sub_video']
return subbed_video_media, final_response, whisper_result["text"], whisper_result["language"]
def test_api(url=''):
print(f'Request from Anvil with URL {url}, faking a long ass request that takes 15 seconds')
time.sleep(15)
# fake a long ass request that takes 15 seconds
# TODO: add a video output here to test how events are done
# TODO: add an anvil server pingback to show we completed the queue operation
return f"I've slept for 15 seconds and now I'm done. "
def caption(downloadable_url="", uid="", language="Autodetect", override_model_size=""):
"""
:param media_id: The twitter media ID object
:param user_id_str: The twitter user ID string
:param tweet_url: tweet URL can potentially not exist in the future, so we can upload on behalf of the user
:return:
"""
status, whisper_result_captions, detected_language = caption_generator(downloadable_url, uid, language, override_model_size)
anvil.server.launch_background_task('add_captions_to_video', uid, whisper_result_captions)
return {'status': status, 'message': 'started a background process to upload subtitles to {uid}' }
def transcribe_translate(downloadable_url="", uid="", language="Autodetect", override_model_size=""):
"""
:param media_id: The twitter media ID object
:param user_id_str: The twitter user ID string
:param tweet_url: tweet URL can potentially not exist in the future, so we can upload on behalf of the user
:return:
"""
status, whisper_result_captions, detected_language = caption_generator(downloadable_url, uid, language, override_model_size)
return json.dumps({"status": status, "captions": whisper_result_captions, "language": detected_language})
def render_api_elements(url_input, download_status, output_text, sub_video, output_file):
with gr.Group(elem_id='fake_ass_group') as api_buttons:
# This is a hack to get APIs registered with the blocks interface
translate_result = gr.Textbox(visible=False)
translate_language = gr.Textbox(visible=False)
gr.Button("API", visible=False)\
.click(api_name='cleanup_output_dir',
fn=cleanup_output_dir, queue=True, inputs=[], outputs=[])
gr.Button("API", visible=False)\
.click(api_name='test_api', queue=True, fn=test_api, inputs=[url_input], outputs=[])
gr.Button("remote_download", visible=False)\
.click(api_name='remote_download', queue=True, fn=remote_download, inputs=[url_input], outputs=[download_status, output_text, translate_result, translate_language])
# creating fake elements just make gradio, cause I can't define an API signature like a sane person
gr.Button("caption", visible=False)\
.click(api_name='caption',
queue=True,
fn=caption,
inputs=[
gr.Text(label='tweet_url'),
gr.Text(label='media_uid'),
gr.Text(label='language (optional)'),
gr.Dropdown(label='Model Size', choices=['base', 'tiny', 'small', 'medium', 'large']),
],
outputs=[
gr.Text(label='response_json')
])
gr.Button("transcribe_translate", visible=False)\
.click(api_name='transcribe_translate',
queue=True,
fn=transcribe_translate,
inputs=[
gr.Text(label='tweet_url'),
gr.Text(label='media_uid'),
gr.Text(label='language (optional)'),
gr.Dropdown(label='Model Size', choices=['base', 'tiny', 'small', 'medium', 'large']),
],
outputs=[
gr.Text(label='response_json')
])
return api_buttons
@anvil.server.callable
def cleanup_output_dir():
#make sure we're in the main directory
os.chdir(pathlib.Path(__file__).parent.parent.absolute())
#delete the output directory contents
for path in Path("output").glob("**/*"):
if path.is_file():
path.unlink()
elif path.is_dir():
rmtree(path) |