File size: 8,305 Bytes
68385fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import gradio as gr
import torch
import yt_dlp
import os
import subprocess
import json
from threading import Thread
from transformers import AutoTokenizer, AutoModelForCausalLM
import spaces
import moviepy.editor as mp
import time
import langdetect
import uuid

# Hugging Face access token taken from the environment (None when unset).
HF_TOKEN = os.getenv("HF_TOKEN")
print("Starting the program...")

# Checkpoint used for summarisation; tokenizer and model are loaded once at
# import time and shared by every request.
model_path = "internlm/internlm2_5-7b-chat"
print(f"Loading model {model_path}...")
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
# Load in half precision, move to the GPU and switch to inference mode.
model = (
    AutoModelForCausalLM.from_pretrained(
        model_path,
        torch_dtype=torch.float16,
        trust_remote_code=True,
    )
    .cuda()
    .eval()
)
print("Model successfully loaded.")

def generate_unique_filename(extension):
    """Return a random UUID4-based file name ending in *extension*.

    Args:
        extension: Suffix appended verbatim after the UUID (e.g. ".wav").

    Returns:
        A string such as "3f2b...c9.wav"; no file is created.
    """
    unique_id = uuid.uuid4()
    return "{}{}".format(unique_id, extension)

def cleanup_files(*files):
    """Delete the given files on a best-effort basis.

    Falsy entries (None, "") and paths that no longer exist are skipped
    silently. Any other removal failure (e.g. the path is a directory or the
    process lacks permission) is reported and the remaining paths are still
    processed, so a single bad entry cannot abort the cleanup or mask an
    error that is already propagating through a caller's ``finally`` block.

    Args:
        *files: Paths of the files to remove.
    """
    for file in files:
        if not file:
            continue
        # Attempt the removal directly instead of checking existence first:
        # the file may disappear between the check and the delete.
        try:
            os.remove(file)
        except FileNotFoundError:
            continue
        except OSError as e:
            print(f"Could not remove file {file}: {e}")
        else:
            print(f"Removed file: {file}")

def download_youtube_audio(url):
    """Download the audio of a YouTube video and convert it to a WAV file.

    The download and the conversion are delegated to yt-dlp with its
    FFmpegExtractAudio post-processor.

    Args:
        url: URL of the video to download.

    Returns:
        The path where the WAV file is expected. The file's presence is only
        logged here, not enforced; callers should verify it exists.

    Raises:
        Whatever yt-dlp raises when the download or conversion fails.
    """
    print(f"Downloading audio from YouTube: {url}")
    output_path = generate_unique_filename(".wav")
    # Let yt-dlp fill in the extension itself. Using a template that already
    # ends in ".wav" made the post-processor write "<name>.wav.wav", which
    # then had to be renamed over the raw download.
    base_name, _ = os.path.splitext(output_path)
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }],
        'outtmpl': f"{base_name}.%(ext)s",
        # 'keepvideo' is intentionally not set: only the converted WAV should
        # remain on disk, not the original download.
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

    if os.path.exists(output_path):
        print(f"Audio download completed. File saved at: {output_path}")
        print(f"File size: {os.path.getsize(output_path)} bytes")
    else:
        print(f"Error: File {output_path} not found after download.")

    return output_path


