Spaces:
Sleeping
Sleeping
import gradio as gr | |
import json | |
import os | |
import sys | |
import tempfile | |
import whisperx | |
import ffmpeg | |
import tiktoken | |
from io import BytesIO | |
from glob import glob | |
from dotenv import load_dotenv | |
from download import download_video_audio, delete_download | |
from groq import Groq | |
from openai import OpenAI | |
from langdetect import detect | |
from langdetect.lang_detect_exception import LangDetectException | |
from translation_agent.utils import * | |
# Location of the ffmpeg executable, exported through the process environment.
# NOTE(review): hard-coded Windows path, assigned unconditionally — confirm
# this is intended on machines other than the author's.
os.environ["FFMPEG_PATH"] = "D:\\ffmpeg\\bin\\ffmpeg.exe"
# Hosts listed in no_proxy; presumably keeps requests to the local Gradio
# server off any configured proxy — TODO confirm.
os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1"
# Load variables from a .env file before the environment lookups below.
load_dotenv()
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", None)
# OpenAI-compatible client; API key and base URL are read from the environment.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"))
# Default chat model; falls back to "gpt-4o" when OPENAI_MODEL is unset or empty.
model = os.getenv("OPENAI_MODEL") or "gpt-4o"
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB
FILE_TOO_LARGE_MESSAGE = "The audio file is too large. If you used a YouTube link, please try a shorter video clip. If you uploaded an audio file, try trimming or compressing the audio to under 100 MB."
# Path of the audio file currently being processed; written by process_audio().
audio_file_path = None
# Shared Gradio progress tracker called by the handlers in this file.
progress=gr.Progress()
def detect_language(text):
    """Return whatever ``detect`` reports for *text*, or ``None`` on failure.

    A ``LangDetectException`` is reported on stdout and turned into a
    ``None`` result instead of propagating to the caller.
    """
    try:
        return detect(text)
    except LangDetectException as err:
        print(f"Error detecting language: {err}")
    return None
class GenerationStatistics:
    """Token counts and timings for one or more model calls.

    Instances can be merged with :meth:`add` and render as a small markdown
    report through ``str()``.
    """

    def __init__(self, input_time=0, output_time=0, input_tokens=0, output_tokens=0, total_time=0, model_name=model):
        self.input_time, self.output_time = input_time, output_time
        self.input_tokens, self.output_tokens = input_tokens, output_tokens
        self.total_time = total_time
        self.model_name = model_name

    def get_input_speed(self):
        """Return input tokens per unit of input time (0 when no time was recorded)."""
        if self.input_time != 0:
            return self.input_tokens / self.input_time
        return 0

    def get_output_speed(self):
        """Return output tokens per unit of output time (0 when no time was recorded)."""
        if self.output_time != 0:
            return self.output_tokens / self.output_time
        return 0

    def add(self, other):
        """Accumulate the counters of *other* into this instance, in place.

        Raises:
            TypeError: if *other* is not a ``GenerationStatistics``.
        """
        if not isinstance(other, GenerationStatistics):
            raise TypeError("Can only add GenerationStatistics objects")
        for field in ("input_time", "output_time", "input_tokens", "output_tokens", "total_time"):
            setattr(self, field, getattr(self, field) + getattr(other, field))

    def __str__(self):
        token_total = self.input_tokens + self.output_tokens
        overall_speed = token_total / self.total_time if self.total_time != 0 else 0
        output_speed = self.get_output_speed()
        heading = (
            f"\n## {output_speed:.2f} T/s ⚡\n"
            f"Round trip time: {self.total_time:.2f}s Model: {self.model_name}\n\n"
        )
        # Markdown table: one header row, one separator row, three data rows.
        rows = [
            "| Metric | Input | Output | Total |",
            "|-----------------|----------------|-----------------|----------------|",
            f"| Speed (T/s) | {self.get_input_speed():.2f} | {output_speed:.2f} | {overall_speed:.2f} |",
            f"| Tokens | {self.input_tokens} | {self.output_tokens} | {token_total} |",
            f"| Inference Time (s) | {self.input_time:.2f} | {self.output_time:.2f} | {self.total_time:.2f} |",
        ]
        return heading + "\n".join(rows)
