Spaces:
Sleeping
Sleeping
import gradio as gr | |
import torch | |
import os | |
import subprocess | |
from threading import Thread | |
from transformers import AutoTokenizer, AutoModelForCausalLM,pipeline | |
import spaces | |
import moviepy.editor as mp | |
import time | |
import langdetect | |
import uuid | |
from dotenv import load_dotenv | |
import whisper | |
from pathlib import Path | |
import numpy as np | |
from scipy.io import wavfile | |
load_dotenv() | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
print("Starting the program...") | |
model_path = "internlm/internlm2_5-7b-chat" | |
print(f"Loading model {model_path}...") | |
#tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) | |
#model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.float16, trust_remote_code=True).cuda() | |
#model = model.eval() | |
print("Model successfully loaded.") | |
model = whisper.load_model("base") | |
print("Model successfully loaded.") | |
def generate_unique_filename(extension): | |
return f"{uuid.uuid4()}{extension}" | |
def cleanup_files(*files): | |
for file in files: | |
if file and os.path.exists(file): | |
os.remove(file) | |
print(f"Removed file: {file}") | |
def transcribe_audio(file_path): | |
print(f"Starting transcription of file: {file_path}") | |
temp_audio = None | |
if file_path.endswith(('.mp4', '.avi', '.mov', '.flv')): | |
print("Video file detected. Extracting audio...") | |
try: | |
video = mp.VideoFileClip(file_path) | |
temp_audio = generate_unique_filename(".wav") | |
video.audio.write_audiofile(temp_audio) | |
print(f"temp_audio : {temp_audio}") | |
model = whisper.load_model("base.en") | |
print(f"transcription1") | |
p = Path(__file__).resolve().parent | |
final_path = p / temp_audio | |
print(final_path) | |
if os.access(str(final_path), os.R_OK): | |
print("File is readable.") | |
else: | |
print("File is not readable. Check permissions.") | |
#sample_rate, audio_data = wavfile.read(str(final_path)) | |
#transcription = model.transcribe(audio_data, sample_rate=sample_rate) | |
transcription = model.transcribe(str(final_path)) | |
print(f"transcription {transcription}") | |
if "text" in transcription: | |
result = transcription["text"] | |
else: | |
result = " ".join([chunk["text"] for chunk in transcription.get("chunks", [])]) | |
#file_path = temp_audio | |
except Exception as e: | |
print(f"Error extracting audio from video: {e}") | |
raise | |
print(f"Does the file exist? {os.path.exists(file_path)}") | |
print(f"File size: {os.path.getsize(file_path) if os.path.exists(file_path) else 'N/A'} bytes") | |
try: | |
print(f"Reading transcription file: {file_path}") | |
#with open(file_path, 'r') as file: | |
#file_contents = file.read() | |
print(f"File content: {file_path}") | |
#time.sleep(5) | |
except ConnectionResetError as e: | |
print(f"Connection error occurred: {e}") | |
except Exception as e: | |
print(f"Error output: {e}") | |
print("Transcription completed.") | |
# Cleanup | |
if temp_audio: | |
cleanup_files(temp_audio) | |
return result | |
def generate_summary_stream(transcription): | |
print("Starting summary generation...") | |
print(f"Transcription length: {len(transcription)} characters") | |
#detected_language = langdetect.detect(transcription) | |
#prompt = f"""Summarize the following video transcription in 200-300 words. | |
#The summary should be in the same language as the transcription, which is detected as {detected_language}. | |
#Please ensure that the summary captures the main points and key ideas of the transcription: | |
#{transcription[:300000]}...""" | |
#response, history = model.chat(tokenizer, prompt, history=[]) | |
#print(f"Final summary generated: {response[:100]}...") | |
summarizer = pipeline("summarization") | |
summary = summarizer(transcription, max_length=500, min_length=250, do_sample=False) | |
#print(summary[0]['summary_text']) | |
print("Summary generation completed.") | |
return summary[0]['summary_text'] | |
def process_uploaded_video(video_path): | |
print(f"Processing uploaded video: {video_path}") | |
try: | |
print("Starting transcription...") | |
transcription = transcribe_audio(video_path) | |
print(f"Transcription completed. Length: {len(transcription)} characters") | |
return transcription, None | |
except Exception as e: | |
print(f"Error processing video: {e}") | |
return f"Processing error: {str(e)}", None | |
print("Setting up Gradio interface...") | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown( | |
""" | |
# π₯ Video Transcription and Smart Summary | |
Upload a video to get a transcription and AI-generated summary. | |
""" | |
) | |
with gr.Tabs(): | |
with gr.TabItem("π€ Video Upload"): | |
video_input = gr.Video(label="Drag and drop or click to upload") | |
video_button = gr.Button("π Process Video", variant="primary") | |
with gr.Row(): | |
with gr.Column(): | |
transcription_output = gr.Textbox(label="π Transcription", lines=10, show_copy_button=True) | |
with gr.Column(): | |
summary_output = gr.Textbox(label="π Summary", lines=10, show_copy_button=True) | |
summary_button = gr.Button("π Generate Summary", variant="secondary") | |
gr.Markdown( | |
""" | |
### How to use: | |
1. Upload a video. | |
2. Click 'Process' to get the transcription. | |
3. Click 'Generate Summary' to get a summary of the content. | |
*Note: Processing may take a few minutes depending on the video length.* | |
""" | |
) | |
def process_video_and_update(video): | |
if video is None: | |
return "No video uploaded.", "Please upload a video." | |
print(f"Video received: {video}") | |
transcription, _ = process_uploaded_video(video) | |
print(f"Returned transcription: {transcription[:100] if transcription else 'No transcription generated'}...") | |
return transcription or "Transcription error", "" | |
video_button.click(process_video_and_update, inputs=[video_input], outputs=[transcription_output, summary_output]) | |
summary_button.click(generate_summary_stream, inputs=[transcription_output], outputs=[summary_output]) | |
print("Launching Gradio interface...") | |
demo.launch() |