Spaces:
Sleeping
Sleeping
import threading | |
import time | |
import openai | |
from pytube import YouTube | |
from os import getenv | |
from pathlib import Path | |
from enum import Enum, auto | |
import logging | |
import subprocess | |
from src.srt_util.srt import SrtScript | |
from src.srt_util.srt2ass import srt2ass | |
""" | |
Youtube link | |
- link | |
- model | |
- output type | |
Video file | |
- path | |
- model | |
- output type | |
Audio file | |
- path | |
- model | |
- output type | |
""" | |
""" | |
TaskID | |
Progress: Enum | |
Computing resource status
SRT_Script : SrtScript | |
- input module -> initialize (ASR module) | |
- Pre-process | |
- Translation (%) | |
- Post process (time stamp) | |
- Output module: SRT_Script --> output(.srt) | |
- (Optional) mp4 | |
""" | |
class TaskStatus(str, Enum):
    """Named stages a task can be in.

    Members also subclass ``str``, so each one compares equal to its own
    name written as a plain string.
    """

    # Initial state assigned when a task object is constructed.
    CREATED = "CREATED"
    # Module 1: building the SRT script from audio (ASR).
    INITIALIZING_ASR = "INITIALIZING_ASR"
    # Module 2: cleaning up the source-language script.
    PRE_PROCESSING = "PRE_PROCESSING"
    # Module 3: translating the script.
    TRANSLATING = "TRANSLATING"
    # Module 4: post-translation fix-ups.
    POST_PROCESSING = "POST_PROCESSING"
    # Module 5: rendering the requested outputs.
    OUTPUT_MODULE = "OUTPUT_MODULE"
