# AyeshaAmeen's picture
# Update app.py
# 42a75a1 verified
import assemblyai as aai
from transformers import T5Tokenizer, T5ForConditionalGeneration, AutoModelForSequenceClassification, AutoTokenizer
from deep_translator import GoogleTranslator
import spacy
import gradio as gr
from pydub import AudioSegment
import os
from resemblyzer import VoiceEncoder, preprocess_wav
from pathlib import Path
import torch
import numpy as np
import requests
from tempfile import NamedTemporaryFile
from yt_dlp import YoutubeDL
from urllib.parse import urlparse
from sklearn.cluster import AgglomerativeClustering
# Step 1: Set AssemblyAI API Key
# SECURITY: a key committed to source control should be treated as leaked and
# rotated. Supply the key through the ASSEMBLYAI_API_KEY environment variable;
# the literal below is kept only as a fallback so existing deployments keep
# working until the key is rotated.
aai.settings.api_key = (
    os.environ.get("ASSEMBLYAI_API_KEY") or "00f66859f24e4cefa15c9beefa13e4ce"
)
# Shared transcriber instance used by transcribe_audio.
transcriber = aai.Transcriber()
def transcribe_audio(audio_file_path):
    """Transcribe an audio file with the module-level AssemblyAI transcriber.

    Args:
        audio_file_path: Path of the audio file passed to
            ``transcriber.transcribe``.

    Returns:
        A ``(text, words)`` tuple. ``text`` falls back to ``""`` and ``words``
        falls back to ``[]`` when the transcript lacks the attribute or the
        attribute is ``None``/empty.
    """
    transcript = transcriber.transcribe(audio_file_path)
    # ``hasattr`` alone is not enough: the attribute can exist and still be
    # ``None`` (presumably when transcription fails -- verify against the
    # SDK), which would break the string and iteration use downstream.
    transcription_text = getattr(transcript, 'text', None) or ""
    transcription_words = getattr(transcript, 'words', None) or []
    return transcription_text, transcription_words
# Step 2: Language Translation (English and Urdu) with chunking
def _split_for_translation(text, chunk_size):
    """Split *text* into stripped pieces of at most *chunk_size* characters.

    A piece ends on a space when one is available inside the window so words
    are not cut in half; a run with no space is split at the size limit.
    Pieces that are empty after stripping are dropped.
    """
    pieces = []
    start = 0
    total = len(text)
    while start < total:
        end = min(start + chunk_size, total)
        if end < total:
            # Search up to and including index ``end`` so a space sitting
            # right after a full-size window is still used as the boundary.
            split_at = text.rfind(" ", start + 1, end + 1)
            if split_at != -1:
                end = split_at
        piece = text[start:end].strip()
        if piece:
            pieces.append(piece)
        start = end
    return pieces


def translate_text(text, target_language):
    """Translate *text* into *target_language* using GoogleTranslator.

    The text is translated in chunks and the translated chunks are joined
    with single spaces.

    Args:
        text: Text to translate. ``None`` or ``""`` yields ``""`` without
            creating a translator.
        target_language: Language code passed as the translator's ``target``.

    Returns:
        The translated text as a single string.
    """
    if not text:
        return ""
    translator = GoogleTranslator(source='auto', target=target_language)
    chunk_size = 4999  # Ensure we do not exceed the limit
    translated_chunks = []
    for chunk in _split_for_translation(text, chunk_size):
        translated_chunk = translator.translate(chunk)
        # Skip empty/None results so the join below cannot fail on them.
        if translated_chunk:
            translated_chunks.append(translated_chunk)
    return " ".join(translated_chunks)
