import logging
import os
from pathlib import Path
from time import sleep
from typing import Callable, List, Optional, Union

import numpy as np
import tensorflow as tf
from huggingface_hub import Repository, create_repo
from packaging.version import parse
from tensorflow.keras.callbacks import Callback

from . import IntervalStrategy, PreTrainedTokenizerBase
from .modelcard import TrainingSummary


logger = logging.getLogger(__name__)


class KerasMetricCallback(Callback):
    """
    Callback to compute metrics at the end of every epoch. Unlike normal Keras metrics, these do not need to be
    compilable by TF. It is particularly useful for common NLP metrics like BLEU and ROUGE that require string
    operations or generation loops that cannot be compiled. Predictions (or generations) will be computed on the
    `eval_dataset` before being passed to the `metric_fn` in `np.ndarray` format. The `metric_fn` should compute
    metrics and return a dict mapping metric names to metric values.

    We provide an example of a suitable metric_fn that computes ROUGE scores for a summarization model below. Note
    that this example skips some post-processing for readability and simplicity, and should probably not be used
    as-is!

    ```py
    from datasets import load_metric

    rouge_metric = load_metric("rouge")


    def rouge_fn(predictions, labels):
        decoded_predictions = tokenizer.batch_decode(predictions, skip_special_tokens=True)
        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
        result = rouge_metric.compute(predictions=decoded_predictions, references=decoded_labels)
        return {key: value.mid.fmeasure * 100 for key, value in result.items()}
    ```

    The above function will return a dict containing values which will be logged like any other Keras metric:

    ```
    {'rouge1': 37.4199, 'rouge2': 13.9768, 'rougeL': 34.361, 'rougeLsum': 35.0781}
    ```
    Args:
        metric_fn (`Callable`):
            Metric function provided by the user. It will be called with two arguments - `predictions` and `labels`.
            These contain the model's outputs and matching labels from the dataset. It should return a dict mapping
            metric names to numerical values.
        eval_dataset (`tf.data.Dataset` or `dict` or `tuple` or `np.ndarray` or `tf.Tensor`):
            Validation data to be used to generate predictions for the `metric_fn`.
        output_cols (`List[str]`, *optional*):
            A list of columns to be retained from the model output as the predictions. Defaults to all.
        label_cols (`List[str]`, *optional*):
            A list of columns to be retained from the input dataset as the labels. Will be autodetected if this is
            not supplied.
        batch_size (`int`, *optional*):
            Batch size. Only used when the data is not a pre-batched `tf.data.Dataset`.
        predict_with_generate (`bool`, *optional*, defaults to `False`):
            Whether we should use `model.generate()` to get outputs for the model.
        use_xla_generation (`bool`, *optional*, defaults to `False`):
            If we're generating, whether to compile model generation with XLA. This can massively increase the speed
            of generation (up to 100X speedup) but will require a new XLA compilation for each input shape. When
            using XLA generation, it's a good idea to pad your inputs to the same size, or to use the
            `pad_to_multiple_of` argument in your `tokenizer` or `DataCollator`, which will reduce the number of
            unique input shapes and save a lot of compilation time. This option has no effect if
            `predict_with_generate` is `False`.
        generate_kwargs (`dict`, *optional*):
            Keyword arguments to pass to `model.generate()` when generating. Has no effect if
            `predict_with_generate` is `False`.
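
    A minimal sketch of attaching the callback to `model.fit()`. The names `rouge_fn`, `train_dataset` and
    `tf_eval_dataset` are placeholders for your own metric function and prepared, batched `tf.data.Dataset` objects:

    ```py
    metric_callback = KerasMetricCallback(metric_fn=rouge_fn, eval_dataset=tf_eval_dataset)
    model.fit(train_dataset, epochs=3, callbacks=[metric_callback])
    ```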
""" | |

    def __init__(
        self,
        metric_fn: Callable,
        eval_dataset: Union[tf.data.Dataset, np.ndarray, tf.Tensor, tuple, dict],
        output_cols: Optional[List[str]] = None,
        label_cols: Optional[List[str]] = None,
        batch_size: Optional[int] = None,
        predict_with_generate: bool = False,
        use_xla_generation: bool = False,
        generate_kwargs: Optional[dict] = None,
    ):
        super().__init__()
        self.metric_fn = metric_fn
        self.batch_size = batch_size
        if not isinstance(eval_dataset, tf.data.Dataset):
            if batch_size is None:
                raise ValueError(
                    "When passing data to KerasMetricCallback that is not a pre-batched tf.data.Dataset "
                    "the batch_size argument must be set."
                )
            # Wrap a tf.data.Dataset around it
            eval_dataset = tf.data.Dataset.from_tensor_slices(eval_dataset).batch(batch_size, drop_remainder=False)
        self.eval_dataset = eval_dataset
        self.predict_with_generate = predict_with_generate
        self.output_cols = output_cols

        # This next block attempts to parse out which elements of the dataset should be appended to the labels list
        # that is passed to the metric_fn
        if isinstance(eval_dataset.element_spec, tuple) and len(eval_dataset.element_spec) == 2:
            input_spec, label_spec = eval_dataset.element_spec
        else:
            input_spec = eval_dataset.element_spec
            label_spec = None
        if label_cols is not None:
            for label in label_cols:
                if label not in input_spec:
                    raise ValueError(f"Label {label} is in label_cols but could not be found in the dataset inputs!")
            self.label_cols = label_cols
            self.use_keras_label = False
        elif label_spec is not None:
            # If the dataset inputs are split into a 2-tuple of inputs and labels,
            # assume the second element is the labels
            self.label_cols = None
            self.use_keras_label = True
        elif "labels" in input_spec:
            self.label_cols = ["labels"]
            self.use_keras_label = False
            logging.warning("No label_cols specified for KerasMetricCallback, assuming you want the 'labels' key.")
        elif "start_positions" in input_spec and "end_positions" in input_spec:
            self.label_cols = ["start_positions", "end_positions"]
            self.use_keras_label = False
            logging.warning(
                "No label_cols specified for KerasMetricCallback, assuming you want the "
                "start_positions and end_positions keys."
            )
        else:
            raise ValueError("Could not autodetect label_cols for KerasMetricCallback, please specify them!")

        if parse(tf.__version__) < parse("2.7"):
            logging.warning("TF versions less than 2.7 may encounter issues with KerasMetricCallback!")

        self.use_xla_generation = use_xla_generation
        self.generate_kwargs = {} if generate_kwargs is None else generate_kwargs

        self.generation_function = None

    @staticmethod
    def _concatenate_batches(batches, padding_index=-100):
        # If all batches are unidimensional or same length, do a simple concatenation
        if batches[0].ndim == 1 or all(batch.shape[1] == batches[0].shape[1] for batch in batches):
            return np.concatenate(batches, axis=0)

        # Welp, they're not the same length. Let's do some padding
        max_len = max([batch.shape[1] for batch in batches])
        num_samples = sum([batch.shape[0] for batch in batches])
        output = np.full_like(
            batches[0], fill_value=padding_index, shape=[num_samples, max_len] + list(batches[0].shape[2:])
        )
        # i keeps track of which part of the concatenated array we're writing the next batch to
        i = 0
        for batch in batches:
            output[i : i + len(batch), : batch.shape[1]] = batch
            i += len(batch)
        return output

    def _postprocess_predictions_or_labels(self, inputs):
        if isinstance(inputs[0], dict):
            outputs = {}
            for key in inputs[0].keys():
                outputs[key] = self._concatenate_batches([batch[key] for batch in inputs])
            # If it's a dict with only one key, just return the array
            if len(outputs) == 1:
                outputs = list(outputs.values())[0]
        elif isinstance(inputs[0], list) or isinstance(inputs[0], tuple):
            outputs = []
            for input_list in zip(*inputs):
                outputs.append(self._concatenate_batches(input_list))
            if len(outputs) == 1:
                outputs = outputs[0]  # If it's a list with only one element, just return the array
        elif isinstance(inputs[0], np.ndarray):
            outputs = self._concatenate_batches(inputs)
        elif isinstance(inputs[0], tf.Tensor):
            outputs = self._concatenate_batches([tensor.numpy() for tensor in inputs])
        else:
            raise TypeError(f"Couldn't handle batch of type {type(inputs[0])}!")
        return outputs

    def on_epoch_end(self, epoch, logs=None):
        if hasattr(self.model, "config"):
            ignore_keys = getattr(self.model.config, "keys_to_ignore_at_inference", [])
        else:
            ignore_keys = []

        main_input_name = None
        if self.predict_with_generate:
            # This dense conditional recognizes the case where we have an encoder-decoder model, but
            # avoids getting tangled up when we just have a model with a layer called 'encoder'
            if hasattr(self.model, "encoder") and hasattr(self.model.encoder, "main_input_name"):
                main_input_name = self.model.encoder.main_input_name
            else:
                main_input_name = getattr(self.model, "main_input_name", "input_ids")

            if self.use_xla_generation and self.generation_function is None:

                def generation_function(inputs, attention_mask):
                    return self.model.generate(inputs, attention_mask=attention_mask, **self.generate_kwargs)

                self.generation_function = tf.function(generation_function, jit_compile=True)

        prediction_list = []
        label_list = []

        # The whole predict/generate loop is handled inside this method
        for batch in self.eval_dataset:
            if isinstance(batch, tuple):
                batch, labels = batch
            else:
                labels = None
            if self.predict_with_generate:
                if isinstance(batch, dict):
                    generation_inputs = batch[main_input_name]
                    attention_mask = batch.get("attention_mask", None)
                else:
                    generation_inputs = batch
                    attention_mask = None
                if self.use_xla_generation:
                    predictions = self.generation_function(generation_inputs, attention_mask=attention_mask)
                else:
                    predictions = self.model.generate(
                        generation_inputs, attention_mask=attention_mask, **self.generate_kwargs
                    )
            else:
                predictions = self.model.predict_on_batch(batch)
            if isinstance(predictions, dict):
                # This converts any dict-subclass to a regular dict
                # Keras REALLY doesn't like it when we pass around a BatchEncoding or other derived class
                predictions = dict(predictions)
                if self.output_cols is not None:
                    predictions = {key: predictions[key] for key in self.output_cols}
                else:
                    predictions = {
                        key: val for key, val in predictions.items() if key not in ignore_keys + ["loss"]
                    }
            prediction_list.append(predictions)
            if not self.use_keras_label:
                labels = {key: batch[key].numpy() for key in self.label_cols}
            elif isinstance(labels, dict):
                labels = {key: array.numpy() for key, array in labels.items()}
            elif isinstance(labels, list) or isinstance(labels, tuple):
                labels = [array.numpy() for array in labels]
            elif isinstance(labels, tf.Tensor):
                labels = labels.numpy()
            else:
                raise TypeError(f"Confused by labels of type {type(labels)}")
            label_list.append(labels)

        all_preds = self._postprocess_predictions_or_labels(prediction_list)
        all_labels = self._postprocess_predictions_or_labels(label_list)

        metric_output = self.metric_fn((all_preds, all_labels))
        if not isinstance(metric_output, dict):
            raise TypeError(
                f"metric_fn should return a dict mapping metric names to values but instead returned {metric_output}"
            )
        # This is the critical bit - Keras passes a dict containing the loss and standard metric values for this epoch
        # in the logs argument. Ordinarily, this is so the callback can read them, but in this case we write a bunch of
        # new keys in there, which will then get read by the History callback and treated like any other metric value.
        # I promise that I have it in writing from Chollet that this is okay.
        logs.update(metric_output)


class PushToHubCallback(Callback):
    """
    Callback that will save and push the model to the Hub regularly. By default, it pushes once per epoch, but this
    can be changed with the `save_strategy` argument. Pushed models can be accessed like any other model on the hub,
    such as with the `from_pretrained` method.

    ```py
    from transformers.keras_callbacks import PushToHubCallback

    push_to_hub_callback = PushToHubCallback(
        output_dir="./model_save",
        tokenizer=tokenizer,
        hub_model_id="gpt5-7xlarge",
    )

    model.fit(train_dataset, callbacks=[push_to_hub_callback])
    ```
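
    Once at least one commit has been pushed, the checkpoint can be reloaded like any other Hub model. The snippet
    below is only a sketch; `your-username/model_save` is a placeholder for whatever repo id the callback actually
    pushed to:

    ```py
    from transformers import TFAutoModel

    model = TFAutoModel.from_pretrained("your-username/model_save")
    ```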

    Args:
        output_dir (`str`):
            The output directory where the model predictions and checkpoints will be written and synced with the
            repository on the Hub.
        save_strategy (`str` or [`~trainer_utils.IntervalStrategy`], *optional*, defaults to `"epoch"`):
            The checkpoint save strategy to adopt during training. Possible values are:

            - `"no"`: Save is done at the end of training.
            - `"epoch"`: Save is done at the end of each epoch.
            - `"steps"`: Save is done every `save_steps` steps.
        save_steps (`int`, *optional*):
            The number of steps between saves when using the `"steps"` `save_strategy`.
        tokenizer (`PreTrainedTokenizerBase`, *optional*):
            The tokenizer used by the model. If supplied, will be uploaded to the repo alongside the weights.
        hub_model_id (`str`, *optional*):
            The name of the repository to keep in sync with the local `output_dir`. It can be a simple model ID, in
            which case the model will be pushed in your namespace. Otherwise it should be the whole repository name,
            for instance `"user_name/model"`, which allows you to push to an organization you are a member of with
            `"organization_name/model"`.

            Will default to the name of `output_dir`.
        hub_token (`str`, *optional*):
            The token to use to push the model to the Hub. Will default to the token in the cache folder obtained
            with `huggingface-cli login`.
        checkpoint (`bool`, *optional*, defaults to `False`):
            Whether to save full training checkpoints (including epoch and optimizer state) to allow training to be
            resumed. Only usable when `save_strategy` is `"epoch"`.
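
    A sketch of pushing every few hundred training steps instead of once per epoch (the step count of 500 is an
    arbitrary placeholder):

    ```py
    push_to_hub_callback = PushToHubCallback(
        output_dir="./model_save",
        save_strategy="steps",
        save_steps=500,
        tokenizer=tokenizer,
    )
    ```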
""" | |

    def __init__(
        self,
        output_dir: Union[str, Path],
        save_strategy: Union[str, IntervalStrategy] = "epoch",
        save_steps: Optional[int] = None,
        tokenizer: Optional[PreTrainedTokenizerBase] = None,
        hub_model_id: Optional[str] = None,
        hub_token: Optional[str] = None,
        checkpoint: bool = False,
        **model_card_args,
    ):
        super().__init__()
        if checkpoint and save_strategy != "epoch":
            raise ValueError("Cannot save checkpoints when save_strategy is not 'epoch'!")
        if isinstance(save_strategy, str):
            save_strategy = IntervalStrategy(save_strategy.lower())
        self.save_strategy = save_strategy
        if self.save_strategy == IntervalStrategy.STEPS and (not isinstance(save_steps, int) or save_steps <= 0):
            raise ValueError("Please supply a positive integer argument for save_steps when save_strategy == 'steps'!")
        self.save_steps = save_steps
        output_dir = Path(output_dir)

        # Create repo and retrieve repo_id
        if hub_model_id is None:
            hub_model_id = output_dir.absolute().name
        self.hub_model_id = create_repo(repo_id=hub_model_id, exist_ok=True, token=hub_token).repo_id

        self.output_dir = output_dir
        self.repo = Repository(str(self.output_dir), clone_from=self.hub_model_id, token=hub_token)

        self.tokenizer = tokenizer
        self.last_job = None
        self.checkpoint = checkpoint
        self.training_history = None
        self.model_card_args = model_card_args

    def on_train_begin(self, logs=None):
        # Although we can access model.history, we have no guarantees that the History callback will fire before this
        # one, so we keep track of it here too
        self.training_history = []

    def on_train_batch_end(self, batch, logs=None):
        if self.save_strategy == IntervalStrategy.STEPS and (batch + 1) % self.save_steps == 0:
            if self.last_job is not None and not self.last_job.is_done:
                return  # The last upload is still running, don't start another
            self.model.save_pretrained(self.output_dir)
            if self.tokenizer is not None:
                self.tokenizer.save_pretrained(self.output_dir)
            _, self.last_job = self.repo.push_to_hub(
                commit_message=f"Training in progress steps {batch}", blocking=False
            )

    def on_epoch_end(self, epoch, logs=None):
        logs = logs.copy()  # Don't accidentally write things that Keras will read later
        if "epoch" not in logs:
            logs["epoch"] = epoch
        self.training_history.append(logs)
        if self.save_strategy == IntervalStrategy.EPOCH:
            if self.last_job is not None and not self.last_job.is_done:
                return  # The last upload is still running, don't start another
            self.model.save_pretrained(self.output_dir)
            if self.tokenizer is not None:
                self.tokenizer.save_pretrained(self.output_dir)
            if self.checkpoint:
                checkpoint_dir = os.path.join(self.output_dir, "checkpoint")
                self.model._save_checkpoint(checkpoint_dir, epoch)
            train_summary = TrainingSummary.from_keras(
                model=self.model,
                model_name=self.hub_model_id,
                keras_history=self.training_history,
                **self.model_card_args,
            )
            model_card = train_summary.to_model_card()
            with (self.output_dir / "README.md").open("w") as f:
                f.write(model_card)
            _, self.last_job = self.repo.push_to_hub(
                commit_message=f"Training in progress epoch {epoch}", blocking=False
            )

    def on_train_end(self, logs=None):
        # Makes sure the latest version of the model is uploaded
        if self.last_job is not None and not self.last_job.is_done:
            logging.info("Pushing the last epoch to the Hub, this may take a while...")
            while not self.last_job.is_done:
                sleep(1)
        else:
            self.model.save_pretrained(self.output_dir)
            if self.tokenizer is not None:
                self.tokenizer.save_pretrained(self.output_dir)
            train_summary = TrainingSummary.from_keras(
                model=self.model,
                model_name=self.hub_model_id,
                keras_history=self.training_history,
                **self.model_card_args,
            )
            model_card = train_summary.to_model_card()
            with (self.output_dir / "README.md").open("w") as f:
                f.write(model_card)
            self.repo.push_to_hub(commit_message="End of training", blocking=True)