import mimetypes
import os
import tempfile
import time
from dataclasses import dataclass
from datetime import datetime
from typing import List

import requests
from fastapi import FastAPI
from pydantic import BaseModel
from starlette.responses import JSONResponse
from supabase import create_client

from src.components.each_necklace_video_gen import EachVideoCreator
from src.components.product_page_nto_cto_gen import ProductPageImageGeneration
from src.utils.logs import logger

supabase_url = os.getenv('SUPABASE_URL')
supabase_key = os.getenv('SUPABASE_KEY')
supabase = create_client(supabase_url, supabase_key)
prod_page = ProductPageImageGeneration()


@dataclass
class MakeupColors:
    """Makeup colour selection passed to the product-page try-on sequence."""

    lipstick: str
    eyeliner: str
    eyeshadow: str


app = FastAPI()

RESOURCES_DIR = "resources"
os.makedirs(RESOURCES_DIR, exist_ok=True)

TEMP_VIDEO_DIR = f"{RESOURCES_DIR}/temp_video"
# Rendered videos are written under TEMP_VIDEO_DIR, so make sure it exists
# before the first request tries to use it.
os.makedirs(TEMP_VIDEO_DIR, exist_ok=True)
os.environ['MOVIEPY_TEMP_DIR'] = '/tmp/moviepy'

# Upper bound (seconds) for connecting to / waiting on remote hosts, so a
# stalled server cannot block a request handler indefinitely.
REQUEST_TIMEOUT_SECONDS = 30


def upload_to_supabase(video_path, bucket_name="JewelmirrorVideoGeneration", model_name="MOD"):
    """
    Upload video to Supabase with organized folder structure.

    Format: {bucket_name}/{year}/{month}/{day}/{model_name}/{video_filename}

    Args:
        video_path: Local path of the file to upload.
        bucket_name: Supabase storage bucket to upload into.
        model_name: Folder name placed between the date and the file name.

    Returns:
        The public URL of the uploaded file, or ``None`` when the file is
        missing, the upload fails, or the verification request does not
        return HTTP 200. Errors are logged, never raised.
    """
    logger.info(f"Uploading video to Supabase: {video_path}")
    try:
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # Single source of truth for the storage layout.
        structured_path = get_structured_path(video_path, model_name=model_name)

        content_type = mimetypes.guess_type(video_path)[0] or 'video/mp4'
        options = {
            "content-type": content_type,
            "x-upsert": "true",
            "cache-control": "max-age=3600",
        }
        logger.info(f"Uploading video with structured path: {structured_path}")

        with open(video_path, 'rb') as f:
            file_data = f.read()

        bucket = supabase.storage.from_(bucket_name)
        bucket.upload(
            path=structured_path,
            file=file_data,
            file_options=options,
        )
        public_url = bucket.get_public_url(structured_path)

        # Verify that the uploaded object is actually reachable.
        response = requests.head(public_url, timeout=REQUEST_TIMEOUT_SECONDS)
        if response.status_code != 200:
            logger.error(f"Upload verification failed: {response.status_code}")
            raise Exception(f"Upload verification failed: {response.status_code}")

        logger.info(f"Video uploaded successfully: {public_url}")
        return public_url
    except Exception as e:
        logger.error(f"Error uploading to Supabase: {str(e)}")
        return None


def get_structured_path(video_path, model_name="MOD"):
    """
    Build the storage object path for a video.

    The result has the form
    ``{year}/{month}/{day}/{model_name}/{name}_{random_hex}{ext}``, where the
    date is the current local date and ``random_hex`` is 16 random hex
    characters, so repeated calls for the same file give different paths.
    """
    current_date = datetime.now()
    unique_id = os.urandom(8).hex()
    name, ext = os.path.splitext(os.path.basename(video_path))
    return f"{current_date.strftime('%Y/%m/%d')}/{model_name}/{name}_{unique_id}{ext}"


def download_image(url):
    """
    Download ``url`` into a temporary ``.png`` file.

    Returns:
        The path of the temporary file (the caller is responsible for deleting
        it), or ``None`` if the download failed. Errors are logged, never
        raised.
    """
    logger.info(f"Downloading image from {url}")
    temp_path = None
    try:
        # Both the HTTP response and the temp-file handle are closed by their
        # context managers, including when an error occurs mid-download.
        with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT_SECONDS) as response:
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)
        logger.info(f"Image downloaded successfully: {temp_path}")
        return temp_path
    except Exception as e:
        logger.error(f"Error downloading image from {url}: {str(e)}")
        # Do not leave a partially written file behind.
        if temp_path is not None and os.path.exists(temp_path):
            os.unlink(temp_path)
        return None


