import os
import shutil
import subprocess

import torch
import torchaudio
import gradio as gr
import pandas as pd
from tqdm import tqdm
from pydub import AudioSegment
from pyannote.audio import Pipeline
from moviepy.editor import VideoFileClip
from transformers import (
    pipeline,
    WhisperForConditionalGeneration,
    WhisperTokenizer,
    WhisperProcessor,
)

MODEL_NAME = "nadsoft/hamsa-v0.2-beta"
BATCH_SIZE = 8
FILE_LIMIT_MB = 1000
YT_LENGTH_LIMIT_S = 3600  # limit YouTube downloads to 1 hour

lang = "ar"
language = "arabic"
task = "transcribe"

device = 0 if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
auth_token = os.environ.get("auth_token")

file1 = "meet.mp4"
file2 = "audio.wav"
file3 = "result.csv"
file4 = "transcripts.csv"

# delete leftovers from a previous run if they exist
if os.path.exists(file1):
    os.remove(file1)
if os.path.exists(file2):
    os.remove(file2)

model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME, use_auth_token=auth_token)
tokenizer = WhisperTokenizer.from_pretrained(MODEL_NAME, language=language, task=task, use_auth_token=auth_token)
processor = WhisperProcessor.from_pretrained(MODEL_NAME, language=language, task=task, use_auth_token=auth_token)

feature_extractor = processor.feature_extractor
forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)

pipe = pipeline(
    task="automatic-speech-recognition",
    model=model,
    tokenizer=tokenizer,
    feature_extractor=feature_extractor,
    chunk_length_s=30,
    device=device,
)
# always transcribe in Arabic, regardless of the language Whisper detects
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(language=lang, task=task)


def speaker_diarization():
    """Run pyannote speaker diarization on audio.wav and write the merged turns to result.csv."""
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=auth_token,
    )
    # send the pipeline to the GPU when one is available
    if torch.cuda.is_available():
        diarization_pipeline.to(torch.device("cuda"))

    waveform, sample_rate = torchaudio.load("audio.wav")
    diarization = diarization_pipeline({"waveform": waveform, "sample_rate": sample_rate})

    rows = [
        {"start": turn.start, "stop": turn.end, "speaker": speaker}
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]

    # merge consecutive turns that belong to the same speaker
    merged = []
    for row in rows:
        if merged and merged[-1]["speaker"] == row["speaker"]:
            merged[-1]["stop"] = row["stop"]
        else:
            merged.append(dict(row))

    pd.DataFrame(merged, columns=["start", "stop", "speaker"]).to_csv("result.csv", index=False)


def save_audio_chunks(data_path, segments_df):
    """Cut audio.wav into one file per diarization segment, named audio_<SPEAKER>_<index>.wav."""
    audio = AudioSegment.from_wav("audio.wav")
    for i in tqdm(range(len(segments_df))):
        start = segments_df["start"][i] * 1000  # pydub slices in milliseconds
        stop = segments_df["stop"][i] * 1000
        audio_chunk = audio[start:stop]
        audio_chunk.export(data_path + "/audio_" + segments_df["speaker"][i] + "_" + str(i) + ".wav", format="wav")


def download(url):
    """Download the video with the yt-dlp CLI to a fixed local file name."""
    # passing the arguments as a list avoids sending the user-supplied URL through a shell
    subprocess.run(["yt-dlp", url, "-o", "meet.mp4"], check=True)


def mp4_2_audio():
    """Extract the audio track of meet.mp4 into audio.wav."""
    video = VideoFileClip("meet.mp4")
    audio = video.audio
    audio.write_audiofile("audio.wav")


def random_response(message):
    # start from a clean state: remove the outputs of any previous run
    if os.path.exists("data/"):
        shutil.rmtree("data/")
    for leftover in (file1, file2, file3, file4):
        if os.path.exists(leftover):
            os.remove(leftover)

    download(message)
    mp4_2_audio()

    # transcript of the full recording in one pass
    full_transcript = pipe("audio.wav")["text"]
    print("full trans : ", full_transcript)

    data_path = "data"
    if not os.path.exists(data_path):
        os.makedirs(data_path)

    speaker_diarization()
    df = pd.read_csv("result.csv")

    # cut the audio into one chunk per diarization segment, then transcribe each chunk
    save_audio_chunks(data_path, df)
    all_audio_files = [audio for audio in os.listdir(data_path) if audio.endswith(".wav")]

    rows = []
    for audio in tqdm(all_audio_files):
        # file names look like audio_SPEAKER_04_3.wav -> speaker label SPEAKER_04, segment index 3
        parts = audio.split("_")
        speaker = parts[1] + "_" + parts[2]
        index = parts[3].split(".")[0]
        try:
            transcript = pipe(data_path + "/" + audio)["text"]
        except Exception:
            transcript = "no_text"
        rows.append({"speaker": speaker, "transcript": transcript, "index": index})

    new_df = pd.DataFrame(rows, columns=["speaker", "transcript", "index"])
    new_df.to_csv("transcripts.csv", index=False)

    # sort the per-speaker transcripts back into chronological order
    new_df["index"] = new_df["index"].astype(int)
    return full_transcript, "meet.mp4", new_df.sort_values(by=["index"])


demo = gr.Interface(
    fn=random_response,
    inputs=[gr.Textbox(label="Tiktok Video URL")],
    outputs=[
        gr.Textbox(rtl=True, text_align="right", label="Full text transcript"),
        gr.Video(label="Tiktok Video"),
        gr.Dataframe(label="Speakers"),
    ],
    theme=gr.themes.Monochrome(),
)

if __name__ == "__main__":
    demo.queue().launch(share=True)