class NoteSection:
    """An outline of section titles plus the text accumulated for each one.

    ``structure`` maps a section title either to a description or to a nested
    mapping of sub-sections. ``contents`` maps every title found anywhere in
    ``structure`` to the text written for it so far (initially empty).
    """

    def __init__(self, structure, transcript):
        # ``transcript`` is accepted for call-site compatibility; nothing in
        # this class reads it.
        self.structure = structure
        self.contents = {title: "" for title in self.flatten_structure(structure)}

    def flatten_structure(self, structure):
        """Return every title in *structure*, each parent before its children."""
        sections = []
        for title, content in structure.items():
            sections.append(title)
            if isinstance(content, dict):
                sections.extend(self.flatten_structure(content))
        return sections

    def update_content(self, title, new_content):
        """Append *new_content* to the text stored for *title*.

        Non-string chunks (for example ``None``) are ignored explicitly rather
        than by swallowing the ``TypeError`` that ``str += non_str`` raises.

        Raises:
            KeyError: if *title* is not part of the structure.
        """
        if not isinstance(new_content, str):
            return
        self.contents[title] += new_content

    def return_existing_contents(self, level=1) -> str:
        """Return the markdown written so far, starting at heading *level*."""
        # Same rendering as get_markdown_content over the whole structure.
        return self.get_markdown_content(self.structure, level)

    def get_markdown_content(self, structure=None, level=1):
        """Render *structure* (default: the whole outline) as markdown.

        Each section with non-blank text becomes a heading of depth *level*
        followed by its text and a trailing period; nested mappings are
        rendered one level deeper. Sections without text are omitted.
        """
        if structure is None:
            structure = self.structure
        parts = []
        for title, content in structure.items():
            body = self.contents[title]
            if body.strip():
                parts.append(f"{'#' * level} {title}\n{body}.\n\n")
            if isinstance(content, dict):
                parts.append(self.get_markdown_content(content, level + 1))
        return "".join(parts)
def transcribe_audio(audio_file, model_size="small", device="cuda", compute_type="float16"):
    """Transcribe *audio_file* with WhisperX and return the text.

    Args:
        audio_file: Audio input handed to the WhisperX model's ``transcribe``.
        model_size: Model name passed to ``whisperx.load_model``.
        device: Device passed to ``whisperx.load_model``.
        compute_type: Compute type passed to ``whisperx.load_model``.

    Returns:
        The text of every segment in the result, each followed by a newline.
    """
    # NOTE: the model is loaded on every call.
    asr_model = whisperx.load_model(model_size, device=device, compute_type=compute_type)
    result = asr_model.transcribe(audio_file)
    return "".join(f"{segment['text']}\n" for segment in result["segments"])
def generate_notes_structure(transcript: str, model: str = model, lang: str = "en"):
    """Ask the chat model for a JSON outline of notes for *transcript*.

    Args:
        transcript: Full transcript text placed in the user prompt.
        model: Chat model name passed to the completion request.
        lang: Language the model is asked to answer in. A falsy value
            (callers pass ``None`` when language detection failed) falls back
            to ``"en"`` so the prompt never reads "language [None]".

    Returns:
        The raw message content of the first choice. It is expected to be a
        JSON object mapping section titles to descriptions, but it is returned
        unparsed and unvalidated.
    """
    lang = lang or "en"
    messages = [
        {
            "role": "system",
            "content": "Write in JSON format:\n\n{\"Title of section goes here\":\"Description of section goes here\",\"Title of section goes here\":\"Description of section goes here\",\"Title of section goes here\":\"Description of section goes here\"}"
        },
        {
            "role": "user",
            "content": f"Please respond in language [{lang}]. ### Transcript {transcript}\n\n### Instructions\n\nCreate a structure for comprehensive notes on the above Transcript information. The output is json format as shown in system content. Section titles and content descriptions must be comprehensive and adapt to the content of transcript. Quality over quantity. "
        },
    ]
    completion = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.3,
        top_p=1,
        stream=False,
        stop=None,
    )
    return completion.choices[0].message.content
def generate_section(transcript: str, existing_notes: str, section: str, model: str = model, lang: str = "en"):
    """Stream the notes for one section of the outline.

    Args:
        transcript: Full transcript text the notes must be based on.
        existing_notes: Notes already written for earlier sections, included
            in the prompt alongside the instruction not to repeat content.
        section: "title: description" text identifying the section to write.
        model: Chat model name passed to the completion request.
        lang: Language the model is asked to answer in; a falsy value falls
            back to ``"en"``.

    Yields:
        Each non-empty content delta of the streamed completion, in order.
    """
    lang = lang or "en"
    stream = client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                # f-string so the language is interpolated; a plain string
                # would send the literal text "{lang}" to the model.
                "content": f"Respond in language [{lang}]. You are an expert writer. Generate a comprehensive note for the section provided based factually on the transcript provided. Do *not* repeat any content from previous sections."
            },
            {
                "role": "user",
                "content": f"Respond in language [{lang}]. ### Transcript\n\n{transcript}\n\n### Existing Notes\n\n{existing_notes}\n\n### Instructions\n\nGenerate comprehensive notes for this section only based on the transcript: \n\n{section}."
            },
        ],
        temperature=0.3,
        max_tokens=16000,
        top_p=1,
        stream=True,
        stop=None,
    )
    for chunk in stream:
        # Skip chunks without choices instead of failing on choices[0].
        if not chunk.choices:
            continue
        tokens = chunk.choices[0].delta.content
        if tokens:
            yield tokens