# Legacy single-necklace endpoint, kept commented out for reference only.
# It refers to `VideoCreator`, which this module does not import.
#
# class VideoGenerator(BaseModel):
#     necklace_image: str
#     necklace_try_on_output_images: list[str]
#     clothing_output_images: list[str]
#     makeup_output_images: list[str]
#     intro_video_path: str = "JewelMirror_intro.mp4"
#     background_audio_path: str = "TraditionalIndianVlogMusic.mp3"
#     font_path: str = "PlayfairDisplay-VariableFont.ttf"
#     image_display_duration: float = 2.5
#     fps: int = 30
#     necklace_title: str = "Necklace Try-On"
#     clothing_title: str = "Clothing Try-On"
#     makeup_title: str = "Makeup Try-On"
#     necklace_image_title: str = "Necklace Preview"
#     nto_image_title: str = "Necklace Try-On"
#     nto_cto_image_title: str = "Clothing Try-On"
#     makeup_image_title: str = "Makeup Try-On"
#     transition_duration: float = 0.5
#
#
# @app.post("/createvideo/")
# async def create_video(request: VideoGenerator):
#     logger.info(f"Creating video with request: {request.dict()}")
#     start_time = time.time()
#     try:
#         logger.info("Downloading images...")
#         temp_files = {
#             'necklace': download_image(request.necklace_image),
#             'necklace_tryon': [download_image(url) for url in request.necklace_try_on_output_images],
#             'clothing': [download_image(url) for url in request.clothing_output_images],
#             'makeup': [download_image(url) for url in request.makeup_output_images]
#         }
#         file_download_time = time.time() - start_time
#
#         if any(file is None for files in temp_files.values() for file in
#                (files if isinstance(files, list) else [files])):
#             return JSONResponse(content={"status": "error", "message": "Failed to download all required images."},
#                                 status_code=400)
#
#         intro_path = f"{RESOURCES_DIR}/intro/{request.intro_video_path}"
#         output_path = f"{TEMP_VIDEO_DIR}/video_{os.urandom(8).hex()}.mp4"
#         font_path = f"{RESOURCES_DIR}/fonts/{request.font_path}"
#         audio_path = f"{RESOURCES_DIR}/audio/{request.background_audio_path}"
#         logger.info(f"Creating video with paths: {intro_path}, {output_path}, {font_path}, {audio_path}")
#
#         video_creator = VideoCreator(
#             intro_video_path=intro_path,
#             necklace_image=temp_files['necklace'],
#             nto_outputs=temp_files['necklace_tryon'],
#             nto_cto_outputs=temp_files['clothing'],
#             makeup_outputs=temp_files['makeup'],
#             font_path=font_path,
#             output_path=output_path,
#             audio_path=audio_path,
#             image_display_duration=request.image_display_duration,
#             fps=request.fps,
#             necklace_image_title=request.necklace_image_title,
#             nto_image_title=request.nto_image_title,
#             nto_cto_image_title=request.nto_cto_image_title,
#             makeup_image_title=request.makeup_image_title,
#         )
#         logger.info("Creating video...")
#
#         video_creator.create_final_video()
#         video_creation_time = time.time() - start_time - file_download_time
#
#         url = upload_to_supabase(video_path=output_path)
#         logger.info(f"Video created successfully: {url}")
#         supabase_upload_time = time.time() - start_time - file_download_time - video_creation_time
#         logger.info(f"Video uploaded to Supabase: {url}")
#
#         response = {
#             "status": "success",
#             "message": "Video created successfully.",
#             "video_url": url,
#             "timings": {
#                 "file_download_time": file_download_time,
#                 "video_creation_time": video_creation_time,
#                 "supabase_upload_time": supabase_upload_time
#             }
#         }
#         logger.info(f"Response: {response}")
#
#         for files in temp_files.values():
#             if isinstance(files, list):
#                 for file in files:
#                     if os.path.exists(file):
#                         os.unlink(file)
#             elif os.path.exists(files):
#                 os.unlink(files)
#
#         os.remove(output_path)
#         logger.info("Files cleaned up successfully.")
#         return JSONResponse(content=response, status_code=200)
#
#     except Exception as e:
#         logger.error(f"Error creating video: {str(e)}")
#         return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)


