| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- # Copyright 2022 The HuggingFace Team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import math
- import os
- import platform
- import subprocess
- import sys
- from contextlib import contextmanager
- from dataclasses import dataclass, field
- from functools import lru_cache, wraps
- from shutil import which
- from typing import Optional, Union
- import torch
- from packaging.version import parse
# Module-level logger, named after this module, used by the helpers below for warnings and debug/info messages.
logger = logging.getLogger(__name__)
def convert_dict_to_env_variables(current_env: dict):
    r"""
    Verifies that all keys and values in `current_env` do not contain illegal keys or values, and returns a list of
    strings as the result.

    An entry is skipped (with a logged warning) when its key or value is empty, or when the key or value contains any
    of `;`, a newline, `<`, `>` or a space. Every kept entry is rendered as `"KEY=VALUE\n"`, in the iteration order of
    `current_env`.

    Args:
        current_env (`dict`):
            Mapping of environment variable names to values. Both keys and values are concatenated as strings, so
            non-string entries raise `TypeError`.

    Returns:
        `list[str]`: The valid entries, each formatted as `"KEY=VALUE\n"`.

    Example:

    ```python
    >>> from accelerate.utils.environment import convert_dict_to_env_variables

    >>> env = {"ACCELERATE_DEBUG_MODE": "1", "BAD_ENV_NAME": "<mything", "OTHER_ENV": "2"}
    >>> valid_env_items = convert_dict_to_env_variables(env)
    >>> print(valid_env_items)
    ['ACCELERATE_DEBUG_MODE=1\n', 'OTHER_ENV=2\n']
    ```
    """
    forbidden_chars = [";", "\n", "<", ">", " "]
    valid_env_items = []
    for key, value in current_env.items():
        # Check key and value together; a forbidden character in either disqualifies the entry
        combined = key + value
        if all(char not in combined for char in forbidden_chars) and len(key) >= 1 and len(value) >= 1:
            valid_env_items.append(f"{key}={value}\n")
        else:
            logger.warning(f"WARNING: Skipping {key}={value} as it contains forbidden characters or missing values.")
    return valid_env_items
def str_to_bool(value, to_bool: bool = False) -> Union[int, bool]:
    """
    Interprets a textual truth value, case-insensitively.

    Accepted truthy spellings: `y`, `yes`, `t`, `true`, `on`, `1`. Accepted falsy spellings: `n`, `no`, `f`,
    `false`, `off`, `0`.

    Args:
        value (`str`): The text to interpret.
        to_bool (`bool`, *optional*, defaults to `False`):
            When `True` the result is a `bool`; otherwise it is the integer `1` or `0`.

    Raises:
        ValueError: If `value` matches none of the accepted spellings.
    """
    lowered = value.lower()
    if lowered in ("y", "yes", "t", "true", "on", "1"):
        outcome = True
    elif lowered in ("n", "no", "f", "false", "off", "0"):
        outcome = False
    else:
        raise ValueError(f"invalid truth value {lowered}")
    # Callers historically expect 1/0 unless they explicitly ask for a bool
    return outcome if to_bool else int(outcome)
def get_int_from_env(env_keys, default):
    """
    Returns the first non-negative integer value found among the environment variables in `env_keys`, or `default`.

    Args:
        env_keys (`str` or iterable of `str`):
            Names of the environment variables to look up, in priority order. A single name may be passed as a
            plain string.
        default:
            Value returned when none of the variables holds a non-negative integer.

    Raises:
        ValueError: If a variable examined before a match is set to text that `int()` cannot parse.
    """
    if isinstance(env_keys, str):
        # A bare string would otherwise be iterated character by character
        env_keys = [env_keys]
    for e in env_keys:
        # Unset variables read as -1, so they (like explicit negative values) are skipped
        val = int(os.environ.get(e, -1))
        if val >= 0:
            return val
    return default
def parse_flag_from_env(key, default=False):
    """
    Reads environment variable `key` as a boolean flag.

    When `key` is unset, `str(default)` is interpreted instead. The text is parsed with `str_to_bool`, so an
    unrecognized spelling raises `ValueError`.
    """
    raw = os.environ.get(key, str(default))
    # `str_to_bool` yields 1/0 here; turn that into a real bool
    return bool(str_to_bool(raw))
