app-audio_video_recognize / videoclipper.py
glt3953's picture
Upload 3 files
33a8656
raw
history blame
No virus
8.35 kB
import sys
import copy
import librosa
import logging
import argparse
import numpy as np
import soundfile as sf
import moviepy.editor as mpy
# from modelscope.pipelines import pipeline
# from modelscope.utils.constant import Tasks
from subtitle_utils import generate_srt, generate_srt_clip, distribute_spk
from trans_utils import pre_proc, proc, write_state, load_state, proc_spk, generate_vad_data
# from argparse_tools import ArgumentParser, get_commandline_args
from moviepy.editor import *
from moviepy.video.tools.subtitles import SubtitlesClip
class VideoClipper():
def __init__(self, asr_pipeline, sd_pipeline=None):
logging.warning("Initializing VideoClipper.")
self.asr_pipeline = asr_pipeline
self.sd_pipeline = sd_pipeline
def recog(self, audio_input, sd_switch='no', state=None):
if state is None:
state = {}
sr, data = audio_input
assert sr == 16000, "16kHz sample rate required, {} given.".format(sr)
if len(data.shape) == 2: # multi-channel wav input
logging.warning("Input wav shape: {}, only first channel reserved.").format(data.shape)
data = data[:,0]
state['audio_input'] = (sr, data)
data = data.astype(np.float64)
rec_result = self.asr_pipeline(audio_in=data)
if sd_switch == 'yes':
vad_data = generate_vad_data(data.astype(np.float32), rec_result['sentences'], sr)
sd_result = self.sd_pipeline(audio=vad_data, batch_size=1)
rec_result['sd_sentences'] = distribute_spk(rec_result['sentences'], sd_result['text'])
res_srt = generate_srt(rec_result['sd_sentences'])
state['sd_sentences'] = rec_result['sd_sentences']
else:
res_srt = generate_srt(rec_result['sentences'])
state['recog_res_raw'] = rec_result['text_postprocessed']
state['timestamp'] = rec_result['time_stamp']
state['sentences'] = rec_result['sentences']
res_text = rec_result['text']
return res_text, res_srt, state
def clip(self, dest_text, start_ost, end_ost, state, dest_spk=None):
# get from state
audio_input = state['audio_input']
recog_res_raw = state['recog_res_raw']
timestamp = state['timestamp']
sentences = state['sentences']
sr, data = audio_input
data = data.astype(np.float64)
all_ts = []
if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
for _dest_text in dest_text.split('#'):
_dest_text = pre_proc(_dest_text)
ts = proc(recog_res_raw, timestamp, _dest_text)
for _ts in ts: all_ts.append(_ts)
else:
for _dest_spk in dest_spk.split('#'):
ts = proc_spk(_dest_spk, state['sd_sentences'])
for _ts in ts: all_ts.append(_ts)
ts = all_ts
ts.sort()
srt_index = 0
clip_srt = ""
if len(ts):
start, end = ts[0]
start = min(max(0, start+start_ost*16), len(data))
end = min(max(0, end+end_ost*16), len(data))
res_audio = data[start:end]
start_end_info = "from {} to {}".format(start/16000, end/16000)
srt_clip, _, srt_index = generate_srt_clip(sentences, start/16000.0, end/16000.0, begin_index=srt_index)
clip_srt += srt_clip
for _ts in ts[1:]: # multiple sentence input or multiple output matched
start, end = _ts
start = min(max(0, start+start_ost*16), len(data))
end = min(max(0, end+end_ost*16), len(data))
start_end_info += ", from {} to {}".format(start, end)
res_audio = np.concatenate([res_audio, data[start+start_ost*16:end+end_ost*16]], -1)
srt_clip, _, srt_index = generate_srt_clip(sentences, start/16000.0, end/16000.0, begin_index=srt_index-1)
clip_srt += srt_clip
if len(ts):
message = "{} periods found in the speech: ".format(len(ts)) + start_end_info
else:
message = "No period found in the speech, return raw speech. You may check the recognition result and try other destination text."
res_audio = data
return (sr, res_audio), message, clip_srt
def video_recog(self, vedio_filename, sd_switch='no'):
vedio_filename = vedio_filename
clip_video_file = vedio_filename[:-4] + '_clip.mp4'
video = mpy.VideoFileClip(vedio_filename)
audio_file = vedio_filename[:-3] + 'wav'
video.audio.write_audiofile(audio_file)
wav = librosa.load(audio_file, sr=16000)[0]
state = {
'vedio_filename': vedio_filename,
'clip_video_file': clip_video_file,
'video': video,
}
# res_text, res_srt = self.recog((16000, wav), state)
return self.recog((16000, wav), sd_switch, state)
def video_clip(self, dest_text, start_ost, end_ost, state, font_size=32, font_color='white', add_sub=False, dest_spk=None):
# get from state
recog_res_raw = state['recog_res_raw']
timestamp = state['timestamp']
sentences = state['sentences']
video = state['video']
clip_video_file = state['clip_video_file']
vedio_filename = state['vedio_filename']
all_ts = []
srt_index = 0
if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
for _dest_text in dest_text.split('#'):
_dest_text = pre_proc(_dest_text)
ts = proc(recog_res_raw, timestamp, _dest_text)
for _ts in ts: all_ts.append(_ts)
else:
for _dest_spk in dest_spk.split('#'):
ts = proc_spk(_dest_spk, state['sd_sentences'])
for _ts in ts: all_ts.append(_ts)
time_acc_ost = 0.0
ts = all_ts
ts.sort()
clip_srt = ""
if len(ts):
start, end = ts[0][0] / 16000, ts[0][1] / 16000
srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index, time_acc_ost=time_acc_ost)
start, end = start+start_ost/1000.0, end+end_ost/1000.0
video_clip = video.subclip(start, end)
start_end_info = "from {} to {}".format(start, end)
clip_srt += srt_clip
if add_sub:
generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
subtitles = SubtitlesClip(subs, generator)
video_clip = CompositeVideoClip([video_clip, subtitles.set_pos(('center','bottom'))])
concate_clip = [video_clip]
time_acc_ost += end+end_ost/1000.0 - (start+start_ost/1000.0)
for _ts in ts[1:]:
start, end = _ts[0] / 16000, _ts[1] / 16000
srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index-1, time_acc_ost=time_acc_ost)
start, end = start+start_ost/1000.0, end+end_ost/1000.0
_video_clip = video.subclip(start, end)
start_end_info += ", from {} to {}".format(start, end)
clip_srt += srt_clip
if add_sub:
generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
subtitles = SubtitlesClip(subs, generator)
_video_clip = CompositeVideoClip([_video_clip, subtitles.set_pos(('center','bottom'))])
concate_clip.append(copy.copy(_video_clip))
time_acc_ost += end+end_ost/1000.0 - (start+start_ost/1000.0)
message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
logging.warning("Concating...")
if len(concate_clip) > 1:
video_clip = concatenate_videoclips(concate_clip)
video_clip.write_videofile(clip_video_file, audio_codec="aac")
else:
clip_video_file = vedio_filename
message = "No period found in the audio, return raw speech. You may check the recognition result and try other destination text."
srt_clip = ''
return clip_video_file, message, clip_srt