# tdnm2 / app.py
# Uploaded by TDN-M ("Upload 4 files", commit 392cc4b, verified)
from fastapi import FastAPI, File, UploadFile, Form, WebSocket, Request
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
import os
import shutil
import cv2
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip, ImageSequenceClip, ImageClip, concatenate_videoclips, concatenate_audioclips, vfx
from moviepy.audio.fx.all import audio_loop
import subprocess
import uuid
import asyncio
from dotenv import load_dotenv
from collections import deque
from components import pexels, utils
import json
import asyncio
import websockets
from starlette.websockets import WebSocketDisconnect
# Load environment variables from the .env file
load_dotenv()
app = FastAPI()
# Serve static files from the "public" directory under the /static URL prefix
app.mount("/static", StaticFiles(directory="public"), name="static")
# Queue of pending video tasks: filled by the /create-video handler and
# drained by the process_tasks() background worker
task_queue = asyncio.Queue()
async def notify_client(task_id, message, ws_url="ws://localhost:8000/ws"):
    """Send one task status update to a WebSocket endpoint.

    Opens a short-lived WebSocket connection, sends a single JSON text frame
    of the form ``{"task_id": ..., "message": ...}`` and closes the
    connection again.

    Args:
        task_id: Identifier of the task the update refers to.
        message: Status text to deliver.
        ws_url: WebSocket URL to connect to. The default matches the host and
            port used when this module is run as a script.

    Raises:
        Any exception raised by ``websockets.connect`` or ``send`` (for
        example when nothing is listening on ``ws_url``) propagates to the
        caller.

    NOTE(review): the ``/ws`` handler in this file echoes each frame back to
    the connection that sent it, so this update is not relayed to other
    connected clients -- confirm this is the intended notification path.
    """
    payload = json.dumps({"task_id": task_id, "message": message})
    async with websockets.connect(ws_url) as websocket:
        await websocket.send(payload)
