Spaces:
Sleeping
Sleeping
import os | |
import io | |
import json | |
import base64 | |
import random | |
import urllib.request | |
import urllib.parse | |
import websocket | |
import uuid | |
from dotenv import load_dotenv | |
from flask import Flask, request, jsonify, render_template, send_file, send_from_directory | |
from PIL import Image | |
from werkzeug.utils import secure_filename | |
import urllib.parse | |
import urllib.request | |
import time | |
# Pull settings from the local .env file into the process environment.
load_dotenv()

# The Flask application object used by the rest of this module.
app = Flask(__name__)

# File extensions accepted for uploaded images.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'webp'}

# HTTP and WebSocket base addresses of the backend server, taken from the
# environment. Either one is None when its variable is not set.
server_address = os.environ.get("SERVER_ADDRESS")
ws_address = os.environ.get("WS_ADDRESS")

# Random identifier created once at import time and sent to the backend as
# this process's client id.
client_id = str(uuid.uuid4())
def allowed_file(filename):
    """Return True when *filename* ends in one of ALLOWED_EXTENSIONS.

    The extension is the text after the last dot and is compared
    case-insensitively. A name without any dot is never allowed.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def save_base64_image(b64_string):
    """Decode a base64 string and save it as an image in the static folder.

    Accepts either a bare base64 payload (saved with a ``.png`` extension) or
    a Data URI such as ``data:image/jpeg;base64,...`` whose subtype becomes
    the file extension.

    Args:
        b64_string: Base64 text, optionally prefixed with a Data URI header.

    Returns:
        A ``(image_path, image_url)`` tuple: the relative path of the file
        that was written and the public URL built from that path.

    Raises:
        ValueError: If the Data URI names an extension outside
            ALLOWED_EXTENSIONS, or the payload cannot be decoded or written.
    """
    try:
        # Handle Data URI scheme if present
        if ',' in b64_string:
            header, encoded = b64_string.split(',', 1)
            ext = header.split('/')[1].split(';')[0].lower() if '/' in header else 'png'
            # The header is caller-controlled and the file lands in the
            # publicly served static folder, so only known image extensions
            # may be written (otherwise e.g. "text/html" would be stored).
            if ext not in ALLOWED_EXTENSIONS:
                raise ValueError(f"Unsupported image type: {ext!r}")
        else:
            encoded = b64_string
            ext = 'png'

        # Decode the image data
        image_data = base64.b64decode(encoded)

        # Generate a unique path for the image in the static folder
        image_path = f"static/{uuid.uuid4()}.{ext}"

        # Ensure directory exists
        os.makedirs('static', exist_ok=True)

        # Save the image
        with open(image_path, 'wb') as f:
            f.write(image_data)
        print(f"Image saved at: {image_path}", flush=True)

        # Return the path and URL of the saved image
        image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
        print(f"Image path (local): {image_path}", flush=True)
        print(f"Image URL (public): {image_url}", flush=True)
        return image_path, image_url
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise ValueError(f"Failed to save image: {e}") from e
def get_image(filename, subfolder, image_type, token):
    """Download one output file from the server's ``/view`` endpoint.

    Args:
        filename: Name of the file to fetch.
        subfolder: Subfolder the file is stored in.
        image_type: Value sent as the ``type`` query parameter.
        token: Bearer token sent in the Authorization header.

    Returns:
        The raw bytes of the response body.

    Raises:
        urllib.error.HTTPError: Re-raised after the status and body are
            printed.
    """
    url_values = {'filename': filename, 'subfolder': subfolder, 'type': image_type}
    url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
    req = urllib.request.Request(url)
    req.add_header("Authorization", f"Bearer {token}")
    try:
        # The context manager closes the connection once the body is read;
        # previously the response object was never closed.
        with urllib.request.urlopen(req) as response:
            return response.read()
    except urllib.error.HTTPError as e:
        print(f"HTTP Error: {e.code} - {e.reason}")
        print(e.read())
        raise
def get_images(ws, workflow, token):
    """Queue *workflow*, wait for it to finish, and download its images.

    Args:
        ws: A connected websocket on which execution messages are received.
        workflow: The workflow dictionary to queue.
        token: Bearer token used for the HTTP calls.

    Returns:
        A dict mapping each output node id to a list of image bytes (an
        empty list for nodes that produced no images).

    Raises:
        ValueError: If the prompt cannot be queued or its history cannot be
            retrieved once execution has finished.
    """
    prompt_id = queue_prompt(workflow, token)

    while True:
        out = ws.recv()
        # Only text frames carry JSON status messages; anything else is
        # skipped.
        if isinstance(out, str):
            message = json.loads(out)
            if message.get('type') == 'executing':
                data = message['data']
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    break  # Execution is done
        # Sleep for 3 seconds before the next iteration
        # NOTE(review): recv() already blocks until a message arrives, so
        # this pause only delays completion detection - confirm it is wanted.
        time.sleep(3)

    # get_history() returns None when the HTTP request fails; report that
    # explicitly instead of failing with a TypeError on the subscript.
    history_response = get_history(prompt_id, token)
    if not history_response or prompt_id not in history_response:
        raise ValueError(f"No history available for prompt {prompt_id}.")
    history = history_response[prompt_id]

    output_images = {}
    for node_id, node_output in history['outputs'].items():
        output_images[node_id] = [
            get_image(image['filename'], image['subfolder'], image['type'], token)
            for image in node_output.get('images', [])
        ]
    return output_images
# Default route for home welcome
def home():
    """Render the welcome page from the ``home.html`` template."""
    page = render_template('home.html')
    return page
################################################
# Generate text to image using FLUX1.DEV Model #
################################################
# Generate image route
def generate_image():
    """Generate images from a text prompt and return them base64-encoded.

    Reads ``text_prompt`` from the JSON body and the token from the
    ``Authorization`` header (an optional ``Bearer `` prefix is stripped).
    The prompt and a random seed are written into the FLUX workflow file
    before it is queued.

    Returns:
        ``{'images': [...]}`` with one base64 PNG string per generated image,
        or a JSON error with status 400 (missing token or prompt) or 500
        (websocket connection failure).

    NOTE(review): no ``@app.route`` decorator is visible for this handler in
    the reviewed source - confirm how it is registered.
    """
    # silent=True turns a missing or malformed JSON body into None so the
    # request gets the 400 below instead of an unhandled error.
    data = request.get_json(silent=True) or {}

    # Extract the token from the request headers
    token = request.headers.get('Authorization')
    if token is None:
        return jsonify({'error': 'No token provided'}), 400
    if token.startswith("Bearer "):
        token = token.split(" ")[1]

    if not isinstance(data, dict) or 'text_prompt' not in data:
        return jsonify({'error': 'No text prompt provided'}), 400
    text_prompt = data['text_prompt']

    # Load the workflow that sits next to this file
    current_dir = os.path.dirname(os.path.abspath(__file__))
    file_path = os.path.join(current_dir, 'workflows/flux1_dev_checkpoint_workflow_api.json')
    with open(file_path, 'r', encoding='utf-8') as file:
        workflow = json.load(file)
    workflow["6"]["inputs"]["text"] = text_prompt

    # Generate a random 15-digit seed as an integer
    seednum = random.randint(100000000000000, 999999999999999)
    workflow["31"]["inputs"]["seed"] = seednum

    ws = websocket.WebSocket()
    try:
        ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
                   header={"Authorization": f"Bearer {token}"})
    except websocket.WebSocketException as e:
        return jsonify({'error': f'WebSocket connection failed: {str(e)}'}), 500

    # Close the socket even when generation fails; it used to leak whenever
    # get_images() raised.
    try:
        images = get_images(ws, workflow, token)
    finally:
        ws.close()

    # Re-encode every returned image as PNG and base64 it for the JSON reply
    output_images_base64 = []
    for node_images in images.values():
        for image_data in node_images:
            buffered = io.BytesIO()
            Image.open(io.BytesIO(image_data)).save(buffered, format="PNG")
            output_images_base64.append(base64.b64encode(buffered.getvalue()).decode("utf-8"))
    return jsonify({'images': output_images_base64})
###################################################
# Edit image with text prompt using OmniGen Model #
###################################################
# Route: OmniGen image to image
def omnigen_image_to_image():
    """Edit an image according to a text prompt and return the results.

    Inputs are read from the JSON body, or from form fields when the request
    is a multipart upload:

    * ``text_prompt`` (required)
    * ``steps`` (optional, defaults to 50)
    * the source image, given either as an uploaded ``image`` file (saved
      temporarily under ``static/`` and referenced by its public URL) or as
      ``image_url``

    A ``base64_image`` field is not decoded by this route.

    Returns:
        ``{'images': [...]}`` with base64 PNG strings and status 200, or a
        JSON error with status 400 (validation) or 500 (any failure while
        running the workflow).

    NOTE(review): no ``@app.route`` decorator is visible for this handler in
    the reviewed source - confirm how it is registered.
    """
    # request.json cannot be read on a multipart request, which made the
    # file-upload branch unreachable; fall back to the form fields instead.
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else request.form

    # Extract and validate token
    token = request.headers.get('Authorization')
    if not token or not token.startswith("Bearer "):
        return jsonify({'error': 'Valid Bearer token required'}), 400
    token = token.split(" ")[1]

    # Validate text prompt
    text_prompt = data.get('text_prompt')
    if not text_prompt or not text_prompt.strip():
        return jsonify({'error': 'Text prompt is required'}), 400

    steps = data.get('steps') or 50

    # The image comes from an upload or from image_url. Previously a request
    # carrying only image_url was rejected even though image_url is the only
    # value the workflow receives.
    image_url = data.get('image_url')
    image_file = request.files.get('image')
    if not image_file and not image_url:
        return jsonify({'error': 'image_url is required'}), 400

    image_path = None  # Set only when an upload is written to disk
    ws = websocket.WebSocket()
    try:
        if image_file:
            # Check if the file has an allowed extension
            if not allowed_file(image_file.filename):
                return jsonify({'error': 'Unsupported image format'}), 400

            # Save under a unique, sanitised name in the static directory
            filename = secure_filename(image_file.filename)
            unique_filename = f"{uuid.uuid4()}_{filename}"
            image_path = os.path.join('static', unique_filename)
            os.makedirs('static', exist_ok=True)
            image_file.save(image_path)

            # Construct the public URL to access the image
            image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"

        # Load workflow configuration
        current_dir = os.path.dirname(os.path.abspath(__file__))
        workflow_path = os.path.join(current_dir, 'workflows/omnigen_image_to_image_workflow_api.json')
        with open(workflow_path, 'r', encoding='utf-8') as f:
            workflow = json.load(f)

        # Modify workflow with inputs
        workflow["6"]["inputs"]["prompt"] = "in image_1 " + text_prompt
        workflow["6"]["inputs"]["num_inference_steps"] = steps
        workflow["12"]["inputs"]["url"] = image_url

        # WebSocket connection to queue the prompt
        ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
                   header={"Authorization": f"Bearer {token}"})
        images = get_images(ws, workflow, token)

        output_images_base64 = []
        for node_images in images.values():
            for image_data in node_images:
                buffered = io.BytesIO()
                Image.open(io.BytesIO(image_data)).save(buffered, format="PNG")
                output_images_base64.append(base64.b64encode(buffered.getvalue()).decode("utf-8"))
        return jsonify({'images': output_images_base64}), 200
    except Exception as e:
        return jsonify({'message': 'Unable to connect to the server. Make sure the server is running', 'error': str(e)}), 500
    finally:
        # Release the socket on every path; it used to stay open on errors.
        ws.close()
        # Always delete the image if it was saved
        if image_path and os.path.exists(image_path):
            os.remove(image_path)
            print(f"Deleted temporary image: {image_path}", flush=True)
# Get image route
def get_image_file(filename):
    """Send the image file named *filename* as ``image/png``.

    The name is caller-supplied, so only names with an allowed image
    extension are served, and the lookup goes through
    ``send_from_directory`` so it cannot resolve outside the application
    root. Passing the name straight to ``send_file`` would also have served
    non-image files from the application directory (for example ``.env``).

    Returns:
        The file response, or a JSON error with status 400 when the
        extension is not an allowed image type.

    NOTE(review): no ``@app.route`` decorator is visible for this handler in
    the reviewed source - confirm how *filename* reaches it.
    """
    if not allowed_file(filename):
        return jsonify({'error': 'Unsupported image format'}), 400
    return send_from_directory(app.root_path, filename, mimetype='image/png')
# Route to serve images
def serve_static(filename):
    """Log the request and send *filename* from the ``static`` directory."""
    print(f"Request for static file: {filename}", flush=True)
    static_response = send_from_directory('static', filename)
    return static_response
# Helper: perform an HTTP request and parse the JSON reply
def make_request(url, data=None, headers=None):
    """Send a request to *url* and return the decoded JSON response.

    Args:
        url: Target URL.
        data: Optional request body as bytes.
        headers: Optional dict of request headers.

    Returns:
        The parsed JSON value, or None when the request fails with an
        HTTPError or URLError (the error is printed, not raised).
    """
    # urllib's Request iterates over headers.items(), so the default None
    # must be replaced with an empty dict or it raises AttributeError.
    req = urllib.request.Request(url, data=data, headers=headers or {})
    try:
        with urllib.request.urlopen(req) as response:
            response_body = response.read().decode()  # Decode the response
            return json.loads(response_body)  # Convert to JSON if valid
    except urllib.error.HTTPError as e:
        print(f"HTTPError: {e.code}, {e.reason}")
        print(e.read().decode())  # Print detailed error response
    except urllib.error.URLError as e:
        print(f"URLError: {e.reason}")
    # Callers test for a falsy result to detect failure.
    return None
# Helper: Queue the prompt
def queue_prompt(workflow, token):
    """Submit *workflow* to the server's ``/prompt`` endpoint.

    Returns:
        The ``prompt_id`` from the server's reply.

    Raises:
        ValueError: If the reply is empty or carries no ``prompt_id``.
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    body = json.dumps({"prompt": workflow, "client_id": client_id}).encode('utf-8')
    response = make_request(f"{server_address}/prompt", data=body, headers=headers)
    if response and 'prompt_id' in response:
        return response['prompt_id']
    raise ValueError("Failed to queue the prompt. Check the request or API response.")
