"""Open Hub pull requests that replace the deprecated `CLIPFeatureExtractor`.

For a given diffusers model repository, this script downloads
`model_index.json`, rewrites the class name of the `feature_extractor` entry
from `CLIPFeatureExtractor` to `CLIPImageProcessor`, and uploads the result as
a pull request.
"""
import argparse
import json
import os
import shutil
from tempfile import TemporaryDirectory
from typing import List, Optional, Tuple, Union

from huggingface_hub import CommitInfo, CommitOperationAdd, Discussion, HfApi, hf_hub_download
from huggingface_hub.file_download import repo_folder_name


# Pipeline class names accepted by `is_index_stable_diffusion_like`.
_COMPATIBLE_PIPELINE_CLASSES = (
    "AltDiffusionImg2ImgPipeline",
    "AltDiffusionPipeline",
    "CycleDiffusionPipeline",
    "StableDiffusionImageVariationPipeline",
    "StableDiffusionImg2ImgPipeline",
    "StableDiffusionInpaintPipeline",
    "StableDiffusionInpaintPipelineLegacy",
    "StableDiffusionPipeline",
    "StableDiffusionPipelineSafe",
    "StableDiffusionUpscalePipeline",
    "VersatileDiffusionDualGuidedPipeline",
    "VersatileDiffusionImageVariationPipeline",
    "VersatileDiffusionPipeline",
    "VersatileDiffusionTextToImagePipeline",
    "OnnxStableDiffusionImg2ImgPipeline",
    "OnnxStableDiffusionInpaintPipeline",
    "OnnxStableDiffusionInpaintPipelineLegacy",
    "OnnxStableDiffusionPipeline",
    "StableDiffusionOnnxPipeline",
    "FlaxStableDiffusionPipeline",
)

_OLD_FEATURE_EXTRACTOR = "CLIPFeatureExtractor"
_NEW_FEATURE_EXTRACTOR = "CLIPImageProcessor"


class AlreadyExists(Exception):
    """Raised when an open pull request with the same title already exists."""


def is_index_stable_diffusion_like(config_dict):
    """Return True if `config_dict["_class_name"]` is a known compatible pipeline.

    A dict without a `_class_name` key yields False.
    """
    if "_class_name" not in config_dict:
        return False
    return config_dict["_class_name"] in _COMPATIBLE_PIPELINE_CLASSES


def convert_single(
    model_id: str, folder: str
) -> Tuple[Union[List["CommitOperationAdd"], bool], Union[str, bool]]:
    """Download `model_index.json` for `model_id` and write a fixed copy to `folder`.

    Args:
        model_id: Repository id on the Hub.
        folder: Existing local directory that receives the rewritten file.

    Returns:
        `(operations, model_type)` when the index was rewritten, where
        `operations` is a one-element list of `CommitOperationAdd`;
        `(False, False)` when the index has no feature extractor, does not
        reference `CLIPFeatureExtractor`, or the conversion reported failure.
    """
    config_file = "model_index.json"

    model_index_file = hf_hub_download(repo_id=model_id, filename=config_file)
    with open(model_index_file, "r", encoding="utf-8") as f:
        index_dict = json.load(f)

    feature_extractor = index_dict.get("feature_extractor")
    # An empty entry is treated like a missing one instead of raising
    # IndexError on the `[-1]` lookup below.
    if not feature_extractor:
        print(f"{model_id} has no feature extractor")
        return False, False

    if feature_extractor[-1] != _OLD_FEATURE_EXTRACTOR:
        print(f"{model_id} is not out of date or is not CLIP")
        return False, False

    new_config_file = os.path.join(folder, config_file)
    model_type = convert_file(model_index_file, new_config_file)
    if not model_type:
        return False, False

    operations = [CommitOperationAdd(path_in_repo=config_file, path_or_fileobj=new_config_file)]
    return operations, model_type


def convert_file(
    old_config: str,
    new_config: str,
):
    """Copy the JSON at `old_config` to `new_config` with the new class name.

    The last element of the `feature_extractor` entry is set to
    `CLIPImageProcessor`; the output is written with sorted keys, a two-space
    indent and a trailing newline.

    Returns:
        The string "Stable Diffusion".
    """
    with open(old_config, "r", encoding="utf-8") as f:
        config = json.load(f)

    config["feature_extractor"][-1] = _NEW_FEATURE_EXTRACTOR

    with open(new_config, "w", encoding="utf-8") as f:
        f.write(json.dumps(config, indent=2, sort_keys=True) + "\n")

    return "Stable Diffusion"


