from typing import Any, Dict
from fastapi import Depends
from modules import shared
from modules.api import models, helpers
def post_shutdown():
shared.log.info('Shutdown request received')
import sys
def get_motd():
import requests
motd = ''
ver = shared.get_version()
if ver.get('updated', None) is not None:
motd = f"version <b>{ver['hash']} {ver['updated']}</b> <span style='color: var(--primary-500)'>{ver['url'].split('/')[-1]}</span><br>"
if shared.opts.motd:
res = requests.get('https://vladmandic.github.io/automatic/motd', timeout=10)
if res.status_code == 200:
msg = (res.text or '').strip()
shared.log.info(f'MOTD: {msg if len(msg) > 0 else "N/A"}')
motd += res.text
return motd
def get_version():
return shared.get_version()
def get_platform():
from installer import get_platform as installer_get_platform
from modules.loader import get_packages as loader_get_packages
return { **installer_get_platform(), **loader_get_packages() }
def get_log_buffer(req: models.ReqLog = Depends()):
lines = shared.log.buffer[:req.lines] if req.lines > 0 else shared.log.buffer.copy()
if req.clear:
return lines
def get_config():
options = {}
for k in shared.opts.data.keys():
if shared.opts.data_labels.get(k) is not None:
options.update({k: shared.opts.data.get(k, shared.opts.data_labels.get(k).default)})
options.update({k: shared.opts.data.get(k, None)})
if 'sd_lyco' in options:
del options['sd_lyco']
if 'sd_lora' in options:
del options['sd_lora']
return options
def set_config(req: Dict[str, Any]):
updated = []
for k, v in req.items():
updated.append({ k: shared.opts.set(k, v) })
return { "updated": updated }
def get_cmd_flags():
return vars(shared.cmd_opts)
def get_progress(req: models.ReqProgress = Depends()):
import time
if shared.state.job_count == 0:
return models.ResProgress(progress=0, eta_relative=0, state=shared.state.dict(), textinfo=shared.state.textinfo)
current_image = None
if shared.state.current_image and not req.skip_current_image:
current_image = helpers.encode_pil_to_base64(shared.state.current_image)
batch_x = max(shared.state.job_no, 0)
batch_y = max(shared.state.job_count, 1)
step_x = max(shared.state.sampling_step, 0)
step_y = max(shared.state.sampling_steps, 1)
current = step_y * batch_x + step_x
total = step_y * batch_y
progress = current / total if current > 0 and total > 0 else 0
time_since_start = time.time() - shared.state.time_start
eta_relative = (time_since_start / progress) - time_since_start if progress > 0 else 0
res = models.ResProgress(progress=progress, eta_relative=eta_relative, state=shared.state.dict(), current_image=current_image, textinfo=shared.state.textinfo)
return res
def post_interrupt():
return {}
def post_skip():
def get_memory():
import os
import psutil
process = psutil.Process(os.getpid())
res = process.memory_info() # only rss is cross-platform guaranteed so we dont rely on other values
ram_total = 100 * res.rss / process.memory_percent() # and total memory is calculated as actual value is not cross-platform safe
ram = { 'free': ram_total - res.rss, 'used': res.rss, 'total': ram_total }
except Exception as err:
ram = { 'error': f'{err}' }
import torch
if torch.cuda.is_available():
s = torch.cuda.mem_get_info()
system = { 'free': s[0], 'used': s[1] - s[0], 'total': s[1] }
s = dict(torch.cuda.memory_stats(shared.device))
allocated = { 'current': s['allocated_bytes.all.current'], 'peak': s['allocated_bytes.all.peak'] }
reserved = { 'current': s['reserved_bytes.all.current'], 'peak': s['reserved_bytes.all.peak'] }
active = { 'current': s['active_bytes.all.current'], 'peak': s['active_bytes.all.peak'] }
inactive = { 'current': s['inactive_split_bytes.all.current'], 'peak': s['inactive_split_bytes.all.peak'] }
warnings = { 'retries': s['num_alloc_retries'], 'oom': s['num_ooms'] }
cuda = {
'system': system,
'active': active,
'allocated': allocated,
'reserved': reserved,
'inactive': inactive,
'events': warnings,
cuda = { 'error': 'unavailable' }
except Exception as err:
cuda = { 'error': f'{err}' }
return models.ResMemory(ram = ram, cuda = cuda)