def process_audio(audio_file, youtube_link):
    """Transcribe an upload or a YouTube link and generate structured notes.

    Args:
        audio_file: Uploaded file (an object with a ``name`` path attribute,
            or the path itself). Used only when *youtube_link* is empty.
        youtube_link: URL whose audio is downloaded first when non-empty.

    Returns:
        ``(transcription_text, notes_markdown, notes_markdown)`` — one value
        for each of the three output components this handler is wired to.

    Raises:
        gr.Error: when no input is given, the download fails, the audio
            exceeds ``MAX_FILE_SIZE``, or the notes structure is not a JSON
            object. Raising keeps the failure visible in the UI; returning a
            single string would not match the three outputs.
    """
    global audio_file_path
    step, total_steps = 1, 3  # progress is reported as (step, total_steps)

    if youtube_link:
        total_steps += 1  # downloading adds one step
        progress((step, total_steps), desc="download video...")
        step += 1
        audio_file_path = download_video_audio(youtube_link)
        if audio_file_path is None:
            raise gr.Error("Failed to download audio from YouTube link. Please try again.")
    elif audio_file:
        audio_file_path = getattr(audio_file, "name", audio_file)
    else:
        raise gr.Error("Please provide either an audio file or a YouTube link.")

    if os.path.getsize(audio_file_path) > MAX_FILE_SIZE:
        raise gr.Error(FILE_TOO_LARGE_MESSAGE)

    progress((step, total_steps), desc="Start transcribe audio...")
    step += 1
    transcription_text = transcribe_audio(audio_file_path)
    print("transcription_text=", transcription_text)
    encoding = tiktoken.get_encoding("cl100k_base")
    token_count = len(encoding.encode(transcription_text))
    print("token_count=", token_count)

    # Only the first 100 characters are used for language detection.
    lang = detect_language(transcription_text[:100])
    print("detect lang=", lang)

    progress((step, total_steps), desc="Generating notes structure...")
    step += 1
    notes_structure = generate_notes_structure(transcription_text, lang=lang)
    print("notes_structure=", notes_structure)

    progress((step, total_steps), desc="Generating notes section...")
    step += 1
    try:
        notes_structure_json = json.loads(notes_structure)
    except (json.JSONDecodeError, TypeError) as err:
        # TypeError covers a missing (None) model response.
        raise gr.Error("Failed to decode the notes structure. Please try again.") from err
    if not isinstance(notes_structure_json, dict):
        raise gr.Error("Failed to decode the notes structure. Please try again.")
    print("notes_structure_json=", notes_structure_json)

    notes = NoteSection(structure=notes_structure_json, transcript=transcription_text)
    for title, content in notes_structure_json.items():
        # Only top-level entries with a string description are generated.
        if not isinstance(content, str):
            continue
        content_stream = generate_section(
            transcript=transcription_text,
            existing_notes=notes.return_existing_contents(),
            section=(title + ": " + content),
            lang=lang,
        )
        for chunk in content_stream:
            if chunk is not None:
                notes.update_content(title, chunk)

    notes_markdown = notes.get_markdown_content()
    return transcription_text, notes_markdown, notes_markdown
def generate_notes(transcription_text):
    """Generate structured markdown notes for *transcription_text*.

    Returns:
        ``(notes_markdown, notes_markdown)`` — one value for each of the two
        output components this handler is wired to.

    Raises:
        gr.Error: when the notes structure returned by the model is not a
            JSON object. Raising keeps the failure visible in the UI;
            returning a single string would not match the two outputs.
    """
    # Only the first 100 characters are used for language detection.
    lang = detect_language(transcription_text[:100])
    print("detect lang=", lang)
    notes_structure = generate_notes_structure(transcription_text, lang=lang)
    print("notes_structure=", notes_structure)

    try:
        notes_structure_json = json.loads(notes_structure)
    except (json.JSONDecodeError, TypeError) as err:
        # TypeError covers a missing (None) model response.
        raise gr.Error("Failed to decode the notes structure. Please try again.") from err
    if not isinstance(notes_structure_json, dict):
        raise gr.Error("Failed to decode the notes structure. Please try again.")
    print("notes_structure_json=", notes_structure_json)

    notes = NoteSection(structure=notes_structure_json, transcript=transcription_text)
    for title, content in notes_structure_json.items():
        # Only top-level entries with a string description are generated.
        if not isinstance(content, str):
            continue
        content_stream = generate_section(
            transcript=transcription_text,
            existing_notes=notes.return_existing_contents(),
            section=(title + ": " + content),
            lang=lang,
        )
        for chunk in content_stream:
            if chunk is not None:
                notes.update_content(title, chunk)

    notes_markdown = notes.get_markdown_content()
    return notes_markdown, notes_markdown