def transcribe_audio(file_path):
    """Transcribe an audio or video file with the insanely-fast-whisper CLI.

    Files whose name ends in .mp4, .avi, .mov or .flv first have their audio
    track written to a temporary WAV file via MoviePy. The CLI writes its
    result to a uniquely named JSON file, which is read back here. Both
    temporary files are removed whether or not transcription succeeds.

    Args:
        file_path: Path of the audio or video file to transcribe.

    Returns:
        The transcript text: the top-level "text" field of the JSON if
        present, otherwise the "text" of every entry in "chunks" joined with
        single spaces.

    Raises:
        ValueError: The video has no audio track.
        subprocess.CalledProcessError: The CLI exited with a non-zero status.
        json.JSONDecodeError: The transcript file is not valid JSON.
        Exception: Errors from MoviePy or file access are logged where
            applicable and re-raised unchanged.
    """
    print(f"Starting transcription of file: {file_path}")
    temp_audio = None
    output_file = generate_unique_filename(".json")
    try:
        if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')):
            print("Video file detected. Extracting audio...")
            temp_audio = generate_unique_filename(".wav")
            video = None
            try:
                video = mp.VideoFileClip(file_path)
                if video.audio is None:
                    raise ValueError("The video does not contain an audio track.")
                video.audio.write_audiofile(temp_audio)
            except Exception as e:
                print(f"Error extracting audio from video: {e}")
                raise
            finally:
                # Release the reader resources held by the clip.
                if video is not None:
                    video.close()
            file_path = temp_audio

        print(f"Does the file exist? {os.path.exists(file_path)}")
        print(f"File size: {os.path.getsize(file_path) if os.path.exists(file_path) else 'N/A'} bytes")

        command = [
            "insanely-fast-whisper",
            "--file-name", file_path,
            "--device-id", "0",
            "--model-name", "openai/whisper-large-v3",
            "--task", "transcribe",
            "--timestamp", "chunk",
            "--transcript-path", output_file
        ]
        print(f"Executing command: {' '.join(command)}")
        try:
            completed = subprocess.run(command, check=True, capture_output=True, text=True)
            print(f"Standard output: {completed.stdout}")
            print(f"Error output: {completed.stderr}")
        except subprocess.CalledProcessError as e:
            print(f"Error running insanely-fast-whisper: {e}")
            print(f"Standard output: {e.stdout}")
            print(f"Error output: {e.stderr}")
            raise

        print(f"Reading transcription file: {output_file}")
        # Read the raw text once so it can be shown if parsing fails, without
        # reopening the file or leaving a handle unclosed.
        with open(output_file, "r", encoding="utf-8") as f:
            raw_transcript = f.read()
        try:
            transcription = json.loads(raw_transcript)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            print(f"File content: {raw_transcript}")
            raise

        if "text" in transcription:
            result = transcription["text"]
        else:
            result = " ".join(chunk["text"] for chunk in transcription.get("chunks", []))

        print("Transcription completed.")
        return result
    finally:
        # Remove the temporary files on success and on failure alike.
        cleanup_files(output_file, temp_audio)

@spaces.GPU(duration=90)
def generate_summary_stream(transcription):
    """Summarize a transcription with the loaded chat model.

    Despite the name, the summary is returned as a single string rather than
    streamed. The language of the text is detected with langdetect and named
    in the prompt so the model answers in the same language.

    Args:
        transcription: The text to summarize.

    Returns:
        The model's summary, or a short notice when there is no text to
        summarize.
    """
    print("Starting summary generation...")
    # Nothing to summarize: avoid a language-detection failure and a
    # pointless model call.
    if not transcription or not transcription.strip():
        print("Empty transcription; nothing to summarize.")
        return "No transcription available to summarize."
    print(f"Transcription length: {len(transcription)} characters")

    # langdetect raises when the text has no usable features (e.g. only
    # digits or punctuation); fall back instead of failing the request.
    try:
        detected_language = langdetect.detect(transcription)
    except langdetect.LangDetectException as e:
        print(f"Language detection failed: {e}")
        detected_language = "unknown"

    # Only the first 300000 characters of the transcription go into the prompt.
    prompt = f"""Summarize the following video transcription in 150-300 words. 
    The summary should be in the same language as the transcription, which is detected as {detected_language}.
    Please ensure that the summary captures the main points and key ideas of the transcription:
    {transcription[:300000]}..."""

    response, _ = model.chat(tokenizer, prompt, history=[])
    print(f"Final summary generated: {response[:100]}...")
    print("Summary generation completed.")
    return response