# Get ComfyUI prompt history
def get_history(prompt_id, token):
    """Fetch ``/history/<prompt_id>`` from the server.

    Returns whatever ``make_request`` returns for that URL.
    """
    history_url = f"{server_address}/history/{prompt_id}"
    auth_headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    return make_request(history_url, headers=auth_headers)
def get_video_data(filename, subfolder, token):
    """
    Retrieve a video from the server using filename, subfolder, and token.

    Args:
        filename: Name of the file to fetch from the ``/view`` endpoint.
        subfolder: Subfolder the file is stored in; None or '' means none.
        token: Bearer token sent in the Authorization header.

    Returns:
        The raw bytes of the response body.

    Raises:
        urllib.error.HTTPError, urllib.error.URLError: Re-raised after the
            error is printed.
    """
    # Construct query parameters
    url_values = {'filename': filename}
    # The subfolder argument used to be normalised and then dropped, so a
    # file stored in a subfolder could not be addressed. It is sent only when
    # non-empty, which keeps the URL unchanged for files without one.
    if subfolder:
        url_values['subfolder'] = subfolder

    # Build the URL with encoded query parameters
    url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}"
    print(f"Requesting URL: {url}", flush=True)

    # Prepare the request with authorization token
    req = urllib.request.Request(url)
    req.add_header("Authorization", f"Bearer {token}")
    try:
        # Fetch and return the video data; the context manager closes the
        # connection, which was previously left open.
        with urllib.request.urlopen(req) as response:
            return response.read()
    except urllib.error.HTTPError as e:
        print(f"HTTP Error: {e.code} - {e.reason}")
        print(e.read().decode())  # Decode error message for readability
        raise
    except urllib.error.URLError as e:
        print(f"URL Error: {e.reason}")
        raise
