import os, json, random, time
import requests
import runpod
import torch
import numpy as np
from PIL import Image

import nodes
from nodes import NODE_CLASS_MAPPINGS, load_custom_node
from comfy_extras import nodes_custom_sampler
from comfy_extras import nodes_flux
from comfy import model_management

# Discord bot token and web backend endpoint/token, supplied via environment variables.
discord_token = os.getenv('com_camenduru_discord_token')
web_uri = os.getenv('com_camenduru_web_uri')
web_token = os.getenv('com_camenduru_web_token')

load_custom_node("/content/ComfyUI/custom_nodes/ComfyUI-LLaVA-OneVision")
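
# Instantiate the ComfyUI node classes used by this Flux + LLaVA-OneVision workflow.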
DualCLIPLoader = NODE_CLASS_MAPPINGS["DualCLIPLoader"]()
UNETLoader = NODE_CLASS_MAPPINGS["UNETLoader"]()
VAELoader = NODE_CLASS_MAPPINGS["VAELoader"]()
LoraLoader = NODE_CLASS_MAPPINGS["LoraLoader"]()
FluxGuidance = nodes_flux.NODE_CLASS_MAPPINGS["FluxGuidance"]()
RandomNoise = nodes_custom_sampler.NODE_CLASS_MAPPINGS["RandomNoise"]()
BasicGuider = nodes_custom_sampler.NODE_CLASS_MAPPINGS["BasicGuider"]()
KSamplerSelect = nodes_custom_sampler.NODE_CLASS_MAPPINGS["KSamplerSelect"]()
BasicScheduler = nodes_custom_sampler.NODE_CLASS_MAPPINGS["BasicScheduler"]()
SamplerCustomAdvanced = nodes_custom_sampler.NODE_CLASS_MAPPINGS["SamplerCustomAdvanced"]()
VAEDecode = NODE_CLASS_MAPPINGS["VAEDecode"]()
EmptyLatentImage = NODE_CLASS_MAPPINGS["EmptyLatentImage"]()
DownloadAndLoadLLaVAOneVisionModel = NODE_CLASS_MAPPINGS["DownloadAndLoadLLaVAOneVisionModel"]()
LLaVA_OneVision_Run = NODE_CLASS_MAPPINGS["LLaVA_OneVision_Run"]()
LoadImage = NODE_CLASS_MAPPINGS["LoadImage"]()
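
# Load the captioning model, CLIP text encoders, Flux UNet, and VAE once at startup
# so warm serverless workers can reuse them across requests.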
with torch.inference_mode():
    llava_model = DownloadAndLoadLLaVAOneVisionModel.loadmodel("lmms-lab/llava-onevision-qwen2-0.5b-si", "cuda", "bf16", "sdpa")[0]
    clip = DualCLIPLoader.load_clip("t5xxl_fp16.safetensors", "clip_l.safetensors", "flux")[0]
    unet = UNETLoader.load_unet("flux1-dev.sft", "default")[0]
    vae = VAELoader.load_vae("ae.sft")[0]
def closestNumber(n, m):
    # Return the multiple of m closest to n (used to snap latent dimensions to multiples of 16).
    q = int(n / m)
    n1 = m * q
    if (n * m) > 0:
        n2 = m * (q + 1)
    else:
        n2 = m * (q - 1)
    if abs(n - n1) < abs(n - n2):
        return n1
    return n2
def download_file(url, save_dir='/content/ComfyUI/input'):
    # Download the input image into ComfyUI's input folder and return its local path.
    os.makedirs(save_dir, exist_ok=True)
    file_name = url.split('/')[-1]
    file_path = os.path.join(save_dir, file_name)
    response = requests.get(url)
    response.raise_for_status()
    with open(file_path, 'wb') as file:
        file.write(response.content)
    return file_path
def generate(input):
    values = input["input"]

    # Request parameters: captioning (tag_*) settings, Flux sampling settings, and LoRA settings.
    tag_image = values['input_image_check']
    tag_image = download_file(tag_image)
    final_width = values['final_width']
    tag_prompt = values['tag_prompt']
    additional_prompt = values['additional_prompt']
    tag_seed = values['tag_seed']
    tag_temp = values['tag_temp']
    tag_max_tokens = values['tag_max_tokens']
    seed = values['seed']
    steps = values['steps']
    sampler_name = values['sampler_name']
    scheduler = values['scheduler']
    guidance = values['guidance']
    lora_strength_model = values['lora_strength_model']
    lora_strength_clip = values['lora_strength_clip']
    lora_file = values['lora_file']
    # model_management.unload_all_models()
    # Keep the input image's aspect ratio: derive the output height from the requested width.
    tag_image_width, tag_image_height = Image.open(tag_image).size
    tag_image_aspect_ratio = tag_image_width / tag_image_height
    final_height = final_width / tag_image_aspect_ratio
    tag_image = LoadImage.load_image(tag_image)[0]

    # A seed of 0 means "pick a random seed".
    if tag_seed == 0:
        random.seed(int(time.time()))
        tag_seed = random.randint(0, 18446744073709551615)
    print(tag_seed)

    # Caption the input image with LLaVA-OneVision and use the caption as the positive prompt.
    positive_prompt = LLaVA_OneVision_Run.run(tag_image, llava_model, tag_prompt, tag_max_tokens, True, tag_temp, tag_seed)[0]
    positive_prompt = f"{additional_prompt} {positive_prompt}"

    if seed == 0:
        random.seed(int(time.time()))
        seed = random.randint(0, 18446744073709551615)
    print(seed)
    # Apply the LoRA, encode the prompt, and run the Flux sampling pipeline.
    unet_lora, clip_lora = LoraLoader.load_lora(unet, clip, lora_file, lora_strength_model, lora_strength_clip)
    cond, pooled = clip_lora.encode_from_tokens(clip_lora.tokenize(positive_prompt), return_pooled=True)
    cond = [[cond, {"pooled_output": pooled}]]
    cond = FluxGuidance.append(cond, guidance)[0]
    noise = RandomNoise.get_noise(seed)[0]
    guider = BasicGuider.get_guider(unet_lora, cond)[0]
    sampler = KSamplerSelect.get_sampler(sampler_name)[0]
    sigmas = BasicScheduler.get_sigmas(unet_lora, scheduler, steps, 1.0)[0]
    latent_image = EmptyLatentImage.generate(closestNumber(final_width, 16), closestNumber(final_height, 16))[0]
    sample, sample_denoised = SamplerCustomAdvanced.sample(noise, guider, sampler, sigmas, latent_image)

    # Decode the latent to an image and save it to disk.
    decoded = VAEDecode.decode(vae, sample)[0].detach()
    Image.fromarray(np.array(decoded*255, dtype=np.uint8)[0]).save("/content/onevision_flux.png")
    result = "/content/onevision_flux.png"
    response = None
    try:
        source_id = values['source_id']
        del values['source_id']
        source_channel = values['source_channel']
        del values['source_channel']
        job_id = values['job_id']
        del values['job_id']
        default_filename = os.path.basename(result)
        # Read the image bytes for the multipart upload, closing the file handle promptly.
        with open(result, "rb") as file:
            files = {default_filename: file.read()}
        payload = {"content": f"{json.dumps(values)} <@{source_id}>"}
        response = requests.post(
            f"https://discord.com/api/v9/channels/{source_channel}/messages",
            data=payload,
            headers={"authorization": f"Bot {discord_token}"},
            files=files
        )
        response.raise_for_status()
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if os.path.exists(result):
            os.remove(result)

    if response and response.status_code == 200:
        try:
            # Report the Discord attachment URL back to the web backend.
            payload = {"jobId": job_id, "result": response.json()['attachments'][0]['url']}
            requests.post(f"{web_uri}/api/notify", data=json.dumps(payload), headers={'Content-Type': 'application/json', "authorization": f"{web_token}"})
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
        finally:
            return {"result": response.json()['attachments'][0]['url']}
    else:
        return {"result": "ERROR"}

runpod.serverless.start({"handler": generate})
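
# Illustrative request payload for local testing. Field names mirror those read in generate()
# above; the URL, prompt text, and numeric values are placeholder assumptions, not defaults
# shipped with this worker.
# generate({"input": {
#     "input_image_check": "https://example.com/source.png",
#     "final_width": 1024,
#     "tag_prompt": "Describe this image in detail.",
#     "additional_prompt": "",
#     "tag_seed": 0,
#     "tag_temp": 0.7,
#     "tag_max_tokens": 256,
#     "seed": 0,
#     "steps": 20,
#     "sampler_name": "euler",
#     "scheduler": "simple",
#     "guidance": 3.5,
#     "lora_strength_model": 1.0,
#     "lora_strength_clip": 1.0,
#     "lora_file": "example_lora.safetensors",
#     "source_id": "discord-user-id",
#     "source_channel": "discord-channel-id",
#     "job_id": "job-1234"
# }})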