|
import hashlib
|
|
import requests
|
|
import json
|
|
import re
|
|
import os
|
|
from datetime import datetime
|
|
|
|
from server import PromptServer
|
|
import folder_paths
|
|
|
|
from ..utils import get_dict_value, load_json_file, path_exists, save_json_file
|
|
from ..utils_userdata import read_userdata_json, save_userdata_json, delete_userdata_file
|
|
|
|
|
|
def _get_info_cache_file(data_type: str, file_hash: str):
|
|
return f'info/{file_hash}.{data_type}.json'
|
|
|
|
|
|
async def delete_model_info(file: str,
|
|
model_type,
|
|
del_info=True,
|
|
del_metadata=True,
|
|
del_civitai=True):
|
|
"""Delete the info json, and the civitai & metadata caches."""
|
|
file_path = get_folder_path(file, model_type)
|
|
if file_path is None:
|
|
return
|
|
if del_info:
|
|
try_info_path = f'{file_path}.rgthree-info.json'
|
|
if os.path.isfile(try_info_path):
|
|
os.remove(try_info_path)
|
|
if del_civitai or del_metadata:
|
|
file_hash = _get_sha256_hash(file_path)
|
|
if del_civitai:
|
|
json_file_path = _get_info_cache_file(file_hash, 'civitai')
|
|
delete_userdata_file(json_file_path)
|
|
if del_metadata:
|
|
json_file_path = _get_info_cache_file(file_hash, 'metadata')
|
|
delete_userdata_file(json_file_path)
|
|
|
|
|
|
async def get_model_info(file: str,
|
|
model_type,
|
|
default=None,
|
|
maybe_fetch_civitai=False,
|
|
force_fetch_civitai=False,
|
|
maybe_fetch_metadata=False,
|
|
force_fetch_metadata=False,
|
|
light=False):
|
|
"""Compiles a model info given a stored file next to the model, and/or metadata/civitai."""
|
|
|
|
file_path = get_folder_path(file, model_type)
|
|
if file_path is None:
|
|
return default
|
|
|
|
info_data = {}
|
|
should_save = False
|
|
|
|
try_info_path = f'{file_path}.rgthree-info.json'
|
|
if path_exists(try_info_path):
|
|
info_data = load_json_file(try_info_path)
|
|
|
|
if 'file' not in info_data:
|
|
info_data['file'] = file
|
|
should_save = True
|
|
if 'path' not in info_data:
|
|
info_data['path'] = file_path
|
|
should_save = True
|
|
|
|
|
|
|
|
img_next_to_file = None
|
|
for ext in ['jpg', 'png', 'jpeg']:
|
|
try_path = f'{os.path.splitext(file_path)[0]}.{ext}'
|
|
if path_exists(try_path):
|
|
img_next_to_file = try_path
|
|
break
|
|
|
|
if 'images' not in info_data:
|
|
info_data['images'] = []
|
|
should_save = True
|
|
|
|
if img_next_to_file:
|
|
img_next_to_file_url = f'/rgthree/api/loras/img?file={file}'
|
|
if len(info_data['images']) == 0 or info_data['images'][0]['url'] != img_next_to_file_url:
|
|
info_data['images'].insert(0, {'url': img_next_to_file_url})
|
|
should_save = True
|
|
|
|
|
|
|
|
if light and not maybe_fetch_metadata and not force_fetch_metadata and not maybe_fetch_civitai and not force_fetch_civitai:
|
|
return info_data
|
|
|
|
if 'raw' not in info_data:
|
|
info_data['raw'] = {}
|
|
should_save = True
|
|
|
|
should_save = _update_data(info_data) or should_save
|
|
|
|
should_fetch_civitai = force_fetch_civitai is True or (maybe_fetch_civitai is True and
|
|
'civitai' not in info_data['raw'])
|
|
should_fetch_metadata = force_fetch_metadata is True or (maybe_fetch_metadata is True and
|
|
'metadata' not in info_data['raw'])
|
|
|
|
if should_fetch_metadata:
|
|
data_meta = _get_model_metadata(file, model_type, default={}, refresh=force_fetch_metadata)
|
|
should_save = _merge_metadata(info_data, data_meta) or should_save
|
|
|
|
if should_fetch_civitai:
|
|
data_civitai = _get_model_civitai_data(file,
|
|
model_type,
|
|
default={},
|
|
refresh=force_fetch_civitai)
|
|
should_save = _merge_civitai_data(info_data, data_civitai) or should_save
|
|
|
|
if 'sha256' not in info_data:
|
|
file_hash = _get_sha256_hash(file_path)
|
|
if file_hash is not None:
|
|
info_data['sha256'] = file_hash
|
|
should_save = True
|
|
|
|
if should_save:
|
|
if 'trainedWords' in info_data:
|
|
|
|
info_data['trainedWords'] = sorted(info_data['trainedWords'],
|
|
key=lambda w: w['count'] if 'count' in w else 99999,
|
|
reverse=True)
|
|
save_model_info(file, info_data, model_type)
|
|
|
|
|
|
await PromptServer.instance.send("rgthree-refreshed-lora-info", {"data": info_data})
|
|
|
|
return info_data
|
|
|
|
|
|
def _update_data(info_data: dict) -> bool:
|
|
"""Ports old data to new data if necessary."""
|
|
should_save = False
|
|
|
|
if 'triggerWords' in info_data and len(info_data['triggerWords']) > 0:
|
|
civitai_words = ','.join((get_dict_value(info_data, 'raw.civitai.triggerWords', default=[]) +
|
|
get_dict_value(info_data, 'raw.civitai.trainedWords', default=[])))
|
|
if 'trainedWords' not in info_data:
|
|
info_data['trainedWords'] = []
|
|
for trigger_word in info_data['triggerWords']:
|
|
word_data = next((data for data in info_data['trainedWords'] if data['word'] == trigger_word),
|
|
None)
|
|
if word_data is None:
|
|
word_data = {'word': trigger_word}
|
|
info_data['trainedWords'].append(word_data)
|
|
if trigger_word in civitai_words:
|
|
word_data['civitai'] = True
|
|
else:
|
|
word_data['user'] = True
|
|
|
|
del info_data['triggerWords']
|
|
should_save = True
|
|
return should_save
|
|
|
|
|
|
def _merge_metadata(info_data: dict, data_meta: dict) -> bool:
|
|
"""Returns true if data was saved."""
|
|
should_save = False
|
|
|
|
base_model_file = get_dict_value(data_meta, 'ss_sd_model_name', None)
|
|
if base_model_file:
|
|
info_data['baseModelFile'] = base_model_file
|
|
|
|
|
|
trained_words = {}
|
|
if 'ss_tag_frequency' in data_meta and isinstance(data_meta['ss_tag_frequency'], dict):
|
|
for bucket_value in data_meta['ss_tag_frequency'].values():
|
|
if isinstance(bucket_value, dict):
|
|
for tag, count in bucket_value.items():
|
|
if tag not in trained_words:
|
|
trained_words[tag] = {'word': tag, 'count': 0, 'metadata': True}
|
|
trained_words[tag]['count'] = trained_words[tag]['count'] + count
|
|
|
|
if 'trainedWords' not in info_data:
|
|
info_data['trainedWords'] = list(trained_words.values())
|
|
should_save = True
|
|
else:
|
|
|
|
merged_dict = {}
|
|
for existing_word_data in info_data['trainedWords']:
|
|
merged_dict[existing_word_data['word']] = existing_word_data
|
|
for new_key, new_word_data in trained_words.items():
|
|
if new_key not in merged_dict:
|
|
merged_dict[new_key] = {}
|
|
merged_dict[new_key] = {**merged_dict[new_key], **new_word_data}
|
|
info_data['trainedWords'] = list(merged_dict.values())
|
|
should_save = True
|
|
|
|
|
|
|
|
info_data['raw']['metadata'] = data_meta
|
|
should_save = True
|
|
|
|
if 'sha256' not in info_data and '_sha256' in data_meta:
|
|
info_data['sha256'] = data_meta['_sha256']
|
|
should_save = True
|
|
|
|
return should_save
|
|
|
|
|
|
def _merge_civitai_data(info_data: dict, data_civitai: dict) -> bool:
|
|
"""Returns true if data was saved."""
|
|
should_save = False
|
|
|
|
if 'name' not in info_data:
|
|
info_data['name'] = get_dict_value(data_civitai, 'model.name', '')
|
|
should_save = True
|
|
version_name = get_dict_value(data_civitai, 'name')
|
|
if version_name is not None:
|
|
info_data['name'] += f' - {version_name}'
|
|
|
|
if 'type' not in info_data:
|
|
info_data['type'] = get_dict_value(data_civitai, 'model.type')
|
|
should_save = True
|
|
if 'baseModel' not in info_data:
|
|
info_data['baseModel'] = get_dict_value(data_civitai, 'baseModel')
|
|
should_save = True
|
|
|
|
|
|
civitai_trigger = get_dict_value(data_civitai, 'triggerWords', default=[])
|
|
civitai_trained = get_dict_value(data_civitai, 'trainedWords', default=[])
|
|
civitai_words = ','.join(civitai_trigger + civitai_trained)
|
|
if civitai_words:
|
|
civitai_words = re.sub(r"\s*,\s*", ",", civitai_words)
|
|
civitai_words = re.sub(r",+", ",", civitai_words)
|
|
civitai_words = re.sub(r"^,", "", civitai_words)
|
|
civitai_words = re.sub(r",$", "", civitai_words)
|
|
if civitai_words:
|
|
civitai_words = civitai_words.split(',')
|
|
if 'trainedWords' not in info_data:
|
|
info_data['trainedWords'] = []
|
|
for trigger_word in civitai_words:
|
|
word_data = next(
|
|
(data for data in info_data['trainedWords'] if data['word'] == trigger_word), None)
|
|
if word_data is None:
|
|
word_data = {'word': trigger_word}
|
|
info_data['trainedWords'].append(word_data)
|
|
word_data['civitai'] = True
|
|
|
|
if 'sha256' not in info_data:
|
|
info_data['sha256'] = data_civitai['_sha256']
|
|
should_save = True
|
|
|
|
if 'modelId' in data_civitai:
|
|
info_data['links'] = info_data['links'] if 'links' in info_data else []
|
|
civitai_link = f'https://civitai.com/models/{get_dict_value(data_civitai, "modelId")}'
|
|
if get_dict_value(data_civitai, "id"):
|
|
civitai_link += f'?modelVersionId={get_dict_value(data_civitai, "id")}'
|
|
info_data['links'].append(civitai_link)
|
|
info_data['links'].append(data_civitai['_civitai_api'])
|
|
should_save = True
|
|
|
|
|
|
if 'images' in data_civitai:
|
|
info_data_image_urls = list(map(lambda i: i['url']
|
|
if 'url' in i else None, info_data['images']))
|
|
for img in data_civitai['images']:
|
|
img_url = get_dict_value(img, 'url')
|
|
if img_url is not None and img_url not in info_data_image_urls:
|
|
img_id = os.path.splitext(os.path.basename(img_url))[0] if img_url is not None else None
|
|
img_data = {
|
|
'url': img_url,
|
|
'civitaiUrl': f'https://civitai.com/images/{img_id}' if img_id is not None else None,
|
|
'width': get_dict_value(img, 'width'),
|
|
'height': get_dict_value(img, 'height'),
|
|
'type': get_dict_value(img, 'type'),
|
|
'nsfwLevel': get_dict_value(img, 'nsfwLevel'),
|
|
'seed': get_dict_value(img, 'meta.seed'),
|
|
'positive': get_dict_value(img, 'meta.prompt'),
|
|
'negative': get_dict_value(img, 'meta.negativePrompt'),
|
|
'steps': get_dict_value(img, 'meta.steps'),
|
|
'sampler': get_dict_value(img, 'meta.sampler'),
|
|
'cfg': get_dict_value(img, 'meta.cfgScale'),
|
|
'model': get_dict_value(img, 'meta.Model'),
|
|
'resources': get_dict_value(img, 'meta.resources'),
|
|
}
|
|
info_data['images'].append(img_data)
|
|
should_save = True
|
|
|
|
|
|
if 'civitai' not in info_data['raw']:
|
|
info_data['raw']['civitai'] = data_civitai
|
|
should_save = True
|
|
|
|
return should_save
|
|
|
|
|
|
def _get_model_civitai_data(file: str, model_type, default=None, refresh=False):
|
|
"""Gets the civitai data, either cached from the user directory, or from civitai api."""
|
|
file_hash = _get_sha256_hash(get_folder_path(file, model_type))
|
|
if file_hash is None:
|
|
return None
|
|
|
|
json_file_path = _get_info_cache_file(file_hash, 'civitai')
|
|
|
|
api_url = f'https://civitai.com/api/v1/model-versions/by-hash/{file_hash}'
|
|
file_data = read_userdata_json(json_file_path)
|
|
if file_data is None or refresh is True:
|
|
try:
|
|
response = requests.get(api_url, timeout=5000)
|
|
data = response.json()
|
|
save_userdata_json(json_file_path, {
|
|
'url': api_url,
|
|
'timestamp': datetime.now().timestamp(),
|
|
'response': data
|
|
})
|
|
file_data = read_userdata_json(json_file_path)
|
|
except requests.exceptions.RequestException as e:
|
|
print(e)
|
|
response = file_data['response'] if file_data is not None and 'response' in file_data else None
|
|
if response is not None:
|
|
response['_sha256'] = file_hash
|
|
response['_civitai_api'] = api_url
|
|
return response if response is not None else default
|
|
|
|
|
|
def _get_model_metadata(file: str, model_type, default=None, refresh=False):
|
|
"""Gets the metadata from the file itself."""
|
|
file_path = get_folder_path(file, model_type)
|
|
file_hash = _get_sha256_hash(file_path)
|
|
if file_hash is None:
|
|
return default
|
|
|
|
json_file_path = _get_info_cache_file(file_hash, 'metadata')
|
|
|
|
file_data = read_userdata_json(json_file_path)
|
|
if file_data is None or refresh is True:
|
|
data = _read_file_metadata_from_header(file_path)
|
|
if data is not None:
|
|
file_data = {'url': file, 'timestamp': datetime.now().timestamp(), 'response': data}
|
|
save_userdata_json(json_file_path, file_data)
|
|
response = file_data['response'] if file_data is not None and 'response' in file_data else None
|
|
if response is not None:
|
|
response['_sha256'] = file_hash
|
|
return response if response is not None else default
|
|
|
|
|
|
def _read_file_metadata_from_header(file_path: str) -> dict:
|
|
"""Reads the file's header and returns a JSON dict metdata if available."""
|
|
data = None
|
|
try:
|
|
if file_path.endswith('.safetensors'):
|
|
with open(file_path, "rb") as file:
|
|
|
|
|
|
header_size = int.from_bytes(file.read(8), "little", signed=False)
|
|
|
|
if header_size <= 0:
|
|
raise BufferError("Invalid header size")
|
|
|
|
header = file.read(header_size)
|
|
if header is None:
|
|
raise BufferError("Invalid header")
|
|
|
|
header_json = json.loads(header)
|
|
data = header_json["__metadata__"] if "__metadata__" in header_json else None
|
|
|
|
if data is not None:
|
|
for key, value in data.items():
|
|
if isinstance(value, str) and value.startswith('{') and value.endswith('}'):
|
|
try:
|
|
value_as_json = json.loads(value)
|
|
data[key] = value_as_json
|
|
except Exception:
|
|
print(f'metdata for field {key} did not parse as json')
|
|
except requests.exceptions.RequestException as e:
|
|
print(e)
|
|
data = None
|
|
|
|
return data
|
|
|
|
|
|
def get_folder_path(file: str, model_type):
|
|
"""Gets the file path ensuring it exists."""
|
|
file_path = folder_paths.get_full_path(model_type, file)
|
|
if file_path and not path_exists(file_path):
|
|
file_path = os.path.abspath(file_path)
|
|
if not path_exists(file_path):
|
|
file_path = None
|
|
return file_path
|
|
|
|
|
|
def _get_sha256_hash(file_path: str):
|
|
"""Returns the hash for the file."""
|
|
if not file_path or not path_exists(file_path):
|
|
return None
|
|
file_hash = None
|
|
sha256_hash = hashlib.sha256()
|
|
with open(file_path, "rb") as f:
|
|
|
|
for byte_block in iter(lambda: f.read(4096), b""):
|
|
sha256_hash.update(byte_block)
|
|
file_hash = sha256_hash.hexdigest()
|
|
return file_hash
|
|
|
|
|
|
async def set_model_info_partial(file: str, model_type: str, info_data_partial):
|
|
"""Sets partial data into the existing model info data."""
|
|
info_data = await get_model_info(file, model_type, default={})
|
|
info_data = {**info_data, **info_data_partial}
|
|
save_model_info(file, info_data, model_type)
|
|
|
|
|
|
def save_model_info(file: str, info_data, model_type):
|
|
"""Saves the model info alongside the model itself."""
|
|
file_path = get_folder_path(file, model_type)
|
|
if file_path is None:
|
|
return
|
|
try_info_path = f'{file_path}.rgthree-info.json'
|
|
save_json_file(try_info_path, info_data)
|
|
|