async def process_image_task(task):
    """Build ``final_output.mp4`` from uploaded images/videos plus audio.

    Images are shown for 2 seconds each; videos are trimmed to at most 7
    seconds and given 0.5 s fade-in/fade-out. The concatenated video is
    looped or trimmed to the voice-off duration, the optional background
    music (at 30% volume) is looped or trimmed to the same duration and mixed
    in, and the result is written to ``<task_folder>/final_output.mp4``.

    Changes compared to the previous implementation:
      * Only the temporary inputs inside ``task_folder`` are deleted after a
        successful render; the rendered video itself is kept (previously the
        whole folder, output included, was removed).
      * The blocking ``write_videofile`` call runs in a worker thread so the
        event loop keeps serving requests while rendering.
      * Source clips are always closed so their file handles / ffmpeg readers
        are released.
      * Any processing error is reported through the error response (and
        printed) instead of propagating out of the function.

    Args:
        task: Dict with ``task_id``, ``task_folder`` and ``voice_off_path``,
            and optionally ``image_paths`` and ``background_music_path``.

    Returns:
        JSONResponse describing success, "no valid media", or the error.
    """
    task_id = task['task_id']
    task_folder = task['task_folder']
    image_paths = task.get('image_paths', [])
    voice_off_path = task['voice_off_path']
    background_music_path = task.get('background_music_path')

    image_extensions = ('.png', '.jpg', '.jpeg')
    video_extensions = ('.mp4', '.avi', '.mov')

    # Clips that own a file handle or ffmpeg reader; closed in ``finally``.
    opened = []
    try:
        clips = []
        for path in image_paths:
            lowered = path.lower()
            try:
                if lowered.endswith(image_extensions):
                    clip = ImageClip(path).set_duration(2)
                elif lowered.endswith(video_extensions):
                    source = VideoFileClip(path)
                    opened.append(source)
                    if source.duration > 7:
                        source = source.subclip(0, 7)
                    clip = source.fx(vfx.fadein, 0.5).fx(vfx.fadeout, 0.5)
                else:
                    print(f"Skipping non-image and non-video file: {path}")
                    continue
                clips.append(clip)
            except Exception as e:
                # A single unreadable file must not abort the whole task.
                print(f"Error processing file {path}: {e}")
                continue

        if not clips:
            return JSONResponse({'message': 'Không có ảnh hoặc video hợp lệ để tạo video'})

        final_clip = concatenate_videoclips(clips, method="compose")
        # Total duration of all clips
        total_duration = sum(clip.duration for clip in clips)

        # Voice-off handling: the video is fitted to the narration length
        voice_off_clip = AudioFileClip(voice_off_path)
        opened.append(voice_off_clip)
        if total_duration < voice_off_clip.duration:
            final_clip = final_clip.loop(duration=voice_off_clip.duration)
        elif total_duration > voice_off_clip.duration:
            final_clip = final_clip.subclip(0, voice_off_clip.duration)

        # Mix the background music into the audio track
        final_audio = voice_off_clip
        if background_music_path:
            music_source = AudioFileClip(background_music_path)
            opened.append(music_source)
            background_music_clip = music_source.volumex(0.3)
            if background_music_clip.duration < voice_off_clip.duration:
                background_music_clip = audio_loop(background_music_clip, duration=voice_off_clip.duration)
            elif background_music_clip.duration > voice_off_clip.duration:
                background_music_clip = background_music_clip.subclip(0, voice_off_clip.duration)
            final_audio = CompositeAudioClip([final_audio, background_music_clip])

        final_clip = final_clip.set_audio(final_audio)
        final_output_path = os.path.join(task_folder, 'final_output.mp4')
        # Encoding is CPU-bound and synchronous; keep it off the event loop.
        await asyncio.to_thread(
            final_clip.write_videofile, final_output_path, codec='libx264', fps=16
        )
    except Exception as e:
        # The worker discards the return value, so also log the failure.
        print(f"Error processing task {task_id}: {e}")
        return JSONResponse({'message': 'Lỗi khi xử lý video', 'error': str(e)})
    finally:
        for resource in opened:
            try:
                resource.close()
            except Exception as e:
                print(f"Error closing clip: {e}")

    # Delete the temporary inputs after completion, keeping the rendered video
    keep_path = os.path.abspath(final_output_path)
    with os.scandir(task_folder) as iterator:
        entries = list(iterator)
    for entry in entries:
        if os.path.abspath(entry.path) == keep_path:
            continue
        try:
            if entry.is_dir(follow_symlinks=False):
                shutil.rmtree(entry.path)
            else:
                os.remove(entry.path)
        except OSError as e:
            print(f"Error removing temporary file {entry.path}: {e}")

    return JSONResponse({'message': 'Video đã được tạo thành công', 'task_id': task_id})
async def _notify_best_effort(task_id, message):
    """Forward a status update via notify_client without letting failures escape."""
    try:
        await notify_client(task_id, message)
    except Exception as e:
        print(f"Error notifying client for task {task_id}: {e}")


