import concurrent.futures import random import gradio as gr # from fal_api_utils import load_fal_model from .imagenhub_utils import load_imagenhub_model import spaces import requests import io, base64, json from PIL import Image import os IMAGE_GENERATION_MODELS = ['imagenhub_LCM_generation','imagenhub_SDXLTurbo_generation','imagenhub_SDXL_generation', 'imagenhub_PixArtAlpha_generation', 'imagenhub_OpenJourney_generation','imagenhub_SDXLLightning_generation', 'imagenhub_StableCascade_generation', 'imagenhub_PlayGroundV2_generation', 'imagenhub_PlayGroundV2.5_generation'] IMAGE_EDITION_MODELS = ['imagenhub_CycleDiffusion_edition', 'imagenhub_Pix2PixZero_edition', 'imagenhub_Prompt2prompt_edition', 'imagenhub_SDEdit_edition', 'imagenhub_InstructPix2Pix_edition', 'imagenhub_MagicBrush_edition', 'imagenhub_PNP_edition'] class ModelManager: def __init__(self): self.model_ig_list = IMAGE_GENERATION_MODELS self.model_ie_list = IMAGE_EDITION_MODELS self.loaded_models = {} # @spaces.GPU def load_model_pipe(self, model_name): model_source, model_name, model_type = model_name.split("_") if not model_name in self.loaded_models: if model_source == "imagenhub": pipe = load_imagenhub_model(model_name) # elif model_source == "fal": # pipe = load_fal_model(model_name, model_type) else: raise ValueError(f"Model source {model_source} not supported") self.loaded_models[model_name] = pipe else: pipe = self.loaded_models[model_name] return pipe def generate_image_playground(self, model_name, prompt): if model_name == "imagenhub_PlayGroundV2_generation": model_name = "Playground_v2" elif model_name == "imagenhub_PlayGroundV2.5_generation": model_name = "Playground_v2.5" headers = { 'Content-Type': 'application/json', 'Authorization': os.environ['PlaygroundAPI'], } data = json.dumps({"prompt": prompt, "filter_model": model_name, "scheduler": "DPMPP_2M_K", "guidance_scale": 3}) response = requests.post('https://playground.com/api/models/external/v1', headers=headers, data=data) response.raise_for_status() json_obj = response.json() image_base64 = json_obj['images'][0] img = Image.open(io.BytesIO(base64.decodebytes(bytes(image_base64, "utf-8")))) return img @spaces.GPU(duration=60) def generate_image_ig(self, prompt, model_name): if 'playground' in model_name.lower(): result = self.generate_image_playground(model_name=model_name, prompt=prompt) else: pipe = self.load_model_pipe(model_name) result = pipe(prompt=prompt) return result def generate_image_ig_parallel_anony(self, prompt, model_A, model_B): if model_A == "" and model_B == "": model_names = random.sample([model for model in self.model_ig_list], 2) else: model_names = [model_A, model_B] results = [] with concurrent.futures.ThreadPoolExecutor() as executor: future_to_result = {executor.submit(self.generate_image_ig, prompt, model): model for model in model_names} for future in concurrent.futures.as_completed(future_to_result): result = future.result() results.append(result) return results[0], results[1], model_names[0], model_names[1] def generate_image_ig_parallel(self, prompt, model_A, model_B): results = [] model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: future_to_result = {executor.submit(self.generate_image_ig, prompt, model): model for model in model_names} for future in concurrent.futures.as_completed(future_to_result): result = future.result() results.append(result) return results[0], results[1] @spaces.GPU(duration=150) def generate_image_ie(self, textbox_source, textbox_target, textbox_instruct, source_image, model_name): pipe = self.load_model_pipe(model_name) result = pipe(src_image = source_image, src_prompt = textbox_source, target_prompt = textbox_target, instruct_prompt = textbox_instruct) return result def generate_image_ie_parallel(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B): results = [] model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: future_to_result = {executor.submit(self.generate_image_ie, textbox_source, textbox_target, textbox_instruct, source_image, model): model for model in model_names} for future in concurrent.futures.as_completed(future_to_result): result = future.result() results.append(result) return results[0], results[1] def generate_image_ie_parallel_anony(self, textbox_source, textbox_target, textbox_instruct, source_image, model_A, model_B): if model_A == "" and model_B == "": model_names = random.sample([model for model in self.model_ie_list], 2) else: model_names = [model_A, model_B] results = [] # model_names = [model_A, model_B] with concurrent.futures.ThreadPoolExecutor() as executor: future_to_result = {executor.submit(self.generate_image_ie, textbox_source, textbox_target, textbox_instruct, source_image, model): model for model in model_names} for future in concurrent.futures.as_completed(future_to_result): result = future.result() results.append(result) return results[0], results[1], model_names[0], model_names[1]