|
|
|
|
|
|
|
"""Implements a Hugging Causal LM wrapped inside a :class:`.ComposerModel`.""" |
|
|
|
import logging
import os
import warnings
from copy import deepcopy
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    Optional,
    Tuple,
    Union,
)

import numpy as np
import torch
import torch.nn as nn
from composer.models.huggingface import HuggingFaceModel, peft_installed
from composer.utils import dist
from torchmetrics import Metric
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    PretrainedConfig,
    PreTrainedModel,
    PreTrainedTokenizer,
    PreTrainedTokenizerBase,
    PreTrainedTokenizerFast,
)

from llmfoundry.models.hf.hf_fsdp import hf_get_init_device
from llmfoundry.models.layers.attention import is_flash_v2_installed
from llmfoundry.models.utils import init_empty_weights
from llmfoundry.utils.config_utils import get_hf_config_value

from compose_rl.reward_learning.utils import (
    SequenceClassifierOutput,
    prepare_hf_sequence_classification_model_for_fsdp,
)
|
|
|
if TYPE_CHECKING: |
|
from peft import PeftModel |
|
|
|
__all__ = ['ComposerHFSequenceClassification'] |
|
|
|
log = logging.getLogger(__name__) |
|
|
|
|
|
Tokenizer = Union[PreTrainedTokenizer, PreTrainedTokenizerFast] |
|
|
|
|
|
def layer_init(layer: nn.Module, std: float = np.sqrt(2), bias_const: float = 0.0):
    torch.nn.init.normal_(layer.weight, std=std)
    torch.nn.init.constant_(layer.bias, val=bias_const)
    return layer
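
# Illustrative usage (the sizes below are placeholders): the value head further down calls
# this helper with a small std so that initial reward scores start near zero.
#
#     probe = layer_init(nn.Linear(768, 1), std=1 / np.sqrt(768 + 1))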
|
|
|
|
|
class RewardModelConfig(PretrainedConfig): |
|
model_type = "pairwise_rm" |
|
|
|
    def __init__(
        self,
        base_model: str = "meta-llama/Meta-Llama-3-70B-Instruct",
        base_config: Optional[PretrainedConfig] = None,
        p_dropout: float = 0.0,
        n_labels: int = 1,
        bias: float = 0.0,
        return_logits: bool = False,
        pretrain_cfg: Optional[Dict[str, Any]] = None,
        pretrained: bool = False,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.base_model = base_model
        # Resolve the backbone config lazily so that constructing a default config does not
        # hit the Hugging Face Hub at import time.
        if base_config is None:
            base_config = AutoConfig.from_pretrained(base_model)
        self.base_config = base_config
        temp_config = deepcopy(base_config)
        if not isinstance(base_config, dict):
            temp_config = base_config.__dict__
        # Mirror the backbone config's fields (except its name and architectures) onto this
        # wrapper config so they can be read directly from it.
        for key, value in temp_config.items():
            if key not in ["_name_or_path", "architectures"]:
                setattr(self, key, value)
        self.p_dropout = p_dropout
        self.n_labels = n_labels
        self.bias = bias
        self.return_logits = return_logits
        self.pretrain_cfg = pretrain_cfg if pretrain_cfg is not None else {}
        self.pretrained = pretrained
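
# Illustrative sketch (not executed anywhere in this module; the checkpoint name is a
# placeholder and assumes Hub access): the wrapper config mirrors the backbone config's
# fields, so values such as `hidden_size` can be read directly from it.
#
#     base = AutoConfig.from_pretrained("facebook/opt-125m")
#     cfg = RewardModelConfig(base_model="facebook/opt-125m", base_config=base, n_labels=1)
#     assert cfg.hidden_size == base.hidden_size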
|
|
|
|
|
class ValueHead(nn.Module): |
|
|
|
def __init__(self, config: RewardModelConfig): |
|
super().__init__() |
|
self.dense = nn.Linear(config.hidden_size, config.hidden_size) |
|
self.dropout = nn.Dropout(config.p_dropout) |
|
        # Use the small-std initialization from `layer_init` for the score projection so
        # that initial reward scores start near zero.
        self.score = layer_init(
            nn.Linear(config.hidden_size, config.n_labels),
            std=1 / np.sqrt(config.hidden_size + 1),
        )
|
|
|
def forward(self, hidden_states: torch.Tensor, **kwargs: Any): |
|
hidden_states = self.dropout(hidden_states) |
|
hidden_states = self.dense(hidden_states) |
|
hidden_states = torch.tanh(hidden_states) |
|
hidden_states = self.dropout(hidden_states) |
|
output = self.score(hidden_states) |
|
return output |
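
# Shape sketch (illustrative; the sizes below are placeholders): for hidden states of shape
# (batch, seq_len, hidden_size) the head returns (batch, seq_len, n_labels), which the
# reward model squeezes to per-token scores when n_labels == 1.
#
#     head = ValueHead(cfg)                   # cfg.hidden_size == 768, cfg.n_labels == 1
#     scores = head(torch.randn(2, 16, 768))  # -> shape (2, 16, 1)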
|
|
|
|
|
class AutoModelForCausalLMWithRM(PreTrainedModel): |
|
config_class = RewardModelConfig |
|
|
|
def __init__(self, config: RewardModelConfig): |
|
super().__init__(config) |
|
self.config = config |
|
pretrain_cfg = config.pretrain_cfg |
|
pretrained = config.pretrained |
|
if pretrained: |
|
self.lm_backbone = AutoModelForCausalLM.from_pretrained( |
|
config.base_model, |
|
config=config.base_config, |
|
**pretrain_cfg, |
|
) |
|
else: |
|
|
|
if isinstance(config.base_config, dict): |
|
config.base_config = AutoConfig.from_pretrained(config.base_model, **config.base_config) |
|
self.lm_backbone = AutoModelForCausalLM.from_config( |
|
config.base_config, |
|
trust_remote_code=True, |
|
) |
|
self.value_head = ValueHead(config) |
|
|
|
    def generate(self, *args: Any, **kwargs: Any):
        # Forward positional arguments to the backbone as well, instead of dropping them.
        return self.lm_backbone.generate(*args, **kwargs)
|
|
|
    def resize_token_embeddings(
        self,
        new_num_tokens: Optional[int] = None,
        pad_to_multiple_of: Optional[int] = None,
    ) -> nn.Embedding:
        model_embeds = super().resize_token_embeddings(
            new_num_tokens=new_num_tokens,
            pad_to_multiple_of=pad_to_multiple_of,
        )
        # Keep the stored backbone config in sync with the (possibly padded) embedding size.
        self.config.base_config.vocab_size = model_embeds.weight.shape[0]
        return model_embeds
|
|
|
def set_input_embeddings(self, new_embeddings): |
|
return self.lm_backbone.set_input_embeddings(new_embeddings) |
|
|
|
def get_input_embeddings(self): |
|
return self.lm_backbone.get_input_embeddings() |
|
|
|
def set_output_embeddings(self, new_embeddings): |
|
return self.lm_backbone.set_output_embeddings(new_embeddings) |
|
|
|
def get_output_embeddings(self): |
|
return self.lm_backbone.get_output_embeddings() |
|
|
|
def forward( |
|
self, |
|
        input_ids: Optional[torch.LongTensor] = None,
|
attention_mask: Optional[torch.Tensor] = None, |
|
position_ids: Optional[torch.LongTensor] = None, |
|
past_key_values: Optional[Any] = None, |
|
inputs_embeds: Optional[torch.FloatTensor] = None, |
|
labels: Optional[torch.LongTensor] = None, |
|
use_cache: Optional[bool] = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
cache_position: Optional[torch.LongTensor] = None, |
|
**kwargs: Any, |
|
): |
|
output = self.lm_backbone( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
position_ids=position_ids, |
|
past_key_values=past_key_values, |
|
inputs_embeds=inputs_embeds, |
|
labels=labels, |
|
use_cache=use_cache, |
|
output_attentions=output_attentions, |
|
            output_hidden_states=True,  # The value head always needs the hidden states.
            return_dict=True,
|
cache_position=cache_position, |
|
) |
|
        # Score every position from the final hidden states, then subtract the score bias.
        scores = self.value_head(
            output.hidden_states[-1],
        ).squeeze(-1) - self.config.bias
|
|
|
logits = None |
|
if self.config.return_logits: |
|
logits = output.logits |
|
|
|
return SequenceClassifierOutput( |
|
loss=output.loss, |
|
scores=scores, |
|
logits=logits, |
|
past_key_values=output.past_key_values, |
|
hidden_states=output.hidden_states, |
|
attentions=output.attentions, |
|
) |
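
# Usage sketch (illustrative; `tokenizer` and the last-token pooling convention below are
# assumptions, not part of this module): `forward` returns per-token `scores`, and callers
# commonly take the score at the final non-padding position of a right-padded batch as the
# sequence-level reward.
#
#     batch = tokenizer(["prompt ... response"], return_tensors="pt", padding=True)
#     out = model(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])
#     last_idx = batch["attention_mask"].sum(dim=-1) - 1
#     reward = out.scores.gather(1, last_idx.unsqueeze(-1)).squeeze(-1)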
|
|
|
|
|
class ComposerHFSequenceClassification(HuggingFaceModel): |
|
|
|
"""Configures a :class:`.HuggingFaceModel` around a Causal LM. |
|
|
|
Args: |
|
pretrained_model_name_or_path (str): The name of or local path to |
|
the HF Causal LM (e.g., `gpt2` to instantiate a GPT2LMHeadModel). |
|
        config_overrides (dict, optional): An optional dictionary of keyword
            arguments that override the default configuration associated with
            ``pretrained_model_name_or_path``.
        pretrained (bool): Whether to instantiate the model with pre-trained
            weights coming from ``pretrained_model_name_or_path``. If ``True``,
            ``config_overrides`` must be compatible with the pre-trained weights.
        init_device ('cpu' | 'meta'): Which device, 'cpu' or 'meta', to
            initialize the model on. Currently, ``meta`` is only supported when
            ``pretrained`` is ``False``. Default: ``'cpu'``.
|
peft_config (dict, optional): An optional dictionary of keyword arguments to be |
|
passed to the PeftConfig constructor. If provided, the model will be wrapped in a PeftModel. |
|
trust_remote_code (bool, optional): Whether to trust remote code when loading from Hugging Face |
|
Hub. Default: ``True``. |
|
use_auth_token (bool, optional): Whether to use the Hugging Face authentication token when |
|
loading from Hugging Face Hub. Default: ``False``. |
|
use_train_metrics (bool, optional): Whether to use training metrics. Default: ``True``. |
|
load_in_8bit (bool, optional): Whether to load the model in 8-bit mode. Default: ``False``. |
|
|
use_flash_attention_2 (bool, optional): Whether to use flash-attention 2. Default: ``False``. |
|
        tokenizer (PreTrainedTokenizer): The tokenizer that the model will use.
        additional_train_metrics (list, optional): Additional metric names to compute
            during training. Default: ``None``.
        additional_eval_metrics (list, optional): Additional metric names to compute
            during evaluation. Default: ``None``.
        return_lm_logits (bool, optional): Whether the wrapped model also returns the
            language-modeling logits alongside the reward scores. Default: ``False``.
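
    Example (illustrative sketch; the checkpoint name and tokenizer are placeholders, and a
    cached Hugging Face auth token is assumed since the loader passes ``token=True``)::

        from transformers import AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained("gpt2")
        model = ComposerHFSequenceClassification(
            tokenizer=tokenizer,
            pretrained_model_name_or_path="gpt2",
        )
        # The result is a ComposerModel and can be passed directly to a composer Trainer.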
|
""" |
|
|
|
def __init__( |
|
self, |
|
tokenizer: PreTrainedTokenizerBase, |
|
pretrained_model_name_or_path: str, |
|
pretrained: bool = True, |
|
pretrained_lora_id_or_path: Optional[str] = None, |
|
trust_remote_code: bool = True, |
|
use_auth_token: bool = False, |
|
use_flash_attention_2: bool = False, |
|
load_in_8bit: bool = False, |
|
init_device: str = 'cpu', |
|
config_overrides: Optional[Dict[str, Any]] = None, |
|
peft_config: Optional[Dict[str, Any]] = None, |
|
use_train_metrics: bool = True, |
|
additional_train_metrics: Optional[List] = None, |
|
additional_eval_metrics: Optional[List] = None, |
|
return_lm_logits: Optional[bool] = False, |
|
): |
|
|
|
config_overrides = config_overrides or {} |
|
|
|
model = ComposerHFSequenceClassification.build_inner_model( |
|
pretrained_model_name_or_path=pretrained_model_name_or_path, |
|
pretrained_lora_id_or_path=pretrained_lora_id_or_path, |
|
trust_remote_code=trust_remote_code, |
|
init_device=init_device, |
|
use_flash_attention_2=use_flash_attention_2, |
|
use_auth_token=use_auth_token, |
|
config_overrides=config_overrides, |
|
load_in_8bit=load_in_8bit, |
|
pretrained=pretrained, |
|
prepare_for_fsdp=True, |
|
return_lm_logits=return_lm_logits, |
|
) |
|
|
|
train_metrics, eval_metrics = ComposerHFSequenceClassification.build_metrics( |
|
use_train_metrics=use_train_metrics, |
|
additional_train_metrics=additional_train_metrics, |
|
additional_eval_metrics=additional_eval_metrics, |
|
) |
|
|
|
        if peft_config is not None and not peft_installed:
            raise ValueError(
                "A peft_config was provided, but the peft package is not installed. "
                + "Please install it to use PEFT with this model.",
            )
|
|
|
peft_config_object = None |
|
if peft_config is not None: |
|
peft_config_object = self._get_peft_config(peft_config) |
|
|
|
|
|
super().__init__( |
|
model=model, |
|
shift_labels=True, |
|
tokenizer=tokenizer, |
|
metrics=train_metrics, |
|
eval_metrics=eval_metrics, |
|
peft_config=peft_config_object, |
|
allow_embedding_resizing=True, |
|
) |
|
|
|
|
|
        # Mark the config as not pretrained so that re-creating the model from this config
        # (e.g. when resuming from a Composer checkpoint) does not re-download the backbone.
        self.model.config.pretrained = False
|
|
|
@staticmethod |
|
def build_metrics( |
|
use_train_metrics: bool, |
|
additional_train_metrics: Optional[List[str]] = None, |
|
additional_eval_metrics: Optional[List[str]] = None, |
|
) -> Tuple[List[Metric], List[Metric]]: |
|
"""Builds the training and evaluation metrics for the model. |
|
|
|
Args: |
|
use_train_metrics (bool): Whether to use training metrics. |
|
additional_train_metrics (Optional[List[str]]): Additional training metrics to include. |
|
additional_eval_metrics (Optional[List[str]]): Additional evaluation metrics to include. |
|
|
|
Returns: |
|
Tuple[List[Metric], List[Metric]]: A tuple containing the list of training metrics and evaluation metrics. |
|
""" |
|
from llmfoundry.utils.builders import build_metric |
|
train_metric_names = additional_train_metrics if additional_train_metrics is not None else [] |
|
eval_metric_names = additional_eval_metrics if additional_eval_metrics is not None else [] |
|
train_metrics = [ |
|
build_metric(metric, {}) for metric in train_metric_names |
|
] if use_train_metrics else [] |
|
eval_metrics = [ |
|
build_metric(metric, {}) for metric in eval_metric_names |
|
] |
|
return train_metrics, eval_metrics |
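
    # Usage sketch (illustrative; the metric name is an assumption and must be one that
    # llm-foundry's `build_metric` knows how to construct):
    #
    #     train_metrics, eval_metrics = ComposerHFSequenceClassification.build_metrics(
    #         use_train_metrics=True,
    #         additional_train_metrics=["LanguageCrossEntropy"],
    #     )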
|
|
|
@staticmethod |
|
def build_inner_model( |
|
pretrained_model_name_or_path: str, |
|
pretrained_lora_id_or_path: Optional[str], |
|
trust_remote_code: bool, |
|
init_device: str, |
|
use_flash_attention_2: bool, |
|
use_auth_token: bool, |
|
config_overrides: Dict[str, Any], |
|
load_in_8bit: bool, |
|
pretrained: bool, |
|
prepare_for_fsdp: bool = False, |
|
return_lm_logits: bool = False, |
|
) -> Union[PreTrainedModel, 'PeftModel']: |
|
"""Builds the inner model for the ComposerHFCausalLM. |
|
|
|
Args: |
|
pretrained_model_name_or_path (str): The pretrained model name or path. |
|
pretrained_lora_id_or_path (Optional[str]): The pretrained LORA ID or path. |
|
trust_remote_code (bool): Whether to trust remote code. |
|
init_device (str): The initialization device. |
|
use_flash_attention_2 (bool): Whether to use flash attention 2. |
|
use_auth_token (bool): Whether to use an authentication token. |
|
config_overrides (Dict[str, Any]): The configuration overrides. |
|
load_in_8bit (bool): Whether to load in 8-bit. |
|
prepare_for_fsdp (bool, optional): Whether to prepare the model for FSDP wrapping. Default: False. |
|
|
|
Returns: |
|
Union[PreTrainedModel, 'PeftModel']: The built inner model. |
|
prepare_for_fsdp (bool): Whether to prepare the model for FSDP wrapping. Default: ``False``. |
|
""" |
|
if not trust_remote_code and pretrained_model_name_or_path.startswith( |
|
'mosaicml/mpt', |
|
): |
|
            raise ValueError(
                'trust_remote_code must be set to True for MPT models. Without this, the MPT model code will come from the transformers library, '
                +
                'which is significantly slower and not compatible with the LLM Foundry training code, rather than the code released by MosaicML.',
            )
|
|
|
resolved_init_device = hf_get_init_device(init_device) |
|
requested_attention_implementation = 'flash_attention_2' if use_flash_attention_2 else 'eager' |
|
|
|
if use_flash_attention_2 and not is_flash_v2_installed(): |
|
raise ValueError( |
|
'use_flash_attention_2 is set to True, but flash-attention 2 is not installed. ' |
|
+ 'Please `pip install llm-foundry[gpu]`.', |
|
) |
|
|
|
|
|
        base_config = AutoConfig.from_pretrained(
            pretrained_model_name_or_path,
            trust_remote_code=trust_remote_code,
            token=True,
            attn_implementation=requested_attention_implementation,
            use_cache=False,  # The generation KV cache is not needed during training.
        )
|
|
|
config = RewardModelConfig( |
|
base_model=pretrained_model_name_or_path, |
|
base_config=base_config, |
|
hidden_size=base_config.hidden_size, |
|
torch_dtype=base_config.torch_dtype, |
|
return_logits=return_lm_logits, |
|
vocab_size=base_config.vocab_size, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
        # Hugging Face's `_autoset_attn_implementation` check can alter or reject the
        # requested attention implementation (e.g. it requires fp16/bf16 for flash
        # attention), so patch it to always use the implementation requested above for
        # the models constructed below.
        def _autoset_attn_implementation_monkeypatch(
            cls,
            config,
            *args,
            **kwargs,
        ):
            config._attn_implementation = requested_attention_implementation
            return config

        PreTrainedModel._autoset_attn_implementation = classmethod(
            _autoset_attn_implementation_monkeypatch,
        )
|
|
|
|
|
for k, v in config_overrides.items(): |
|
if not hasattr(config, k): |
|
raise ValueError( |
|
f'config does not have attribute "{k}" to override ({k}: {v}).', |
|
) |
|
|
|
attr = getattr(config, k) |
|
|
|
if isinstance(attr, Mapping): |
|
extra_keys = [_k for _k in v.keys() if _k not in attr.keys()] |
|
if extra_keys: |
|
raise ValueError( |
|
f'Config dict override got unknown keys. ' + |
|
f'Extra keys: {extra_keys}. ' + |
|
f'Expected (a subset of) keys: {list(attr.keys())}.', |
|
) |
|
getattr(config, k).update(v) |
|
|
|
elif attr is None and isinstance(v, Mapping): |
|
setattr(config, k, {}) |
|
getattr(config, k).update(v) |
|
elif isinstance(attr, PretrainedConfig): |
|
if not isinstance(v, Mapping): |
|
raise ValueError( |
|
f'Expected a dictionary for config override {k}, but got {v}.', |
|
) |
|
|
|
for _k, _v in v.items(): |
|
if not hasattr(attr, _k): |
|
raise ValueError( |
|
f'config does not have attribute "{_k}" to override ({k}: {_k}: {_v}).', |
|
) |
|
setattr(attr, _k, _v) |
|
else: |
|
setattr(config, k, v) |
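
        # Override shapes handled above (illustrative; the keys are assumptions and must
        # already exist on the backbone config): plain values replace the attribute, while
        # nested mappings are merged key-by-key.
        #
        #     config_overrides = {
        #         "attention_dropout": 0.1,
        #         "rope_scaling": {"type": "linear", "factor": 2.0},
        #     }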
|
|
|
if hasattr(config, 'attn_config') and get_hf_config_value( |
|
config.attn_config, |
|
'seq_parallel_world_size', |
|
) is not None: |
|
raise NotImplementedError( |
|
'Sequence Parallelism is not supported for HuggingFace models.', |
|
) |
|
|
|
|
|
|
|
        # With init_device='mixed', only local rank 0 loads the pretrained weights; the
        # other ranks build the architecture and receive the weights when FSDP shards them.
        if dist.get_local_rank() != 0 and init_device == 'mixed':
            pretrained = False
|
|
|
|
|
|
|
|
|
|
|
        # Instantiate the model once (with empty weights) on local rank 0 first so that any
        # remote-code modules and cached files are set up before the other ranks construct
        # the model; doing this concurrently on every rank can cause contention.
        if dist.get_local_rank() == 0:
|
if os.path.isdir(pretrained_model_name_or_path): |
|
with init_empty_weights(include_buffers=False): |
|
with warnings.catch_warnings(): |
|
warnings.simplefilter('ignore', UserWarning) |
|
AutoModelForCausalLM.from_pretrained( |
|
pretrained_model_name_or_path, |
|
trust_remote_code=trust_remote_code, |
|
token=True, |
|
config=base_config, |
|
) |
|
else: |
|
with init_empty_weights(include_buffers=False): |
|
AutoModelForCausalLM.from_config( |
|
base_config, |
|
trust_remote_code=trust_remote_code, |
|
) |
|
|
|
dist.barrier() |
|
|
|
|
|
        config.pretrained = pretrained
        if resolved_init_device == 'cpu':
            if pretrained:
                config.pretrain_cfg = {
                    "trust_remote_code": trust_remote_code,
                    "token": True,
                    "load_in_8bit": load_in_8bit,
                }
            else:
                config.pretrain_cfg = {
                    "trust_remote_code": trust_remote_code,
                }
            model = AutoModelForCausalLMWithRM(config)
|
elif resolved_init_device == 'meta': |
|
if pretrained: |
|
raise ValueError( |
|
'Setting cfg.pretrained=True is not supported when init_device="meta".', |
|
) |
|
with init_empty_weights(include_buffers=False): |
|
config.pretrain_cfg = { |
|
"trust_remote_code": trust_remote_code, |
|
} |
|
model = AutoModelForCausalLMWithRM(config) |
|
else: |
|
raise ValueError( |
|
f'init_device="{init_device}" must be either "cpu" or "meta".', |
|
) |
|
|
|
        # Use a per-node signal file so that non-zero local ranks wait for local rank 0 to
        # finish setting up the model and cache above before proceeding.
        signal_file_path = f'.node_{dist.get_node_rank()}_local_rank0_completed'
|
if dist.get_local_rank() == 0: |
|
with open(signal_file_path, 'wb') as f: |
|
f.write(b'local_rank0_completed_download') |
|
|
|
|
|
|
|
with dist.local_rank_zero_download_and_wait(signal_file_path): |
|
|
|
dist.barrier() |
|
|
|
if dist.get_local_rank() == 0: |
|
os.remove(signal_file_path) |
|
|
|
|
|
|
|
        # Weight tying does not survive meta-device initialization, so retie here.
        if model.config.tie_word_embeddings and resolved_init_device == 'meta':
            model.tie_weights()
|
|
|
        if pretrained_lora_id_or_path is not None:
            # TODO: loading pretrained LoRA / PEFT weights is not yet supported here.
            raise NotImplementedError(
                "Loading pretrained LoRA weights (pretrained_lora_id_or_path) is not supported.",
            )
|
|
|
        if prepare_for_fsdp:
            # Tag the backbone and value head with the FSDP wrapping metadata Composer expects.
            prepare_hf_sequence_classification_model_for_fsdp(model, init_device)

        # Support meta-device initialization under FSDP: parameters created on the meta
        # device are materialized via the module's own weight initializer.
        model.param_init_fn = lambda module: model._init_weights(module)
        return model
|
|