# Request body for POST /createcombinedvideo/.
# The *_path fields are file names resolved under RESOURCES_DIR
# (intro/, fonts/, audio/, outro/); the image fields are URLs that are
# downloaded before rendering.
class EachNecklaceVideoGeneratorRequest(BaseModel):
    intro_video_path: str = "JewelMirror_intro.mp4"
    font_path: str = "PlayfairDisplay-VariableFont.ttf"
    background_audio_path: str = "TraditionalIndianVlogMusic.mp3"
    image_display_duration: float = 2.5
    fps: int = 10
    necklace_title: List[str]
    nto_image_title: List[List[str]]
    nto_cto_image_title: List[List[str]]
    makeup_image_title: List[List[str]]
    necklace_images: List[str]
    necklace_try_on_output_images: List[List[str]]
    clothing_output_images: List[List[str]]
    makeup_output_images: List[List[str]]
    background_colors: list[tuple[int, int, int]]
    outro_title: str = "Reach out to us for more information"
    address: str = "123, ABC Street, XYZ City"
    phone_numbers: str = "1234567890"
    logo_url: str = "https://lvuhhlrkcuexzqtsbqyu.supabase.co/storage/v1/object/public/MagicMirror/FullImages/default.png"
    transition_duration: float = 0.5
    transition_type: str = "None"
    direction: str = "left"
    outro_video_path: str = "JewelMirror_outro.mp4"


def _download_nested(urls):
    """Download every URL in an arbitrarily nested list, preserving the nesting."""
    if isinstance(urls, list):
        return [_download_nested(item) for item in urls]
    return download_image(urls)


def _all_downloaded(files):
    """Return True when no entry of the (possibly nested) download result is None."""
    logger.info(f"Verifying files: {files}")
    if isinstance(files, list):
        return all(_all_downloaded(item) for item in files)
    return files is not None


def _cleanup_files(files):
    """Delete every existing path in a (possibly nested) list, skipping failed downloads."""
    if isinstance(files, list):
        for item in files:
            _cleanup_files(item)
    elif files is not None and os.path.exists(files):
        os.unlink(files)


def _extract_model_name(try_on_urls):
    """
    Derive the model name from the first try-on image URL.

    Takes the file stem of ``try_on_urls[0][0]`` and returns its second
    ``-``-separated token; falls back to ``"MOD"`` when the list is empty or
    the stem contains no ``-``.
    """
    try:
        return try_on_urls[0][0].split("/")[-1].split(".")[0].split("-")[1]
    except IndexError:
        return "MOD"


@app.post("/createcombinedvideo/")
async def create_combined_video(request: EachNecklaceVideoGeneratorRequest):
    """
    Download all referenced images, render the combined video and upload it.

    Returns a JSON response: 200 with the video URL and phase timings on
    success, 400 when any image could not be downloaded, 500 when rendering
    or uploading fails. Downloaded images and the rendered video file are
    removed from local disk on every path.
    """
    logger.info(f"Creating video with request: {request.dict()}")
    start_time = time.time()

    # Initialised before the try block so the finally clause can always clean up.
    temp_files = {}
    output_path = f"{TEMP_VIDEO_DIR}/video_{os.urandom(8).hex()}.mp4"

    try:
        logger.info("Downloading images...")
        # Filled key by key so that files already downloaded are still
        # tracked (and cleaned up) if a later download step raises.
        temp_files['necklaces'] = _download_nested(request.necklace_images)
        temp_files['necklace_tryon'] = _download_nested(request.necklace_try_on_output_images)
        temp_files['clothing'] = _download_nested(request.clothing_output_images)
        temp_files['makeup'] = _download_nested(request.makeup_output_images)
        temp_files['logo'] = download_image(request.logo_url)
        file_download_time = time.time() - start_time

        if not all(_all_downloaded(files) for files in temp_files.values()):
            return JSONResponse(
                content={"status": "error", "message": "Failed to download all required images."},
                status_code=400
            )

        intro_path = f"{RESOURCES_DIR}/intro/{request.intro_video_path}"
        font_path = f"{RESOURCES_DIR}/fonts/{request.font_path}"
        audio_path = f"{RESOURCES_DIR}/audio/{request.background_audio_path}"
        outro_path = f"{RESOURCES_DIR}/outro/{request.outro_video_path}"
        model_name = _extract_model_name(request.necklace_try_on_output_images)

        video_creator = EachVideoCreator(
            intro_video_path=intro_path,
            necklace_image=temp_files['necklaces'],
            nto_outputs=temp_files['necklace_tryon'],
            nto_cto_outputs=temp_files['clothing'],
            makeup_outputs=temp_files['makeup'],
            font_path=font_path,
            output_path=output_path,
            audio_path=audio_path,
            image_display_duration=request.image_display_duration,
            fps=request.fps,
            necklace_title=request.necklace_title,
            nto_title=request.nto_image_title,
            cto_title=request.nto_cto_image_title,
            makeup_title=request.makeup_image_title,
            backgrounds=request.background_colors,
            outro_title=request.outro_title,
            address=request.address,
            phone_numbers=request.phone_numbers,
            logo_image=temp_files['logo'],
            transition_duration=request.transition_duration,
            transition_type=request.transition_type,
            direction=request.direction,
            outro_video_path=outro_path
        )
        logger.info("Creating video...")
        video_creator.create_final_video()
        video_creation_time = time.time() - start_time - file_download_time
        logger.info("Video created successfully.")

        url = upload_to_supabase(video_path=output_path, model_name=model_name)
        supabase_upload_time = time.time() - start_time - file_download_time - video_creation_time
        if url is None:
            # upload_to_supabase signals failure with None; the local file is
            # about to be deleted, so reporting success here would hand the
            # client a video that does not exist anywhere.
            return JSONResponse(
                content={"status": "error", "message": "Failed to upload video to Supabase."},
                status_code=500
            )
        logger.info(f"Video uploaded to Supabase: {url}")

        response = {
            "status": "success",
            "message": "Video created successfully.",
            "video_url": url,
            "timings": {
                "file_download_time": file_download_time,
                "video_creation_time": video_creation_time,
                "supabase_upload_time": supabase_upload_time
            }
        }
        logger.info(f"Response: {response}")
        return JSONResponse(content=response, status_code=200)

    except Exception as e:
        logger.error(f"Error creating video: {str(e)}")
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)

    finally:
        # Best-effort cleanup: a failure to delete must not replace the
        # response that is already on its way to the client.
        try:
            _cleanup_files(list(temp_files.values()))
            if os.path.exists(output_path):
                os.remove(output_path)
            logger.info("Files cleaned up successfully.")
        except OSError as cleanup_error:
            logger.error(f"Error cleaning up files: {str(cleanup_error)}")


