import asyncio
import json
import os
import shutil
import subprocess
import uuid
from collections import deque

import cv2
import numpy as np
import websockets
from dotenv import load_dotenv
from fastapi import FastAPI, File, UploadFile, Form, WebSocket, Request
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from moviepy.audio.fx.all import audio_loop
from moviepy.editor import (
    VideoFileClip,
    AudioFileClip,
    CompositeAudioClip,
    ImageSequenceClip,
    ImageClip,
    concatenate_videoclips,
    concatenate_audioclips,
    vfx,
)
from PIL import Image, ImageDraw, ImageFont
from starlette.websockets import WebSocketDisconnect

from components import pexels, utils

# Load environment variables from the .env file.
load_dotenv()

app = FastAPI()

# Serve static files.
app.mount("/static", StaticFiles(directory="public"), name="static")

# Queue of pending video tasks; drained by process_tasks().
task_queue = asyncio.Queue()

_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')
_VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')


async def notify_client(task_id, message):
    """Send a JSON status message about *task_id* to the local ``/ws`` endpoint.

    Delivery is best effort: a failure to connect or send is printed and
    swallowed so that a notification problem never aborts the task itself.

    NOTE(review): the ``/ws`` endpoint in this file only echoes each message
    back to the connection that sent it, so this message is not forwarded to
    any other connected client.
    """
    payload = json.dumps({"task_id": task_id, "message": message})
    try:
        async with websockets.connect("ws://localhost:8000/ws") as websocket:
            await websocket.send(payload)
    except Exception as exc:
        print(f"Could not send notification for task {task_id}: {exc}")


def _close_clips(opened):
    """Close every clip in *opened*, printing (not raising) any close error."""
    for clip in opened:
        try:
            clip.close()
        except Exception as exc:
            print(f"Error closing clip: {exc}")


def _remove_task_inputs(task_folder, keep_name):
    """Delete everything directly inside *task_folder* except *keep_name*."""
    for name in os.listdir(task_folder):
        if name == keep_name:
            continue
        path = os.path.join(task_folder, name)
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            os.remove(path)


def _load_media_clips(paths, opened):
    """Turn image/video paths into clips, skipping files that cannot be used.

    Images become 2-second clips; videos are cut to at most 7 seconds and get
    a 0.5-second fade in and fade out. Every clip that is opened is also
    appended to *opened* so the caller can close it later.
    """
    clips = []
    for path in paths:
        lowered = path.lower()
        try:
            if lowered.endswith(_IMAGE_EXTENSIONS):
                source = ImageClip(path)
                opened.append(source)
                clip = source.set_duration(2)
            elif lowered.endswith(_VIDEO_EXTENSIONS):
                source = VideoFileClip(path)
                opened.append(source)
                clip = source.subclip(0, 7) if source.duration > 7 else source
                clip = clip.fx(vfx.fadein, 0.5).fx(vfx.fadeout, 0.5)
            else:
                print(f"Skipping non-image and non-video file: {path}")
                continue
        except Exception as e:
            print(f"Error processing file {path}: {e}")
            continue
        clips.append(clip)
    return clips


def _render_image_task(task):
    """Blocking implementation of process_image_task (runs in a worker thread)."""
    task_id = task['task_id']
    task_folder = task['task_folder']
    image_paths = task.get('image_paths', [])
    voice_off_path = task['voice_off_path']
    background_music_path = task.get('background_music_path')
    output_name = 'final_output.mp4'
    final_output_path = os.path.join(task_folder, output_name)

    opened = []  # every clip holding a file/ffmpeg handle, closed in `finally`
    try:
        clips = _load_media_clips(image_paths, opened)
        if not clips:
            return JSONResponse({'message': 'Không có ảnh hoặc video hợp lệ để tạo video'})

        final_clip = concatenate_videoclips(clips, method="compose")

        # Total duration of the individual clips.
        total_duration = sum(clip.duration for clip in clips)

        # Fit the video length to the voice-over: loop if shorter, cut if longer.
        voice_off_clip = AudioFileClip(voice_off_path)
        opened.append(voice_off_clip)
        if total_duration < voice_off_clip.duration:
            final_clip = final_clip.loop(duration=voice_off_clip.duration)
        elif total_duration > voice_off_clip.duration:
            final_clip = final_clip.subclip(0, voice_off_clip.duration)

        # Mix the optional background music (at 30% volume) under the voice-over.
        final_audio = voice_off_clip
        if background_music_path:
            music_source = AudioFileClip(background_music_path)
            opened.append(music_source)
            background_music_clip = music_source.volumex(0.3)
            if background_music_clip.duration < voice_off_clip.duration:
                background_music_clip = audio_loop(
                    background_music_clip, duration=voice_off_clip.duration
                )
            elif background_music_clip.duration > voice_off_clip.duration:
                background_music_clip = background_music_clip.subclip(
                    0, voice_off_clip.duration
                )
            final_audio = CompositeAudioClip([final_audio, background_music_clip])

        final_clip = final_clip.set_audio(final_audio)
        final_clip.write_videofile(final_output_path, codec='libx264', fps=16)
    except Exception as e:
        return JSONResponse({'message': 'Lỗi khi xử lý video', 'error': str(e)})
    finally:
        # Release file handles before any clean-up touches the input files.
        _close_clips(opened)

    # Remove the temporary inputs but keep the rendered video: deleting the
    # whole task folder here would also delete final_output.mp4.
    try:
        _remove_task_inputs(task_folder, keep_name=output_name)
    except OSError as exc:
        print(f"Could not clean up inputs of task {task_id}: {exc}")

    return JSONResponse({'message': 'Video đã được tạo thành công', 'task_id': task_id})