########################################################
# Generate image to video using CogVideoX-5B-12V Model #
########################################################
# Route: Image to Video
def v1_image_to_video():
    """Queue an image-to-video job and return its prompt id without waiting.

    Inputs are read from the JSON body, or from form fields when the request
    is a multipart upload:

    * ``text_prompt`` (required)
    * ``frame_rate`` (optional; 8, 12 or 24, defaults to 24)
    * ``steps`` (optional, defaults to 50)
    * the source image, either as an uploaded ``image`` file or as
      ``base64_image``

    Returns:
        Status 202 with ``prompt_id``, ``message`` and ``get_video_url``, or
        a JSON error with status 400 (validation) or 500 (any failure while
        queueing).

    NOTE(review): no ``@app.route`` decorator is visible for this handler in
    the reviewed source - confirm how it is registered.
    """
    # request.json cannot be read on a multipart request, which made the
    # file-upload branch unreachable; fall back to the form fields instead.
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else request.form

    # Extract and validate token
    token = request.headers.get('Authorization')
    if not token or not token.startswith("Bearer "):
        return jsonify({'error': 'Valid Bearer token required'}), 400
    token = token.split(" ")[1]

    # Validate text prompt
    text_prompt = data.get('text_prompt')
    frame_rate = data.get('frame_rate')
    steps = data.get('steps') or 50
    if not text_prompt or not text_prompt.strip():
        return jsonify({'error': 'Text prompt is required'}), 400

    # Missing, empty or 0 frame_rate falls back to 24 fps
    if not frame_rate:
        frame_rate = 24
    else:
        try:
            frame_rate = int(frame_rate)
        except (TypeError, ValueError):
            # TypeError covers JSON values such as lists or objects.
            return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
        if frame_rate not in (8, 12, 24):
            return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400

    # Handle uploaded image or base64 image
    image_file = request.files.get('image')
    base64_image = data.get('base64_image')
    ws = websocket.WebSocket()
    try:
        if image_file:
            # Check if the file has an allowed extension
            if not allowed_file(image_file.filename):
                return jsonify({'error': 'Unsupported image format'}), 400

            # Save under a unique, sanitised name in the static directory
            filename = secure_filename(image_file.filename)
            unique_filename = f"{uuid.uuid4()}_{filename}"
            image_path = os.path.join('static', unique_filename)
            os.makedirs('static', exist_ok=True)
            image_file.save(image_path)

            # Construct the public URL to access the image
            image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
        elif base64_image:
            # Save base64 image
            try:
                image_path, image_url = save_base64_image(base64_image)
            except Exception as e:
                raise ValueError(f'Invalid base64 image data: {str(e)}')
        else:
            return jsonify({'error': 'Image is required (file or base64)'}), 400

        # Load workflow configuration
        current_dir = os.path.dirname(os.path.abspath(__file__))
        workflow_path = os.path.join(current_dir, 'workflows/cogvideox_image_to_video_workflow_api.json')
        with open(workflow_path, 'r', encoding='utf-8') as f:
            workflow = json.load(f)

        # Modify workflow with inputs
        workflow["30"]["inputs"]["prompt"] = text_prompt
        workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur"
        workflow["44"]["inputs"]["frame_rate"] = frame_rate
        workflow["57"]["inputs"]["steps"] = steps
        workflow["73"]["inputs"]["url"] = image_url

        # WebSocket connection to queue the prompt
        ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
                   header={"Authorization": f"Bearer {token}"})

        # Queue the prompt
        prompt_id = queue_prompt(workflow, token)
        return jsonify({'prompt_id': prompt_id, 'message': 'Prompt queued successfully', 'get_video_url': f'https://gosign-de-comfyui-api.hf.space/v1/video_tasks/{prompt_id}'}), 202
    except Exception as e:
        return jsonify({'message': 'Unable to connect to the server. Make sure the server is running', 'error': str(e)}), 500
    finally:
        # Close the socket on every path; it was previously never closed, so
        # each request left a connection open.
        ws.close()
        # The saved image is deliberately not deleted here.
        # NOTE(review): presumably because the queued job fetches image_url
        # after this request returns - confirm, and clean up elsewhere.
