|
import datetime |
|
from glob import glob |
|
import cv2 |
|
import os |
|
from tqdm import tqdm |
|
|
|
def mp4_to_png(input_path: str, save_path: str, scale_factor: float) -> str: |
|
""" Converts mp4 to pngs for each frame of the video. |
|
Args: input_path is the path to the mp4 file, save_path is the directory to save the frames. |
|
Returns: save_path, fps the number of frames per second. |
|
""" |
|
|
|
fps = int(cv2.VideoCapture(input_path).get(cv2.CAP_PROP_FPS)) |
|
|
|
os.system(f"ffmpeg -i {input_path} -vf 'scale=iw*{scale_factor}:ih*{scale_factor}, fps={fps}' {save_path}/frame%08d.png") |
|
return fps |
|
|
|
def frame_to_timestamp(frame_number: int, fps: int): |
|
|
|
timestamp_seconds = frame_number / fps |
|
|
|
|
|
minutes, seconds = divmod(timestamp_seconds, 60) |
|
hours, minutes = divmod(minutes, 60) |
|
|
|
timestamp = f"{int(hours):02d}:{int(minutes):02d}:{seconds:06.3f}" |
|
return timestamp |
|
|
|
def vid_stitcher(frames_dir: str, output_path: str, fps: int = 30) -> str: |
|
""" |
|
Takes a list of frames as numpy arrays and writes them to a video file. |
|
""" |
|
|
|
frame_list = sorted(glob(os.path.join(frames_dir, 'frame*.png'))) |
|
|
|
|
|
frame = cv2.imread(frame_list[0]) |
|
height, width, _ = frame.shape |
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
|
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) |
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
with ThreadPoolExecutor() as executor: |
|
frames = list(executor.map(cv2.imread, frame_list)) |
|
|
|
|
|
with tqdm(total=len(frame_list), desc='Stitching frames') as pbar: |
|
for frame in frames: |
|
out.write(frame) |
|
pbar.update(1) |
|
|
|
return output_path |