Spaces:
Running
Running
import os | |
import random | |
import io | |
from PIL import Image | |
from deep_translator import GoogleTranslator | |
import aiohttp | |
from quart import Quart, request, jsonify, send_file, render_template_string | |
from flask_caching import Cache # flask-cachingを使用 | |
# アプリケーションの設定 | |
app = Quart(__name__) | |
# キャッシュの設定 | |
cache_config = { | |
"CACHE_TYPE": "SimpleCache", # メモリベースのシンプルなキャッシュ | |
"CACHE_DEFAULT_TIMEOUT": 60 * 60 * 24 # 24時間 | |
} | |
app.config.from_mapping(cache_config) | |
cache = Cache(app) | |
API_URL = "https://api-inference.huggingface.co/models/black-forest-labs/FLUX.1-dev" | |
API_TOKEN = os.getenv("HF_READ_TOKEN") | |
headers = {"Authorization": f"Bearer {API_TOKEN}"} | |
timeout = 50000 # タイムアウトを300秒に設定 | |
# 非同期APIリクエストの実行関数 | |
async def query_async(prompt, negative_prompt="", steps=35, cfg_scale=7, sampler="DPM++ 2M Karras", seed=-1, strength=0.7, width=1024, height=1024, num_inference_steps=30, guidance_scale=7.5, top_k=50, top_p=0.9, eta=0.1): | |
if not prompt: | |
return None, "Prompt is required" | |
key = random.randint(0, 999) | |
# Translate the prompt from Russian to English if necessary | |
prompt = GoogleTranslator(source='ru', target='en').translate(prompt) | |
print(f'Generation {key} translation: {prompt}') | |
prompt = f"{prompt} | ultra detail, ultra elaboration, ultra quality, perfect." | |
print(f'Generation {key}: {prompt}') | |
payload = { | |
"inputs": prompt, | |
"is_negative": False, | |
"steps": steps, | |
"cfg_scale": cfg_scale, | |
"seed": seed if seed != -1 else random.randint(1, 1000000000), | |
"strength": strength, | |
"negative_prompt": negative_prompt, | |
"top_k": top_k, | |
"top_p": top_p, | |
"eta": eta, | |
"parameters": { | |
"width": width, | |
"height": height, | |
"num_inference_steps": num_inference_steps, | |
"guidance_scale": guidance_scale | |
} | |
} | |
async with aiohttp.ClientSession() as session: | |
try: | |
async with session.post(API_URL, json=payload, headers=headers, timeout=timeout) as response: | |
if response.status != 200: | |
return None, f"Error: Failed to get image. Status code: {response.status}, Details: {await response.text()}" | |
image_bytes = await response.read() | |
image = Image.open(io.BytesIO(image_bytes)) | |
return image, None | |
except asyncio.TimeoutError: | |
return None, "Error: The request timed out. Please try again." | |
except Exception as e: | |
return None, f"Request Exception: {str(e)}" | |
# Content-Security-Policyヘッダーを設定するための関数 | |
async def add_security_headers(response): | |
response.headers['Content-Security-Policy'] = ( | |
"default-src 'self'; " | |
"connect-src 'self' ^https?:\/\/[\w.-]+\.[\w.-]+(\/[\w.-]*)*(\?[^\s]*)?$" | |
"img-src 'self' data:; " | |
"style-src 'self' 'unsafe-inline'; " | |
"script-src 'self' 'unsafe-inline'; " | |
) | |
return response | |
# HTMLテンプレート | |
index_html = """ | |
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<title>Image Generator</title> | |
</head> | |
<body> | |
<h1>Welcome to the Image Generator</h1> | |
<form action="/generate" method="get"> | |
<input type="text" name="prompt" placeholder="Enter prompt" required> | |
<input type="submit" value="Generate"> | |
</form> | |
</body> | |
</html> | |
""" | |
async def index(): | |
return await render_template_string(index_html) | |
async def generate_image(): | |
prompt = request.args.get("prompt", "") | |
negative_prompt = request.args.get("negative_prompt", "") | |
steps = int(request.args.get("steps", 35)) | |
cfg_scale = float(request.args.get("cfgs", 7)) | |
sampler = request.args.get("sampler", "DPM++ 2M Karras") | |
strength = float(request.args.get("strength", 0.7)) | |
seed = int(request.args.get("seed", -1)) | |
width = int(request.args.get("width", 1024)) | |
height = int(request.args.get("height", 1024)) | |
num_inference_steps = int(request.args.get("num_inference_steps", 30)) | |
guidance_scale = float(request.args.get("guidance_scale", 7.5)) | |
top_k = int(request.args.get("top_k", 50)) | |
top_p = float(request.args.get("top_p", 0.9)) | |
eta = float(request.args.get("eta", 0.1)) | |
# キャッシュを確認 | |
cached_image = cache.get(prompt) | |
if cached_image: | |
return await send_file(io.BytesIO(cached_image), mimetype='image/png') | |
image, error = await query_async(prompt, negative_prompt, steps, cfg_scale, sampler, seed, strength, width, height, num_inference_steps, guidance_scale, top_k, top_p, eta) | |
if error: | |
return jsonify({"error": error}), 400 | |
img_bytes = io.BytesIO() | |
image.save(img_bytes, format='PNG') | |
img_bytes.seek(0) | |
# 画像をキャッシュに保存 | |
cache.set(prompt, img_bytes.getvalue()) | |
return await send_file(img_bytes, mimetype='image/png') | |
if __name__ == "__main__": | |
app.run(host='0.0.0.0', port=7860) | |