| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- # Copyright 2021 The HuggingFace Team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import os
- import sys
- import tempfile
- import torch
- from .state import AcceleratorState, PartialState
- from .utils import (
- PrecisionType,
- PrepareForLaunch,
- are_libraries_initialized,
- check_cuda_p2p_ib_support,
- get_current_device_type,
- get_gpu_info,
- is_mps_available,
- is_torch_version,
- patch_environment,
- )
- from .utils.constants import ELASTIC_LOG_LINE_PREFIX_TEMPLATE_PYTORCH_VERSION
- def test_launch():
- "Verify a `PartialState` can be initialized."
- _ = PartialState()
- def notebook_launcher(
- function,
- args=(),
- num_processes=None,
- mixed_precision="no",
- use_port="29500",
- master_addr="127.0.0.1",
- node_rank=0,
- num_nodes=1,
- rdzv_backend="static",
- rdzv_endpoint="",
- rdzv_conf=None,
- rdzv_id="none",
- max_restarts=0,
- monitor_interval=0.1,
- log_line_prefix_template=None,
- ):
- """
- Launches a training function, using several processes or multiple nodes if it's possible in the current environment
- (TPU with multiple cores for instance).
- <Tip warning={true}>
- To use this function absolutely zero calls to a device must be made in the notebook session before calling. If any
- have been made, you will need to restart the notebook and make sure no cells use any device capability.
- Setting `ACCELERATE_DEBUG_MODE="1"` in your environment will run a test before truly launching to ensure that none
- of those calls have been made.
- </Tip>
- Args:
- function (`Callable`):
- The training function to execute. If it accepts arguments, the first argument should be the index of the
- process run.
- args (`Tuple`):
- Tuple of arguments to pass to the function (it will receive `*args`).
- num_processes (`int`, *optional*):
- The number of processes to use for training. Will default to 8 in Colab/Kaggle if a TPU is available, to
- the number of devices available otherwise.
- mixed_precision (`str`, *optional*, defaults to `"no"`):
- If `fp16` or `bf16`, will use mixed precision training on multi-device.
- use_port (`str`, *optional*, defaults to `"29500"`):
- The port to use to communicate between processes when launching a multi-device training.
- master_addr (`str`, *optional*, defaults to `"127.0.0.1"`):
- The address to use for communication between processes.
- node_rank (`int`, *optional*, defaults to 0):
- The rank of the current node.
- num_nodes (`int`, *optional*, defaults to 1):
- The number of nodes to use for training.
- rdzv_backend (`str`, *optional*, defaults to `"static"`):
- The rendezvous method to use, such as 'static' (the default) or 'c10d'
- rdzv_endpoint (`str`, *optional*, defaults to `""`):
- The endpoint of the rdzv sync. storage.
- rdzv_conf (`Dict`, *optional*, defaults to `None`):
- Additional rendezvous configuration.
- rdzv_id (`str`, *optional*, defaults to `"none"`):
- The unique run id of the job.
- max_restarts (`int`, *optional*, defaults to 0):
- The maximum amount of restarts that elastic agent will conduct on workers before failure.
- monitor_interval (`float`, *optional*, defaults to 0.1):
- The interval in seconds that is used by the elastic_agent as a period of monitoring workers.
- log_line_prefix_template (`str`, *optional*, defaults to `None`):
- The prefix template for elastic launch logging. Available from PyTorch 2.2.0.
- Example:
- ```python
- # Assume this is defined in a Jupyter Notebook on an instance with two devices
- from accelerate import notebook_launcher
- def train(*args):
- # Your training function here
- ...
- notebook_launcher(train, args=(arg1, arg2), num_processes=2, mixed_precision="fp16")
- ```
- """
- # Are we in a google colab or a Kaggle Kernel?
- in_colab = False
- in_kaggle = False
- if any(key.startswith("KAGGLE") for key in os.environ.keys()):
- in_kaggle = True
- elif "IPython" in sys.modules:
- in_colab = "google.colab" in str(sys.modules["IPython"].get_ipython())
- try:
- mixed_precision = PrecisionType(mixed_precision.lower())
- except ValueError:
- raise ValueError(
- f"Unknown mixed_precision mode: {args.mixed_precision.lower()}. Choose between {PrecisionType.list()}."
- )
- if (in_colab or in_kaggle) and (
- (os.environ.get("TPU_NAME", None) is not None) or (os.environ.get("PJRT_DEVICE", "") == "TPU")
- ):
- # TPU launch
- import torch_xla.distributed.xla_multiprocessing as xmp
- if len(AcceleratorState._shared_state) > 0:
- raise ValueError(
- "To train on TPU in Colab or Kaggle Kernel, the `Accelerator` should only be initialized inside "
- "your training function. Restart your notebook and make sure no cells initializes an "
- "`Accelerator`."
- )
- launcher = PrepareForLaunch(function, distributed_type="XLA")
- print("Launching a training on TPU cores.")
- xmp.spawn(launcher, args=args, start_method="fork")
- elif in_colab and get_gpu_info()[1] < 2:
- # No need for a distributed launch otherwise as it's either CPU or one GPU.
- if torch.cuda.is_available():
- print("Launching training on one GPU.")
- else:
- print("Launching training on one CPU.")
- function(*args)
- else:
- if num_processes is None:
- raise ValueError(
- "You have to specify the number of devices you would like to use, add `num_processes=...` to your call."
- )
- if node_rank >= num_nodes:
- raise ValueError("The node_rank must be less than the number of nodes.")
- if num_processes > 1:
- # Multi-device launch
- from torch.distributed.launcher.api import LaunchConfig, elastic_launch
- from torch.multiprocessing import start_processes
- from torch.multiprocessing.spawn import ProcessRaisedException
- if len(AcceleratorState._shared_state) > 0:
- raise ValueError(
- "To launch a multi-device training from your notebook, the `Accelerator` should only be initialized "
- "inside your training function. Restart your notebook and make sure no cells initializes an "
- "`Accelerator`."
- )
- # Check for specific libraries known to initialize device that users constantly use
- problematic_imports = are_libraries_initialized("bitsandbytes")
- if len(problematic_imports) > 0:
- err = (
- "Could not start distributed process. Libraries known to initialize device upon import have been "
- "imported already. Please keep these imports inside your training function to try and help with this:"
- )
- for lib_name in problematic_imports:
- err += f"\n\t* `{lib_name}`"
- raise RuntimeError(err)
- patched_env = dict(
- nproc=num_processes,
- node_rank=node_rank,
- world_size=num_nodes * num_processes,
- master_addr=master_addr,
- master_port=use_port,
- mixed_precision=mixed_precision,
- )
- # Check for CUDA P2P and IB issues
- if not check_cuda_p2p_ib_support():
- patched_env["nccl_p2p_disable"] = "1"
- patched_env["nccl_ib_disable"] = "1"
- # torch.distributed will expect a few environment variable to be here. We set the ones common to each
- # process here (the other ones will be set be the launcher).
- with patch_environment(**patched_env):
- # First dummy launch
- # Determine device type without initializing any device (which would break fork)
- device_type, distributed_type = get_current_device_type()
- if os.environ.get("ACCELERATE_DEBUG_MODE", "false").lower() == "true":
- launcher = PrepareForLaunch(test_launch, distributed_type=distributed_type)
- try:
- start_processes(launcher, args=(), nprocs=num_processes, start_method="fork")
- except ProcessRaisedException as e:
- err = "An issue was found when verifying a stable environment for the notebook launcher."
- if f"Cannot re-initialize {device_type.upper()} in forked subprocess" in e.args[0]:
- raise RuntimeError(
- f"{err}"
- "This likely stems from an outside import causing issues once the `notebook_launcher()` is called. "
- "Please review your imports and test them when running the `notebook_launcher()` to identify "
- f"which one is problematic and causing {device_type.upper()} to be initialized."
- ) from e
- else:
- raise RuntimeError(f"{err} The following error was raised: {e}") from e
- # Now the actual launch
- launcher = PrepareForLaunch(function, distributed_type=distributed_type)
- print(f"Launching training on {num_processes} {device_type.upper()}s.")
- try:
- if rdzv_conf is None:
- rdzv_conf = {}
- if rdzv_backend == "static":
- rdzv_conf["rank"] = node_rank
- if not rdzv_endpoint:
- rdzv_endpoint = f"{master_addr}:{use_port}"
- launch_config_kwargs = dict(
- min_nodes=num_nodes,
- max_nodes=num_nodes,
- nproc_per_node=num_processes,
- run_id=rdzv_id,
- rdzv_endpoint=rdzv_endpoint,
- rdzv_backend=rdzv_backend,
- rdzv_configs=rdzv_conf,
- max_restarts=max_restarts,
- monitor_interval=monitor_interval,
- start_method="fork",
- )
- if is_torch_version(">=", ELASTIC_LOG_LINE_PREFIX_TEMPLATE_PYTORCH_VERSION):
- launch_config_kwargs["log_line_prefix_template"] = log_line_prefix_template
- elastic_launch(config=LaunchConfig(**launch_config_kwargs), entrypoint=function)(*args)
- except ProcessRaisedException as e:
- if f"Cannot re-initialize {device_type.upper()} in forked subprocess" in e.args[0]:
- raise RuntimeError(
- f"{device_type.upper()} has been initialized before the `notebook_launcher` could create a forked subprocess. "
- "This likely stems from an outside import causing issues once the `notebook_launcher()` is called. "
- "Please review your imports and test them when running the `notebook_launcher()` to identify "
- f"which one is problematic and causing {device_type.upper()} to be initialized."
- ) from e
- else:
- raise RuntimeError(f"An issue was found when launching the training: {e}") from e
- else:
- # No need for a distributed launch otherwise as it's either CPU, GPU, XPU or MPS.
- if is_mps_available():
- os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
- print("Launching training on MPS.")
- elif torch.cuda.is_available():
- print("Launching training on one GPU.")
- elif torch.xpu.is_available():
- print("Launching training on one XPU.")
- else:
- print("Launching training on CPU.")
- function(*args)
- def debug_launcher(function, args=(), num_processes=2):
- """
- Launches a training function using several processes on CPU for debugging purposes.
- <Tip warning={true}>
- This function is provided for internal testing and debugging, but it's not intended for real trainings. It will
- only use the CPU.
- </Tip>
- Args:
- function (`Callable`):
- The training function to execute.
- args (`Tuple`):
- Tuple of arguments to pass to the function (it will receive `*args`).
- num_processes (`int`, *optional*, defaults to 2):
- The number of processes to use for training.
- """
- from torch.multiprocessing import start_processes
- with tempfile.NamedTemporaryFile() as tmp_file:
- # torch.distributed will expect a few environment variable to be here. We set the ones common to each
- # process here (the other ones will be set be the launcher).
- with patch_environment(
- world_size=num_processes,
- master_addr="127.0.0.1",
- master_port="29500",
- accelerate_mixed_precision="no",
- accelerate_debug_rdv_file=tmp_file.name,
- accelerate_use_cpu="yes",
- ):
- launcher = PrepareForLaunch(function, debug=True)
- start_processes(launcher, args=args, nprocs=num_processes, start_method="fork")
|