class Task:
    """Base class for one subtitle task that runs a fixed five-stage pipeline.

    Subclasses prepare ``audio_path`` (and optionally ``video_path``) in their
    own ``run()`` and then call :meth:`run_pipeline`.

    The current stage is exposed through the ``status`` property, which is
    read and written while holding an internal lock.
    """

    @property
    def status(self):
        """Return the current stage, read while holding the status lock."""
        with self.__status_lock:
            return self.__status

    @status.setter
    def status(self, new_status):
        """Store ``new_status`` while holding the status lock."""
        with self.__status_lock:
            self.__status = new_status

    def __init__(self, task_id, task_local_dir, task_cfg):
        """Record the task configuration and log a summary of it.

        Args:
            task_id: Identifier used in log lines and output file names.
            task_local_dir: Working directory for this task's files.
            task_cfg: Mapping with the keys ``model``, ``output_type``,
                ``target_lang``, ``source_lang`` and ``field``.
                ``output_type`` must itself provide ``subtitle``, ``video``
                and ``bilingal``.

        Raises:
            KeyError: If one of the keys above is missing.
        """
        self.__status_lock = threading.Lock()
        self.__status = TaskStatus.CREATED
        # Side effect: sets the API key on the openai module for the process.
        openai.api_key = getenv("OPENAI_API_KEY")
        self.launch_info = task_cfg  # do not use, just for fallback
        self.task_local_dir = task_local_dir
        self.model = task_cfg["model"]
        self.gpu_status = 0
        self.output_type = task_cfg["output_type"]
        self.target_lang = task_cfg["target_lang"]
        self.source_lang = task_cfg["source_lang"]
        self.field = task_cfg["field"]
        self.task_id = task_id
        self.audio_path = None
        self.SRT_Script = None
        self.result = None
        # Set by update_translation_progress(); defined here so that method
        # never hits a missing attribute.
        self.progress = None

        print(f" Task ID: {self.task_id}")
        logging.info(f" Task ID: {self.task_id}")
        logging.info(f" {self.source_lang} -> {self.target_lang} task in {self.field}")
        logging.info(f" Model: {self.model}")
        logging.info(f" subtitle_type: {self.output_type['subtitle']}")
        logging.info(f" video_ouput: \t{self.output_type['video']}")
        logging.info(f" bilingal_ouput: \t{self.output_type['bilingal']}")

    @staticmethod
    def fromYoutubeLink(youtube_url, task_id, task_dir, task_cfg):
        """Create a ``YoutubeTask`` for ``youtube_url``."""
        # convert to audio
        logging.info("Task Creation method: Youtube Link")
        return YoutubeTask(task_id, task_dir, task_cfg, youtube_url)

    @staticmethod
    def fromAudioFile(audio_path, task_id, task_dir, task_cfg):
        """Create an ``AudioTask`` for the audio file at ``audio_path``."""
        # get audio path
        logging.info("Task Creation method: Audio File")
        return AudioTask(task_id, task_dir, task_cfg, audio_path)

    @staticmethod
    def fromVideoFile(video_path, task_id, task_dir, task_cfg):
        """Create a ``VideoTask`` for the video file at ``video_path``."""
        # get audio path
        logging.info("Task Creation method: Video File")
        return VideoTask(task_id, task_dir, task_cfg, video_path)

    # Module 1 ASR: audio --> SRT_script
    def get_srt_class(self, whisper_model='tiny', method="stable"):
        """Stage 1 placeholder: mark the ASR stage and attach the SRT script.

        ``whisper_model`` and ``method`` are accepted but not used here.
        """
        # Instead of using the script_en variable directly, we'll use script_input
        self.status = TaskStatus.INITIALIZING_ASR
        # NOTE(review): this stores the SrtScript class itself, not an
        # instance; preprocess() then calls methods on it. Confirm whether an
        # instance should be built from the audio here.
        self.SRT_Script = SrtScript
        time.sleep(5)  # placeholder delay standing in for the ASR work

    # Module 2: SRT preprocess: perform preprocess steps
    # TODO: multi-lang and multi-field support according to task_cfg
    def preprocess(self):
        """Stage 2: tidy the source script and write it next to the task dir.

        The processed script is written to ``<task_local_dir without
        suffix>_processed.srt``; when the requested subtitle type is ``ass``
        that file is additionally converted with ``srt2ass``. The source-only
        text is kept on ``self.script_input``.
        """
        self.status = TaskStatus.PRE_PROCESSING
        logging.info("--------------------Start Preprocessing SRT class--------------------")
        self.SRT_Script.form_whole_sentence()
        # self.SRT_Script.spell_check_term()
        self.SRT_Script.correct_with_force_term()
        processed_srt_path_en = str(Path(self.task_local_dir).with_suffix('')) + '_processed.srt'
        self.SRT_Script.write_srt_file_src(processed_srt_path_en)

        if self.output_type["subtitle"] == "ass":
            logging.info("write English .srt file to .ass")
            assSub_en = srt2ass(processed_srt_path_en)
            logging.info('ASS subtitle saved as: ' + assSub_en)

        self.script_input = self.SRT_Script.get_source_only()

    def update_translation_progress(self, new_progress):
        """Record translation progress while the task is in the translating stage.

        When ``status`` is ``TaskStatus.TRANSLATING``, ``self.progress`` becomes
        the pair ``(stage value, new_progress)``; otherwise it is left alone.
        """
        # The stage lives on ``status``; ``progress`` only carries the pair.
        if self.status == TaskStatus.TRANSLATING:
            self.progress = (TaskStatus.TRANSLATING.value, new_progress)
        time.sleep(5)  # placeholder delay kept from the stubbed pipeline

    # Module 3: perform srt translation
    def translation(self):
        """Stage 3 placeholder: mark the translating stage and wait."""
        # Set the stage like every other module does, so observers (and
        # update_translation_progress) can see that translation is running.
        self.status = TaskStatus.TRANSLATING
        time.sleep(5)  # placeholder delay standing in for the translation

    # Module 4: perform srt post process steps
    def postprocess(self):
        """Stage 4 placeholder: mark the post-processing stage and wait."""
        self.status = TaskStatus.POST_PROCESSING
        time.sleep(5)  # placeholder delay standing in for post-processing

    # Module 5: output module
    def output_render(self):
        """Stage 5 placeholder: mark the output stage and return ``"TODO"``."""
        self.status = TaskStatus.OUTPUT_MODULE
        return "TODO"

    def run_pipeline(self):
        """Run stages 1-5 in order and store the final value on ``self.result``."""
        self.get_srt_class()
        self.preprocess()
        self.translation()
        self.postprocess()
        self.result = self.output_render()
