binding.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675
  1. import os
  2. import shutil
  3. import traceback
  4. from collections import defaultdict
  5. from collections.abc import Callable, Iterable
  6. from dataclasses import asdict, dataclass
  7. from enum import Enum
  8. from functools import wraps
  9. from logging import getLogger
  10. from typing import ParamSpec, TypeVar
  11. import torch
  12. from torch._utils_internal import signpost_event
# Names exported by `from <module> import *`; everything else in this module
# is underscore-prefixed and private.
__all__ = [
    "AffinityMode",
    "maybe_wrap_command_args_with_numa_binding",
    "maybe_wrap_with_numa_binding",
    "NumaOptions",
]

# Module-level logger; used by _handle_exception to report binding failures.
logger = getLogger(__name__)
  20. class AffinityMode(str, Enum):
  21. """
  22. See behavior description for each affinity mode
  23. in torch.distributed.run.
  24. """
  25. NODE = "node"
  26. SOCKET = "socket"
  27. EXCLUSIVE = "exclusive"
  28. CORE_COMPLEX = "core-complex"
  29. @dataclass(frozen=True)
  30. class NumaOptions:
  31. affinity_mode: AffinityMode
  32. """
  33. If true, we will fall back to using the original command/entrypoint if we fail to compute
  34. NUMA bindings.
  35. You should avoid using this option! It is only intended as a safety mechanism for facilitating
  36. mass rollouts of numa binding.
  37. """
  38. should_fall_back_if_binding_fails: bool = False
  39. def maybe_wrap_command_args_with_numa_binding(
  40. command_args: tuple[str, ...],
  41. *,
  42. gpu_index: int,
  43. numa_options: NumaOptions | None,
  44. ) -> tuple[str, ...]:
  45. """
  46. Wraps command arguments with numactl to apply NUMA CPU binding.
  47. This function prepends numactl with appropriate CPU affinity flags to the
  48. provided command arguments, binding the process to CPUs associated with
  49. the specified GPU's NUMA node.
  50. Args:
  51. command_args: The original command arguments to wrap.
  52. gpu_index: The index of the GPU that will be used by the subprocess.
  53. numa_options: Configuration for NUMA binding behavior. If None, returns
  54. the original command_args unchanged.
  55. Returns:
  56. Tuple of command arguments, potentially wrapped with numactl for NUMA binding.
  57. Returns the original command_args if numa_options is None or if binding fails
  58. and fallback is enabled.
  59. """
  60. if numa_options is None:
  61. return command_args
  62. kwargs = {
  63. "command_args": command_args,
  64. "gpu_index": gpu_index,
  65. "numa_options": asdict(numa_options),
  66. }
  67. try:
  68. logical_cpu_indices = _get_validated_logical_cpus_to_bind_to(
  69. gpu_index=gpu_index,
  70. numa_options=numa_options,
  71. )
  72. wrapped_command_args = _assemble_numactl_command_args(
  73. original_command_args=command_args,
  74. logical_cpu_indices=logical_cpu_indices,
  75. )
  76. signpost_event(
  77. category="numa_binding",
  78. name="apply_success",
  79. parameters={
  80. **kwargs,
  81. "wrapped_command": wrapped_command_args,
  82. },
  83. )
  84. return wrapped_command_args
  85. except Exception:
  86. # pyrefly: ignore [bad-argument-type]
  87. _handle_exception(numa_options=numa_options, logger_kwargs=kwargs)
  88. return command_args
  89. _TParams = ParamSpec("_TParams")
  90. _TReturn = TypeVar("_TReturn")
  91. def maybe_wrap_with_numa_binding(
  92. func: Callable[_TParams, _TReturn],
  93. *,
  94. gpu_index: int,
  95. numa_options: NumaOptions | None,
  96. ) -> Callable[_TParams, _TReturn]:
  97. """
  98. Wraps a function to apply NUMA CPU binding before execution.
  99. This decorator applies NUMA CPU affinity to all threads in the current process
  100. before calling the wrapped function, binding them to CPUs associated with the
  101. specified GPU's NUMA node.
  102. Args:
  103. func: The function to wrap with NUMA binding.
  104. gpu_index: The index of the GPU that will be used.
  105. numa_options: Configuration for NUMA binding behavior. If None, returns
  106. the original function unchanged.
  107. Returns:
  108. A wrapped function that applies NUMA binding before execution, or the
  109. original function if numa_options is None.
  110. """
  111. if numa_options is None:
  112. return func
  113. @wraps(func)
  114. def wrapped(*args: _TParams.args, **kwargs: _TParams.kwargs) -> _TReturn:
  115. _maybe_apply_numa_binding_to_current_process(
  116. gpu_index=gpu_index,
  117. # pyrefly: ignore [bad-argument-type]
  118. numa_options=numa_options,
  119. )
  120. return func(*args, **kwargs)
  121. return wrapped
  122. def _maybe_apply_numa_binding_to_current_process(
  123. *, gpu_index: int, numa_options: NumaOptions
  124. ) -> None:
  125. kwargs = {
  126. "gpu_index": gpu_index,
  127. "numa_options": asdict(numa_options),
  128. }
  129. try:
  130. logical_cpu_indices = _get_validated_logical_cpus_to_bind_to(
  131. gpu_index=gpu_index,
  132. numa_options=numa_options,
  133. )
  134. _bind_all_threads_in_current_process_to_logical_cpus(
  135. logical_cpu_indices=logical_cpu_indices
  136. )
  137. signpost_event(
  138. category="numa_binding",
  139. name="apply_success",
  140. parameters={
  141. **kwargs,
  142. "logical_cpu_indices": _get_ranges_str_from_ints(logical_cpu_indices),
  143. },
  144. )
  145. except Exception:
  146. # pyrefly: ignore [bad-argument-type]
  147. _handle_exception(numa_options=numa_options, logger_kwargs=kwargs)
  148. def _assemble_numactl_command_args(
  149. *, original_command_args: tuple[str, ...], logical_cpu_indices: set[int]
  150. ) -> tuple[str, ...]:
  151. return (
  152. "numactl",
  153. f"--physcpubind={_get_ranges_str_from_ints(logical_cpu_indices)}",
  154. *original_command_args,
  155. )
  156. def _handle_exception(
  157. *, numa_options: NumaOptions, logger_kwargs: dict[str, object]
  158. ) -> None:
  159. signpost_event(
  160. category="numa_binding",
  161. name="apply_exception",
  162. parameters={
  163. **logger_kwargs,
  164. "traceback": traceback.format_exc(),
  165. },
  166. )
  167. logger.exception("Failed to apply NUMA binding for input=%r", logger_kwargs)
  168. if numa_options.should_fall_back_if_binding_fails:
  169. logger.warning(
  170. "Continuing executing without applying NUMA binding, despite exception %s",
  171. traceback.format_exc(),
  172. )
  173. return
  174. # This function is called within an except block, so silence the warning
  175. # about raise without an exception.
  176. raise # noqa: PLE0704
  177. def _get_validated_logical_cpus_to_bind_to(
  178. *,
  179. gpu_index: int,
  180. numa_options: NumaOptions,
  181. ) -> set[int]:
  182. logical_cpu_indices = _get_logical_cpus_to_bind_to(
  183. gpu_index=gpu_index, numa_options=numa_options
  184. )
  185. _raise_if_binding_invalid(logical_cpu_indices=logical_cpu_indices)
  186. return logical_cpu_indices
  187. def _raise_if_binding_invalid(*, logical_cpu_indices: set[int]) -> None:
  188. # NOTE: numactl CLI is only actually necessary for the str entrypoint path,
  189. # but for simplicity we will just check it no matter what.
  190. if shutil.which("numactl") is None:
  191. raise RuntimeError("numactl CLI is required for NUMA binding")
  192. if not logical_cpu_indices:
  193. raise RuntimeError("Must bind to a non-empty set of CPU indices")
  194. def _bind_all_threads_in_current_process_to_logical_cpus(
  195. *, logical_cpu_indices: set[int]
  196. ) -> None:
  197. # Save the original affinity of the main thread before changing it
  198. # pyrefly: ignore [missing-attribute]
  199. original_main_thread_affinity = os.sched_getaffinity(0) # type: ignore[attr-defined]
  200. # 0 represents the current thread.
  201. # This is outside the try/except because the main thread should always bind successfully.
  202. # pyrefly: ignore [missing-attribute]
  203. os.sched_setaffinity(0, logical_cpu_indices) # type: ignore[attr-defined]
  204. for tid_str in os.listdir("/proc/self/task"):
  205. try:
  206. tid = int(tid_str)
  207. # pyrefly: ignore [missing-attribute]
  208. tid_affinity = os.sched_getaffinity(tid) # type: ignore[attr-defined]
  209. # Defensive check to ensure we do not overwrite affinity on any threads
  210. # that have already had their affinity set elsewhere.
  211. if tid_affinity == original_main_thread_affinity:
  212. # pyrefly: ignore [missing-attribute]
  213. os.sched_setaffinity(tid, logical_cpu_indices) # type: ignore[attr-defined]
  214. except Exception:
  215. # Thread may have exited or otherwise become invalid
  216. pass
  217. def _get_logical_cpus_to_bind_to(
  218. *,
  219. gpu_index: int,
  220. numa_options: NumaOptions,
  221. ) -> set[int]:
  222. """
  223. Args:
  224. gpu_index: The index of the GPU that will be used by the subprocess.
  225. Example: 0
  226. numa_options: See NumaOptions for details.
  227. Returns:
  228. Set of logical CPU indices to bind to.
  229. """
  230. if numa_options.affinity_mode == AffinityMode.NODE:
  231. logical_cpus = _node_get_logical_cpus_to_bind_to(gpu_index=gpu_index)
  232. elif numa_options.affinity_mode == AffinityMode.SOCKET:
  233. logical_cpus = _socket_get_logical_cpus_to_bind_to(gpu_index=gpu_index)
  234. elif numa_options.affinity_mode == AffinityMode.EXCLUSIVE:
  235. logical_cpus = _exclusive_get_logical_cpus_to_bind_to(gpu_index=gpu_index)
  236. elif numa_options.affinity_mode == AffinityMode.CORE_COMPLEX:
  237. logical_cpus = _core_complex_get_logical_cpus_to_bind_to(gpu_index=gpu_index)
  238. else:
  239. raise ValueError(f"Affinity mode {numa_options.affinity_mode} not supported.")
  240. return logical_cpus
  241. def _node_get_logical_cpus_to_bind_to(*, gpu_index: int) -> set[int]:
  242. """
  243. Core logic of 'node' numa strategy.
  244. """
  245. numa_node_index = _get_numa_node_index_for_gpu_index(gpu_index=gpu_index)
  246. return _get_allowed_logical_cpu_indices_for_numa_node(
  247. numa_node_index=numa_node_index
  248. )
  249. def _socket_get_logical_cpus_to_bind_to(*, gpu_index: int) -> set[int]:
  250. """
  251. Core logic of 'socket' numa strategy.
  252. """
  253. numa_node_index_of_gpu = _get_numa_node_index_for_gpu_index(gpu_index=gpu_index)
  254. socket_index = _get_socket_index_for_numa_node(
  255. numa_node_index=numa_node_index_of_gpu
  256. )
  257. numa_node_indices = _get_numa_node_indices_for_socket_index(
  258. socket_index=socket_index
  259. )
  260. logical_cpus = set()
  261. for numa_node_index in numa_node_indices:
  262. logical_cpus.update(
  263. _get_allowed_logical_cpu_indices_for_numa_node(
  264. numa_node_index=numa_node_index
  265. )
  266. )
  267. return logical_cpus
  268. def _exclusive_get_logical_cpus_to_bind_to(*, gpu_index: int) -> set[int]:
  269. """
  270. Core logic of 'exclusive' numa strategy.
  271. """
  272. numa_node_index = _get_numa_node_index_for_gpu_index(gpu_index=gpu_index)
  273. gpu_indices = _get_gpu_indices_for_numa_node(numa_node_index=numa_node_index)
  274. gpu_indices = sorted(gpu_indices)
  275. original_gpu_relative_index = gpu_indices.index(gpu_index)
  276. allowed_logical_cpu_indices = _get_allowed_logical_cpu_indices_for_numa_node(
  277. numa_node_index=numa_node_index
  278. )
  279. # Arbitrarily use the min logical cpu index on the physical core to
  280. # represent the physical core.
  281. physical_core_to_allowed_logical_cpu_indices = _group_by(
  282. allowed_logical_cpu_indices,
  283. lambda logical_cpu_index: min(
  284. _get_logical_cpu_indices_sharing_same_physical_core_as(
  285. logical_cpu_index=logical_cpu_index
  286. )
  287. ),
  288. )
  289. # Sort the dict for consistency (dicts maintain order in Python)
  290. physical_core_to_allowed_logical_cpu_indices = dict(
  291. sorted(physical_core_to_allowed_logical_cpu_indices.items())
  292. )
  293. num_physical_cores_per_gpu = len(
  294. physical_core_to_allowed_logical_cpu_indices
  295. ) // len(gpu_indices)
  296. # Often, the number of physical cores will not be perfectly divisible by the number
  297. # of GPUs. In those cases, give the lowest GPU indices an extra core
  298. num_gpus_to_give_one_extra_physical_core = len(
  299. physical_core_to_allowed_logical_cpu_indices
  300. ) % len(gpu_indices)
  301. if num_physical_cores_per_gpu < 1:
  302. raise RuntimeError(
  303. f"There are only {len(physical_core_to_allowed_logical_cpu_indices)} physical cores on {numa_node_index=},"
  304. + f" but there are {len(gpu_indices)} GPUs associated with this NUMA node."
  305. )
  306. # Compute slice indices for this GPU
  307. start = original_gpu_relative_index * num_physical_cores_per_gpu + min(
  308. original_gpu_relative_index, num_gpus_to_give_one_extra_physical_core
  309. )
  310. end = (
  311. start
  312. + num_physical_cores_per_gpu
  313. + (
  314. 1
  315. if original_gpu_relative_index < num_gpus_to_give_one_extra_physical_core
  316. else 0
  317. )
  318. )
  319. # Slice and flatten the logical CPUs from the selected physical cores
  320. logical_cpu_indices_for_original_gpu = {
  321. logical_cpu_index
  322. for logical_cpu_indices in list(
  323. physical_core_to_allowed_logical_cpu_indices.values()
  324. )[start:end]
  325. for logical_cpu_index in logical_cpu_indices
  326. }
  327. return logical_cpu_indices_for_original_gpu
  328. def _core_complex_get_logical_cpus_to_bind_to(*, gpu_index: int) -> set[int]:
  329. """
  330. Core logic of 'core-complex' numa strategy.
  331. Each GPU is assigned a full core complex (group of cores sharing L3 cache)
  332. within its affined NUMA node.
  333. """
  334. numa_node_index = _get_numa_node_index_for_gpu_index(gpu_index=gpu_index)
  335. gpu_indices = _get_gpu_indices_for_numa_node(numa_node_index=numa_node_index)
  336. gpu_indices = sorted(gpu_indices)
  337. original_gpu_relative_index = gpu_indices.index(gpu_index)
  338. allowed_logical_cpu_indices = _get_allowed_logical_cpu_indices_for_numa_node(
  339. numa_node_index=numa_node_index
  340. )
  341. # Arbitrarily use the min logical cpu index on the max level cache
  342. # to represent the max level cache.
  343. max_level_cache_to_allowed_logical_cpu_indices = _group_by(
  344. allowed_logical_cpu_indices,
  345. lambda logical_cpu_index: min(
  346. _get_logical_cpus_sharing_same_max_level_cache_as(
  347. logical_cpu_index=logical_cpu_index
  348. )
  349. ),
  350. )
  351. max_level_cache_to_allowed_logical_cpu_indices = dict(
  352. sorted(
  353. max_level_cache_to_allowed_logical_cpu_indices.items(),
  354. # First, prioritize caches with more available cpus
  355. # Second, prioritize lower index cpus (just for clarity/consistency)
  356. key=lambda item: (-len(item[1]), item[0]),
  357. )
  358. )
  359. cache_index_for_original_gpu = original_gpu_relative_index % len(
  360. max_level_cache_to_allowed_logical_cpu_indices
  361. )
  362. logical_cpu_indices_for_original_gpu = list(
  363. max_level_cache_to_allowed_logical_cpu_indices.values()
  364. )[cache_index_for_original_gpu]
  365. return logical_cpu_indices_for_original_gpu
  366. K = TypeVar("K")
  367. V = TypeVar("V")
  368. def _group_by(values: Iterable[V], get_key: Callable[[V], K]) -> dict[K, set[V]]:
  369. """
  370. Groups elements with same key into sets.
  371. """
  372. key_to_values: defaultdict[K, set[V]] = defaultdict(set)
  373. for value in values:
  374. key = get_key(value)
  375. key_to_values[key].add(value)
  376. return key_to_values
  377. def _get_logical_cpu_indices_sharing_same_physical_core_as(
  378. *, logical_cpu_index: int
  379. ) -> set[int]:
  380. thread_siblings_list_absolute_path = (
  381. f"/sys/devices/system/cpu/cpu{logical_cpu_index}/topology/thread_siblings_list"
  382. )
  383. with open(thread_siblings_list_absolute_path) as f:
  384. return _get_set_of_int_from_ranges_str(f.read())
  385. def _get_logical_cpus_sharing_same_max_level_cache_as(
  386. *, logical_cpu_index: int
  387. ) -> set[int]:
  388. cpu_cache_dir_absolute_path = (
  389. f"/sys/devices/system/cpu/cpu{logical_cpu_index}/cache"
  390. )
  391. max_level = -1
  392. logical_cpus_sharing_max_level_cache = set()
  393. for entry in os.listdir(cpu_cache_dir_absolute_path):
  394. if not entry.startswith("index") or not entry[5:].isdecimal():
  395. continue
  396. cache_index_absolute_path = os.path.join(cpu_cache_dir_absolute_path, entry)
  397. # Filter out other cache types like Instruction
  398. type_absolute_path = os.path.join(cache_index_absolute_path, "type")
  399. with open(type_absolute_path) as type_file:
  400. if type_file.read().strip() not in {"Unified", "Data"}:
  401. continue
  402. level_absolute_path = os.path.join(cache_index_absolute_path, "level")
  403. with open(level_absolute_path) as level_file:
  404. level = int(level_file.read())
  405. if level <= max_level:
  406. continue
  407. max_level = level
  408. shared_cpu_list_absolute_path = os.path.join(
  409. cache_index_absolute_path, "shared_cpu_list"
  410. )
  411. with open(shared_cpu_list_absolute_path) as share_cpu_list_file:
  412. logical_cpus_sharing_max_level_cache = _get_set_of_int_from_ranges_str(
  413. share_cpu_list_file.read()
  414. )
  415. return logical_cpus_sharing_max_level_cache
  416. def _get_allowed_logical_cpu_indices_for_numa_node(*, numa_node_index: int) -> set[int]:
  417. all_cpu_indices = _get_cpu_indices_for_numa_node_MAYBE_NOT_ALLOWED(
  418. numa_node_index=numa_node_index
  419. )
  420. allowed_cpu_indices = _get_allowed_cpu_indices_for_current_thread()
  421. return all_cpu_indices & allowed_cpu_indices
  422. def _get_cpu_indices_for_numa_node_MAYBE_NOT_ALLOWED(
  423. *, numa_node_index: int
  424. ) -> set[int]:
  425. """
  426. Returns:
  427. Indices of all CPUs associated with numa_node_index. However, the list
  428. is not filtered based on whether the thread is allowed to use them.
  429. """
  430. cpulist_absolute_path = f"/sys/devices/system/node/node{numa_node_index}/cpulist"
  431. try:
  432. with open(cpulist_absolute_path) as f:
  433. cpu_range_str = f.read()
  434. except FileNotFoundError as e:
  435. raise RuntimeError(
  436. f"Could not determine CPUs corresponding to {numa_node_index=}."
  437. ) from e
  438. return _get_set_of_int_from_ranges_str(cpu_range_str)
  439. def _get_gpu_count() -> int:
  440. return torch.cuda.device_count()
  441. def _get_numa_node_index_for_gpu_index(*, gpu_index: int) -> int:
  442. device_properties = torch.cuda.get_device_properties(gpu_index)
  443. domain = device_properties.pci_domain_id # type: ignore[attr-defined]
  444. bus = device_properties.pci_bus_id # type: ignore[attr-defined]
  445. device = device_properties.pci_device_id # type: ignore[attr-defined]
  446. # Format to sysfs PCI address: "0000:dc:00.0"
  447. pci_addr = f"{domain:04x}:{bus:02x}:{device:02x}.0"
  448. pci_numa_node_absolute_path = f"/sys/bus/pci/devices/{pci_addr}/numa_node"
  449. with open(pci_numa_node_absolute_path) as f:
  450. # In systems with only one NUMA node, this will
  451. # often be saved as -1. In those cases, there is obviously
  452. # at least one numa node, 0, so we use that.
  453. return max(int(f.read().strip()), 0)
  454. def _get_gpu_indices_for_numa_node(*, numa_node_index: int) -> set[int]:
  455. return {
  456. gpu_index
  457. for gpu_index in range(_get_gpu_count())
  458. if _get_numa_node_index_for_gpu_index(gpu_index=gpu_index) == numa_node_index
  459. }
  460. def _get_socket_index_for_numa_node(*, numa_node_index: int) -> int:
  461. arbitrary_cpu_index = _get_arbitrary_allowed_cpu_index_for_numa_node(
  462. numa_node_index=numa_node_index
  463. )
  464. return _get_socket_index_for_cpu(cpu_index=arbitrary_cpu_index)
  465. def _get_socket_index_for_cpu(*, cpu_index: int) -> int:
  466. package_id_absolute_path = (
  467. f"/sys/devices/system/cpu/cpu{cpu_index}/topology/physical_package_id"
  468. )
  469. try:
  470. with open(package_id_absolute_path) as f:
  471. return int(f.read().strip())
  472. except FileNotFoundError as e:
  473. raise RuntimeError(f"Could not determine socket for {cpu_index=}") from e
  474. def _get_arbitrary_allowed_cpu_index_for_numa_node(*, numa_node_index: int) -> int:
  475. return min(
  476. _get_allowed_logical_cpu_indices_for_numa_node(numa_node_index=numa_node_index)
  477. )
  478. def _get_set_of_int_from_ranges_str(ranges_str: str) -> set[int]:
  479. """
  480. Util for parsing a string of int ranges, as in a sysfs file.
  481. Args:
  482. ranges_str: E.g., "0-2,4,6-7"
  483. Returns:
  484. E.g., {0, 1, 2, 4, 6, 7}
  485. """
  486. ints: set[int] = set()
  487. for range_str in ranges_str.split(","):
  488. range_str = range_str.strip()
  489. if not range_str:
  490. continue
  491. if "-" in range_str:
  492. start_str, end_str = range_str.split("-")
  493. start, end = int(start_str), int(end_str)
  494. ints.update(range(start, end + 1))
  495. else:
  496. ints.add(int(range_str))
  497. return ints
  498. def _get_ranges_str_from_ints(ints: Iterable[int]) -> str:
  499. """
  500. Convert a set of integers to a compact string with ranges.
  501. Args:
  502. ints: E.g., {0, 1, 2, 4, 6, 7}
  503. Returns:
  504. E.g., "0-2,4,6-7"
  505. """
  506. if not ints:
  507. return ""
  508. sorted_ints = sorted(ints)
  509. ranges = []
  510. start = prev = sorted_ints[0]
  511. for num in sorted_ints[1:]:
  512. if num == prev + 1:
  513. prev = num
  514. else:
  515. if start == prev:
  516. ranges.append(f"{start}")
  517. else:
  518. ranges.append(f"{start}-{prev}")
  519. start = prev = num
  520. # Append the last range
  521. if start == prev:
  522. ranges.append(f"{start}")
  523. else:
  524. ranges.append(f"{start}-{prev}")
  525. return ",".join(ranges)
  526. def _get_systemwide_numa_node_indices() -> set[int]:
  527. with open("/sys/devices/system/node/possible") as f:
  528. possible_nodes_str = f.read()
  529. return _get_set_of_int_from_ranges_str(possible_nodes_str)
  530. def _get_numa_node_indices_for_socket_index(*, socket_index: int) -> set[int]:
  531. systemwide_numa_node_indices = _get_systemwide_numa_node_indices()
  532. matching_numa_node_indices = set()
  533. for numa_node_index in systemwide_numa_node_indices:
  534. arbitrary_cpu_index = _get_arbitrary_allowed_cpu_index_for_numa_node(
  535. numa_node_index=numa_node_index
  536. )
  537. if socket_index == _get_socket_index_for_cpu(cpu_index=arbitrary_cpu_index):
  538. matching_numa_node_indices.add(numa_node_index)
  539. return matching_numa_node_indices
  540. def _get_allowed_cpu_indices_for_current_thread() -> set[int]:
  541. # 0 denotes current thread
  542. # pyrefly: ignore [missing-attribute]
  543. return os.sched_getaffinity(0) # type:ignore[attr-defined]