""" NER (Normalized Effective Rank) quantifies dimensional utilization across layers using entropy analysis of singular value distributions. NER calculation involves Singular Value Decomposition (SVD) of weight matrix A. Singular values form a probability distribution through normalization. Entropy H of this distribution yields the Effective Rank (ERank) as 2^H. Normalizing by maximum possible entropy H_max produces a value between 0 and 1, measuring dimensional utilization efficiency. Run the script with: python mastermerge.py --config mastermerge_config.yaml (optional) The script loads configuration, processes each model by downloading, loading weights, normalizing layers, calculating NER for each layer, using NER to identify the optimal layer, finally creating a composite model with the highest ner in each layer. **License** Use, modify, and distribute as you see fit. Good luck with that shit. Copyright 2024, nobody. No rights reserved. """ import torch import json import argparse import shutil from tqdm import tqdm import os import yaml from typing import Optional from datetime import datetime from torch.cuda.amp import autocast from huggingface_hub import snapshot_download from transformers import AutoModelForCausalLM from transformers import AutoConfig def download_model(model_name: str, models_dir: str) -> Optional[str]: """Download model from Hugging Face Hub.""" local_path = os.path.join(models_dir, model_name.replace("/", "_")) if not os.path.exists(local_path): print(f"Downloading {model_name} to {local_path}") try: snapshot_download( repo_id=model_name, local_dir=local_path, local_dir_use_symlinks=False, revision="main", ) print(f"Successfully downloaded {model_name}") except Exception as e: print(f"Error downloading {model_name}: {e}") return None else: print(f"Model {model_name} already exists at {local_path}") return local_path def load_model(model_path: str, device: str = "cuda") -> Optional[AutoModelForCausalLM]: """Load model from local path.""" try: return AutoModelForCausalLM.from_pretrained( model_path, torch_dtype=torch.bfloat16, low_cpu_mem_usage=True, trust_remote_code=True, device_map=device, ) except Exception as e: print(f"Error loading model: {e}") return None def calculate_normalized_effective_rank(A: torch.Tensor) -> float: """ "Calculate the Normalized Effective Rank (NER) of a matrix.""" try: # get the singular values if A.dtype != torch.float32: A = A.float() if A.dim() == 1: A = A.unsqueeze(0) if 1 in A.shape: S = A.abs().view(-1) else: S = torch.linalg.svdvals(A) S = S[S > 1e-12] if S.numel() == 0: return 0.0 # normalize the singular values S_sum = S.sum() S /= S_sum # calculate and return normalized effective rank log_S = torch.log2(S) H = -torch.dot(S, log_S) H_max = torch.log2( torch.tensor(float(S.numel()), dtype=torch.float32, device=S.device) ) return (H / H_max).item() if H_max > 0 else 0.0 except Exception as e: print(f"Error calculating NER: {e}") return 0.0 def normalize_tensor(A: torch.Tensor) -> torch.Tensor: """Normalize input tensor.""" A_min, A_max = A.min(), A.max() return (A - A_min) / max(A_max - A_min, 1e-10) def save_metrics_to_json(model_name: str, layer_metrics: dict, output_dir: str) -> None: model_name_slug = model_name.replace("/", "-").replace("_", "-") filename = os.path.join(output_dir, f"metrics_results_{model_name_slug}.json") with open(filename, "w") as f: json.dump(layer_metrics, f, indent=4) print(f"Metrics saved to {filename}") def load_config(config_path: str) -> dict: """Load configuration from YAML file.""" with open(config_path, "r") as file: return yaml.safe_load(file) def metric_file_exists(model_name: str, metric_dir: str) -> bool: """Check if metric file already exists for the given model.""" model_name_slug = model_name.replace("/", "-").replace("_", "-") filename = os.path.join(metric_dir, f"metrics_results_{model_name_slug}.json") return os.path.exists(filename) def load_all_metrics(config: dict) -> dict: """Load all metrics from the metric directory.""" all_metrics = {} for model_name in [config["base_model"]] + config["fine_tuned_models"]: model_name_slug = model_name.replace("/", "-").replace("_", "-") filename = os.path.join( config["metric_dir"], f"metrics_results_{model_name_slug}.json" ) with open(filename, "r") as f: all_metrics[model_name] = json.load(f) return all_metrics def identify_common_layers(all_metrics: dict) -> list: """Identify common layers across all models.""" layer_sets = [set(model_metrics.keys()) for model_metrics in all_metrics.values()] common_layers = set.intersection(*layer_sets) return list(common_layers) def identify_layers(all_metrics: dict) -> list: """Identify the superset of layers across all models, maintaining their relative order.""" superset_layers = [] added_layers = set() for model_metrics in all_metrics.values(): for layer in model_metrics.keys(): if layer not in added_layers: superset_layers.append(layer) added_layers.add(layer) return superset_layers def select_best_layers(common_layers: list, all_metrics: dict) -> dict: """Select best layers""" layer_selection = {} for layer in common_layers: best_model = max( all_metrics.keys(), key=lambda model: all_metrics[model][layer]["ner"] ) layer_selection[layer] = best_model print("Selected layers:") print(json.dumps(layer_selection, indent=4)) return layer_selection def save_composite_model( composite_model: AutoModelForCausalLM, layer_selection: dict, config: dict ) -> None: """Save composite model to the output directory.""" date_str = datetime.now().strftime("%Y%m%d_%H%M%S") output_name = f"composite_model_{date_str}" output_dir = os.path.join(config["output_dir"], output_name) os.makedirs(output_dir, exist_ok=True) composite_model.save_pretrained(output_dir) generate_merge_report(layer_selection, output_dir, config) # Copy tokenizer files from the base model to the output directory base_model_path = os.path.join( config["models_dir"], config["base_model"].replace("/", "_") ) tokenizer_files = ["tokenizer_config.json", "tokenizer.json", "vocab.json"] for file in tokenizer_files: src_path = os.path.join(base_model_path, file) dst_path = os.path.join(output_dir, file) if os.path.exists(src_path): shutil.copy2(src_path, dst_path) else: print(f"Warning: {file} not found in the base model directory.") print(f"Composite model and tokenizer files saved to: {output_dir}") def generate_merge_report(layer_selection: dict, output_dir, config: dict) -> None: """Generate merge report and save to the output directory.""" report = { "base_model": config["base_model"], "fine_tuned_models": config["fine_tuned_models"], "layer_selection": layer_selection, } report_file = os.path.join(output_dir, "merge_report.json") with open(report_file, "w") as f: json.dump(report, f, indent=4) print(f"Merge report saved to {report_file}") print(json.dumps(report, indent=4)) def create_composite_model( base_model_name: str, layer_selection: dict, config: dict ) -> AutoModelForCausalLM: """Create composite model by merging selected layers.""" models_dir = config["models_dir"] base_model_path = os.path.join(models_dir, base_model_name.replace("/", "_")) base_model = load_model(base_model_path) for layer_name, source_model_name in layer_selection.items(): print(f"Processing: {source_model_name} - {layer_name}") source_model_path = os.path.join( models_dir, source_model_name.replace("/", "_") ) source_model = load_model(source_model_path, device="cpu") layer_parts = layer_name.split(".") source_layer = source_model for part in layer_parts: source_layer = getattr(source_layer, part) source_layer = source_layer.to("cuda") target_layer = base_model for part in layer_parts[:-1]: target_layer = getattr(target_layer, part) setattr(target_layer, layer_parts[-1], source_layer) print("Added to layer to composite model") del source_model, source_layer, part, target_layer, layer_parts torch.cuda.empty_cache() return base_model def get_num_layers(model_path: str) -> int: """Dynamically determine the number of layers in the model.""" config = AutoConfig.from_pretrained(model_path) if hasattr(config, "num_hidden_layers"): return config.num_hidden_layers elif hasattr(config, "n_layer"): return config.n_layer else: raise ValueError("Could not determine the number of layers in the model.") def get_model_metrics(config: dict) -> None: """Get metrics for all models in the configuration.""" models_dir = config["models_dir"] os.makedirs(models_dir, exist_ok=True) os.makedirs(config["output_dir"], exist_ok=True) models = [config["base_model"]] + config["fine_tuned_models"] metrics = ["ner"] for model_name in models: if metric_file_exists(model_name, config["metric_dir"]): print(f"Metric file for {model_name} already exists. Skipping...") continue local_model_path = download_model(model_name, models_dir) if not local_model_path: print(f"Skipping failed model: {model_name}") continue layer_metrics = process_model(model_name, local_model_path, metrics, config) save_metrics_to_json(model_name, layer_metrics, config["metric_dir"]) @torch.inference_mode() def process_model( model_name: str, local_model_path: str, metrics: list, config: dict ) -> dict: """Process a single model to calculate and save metrics.""" print(f"Processing model: {model_name}") with autocast(enabled=True): model = load_model(local_model_path) if not model: print(f"Failed to load model: {model_name}") return all_layers, layer_names = collect_and_normalize_weights(model) del model torch.cuda.synchronize() torch.cuda.empty_cache() layer_metrics = calculate_metrics_for_layers(layer_names, all_layers, metrics) del all_layers torch.cuda.synchronize() torch.cuda.empty_cache() save_metrics_to_json(model_name, layer_metrics, config["metric_dir"]) return layer_metrics def collect_and_normalize_weights( model: AutoModelForCausalLM, ) -> tuple[list[torch.Tensor], list[str]]: """Collect and normalize all layers from the model (only normalize once).""" all_layers = [ module.weight.data for name, module in model.named_modules() if hasattr(module, "weight") ] for i, layer in enumerate(all_layers): # Normalize weights if layer.ndim < 2: layer = layer.unsqueeze(0) # Make it at least 2D layer = normalize_tensor(layer.to(torch.float32)) all_layers[i] = layer.to(torch.bfloat16) # Back to bfloat16 and original device layer_names = [ name for name, module in model.named_modules() if hasattr(module, "weight") ] return all_layers, layer_names def calculate_metrics_for_layers( layer_names: list[str], normalized_layers: list[torch.Tensor], metrics: list[str] ) -> dict: """Calculate metrics for each layer.""" layer_metrics = {} with torch.no_grad(): for idx, (name, normalized_layer) in enumerate( tqdm(zip(layer_names, normalized_layers), desc="Processing:") ): print(f" Layer: {name}") layer_metrics[name] = {} print(f"Layer {name} shape: {normalized_layer.shape}") for metric in metrics: print(f"Calculating {metric} for layer {name}") try: result = calculate_normalized_effective_rank(normalized_layer) except Exception as e: print(f"Error calculating {metric} for layer {name}: {e}") result = 0.0 layer_metrics[name][metric] = result print(f"{metric} for layer {name}: {result}") torch.cuda.empty_cache() return layer_metrics def normalize_metrics(metrics: dict) -> dict: """Normalize each metric to be between 0 and 1.""" normalized = {metric: [] for metric in next(iter(metrics.values())).keys()} for metric in normalized.keys(): values = [layer_metrics[metric] for layer_metrics in metrics.values()] min_val, max_val = min(values), max(values) normalized[metric] = [ 0 if max_val == min_val else (v - min_val) / (max_val - min_val) for v in values ] return normalized def merge_models(config: dict) -> None: """Merge models based on the given configuration.""" all_metrics = load_all_metrics(config) layers = identify_layers(all_metrics) layer_selection = select_best_layers(layers, all_metrics) layer_selection = dict(sorted(layer_selection.items())) composite_model = create_composite_model( config["base_model"], layer_selection, config ) save_composite_model(composite_model, layer_selection, config) def main(config_path: str) -> None: """Main function to run the model merging process.""" config = load_config(config_path) get_model_metrics(config) print("Metric calculation completed.") merge_models(config) print(f"Saved composite model and merge report to: {config['output_dir']}") if __name__ == "__main__": parser = argparse.ArgumentParser( description="mastermerge: Advanced model merging tool" ) parser.add_argument( "--config", type=str, default="mastermerge_config.yaml", help="Path to configuration file", ) args = parser.parse_args() main(args.config)