# Step 3: Summarization with T5 Model
# Tokenizer and model are loaded from the same checkpoint name.
_T5_CHECKPOINT = 't5-base'
tokenizer = T5Tokenizer.from_pretrained(_T5_CHECKPOINT)
model_t5 = T5ForConditionalGeneration.from_pretrained(_T5_CHECKPOINT)
def summarize_text(text, source_language, target_language):
    """Summarize *text* with the module-level T5 model.

    When ``source_language`` is ``'urdu'`` the text is first translated to
    English, summarized, and the summary is translated to
    ``target_language``. For any other ``source_language`` the text is
    summarized as-is and ``target_language`` is not used.

    Args:
        text: Text to summarize. Empty or whitespace-only text yields ``""``.
        source_language: ``'urdu'`` enables the translate/summarize/translate
            round trip.
        target_language: Language code handed to ``translate_text`` for the
            final summary in the Urdu path.

    Returns:
        The summary string.
    """
    # With nothing to summarize the model would generate from the bare
    # "summarize: " prompt, so return early instead.
    if not text or not text.strip():
        return ""
    needs_round_trip = source_language == 'urdu'
    if needs_round_trip:
        text = translate_text(text, 'en')  # Translate to English for summarization
    # Input is truncated to 512 tokens, so only the start of a long text is
    # reflected in the summary.
    inputs = tokenizer.encode("summarize: " + text, return_tensors="pt", max_length=512, truncation=True)
    summary_ids = model_t5.generate(inputs, max_length=150, min_length=30, length_penalty=2.0, num_beams=4, early_stopping=True)
    summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
    if needs_round_trip:
        summary = translate_text(summary, target_language)  # Translate back to Urdu
    return summary
# Step 4: Key Points Extraction with spaCy
def ensure_spacy_model():
    """Load the ``en_core_web_sm`` spaCy pipeline, downloading it if missing.

    Returns:
        The loaded spaCy pipeline object.
    """
    model_name = "en_core_web_sm"
    try:
        return spacy.load(model_name)
    except OSError:
        # The load failed with OSError: download the model, then retry once.
        from spacy.cli import download as fetch_model
        fetch_model(model_name)
        return spacy.load(model_name)
# Load the spaCy pipeline once at import time; extract_key_points reuses it.
nlp = ensure_spacy_model()
def extract_key_points(text):
    """Return the text of named entities found in *text*.

    Only entities labelled ``TASK``, ``DATE``, ``PERSON`` or ``ORG`` are kept,
    in the order the pipeline reports them (duplicates included).
    """
    wanted_labels = ["TASK", "DATE", "PERSON", "ORG"]
    return [
        entity.text
        for entity in nlp(text).ents
        if entity.label_ in wanted_labels
    ]
# Step 5: Speaker Identification using silero and resemblyzer
def identify_speakers(audio_file_path):
    """Detect speech segments and group them by speaker.

    Speech regions are found with the silero VAD model, each region is
    embedded with resemblyzer's ``VoiceEncoder``, and the embeddings are
    clustered with ``AgglomerativeClustering``. Consecutive regions assigned
    to the same cluster are merged.

    Args:
        audio_file_path: Path of the audio file to analyse.

    Returns:
        ``(segments, num_speakers)`` where ``segments`` is a list of
        ``(label, start_seconds, end_seconds)`` tuples in time order and
        ``num_speakers`` is the number of distinct labels. Returns
        ``([], 0)`` when no speech is detected.
    """
    wav = preprocess_wav(Path(audio_file_path))
    # Load the silero VAD model and utilities
    vad_model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad', trust_repo=True)
    # The timestamp helper is the first entry of the utilities tuple.
    get_speech_timestamps = utils[0]
    # Assumes preprocess_wav yields 16 kHz audio -- TODO confirm against
    # resemblyzer's settings.
    sampling_rate = 16000
    # Get speech timestamps using silero VAD (sample offsets into ``wav``)
    speech_timestamps = get_speech_timestamps(wav, vad_model, sampling_rate=sampling_rate)
    if not speech_timestamps:
        # No speech detected: np.vstack on an empty list would raise.
        return [], 0

    encoder = VoiceEncoder()
    speaker_segments = []
    for ts in speech_timestamps:
        start, end = ts['start'], ts['end']
        embedding = encoder.embed_utterance(wav[start:end])
        speaker_segments.append((start / sampling_rate, end / sampling_rate, embedding))

    if len(speaker_segments) == 1:
        # Clustering needs at least two samples; one segment is one speaker.
        speaker_labels = np.zeros(1, dtype=int)
    else:
        # Use AgglomerativeClustering to cluster the speakers
        embeddings = np.vstack([seg[2] for seg in speaker_segments])
        clustering = AgglomerativeClustering(n_clusters=None, distance_threshold=0.75).fit(embeddings)
        speaker_labels = clustering.labels_

    # Merge adjacent segments identified as the same speaker
    merged_segments = []
    for label, (start_time, end_time, _) in zip(speaker_labels, speaker_segments):
        if merged_segments and merged_segments[-1][0] == label:
            # Same speaker as the previous segment: extend its end time.
            merged_segments[-1] = (label, merged_segments[-1][1], end_time)
        else:
            merged_segments.append((label, start_time, end_time))
    return merged_segments, len(np.unique(speaker_labels))