def parse_choice_from_env(key, default="no"):
    """
    Returns the raw string value of environment variable `key`, or `str(default)` when it is unset.

    No validation is performed on the returned value.
    """
    fallback = str(default)
    return os.environ.get(key, fallback)
def are_libraries_initialized(*library_names: str) -> list[str]:
    """
    Returns, in the order given, those names in `library_names` that already appear in `sys.modules`.
    """
    return list(filter(sys.modules.__contains__, library_names))
def get_current_device_type() -> tuple[str, str]:
    """
    Determines the current device type and distributed type without initializing any device.

    This matters most with fork-based multiprocessing, where initializing a device before forking can cause errors.
    Backends are probed in the same priority as `state.py:_prepare_backend()`:
    MLU -> SDAA -> MUSA -> NPU -> HPU -> CUDA -> XPU. The first available backend wins. If none is available,
    `("cuda", "MULTI_GPU")` is returned anyway.

    Returns:
        tuple[str, str]: A `(device_type, distributed_type)` pair, where
            - device_type: The device string (e.g., "cuda", "npu", "xpu")
            - distributed_type: The distributed type string (e.g., "MULTI_GPU", "MULTI_NPU")

    Example:

    ```python
    >>> device_type, distributed_type = get_current_device_type()
    >>> print(device_type)  # "cuda"
    >>> print(distributed_type)  # "MULTI_GPU"
    ```
    """
    from .imports import (
        is_hpu_available,
        is_mlu_available,
        is_musa_available,
        is_npu_available,
        is_sdaa_available,
        is_xpu_available,
    )

    # (availability probe, device type, distributed type) in priority order; probes run lazily, one at a time
    probes = (
        (is_mlu_available, "mlu", "MULTI_MLU"),
        (is_sdaa_available, "sdaa", "MULTI_SDAA"),
        (is_musa_available, "musa", "MULTI_MUSA"),
        (is_npu_available, "npu", "MULTI_NPU"),
        (is_hpu_available, "hpu", "MULTI_HPU"),
        (lambda: torch.cuda.is_available(), "cuda", "MULTI_GPU"),
        (is_xpu_available, "xpu", "MULTI_XPU"),
    )
    for is_available, device_type, distributed_type in probes:
        if is_available():
            return device_type, distributed_type
    # Default to CUDA even if not available (for CPU-only scenarios where CUDA code paths are still used)
    return "cuda", "MULTI_GPU"
- def _nvidia_smi():
- """
- Returns the right nvidia-smi command based on the system.
- """
- if platform.system() == "Windows":
- # If platform is Windows and nvidia-smi can't be found in path
- # try from systemd drive with default installation path
- command = which("nvidia-smi")
- if command is None:
- command = f"{os.environ['systemdrive']}\\Program Files\\NVIDIA Corporation\\NVSMI\\nvidia-smi.exe"
- else:
- command = "nvidia-smi"
- return command
def get_gpu_info():
    """
    Gets GPU count and names using `nvidia-smi` instead of torch to not initialize CUDA.

    Largely based on the `gputil` library.

    Returns:
        tuple[list[str], int]: The GPU names (one per non-empty output line) and the number of such lines. If
        `nvidia-smi` prints nothing, this is `([], 0)`.

    Raises:
        Whatever `subprocess.check_output` raises when `nvidia-smi` is missing or exits with an error.
    """
    # Each output line is parsed as "<count>, <name>"
    output = subprocess.check_output(
        [_nvidia_smi(), "--query-gpu=count,name", "--format=csv,noheader"], universal_newlines=True
    )
    # `universal_newlines=True` already normalizes line endings to "\n", so splitting on `os.linesep` would not
    # split anything on Windows ("\r\n"). `splitlines()` works on every platform; blank lines are dropped.
    gpus = [line for line in output.strip().splitlines() if line.strip()]
    gpu_count = len(gpus)
    # Split on the first comma only so the name field is kept whole
    gpu_names = [gpu.split(",", 1)[1].strip() for gpu in gpus]
    return gpu_names, gpu_count