# Disabled earlier gr.Interface definition. It is wrapped in a string literal,
# so it is evaluated as a no-op expression and never builds a UI.
'''
iface = gr.Interface(
fn=process_audio,
inputs=[
gr.File(label="Upload Audio File"),
gr.Textbox(label="YouTube Link"),
gr.Textbox(label="Groq API Key", type="password")
],
outputs=gr.Textbox(label="Generated Notes"),
title="GroqNotes: Create structured notes from audio",
description="Generate notes from audio using Groq, Whisper, and Llama3"
)
'''
# Stylesheet passed to gr.Blocks(css=...): centers the h1 title, hides the
# footer element, and gives elements with class "texts" a minimum height.
CSS = """
h1 {
text-align: center;
display: block;
height: 10vh;
align-content: center;
}
footer {
visibility: hidden;
}
.texts {
min-height: 100px;
}
"""
def clear():
    """Return six ``None`` values, one per component reset by the CLEAR button."""
    return (None,) * 6
def translate_text(source_text, source_lang, target_lang, country=None, max_tokens=MAX_TOKENS_PER_CHUNK):
    """Translate *source_text* in three passes: translate, reflect, improve.

    This is a generator so the UI can show intermediate results. It yields
    plain strings because it is wired to a single output component.

    Args:
        source_text: Text to translate.
        source_lang: Name of the source language.
        target_lang: Name of the target language.
        country: Optional country passed to the reflection step.
        max_tokens: Token threshold below which the text is translated as a
            single chunk; at or above it the text is split into chunks.

    Yields:
        Single-chunk path: the initial translation, then the reflection, then
        the improved translation. Multi-chunk path: only the improved
        translation, with all chunks joined.
    """
    ic(f"start to translate transcription from {source_lang} to {target_lang}")
    num_tokens_in_text = num_tokens_in_string(source_text)
    ic(num_tokens_in_text)

    if num_tokens_in_text < max_tokens:
        ic("Translating text as single chunk")

        progress((1, 3), desc="First translation...")
        translation_1 = one_chunk_initial_translation(
            source_lang, target_lang, source_text
        )
        yield translation_1

        progress((2, 3), desc="Reflection...")
        reflection = one_chunk_reflect_on_translation(
            source_lang, target_lang, source_text, translation_1, country
        )
        yield reflection

        progress((3, 3), desc="Final translation...")
        translation_2 = one_chunk_improve_translation(
            source_lang, target_lang, source_text, translation_1, reflection
        )
        yield translation_2
        return

    ic("Translating text as multiple chunks")
    token_size = calculate_chunk_size(
        token_count=num_tokens_in_text, token_limit=max_tokens
    )
    ic(token_size)

    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        model_name="gpt-4",
        chunk_size=token_size,
        chunk_overlap=0,
    )
    source_text_chunks = text_splitter.split_text(source_text)

    progress((1, 3), desc="First translation...")
    translation_1_chunks = multichunk_initial_translation(
        source_lang, target_lang, source_text_chunks
    )
    ic(translation_1_chunks)

    progress((2, 3), desc="Reflection...")
    reflection_chunks = multichunk_reflect_on_translation(
        source_lang,
        target_lang,
        source_text_chunks,
        translation_1_chunks,
        country,
    )
    ic(reflection_chunks)

    progress((3, 3), desc="Final translation...")
    translation_2_chunks = multichunk_improve_translation(
        source_lang,
        target_lang,
        source_text_chunks,
        translation_1_chunks,
        reflection_chunks,
    )
    ic(translation_2_chunks)
    yield "".join(translation_2_chunks)
