Mark Duppenthaler
Update with streaming input
5cae5d7
raw
history blame
No virus
3.35 kB
from __future__ import annotations
import os
import gradio as gr
import numpy as np
import torch
import torchaudio
from seamless_communication.models.inference.translator import Translator
from transformers import pipeline
# Module-level speech-recognition pipeline, built once at import time and
# used by transcribe() below. No model name is passed here, so the choice of
# checkpoint is left to transformers.
p = pipeline("automatic-speech-recognition")
from pydub import AudioSegment
import time
from time import sleep
def transcribe(audio, state=""):
    """Run speech recognition on ``audio`` and append the result to ``state``.

    Args:
        audio: Input handed unchanged to the module-level pipeline ``p``.
        state: Text accumulated by earlier calls; defaults to empty.

    Returns:
        ``state`` followed by the newly recognized text and one trailing
        space.
    """
    print('state', state)
    recognized = p(audio)["text"]
    addition = recognized + " "
    return state + addition
def blocks():
    """Build the streaming speech-to-text Gradio demo and launch it.

    The page holds a streaming microphone input, a "Stream as Bytes" button,
    a streaming audio output and a textbox. Two ``gr.State`` values carry the
    audio bytes and the text accumulated so far between events. The function
    ends by calling ``demo.queue().launch()``.
    """
    with gr.Blocks() as demo:
        # Per-session accumulators: all audio bytes streamed so far and the
        # running transcript.
        total_audio_bytes_state = gr.State(bytes())
        total_text_state = gr.State("")

        input_audio = gr.Audio(
            label="Input Audio",
            type="filepath",
            format="mp3",
            source="microphone",
            streaming=True,
        )
        with gr.Row():
            with gr.Column():
                stream_as_bytes_btn = gr.Button("Stream as Bytes")
                stream_as_bytes_output = gr.Audio(format="bytes", streaming=True)
                stream_output_text = gr.Textbox(label="Translated text")

        def stream_bytes(audio_file, total_audio_bytes_state, total_text_state):
            """Stream the recorded file in chunks, transcribing each chunk.

            Args:
                audio_file: Path of the recorded audio file, or a falsy value
                    when no recording is available.
                total_audio_bytes_state: Audio bytes accumulated so far.
                total_text_state: Transcript accumulated so far.

            Yields:
                ``(audio_bytes, text, audio_bytes, text)`` after every chunk,
                matching the four outputs wired to this handler (audio
                output, textbox, audio-bytes state, text state).
            """
            if not audio_file:
                # Nothing recorded yet (e.g. the button was clicked first, or
                # the input was just cleared). Without this guard,
                # os.path.getsize(None) raises TypeError.
                return

            chunk_size = 30000
            print(f"audio_file {audio_file}, size {os.path.getsize(audio_file)}")
            with open(audio_file, "rb") as f:
                # iter(callable, sentinel) stops at the first empty read (EOF).
                for chunk in iter(lambda: f.read(chunk_size), b""):
                    total_audio_bytes_state += chunk
                    print('yielding chunk', len(chunk))
                    print('total audio bytes', len(total_audio_bytes_state))
                    print(f"Text state: {total_text_state}")
                    # Only the new chunk is transcribed; its text is appended
                    # to the running transcript instead of re-transcribing
                    # everything received so far.
                    # NOTE(review): chunks are fixed-size byte slices of the
                    # file, not decoded audio frames -- confirm the pipeline
                    # handles such partial slices as intended.
                    total_text_state = transcribe(chunk, total_text_state)
                    yield (
                        total_audio_bytes_state,
                        total_text_state,
                        total_audio_bytes_state,
                        total_text_state,
                    )

        def clear():
            """Return empty values for the audio-bytes and text states."""
            print('clearing')
            return [bytes(), ""]

        stream_inputs = [input_audio, total_audio_bytes_state, total_text_state]
        stream_outputs = [
            stream_as_bytes_output,
            stream_output_text,
            total_audio_bytes_state,
            total_text_state,
        ]
        state_outputs = [total_audio_bytes_state, total_text_state]

        # Both the button and any change of the input run the same handler.
        stream_as_bytes_btn.click(stream_bytes, stream_inputs, stream_outputs)
        input_audio.change(stream_bytes, stream_inputs, stream_outputs)
        # Clearing the input or starting a new recording resets the states.
        input_audio.clear(clear, None, state_outputs)
        input_audio.start_recording(clear, None, state_outputs)

    demo.queue().launch()
# NOTE: the `if __name__ == "__main__":` guard below is commented out, so
# blocks() runs (building and launching the demo) whenever this module is
# executed or imported.
# if __name__ == "__main__":
blocks()