class YoutubeTask(Task):
    """Task whose media is downloaded from a YouTube link before the pipeline runs."""

    def __init__(self, task_id, task_local_dir, task_cfg, youtube_url):
        """Store the link; see ``Task.__init__`` for the other arguments."""
        super().__init__(task_id, task_local_dir, task_cfg)
        self.youtube_url = youtube_url

    def run(self):
        """Download video and audio into the task directory, then run the pipeline.

        The video is saved as ``task_<id>.mp4`` and the audio as
        ``task_<id>.mp3``. If no audio-only stream is offered, the audio is
        extracted from the downloaded video with ffmpeg instead.

        Raises:
            FileNotFoundError: If no progressive mp4 stream exists for the link.
        """
        # task_local_dir may be handed in as a plain string; normalise once so
        # the path joins below work for both str and Path.
        task_dir = Path(self.task_local_dir)
        video_path = task_dir.joinpath(f"task_{self.task_id}.mp4")
        audio_path = task_dir.joinpath(f"task_{self.task_id}.mp3")

        yt = YouTube(self.youtube_url)

        # Highest-resolution progressive (audio + video in one file) mp4 stream.
        video = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
        if not video:
            raise FileNotFoundError(f"Video stream not found for link {self.youtube_url}")
        video.download(str(task_dir), filename=video_path.name)
        logging.info(f'Video Name: {video.default_filename}')
        logging.info(f'Video download completed to {self.task_local_dir}!')

        audio = yt.streams.filter(only_audio=True).first()
        if audio:
            audio.download(str(task_dir), filename=audio_path.name)
            logging.info(f'Audio download completed to {self.task_local_dir}!')
        else:
            logging.info("download audio failed, using ffmpeg to extract audio")
            extraction = subprocess.run(
                ['ffmpeg', '-i', str(video_path), '-f', 'mp3',
                 '-ab', '192000', '-vn', str(audio_path)])
            # Do not report success when ffmpeg exited with an error.
            if extraction.returncode != 0:
                logging.error(f"ffmpeg audio extraction failed with exit code {extraction.returncode}")
            else:
                logging.info("audio extraction finished")

        self.video_path = video_path
        self.audio_path = audio_path

        logging.info("Data Prep Complete. Start pipeline")
        super().run_pipeline()
class AudioTask(Task):
    """Task that starts from an audio file which already exists."""

    def __init__(self, task_id, task_local_dir, task_cfg, audio_path):
        """Remember ``audio_path``; this kind of task has no video."""
        super().__init__(task_id, task_local_dir, task_cfg)
        self.video_path = None
        # TODO: check audio format
        self.audio_path = audio_path

    def run(self):
        """Run the pipeline directly; no media preparation is needed."""
        logging.info("Data Prep Complete. Start pipeline")
        super().run_pipeline()
class VideoTask(Task):
    """Task that starts from a local video file and extracts its audio."""

    def __init__(self, task_id, task_local_dir, task_cfg, video_path):
        """Remember ``video_path``; see ``Task.__init__`` for the other arguments."""
        super().__init__(task_id, task_local_dir, task_cfg)
        # TODO: check video format {.mp4}
        self.video_path = video_path

    def run(self):
        """Extract the audio track to ``task_<id>.mp3`` with ffmpeg, then run the pipeline."""
        # task_local_dir may be handed in as a plain string; normalise so the
        # path join works for both str and Path.
        audio_path = Path(self.task_local_dir).joinpath(f"task_{self.task_id}.mp3")

        logging.info("using ffmpeg to extract audio")
        extraction = subprocess.run(
            ['ffmpeg', '-i', self.video_path, '-f', 'mp3',
             '-ab', '192000', '-vn', str(audio_path)])
        # Do not report success when ffmpeg exited with an error.
        if extraction.returncode != 0:
            logging.error(f"ffmpeg audio extraction failed with exit code {extraction.returncode}")
        else:
            logging.info("audio extraction finished")

        self.audio_path = audio_path

        logging.info("Data Prep Complete. Start pipeline")
        super().run_pipeline()