import requests
import json
import time
import subprocess
import gradio as gr
import uuid
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# API Keys
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
REPLICATE_API_TOKEN = os.getenv("REPLICATE_API_TOKEN")

# URLs
REPLICATE_API_URL = "https://api.replicate.com/v1/predictions"
UPLOAD_URL = os.getenv("UPLOAD_URL")


def get_voices():
    """Return the available OpenAI TTS voices as (label, voice_id) pairs."""
    return [
        ("alloy", "alloy"),
        ("echo", "echo"),
        ("fable", "fable"),
        ("onyx", "onyx"),
        ("nova", "nova"),
        ("shimmer", "shimmer"),
    ]


def text_to_speech(voice, text, session_id):
    """Synthesize *text* with the OpenAI TTS API using *voice*.

    Returns the path of a temporary mp3 file named with *session_id*
    (so concurrent sessions do not clobber each other), or None if the
    API call fails.
    """
    url = "https://api.openai.com/v1/audio/speech"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "model": "tts-1",
        "input": text,
        "voice": voice,
    }
    response = requests.post(url, json=data, headers=headers)
    if response.status_code != 200:
        return None

    # Save temporary audio file with session ID
    audio_file_path = f'tempvoice{session_id}.mp3'
    with open(audio_file_path, 'wb') as audio_file:
        audio_file.write(response.content)

    return audio_file_path


def upload_file(file_path):
    """Upload *file_path* to the configured upload host.

    Returns the hosted file's URL (the response body, stripped) on
    success, or None on a non-200 response.
    """
    with open(file_path, 'rb') as file:
        files = {'fileToUpload': (os.path.basename(file_path), file)}
        data = {'reqtype': 'fileupload'}
        response = requests.post(UPLOAD_URL, files=files, data=data)
    if response.status_code == 200:
        return response.text.strip()
    return None


def lipsync_api_call(video_url, audio_url):
    """Create a Replicate lipsync prediction for the given media URLs.

    Returns the parsed JSON prediction object. "Prefer: wait" asks the
    API to hold the connection until the prediction settles when possible.
    """
    headers = {
        "Authorization": f"Bearer {REPLICATE_API_TOKEN}",
        "Content-Type": "application/json",
        "Prefer": "wait",
    }
    data = {
        "version": "db5a650c807b007dc5f9e5abe27c53e1b62880d1f94d218d27ce7fa802711d67",
        "input": {
            "face": video_url,
            "input_audio": audio_url,
        },
    }
    response = requests.post(REPLICATE_API_URL, headers=headers, json=data)
    return response.json()


def check_job_status(prediction_id):
    """Poll the Replicate prediction until it finishes or times out.

    Returns the prediction's output (the result URL) on success, or None
    on failure, cancellation, or after max_attempts polls.
    """
    headers = {"Authorization": f"Bearer {REPLICATE_API_TOKEN}"}
    max_attempts = 30  # Limit the number of attempts (~5 minutes at 10s/poll)

    for _ in range(max_attempts):
        response = requests.get(f"{REPLICATE_API_URL}/{prediction_id}", headers=headers)
        data = response.json()
        if data["status"] == "succeeded":
            return data["output"]
        # BUGFIX: also stop on "canceled" — originally only "failed"
        # terminated the loop, so a canceled job polled until timeout.
        if data["status"] in ("failed", "canceled"):
            return None
        time.sleep(10)

    return None


def get_media_duration(file_path):
    """Return the duration of *file_path* in seconds, via ffprobe."""
    cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        file_path,
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return float(result.stdout.strip())