def previous_pr(api: "HfApi", model_id: str, pr_title: str) -> Optional["Discussion"]:
    """Return the open pull request on `model_id` titled `pr_title`, if any.

    Returns None when no such pull request exists or when the discussions
    cannot be listed.
    """
    try:
        discussions = api.get_repo_discussions(repo_id=model_id)
    except Exception as err:  # best effort: an unlistable repo counts as "no PR"
        print(f"Could not list discussions for {model_id}: {err}")
        return None

    for discussion in discussions:
        if discussion.status == "open" and discussion.is_pull_request and discussion.title == pr_title:
            return discussion
    return None


def _build_pr_description(model_id: str) -> str:
    """Build the markdown body of the pull request opened on `model_id`."""
    contributor = model_id.split("/")[0]
    # Emoji are written as escapes so the text does not depend on how this
    # source file is encoded.
    return (
        f"Hey {contributor} \N{WAVING HAND SIGN}, \n\n Your model repository seems to contain logic to load a "
        "feature extractor that is deprecated, which you should notice by seeing the warning: "
        "\n\n ```\ntransformers/models/clip/feature_extraction_clip.py:28: FutureWarning: The class "
        "CLIPFeatureExtractor is deprecated and will be removed in version 5 of Transformers. "
        "Please use CLIPImageProcessor instead. warnings.warn(\n``` \n\n when running "
        f"`pipe = DiffusionPipeline.from_pretrained({model_id})`. "
        "This PR makes sure that the warning does not show anymore by replacing `CLIPFeatureExtractor` with "
        "`CLIPImageProcessor`. This will certainly not change or break your checkpoint, but only "
        "make sure that everything is up to date. \n\n Best, the \N{FIRECRACKER} Diffusers team."
    )


def convert(api: "HfApi", model_id: str, force: bool = False) -> Optional["CommitInfo"]:
    """Open a pull request on `model_id` that replaces `CLIPFeatureExtractor`.

    Args:
        api: Hub client used to inspect the repository and create the commit.
        model_id: Repository id on the Hub.
        force: When True, open a pull request even if one with the same title
            is already open.

    Returns:
        The `CommitInfo` of the created pull request, or None when the
        repository has no `model_index.json` or nothing needed converting.

    Raises:
        AlreadyExists: An open pull request with the same title exists and
            `force` is False.
    """
    pr_title = "Fix deprecation warning by changing `CLIPFeatureExtractor` to `CLIPImageProcessor`."

    info = api.model_info(model_id)
    filenames = {sibling.rfilename for sibling in info.siblings}

    if "model_index.json" not in filenames:
        print(f"Model: {model_id} has no model_index.json file to change")
        return None

    # The temporary directory (and everything created in it) is removed when
    # the `with` block exits, including when an exception propagates.
    with TemporaryDirectory() as d:
        folder = os.path.join(d, repo_folder_name(repo_id=model_id, repo_type="models"))
        os.makedirs(folder)

        pr = previous_pr(api, model_id, pr_title)
        if pr is not None and not force:
            url = f"https://huggingface.co/{model_id}/discussions/{pr.num}"
            raise AlreadyExists(f"Model {model_id} already has an open PR check out {url}")

        operations, _model_type = convert_single(model_id, folder)
        if not operations:
            print(f"No files to convert for {model_id}")
            return None

        new_pr = api.create_commit(
            repo_id=model_id,
            operations=operations,
            commit_message=pr_title,
            commit_description=_build_pr_description(model_id),
            create_pr=True,
        )
        print(f"Pr created at {new_pr.pr_url}")
        return new_pr


if __name__ == "__main__":
    DESCRIPTION = """
    Simple utility tool to fix the `CLIPFeatureExtractor` deprecation warning of a diffusers model on the hub.
    It works by downloading `model_index.json`, replacing `CLIPFeatureExtractor` with `CLIPImageProcessor`
    locally, and uploading the updated file back as a PR on the hub.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument(
        "model_id",
        type=str,
        help="The name of the model on the hub to convert. E.g. `runwayml/stable-diffusion-v1-5`",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Create the PR even if it already exists or if the model was already converted.",
    )
    args = parser.parse_args()
    model_id = args.model_id
    api = HfApi()
    convert(api, model_id, force=args.force)