"""Gradio helpers for copying Civitai model downloads into a Hugging Face repo.

The module offers three groups of callbacks:

* URL list helpers (``parse_urls``, ``to_urls``, ``uniq_urls``).
* Download/upload helpers (``download_civitai`` and the functions it calls).
* Civitai search helpers (``search_civitai``, ``select_civitai_item``,
  ``add_civitai_item``) that share the module-level ``civitai_last_results``.
"""
import gc
import json
import os
import re
from io import BytesIO
from pathlib import Path
from typing import Optional

import gradio as gr
import requests
from huggingface_hub import HfApi, hf_hub_url
from PIL import Image
from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from utils import get_token, set_token, is_repo_exists, get_user_agent, get_download_file, list_uniq


TEMP_DIR = "."

# Compiled once; the same pattern the module has always used, as a raw string.
_URL_PATTERN = re.compile(r"https?://[\w/:%#\$&\?\(\)~\.=\+\-]+")


def parse_urls(s):
    """Return every http(s) URL found in *s*, in order of appearance.

    Anything that is not a ``str`` (``None``, bytes, lists, ...) yields an
    empty list instead of raising.
    """
    if not isinstance(s, str):
        return []
    return _URL_PATTERN.findall(s)


def to_urls(l: list[str]):
    """Join a list of URLs into one newline-separated string."""
    return "\n".join(l)


def uniq_urls(s):
    """Extract the URLs in *s*, de-duplicate them with ``list_uniq`` and re-join them."""
    return to_urls(list_uniq(parse_urls(s)))


def upload_safetensors_to_repo(filename, repo_id, repo_type, is_private, progress=gr.Progress(track_tqdm=True)):
    """Upload the local file *filename* to *repo_id* and delete the local copy.

    The repo is created first when ``is_repo_exists`` reports it missing.
    The local file is removed whether or not the upload succeeds.

    Returns:
        The hub URL of the uploaded file, or ``None`` when anything failed
        (a ``gr.Warning`` is shown in that case).
    """
    output_filename = Path(filename).name
    hf_token = get_token()
    api = HfApi(token=hf_token)
    try:
        if not is_repo_exists(repo_id, repo_type):
            api.create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, private=is_private)
        progress(0, desc=f"Start uploading... {filename} to {repo_id}")
        api.upload_file(path_or_fileobj=filename, path_in_repo=output_filename, repo_type=repo_type,
                        revision="main", token=hf_token, repo_id=repo_id)
        progress(1, desc="Uploaded.")
        return hf_hub_url(repo_id=repo_id, repo_type=repo_type, filename=output_filename)
    except Exception as e:
        print(f"Error: Failed to upload to {repo_id}. {e}")
        gr.Warning(f"Error: Failed to upload to {repo_id}. {e}")
        return None
    finally:
        # missing_ok: a file that is already gone must not replace the real
        # outcome with a FileNotFoundError raised from the cleanup step.
        Path(filename).unlink(missing_ok=True)


def download_file(dl_url, civitai_key, progress=gr.Progress(track_tqdm=True)):
    """Download *dl_url* into ``TEMP_DIR`` via ``get_download_file`` and return its result."""
    download_dir = TEMP_DIR
    progress(0, desc=f"Start downloading... {dl_url}")
    output_filename = get_download_file(download_dir, dl_url, civitai_key)
    return output_filename


def save_civitai_info(dl_url, filename, progress=gr.Progress(track_tqdm=True)):
    """Fetch Civitai metadata for *dl_url* and write it next to ``TEMP_DIR``.

    The JSON and HTML files are named after the stem of *filename*.

    Returns:
        ``(json_path, html_path, image_path)`` on success, or
        ``("", "", "")`` when no metadata was available or writing failed.
    """
    json_str, html_str, image_path = get_civitai_json(dl_url, True, filename)
    if not json_str:
        return "", "", ""
    stem = Path(filename).stem
    json_path = str(Path(TEMP_DIR, stem + ".json"))
    html_path = str(Path(TEMP_DIR, stem + ".html"))
    try:
        with open(json_path, 'w', encoding="utf-8") as f:
            json.dump(json_str, f, indent=2)
        with open(html_path, mode='w', encoding="utf-8") as f:
            f.write(html_str)
        return json_path, html_path, image_path
    except (OSError, TypeError, ValueError) as e:
        print(f"Error: Failed to save info file {json_path}, {html_path} {e}")
        # The caller only receives empty paths, so nobody else can clean up
        # whatever was already written (including the preview image).
        for leftover in (json_path, html_path, image_path):
            if leftover:
                Path(leftover).unlink(missing_ok=True)
        return "", "", ""


