import os
from dotenv import load_dotenv
import gradio as gr
import numpy as np
import pandas as pd
import torch
from pyannote.audio import Pipeline
from pydub import AudioSegment
from mimetypes import MimeTypes
import whisper
import tempfile
load_dotenv()
hg_token = os.getenv("HG_ACCESS_TOKEN")
open_api_key = os.getenv("OPENAI_API_KEY")

if hg_token is None:
    print('''No Hugging Face access token set.
You need to set it via a .env file or the HG_ACCESS_TOKEN environment variable.''')
    exit(1)
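# Illustrative .env layout (placeholder values, not real keys; OPENAI_API_KEY is
# optional and only enables the OpenAI API path below):
#   HG_ACCESS_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx
#   OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxx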
def diarization(audio) -> pd.DataFrame:
    """
    Receives a pydub AudioSegment and returns a pandas DataFrame with one row
    per diarized segment (columns: Segment, Trackname, Speaker).
    """
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=hg_token)
    # pyannote expects a file on disk, so write the audio out as a temporary wav
    audio.export("/tmp/dz.wav", format="wav")
    diarization = pipeline("/tmp/dz.wav")
    return pd.DataFrame(list(diarization.itertracks(yield_label=True)),
                        columns=["Segment", "Trackname", "Speaker"])
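# Illustrative result (values depend on the input audio; pyannote labels
# speakers as SPEAKER_00, SPEAKER_01, ...):
#       Segment             Trackname   Speaker
#   0   Segment(0.5, 4.2)   A           SPEAKER_00
#   1   Segment(4.2, 7.9)   B           SPEAKER_01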
def combine_segments(df):
    """
    Merges consecutive rows that belong to the same speaker into a single row,
    using Segment's union operator (|) to span from the earliest start to the
    latest end of the merged rows.
    """
    grouped_df = df.groupby((df['Speaker'] != df['Speaker'].shift()).cumsum())
    return grouped_df.agg({'Segment': lambda x: x.min() | x.max(),
                           'Trackname': 'first',
                           'Speaker': 'first'})
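# Illustrative example (made-up times): three consecutive rows
#   SPEAKER_00 [0.0, 2.0], SPEAKER_00 [2.0, 3.5], SPEAKER_01 [3.5, 5.0]
# collapse into two rows:
#   SPEAKER_00 [0.0, 3.5], SPEAKER_01 [3.5, 5.0]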
def prep_audio(audio_segment):
    """
    Preps a pydub AudioSegment for an ML model.
    Both pyannote.audio and whisper expect mono audio at a 16 kHz sample rate as float32.
    """
    audio_data = audio_segment.set_channels(1).set_frame_rate(16000)
    # Scale the 16-bit integer samples into the [-1.0, 1.0] float range
    return np.array(audio_data.get_array_of_samples()).flatten().astype(np.float32) / 32768.0
def transcribe_row(row, audio):
    segment = audio[row.start_ms:row.end_ms]
    if open_api_key is None:
        # No OpenAI key set: fall back to running whisper locally
        whisper_ml = whisper.load_model("large")
        data = prep_audio(segment)
        return whisper_ml.transcribe(data)['text']
    else:
        print("Using OpenAI API")
        # The OpenAI whisper API only accepts audio files with a length of at
        # least 0.1 seconds.
        if row['end_ms'] - row['start_ms'] < 100:
            return ""
        import openai
        temp_file = f"/tmp/{row['Trackname']}.mp3"
        segment.export(temp_file, format="mp3")
        print(temp_file)
        with open(temp_file, "rb") as audio_file:
            return openai.Audio.translate("whisper-1", audio_file)['text']
def combine_transcription(segments):
    """Formats the transcribed segments into a speaker-labelled transcript."""
    text = ""
    for _, row in segments.iterrows():
        text += f"[{row.Speaker}]: {row.text}\n"
    return text
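# Illustrative output (speaker labels and text are placeholders):
#   [SPEAKER_00]: Thanks for joining the call today.
#   [SPEAKER_01]: Happy to be here.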
def transcribe(audio_file: str) -> str:
    audio = AudioSegment.from_file(audio_file)
    print("diarization")
    df = diarization(audio)
    print("combining segments")
    df = combine_segments(df)
    # Segment boundaries in seconds and in milliseconds (pydub slices by ms)
    df['start'] = df.Segment.apply(lambda x: x.start)
    df['end'] = df.Segment.apply(lambda x: x.end)
    df['start_ms'] = df.Segment.apply(lambda x: int(x.start * 1000))
    df['end_ms'] = df.Segment.apply(lambda x: int(x.end * 1000))
    print("transcribing segments")
    df['text'] = df.apply(lambda x: transcribe_row(x, audio), axis=1)
    return combine_transcription(df)
demo = gr.Interface(
    fn=transcribe,
    inputs=gr.Audio(type="filepath"),
    outputs="text",
)
demo.launch()