File size: 20,654 Bytes
4c65bff |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 |
import logging
import os
from pathlib import Path
from time import sleep
from typing import Callable, List, Optional, Union
import numpy as np
import tensorflow as tf
from huggingface_hub import Repository, create_repo
from packaging.version import parse
from tensorflow.keras.callbacks import Callback
from . import IntervalStrategy, PreTrainedTokenizerBase
from .modelcard import TrainingSummary
# Module-level logger named after this module, so records emitted from this file can be
# filtered or configured independently of the root logger.
logger = logging.getLogger(__name__)
class KerasMetricCallback(Callback):
    """
    Callback to compute metrics at the end of every epoch. Unlike normal Keras metrics, these do not need to be
    compilable by TF. It is particularly useful for common NLP metrics like BLEU and ROUGE that require string
    operations or generation loops that cannot be compiled. Predictions (or generations) will be computed on the
    `eval_dataset` before being passed to the `metric_fn` in `np.ndarray` format. The `metric_fn` should compute
    metrics and return a dict mapping metric names to metric values.

    We provide an example of a suitable metric_fn that computes ROUGE scores for a summarization model below. Note that
    this example skips some post-processing for readability and simplicity, and should probably not be used as-is!

    ```py
    from datasets import load_metric

    rouge_metric = load_metric("rouge")


    def rouge_fn(eval_predictions):
        predictions, labels = eval_predictions
        decoded_predictions = tokenizer.batch_decode(predictions, skip_special_tokens=True)
        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
        result = rouge_metric.compute(predictions=decoded_predictions, references=decoded_labels)
        return {key: value.mid.fmeasure * 100 for key, value in result.items()}
    ```

    The above function will return a dict containing values which will be logged like any other Keras metric:

    ```
    {'rouge1': 37.4199, 'rouge2': 13.9768, 'rougeL': 34.361, 'rougeLsum': 35.0781}
    ```

    Args:
        metric_fn (`Callable`):
            Metric function provided by the user. It will be called with a single argument: a
            `(predictions, labels)` tuple containing the model's outputs and the matching labels from the dataset.
            It should return a dict mapping metric names to numerical values.
        eval_dataset (`tf.data.Dataset` or `dict` or `tuple` or `np.ndarray` or `tf.Tensor`):
            Validation data to be used to generate predictions for the `metric_fn`.
        output_cols (`List[str]`, *optional*):
            A list of columns to be retained from the model output as the predictions. Defaults to all.
        label_cols (`List[str]`, *optional*):
            A list of columns to be retained from the input dataset as the labels. Will be autodetected if this is not
            supplied.
        batch_size (`int`, *optional*):
            Batch size. Only used when the data is not a pre-batched `tf.data.Dataset`.
        predict_with_generate (`bool`, *optional*, defaults to `False`):
            Whether we should use `model.generate()` to get outputs for the model.
        use_xla_generation (`bool`, *optional*, defaults to `False`):
            If we're generating, whether to compile model generation with XLA. This can massively increase the speed of
            generation (up to 100X speedup) but will require a new XLA compilation for each input shape. When using XLA
            generation, it's a good idea to pad your inputs to the same size, or to use the `pad_to_multiple_of`
            argument in your `tokenizer` or `DataCollator`, which will reduce the number of unique input shapes and
            save a lot of compilation time. This option has no effect if `predict_with_generate` is `False`.
        generate_kwargs (`dict`, *optional*):
            Keyword arguments to pass to `model.generate()` when generating. Has no effect if `predict_with_generate`
            is `False`.

    Raises:
        ValueError: If `eval_dataset` is not a `tf.data.Dataset` and no `batch_size` is given, if an entry of
            `label_cols` is missing from the dataset inputs, or if the label columns cannot be autodetected.
    """

    def __init__(
        self,
        metric_fn: Callable,
        eval_dataset: Union[tf.data.Dataset, np.ndarray, tf.Tensor, tuple, dict],
        output_cols: Optional[List[str]] = None,
        label_cols: Optional[List[str]] = None,
        batch_size: Optional[int] = None,
        predict_with_generate: bool = False,
        use_xla_generation: bool = False,
        generate_kwargs: Optional[dict] = None,
    ):
        super().__init__()
        self.metric_fn = metric_fn
        self.batch_size = batch_size
        if not isinstance(eval_dataset, tf.data.Dataset):
            if batch_size is None:
                raise ValueError(
                    "When passing data to KerasMetricCallback that is not a pre-batched tf.data.Dataset "
                    "the batch_size argument must be set."
                )
            # Wrap a tf.data.Dataset around it
            eval_dataset = tf.data.Dataset.from_tensor_slices(eval_dataset).batch(batch_size, drop_remainder=False)
        self.eval_dataset = eval_dataset
        self.predict_with_generate = predict_with_generate
        self.output_cols = output_cols

        # This next block attempts to parse out which elements of the dataset should be appended to the labels list
        # that is passed to the metric_fn
        if isinstance(eval_dataset.element_spec, tuple) and len(eval_dataset.element_spec) == 2:
            input_spec, label_spec = eval_dataset.element_spec
        else:
            input_spec = eval_dataset.element_spec
            label_spec = None

        if label_cols is not None:
            for label in label_cols:
                if label not in input_spec:
                    raise ValueError(f"Label {label} is in label_cols but could not be found in the dataset inputs!")
            self.label_cols = label_cols
            self.use_keras_label = False
        elif label_spec is not None:
            # If the dataset inputs are split into a 2-tuple of inputs and labels,
            # assume the second element is the labels
            self.label_cols = None
            self.use_keras_label = True
        elif "labels" in input_spec:
            self.label_cols = ["labels"]
            self.use_keras_label = False
            logger.warning("No label_cols specified for KerasMetricCallback, assuming you want the 'labels' key.")
        elif "start_positions" in input_spec and "end_positions" in input_spec:
            self.label_cols = ["start_positions", "end_positions"]
            self.use_keras_label = False
            logger.warning(
                "No label_cols specified for KerasMetricCallback, assuming you want the "
                "start_positions and end_positions keys."
            )
        else:
            raise ValueError("Could not autodetect label_cols for KerasMetricCallback, please specify them!")

        if parse(tf.__version__) < parse("2.7"):
            logger.warning("TF versions less than 2.7 may encounter issues with KerasMetricCallback!")

        self.use_xla_generation = use_xla_generation
        self.generate_kwargs = {} if generate_kwargs is None else generate_kwargs

        # Built lazily on the first epoch end, and only when XLA generation is requested
        self.generation_function = None

    @staticmethod
    def _concatenate_batches(batches, padding_index=-100):
        """
        Join a sequence of per-batch arrays along axis 0.

        When the batches disagree on the size of axis 1, each one is written into the top-left corner of an output
        array whose axis 1 is as long as the longest batch; the unused positions hold `padding_index`.
        """
        # If all batches are unidimensional or same length, do a simple concatenation
        if batches[0].ndim == 1 or all(batch.shape[1] == batches[0].shape[1] for batch in batches):
            return np.concatenate(batches, axis=0)

        # Welp, they're not the same length. Let's do some padding
        max_len = max(batch.shape[1] for batch in batches)
        num_samples = sum(batch.shape[0] for batch in batches)
        output = np.full_like(
            batches[0], fill_value=padding_index, shape=[num_samples, max_len] + list(batches[0].shape[2:])
        )
        # i keeps track of which part of the concatenated array we're writing the next batch to
        i = 0
        for batch in batches:
            output[i : i + len(batch), : batch.shape[1]] = batch
            i += len(batch)
        return output

    def _postprocess_predictions_or_labels(self, inputs):
        """
        Merge a non-empty list of per-batch outputs into whole-dataset arrays.

        The container type of the first batch decides the result: dicts are merged key by key, lists/tuples position
        by position, and arrays/tensors directly. A dict or list that ends up with a single entry is unwrapped to
        the bare array.

        Raises:
            TypeError: If the batches are of a type that is not handled here.
        """
        first = inputs[0]
        if isinstance(first, dict):
            outputs = {}
            for key in first.keys():
                outputs[key] = self._concatenate_batches([batch[key] for batch in inputs])
            # If it's a dict with only one key, just return the array
            if len(outputs) == 1:
                outputs = list(outputs.values())[0]
        elif isinstance(first, (list, tuple)):
            outputs = []
            for input_list in zip(*inputs):
                outputs.append(self._concatenate_batches(input_list))
            if len(outputs) == 1:
                outputs = outputs[0]  # If it's a list with only one element, just return the array
        elif isinstance(first, np.ndarray):
            outputs = self._concatenate_batches(inputs)
        elif isinstance(first, tf.Tensor):
            outputs = self._concatenate_batches([tensor.numpy() for tensor in inputs])
        else:
            raise TypeError(f"Couldn't handle batch of type {type(first)}!")
        return outputs

    def on_epoch_end(self, epoch, logs=None):
        """
        Run the model over `eval_dataset`, call `metric_fn` on the collected `(predictions, labels)` tuple and
        merge the returned metrics into `logs`.

        Raises:
            ValueError: If `eval_dataset` yields no batches.
            TypeError: If the labels have an unsupported type, or `metric_fn` does not return a dict.
        """
        if hasattr(self.model, "config"):
            ignore_keys = getattr(self.model.config, "keys_to_ignore_at_inference", [])
        else:
            ignore_keys = []
        # Built once here rather than for every key of every batch; tolerates a `None` or tuple config value
        dropped_keys = list(ignore_keys or ()) + ["loss"]

        main_input_name = None
        if self.predict_with_generate:
            # This dense conditional recognizes the case where we have an encoder-decoder model, but
            # avoids getting tangled up when we just have a model with a layer called 'encoder'
            if hasattr(self.model, "encoder") and hasattr(self.model.encoder, "main_input_name"):
                main_input_name = self.model.encoder.main_input_name
            else:
                main_input_name = getattr(self.model, "main_input_name", "input_ids")

            if self.use_xla_generation and self.generation_function is None:

                def generation_function(inputs, attention_mask):
                    return self.model.generate(inputs, attention_mask=attention_mask, **self.generate_kwargs)

                self.generation_function = tf.function(generation_function, jit_compile=True)

        prediction_list = []
        label_list = []

        # The whole predict/generate loop is handled inside this method
        for batch in self.eval_dataset:
            if isinstance(batch, tuple):
                batch, labels = batch
            else:
                labels = None

            if self.predict_with_generate:
                if isinstance(batch, dict):
                    generation_inputs = batch[main_input_name]
                    attention_mask = batch.get("attention_mask", None)
                else:
                    generation_inputs = batch
                    attention_mask = None
                if self.use_xla_generation:
                    predictions = self.generation_function(generation_inputs, attention_mask=attention_mask)
                else:
                    predictions = self.model.generate(
                        generation_inputs, attention_mask=attention_mask, **self.generate_kwargs
                    )
            else:
                predictions = self.model.predict_on_batch(batch)
                if isinstance(predictions, dict):
                    # This converts any dict-subclass to a regular dict
                    # Keras REALLY doesn't like it when we pass around a BatchEncoding or other derived class
                    predictions = dict(predictions)
                    if self.output_cols is not None:
                        predictions = {key: predictions[key] for key in self.output_cols}
                    else:
                        predictions = {key: val for key, val in predictions.items() if key not in dropped_keys}
            prediction_list.append(predictions)

            if not self.use_keras_label:
                labels = {key: batch[key].numpy() for key in self.label_cols}
            elif isinstance(labels, dict):
                labels = {key: array.numpy() for key, array in labels.items()}
            elif isinstance(labels, (list, tuple)):
                labels = [array.numpy() for array in labels]
            elif isinstance(labels, tf.Tensor):
                labels = labels.numpy()
            else:
                raise TypeError(f"Confused by labels of type {type(labels)}")
            label_list.append(labels)

        if not prediction_list:
            # Fail with an actionable message instead of an IndexError from the post-processing step
            raise ValueError("The eval_dataset passed to KerasMetricCallback did not yield any batches!")

        all_preds = self._postprocess_predictions_or_labels(prediction_list)
        all_labels = self._postprocess_predictions_or_labels(label_list)

        # NOTE: metric_fn receives ONE argument, a (predictions, labels) tuple
        metric_output = self.metric_fn((all_preds, all_labels))
        if not isinstance(metric_output, dict):
            raise TypeError(
                f"metric_fn should return a dict mapping metric names to values but instead returned {metric_output}"
            )

        # This is the critical bit - Keras passes a dict containing the loss and standard metric values for this epoch
        # in the logs argument. Ordinarily, this is so the callback can read them, but in this case we write a bunch of
        # new keys in there, which will then get read by the History callback and treated like any other metric value.
        # I promise that I have it in writing from Chollet that this is okay.
        if logs is not None:
            logs.update(metric_output)
