# Hamsa-Tiktok / app.py
import os
import shutil
import subprocess

import torch
import torchaudio
import pandas as pd
import gradio as gr
from tqdm import tqdm
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
from pyannote.audio import Pipeline
from transformers import (
    pipeline,
    WhisperForConditionalGeneration,
    WhisperTokenizer,
    WhisperProcessor,
)

# GPU index for the transformers pipeline; half precision on GPU only
device = 0 if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
MODEL_NAME = "nadsoft/hamsa-v0.2-beta"
BATCH_SIZE = 8  # currently unused
FILE_LIMIT_MB = 1000  # currently unused
YT_LENGTH_LIMIT_S = 3600  # limit downloads to 1 hour (currently unused)
lang = 'ar'
auth_token = os.environ.get("auth_token")
language = "arabic"
task = "transcribe"
file1 = 'meet.mp4'
file2 = 'audio.wav'
file3 = 'result.csv'
file4 = 'transcripts.csv'
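# Intermediate artifacts produced by one run:
#   meet.mp4        - downloaded video
#   audio.wav       - audio track extracted from the video
#   result.csv      - diarization segments (start, stop, speaker)
#   transcripts.csv - per-speaker transcript table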
# delete the files if they exist
if os.path.exists(file1):
    os.remove(file1)
if os.path.exists(file2):
    os.remove(file2)
# load the Hamsa (Whisper) checkpoint; loading fp16 weights on GPU follows the
# standard HF Whisper recipe (assumed intent of torch_dtype above; drop the
# torch_dtype argument to load in fp32)
model = WhisperForConditionalGeneration.from_pretrained(
    MODEL_NAME, torch_dtype=torch_dtype, use_auth_token=auth_token
)
tokenizer = WhisperTokenizer.from_pretrained(MODEL_NAME, language=language, task=task, use_auth_token=auth_token)
processor = WhisperProcessor.from_pretrained(MODEL_NAME, language=language, task=task, use_auth_token=auth_token)
feature_extractor = processor.feature_extractor
pipe = pipeline(
    task="automatic-speech-recognition",
    model=model,
    tokenizer=tokenizer,
    feature_extractor=feature_extractor,
    chunk_length_s=30,
    torch_dtype=torch_dtype,
    device=device,
)
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(language=lang, task="transcribe")
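# The forced decoder ids pin the model to Arabic transcription so language
# detection cannot drift mid-file. Minimal usage sketch (hypothetical file
# name, any format ffmpeg can decode):
#   out = pipe("sample.wav")   # -> {"text": "..."}
# chunk_length_s=30 splits long audio into 30 s windows and stitches the
# partial transcripts back together, so a full recording fits in one call.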
def speaker_diarization():
    # diarize audio.wav and write merged speaker turns to result.csv
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=str(auth_token),
    )
    # send the pipeline to GPU only when one is available
    if torch.cuda.is_available():
        diarization_pipeline.to(torch.device("cuda"))
    waveform, sample_rate = torchaudio.load('audio.wav')
    diarization = diarization_pipeline({"waveform": waveform, "sample_rate": sample_rate})
    rows = [
        {'start': turn.start, 'stop': turn.end, 'speaker': speaker}
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]
    # merge consecutive turns by the same speaker into one segment
    # (DataFrame.append was removed in pandas 2.0, so plain lists are used)
    merged = []
    for row in rows:
        if merged and merged[-1]['speaker'] == row['speaker']:
            merged[-1]['stop'] = row['stop']
        else:
            merged.append(dict(row))
    new_df = pd.DataFrame(merged, columns=['start', 'stop', 'speaker'])
    new_df.to_csv('result.csv', index=False)
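# result.csv now holds one row per merged speaker turn:
#   start (seconds), stop (seconds), speaker (e.g. SPEAKER_00)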
def save_audio_chunks(data_path, new_df):
    # load the full audio file once
    audio = AudioSegment.from_wav('audio.wav')
    # export each diarized segment; pydub slices in milliseconds
    for i in tqdm(range(len(new_df))):
        start = new_df['start'][i] * 1000
        stop = new_df['stop'][i] * 1000
        audio_chunk = audio[start:stop]
        audio_chunk.export(data_path + '/audio_' + new_df['speaker'][i] + '_' + str(i) + '.wav', format="wav")
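# Chunk files are named audio_<SPEAKER_XX>_<index>.wav; random_response()
# below parses the speaker label and segment index back out of the name.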
def download(url):
    # call the yt-dlp CLI with an argument list to avoid shell-quoting issues
    subprocess.run(['yt-dlp', url, '-o', 'meet.mp4'], check=True)
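# yt-dlp resolves TikTok (and YouTube) URLs; '-o meet.mp4' pins the output
# filename so the rest of the pipeline can find the download.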
def mp4_2_audio():
    video = VideoFileClip("meet.mp4")
    audio = video.audio
    audio.write_audiofile('audio.wav')
def random_response(message):
    # clean up everything left over from the previous run
    if os.path.exists('data/'):
        shutil.rmtree('data/')
    for leftover in (file1, file2, file3, file4):
        if os.path.exists(leftover):
            os.remove(leftover)
    download(message)
    mp4_2_audio()
    # full transcript of the whole recording in one pass
    full_transcript = pipe('audio.wav')['text']
    print('full transcript:', full_transcript)
    data_path = 'data'
    # create the chunk folder if it does not exist
    if not os.path.exists(data_path):
        os.makedirs(data_path)
    speaker_diarization()
    df = pd.read_csv('result.csv')
    # save one audio chunk per diarized segment
    save_audio_chunks(data_path, df)
    all_audio_files = [audio for audio in os.listdir(data_path) if audio.endswith('.wav')]
    # build one speaker/transcript row per chunk file; names look like
    # audio_SPEAKER_04_3.wav, where parts 1-2 are the speaker label and
    # part 3 is the segment index
    rows = []
    for audio in tqdm(all_audio_files):
        parts = audio.split('_')
        speaker = parts[1] + '_' + parts[2]
        index = parts[3].split('.')[0]
        try:
            transcript = pipe(data_path + '/' + audio)['text']
        except Exception:
            transcript = "no_text"
        rows.append({'speaker': speaker, 'transcript': transcript, 'index': index})
    new_df = pd.DataFrame(rows, columns=['speaker', 'transcript', 'index'])
    new_df.to_csv('transcripts.csv', index=False)
    # make sure the index is an int and sort segments back into order
    new_df['index'] = new_df['index'].astype(int)
    return full_transcript, 'meet.mp4', new_df.sort_values(by=['index'])
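# The returned tuple maps positionally onto the three outputs declared
# below: full-transcript textbox, video player, and speakers dataframe.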
demo = gr.Interface(
    fn=random_response,
    inputs=[gr.Textbox(label='Tiktok Video URL')],
    outputs=[
        gr.Textbox(rtl=True, text_align='right', label='Full text transcript'),
        gr.Video(label='Tiktok Video'),
        gr.Dataframe(label='Speakers'),
    ],
    theme=gr.themes.Monochrome(),
)
if __name__ == "__main__":
    demo.queue().launch(share=True)