def upload_info_to_repo(dl_url, filename, repo_id, repo_type, is_private, progress=gr.Progress(track_tqdm=True)):
    """Upload the Civitai info files (JSON, HTML, preview image) for *dl_url*.

    The files are produced by ``save_civitai_info`` and removed from disk
    afterwards, even when an upload fails part-way through. Failures are
    reported with ``gr.Warning``; the function always returns ``None``.
    """
    hf_token = get_token()
    api = HfApi(token=hf_token)
    info_paths = []
    try:
        if not is_repo_exists(repo_id, repo_type):
            api.create_repo(repo_id=repo_id, repo_type=repo_type, token=hf_token, private=is_private)
        progress(0, desc=f"Downloading info... {filename}")
        json_path, html_path, image_path = save_civitai_info(dl_url, filename)
        info_paths = [p for p in (json_path, html_path, image_path) if p]
        progress(0, desc=f"Start uploading info... {filename} to {repo_id}")
        if not json_path:
            return
        for path in info_paths:
            if not Path(path).exists():
                continue
            api.upload_file(path_or_fileobj=path, path_in_repo=Path(path).name, repo_type=repo_type,
                            revision="main", token=hf_token, repo_id=repo_id)
        progress(1, desc="Info uploaded.")
    except Exception as e:
        print(f"Error: Failed to upload info to {repo_id}. {e}")
        gr.Warning(f"Error: Failed to upload info to {repo_id}. {e}")
    finally:
        # Remove every generated file, including ones never reached because
        # an earlier upload raised.
        for path in info_paths:
            Path(path).unlink(missing_ok=True)


def download_civitai(dl_url, civitai_key, hf_token, urls, newrepo_id, repo_type="model", is_private=True,
                     is_info=False, progress=gr.Progress(track_tqdm=True)):
    """Download every URL in *dl_url* from Civitai and upload it to *newrepo_id*.

    Missing credentials / repo id fall back to the ``HF_TOKEN``,
    ``CIVITAI_API_KEY`` and ``HF_REPO`` environment variables.

    Args:
        dl_url: Text containing one or more download URLs.
        civitai_key: Civitai API key (optional if set in the environment).
        hf_token: Hugging Face write token (optional if set in the environment).
        urls: Previously uploaded URLs to extend; may be empty/``None``.
        newrepo_id: Target repo id.
        repo_type: Repo type passed to the hub API.
        is_private: Whether a newly created repo is private.
        is_info: Also upload Civitai info files for each download.

    Returns:
        Two ``gr.update`` objects: the list of uploaded URLs and a markdown summary.

    Raises:
        gr.Error: When the token, the API key or the repo id is unavailable.
    """
    set_token(hf_token if hf_token else os.environ.get("HF_TOKEN"))  # default huggingface write token
    if not civitai_key:
        civitai_key = os.environ.get("CIVITAI_API_KEY")  # default Civitai API key
    if not newrepo_id:
        newrepo_id = os.environ.get("HF_REPO")  # default repo to upload
    if not get_token() or not civitai_key:
        raise gr.Error("HF write token and Civitai API key is required.")
    if not newrepo_id:
        # Fail before downloading anything: without a repo id every upload
        # below would fail after the (potentially large) download.
        raise gr.Error("Repo ID to upload is required.")

    # Copy so the caller's list is never mutated.
    urls = list(urls) if urls else []
    for u in parse_urls(dl_url):
        file = download_file(u, civitai_key)
        if not file or not Path(file).is_file():
            continue
        url = upload_safetensors_to_repo(file, newrepo_id, repo_type, is_private)
        if url:
            urls.append(url)
        if is_info:
            upload_info_to_repo(u, file, newrepo_id, repo_type, is_private)
    progress(1, desc="Processing...")

    repo_base = "https://huggingface.co/datasets/" if repo_type == "dataset" else "https://huggingface.co/"
    md_lines = [f'### Your repo: [{newrepo_id}]({repo_base}{newrepo_id})\n']
    md_lines.extend(f"- Uploaded [{str(u)}]({str(u)})\n" for u in urls)
    md = "".join(md_lines)
    gc.collect()
    return gr.update(value=urls, choices=urls), gr.update(value=md)


