| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- # Copyright 2020 The HuggingFace Team. All rights reserved.
- import math
- import os
- import shutil
- from functools import partialmethod
- import deepspeed
- import torch
- from deepspeed import DeepSpeedEngine
- from megatron_util import mpu, print_rank_0
- from transformers.deepspeed import HfTrainerDeepSpeedConfig
- from modelscope.metainfo import Hooks
- from modelscope.trainers.hooks import LoadCheckpointHook
- from modelscope.trainers.hooks.builder import HOOKS
- from modelscope.trainers.hooks.checkpoint.checkpoint_hook import (
- BestCkptSaverHook, CheckpointHook)
- from modelscope.trainers.hooks.checkpoint.checkpoint_processor import \
- CheckpointProcessor
- from modelscope.trainers.hooks.hook import Hook
- from modelscope.trainers.hooks.lr_scheduler_hook import (LrSchedulerHook,
- LrSchedulerProcessor)
- from modelscope.trainers.hooks.optimizer.base import (OptimizerHook,
- OptimizerProcessor)
- from modelscope.trainers.hooks.priority import Priority
- from modelscope.utils.checkpoint import save_checkpoint
- from modelscope.utils.constant import DistributedParallelType
- from modelscope.utils.device import create_device
- from modelscope.utils.logger import get_logger
- from modelscope.utils.torch_utils import (get_dist_info, get_local_rank,
- init_dist)
- class DeepSpeedConfig(HfTrainerDeepSpeedConfig):
- """
- The `DeepSpeedConfig` object is meant to be created during `TrainingArguments` object creation and has the
- same lifespan as the latter.
- """
- def is_auto(self, ds_key_long):
- val = self.get_value(ds_key_long)
- if val is None:
- return False
- else:
- return val == 'auto'
- def trainer_config_finalize(self, args, model, num_training_steps):
- """
- This stage runs after we have the model and know num_training_steps.
- Now we can complete the configuration process.
- """
- # zero
- # deal with config keys that use `auto` value and rely on model's hidden_size
- hidden_size_based_keys = [
- 'zero_optimization.reduce_bucket_size',
- 'zero_optimization.stage3_prefetch_bucket_size',
- 'zero_optimization.stage3_param_persistence_threshold',
- ]
- hidden_size_auto_keys = [
- x for x in hidden_size_based_keys if self.is_auto(x)
- ]
- if len(hidden_size_auto_keys) > 0:
- if hasattr(model.config, 'hidden_size'):
- hidden_size = model.config.hidden_size
- elif hasattr(model.config, 'hidden_sizes'):
- # if there are many hidden sizes pick the largest one
- hidden_size = max(model.config.hidden_sizes)
- else:
- raise ValueError(
- "The model's config file has neither `hidden_size` nor `hidden_sizes` entry, "
- "therefore it's not possible to automatically fill out the following `auto` entries "
- f'in the DeepSpeed config file: {hidden_size_auto_keys}. You can fix that by replacing '
- '`auto` values for these keys with an integer value of your choice.'
- )
- self.fill_only('zero_optimization.reduce_bucket_size',
- hidden_size * hidden_size)
- if self.is_zero3():
- # automatically assign the optimal config values based on model config
- self.fill_only('zero_optimization.stage3_prefetch_bucket_size',
- 0.9 * hidden_size * hidden_size)
- self.fill_only(
- 'zero_optimization.stage3_param_persistence_threshold',
- 10 * hidden_size)
- # scheduler
- options = args.train.optimizer.get('options', {})
- warmup = options.get('warmup', {})
- warmup_steps = warmup.get('warmup_steps', 0)
- warmup_ratio = warmup.get('warmup_ratio', 0.0)
- warmup_steps = warmup_steps if warmup_steps > 0 else math.ceil(
- num_training_steps * warmup_ratio)
- self.fill_match('scheduler.params.total_num_steps', num_training_steps)
- self.fill_match('scheduler.params.warmup_num_steps', warmup_steps)
- if len(self.mismatches) > 0:
- mismatches = '\n'.join(self.mismatches)
- raise ValueError(
- 'Please correct the following DeepSpeed config values that mismatch TrainingArguments'
- f" values:\n{mismatches}\nThe easiest method is to set these DeepSpeed config values to 'auto'."
- )
- def deepspeed_optim_sched(trainer, hf_deepspeed_config, num_training_steps):
- config = hf_deepspeed_config.config
- optimizer = None
- if 'optimizer' not in config:
- if hf_deepspeed_config.is_offload():
- logger.info(
- 'Detected ZeRO Offload and non-DeepSpeed optimizers: This combination should work as long as the'
- ' custom optimizer has both CPU and GPU implementation (except LAMB)'
- )
- # ds supports Adam, OneBitAdam, and Lamb optimizers and can import other optimizers from torch.
- # But trainer uses AdamW by default.
- optimizer = trainer.optimizer
- # To use other optimizers requires voiding warranty with: `zero_allow_untested_optimizer`
- config['zero_allow_untested_optimizer'] = True
- lr_scheduler = None
- if 'scheduler' not in config:
- lr_scheduler = trainer.scheduler
- return optimizer, lr_scheduler
- class DeepspeedProcessor(CheckpointProcessor, LrSchedulerProcessor,
- OptimizerProcessor):
- _BIN_FILE_DIR = 'model'
- def rank_name(self):
- # TODO
- try:
- tp_world_size = mpu.get_tensor_model_parallel_world_size()
- if tp_world_size == 1:
- return ''
- mp_rank = mpu.get_tensor_model_parallel_rank()
- return '_mp_rank_{:02d}'.format(mp_rank)
- except (ImportError, AssertionError):
- return ''
- def get_bin_filename(self, with_mpu=True):
- if not with_mpu:
- return 'pytorch_model.bin'
- else:
- mp_rank = mpu.get_tensor_model_parallel_rank()
- rank = '{:02d}'.format(mp_rank)
- return f'mp_rank_{rank}_model_states.pt'
- def save_checkpoints(self,
- trainer,
- checkpoint_path_prefix,
- output_dir,
- meta=None,
- save_optimizers=True):
- model = trainer.unwrap_module(trainer.model)
- _train_state_file = checkpoint_path_prefix + self.rank_name(
- ) + CheckpointProcessor.TRAINER_STATE_SUFFIX
- # Save pth file without model state_dict
- save_checkpoint(
- model, _train_state_file, None, None, meta=meta, with_model=False)
- save_dir = os.path.dirname(checkpoint_path_prefix)
- prefix = os.path.basename(checkpoint_path_prefix)
- with_mpu = not mpu.is_unitialized()
- bin_file = self.get_bin_filename(with_mpu)
- src_file = os.path.join(checkpoint_path_prefix, bin_file)
- if self.zero_stage == 3 or with_mpu:
- trainer.model.save_checkpoint(save_dir, prefix)
- else:
- save_checkpoint(
- model, src_file, None, None, meta=None, with_meta=False)
- if self.zero_stage == 3:
- return
- if with_mpu:
- dest_file = os.path.join(output_dir, self._BIN_FILE_DIR, bin_file)
- else:
- dest_file = os.path.join(output_dir, bin_file)
- if os.path.isfile(dest_file):
- os.unlink(dest_file)
- try:
- os.link(src_file, dest_file)
- except OSError as e:
- get_logger().error(
- f'Link {src_file} to {dest_file} error: {e}, '
- 'changing to copy the bin file, this may case more space usage.'
- )
- shutil.copyfile(src_file, dest_file)
- def remove_checkpoints(self, trainer, checkpoint_path_prefix):
- _train_state_file = checkpoint_path_prefix + self.rank_name(
- ) + CheckpointProcessor.TRAINER_STATE_SUFFIX
- if os.path.isfile(_train_state_file):
- os.remove(_train_state_file)
- shutil.rmtree(checkpoint_path_prefix, ignore_errors=True)
- def load_checkpoints(self, checkpoint_path_prefix, trainer, load_all_state,
- strict):
- assert os.path.isdir(checkpoint_path_prefix)
- path = os.path.dirname(checkpoint_path_prefix)
- tag = os.path.basename(checkpoint_path_prefix)
- meta = {}
- _train_state_file = checkpoint_path_prefix + self.rank_name(
- ) + CheckpointProcessor.TRAINER_STATE_SUFFIX
- if os.path.isfile(_train_state_file):
- meta = self.load_trainer_state(trainer, _train_state_file,
- load_all_state)
- if isinstance(trainer.model, DeepSpeedEngine):
- # DeepSpeedEngine is initialized
- trainer.model.load_checkpoint(
- path,
- tag,
- load_module_strict=strict,
- load_module_only=not load_all_state,
- )
- else:
- # in eval or prediction
- save_dir = checkpoint_path_prefix
- bin_file = self.get_bin_filename()
- model_file = os.path.join(save_dir, bin_file)
- checkpoint = torch.load(
- model_file, map_location=lambda storage, loc: storage)
- checkpoint = checkpoint['module']
- model_dict = trainer.unwrap_module(trainer.model).state_dict()
- for key in checkpoint:
- if key not in model_dict.keys():
- print_rank_0('Skip key: ' + key)
- else:
- print_rank_0('Loading key: ' + key)
- trainer.unwrap_module(trainer.model).load_state_dict(
- checkpoint, strict=strict)
- return meta
- def backward(self, trainer, loss_keys, cumulative_iters, grad_clip):
- # assert cumulative_iters == 1, 'DeepSpeed only support cumulative_iters=1'
- # The `trainer.model` here is actually a deepspeed engine object.
- # backward step
- for k in loss_keys:
- loss = trainer.train_outputs[k]
- trainer.model.backward(loss)
- # update parameters
- # Optimizer step for deepspeed must be called on every step regardless of
- # the value of gradient accumulation iters
- trainer.model.step()
- def initialize_optimizer(self, trainer):
- pass
- def step(self, trainer):
- pass
- def should_save_on_rank(self, trainer):
- return True
- def get_current_lr(self, trainer):
- if isinstance(trainer.optimizer, torch.optim.Optimizer) or isinstance(
- trainer.optimizer, deepspeed.DeepSpeedOptimizer):
- lr = [group['lr'] for group in trainer.optimizer.param_groups]
- elif isinstance(trainer.optimizer, dict):
- lr = dict()
- for name, optim in trainer.optimizer.items():
- lr[name] = [group['lr'] for group in optim.param_groups]
- else:
- raise RuntimeError(
- 'lr is not applicable because optimizer does not exist.')
- return lr
- @HOOKS.register_module(module_name=Hooks.DeepspeedHook)
- class DeepspeedHook(Hook):
- PRIORITY = Priority.VERY_HIGH
- def __init__(self,
- config=None,
- deepspeed_activation_checkpointing=True,
- save_zero_checkpoint=False,
- with_mpu=True,
- zero_stage=None):
- self.save_zero_checkpoint = save_zero_checkpoint
- self.deepspeed_activation_checkpointing = deepspeed_activation_checkpointing
- self.with_mpu = with_mpu
- self.deepspeed_config = config
- if zero_stage is not None:
- assert zero_stage in (0, 1, 2,
- 3), 'zero_stage must in (0, 1, 2, 3)!'
- self.zero_stage = zero_stage
- def register_processor(self, trainer):
- processor = DeepspeedProcessor()
- optimizer_hook = trainer.get_hook(OptimizerHook)
- if len(optimizer_hook) > 0 and not isinstance(
- optimizer_hook[0].processor, DeepspeedProcessor):
- optimizer_hook[0].set_processor(processor)
- ckpt_hook = trainer.get_hook(CheckpointHook)
- if len(ckpt_hook) > 0 and not isinstance(ckpt_hook[0].processor,
- DeepspeedProcessor):
- ckpt_hook[0].set_processor(processor)
- best_ckpt_hook = trainer.get_hook(BestCkptSaverHook)
- if len(best_ckpt_hook) > 0 and not isinstance(
- best_ckpt_hook[0].processor, DeepspeedProcessor):
- best_ckpt_hook[0].set_processor(processor)
- load_ckpt_hook = trainer.get_hook(LoadCheckpointHook)
- if len(load_ckpt_hook) > 0 and not isinstance(
- load_ckpt_hook[0].processor, DeepspeedProcessor):
- load_ckpt_hook[0].set_processor(processor)
- lr_scheduler_hook = trainer.get_hook(LrSchedulerHook)
- if len(lr_scheduler_hook) > 0 and not isinstance(
- lr_scheduler_hook[0].processor, DeepspeedProcessor):
- lr_scheduler_hook[0].set_processor(processor)
- self.processor = processor
- def prepare_args(self, args):
- args.per_device_train_batch_size = args.train.dataloader.get(
- 'batch_size_per_gpu', 4)
- args.max_grad_norm = args.train.get('clip_grad', 1.0)
- args.learning_rate = args.train.optimizer.get('lr', 2e-5)
- args.adam_beta1 = args.train.optimizer.get('adam_beta1', 0.9)
- args.adam_beta2 = args.train.optimizer.get('adam_beta2', 0.999)
- args.adam_epsilon = args.train.optimizer.get('adam_epsilon', 1e-8)
- args.weight_decay = args.train.optimizer.get('weight_decay', 0.0)
- args.fp16 = args.train.get('use_fp16', False)
- args.fp16_full_eval = args.train.get('use_fp16', False)
- args.fp16_backend = args.train.get('fp16_backend', 'amp')
- args.save_on_each_node = args.train.get('save_on_each_node', False)
- args.fp16_opt_level = args.train.get('fp16_opt_level', None)
- args.fp16_opt_level = next((item.get('opt_level', args.fp16_opt_level)
- for item in args.train.hooks
- if item['type'] == 'ApexAMPOptimizerHook'),
- args.fp16_opt_level)
- if not args.fp16_opt_level:
- args.fp16_opt_level = 'O1'
- args.bf16 = args.train.get('bf16', False)
- def get_deepspeed_config(self, trainer, args, max_steps):
- _, args.world_size = get_dist_info()
- self.prepare_args(args)
- if os.path.exists(self.deepspeed_config):
- deepspeed_config = self.deepspeed_config
- else:
- deepspeed_config = os.path.join(trainer.model_dir,
- self.deepspeed_config)
- if not os.path.exists(deepspeed_config):
- raise RuntimeError(
- f'No such DeepSpeed json config file: {self.deepspeed_config}.'
- )
- self.logger.info(f'Loading deepspeed config from {deepspeed_config}')
- ds_config = DeepSpeedConfig(deepspeed_config)
- ds_config.trainer_config_process(args)
- ds_config.trainer_config_finalize(args, trainer.model, max_steps)
- return ds_config
- def after_init(self, trainer):
- init_dist('pytorch')
- local_rank = get_local_rank()
- trainer.device = create_device(f'cuda:{local_rank}')
- trainer.model.to(trainer.device)
- trainer.parallel_groups[DistributedParallelType.DP] = None
- def before_val(self, trainer):
- pass
- def before_run(self, trainer):
- if not hasattr(trainer, 'logger'):
- self.logger = get_logger()
- else:
- self.logger = trainer.logger
- # deepspeed init
- args = trainer.cfg
- args.gradient_accumulation_steps = args.train.optimizer.get(
- 'options', {}).get('cumulative_iters', 1)
- num_update_steps_per_epoch = trainer.iters_per_epoch // args.gradient_accumulation_steps
- max_steps = math.ceil(trainer._max_epochs * num_update_steps_per_epoch)
- ds_config = self.get_deepspeed_config(trainer, args, max_steps)
- optimizer, lr_scheduler = deepspeed_optim_sched(
- trainer, ds_config, max_steps)
- config = ds_config.config
- if self.zero_stage is not None:
- config['zero_optimization']['stage'] = self.zero_stage
- self.processor.zero_stage = config['zero_optimization'].get('stage', 0)
- trainer.model, trainer.optimizer, _, trainer.lr_scheduler = deepspeed.initialize(
- model=trainer.model,
- optimizer=optimizer,
- config=config,
- lr_scheduler=lr_scheduler)
|