Spaces:
Running
on
Zero
Running
on
Zero
import torch | |
import time | |
import moviepy.editor as mp | |
import psutil | |
import gradio as gr | |
import spaces | |
from transformers import pipeline | |
from transformers.pipelines.audio_utils import ffmpeg_read | |
import base64 | |
import requests | |
DEFAULT_MODEL_NAME = "distil-whisper/distil-large-v3" | |
DEFAULT_MODEL_NAME = "openai/whisper-large-v3" | |
BATCH_SIZE = 8 | |
print('start app') | |
device = 0 if torch.cuda.is_available() else "cpu" | |
if device == "cpu": | |
DEFAULT_MODEL_NAME = "openai/whisper-tiny" | |
def load_pipeline(model_name): | |
return pipeline( | |
task="automatic-speech-recognition", | |
model=model_name, | |
chunk_length_s=30, | |
device=device, | |
) | |
pipe = load_pipeline(DEFAULT_MODEL_NAME) | |
openai_pipe=load_pipeline("openai/whisper-large-v3") | |
default_pipe = load_pipeline(DEFAULT_MODEL_NAME) | |
#pipe = None | |
from gpustat import GPUStatCollection | |
def update_gpu_status(): | |
if torch.cuda.is_available() == False: | |
return "No Nvidia Device" | |
try: | |
gpu_stats = GPUStatCollection.new_query() | |
for gpu in gpu_stats: | |
# Assuming you want to monitor the first GPU, index 0 | |
gpu_id = gpu.index | |
gpu_name = gpu.name | |
gpu_utilization = gpu.utilization | |
memory_used = gpu.memory_used | |
memory_total = gpu.memory_total | |
memory_utilization = (memory_used / memory_total) * 100 | |
gpu_status=(f"GPU {gpu_id}: {gpu_name}, Utilization: {gpu_utilization}%, Memory Used: {memory_used}MB, Memory Total: {memory_total}MB, Memory Utilization: {memory_utilization:.2f}%") | |
return gpu_status | |
except Exception as e: | |
print(f"Error getting GPU stats: {e}") | |
return torch_update_gpu_status() | |
def torch_update_gpu_status(): | |
if torch.cuda.is_available(): | |
gpu_info = torch.cuda.get_device_name(0) | |
gpu_memory = torch.cuda.mem_get_info(0) | |
total_memory = gpu_memory[1] / (1024 * 1024) | |
free_memory=gpu_memory[0] /(1024 *1024) | |
used_memory = (gpu_memory[1] - gpu_memory[0]) / (1024 * 1024) | |
gpu_status = f"GPU: {gpu_info} Free Memory:{free_memory}MB Total Memory: {total_memory:.2f} MB Used Memory: {used_memory:.2f} MB" | |
else: | |
gpu_status = "No GPU available" | |
return gpu_status | |
def update_cpu_status(): | |
import datetime | |
# Get the current time | |
current_time = datetime.datetime.now().time() | |
# Convert the time to a string | |
time_str = current_time.strftime("%H:%M:%S") | |
cpu_percent = psutil.cpu_percent() | |
cpu_status = f"CPU Usage: {cpu_percent}% {time_str}" | |
return cpu_status | |
def update_status(): | |
gpu_status = update_gpu_status() | |
cpu_status = update_cpu_status() | |
sys_status=gpu_status+"\n\n"+cpu_status | |
return sys_status | |
def refresh_status(): | |
return update_status() | |
def transcribe(audio_path, model_name): | |
print(str(time.time())+' start transcribe ') | |
if audio_path is None: | |
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") | |
if model_name is None: | |
model_name=DEFAULT_MODEL_NAME | |
audio_path=audio_path.strip() | |
model_name=model_name.strip() | |
global pipe | |
if model_name != pipe.model.name_or_path: | |
print("old model is:"+ pipe.model.name_or_path ) | |
if model_name=="openai/whisper-large-v3": | |
pipe=openai_pipe | |
print(str(time.time())+" use openai model " + pipe.model.name_or_path) | |
elif model_name==DEFAULT_MODEL_NAME: | |
pipe=default_pipe | |
print(str(time.time())+" use default model " + pipe.model.name_or_path) | |
else: | |
print(str(time.time())+' start load model ' + model_name) | |
pipe = load_pipeline(model_name) | |
print(str(time.time())+' finished load model ' + model_name) | |
start_time = time.time() # Record the start time | |
print(str(time.time())+' start processing and set recording start time point') | |
# Load the audio file and calculate its duration | |
audio = mp.AudioFileClip(audio_path) | |
audio_duration = audio.duration | |
print(str(time.time())+' start pipe ') | |
text = pipe(audio_path, batch_size=BATCH_SIZE, generate_kwargs={"task": "transcribe"}, return_timestamps=True)["text"] | |
end_time = time.time() # Record the end time | |
transcription_time = end_time - start_time # Calculate the transcription time | |
# Create the transcription time output with additional information | |
transcription_time_output = ( | |
f"Transcription Time: {transcription_time:.2f} seconds\n" | |
f"Audio Duration: {audio_duration:.2f} seconds\n" | |
f"Model Used: {model_name}\n" | |
f"Device Used: {'GPU' if torch.cuda.is_available() else 'CPU'}" | |
) | |
print(str(time.time())+' return transcribe '+ text ) | |
return text, transcription_time_output | |
def handle_upload_audio(audio_path,model_name,old_transcription=''): | |
print('old_trans:' + old_transcription) | |
(text,transcription_time_output)=transcribe(audio_path,model_name) | |
return text+'\n\n'+old_transcription, transcription_time_output | |
def handle_base64_audio(base64_data, model_name, old_transcription=''): | |
# Decode base64 data and save it as a temporary audio file | |
binary_data = base64.b64decode(base64_data) | |
audio_path = "temp_audio.wav" | |
with open(audio_path, "wb") as f: | |
f.write(binary_data) | |
# Transcribe the audio file | |
(text, transcription_time_output) = transcribe(audio_path, model_name) | |
# Remove the temporary audio file | |
import os | |
os.remove(audio_path) | |
return text + '\n\n' + old_transcription, transcription_time_output | |
graudio=gr.Audio(type="filepath",show_download_button=True) | |
grmodel_textbox=gr.Textbox( | |
label="Model Name", | |
value=DEFAULT_MODEL_NAME, | |
placeholder="Enter the model name", | |
info="Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3", | |
) | |
groutputs=[gr.TextArea(label="Transcription",elem_id="transcription_textarea",interactive=True,lines=20,show_copy_button=True), | |
gr.TextArea(label="Transcription Info",interactive=True,show_copy_button=True)] | |
mf_transcribe = gr.Interface( | |
fn=handle_upload_audio, | |
inputs=[ | |
graudio, #"numpy" or filepath | |
#gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"), | |
grmodel_textbox, | |
], | |
outputs=groutputs, | |
theme="huggingface", | |
title="Whisper Transcription", | |
description=( | |
"Scroll to Bottom to show system status. " | |
"Transcribe long-form microphone or audio file after uploaded audio! " | |
"Notice: the space need some time to get a gpu to run, so there may be a delay " | |
), | |
allow_flagging="never", | |
) | |
grmodel_textbox_64=gr.Textbox( | |
label="Model Name", | |
value=DEFAULT_MODEL_NAME, | |
placeholder="Enter the model name", | |
info="Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3", | |
) | |
groutputs_64=[gr.TextArea(label="Transcription 64",elem_id="transcription_textarea_64",interactive=True,lines=20,show_copy_button=True), | |
gr.TextArea(label="Transcription Info 64",interactive=True,show_copy_button=True)] | |
base_transcribe= gr.Interface( | |
fn=handle_base64_audio, | |
inputs=[ | |
gr.Textbox(label="Base64 Audio Data URL", placeholder="Enter the base64 audio data URL"), | |
grmodel_textbox_64, | |
], | |
outputs=groutputs_64, | |
) | |
demo = gr.Blocks() | |
def onload(): | |
while True: | |
print('onload loop excution') | |
time.sleep(2) | |
return update_status(); | |
with demo: | |
tabbed_interface = gr.TabbedInterface( | |
[ | |
mf_transcribe, | |
base_transcribe | |
], | |
["Audio", "Base64 Audio"], | |
) | |
with gr.Row(): | |
refresh_button = gr.Button("Refresh Status") | |
sys_status_output = gr.Textbox(label="System Status", interactive=False) | |
# Link the refresh button to the refresh_status function | |
refresh_button.click(refresh_status, None, [sys_status_output]) | |
graudio.stop_recording(handle_upload_audio, inputs=[graudio, grmodel_textbox, groutputs[0]], outputs=groutputs) | |
graudio.upload(handle_upload_audio, inputs=[graudio, grmodel_textbox, groutputs[0]], outputs=groutputs) | |
# Load the initial status using update_status function | |
demo.load(onload, inputs=None, outputs=None, queue=False) | |
# Launch the Gradio app | |
demo.launch(share=True) | |
print('launched\n\n') | |