def get_driver_version():
    """
    Returns the driver version reported by `nvidia-smi`.

    In the case of multiple GPUs, will return the first. Returns `""` if `nvidia-smi` prints nothing.
    """
    output = subprocess.check_output(
        [_nvidia_smi(), "--query-gpu=driver_version", "--format=csv,noheader"], universal_newlines=True
    )
    # Text mode already converted line endings to "\n"; `splitlines()` is correct on Windows too
    lines = output.strip().splitlines()
    return lines[0] if lines else ""
def check_cuda_p2p_ib_support():
    """
    Checks if the devices being used have issues with P2P and IB communications, namely any consumer GPU hardware after
    the 3090.

    Notably uses `nvidia-smi` instead of torch to not initialize CUDA.

    Returns:
        `bool`: `False` only when more than one GPU is visible, at least one matches an entry of
        `unsupported_devices`, and the driver is older than 550.40.07. Returns `True` in every other case, including
        when the information cannot be gathered or parsed (best effort).
    """
    # As new consumer GPUs get released, add them to `unsupported_devices`
    unsupported_devices = {"RTX 40"}
    acceptable_driver_version = "550.40.07"
    try:
        device_names, device_count = get_gpu_info()
        if device_count <= 1:
            return True
        affected = any(
            unsupported_device in device_name
            for device_name in device_names
            for unsupported_device in unsupported_devices
        )
        if not affected:
            return True
        # Affected devices are only considered supported from the acceptable driver version onward
        return parse(get_driver_version()) >= parse(acceptable_driver_version)
    except Exception as err:
        # Deliberate best effort (e.g. nvidia-smi missing or unparsable output): assume support, but leave a trace
        logger.debug(f"Could not determine CUDA P2P/IB support, assuming it is supported: {err}")
        return True
@lru_cache
def check_cuda_fp8_capability():
    """
    Checks if the current GPU available supports FP8, i.e. has a compute capability of at least (8, 9).

    The compute capability is first read from `nvidia-smi` (first GPU only). If that fails for any reason, it falls
    back to `torch.cuda.get_device_capability()`, which notably might initialize `torch.cuda`. The result is cached.
    """
    try:
        # try to get the compute capability from nvidia-smi
        output = subprocess.check_output(
            [_nvidia_smi(), "--query-gpu=compute_capability", "--format=csv,noheader"], universal_newlines=True
        )
        # we take the first GPU's compute capability; text mode normalizes newlines, so use `splitlines()` (splitting
        # on `os.linesep` breaks multi-GPU output on Windows and needlessly forces the torch fallback)
        first_gpu = output.strip().splitlines()[0]
        compute_capability = tuple(map(int, first_gpu.split(".")))
    except Exception:
        # Broad on purpose: any failure (missing binary, bad output, missing env var on Windows) uses torch instead
        compute_capability = torch.cuda.get_device_capability()
    return compute_capability >= (8, 9)
@dataclass
class CPUInformation:
    """
    Describes where the current process sits in a CPU distributed run.

    Attributes:
        rank: Global rank of the current process (default 0).
        world_size: Total number of processes across all nodes (default 1).
        local_rank: Rank of the current process on its own node (default 0).
        local_world_size: Number of processes on the local node (default 1).
    """

    rank: int = field(default=0, metadata=dict(help="The rank of the current process."))
    world_size: int = field(default=1, metadata=dict(help="The total number of processes in the world."))
    local_rank: int = field(default=0, metadata=dict(help="The rank of the current process on the local node."))
    local_world_size: int = field(
        default=1, metadata=dict(help="The total number of processes on the local node.")
    )