###################################################
# Generate text to video using CogVideoX-5B Model #
###################################################
# Route: Text to Video
def v1_text_to_video():
    """Queue a text-to-video job and return its prompt id without waiting.

    Reads from the JSON body:

    * ``text_prompt`` (required)
    * ``frame_rate`` (optional; 8, 12 or 24, defaults to 24)
    * ``steps`` (optional, defaults to 50)

    Returns:
        Status 202 with ``prompt_id``, ``message`` and ``get_video_url``, or
        a JSON error with status 400 (validation) or 500 (any failure while
        queueing).

    NOTE(review): no ``@app.route`` decorator is visible for this handler in
    the reviewed source - confirm how it is registered.
    """
    # silent=True turns a missing or malformed JSON body into None so the
    # request gets a 400 below instead of an unhandled error.
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    # Extract and validate token
    token = request.headers.get('Authorization')
    if not token or not token.startswith("Bearer "):
        return jsonify({'error': 'Valid Bearer token required'}), 400
    token = token.split(" ")[1]

    # Validate text prompt
    text_prompt = data.get('text_prompt')
    frame_rate = data.get('frame_rate')
    steps = data.get('steps') or 50
    if not text_prompt or not text_prompt.strip():
        return jsonify({'error': 'Text prompt is required'}), 400

    # Missing, empty or 0 frame_rate falls back to 24 fps
    if not frame_rate:
        frame_rate = 24
    else:
        try:
            frame_rate = int(frame_rate)
        except (TypeError, ValueError):
            # TypeError covers JSON values such as lists or objects.
            return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
        if frame_rate not in (8, 12, 24):
            return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400

    ws = websocket.WebSocket()
    try:
        # Load workflow configuration
        current_dir = os.path.dirname(os.path.abspath(__file__))
        workflow_path = os.path.join(current_dir, 'workflows/cogvideox_text_to_video_workflow_api.json')
        with open(workflow_path, 'r', encoding='utf-8') as f:
            workflow = json.load(f)

        # Modify workflow with inputs
        workflow["30"]["inputs"]["prompt"] = text_prompt
        workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur"
        workflow["33"]["inputs"]["frame_rate"] = frame_rate
        workflow["34"]["inputs"]["steps"] = steps

        # WebSocket connection to queue the prompt
        ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
                   header={"Authorization": f"Bearer {token}"})

        # Queue the prompt
        prompt_id = queue_prompt(workflow, token)
        return jsonify({'prompt_id': prompt_id, 'message': 'Prompt queued successfully', 'get_video_url': f'https://gosign-de-comfyui-api.hf.space/v1/video_tasks/{prompt_id}'}), 202
    except Exception as e:
        return jsonify({'message': 'Unable to connect to the server. Make sure the server is running', 'error': str(e)}), 500
    finally:
        # Close the socket on every path; it was previously never closed.
        ws.close()
