import time
import uuid

import cv2
import gradio as gr
import numpy as np
import spaces
import supervision as sv
import torch  # Ensuring torch import remains
from transformers import AutoModelForZeroShotObjectDetection, AutoProcessor

# Detect if CUDA is available and set the device accordingly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the processor and model from Hugging Face
processor = AutoProcessor.from_pretrained("omlab/omdet-turbo-swin-tiny-hf")
model = AutoModelForZeroShotObjectDetection.from_pretrained("omlab/omdet-turbo-swin-tiny-hf").to(device)

# Custom CSS to enhance text area visibility
css = """ .feedback textarea {font-size: 24px !important} """

# Module-level state. process_video declares these names ``global`` and
# rebinds them on each call; a ``global`` statement at module scope has no
# effect, so plain assignments are all that is needed here.
# NOTE(review): "Liectenstein" looks like a misspelling of "Liechtenstein";
# left untouched because it is a runtime string.
classes = "person, university, class, Liectenstein"
detections = None
labels = None
threshold = 0.2

# Instantiate annotators for bounding boxes, masks, and labels
BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator()
MASK_ANNOTATOR = sv.MaskAnnotator()
LABEL_ANNOTATOR = sv.LabelAnnotator()

# Frame subsampling factor for video processing efficiency
SUBSAMPLE = 2


def annotate_image(input_image, detections, labels) -> np.ndarray:
    """Apply mask, bounding-box and label annotations to an image.

    The three module-level annotators run in a fixed order (masks, then
    boxes, then labels), each one receiving the output of the previous one.

    Args:
        input_image: Image to draw on. process_video passes frames read
            with ``cv2.VideoCapture``.
        detections: Detections to render. process_video builds an
            ``sv.Detections`` for each predicted frame.
        labels: Labels forwarded to the label annotator via its ``labels``
            keyword. process_video passes the predicted class names.

    Returns:
        The image returned by the label annotator.

    NOTE(review): supervision annotators appear to draw on the array they
    are given, so ``input_image`` may be modified in place. Pass a copy if
    the original pixels are still needed; confirm against the installed
    supervision version.
    """
    annotated = MASK_ANNOTATOR.annotate(input_image, detections)
    annotated = BOUNDING_BOX_ANNOTATOR.annotate(annotated, detections)
    return LABEL_ANNOTATOR.annotate(annotated, detections, labels=labels)


@spaces.GPU
def process_video(input_video, confidence_threshold, classes_new, progress=gr.Progress(track_tqdm=True)):
    """Processes the input video frame by frame, performs object detection, and saves the output video."""
    global detections, labels, classes, threshold
    classes = classes_new
    threshold = confidence_threshold

    # Generate a unique file name for the output video
    result_file_name = f"output_{uuid.uuid4()}.mp4"

    # Read input video
and set up output video writer cap = cv2.VideoCapture(input_video) video_codec = cv2.VideoWriter_fourcc(*"mp4v") # MP4 codec fps = int(cap.get(cv2.CAP_PROP_FPS)) width, height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) desired_fps = fps // SUBSAMPLE iterating, frame = cap.read() # Prepare video writer for output segment_file = cv2.VideoWriter(result_file_name, video_codec, desired_fps, (width, height)) batch, frames, predict_index = [], [], [] n_frames = 0 while iterating: if n_frames % SUBSAMPLE == 0: predict_index.append(len(frames)) batch.append(frame) frames.append(frame) # Process a batch of frames at once if len(batch) == desired_fps: classes_list = classes.strip().split(",") results, fps_value = query(batch, classes_list, threshold, (width, height)) for i, frame in enumerate(frames): if i in predict_index: batch_idx = predict_index.index(i) detections = sv.Detections( xyxy=results[batch_idx]["boxes"].cpu().detach().numpy(), confidence=results[batch_idx]["scores"].cpu().detach().numpy(), class_id=np.array([classes_list.index(result_class) for result_class in results[batch_idx]["classes"]]), data={"class_name": results[batch_idx]["classes"]}, ) labels = results[batch_idx]["classes"] frame = annotate_image(input_image=frame, detections=detections, labels=labels) segment_file.write(frame) # Finalize and yield result segment_file.release() yield result_file_name, gr.Markdown(f'