import json
import os
import shutil
import subprocess
import sys
import time
import math

import cv2
import requests
from pydub import AudioSegment
import numpy as np
from dotenv import load_dotenv
import gradio as gr

# Load environment variables from .env file
load_dotenv(override=True)

# Read API keys from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LEMONFOX_API_KEY = os.getenv("LEMONFOX_API_KEY")

narration_api = "openai"


def parse(narration):
    # Split the model's response into image prompts ("[...]" lines) and narration lines.
    data = []
    narrations = []
    lines = narration.split("\n")
    for line in lines:
        if line.startswith('Narrator: '):
            text = line.replace('Narrator: ', '')
            data.append({
                "type": "text",
                "content": text.strip('"'),
            })
            narrations.append(text.strip('"'))
        elif line.startswith('['):
            background = line.strip('[]')
            data.append({
                "type": "image",
                "description": background,
            })
    return data, narrations


def create(data, output_folder, voice="shimmer"):  # voice parameter with default value
    # Generate one MP3 per narration line using the OpenAI text-to-speech API.
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    n = 0
    for element in data:
        if element["type"] != "text":
            continue
        n += 1
        output_file = os.path.join(output_folder, f"narration_{n}.mp3")

        if narration_api == "openai":
            tts_url = 'https://api.openai.com/v1/audio/speech'
            headers = {
                'Authorization': f'Bearer {OPENAI_API_KEY}',
                'Content-Type': 'application/json'
            }
            payload = {
                "model": "tts-1",
                "input": element["content"],
                "voice": voice  # Use the selected voice here
            }
            response = requests.post(tts_url, json=payload, headers=headers)
            if response.status_code == 200:
                with open(output_file, "wb") as f:
                    f.write(response.content)
            else:
                print(f"Failed to generate audio for prompt: {element['content']}. Status Code: {response.status_code}")


def generate(prompt, output_file, size="576x1024"):
    # Request a background image from the Lemonfox image generation API and save it to disk.
    url = 'https://api.lemonfox.ai/v1/images/generations'
    headers = {
        'Authorization': LEMONFOX_API_KEY,
        'Content-Type': 'application/json'
    }
    data = {
        'prompt': prompt,
        'size': size,
        'n': 1
    }
    try:
        response = requests.post(url, json=data, headers=headers)
        if response.ok:
            response_data = response.json()
            if 'data' in response_data and len(response_data['data']) > 0:
                image_info = response_data['data'][0]
                image_url = image_info['url']
                image_response = requests.get(image_url)
                with open(output_file, 'wb') as f:
                    f.write(image_response.content)
            else:
                print(f"No image data found for prompt: {prompt}")
        else:
            print(f"Failed to generate image for prompt: {prompt}. Status Code: {response.status_code}")
    except Exception as e:
        print(f"Error occurred while processing prompt: {prompt}")
        print(str(e))


def create_from_data(data, output_dir):
    # Generate one background image per image description.
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    image_number = 0
    for element in data:
        if element["type"] != "image":
            continue
        image_number += 1
        image_name = f"image_{image_number}.webp"
        generate(element["description"], os.path.join(output_dir, image_name))


def get_audio_duration(audio_file):
    # pydub reports segment length in milliseconds
    return len(AudioSegment.from_file(audio_file))


def resize_image(image, width, height):
    # Resize while preserving the aspect ratio so the image fits inside width x height.
    aspect_ratio = image.shape[1] / image.shape[0]
    if aspect_ratio > (width / height):
        new_width = width
        new_height = int(width / aspect_ratio)
    else:
        new_height = height
        new_width = int(height * aspect_ratio)
    return cv2.resize(image, (new_width, new_height))


def write_text(text, frame, video_writer):
    font = cv2.FONT_HERSHEY_SIMPLEX
    white_color = (255, 255, 255)
    black_color = (0, 0, 0)
    thickness = 10
    font_scale = 3
    border = 5

    text_size = cv2.getTextSize(text, font, font_scale, thickness)[0]
    text_x = (frame.shape[1] - text_size[0]) // 2
    text_y = (frame.shape[0] + text_size[1]) // 2
    org = (text_x, text_y)

    # Draw a black outline first, then the white text on top of it.
    frame = cv2.putText(frame, text, org, font, font_scale, black_color, thickness + border * 2, cv2.LINE_AA)
    frame = cv2.putText(frame, text, org, font, font_scale, white_color, thickness, cv2.LINE_AA)

    video_writer.write(frame)


def add_narration_to_video(narrations, input_video, output_dir, output_file, text_color, text_position):
    offset = 50  # milliseconds trimmed from the very first word
    cap = cv2.VideoCapture(input_video)
    temp_video = os.path.join(output_dir, "with_transcript.mp4")  # MP4 file extension
    out = cv2.VideoWriter(temp_video, cv2.VideoWriter_fourcc(*'mp4v'), 30,
                          (int(cap.get(3)), int(cap.get(4))))  # cap.get(3)/get(4) = frame width/height

    full_narration = AudioSegment.empty()
    for i, narration in enumerate(narrations):
        audio = os.path.join(output_dir, "narrations", f"narration_{i+1}.mp3")
        duration = get_audio_duration(audio)
        narration_frames = math.floor(duration / 1000 * 30)
        full_narration += AudioSegment.from_file(audio)

        char_count = len(narration.replace(" ", ""))
        ms_per_char = duration / char_count

        frames_written = 0
        words = narration.split(" ")
        for w, word in enumerate(words):
            # Estimate how long each word is on screen from its share of the characters.
            word_ms = len(word) * ms_per_char
            if i == 0 and w == 0:
                word_ms -= offset
            if word_ms < 0:
                word_ms = 0
            for _ in range(math.floor(word_ms / 1000 * 30)):
                ret, frame = cap.read()
                if not ret:
                    break
                write_text(word, frame, out)
                frames_written += 1

        # Pad with caption-free frames so the video stays in sync with the narration audio.
        for _ in range(narration_frames - frames_written):
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)

    # Copy any remaining frames unchanged.
    while out.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)

    temp_narration = os.path.join(output_dir, "narration.mp3")
    full_narration.export(temp_narration, format="mp3")

    cap.release()
    out.release()
    cv2.destroyAllWindows()

    # Mux the silent video with the concatenated narration audio.
    ffmpeg_command = [
        'ffmpeg',
        '-y',
        '-i', temp_video,
        '-i', temp_narration,
        '-map', '0:v',
        '-map', '1:a',
        '-c:v', 'libx264',  # Use H.264 codec
        '-c:a', 'aac',
        '-strict', 'experimental',
        os.path.join(output_dir, output_file)
    ]
    subprocess.run(ffmpeg_command, capture_output=True)

    os.remove(temp_video)
    os.remove(temp_narration)


def create_video(narrations, output_dir, output_file, text_color, text_position):
    width, height = 1080, 1920
    frame_rate = 30
    fade_time = 1000  # cross-fade duration in milliseconds

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # MP4V codec
    temp_video = os.path.join(output_dir, "temp_video.mp4")  # MP4 file extension
    out = cv2.VideoWriter(temp_video, fourcc, frame_rate, (width, height))

    image_paths = os.listdir(os.path.join(output_dir, "images"))
    image_count = len(image_paths)
    for i in range(image_count):
        image1 = cv2.imread(os.path.join(output_dir, "images", f"image_{i+1}.webp"))
        if i + 1 < image_count:
            image2 = cv2.imread(os.path.join(output_dir, "images", f"image_{i+2}.webp"))
        else:
            # The last image fades back into the first one.
            image2 = cv2.imread(os.path.join(output_dir, "images", "image_1.webp"))

        image1 = resize_image(image1, width, height)
        image2 = resize_image(image2, width, height)

        narration = os.path.join(output_dir, "narrations", f"narration_{i+1}.mp3")
        duration = get_audio_duration(narration)

        # Subtract the cross-fade time so each scene still matches its narration length.
        if i > 0:
            duration -= fade_time
        if i == image_count - 1:
            duration -= fade_time

        # Hold the current image for the remaining duration...
        for _ in range(math.floor(duration / 1000 * 30)):
            vertical_video_frame = np.zeros((height, width, 3), dtype=np.uint8)
            vertical_video_frame[:image1.shape[0], :] = image1
            out.write(vertical_video_frame)

        # ...then cross-fade into the next image.
        for alpha in np.linspace(0, 1, math.floor(fade_time / 1000 * 30)):
            blended_image = cv2.addWeighted(image1, 1 - alpha, image2, alpha, 0)
            vertical_video_frame = np.zeros((height, width, 3), dtype=np.uint8)
            vertical_video_frame[:image1.shape[0], :] = blended_image
            out.write(vertical_video_frame)

    out.release()
    cv2.destroyAllWindows()

    add_narration_to_video(narrations, temp_video, output_dir, output_file, text_color, text_position)

    os.remove(temp_video)


def generate_video(topic, voice="shimmer"):
    short_id = str(int(time.time()))
    basedir = os.path.join("shorts", short_id)
    if not os.path.exists(basedir):
        os.makedirs(basedir)

    filename = topic.replace("_", " ").replace("/", "_").replace(".", "_")
    output_file = f"{filename}.mp4"  # MP4 file extension

    # Ask the chat model for a script of alternating image prompts and narration lines.
    chat_url = 'https://api.openai.com/v1/chat/completions'
    headers = {
        'Authorization': f'Bearer {OPENAI_API_KEY}',
        'Content-Type': 'application/json'
    }
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {
                "role": "system",
                "content": "You are a viral YouTube short video creator."
            },
            {
                "role": "user",
                "content": f"""Make a 60 second video on: \n\n{topic} and you will need to generate a very short description of images for each of the scenes. They will be used for background AI images. Note that the script will be fed into a text-to-speech engine, so don't use special characters. Respond with a pair of an image prompt in square brackets and a script below it. Both of them should be on their own lines, as follows:

###
[Description of a background image]
Narrator: "Sentence of narration"
###"""
            }
        ]
    }

    response = requests.post(chat_url, json=payload, headers=headers)
    if response.status_code == 200:
        response_text = response.json()['choices'][0]['message']['content']
        # Normalize typographic characters that would confuse the parser or the TTS engine.
        response_text = response_text.replace("’", "'").replace("`", "'").replace("…", "...").replace("“", '"').replace("”", '"')

        with open(os.path.join(basedir, "response.txt"), "a") as f:
            f.write(response_text + "\n")

        data, narrations = parse(response_text)
        with open(os.path.join(basedir, "data.json"), "a") as f:
            json.dump(data, f, ensure_ascii=False)
            f.write("\n")

        print(f"Generating narration for: {topic}...")
        create(data, os.path.join(basedir, "narrations"), voice=voice)

        print("Generating images...")
        create_from_data(data, os.path.join(basedir, "images"))

        print("Generating video...")
        create_video(narrations, basedir, output_file, text_color="white", text_position="center")

        print("Deleting files and folders...")
        os.remove(os.path.join(basedir, "response.txt"))
        os.remove(os.path.join(basedir, "data.json"))
        shutil.rmtree(os.path.join(basedir, "narrations"))
        shutil.rmtree(os.path.join(basedir, "images"))

        print(f"DONE! Here's your video: {os.path.join(basedir, output_file)}")
        return os.path.join(basedir, output_file)
    else:
        print(f"Failed to generate script for source material: {topic}. Status Code: {response.status_code}")
        return None


iface = gr.Interface(
    concurrency_limit=20,
    fn=generate_video,
    inputs=["text", gr.Dropdown(['alloy', 'shimmer', 'fable', 'onyx', 'nova', 'echo'], label="Select Voice")],
    outputs="video",
    css=".gradio-container {display: none}"
)
iface.launch()
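
# Example usage (a sketch, not part of the original script): put OPENAI_API_KEY and
# LEMONFOX_API_KEY in a .env file next to this script, make sure ffmpeg is available on
# the PATH, then run the script and open the local Gradio URL it prints. Entering a topic
# and picking a voice produces a video at shorts/<timestamp>/<topic>.mp4, which is also
# the path that generate_video() returns.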