CIVITAI_TYPE = ["Checkpoint", "TextualInversion", "Hypernetwork", "AestheticGradient", "LORA", "Controlnet", "Poses"]
CIVITAI_BASEMODEL = ["Pony", "SD 1.5", "SDXL 1.0", "Flux.1 D", "Flux.1 S"]
CIVITAI_SORT = ["Highest Rated", "Most Downloaded", "Newest"]
CIVITAI_PERIOD = ["AllTime", "Year", "Month", "Week", "Day"]


def search_on_civitai(query: str, types: list[str], allow_model: Optional[list[str]] = None, limit: int = 100,
                      sort: str = "Highest Rated", period: str = "AllTime", tag: str = ""):
    """Query the Civitai models API and flatten the result per model version.

    Args:
        query: Free-text query (omitted from the request when empty).
        types: Model types to request (omitted when empty).
        allow_model: Base models to keep; empty/``None`` keeps everything.
        limit: Maximum number of models requested.
        sort: Sort order sent to the API.
        period: Period filter sent to the API.
        tag: Tag filter (omitted when empty).

    Returns:
        A list of dicts with keys ``name``, ``creator``, ``tags``,
        ``model_name``, ``base_model``, ``dl_url`` and ``md``; or ``None``
        when the request failed or the response has no ``items``.
    """
    user_agent = get_user_agent()
    headers = {'User-Agent': user_agent, 'content-type': 'application/json'}
    base_url = 'https://civitai.com/api/v1/models'
    params = {'sort': sort, 'period': period, 'limit': limit, 'nsfw': 'true'}
    if types:
        params["types"] = types
    if query:
        params["query"] = query
    if tag:
        params["tag"] = tag
    # Built once instead of once per model version.
    allowed = set(allow_model) if allow_model else set()

    retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
    with requests.Session() as session:
        session.mount("https://", HTTPAdapter(max_retries=retries))
        try:
            r = session.get(base_url, params=params, headers=headers, stream=True, timeout=(3.0, 30))
            if not r.ok:
                return None
            # Parsed inside the try: a non-JSON body is a failed search, not a crash.
            data = r.json()
        except (requests.RequestException, ValueError) as e:
            print(e)
            return None
    if not isinstance(data, dict) or 'items' not in data:
        return None

    items = []
    for j in data['items']:
        creator = (j.get('creator') or {}).get('username', "")
        model_link = f'[Model URL](https://civitai.com/models/{j["id"]})'
        for model in j.get('modelVersions', []):
            base_model = model.get('baseModel', "")
            if allowed and base_model not in allowed:
                continue
            dl_url = model.get('downloadUrl')
            if not dl_url:
                # Without a download URL the entry cannot be selected or downloaded.
                continue
            item = {
                'name': j.get('name', ""),
                'creator': creator,
                'tags': j.get('tags', []),
                'model_name': model.get('name', ""),
                'base_model': base_model,
                'dl_url': dl_url,
            }
            if model.get('images'):
                # NOTE(review): this text is reproduced as found; it looks like
                # thumbnail markup was lost here at some point - confirm the
                # intended preview markup against the UI.
                item['md'] = f'thumbnail\n{model_link}'
            else:
                item['md'] = model_link
            items.append(item)
    return items


civitai_last_results = {}


def search_civitai(query, types, base_model=None, sort=CIVITAI_SORT[0], period=CIVITAI_PERIOD[0], tag=""):
    """Run a Civitai search and build the updates for the result widgets.

    On success ``civitai_last_results`` is replaced with a mapping from
    download URL to the corresponding search item.

    Returns:
        Four ``gr.update`` objects: result choices, markdown for the first
        result, and two visibility updates.
    """
    global civitai_last_results
    items = search_on_civitai(query, types, base_model, 100, sort, period, tag)
    if not items:
        return gr.update(choices=[("", "")], value="", visible=False),\
            gr.update(value="", visible=False), gr.update(visible=True), gr.update(visible=True)

    civitai_last_results = {}
    choices = []
    for item in items:
        base_model_name = "Pony🐴" if item['base_model'] == "Pony" else item['base_model']
        name = f"{item['name']} (for {base_model_name} / By: {item['creator']} / Tags: {', '.join(item['tags'])})"
        value = item['dl_url']
        choices.append((name, value))
        civitai_last_results[value] = item

    # items is non-empty here, so choices has at least one entry.
    first_value = choices[0][1]
    md = civitai_last_results[first_value]['md']
    return gr.update(choices=choices, value=first_value, visible=True), gr.update(value=md, visible=True),\
        gr.update(visible=True), gr.update(visible=True)