# Get video_tasks route
def video_tasks(prompt_id):
    """Report the status of a queued video job, or return the finished video.

    Args:
        prompt_id: Id returned when the job was queued.

    Returns:
        * status 202 with ``status: 'pending'`` and ``prompts_in_queue``
          while the server has no history entry for the prompt;
        * the video as an ``video/mp4`` attachment once an output node
          exposes a ``gifs`` entry that can be downloaded;
        * a JSON error with status 400 (bad token) or 500 (anything else).

    NOTE(review): no ``@app.route`` decorator is visible for this handler in
    the reviewed source - confirm how it is registered.
    """
    token = request.headers.get('Authorization')
    if not token or not token.startswith("Bearer "):
        return jsonify({'error': 'Valid Bearer token required'}), 400
    token = token.split(" ")[1]

    ws = websocket.WebSocket()
    try:
        # Establish WebSocket connection to fetch real-time status
        ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
                   header={"Authorization": f"Bearer {token}"})

        # Request current status of the specific prompt
        ws.send(json.dumps({"type": "get_status", "prompt_id": prompt_id}))
        response = json.loads(ws.recv())

        # Extract the necessary fields for the specific prompt
        queue_remaining = response.get('data', {}).get('status', {}).get('exec_info', {}).get('queue_remaining', 0)

        # get_history() returns None when the HTTP call fails; report that
        # explicitly rather than through an AttributeError message.
        history_response = get_history(prompt_id, token)
        if history_response is None:
            raise ValueError('Failed to fetch prompt history from the server.')
        history = history_response.get(prompt_id, {})
        if not history:
            return jsonify({
                'message': 'Video is being generated.',
                'status': 'pending',
                'prompts_in_queue': queue_remaining
            }), 202

        video_data = None
        # Extract video or GIF details
        for node_id, node_output in history.get('outputs', {}).items():
            if 'gifs' in node_output:
                video = node_output['gifs'][0]  # Take the first available GIF/video
                try:
                    video_data = get_video_data(video['filename'], video['subfolder'], token)
                    break  # Stop after fetching the first valid video
                except Exception as e:
                    print(f"Failed to retrieve video: {str(e)}")

        if not video_data:
            return jsonify({'error': 'Failed to retrieve video data.'}), 500

        # Send the video as an HTTP response
        return send_file(
            io.BytesIO(video_data),
            mimetype='video/mp4',
            as_attachment=True,
            download_name='generated_video.mp4'
        )
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Close the socket on every path; it was previously never closed, so
        # every status poll left a connection open.
        ws.close()
