|
import requests |
|
import random |
|
from PIL import Image, ImageOps, ImageTk |
|
from datetime import datetime |
|
import time |
|
from pathlib import Path |
|
import io |
|
import zipfile |
|
|
|
|
|
import base64 |
|
import re |
|
import json |
|
|
|
# Root URL of the NovelAI HTTP API; request paths are appended to it.
BASE_URL="https://api.novelai.net"
|
|
|
def make_turbo_prompt(gen_request):
    """Derive four staged "turbo" requests from a single generation request.

    The prompt is split on ``", "`` into keywords, and four progressively
    modified keyword lists are built from them. Each list is joined back
    into a prompt and stored in its own shallow copy of ``gen_request``,
    whose ``"type"`` is set to ``"turbo"``. The first three prompts are
    padded with blank entries (see ``insert_spaces``) wherever the final
    keyword list holds a keyword they lack.

    Args:
        gen_request: Mapping with at least a ``"prompt"`` string. It is only
            read and shallow-copied, never modified.

    Returns:
        A 4-tuple ``(positive0, positive1, positive2, positive3)`` of request
        dicts.

    NOTE(review): this block was restored from text that had lost its
    indentation. The final prompt assembly is kept at function level, so a
    prompt that lacks a male tag (``1boy``/``boys``), ``1girl`` or ``sex,``
    fails with ``UnboundLocalError`` because the staged lists are never
    built. Confirm this placement against the original file.
    """
    prompt = gen_request['prompt']

    def insert_spaces(source_list, reference_list):
        """Return a copy of ``source_list`` padded against ``reference_list``.

        For every reference keyword missing from ``source_list`` a run of
        spaces of the same length is inserted at that keyword's reference
        index.
        """
        padded = source_list.copy()
        for index, keyword in enumerate(reference_list):
            if keyword not in source_list:
                padded.insert(index, ' ' * len(keyword))
        return padded

    keywords = prompt.split(', ')
    filtered_keywords = []
    removed_keywords = []
    positive0, positive1, positive2, positive3 = (
        gen_request.copy(),
        gen_request.copy(),
        gen_request.copy(),
        gen_request.copy(),
    )

    # Tag detection is a substring test on the whole prompt, not a match
    # against the split keyword list.
    has_male = '1boy' in prompt or 'boys' in prompt
    has_girl = '1girl' in prompt

    if has_male and has_girl and 'sex,' in prompt:
        act_terms = ('sex', 'cum', 'ejaculation', 'vaginal', 'penetration')
        sex_pos_keywords = ['stomach bulge', 'insertion', 'fucked silly', 'x-ray', 'orgasm', 'cross-section', 'uterus', 'overflow', 'rape', 'vaginal', 'anal']
        facial_keywords = ['tongue', 'ahegao']
        temp_sex_pos = []
        temp_facial = []
        cum_events = []
        explicit_check = []

        if 'open mouth' in keywords:
            keywords.remove('open mouth')
        if 'closed mouth' in keywords:
            keywords.remove('closed mouth')
        if 'after rape' in keywords:
            keywords.remove('after rape')
            explicit_check.append('after rape')

        # Sort every keyword into exactly one bucket; keywords that match a
        # filter term but no bucket (e.g. a bare 'cum') are dropped.
        for keyword in keywords:
            matched_positions = [pos for pos in sex_pos_keywords if pos in keyword]
            matched_facials = [face for face in facial_keywords if face in keyword]
            has_act_term = any(term in keyword for term in act_terms)
            if not (has_act_term or matched_positions or matched_facials):
                filtered_keywords.append(keyword)
            elif 'sex' in keyword or 'penetration' in keyword:
                removed_keywords.append(keyword)
            elif 'cum' in keyword and keyword != 'cum':
                cum_events.append(keyword)
            elif matched_positions:
                temp_sex_pos.extend(matched_positions)
            elif matched_facials:
                # Fixed: the original tested `facial not in keyword`, which
                # skipped keywords containing every facial term.
                temp_facial.extend(matched_facials)

        # Stage 0: neutral keywords plus a marker inserted near the middle.
        stage0_marker = ' no penetration, imminent penetration'
        filtered_keywords.insert(int((len(filtered_keywords) / 2) - 1), stage0_marker)
        filtered_keywords_positive0 = filtered_keywords.copy()
        filtered_keywords.remove(stage0_marker)

        for i, keyword in enumerate(filtered_keywords):
            if 'pantyhose' in keyword:
                filtered_keywords[i] = 'torn ' + filtered_keywords[i]

        # Stage 1: re-insert the act keywords at an anchor position
        # ('penis' wins over 'pussy', which wins over the midpoint).
        key_index = int((len(filtered_keywords) / 2) - 1)
        if 'pussy' in filtered_keywords:
            key_index = filtered_keywords.index('pussy')
        if 'penis' in filtered_keywords:
            key_index = filtered_keywords.index('penis')
        filtered_keywords[key_index:key_index] = ['motion lines', 'surprised']
        for keyword in removed_keywords:
            if 'cum' not in keyword and 'ejaculation' not in keyword:
                filtered_keywords.insert(key_index, keyword)
        if temp_sex_pos:
            filtered_keywords[key_index:key_index] = temp_sex_pos
        if 'clothed sex' in filtered_keywords and 'bottomless' not in filtered_keywords:
            filtered_keywords.insert(filtered_keywords.index('clothed sex') + 1, 'bottomless')

        pos1_copied_keywords = filtered_keywords.copy()
        # NOTE: the list is edited while being enumerated, exactly as in the
        # original; a removal shifts the remaining items by one position.
        for i, keyword in enumerate(pos1_copied_keywords):
            if 'closed eyes' in keyword:
                rand_num = random.randint(0, 2)
                if rand_num == 0:
                    pos1_copied_keywords[i] = 'half-' + pos1_copied_keywords[i]
                elif rand_num == 1 and 'closed eyes' in pos1_copied_keywords:
                    pos1_copied_keywords.remove('closed eyes')
                    filtered_keywords[i] = 'half-closed eyes'
        filtered_keywords_positive1 = pos1_copied_keywords.copy()

        # Stage 2: 'surprised' gives way to the climax keywords.
        key_index = filtered_keywords.index('surprised')
        filtered_keywords.remove('surprised')
        filtered_keywords[key_index:key_index] = ["ejaculation", "cum"]
        for keyword in removed_keywords:
            if 'cum' in keyword:
                filtered_keywords.insert(key_index, keyword)
        if temp_facial:
            filtered_keywords[key_index:key_index] = temp_facial
        filtered_keywords_positive2 = filtered_keywords.copy()

        # Stage 3: aftermath keywords.
        for i, keyword in enumerate(filtered_keywords):
            if 'closed eyes' in keyword:
                rand_num = random.randint(0, 2)
                if rand_num == 0 and filtered_keywords[i] != 'half-closed eyes':
                    filtered_keywords[i] = 'half-' + filtered_keywords[i]
                elif rand_num == 1:
                    filtered_keywords[i] = 'empty eyes'
                else:
                    filtered_keywords[i] = 'empty eyes, half-closed eyes'
        if 'sex' in filtered_keywords:
            key_index = filtered_keywords.index('sex')
        elif 'group sex' in filtered_keywords:
            key_index = filtered_keywords.index('group sex')
        filtered_keywords.remove('ejaculation')
        filtered_keywords[key_index:key_index] = ['cum drip', 'erection'] + cum_events
        if explicit_check:
            filtered_keywords[key_index:key_index] = explicit_check

        if 'sex' in filtered_keywords and 'group sex' not in filtered_keywords:
            if 'pussy' in filtered_keywords and 'anal' not in filtered_keywords:
                filtered_keywords.insert(filtered_keywords.index('sex') + 1, 'after vaginal, spread pussy')
            elif 'anal' in filtered_keywords:
                filtered_keywords.insert(filtered_keywords.index('sex') + 1, 'after anus, cum in ass')
            filtered_keywords.insert(filtered_keywords.index('sex'), 'after sex')
            filtered_keywords.remove('sex')
        elif 'group sex' in filtered_keywords:
            # NOTE(review): nesting of the 'multiple penises' checks was
            # reconstructed from semantics - confirm against the original.
            if 'vaginal' in filtered_keywords and 'anal' not in filtered_keywords:
                filtered_keywords.insert(filtered_keywords.index('group sex') + 1, 'after vaginal, spread pussy')
                if 'multiple penises' in filtered_keywords:
                    filtered_keywords.insert(filtered_keywords.index('group sex') + 3, 'cum on body, bukkake')
            elif 'anal' in filtered_keywords:
                filtered_keywords.insert(filtered_keywords.index('group sex') + 1, 'after anus, cum in ass')
                if 'multiple penises' in filtered_keywords:
                    filtered_keywords.insert(filtered_keywords.index('group sex') + 3, 'cum on body, bukkake')
            else:
                filtered_keywords.insert(filtered_keywords.index('group sex') + 1, 'cum on body, {bukkake}')

        # Position keywords (except 'orgasm'/'overflow') do not survive into
        # the final stage. Collect first, then remove, so the list is not
        # edited while it is being searched.
        # NOTE(review): assumed to apply to both the 'sex' and 'group sex'
        # paths - confirm.
        stale_positions = [
            keyword
            for keyword in sex_pos_keywords
            if keyword not in ('orgasm', 'overflow') and keyword in filtered_keywords
        ]
        for keyword in stale_positions:
            filtered_keywords.remove(keyword)

    positive0['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive0, filtered_keywords)).strip()
    positive1['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive1, filtered_keywords)).strip()
    positive2['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive2, filtered_keywords)).strip()
    positive3['prompt'] = ', '.join(filtered_keywords).strip()
    positive0["type"] = "turbo"
    positive1["type"] = "turbo"
    positive2["type"] = "turbo"
    positive3["type"] = "turbo"
    return positive0, positive1, positive2, positive3
|
|
|
def generate_image_NAI(access_token, prompt, model, action, parameters):
    """POST a generation request to the NovelAI API and return the raw body.

    Args:
        access_token: Token sent as ``Authorization: Bearer <token>``.
        prompt: Value sent as the ``input`` field.
        model: Value sent as the ``model`` field.
        action: Value sent as the ``action`` field.
        parameters: Value sent as the ``parameters`` field.

    Returns:
        The response body as bytes, whatever the HTTP status was; the status
        code is not inspected here.
    """
    endpoint = f"{BASE_URL}/ai/generate-image"
    auth_headers = {"Authorization": f"Bearer {access_token}"}
    payload = {
        "input": prompt,
        "model": model,
        "action": action,
        "parameters": parameters,
    }
    reply = requests.post(endpoint, json=payload, headers=auth_headers)
    return reply.content
|
|
|
|
|
def convert_prompt(prompt):
    """Rewrite the bracket characters of a prompt string.

    The substitutions run in a fixed order: literal parentheses are
    backslash-escaped first, then doubled and single curly braces become
    plain parentheses, and finally doubled square brackets collapse to
    single ones.
    """
    # Order matters: escaping must happen before braces turn into parens,
    # and doubled tokens must be handled before their single counterparts.
    substitutions = (
        ('(', '\\('),
        (')', '\\)'),
        ('{{', '('),
        ('}}', ')'),
        ('{', '('),
        ('}', ')'),
        ('[[', '['),
        (']]', ']'),
    )
    converted = prompt
    for old, new in substitutions:
        converted = converted.replace(old, new)
    return converted
|
def generate_image_webui(access_token, prompt, model, action, parameters):
    """Generate an image through a WebUI ``/sdapi/v1/txt2img`` endpoint.

    The request parameters are translated into the payload that endpoint
    expects, the last returned image is base64-decoded, and the result is
    wrapped in an in-memory ZIP archive with a single member named
    ``generated``.

    Args:
        access_token: Base URL of the WebUI server; the endpoint path is
            appended to it.
        prompt: Prompt text; passed through ``convert_prompt`` before sending.
        model: Unused; accepted so the signature matches
            ``generate_image_NAI``.
        action: Unused; accepted for the same reason.
        parameters: Mapping providing ``negative_prompt``, ``steps``,
            ``width``, ``height``, ``scale``, ``sampler``, ``seed`` and
            ``n_samples``. When ``enable_hr`` equals ``True`` it must also
            provide ``hr_upscaler``, ``hr_scale``, ``hr_second_pass_steps``
            and ``denoising_strength``.

    Returns:
        Bytes of the ZIP archive.

    Raises:
        KeyError: If a required parameter is missing, the sampler name is not
            in the translation table, or the response has no ``images`` key.
        ValueError: If the response contains an empty ``images`` list.
    """
    samplers = {
        "k_euler": "Euler",
        "k_euler_ancestral": "Euler a",
        "k_dpmpp_2s_ancestral": "DPM++ 2S a",
        "k_dpmpp_sde": "DPM++ SDE",
    }

    payload = {
        "prompt": convert_prompt(prompt),
        "negative_prompt": convert_prompt(parameters['negative_prompt']),
        "steps": parameters['steps'],
        "width": parameters['width'],
        "height": parameters['height'],
        "cfg_scale": parameters['scale'],
        "sampler_index": samplers[parameters['sampler']],
        "seed": parameters['seed'],
        "seed_resize_from_h": -1,
        "seed_resize_from_w": -1,
        "denoising_strength": None,
        "n_iter": "1",
        "batch_size": parameters['n_samples'],
    }

    # `==` rather than `is` so an integer 1 flag is accepted as before;
    # `.get` so a request without the key simply skips the hi-res options.
    if parameters.get('enable_hr') == True:  # noqa: E712
        payload['enable_hr'] = True
        for key in ("hr_upscaler", "hr_scale", "hr_second_pass_steps", "denoising_strength"):
            payload[key] = parameters[key]

    res = requests.post(f"{access_token}/sdapi/v1/txt2img", json=payload)
    images = res.json()['images']
    if not images:
        # Previously this surfaced as an opaque TypeError from writestr(None).
        raise ValueError("WebUI response contained no images")
    # Only the last image was ever kept, so only that one is decoded.
    content = base64.b64decode(images[-1])

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("generated", content)
    return buffer.getvalue()
|
|
|
|
|
def generate_image(access_token, prompt, model, action, parameters):
    """Dispatch a generation request to the WebUI or NovelAI backend.

    When ``access_token`` starts with ``http://`` or ``https://`` it is
    treated as a server address and the request goes to
    ``generate_image_webui``; otherwise it goes to ``generate_image_NAI``.
    All five arguments are forwarded unchanged as keyword arguments.
    """
    is_server_address = re.match(r'^http[s]?://', access_token) is not None
    backend = generate_image_webui if is_server_address else generate_image_NAI
    return backend(
        access_token=access_token,
        prompt=prompt,
        model=model,
        action=action,
        parameters=parameters,
    )
|
|
|
def generate(gen_request):
    """Run one generation request, save the image, and return a preview.

    The request is translated into backend parameters and sent through
    ``generate_image``. The first member of the returned ZIP archive is
    written to disk as a PNG and also opened as a PIL image.

    Args:
        gen_request: Mapping that provides ``prompt``, ``negative``,
            ``width``, ``height``, ``seed``, ``sampler``, ``scale``,
            ``cfg_rescale``, ``sema``, ``sema_dyn``, ``quality_toggle``,
            ``type``, ``enable_hr``, ``png_rule``, ``save_folder``,
            ``access_token`` and ``user_screen_size``. ``steps`` is only
            honoured when ``type`` is ``"upper"``. ``start_time`` and
            ``count`` are required when ``png_rule`` is ``"count"``. The
            ``hr_*`` keys and ``denoising_strength`` are required when
            ``enable_hr`` equals ``True``.

    Returns:
        On success ``(image, prompt, seed, info, path)``: the preview image,
        the prompt that was sent, the seed, the PIL ``info`` dict of the
        opened image, and the saved file path as a string.
        On failure ``(None, error_message, seed, None, None)``; the message
        is also appended to ``error_log.txt``.
    """
    params = {
        "legacy": False,
        "quality_toggle": gen_request["quality_toggle"] == 1,
        "width": gen_request["width"],
        "height": gen_request["height"],
        "n_samples": 1,
        "seed": gen_request["seed"],
        "extra_noise_seed": random.randint(0, 9999999999),
        "sampler": gen_request["sampler"],
        "steps": 28 if (gen_request["type"] != "upper" or "steps" not in gen_request) else gen_request["steps"],
        "scale": gen_request["scale"],
        "uncond_scale": 1.0,
        "negative_prompt": gen_request["negative"],
        "sm": gen_request["sema"],
        "sm_dyn": gen_request["sema_dyn"],
        "decrisper": False,
        "controlnet_strength": 1.0,
        "add_original_image": False,
        "cfg_rescale": gen_request["cfg_rescale"],
        "noise_schedule": "native",
        "enable_hr": gen_request["enable_hr"],
    }

    # `==` rather than `is` so an integer 1 flag is accepted as before.
    if params["enable_hr"] == True:  # noqa: E712
        for key in ("hr_upscaler", "hr_scale", "hr_second_pass_steps", "denoising_strength"):
            params[key] = gen_request[key]

    positive = gen_request["prompt"]
    filename_rule = gen_request["png_rule"]
    save_folder = gen_request["save_folder"]
    access_token = gen_request["access_token"]
    additional_folder = ""

    def resize_and_fill(image, max_size=None):
        """Fit an oversized image into a black ``max_size`` square.

        Images already within ``max_size`` on both axes are returned as-is.
        Larger images are shrunk in place with ``thumbnail`` and pasted
        centred onto a new square canvas.
        """
        if max_size is None:
            max_size = gen_request["user_screen_size"]
        original_width, original_height = image.size
        if original_width <= max_size and original_height <= max_size:
            return image
        image.thumbnail((max_size, max_size))
        width, height = image.size
        canvas = Image.new("RGB", (max_size, max_size), "black")
        canvas.paste(image, ((max_size - width) // 2, (max_size - height) // 2))
        return canvas

    def log_error(message):
        """Append a timestamped error entry to ``error_log.txt``."""
        current_time = datetime.now().strftime("%m/%d %H:%M:%S")
        entry = f"#### Error occured at {current_time} ####\nError: {message}\n############################################\n"
        # Explicit encoding: the message may hold characters the platform
        # default codec cannot encode, and a failure here would escape the
        # caller's error handling.
        with open("error_log.txt", "a", encoding="utf-8") as file:
            file.write(entry)

    # Must be bound before `try`: if generate_image itself raises, the
    # handler below inspects this name to report a broken connection.
    zipped_bytes = None
    try:
        zipped_bytes = generate_image(access_token, positive, "nai-diffusion-3", "generate", params)
        # NOTE(review): source indentation was lost; the turbo sub-folder
        # check is treated as independent of the png_rule check - confirm.
        if filename_rule == "count":
            additional_folder = "/" + gen_request["start_time"]
        if gen_request["type"] == "turbo":
            additional_folder += "/turbo"
        d = Path(save_folder + additional_folder)
        d.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(io.BytesIO(zipped_bytes)) as zipped:
            image_bytes = zipped.read(zipped.infolist()[0])
        if filename_rule == "count":
            _count = gen_request["count"]
            filename = d / f"{_count:05}.png"
        else:
            filename = d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        filename.write_bytes(image_bytes)
        i = Image.open(io.BytesIO(image_bytes))
        i = ImageOps.exif_transpose(i).convert("RGB")
        i_resized = resize_and_fill(i)

        return i_resized, positive, params['seed'], i.info, str(filename)
    except Exception:
        try:
            if zipped_bytes is None:
                raise ValueError("Connection broken (Protocol Error)")
            # Otherwise try to read the response body as text, trimming two
            # characters from each end.
            error_message = zipped_bytes.decode('utf-8')[2:-2]
        except Exception as inner_exception:
            error_message = str(inner_exception)
        log_error(error_message)
        return None, error_message, params['seed'], None, None
|
|