# Copyright (C) 2024 Charles O. Goddard
#
# This software is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.

import logging
import os
import shutil
from typing import Optional

import tqdm
import transformers

from mergekit.architecture import ArchitectureInfo, get_architecture_info
from mergekit.card import generate_card
from mergekit.config import MergeConfiguration
from mergekit.graph import Executor
from mergekit.io.tasks import LoaderCache
from mergekit.options import MergeOptions
from mergekit.plan import MergePlanner
from mergekit.tokenizer import TokenizerInfo


def run_merge(
merge_config: MergeConfiguration,
out_path: str,
options: MergeOptions,
config_source: Optional[str] = None,
):
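    """Merge the models described by ``merge_config`` and write the result to ``out_path``.

    Args:
        merge_config: Parsed merge configuration describing the input models and merge method.
        out_path: Directory to write the merged model, config, tokenizer, and model card to.
        options: Runtime options (CUDA usage, loader cache behavior, tokenizer copying, etc.).
        config_source: Optional raw YAML of the configuration, embedded in the model card and
            written to ``mergekit_config.yml``; regenerated from ``merge_config`` if omitted.
    """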
if options.random_seed is not None:
transformers.trainer_utils.set_seed(options.random_seed)
if not merge_config.models and not merge_config.slices:
raise RuntimeError("No output requested")
model_arch_info = [
get_architecture_info(m.config(trust_remote_code=options.trust_remote_code))
for m in merge_config.referenced_models()
]
if not options.allow_crimes:
if not all(a == model_arch_info[0] for a in model_arch_info[1:]):
raise RuntimeError(
"Must specify --allow-crimes to attempt to mix different architectures"
)
arch_info = model_arch_info[0]
# initialize loader cache and set options
loader_cache = LoaderCache()
loader_cache.setup(options=options)
# create config for output model
cfg_out = _model_out_config(
merge_config, arch_info, trust_remote_code=options.trust_remote_code
)
# warm up loader cache
for model in (
pbar := tqdm.tqdm(
merge_config.referenced_models(),
desc="Warmup loader cache",
disable=options.quiet,
)
):
loader_cache.get(model)
del pbar
logging.info("Planning operations")
targets = MergePlanner(
merge_config,
arch_info,
options=options,
out_model_config=cfg_out,
).plan_to_disk(out_path=out_path)
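    # execute the planned operations, doing the tensor math on GPU if requested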
    executor = Executor(
        tasks=targets,
        math_device="cuda" if options.cuda else "cpu",
        storage_device="cuda" if options.low_cpu_memory else "cpu",
    )
tokenizer = None
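    # capture the merged tokenizer if one of the executed tasks produced one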
    for _task, value in executor.run(quiet=options.quiet):
if isinstance(value, TokenizerInfo):
tokenizer = value.tokenizer
if tokenizer:
_update_config_vocab(cfg_out, tokenizer)
logging.info("Saving config")
cfg_out.save_pretrained(out_path)
if options.write_model_card:
if not config_source:
config_source = merge_config.to_yaml()
card_md = generate_card(
config=merge_config,
config_yaml=config_source,
name=os.path.basename(out_path),
)
with open(os.path.join(out_path, "README.md"), "w", encoding="utf-8") as fp:
fp.write(card_md)
with open(
os.path.join(out_path, "mergekit_config.yml"), "w", encoding="utf-8"
) as fp:
fp.write(config_source)
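    # no tokenizer was produced by the merge itself; copy one from an input model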
if tokenizer is None and options.copy_tokenizer:
try:
_copy_tokenizer(
merge_config, out_path, trust_remote_code=options.trust_remote_code
)
except Exception as e:
logging.error(
"Failed to copy tokenizer. The merge was still successful, just copy it from somewhere else.",
exc_info=e,
)
if tokenizer:
logging.info("Saving tokenizer")
tokenizer.save_pretrained(out_path, safe_serialization=True)


def _copy_tokenizer(
merge_config: MergeConfiguration, out_path: str, trust_remote_code: bool = False
):
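    """Copy tokenizer files from the base model (or the first referenced model) into ``out_path``.

    Falls back to loading the tokenizer with ``transformers.AutoTokenizer`` and re-saving it
    when the expected tokenizer files are not present on disk.
    """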
donor_model = merge_config.base_model or (merge_config.referenced_models()[0])
if os.path.exists(
os.path.join(donor_model.model.path, "tokenizer_config.json")
) and (
os.path.exists(os.path.join(donor_model.model.path, "tokenizer.json"))
or os.path.exists(os.path.join(donor_model.model.path, "tokenizer.model"))
):
logging.info(f"Copying tokenizer from {donor_model}")
for file_name in [
"tokenizer_config.json",
"special_tokens_map.json",
"tokenizer.json",
"tokenizer.model",
]:
if os.path.exists(os.path.join(donor_model.model.path, file_name)):
shutil.copy(
os.path.join(donor_model.model.path, file_name),
os.path.join(out_path, file_name),
)
return
# fallback: try actually loading the tokenizer and saving it
logging.info(f"Reserializing tokenizer from {donor_model}")
tokenizer = transformers.AutoTokenizer.from_pretrained(
donor_model.model.path,
revision=donor_model.model.revision,
trust_remote_code=trust_remote_code,
)
tokenizer.save_pretrained(out_path, safe_serialization=True)


def _model_out_config(
config: MergeConfiguration,
arch_info: ArchitectureInfo,
trust_remote_code: bool = False,
) -> transformers.PretrainedConfig:
"""Return a configuration for the resulting model."""
if config.base_model:
res = config.base_model.config(trust_remote_code=trust_remote_code)
else:
res = config.referenced_models()[0].config(trust_remote_code=trust_remote_code)
if config.out_dtype:
res.torch_dtype = config.out_dtype
elif config.dtype:
res.torch_dtype = config.dtype
if config.slices:
try:
num_layers = sum(
s.sources[0].layer_range[1] - s.sources[0].layer_range[0]
for s in config.slices
)
setattr(res, arch_info.num_layers_config_key(), num_layers)
except Exception as e:
logging.warning(
"Unable to set number of layers in output config - you may need to manually correct it.",
exc_info=e,
)
return res


def _update_config_vocab(
config: transformers.PretrainedConfig,
tokenizer: transformers.PreTrainedTokenizerBase,
):
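    """Set ``vocab_size`` on the output config to match the merged tokenizer."""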
try:
config.vocab_size = len(tokenizer.get_vocab())
except Exception as e:
logging.warning(
"Unable to set vocabulary size in output config - you may need to manually correct it.",
exc_info=e,
)


__all__ = ["MergeOptions", "run_merge"]
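
# Illustrative usage sketch (not part of mergekit itself; the library's command-line
# entry points normally drive run_merge). The YAML path and output directory below are
# hypothetical, and MergeConfiguration is assumed to be a pydantic v2 model exposing
# model_validate:
#
#   import yaml
#
#   with open("merge_config.yml", "r", encoding="utf-8") as fp:
#       config = MergeConfiguration.model_validate(yaml.safe_load(fp))
#   run_merge(config, out_path="./merged-model", options=MergeOptions())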