"""
NER (Normalized Effective Rank) quantifies dimensional utilization across
layers using entropy analysis of singular value distributions. To compute it,
take the Singular Value Decomposition (SVD) of a weight matrix A and normalize
the singular values so they form a probability distribution. The Shannon
entropy H of that distribution yields the Effective Rank (ERank) as 2^H;
normalizing H by the maximum possible entropy H_max = log2(n), where n is the
number of nonzero singular values, produces a value between 0 and 1 that
measures dimensional utilization efficiency.
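
For example (illustrative numbers): if the normalized singular values are
p = (0.5, 0.25, 0.25), then H = -(0.5*log2(0.5) + 2*0.25*log2(0.25)) = 1.5
bits, so ERank = 2^1.5 ≈ 2.83; with n = 3, H_max = log2(3) ≈ 1.585 and
NER = 1.5 / 1.585 ≈ 0.95.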

Run the script with:

    python mastermerge.py --config mastermerge_config.yaml

The --config flag is optional and defaults to mastermerge_config.yaml.

The script loads the configuration, then processes each model: it downloads
the model, loads the weights, normalizes each layer, and calculates NER per
layer. Finally, it creates a composite model by selecting, for each layer,
the version with the highest NER across all models.
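
A minimal example mastermerge_config.yaml (these are the keys the script
reads; the model names below are placeholders):

    base_model: "mistralai/Mistral-7B-v0.1"
    fine_tuned_models:
      - "org/fine-tune-a"
      - "org/fine-tune-b"
    models_dir: "models"
    metric_dir: "metrics"
    output_dir: "output"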

**License**

Use, modify, and distribute as you see fit. Good luck with that shit.

Copyright 2024, nobody. No rights reserved.
"""

import argparse
import json
import os
import shutil
from datetime import datetime
from typing import Optional

import torch
import yaml
from huggingface_hub import snapshot_download
from torch.cuda.amp import autocast
from tqdm import tqdm
from transformers import AutoConfig, AutoModelForCausalLM


def download_model(model_name: str, models_dir: str) -> Optional[str]:
    """Download model from Hugging Face Hub."""
    local_path = os.path.join(models_dir, model_name.replace("/", "_"))
    if not os.path.exists(local_path):
        print(f"Downloading {model_name} to {local_path}")
        try:
            snapshot_download(
                repo_id=model_name,
                local_dir=local_path,
                local_dir_use_symlinks=False,
                revision="main",
            )
            print(f"Successfully downloaded {model_name}")
        except Exception as e:
            print(f"Error downloading {model_name}: {e}")
            return None
    else:
        print(f"Model {model_name} already exists at {local_path}")
    return local_path


def load_model(
    model_path: str, device: str = "cuda"
) -> Optional[AutoModelForCausalLM]:
    """Load model from local path."""
    try:
        return AutoModelForCausalLM.from_pretrained(
            model_path,
            torch_dtype=torch.bfloat16,
            low_cpu_mem_usage=True,
            trust_remote_code=True,
            device_map=device,
        )
    except Exception as e:
        print(f"Error loading model: {e}")
        return None


def calculate_normalized_effective_rank(A: torch.Tensor) -> float:
    """Calculate the Normalized Effective Rank (NER) of a matrix."""
    try:
        if A.dtype != torch.float32:
            A = A.float()
        if A.dim() == 1:
            A = A.unsqueeze(0)
        if 1 in A.shape:
            # Degenerate (vector-like) case: use absolute values as the spectrum.
            S = A.abs().view(-1)
        else:
            S = torch.linalg.svdvals(A)
        S = S[S > 1e-12]
        if S.numel() == 0:
            return 0.0

        # Normalize singular values into a probability distribution.
        S /= S.sum()

        # Shannon entropy in bits; NER = H / H_max with H_max = log2(n).
        H = -torch.dot(S, torch.log2(S))
        H_max = torch.log2(
            torch.tensor(float(S.numel()), dtype=torch.float32, device=S.device)
        )
        return (H / H_max).item() if H_max > 0 else 0.0
    except Exception as e:
        print(f"Error calculating NER: {e}")
        return 0.0
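
# Example (illustrative): a dense random matrix uses its dimensions almost
# fully, while a rank-1 matrix collapses toward zero:
#   calculate_normalized_effective_rank(torch.randn(64, 64))  # close to 1.0
#   v = torch.randn(64)
#   calculate_normalized_effective_rank(torch.outer(v, v))    # near 0.0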


def normalize_tensor(A: torch.Tensor) -> torch.Tensor:
    """Min-max normalize a tensor to the range [0, 1]."""
    A_min, A_max = A.min(), A.max()
    # clamp_min guards against division by zero for constant tensors.
    return (A - A_min) / (A_max - A_min).clamp_min(1e-10)
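
# Example: normalize_tensor(torch.tensor([2.0, 4.0, 6.0]))
# -> tensor([0.0, 0.5, 1.0])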


def save_metrics_to_json(model_name: str, layer_metrics: dict, output_dir: str) -> None:
    """Save per-layer metrics to a JSON file."""
    model_name_slug = model_name.replace("/", "-").replace("_", "-")
    filename = os.path.join(output_dir, f"metrics_results_{model_name_slug}.json")
    with open(filename, "w") as f:
        json.dump(layer_metrics, f, indent=4)
    print(f"Metrics saved to {filename}")


def load_config(config_path: str) -> dict:
    """Load configuration from YAML file."""
    with open(config_path, "r") as file:
        return yaml.safe_load(file)


def metric_file_exists(model_name: str, metric_dir: str) -> bool:
    """Check if metric file already exists for the given model."""
    model_name_slug = model_name.replace("/", "-").replace("_", "-")
    filename = os.path.join(metric_dir, f"metrics_results_{model_name_slug}.json")
    return os.path.exists(filename)


def load_all_metrics(config: dict) -> dict:
    """Load all metrics from the metric directory."""
    all_metrics = {}
    for model_name in [config["base_model"]] + config["fine_tuned_models"]:
        model_name_slug = model_name.replace("/", "-").replace("_", "-")
        filename = os.path.join(
            config["metric_dir"], f"metrics_results_{model_name_slug}.json"
        )
        with open(filename, "r") as f:
            all_metrics[model_name] = json.load(f)
    return all_metrics
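
# The returned dict maps model name -> {layer_name: {"ner": float}}, mirroring
# the JSON files written by save_metrics_to_json.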


def identify_common_layers(all_metrics: dict) -> list:
    """Identify common layers across all models."""
    layer_sets = [set(model_metrics.keys()) for model_metrics in all_metrics.values()]
    common_layers = set.intersection(*layer_sets)
    return list(common_layers)


def identify_layers(all_metrics: dict) -> list:
    """Identify the superset of layers across all models, maintaining their relative order."""
    superset_layers = []
    added_layers = set()
    for model_metrics in all_metrics.values():
        for layer in model_metrics.keys():
            if layer not in added_layers:
                superset_layers.append(layer)
                added_layers.add(layer)
    return superset_layers


def select_best_layers(layers: list, all_metrics: dict) -> dict:
    """For each layer, select the model with the highest NER."""
    layer_selection = {}
    for layer in layers:
        # Only consider models that actually have this layer, since `layers`
        # may be a superset that not every model contains.
        candidates = [model for model in all_metrics if layer in all_metrics[model]]
        best_model = max(
            candidates, key=lambda model: all_metrics[model][layer]["ner"]
        )
        layer_selection[layer] = best_model

    print("Selected layers:")
    print(json.dumps(layer_selection, indent=4))
    return layer_selection
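
# Example (illustrative): given
#   all_metrics = {"base": {"l1": {"ner": 0.4}}, "ft": {"l1": {"ner": 0.7}}}
# select_best_layers(["l1"], all_metrics) returns {"l1": "ft"}.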


def save_composite_model(
    composite_model: AutoModelForCausalLM, layer_selection: dict, config: dict
) -> None:
    """Save composite model to the output directory."""
    date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_name = f"composite_model_{date_str}"
    output_dir = os.path.join(config["output_dir"], output_name)
    os.makedirs(output_dir, exist_ok=True)
    composite_model.save_pretrained(output_dir)
    generate_merge_report(layer_selection, output_dir, config)

    base_model_path = os.path.join(
        config["models_dir"], config["base_model"].replace("/", "_")
    )
    tokenizer_files = ["tokenizer_config.json", "tokenizer.json", "vocab.json"]
    for file in tokenizer_files:
        src_path = os.path.join(base_model_path, file)
        dst_path = os.path.join(output_dir, file)
        if os.path.exists(src_path):
            shutil.copy2(src_path, dst_path)
        else:
            print(f"Warning: {file} not found in the base model directory.")

    print(f"Composite model and tokenizer files saved to: {output_dir}")


def generate_merge_report(layer_selection: dict, output_dir: str, config: dict) -> None:
    """Generate merge report and save to the output directory."""
    report = {
        "base_model": config["base_model"],
        "fine_tuned_models": config["fine_tuned_models"],
        "layer_selection": layer_selection,
    }
    report_file = os.path.join(output_dir, "merge_report.json")
    with open(report_file, "w") as f:
        json.dump(report, f, indent=4)
    print(f"Merge report saved to {report_file}")
    print(json.dumps(report, indent=4))


def create_composite_model(
    base_model_name: str, layer_selection: dict, config: dict
) -> AutoModelForCausalLM:
    """Create composite model by merging selected layers."""
    models_dir = config["models_dir"]
    base_model_path = os.path.join(models_dir, base_model_name.replace("/", "_"))
    base_model = load_model(base_model_path)

    for layer_name, source_model_name in layer_selection.items():
        print(f"Processing: {source_model_name} - {layer_name}")
        source_model_path = os.path.join(
            models_dir, source_model_name.replace("/", "_")
        )
        source_model = load_model(source_model_path, device="cpu")

        # Walk the dotted module path to the source layer, then graft it onto
        # the same path in the base model.
        layer_parts = layer_name.split(".")
        source_layer = source_model
        for part in layer_parts:
            source_layer = getattr(source_layer, part)
        source_layer = source_layer.to("cuda")

        target_layer = base_model
        for part in layer_parts[:-1]:
            target_layer = getattr(target_layer, part)
        setattr(target_layer, layer_parts[-1], source_layer)

        print("Added layer to composite model")
        del source_model, source_layer, target_layer
        torch.cuda.empty_cache()

    return base_model
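
# Note: layer names come from named_modules(), so paths look like
# "model.layers.10.self_attn.q_proj" for LLaMA-style checkpoints (the exact
# format depends on the architecture).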


def get_num_layers(model_path: str) -> int:
    """Dynamically determine the number of layers in the model."""
    config = AutoConfig.from_pretrained(model_path)
    if hasattr(config, "num_hidden_layers"):
        return config.num_hidden_layers
    elif hasattr(config, "n_layer"):
        return config.n_layer
    else:
        raise ValueError("Could not determine the number of layers in the model.")


def get_model_metrics(config: dict) -> None:
    """Get metrics for all models in the configuration."""
    models_dir = config["models_dir"]
    os.makedirs(models_dir, exist_ok=True)
    os.makedirs(config["output_dir"], exist_ok=True)
    os.makedirs(config["metric_dir"], exist_ok=True)
    models = [config["base_model"]] + config["fine_tuned_models"]
    metrics = ["ner"]

    for model_name in models:
        if metric_file_exists(model_name, config["metric_dir"]):
            print(f"Metric file for {model_name} already exists. Skipping...")
            continue

        local_model_path = download_model(model_name, models_dir)
        if not local_model_path:
            print(f"Skipping failed model: {model_name}")
            continue

        # process_model saves the metrics itself, so no second save is needed.
        process_model(model_name, local_model_path, metrics, config)


@torch.inference_mode()
def process_model(
    model_name: str, local_model_path: str, metrics: list, config: dict
) -> dict:
    """Process a single model to calculate and save metrics."""
    print(f"Processing model: {model_name}")
    with autocast(enabled=True):
        model = load_model(local_model_path)
        if not model:
            print(f"Failed to load model: {model_name}")
            return {}

        all_layers, layer_names = collect_and_normalize_weights(model)
        del model
        torch.cuda.synchronize()
        torch.cuda.empty_cache()

        layer_metrics = calculate_metrics_for_layers(layer_names, all_layers, metrics)
        del all_layers
        torch.cuda.synchronize()
        torch.cuda.empty_cache()

        save_metrics_to_json(model_name, layer_metrics, config["metric_dir"])
        return layer_metrics


def collect_and_normalize_weights(
    model: AutoModelForCausalLM,
) -> tuple[list[torch.Tensor], list[str]]:
    """Collect and normalize all weight tensors from the model (normalize once)."""
    all_layers: list[torch.Tensor] = []
    layer_names: list[str] = []
    for name, module in model.named_modules():
        # Skip modules whose weight attribute exists but is None
        # (e.g. LayerNorm with elementwise_affine=False).
        weight = getattr(module, "weight", None)
        if weight is None:
            continue
        layer = weight.data
        if layer.ndim < 2:
            layer = layer.unsqueeze(0)
        layer = normalize_tensor(layer.to(torch.float32))
        all_layers.append(layer.to(torch.bfloat16))
        layer_names.append(name)
    return all_layers, layer_names


def calculate_metrics_for_layers(
    layer_names: list[str], normalized_layers: list[torch.Tensor], metrics: list[str]
) -> dict:
    """Calculate metrics for each layer."""
    layer_metrics = {}
    with torch.no_grad():
        for name, normalized_layer in tqdm(
            zip(layer_names, normalized_layers), desc="Processing:"
        ):
            print(f"Layer {name} shape: {normalized_layer.shape}")
            layer_metrics[name] = {}
            for metric in metrics:
                print(f"Calculating {metric} for layer {name}")
                try:
                    result = calculate_normalized_effective_rank(normalized_layer)
                except Exception as e:
                    print(f"Error calculating {metric} for layer {name}: {e}")
                    result = 0.0
                layer_metrics[name][metric] = result
                print(f"{metric} for layer {name}: {result}")

            torch.cuda.empty_cache()
    return layer_metrics


def normalize_metrics(metrics: dict) -> dict:
    """Normalize each metric to be between 0 and 1."""
    normalized = {metric: [] for metric in next(iter(metrics.values())).keys()}
    for metric in normalized.keys():
        values = [layer_metrics[metric] for layer_metrics in metrics.values()]
        min_val, max_val = min(values), max(values)
        normalized[metric] = [
            0 if max_val == min_val else (v - min_val) / (max_val - min_val)
            for v in values
        ]
    return normalized
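
# Example: normalize_metrics({"a": {"ner": 0.2}, "b": {"ner": 0.6}})
# -> {"ner": [0.0, 1.0]}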


def merge_models(config: dict) -> None:
    """Merge models based on the given configuration."""
    all_metrics = load_all_metrics(config)
    layers = identify_layers(all_metrics)
    layer_selection = select_best_layers(layers, all_metrics)
    layer_selection = dict(sorted(layer_selection.items()))
    composite_model = create_composite_model(
        config["base_model"], layer_selection, config
    )
    save_composite_model(composite_model, layer_selection, config)


def main(config_path: str) -> None:
    """Main function to run the model merging process."""
    config = load_config(config_path)

    get_model_metrics(config)
    print("Metric calculation completed.")

    merge_models(config)
    print(f"Saved composite model and merge report to: {config['output_dir']}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="mastermerge: Advanced model merging tool"
    )
    parser.add_argument(
        "--config",
        type=str,
        default="mastermerge_config.yaml",
        help="Path to configuration file",
    )
    args = parser.parse_args()
    main(args.config)