|
from fastapi import FastAPI, File, UploadFile, Form, WebSocket, Request
|
|
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
import os
|
|
import shutil
|
|
import cv2
|
|
import numpy as np
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip, ImageSequenceClip, ImageClip, concatenate_videoclips, concatenate_audioclips, vfx
|
|
from moviepy.audio.fx.all import audio_loop
|
|
import subprocess
|
|
import uuid
|
|
import asyncio
|
|
from dotenv import load_dotenv
|
|
from collections import deque
|
|
from components import pexels, utils
|
|
import json
|
|
import asyncio
|
|
import websockets
|
|
from starlette.websockets import WebSocketDisconnect
|
|
|
|
|
|
load_dotenv()
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
app.mount("/static", StaticFiles(directory="public"), name="static")
|
|
|
|
|
|
task_queue = asyncio.Queue()
|
|
|
|
async def notify_client(task_id, message):
|
|
async with websockets.connect("ws://localhost:8000/ws") as websocket:
|
|
await websocket.send(json.dumps({"task_id": task_id, "message": message}))
|
|
|
|
async def process_image_task(task):
|
|
task_id = task['task_id']
|
|
task_folder = task['task_folder']
|
|
image_paths = task.get('image_paths', [])
|
|
voice_off_path = task['voice_off_path']
|
|
background_music_path = task.get('background_music_path')
|
|
|
|
clips = []
|
|
for path in image_paths:
|
|
try:
|
|
if path.lower().endswith(('.png', '.jpg', '.jpeg')):
|
|
clip = ImageClip(path).set_duration(2)
|
|
elif path.lower().endswith(('.mp4', '.avi', '.mov')):
|
|
clip = VideoFileClip(path)
|
|
if clip.duration > 7:
|
|
clip = clip.subclip(0, 7).fx(vfx.fadein, 0.5).fx(vfx.fadeout, 0.5)
|
|
else:
|
|
clip = clip.fx(vfx.fadein, 0.5).fx(vfx.fadeout, 0.5)
|
|
else:
|
|
print(f"Skipping non-image and non-video file: {path}")
|
|
continue
|
|
|
|
clips.append(clip)
|
|
except Exception as e:
|
|
print(f"Error processing file {path}: {e}")
|
|
continue
|
|
|
|
if not clips:
|
|
return JSONResponse({'message': 'Không có ảnh hoặc video hợp lệ để tạo video'})
|
|
|
|
final_clip = concatenate_videoclips(clips, method="compose")
|
|
|
|
|
|
total_duration = sum([clip.duration for clip in clips])
|
|
|
|
|
|
voice_off_clip = AudioFileClip(voice_off_path)
|
|
if total_duration < voice_off_clip.duration:
|
|
final_clip = final_clip.loop(duration=voice_off_clip.duration)
|
|
elif total_duration > voice_off_clip.duration:
|
|
final_clip = final_clip.subclip(0, voice_off_clip.duration)
|
|
|
|
|
|
final_audio = voice_off_clip
|
|
if background_music_path:
|
|
background_music_clip = AudioFileClip(background_music_path).volumex(0.3)
|
|
if background_music_clip.duration < voice_off_clip.duration:
|
|
background_music_clip = audio_loop(background_music_clip, duration=voice_off_clip.duration)
|
|
elif background_music_clip.duration > voice_off_clip.duration:
|
|
background_music_clip = background_music_clip.subclip(0, voice_off_clip.duration)
|
|
final_audio = CompositeAudioClip([final_audio, background_music_clip])
|
|
|
|
final_clip = final_clip.set_audio(final_audio)
|
|
final_output_path = os.path.join(task_folder, 'final_output.mp4')
|
|
try:
|
|
final_clip.write_videofile(final_output_path, codec='libx264', fps=16)
|
|
|
|
|
|
shutil.rmtree(task_folder)
|
|
|
|
return JSONResponse({'message': 'Video đã được tạo thành công', 'task_id': task_id})
|
|
except Exception as e:
|
|
return JSONResponse({'message': 'Lỗi khi xử lý video', 'error': str(e)})
|
|
|
|
async def process_tasks():
|
|
while True:
|
|
if not task_queue.empty():
|
|
task = await task_queue.get()
|
|
task_id = task['task_id']
|
|
await notify_client(task_id, "Task đang được xử lý")
|
|
if 'image_paths' in task:
|
|
await process_image_task(task)
|
|
else:
|
|
await process_pexels_task(task)
|
|
await notify_client(task_id, "Task đã hoàn thành")
|
|
await asyncio.sleep(1)
|
|
|
|
@app.websocket("/ws")
|
|
async def websocket_endpoint(websocket: WebSocket):
|
|
await websocket.accept()
|
|
try:
|
|
while True:
|
|
message = await websocket.receive_text()
|
|
await websocket.send_text(message)
|
|
except WebSocketDisconnect:
|
|
print("WebSocket connection closed")
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
asyncio.create_task(process_tasks())
|
|
|
|
@app.post("/create-video")
|
|
async def create_video(
|
|
imageFolder: list[UploadFile] = File(...),
|
|
voiceOff: UploadFile = File(...),
|
|
backgroundMusic: UploadFile = File(None)
|
|
):
|
|
task_id = str(uuid.uuid4())
|
|
task_folder = os.path.join('tasks', task_id)
|
|
os.makedirs(task_folder, exist_ok=True)
|
|
|
|
temp_folder = os.path.join(task_folder, 'temp_images')
|
|
os.makedirs(temp_folder, exist_ok=True)
|
|
|
|
image_paths = []
|
|
for file in imageFolder:
|
|
file_filename = os.path.basename(file.filename)
|
|
file_path = os.path.join(temp_folder, file_filename)
|
|
try:
|
|
with open(file_path, "wb") as buffer:
|
|
buffer.write(await file.read())
|
|
image_paths.append(file_path)
|
|
except Exception as e:
|
|
return JSONResponse({'message': 'Lỗi khi tải lên tệp', 'error': str(e)})
|
|
|
|
voice_off_path = os.path.join(task_folder, 'voice_off.mp3')
|
|
try:
|
|
with open(voice_off_path, "wb") as buffer:
|
|
buffer.write(await voiceOff.read())
|
|
except Exception as e:
|
|
return JSONResponse({'message': 'Lỗi khi tải lên tệp voice off', 'error': str(e)})
|
|
|
|
background_music_path = None
|
|
if backgroundMusic:
|
|
background_music_path = os.path.join(task_folder, 'background_music.mp3')
|
|
try:
|
|
with open(background_music_path, "wb") as buffer:
|
|
buffer.write(await backgroundMusic.read())
|
|
except Exception as e:
|
|
return JSONResponse({'message': 'Lỗi khi tải lên tệp nhạc nền', 'error': str(e)})
|
|
|
|
await task_queue.put({
|
|
'task_id': task_id,
|
|
'task_folder': task_folder,
|
|
'image_paths': image_paths,
|
|
'voice_off_path': voice_off_path,
|
|
'background_music_path': background_music_path
|
|
})
|
|
|
|
return JSONResponse({'message': 'Task đã được thêm vào hàng đợi', 'task_id': task_id})
|
|
|
|
async def process_pexels_task(task):
|
|
task_id = task['task_id']
|
|
task_folder = task['task_folder']
|
|
image_paths = task.get('image_paths', [])
|
|
voice_off_path = task['voice_off_path']
|
|
background_music_path = task.get('background_music_path')
|
|
|
|
clips = []
|
|
for path in image_paths:
|
|
try:
|
|
clip = VideoFileClip(path)
|
|
if clip.duration < 2:
|
|
clip = clip.loop(duration=2)
|
|
else:
|
|
clip = clip.set_duration(2)
|
|
clips.append(clip)
|
|
except Exception as e:
|
|
print(f"Error processing file {path}: {e}")
|
|
continue
|
|
|
|
if not clips:
|
|
return JSONResponse({'message': 'Không có video hợp lệ để tạo video'})
|
|
|
|
final_clip = concatenate_videoclips(clips, method="compose")
|
|
|
|
|
|
try:
|
|
voice_off_clip = AudioFileClip(voice_off_path)
|
|
final_audio = voice_off_clip
|
|
if background_music_path:
|
|
background_music_clip = AudioFileClip(background_music_path).volumex(0.3)
|
|
if background_music_clip.duration < voice_off_clip.duration:
|
|
background_music_clip = audio_loop(background_music_clip, duration=voice_off_clip.duration)
|
|
elif background_music_clip.duration > voice_off_clip.duration:
|
|
background_music_clip = background_music_clip.subclip(0, voice_off_clip.duration)
|
|
final_audio = CompositeAudioClip([final_audio, background_music_clip])
|
|
final_clip = final_clip.set_audio(final_audio)
|
|
final_output_path = os.path.join(task_folder, 'final_output.mp4')
|
|
final_clip.write_videofile(final_output_path, codec='libx264', fps=16)
|
|
|
|
|
|
shutil.rmtree(task_folder)
|
|
|
|
except Exception as e:
|
|
return JSONResponse({'message': 'Lỗi khi xử lý video', 'error': str(e)})
|
|
|
|
@app.get("/")
|
|
async def read_index():
|
|
return FileResponse("public/index.html")
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8000) |