def export_txt(strings):
    """Write *strings* to a new numbered file under ``outputs/``.

    Args:
        strings: Text to export; ``None`` means there is nothing to export.

    Returns:
        A ``gr.update`` carrying the new file path and a "ready" label, or
        ``None`` when *strings* is ``None``.
    """
    if strings is None:
        return None

    os.makedirs("outputs", exist_ok=True)
    # Start from the number of existing exports, then advance until an unused
    # name is found. Mode "x" refuses to open an existing file, so an earlier
    # export is never overwritten, even after files have been deleted.
    index = len(glob(os.path.join("outputs", "*.txt")))
    while True:
        file_path = os.path.join("outputs", f"{index:06d}.txt")
        try:
            with open(file_path, "x", encoding="utf-8") as f:
                f.write(strings)
            break
        except FileExistsError:
            index += 1
    return gr.update(value=file_path, label="Ready to download markdown Summary")
def update_ui(transcription_output):
    """Return a ``gr.update`` whose label shows the token count of the text.

    Args:
        transcription_output: Text to count; ``None`` (a cleared textbox) is
            treated as empty instead of crashing the encoder.

    Returns:
        ``gr.update(label="tokens: N")`` when the text has at least one
        token, otherwise ``gr.update(label="")``.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    token_count = len(encoding.encode(transcription_output or ""))
    print("token_count=", token_count)
    label = f"tokens: {token_count}" if token_count > 0 else ""
    return gr.update(label=label)
# UI definition. Layout nesting: a settings/input row on top, then a row with
# language controls on the left and result tabs on the right.
with gr.Blocks(theme="soft", css=CSS) as demo:
    gr.Markdown("# Whisper and Translation Agent")

    with gr.Row():
        with gr.Column(scale=1):
            # NOTE(review): endpoint, model_input and api_key are not passed
            # to any event handler below.
            endpoint = gr.Dropdown(
                label="Endpoint",
                choices=["Groq", "OpenAI", "DeepSeek", "Baichuan", "Ollama", "Huggingface"],
                value="Groq",
            )
            # Named model_input so the module-level `model` string (the
            # default chat model name) is not rebound to a widget.
            model_input = gr.Textbox(label="Model", value="llama3-70b-8192")
            api_key = gr.Textbox(label="API_KEY", type="password")
        with gr.Column(scale=5):
            with gr.Row():
                file_input = gr.File(file_types=["audio", "video"])
                text_input = gr.Textbox(placeholder="Enter youtube link")
            with gr.Row():
                clear_btn = gr.Button("CLEAR")
                extract_btn = gr.Button("Extract")

    with gr.Row():
        with gr.Column(scale=1):
            source_lang = gr.Dropdown(label="Source Lang(Auto-Detect)", choices=["English", "Chinese", "Spanish"], value="English")
            target_lang = gr.Dropdown(label="Target Lang", choices=["English", "Chinese", "Spanish"], value="Chinese")
            # NOTE(review): no click handler is attached to this button.
            switch_Btn = gr.Button(value="🔄️")
            translate_btn = gr.Button("Translate")
            download_btn = gr.DownloadButton(label="Download")
        with gr.Column(scale=5):
            with gr.Row():
                with gr.Tab("Transcription"):
                    transcription_output = gr.Textbox(label='', lines=5, show_copy_button=True)
                with gr.Tab("Summary"):
                    summary_output = gr.Textbox(label='', lines=5, show_copy_button=True, elem_classes="texts")
                with gr.Tab("Markdown"):
                    markdown_output = gr.Markdown(label='Markdown Summary', elem_classes="texts", height=500)
            with gr.Row():
                with gr.Tab("Translated Transcription"):
                    translated_transcription_output = gr.Textbox(label='', lines=5, show_copy_button=True)
                with gr.Tab("Translated Summary"):
                    translated_summary_output = gr.Textbox(label='', elem_classes="texts", lines=5, show_copy_button=True)
                with gr.Tab("Markdown Summary"):
                    translated_summary_markdown = gr.Markdown(label='', elem_classes="texts", height=500)

    # Event wiring. clear() returns one value per component listed here.
    clear_btn.click(clear, outputs=[file_input, text_input, transcription_output, summary_output, translated_transcription_output, translated_summary_output])
    extract_btn.click(process_audio, inputs=[file_input, text_input], outputs=[transcription_output, summary_output, markdown_output])
    # Translate the transcription, then summarize the translated text.
    translate_btn.click(
        translate_text,
        inputs=[transcription_output, source_lang, target_lang],
        outputs=[translated_transcription_output],
    ).then(
        generate_notes,
        inputs=[translated_transcription_output],
        outputs=[translated_summary_output, translated_summary_markdown],
    )
    # Whenever the translated markdown summary changes, export it to a file
    # and hand that file to the download button.
    translated_summary_markdown.change(fn=export_txt, inputs=translated_summary_markdown, outputs=[download_btn])

if __name__ == "__main__":
    demo.launch()