from app_settings import AppSettings from utils import show_system_info import constants from argparse import ArgumentParser from context import Context from constants import APP_VERSION, LCM_DEFAULT_MODEL_OPENVINO from models.interface_types import InterfaceType from constants import DEVICE from state import get_settings import traceback from fastapi import FastAPI,Body import uvicorn import json import logging from PIL import Image import time from diffusers.utils import load_image import base64 import io from datetime import datetime from typing import Any from backend.models.lcmdiffusion_setting import DiffusionTask from frontend.utils import is_reshape_required from concurrent.futures import ThreadPoolExecutor context = Context(InterfaceType.WEBUI) previous_width = 0 previous_height = 0 previous_model_id = "" previous_num_of_images = 0 # parser = ArgumentParser(description=f"FAST SD CPU {constants.APP_VERSION}") # parser.add_argument( # "-s", # "--share", # action="store_true", # help="Create sharable link(Web UI)", # required=False, # ) # group = parser.add_mutually_exclusive_group(required=False) # group.add_argument( # "-g", # "--gui", # action="store_true", # help="Start desktop GUI", # ) # group.add_argument( # "-w", # "--webui", # action="store_true", # help="Start Web UI", # ) # group.add_argument( # "-r", # "--realtime", # action="store_true", # help="Start realtime inference UI(experimental)", # ) # group.add_argument( # "-v", # "--version", # action="store_true", # help="Version", # ) # parser.add_argument( # "--lcm_model_id", # type=str, # help="Model ID or path,Default SimianLuo/LCM_Dreamshaper_v7", # default="SimianLuo/LCM_Dreamshaper_v7", # ) # parser.add_argument( # "--prompt", # type=str, # help="Describe the image you want to generate", # ) # parser.add_argument( # "--image_height", # type=int, # help="Height of the image", # default=512, # ) # parser.add_argument( # "--image_width", # type=int, # help="Width of the image", # default=512, # ) # parser.add_argument( # "--inference_steps", # type=int, # help="Number of steps,default : 4", # default=4, # ) # parser.add_argument( # "--guidance_scale", # type=int, # help="Guidance scale,default : 1.0", # default=1.0, # ) # parser.add_argument( # "--number_of_images", # type=int, # help="Number of images to generate ,default : 1", # default=1, # ) # parser.add_argument( # "--seed", # type=int, # help="Seed,default : -1 (disabled) ", # default=-1, # ) # parser.add_argument( # "--use_openvino", # action="store_true", # help="Use OpenVINO model", # ) # parser.add_argument( # "--use_offline_model", # action="store_true", # help="Use offline model", # ) # parser.add_argument( # "--use_safety_checker", # action="store_false", # help="Use safety checker", # ) # parser.add_argument( # "--use_lcm_lora", # action="store_true", # help="Use LCM-LoRA", # ) # parser.add_argument( # "--base_model_id", # type=str, # help="LCM LoRA base model ID,Default Lykon/dreamshaper-8", # default="Lykon/dreamshaper-8", # ) # parser.add_argument( # "--lcm_lora_id", # type=str, # help="LCM LoRA model ID,Default latent-consistency/lcm-lora-sdv1-5", # default="latent-consistency/lcm-lora-sdv1-5", # ) # parser.add_argument( # "-i", # "--interactive", # action="store_true", # help="Interactive CLI mode", # ) # parser.add_argument( # "--use_tiny_auto_encoder", # action="store_true", # help="Use tiny auto encoder for SD (TAESD)", # ) # args = parser.parse_args() # if args.version: # print(APP_VERSION) # exit() # parser.print_help() show_system_info() print(f"Using device : {constants.DEVICE}") app_settings = get_settings() print(f"Found {len(app_settings.lcm_models)} LCM models in config/lcm-models.txt") print( f"Found {len(app_settings.stable_diffsuion_models)} stable diffusion models in config/stable-diffusion-models.txt" ) print( f"Found {len(app_settings.lcm_lora_models)} LCM-LoRA models in config/lcm-lora-models.txt" ) print( f"Found {len(app_settings.openvino_lcm_models)} OpenVINO LCM models in config/openvino-lcm-models.txt" ) app_settings.settings.lcm_diffusion_setting.use_openvino = True # from frontend.webui.ui import start_webui # print("Starting web UI mode") # start_webui( # args.share, # ) # app = FastAPI(name="mutilParam") print("我执行了app.py") # @app.get("/") # def root(): # return {"API": "hello"} # @app.post("/img2img") # async def predict(prompt=Body(...),imgbase64data=Body(...),negative_prompt=Body(None),userId=Body(None)): # MAX_QUEUE_SIZE = 4 # start = time.time() # print("参数",imgbase64data,prompt) # image_data = base64.b64decode(imgbase64data) # image1 = Image.open(io.BytesIO(image_data)) # w, h = image1.size # newW = 512 # newH = int(h * newW / w) # img = image1.resize((newW, newH)) # end1 = time.time() # now = datetime.now() # print(now) # print("图像:", img.size) # print("加载管道:", end1 - start) # global previous_height, previous_width, previous_model_id, previous_num_of_images, app_settings # app_settings.settings.lcm_diffusion_setting.prompt = prompt # app_settings.settings.lcm_diffusion_setting.negative_prompt = negative_prompt # app_settings.settings.lcm_diffusion_setting.init_image = image1 # app_settings.settings.lcm_diffusion_setting.strength = 0.6 # app_settings.settings.lcm_diffusion_setting.diffusion_task = ( # DiffusionTask.image_to_image.value # ) # model_id = app_settings.settings.lcm_diffusion_setting.openvino_lcm_model_id # reshape = False # app_settings.settings.lcm_diffusion_setting.image_height=newH # image_width = app_settings.settings.lcm_diffusion_setting.image_width # image_height = app_settings.settings.lcm_diffusion_setting.image_height # num_images = app_settings.settings.lcm_diffusion_setting.number_of_images # reshape = is_reshape_required( # previous_width, # image_width, # previous_height, # image_height, # previous_model_id, # model_id, # previous_num_of_images, # num_images, # ) # with ThreadPoolExecutor(max_workers=1) as executor: # future = executor.submit( # context.generate_text_to_image, # app_settings.settings, # reshape, # DEVICE, # ) # images = future.result() # previous_width = image_width # previous_height = image_height # previous_model_id = model_id # previous_num_of_images = num_images # output_image = images[0] # end2 = time.time() # print("测试",output_image) # print("s生成完成:", end2 - end1) # # 将图片对象转换为bytes # image_data = io.BytesIO() # # 将图像保存到BytesIO对象中,格式为JPEG # output_image.save(image_data, format='JPEG') # # 将BytesIO对象的内容转换为字节串 # image_data_bytes = image_data.getvalue() # output_image_base64 = base64.b64encode(image_data_bytes).decode('utf-8') # print("完成的图片:", output_image_base64) # return output_image_base64 # @app.post("/predict") # async def predict(prompt=Body(...)): # return f"您好,{prompt}"