Spaces:
Running
on
L40S
Running
on
L40S
import cv2 | |
import torch | |
import numpy as np | |
import imageio | |
def aug_matrix(w1, h1, w2, h2): | |
dx = (w2 - w1) / 2.0 | |
dy = (h2 - h1) / 2.0 | |
matrix_trans = np.array([[1.0, 0, dx], [0, 1.0, dy], [0, 0, 1.0]]) | |
scale = np.min([float(w2) / w1, float(h2) / h1]) | |
M = get_affine_matrix(center=(w2 / 2.0, h2 / 2.0), | |
translate=(0, 0), | |
scale=scale) | |
M = np.array(M + [0., 0., 1.]).reshape(3, 3) | |
M = M.dot(matrix_trans) | |
return M | |
def get_affine_matrix(center, translate, scale): | |
cx, cy = center | |
tx, ty = translate | |
M = [1, 0, 0, 0, 1, 0] | |
M = [x * scale for x in M] | |
# Apply translation and of center translation: RSS * C^-1 | |
M[2] += M[0] * (-cx) + M[1] * (-cy) | |
M[5] += M[3] * (-cx) + M[4] * (-cy) | |
# Apply center translation: T * C * RSS * C^-1 | |
M[2] += cx + tx | |
M[5] += cy + ty | |
return M | |
class BaseStreamer(): | |
"""This streamer will return images at 512x512 size. | |
""" | |
def __init__(self, | |
width=512, | |
height=512, | |
pad=True, | |
mean=(0.5, 0.5, 0.5), | |
std=(0.5, 0.5, 0.5), | |
**kwargs): | |
self.width = width | |
self.height = height | |
self.pad = pad | |
self.mean = np.array(mean) | |
self.std = np.array(std) | |
self.loader = self.create_loader() | |
def create_loader(self): | |
raise NotImplementedError | |
yield np.zeros((600, 400, 3)) # in RGB (0, 255) | |
def __getitem__(self, index): | |
image = next(self.loader) | |
in_height, in_width, _ = image.shape | |
M = aug_matrix(in_width, in_height, self.width, self.height, self.pad) | |
image = cv2.warpAffine(image, | |
M[0:2, :], (self.width, self.height), | |
flags=cv2.INTER_CUBIC) | |
input = np.float32(image) | |
input = (input / 255.0 - self.mean) / self.std # TO [-1.0, 1.0] | |
input = input.transpose(2, 0, 1) # TO [3 x H x W] | |
return torch.from_numpy(input).float() | |
def __len__(self): | |
raise NotImplementedError | |
class CaptureStreamer(BaseStreamer): | |
"""This streamer takes webcam as input. | |
""" | |
def __init__(self, id=0, width=512, height=512, pad=True, **kwargs): | |
super().__init__(width, height, pad, **kwargs) | |
self.capture = cv2.VideoCapture(id) | |
def create_loader(self): | |
while True: | |
_, image = self.capture.read() | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # RGB | |
yield image | |
def __len__(self): | |
return 100_000_000 | |
def __del__(self): | |
self.capture.release() | |
class VideoListStreamer(BaseStreamer): | |
"""This streamer takes a list of video files as input. | |
""" | |
def __init__(self, files, width=512, height=512, pad=True, **kwargs): | |
super().__init__(width, height, pad, **kwargs) | |
self.files = files | |
self.captures = [imageio.get_reader(f) for f in files] | |
self.nframes = sum([ | |
int(cap._meta["fps"] * cap._meta["duration"]) | |
for cap in self.captures | |
]) | |
def create_loader(self): | |
for capture in self.captures: | |
for image in capture: # RGB | |
yield image | |
def __len__(self): | |
return self.nframes | |
def __del__(self): | |
for capture in self.captures: | |
capture.close() | |
class ImageListStreamer(BaseStreamer): | |
"""This streamer takes a list of image files as input. | |
""" | |
def __init__(self, files, width=512, height=512, pad=True, **kwargs): | |
super().__init__(width, height, pad, **kwargs) | |
self.files = files | |
def create_loader(self): | |
for f in self.files: | |
image = cv2.imread(f, cv2.IMREAD_UNCHANGED)[:, :, 0:3] | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # RGB | |
yield image | |
def __len__(self): | |
return len(self.files) | |