class PushToHubCallback(Callback):
    """
    Callback that will save and push the model to the Hub regularly. By default, it pushes once per epoch, but this can
    be changed with the `save_strategy` argument. Pushed models can be accessed like any other model on the hub, such
    as with the `from_pretrained` method.

    ```py
    from transformers.keras_callbacks import PushToHubCallback

    push_to_hub_callback = PushToHubCallback(
        output_dir="./model_save",
        tokenizer=tokenizer,
        hub_model_id="gpt5-7xlarge",
    )

    model.fit(train_dataset, callbacks=[push_to_hub_callback])
    ```

    Args:
        output_dir (`str`):
            The output directory where the model predictions and checkpoints will be written and synced with the
            repository on the Hub.
        save_strategy (`str` or [`~trainer_utils.IntervalStrategy`], *optional*, defaults to `"epoch"`):
            The checkpoint save strategy to adopt during training. Possible values are:

                - `"no"`: Save is done at the end of training.
                - `"epoch"`: Save is done at the end of each epoch.
                - `"steps"`: Save is done every `save_steps`
        save_steps (`int`, *optional*):
            The number of steps between saves when using the "steps" `save_strategy`.
        tokenizer (`PreTrainedTokenizerBase`, *optional*):
            The tokenizer used by the model. If supplied, will be uploaded to the repo alongside the weights.
        hub_model_id (`str`, *optional*):
            The name of the repository to keep in sync with the local `output_dir`. It can be a simple model ID in
            which case the model will be pushed in your namespace. Otherwise it should be the whole repository name,
            for instance `"user_name/model"`, which allows you to push to an organization you are a member of with
            `"organization_name/model"`.

            Will default to the name of `output_dir`.
        hub_token (`str`, *optional*):
            The token to use to push the model to the Hub. Will default to the token in the cache folder obtained with
            `huggingface-cli login`.
        checkpoint (`bool`, *optional*, defaults to `False`):
            Whether to save full training checkpoints (including epoch and optimizer state) to allow training to be
            resumed. Only usable when `save_strategy` is `"epoch"`.
        **model_card_args:
            Extra keyword arguments forwarded to `TrainingSummary.from_keras` when the model card is generated.

    Raises:
        ValueError: If `checkpoint` is set with a `save_strategy` other than `"epoch"`, or if `save_strategy` is
            `"steps"` and `save_steps` is not a positive integer.
    """

    def __init__(
        self,
        output_dir: Union[str, Path],
        save_strategy: Union[str, IntervalStrategy] = "epoch",
        save_steps: Optional[int] = None,
        tokenizer: Optional[PreTrainedTokenizerBase] = None,
        hub_model_id: Optional[str] = None,
        hub_token: Optional[str] = None,
        checkpoint: bool = False,
        **model_card_args,
    ):
        super().__init__()
        # Normalise first so that the checks below treat "EPOCH", "Epoch" and IntervalStrategy.EPOCH alike
        if isinstance(save_strategy, str):
            save_strategy = IntervalStrategy(save_strategy.lower())
        if checkpoint and save_strategy != IntervalStrategy.EPOCH:
            raise ValueError("Cannot save checkpoints when save_strategy is not 'epoch'!")
        self.save_strategy = save_strategy
        if self.save_strategy == IntervalStrategy.STEPS and (not isinstance(save_steps, int) or save_steps <= 0):
            raise ValueError("Please supply a positive integer argument for save_steps when save_strategy == 'steps'!")
        self.save_steps = save_steps
        output_dir = Path(output_dir)

        # Create repo and retrieve repo_id
        if hub_model_id is None:
            hub_model_id = output_dir.absolute().name
        self.hub_model_id = create_repo(repo_id=hub_model_id, exist_ok=True, token=hub_token).repo_id

        self.output_dir = output_dir
        self.repo = Repository(str(self.output_dir), clone_from=self.hub_model_id, token=hub_token)

        self.tokenizer = tokenizer
        # Handle of the most recent non-blocking push; `None` until the first push is started
        self.last_job = None
        self.checkpoint = checkpoint
        # Per-epoch copies of the Keras logs; reset in `on_train_begin`
        self.training_history = None
        self.model_card_args = model_card_args

    def _upload_in_progress(self) -> bool:
        """Return `True` while the most recently started non-blocking push has not finished."""
        return self.last_job is not None and not self.last_job.is_done

    def _save_model_and_tokenizer(self):
        """Write the model, and the tokenizer if one was supplied, into `output_dir`."""
        self.model.save_pretrained(self.output_dir)
        if self.tokenizer is not None:
            self.tokenizer.save_pretrained(self.output_dir)

    def _write_model_card(self):
        """Generate a model card from the recorded training history and write it to `output_dir/README.md`."""
        train_summary = TrainingSummary.from_keras(
            model=self.model,
            model_name=self.hub_model_id,
            keras_history=self.training_history,
            **self.model_card_args,
        )
        model_card = train_summary.to_model_card()
        # Explicit UTF-8 so the card does not depend on the platform's default encoding
        with (self.output_dir / "README.md").open("w", encoding="utf-8") as f:
            f.write(model_card)

    def on_train_begin(self, logs=None):
        # Although we can access model.history, we have no guarantees that the History callback will fire before this
        # one, so we keep track of it here too
        self.training_history = []

    def on_train_batch_end(self, batch, logs=None):
        """With the "steps" strategy, save and start a non-blocking push every `save_steps` batches."""
        if self.save_strategy == IntervalStrategy.STEPS and (batch + 1) % self.save_steps == 0:
            if self._upload_in_progress():
                return  # The last upload is still running, don't start another
            self._save_model_and_tokenizer()
            _, self.last_job = self.repo.push_to_hub(
                commit_message=f"Training in progress steps {batch}", blocking=False
            )

    def on_epoch_end(self, epoch, logs=None):
        """Record this epoch's logs and, with the "epoch" strategy, save and start a non-blocking push."""
        # Don't accidentally write things that Keras will read later
        logs = {} if logs is None else logs.copy()
        if "epoch" not in logs:
            logs["epoch"] = epoch
        self.training_history.append(logs)
        if self.save_strategy == IntervalStrategy.EPOCH:
            if self._upload_in_progress():
                return  # The last upload is still running, don't start another
            self._save_model_and_tokenizer()
            if self.checkpoint:
                checkpoint_dir = os.path.join(self.output_dir, "checkpoint")
                self.model._save_checkpoint(checkpoint_dir, epoch)
            self._write_model_card()
            _, self.last_job = self.repo.push_to_hub(
                commit_message=f"Training in progress epoch {epoch}", blocking=False
            )

    def on_train_end(self, logs=None):
        """Wait for any running upload, then save the final model and model card and push them, blocking."""
        # Makes sure the latest version of the model is uploaded
        if self._upload_in_progress():
            logger.info("Pushing the last epoch to the Hub, this may take a while...")
            while not self.last_job.is_done:
                sleep(1)
        # The upload we just waited for may be an older one: a later save is skipped whenever a push is still
        # running. So always save and push the final state rather than assuming that upload carried it.
        # NOTE(review): if nothing changed since that upload, this push is expected to be a no-op — confirm
        # against the `clean_ok` behaviour of `Repository.push_to_hub`.
        self._save_model_and_tokenizer()
        self._write_model_card()
        self.repo.push_to_hub(commit_message="End of training", blocking=True)
|