| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- # Copyright 2021 The HuggingFace Team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import base64
- import json
- import os
- from copy import deepcopy
- from torch import optim
- from ..optimizer import AcceleratedOptimizer
- from ..scheduler import AcceleratedScheduler
- from .dataclasses import DistributedType
- from .imports import is_bnb_available
- from .versions import compare_versions
def _bnb_optim_uses_32bit_state(optimizer):
    """Return `True` when a bitsandbytes optimizer reports `optim_bits == 32`."""
    try:
        return optimizer.optim_bits == 32
    except AttributeError:
        # `optim_bits` is not a direct attribute on this optimizer; read it from `optimizer.args` instead.
        return optimizer.args.optim_bits == 32


def map_pytorch_optim_to_deepspeed(optimizer):
    """
    Build the DeepSpeedCPUOptimizer (`deepspeed.ops`) counterpart of a PyTorch / bitsandbytes optimizer.

    Only the `lr` and `weight_decay` entries of `optimizer.defaults` are forwarded, together with
    `optimizer.param_groups`.

    Args:
        optimizer (`torch.optim.Optimizer`):
            The optimizer to convert.

    Returns:
        A `DeepSpeedCPUAdam` instance by default, a `DeepSpeedCPUAdagrad` instance for (32-bit) Adagrad optimizers,
        or a `DeepSpeedCPULion` instance for 32-bit bitsandbytes Lion optimizers, subject to the installed
        `deepspeed` / `bitsandbytes` versions checked below.
    """
    defaults = {k: v for k, v in optimizer.defaults.items() if k in ("lr", "weight_decay")}

    # Select the DeepSpeedCPUOptimizer based on the original optimizer class.
    # DeepSpeedCPUAdam is the default.
    from deepspeed.ops.adam import DeepSpeedCPUAdam

    optimizer_class = DeepSpeedCPUAdam

    # For DeepSpeedCPUAdam (adamw_mode)
    if compare_versions("deepspeed", ">=", "0.3.1"):
        is_adamw = isinstance(optimizer, optim.AdamW)

        if is_bnb_available() and not is_adamw:
            import bitsandbytes.optim as bnb_opt

            # A bitsandbytes AdamW only counts as AdamW here when it keeps 32-bit state.
            if isinstance(optimizer, (bnb_opt.AdamW, bnb_opt.AdamW32bit)):
                is_adamw = _bnb_optim_uses_32bit_state(optimizer)

        defaults["adamw_mode"] = is_adamw

    # For DeepSpeedCPUAdagrad
    if compare_versions("deepspeed", ">=", "0.5.5"):
        # Check if the optimizer is PyTorch's Adagrad.
        is_adagrad = isinstance(optimizer, optim.Adagrad)
        # If not, and bitsandbytes is available, check if the optimizer is the 32-bit bitsandbytes Adagrad.
        if is_bnb_available() and not is_adagrad:
            import bitsandbytes.optim as bnb_opt

            if isinstance(optimizer, (bnb_opt.Adagrad, bnb_opt.Adagrad32bit)):
                is_adagrad = _bnb_optim_uses_32bit_state(optimizer)
        if is_adagrad:
            from deepspeed.ops.adagrad import DeepSpeedCPUAdagrad

            optimizer_class = DeepSpeedCPUAdagrad

    # For DeepSpeedCPULion
    if is_bnb_available(min_version="0.38.0") and compare_versions("deepspeed", ">=", "0.11.0"):
        from bitsandbytes.optim import Lion, Lion32bit

        if isinstance(optimizer, (Lion, Lion32bit)) and _bnb_optim_uses_32bit_state(optimizer):
            from deepspeed.ops.lion import DeepSpeedCPULion

            optimizer_class = DeepSpeedCPULion

    if optimizer_class is not DeepSpeedCPUAdam:
        # `adamw_mode` is a DeepSpeedCPUAdam-only argument; the Adagrad and Lion CPU optimizers do not accept it,
        # so forwarding it would make their constructors fail with a `TypeError`.
        defaults.pop("adamw_mode", None)

    return optimizer_class(optimizer.param_groups, **defaults)
