import json
import os
import shutil
import subprocess
import sys
import time
import math

import cv2
import requests
from pydub import AudioSegment
import numpy as np
from dotenv import load_dotenv
import gradio as gr

# Load environment variables from .env file
load_dotenv(override=True)

# Read API keys from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LEMONFOX_API_KEY = os.getenv("LEMONFOX_API_KEY")

narration_api = "openai"
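
# Split the chat model's response into an ordered list of scene elements
# (image descriptions and narration lines) plus a flat list of narration strings.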
def parse(narration):
    data = []
    narrations = []
    lines = narration.split("\n")
    for line in lines:
        if line.startswith('Narrator: '):
            text = line.replace('Narrator: ', '')
            data.append({
                "type": "text",
                "content": text.strip('"'),
            })
            narrations.append(text.strip('"'))
        elif line.startswith('['):
            background = line.strip('[]')
            data.append({
                "type": "image",
                "description": background,
            })
    return data, narrations
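
# Synthesize one MP3 per narration line via the OpenAI text-to-speech endpoint.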
def create(data, output_folder, voice="shimmer"):  # voice parameter with a default value
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    n = 0
    for element in data:
        if element["type"] != "text":
            continue
        n += 1
        output_file = os.path.join(output_folder, f"narration_{n}.mp3")

        if narration_api == "openai":
            tts_url = 'https://api.openai.com/v1/audio/speech'
            headers = {
                'Authorization': f'Bearer {OPENAI_API_KEY}',
                'Content-Type': 'application/json'
            }
            payload = {
                "model": "tts-1",
                "input": element["content"],
                "voice": voice  # Use the selected voice here
            }
            response = requests.post(tts_url, json=payload, headers=headers)
            if response.status_code == 200:
                with open(output_file, "wb") as f:
                    f.write(response.content)
            else:
                print(f"Failed to generate audio for prompt: {element['content']}. Status Code: {response.status_code}")
def generate(prompt, output_file, size="576x1024"):
    url = 'https://api.lemonfox.ai/v1/images/generations'
    headers = {
        'Authorization': LEMONFOX_API_KEY,
        'Content-Type': 'application/json'
    }
    data = {
        'prompt': prompt,
        'size': size,
        'n': 1
    }

    try:
        response = requests.post(url, json=data, headers=headers)
        if response.ok:
            response_data = response.json()
            if 'data' in response_data and len(response_data['data']) > 0:
                image_info = response_data['data'][0]
                image_url = image_info['url']
                image_response = requests.get(image_url)
                with open(output_file, 'wb') as f:
                    f.write(image_response.content)
            else:
                print(f"No image data found for prompt: {prompt}")
        else:
            print(f"Failed to generate image for prompt: {prompt}. Status Code: {response.status_code}")
    except Exception as e:
        print(f"Error occurred while processing prompt: {prompt}")
        print(str(e))
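
# Generate one background image per "image" element in the parsed script data.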
def create_from_data(data, output_dir):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    image_number = 0
    for element in data:
        if element["type"] != "image":
            continue
        image_number += 1
        image_name = f"image_{image_number}.webp"
        generate(element["description"], os.path.join(output_dir, image_name))
def get_audio_duration(audio_file):
    # pydub reports the length of an AudioSegment in milliseconds
    return len(AudioSegment.from_file(audio_file))
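
# Scale an image to fit within width x height while preserving its aspect ratio.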
def resize_image(image, width, height):
    aspect_ratio = image.shape[1] / image.shape[0]
    if aspect_ratio > (width / height):
        new_width = width
        new_height = int(width / aspect_ratio)
    else:
        new_height = height
        new_width = int(height * aspect_ratio)
    return cv2.resize(image, (new_width, new_height))
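
# Draw one caption word, centered, as white text over a black outline,
# and write the resulting frame to the video writer.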
def write_text(text, frame, video_writer):
    font = cv2.FONT_HERSHEY_SIMPLEX
    white_color = (255, 255, 255)
    black_color = (0, 0, 0)
    thickness = 10
    font_scale = 3
    border = 5

    text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
    text_x = (frame.shape[1] - text_size[0]) // 2
    text_y = (frame.shape[0] + text_size[1]) // 2
    org = (text_x, text_y)

    frame = cv2.putText(frame, text, org, font, font_scale, black_color, thickness + border * 2, cv2.LINE_AA)
    frame = cv2.putText(frame, text, org, font, font_scale, white_color, thickness, cv2.LINE_AA)

    video_writer.write(frame)
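
# Overlay word-by-word captions on the slideshow, stitch the narration clips
# into one audio track, and mux audio and video together with ffmpeg.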
def add_narration_to_video(narrations, input_video, output_dir, output_file, text_color, text_position):
    # NOTE: text_color and text_position are accepted but not currently used by write_text.
    offset = 50  # ms trimmed from the first word's display time
    cap = cv2.VideoCapture(input_video)
    temp_video = os.path.join(output_dir, "with_transcript.mp4")  # intermediate captioned MP4
    out = cv2.VideoWriter(temp_video, cv2.VideoWriter_fourcc(*'mp4v'), 30, (int(cap.get(3)), int(cap.get(4))))

    full_narration = AudioSegment.empty()
    for i, narration in enumerate(narrations):
        audio = os.path.join(output_dir, "narrations", f"narration_{i+1}.mp3")
        duration = get_audio_duration(audio)
        narration_frames = math.floor(duration / 1000 * 30)
        full_narration += AudioSegment.from_file(audio)

        char_count = len(narration.replace(" ", ""))
        ms_per_char = duration / char_count
        frames_written = 0
        words = narration.split(" ")
        for w, word in enumerate(words):
            word_ms = len(word) * ms_per_char
            if i == 0 and w == 0:
                word_ms -= offset
                if word_ms < 0:
                    word_ms = 0
            for _ in range(math.floor(word_ms / 1000 * 30)):
                ret, frame = cap.read()
                if not ret:
                    break
                write_text(word, frame, out)
                frames_written += 1

        # Pad with caption-free frames until this narration's duration is covered
        for _ in range(narration_frames - frames_written):
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)

    # Copy any remaining frames without captions
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)

    temp_narration = os.path.join(output_dir, "narration.mp3")
    full_narration.export(temp_narration, format="mp3")

    cap.release()
    out.release()
    cv2.destroyAllWindows()

    # Mux the captioned video with the concatenated narration audio
    ffmpeg_command = [
        'ffmpeg',
        '-y',
        '-i', temp_video,
        '-i', temp_narration,
        '-map', '0:v',
        '-map', '1:a',
        '-c:v', 'libx264',  # Use the H.264 codec
        '-c:a', 'aac',
        '-strict', 'experimental',
        os.path.join(output_dir, output_file)
    ]
    subprocess.run(ffmpeg_command, capture_output=True)

    os.remove(temp_video)
    os.remove(temp_narration)
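
# Build a 1080x1920 vertical slideshow: hold each image for the length of its
# narration, cross-fade between scenes, then add captions and audio.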
def create_video(narrations, output_dir, output_file, text_color, text_position):
    width, height = 1080, 1920
    frame_rate = 30
    fade_time = 1000  # cross-fade length in ms

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    temp_video = os.path.join(output_dir, "temp_video.mp4")
    out = cv2.VideoWriter(temp_video, fourcc, frame_rate, (width, height))

    image_paths = os.listdir(os.path.join(output_dir, "images"))
    image_count = len(image_paths)

    for i in range(image_count):
        image1 = cv2.imread(os.path.join(output_dir, "images", f"image_{i+1}.webp"))
        if i + 1 < image_count:
            image2 = cv2.imread(os.path.join(output_dir, "images", f"image_{i+2}.webp"))
        else:
            image2 = cv2.imread(os.path.join(output_dir, "images", "image_1.webp"))

        image1 = resize_image(image1, width, height)
        image2 = resize_image(image2, width, height)

        narration = os.path.join(output_dir, "narrations", f"narration_{i+1}.mp3")
        duration = get_audio_duration(narration)
        if i > 0:
            duration -= fade_time
        if i == image_count - 1:
            duration -= fade_time

        # Hold the current image for the remaining narration time
        for _ in range(math.floor(duration / 1000 * frame_rate)):
            vertical_video_frame = np.zeros((height, width, 3), dtype=np.uint8)
            vertical_video_frame[:image1.shape[0], :] = image1
            out.write(vertical_video_frame)

        # Cross-fade into the next image
        for alpha in np.linspace(0, 1, math.floor(fade_time / 1000 * frame_rate)):
            blended_image = cv2.addWeighted(image1, 1 - alpha, image2, alpha, 0)
            vertical_video_frame = np.zeros((height, width, 3), dtype=np.uint8)
            vertical_video_frame[:image1.shape[0], :] = blended_image
            out.write(vertical_video_frame)

    out.release()
    cv2.destroyAllWindows()

    add_narration_to_video(narrations, temp_video, output_dir, output_file, text_color, text_position)
    os.remove(temp_video)
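
# End-to-end pipeline for one short: request a script from the chat model,
# synthesize narration, generate background images, render the video, and
# clean up intermediate files. Returns the path to the finished MP4, or None.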
def generate_video(topic, voice="shimmer"):
    # Each run gets its own working directory keyed by the current timestamp
    short_id = str(int(time.time()))
    basedir = os.path.join("shorts", short_id)
    if not os.path.exists(basedir):
        os.makedirs(basedir)

    filename = topic.replace("_", " ").replace("/", "_").replace(".", "_")
    output_file = f"{filename}.mp4"

    # Ask the chat model for a script: image prompts in brackets, narration lines below
    chat_url = 'https://api.openai.com/v1/chat/completions'
    headers = {
        'Authorization': f'Bearer {OPENAI_API_KEY}',
        'Content-Type': 'application/json'
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {
                "role": "system",
                "content": "You are a viral YouTube short video creator."
            },
            {
                "role": "user",
                "content": f"""Make a 60 second video on: \n\n{topic} and you will need to generate a very short description of images for each of the scenes. They will be used for background AI images. Note that the script will be fed into a text-to-speech engine, so don't use special characters. Respond with a pair of an image prompt in square brackets and a script below it. Both of them should be on their own lines, as follows:
###
[Description of a background image]
Narrator: "Sentence of narration"
###"""
            }
        ]
    }
    response = requests.post(chat_url, json=payload, headers=headers)

    if response.status_code == 200:
        response_text = response.json()['choices'][0]['message']['content']
        # Normalize curly quotes and ellipses before they reach the TTS engine
        response_text = response_text.replace("’", "'").replace("`", "'").replace("…", "...").replace("“", '"').replace("”", '"')

        with open(os.path.join(basedir, "response.txt"), "a") as f:
            f.write(response_text + "\n")

        data, narrations = parse(response_text)
        with open(os.path.join(basedir, "data.json"), "a") as f:
            json.dump(data, f, ensure_ascii=False)
            f.write("\n")

        print(f"Generating narration for: {topic}...")
        create(data, os.path.join(basedir, "narrations"), voice=voice)

        print("Generating images...")
        create_from_data(data, os.path.join(basedir, "images"))

        print("Generating video...")
        create_video(narrations, basedir, output_file, text_color="white", text_position="center")

        print("Deleting files and folders...")
        os.remove(os.path.join(basedir, "response.txt"))
        os.remove(os.path.join(basedir, "data.json"))
        shutil.rmtree(os.path.join(basedir, "narrations"))
        shutil.rmtree(os.path.join(basedir, "images"))

        print(f"DONE! Here's your video: {os.path.join(basedir, output_file)}")
        return os.path.join(basedir, output_file)
    else:
        print(f"Failed to generate script for source material: {topic}. Status Code: {response.status_code}")
        return None
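
# Example (illustrative sketch only; the topic string is hypothetical): the
# pipeline can also be driven without the UI, assuming both API keys are set:
#   video_path = generate_video("Three surprising facts about honeybees", voice="nova")

# Gradio UI: a topic textbox plus a voice dropdown; the output is the rendered video.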
iface = gr.Interface(
    concurrency_limit=20,
    fn=generate_video,
    inputs=["text", gr.Dropdown(['alloy', 'shimmer', 'fable', 'onyx', 'nova', 'echo'], label="Select Voice")],
    outputs="video",
    css=".gradio-container {display: none}"
)

iface.launch()