@app.get("/transition_list")
async def get_transitions():
    """Return the names of the transitions listed by this service."""
    response = {
        "transitions": ["crossfade", "slide"]
    }
    return JSONResponse(content=response, status_code=200)


@app.get("/resources")
async def get_information():
    """List the file names found in the audio, fonts, intro and outro resource folders."""
    logger.info("Getting resources information")
    music = os.listdir(f"{RESOURCES_DIR}/audio")
    fonts = os.listdir(f"{RESOURCES_DIR}/fonts")
    intro = os.listdir(f"{RESOURCES_DIR}/intro")
    outro = os.listdir(f"{RESOURCES_DIR}/outro")
    logger.info(f"Resources: {music}, {fonts}, {intro}")

    resources = {"music": music, "fonts": fonts, "intro": intro, "outro": outro}
    return JSONResponse(content=resources, status_code=200)


def ensure_json_serializable(data):
    """
    Recursively replace objects by their ``__dict__`` inside lists and dicts.

    Lists and dicts are rebuilt with converted members, any other object that
    has a ``__dict__`` is converted through it, and everything else is
    returned unchanged.
    """
    if isinstance(data, list):
        return [ensure_json_serializable(item) for item in data]
    if isinstance(data, dict):
        return {key: ensure_json_serializable(value) for key, value in data.items()}
    if hasattr(data, "__dict__"):
        return ensure_json_serializable(data.__dict__)
    return data


@app.post("/product_page_image_generation")
async def product_page_image_generation(model_image: str, necklace_id: str, necklace_category: str, storename: str,
                                        x_offset: str, y_offset: str, clothing_list: List[str],
                                        makeup_colors: MakeupColors):
    """
    Run the full product-page try-on sequence for one model image.

    Downloads ``model_image``, passes it with the necklace, clothing and
    makeup parameters to ``prod_page.process_full_tryon_sequence`` and returns
    its ``nto_results``, ``cto_results`` and ``mto_results`` as JSON.
    Responds with 400 when the model image cannot be downloaded and 500 on
    any other failure.
    """
    try:
        # File stem of the model image URL.
        model_name = model_image.split("/")[-1].split(".")[0]

        # NOTE(review): this temporary file is never deleted; confirm whether
        # process_full_tryon_sequence still needs it after returning before
        # adding cleanup here.
        model_image_path = download_image(model_image)
        if model_image_path is None:
            return JSONResponse(content={"status": "error", "message": "Failed to download model image"},
                                status_code=400)

        response = prod_page.process_full_tryon_sequence(
            model_image_path,
            necklace_id,
            {
                "id": necklace_id,
                "category": necklace_category,
                "store_name": storename,
                "offset_x": x_offset,
                "offset_y": y_offset
            },
            clothing_list,
            makeup_colors,
            model_name=model_name
        )

        json_response = {
            "status": "success",
            "message": "Product page images generated successfully.",
            "nto_results": ensure_json_serializable(response["nto_results"]),
            "cto_results": ensure_json_serializable(response["cto_results"]),
            "mto_results": ensure_json_serializable(response["mto_results"])
        }
        return JSONResponse(content=json_response, status_code=200)

    except Exception as e:
        logger.error(f"Error generating product page images: {str(e)}")
        return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)