"""
The Video class represents a reference to a video from either a file or web link. This class should implement the ncessary info to dub a video.
"""
import time

import ffmpeg
import numpy as np  # used by the commented-out isolate_subs prototype below
from pydub import AudioSegment
from yt_dlp import YoutubeDL

import utils
from dub_line import load_subs
class Video:
def __init__(self, video_URL, loading_progress_hook=print):
self.start_time = self.end_time = 0
self.downloaded = False
        self.subs, self.subs_adjusted, self.subs_removed = [], [], []  # separate lists to avoid aliasing
self.background_track = self.vocal_track = None
self.speech_diary = self.speech_diary_adjusted = None
self.load_video(video_URL, loading_progress_hook)
    # Loads the video's audio and subtitles from a local file or a YouTube link
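    # The progress hook receives yt-dlp's progress dicts during download, plus this
    # module's own statuses: {"status": "complete"}, {"status": "subless"}, and
    # {"status": "error", "error": e}.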
def load_video(self, video_path, progress_hook=print):
sub_path = ""
if video_path.startswith("http"):
self.downloaded = True
try:
video_path, sub_path, self.yt_sub_streams = self.download_video(video_path, progress_hook)
            except Exception:
                return  # download_video already reported the error via the hook
progress_hook({"status":"complete"})
else:
self.downloaded = False
self.file = video_path
        # Skip subtitle loading only when a download produced no subtitle file
        if not (self.downloaded and not sub_path):
            try:
                self.subs = self.subs_adjusted = load_subs(utils.get_output_path(self.file, '.srt'), sub_path or video_path)
            except Exception:
                progress_hook({"status": "subless"})
self.audio = AudioSegment.from_file(video_path)
self.duration = float(ffmpeg.probe(video_path)["format"]["duration"])
if self.subs:
self.update_time(0, self.duration)
def download_video(self, link, progress_hook=print):
options = {
'outtmpl': 'output/%(id)s.%(ext)s',
'writesubtitles': True,
"subtitleslangs": ["all"],
"progress_hooks": (progress_hook,)
}
try:
with YoutubeDL(options) as ydl:
info = ydl.extract_info(link)
                sub_file = list(info["subtitles"].values())[0][-1]["filepath"] if info["subtitles"] else None
                return ydl.prepare_filename(info), sub_file, info["subtitles"]
except Exception as e:
            print(f"Download failed: {e}")
progress_hook({"status": "error", "error": e})
raise e
def update_time(self, start, end):
self.start_time = start
self.end_time = end
# clamp the subs to the crop time specified
start_line = utils.find_nearest([sub.start for sub in self.subs], start)
end_line = utils.find_nearest([sub.start for sub in self.subs], end)
self.subs_adjusted = self.subs[start_line:end_line]
if self.speech_diary:
self.update_diary_timing()
def list_streams(self):
probe = ffmpeg.probe(self.file)["streams"]
if self.downloaded:
subs = [{"name": stream[-1]['name'], "stream": stream[-1]['filepath']} for stream in self.yt_sub_streams.values()]
else:
subs = [{"name": stream['tags'].get('language', 'unknown'), "stream": stream['index']} for stream in probe if stream["codec_type"] == "subtitle"]
return {
"audio": [stream for stream in probe if stream["codec_type"] == "audio"],
"subs": subs
}
def get_snippet(self, start, end):
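        # pydub indexes audio in milliseconds, hence the second-to-millisecond scaling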
return self.audio[start*1000:end*1000]
# Crops the video's audio segment to reduce memory size
def crop_audio(self, isolated_vocals):
# ffmpeg -i .\saiki.mkv -vn -ss 84 -to 1325 crop.wav
source_file = self.vocal_track if isolated_vocals and self.vocal_track else self.file
output = utils.get_output_path(source_file, "-crop.wav")
(
ffmpeg
            .input(source_file, ss=self.start_time, to=self.end_time)  # crop from the isolated vocals when available
.output(output)
.global_args('-loglevel', 'error')
.global_args('-vn')
.run(overwrite_output=True)
)
return output
def filter_multilingual_subtiles(self, progress_hook=print, exclusion="English"):
multi_lingual_subs = []
removed_subs = []
        # SpeechBrain chokes on absolute output paths on Windows, so use a relative filename here
        snippet_path = "video_snippet.wav"  # instead of utils.get_output_path('video_snippet', '.wav')
for i, sub in enumerate(self.subs_adjusted):
self.get_snippet(sub.start, sub.end).export(snippet_path, format="wav")
if sub.get_language(snippet_path) != exclusion:
multi_lingual_subs.append(sub)
else:
removed_subs.append(sub)
progress_hook(i, f"{i}/{len(self.subs_adjusted)}: {sub.text}")
self.subs_adjusted = multi_lingual_subs
self.subs_removed = removed_subs
progress_hook(-1, "done")
    # This function keeps only the audio snippets that appear in subs_adjusted after language
    # filtering or cropping, regardless of vocal splitting. It should be called AFTER
    # filter_multilingual_subtiles and BEFORE vocal isolation. Not useful yet.
    # TODO: there has to be a faster way to do this
# def isolate_subs(self):
# base = AudioSegment.silent(duration=self.duration*1000, frame_rate=self.audio.frame_rate, channels=self.audio.channels, frame_width=self.audio.frame_width)
# samples = np.array(base.get_array_of_samples())
# frame_rate = base.frame_rate
# for sub in self.subs_adjusted:
# copy = np.array(self.get_snippet(sub.start, sub.end).get_array_of_samples())
# start_sample = int(sub.start * frame_rate)
# end_sample = int(sub.end * frame_rate)
# # Ensure that the copy array has the same length as the region to replace
# copy = copy[:end_sample - start_sample] # Trim if necessary
# samples[start_sample:end_sample] = copy
# return AudioSegment(
# samples.tobytes(),
# frame_rate=frame_rate,
# sample_width=base.sample_width, # Adjust sample_width as needed (2 bytes for int16)
# channels=base.channels
# )
    def isolate_subs(self, subs):
        # Build a silent track and overlay only the audio that falls within a subtitle line
        isolated = AudioSegment.silent(duration=self.duration * 1000, frame_rate=self.audio.frame_rate)
        for sub in subs:
            isolated = isolated.overlay(self.get_snippet(sub.start, sub.end), position=sub.start * 1000)
        return isolated
    def run_dubbing(self, progress_hook=print):  # the hook is called unconditionally, so default to print
total_errors = 0
operation_start_time = time.process_time()
empty_audio = AudioSegment.silent(self.duration * 1000, frame_rate=22050)
status = ""
# with concurrent.futures.ThreadPoolExecutor(max_workers=100) as pool:
# tasks = [pool.submit(dub_task, sub, i) for i, sub in enumerate(subs_adjusted)]
# for future in concurrent.futures.as_completed(tasks):
# pass
for i, sub in enumerate(self.subs_adjusted):
status = f"{i}/{len(self.subs_adjusted)}"
progress_hook(i, f"{status}: {sub.text}")
try:
line = sub.dub_line_file(False)
empty_audio = empty_audio.overlay(line, sub.start*1000)
except Exception as e:
print(e)
total_errors += 1
self.dub_track = empty_audio.export(utils.get_output_path(self.file, '-dubtrack.wav'), format="wav").name
progress_hook(i+1, "Mixing New Audio")
self.mix_av(mixing_ratio=1)
progress_hook(-1)
print(f"TOTAL TIME TAKEN: {time.process_time() - operation_start_time}")
# print(total_errors)
    # Runs an ffmpeg command that mixes the dub track over the original (or background) audio
    # at the given loudness ratio, then muxes it back with the video
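    # Roughly equivalent CLI (filenames illustrative):
    # ffmpeg -i in.mkv -i dubtrack.wav -filter_complex "[0:a][1:a]amix=duration=first:weights=1 1[a]" \
    #        -map 0:v -map "[a]" -c:v copy -c:a aac -shortest dubbed.mkv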
def mix_av(self, mixing_ratio=1, dubtrack=None, output_path=None):
        # Default argument values can't reference self, so resolve them here
if not dubtrack: dubtrack = self.dub_track
if not output_path: output_path = utils.get_output_path(self.file, '-dubbed.mkv')
input_video = ffmpeg.input(self.file)
input_audio = input_video.audio
if self.background_track:
input_audio = ffmpeg.input(self.background_track)
input_dub = ffmpeg.input(dubtrack).audio
mixed_audio = ffmpeg.filter([input_audio, input_dub], 'amix', duration='first', weights=f"1 {mixing_ratio}")
output = (
# input_video['s']
ffmpeg.output(input_video['v'], mixed_audio, output_path, vcodec="copy", acodec="aac")
.global_args('-loglevel', 'error')
.global_args('-shortest')
)
ffmpeg.run(output, overwrite_output=True)
# Change the subs to either a file or a different stream from the video file
def change_subs(self, stream_index=-1):
if self.downloaded:
sub_path = list(self.yt_sub_streams.values())[stream_index][-1]['filepath']
self.subs = self.subs_adjusted = load_subs(utils.get_output_path(sub_path, '.srt'), sub_path)
else:
# ffmpeg -i output.mkv -map 0:s:1 frick.srt
sub_path = utils.get_output_path(self.file, '.srt')
ffmpeg.input(self.file).output(sub_path, map=f"0:s:{stream_index}").run(overwrite_output=True)
self.subs = self.subs_adjusted = load_subs(sub_path)
def change_audio(self, stream_index=-1):
        audio_path = utils.get_output_path(self.file, f"-{stream_index}.wav")
ffmpeg.input(self.file).output(audio_path, map=f"0:a:{stream_index}").run(overwrite_output=True)
self.audio = AudioSegment.from_file(audio_path)
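
# A minimal manual test, assuming a file already exists under output/ (path hypothetical):
# if __name__ == "__main__":
#     video = Video("output/example.mkv")
#     print(video.list_streams())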