File size: 3,971 Bytes
2252f3d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import cv2
import torch
import numpy as np
import imageio
def aug_matrix(w1, h1, w2, h2):
dx = (w2 - w1) / 2.0
dy = (h2 - h1) / 2.0
matrix_trans = np.array([[1.0, 0, dx], [0, 1.0, dy], [0, 0, 1.0]])
scale = np.min([float(w2) / w1, float(h2) / h1])
M = get_affine_matrix(center=(w2 / 2.0, h2 / 2.0),
translate=(0, 0),
scale=scale)
M = np.array(M + [0., 0., 1.]).reshape(3, 3)
M = M.dot(matrix_trans)
return M
def get_affine_matrix(center, translate, scale):
cx, cy = center
tx, ty = translate
M = [1, 0, 0, 0, 1, 0]
M = [x * scale for x in M]
# Apply translation and of center translation: RSS * C^-1
M[2] += M[0] * (-cx) + M[1] * (-cy)
M[5] += M[3] * (-cx) + M[4] * (-cy)
# Apply center translation: T * C * RSS * C^-1
M[2] += cx + tx
M[5] += cy + ty
return M
class BaseStreamer():
"""This streamer will return images at 512x512 size.
"""
def __init__(self,
width=512,
height=512,
pad=True,
mean=(0.5, 0.5, 0.5),
std=(0.5, 0.5, 0.5),
**kwargs):
self.width = width
self.height = height
self.pad = pad
self.mean = np.array(mean)
self.std = np.array(std)
self.loader = self.create_loader()
def create_loader(self):
raise NotImplementedError
yield np.zeros((600, 400, 3)) # in RGB (0, 255)
def __getitem__(self, index):
image = next(self.loader)
in_height, in_width, _ = image.shape
M = aug_matrix(in_width, in_height, self.width, self.height, self.pad)
image = cv2.warpAffine(image,
M[0:2, :], (self.width, self.height),
flags=cv2.INTER_CUBIC)
input = np.float32(image)
input = (input / 255.0 - self.mean) / self.std # TO [-1.0, 1.0]
input = input.transpose(2, 0, 1) # TO [3 x H x W]
return torch.from_numpy(input).float()
def __len__(self):
raise NotImplementedError
class CaptureStreamer(BaseStreamer):
"""This streamer takes webcam as input.
"""
def __init__(self, id=0, width=512, height=512, pad=True, **kwargs):
super().__init__(width, height, pad, **kwargs)
self.capture = cv2.VideoCapture(id)
def create_loader(self):
while True:
_, image = self.capture.read()
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # RGB
yield image
def __len__(self):
return 100_000_000
def __del__(self):
self.capture.release()
class VideoListStreamer(BaseStreamer):
"""This streamer takes a list of video files as input.
"""
def __init__(self, files, width=512, height=512, pad=True, **kwargs):
super().__init__(width, height, pad, **kwargs)
self.files = files
self.captures = [imageio.get_reader(f) for f in files]
self.nframes = sum([
int(cap._meta["fps"] * cap._meta["duration"])
for cap in self.captures
])
def create_loader(self):
for capture in self.captures:
for image in capture: # RGB
yield image
def __len__(self):
return self.nframes
def __del__(self):
for capture in self.captures:
capture.close()
class ImageListStreamer(BaseStreamer):
"""This streamer takes a list of image files as input.
"""
def __init__(self, files, width=512, height=512, pad=True, **kwargs):
super().__init__(width, height, pad, **kwargs)
self.files = files
def create_loader(self):
for f in self.files:
image = cv2.imread(f, cv2.IMREAD_UNCHANGED)[:, :, 0:3]
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # RGB
yield image
def __len__(self):
return len(self.files)
|