ViDove / pipeline.py
JiaenLiu
small fix
147a645
raw
history blame
4.67 kB
import openai
from pytube import YouTube
import argparse
import os
import io
# Command-line interface: at least one of --link, --local_path or --text_file
# must be supplied; every other option has a default.
parser = argparse.ArgumentParser()
parser.add_argument("--link", help="youtube video link here", default=None, type=str, required=False)
parser.add_argument("--local_path", help="local video path here", default=None, type=str, required=False)
parser.add_argument("--text_file", help="text file path here", default=None, type=str, required=False)
parser.add_argument("--download", help="download path", default='./downloads', type=str, required=False)
parser.add_argument("--result", help="translate result path", default='./results', type=str, required=False)
parser.add_argument("--video_name", help="video name", default='placeholder', type=str, required=False)
parser.add_argument("--model_name", help="model name only support text-davinci-003 and gpt-3.5-turbo", type=str, required=False, default="gpt-3.5-turbo")
args = parser.parse_args()

if args.link is None and args.local_path is None and args.text_file is None:
    print("need video source or text file")
    # Exit with a non-zero status so calling shells/scripts can detect the
    # failure. SystemExit is raised directly because the bare exit() helper
    # is injected by the `site` module and is not always available.
    raise SystemExit(1)

# The OpenAI API key is read from the environment, never from the command line.
openai.api_key = os.getenv("OPENAI_API_KEY")

# Run configuration taken from the parsed arguments.
DOWNLOAD_PATH = args.download
RESULT_PATH = args.result
VIDEO_NAME = args.video_name
model_name = args.model_name
# Get the source audio. When only --link is given the audio track is
# downloaded from YouTube; when --local_path is given the local file is used
# (it takes precedence over --link). The handle is left open on purpose: the
# speech-to-text step later in the script reads from `audio_file`.
if args.link is not None and args.local_path is None:
    video_link = args.link
    video = None
    audio = None
    audio_path = None  # set only once the download has actually completed
    try:
        video = YouTube(video_link)
        audio = video.streams.filter(only_audio=True, file_extension='mp4').first()
        if audio:
            # Prefer the path reported by the download call; fall back to the
            # conventional location if the installed pytube returns nothing.
            audio_path = audio.download(DOWNLOAD_PATH) or os.path.join(
                DOWNLOAD_PATH, audio.default_filename
            )
            print('Download Completed!')
        else:
            print("Error: Audio stream not found")
    except Exception as e:
        # Best-effort reporting; the failure exit below handles the abort.
        print("Connection Error")
        print(e)

    # Key success on the finished download, not on the stream object: the
    # stream can exist even though the download itself raised, in which case
    # there is no file to open.
    if audio_path:
        audio_file = open(audio_path, "rb")
        # Strip only the extension so the rest of the file name is preserved.
        VIDEO_NAME = os.path.splitext(audio.default_filename)[0]
    else:
        print("Error: Unable to download audio from the YouTube video")
        raise SystemExit(1)
elif args.local_path is not None:
    # Read from local
    audio_file = open(args.local_path, "rb")
# Produce `script_input`, the English text to translate. It comes either from
# the user-supplied --text_file or from a Whisper transcription that is cached
# in <RESULT_PATH>/<VIDEO_NAME>_en.txt.
if args.text_file is not None:
    with open(args.text_file, 'r') as f:
        script_input = f.read()
else:
    transcript_path = "{}/{}_en.txt".format(RESULT_PATH, VIDEO_NAME)

    # Perform speech-to-text only when no cached transcript exists.
    if not os.path.exists(transcript_path):
        # Create the result directory before the API call so a missing
        # directory cannot throw away a transcription that was already paid for.
        os.makedirs(RESULT_PATH, exist_ok=True)
        transcript = openai.Audio.transcribe("whisper-1", audio_file)
        # Explicit UTF-8 so the write does not depend on the platform locale.
        with open(transcript_path, 'w', encoding='utf-8') as f:
            f.write(transcript['text'])

    # The audio handle is not needed past this point in the script.
    audio_file.close()

    with open(transcript_path, 'r', encoding='utf-8') as f:
        script_en = f.read()
    script_input = script_en
# Split the script on sentence boundaries ('.') and pack the sentences into
# chunks of at most `n_threshold` characters, so each chunk fits in a single
# translation request. Note that the limit is measured with len(), i.e. in
# characters, not in model tokens.
n_threshold = 1500
script_split = script_input.split('.')
last_index = len(script_split) - 1
script_arr = []
script = ""
for index, sentence in enumerate(script_split):
    # Re-attach the '.' consumed by split() to every segment except the last,
    # so the chunks reproduce the original text without an extra trailing '.'.
    piece = sentence + '.' if index < last_index else sentence
    if not piece.strip():
        continue
    if len(script) + len(piece) <= n_threshold:
        script += piece
    else:
        # Flush the current chunk, unless it is still empty (which happens
        # when the very first sentence is already longer than the limit).
        if script.strip():
            script_arr.append(script.strip())
        script = piece
if script.strip():
    script_arr.append(script.strip())
# Translate every chunk with the selected OpenAI model and append the result
# to <RESULT_PATH>/<VIDEO_NAME>_zh.txt.

# Fail loudly on an unsupported model instead of silently producing no output.
if model_name not in ("gpt-3.5-turbo", "text-davinci-003"):
    print("Error: unsupported model name: {}".format(model_name))
    raise SystemExit(1)

# The result directory may not exist yet (e.g. when --text_file is used and
# no transcript was written), so create it before the first API call.
os.makedirs(RESULT_PATH, exist_ok=True)
translation_path = f"{RESULT_PATH}/{VIDEO_NAME}_zh.txt"

for s in script_arr:
    if model_name == "gpt-3.5-turbo":
        # using chatgpt model
        print(s + "\n")
        response = openai.ChatCompletion.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are a helpful assistant that translates English to Chinese and have decent background in starcraft2."},
                {"role": "user", "content": 'Translate the following English text to Chinese: "{}"'.format(s)}
            ],
            temperature=0.15
        )
        translated = response['choices'][0]['message']['content'].strip()
    else:
        # using davinci model
        prompt = f"Please help me translate this into Chinese:\n\n{s}\n\n"
        print(prompt)
        response = openai.Completion.create(
            model=model_name,
            prompt=prompt,
            temperature=0.1,
            max_tokens=2000,
            top_p=1.0,
            frequency_penalty=0.0,
            presence_penalty=0.0
        )
        translated = response['choices'][0]['text'].strip()

    # Append after every chunk so finished work survives a later API failure.
    # Explicit UTF-8: the output is Chinese text and must not depend on the
    # platform's default encoding.
    with open(translation_path, 'a', encoding='utf-8') as f:
        f.write(translated)