async def process_image_task(task):
    """Render ``final_output.mp4`` for an image/video task.

    Args:
        task: dict with ``task_id``, ``task_folder`` and ``voice_off_path``,
            plus optional ``image_paths`` and ``background_music_path``.

    Returns:
        A JSONResponse describing success, the absence of usable media, or
        the processing error. Errors are reported in the response rather than
        raised.

    The MoviePy work is blocking, so it runs in a worker thread to keep the
    event loop (HTTP and WebSocket handling) responsive while rendering.
    """
    return await asyncio.to_thread(_render_image_task, task)


async def process_tasks():
    """Background worker: take queued tasks one at a time and process them.

    A failure in one task is printed and the loop continues, so a single bad
    task cannot stop the worker.
    """
    while True:
        # Poll instead of awaiting task_queue.get(): the queue is created at
        # import time, and on Python < 3.10 an asyncio.Queue is bound to the
        # event loop that existed when it was constructed, so a blocking get()
        # could wait on the wrong loop.
        try:
            task = task_queue.get_nowait()
        except asyncio.QueueEmpty:
            await asyncio.sleep(1)
            continue

        task_id = task.get('task_id')
        try:
            await notify_client(task_id, "Task đang được xử lý")
            # NOTE(review): tasks queued by create_video always contain
            # 'image_paths', so the else branch is not reached from there.
            if 'image_paths' in task:
                await process_image_task(task)
            else:
                await process_pexels_task(task)
            await notify_client(task_id, "Task đã hoàn thành")
        except Exception as exc:
            print(f"Task {task_id} failed: {exc}")


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a WebSocket connection and echo every text message to its sender."""
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(message)
    except WebSocketDisconnect:
        print("WebSocket connection closed")


@app.on_event("startup")
async def startup_event():
    """Start the background task worker when the application starts."""
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so an unreferenced task may be garbage-collected mid-run.
    app.state.task_worker = asyncio.create_task(process_tasks())


async def _save_upload(upload, destination):
    """Write an uploaded file to *destination* in 1 MiB chunks."""
    with open(destination, "wb") as buffer:
        while chunk := await upload.read(1024 * 1024):
            buffer.write(chunk)


@app.post("/create-video")
async def create_video(
    imageFolder: list[UploadFile] = File(...),
    voiceOff: UploadFile = File(...),
    backgroundMusic: UploadFile = File(None)
):
    """Store the uploaded media in a new task folder and queue a video task.

    Args:
        imageFolder: image and/or video files that make up the visuals.
        voiceOff: voice-over audio track.
        backgroundMusic: optional background music track.

    Returns:
        A JSONResponse with the new ``task_id`` once the task is queued, or
        with an error message if an upload could not be saved. In the error
        case the partially written task folder is removed.
    """
    task_id = str(uuid.uuid4())
    task_folder = os.path.join('tasks', task_id)
    temp_folder = os.path.join(task_folder, 'temp_images')
    os.makedirs(temp_folder, exist_ok=True)  # also creates task_folder

    image_paths = []
    for index, file in enumerate(imageFolder):
        # Strip any directory part sent by the client, and prefix the upload
        # index so equal basenames from different sub-folders do not
        # overwrite each other.
        file_filename = os.path.basename(file.filename or '')
        file_path = os.path.join(temp_folder, f"{index:04d}_{file_filename}")
        try:
            await _save_upload(file, file_path)
        except Exception as e:
            shutil.rmtree(task_folder, ignore_errors=True)
            return JSONResponse({'message': 'Lỗi khi tải lên tệp', 'error': str(e)})
        image_paths.append(file_path)

    voice_off_path = os.path.join(task_folder, 'voice_off.mp3')
    try:
        await _save_upload(voiceOff, voice_off_path)
    except Exception as e:
        shutil.rmtree(task_folder, ignore_errors=True)
        return JSONResponse({'message': 'Lỗi khi tải lên tệp voice off', 'error': str(e)})

    background_music_path = None
    # An empty file input can arrive as an upload with a blank filename;
    # treat that as "no background music" instead of saving an empty mp3.
    if backgroundMusic is not None and backgroundMusic.filename:
        background_music_path = os.path.join(task_folder, 'background_music.mp3')
        try:
            await _save_upload(backgroundMusic, background_music_path)
        except Exception as e:
            shutil.rmtree(task_folder, ignore_errors=True)
            return JSONResponse({'message': 'Lỗi khi tải lên tệp nhạc nền', 'error': str(e)})

    await task_queue.put({
        'task_id': task_id,
        'task_folder': task_folder,
        'image_paths': image_paths,
        'voice_off_path': voice_off_path,
        'background_music_path': background_music_path
    })

    return JSONResponse({'message': 'Task đã được thêm vào hàng đợi', 'task_id': task_id})


def _render_pexels_task(task):
    """Blocking implementation of process_pexels_task (runs in a worker thread)."""
    task_id = task['task_id']
    task_folder = task['task_folder']
    image_paths = task.get('image_paths', [])
    voice_off_path = task['voice_off_path']
    background_music_path = task.get('background_music_path')
    output_name = 'final_output.mp4'
    final_output_path = os.path.join(task_folder, output_name)

    opened = []  # every clip holding a file/ffmpeg handle, closed in `finally`
    try:
        clips = []
        for path in image_paths:
            try:
                source = VideoFileClip(path)
                opened.append(source)
                # Bring every clip to exactly 2 seconds: loop the short ones,
                # set the duration to 2 for the rest.
                if source.duration < 2:
                    clip = source.loop(duration=2)
                else:
                    clip = source.set_duration(2)
                clips.append(clip)
            except Exception as e:
                print(f"Error processing file {path}: {e}")
                continue

        if not clips:
            return JSONResponse({'message': 'Không có video hợp lệ để tạo video'})

        final_clip = concatenate_videoclips(clips, method="compose")

        # Mix the optional background music (at 30% volume) under the voice-over.
        voice_off_clip = AudioFileClip(voice_off_path)
        opened.append(voice_off_clip)
        final_audio = voice_off_clip
        if background_music_path:
            music_source = AudioFileClip(background_music_path)
            opened.append(music_source)
            background_music_clip = music_source.volumex(0.3)
            if background_music_clip.duration < voice_off_clip.duration:
                background_music_clip = audio_loop(
                    background_music_clip, duration=voice_off_clip.duration
                )
            elif background_music_clip.duration > voice_off_clip.duration:
                background_music_clip = background_music_clip.subclip(
                    0, voice_off_clip.duration
                )
            final_audio = CompositeAudioClip([final_audio, background_music_clip])

        final_clip = final_clip.set_audio(final_audio)
        final_clip.write_videofile(final_output_path, codec='libx264', fps=16)
    except Exception as e:
        return JSONResponse({'message': 'Lỗi khi xử lý video', 'error': str(e)})
    finally:
        # Release file handles before any clean-up touches the input files.
        for clip in opened:
            try:
                clip.close()
            except Exception as exc:
                print(f"Error closing clip: {exc}")

    # Remove the temporary inputs but keep the rendered video: deleting the
    # whole task folder here would also delete final_output.mp4.
    try:
        for name in os.listdir(task_folder):
            if name == output_name:
                continue
            entry = os.path.join(task_folder, name)
            if os.path.isdir(entry) and not os.path.islink(entry):
                shutil.rmtree(entry)
            else:
                os.remove(entry)
    except OSError as exc:
        print(f"Could not clean up inputs of task {task_id}: {exc}")
    return None


async def process_pexels_task(task):
    """Render ``final_output.mp4`` from a list of video files, 2 seconds each.

    Args:
        task: dict with ``task_id``, ``task_folder`` and ``voice_off_path``,
            plus optional ``image_paths`` (video file paths) and
            ``background_music_path``.

    Returns:
        None on success, or a JSONResponse describing why no video was
        produced. Errors are reported in the response rather than raised.

    The MoviePy work is blocking, so it runs in a worker thread to keep the
    event loop responsive while rendering.
    """
    return await asyncio.to_thread(_render_pexels_task, task)


@app.get("/")
async def read_index():
    """Serve the front-end page."""
    return FileResponse("public/index.html")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)