rendezvous.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # mypy: allow-untyped-defs
  2. try:
  3. from urllib.parse import urlparse, urlunparse
  4. except ImportError as e:
  5. raise ImportError(
  6. "urllib cannot be found, urlparse from python2 is no longer supported."
  7. ) from e
  8. import numbers
  9. import os
  10. import sys
  11. from collections.abc import Iterator
  12. from datetime import timedelta
  13. from typing import Callable, Optional
  14. from torch.distributed import FileStore, Store, TCPStore
  15. from .constants import default_pg_timeout
  # Registry of rendezvous handlers, keyed by URL scheme (for example "tcp").
  # Each handler is a callable producing an iterator of
  # (store, rank, world_size) triplets.
  _rendezvous_handlers: dict[
      str, Callable[..., Iterator[tuple[Store, int, int]]]
  ] = {}

  # Names exported as the public API of this module.
  __all__ = [
      "register_rendezvous_handler",
      "rendezvous",
  ]
  def register_rendezvous_handler(scheme, handler):
      """
      Associate a rendezvous handler with a URL scheme.

      Rendezvous is the step in which the participating processes find
      each other and exchange the information they need to communicate,
      before any collective algorithm can run. Its outcome is a triplet:
      a shared key/value store, the rank of the calling process, and the
      total number of participating processes.

      When none of the bundled rendezvous methods suits the execution
      environment, a custom handler can be registered under a unique
      scheme name and selected by passing a URL with that scheme to the
      `rendezvous()` function.

      Args:
          scheme (str): URL scheme that identifies the handler.
          handler (function): Generator function invoked by
              `rendezvous()` for URLs that use ``scheme``; it must
              yield the triplet described above.

      Raises:
          RuntimeError: If a handler is already registered for ``scheme``.
      """
      # The registry dict is only mutated here, never rebound, so no
      # ``global`` declaration is needed.
      if scheme not in _rendezvous_handlers:
          _rendezvous_handlers[scheme] = handler
          return
      raise RuntimeError(f"Rendezvous handler for {scheme}:// already registered")
  42. # Query will have format "rank=0&world_size=1" and is
  43. # converted into {"rank": 0, "world_size": 1}
  44. def _query_to_dict(query: str) -> dict[str, str]:
  45. return {
  46. pair[0]: pair[1]
  47. for pair in (pair.split("=") for pair in filter(None, query.split("&")))
  48. }
  49. def _get_use_libuv_from_query_dict(query_dict: dict[str, str]) -> bool:
  50. # libuv is the default backend for TCPStore. To enable the non-libuv backend,
  51. # user can explicitly specify ``use_libuv=0`` in the URL parameter.
  52. if sys.platform == "win32":
  53. # PyTorch is built without libuv support on windows, so default to 0
  54. return query_dict.get("use_libuv", os.environ.get("USE_LIBUV", "0")) == "1"
  55. return query_dict.get("use_libuv", os.environ.get("USE_LIBUV", "1")) == "1"
  def _rendezvous_helper(url: str, rank: int, world_size_opt: Optional[int], **kwargs):
      """
      Embed ``rank``/``world_size`` in ``url`` and dispatch to the scheme's handler.

      Args:
          url: Rendezvous URL; its scheme selects the registered handler.
          rank: Rank to add to the URL query, or ``-1`` to leave it out.
          world_size_opt: World size to add to the URL query, ``-1`` to
              leave it out, or ``None`` to always add one. With ``None``
              the value is ``-1`` unless the scheme is ``env``, in which
              case ``RANK`` and ``WORLD_SIZE`` are read from the
              environment when present.
          **kwargs: Forwarded unchanged to the handler.

      Returns:
          Whatever the registered handler returns for the rewritten URL.

      Raises:
          AssertionError: If rank/world_size must be added but the URL
              query already contains ``rank`` or ``world_size``.
          RuntimeError: If no handler is registered for the URL scheme.
      """
      result = urlparse(url)
      if world_size_opt is None:
          world_size = -1
          if result.scheme == "env":
              rank = int(os.environ.get("RANK", rank))
              # If the world_size env variable is not present then it is a dynamic group
              world_size = int(os.environ.get("WORLD_SIZE", world_size))
      else:
          world_size = world_size_opt

      if rank != -1 or world_size != -1 or world_size_opt is None:
          query_dict = _query_to_dict(result.query)
          # Raised explicitly instead of via ``assert`` so the check is not
          # stripped under ``python -O``; otherwise the values supplied in
          # the URL would be silently overwritten below.
          if "rank" in query_dict or "world_size" in query_dict:
              raise AssertionError(
                  f"The url: {url} has node-specific arguments(rank, world_size) already."
              )
          if rank != -1:
              query_dict["rank"] = str(rank)
          if world_size != -1 or world_size_opt is None:
              query_dict["world_size"] = str(world_size)
          result = result._replace(
              query="&".join(f"{k}={v}" for k, v in query_dict.items())
          )
          url = urlunparse(result)

      if result.scheme not in _rendezvous_handlers:
          raise RuntimeError(f"No rendezvous handler for {result.scheme}://")
      return _rendezvous_handlers[result.scheme](url, **kwargs)
  def rendezvous(url: str, rank: int = -1, world_size: int = -1, **kwargs):
      """
      Validate the arguments and start a rendezvous for ``url``.

      Args:
          url: Rendezvous URL whose scheme names a registered handler
              (``tcp``, ``env`` and ``file`` are registered by this module).
              ``bytes`` are accepted and decoded as UTF-8.
          rank: Rank of this process, or ``-1`` (default) if it is not
              supplied through this argument.
          world_size: Number of participating processes, or ``-1``
              (default) if it is not supplied through this argument.
          **kwargs: Forwarded unchanged to the handler.

      Returns:
          The result of the registered handler; handlers are generator
          functions yielding ``(store, rank, world_size)``.

      Raises:
          RuntimeError: If ``url`` is not a string, if ``rank`` or
              ``world_size`` is not an integer, or if no handler is
              registered for the URL scheme.
      """
      if not isinstance(url, (str, bytes)):
          raise RuntimeError(f"`url` must be a string. {type(url)}: {url}")

      if not isinstance(rank, numbers.Integral):
          raise RuntimeError(f"`rank` must be an integer. {rank}")

      if not isinstance(world_size, numbers.Integral):
          raise RuntimeError(f"`world_size` must be an integer. {world_size}")

      if isinstance(url, bytes):
          # urlparse() on bytes returns bytes components, which can never
          # match the str scheme keys of the handler registry or go through
          # the str-based query parsing. Decode so that a bytes URL, which
          # the check above explicitly admits, actually works.
          url = url.decode("utf-8")

      return _rendezvous_helper(url, rank, world_size, **kwargs)
  def _create_store_from_options(backend_options, rank):
      """
      Return the store produced by a rendezvous on ``backend_options.init_method``.

      The world size is passed as ``None``; the rank and world size that
      the handler yields alongside the store are discarded.
      """
      handler_iter = _rendezvous_helper(backend_options.init_method, rank, None)
      store, _rank, _world_size = next(handler_iter)
      return store
  93. def _rendezvous_error(msg):
  94. return ValueError("Error initializing torch.distributed using " + msg)
  def _file_rendezvous_handler(url: str, **kwargs):
      """
      Rendezvous handler for ``file://`` URLs, backed by a ``FileStore``.

      The URL path names the store file; ``rank`` and ``world_size`` must
      both be present in the URL query.

      Args:
          url: ``file://`` URL carrying the path and the query parameters.
          **kwargs: Accepted for handler-signature compatibility; unused.

      Yields:
          ``(store, rank, world_size)``, exactly once.

      Raises:
          ValueError: If the path, ``rank`` or ``world_size`` is missing.
          RuntimeError: If the generator is resumed after its single yield;
              this method cannot rendezvous a second time.
      """

      def _fail(reason):
          return _rendezvous_error("file:// rendezvous: " + reason)

      parsed = urlparse(url)
      if sys.platform == "win32":
          import urllib.request

          store_path = urllib.request.url2pathname(parsed.netloc + parsed.path)
          # Normalizing an empty string produces ".", which is not expected,
          # so only non-empty paths are normalized.
          if store_path:
              store_path = os.path.normpath(store_path)
      else:
          store_path = parsed.path

      if not store_path:
          raise _fail("path missing")

      params = _query_to_dict(parsed.query)
      if "rank" not in params:
          raise _fail("rank parameter missing")
      if "world_size" not in params:
          raise _fail("world size parameter missing")

      rank = int(params["rank"])
      world_size = int(params["world_size"])
      yield (FileStore(store_path, world_size), rank, world_size)

      # If this configuration is invalidated, there is nothing we can do about it
      raise RuntimeError("Unable to perform rerendezvous using file:// method")
  120. def _torchelastic_use_agent_store() -> bool:
  121. return os.environ.get("TORCHELASTIC_USE_AGENT_STORE", None) == str(True)
  122. def _create_c10d_store(
  123. hostname, port, rank, world_size, timeout, use_libuv=True
  124. ) -> Store:
  125. """
  126. Smartly creates a c10d Store object on ``rank`` based on whether we need to reuse agent store.
  127. The TCPStore server is assumed to be hosted
  128. on ``hostname:port``.
  129. By default, the TCPStore server uses the asynchronous implementation
  130. ``LibUVStoreDaemon`` which utilizes libuv.
  131. If ``torchelastic_use_agent_store()`` is ``True``, then it is assumed that
  132. the agent leader (node rank 0) hosts the TCPStore server (for which the
  133. endpoint is specified by the given ``hostname:port``). Hence
  134. ALL ranks will create and return a TCPStore client (e.g. ``start_daemon=False``).
  135. If ``torchelastic_use_agent_store()`` is ``False``, then rank 0 will host
  136. the TCPStore (with multi-tenancy) and it is assumed that rank 0's hostname
  137. and port are correctly passed via ``hostname`` and ``port``. All
  138. non-zero ranks will create and return a TCPStore client.
  139. """
  140. # check if port is uint16_t
  141. if not 0 <= port < 2**16:
  142. raise ValueError(f"port must have value from 0 to 65535 but was {port}.")
  143. if _torchelastic_use_agent_store():
  144. # We create a new TCPStore for every retry so no need to add prefix for each attempt.
  145. return TCPStore(
  146. host_name=hostname,
  147. port=port,
  148. world_size=world_size,
  149. is_master=False,
  150. timeout=timeout,
  151. )
  152. else:
  153. start_daemon = rank == 0
  154. return TCPStore(
  155. host_name=hostname,
  156. port=port,
  157. world_size=world_size,
  158. is_master=start_daemon,
  159. timeout=timeout,
  160. multi_tenant=True,
  161. use_libuv=use_libuv,
  162. )
  def _tcp_rendezvous_handler(
      url: str, timeout: timedelta = default_pg_timeout, **kwargs
  ):
      """
      Rendezvous handler for ``tcp://host:port?rank=R&world_size=W`` URLs.

      Args:
          url: ``tcp://`` URL naming the store's host and port; its query
              must contain ``rank`` and ``world_size`` and may contain
              ``use_libuv``.
          timeout: Timeout passed to the created store.
          **kwargs: Accepted for handler-signature compatibility; unused.

      Yields:
          ``(store, rank, world_size)``, exactly once.

      Raises:
          ValueError: If the port, ``rank``, ``world_size`` or host name is
              missing from the URL.
          RuntimeError: If the generator is resumed after its single yield;
              this method cannot rendezvous a second time.
      """

      def _error(msg):
          return _rendezvous_error("tcp:// rendezvous: " + msg)

      result = urlparse(url)
      if result.port is None:
          raise _error("port number missing")
      query_dict = _query_to_dict(result.query)
      if "rank" not in query_dict:
          raise _error("rank parameter missing")
      if "world_size" not in query_dict:
          raise _error("world size parameter missing")

      rank = int(query_dict["rank"])
      world_size = int(query_dict["world_size"])
      use_libuv = _get_use_libuv_from_query_dict(query_dict)

      # A URL such as "tcp://:1234" has no host name. Validate explicitly
      # rather than with ``assert`` so the check survives ``python -O`` and
      # reports the problem in the same style as the other URL errors.
      if result.hostname is None:
          raise _error("hostname missing")

      store = _create_c10d_store(
          result.hostname, result.port, rank, world_size, timeout, use_libuv
      )

      yield (store, rank, world_size)

      # If this configuration is invalidated, there is nothing we can do about it
      raise RuntimeError("Unable to perform re-rendezvous using tcp:// method")
  def _env_rendezvous_handler(
      url: str, timeout: timedelta = default_pg_timeout, **kwargs
  ):
      """
      Rendezvous handler for ``env://`` URLs, configured through the environment.

      ``rank`` and ``world_size`` are taken from the URL query when present
      and otherwise from the ``RANK`` / ``WORLD_SIZE`` environment variables.
      The store endpoint always comes from ``MASTER_ADDR`` and
      ``MASTER_PORT``.

      Args:
          url: ``env://`` URL, optionally carrying ``rank``, ``world_size``
              and ``use_libuv`` query parameters.
          timeout: Timeout passed to the created store.
          **kwargs: Accepted for handler-signature compatibility; unused.

      Yields:
          ``(store, rank, world_size)``, exactly once.

      Raises:
          ValueError: If a required environment variable is unset or empty.
          RuntimeError: If the generator is resumed after its single yield;
              this method cannot rendezvous a second time.
      """

      def _required_env(var: str) -> str:
          # Unset and empty values are both treated as missing.
          value = os.environ.get(var)
          if value:
              return value
          raise _rendezvous_error(
              "env:// rendezvous: "
              + f"environment variable {var} expected, but not set"
          )

      params = _query_to_dict(urlparse(url).query)

      # Query parameters take precedence over the environment.
      rank = int(params["rank"] if "rank" in params else _required_env("RANK"))
      world_size = int(
          params["world_size"]
          if "world_size" in params
          else _required_env("WORLD_SIZE")
      )
      master_addr = _required_env("MASTER_ADDR")
      master_port = int(_required_env("MASTER_PORT"))
      use_libuv = _get_use_libuv_from_query_dict(params)

      yield (
          _create_c10d_store(
              master_addr, master_port, rank, world_size, timeout, use_libuv
          ),
          rank,
          world_size,
      )

      # If this configuration is invalidated, there is nothing we can do about it
      raise RuntimeError("Unable to perform re-rendezvous using env:// method")
  # Register the rendezvous handlers bundled with this module.
  register_rendezvous_handler(scheme="tcp", handler=_tcp_rendezvous_handler)
  register_rendezvous_handler(scheme="env", handler=_env_rendezvous_handler)
  register_rendezvous_handler(scheme="file", handler=_file_rendezvous_handler)