def get_active_deepspeed_plugin(state):
    """
    Look up the `DeepSpeedPlugin` that is currently in use on `state`.

    Args:
        state: object exposing `distributed_type` and `deepspeed_plugins`. `deepspeed_plugins` is either a single
            plugin or a dict of plugins, each with a `selected` flag.

    Returns:
        `state.deepspeed_plugins` itself when it is not a dict, otherwise the first dict value whose `selected`
        attribute is truthy.

    Raises:
        ValueError: If DeepSpeed was not enabled and this function is called.
    """
    deepspeed_is_active = state.distributed_type == DistributedType.DEEPSPEED
    if not deepspeed_is_active:
        raise ValueError(
            "Couldn't retrieve the active `DeepSpeedPlugin` as none were enabled. "
            "Please make sure that either `Accelerator` is configured for `deepspeed` "
            "or make sure that the desired `DeepSpeedPlugin` has been enabled (`AcceleratorState().select_deepspeed_plugin(name)`) "
            "before calling this function."
        )

    if isinstance(state.deepspeed_plugins, dict):
        # Several plugins are registered: hand back the first one flagged as selected.
        selected_plugins = (candidate for candidate in state.deepspeed_plugins.values() if candidate.selected)
        return next(selected_plugins)

    # A single plugin was stored directly.
    return state.deepspeed_plugins
class HfDeepSpeedConfig:
    """
    This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage.

    A `weakref` of this object is stored in the module's globals to be able to access the config from areas where
    things like the Trainer object is not available (e.g. `from_pretrained` and `_get_resized_embeddings`). Therefore
    it's important that this object remains alive while the program is still running.

    [`Trainer`] uses the `HfTrainerDeepSpeedConfig` subclass instead. That subclass has logic to sync the configuration
    with values of [`TrainingArguments`] by replacing special placeholder values: `"auto"`. Without this special logic
    the DeepSpeed configuration is not modified in any way.

    Args:
        config_file_or_dict (`Union[str, Dict]`):
            A config dictionary, a path to an existing DeepSpeed JSON config file, a JSON string, or a url-safe
            base64 encoded JSON string.

    Raises:
        ValueError: If the argument cannot be resolved to a configuration dictionary.
    """

    def __init__(self, config_file_or_dict):
        if isinstance(config_file_or_dict, dict):
            # Don't modify user's data should they want to reuse it (e.g. in tests), because once we
            # modified it, it will not be accepted here again, since `auto` values would have been overridden
            config = deepcopy(config_file_or_dict)
        elif os.path.exists(config_file_or_dict):
            with open(config_file_or_dict, encoding="utf-8") as f:
                config = json.load(f)
        else:
            try:
                try:
                    # First try parsing as JSON directly
                    config = json.loads(config_file_or_dict)
                except json.JSONDecodeError:
                    # If that fails, try base64 decoding
                    config_decoded = base64.urlsafe_b64decode(config_file_or_dict).decode("utf-8")
                    config = json.loads(config_decoded)
            except (UnicodeDecodeError, AttributeError, ValueError) as err:
                raise ValueError(
                    f"Expected a string path to an existing deepspeed config, or a dictionary, or a base64 encoded string. Received: {config_file_or_dict}"
                ) from err

        # Valid JSON is not necessarily a JSON object (e.g. `[1, 2]` or `3`). Reject such input here with a clear
        # error rather than failing later with an `AttributeError` on the first `.get()` lookup.
        if not isinstance(config, dict):
            raise ValueError(
                f"Expected the DeepSpeed config to be a dictionary (JSON object), but got an object of type `{type(config).__name__}`."
            )

        self.config = config
        self.set_stage_and_offload()

    def set_stage_and_offload(self):
        """Cache the ZeRO stage (`-1` when unset) and whether optimizer/param offload to cpu or nvme is configured."""
        # zero stage - this is done as early as possible, before model is created, to allow
        # ``is_deepspeed_zero3_enabled`` query and getting to the early deepspeed config object
        # during ``zero.Init()`` which needs to know the dtype, and some other hparams.
        self._stage = self.get_value("zero_optimization.stage", -1)

        # offload
        self._offload = False
        if self.is_zero2() or self.is_zero3():
            offload_devices_valid = {"cpu", "nvme"}
            offload_devices = {
                self.get_value("zero_optimization.offload_optimizer.device"),
                self.get_value("zero_optimization.offload_param.device"),
            }
            if offload_devices & offload_devices_valid:
                self._offload = True

    def find_config_node(self, ds_key_long):
        """
        Walk the config along the dotted path `ds_key_long`, stopping at the parent of the last key.

        Returns a `(node, last_key)` tuple where `node` is `None` if any intermediate section is missing.
        """
        config = self.config

        # find the config node of interest if it exists
        nodes = ds_key_long.split(".")
        ds_key = nodes.pop()
        for node in nodes:
            config = config.get(node)
            if config is None:
                return None, ds_key

        return config, ds_key

    def get_value(self, ds_key_long, default=None):
        """
        Returns the set value or `default` if no value is set
        """
        config, ds_key = self.find_config_node(ds_key_long)
        if config is None:
            return default
        return config.get(ds_key, default)

    def del_config_sub_tree(self, ds_key_long, must_exist=False):
        """
        Deletes a sub-section of the config file if it's found.

        Unless `must_exist` is `True` the section doesn't have to exist.

        Raises:
            ValueError: If `must_exist` is `True` and the section is missing.
        """
        config = self.config

        # find the config node of interest if it exists
        nodes = ds_key_long.split(".")
        for node in nodes:
            parent_config = config
            config = config.get(node)
            if config is None:
                if must_exist:
                    raise ValueError(f"Can't find {ds_key_long} entry in the config: {self.config}")
                return

        # if found remove it
        if parent_config is not None:
            parent_config.pop(node)

    def is_true(self, ds_key_long):
        """
        Returns `True`/`False` only if the value is set, always `False` otherwise. So use this method to ask the very
        specific question of whether the value is set to `True` (and it's not set to `False` or isn't set).
        """
        value = self.get_value(ds_key_long)
        return False if value is None else bool(value)

    def is_false(self, ds_key_long):
        """
        Returns `True`/`False` only if the value is set, always `False` otherwise. So use this method to ask the very
        specific question of whether the value is set to `False` (and it's not set to `True` or isn't set).
        """
        value = self.get_value(ds_key_long)
        return False if value is None else not bool(value)

    def is_zero2(self):
        """Whether the configured ZeRO stage is 2."""
        return self._stage == 2

    def is_zero3(self):
        """Whether the configured ZeRO stage is 3."""
        return self._stage == 3

    def is_offload(self):
        """Whether offload was detected by `set_stage_and_offload`."""
        return self._offload
