jeffmeloy's picture
Update ner_merge.py
b391c98 verified
raw
history blame
14.7 kB
"""
NER (Normalized Effective Rank) quantifies dimensional utilization across layers using entropy analysis of singular value distributions. NER calculation involves Singular Value Decomposition (SVD) of weight matrix A. Singular values form a probability distribution through normalization. Entropy H of this distribution yields the Effective Rank (ERank) as 2^H. Normalizing by maximum possible entropy H_max produces a value between 0 and 1, measuring dimensional utilization efficiency.
Run the script with:
python mastermerge.py --config mastermerge_config.yaml
(the --config argument is optional and defaults to mastermerge_config.yaml)
The script loads the configuration and then processes each model: it downloads the model, loads its weights, normalizes each layer, and calculates the NER of each layer. It then uses NER to pick, for every layer, the model with the highest NER, and finally creates a composite model from the selected layers.
**License**
Use, modify, and distribute as you see fit. Good luck with that shit.
Copyright 2024, nobody. No rights reserved.
"""
import torch
import json
import argparse
import shutil
from tqdm import tqdm
import os
import yaml
from typing import Optional
from datetime import datetime
from torch.cuda.amp import autocast
from huggingface_hub import snapshot_download
from transformers import AutoModelForCausalLM
from transformers import AutoConfig
def download_model(model_name: str, models_dir: str) -> Optional[str]:
    """Fetch a model snapshot from the Hugging Face Hub unless it is cached.

    Args:
        model_name: Hub repository id (for example ``"org/model"``).
        models_dir: Directory under which model folders are stored.

    Returns:
        The local model directory (``models_dir`` joined with the repository
        id, ``/`` replaced by ``_``), or ``None`` when the download raised.
    """
    target_dir = os.path.join(models_dir, model_name.replace("/", "_"))

    # Any existing path counts as "already downloaded"; its content is not
    # inspected.
    if os.path.exists(target_dir):
        print(f"Model {model_name} already exists at {target_dir}")
        return target_dir

    print(f"Downloading {model_name} to {target_dir}")
    try:
        snapshot_download(
            repo_id=model_name,
            local_dir=target_dir,
            local_dir_use_symlinks=False,
            revision="main",
        )
    except Exception as e:
        print(f"Error downloading {model_name}: {e}")
        return None
    else:
        print(f"Successfully downloaded {model_name}")
    return target_dir
def load_model(model_path: str, device: str = "cuda") -> Optional[AutoModelForCausalLM]:
    """Load a causal language model from a local directory.

    Args:
        model_path: Directory holding the model files.
        device: Value forwarded as ``device_map`` (defaults to ``"cuda"``).

    Returns:
        The loaded model, or ``None`` if loading raised any exception (the
        error is printed, not re-raised).
    """
    try:
        # NOTE(review): trust_remote_code=True lets the checkpoint ship its own
        # modelling code -- confirm every configured model is trusted.
        load_kwargs = {
            "torch_dtype": torch.bfloat16,
            "low_cpu_mem_usage": True,
            "trust_remote_code": True,
            "device_map": device,
        }
        model = AutoModelForCausalLM.from_pretrained(model_path, **load_kwargs)
    except Exception as e:
        print(f"Error loading model: {e}")
        return None
    return model
def calculate_normalized_effective_rank(A: torch.Tensor) -> float:
    """Calculate the Normalized Effective Rank (NER) of a matrix.

    The spectrum of ``A`` is normalized into a probability distribution and
    its base-2 entropy ``H`` is divided by the maximum entropy
    ``log2(number of retained values)``.

    Args:
        A: Weight tensor. 1-D input is treated as a single row. When any
            dimension has size 1, the absolute values of the entries are used
            in place of singular values; otherwise ``torch.linalg.svdvals``
            is used.

    Returns:
        The entropy ratio ``H / H_max``. Returns ``0.0`` when no value exceeds
        ``1e-12``, when only one value remains (``H_max == 0``), or when the
        computation raises (the error is printed).
    """
    try:
        # Work in float32 regardless of the storage dtype of the weights.
        if A.dtype != torch.float32:
            A = A.float()
        if A.dim() == 1:
            A = A.unsqueeze(0)
        if 1 in A.shape:
            # reshape (not view) so that non-contiguous inputs are flattened
            # instead of raising and being reported as NER 0.0.
            S = A.abs().reshape(-1)
        else:
            S = torch.linalg.svdvals(A)

        # Drop (near-)zero values: they carry no probability mass and would
        # produce log2(0).
        S = S[S > 1e-12]
        if S.numel() == 0:
            return 0.0

        # Normalize the retained values into a probability distribution.
        S = S / S.sum()

        # Entropy of the distribution and its maximum for this many values.
        H = -torch.dot(S, torch.log2(S))
        H_max = torch.log2(
            torch.tensor(float(S.numel()), dtype=torch.float32, device=S.device)
        )
        return (H / H_max).item() if H_max > 0 else 0.0
    except Exception as e:
        print(f"Error calculating NER: {e}")
        return 0.0
