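"""Smart AI Meeting Assistant.

Pipeline overview (a summary of what the code below does): transcribe a meeting
recording (uploaded file, direct audio URL, or YouTube link) with AssemblyAI,
optionally translate the transcript to Urdu, summarize it with T5, extract key
entities with spaCy, attribute speech segments to speakers (Silero VAD +
Resemblyzer embeddings + agglomerative clustering), and score overall sentiment,
all served through a Gradio interface.
"""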
import os
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib.parse import urlparse

import assemblyai as aai
import gradio as gr
import numpy as np
import requests
import spacy
import torch
from deep_translator import GoogleTranslator
from pydub import AudioSegment
from resemblyzer import VoiceEncoder, preprocess_wav
from sklearn.cluster import AgglomerativeClustering
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    T5ForConditionalGeneration,
    T5Tokenizer,
)
from yt_dlp import YoutubeDL

# AssemblyAI credentials; read the API key from the environment instead of
# committing it in source. Set ASSEMBLYAI_API_KEY before running.
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY", "")
transcriber = aai.Transcriber()

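# Transcription helper: returns both the full transcript text and AssemblyAI's
# word-level objects (with start/end timestamps in milliseconds), which the
# speaker-attribution step further below relies on.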
def transcribe_audio(audio_file_path):
    transcript = transcriber.transcribe(audio_file_path)
    transcription_text = transcript.text if hasattr(transcript, 'text') else ""
    transcription_words = transcript.words if hasattr(transcript, 'words') else []
    return transcription_text, transcription_words

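# The transcript is translated in chunks because the Google Translate backend
# used by deep_translator rejects very long inputs (roughly 5,000 characters
# per request), hence the 4,999-character chunk size below.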
def translate_text(text, target_language):
    translator = GoogleTranslator(source='auto', target=target_language)
    chunk_size = 4999
    translated_chunks = []
    for i in range(0, len(text), chunk_size):
        chunk = text[i:i + chunk_size]
        translated_chunk = translator.translate(chunk)
        translated_chunks.append(translated_chunk)
    translated_text = " ".join(translated_chunks)
    return translated_text

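# t5-base is an English model, so Urdu input is first translated to English,
# summarized, and then translated back to the requested language.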
tokenizer = T5Tokenizer.from_pretrained('t5-base')
model_t5 = T5ForConditionalGeneration.from_pretrained('t5-base')


def summarize_text(text, source_language, target_language):
    if source_language == 'urdu':
        text = translate_text(text, 'en')
    inputs = tokenizer.encode("summarize: " + text, return_tensors="pt", max_length=512, truncation=True)
    summary_ids = model_t5.generate(inputs, max_length=150, min_length=30, length_penalty=2.0, num_beams=4, early_stopping=True)
    summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
    if source_language == 'urdu':
        summary = translate_text(summary, target_language)
    return summary

def ensure_spacy_model():
    try:
        nlp = spacy.load("en_core_web_sm")
    except OSError:
        from spacy.cli import download
        download("en_core_web_sm")
        nlp = spacy.load("en_core_web_sm")
    return nlp


nlp = ensure_spacy_model()

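# Key-point extraction is a plain spaCy named-entity pass. The stock
# en_core_web_sm pipeline is English-only and has no "TASK" entity label, so in
# practice this surfaces dates, people, and organizations from English text.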
def extract_key_points(text):
    doc = nlp(text)
    tasks = []
    for ent in doc.ents:
        if ent.label_ in ["TASK", "DATE", "PERSON", "ORG"]:
            tasks.append(ent.text)
    return tasks

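# Speaker diarization: Silero VAD finds speech regions, Resemblyzer embeds each
# region, and agglomerative clustering with a distance threshold groups the
# embeddings into speakers without fixing their number in advance.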
def identify_speakers(audio_file_path):
    wav_fpath = Path(audio_file_path)
    # preprocess_wav resamples the recording to Resemblyzer's 16 kHz mono format.
    wav = preprocess_wav(wav_fpath)

    # Voice activity detection via Silero VAD, fetched through torch.hub.
    vad_model, utils = torch.hub.load(repo_or_dir='snakers4/silero-vad', model='silero_vad', trust_repo=True)
    (get_speech_timestamps, _, _, _, _) = utils
    sampling_rate = 16000

    speech_timestamps = get_speech_timestamps(wav, vad_model, sampling_rate=sampling_rate)

    encoder = VoiceEncoder()
    speaker_segments = []

    # Embed each detected speech segment; VAD timestamps are in samples, so
    # divide by the sampling rate to get seconds.
    for ts in speech_timestamps:
        start, end = ts['start'], ts['end']
        segment = wav[start:end]
        speaker_embeds = encoder.embed_utterance(segment)
        speaker_segments.append((start / sampling_rate, end / sampling_rate, speaker_embeds))

    # Cluster the embeddings into an unknown number of speakers.
    embeddings = np.vstack([seg[2] for seg in speaker_segments])
    clustering = AgglomerativeClustering(n_clusters=None, distance_threshold=0.75).fit(embeddings)
    speaker_labels = clustering.labels_

    # Merge consecutive segments that were assigned to the same speaker.
    merged_segments = []
    for i, (start_time, end_time, _) in enumerate(speaker_segments):
        label = speaker_labels[i]
        if merged_segments and merged_segments[-1][0] == label:
            merged_segments[-1] = (label, merged_segments[-1][1], end_time)
        else:
            merged_segments.append((label, start_time, end_time))

    return merged_segments, len(np.unique(speaker_labels))

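# Sentiment is scored with CardiffNLP's Twitter-tuned RoBERTa, a three-class
# model whose label indices map to negative / neutral / positive.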
model_sentiment = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
tokenizer_sentiment = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")


def analyze_sentiment(text):
    max_length = 512
    inputs = tokenizer_sentiment(text, return_tensors="pt", truncation=True, padding=True, max_length=max_length)
    outputs = model_sentiment(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
    sentiment = torch.argmax(probs, dim=1).item()
    sentiment_map = {0: "Negative", 1: "Neutral", 2: "Positive"}
    return sentiment_map[sentiment]

output_dir = "./output" |
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
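# YouTube links are fetched with yt-dlp; the FFmpegExtractAudio post-processor
# converts the download to WAV, which requires ffmpeg to be installed and on PATH.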
def download_audio_from_youtube(url):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }],
        'outtmpl': './output/%(id)s.%(ext)s',
        'quiet': True
    }
    with YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        audio_file = ydl.prepare_filename(info_dict)
        # prepare_filename reports the pre-conversion name; swap in the .wav
        # extension produced by the post-processor.
        base, ext = os.path.splitext(audio_file)
        audio_file = base + '.wav'
    return audio_file

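# End-to-end handler wired to the Gradio UI. It resolves the audio source
# (upload, YouTube link, or direct URL), then runs transcription, optional
# Urdu translation, summarization, key-point extraction, diarization, and
# sentiment analysis, returning one value per output component.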
def process_meeting(file, url, language):
    audio_path = None
    if file is not None:
        file_path = file.name
        audio_path = os.path.join(output_dir, "uploaded_audio.wav")

        if file_path.endswith(('.mp4', '.avi', '.mov', '.mkv')):
            # pydub (via ffmpeg) extracts the audio track from video containers.
            audio = AudioSegment.from_file(file_path)
            audio.export(audio_path, format="wav")
        else:
            audio_path = file_path
    elif url:
        parsed_url = urlparse(url)
        if "youtube.com" in parsed_url.netloc or "youtu.be" in parsed_url.netloc:
            audio_path = download_audio_from_youtube(url)
        else:
            response = requests.get(url)
            with NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
                temp_file.write(response.content)
                audio_path = temp_file.name

    if audio_path is None:
        # Return one value per Gradio output component.
        error = "Please provide either a file or a URL."
        return error, "", "", "", "", ""

    transcription, words = transcribe_audio(audio_path)

    if language == "urdu":
        translated_text = translate_text(transcription, 'ur')
    else:
        translated_text = transcription

    summary = summarize_text(translated_text, language, 'ur')
    # Join the extracted entities so the Key Points textbox shows one per line.
    key_points = "\n".join(extract_key_points(translated_text))

    speakers, num_speakers = identify_speakers(audio_path)

    speaker_transcripts = {i: [] for i in range(num_speakers)}

    # AssemblyAI word timestamps are in milliseconds; the diarization segments
    # are in seconds, hence the division by 1000.
    for label, start_time, end_time in speakers:
        segment = [word.text for word in words if start_time <= word.start / 1000 <= end_time]
        text_segment = " ".join(segment)
        speaker_transcripts[label].append(text_segment)

    speaker_details = ""
    for label, segments in speaker_transcripts.items():
        speaker_name = f"Speaker {label + 1}"
        speaker_details += f"{speaker_name}:\n"
        speaker_details += "\n".join(segments) + "\n\n"

    sentiment = analyze_sentiment(transcription)

    speaker_details = f"Total number of speakers: {num_speakers}\n" + speaker_details

    return transcription, translated_text, key_points, summary, speaker_details, sentiment

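# Gradio wiring: three inputs (file upload, URL textbox, language radio) feed
# process_meeting, whose six return values fill the six output textboxes in order.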
iface = gr.Interface(
    fn=process_meeting,
    inputs=[
        gr.File(label="Upload Meeting Recording"),
        gr.Textbox(label="Enter Meeting URL"),
        gr.Radio(["english", "urdu"], label="Select Summary Language")
    ],
    outputs=[
        gr.Textbox(label="Transcription", lines=20),
        gr.Textbox(label="Translated Text", lines=20),
        gr.Textbox(label="Key Points", lines=20),
        gr.Textbox(label="Summary", lines=20),
        gr.Textbox(label="Speakers", lines=20),
        gr.Textbox(label="Sentiment", lines=1)
    ],
    title="Smart AI Meeting Assistant",
    description="""
    <div style='text-align: center;'>by Ayesha Ameen & Sana Sadiq</div>
    <br>Upload your meeting recording or enter a publicly accessible URL and choose the summary language (English or Urdu).
    """,
)

if __name__ == "__main__": |
|
iface.launch(share=True, debug=True) |
|
|