training_args_sm.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Copyright 2021 The HuggingFace Team. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import importlib.util
  15. import json
  16. import os
  17. import warnings
  18. from dataclasses import dataclass, field
  19. from functools import cached_property
  20. import torch
  21. from ..training_args import TrainingArguments
  22. from ..utils import is_sagemaker_dp_enabled, logging
# Module-level logger obtained from the project's `logging` utility, keyed by this module's name.
logger = logging.get_logger(__name__)


# TODO: should be moved to `utils` after refactoring of SageMakerTrainer
  25. def is_sagemaker_model_parallel_available():
  26. # Get the sagemaker specific mp parameters from smp_options variable.
  27. smp_options = os.getenv("SM_HP_MP_PARAMETERS", "{}")
  28. try:
  29. # Parse it and check the field "partitions" is included, it is required for model parallel.
  30. smp_options = json.loads(smp_options)
  31. if "partitions" not in smp_options:
  32. return False
  33. except json.JSONDecodeError:
  34. return False
  35. # Get the sagemaker specific framework parameters from mpi_options variable.
  36. mpi_options = os.getenv("SM_FRAMEWORK_PARAMS", "{}")
  37. try:
  38. # Parse it and check the field "sagemaker_distributed_dataparallel_enabled".
  39. mpi_options = json.loads(mpi_options)
  40. if not mpi_options.get("sagemaker_mpi_enabled", False):
  41. return False
  42. except json.JSONDecodeError:
  43. return False
  44. # Lastly, check if the `smdistributed` module is present.
  45. return importlib.util.find_spec("smdistributed") is not None
# Import-time side effect: when SageMaker model parallelism is detected, import the `smdistributed` model parallel
# torch module as `smp` and call its `init()`. The name `smp` is only bound in this module when that check passes,
# so code below must guard every use of `smp` with the same check.
if is_sagemaker_model_parallel_available():
    import smdistributed.modelparallel.torch as smp

    smp.init()
  49. @dataclass
  50. class SageMakerTrainingArguments(TrainingArguments):
  51. mp_parameters: str = field(
  52. default="",
  53. metadata={"help": "Used by the SageMaker launcher to send mp-specific args. Ignored in SageMakerTrainer"},
  54. )
  55. def __post_init__(self):
  56. super().__post_init__()
  57. warnings.warn(
  58. "`SageMakerTrainingArguments` is deprecated and will be removed in v5 of Transformers. You can use "
  59. "`TrainingArguments` instead.",
  60. FutureWarning,
  61. )
  62. @cached_property
  63. def _setup_devices(self) -> "torch.device":
  64. logger.info("PyTorch: setting up devices")
  65. if torch.distributed.is_available() and torch.distributed.is_initialized() and self.local_rank == -1:
  66. logger.warning(
  67. "torch.distributed process group is initialized, but local_rank == -1. "
  68. "In order to use Torch DDP, launch your script with `python -m torch.distributed.launch"
  69. )
  70. if self.no_cuda:
  71. device = torch.device("cpu")
  72. self._n_gpu = 0
  73. elif is_sagemaker_model_parallel_available():
  74. local_rank = smp.local_rank()
  75. device = torch.device("cuda", local_rank)
  76. self._n_gpu = 1
  77. elif is_sagemaker_dp_enabled():
  78. import smdistributed.dataparallel.torch.torch_smddp # noqa: F401
  79. torch.distributed.init_process_group(backend="smddp", timeout=self.ddp_timeout_delta)
  80. self.local_rank = int(os.getenv("SMDATAPARALLEL_LOCAL_RANK"))
  81. device = torch.device("cuda", self.local_rank)
  82. self._n_gpu = 1
  83. elif self.local_rank == -1:
  84. # if n_gpu is > 1 we'll use nn.DataParallel.
  85. # If you only want to use a specific subset of GPUs use `CUDA_VISIBLE_DEVICES=0`
  86. # Explicitly set CUDA to the first (index 0) CUDA device, otherwise `set_device` will
  87. # trigger an error that a device index is missing. Index 0 takes into account the
  88. # GPUs available in the environment, so `CUDA_VISIBLE_DEVICES=1,2` with `cuda:0`
  89. # will use the first GPU in that env, i.e. GPU#1
  90. device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
  91. # Sometimes the line in the postinit has not been run before we end up here, so just checking we're not at
  92. # the default value.
  93. self._n_gpu = torch.cuda.device_count()
  94. else:
  95. # Here, we'll use torch.distributed.
  96. # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
  97. if not torch.distributed.is_initialized():
  98. torch.distributed.init_process_group(backend="nccl", timeout=self.ddp_timeout_delta)
  99. device = torch.device("cuda", self.local_rank)
  100. self._n_gpu = 1
  101. if device.type == "cuda":
  102. torch.cuda.set_device(device)
  103. return device
  104. @property
  105. def world_size(self):
  106. if is_sagemaker_model_parallel_available():
  107. return smp.dp_size()
  108. return super().world_size
  109. @property
  110. def place_model_on_device(self):
  111. return not is_sagemaker_model_parallel_available()
  112. @property
  113. def _no_sync_in_gradient_accumulation(self):
  114. return False