Spaces:
Running
Running
import mimetypes | |
import os | |
import tempfile | |
import time | |
from dataclasses import dataclass | |
from typing import List | |
import requests | |
from fastapi import FastAPI | |
from pydantic import BaseModel | |
from starlette.responses import JSONResponse | |
from supabase import create_client | |
from src.components.each_necklace_video_gen import EachVideoCreator | |
from src.components.product_page_nto_cto_gen import ProductPageImageGeneration | |
from src.components.vidgen import VideoCreator | |
from src.utils.logs import logger | |
supabase_url = os.getenv('SUPABASE_URL') | |
supabase_key = os.getenv('SUPABASE_KEY') | |
supabase = create_client(supabase_url, supabase_key) | |
prod_page = ProductPageImageGeneration() | |
class MakeupColors: | |
lipstick: str | |
eyeliner: str | |
eyeshadow: str | |
app = FastAPI() | |
RESOURCES_DIR = "/app/resources/" | |
os.makedirs(RESOURCES_DIR, exist_ok=True) | |
TEMP_VIDEO_DIR = f"{RESOURCES_DIR}/temp_video" | |
os.environ['MOVIEPY_TEMP_DIR'] = '/tmp/moviepy' | |
def upload_to_supabase(video_path, bucket_name="JewelmirrorVideoGeneration"): | |
logger.info(f"Uploading video to Supabase: {video_path}") | |
try: | |
if not os.path.exists(video_path): | |
raise FileNotFoundError(f"Video file not found: {video_path}") | |
content_type = mimetypes.guess_type(video_path)[0] or 'video/mp4' | |
file_name = os.path.basename(video_path) | |
options = { | |
"content-type": content_type, | |
"x-upsert": "true", | |
"cache-control": "max-age=3600" | |
} | |
logger.info(f"Uploading video to Supabase: {video_path}") | |
with open(video_path, 'rb') as f: | |
file_data = f.read() | |
supabase.storage.from_(bucket_name).upload( | |
path=file_name, | |
file=file_data, | |
file_options=options | |
) | |
public_url = supabase.storage.from_(bucket_name).get_public_url(file_name) | |
response = requests.head(public_url) | |
if response.status_code != 200: | |
logger.error(f"Upload verification failed: {response.status_code}") | |
raise Exception(f"Upload verification failed: {response.status_code}") | |
logger.info(f"Video uploaded successfully: {public_url}") | |
return public_url | |
except Exception as e: | |
logger.error(f"Error uploading to Supabase: {str(e)}") | |
return None | |
def download_image(url): | |
logger.info(f"Downloading image from {url}") | |
try: | |
response = requests.get(url, stream=True) | |
response.raise_for_status() | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png") | |
with open(temp_file.name, 'wb') as f: | |
for chunk in response.iter_content(chunk_size=8192): | |
f.write(chunk) | |
logger.info(f"Image downloaded successfully: {temp_file.name}") | |
return temp_file.name | |
except Exception as e: | |
logger.error(f"Error downloading image from {url}: {str(e)}") | |
print(f"Error downloading image from {url}: {str(e)}") | |
return None | |
class VideoGenerator(BaseModel): | |
necklace_image: str | |
necklace_try_on_output_images: list[str] | |
clothing_output_images: list[str] | |
makeup_output_images: list[str] | |
intro_video_path: str = "JewelMirror_intro.mp4" | |
background_audio_path: str = "TraditionalIndianVlogMusic.mp3" | |
font_path: str = "PlayfairDisplay-VariableFont.ttf" | |
image_display_duration: float = 2.5 | |
fps: int = 30 | |
necklace_title: str = "Necklace Try-On" | |
clothing_title: str = "Clothing Try-On" | |
makeup_title: str = "Makeup Try-On" | |
necklace_image_title: str = "Necklace Preview" | |
nto_image_title: str = "Necklace Try-On" | |
nto_cto_image_title: str = "Clothing Try-On" | |
makeup_image_title: str = "Makeup Try-On" | |
transition_duration: float = 0.5 | |
async def create_video(request: VideoGenerator): | |
logger.info(f"Creating video with request: {request.dict()}") | |
start_time = time.time() | |
try: | |
logger.info("Downloading images...") | |
temp_files = { | |
'necklace': download_image(request.necklace_image), | |
'necklace_tryon': [download_image(url) for url in request.necklace_try_on_output_images], | |
'clothing': [download_image(url) for url in request.clothing_output_images], | |
'makeup': [download_image(url) for url in request.makeup_output_images] | |
} | |
file_download_time = time.time() - start_time | |
if any(file is None for files in temp_files.values() for file in | |
(files if isinstance(files, list) else [files])): | |
return JSONResponse(content={"status": "error", "message": "Failed to download all required images."}, | |
status_code=400) | |
intro_path = f"{RESOURCES_DIR}/intro/{request.intro_video_path}" | |
output_path = f"{TEMP_VIDEO_DIR}/video_{os.urandom(8).hex()}.mp4" | |
font_path = f"{RESOURCES_DIR}/fonts/{request.font_path}" | |
audio_path = f"{RESOURCES_DIR}/audio/{request.background_audio_path}" | |
logger.info(f"Creating video with paths: {intro_path}, {output_path}, {font_path}, {audio_path}") | |
video_creator = VideoCreator( | |
intro_video_path=intro_path, | |
necklace_image=temp_files['necklace'], | |
nto_outputs=temp_files['necklace_tryon'], | |
nto_cto_outputs=temp_files['clothing'], | |
makeup_outputs=temp_files['makeup'], | |
font_path=font_path, | |
output_path=output_path, | |
audio_path=audio_path, | |
image_display_duration=request.image_display_duration, | |
fps=request.fps, | |
necklace_image_title=request.necklace_image_title, | |
nto_image_title=request.nto_image_title, | |
nto_cto_image_title=request.nto_cto_image_title, | |
makeup_image_title=request.makeup_image_title, | |
) | |
logger.info("Creating video...") | |
video_creator.create_final_video() | |
video_creation_time = time.time() - start_time - file_download_time | |
url = upload_to_supabase(video_path=output_path) | |
logger.info(f"Video created successfully: {url}") | |
supabase_upload_time = time.time() - start_time - file_download_time - video_creation_time | |
logger.info(f"Video uploaded to Supabase: {url}") | |
response = { | |
"status": "success", | |
"message": "Video created successfully.", | |
"video_url": url, | |
"timings": { | |
"file_download_time": file_download_time, | |
"video_creation_time": video_creation_time, | |
"supabase_upload_time": supabase_upload_time | |
} | |
} | |
logger.info(f"Response: {response}") | |
for files in temp_files.values(): | |
if isinstance(files, list): | |
for file in files: | |
if os.path.exists(file): | |
os.unlink(file) | |
elif os.path.exists(files): | |
os.unlink(files) | |
os.remove(output_path) | |
logger.info("Files cleaned up successfully.") | |
return JSONResponse(content=response, status_code=200) | |
except Exception as e: | |
logger.error(f"Error creating video: {str(e)}") | |
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) | |
class EachNecklaceVideoGeneratorRequest(BaseModel): | |
intro_video_path: str = "JewelMirror_intro.mp4" | |
font_path: str = "PlayfairDisplay-VariableFont.ttf" | |
background_audio_path: str = "TraditionalIndianVlogMusic.mp3" | |
image_display_duration: float = 2.5 | |
fps: int = 10 | |
necklace_title: List[str] | |
nto_image_title: List[List[str]] | |
nto_cto_image_title: List[List[str]] | |
makeup_image_title: List[List[str]] | |
necklace_images: List[str] | |
necklace_try_on_output_images: List[List[str]] | |
clothing_output_images: List[List[str]] | |
makeup_output_images: List[List[str]] | |
background_colors: list[tuple[int, int, int]] | |
outro_title: str = "Reach out to us for more information" | |
address: str = "123, ABC Street, XYZ City" | |
phone_numbers: str = "1234567890" | |
logo_url: str = "https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/MagicMirror/FullImages/default.png" | |
transition_duration: float = 0.5 | |
transition_type: str = "None" | |
direction: str = "left" | |
outro_video_path: str = "JewelMirror_outro.mp4" | |
async def create_combined_video(request: EachNecklaceVideoGeneratorRequest): | |
logger.info(f"Creating video with request: {request.dict()}") | |
start_time = time.time() | |
background_ = request.background_colors | |
try: | |
logger.info("Downloading images...") | |
def process_images(image_list): | |
if isinstance(image_list, list): | |
return [process_images(item) for item in image_list] | |
return download_image(image_list) | |
temp_files = { | |
'necklaces': process_images(request.necklace_images), | |
'necklace_tryon': process_images(request.necklace_try_on_output_images), | |
'clothing': process_images(request.clothing_output_images), | |
'makeup': process_images(request.makeup_output_images), | |
'logo': download_image(request.logo_url) | |
} | |
file_download_time = time.time() - start_time | |
def verify_files(files): | |
logger.info(f"Verifying files: {files}") | |
if isinstance(files, list): | |
return all(verify_files(item) for item in files) | |
return files is not None | |
if not all(verify_files(files) for files in temp_files.values()): | |
return JSONResponse( | |
content={"status": "error", "message": "Failed to download all required images."}, | |
status_code=400 | |
) | |
intro_path = f"{RESOURCES_DIR}/intro/{request.intro_video_path}" | |
output_path = f"{TEMP_VIDEO_DIR}/video_{os.urandom(8).hex()}.mp4" | |
font_path = f"{RESOURCES_DIR}/fonts/{request.font_path}" | |
audio_path = f"{RESOURCES_DIR}/audio/{request.background_audio_path}" | |
outro_path = f"{RESOURCES_DIR}/outro/{request.outro_video_path}" | |
video_creator = EachVideoCreator( | |
intro_video_path=intro_path, | |
necklace_image=temp_files['necklaces'], | |
nto_outputs=temp_files['necklace_tryon'], | |
nto_cto_outputs=temp_files['clothing'], | |
makeup_outputs=temp_files['makeup'], | |
font_path=font_path, | |
output_path=output_path, | |
audio_path=audio_path, | |
image_display_duration=request.image_display_duration, | |
fps=request.fps, | |
necklace_title=request.necklace_title, | |
nto_title=request.nto_image_title, | |
cto_title=request.nto_cto_image_title, | |
makeup_title=request.makeup_image_title, | |
backgrounds=background_, | |
outro_title=request.outro_title, | |
address=request.address, | |
phone_numbers=request.phone_numbers, | |
logo_image=temp_files['logo'], | |
transition_duration=request.transition_duration, | |
transition_type=request.transition_type, | |
direction=request.direction, | |
outro_video_path=outro_path | |
) | |
logger.info("Creating video...") | |
video_creator.create_final_video() | |
video_creation_time = time.time() - start_time - file_download_time | |
logger.info("Video created successfully.") | |
url = upload_to_supabase(video_path=output_path) | |
supabase_upload_time = time.time() - start_time - file_download_time - video_creation_time | |
logger.info(f"Video uploaded to Supabase: {url}") | |
response = { | |
"status": "success", | |
"message": "Video created successfully.", | |
"video_url": url, | |
"timings": { | |
"file_download_time": file_download_time, | |
"video_creation_time": video_creation_time, | |
"supabase_upload_time": supabase_upload_time | |
} | |
} | |
logger.info(f"Response: {response}") | |
def cleanup_files(files): | |
if isinstance(files, list): | |
for item in files: | |
cleanup_files(item) | |
elif os.path.exists(files): | |
os.unlink(files) | |
if os.path.exists(output_path): | |
os.remove(output_path) | |
logger.info("Files cleaned up successfully.") | |
return JSONResponse(content=response, status_code=200) | |
except Exception as e: | |
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) | |
async def get_transitions(): | |
response = { | |
"transitions": ["crossfade", "slide"] | |
} | |
return JSONResponse(content=response, status_code=200) | |
async def get_information(): | |
logger.info("Getting resources information") | |
music = os.listdir(RESOURCES_DIR + "/audio") | |
fonts = os.listdir(RESOURCES_DIR + "/fonts") | |
intro = os.listdir(RESOURCES_DIR + "/intro") | |
outro = os.listdir(RESOURCES_DIR + "/outro") | |
logger.info(f"Resources: {music}, {fonts}, {intro}") | |
json = {"music": music, "fonts": fonts, "intro": intro, "outro": outro} | |
return JSONResponse(content=json, status_code=200) | |
def ensure_json_serializable(data): | |
if isinstance(data, list): | |
return [ensure_json_serializable(item) for item in data] | |
elif isinstance(data, dict): | |
return {key: ensure_json_serializable(value) for key, value in data.items()} | |
elif hasattr(data, "__dict__"): | |
return ensure_json_serializable(data.__dict__) | |
else: | |
return data | |
async def product_page_image_generation(model_image: str, necklace_id: str, necklace_category: str, storename: str, | |
x_offset: str, | |
y_offset: str, | |
clothing_list: List[str], | |
makeup_colors: MakeupColors): | |
try: | |
model_name = model_image.split("/")[-1].split(".")[0] | |
model_image_path = download_image(model_image) | |
if model_image_path is None: | |
return JSONResponse(content={"status": "error", "message": "Failed to download model image"}, | |
status_code=400) | |
response = prod_page.process_full_tryon_sequence( | |
model_image_path, | |
necklace_id, | |
{ | |
"id": necklace_id, | |
"category": necklace_category, | |
"store_name": storename, | |
"offset_x": x_offset, | |
"offset_y": y_offset | |
}, | |
clothing_list, | |
makeup_colors, | |
model_name=model_name | |
) | |
nto_results = response["nto_results"] | |
cto_results = response["cto_results"] | |
mto_results = response["mto_results"] | |
json_response = { | |
"status": "success", | |
"message": "Product page images generated successfully.", | |
"nto_results": ensure_json_serializable(nto_results), | |
"cto_results": ensure_json_serializable(cto_results), | |
"mto_results": ensure_json_serializable(mto_results) | |
} | |
return JSONResponse(content=json_response, status_code=200) | |
except Exception as e: | |
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=8000) | |