def normalize_tensor(A: torch.Tensor) -> torch.Tensor:
    """Min-max scale ``A`` so that its smallest entry maps to 0.

    The value range is floored at ``1e-10`` so a constant tensor yields zeros
    instead of dividing by zero.
    """
    lowest = A.min()
    value_range = A.max() - lowest
    denominator = max(value_range, 1e-10)
    shifted = A - lowest
    return shifted / denominator
def save_metrics_to_json(model_name: str, layer_metrics: dict, output_dir: str) -> None:
    """Write the per-layer metrics of one model to a JSON file.

    The file is named ``metrics_results_<slug>.json`` where ``<slug>`` is
    ``model_name`` with ``/`` and ``_`` replaced by ``-``.

    Args:
        model_name: Model identifier used to derive the file name.
        layer_metrics: JSON-serializable metrics to store.
        output_dir: Directory receiving the file; created if it is missing.
    """
    model_name_slug = model_name.replace("/", "-").replace("_", "-")
    # Create the directory first so that freshly computed metrics are not
    # lost to a FileNotFoundError when the directory does not exist yet.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    filename = os.path.join(output_dir, f"metrics_results_{model_name_slug}.json")
    with open(filename, "w") as f:
        json.dump(layer_metrics, f, indent=4)
    print(f"Metrics saved to {filename}")
def load_config(config_path: str) -> dict:
    """Read the YAML file at ``config_path`` and return its parsed content."""
    with open(config_path, "r") as handle:
        parsed = yaml.safe_load(handle)
    return parsed
def metric_file_exists(model_name: str, metric_dir: str) -> bool:
    """Tell whether the metrics JSON for ``model_name`` is present in ``metric_dir``."""
    # Both "/" and "_" become "-" in the file name.
    slug = model_name.translate(str.maketrans("/_", "--"))
    candidate = os.path.join(metric_dir, f"metrics_results_{slug}.json")
    return os.path.exists(candidate)
def load_all_metrics(config: dict) -> dict:
    """Read the stored metrics of the base model and every fine-tuned model.

    Args:
        config: Needs ``base_model``, ``fine_tuned_models`` and ``metric_dir``.

    Returns:
        Mapping of model name to the parsed content of its metrics file, base
        model first.
    """
    metric_dir = config["metric_dir"]
    model_names = [config["base_model"]] + config["fine_tuned_models"]

    metrics_by_model = {}
    for name in model_names:
        slug = name.replace("/", "-").replace("_", "-")
        path = os.path.join(metric_dir, f"metrics_results_{slug}.json")
        with open(path, "r") as handle:
            metrics_by_model[name] = json.load(handle)
    return metrics_by_model
def identify_common_layers(all_metrics: dict) -> list:
    """Identify the layers that are present in every model.

    Args:
        all_metrics: Mapping of model name to its per-layer metrics dict.

    Returns:
        Layer names shared by all models, in the order in which they appear
        in the first model. An empty mapping yields an empty list.
    """
    if not all_metrics:
        # set.intersection() with no operands would raise TypeError.
        return []
    per_model = list(all_metrics.values())
    first, rest = per_model[0], per_model[1:]
    shared = set(first).intersection(*rest)
    # Walk the first model's layers so the result order is deterministic.
    return [layer for layer in first if layer in shared]
def identify_layers(all_metrics: dict) -> list:
    """Return every layer name seen in any model, ordered by first appearance."""
    # dict.fromkeys keeps the first occurrence of each key in insertion order,
    # which gives an order-preserving de-duplication.
    seen_in_order = dict.fromkeys(
        layer
        for per_model in all_metrics.values()
        for layer in per_model.keys()
    )
    return list(seen_in_order)