def get_cpu_distributed_information() -> CPUInformation:
    """
    Collects the CPU distributed-training layout from the environment as a `CPUInformation` dataclass.

    Each field is taken from the first of several alternative environment variables (for example `RANK`,
    `PMI_RANK`, `OMPI_COMM_WORLD_RANK` or `MV2_COMM_WORLD_RANK` for the rank) that holds a non-negative integer.
    Otherwise it defaults to 0 for ranks and 1 for sizes.
    """
    rank = get_int_from_env(["RANK", "PMI_RANK", "OMPI_COMM_WORLD_RANK", "MV2_COMM_WORLD_RANK"], 0)
    world_size = get_int_from_env(["WORLD_SIZE", "PMI_SIZE", "OMPI_COMM_WORLD_SIZE", "MV2_COMM_WORLD_SIZE"], 1)
    local_rank = get_int_from_env(
        ["LOCAL_RANK", "MPI_LOCALRANKID", "OMPI_COMM_WORLD_LOCAL_RANK", "MV2_COMM_WORLD_LOCAL_RANK"], 0
    )
    local_world_size = get_int_from_env(
        ["LOCAL_WORLD_SIZE", "MPI_LOCALNRANKS", "OMPI_COMM_WORLD_LOCAL_SIZE", "MV2_COMM_WORLD_LOCAL_SIZE"], 1
    )
    return CPUInformation(
        rank=rank,
        world_size=world_size,
        local_rank=local_rank,
        local_world_size=local_world_size,
    )
def override_numa_affinity(local_process_index: int, verbose: Optional[bool] = None) -> None:
    """
    Overrides whatever NUMA affinity is set for the current process. This is very taxing and requires recalculating the
    affinity to set, ideally you should use `utils.environment.set_numa_affinity` instead.

    Only acts when `torch.cuda.is_available()`. In that case it asks NVML for the CPU affinity of GPU
    `local_process_index` and pins the current process to those cores with `os.sched_setaffinity`.

    Args:
        local_process_index (int):
            The index of the current process on the current server (also used as the NVML device index).
        verbose (bool, *optional*):
            Whether to log out the assignment of each CPU. If `ACCELERATE_DEBUG_MODE` is enabled, will default to True.

    Raises:
        ImportError: If CUDA is available but `nvidia-ml-py` (`pynvml`) is not installed.
    """
    if verbose is None:
        verbose = parse_flag_from_env("ACCELERATE_DEBUG_MODE", False)
    if torch.cuda.is_available():
        from accelerate.utils import is_pynvml_available

        if not is_pynvml_available():
            raise ImportError(
                "To set CPU affinity on CUDA GPUs the `nvidia-ml-py` package must be available. (`pip install nvidia-ml-py`)"
            )
        import pynvml as nvml

        # The below code is based on https://github.com/NVIDIA/DeepLearningExamples/blob/master/TensorFlow2/LanguageModeling/BERT/gpu_affinity.py
        nvml.nvmlInit()
        try:
            # Enough 64-bit words to cover every CPU; `os.cpu_count()` may return None, so assume at least one word
            num_elements = math.ceil((os.cpu_count() or 1) / 64)
            handle = nvml.nvmlDeviceGetHandleByIndex(local_process_index)
            affinity_words = nvml.nvmlDeviceGetCpuAffinity(handle, num_elements)
        finally:
            # Balance the nvmlInit() above so NVML is not left initialized by this helper
            nvml.nvmlShutdown()
        affinity_string = ""
        for j in affinity_words:
            # assume nvml returns list of 64 bit ints; later words hold higher-numbered cores, so prepend them
            affinity_string = f"{j:064b}{affinity_string}"
        affinity_list = [int(x) for x in affinity_string]
        affinity_list.reverse()  # so core 0 is the 0th element
        affinity_to_set = [i for i, e in enumerate(affinity_list) if e != 0]
        os.sched_setaffinity(0, affinity_to_set)
        if verbose:
            cpu_cores = os.sched_getaffinity(0)
            logger.info(f"Assigning {len(cpu_cores)} cpu cores to process {local_process_index}: {cpu_cores}")
@lru_cache
def set_numa_affinity(local_process_index: int, verbose: Optional[bool] = None) -> None:
    """
    Assigns the current process to a specific NUMA node. Ideally most efficient when having at least 2 cpus per node.

    This result is cached between calls: the function is wrapped in `functools.lru_cache`, keyed on the arguments, so
    repeating a call with the same arguments does nothing. If you want to override it, please use
    `accelerate.utils.environment.override_numa_affinity`.

    Args:
        local_process_index (int):
            The index of the current process on the current server.
        verbose (bool, *optional*):
            Whether to print the new cpu cores assignment for each process. If `ACCELERATE_DEBUG_MODE` is enabled, will
            default to True.
    """
    override_numa_affinity(local_process_index=local_process_index, verbose=verbose)
