jhj0517 commited on
Commit
e10749b
1 Parent(s): e492152

Add video util

Browse files
Files changed (1) hide show
  1. modules/utils/video_helper.py +272 -0
modules/utils/video_helper.py ADDED
@@ -0,0 +1,272 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import subprocess
2
+ import os
3
+ from typing import List, Optional, Union
4
+ from PIL import Image
5
+ import numpy as np
6
+ from dataclasses import dataclass
7
+ import re
8
+ from pathlib import Path
9
+
10
+ from modules.utils.constants import SOUND_FILE_EXT, VIDEO_FILE_EXT, IMAGE_FILE_EXT, TRANSPARENT_VIDEO_FILE_EXT
11
+ from modules.utils.paths import TEMP_VIDEO_FRAMES_DIR, TEMP_VIDEO_OUT_FRAMES_DIR
12
+
13
+
14
+ @dataclass
15
+ class VideoInfo:
16
+ num_frames: Optional[int] = None
17
+ frame_rate: Optional[int] = None
18
+ duration: Optional[float] = None
19
+ has_sound: Optional[bool] = None
20
+ codec: Optional[str] = None
21
+
22
+
23
+ def extract_frames(
24
+ vid_input: str,
25
+ output_temp_dir: str = TEMP_VIDEO_FRAMES_DIR,
26
+ start_number: int = 0
27
+ ):
28
+ """
29
+ Extract frames as jpg files and save them into output_temp_dir. This needs FFmpeg installed.
30
+ """
31
+ os.makedirs(output_temp_dir, exist_ok=True)
32
+ output_path = os.path.join(output_temp_dir, "%05d.jpg")
33
+
34
+ command = [
35
+ 'ffmpeg',
36
+ '-y', # Enable overwriting
37
+ '-i', vid_input,
38
+ '-qscale:v', '2',
39
+ '-vf', f'scale=iw:ih',
40
+ '-start_number', str(start_number),
41
+ f'{output_path}'
42
+ ]
43
+
44
+ try:
45
+ subprocess.run(command, check=True)
46
+ except subprocess.CalledProcessError as e:
47
+ print("Error occurred while extracting frames from the video")
48
+ raise RuntimeError(f"An error occurred: {str(e)}")
49
+
50
+ return get_frames_from_dir(output_temp_dir)
51
+
52
+
53
+ def extract_sound(
54
+ vid_input: str,
55
+ output_temp_dir: str = TEMP_VIDEO_FRAMES_DIR,
56
+ ):
57
+ """
58
+ Extract audio from a video file and save it as a separate sound file. This needs FFmpeg installed.
59
+ """
60
+ if Path(vid_input).suffix == ".gif":
61
+ print("Sound extracting process has passed because gif has no sound")
62
+ return None
63
+
64
+ os.makedirs(output_temp_dir, exist_ok=True)
65
+ output_path = os.path.join(output_temp_dir, "sound.mp3")
66
+
67
+ command = [
68
+ 'ffmpeg',
69
+ '-y', # Enable overwriting
70
+ '-i', vid_input,
71
+ '-vn',
72
+ output_path
73
+ ]
74
+
75
+ try:
76
+ subprocess.run(command, check=True)
77
+ except subprocess.CalledProcessError as e:
78
+ print(f"Warning: Failed to extract sound from the video: {e}")
79
+
80
+ return output_path
81
+
82
+
83
+ def get_video_info(vid_input: str) -> VideoInfo:
84
+ """
85
+ Extract video information using ffmpeg.
86
+ """
87
+ command = [
88
+ 'ffmpeg',
89
+ '-i', vid_input,
90
+ '-map', '0:v:0',
91
+ '-c', 'copy',
92
+ '-f', 'null',
93
+ '-'
94
+ ]
95
+
96
+ try:
97
+ result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
98
+ encoding='utf-8', errors='replace', check=True)
99
+ output = result.stderr
100
+
101
+ num_frames = None
102
+ frame_rate = None
103
+ duration = None
104
+ has_sound = False
105
+ codec = None
106
+
107
+ for line in output.splitlines():
108
+ if 'Stream #0:0' in line and 'Video:' in line:
109
+ fps_match = re.search(r'(\d+(?:\.\d+)?) fps', line)
110
+ if fps_match:
111
+ frame_rate = float(fps_match.group(1))
112
+
113
+ codec_match = re.search(r'Video: (\w+)', line)
114
+ if codec_match:
115
+ codec = codec_match.group(1)
116
+
117
+ elif 'Duration:' in line:
118
+ duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d{2})', line)
119
+ if duration_match:
120
+ h, m, s = map(float, duration_match.groups())
121
+ duration = h * 3600 + m * 60 + s
122
+
123
+ elif 'Stream' in line and 'Audio:' in line:
124
+ has_sound = True
125
+
126
+ if frame_rate and duration:
127
+ num_frames = int(frame_rate * duration)
128
+
129
+ return VideoInfo(
130
+ num_frames=num_frames,
131
+ frame_rate=frame_rate,
132
+ duration=duration,
133
+ has_sound=has_sound,
134
+ codec=codec
135
+ )
136
+
137
+ except subprocess.CalledProcessError as e:
138
+ print("Error occurred while getting info from the video")
139
+ return VideoInfo()
140
+
141
+
142
+ def create_video_from_frames(
143
+ frames_dir: str,
144
+ frame_rate: Optional[int] = None,
145
+ sound_path: Optional[str] = None,
146
+ output_dir: Optional[str] = None,
147
+ output_mime_type: Optional[str] = None,
148
+ ):
149
+ """
150
+ Create a video from frames and save it to the output_path. This needs FFmpeg installed.
151
+ """
152
+ if not os.path.exists(frames_dir):
153
+ raise "frames_dir does not exist"
154
+
155
+ if output_dir is None:
156
+ output_dir = TEMP_VIDEO_OUT_FRAMES_DIR
157
+ os.makedirs(output_dir, exist_ok=True)
158
+
159
+ frame_img_mime_type = ".png"
160
+ pix_format = "yuv420p"
161
+ vid_codec, audio_codec = "libx264", "aac"
162
+
163
+ if output_mime_type is None:
164
+ output_mime_type = ".mp4"
165
+
166
+ output_mime_type = output_mime_type.lower()
167
+ if output_mime_type == ".mov":
168
+ pix_format = "yuva444p10le"
169
+ vid_codec, audio_codec = "prores_ks", "aac"
170
+
171
+ elif output_mime_type == ".webm":
172
+ pix_format = "yuva420p"
173
+ vid_codec, audio_codec = "libvpx-vp9", "libvorbis"
174
+
175
+ elif output_mime_type == ".gif":
176
+ pix_format = None
177
+ vid_codec, audio_codec = "gif", None
178
+
179
+ num_files = len(os.listdir(output_dir))
180
+ filename = f"{num_files:05d}{output_mime_type}"
181
+ output_path = os.path.join(output_dir, filename)
182
+
183
+ if sound_path is None:
184
+ temp_sound = os.path.join(TEMP_VIDEO_FRAMES_DIR, "sound.mp3")
185
+ if os.path.exists(temp_sound):
186
+ sound_path = temp_sound
187
+
188
+ if frame_rate is None:
189
+ frame_rate = 25 # Default frame rate for ffmpeg
190
+
191
+ command = [
192
+ 'ffmpeg',
193
+ '-y',
194
+ '-framerate', str(frame_rate),
195
+ '-i', os.path.join(frames_dir, f"%05d{frame_img_mime_type}"),
196
+ '-c:v', vid_codec,
197
+ ]
198
+
199
+ if output_mime_type == ".gif":
200
+ command += [
201
+ "-filter_complex", "[0:v] palettegen=reserve_transparent=on [p]; [0:v][p] paletteuse",
202
+ "-loop", "0"
203
+ ]
204
+ else:
205
+ command += [
206
+ '-pix_fmt', pix_format
207
+ ]
208
+
209
+ command += [output_path]
210
+
211
+ if output_mime_type != ".gif" and sound_path is not None:
212
+ command += [
213
+ '-i', sound_path,
214
+ '-c:a', audio_codec,
215
+ '-strict', 'experimental',
216
+ '-b:a', '192k',
217
+ '-shortest'
218
+ ]
219
+ try:
220
+ subprocess.run(command, check=True)
221
+ except subprocess.CalledProcessError as e:
222
+ print("Error occurred while creating video from frames")
223
+ return output_path
224
+
225
+
226
+ def get_frames_from_dir(vid_dir: str,
227
+ available_extensions: Optional[Union[List, str]] = None,
228
+ as_numpy: bool = False) -> List:
229
+ """Get image file paths list from the dir"""
230
+ if available_extensions is None:
231
+ available_extensions = [".jpg", ".jpeg", ".JPG", ".JPEG"]
232
+
233
+ if isinstance(available_extensions, str):
234
+ available_extensions = [available_extensions]
235
+
236
+ frame_names = [
237
+ p for p in os.listdir(vid_dir)
238
+ if os.path.splitext(p)[-1] in available_extensions
239
+ ]
240
+ if not frame_names:
241
+ return []
242
+ frame_names.sort(key=lambda x: int(os.path.splitext(x)[0]))
243
+
244
+ frames = [os.path.join(vid_dir, name) for name in frame_names]
245
+ if as_numpy:
246
+ frames = [np.array(Image.open(frame)) for frame in frames]
247
+
248
+ return frames
249
+
250
+
251
+ def clean_temp_dir(temp_dir: Optional[str] = None):
252
+ """Removes media files from the video frames directory."""
253
+ if temp_dir is None:
254
+ temp_dir = TEMP_VIDEO_FRAMES_DIR
255
+ temp_out_dir = TEMP_VIDEO_OUT_FRAMES_DIR
256
+ else:
257
+ temp_out_dir = os.path.join(temp_dir, "out")
258
+
259
+ clean_files_with_extension(temp_dir, SOUND_FILE_EXT)
260
+ clean_files_with_extension(temp_dir, IMAGE_FILE_EXT)
261
+ clean_files_with_extension(temp_out_dir, IMAGE_FILE_EXT)
262
+
263
+
264
+ def clean_files_with_extension(dir_path: str, extensions: List):
265
+ """Remove files with the given extensions from the directory."""
266
+ for filename in os.listdir(dir_path):
267
+ if filename.lower().endswith(tuple(extensions)):
268
+ file_path = os.path.join(dir_path, filename)
269
+ try:
270
+ os.remove(file_path)
271
+ except Exception as e:
272
+ print("Error while removing image files")