Spaces:
Sleeping
Sleeping
File size: 3,557 Bytes
3b66cf0 9078b4e 3b66cf0 9ebf935 9078b4e 3b66cf0 9ebf935 3b66cf0 9ebf935 9078b4e 3b66cf0 de9278c 0f9fd16 2b15352 b2caf4f 3b66cf0 9ebf935 3b66cf0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
import websocket # websocket-client
import uuid
import json
import urllib.request
import urllib.parse
import random
from PIL import Image
import io
import base64
import io
import os
import gradio as gr
server_address = os.environ.get("URL_API")
json_data=os.environ.get("JSON_API")
client_id = str(uuid.uuid4())
def queue_prompt(prompt):
p = {"prompt": prompt, "client_id": client_id}
data = json.dumps(p, indent=4).encode('utf-8') # Prettify JSON for print
req = urllib.request.Request(f"http://{server_address}/prompt", data=data)
return json.loads(urllib.request.urlopen(req).read())
def get_image(filename, subfolder, folder_type):
data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
url_values = urllib.parse.urlencode(data)
with urllib.request.urlopen(f"http://{server_address}/view?{url_values}") as response:
return response.read()
def get_history(prompt_id):
with urllib.request.urlopen(f"http://{server_address}/history/{prompt_id}") as response:
return json.loads(response.read())
def get_images(ws,prompt,progress):
progress=gr.Progress(track_tqdm=True)
prompt_id = queue_prompt(prompt)['prompt_id']
output_images = {}
last_reported_percentage = 0
while True:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'progress':
data = message['data']
current_progress = data['value']
max_progress = data['max']
percentage = int((current_progress / max_progress) * 100)
if percentage >= last_reported_percentage + 10:
last_reported_percentage = percentage
progress(percentage/100)
elif message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
break # Execution is done
else:
continue # Previews are binary data
history = get_history(prompt_id)[prompt_id]
for o in history['outputs']:
for node_id in history['outputs']:
node_output = history['outputs'][node_id]
if 'images' in node_output:
images_output = []
for image in node_output['images']:
image_data = get_image(image['filename'], image['subfolder'], image['type'])
images_output.append(image_data)
output_images[node_id] = images_output
return output_images
def pil_to_base64(image):
buffer = io.BytesIO()
image.save(buffer, format="PNG")
base64_string=base64.b64encode(buffer.getvalue()).decode("utf-8")
return f"data:image/png;base64,{base64_string}"
def generate_images(positive_prompt,image,progress):
ws = websocket.WebSocket()
ws_url = f"ws://{server_address}/ws?clientId={client_id}"
ws.connect(ws_url)
data = json.loads(json_data)
data["49"]["inputs"]["text"] = positive_prompt
if image:
data["90"]["inputs"]["images"]["base64"] = [pil_to_base64(image)]
else:
data.pop("90", None)
data.pop("68", None)
data["62"]["inputs"]["images"] = ["61",0]
seed = random.randint(1, 1000000000)
data["47"]["inputs"]["noise_seed"] = seed
images = get_images(ws,data,progress)
ws.close()
for node_id in images:
for image_data in images[node_id]:
image = Image.open(io.BytesIO(image_data))
return image
|