@contextmanager
def clear_environment():
    """
    A context manager that will temporarily clear environment variables.

    When this context exits, the previous environment variables will be back. Variables added inside the context are
    discarded, and this happens even if the body raises.

    Example:

    ```python
    >>> import os
    >>> from accelerate.utils import clear_environment

    >>> os.environ["FOO"] = "bar"
    >>> with clear_environment():
    ...     print(os.environ)
    ...     os.environ["FOO"] = "new_bar"
    ...     print(os.environ["FOO"])
    environ({})
    new_bar

    >>> print(os.environ["FOO"])
    bar
    ```
    """
    _old_os_environ = os.environ.copy()
    os.environ.clear()

    try:
        yield
    finally:
        os.environ.clear()  # clear any added keys,
        os.environ.update(_old_os_environ)  # then restore previous environment
@contextmanager
def patch_environment(**kwargs):
    """
    A context manager that will add each keyword argument passed to `os.environ` and remove them when exiting.

    Will convert the values in `kwargs` to strings and upper-case all the keys. On exit, every patched key is put back
    to the value it had before entering, or removed if it was not set. This also happens when the body raises, or
    when setting one of the variables fails part-way through. Keys that differ only by case map to the same
    variable; the last one given wins inside the context.

    Example:

    ```python
    >>> import os
    >>> from accelerate.utils import patch_environment

    >>> with patch_environment(FOO="bar"):
    ...     print(os.environ["FOO"])  # prints "bar"
    >>> print(os.environ["FOO"])  # raises KeyError
    ```
    """
    # Original value of each patched (upper-cased) key; None means "was not set" (env values are always str)
    original_values = {}
    try:
        for key, value in kwargs.items():
            key = key.upper()
            # Only record the first occurrence. A later, case-colliding key would otherwise capture the value we
            # just patched in instead of the true original.
            if key not in original_values:
                original_values[key] = os.environ.get(key)
            os.environ[key] = str(value)
        yield
    finally:
        for key, old_value in original_values.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                # restore previous value
                os.environ[key] = old_value
- def purge_accelerate_environment(func_or_cls):
- """Decorator to clean up accelerate environment variables set by the decorated class or function.
- In some circumstances, calling certain classes or functions can result in accelerate env vars being set and not
- being cleaned up afterwards. As an example, when calling:
- TrainingArguments(fp16=True, ...)
- The following env var will be set:
- ACCELERATE_MIXED_PRECISION=fp16
- This can affect subsequent code, since the env var takes precedence over TrainingArguments(fp16=False). This is
- especially relevant for unit testing, where we want to avoid the individual tests to have side effects on one
- another. Decorate the unit test function or whole class with this decorator to ensure that after each test, the env
- vars are cleaned up. This works for both unittest.TestCase and normal classes (pytest); it also works when
- decorating the parent class.
- """
- prefix = "ACCELERATE_"
- @contextmanager
- def env_var_context():
- # Store existing accelerate env vars
- existing_vars = {k: v for k, v in os.environ.items() if k.startswith(prefix)}
- try:
- yield
- finally:
- # Restore original env vars or remove new ones
- for key in [k for k in os.environ if k.startswith(prefix)]:
- if key in existing_vars:
- os.environ[key] = existing_vars[key]
- else:
- os.environ.pop(key, None)
- def wrap_function(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- with env_var_context():
- return func(*args, **kwargs)
- wrapper._accelerate_is_purged_environment_wrapped = True
- return wrapper
- if not isinstance(func_or_cls, type):
- return wrap_function(func_or_cls)
- # Handle classes by wrapping test methods
- def wrap_test_methods(test_class_instance):
- for name in dir(test_class_instance):
- if name.startswith("test"):
- method = getattr(test_class_instance, name)
- if callable(method) and not hasattr(method, "_accelerate_is_purged_environment_wrapped"):
- setattr(test_class_instance, name, wrap_function(method))
- return test_class_instance
- # Handle inheritance
- wrap_test_methods(func_or_cls)
- func_or_cls.__init_subclass__ = classmethod(lambda cls, **kw: wrap_test_methods(cls))
- return func_or_cls
|