Spaces:
Runtime error
Runtime error
import gradio as gr | |
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor | |
from qwen_vl_utils import process_vision_info | |
from PIL import Image | |
import cv2 | |
import numpy as np | |
import os | |
import torch | |
print(f"PyTorch version: {torch.__version__}") | |
print(f"CUDA available: {torch.cuda.is_available()}") | |
if torch.cuda.is_available(): | |
print(f"CUDA version: {torch.version.cuda}") | |
print(f"Device count: {torch.cuda.device_count()}") | |
print(f"Current device: {torch.cuda.current_device()}") | |
print(f"Device name: {torch.cuda.get_device_name()}") | |
torch.backends.cuda.matmul.allow_tf32 = True | |
torch.backends.cudnn.allow_tf32 = True | |
# Set the default tensor type to cuda | |
if torch.cuda.is_available(): | |
torch.set_default_tensor_type('torch.cuda.FloatTensor') | |
def load_model(): | |
try: | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
print(f"Using device: {device}") | |
model = Qwen2VLForConditionalGeneration.from_pretrained( | |
"Qwen/Qwen2-VL-2B-Instruct", | |
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, | |
device_map="auto", | |
low_cpu_mem_usage=True | |
) | |
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct") | |
return model, processor, device | |
except Exception as e: | |
print(f"Error loading model: {e}") | |
return None, None, None | |
model, processor, device = load_model() | |
SYSTEM_PROMPT = """You are an expert technical analyst specializing in identifying bugs, fixing errors, and explaining code functions from visual inputs. When presented with an image or video: | |
1. If you see code, analyze it for potential bugs or errors, and suggest fixes. | |
2. If you see a function or algorithm, explain its purpose and how it works. | |
3. If you see a technical diagram or flowchart, interpret its meaning and purpose. | |
4. For any technical content, provide detailed explanations and insights. | |
Always maintain a professional and technical tone in your responses.""" | |
def process_content(file, user_prompt): | |
if file is None: | |
return "No content provided. Please upload an image or video of technical content." | |
file_path = file.name | |
file_extension = os.path.splitext(file_path)[1].lower() | |
if file_extension in ['.jpg', '.jpeg', '.png', '.bmp']: | |
image = Image.open(file_path) | |
return analyze_image(image, user_prompt) | |
elif file_extension in ['.mp4', '.avi', '.mov']: | |
return analyze_video(file_path, user_prompt) | |
else: | |
return "Unsupported file type. Please provide an image (jpg, jpeg, png, bmp) or video (mp4, avi, mov) of technical content." | |
def analyze_image(image, prompt): | |
messages = [ | |
{"role": "system", "content": SYSTEM_PROMPT}, | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "image", "image": image}, | |
{"type": "text", "text": f"Based on the system instructions, {prompt}"}, | |
], | |
} | |
] | |
return generate_response(messages) | |
def analyze_video(video_path, prompt, max_frames=16, frame_interval=30, max_resolution=224): | |
try: | |
cap = cv2.VideoCapture(video_path) | |
if not cap.isOpened(): | |
return "Error: Could not open video file." | |
frames = [] | |
frame_count = 0 | |
while len(frames) < max_frames: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
if frame_count % frame_interval == 0: | |
h, w = frame.shape[:2] | |
if h > w: | |
new_h, new_w = max_resolution, int(w * max_resolution / h) | |
else: | |
new_h, new_w = int(h * max_resolution / w), max_resolution | |
frame = cv2.resize(frame, (new_w, new_h)) | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
frame = Image.fromarray(frame) | |
frames.append(frame) | |
frame_count += 1 | |
return generate_response([ | |
{"role": "system", "content": SYSTEM_PROMPT}, | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "video", "video": frames}, | |
{"type": "text", "text": f"Based on the system instructions, {prompt}"}, | |
], | |
} | |
]) | |
except Exception as e: | |
return f"Error processing video: {e}" | |
finally: | |
if 'cap' in locals(): | |
cap.release() | |
def generate_response(messages): | |
try: | |
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
image_inputs, video_inputs = process_vision_info(messages) | |
inputs = processor( | |
text=[text], | |
images=image_inputs, | |
videos=video_inputs, | |
padding=True, | |
return_tensors="pt" | |
) | |
# Move inputs to GPU | |
inputs = {k: v.to(device) for k, v in inputs.items()} | |
with torch.no_grad(): | |
generated_ids = model.generate( | |
**inputs, | |
max_new_tokens=512, | |
do_sample=True, | |
top_k=20, | |
top_p=0.9, | |
temperature=0.7 | |
) | |
generated_ids_trimmed = [ | |
out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids) | |
] | |
output_text = processor.batch_decode( | |
generated_ids_trimmed, | |
skip_special_tokens=True, | |
clean_up_tokenization_spaces=False | |
) | |
# Clear CUDA cache | |
torch.cuda.empty_cache() | |
return output_text[0] | |
except Exception as e: | |
return f"Error generating response: {e}" | |
# Gradio interface | |
iface = gr.Interface( | |
fn=process_content, | |
inputs=[ | |
gr.File(label="Upload Image or Video of Technical Content"), | |
gr.Textbox(label="Enter your technical question", placeholder="e.g., Identify any bugs in this code and suggest fixes", value="Analyze this technical content and provide insights.") | |
], | |
outputs="text", | |
title="Technical Content Analysis", | |
description="Upload an image or video of code, diagrams, or technical content. Ask questions about bugs, errors, or explanations of functions.", | |
) | |
iface.launch(share=True) |