Spaces:
No application file
No application file
# Copyright 2022 Lunar Ring. All rights reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import subprocess | |
import os | |
import numpy as np | |
from tqdm import tqdm | |
import cv2 | |
from typing import Callable, List, Optional, Union | |
import ffmpeg # pip install ffmpeg-python. if error with broken pipe: conda update ffmpeg | |
#%% | |
class MovieSaver(): | |
def __init__( | |
self, | |
fp_out: str, | |
fps: int = 24, | |
shape_hw: List[int] = None, | |
crf: int = 24, | |
codec: str = 'libx264', | |
preset: str ='fast', | |
pix_fmt: str = 'yuv420p', | |
silent_ffmpeg: bool = True | |
): | |
r""" | |
Initializes movie saver class - a human friendly ffmpeg wrapper. | |
After you init the class, you can dump numpy arrays x into moviesaver.write_frame(x). | |
Don't forget toi finalize movie file with moviesaver.finalize(). | |
Args: | |
fp_out: str | |
Output file name. If it already exists, it will be deleted. | |
fps: int | |
Frames per second. | |
shape_hw: List[int, int] | |
Output shape, optional argument. Can be initialized automatically when first frame is written. | |
crf: int | |
ffmpeg doc: the range of the CRF scale is 0–51, where 0 is lossless | |
(for 8 bit only, for 10 bit use -qp 0), 23 is the default, and 51 is worst quality possible. | |
A lower value generally leads to higher quality, and a subjectively sane range is 17–28. | |
Consider 17 or 18 to be visually lossless or nearly so; | |
it should look the same or nearly the same as the input but it isn't technically lossless. | |
The range is exponential, so increasing the CRF value +6 results in | |
roughly half the bitrate / file size, while -6 leads to roughly twice the bitrate. | |
codec: int | |
Number of diffusion steps. Larger values will take more compute time. | |
preset: str | |
Choose between ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow. | |
ffmpeg doc: A preset is a collection of options that will provide a certain encoding speed | |
to compression ratio. A slower preset will provide better compression | |
(compression is quality per filesize). | |
This means that, for example, if you target a certain file size or constant bit rate, | |
you will achieve better quality with a slower preset. Similarly, for constant quality encoding, | |
you will simply save bitrate by choosing a slower preset. | |
pix_fmt: str | |
Pixel format. Run 'ffmpeg -pix_fmts' in your shell to see all options. | |
silent_ffmpeg: bool | |
Surpress the output from ffmpeg. | |
""" | |
if len(os.path.split(fp_out)[0]) > 0: | |
assert os.path.isdir(os.path.split(fp_out)[0]), "Directory does not exist!" | |
self.fp_out = fp_out | |
self.fps = fps | |
self.crf = crf | |
self.pix_fmt = pix_fmt | |
self.codec = codec | |
self.preset = preset | |
self.silent_ffmpeg = silent_ffmpeg | |
if os.path.isfile(fp_out): | |
os.remove(fp_out) | |
self.init_done = False | |
self.nmb_frames = 0 | |
if shape_hw is None: | |
self.shape_hw = [-1, 1] | |
else: | |
if len(shape_hw) == 2: | |
shape_hw.append(3) | |
self.shape_hw = shape_hw | |
self.initialize() | |
print(f"MovieSaver initialized. fps={fps} crf={crf} pix_fmt={pix_fmt} codec={codec} preset={preset}") | |
def initialize(self): | |
args = ( | |
ffmpeg | |
.input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(self.shape_hw[1], self.shape_hw[0]), framerate=self.fps) | |
.output(self.fp_out, crf=self.crf, pix_fmt=self.pix_fmt, c=self.codec, preset=self.preset) | |
.overwrite_output() | |
.compile() | |
) | |
if self.silent_ffmpeg: | |
self.ffmpg_process = subprocess.Popen(args, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) | |
else: | |
self.ffmpg_process = subprocess.Popen(args, stdin=subprocess.PIPE) | |
self.init_done = True | |
self.shape_hw = tuple(self.shape_hw) | |
print(f"Initialization done. Movie shape: {self.shape_hw}") | |
def write_frame(self, out_frame: np.ndarray): | |
r""" | |
Function to dump a numpy array as frame of a movie. | |
Args: | |
out_frame: np.ndarray | |
Numpy array, in np.uint8 format. Convert with np.astype(x, np.uint8). | |
Dim 0: y | |
Dim 1: x | |
Dim 2: RGB | |
""" | |
assert out_frame.dtype == np.uint8, "Convert to np.uint8 before" | |
assert len(out_frame.shape) == 3, "out_frame needs to be three dimensional, Y X C" | |
assert out_frame.shape[2] == 3, f"need three color channels, but you provided {out_frame.shape[2]}." | |
if not self.init_done: | |
self.shape_hw = out_frame.shape | |
self.initialize() | |
assert self.shape_hw == out_frame.shape, f"You cannot change the image size after init. Initialized with {self.shape_hw}, out_frame {out_frame.shape}" | |
# write frame | |
self.ffmpg_process.stdin.write( | |
out_frame | |
.astype(np.uint8) | |
.tobytes() | |
) | |
self.nmb_frames += 1 | |
def finalize(self): | |
r""" | |
Call this function to finalize the movie. If you forget to call it your movie will be garbage. | |
""" | |
if self.nmb_frames == 0: | |
print("You did not write any frames yet! nmb_frames = 0. Cannot save.") | |
return | |
self.ffmpg_process.stdin.close() | |
self.ffmpg_process.wait() | |
duration = int(self.nmb_frames / self.fps) | |
print(f"Movie saved, {duration}s playtime, watch here: \n{self.fp_out}") | |
def concatenate_movies(fp_final: str, list_fp_movies: List[str]): | |
r""" | |
Concatenate multiple movie segments into one long movie, using ffmpeg. | |
Parameters | |
---------- | |
fp_final : str | |
Full path of the final movie file. Should end with .mp4 | |
list_fp_movies : list[str] | |
List of full paths of movie segments. | |
""" | |
assert fp_final[-4] == ".", "fp_final seems to miss file extension: {fp_final}" | |
for fp in list_fp_movies: | |
assert os.path.isfile(fp), f"Input movie does not exist: {fp}" | |
assert os.path.getsize(fp) > 100, f"Input movie seems empty: {fp}" | |
if os.path.isfile(fp_final): | |
os.remove(fp_final) | |
# make a list for ffmpeg | |
list_concat = [] | |
for fp_part in list_fp_movies: | |
list_concat.append(f"""file '{fp_part}'""") | |
# save this list | |
fp_list = "tmp_move.txt" | |
with open(fp_list, "w") as fa: | |
for item in list_concat: | |
fa.write("%s\n" % item) | |
cmd = f'ffmpeg -f concat -safe 0 -i {fp_list} -c copy {fp_final}' | |
dp_movie = os.path.split(fp_final)[0] | |
subprocess.call(cmd, shell=True) | |
os.remove(fp_list) | |
if os.path.isfile(fp_final): | |
print(f"concatenate_movies: success! Watch here: {fp_final}") | |
class MovieReader(): | |
r""" | |
Class to read in a movie. | |
""" | |
def __init__(self, fp_movie): | |
self.video_player_object = cv2.VideoCapture(fp_movie) | |
self.nmb_frames = int(self.video_player_object.get(cv2.CAP_PROP_FRAME_COUNT)) | |
self.fps_movie = int(self.video_player_object.get(cv2.CAP_PROP_FPS)) | |
self.shape = [100,100,3] | |
self.shape_is_set = False | |
def get_next_frame(self): | |
success, image = self.video_player_object.read() | |
if success: | |
if not self.shape_is_set: | |
self.shape_is_set = True | |
self.shape = image.shape | |
return image | |
else: | |
return np.zeros(self.shape) | |
#%% | |
if __name__ == "__main__": | |
fps=2 | |
list_fp_movies = [] | |
for k in range(4): | |
fp_movie = f"/tmp/my_random_movie_{k}.mp4" | |
list_fp_movies.append(fp_movie) | |
ms = MovieSaver(fp_movie, fps=fps) | |
for fn in tqdm(range(30)): | |
img = (np.random.rand(512, 1024, 3)*255).astype(np.uint8) | |
ms.write_frame(img) | |
ms.finalize() | |
fp_final = "/tmp/my_concatenated_movie.mp4" | |
concatenate_movies(fp_final, list_fp_movies) | |