# Route: Image to Video old
def image_to_video():
    """Generate a video from an image and wait for the result.

    Unlike the ``v1`` handlers this one blocks until the workflow finishes
    and then returns the video itself. Inputs are read from the JSON body, or
    from form fields when the request is a multipart upload:

    * ``text_prompt`` (required)
    * ``frame_rate`` (optional; 8, 12 or 24, defaults to 24)
    * ``steps`` (optional, defaults to 50)
    * the source image, either as an uploaded ``image`` file or as
      ``base64_image``

    Returns:
        The video as a ``video/mp4`` attachment, or a JSON error with status
        400 (validation) or 500 (any failure while generating).

    NOTE(review): no ``@app.route`` decorator is visible for this handler in
    the reviewed source - confirm how it is registered.
    """
    # request.json cannot be read on a multipart request, which made the
    # file-upload branch unreachable; fall back to the form fields instead.
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else request.form

    # Extract and validate token
    token = request.headers.get('Authorization')
    if not token or not token.startswith("Bearer "):
        return jsonify({'error': 'Valid Bearer token required'}), 400
    token = token.split(" ")[1]

    # Validate text prompt
    text_prompt = data.get('text_prompt')
    frame_rate = data.get('frame_rate')
    steps = data.get('steps') or 50
    if not text_prompt or not text_prompt.strip():
        return jsonify({'error': 'Text prompt is required'}), 400

    # Missing, empty or 0 frame_rate falls back to 24 fps
    if not frame_rate:
        frame_rate = 24
    else:
        try:
            frame_rate = int(frame_rate)
        except (TypeError, ValueError):
            # TypeError covers JSON values such as lists or objects.
            return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400
        if frame_rate not in (8, 12, 24):
            return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400

    # Handle uploaded image or base64 image
    image_file = request.files.get('image')
    base64_image = data.get('base64_image')
    image_path = None  # Initialize image path
    ws = websocket.WebSocket()
    try:
        if image_file:
            # Check if the file has an allowed extension
            if not allowed_file(image_file.filename):
                return jsonify({'error': 'Unsupported image format'}), 400

            # Save under a unique, sanitised name in the static directory
            filename = secure_filename(image_file.filename)
            unique_filename = f"{uuid.uuid4()}_{filename}"
            image_path = os.path.join('static', unique_filename)
            os.makedirs('static', exist_ok=True)
            image_file.save(image_path)

            # Construct the public URL to access the image
            image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}"
        elif base64_image:
            # Save base64 image
            try:
                image_path, image_url = save_base64_image(base64_image)
            except Exception as e:
                raise ValueError(f'Invalid base64 image data: {str(e)}')
        else:
            return jsonify({'error': 'Image is required (file or base64)'}), 400

        # Load workflow configuration
        current_dir = os.path.dirname(os.path.abspath(__file__))
        workflow_path = os.path.join(current_dir, 'workflows/cogvideox_image_to_video_workflow_api.json')
        with open(workflow_path, 'r', encoding='utf-8') as f:
            workflow = json.load(f)

        # Modify workflow with inputs
        workflow["30"]["inputs"]["prompt"] = text_prompt
        workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur"
        workflow["44"]["inputs"]["frame_rate"] = frame_rate
        workflow["57"]["inputs"]["steps"] = steps
        workflow["73"]["inputs"]["url"] = image_url

        # WebSocket connection to queue and monitor workflow
        ws.connect(f"{ws_address}?clientId={client_id}&token={token}",
                   header={"Authorization": f"Bearer {token}"})

        # Queue the prompt and wait for completion
        prompt_id = queue_prompt(workflow, token)
        last_ping = time.time()
        PING_INTERVAL = 30
        while True:
            frame = ws.recv()
            # Only text frames carry JSON status messages; feeding a binary
            # frame to json.loads would abort the wait with a decode error.
            if isinstance(frame, str):
                message = json.loads(frame)
                if message.get('type') == 'executing' and message['data']['node'] is None \
                        and message['data']['prompt_id'] == prompt_id:
                    break
            # Send a ping if PING_INTERVAL has passed
            if time.time() - last_ping > PING_INTERVAL:
                ws.send('ping')
                last_ping = time.time()

        # Fetch the history of the workflow. get_history() returns None when
        # the HTTP call fails; treat that as "no outputs".
        history = (get_history(prompt_id, token) or {}).get(prompt_id, {})
        video_data = None
        # Find the video or GIF data from the outputs
        for node_id, node_output in history.get('outputs', {}).items():
            if 'gifs' in node_output:
                video = node_output['gifs'][0]  # Take the first video/GIF
                try:
                    video_data = get_video_data(video['filename'], video['subfolder'], token)
                    break  # Stop after fetching the first valid video
                except Exception as e:
                    print(f"Failed to retrieve video: {str(e)}")

        # Ensure video data was retrieved
        if not video_data:
            raise ValueError('Failed to generate video')

        # Save the video locally
        # NOTE(review): this copy is neither returned nor deleted here -
        # confirm something else consumes or cleans it up.
        local_video_path = f"static/generated_image_to_video_{prompt_id}.mp4"
        with open(local_video_path, 'wb') as f:
            f.write(video_data)

        # Send the video as an HTTP response
        response = send_file(
            io.BytesIO(video_data),
            mimetype='video/mp4',
            as_attachment=True,
            download_name='generated_video.mp4'
        )
        return response
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        # Close the socket on every path; it was previously never closed.
        ws.close()
        # Always delete the image if it was saved
        if image_path and os.path.exists(image_path):
            os.remove(image_path)
            print(f"Deleted temporary image: {image_path}", flush=True)
if __name__ == '__main__':
    # The server binds to every interface, so debug mode (which enables the
    # interactive Werkzeug debugger) must not be on by default. Set
    # FLASK_DEBUG=1 to opt in during local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)