class DeepSpeedEngineWrapper:
    """
    Thin adapter around a `deepspeed.runtime.engine.DeepSpeedEngine` so that it can be driven from a conventional
    training loop.

    Args:
        engine (deepspeed.runtime.engine.DeepSpeedEngine): deepspeed engine to wrap
    """

    def __init__(self, engine):
        self.engine = engine

    def backward(self, loss, sync_gradients=True, **kwargs):
        """
        Run the backward pass on the wrapped engine and, at an accumulation boundary, its `step()`.

        Args:
            loss: value forwarded to `engine.backward`.
            sync_gradients (bool): whether this call is a gradient accumulation boundary.
            **kwargs: forwarded unchanged to `engine.backward`.
        """
        ds_engine = self.engine

        # Mirror Accelerate's `sync_gradients` state onto the engine so it knows whether this is the
        # last micro-batch before gradients are synchronized.
        ds_engine.set_gradient_accumulation_boundary(is_boundary=sync_gradients)

        # Backpropagation (mixed precision is handled by the engine).
        ds_engine.backward(loss, **kwargs)

        if not sync_gradients:
            # Still accumulating: nothing else to do for this micro-batch.
            return

        # At a boundary, DeepSpeed's `engine.step` takes care of: the gradient accumulation check,
        # gradient clipping, the optimizer step, zero grad, overflow checking and the lr_scheduler
        # step (only if engine.lr_scheduler is not None).
        # The optimizer/scheduler wrappers used under DeepSpeed turn those calls into no-ops, which
        # keeps a plain training loop working unchanged across training regimes.
        ds_engine.step()

    def get_global_grad_norm(self):
        """Get the global gradient norm from DeepSpeed engine, as a scalar when the value exposes `.item()`."""
        norm = self.engine.get_global_grad_norm()
        # Tensor-like results are unwrapped through `.item()`; anything else is returned untouched.
        return norm.item() if hasattr(norm, "item") else norm
