whispernotes / app.py
miaohaiyuan's picture
init deliver
36a4bf9
raw
history blame
19.8 kB
import gradio as gr
import json
import os
import sys
import tempfile
import whisperx
import ffmpeg
import tiktoken
from io import BytesIO
from glob import glob
from dotenv import load_dotenv
from download import download_video_audio, delete_download
from groq import Groq
from openai import OpenAI
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException
from translation_agent.utils import *
os.environ["FFMPEG_PATH"] = "D:\\ffmpeg\\bin\\ffmpeg.exe"
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1"
load_dotenv()
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", None)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"))
model = os.getenv("OPENAI_MODEL") or "gpt-4o"
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB
FILE_TOO_LARGE_MESSAGE = "The audio file is too large. If you used a YouTube link, please try a shorter video clip. If you uploaded an audio file, try trimming or compressing the audio to under 100 MB."
audio_file_path = None
progress=gr.Progress()
def detect_language(text):
try:
language = detect(text)
return language
except LangDetectException as e:
print(f"Error detecting language: {e}")
return None
class GenerationStatistics:
def __init__(self, input_time=0, output_time=0, input_tokens=0, output_tokens=0, total_time=0, model_name=model):
self.input_time = input_time
self.output_time = output_time
self.input_tokens = input_tokens
self.output_tokens = output_tokens
self.total_time = total_time
self.model_name = model_name
def get_input_speed(self):
return self.input_tokens / self.input_time if self.input_time != 0 else 0
def get_output_speed(self):
return self.output_tokens / self.output_time if self.output_time != 0 else 0
def add(self, other):
if not isinstance(other, GenerationStatistics):
raise TypeError("Can only add GenerationStatistics objects")
self.input_time += other.input_time
self.output_time += other.output_time
self.input_tokens += other.input_tokens
self.output_tokens += other.output_tokens
self.total_time += other.total_time
def __str__(self):
return (f"\n## {self.get_output_speed():.2f} T/s ⚡\nRound trip time: {self.total_time:.2f}s Model: {self.model_name}\n\n"
f"| Metric | Input | Output | Total |\n"
f"|-----------------|----------------|-----------------|----------------|\n"
f"| Speed (T/s) | {self.get_input_speed():.2f} | {self.get_output_speed():.2f} | {(self.input_tokens + self.output_tokens) / self.total_time if self.total_time != 0 else 0:.2f} |\n"
f"| Tokens | {self.input_tokens} | {self.output_tokens} | {self.input_tokens + self.output_tokens} |\n"
f"| Inference Time (s) | {self.input_time:.2f} | {self.output_time:.2f} | {self.total_time:.2f} |")
class NoteSection:
def __init__(self, structure, transcript):
self.structure = structure
self.contents = {title: "" for title in self.flatten_structure(structure)}
def flatten_structure(self, structure):
sections = []
for title, content in structure.items():
sections.append(title)
if isinstance(content, dict):
sections.extend(self.flatten_structure(content))
return sections
def update_content(self, title, new_content):
try:
self.contents[title] += new_content
except TypeError as e:
pass
def return_existing_contents(self, level=1) -> str:
existing_content = ""
for title, content in self.structure.items():
if self.contents[title].strip():
existing_content += f"{'#' * level} {title}\n{self.contents[title]}.\n\n"
if isinstance(content, dict):
existing_content += self.get_markdown_content(content, level + 1)
return existing_content
def get_markdown_content(self, structure=None, level=1):
if structure is None:
structure = self.structure
markdown_content = ""
for title, content in structure.items():
if self.contents[title].strip():
markdown_content += f"{'#' * level} {title}\n{self.contents[title]}.\n\n"
if isinstance(content, dict):
markdown_content += self.get_markdown_content(content, level + 1)
return markdown_content
def transcribe_audio(audio_file):
model = whisperx.load_model("small", device="cuda", compute_type="float16")
result = model.transcribe(audio_file)
transcription = ''
segments = result['segments']
for segment in segments:
transcription += segment['text']
transcription += '\n'
return transcription
def generate_notes_structure(transcript: str, model: str = model, lang: str="en"):
shot_example = """
"Introduction": "Introduction to the AMA session, including the topic of Groq scaling architecture and the panelists",
"Panelist Introductions": "Brief introductions from Igor, Andrew, and Omar, covering their backgrounds and roles at Groq",
"Groq Scaling Architecture Overview": "High-level overview of Groq's scaling architecture, covering hardware, software, and cloud components",
"Hardware Perspective": "Igor's overview of Groq's hardware approach, using an analogy of city traffic management to explain the traditional compute approach and Groq's innovative approach",
"Traditional Compute": "Description of traditional compute approach, including asynchronous nature, queues, and poor utilization of infrastructure",
"Groq's Approach": "Description of Groq's approach, including pre-orchestrated movement of data, low latency, high energy efficiency, and high utilization of resources",
"Hardware Implementation": "Igor's explanation of the hardware implementation, including a comparison of GPU and LPU architectures"
}"""
messages=[
{
"role": "system",
"content": "Write in JSON format:\n\n{\"Title of section goes here\":\"Description of section goes here\",\"Title of section goes here\":\"Description of section goes here\",\"Title of section goes here\":\"Description of section goes here\"}"
},
{
"role": "user",
#"content": f"Please respond in language [{lang}]. ### Transcript {transcript}\n\n### Example\n\n{shot_example}### Instructions\n\nCreate a structure for comprehensive notes on the above Transcript information. The output format is shown in examples. The section titles and content descriptions must be comprehensive and adapt to the content of transcript. Quality over quantity. "
"content": f"Please respond in language [{lang}]. ### Transcript {transcript}\n\n### Instructions\n\nCreate a structure for comprehensive notes on the above Transcript information. The output is json format as shown in system content. Section titles and content descriptions must be comprehensive and adapt to the content of transcript. Quality over quantity. "
}
]
completion = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.3,
#max_tokens=128000,
top_p=1,
stream=False,
stop=None,
)
usage = completion.usage
#statistics_to_return = GenerationStatistics(input_tokens=usage.prompt_tokens, output_tokens=usage.completion_tokens, model_name=model)
return completion.choices[0].message.content
def generate_section(transcript: str, existing_notes: str, section: str, model: str = model, lang: str="en"):
stream = client.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": "Respond in language [{lang}]. You are an expert writer. Generate a comprehensive note for the section provided based factually on the transcript provided. Do *not* repeat any content from previous sections."
},
{
"role": "user",
"content": f"Respond in language [{lang}]. ### Transcript\n\n{transcript}\n\n### Existing Notes\n\n{existing_notes}\n\n### Instructions\n\nGenerate comprehensive notes for this section only based on the transcript: \n\n{section}."
}
],
temperature=0.3,
max_tokens=16000,
top_p=1,
stream=True,
stop=None,
)
for chunk in stream:
tokens = chunk.choices[0].delta.content
if tokens:
yield tokens
def process_audio(audio_file, youtube_link):
global audio_file_path
i = 1; j = 3 #for progress
if youtube_link:
j += 1; progress((i,j), desc="download video..."); i += 1
audio_file_path = download_video_audio(youtube_link)
if audio_file_path is None:
return "Failed to download audio from YouTube link. Please try again."
elif audio_file:
audio_file_path = audio_file.name
else:
return "Please provide either an audio file or a YouTube link."
if os.path.getsize(audio_file_path) > MAX_FILE_SIZE:
return FILE_TOO_LARGE_MESSAGE
progress((i,j), desc="Start transcribe audio..."); i += 1
transcription_text = transcribe_audio(audio_file_path)
print("transcription_text=",transcription_text)
encoding = tiktoken.get_encoding("cl100k_base")
token_count = len(encoding.encode(transcription_text))
print("token_count=",token_count)
#transcription_output.label = f"tokens: {token_count}"
lang = detect_language(transcription_text[:100])
print("detect lang=",lang)
progress((i,j), desc="Generating notes structure..."); i += 1
notes_structure = generate_notes_structure(transcription_text, lang=lang)
print("notes_structure=", notes_structure)
progress((i,j), desc="Generating notes section..."); i += 1
try:
notes_structure_json = json.loads(notes_structure)
print("notes_structure_json=",notes_structure_json)
notes = NoteSection(structure=notes_structure_json, transcript=transcription_text)
total_generation_statistics = GenerationStatistics(model_name=model)
for title, content in notes_structure_json.items():
if isinstance(content, str):
content_stream = generate_section(transcript=transcription_text, existing_notes=notes.return_existing_contents(), section=(title + ": " + content), lang=lang)
for chunk in content_stream:
if isinstance(chunk, GenerationStatistics):
total_generation_statistics.add(chunk)
elif chunk is not None:
notes.update_content(title, chunk)
return transcription_text, notes.get_markdown_content(), notes.get_markdown_content()
except json.JSONDecodeError:
return "Failed to decode the notes structure. Please try again."
def generate_notes(transcription_text):
lang = detect_language(transcription_text[:100])
print("detect lang=",lang)
notes_structure = generate_notes_structure(transcription_text, lang=lang)
print("notes_structure=", notes_structure)
try:
notes_structure_json = json.loads(notes_structure)
print("notes_structure_json=",notes_structure_json)
notes = NoteSection(structure=notes_structure_json, transcript=transcription_text)
total_generation_statistics = GenerationStatistics(model_name=model)
for title, content in notes_structure_json.items():
if isinstance(content, str):
content_stream = generate_section(transcript=transcription_text, existing_notes=notes.return_existing_contents(), section=(title + ": " + content), lang=lang)
for chunk in content_stream:
if isinstance(chunk, GenerationStatistics):
total_generation_statistics.add(chunk)
elif chunk is not None:
notes.update_content(title, chunk)
return notes.get_markdown_content(), notes.get_markdown_content()
except json.JSONDecodeError:
return "Failed to decode the notes structure. Please try again."
'''
iface = gr.Interface(
fn=process_audio,
inputs=[
gr.File(label="Upload Audio File"),
gr.Textbox(label="YouTube Link"),
gr.Textbox(label="Groq API Key", type="password")
],
outputs=gr.Textbox(label="Generated Notes"),
title="GroqNotes: Create structured notes from audio",
description="Generate notes from audio using Groq, Whisper, and Llama3"
)
'''
CSS = """
h1 {
text-align: center;
display: block;
height: 10vh;
align-content: center;
}
footer {
visibility: hidden;
}
.texts {
min-height: 100px;
}
"""
def clear():
return None, None, None, None, None, None
def translate_text(source_text, source_lang, target_lang, country=None, max_tokens=MAX_TOKENS_PER_CHUNK):
ic(f"start to translate transcription from {source_lang} to {target_lang}")
num_tokens_in_text = num_tokens_in_string(source_text)
ic(num_tokens_in_text)
if num_tokens_in_text < max_tokens:
ic("Translating text as single chunk")
progress((1,3), desc="First translation...")
#Note: use yield from B() if put yield in function B()
translation_1 = one_chunk_initial_translation(
source_lang, target_lang, source_text
)
yield translation_1, None
progress((2,3), desc="Reflecton...")
reflection = one_chunk_reflect_on_translation(
source_lang, target_lang, source_text, translation_1, country
)
yield reflection, None
progress((3,3), desc="Final translation...")
translation_2 = one_chunk_improve_translation(
source_lang, target_lang, source_text, translation_1, reflection
)
yield translation_2, None
else:
ic("Translating text as multiple chunks")
token_size = calculate_chunk_size(
token_count=num_tokens_in_text, token_limit=max_tokens
)
ic(token_size)
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
model_name = "gpt-4",
chunk_size=token_size,
chunk_overlap=0,
)
source_text_chunks = text_splitter.split_text(source_text)
progress((1,3), desc="First translation...")
translation_1_chunks = multichunk_initial_translation(
source_lang, target_lang, source_text_chunks
)
ic(translation_1_chunks)
translation_1 = "".join(translation_1_chunks)
#yield translation_1, None, None
progress((2,3), desc="Reflecton...")
reflection_chunks = multichunk_reflect_on_translation(
source_lang,
target_lang,
source_text_chunks,
translation_1_chunks,
country,
)
ic(reflection_chunks)
reflection = "".join(reflection_chunks)
#yield translation_1, reflection, None
progress((3,3), desc="Final translation...")
translation_2_chunks = multichunk_improve_translation(
source_lang,
target_lang,
source_text_chunks,
translation_1_chunks,
reflection_chunks,
)
ic(translation_2_chunks)
translation_2 = "".join(translation_2_chunks)
yield translation_2, None
def export_txt(strings):
if (strings is not None):
os.makedirs("outputs", exist_ok=True)
base_count = len(glob(os.path.join("outputs", "*.txt")))
file_path = os.path.join("outputs", f"{base_count:06d}.txt")
with open(file_path, "w", encoding="utf-8") as f:
f.write(strings)
return gr.update(value=file_path, label = "Ready to download markdown Summary")
def update_ui(transcription_output):
tokens = ""
encoding = tiktoken.get_encoding("cl100k_base")
token_count = len(encoding.encode(transcription_output))
print("token_count=",token_count)
if (token_count > 0):
tokens = f"tokens: {token_count}"
return gr.update(label=tokens)
with gr.Blocks(theme="soft", css=CSS) as demo:
gr.Markdown("# Whisper and Translation Agent")
with gr.Row():
with gr.Column(scale=1):
endpoint = gr.Dropdown(
label="Endpoint",
choices=["Groq","OpenAI","DeepSeek","Baichuan","Ollama","Huggingface"],
value="Groq",
)
model = gr.Textbox(label="Model", value="llama3-70b-8192", )
api_key = gr.Textbox(label="API_KEY", type="password", )
with gr.Column(scale=5):
with gr.Row():
file_input = gr.File(file_types=["audio", "video"])
text_input = gr.Textbox(placeholder="Enter youtube link")
with gr.Row():
clear_btn = gr.Button("CLEAR")
extract_btn = gr.Button("Extract")
with gr.Row():
with gr.Column(scale=1):
source_lang = gr.Dropdown(label="Source Lang(Auto-Detect)", choices=["English", "Chinese", "Spanish"], value="English")
target_lang = gr.Dropdown(label="Target Lang", choices=["English", "Chinese", "Spanish"], value="Chinese")
switch_Btn = gr.Button(value="🔄️")
translate_btn = gr.Button("Translate")
download_btn = gr.DownloadButton(label="Download")
with gr.Column(scale=5):
with gr.Row():
with gr.Tab("Transcription"):
transcription_output = gr.Textbox(label='', lines=5, show_copy_button=True)
with gr.Tab("Summary"):
summary_output = gr.Textbox(label='', lines=5, show_copy_button=True, elem_classes="texts")
with gr.Tab("Markdown"):
markdown_output = gr.Markdown(label='Markdown Summary', elem_classes="texts", height=500)
with gr.Row():
with gr.Tab("Translated Transcription"):
translated_transcription_output = gr.Textbox(label='', lines=5, show_copy_button=True)
with gr.Tab("Translated Summary"):
translated_summary_output = gr.Textbox(label='', elem_classes="texts", lines=5, show_copy_button=True)
with gr.Tab("Markdown Summary"):
translated_summary_markdown = gr.Markdown(label='', elem_classes="texts", height=500)
clear_btn.click(clear, outputs=[file_input, text_input, transcription_output, summary_output, translated_transcription_output, translated_summary_output])
extract_btn.click(process_audio, inputs=[file_input, text_input], outputs=[transcription_output, summary_output, markdown_output])
translate_btn.click(translate_text, inputs=[transcription_output, source_lang, target_lang], outputs=[translated_transcription_output]
).then(
generate_notes, inputs=[translated_transcription_output], outputs=[translated_summary_output,translated_summary_markdown])
translated_summary_markdown.change(fn=export_txt, inputs=translated_summary_markdown, outputs=[download_btn])
if __name__ == "__main__":
demo.launch()