import sys
import copy
import librosa
import logging
import argparse
import numpy as np
import soundfile as sf
import moviepy.editor as mpy
# from modelscope.pipelines import pipeline
# from modelscope.utils.constant import Tasks
from subtitle_utils import generate_srt, generate_srt_clip, distribute_spk
from trans_utils import pre_proc, proc, write_state, load_state, proc_spk, generate_vad_data
# from argparse_tools import ArgumentParser, get_commandline_args

from moviepy.editor import *
from moviepy.video.tools.subtitles import SubtitlesClip


class VideoClipper():
    def __init__(self, asr_pipeline, sd_pipeline=None):
        # Wrap an ASR pipeline (and, optionally, a speaker-diarization pipeline)
        # for transcript-based audio/video clipping.
        logging.warning("Initializing VideoClipper.")
        self.asr_pipeline = asr_pipeline
        self.sd_pipeline = sd_pipeline

    def recog(self, audio_input, sd_switch='no', state=None):
        # Run ASR (and optional speaker diarization) on 16 kHz audio; return the
        # transcript, an SRT string, and the recognition state used for clipping.
        if state is None:
            state = {}
        sr, data = audio_input
        assert sr == 16000, "16kHz sample rate required, {} given.".format(sr)
        if len(data.shape) == 2:  # multi-channel wav input
            logging.warning("Input wav shape: {}, only first channel reserved.".format(data.shape))
            data = data[:, 0]
        state['audio_input'] = (sr, data)
        data = data.astype(np.float64)
        rec_result = self.asr_pipeline(audio_in=data)
        if sd_switch == 'yes':
            vad_data = generate_vad_data(data.astype(np.float32), rec_result['sentences'], sr)
            sd_result = self.sd_pipeline(audio=vad_data, batch_size=1)
            rec_result['sd_sentences'] = distribute_spk(rec_result['sentences'], sd_result['text'])
            res_srt = generate_srt(rec_result['sd_sentences'])
            state['sd_sentences'] = rec_result['sd_sentences']
        else:
            res_srt = generate_srt(rec_result['sentences'])
        state['recog_res_raw'] = rec_result['text_postprocessed']
        state['timestamp'] = rec_result['time_stamp']
        state['sentences'] = rec_result['sentences']
        res_text = rec_result['text']
        return res_text, res_srt, state

    def clip(self, dest_text, start_ost, end_ost, state, dest_spk=None):
        # Cut the audio segments whose transcript matches dest_text (or whose
        # speaker matches dest_spk); start_ost/end_ost are offsets in milliseconds.
        # get from state
        audio_input = state['audio_input']
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        sr, data = audio_input
        data = data.astype(np.float64)

        all_ts = []
        if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
            for _dest_text in dest_text.split('#'):
                _dest_text = pre_proc(_dest_text)
                ts = proc(recog_res_raw, timestamp, _dest_text)
                for _ts in ts:
                    all_ts.append(_ts)
        else:
            for _dest_spk in dest_spk.split('#'):
                ts = proc_spk(_dest_spk, state['sd_sentences'])
                for _ts in ts:
                    all_ts.append(_ts)
        ts = all_ts
        ts.sort()
        srt_index = 0
        clip_srt = ""
        if len(ts):
            start, end = ts[0]
            start = min(max(0, start + start_ost * 16), len(data))
            end = min(max(0, end + end_ost * 16), len(data))
            res_audio = data[start:end]
            start_end_info = "from {} to {}".format(start / 16000, end / 16000)
            srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index)
            clip_srt += srt_clip
            for _ts in ts[1:]:  # multiple sentence input or multiple output matched
                start, end = _ts
                start = min(max(0, start + start_ost * 16), len(data))
                end = min(max(0, end + end_ost * 16), len(data))
                start_end_info += ", from {} to {}".format(start / 16000, end / 16000)
                # start/end already include the offsets, so slice them directly
                res_audio = np.concatenate([res_audio, data[start:end]], -1)
                srt_clip, _, srt_index = generate_srt_clip(sentences, start / 16000.0, end / 16000.0, begin_index=srt_index - 1)
                clip_srt += srt_clip
        if len(ts):
            message = "{} periods found in the speech: ".format(len(ts)) + start_end_info
        else:
            message = "No period found in the speech, return raw speech. You may check the recognition result and try other destination text."
            res_audio = data
        return (sr, res_audio), message, clip_srt

    def video_recog(self, vedio_filename, sd_switch='no'):
        # Extract the video's audio track, resample it to 16 kHz and run recog().
        clip_video_file = vedio_filename[:-4] + '_clip.mp4'
        video = mpy.VideoFileClip(vedio_filename)
        audio_file = vedio_filename[:-3] + 'wav'
        video.audio.write_audiofile(audio_file)
        wav = librosa.load(audio_file, sr=16000)[0]
        state = {
            'vedio_filename': vedio_filename,
            'clip_video_file': clip_video_file,
            'video': video,
        }
        return self.recog((16000, wav), sd_switch, state)

    def video_clip(self, dest_text, start_ost, end_ost, state,
                   font_size=32, font_color='white', add_sub=False, dest_spk=None):
        # Cut the matching video segments, optionally burning in subtitles,
        # and concatenate them into <video_name>_clip.mp4.
        # get from state
        recog_res_raw = state['recog_res_raw']
        timestamp = state['timestamp']
        sentences = state['sentences']
        video = state['video']
        clip_video_file = state['clip_video_file']
        vedio_filename = state['vedio_filename']

        all_ts = []
        srt_index = 0
        if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
            for _dest_text in dest_text.split('#'):
                _dest_text = pre_proc(_dest_text)
                ts = proc(recog_res_raw, timestamp, _dest_text)
                for _ts in ts:
                    all_ts.append(_ts)
        else:
            for _dest_spk in dest_spk.split('#'):
                ts = proc_spk(_dest_spk, state['sd_sentences'])
                for _ts in ts:
                    all_ts.append(_ts)
        time_acc_ost = 0.0
        ts = all_ts
        ts.sort()
        clip_srt = ""
        if len(ts):
            start, end = ts[0][0] / 16000, ts[0][1] / 16000
            srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index, time_acc_ost=time_acc_ost)
            start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
            video_clip = video.subclip(start, end)
            start_end_info = "from {} to {}".format(start, end)
            clip_srt += srt_clip
            if add_sub:
                generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                subtitles = SubtitlesClip(subs, generator)
                video_clip = CompositeVideoClip([video_clip, subtitles.set_pos(('center', 'bottom'))])
            concate_clip = [video_clip]
            time_acc_ost += end + end_ost / 1000.0 - (start + start_ost / 1000.0)
            for _ts in ts[1:]:
                start, end = _ts[0] / 16000, _ts[1] / 16000
                srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index - 1, time_acc_ost=time_acc_ost)
                start, end = start + start_ost / 1000.0, end + end_ost / 1000.0
                _video_clip = video.subclip(start, end)
                start_end_info += ", from {} to {}".format(start, end)
                clip_srt += srt_clip
                if add_sub:
                    generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
                    subtitles = SubtitlesClip(subs, generator)
                    _video_clip = CompositeVideoClip([_video_clip, subtitles.set_pos(('center', 'bottom'))])
                concate_clip.append(copy.copy(_video_clip))
                time_acc_ost += end + end_ost / 1000.0 - (start + start_ost / 1000.0)
            message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
            logging.warning("Concating...")
            if len(concate_clip) > 1:
                video_clip = concatenate_videoclips(concate_clip)
            video_clip.write_videofile(clip_video_file, audio_codec="aac")
        else:
            clip_video_file = vedio_filename
            message = "No period found in the audio, return raw speech. You may check the recognition result and try other destination text."
            srt_clip = ''
        return clip_video_file, message, clip_srt
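# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It assumes the ASR
# pipeline is built with modelscope, as the commented-out imports above suggest;
# the model id, the input file name and the dest_text query below are
# placeholders, not values taken from this repository.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    from modelscope.pipelines import pipeline
    from modelscope.utils.constant import Tasks

    # Placeholder model id: substitute whichever ASR model your deployment uses.
    asr_pipeline = pipeline(task=Tasks.auto_speech_recognition,
                            model='<your-asr-model-id>')
    clipper = VideoClipper(asr_pipeline)

    # Recognize the video, then cut every segment whose transcript matches dest_text.
    res_text, res_srt, state = clipper.video_recog('example.mp4')
    clip_file, message, clip_srt = clipper.video_clip(dest_text='目标文本',
                                                      start_ost=0, end_ost=100,
                                                      state=state)
    print(message)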