class DeepSpeedOptimizerWrapper(AcceleratedOptimizer):
    """
    Internal wrapper around a deepspeed optimizer. `zero_grad` and `step` are intentionally no-ops because
    `accelerator.backward(loss)` already performs them.

    Args:
        optimizer (`torch.optim.optimizer.Optimizer`):
            The optimizer to wrap.
    """

    def __init__(self, optimizer):
        super().__init__(optimizer, device_placement=False, scaler=None)
        # Remember once whether the wrapped optimizer exposes an `overflow` attribute.
        self.__has_overflow__ = hasattr(self.optimizer, "overflow")

    def zero_grad(self, set_to_none=None):
        # No-op: `accelerator.backward(loss)` already takes care of zeroing the gradients.
        pass

    def step(self):
        # No-op: `accelerator.backward(loss)` already takes care of stepping the optimizer.
        pass

    @property
    def step_was_skipped(self):
        """Whether or not the optimizer step was done, or skipped because of gradient overflow."""
        # Without an `overflow` attribute on the wrapped optimizer, a step is never reported as skipped.
        return self.optimizer.overflow if self.__has_overflow__ else False
class DeepSpeedSchedulerWrapper(AcceleratedScheduler):
    """
    Internal wrapper around a deepspeed scheduler. Its `step` is intentionally a no-op because
    `accelerator.backward(loss)` already performs it.

    Args:
        scheduler (`torch.optim.lr_scheduler.LambdaLR`):
            The scheduler to wrap.
        optimizers (one or a list of `torch.optim.Optimizer`):
            Forwarded unchanged to `AcceleratedScheduler`.
    """

    def __init__(self, scheduler, optimizers):
        super().__init__(scheduler, optimizers)

    def step(self):
        # No-op: `accelerator.backward(loss)` already takes care of stepping the scheduler.
        pass
class DummyOptim:
    """
    Placeholder optimizer that only records its arguments. It is primarily used to keep a conventional training
    loop when the optimizer config is specified in the deepspeed config file.

    Args:
        params (iterable): iterable of parameters to optimize or dicts defining
            parameter groups
        lr (float):
            Learning rate.
        weight_decay (float):
            Weight decay.
        **kwargs (additional keyword arguments, *optional*):
            Other arguments.
    """

    def __init__(self, params, lr=0.001, weight_decay=0, **kwargs):
        # Nothing is computed here: every argument is stored as given.
        self.params, self.lr = params, lr
        self.weight_decay, self.kwargs = weight_decay, kwargs
class DummyScheduler:
    """
    Placeholder scheduler that only records its arguments. It is primarily used to keep a conventional training
    loop when the scheduler config is specified in the deepspeed config file.

    Args:
        optimizer (`torch.optim.optimizer.Optimizer`):
            The optimizer to wrap.
        total_num_steps (int, *optional*):
            Total number of steps.
        warmup_num_steps (int, *optional*):
            Number of steps for warmup.
        lr_scheduler_callable (callable, *optional*):
            A callable function that creates an LR Scheduler. It accepts only one argument `optimizer`.
        **kwargs (additional keyword arguments, *optional*):
            Other arguments.
    """

    def __init__(self, optimizer, total_num_steps=None, warmup_num_steps=0, lr_scheduler_callable=None, **kwargs):
        # Nothing is computed here: every argument is stored as given.
        self.optimizer = optimizer
        self.total_num_steps, self.warmup_num_steps = total_num_steps, warmup_num_steps
        self.lr_scheduler_callable = lr_scheduler_callable
        self.kwargs = kwargs
|