def select_best_layers(common_layers: list, all_metrics: dict) -> dict:
    """Select, for each layer, the model with the highest ``ner`` value.

    Args:
        common_layers: Layer names to decide on. They need not exist in every
            model; only models that contain the layer are compared.
        all_metrics: Mapping of model name to ``{layer: {"ner": value}}``.

    Returns:
        Mapping of layer name to the chosen model name. Ties go to the model
        that comes first in ``all_metrics``. A layer that no model contains is
        skipped with a warning.
    """
    layer_selection = {}
    for layer in common_layers:
        # Comparing only models that have the layer avoids a KeyError when
        # the layer list is a superset across models.
        candidates = [
            model
            for model, model_metrics in all_metrics.items()
            if model_metrics and layer in model_metrics
        ]
        if not candidates:
            print(f"Warning: no metrics found for layer {layer}. Skipping...")
            continue
        layer_selection[layer] = max(
            candidates, key=lambda model: all_metrics[model][layer]["ner"]
        )
    print("Selected layers:")
    print(json.dumps(layer_selection, indent=4))
    return layer_selection
def save_composite_model(
    composite_model: AutoModelForCausalLM, layer_selection: dict, config: dict
) -> None:
    """Write the composite model, its merge report and tokenizer files to disk.

    A new ``composite_model_<YYYYmmdd_HHMMSS>`` directory is created below
    ``config["output_dir"]``.

    Args:
        composite_model: Model to save with ``save_pretrained``.
        layer_selection: Layer-to-model mapping recorded in the merge report.
        config: Needs ``output_dir``, ``models_dir`` and ``base_model`` (plus
            whatever ``generate_merge_report`` reads).
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(config["output_dir"], f"composite_model_{timestamp}")
    os.makedirs(output_dir, exist_ok=True)

    composite_model.save_pretrained(output_dir)
    generate_merge_report(layer_selection, output_dir, config)

    # The tokenizer is taken from the base model's download directory.
    # NOTE(review): only these three file names are copied; other tokenizer
    # artifacts a model may ship are not -- confirm this is sufficient.
    base_model_path = os.path.join(
        config["models_dir"], config["base_model"].replace("/", "_")
    )
    for file in ("tokenizer_config.json", "tokenizer.json", "vocab.json"):
        src_path = os.path.join(base_model_path, file)
        if not os.path.exists(src_path):
            print(f"Warning: {file} not found in the base model directory.")
            continue
        shutil.copy2(src_path, os.path.join(output_dir, file))

    print(f"Composite model and tokenizer files saved to: {output_dir}")
def generate_merge_report(layer_selection: dict, output_dir, config: dict) -> None:
    """Write ``merge_report.json`` into ``output_dir`` and echo it to stdout.

    The report holds the base model, the fine-tuned models and the
    layer-to-model selection.
    """
    report = dict(
        base_model=config["base_model"],
        fine_tuned_models=config["fine_tuned_models"],
        layer_selection=layer_selection,
    )
    report_file = os.path.join(output_dir, "merge_report.json")
    with open(report_file, "w") as handle:
        json.dump(report, handle, indent=4)
    print(f"Merge report saved to {report_file}")
    print(json.dumps(report, indent=4))
def create_composite_model(
    base_model_name: str, layer_selection: dict, config: dict
) -> AutoModelForCausalLM:
    """Create the composite model by swapping selected layers into the base model.

    Each source model is loaded from disk once (on CPU) and all layers
    selected from it are transplanted before it is released. Layers selected
    from the base model itself are already in place and are left untouched.

    NOTE(review): processing layers grouped by source model assumes the
    selected layers are independent of one another (no selected layer is a
    sub-module of another selected layer) -- confirm for unusual architectures.

    Args:
        base_model_name: Name of the model that receives the layers.
        layer_selection: Mapping of dotted module path to source model name.
        config: Needs ``models_dir``.

    Returns:
        The base model with the selected sub-modules replaced.

    Raises:
        RuntimeError: If the base model or a source model cannot be loaded.
    """
    models_dir = config["models_dir"]
    base_model_path = os.path.join(models_dir, base_model_name.replace("/", "_"))
    base_model = load_model(base_model_path)
    if base_model is None:
        raise RuntimeError(f"Failed to load base model from {base_model_path}")

    # Group the layers by source so each checkpoint is read from disk once
    # rather than once per layer.
    layers_by_source = {}
    for layer_name, source_model_name in layer_selection.items():
        layers_by_source.setdefault(source_model_name, []).append(layer_name)

    for source_model_name, layer_names in layers_by_source.items():
        if source_model_name == base_model_name:
            # These layers already live in the base model; nothing to copy.
            for layer_name in layer_names:
                print(f"Processing: {source_model_name} - {layer_name}")
            continue

        source_model_path = os.path.join(
            models_dir, source_model_name.replace("/", "_")
        )
        source_model = load_model(source_model_path, device="cpu")
        if source_model is None:
            raise RuntimeError(
                f"Failed to load source model from {source_model_path}"
            )

        for layer_name in layer_names:
            print(f"Processing: {source_model_name} - {layer_name}")
            layer_parts = layer_name.split(".")

            # Walk the dotted path down to the sub-module in the source model.
            source_layer = source_model
            for part in layer_parts:
                source_layer = getattr(source_layer, part)

            # Walk to the parent in the base model and replace the child.
            target_parent = base_model
            for part in layer_parts[:-1]:
                target_parent = getattr(target_parent, part)
            setattr(target_parent, layer_parts[-1], source_layer.to("cuda"))
            print("Added to layer to composite model")

        # Release the source model before the next one is loaded.
        del source_model
        torch.cuda.empty_cache()
    return base_model
def get_num_layers(model_path: str) -> int:
    """Read the layer count from the model configuration at ``model_path``.

    ``num_hidden_layers`` is preferred, with ``n_layer`` as the fallback.

    Raises:
        ValueError: If the configuration exposes neither attribute.
    """
    model_config = AutoConfig.from_pretrained(model_path)
    for attribute in ("num_hidden_layers", "n_layer"):
        if hasattr(model_config, attribute):
            return getattr(model_config, attribute)
    raise ValueError("Could not determine the number of layers in the model.")
def get_model_metrics(config: dict) -> None:
    """Compute and store metrics for every model in the configuration.

    Models whose metrics file already exists are skipped, as are models that
    fail to download or whose processing returns no metrics.

    Args:
        config: Needs ``models_dir``, ``output_dir``, ``metric_dir``,
            ``base_model`` and ``fine_tuned_models``.
    """
    models_dir = config["models_dir"]
    metric_dir = config["metric_dir"]
    os.makedirs(models_dir, exist_ok=True)
    os.makedirs(config["output_dir"], exist_ok=True)
    # Metrics are written into metric_dir, so it must exist before the first
    # (expensive) model pass finishes.
    if metric_dir:
        os.makedirs(metric_dir, exist_ok=True)

    models = [config["base_model"]] + config["fine_tuned_models"]
    metrics = ["ner"]
    for model_name in models:
        if metric_file_exists(model_name, metric_dir):
            print(f"Metric file for {model_name} already exists. Skipping...")
            continue
        local_model_path = download_model(model_name, models_dir)
        if not local_model_path:
            print(f"Skipping failed model: {model_name}")
            continue
        layer_metrics = process_model(model_name, local_model_path, metrics, config)
        if layer_metrics is None:
            # process_model returns None when the model could not be loaded.
            # Saving that would write "null" and make later runs treat the
            # model as already processed.
            print(f"Skipping failed model: {model_name}")
            continue
        save_metrics_to_json(model_name, layer_metrics, metric_dir)
@torch.inference_mode()
def process_model(
    model_name: str, local_model_path: str, metrics: list, config: dict
) -> Optional[dict]:
    """Process a single model to calculate and save metrics.

    Args:
        model_name: Model identifier, used for messages and the metrics file.
        local_model_path: Directory the model is loaded from.
        metrics: Metric names forwarded to ``calculate_metrics_for_layers``.
        config: Needs ``metric_dir``.

    Returns:
        The per-layer metrics, or ``None`` when the model could not be loaded.
    """
    print(f"Processing model: {model_name}")
    # NOTE(review): torch.cuda.amp.autocast is reported as deprecated by
    # recent PyTorch releases -- confirm before upgrading.
    with autocast(enabled=True):
        model = load_model(local_model_path)
        if model is None:
            print(f"Failed to load model: {model_name}")
            return None

        all_layers, layer_names = collect_and_normalize_weights(model)
        # The normalized copies are all that is needed from here on; drop the
        # model so its GPU memory can be reclaimed.
        del model
        torch.cuda.synchronize()
        torch.cuda.empty_cache()

        layer_metrics = calculate_metrics_for_layers(layer_names, all_layers, metrics)
        del all_layers
        torch.cuda.synchronize()
        torch.cuda.empty_cache()

        save_metrics_to_json(model_name, layer_metrics, config["metric_dir"])
    return layer_metrics
def collect_and_normalize_weights(
    model: AutoModelForCausalLM,
) -> tuple[list[torch.Tensor], list[str]]:
    """Collect and normalize the weight of every module that has one.

    Modules are visited once via ``named_modules()``, so the returned names
    and tensors always line up. Modules whose ``weight`` is missing, is not a
    tensor (for example ``None``) or is empty are skipped.

    Args:
        model: Model whose sub-module weights are collected.

    Returns:
        ``(all_layers, layer_names)``: the normalized weights (at least 2-D,
        stored as bfloat16) and the matching module names.
    """
    all_layers = []
    layer_names = []
    for name, module in model.named_modules():
        weight = getattr(module, "weight", None)
        # A module can expose a ``weight`` attribute that is None; reading
        # ``.data`` from it would raise.
        if not isinstance(weight, torch.Tensor) or weight.numel() == 0:
            continue
        layer = weight.data
        if layer.ndim < 2:
            layer = layer.unsqueeze(0)  # Make it at least 2D
        # Normalize in float32, then store as bfloat16.
        layer = normalize_tensor(layer.to(torch.float32))
        all_layers.append(layer.to(torch.bfloat16))
        layer_names.append(name)
    return all_layers, layer_names
def calculate_metrics_for_layers(
    layer_names: list[str], normalized_layers: list[torch.Tensor], metrics: list[str]
) -> dict:
    """Calculate metrics for each layer.

    Args:
        layer_names: Module names, parallel to ``normalized_layers``.
        normalized_layers: Normalized weight tensors.
        metrics: Metric names used as result keys. NOTE(review): every metric
            name is currently computed with the normalized effective rank.

    Returns:
        ``{layer_name: {metric: value}}``; a metric whose calculation raises
        is recorded as ``0.0``.
    """
    layer_metrics = {}
    # zip() has no length, so give tqdm the total to display real progress.
    total = min(len(layer_names), len(normalized_layers))
    with torch.no_grad():
        for name, normalized_layer in tqdm(
            zip(layer_names, normalized_layers), desc="Processing:", total=total
        ):
            print(f" Layer: {name}")
            layer_metrics[name] = {}
            print(f"Layer {name} shape: {normalized_layer.shape}")
            for metric in metrics:
                print(f"Calculating {metric} for layer {name}")
                try:
                    result = calculate_normalized_effective_rank(normalized_layer)
                except Exception as e:
                    print(f"Error calculating {metric} for layer {name}: {e}")
                    result = 0.0
                layer_metrics[name][metric] = result
                print(f"{metric} for layer {name}: {result}")
            torch.cuda.empty_cache()
    return layer_metrics
def normalize_metrics(metrics: dict) -> dict:
    """Normalize each metric to be between 0 and 1.

    Args:
        metrics: Mapping of layer name to ``{metric: value}``. The metric
            names are taken from the first entry.

    Returns:
        Mapping of metric name to the min-max scaled values, in the iteration
        order of ``metrics``. When all values of a metric are equal, every
        entry is ``0``. An empty ``metrics`` yields an empty dict.
    """
    if not metrics:
        # Without any entry there is no first item to read metric names from.
        return {}
    normalized = {}
    for metric in next(iter(metrics.values())):
        values = [layer_metrics[metric] for layer_metrics in metrics.values()]
        min_val, max_val = min(values), max(values)
        normalized[metric] = [
            0 if max_val == min_val else (v - min_val) / (max_val - min_val)
            for v in values
        ]
    return normalized
def merge_models(config: dict) -> None:
    """Build and save the composite model described by ``config``.

    Loads the stored metrics, picks a source model per layer, assembles the
    composite model on top of ``config["base_model"]`` and saves it.
    """
    metrics_by_model = load_all_metrics(config)
    candidate_layers = identify_layers(metrics_by_model)
    chosen = select_best_layers(candidate_layers, metrics_by_model)
    # Order the selection by layer name before it is applied and reported.
    ordered_selection = {layer: chosen[layer] for layer in sorted(chosen)}
    merged = create_composite_model(config["base_model"], ordered_selection, config)
    save_composite_model(merged, ordered_selection, config)
def main(config_path: str) -> None:
    """Run metric calculation followed by the merge for the given config file."""
    settings = load_config(config_path)

    get_model_metrics(settings)
    print("Metric calculation completed.")

    merge_models(settings)
    print(f"Saved composite model and merge report to: {settings['output_dir']}")
if __name__ == "__main__":
    # Command-line entry point: the only option is the configuration path.
    cli = argparse.ArgumentParser(description="mastermerge: Advanced model merging tool")
    cli.add_argument(
        "--config",
        type=str,
        default="mastermerge_config.yaml",
        help="Path to configuration file",
    )
    main(cli.parse_args().config)