def get_civitai_json(dl_url: str, is_html: bool = False, image_baseurl: str = ""):
    """Fetch the model-version metadata, model page and preview image for *dl_url*.

    Args:
        dl_url: A ``https://civitai.com/api/download/models/...`` URL.
        is_html: Selects the shape of the failure value only (see Returns).
        image_baseurl: Name whose stem is used for the saved PNG; defaults
            to *dl_url*.

    Returns:
        On success ``(metadata_dict, html_text, image_path)``, where the last
        two are ``""`` when they could not be obtained. On failure
        ``("", "", "")`` if *is_html* else ``""``.

        NOTE(review): a successful call returns a 3-tuple even when
        *is_html* is false; kept as-is for backward compatibility.
    """
    if not image_baseurl:
        image_baseurl = dl_url
    default = ("", "", "") if is_html else ""
    if "https://civitai.com/api/download/models/" not in dl_url:
        return default

    user_agent = get_user_agent()
    headers = {'User-Agent': user_agent, 'content-type': 'application/json'}
    base_url = 'https://civitai.com/api/v1/model-versions/'
    url = base_url + str(dl_url.split("/")[-1])
    image_path = str(Path(TEMP_DIR, Path(image_baseurl.split("/")[-1]).stem + ".png"))
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504])

    with requests.Session() as session:
        session.mount("https://", HTTPAdapter(max_retries=retries))
        try:
            r = session.get(url, headers=headers, stream=True, timeout=(3.0, 15))
            if not r.ok:
                return default
            info = dict(r.json())
        except (requests.RequestException, ValueError, TypeError) as e:
            print(e)
            return default

        html = ""
        image = ""
        try:
            if "modelId" in info:
                page_url = f"https://civitai.com/models/{info['modelId']}"
                r = session.get(page_url, headers=headers, stream=True, timeout=(3.0, 15))
                if not r.ok:
                    return info, html, image
                html = r.text
            if info.get("images"):
                image_url = info["images"][0]["url"]
                r = session.get(image_url, headers=headers, stream=True, timeout=(3.0, 15))
                if not r.ok:
                    return info, html, image
                # Decode from memory: no intermediate temp file is left behind
                # and the PIL handle is closed deterministically.
                with Image.open(BytesIO(r.content)) as img:
                    img.convert('RGBA').save(image_path)
                image = image_path
        except Exception as e:
            # The page and preview are best-effort extras; a failure here
            # must not throw away the metadata that was already fetched.
            print(e)
            if not image:
                Path(image_path).unlink(missing_ok=True)
        return info, html, image


def select_civitai_item(results: list[str]):
    """Return the updates for the currently selected search result.

    Uses the last entry of *results* as the key into ``civitai_last_results``.

    Returns:
        Two ``gr.update`` objects on every path: the markdown for the
        selection and a hidden, empty JSON value.
    """
    if not results or "http" not in "".join(results):
        return gr.update(value="None", visible=True), gr.update(value={}, visible=False)
    result = civitai_last_results.get(results[-1])
    md = result['md'] if result else ""
    return gr.update(value=md, visible=True), gr.update(value={}, visible=False)


def add_civitai_item(results: list[str], dl_url: str):
    """Append the selected result URLs to *dl_url* and de-duplicate the list.

    Returns:
        A ``gr.update`` carrying the new newline-separated URL text, or
        *dl_url* unchanged when *results* contains no URL.
    """
    if not results or "http" not in "".join(results):
        return gr.update(value=dl_url)
    candidates = [dl_url] if dl_url else []
    candidates.extend(result for result in results if "http" in result)
    return gr.update(value=uniq_urls("\n".join(candidates)))