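"""GradScaler backend for torch.xpu.

gradscaler_init() reuses IPEX's CPU GradScaler as ``torch.xpu.amp.GradScaler``
and overrides ``_unscale_grads_``, ``unscale_`` and ``update`` so that the fused
inf-check/unscale and scale-update kernels run through IPEX's CPU ops: the
relevant tensors are moved to "cpu" for the kernel calls and back afterwards.
"""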
from collections import defaultdict

import torch
import intel_extension_for_pytorch as ipex  # pylint: disable=import-error, unused-import
import intel_extension_for_pytorch._C as core  # pylint: disable=import-error, unused-import

# pylint: disable=protected-access, missing-function-docstring, line-too-long

OptState = ipex.cpu.autocast._grad_scaler.OptState
_MultiDeviceReplicator = ipex.cpu.autocast._grad_scaler._MultiDeviceReplicator
_refresh_per_optimizer_state = ipex.cpu.autocast._grad_scaler._refresh_per_optimizer_state

def _unscale_grads_(self, optimizer, inv_scale, found_inf, allow_fp16):  # pylint: disable=unused-argument
    per_device_inv_scale = _MultiDeviceReplicator(inv_scale)
    per_device_found_inf = _MultiDeviceReplicator(found_inf)

    # To set up _amp_foreach_non_finite_check_and_unscale_, split grads by device and dtype.
    # There could be hundreds of grads, so we'd like to iterate through them just once.
    # However, we don't know their devices or dtypes in advance.
    # https://stackoverflow.com/questions/5029934/defaultdict-of-defaultdict
    # Google says mypy struggles with defaultdicts type annotations.
    per_device_and_dtype_grads = defaultdict(lambda: defaultdict(list))  # type: ignore[var-annotated]

    # sync grad to master weight
    if hasattr(optimizer, "sync_grad"):
        optimizer.sync_grad()
    with torch.no_grad():
        for group in optimizer.param_groups:
            for param in group["params"]:
                if param.grad is None:
                    continue
                if (not allow_fp16) and param.grad.dtype == torch.float16:
                    raise ValueError("Attempting to unscale FP16 gradients.")
                if param.grad.is_sparse:
                    # is_coalesced() == False means the sparse grad has values with duplicate indices.
                    # coalesce() deduplicates indices and adds all values that have the same index.
                    # For scaled fp16 values, there's a good chance coalescing will cause overflow,
                    # so we should check the coalesced _values().
                    if param.grad.dtype is torch.float16:
                        param.grad = param.grad.coalesce()
                    to_unscale = param.grad._values()
                else:
                    to_unscale = param.grad
                # TODO: is there a way to split by device and dtype without appending in the inner loop?
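                # Move the grad to CPU so the IPEX CPU kernel below can run the
                # non-finite check and unscale on it.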
                to_unscale = to_unscale.to("cpu")
                per_device_and_dtype_grads[to_unscale.device][
                    to_unscale.dtype
                ].append(to_unscale)

        for _, per_dtype_grads in per_device_and_dtype_grads.items():
            for grads in per_dtype_grads.values():
                core._amp_foreach_non_finite_check_and_unscale_(
                    grads,
                    per_device_found_inf.get("cpu"),
                    per_device_inv_scale.get("cpu"),
                )
    return per_device_found_inf._per_device_tensors

def unscale_(self, optimizer):
    """
    Divides ("unscales") the optimizer's gradient tensors by the scale factor.

    :meth:`unscale_` is optional, serving cases where you need to
    :ref:`modify or inspect gradients<working-with-unscaled-gradients>`
    between the backward pass(es) and :meth:`step`.
    If :meth:`unscale_` is not called explicitly, gradients will be unscaled automatically during :meth:`step`.

    Simple example, using :meth:`unscale_` to enable clipping of unscaled gradients::

        ...
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
        scaler.step(optimizer)
        scaler.update()

    Args:
        optimizer (torch.optim.Optimizer): Optimizer that owns the gradients to be unscaled.

    .. warning::
        :meth:`unscale_` should only be called once per optimizer per :meth:`step` call,
        and only after all gradients for that optimizer's assigned parameters have been accumulated.
        Calling :meth:`unscale_` twice for a given optimizer between each :meth:`step` triggers a RuntimeError.

    .. warning::
        :meth:`unscale_` may unscale sparse gradients out of place, replacing the ``.grad`` attribute.
    """
    if not self._enabled:
        return

    self._check_scale_growth_tracker("unscale_")

    optimizer_state = self._per_optimizer_states[id(optimizer)]
    if optimizer_state["stage"] is OptState.UNSCALED:  # pylint: disable=no-else-raise
        raise RuntimeError(
            "unscale_() has already been called on this optimizer since the last update()."
        )
    elif optimizer_state["stage"] is OptState.STEPPED:
        raise RuntimeError("unscale_() is being called after step().")

    # FP32 division can be imprecise for certain compile options, so we carry out the reciprocal in FP64.
    assert self._scale is not None
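    # Take the reciprocal on CPU, then move the result back to the scale's device.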
    inv_scale = self._scale.to("cpu").double().reciprocal().float().to(self._scale.device)
    found_inf = torch.full(
        (1,), 0.0, dtype=torch.float32, device=self._scale.device
    )

    optimizer_state["found_inf_per_device"] = self._unscale_grads_(
        optimizer, inv_scale, found_inf, False
    )
    optimizer_state["stage"] = OptState.UNSCALED

def update(self, new_scale=None):
    """
    Updates the scale factor.

    If any optimizer steps were skipped, the scale is multiplied by ``backoff_factor``
    to reduce it. If ``growth_interval`` unskipped iterations occurred consecutively,
    the scale is multiplied by ``growth_factor`` to increase it.

    Passing ``new_scale`` sets the new scale value manually. (``new_scale`` is not
    used directly; it's used to fill GradScaler's internal scale tensor. So if
    ``new_scale`` was a tensor, later in-place changes to that tensor will not further
    affect the scale GradScaler uses internally.)

    Args:
        new_scale (float or :class:`torch.FloatTensor`, optional, default=None): New scale factor.

    .. warning::
        :meth:`update` should only be called at the end of the iteration, after ``scaler.step(optimizer)`` has
        been invoked for all optimizers used this iteration.
    """
    if not self._enabled:
        return

    _scale, _growth_tracker = self._check_scale_growth_tracker("update")

    if new_scale is not None:
        # Accept a new user-defined scale.
        if isinstance(new_scale, float):
            self._scale.fill_(new_scale)  # type: ignore[union-attr]
        else:
            reason = "new_scale should be a float or a 1-element torch.FloatTensor with requires_grad=False."
            assert isinstance(new_scale, torch.FloatTensor), reason  # type: ignore[attr-defined]
            assert new_scale.numel() == 1, reason
            assert new_scale.requires_grad is False, reason
            self._scale.copy_(new_scale)  # type: ignore[union-attr]
    else:
        # Consume shared inf/nan data collected from optimizers to update the scale.
        # If all found_inf tensors are on the same device as self._scale, this operation is asynchronous.
        found_infs = [
            found_inf.to(device="cpu", non_blocking=True)
            for state in self._per_optimizer_states.values()
            for found_inf in state["found_inf_per_device"].values()
        ]
        assert len(found_infs) > 0, "No inf checks were recorded prior to update."
        found_inf_combined = found_infs[0]
        if len(found_infs) > 1:
            for i in range(1, len(found_infs)):
                found_inf_combined += found_infs[i]
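        # Run the scale update through the IPEX CPU kernel: move the scale and
        # growth tracker to CPU, update them in place, then restore their device.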
        to_device = _scale.device
        _scale = _scale.to("cpu")
        _growth_tracker = _growth_tracker.to("cpu")
        core._amp_update_scale_(
            _scale,
            _growth_tracker,
            found_inf_combined,
            self._growth_factor,
            self._backoff_factor,
            self._growth_interval,
        )
        _scale = _scale.to(to_device)
        _growth_tracker = _growth_tracker.to(to_device)

    # To prepare for next iteration, clear the data collected from optimizers this iteration.
    self._per_optimizer_states = defaultdict(_refresh_per_optimizer_state)

def gradscaler_init():
    torch.xpu.amp.GradScaler = ipex.cpu.autocast._grad_scaler.GradScaler
    torch.xpu.amp.GradScaler._unscale_grads_ = _unscale_grads_
    torch.xpu.amp.GradScaler.unscale_ = unscale_
    torch.xpu.amp.GradScaler.update = update
    return torch.xpu.amp.GradScaler
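
# Minimal usage sketch (assumes an XPU build of PyTorch with IPEX installed;
# `model`, `optimizer`, `loss_fn` and `loader` are hypothetical placeholders,
# not part of this module):
#
#     GradScaler = gradscaler_init()
#     scaler = GradScaler()
#     for inputs, targets in loader:
#         optimizer.zero_grad()
#         with torch.xpu.amp.autocast():
#             loss = loss_fn(model(inputs), targets)
#         scaler.scale(loss).backward()
#         scaler.step(optimizer)
#         scaler.update()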