File size: 8,305 Bytes
68385fa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
import gradio as gr
import torch
import yt_dlp
import os
import subprocess
import json
from threading import Thread
from transformers import AutoTokenizer, AutoModelForCausalLM
import spaces
import moviepy.editor as mp
import time
import langdetect
import uuid
HF_TOKEN = os.environ.get("HF_TOKEN")
print("Starting the program...")
model_path = "internlm/internlm2_5-7b-chat"
print(f"Loading model {model_path}...")
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, trust_remote_code=True).cuda()
model = model.eval()
print("Model successfully loaded.")
def generate_unique_filename(extension):
return f"{uuid.uuid4()}{extension}"
def cleanup_files(*files):
for file in files:
if file and os.path.exists(file):
os.remove(file)
print(f"Removed file: {file}")
def download_youtube_audio(url):
print(f"Downloading audio from YouTube: {url}")
output_path = generate_unique_filename(".wav")
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'wav',
}],
'outtmpl': output_path,
'keepvideo': True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
# Check if the file was renamed to .wav.wav
if os.path.exists(output_path + ".wav"):
os.rename(output_path + ".wav", output_path)
if os.path.exists(output_path):
print(f"Audio download completed. File saved at: {output_path}")
print(f"File size: {os.path.getsize(output_path)} bytes")
else:
print(f"Error: File {output_path} not found after download.")
return output_path
def transcribe_audio(file_path):
print(f"Starting transcription of file: {file_path}")
temp_audio = None
if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
print("Video file detected. Extracting audio...")
try:
video = mp.VideoFileClip(file_path)
temp_audio = generate_unique_filename(".wav")
video.audio.write_audiofile(temp_audio)
file_path = temp_audio
except Exception as e:
print(f"Error extracting audio from video: {e}")
raise
print(f"Does the file exist? {os.path.exists(file_path)}")
print(f"File size: {os.path.getsize(file_path) if os.path.exists(file_path) else 'N/A'} bytes")
output_file = generate_unique_filename(".json")
command = [
"insanely-fast-whisper",
"--file-name", file_path,
"--device-id", "0",
"--model-name", "openai/whisper-large-v3",
"--task", "transcribe",
"--timestamp", "chunk",
"--transcript-path", output_file
]
print(f"Executing command: {' '.join(command)}")
try:
result = subprocess.run(command, check=True, capture_output=True, text=True)
print(f"Standard output: {result.stdout}")
print(f"Error output: {result.stderr}")
except subprocess.CalledProcessError as e:
print(f"Error running insanely-fast-whisper: {e}")
print(f"Standard output: {e.stdout}")
print(f"Error output: {e.stderr}")
raise
print(f"Reading transcription file: {output_file}")
try:
with open(output_file, "r") as f:
transcription = json.load(f)
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
print(f"File content: {open(output_file, 'r').read()}")
raise
if "text" in transcription:
result = transcription["text"]
else:
result = " ".join([chunk["text"] for chunk in transcription.get("chunks", [])])
print("Transcription completed.")
# Cleanup
cleanup_files(output_file)
if temp_audio:
cleanup_files(temp_audio)
return result
@spaces.GPU(duration=90)
def generate_summary_stream(transcription):
print("Starting summary generation...")
print(f"Transcription length: {len(transcription)} characters")
detected_language = langdetect.detect(transcription)
prompt = f"""Summarize the following video transcription in 150-300 words.
The summary should be in the same language as the transcription, which is detected as {detected_language}.
Please ensure that the summary captures the main points and key ideas of the transcription:
{transcription[:300000]}..."""
response, history = model.chat(tokenizer, prompt, history=[])
print(f"Final summary generated: {response[:100]}...")
print("Summary generation completed.")
return response
def process_youtube(url):
if not url:
print("YouTube URL not provided.")
return "Please enter a YouTube URL.", None
print(f"Processing YouTube URL: {url}")
audio_file = None
try:
audio_file = download_youtube_audio(url)
if not os.path.exists(audio_file):
raise FileNotFoundError(f"File {audio_file} does not exist after download.")
print(f"Audio file found: {audio_file}")
print("Starting transcription...")
transcription = transcribe_audio(audio_file)
print(f"Transcription completed. Length: {len(transcription)} characters")
return transcription, None
except Exception as e:
print(f"Error processing YouTube: {e}")
return f"Processing error: {str(e)}", None
finally:
if audio_file and os.path.exists(audio_file):
cleanup_files(audio_file)
print(f"Directory content after processing: {os.listdir('.')}")
def process_uploaded_video(video_path):
print(f"Processing uploaded video: {video_path}")
try:
print("Starting transcription...")
transcription = transcribe_audio(video_path)
print(f"Transcription completed. Length: {len(transcription)} characters")
return transcription, None
except Exception as e:
print(f"Error processing video: {e}")
return f"Processing error: {str(e)}", None
print("Setting up Gradio interface...")
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# π₯ Video Transcription and Smart Summary
"""
)
with gr.Tabs():
with gr.TabItem("π€ Video Upload"):
video_input = gr.Video(label="Drag and drop or click to upload")
video_button = gr.Button("π Process Video", variant="primary")
with gr.TabItem("π YouTube Link"):
url_input = gr.Textbox(label="Paste YouTube URL here", placeholder="https://www.youtube.com/watch?v=...")
url_button = gr.Button("π Process URL", variant="primary")
with gr.Row():
with gr.Column():
transcription_output = gr.Textbox(label="π Transcription", lines=10, show_copy_button=True)
with gr.Column():
summary_output = gr.Textbox(label="π Summary", lines=10, show_copy_button=True)
summary_button = gr.Button("π Generate Summary", variant="secondary")
gr.Markdown(
"""
### How to use:
1. Upload a video or paste a YouTube link.
2. Click 'Process' to get the transcription.
3. Click 'Generate Summary' to get a summary of the content.
*Note: Processing may take a few minutes depending on the video length.*
"""
)
def process_video_and_update(video):
if video is None:
return "No video uploaded.", "Please upload a video."
print(f"Video received: {video}")
transcription, _ = process_uploaded_video(video)
print(f"Returned transcription: {transcription[:100] if transcription else 'No transcription generated'}...")
return transcription or "Transcription error", ""
video_button.click(process_video_and_update, inputs=[video_input], outputs=[transcription_output, summary_output])
url_button.click(process_youtube, inputs=[url_input], outputs=[transcription_output, summary_output])
summary_button.click(generate_summary_stream, inputs=[transcription_output], outputs=[summary_output])
print("Launching Gradio interface...")
demo.launch() |