import os
import tempfile

import cv2
import gradio as gr
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from torchvision import transforms
from transformers import OwlViTForObjectDetection, OwlViTProcessor, ResNetModel

# Load models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ResNet-50 provides image embeddings for similarity scoring.
resnet = ResNetModel.from_pretrained("microsoft/resnet-50").to(device)
resnet.eval()

# OWL-ViT performs image-guided (query-image) object detection.
processor = OwlViTProcessor.from_pretrained("google/owlvit-base-patch32")
model = OwlViTForObjectDetection.from_pretrained("google/owlvit-base-patch32").to(device)
model.eval()


def preprocess_image(image):
    """Resize and normalize a PIL image for ResNet (ImageNet statistics)."""
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])
    return transform(image).unsqueeze(0)


def extract_embedding(image):
    """Return the pooled ResNet embedding for a PIL image."""
    image_tensor = preprocess_image(image).to(device)
    with torch.no_grad():
        output = resnet(image_tensor)
    # pooler_output has shape (1, 2048, 1, 1); flatten to (1, 2048).
    return output.pooler_output.flatten(start_dim=1)


def cosine_similarity(embedding1, embedding2):
    return F.cosine_similarity(embedding1, embedding2)


def l2_distance(embedding1, embedding2):
    return torch.norm(embedding1 - embedding2, p=2)


def save_array_to_temp_image(arr):
    """Save a BGR numpy array to a temporary PNG and return its path."""
    rgb_arr = cv2.cvtColor(arr, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(rgb_arr)
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
    temp_file_name = temp_file.name
    temp_file.close()
    img.save(temp_file_name)
    return temp_file_name


def detect_and_crop(target_image, query_image, threshold=0.6, nms_threshold=0.3):
    """Run OWL-ViT image-guided detection and return cropped matches as BGR arrays."""
    target_sizes = torch.tensor([target_image.size[::-1]])
    inputs = processor(images=target_image, query_images=query_image,
                       return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = model.image_guided_detection(**inputs)

    # PIL images are RGB; convert to BGR so downstream OpenCV code is consistent.
    img = cv2.cvtColor(np.array(target_image), cv2.COLOR_RGB2BGR)

    # Post-processing runs on CPU tensors.
    outputs.logits = outputs.logits.cpu()
    outputs.target_pred_boxes = outputs.target_pred_boxes.cpu()
    results = processor.post_process_image_guided_detection(
        outputs=outputs, threshold=threshold,
        nms_threshold=nms_threshold, target_sizes=target_sizes)

    boxes = results[0]["boxes"]
    if len(boxes) == 0:
        return []

    crops = []
    for box in boxes:
        x1, y1, x2, y2 = (int(i) for i in box.tolist())
        cropped_img = img[y1:y2, x1:x2]
        if cropped_img.size != 0:
            crops.append(cropped_img)
    return crops


def process_video(video_path, query_image, skipframes=0):
    """Detect the query object in sampled video frames and score each crop."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return []

    # Compute the query embedding once and reuse it for every crop.
    query_embedding = extract_embedding(query_image)
    frame_count = 0
    all_results = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count % (skipframes + 1) == 0:
            frame_file = save_array_to_temp_image(frame)
            crops = detect_and_crop(Image.open(frame_file), query_image)
            for crop in crops:
                crop_file = save_array_to_temp_image(crop)
                crop_embedding = extract_embedding(Image.open(crop_file))
                dist = l2_distance(query_embedding, crop_embedding).item()
                cos = cosine_similarity(query_embedding, crop_embedding).item()
                all_results.append({"l2_dist": dist, "cos": cos})
                os.unlink(crop_file)  # clean up temporary crop file
            os.unlink(frame_file)  # clean up temporary frame file
        frame_count += 1
    cap.release()
    return all_results


def process_videos_and_compare(image, video, skipframes=5, threshold=0.47):
    """Aggregate per-crop scores and decide whether the object appears in the video."""

    def median(values):
        n = len(values)
        return (values[n // 2 - 1] + values[n // 2]) / 2 if n % 2 == 0 else values[n // 2]

    results = process_video(video, image, int(skipframes))
    if results:
        l2_dists = [item["l2_dist"] for item in results]
        cosines = [item["cos"] for item in results]
        avg_l2_dist = sum(l2_dists) / len(l2_dists)
        avg_cos = sum(cosines) / len(cosines)
        median_l2_dist = median(sorted(l2_dists))
        median_cos = median(sorted(cosines))
        return {
            "avg_l2_dist": avg_l2_dist,
            "avg_cos": avg_cos,
            "median_l2_dist": median_l2_dist,
            "median_cos": median_cos,
            "avg_cos_dist": 1 - avg_cos,
            "median_cos_dist": 1 - median_cos,
            "is_present": avg_cos >= threshold,
        }
    # No detections: report sentinel values.
    return {
        "avg_l2_dist": float("inf"),
        "avg_cos": 0,
        "median_l2_dist": float("inf"),
        "median_cos": 0,
        "avg_cos_dist": float("inf"),
        "median_cos_dist": float("inf"),
        "is_present": False,
    }


def interface(video, image, skipframes, threshold):
    return process_videos_and_compare(image, video, skipframes, threshold)


iface = gr.Interface(
    fn=interface,
    inputs=[
        gr.Video(label="Upload a Video"),
        gr.Image(type="pil", label="Upload a Query Image"),
        gr.Slider(minimum=0, maximum=10, step=1, value=5, label="Skip Frames"),
        gr.Slider(minimum=0.0, maximum=1.0, step=0.01, value=0.47, label="Threshold"),
    ],
    outputs=[gr.JSON(label="Result")],
    title="Object Detection in Video",
    description="""
**Instructions:**
1. **Upload a Video**: Select a video file to upload.
2. **Upload a Query Image**: Select an image file containing the object you want to detect in the video.
3. **Set Skip Frames**: Adjust the slider to set the number of frames to skip between processed frames.
4. **Set Threshold**: Adjust the slider to set the cosine-similarity threshold used to decide whether the object is present.
5. **View Results**: The result shows the average and median distances and similarities, and whether the object is present in the video based on the threshold.
""",
)

if __name__ == "__main__":
    iface.launch()