|
|
|
import cv2 |
|
import numpy as np |
|
import os |
|
|
|
|
|
def flowread(flow_path, quantize=False, concat_axis=0, *args, **kwargs): |
|
"""Read an optical flow map. |
|
|
|
Args: |
|
flow_path (ndarray or str): Flow path. |
|
quantize (bool): whether to read quantized pair, if set to True, |
|
remaining args will be passed to :func:`dequantize_flow`. |
|
concat_axis (int): The axis that dx and dy are concatenated, |
|
can be either 0 or 1. Ignored if quantize is False. |
|
|
|
Returns: |
|
ndarray: Optical flow represented as a (h, w, 2) numpy array |
|
""" |
|
if quantize: |
|
assert concat_axis in [0, 1] |
|
cat_flow = cv2.imread(flow_path, cv2.IMREAD_UNCHANGED) |
|
if cat_flow.ndim != 2: |
|
raise IOError(f'{flow_path} is not a valid quantized flow file, its dimension is {cat_flow.ndim}.') |
|
assert cat_flow.shape[concat_axis] % 2 == 0 |
|
dx, dy = np.split(cat_flow, 2, axis=concat_axis) |
|
flow = dequantize_flow(dx, dy, *args, **kwargs) |
|
else: |
|
with open(flow_path, 'rb') as f: |
|
try: |
|
header = f.read(4).decode('utf-8') |
|
except Exception: |
|
raise IOError(f'Invalid flow file: {flow_path}') |
|
else: |
|
if header != 'PIEH': |
|
raise IOError(f'Invalid flow file: {flow_path}, header does not contain PIEH') |
|
|
|
w = np.fromfile(f, np.int32, 1).squeeze() |
|
h = np.fromfile(f, np.int32, 1).squeeze() |
|
flow = np.fromfile(f, np.float32, w * h * 2).reshape((h, w, 2)) |
|
|
|
return flow.astype(np.float32) |
|
|
|
|
|
def flowwrite(flow, filename, quantize=False, concat_axis=0, *args, **kwargs): |
|
"""Write optical flow to file. |
|
|
|
If the flow is not quantized, it will be saved as a .flo file losslessly, |
|
otherwise a jpeg image which is lossy but of much smaller size. (dx and dy |
|
will be concatenated horizontally into a single image if quantize is True.) |
|
|
|
Args: |
|
flow (ndarray): (h, w, 2) array of optical flow. |
|
filename (str): Output filepath. |
|
quantize (bool): Whether to quantize the flow and save it to 2 jpeg |
|
images. If set to True, remaining args will be passed to |
|
:func:`quantize_flow`. |
|
concat_axis (int): The axis that dx and dy are concatenated, |
|
can be either 0 or 1. Ignored if quantize is False. |
|
""" |
|
if not quantize: |
|
with open(filename, 'wb') as f: |
|
f.write('PIEH'.encode('utf-8')) |
|
np.array([flow.shape[1], flow.shape[0]], dtype=np.int32).tofile(f) |
|
flow = flow.astype(np.float32) |
|
flow.tofile(f) |
|
f.flush() |
|
else: |
|
assert concat_axis in [0, 1] |
|
dx, dy = quantize_flow(flow, *args, **kwargs) |
|
dxdy = np.concatenate((dx, dy), axis=concat_axis) |
|
os.makedirs(os.path.dirname(filename), exist_ok=True) |
|
cv2.imwrite(filename, dxdy) |
|
|
|
|
|
def quantize_flow(flow, max_val=0.02, norm=True): |
|
"""Quantize flow to [0, 255]. |
|
|
|
After this step, the size of flow will be much smaller, and can be |
|
dumped as jpeg images. |
|
|
|
Args: |
|
flow (ndarray): (h, w, 2) array of optical flow. |
|
max_val (float): Maximum value of flow, values beyond |
|
[-max_val, max_val] will be truncated. |
|
norm (bool): Whether to divide flow values by image width/height. |
|
|
|
Returns: |
|
tuple[ndarray]: Quantized dx and dy. |
|
""" |
|
h, w, _ = flow.shape |
|
dx = flow[..., 0] |
|
dy = flow[..., 1] |
|
if norm: |
|
dx = dx / w |
|
dy = dy / h |
|
|
|
flow_comps = [quantize(d, -max_val, max_val, 255, np.uint8) for d in [dx, dy]] |
|
return tuple(flow_comps) |
|
|
|
|
|
def dequantize_flow(dx, dy, max_val=0.02, denorm=True): |
|
"""Recover from quantized flow. |
|
|
|
Args: |
|
dx (ndarray): Quantized dx. |
|
dy (ndarray): Quantized dy. |
|
max_val (float): Maximum value used when quantizing. |
|
denorm (bool): Whether to multiply flow values with width/height. |
|
|
|
Returns: |
|
ndarray: Dequantized flow. |
|
""" |
|
assert dx.shape == dy.shape |
|
assert dx.ndim == 2 or (dx.ndim == 3 and dx.shape[-1] == 1) |
|
|
|
dx, dy = [dequantize(d, -max_val, max_val, 255) for d in [dx, dy]] |
|
|
|
if denorm: |
|
dx *= dx.shape[1] |
|
dy *= dx.shape[0] |
|
flow = np.dstack((dx, dy)) |
|
return flow |
|
|
|
|
|
def quantize(arr, min_val, max_val, levels, dtype=np.int64): |
|
"""Quantize an array of (-inf, inf) to [0, levels-1]. |
|
|
|
Args: |
|
arr (ndarray): Input array. |
|
min_val (scalar): Minimum value to be clipped. |
|
max_val (scalar): Maximum value to be clipped. |
|
levels (int): Quantization levels. |
|
dtype (np.type): The type of the quantized array. |
|
|
|
Returns: |
|
tuple: Quantized array. |
|
""" |
|
if not (isinstance(levels, int) and levels > 1): |
|
raise ValueError(f'levels must be a positive integer, but got {levels}') |
|
if min_val >= max_val: |
|
raise ValueError(f'min_val ({min_val}) must be smaller than max_val ({max_val})') |
|
|
|
arr = np.clip(arr, min_val, max_val) - min_val |
|
quantized_arr = np.minimum(np.floor(levels * arr / (max_val - min_val)).astype(dtype), levels - 1) |
|
|
|
return quantized_arr |
|
|
|
|
|
def dequantize(arr, min_val, max_val, levels, dtype=np.float64): |
|
"""Dequantize an array. |
|
|
|
Args: |
|
arr (ndarray): Input array. |
|
min_val (scalar): Minimum value to be clipped. |
|
max_val (scalar): Maximum value to be clipped. |
|
levels (int): Quantization levels. |
|
dtype (np.type): The type of the dequantized array. |
|
|
|
Returns: |
|
tuple: Dequantized array. |
|
""" |
|
if not (isinstance(levels, int) and levels > 1): |
|
raise ValueError(f'levels must be a positive integer, but got {levels}') |
|
if min_val >= max_val: |
|
raise ValueError(f'min_val ({min_val}) must be smaller than max_val ({max_val})') |
|
|
|
dequantized_arr = (arr + 0.5).astype(dtype) * (max_val - min_val) / levels + min_val |
|
|
|
return dequantized_arr |
|
|