comfyui-api / app.py
gosign's picture
Update app.py
640b59e verified
import os
import io
import json
import base64
import random
import urllib.request
import urllib.parse
import websocket
import uuid
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template, send_file, send_from_directory
from PIL import Image
from werkzeug.utils import secure_filename
import urllib.parse
import urllib.request
import time
# Load environment variables from the .env file
load_dotenv()
# Initialize Flask app
app = Flask(__name__)
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'webp'} # Define supported image types
# Set server and websocket addresses from environment variables
server_address = os.getenv("SERVER_ADDRESS")
ws_address = os.getenv("WS_ADDRESS")
# Generate a unique client ID
client_id = str(uuid.uuid4())
def allowed_file(filename):
"""Check if the uploaded file has an allowed extension."""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def save_base64_image(b64_string):
"""Decode a base64 string and save it as an image in the static folder."""
try:
# Handle Data URI scheme if present
if ',' in b64_string:
header, encoded = b64_string.split(',', 1)
ext = header.split('/')[1].split(';')[0] if '/' in header else 'png'
else:
encoded = b64_string
ext = 'png'
# Decode the image data
image_data = base64.b64decode(encoded)
# Generate a unique path for the image in the static folder
image_path = f"static/{uuid.uuid4()}.{ext}"
# Ensure directory exists
os.makedirs('static', exist_ok=True)
# Save the image
with open(image_path, 'wb') as f:
f.write(image_data)
print(f"Image saved at: {image_path}", flush=True)
# Return the path and URL of the saved image
image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
print(f"Image path (local): {image_path}", flush=True)
print(f"Image URL (public): {image_url}", flush=True)
return image_path, image_url
except Exception as e:
raise ValueError(f"Failed to save image: {e}")
def get_image(filename, subfolder, image_type, token):
url_values = {'filename': filename, 'subfolder': subfolder, 'type': image_type}
url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {token}")
try:
return urllib.request.urlopen(req).read()
except urllib.error.HTTPError as e:
print(f"HTTP Error: {e.code} - {e.reason}")
print(e.read())
raise
def get_images(ws, workflow, token):
prompt_id = queue_prompt(workflow, token)
output_images = {}
while True:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
break # Execution is done
# Sleep for 3 seconds before the next iteration
time.sleep(3)
history = get_history(prompt_id, token)[prompt_id]
for node_id in history['outputs']:
node_output = history['outputs'][node_id]
images_output = []
if 'images' in node_output:
for image in node_output['images']:
image_data = get_image(image['filename'], image['subfolder'], image['type'], token)
images_output.append(image_data)
output_images[node_id] = images_output
return output_images
# Default route for home welcome
@app.route('/')
def home():
return render_template('home.html')
################################################
# Generate text to image using FLUX1.DEV Model #
################################################
# Generate image route
@app.route('/generate_image', methods=['POST'])
def generate_image():
data = request.json
# Extract the token from the request headers
token = request.headers.get('Authorization')
if token is None:
return jsonify({'error': 'No token provided'}), 400
if token.startswith("Bearer "):
token = token.split(" ")[1]
# Base64 decode the encoded token
# token = base64.b64decode(token).decode("utf-8")
if 'text_prompt' not in data:
return jsonify({'error': 'No text prompt provided'}), 400
text_prompt = data['text_prompt']
# Get the path to the current file's directory
current_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(current_dir, 'workflows/flux1_dev_checkpoint_workflow_api.json')
with open(file_path, 'r', encoding='utf-8') as file:
workflow_jsondata = file.read()
workflow = json.loads(workflow_jsondata)
workflow["6"]["inputs"]["text"] = text_prompt
# Generate a random 15-digit seed as an integer
seednum = random.randint(100000000000000, 999999999999999)
workflow["31"]["inputs"]["seed"] = seednum
ws = websocket.WebSocket()
try:
ws.connect(f"{ws_address}?clientId={client_id}&token={token}", header=
{"Authorization": f"Bearer {token}"})
except websocket.WebSocketException as e:
return jsonify({'error': f'WebSocket connection failed: {str(e)}'}), 500
images = get_images(ws, workflow, token)
ws.close()
output_images_base64 = []
for node_id in images:
for image_data in images[node_id]:
image = Image.open(io.BytesIO(image_data))
buffered = io.BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
output_images_base64.append(img_str)
return jsonify({'images': output_images_base64})
###################################################
# Edit image with text prompt using OmniGen Model #
###################################################
# Route: OmniGen image to image
@app.route('/omnigen/image_to_image', methods=['POST'])
def omnigen_image_to_image():
data = request.json
# Extract and validate token
token = request.headers.get('Authorization')
if not token or not token.startswith("Bearer "):
return jsonify({'error': 'Valid Bearer token required'}), 400
token = token.split(" ")[1]
# Validate text prompt
text_prompt = data.get('text_prompt')
if not text_prompt or not text_prompt.strip():
return jsonify({'error': 'Text prompt is required'}), 400
steps = data.get('steps')
if not steps:
steps = 50
image_url = data.get('image_url')
if not image_url:
return jsonify({'error': 'image_url is required'}), 400
# Handle uploaded image or base64 image
image_file = request.files.get('image')
base64_image = data.get('base64_image')
image_path = None # Initialize image path
try:
if image_file:
# Check if the file has an allowed extension
if not allowed_file(image_file.filename):
return jsonify({'error': 'Unsupported image format'}), 400
# Secure the filename
filename = secure_filename(image_file.filename)
# Generate a unique path for the image
unique_filename = f"{uuid.uuid4()}_{filename}"
image_path = os.path.join('static', unique_filename)
# Ensure the 'static' directory exists
os.makedirs('static', exist_ok=True)
# Save the image to the static directory
image_file.save(image_path)
# Construct the public URL to access the image
image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
elif base64_image:
# Save base64 image
try:
pass
# image_path, image_url = save_base64_image(base64_image)
# image_url = "https://drive.google.com/uc?id=1JEHEy0zCVWOob4421hLQIPMbO_ebeCPS&export=download"
except Exception as e:
raise ValueError(f'Invalid base64 image data: {str(e)}')
else:
return jsonify({'error': 'Image is required (file or base64)'}), 400
# Load workflow configuration
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_path = os.path.join(current_dir, 'workflows/omnigen_image_to_image_workflow_api.json')
with open(workflow_path, 'r', encoding='utf-8') as f:
workflow = json.load(f)
# Modify workflow with inputs
workflow["6"]["inputs"]["prompt"] = "in image_1 " + text_prompt
workflow["6"]["inputs"]["num_inference_steps"] = steps
workflow["12"]["inputs"]["url"] = image_url
# WebSocket connection to queue the prompt
ws = websocket.WebSocket()
ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
header={"Authorization": f"Bearer {token}"})
images = get_images(ws, workflow, token)
ws.close()
output_images_base64 = []
for node_id in images:
for image_data in images[node_id]:
image = Image.open(io.BytesIO(image_data))
buffered = io.BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
output_images_base64.append(img_str)
return jsonify({'images': output_images_base64}), 200
except Exception as e:
return jsonify({'message': 'Unable to connect to the server. Make sure the server is running', 'error': str(e)}), 500
finally:
pass
# Always delete the image if it was saved
if image_path and os.path.exists(image_path):
os.remove(image_path)
print(f"Deleted temporary image: {image_path}", flush=True)
# Get image route
@app.route('/get_image/<filename>', methods=['GET'])
def get_image_file(filename):
return send_file(filename, mimetype='image/png')
# Route to serve images
@app.route('/static/<path:filename>', methods=['GET'])
def serve_static(filename):
print(f"Request for static file: {filename}", flush=True)
return send_from_directory('static', filename)
# Make a request route
def make_request(url, data=None, headers=None):
req = urllib.request.Request(url, data=data, headers=headers)
try:
with urllib.request.urlopen(req) as response:
response_body = response.read().decode() # Decode the response
# print(response_body)
return json.loads(response_body) # Convert to JSON if valid
except urllib.error.HTTPError as e:
print(f"HTTPError: {e.code}, {e.reason}")
print(e.read().decode()) # Print detailed error response
except urllib.error.URLError as e:
print(f"URLError: {e.reason}")
# Helper: Queue the prompt
def queue_prompt(workflow, token):
payload = {"prompt": workflow, "client_id": client_id}
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
response = make_request(f"{server_address}/prompt", data=json.dumps(payload).encode('utf-8'), headers=headers)
if not response or 'prompt_id' not in response:
raise ValueError("Failed to queue the prompt. Check the request or API response.")
return response['prompt_id']
# Get ComfyUI prompt history
def get_history(prompt_id, token):
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
return make_request(f"{server_address}/history/{prompt_id}", headers=headers)
def get_video_data(filename, subfolder, token):
"""
Retrieve a video from the server using filename, subfolder, and token.
"""
# Handle empty subfolder case gracefully
subfolder = subfolder or '' # Default to empty string if None
# Construct query parameters
url_values = {
'filename': filename
}
# Build the URL with encoded query parameters
url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
print(f"Requesting URL: {url}", flush=True)
# Prepare the request with authorization token
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {token}")
try:
# Fetch and return the video data
response = urllib.request.urlopen(req)
return response.read()
except urllib.error.HTTPError as e:
print(f"HTTP Error: {e.code} - {e.reason}")
print(e.read().decode()) # Decode error message for readability
raise
except urllib.error.URLError as e:
print(f"URL Error: {e.reason}")
raise
########################################################
# Generate image to video using CogVideoX-5B-12V Model #
########################################################
# Route: Image to Video
@app.route('/v1/image_to_video', methods=['POST'])
def v1_image_to_video():
data = request.json
# Extract and validate token
token = request.headers.get('Authorization')
if not token or not token.startswith("Bearer "):
return jsonify({'error': 'Valid Bearer token required'}), 400
token = token.split(" ")[1]
# Validate text prompt
text_prompt = data.get('text_prompt')
frame_rate = data.get('frame_rate')
steps = data.get('steps')
if not text_prompt or not text_prompt.strip():
return jsonify({'error': 'Text prompt is required'}), 400
# Check if frame_rate is missing or invalid
if not frame_rate: # If frame_rate is None, empty, or 0
frame_rate = 24 # Default to 24 fps
else:
try:
frame_rate = int(frame_rate)
if frame_rate not in [8, 12, 24]: # Ensure it's one of the allowed values
return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
except ValueError:
return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
if not steps:
steps = 50
# Handle uploaded image or base64 image
image_file = request.files.get('image')
base64_image = data.get('base64_image')
image_path = None # Initialize image path
try:
if image_file:
# Check if the file has an allowed extension
if not allowed_file(image_file.filename):
return jsonify({'error': 'Unsupported image format'}), 400
# Secure the filename
filename = secure_filename(image_file.filename)
# Generate a unique path for the image
unique_filename = f"{uuid.uuid4()}_{filename}"
image_path = os.path.join('static', unique_filename)
# Ensure the 'static' directory exists
os.makedirs('static', exist_ok=True)
# Save the image to the static directory
image_file.save(image_path)
# Construct the public URL to access the image
image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
elif base64_image:
# Save base64 image
try:
image_path, image_url = save_base64_image(base64_image)
except Exception as e:
raise ValueError(f'Invalid base64 image data: {str(e)}')
else:
return jsonify({'error': 'Image is required (file or base64)'}), 400
# Load workflow configuration
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_path = os.path.join(current_dir, 'workflows/cogvideox_image_to_video_workflow_api.json')
with open(workflow_path, 'r', encoding='utf-8') as f:
workflow = json.load(f)
# Modify workflow with inputs
workflow["30"]["inputs"]["prompt"] = text_prompt
workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur"
workflow["44"]["inputs"]["frame_rate"] = frame_rate
workflow["57"]["inputs"]["steps"] = steps
workflow["73"]["inputs"]["url"] = image_url
# WebSocket connection to queue the prompt
ws = websocket.WebSocket()
ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
header={"Authorization": f"Bearer {token}"})
# Queue the prompt
prompt_id = queue_prompt(workflow, token)
return jsonify({'prompt_id': prompt_id, 'message': 'Prompt queued successfully', 'get_video_url': f'https://gosign-de-comfyui-api.hf.space/v1/video_tasks/{prompt_id}'}), 202
except Exception as e:
return jsonify({'message': 'Unbale to connect to the server. Make sure the server is running', 'error': str(e)}), 500
finally:
pass
# Always delete the image if it was saved
# if image_path and os.path.exists(image_path):
# os.remove(image_path)
# print(f"Deleted temporary image: {image_path}", flush=True)
###################################################
# Generate text to video using CogVideoX-5B Model #
###################################################
# Route: Text to Video
@app.route('/v1/text_to_video', methods=['POST'])
def v1_text_to_video():
data = request.json
# Extract and validate token
token = request.headers.get('Authorization')
if not token or not token.startswith("Bearer "):
return jsonify({'error': 'Valid Bearer token required'}), 400
token = token.split(" ")[1]
# Validate text prompt
text_prompt = data.get('text_prompt')
frame_rate = data.get('frame_rate')
steps = data.get('steps')
if not text_prompt or not text_prompt.strip():
return jsonify({'error': 'Text prompt is required'}), 400
# Check if frame_rate is missing or invalid
if not frame_rate: # If frame_rate is None, empty, or 0
frame_rate = 24 # Default to 24 fps
else:
try:
frame_rate = int(frame_rate)
if frame_rate not in [8, 12, 24]: # Ensure it's one of the allowed values
return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
except ValueError:
return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
if not steps:
steps = 50
try:
# Load workflow configuration
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_path = os.path.join(current_dir, 'workflows/cogvideox_text_to_video_workflow_api.json')
with open(workflow_path, 'r', encoding='utf-8') as f:
workflow = json.load(f)
# Modify workflow with inputs
workflow["30"]["inputs"]["prompt"] = text_prompt
workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur"
workflow["33"]["inputs"]["frame_rate"] = frame_rate
workflow["34"]["inputs"]["steps"] = steps
# WebSocket connection to queue the prompt
ws = websocket.WebSocket()
ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
header={"Authorization": f"Bearer {token}"})
# Queue the prompt
prompt_id = queue_prompt(workflow, token)
return jsonify({'prompt_id': prompt_id, 'message': 'Prompt queued successfully', 'get_video_url': f'https://gosign-de-comfyui-api.hf.space/v1/video_tasks/{prompt_id}'}), 202
except Exception as e:
return jsonify({'message': 'Unbale to connect to the server. Make sure the server is running', 'error': str(e)}), 500
# Get video_tasks route
@app.route('/v1/video_tasks/<prompt_id>', methods=['GET'])
def video_tasks(prompt_id):
token = request.headers.get('Authorization')
if not token or not token.startswith("Bearer "):
return jsonify({'error': 'Valid Bearer token required'}), 400
token = token.split(" ")[1]
try:
# Establish WebSocket connection to fetch real-time status
ws = websocket.WebSocket()
ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
header={"Authorization": f"Bearer {token}"})
# Request current status of the specific prompt
ws.send(json.dumps({"type": "get_status", "prompt_id": prompt_id}))
response = json.loads(ws.recv())
# Extract the necessary fields for the specific prompt
queue_remaining = response.get('data', {}).get('status', {}).get('exec_info', {}).get('queue_remaining', 0)
# Now proceed to check if the prompt has completed successfully
history = get_history(prompt_id, token).get(prompt_id, {})
if not history:
return jsonify({
'message': 'Video is being generated.',
'status': 'pending',
'prompts_in_queue': queue_remaining
}), 202
video_data = None
# Extract video or GIF details
for node_id, node_output in history.get('outputs', {}).items():
if 'gifs' in node_output:
video = node_output['gifs'][0] # Take the first available GIF/video
try:
video_data = get_video_data(video['filename'], video['subfolder'], token)
break # Stop after fetching the first valid video
except Exception as e:
print(f"Failed to retrieve video: {str(e)}")
if not video_data:
return jsonify({'error': 'Failed to retrieve video data.'}), 500
# Save the video locally
# local_video_path = f"static/generated_image_to_video_{prompt_id}.mp4"
# with open(local_video_path, 'wb') as f:
# f.write(video_data)
# Send the video as an HTTP response
return send_file(
io.BytesIO(video_data),
mimetype='video/mp4',
as_attachment=True,
download_name='generated_video.mp4'
)
except Exception as e:
return jsonify({'error': str(e)}), 500
# Route: Image to Video old
@app.route('/image_to_video', methods=['POST'])
def image_to_video():
data = request.json
# Extract and validate token
token = request.headers.get('Authorization')
if not token or not token.startswith("Bearer "):
return jsonify({'error': 'Valid Bearer token required'}), 400
token = token.split(" ")[1]
# Validate text prompt
text_prompt = data.get('text_prompt')
frame_rate = data.get('frame_rate')
steps = data.get('steps')
if not text_prompt or not text_prompt.strip():
return jsonify({'error': 'Text prompt is required'}), 400
# Check if frame_rate is missing or invalid
if not frame_rate: # If frame_rate is None, empty, or 0
frame_rate = 24 # Default to 24 fps
else:
try:
frame_rate = int(frame_rate)
if frame_rate not in [8, 12, 24]:
return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
except ValueError:
return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
if not steps:
steps = 50
# Handle uploaded image or base64 image
image_file = request.files.get('image')
base64_image = data.get('base64_image')
image_path = None # Initialize image path
try:
if image_file:
# Check if the file has an allowed extension
if not allowed_file(image_file.filename):
return jsonify({'error': 'Unsupported image format'}), 400
# Secure the filename
filename = secure_filename(image_file.filename)
# Generate a unique path for the image
unique_filename = f"{uuid.uuid4()}_{filename}"
image_path = os.path.join('static', unique_filename)
# Ensure the 'static' directory exists
os.makedirs('static', exist_ok=True)
# Save the image to the static directory
image_file.save(image_path)
# Construct the public URL to access the image
image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
elif base64_image:
# Save base64 image
try:
image_path, image_url = save_base64_image(base64_image)
except Exception as e:
raise ValueError(f'Invalid base64 image data: {str(e)}')
else:
return jsonify({'error': 'Image is required (file or base64)'}), 400
# Load workflow configuration
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_path = os.path.join(current_dir, 'workflows/cogvideox_image_to_video_workflow_api.json')
with open(workflow_path, 'r', encoding='utf-8') as f:
workflow = json.load(f)
# Modify workflow with inputs
workflow["30"]["inputs"]["prompt"] = text_prompt
workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur"
workflow["44"]["inputs"]["frame_rate"] = frame_rate
workflow["57"]["inputs"]["steps"] = steps
workflow["73"]["inputs"]["url"] = image_url
# WebSocket connection to queue and monitor workflow
ws = websocket.WebSocket()
ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
header={"Authorization": f"Bearer {token}"})
# Queue the prompt and wait for completion
prompt_id = queue_prompt(workflow, token)
# Wait for workflow execution to complete
last_ping = time.time()
PING_INTERVAL = 30
# Wait for workflow execution to complete
while True:
message = json.loads(ws.recv())
if message.get('type') == 'executing' and message['data']['node'] is None \
and message['data']['prompt_id'] == prompt_id:
break
# Send a ping if PING_INTERVAL has passed
if time.time() - last_ping > PING_INTERVAL:
ws.send('ping')
last_ping = time.time()
# Fetch the history of the workflow
history = get_history(prompt_id, token).get(prompt_id, {})
video_data = None
# Find the video or GIF data from the outputs
for node_id, node_output in history.get('outputs', {}).items():
if 'gifs' in node_output:
video = node_output['gifs'][0] # Take the first video/GIF
try:
video_data = get_video_data(video['filename'], video['subfolder'], token)
break # Stop after fetching the first valid video
except Exception as e:
print(f"Failed to retrieve video: {str(e)}")
# Ensure video data was retrieved
if not video_data:
raise ValueError('Failed to generate video')
# Save the video locally
local_video_path = f"static/generated_image_to_video_{prompt_id}.mp4"
with open(local_video_path, 'wb') as f:
f.write(video_data)
# Construct the public URL for the video
# video_url = f"https://gosign-de-comfyui-api.hf.space/{local_video_path}"
# Prepare the response with the video URL
# response_data = {
# 'video_url': video_url,
# 'message': 'Video generated successfully'
# }
# return jsonify(response_data), 200
# Send the video as an HTTP response
response = send_file(
io.BytesIO(video_data),
mimetype='video/mp4',
as_attachment=True,
download_name='generated_video.mp4'
)
return response
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
# Always delete the image if it was saved
if image_path and os.path.exists(image_path):
os.remove(image_path)
print(f"Deleted temporary image: {image_path}", flush=True)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True)