def process_youtube(url):
    """Download a YouTube video's audio and transcribe it.

    Args:
        url: The YouTube URL entered by the user; may be empty or None.

    Returns:
        A ``(text, None)`` pair. ``text`` is the transcription, a prompt to
        enter a URL when none was given, or a "Processing error: ..." message
        when any step fails. The second element is always None.
    """
    if not url:
        print("YouTube URL not provided.")
        return "Please enter a YouTube URL.", None

    print(f"Processing YouTube URL: {url}")

    downloaded = None
    try:
        downloaded = download_youtube_audio(url)
        if not os.path.exists(downloaded):
            raise FileNotFoundError(f"File {downloaded} does not exist after download.")

        print(f"Audio file found: {downloaded}")
        print("Starting transcription...")
        text = transcribe_audio(downloaded)
        print(f"Transcription completed. Length: {len(text)} characters")
        outcome = (text, None)
    except Exception as exc:
        # Report the failure through the transcription slot instead of raising.
        print(f"Error processing YouTube: {exc}")
        outcome = (f"Processing error: {exc!s}", None)
    finally:
        # The downloaded audio is only needed for the transcription step.
        if downloaded is not None and os.path.exists(downloaded):
            cleanup_files(downloaded)
        print(f"Directory content after processing: {os.listdir('.')}")
    return outcome

def process_uploaded_video(video_path):
    """Transcribe an uploaded video file.

    Args:
        video_path: Path of the uploaded file.

    Returns:
        A ``(text, None)`` pair where ``text`` is the transcription or, when
        transcription fails, a "Processing error: ..." message. The second
        element is always None.
    """
    print(f"Processing uploaded video: {video_path}")
    try:
        print("Starting transcription...")
        text = transcribe_audio(video_path)
        print(f"Transcription completed. Length: {len(text)} characters")
    except Exception as exc:
        # Report the failure as text instead of raising.
        print(f"Error processing video: {exc}")
        return f"Processing error: {exc!s}", None
    else:
        return text, None

print("Setting up Gradio interface...")
# Layout: two input tabs (file upload / YouTube URL), side-by-side
# transcription and summary boxes, a summary button and usage notes.
# The emoji in the labels were previously stored as mis-decoded UTF-8
# (mojibake) and are restored here.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🎥 Video Transcription and Smart Summary
        
        """
    )
    
    with gr.Tabs():
        with gr.TabItem("📤 Video Upload"):
            video_input = gr.Video(label="Drag and drop or click to upload")
            video_button = gr.Button("🚀 Process Video", variant="primary")
        
        with gr.TabItem("🔗 YouTube Link"):
            url_input = gr.Textbox(label="Paste YouTube URL here", placeholder="https://www.youtube.com/watch?v=...")
            url_button = gr.Button("🚀 Process URL", variant="primary")
    
    with gr.Row():
        with gr.Column():
            transcription_output = gr.Textbox(label="📝 Transcription", lines=10, show_copy_button=True)
        with gr.Column():
            summary_output = gr.Textbox(label="📊 Summary", lines=10, show_copy_button=True)
    
    summary_button = gr.Button("📝 Generate Summary", variant="secondary")
    
    gr.Markdown(
        """
        ### How to use:
        1. Upload a video or paste a YouTube link.
        2. Click 'Process' to get the transcription.
        3. Click 'Generate Summary' to get a summary of the content.
        
        *Note: Processing may take a few minutes depending on the video length.*
        """
    )
    
    def process_video_and_update(video):
        """Transcribe the uploaded video for the UI.

        Returns a ``(transcription, summary)`` pair for the two output boxes:
        a notice in both when nothing was uploaded, otherwise the
        transcription (or an error text) and an empty summary.
        """
        if video is None:
            return "No video uploaded.", "Please upload a video."
        print(f"Video received: {video}")
        transcription, _ = process_uploaded_video(video)
        print(f"Returned transcription: {transcription[:100] if transcription else 'No transcription generated'}...")
        return transcription or "Transcription error", ""

    # Both "Process" buttons fill the transcription box and reset the summary
    # box; the summary button reads the transcription box as its input.
    video_button.click(process_video_and_update, inputs=[video_input], outputs=[transcription_output, summary_output])
    url_button.click(process_youtube, inputs=[url_input], outputs=[transcription_output, summary_output])
    summary_button.click(generate_summary_stream, inputs=[transcription_output], outputs=[summary_output])

print("Launching Gradio interface...")
demo.launch()