# Dinov2-Video / utils.py
from typing import List

import cv2
import numpy as np
import supervision as sv
import torch
import torch.nn.functional as F
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler
from torchvision import transforms as T
from tqdm import tqdm
from transformers import AutoModel


def load_video_frames(video_path: str) -> List[np.ndarray]:
    """Read a video file and return its frames as RGB numpy arrays."""
    frames = []
    for frame in tqdm(sv.get_video_frames_generator(source_path=video_path), unit=" frames"):
        # supervision yields BGR frames (OpenCV convention); convert to RGB.
        frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return frames
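
# Usage sketch (illustrative, not part of the original file; "input.mp4" is a
# hypothetical path):
#
#   frames = load_video_frames("input.mp4")
#   print(len(frames), frames[0].shape)  # e.g. 120 (480, 640, 3)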


def preprocess(image: np.ndarray, n_patches: int, device: str, patch_size: int = 14) -> torch.Tensor:
    """Resize and normalize an RGB frame into a 1 x C x (NP*P) x (NP*P) tensor."""
    IMAGENET_DEFAULT_MEAN = (0.485, 0.456, 0.406)
    IMAGENET_DEFAULT_STD = (0.229, 0.224, 0.225)
    transform = T.Compose([
        T.Resize((n_patches * patch_size, n_patches * patch_size)),
        T.Normalize(mean=IMAGENET_DEFAULT_MEAN, std=IMAGENET_DEFAULT_STD),
    ])
    # HWC uint8 -> CHW float in [0, 1] before normalization.
    img = torch.from_numpy(image).float().permute(2, 0, 1) / 255
    img_tensor = transform(img).unsqueeze(0).to(device)
    return img_tensor
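
# Shape sketch (illustrative, not part of the original file): with the default
# patch_size=14 and n_patches=20, an input frame of any size is resized to 280 x 280.
#
#   frame = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in RGB frame
#   x = preprocess(frame, n_patches=20, device="cpu")
#   assert x.shape == (1, 3, 280, 280)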


def process_video(
    model: AutoModel,
    video: str | List[np.ndarray],
    is_larger: bool = True,
    batch_size: int = 4,
    threshold: float = 0.5,
    n_patches: int = 40,
    interpolate: bool = False,
    device: str = "cpu"
) -> List[np.ndarray]:
    """Turn DINOv2 patch features into per-frame PCA RGB maps.

    Shape shorthand used in the comments below: NP = n_patches, P = patch_size.
    """
    if isinstance(video, str):
        frames = load_video_frames(video)
    else:
        frames = video
    patch_size = model.config.patch_size
    original_height = frames[0].shape[0]  # frames are H, W, C
    original_width = frames[0].shape[1]
    final_frames = []
    pca = PCA(n_components=3)
    scaler = MinMaxScaler(clip=True)
    # Only full batches are processed: up to batch_size - 1 trailing frames are
    # dropped, since the reshape below assumes a full batch.
    for i in range(len(frames) // batch_size):
        batch = frames[i * batch_size:(i + 1) * batch_size]
        pixel_values = [
            preprocess(f, n_patches, device, patch_size).squeeze(0) for f in batch
        ]
        pixel_values = torch.stack(pixel_values)  # B, C, NP * P, NP * P
        with torch.no_grad():
            out = model(pixel_values=pixel_values)
        features = out.last_hidden_state[:, 1:]  # drop CLS token -> B, NP * NP, HIDDEN_DIM
        features = features.cpu().numpy()
        features = features.reshape(batch_size * n_patches * n_patches, -1)  # B * NP * NP, HIDDEN_DIM
        # First PCA pass: the leading component separates foreground from background.
        pca_features = pca.fit_transform(features)
        pca_features = scaler.fit_transform(pca_features)
        if is_larger:
            pca_features_bg = pca_features[:, 0] > threshold
        else:
            pca_features_bg = pca_features[:, 0] < threshold
        pca_features_fg = ~pca_features_bg
        # Second PCA pass on foreground patches only; its 3 components become RGB.
        pca_features_fg_seg = pca.fit_transform(features[pca_features_fg])
        pca_features_fg_seg = scaler.fit_transform(pca_features_fg_seg)
        pca_features_rgb = np.zeros((batch_size * n_patches * n_patches, 3))
        pca_features_rgb[pca_features_bg] = 0
        pca_features_rgb[pca_features_fg] = pca_features_fg_seg
        pca_features_rgb = pca_features_rgb.reshape(batch_size, n_patches, n_patches, 3)
        if interpolate:
            # Upsample the NP x NP PCA map back to the original frame size.
            pca_features_rgb = torch.from_numpy(pca_features_rgb)  # B, NP, NP, 3
            pca_features_rgb = pca_features_rgb.permute(0, 3, 1, 2)  # B, 3, NP, NP
            pca_features_rgb = F.interpolate(
                pca_features_rgb,
                size=(original_height, original_width),
                mode='bilinear',
                align_corners=False
            ).permute(0, 2, 3, 1).unbind(0)  # tuple of B tensors, each H, W, 3
            # Convert back to numpy so the return type matches the annotation.
            pca_features_rgb = [f.numpy() for f in pca_features_rgb]
        else:
            pca_features_rgb = [f for f in pca_features_rgb]
        final_frames.extend(pca_features_rgb)
    return final_frames
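
# Usage sketch (illustrative, not part of the original file; the checkpoint
# name is one of the facebook/dinov2-* models on the Hugging Face Hub):
#
#   model = AutoModel.from_pretrained("facebook/dinov2-base")
#   frames = process_video(model, "input.mp4", batch_size=4, n_patches=40)
#   # without interpolate, each output frame is n_patches x n_patches x 3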


def create_video_from_frames_rgb(
    frame_list: List[np.ndarray],
    output_filename: str = "animation.mp4",
    fps: int = 15
) -> str:
    """Encode a list of float RGB frames (values in [0, 1]) into an mp4 file."""
    # Get the shape of the frames to determine video dimensions
    frame_height, frame_width, _ = frame_list[0].shape
    # Define the codec and create a VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can change the codec as needed
    out = cv2.VideoWriter(output_filename, fourcc, fps, (frame_width, frame_height))
    for frame in frame_list:
        # Scale to uint8 and convert RGB -> BGR, since cv2.VideoWriter expects BGR.
        out.write(cv2.cvtColor(np.uint8(frame * 255), cv2.COLOR_RGB2BGR))
    # Release the VideoWriter object
    out.release()
    return output_filename
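

if __name__ == "__main__":
    # End-to-end sketch (not part of the original file): the checkpoint name
    # and file paths are assumptions for illustration.
    model = AutoModel.from_pretrained("facebook/dinov2-base")
    model.eval()
    rgb_frames = process_video(model, "input.mp4", interpolate=True, device="cpu")
    create_video_from_frames_rgb(rgb_frames, "pca_animation.mp4", fps=15)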