# mergekit/evo/strategy.py
# Copyright (C) 2024 Charles O. Goddard
#
# This software is free software: you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This software is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import asyncio
import logging
import os
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Union
import lm_eval.tasks
import numpy as np
import ray
import ray.util.queue
import ray.util.scheduling_strategies
import torch
from mergekit.evo.actors import InMemoryMergeEvaluator, OnDiskMergeEvaluator
from mergekit.evo.config import EvolMergeConfiguration
from mergekit.evo.genome import ModelGenome
from mergekit.evo.helpers import evaluate_model_ray, merge_model_ray
from mergekit.options import MergeOptions
class EvaluationStrategyBase(ABC):
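    """Common configuration and interface for genotype evaluation strategies.

    Stores the evolutionary merge configuration, model genome, merge options,
    number of GPUs to use (defaulting to all visible CUDA devices), an
    lm-eval TaskManager built from ``task_search_path``, and an optional
    directory for persisting merged models.
    """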
def __init__(
self,
config: EvolMergeConfiguration,
genome: ModelGenome,
merge_options: MergeOptions,
num_gpus: Optional[int] = None,
batch_size: Optional[int] = None,
task_search_path: Union[str, List[str], None] = None,
model_storage_path: Optional[str] = None,
):
self.config = config
self.genome = genome
self.merge_options = merge_options
self.num_gpus = num_gpus or torch.cuda.device_count()
self.batch_size = batch_size
self.task_manager = lm_eval.tasks.TaskManager(include_path=task_search_path)
self.model_storage_path = model_storage_path
if self.model_storage_path:
os.makedirs(self.model_storage_path, exist_ok=True)
@abstractmethod
def evaluate_genotypes(self, genotypes: List[np.ndarray]) -> List[dict]:
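        """Evaluate a batch of genotypes, returning one result dict per genotype."""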
pass
@abstractmethod
def evaluate_genotype(self, genotype: np.ndarray) -> dict:
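        """Evaluate a single genotype and return its result dict."""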
pass
class ActorPoolEvaluationStrategy(EvaluationStrategyBase):
"""
Uses a fixed-size pool of actors to evaluate genotypes in parallel.
"""
def __init__(
self,
*args,
in_memory: bool = False,
vllm: bool = False,
**kwargs,
):
super().__init__(*args, **kwargs)
if in_memory:
self.actor_cls = InMemoryMergeEvaluator
else:
self.actor_cls = OnDiskMergeEvaluator
self.actor_pool = ray.util.ActorPool(
[
self.actor_cls.remote(
self.config,
self.genome,
self.merge_options,
model_storage_path=self.model_storage_path,
vllm=vllm,
batch_size=self.batch_size,
task_manager=self.task_manager,
)
for _ in range(self.num_gpus)
]
)
def evaluate_genotypes(self, genotypes: List[np.ndarray]) -> List[dict]:
return list(
self.actor_pool.map(
lambda a, x: a.evaluate_genotype.remote(x),
genotypes,
)
)
def evaluate_genotype(self, genotype: np.ndarray) -> dict:
return self.evaluate_genotypes([genotype])[0]
@ray.remote
class BufferedRayEvaluationStrategyActor:
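    """Ray actor that buffers evaluation requests and pipelines the work.

    Genotypes submitted via ``evaluate_genotype`` are queued; ``process_queue``
    keeps up to ``num_gpus`` merges and evaluations in flight at a time so that
    merging the next model overlaps with evaluating already-merged ones.
    """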
def __init__(
self,
config: EvolMergeConfiguration,
genome: ModelGenome,
merge_options: MergeOptions,
vllm: bool = False,
num_gpus: Optional[int] = None,
batch_size: Optional[int] = None,
task_manager: Optional[lm_eval.tasks.TaskManager] = None,
model_storage_path: Optional[str] = None,
):
self.config = config
self.genome = genome
self.merge_options = merge_options
self.vllm = vllm
self.num_gpus = num_gpus or torch.cuda.device_count()
self.input_queue = []
self.batch_size = batch_size
self.task_manager = task_manager
self.model_storage_path = model_storage_path
self._shutdown = False
async def evaluate_genotype(self, genotype: np.ndarray):
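        """Queue a genotype for processing and wait for its evaluation result."""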
future_result = asyncio.Future()
self.input_queue.append((genotype, future_result))
return await future_result
async def process_queue(self):
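        """Scheduling loop: launch merges, promote finished merges to
        evaluation, and resolve the waiting futures as results arrive."""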
merging: Dict[ray.ObjectRef, asyncio.Future] = {}
merged: List[Tuple[asyncio.Future, ray.ObjectRef]] = []
evaluating: Dict[ray.ObjectRef, asyncio.Future] = {}
logging.info("Starting processing loop")
try:
while not self._shutdown:
while self.input_queue and (len(merging) + len(merged) < self.num_gpus):
genotype, future_result = self.input_queue.pop(0)
merging[
merge_model_ray.remote(
genotype,
self.genome,
self.model_storage_path,
self.merge_options,
)
] = future_result
while merged and len(evaluating) < self.num_gpus:
future_result, merged_path = merged.pop()
evaluating[
evaluate_model_ray.remote(
merged_path,
self.config.tasks,
num_fewshot=self.config.num_fewshot,
limit=self.config.limit,
vllm=self.vllm,
batch_size=self.batch_size,
task_manager=self.task_manager,
)
] = future_result
ready, _ = ray.wait(
list(merging.keys()) + list(evaluating.keys()),
num_returns=1,
fetch_local=False,
timeout=1,
)
for r in ready:
if r in merging:
future_result = merging.pop(r)
merged.append((future_result, r))
elif r in evaluating:
future_result = evaluating.pop(r)
future_result.set_result(await r)
if (
not self.input_queue
and not merging
and not merged
and not evaluating
):
await asyncio.sleep(1)
except Exception as e:
logging.error("Error in processing loop", exc_info=e)
raise
async def shutdown(self):
self._shutdown = True
class BufferedRayEvaluationStrategy(EvaluationStrategyBase):
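    """Evaluation strategy backed by a single buffered scheduling actor.

    Requests are forwarded to a ``BufferedRayEvaluationStrategyActor``, which
    overlaps model merging with evaluation across the available GPUs.
    """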
def __init__(
self,
*args,
vllm: bool = False,
in_memory: bool = False,
**kwargs,
):
if in_memory:
raise ValueError("In-memory evaluation is not supported for buffered mode")
super().__init__(*args, **kwargs)
self.actor = BufferedRayEvaluationStrategyActor.options(
max_concurrency=1000
).remote(
self.config,
self.genome,
self.merge_options,
model_storage_path=self.model_storage_path,
vllm=vllm,
            num_gpus=self.num_gpus,
            batch_size=self.batch_size,
task_manager=self.task_manager,
)
self.actor.process_queue.remote()
def evaluate_genotypes(self, genotypes: List[np.ndarray]) -> List[dict]:
return ray.get([self.actor.evaluate_genotype.remote(x) for x in genotypes])
def evaluate_genotype(self, genotype: np.ndarray) -> dict:
return ray.get(self.actor.evaluate_genotype.remote(genotype))
@ray.remote
def evaluate_genotype_serial(
genotype: np.ndarray,
config: EvolMergeConfiguration,
genome: ModelGenome,
merge_options: MergeOptions,
model_storage_path: Optional[str] = None,
vllm: bool = False,
batch_size: Optional[int] = None,
task_manager: Optional[lm_eval.tasks.TaskManager] = None,
):
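    """Merge and evaluate a single genotype on a dedicated CPU+GPU bundle.

    A STRICT_PACK placement group pins the merge and the subsequent evaluation
    to the same resources; the group is released once the result is available.
    """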
pg = ray.util.placement_group([{"CPU": 1, "GPU": 1}], strategy="STRICT_PACK")
strat = ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy(
placement_group=pg
)
    # Resolve the merge result here so a failed merge (falsy return) can be
    # detected before scheduling the evaluation task.
    merged_path = ray.get(
        merge_model_ray.options(scheduling_strategy=strat).remote(
            genotype, genome, model_storage_path, merge_options
        )
    )
    if not merged_path:
        ray.util.remove_placement_group(pg)
        return {"score": None, "results": None}
res = ray.get(
evaluate_model_ray.options(scheduling_strategy=strat).remote(
merged_path,
config.tasks,
num_fewshot=config.num_fewshot,
limit=config.limit,
vllm=vllm,
batch_size=batch_size,
task_manager=task_manager,
)
)
ray.util.remove_placement_group(pg)
return res
class SerialEvaluationStrategy(EvaluationStrategyBase):
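    """Evaluation strategy that handles each genotype in its own Ray task.

    Each genotype gets its own placement group; its merge and evaluation run
    back to back on the same GPU, while separate genotypes may still run in
    parallel when resources allow.
    """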
def __init__(
self,
*args,
vllm: bool = False,
in_memory: bool = False,
**kwargs,
):
self.vllm = vllm
if in_memory:
raise ValueError("In-memory evaluation is not supported for serial mode")
super().__init__(*args, **kwargs)
def evaluate_genotypes(self, genotypes: List[np.ndarray]) -> List[dict]:
return ray.get(
[
evaluate_genotype_serial.remote(
x,
self.config,
self.genome,
self.merge_options,
model_storage_path=self.model_storage_path,
vllm=self.vllm,
batch_size=self.batch_size,
task_manager=self.task_manager,
)
for x in genotypes
]
)
def evaluate_genotype(self, genotype: np.ndarray) -> dict:
return self.evaluate_genotypes([genotype])[0]
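# Example usage (illustrative sketch only; the concrete EvolMergeConfiguration,
# ModelGenome, and MergeOptions values below are assumptions, not defined in
# this module):
#
#   import numpy as np
#   import ray
#
#   ray.init()
#   strategy = ActorPoolEvaluationStrategy(
#       config,                    # EvolMergeConfiguration
#       genome,                    # ModelGenome
#       MergeOptions(),
#       num_gpus=2,
#       in_memory=False,
#       vllm=False,
#   )
#   genotype = np.zeros(...)       # placeholder; shape is dictated by the genome
#   results = strategy.evaluate_genotypes([genotype])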