# Step 6: Sentiment Analysis using transformers
# Model and tokenizer are loaded from the same checkpoint name.
_SENTIMENT_CHECKPOINT = "cardiffnlp/twitter-roberta-base-sentiment"
model_sentiment = AutoModelForSequenceClassification.from_pretrained(_SENTIMENT_CHECKPOINT)
tokenizer_sentiment = AutoTokenizer.from_pretrained(_SENTIMENT_CHECKPOINT)
def analyze_sentiment(text):
    """Classify *text* as ``"Negative"``, ``"Neutral"`` or ``"Positive"``.

    The text is tokenized (truncated to 512 tokens) and scored with the
    module-level sentiment model; the highest-scoring class index is mapped
    to its label.

    Args:
        text: A single string to classify.

    Returns:
        One of ``"Negative"``, ``"Neutral"``, ``"Positive"``.
    """
    max_length = 512  # Set the maximum length for the tokenizer
    inputs = tokenizer_sentiment(text, return_tensors="pt", truncation=True, padding=True, max_length=max_length)
    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        outputs = model_sentiment(**inputs)
    # Softmax is monotonic, so the argmax of the raw logits selects the same
    # class as the argmax of the probabilities.
    sentiment = torch.argmax(outputs.logits, dim=1).item()
    sentiment_map = {0: "Negative", 1: "Neutral", 2: "Positive"}
    return sentiment_map[sentiment]
# Ensure the directory exists
# Uploaded conversions are written under this directory.
output_dir = "./output"
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Step 7: Download audio from YouTube using yt-dlp
def download_audio_from_youtube(url):
    """Download the audio track of *url* with yt-dlp and return its WAV path.

    The download is configured to extract audio to WAV via the
    ``FFmpegExtractAudio`` postprocessor and to write under ``./output``
    using the video id as the file name.

    Args:
        url: Video URL handed to ``YoutubeDL.extract_info``.

    Returns:
        The prepared output file name with its extension replaced by ``.wav``.
    """
    wav_postprocessor = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'wav',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [wav_postprocessor],
        'outtmpl': './output/%(id)s.%(ext)s',
        'quiet': True,
    }
    with YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=True)
        prepared_name = downloader.prepare_filename(info)
    # prepare_filename carries the source extension; the postprocessor is
    # configured for WAV, so swap the extension.
    stem, _ = os.path.splitext(prepared_name)
    return stem + '.wav'
# Step 8: Gradio Interface Setup
_NO_INPUT_MESSAGE = "Please provide either a file or a URL."


def _resolve_audio_source(file, url):
    """Turn the uploaded file or the URL into a local audio path.

    An uploaded file takes precedence over the URL.

    Returns:
        ``(audio_path, temp_path)``. ``audio_path`` is ``None`` when neither
        a file nor a non-blank URL was supplied. ``temp_path`` is the path of
        a temporary download the caller should delete, otherwise ``None``.
    """
    if file is not None:
        # The upload may be an object exposing ``.name`` or a plain path
        # string (assumed to depend on the Gradio version/config) -- accept
        # both.
        file_path = str(getattr(file, "name", file))
        # Convert video to audio if necessary (case-insensitive extension).
        if file_path.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
            audio_path = os.path.join(output_dir, "uploaded_audio.wav")
            AudioSegment.from_file(file_path).export(audio_path, format="wav")
            return audio_path, None
        return file_path, None

    # An untouched textbox delivers "" rather than None, so test for blank.
    url = (url or "").strip()
    if not url:
        return None, None

    netloc = urlparse(url).netloc
    if "youtube.com" in netloc or "youtu.be" in netloc:
        return download_audio_from_youtube(url), None

    # NOTE(security): this fetches an arbitrary user-supplied URL from the
    # server; restrict allowed hosts if the app is exposed publicly.
    response = requests.get(url, timeout=60)
    # Fail here instead of saving an HTTP error page as a ".wav" file.
    response.raise_for_status()
    with NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
        temp_file.write(response.content)
    return temp_file.name, temp_file.name


