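# Gradio app: transcribe audio with OpenAI Whisper and label speakers by clustering
# speechbrain speaker embeddings (pyannote.audio provides the audio cropping utilities).
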
import whisper
import datetime
import gradio as gr
import pandas as pd
import time
import os
import numpy as np
from sklearn.cluster import AgglomerativeClustering
import torch
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
from pyannote.audio import Audio, Pipeline
from pyannote.core import Segment
from gpuinfo import GPUInfo
from util import *
import wave
import contextlib
from transformers import pipeline
import psutil
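
# source_languages, whisper_models, zip_files and otheroutputs are not defined in this
# file; they are expected to come from util via the star import above.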
source_language_list = [key[0] for key in source_languages.items()]

MODEL_NAME = "openai/whisper-base.en"
lang = "en"

device = 0 if torch.cuda.is_available() else "cpu"
pipe = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_NAME,
    chunk_length_s=30,
    device=device,
)
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(language=lang, task="transcribe")
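
# The forced decoder prompt above pins the transformers pipeline to English transcription,
# so it neither auto-detects the language nor translates.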

embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))

diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization@2.1",
                                                use_auth_token="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")
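# Note: diarization_pipeline is loaded here but not referenced again in this file; speaker
# labels are instead assigned in speech_to_text() by clustering speechbrain embeddings.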


def transcribe(microphone, file_upload):
    print("Beginning transcribe...")
    warn_output = ""
    if (microphone is not None) and (file_upload is not None):
        warn_output = (
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
        )
    elif (microphone is None) and (file_upload is None):
        return "ERROR: You have to either use the microphone or upload an audio file"
    file = microphone if microphone is not None else file_upload
    text = pipe(file)["text"]
    return warn_output + text
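
# transcribe() is a quick path that uses the transformers pipeline above; the Blocks UI
# built in main() below only wires up speech_to_text().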


def convert_time(secs):
    return datetime.timedelta(seconds=round(secs))
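
# Example: str(convert_time(83.4)) == '0:01:23'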


def speech_to_text(audio_file_path, selected_source_lang, whisper_model, num_speakers, output_types=['csv', 'docx', 'md']):
    """
    Transcribe an uploaded audio file using OpenAI Whisper and label the speakers.

    1. Use OpenAI's Whisper model to separate the audio into segments and generate transcripts.
    2. Generate a speaker embedding for each segment.
    3. Apply agglomerative clustering to the embeddings to identify the speaker of each segment.

    Speech recognition is based on models from OpenAI Whisper: https://github.com/openai/whisper
    Speaker diarization model and pipeline from https://github.com/pyannote/pyannote-audio
    """
print("Loading model...") | |
torch.cuda.empty_cache() | |
model = whisper.load_model(whisper_model) | |
time_start = time.time() | |
try: | |
upload_name = audio_file_path.orig_name | |
file_name = audio_file_path.name | |
except: | |
upload_name = "output.mp3" | |
file_name = audio_file_path | |
if(audio_file_path == None): | |
raise ValueError("Error no video input") | |

    try:
        _, file_ending = os.path.splitext(f'{file_name}')
        print(f'file ending is {file_ending}')
        audio_file = file_name.replace(file_ending, ".wav")
        print("starting conversion to wav")
        os.system(f'ffmpeg -y -i "{file_name}" -ar 16000 -ac 1 -c:a pcm_s16le "{audio_file}"')

        # Get duration
        with contextlib.closing(wave.open(audio_file, 'r')) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            duration = frames / float(rate)
        print(f"conversion to wav ready, duration of audio file: {duration}")

        # Transcribe audio
        options = dict(language=selected_source_lang, beam_size=5, best_of=5)
        transcribe_options = dict(task="transcribe", **options)
        result = model.transcribe(audio_file, **transcribe_options)
        segments = result["segments"]
        print("done with whisper")
    except Exception as e:
        raise RuntimeError("Error converting audio to WAV or running Whisper") from e

    try:
        # Create embedding
        def segment_embedding(segment):
            audio = Audio()
            start = segment["start"]
            # Whisper overshoots the end timestamp in the last segment
            end = min(duration, segment["end"])
            clip = Segment(start, end)
            waveform, sample_rate = audio.crop(audio_file, clip)
            return embedding_model(waveform[None])
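
        # speechbrain/spkrec-ecapa-voxceleb produces 192-dimensional speaker embeddings,
        # which is why the embedding matrix below has a fixed width of 192.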
        embeddings = np.zeros(shape=(len(segments), 192))
        for i, segment in enumerate(segments):
            embeddings[i] = segment_embedding(segment)
        embeddings = np.nan_to_num(embeddings)
        print(f'Embedding shape: {embeddings.shape}')

        # Assign speaker label
        clustering = AgglomerativeClustering(n_clusters=num_speakers).fit(embeddings)
        labels = clustering.labels_
        for i in range(len(segments)):
            segments[i]["speaker"] = 'SPEAKER ' + str(labels[i] + 1)
        # Make output
        objects = {
            'Start': [],
            'End': [],
            'Speaker': [],
            'Text': []
        }
        text = ''
        for i, segment in enumerate(segments):
            if i == 0 or segments[i - 1]["speaker"] != segment["speaker"]:
                objects['Start'].append(str(convert_time(segment["start"])))
                objects['Speaker'].append(segment["speaker"])
                if i != 0:
                    objects['End'].append(str(convert_time(segments[i - 1]["end"])))
                    objects['Text'].append(text)
                    text = ''
            text += segment["text"] + ' '
        # Close the final row with the end time of the last segment.
        objects['End'].append(str(convert_time(segments[-1]["end"])))
        objects['Text'].append(text)

        time_end = time.time()
        time_diff = time_end - time_start
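
        # Resource usage for the summary below: psutil reports system RAM, and
        # GPUInfo.gpu_usage() returns per-GPU utilisation/memory lists (empty when no GPU is visible).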
        memory = psutil.virtual_memory()
        gpu_utilization, gpu_memory = GPUInfo.gpu_usage()
        gpu_utilization = gpu_utilization[0] if len(gpu_utilization) > 0 else 0
        gpu_memory = gpu_memory[0] if len(gpu_memory) > 0 else 0
        system_info = f"""
        *Memory: {memory.total / (1024 * 1024 * 1024):.2f}GB, used: {memory.percent}%, available: {memory.available / (1024 * 1024 * 1024):.2f}GB.*
        *Processing time: {time_diff:.5} seconds.*
        *GPU Utilization: {gpu_utilization}%, GPU Memory: {gpu_memory}MiB.*
        """

        os.remove(file_name)
        print(output_types)
        docx = 'docx' in output_types
        markdown = 'md' in output_types
        csv = 'csv' in output_types
        other_outs = zip_files(otheroutputs(objects, csv=csv, markdown=markdown, docx=docx, upload_name=upload_name))
        return pd.DataFrame(objects), system_info, other_outs
    except Exception as e:
        raise RuntimeError("Error running inference with local model") from e


def main():
    df_init = pd.DataFrame(columns=['Start', 'End', 'Speaker', 'Text'])
    memory = psutil.virtual_memory()
    try:
        cuda_device_model = torch.cuda.get_device_name(torch.cuda.current_device())
    except Exception:
        cuda_device_model = "CUDA not found"
    system_info = gr.Markdown(f"*Memory: {memory.total / (1024 * 1024 * 1024):.2f}GB, used: {memory.percent}%, available: {memory.available / (1024 * 1024 * 1024):.2f}GB* Have CUDA?: {torch.cuda.is_available()} CUDA Device: {cuda_device_model}")
    transcription_df = gr.DataFrame(value=df_init, label="Transcription dataframe", row_count=(0, "dynamic"), max_rows=10, wrap=True, overflow_row_behaviour='paginate')
    zip_download = gr.File(label="Output")

    title = "Whisper speaker diarization"
    demo = gr.Blocks(title=title)
    demo.queue(concurrency_count=3)
    demo.encrypt = False

    selected_source_lang = gr.Dropdown(choices=source_language_list, type="value", value="en", label="Spoken language in recording", interactive=True)
    selected_whisper_model = gr.Dropdown(choices=whisper_models, type="value", value="base", label="Selected Whisper model", interactive=True)
    number_speakers = gr.Number(precision=0, value=2, label="Selected number of speakers", interactive=True)
    out_formats = ["docx", "md", "csv"]
    output_types = gr.CheckboxGroup(choices=out_formats, value=out_formats, label="Select output types", interactive=True)
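
    # These components are created up front so they can be wired to the callback,
    # then placed into the layout below via .render().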
    with demo:
        with gr.Tab("Transcribe Audio Files"):
            with gr.Row():
                gr.HTML('<script defer data-domain="transcribe.orgmycology.com" src="https://a.duckles.nz/js/plausible.js"></script>')
                gr.Markdown("""## Transcribe your audio files

                This tool will help you transcribe audio files and tag the speakers (i.e. Speaker 1, Speaker 2).

                Steps:

                1. Upload a file (drag/drop to the upload area, or click and select)
                2. Select the language
                3. Select the model version (larger == slower, but more accurate)
                4. Hint at the number of speakers in the audio file (doesn't have to be exact)
                5. Choose the output formats you'd like
                6. Click Transcribe!
                7. Wait for it to finish, then download the output file
                """)
            with gr.Row():
                with gr.Column():
                    upload_diarize = gr.File(type="file", label="Upload Audio", interactive=True)
            with gr.Row():
                with gr.Column():
                    selected_source_lang.render()
                    selected_whisper_model.render()
                    number_speakers.render()
                    output_types.render()
                    transcribe_btn = gr.Button(" 🟢 Transcribe! ")
                    transcribe_btn.click(speech_to_text,
                                         [upload_diarize, selected_source_lang, selected_whisper_model, number_speakers, output_types],
                                         [transcription_df, system_info, zip_download],
                                         api_name="diarized_transcribe")
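                    # Inputs map one-to-one onto speech_to_text's parameters; outputs receive
                    # the transcript DataFrame, the system-info markdown, and the zip of exported files.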
            with gr.Row():
                with gr.Column():
                    zip_download.render()
                    transcription_df.render()
                    system_info.render()

    demo.launch(show_error=True, debug=True)


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1:
        # CLI usage: python app.py <audio file>
        input_file = sys.argv[1]
        selected_source_lang = "en"
        selected_whisper_model = "base"
        number_speakers = 2
        speech_to_text(input_file, selected_source_lang, selected_whisper_model, number_speakers)
    else:
        # No CLI argument: launch the Gradio UI.
        main()
else:
    # Imported as a module (e.g. by a hosting runtime): launch the Gradio UI.
    main()