Spaces:
Sleeping
Sleeping
import gradio as gr | |
import numpy as np | |
import random | |
import multiprocessing | |
import subprocess | |
import sys | |
import time | |
import signal | |
import json | |
import os | |
import requests | |
from loguru import logger | |
from decouple import config | |
URL = config('URL') | |
OUTPUT_DIR = config('OUTPUT_DIR') | |
def get_latest_image(folder): | |
files = os.listdir(folder) | |
image_files = [f for f in files if f.lower().endswith(('.png', '.jpg', '.jpeg'))] | |
image_files.sort(key=lambda x: os.path.getmtime(os.path.join(folder, x))) | |
latest_image = os.path.join(folder, image_files[-1]) if image_files else None | |
return latest_image | |
def start_queue(prompt_workflow): | |
p = {"prompt": prompt_workflow} | |
data = json.dumps(p).encode('utf-8') | |
requests.post(URL, data=data) | |
def generate_image(prompt_text): | |
with open("workflow_api.json", "r", encoding="utf-8") as file_json: | |
prompt = json.load(file_json) | |
prompt["6"]["inputs"]["text"] = f"digital artwork of a {prompt_text}" | |
previous_image = get_latest_image(OUTPUT_DIR) | |
start_queue(prompt) | |
while True: | |
latest_image = get_latest_image(OUTPUT_DIR) | |
if latest_image != previous_image: | |
return latest_image | |
time.sleep(1) | |
def run_main_script(exit_event): | |
process = subprocess.Popen([sys.executable, "C:\\Users\\ilya9\\comfyui_gradio\\ComfyUI\main.py"]) | |
logger.info(f"Скрипт main.py запущен в отдельном процессе с PID: {process.pid}") | |
while not exit_event.is_set(): | |
if process.poll() is not None: | |
logger.info("Процесс main.py завершился самостоятельно") | |
break | |
time.sleep(0.1) | |
if process.poll() is None: | |
logger.info("Завершаем процесс main.py") | |
process.terminate() | |
process.wait(timeout=5) | |
if process.poll() is None: | |
logger.info("Процесс main.py не завершился, применяем SIGKILL") | |
process.kill() | |
def signal_handler(signum, frame): | |
logger.info(f"Получен сигнал {signum}, завершаем работу") | |
exit_event.set() | |
if __name__ == "__main__": | |
exit_event = multiprocessing.Event() | |
# Устанавливаем обработчик сигналов | |
signal.signal(signal.SIGINT, signal_handler) | |
signal.signal(signal.SIGTERM, signal_handler) | |
# Создаем и запускаем процесс | |
process = multiprocessing.Process(target=run_main_script, args=(exit_event,)) | |
process.start() | |
try: | |
demo = gr.Interface(fn=generate_image, inputs=["text"], outputs=["image"]) | |
demo.launch(share=True) | |
finally: | |
logger.info("Ожидаем завершения дочернего процесса") | |
exit_event.set() | |
process.join(timeout=10) | |
if process.is_alive(): | |
logger.info("Дочерний процесс не завершился, применяем terminate()") | |
process.terminate() | |
process.join(timeout=5) | |
if process.is_alive(): | |
logger.info("Дочерний процесс все еще жив, применяем kill()") | |
process.kill() | |
process.join() | |
logger.info("Основной скрипт завершил работу.") | |