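"""Gradio Space: download a video with yt-dlp, transcribe it with a
fine-tuned Whisper model (nadsoft/hamsa-v0.2-beta, Arabic), run pyannote
speaker diarization, and return the full transcript plus a per-speaker
table."""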
import os
import shutil
import subprocess

import torch
import torchaudio
import pandas as pd
import gradio as gr
from tqdm import tqdm
from pydub import AudioSegment
from pyannote.audio import Pipeline
from moviepy.editor import VideoFileClip
from transformers import (
    pipeline,
    WhisperForConditionalGeneration,
    WhisperTokenizer,
    WhisperProcessor,
)
MODEL_NAME = "nadsoft/hamsa-v0.2-beta"
BATCH_SIZE = 8
FILE_LIMIT_MB = 1000
YT_LENGTH_LIMIT_S = 3600  # limit to 1 hour YouTube files

lang = "ar"
language = "arabic"
task = "transcribe"

# transformers pipelines take a GPU index or the string "cpu"
device = 0 if torch.cuda.is_available() else "cpu"
auth_token = os.environ.get("auth_token")

file1 = "meet.mp4"
file2 = "audio.wav"
file3 = "result.csv"
file4 = "transcripts.csv"

# delete stale files left over from a previous run
if os.path.exists(file1):
    os.remove(file1)
if os.path.exists(file2):
    os.remove(file2)
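# Load the fine-tuned Whisper checkpoint and wrap it in a chunked ASR
# pipeline; chunk_length_s=30 matches Whisper's 30-second input window.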
model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME, use_auth_token=auth_token)
tokenizer = WhisperTokenizer.from_pretrained(MODEL_NAME, language=language, task=task, use_auth_token=auth_token)
processor = WhisperProcessor.from_pretrained(MODEL_NAME, language=language, task=task, use_auth_token=auth_token)
feature_extractor = processor.feature_extractor

pipe = pipeline(
    task="automatic-speech-recognition",
    model=model,
    tokenizer=tokenizer,
    feature_extractor=feature_extractor,
    chunk_length_s=30,
    device=device,
)
# force Arabic transcription regardless of the language Whisper auto-detects
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(language=lang, task="transcribe")
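# Quick sanity check (sample.wav is a placeholder for any local audio file):
#   print(pipe("sample.wav")["text"])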
def speaker_diarization():
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=auth_token,
    )
    # send the pipeline to the GPU when one is available
    if torch.cuda.is_available():
        diarization_pipeline.to(torch.device("cuda"))
    waveform, sample_rate = torchaudio.load("audio.wav")
    diarization = diarization_pipeline({"waveform": waveform, "sample_rate": sample_rate})
    df = pd.DataFrame(
        [
            {"start": turn.start, "stop": turn.end, "speaker": speaker}
            for turn, _, speaker in diarization.itertracks(yield_label=True)
        ],
        columns=["start", "stop", "speaker"],
    )
    # merge consecutive turns by the same speaker into a single segment
    merged = []
    for _, row in df.iterrows():
        if merged and merged[-1]["speaker"] == row["speaker"]:
            merged[-1]["stop"] = row["stop"]
        else:
            merged.append({"start": row["start"], "stop": row["stop"], "speaker": row["speaker"]})
    new_df = pd.DataFrame(merged, columns=["start", "stop", "speaker"])
    new_df.to_csv("result.csv", index=False)
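# result.csv holds one row per merged segment; the values below are
# illustrative only:
#   start,stop,speaker
#   0.5,12.3,SPEAKER_00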
def save_audio_chunks(data_path, new_df):
    # load the full recording once, then slice out each diarized segment
    audio = AudioSegment.from_wav("audio.wav")
    # pydub slices in milliseconds; the diarization output is in seconds
    for i in tqdm(range(len(new_df))):
        start = new_df["start"][i] * 1000
        stop = new_df["stop"][i] * 1000
        audio_chunk = audio[start:stop]
        audio_chunk.export(f"{data_path}/audio_{new_df['speaker'][i]}_{i}.wav", format="wav")
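# Chunk files are named audio_<SPEAKER_label>_<row index>.wav (e.g.
# audio_SPEAKER_04_3.wav); random_response() parses this pattern back out.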
def download(url):
    # pass the URL as an argument list so it is never shell-interpreted
    subprocess.run(["yt-dlp", url, "-o", "meet.mp4"], check=True)

def mp4_2_audio():
    video = VideoFileClip("meet.mp4")
    video.audio.write_audiofile("audio.wav")
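# audio.wav is written at the video's native sample rate; the transformers
# ASR pipeline decodes and resamples files with ffmpeg, so no manual
# resampling is needed before calling pipe().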
def random_response(message):
    # start from a clean slate: remove chunks and outputs of a previous run
    if os.path.exists("data/"):
        shutil.rmtree("data/")
    for f in (file1, file2, file3, file4):
        if os.path.exists(f):
            os.remove(f)
    download(message)
    mp4_2_audio()
    full_transcript = pipe("audio.wav")["text"]
    print("full trans : ", full_transcript)
    data_path = "data"
    if not os.path.exists(data_path):
        os.makedirs(data_path)
    speaker_diarization()
    df = pd.read_csv("result.csv")
    # save one wav chunk per diarized segment
    save_audio_chunks(data_path, df)
    all_audio_files = [audio for audio in os.listdir(data_path) if audio.endswith(".wav")]
    # transcribe each chunk; file names look like audio_SPEAKER_04_3.wav,
    # where SPEAKER_04 is the pyannote label and 3 is the segment index
    rows = []
    for audio in tqdm(all_audio_files):
        parts = audio.split("_")
        speaker = parts[1] + "_" + parts[2]
        index = parts[3].split(".")[0]
        try:
            transcript = pipe(data_path + "/" + audio)["text"]
        except Exception:
            transcript = "no_text"
        rows.append({"speaker": speaker, "transcript": transcript, "index": index})
    new_df = pd.DataFrame(rows, columns=["speaker", "transcript", "index"])
    new_df.to_csv("transcripts.csv", index=False)
    # sort the segments back into chronological order by integer index
    new_df["index"] = new_df["index"].astype(int)
    return full_transcript, "meet.mp4", new_df.sort_values(by=["index"])
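# Wire everything into a minimal Gradio UI: one video URL in; the full
# transcript, the downloaded video, and the per-speaker table out.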
demo = gr.Interface(
    fn=random_response,
    inputs=[gr.Textbox(label="Tiktok Video URL")],
    outputs=[
        gr.Textbox(rtl=True, text_align="right", label="Full text transcript"),
        gr.Video(label="Tiktok Video"),
        gr.Dataframe(label="Speakers"),
    ],
    theme=gr.themes.Monochrome(),
)

if __name__ == "__main__":
    demo.queue().launch(share=True)