comfyui-api / app.py
Muhammad Waqas
Added: Generate image to video
c9dba17
raw
history blame
13.7 kB
import os
import io
import json
import base64
import random
import urllib.request
import urllib.parse
import websocket
import requests
import uuid
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template, send_file, send_from_directory
from PIL import Image
from werkzeug.utils import secure_filename
import urllib.parse
import urllib.request
# Load environment variables from the .env file
load_dotenv()
# Initialize Flask app
app = Flask(__name__)
ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'webp'} # Define supported image types
# Set server and websocket addresses from environment variables
server_address = os.getenv("SERVER_ADDRESS")
ws_address = os.getenv("WS_ADDRESS")
# Generate a unique client ID
client_id = str(uuid.uuid4())
def allowed_file(filename):
"""Check if the uploaded file has an allowed extension."""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def save_base64_image(b64_string):
"""Decode a base64 string and save it as an image in the static folder."""
try:
# Handle Data URI scheme if present
if ',' in b64_string:
header, encoded = b64_string.split(',', 1)
ext = header.split('/')[1].split(';')[0] if '/' in header else 'png'
else:
encoded = b64_string
ext = 'png'
# Decode the image data
image_data = base64.b64decode(encoded)
# Generate a unique path for the image in the static folder
image_path = f"static/{uuid.uuid4()}.{ext}"
# Ensure directory exists
os.makedirs('static', exist_ok=True)
# Save the image
with open(image_path, 'wb') as f:
f.write(image_data)
print(f"Image saved at: {image_path}")
# Return the path and URL of the saved image
image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
return image_path, image_url
except Exception as e:
raise ValueError(f"Failed to save image: {e}")
def get_image(filename, subfolder, image_type, token):
url_values = {'filename': filename, 'subfolder': subfolder, 'type': image_type}
url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {token}")
try:
return urllib.request.urlopen(req).read()
except urllib.error.HTTPError as e:
print(f"HTTP Error: {e.code} - {e.reason}")
print(e.read())
raise
def get_images(ws, workflow, token):
prompt_id = queue_prompt(workflow, token)
output_images = {}
while True:
out = ws.recv()
if isinstance(out, str):
message = json.loads(out)
if message['type'] == 'executing':
data = message['data']
if data['node'] is None and data['prompt_id'] == prompt_id:
break # Execution is done
history = get_history(prompt_id, token)[prompt_id]
for node_id in history['outputs']:
node_output = history['outputs'][node_id]
images_output = []
if 'images' in node_output:
for image in node_output['images']:
image_data = get_image(image['filename'], image['subfolder'], image['type'], token)
images_output.append(image_data)
output_images[node_id] = images_output
return output_images
# Default route for home welcome
@app.route('/')
def home():
return render_template('home.html')
####################################################
# Generate text to image using FLUX1.DEV Checkpoint#
####################################################
# Generate image route
@app.route('/generate_image', methods=['POST'])
def generate_image():
data = request.json
# Extract the token from the request headers
token = request.headers.get('Authorization')
if token is None:
return jsonify({'error': 'No token provided'}), 400
if token.startswith("Bearer "):
token = token.split(" ")[1]
# Base64 decode the encoded token
# token = base64.b64decode(token).decode("utf-8")
if 'text_prompt' not in data:
return jsonify({'error': 'No text prompt provided'}), 400
text_prompt = data['text_prompt']
# Get the path to the current file's directory
current_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(current_dir, 'workflows/flux1_dev_checkpoint_workflow_api.json')
with open(file_path, 'r', encoding='utf-8') as file:
workflow_jsondata = file.read()
workflow = json.loads(workflow_jsondata)
workflow["6"]["inputs"]["text"] = text_prompt
# workflow["7"]["inputs"]["text"] = "text, watermark, low quality, extra hands, extra legs."
# seednum = random.randint(1, 9999999999999)
# workflow["3"]["inputs"]["seed"] = seednum
# Generate a random 15-digit seed as an integer
seednum = random.randint(100000000000000, 999999999999999)
workflow["31"]["inputs"]["seed"] = seednum
ws = websocket.WebSocket()
try:
ws.connect(f"{ws_address}?clientId={client_id}&token={token}", header=
{"Authorization": f"Bearer {token}"})
except websocket.WebSocketException as e:
return jsonify({'error': f'WebSocket connection failed: {str(e)}'}), 500
images = get_images(ws, workflow, token)
ws.close()
output_images_base64 = []
for node_id in images:
for image_data in images[node_id]:
image = Image.open(io.BytesIO(image_data))
buffered = io.BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
output_images_base64.append(img_str)
return jsonify({'images': output_images_base64})
# Get image route
@app.route('/get_image/<filename>', methods=['GET'])
def get_image_file(filename):
return send_file(filename, mimetype='image/png')
# Route to serve images
@app.route('/static/<path:filename>')
def serve_static(filename):
return send_from_directory('static', filename)
def make_request(url, data=None, headers=None):
req = urllib.request.Request(url, data=data, headers=headers)
try:
with urllib.request.urlopen(req) as response:
response_body = response.read().decode() # Decode the response
# print(response_body)
return json.loads(response_body) # Convert to JSON if valid
except urllib.error.HTTPError as e:
print(f"HTTPError: {e.code}, {e.reason}")
print(e.read().decode()) # Print detailed error response
except urllib.error.URLError as e:
print(f"URLError: {e.reason}")
# Helper: Queue the prompt
def queue_prompt(workflow, token):
payload = {"prompt": workflow, "client_id": client_id}
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
response = make_request(f"{server_address}/prompt", data=json.dumps(payload).encode('utf-8'), headers=headers)
if not response or 'prompt_id' not in response:
raise ValueError("Failed to queue the prompt. Check the request or API response.")
return response['prompt_id']
def get_history(prompt_id, token):
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
return make_request(f"{server_address}/history/{prompt_id}", headers=headers)
def get_video_data(filename, subfolder, token):
"""
Retrieve a video from the server using filename, subfolder, and token.
"""
# Handle empty subfolder case gracefully
subfolder = subfolder or '' # Default to empty string if None
# Construct query parameters
# url_values = {
# 'filename': filename,
# 'subfolder': subfolder,
# 'type': 'video'
# }
# Construct query parameters
url_values = {
'filename': filename
}
# Build the URL with encoded query parameters
url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
print(f"Requesting URL: {url}")
# Prepare the request with authorization token
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {token}")
try:
# Fetch and return the video data
response = urllib.request.urlopen(req)
return response.read()
except urllib.error.HTTPError as e:
print(f"HTTP Error: {e.code} - {e.reason}")
print(e.read().decode()) # Decode error message for readability
raise
except urllib.error.URLError as e:
print(f"URL Error: {e.reason}")
raise
##################################################
# Generate image to video using CogVideoX-5B-12V #
##################################################
# Route: Image to Video
@app.route('/image_to_video', methods=['POST'])
def image_to_video():
data = request.json
# Extract and validate token
token = request.headers.get('Authorization')
if not token or not token.startswith("Bearer "):
return jsonify({'error': 'Valid Bearer token required'}), 400
token = token.split(" ")[1]
# Validate text prompt
text_prompt = data.get('text_prompt')
if not text_prompt:
return jsonify({'error': 'Text prompt is required'}), 400
# Handle uploaded image or base64 image
image_file = request.files.get('image')
base64_image = data.get('base64_image')
if image_file:
# Check if the file has an allowed extension
if not allowed_file(image_file.filename):
return jsonify({'error': 'Unsupported image format'}), 400
# Secure the filename
filename = secure_filename(image_file.filename)
# Generate a unique path for the image
unique_filename = f"{uuid.uuid4()}_{filename}"
image_path = os.path.join('static', unique_filename)
# Ensure the 'static' directory exists
os.makedirs('static', exist_ok=True)
# Save the image to the static directory
image_file.save(image_path)
# Construct the public URL to access the image
image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
elif base64_image:
# Save base64 image
try:
image_path, image_url = save_base64_image(base64_image)
# return jsonify({'image_url': image_url})
except Exception as e:
return jsonify({'error': f'Invalid base64 image data: {str(e)}'}), 400
else:
return jsonify({'error': 'Image is required (file or base64)'}), 400
# Load workflow configuration
current_dir = os.path.dirname(os.path.abspath(__file__))
workflow_path = os.path.join(current_dir, 'workflows/cogvideox_image_to_video_workflow_api.json')
with open(workflow_path, 'r', encoding='utf-8') as f:
workflow = json.load(f)
# Modify workflow with inputs
workflow["30"]["inputs"]["prompt"] = text_prompt
workflow["73"]["inputs"]["url"] = image_url
workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion"
workflow["57"]["inputs"]["seed"] = random.randint(100000000000000, 999999999999999)
# WebSocket connection to queue and monitor workflow
ws = websocket.WebSocket()
try:
ws.connect(f"{ws_address}?clientId={client_id}&token={token}", header={"Authorization": f"Bearer {token}"})
except websocket.WebSocketException as e:
return jsonify({'error': f'WebSocket connection failed: {str(e)}'}), 500
try:
prompt_id = queue_prompt(workflow, token)
except ValueError as e:
return jsonify({'error': str(e)}), 400
# Wait for workflow execution to complete
while True:
message = json.loads(ws.recv())
if message.get('type') == 'executing' and message['data']['node'] is None and message['data']['prompt_id'] == prompt_id:
break
# Fetch the complete history for the provided prompt_id
history = get_history(prompt_id, token).get(prompt_id, {})
print(f"History for prompt {prompt_id}: {history}")
video_data = None # Initialize video data
# Loop through history outputs to find video or gif data
for node_id, node_output in history.get('outputs', {}).items():
if 'gifs' in node_output:
video = node_output['gifs'][0] # Take the first GIF or video
try:
print(f"Fetching video: {video['filename']} from {video['subfolder']}")
video_data = get_video_data(video['filename'], video['subfolder'], token)
break # Stop after successfully fetching the first video or GIF
except Exception as e:
print(f"Failed to retrieve video: {str(e)}")
# Check if video data was retrieved successfully
if not video_data:
return jsonify({'error': 'Failed to generate video'}), 500
# Save the video locally
# local_video_path = f"/tmp/generated_video_{uuid.uuid4()}.mp4"
# with open(local_video_path, 'wb') as f:
# f.write(video_data)
# Return the video as an HTTP response
response = send_file(
io.BytesIO(video_data),
mimetype='video/mp4',
as_attachment=True,
download_name='generated_video.mp4'
)
# Delete the saved image after sending the response
if image_path and os.path.exists(image_path):
os.remove(image_path) # Remove the image file
return response
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=True) # Removed 'debug=True'