async def process_tasks():
    """Consume queued tasks forever, processing them one at a time.

    Each task is announced via ``notify_client``, dispatched to
    ``process_image_task`` when it carries an ``image_paths`` key (otherwise
    to ``process_pexels_task``), and announced again when finished.

    Changes compared to the previous implementation:
      * Waits on ``task_queue.get()`` instead of polling ``empty()`` with a
        one-second sleep, so tasks start as soon as they are queued.
      * A failed notification or a task that raises no longer terminates the
        worker; the error is printed and the next task is picked up.
      * ``task_done()`` is called for every dequeued task so
        ``task_queue.join()`` can be used by callers.
    """
    while True:
        task = await task_queue.get()
        task_id = task.get('task_id')
        try:
            await _notify_best_effort(task_id, "Task đang được xử lý")
            if 'image_paths' in task:
                await process_image_task(task)
            else:
                await process_pexels_task(task)
            await _notify_best_effort(task_id, "Task đã hoàn thành")
        except Exception as e:
            # asyncio.CancelledError is not an Exception subclass, so
            # cancelling the worker still stops this loop.
            print(f"Error processing task {task_id}: {e}")
        finally:
            task_queue.task_done()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a WebSocket connection and echo every text frame to its sender.

    The loop runs until the peer disconnects, at which point a message is
    printed and the handler returns.
    """
    await websocket.accept()
    try:
        while True:
            # Receive first, then send the same text straight back.
            await websocket.send_text(await websocket.receive_text())
    except WebSocketDisconnect:
        print("WebSocket connection closed")
@app.on_event("startup")
async def startup_event():
    """Start the background queue worker when the application starts.

    The task handle is stored on ``app.state`` because the event loop keeps
    only weak references to tasks; without a strong reference the worker
    could be garbage-collected while it is still running.
    """
    app.state.task_worker = asyncio.create_task(process_tasks())
async def _save_upload(upload, destination, chunk_size=1024 * 1024):
    """Stream an uploaded file to ``destination`` in fixed-size chunks."""
    with open(destination, "wb") as buffer:
        while chunk := await upload.read(chunk_size):
            buffer.write(chunk)


@app.post("/create-video")
async def create_video(
    imageFolder: list[UploadFile] = File(...),
    voiceOff: UploadFile = File(...),
    backgroundMusic: UploadFile = File(None)
):
    """Store the uploaded media in a new task folder and queue a video task.

    A fresh ``tasks/<uuid>`` folder is created; the image/video uploads go to
    its ``temp_images`` subfolder, the voice-off to ``voice_off.mp3`` and the
    optional background music to ``background_music.mp3``. The task is then
    put on ``task_queue`` for the background worker.

    Changes compared to the previous implementation:
      * Stored media files are prefixed with their upload index, so two
        uploads sharing a basename (e.g. from different subfolders) no longer
        overwrite each other, and a missing filename no longer crashes.
      * Uploads are written to disk in chunks instead of being read fully
        into memory.
      * The task folder is removed again when saving any upload fails.

    Args:
        imageFolder: Image and/or video files to assemble.
        voiceOff: Narration audio; its duration drives the video length.
        backgroundMusic: Optional music track.

    Returns:
        JSONResponse with the queued ``task_id``, or with ``message`` and
        ``error`` keys when an upload could not be saved (no explicit HTTP
        status code is set in either case).
    """
    task_id = str(uuid.uuid4())
    task_folder = os.path.join('tasks', task_id)
    temp_folder = os.path.join(task_folder, 'temp_images')
    # Creating the nested folder also creates task_folder itself.
    os.makedirs(temp_folder, exist_ok=True)

    def fail(message, error):
        # Do not leave half-populated task folders behind.
        shutil.rmtree(task_folder, ignore_errors=True)
        return JSONResponse({'message': message, 'error': str(error)})

    image_paths = []
    for index, file in enumerate(imageFolder):
        # Keep only the final path component, whichever separator the client
        # used, so the file always lands inside temp_folder.
        client_name = (file.filename or '').replace('\\', '/')
        file_filename = os.path.basename(client_name) or 'upload'
        file_path = os.path.join(temp_folder, f"{index:04d}_{file_filename}")
        try:
            await _save_upload(file, file_path)
        except Exception as e:
            return fail('Lỗi khi tải lên tệp', e)
        image_paths.append(file_path)

    voice_off_path = os.path.join(task_folder, 'voice_off.mp3')
    try:
        await _save_upload(voiceOff, voice_off_path)
    except Exception as e:
        return fail('Lỗi khi tải lên tệp voice off', e)

    background_music_path = None
    if backgroundMusic:
        background_music_path = os.path.join(task_folder, 'background_music.mp3')
        try:
            await _save_upload(backgroundMusic, background_music_path)
        except Exception as e:
            return fail('Lỗi khi tải lên tệp nhạc nền', e)

    await task_queue.put({
        'task_id': task_id,
        'task_folder': task_folder,
        'image_paths': image_paths,
        'voice_off_path': voice_off_path,
        'background_music_path': background_music_path
    })
    return JSONResponse({'message': 'Task đã được thêm vào hàng đợi', 'task_id': task_id})
async def process_pexels_task(task):
    """Build ``final_output.mp4`` from video clips plus audio.

    Every source video is fitted to exactly 2 seconds (looped when shorter,
    cut when longer), the clips are concatenated, the voice-off and optional
    background music (at 30% volume, looped or trimmed to the voice-off
    duration) are attached, and the result is written to
    ``<task_folder>/final_output.mp4``.

    Changes compared to the previous implementation:
      * Only the temporary inputs inside ``task_folder`` are deleted after a
        successful render; the rendered video itself is kept (previously the
        whole folder, output included, was removed).
      * The blocking ``write_videofile`` call runs in a worker thread so the
        event loop keeps serving requests while rendering.
      * Source clips are always closed so their ffmpeg readers are released.
      * A success response is returned, matching ``process_image_task``
        (previously the success path returned ``None``).

    Args:
        task: Dict with ``task_id``, ``task_folder`` and ``voice_off_path``,
            and optionally ``image_paths`` and ``background_music_path``.

    Returns:
        JSONResponse describing success, "no valid video", or the error.
    """
    task_id = task['task_id']
    task_folder = task['task_folder']
    # NOTE(review): process_tasks only routes tasks WITHOUT an 'image_paths'
    # key to this function, so this list is empty for those tasks -- confirm
    # how pexels tasks are meant to supply their clip paths.
    image_paths = task.get('image_paths', [])
    voice_off_path = task['voice_off_path']
    background_music_path = task.get('background_music_path')

    # Clips that own a file handle or ffmpeg reader; closed in ``finally``.
    opened = []
    try:
        clips = []
        for path in image_paths:
            try:
                source = VideoFileClip(path)
                opened.append(source)
                if source.duration < 2:
                    clip = source.loop(duration=2)
                else:
                    clip = source.set_duration(2)
                clips.append(clip)
            except Exception as e:
                # A single unreadable file must not abort the whole task.
                print(f"Error processing file {path}: {e}")
                continue

        if not clips:
            return JSONResponse({'message': 'Không có video hợp lệ để tạo video'})

        final_clip = concatenate_videoclips(clips, method="compose")

        # Mix the voice-off and background music into the audio track
        voice_off_clip = AudioFileClip(voice_off_path)
        opened.append(voice_off_clip)
        final_audio = voice_off_clip
        if background_music_path:
            music_source = AudioFileClip(background_music_path)
            opened.append(music_source)
            background_music_clip = music_source.volumex(0.3)
            if background_music_clip.duration < voice_off_clip.duration:
                background_music_clip = audio_loop(background_music_clip, duration=voice_off_clip.duration)
            elif background_music_clip.duration > voice_off_clip.duration:
                background_music_clip = background_music_clip.subclip(0, voice_off_clip.duration)
            final_audio = CompositeAudioClip([final_audio, background_music_clip])

        final_clip = final_clip.set_audio(final_audio)
        final_output_path = os.path.join(task_folder, 'final_output.mp4')
        # Encoding is CPU-bound and synchronous; keep it off the event loop.
        await asyncio.to_thread(
            final_clip.write_videofile, final_output_path, codec='libx264', fps=16
        )
    except Exception as e:
        # The worker discards the return value, so also log the failure.
        print(f"Error processing task {task_id}: {e}")
        return JSONResponse({'message': 'Lỗi khi xử lý video', 'error': str(e)})
    finally:
        for resource in opened:
            try:
                resource.close()
            except Exception as e:
                print(f"Error closing clip: {e}")

    # Delete the temporary inputs after completion, keeping the rendered video
    keep_path = os.path.abspath(final_output_path)
    with os.scandir(task_folder) as iterator:
        entries = list(iterator)
    for entry in entries:
        if os.path.abspath(entry.path) == keep_path:
            continue
        try:
            if entry.is_dir(follow_symlinks=False):
                shutil.rmtree(entry.path)
            else:
                os.remove(entry.path)
        except OSError as e:
            print(f"Error removing temporary file {entry.path}: {e}")

    return JSONResponse({'message': 'Video đã được tạo thành công', 'task_id': task_id})
@app.get("/")
async def read_index():
    """Return the ``public/index.html`` file for requests to the site root."""
    index_page = FileResponse("public/index.html")
    return index_page
if __name__ == "__main__":
    # When executed as a script, serve the app with uvicorn on every
    # interface, port 8000.
    from uvicorn import run as run_server

    run_server(app, host="0.0.0.0", port=8000)