def process_meeting(file, url, language):
    """Run the full meeting pipeline for the Gradio interface.

    Steps: resolve the audio source, transcribe, identify speakers,
    optionally translate to Urdu, summarize, extract key points, attribute
    words to speakers, and analyse sentiment.

    Args:
        file: Uploaded recording, or ``None``.
        url: Recording URL; ``None`` or blank means "not provided".
        language: ``"urdu"`` selects Urdu output; any other value leaves the
            transcription untranslated.

    Returns:
        A 6-tuple ``(transcription, translated_text, key_points, summary,
        speaker_details, sentiment)``. When no input is supplied, the first
        element is an explanatory message and the remaining five are ``""``
        so the tuple still matches the six interface outputs.
    """
    audio_path, temp_path = _resolve_audio_source(file, url)
    if audio_path is None:
        return (_NO_INPUT_MESSAGE, "", "", "", "", "")

    try:
        transcription, words = transcribe_audio(audio_path)
        # Step 4: Speaker Identification (also needs the audio file)
        speakers, num_speakers = identify_speakers(audio_path)
    finally:
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file is not fatal.
                pass

    # Guard against a transcript that came back without text or words.
    transcription = transcription or ""
    words = words or []

    # Step 2: Translation based on user-selected language
    if language == "urdu":
        translated_text = translate_text(transcription, 'ur')
    else:  # default to English
        translated_text = transcription

    # Step 3: Summarization and Key Points Extraction
    summary = summarize_text(translated_text, language, 'ur')
    key_points = extract_key_points(translated_text)

    # Map speakers to their spoken text. Word start times are divided by
    # 1000 to compare with the segment times in seconds (assumes the
    # transcript reports milliseconds -- TODO confirm).
    speaker_transcripts = {i: [] for i in range(num_speakers)}
    for label, start_time, end_time in speakers:
        spoken = " ".join(
            word.text for word in words
            if start_time <= word.start / 1000 <= end_time
        )
        speaker_transcripts.setdefault(label, []).append(spoken)

    speaker_blocks = [
        f"Speaker {label + 1}:\n" + "\n".join(segments) + "\n\n"
        for label, segments in speaker_transcripts.items()
    ]
    speaker_details = f"Total number of speakers: {num_speakers}\n" + "".join(speaker_blocks)

    # Step 5: Sentiment Analysis
    sentiment = analyze_sentiment(transcription)
    return transcription, translated_text, key_points, summary, speaker_details, sentiment
# Step 9: Launch Gradio Interface with Scrollbars
# Input components, in the order process_meeting receives them.
_meeting_inputs = [
    gr.File(label="Upload Meeting Recording"),
    gr.Textbox(label="Enter Meeting URL"),
    gr.Radio(["english", "urdu"], label="Select Summary Language"),
]
# Five 20-line text outputs followed by a one-line sentiment box, matching
# the order of process_meeting's return values.
_long_output_labels = ["Transcription", "Translated Text", "Key Points", "Summary", "Speakers"]
_meeting_outputs = [gr.Textbox(label=caption, lines=20) for caption in _long_output_labels]
_meeting_outputs.append(gr.Textbox(label="Sentiment", lines=1))

iface = gr.Interface(
    fn=process_meeting,
    inputs=_meeting_inputs,
    outputs=_meeting_outputs,
    title="Smart AI Meeting Assistant",
    description="""
<div style='text-align: center;'>by Ayesha Ameen & Sana Sadiq</div>
<br>Upload your meeting recording or enter a publicly accessible URL and choose the summary language (English or Urdu).
""",
)

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    iface.launch(share=True, debug=True)