def combine_audio_video(video_path, audio_path, output_path):
    """Mux *audio_path* onto *video_path*, writing *output_path*.

    The video is trimmed to the audio's duration when it is longer, and
    looped enough times to cover the audio when it is shorter. The video
    stream is stream-copied; audio is re-encoded to AAC.
    """
    video_duration = get_media_duration(video_path)
    audio_duration = get_media_duration(audio_path)

    if video_duration > audio_duration:
        # Trim video to match the audio length
        cmd = [
            'ffmpeg', '-i', video_path, '-i', audio_path,
            '-t', str(audio_duration),  # Trim video to audio duration
            '-map', '0:v', '-map', '1:a',
            '-c:v', 'copy', '-c:a', 'aac',
            '-y', output_path,
        ]
    else:
        # Loop video if it's shorter than audio; +1 guarantees coverage
        loop_count = int(audio_duration // video_duration) + 1
        cmd = [
            'ffmpeg', '-stream_loop', str(loop_count), '-i', video_path,
            '-i', audio_path,
            '-t', str(audio_duration),  # Match final duration to the audio
            '-map', '0:v', '-map', '1:a',
            '-c:v', 'copy', '-c:a', 'aac',
            '-shortest', '-y', output_path,
        ]

    subprocess.run(cmd, check=True)


def create_video_from_image(image_url, session_id):
    """Download *image_url* and render it as a 10-second still video.

    Returns the path of the generated mp4. The temporary image file is
    always removed, even if ffmpeg fails.
    """
    # Download the image
    response = requests.get(image_url)
    image_path = f"tempimage{session_id}.jpg"
    with open(image_path, "wb") as f:
        f.write(response.content)

    # Create a 10-second video from the image
    video_path = f"tempvideo{session_id}.mp4"
    cmd = [
        'ffmpeg', '-loop', '1', '-i', image_path,
        # Ensure width and height are divisible by 2 (libx264 requirement)
        '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
        '-c:v', 'libx264', '-t', '10', '-pix_fmt', 'yuv420p',
        # BUGFIX: '-y' overwrites an existing output file; without it
        # ffmpeg blocks on an interactive prompt if the file exists.
        '-y', video_path,
    ]
    try:
        subprocess.run(cmd, check=True)
    finally:
        # BUGFIX: clean up the temporary image even when ffmpeg fails
        if os.path.exists(image_path):
            os.remove(image_path)

    return video_path


def process_video(voice, url, text, progress=gr.Progress()):
    """Full pipeline: TTS -> upload -> Replicate lipsync -> download.

    Falls back to a plain ffmpeg audio/video mux if the lipsync service
    fails. Returns (output_path_or_None, status_message).
    """
    session_id = str(uuid.uuid4())  # Unique ID keeps temp files per-session

    progress(0, desc="Generating speech...")
    audio_path = text_to_speech(voice, text, session_id)
    if not audio_path:
        return None, "Failed to generate speech audio."

    progress(0.2, desc="Processing media...")
    try:
        # Check if the URL is an image
        response = requests.head(url)
        content_type = response.headers.get('Content-Type', '')

        if content_type.startswith('image'):
            progress(0.3, desc="Converting image to video...")
            video_path = create_video_from_image(url, session_id)
            video_url = upload_file(video_path)
        else:
            video_url = url

        progress(0.4, desc="Uploading audio...")
        audio_url = upload_file(audio_path)

        if not audio_url or not video_url:
            raise Exception("Failed to upload audio or video file")

        progress(0.5, desc="Initiating lipsync...")
        job_data = lipsync_api_call(video_url, audio_url)
        # BUGFIX: Replicate predictions carry an "error" key that is null
        # on success, so test truthiness rather than key presence.
        if job_data.get("error"):
            raise Exception(job_data.get("error", "Unknown error"))

        prediction_id = job_data["id"]
        progress(0.6, desc="Processing lipsync...")
        result_url = check_job_status(prediction_id)

        if result_url:
            progress(0.9, desc="Downloading result...")
            response = requests.get(result_url)
            output_path = f"output{session_id}.mp4"
            with open(output_path, "wb") as f:
                f.write(response.content)
            progress(1.0, desc="Complete!")
            return output_path, "Lipsync completed successfully!"
        else:
            raise Exception("Lipsync processing failed or timed out")

    except Exception as e:
        progress(0.8, desc="Falling back to simple combination...")
        try:
            if 'video_path' not in locals():
                # Download the source video if it wasn't created from an image.
                # BUGFIX: fetch the original `url` — the old code fetched
                # `video_url`, which is unbound when the failure happened
                # before the upload step (e.g. the HEAD request raised).
                video_response = requests.get(url)
                video_path = f"tempvideo{session_id}.mp4"
                with open(video_path, "wb") as f:
                    f.write(video_response.content)

            output_path = f"output{session_id}.mp4"
            combine_audio_video(video_path, audio_path, output_path)
            progress(1.0, desc="Complete!")
            return output_path, f"Used fallback method. Original error: {str(e)}"
        except Exception as fallback_error:
            return None, f"All methods failed. Error: {str(fallback_error)}"
    finally:
        # Cleanup all per-session temp files (image included — it can be
        # left behind if create_video_from_image failed mid-way).
        for temp_file in (
            audio_path,
            f"tempvideo{session_id}.mp4",
            f"tempimage{session_id}.jpg",
        ):
            if temp_file and os.path.exists(temp_file):
                os.remove(temp_file)


def create_interface():
    """Build and return the Gradio Blocks UI for the generator."""
    voices = get_voices()

    with gr.Blocks() as app:
        gr.Markdown("# Generator")
        with gr.Row():
            with gr.Column():
                voice_dropdown = gr.Dropdown(
                    choices=[v[0] for v in voices],
                    label="Select Voice",
                    value=voices[0][0] if voices else None,
                )
                url_input = gr.Textbox(label="Enter Video or Image URL")
                text_input = gr.Textbox(label="Enter text", lines=3)
                generate_btn = gr.Button("Generate Video")
            with gr.Column():
                video_output = gr.Video(label="Generated Video")
                status_output = gr.Textbox(label="Status", interactive=False)

        def on_generate(voice_name, url, text):
            # Map the displayed label back to the API voice id
            voice_id = next((v[1] for v in voices if v[0] == voice_name), None)
            if not voice_id:
                return None, "Invalid voice selected."
            return process_video(voice_id, url, text)

        generate_btn.click(
            fn=on_generate,
            inputs=[voice_dropdown, url_input, text_input],
            outputs=[video_output, status_output],
        )

    return app


if __name__ == "__main__":
    app = create_interface()
    app.launch()