spawn.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618
  1. # Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import multiprocessing
  15. import os
  16. import signal
  17. import sys
  18. import warnings
  19. # deprecated module import
  20. # (TODO: GhostScreaming) It will be removed later.
  21. from paddle.base import core
  22. from paddle.device import get_device
  23. from paddle.distributed.cloud_utils import (
  24. _get_trainers_num,
  25. get_cluster_and_pod,
  26. )
  27. from paddle.distributed.fleet.cloud_utils import use_paddlecloud
  28. from paddle.distributed.fleet.launch import get_cluster_from_args
  29. from paddle.distributed.fleet.launch_utils import (
  30. DeviceMode,
  31. block_windows_and_macos,
  32. check_backend,
  33. )
  34. from paddle.distributed.utils.launch_utils import (
  35. _prepare_trainer_env,
  36. _print_arguments,
  37. get_host_name_ip,
  38. )
  39. from paddle.framework import set_flags
# No names are exported by `from <this module> import *`.
__all__ = []
  41. class ParallelEnvArgs:
  42. def __init__(self):
  43. # Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..
  44. self.cluster_node_ips = None
  45. # The current node ip.
  46. self.node_ip = None
  47. # whether to use paddlecloud platform to run your multi-process job.
  48. # If false, no need to set this argument.
  49. self.use_paddlecloud = None
  50. # The trainer's started port on a single node
  51. self.started_port = None
  52. # Print the config or not
  53. self.print_config = True
  54. # It's for gpu training and the training process will run
  55. # on the selected_devices, each process is bound to a single GPU.
  56. # And if it's not set, this module will use all the gpu cards
  57. # for training.
  58. self.selected_devices = None
  59. def _options_valid_check(options):
  60. # `print_config` keeped as a debug options, not show to users
  61. supported_options = [
  62. 'start_method',
  63. 'ips',
  64. 'gpus',
  65. 'xpus',
  66. 'print_config',
  67. 'backend',
  68. ]
  69. deprecated_options = [
  70. 'selected_devices',
  71. 'started_port',
  72. 'cluster_node_ips',
  73. 'node_ip',
  74. 'use_paddlecloud',
  75. ]
  76. for key in options:
  77. if key not in supported_options:
  78. if key in deprecated_options:
  79. warnings.warn(
  80. "The config option (%s) of `paddle.distributed.spawn` is deprecated. "
  81. "Please use the latest config options stated in the `spawn` API documentation."
  82. % key,
  83. DeprecationWarning,
  84. )
  85. else:
  86. raise ValueError(
  87. "The config option (%s) of `paddle.distributed.spawn` is not supported."
  88. % key
  89. )
  90. def _get_default_nprocs():
  91. device = get_device()
  92. if 'gpu' in device:
  93. return core.get_cuda_device_count()
  94. elif 'xpu' in device:
  95. return core.get_xpu_device_count()
  96. elif 'cpu' in device:
  97. return multiprocessing.cpu_count()
  98. elif device in core.get_available_custom_device():
  99. return core.get_custom_device_count(device.split(":")[0])
  100. else:
  101. raise RuntimeError(
  102. f"`paddle.distributed.spawn` does not support parallel training on device `{device}` now."
  103. )
  104. def _get_default_backend():
  105. device = get_device()
  106. if 'gpu' in device:
  107. return 'nccl'
  108. elif 'xpu' in device:
  109. return 'bkcl'
  110. elif 'cpu' in device:
  111. return 'gloo'
  112. elif device in core.get_available_custom_device():
  113. return 'xccl'
  114. else:
  115. raise RuntimeError(
  116. f"`paddle.distributed.spawn` does not support parallel training on device `{device}` now."
  117. )
  118. def _get_node_ip(ips):
  119. node_ip = None
  120. node_ips = [x.strip() for x in ips.split(',')]
  121. if len(node_ips) == 1:
  122. node_ip = node_ips[0]
  123. else:
  124. _, node_ip = get_host_name_ip()
  125. return node_ip
  126. def _get_subprocess_env_list(nprocs, options):
  127. # NOTE (xiongkun03) Why put backend deduction here ?
  128. # Because _get_subprocess_env_list is used by many testcases.
  129. # So for compatibility, we put backend deduction here
  130. # logic for handle backend option
  131. if 'backend' not in options or options['backend'] == 'auto':
  132. options['backend'] = _get_default_backend()
  133. check_backend(options['backend'])
  134. block_windows_and_macos(options['backend'])
  135. # construct processes env list
  136. processes_env_list = []
  137. # get args from kwargs
  138. args = ParallelEnvArgs()
  139. # deal with `ips`
  140. args.cluster_node_ips = options.get('ips', None)
  141. if args.cluster_node_ips is None:
  142. args.cluster_node_ips = options.get('cluster_node_ips', None)
  143. if args.cluster_node_ips is None:
  144. args.cluster_node_ips = "127.0.0.1"
  145. # deal with `gpus` or `xpus`
  146. # set default selected devices(gpus or xpus)
  147. # e.g. if the nprocs is 4, the selected gpus is "0,1,2,3"
  148. # NOTE(chenweihang): [ why not use FLAGS_selected_gpus or FLAGS_selected_xpus directly? ]
  149. # because the FLAGS_selected_gpus or FLAGS_selected_xpus may be used in other place,
  150. # if we set FLAGS_selected_gpus or FLAGS_selected_xpus to be `0,1,2,3`, it may cause error
  151. # when using `ParallelEnv`
  152. # NOTE(chenweihang): use absolute gpu or xpu card id
  153. if options['backend'] == 'nccl':
  154. args.selected_devices = options.get('gpus', None)
  155. if args.selected_devices is None:
  156. args.selected_devices = options.get('selected_devices', None)
  157. env_devices = os.getenv("CUDA_VISIBLE_DEVICES", None)
  158. if env_devices is None or env_devices == "":
  159. env_devices_list = [
  160. str(x) for x in range(core.get_cuda_device_count())
  161. ]
  162. else:
  163. env_devices_list = env_devices.split(',')
  164. if args.selected_devices is None:
  165. if len(env_devices_list) < nprocs:
  166. raise RuntimeError(
  167. "the number of visible devices(%d) is less than the number "
  168. "of spawn processes(%d), please ensure that the correct "
  169. "`nprocs` argument is passed or the environment variable "
  170. "`CUDA_VISIBLE_DEVICES` is correctly configured."
  171. % (len(env_devices_list), nprocs)
  172. )
  173. args.selected_devices = ",".join(
  174. [str(env_devices_list[x]) for x in range(0, nprocs)]
  175. )
  176. else:
  177. selected_device_list = args.selected_devices.split(',')
  178. if len(selected_device_list) != nprocs:
  179. raise ValueError(
  180. "The number of selected devices(%s) is not equal to "
  181. "the number of spawn processes(%d), please ensure that the "
  182. "correct `nprocs` and `gpus` arguments are passed."
  183. % (len(selected_device_list), nprocs)
  184. )
  185. for card_id in selected_device_list:
  186. if card_id not in env_devices_list:
  187. raise ValueError(
  188. "The selected gpu card {} cannot found in "
  189. "CUDA_VISIBLE_DEVICES ({}).".format(
  190. card_id, ",".join(env_devices_list)
  191. )
  192. )
  193. elif options['backend'] == 'bkcl':
  194. args.selected_devices = options.get('xpus', None)
  195. if args.selected_devices is None:
  196. args.selected_devices = options.get('selected_devices', None)
  197. env_devices = os.getenv("XPU_VISIBLE_DEVICES", None)
  198. if env_devices is None or env_devices == "":
  199. env_devices_list = [
  200. str(x) for x in range(core.get_xpu_device_count())
  201. ]
  202. else:
  203. env_devices_list = env_devices.split(',')
  204. if args.selected_devices is None:
  205. if len(env_devices_list) < nprocs:
  206. raise RuntimeError(
  207. "the number of visible devices(%d) is less than the number "
  208. "of spawn processes(%d), please ensure that the correct "
  209. "`nprocs` argument is passed or the environment variable "
  210. "`XPU_VISIBLE_DEVICES` is correctly configured."
  211. % (len(env_devices_list), nprocs)
  212. )
  213. args.selected_devices = ",".join(
  214. [str(env_devices_list[x]) for x in range(0, nprocs)]
  215. )
  216. else:
  217. selected_device_list = args.selected_devices.split(',')
  218. if len(selected_device_list) != nprocs:
  219. raise ValueError(
  220. "The number of selected devices(%s) is not equal to "
  221. "the number of spawn processes(%d), please ensure that the "
  222. "correct `nprocs` and `xpus` arguments are passed."
  223. % (len(selected_device_list), nprocs)
  224. )
  225. for card_id in selected_device_list:
  226. if card_id not in env_devices_list:
  227. raise ValueError(
  228. "The selected xpu card {} cannot found in "
  229. "XPU_VISIBLE_DEVICES ({}).".format(
  230. card_id, ",".join(env_devices_list)
  231. )
  232. )
  233. elif options['backend'] == 'gloo':
  234. # TODO check gpu / xpu flag must not exist
  235. warnings.warn(
  236. "Your model will be trained under CPUONLY mode by using GLOO,"
  237. "because CPUPlace is specified manually or your installed PaddlePaddle only support CPU Device."
  238. )
  239. args.paddle_cpuonly = True
  240. args.selected_devices = None
  241. args.ips = args.cluster_node_ips
  242. assert (
  243. options.get('use_paddlecloud', None) is None
  244. ), "CPUONLY spawn doesn't support use paddle cloud"
  245. assert (
  246. len(args.cluster_node_ips.split(',')) <= 1
  247. ), "CPUONLY spawn only support single trainer, that is len(ips)=1, but got %s."
  248. assert (
  249. _get_trainers_num() == 1
  250. ), "CPUONLY spawn doesn't support multi-trainer"
  251. elif options['backend'] == 'xccl':
  252. args.selected_devices = None
  253. custom_device_name = core.get_all_custom_device_type()[0]
  254. env_devices = os.getenv(f"FLAGS_selected_{custom_device_name}s", None)
  255. if env_devices is None or env_devices == "":
  256. env_devices_list = [
  257. str(x)
  258. for x in range(core.get_custom_device_count(custom_device_name))
  259. ]
  260. else:
  261. env_devices_list = env_devices.split(',')
  262. if len(env_devices_list) < nprocs:
  263. raise RuntimeError(
  264. "the number of visible devices(%d) is less than the number "
  265. "of spawn processes(%d), please ensure that the correct "
  266. "`nprocs` argument is passed or the environment variable "
  267. "`FLAGS_selected_%ss` is correctly configured."
  268. % (len(env_devices_list), nprocs, custom_device_name)
  269. )
  270. args.selected_devices = ",".join(
  271. [str(env_devices_list[x]) for x in range(0, nprocs)]
  272. )
  273. # set other inner args
  274. args.node_ip = options.get('node_ip', None)
  275. if args.node_ip is None:
  276. args.node_ip = _get_node_ip(args.cluster_node_ips)
  277. args.started_port = options.get('started_port', None)
  278. args.use_paddlecloud = options.get('use_paddlecloud', None)
  279. if args.use_paddlecloud is None:
  280. args.use_paddlecloud = use_paddlecloud()
  281. # get cluster and pod config
  282. if options['backend'] == 'gloo':
  283. devices_per_proc = list(range(0, nprocs))
  284. cluster, pod = get_cluster_from_args(
  285. args, DeviceMode.CPU, devices_per_proc
  286. )
  287. else:
  288. cluster, pod = get_cluster_and_pod(args)
  289. # prepare subprocess env list
  290. for trainer in pod.trainers:
  291. processes_env_list.append(
  292. _prepare_trainer_env(cluster, trainer, options['backend'])
  293. )
  294. # [Debug] print config
  295. args.print_config = options.get('print_config', False)
  296. if args.print_config:
  297. _print_arguments(args)
  298. return processes_env_list
  299. def _remove_risky_env():
  300. # remove useless env vars
  301. # no copy, each process will hold env vars itself
  302. os.environ.pop("http_proxy", None)
  303. os.environ.pop("https_proxy", None)
  304. def _set_trainer_env(env_dict, backend):
  305. # NOTE(chenweihang): [ Why need set FLAGS_selected_gpus or FLAGS_selected_xpus here? ]
  306. # When the child process starts, it will inherit the configuration of the
  307. # main process and set the FLAGS once, but the environment variable has
  308. # not been set at this time, which leads to the FLAGS_selected_gpus or FLAGS_selected_xpus
  309. # is keep same with mainprocess(usually empty), so manually update the flags here
  310. # NOTE(xiongkun): why put backend here? because if gloo, we shouldn't set FLAGS_selectedXXX
  311. #
  312. if backend == 'nccl':
  313. set_flags({'FLAGS_selected_gpus': env_dict['FLAGS_selected_gpus']})
  314. elif backend == 'bkcl':
  315. set_flags({'FLAGS_selected_xpus': env_dict['FLAGS_selected_xpus']})
  316. else:
  317. # NOTE(xiongkun) why not raise Error ?
  318. # So far, we added support for CPU parallel, and will be applied when paddle is not
  319. # compiled with cuda or xp. just do nothing.
  320. pass
  321. for var_name in env_dict:
  322. os.environ[var_name] = env_dict[var_name]
  323. def _func_wrapper(func, args, error_queue, return_queue, env_dict, backend):
  324. try:
  325. # config subprocess environment variables
  326. _remove_risky_env()
  327. _set_trainer_env(env_dict, backend)
  328. # execute function
  329. result = func(*args)
  330. # record function return value
  331. return_queue.put(result)
  332. except KeyboardInterrupt:
  333. pass
  334. except Exception:
  335. import traceback
  336. error_queue.put(traceback.format_exc())
  337. sys.exit(1)
  338. class MultiprocessContext:
  339. def __init__(self, processes, error_queues, return_queues):
  340. self.error_queues = error_queues
  341. # NOTE(chenweihang): The `spawn` method is mainly used
  342. # to wrap the outermost execution function of the program for
  343. # parallel execution. Generally, the return value is not concerned,
  344. # but if the user needs to obtain the return value, users can get
  345. # the return result of each process from context.return_queues
  346. self.return_queues = return_queues
  347. self.processes = processes
  348. self.sentinels = {
  349. process.sentinel: index for index, process in enumerate(processes)
  350. }
  351. def join(self, timeout=None):
  352. if len(self.sentinels) == 0:
  353. return True
  354. ready = multiprocessing.connection.wait(
  355. self.sentinels.keys(), timeout=timeout
  356. )
  357. error_index = None
  358. for sentinel in ready:
  359. index = self.sentinels.pop(sentinel)
  360. process = self.processes[index]
  361. process.join()
  362. if process.exitcode != 0:
  363. error_index = index
  364. break
  365. if error_index is None:
  366. return len(self.sentinels) == 0
  367. for process in self.processes:
  368. if process.is_alive():
  369. process.terminate()
  370. process.join()
  371. self._throw_exception(error_index)
  372. def _throw_exception(self, error_index):
  373. if self.error_queues[error_index].empty():
  374. exitcode = self.processes[error_index].exitcode
  375. if exitcode < 0:
  376. name = signal.Signals(-exitcode).name
  377. raise Exception(
  378. "Process %d terminated with signal %s."
  379. % (error_index, name)
  380. )
  381. else:
  382. raise Exception(
  383. "Process %d terminated with exit code %d."
  384. % (error_index, exitcode)
  385. )
  386. original_trace = self.error_queues[error_index].get()
  387. msg = (
  388. "\n\n----------------------------------------------\n"
  389. "Process %d terminated with the following error:\n"
  390. "----------------------------------------------\n\n" % error_index
  391. )
  392. msg += original_trace
  393. raise Exception(msg)
  394. def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
  395. """
  396. Start multiple processes with ``spawn`` method for parallel training.
  397. .. note::
  398. ``spawn`` now only supports GPU or XPU collective mode. The collective mode
  399. of GPU and XPU cannot be started at the same time, so the option `gpus` and
  400. `xpus` cannot be configured at the same time.
  401. Args:
  402. func (function): The target function is called by spawned process.
  403. This function need to be able to pickled, so it must be defined
  404. at the top level of a module.
  405. args (list|tuple, optional): Arguments passed to ``func``.
  406. nprocs (int, optional): Number of processed to start. Default: -1.
  407. when nprocs is -1, the available device will be obtained from
  408. the environment variable when the model is executed: If use GPU,
  409. the currently available device ID is obtained from the environment
  410. variable CUDA_VISIBLE_DEVICES; If use XPU, the currently available
  411. device ID is obtained from the environment variable XPU_VISIBLE_DEVICES.
  412. join (bool, optional): Perform a blocking join on all spawned processes.
  413. Default: True.
  414. daemon (bool, optional): The spawned processes' daemon flag. Default: False.
  415. **options(dict, optional): Other initial parallel execution environment
  416. configuration options. The following options are currently supported:
  417. (1) start_method (string): the way to start a process.
  418. The start method can be ``spawn`` , ``fork`` , ``forkserver`` .
  419. Because the CUDA runtime does not support the ``fork`` start method,
  420. when use CUDA in subprocesses, we should start process by ``spawn``
  421. or ``forkserver`` method. Default: "spawn" ;
  422. (2) gpus (string): The training process will run on the
  423. selected gpus, such as "0,1,2,3". Default: None;
  424. (3) xpus (string): The training process will run on the
  425. selected xpus, such as "0,1,2,3". Default: None;
  426. (5) ips (string): Paddle cluster nodes ips, such as
  427. "192.168.0.16,192.168.0.17". Default: "127.0.0.1" .
  428. Returns:
  429. ``MultiprocessContext`` object, it hold the spawned processes.
  430. Examples:
  431. .. code-block:: python
  432. >>> # doctest: +REQUIRES(env:DISTRIBUTED)
  433. >>> import paddle
  434. >>> import paddle.nn as nn
  435. >>> import paddle.optimizer as opt
  436. >>> import paddle.distributed as dist
  437. >>> class LinearNet(nn.Layer):
  438. ... def __init__(self):
  439. ... super().__init__()
  440. ... self._linear1 = nn.Linear(10, 10)
  441. ... self._linear2 = nn.Linear(10, 1)
  442. ... def forward(self, x):
  443. ... return self._linear2(self._linear1(x))
  444. >>> def train(print_result=False):
  445. ... # 1. initialize parallel environment
  446. ... group = dist.init_parallel_env()
  447. ... process_group = group.process_group if group else None
  448. ... # 2. create data parallel layer & optimizer
  449. ... layer = LinearNet()
  450. ... dp_layer = paddle.DataParallel(layer, group = process_group)
  451. ... loss_fn = nn.MSELoss()
  452. ... adam = opt.Adam(
  453. ... learning_rate=0.001, parameters=dp_layer.parameters())
  454. ... # 3. run layer
  455. ... inputs = paddle.randn([10, 10], 'float32')
  456. ... outputs = dp_layer(inputs)
  457. ... labels = paddle.randn([10, 1], 'float32')
  458. ... loss = loss_fn(outputs, labels)
  459. ... if print_result is True:
  460. ... print("loss:", loss.numpy())
  461. ... loss.backward()
  462. ... adam.step()
  463. ... adam.clear_grad()
  464. >>> # Usage 1: only pass function.
  465. >>> # If your training method no need any argument, and
  466. >>> # use all visible devices for parallel training.
  467. >>> if __name__ == '__main__':
  468. ... dist.spawn(train)
  469. >>> # Usage 2: pass function and arguments.
  470. >>> # If your training method need some arguments, and
  471. >>> # use all visible devices for parallel training.
  472. >>> if __name__ == '__main__':
  473. ... dist.spawn(train, args=(True,))
  474. >>> # Usage 3: pass function, arguments and nprocs.
  475. >>> # If your training method need some arguments, and
  476. >>> # only use part of visible devices for parallel training.
  477. >>> # If your machine hold 8 cards {0,1,2,3,4,5,6,7},
  478. >>> # this case will use cards {0,1}; If you set
  479. >>> # CUDA_VISIBLE_DEVICES=4,5,6,7, this case will use
  480. >>> # cards {4,5}
  481. >>> if __name__ == '__main__':
  482. ... dist.spawn(train, args=(True,), nprocs=2)
  483. >>> # Usage 4: pass function, arguments, nprocs and gpus.
  484. >>> # If your training method need some arguments, and
  485. >>> # only use part of visible devices for parallel training,
  486. >>> # but you can't set your machine's environment variable
  487. >>> # CUDA_VISIBLE_DEVICES, such as it is None or all cards
  488. >>> # {0,1,2,3,4,5,6,7}, you can pass `gpus` to
  489. >>> # select the GPU cards you want to use. For example,
  490. >>> # this case will use cards {4,5} if your machine hold 8 cards.
  491. >>> if __name__ == '__main__':
  492. ... dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')
  493. """
  494. # Give an error hint when the users enter a configuration option
  495. # that does not exist
  496. _options_valid_check(options)
  497. # get default nprocs
  498. if nprocs == -1:
  499. nprocs = _get_default_nprocs()
  500. # NOTE(chenweihang): [ why need get cluster info before run? ]
  501. # when using `paddle.distributed.spawn` start parallel training,
  502. # we should get cluster info before starting subprocess, and pass
  503. # correct info to each subprocess
  504. procs_env_list = _get_subprocess_env_list(nprocs, options)
  505. # start processes
  506. # NOTE(chenweihang): [ why default start method is spawn? ]
  507. # The CUDA runtime does not support the fork start method,
  508. # either the spawn or forkserver start method are required
  509. # to use CUDA in subprocesses.
  510. start_method = options.get('start_method', None)
  511. if start_method is None:
  512. start_method = 'spawn'
  513. mp = multiprocessing.get_context(start_method)
  514. error_queues = []
  515. return_queues = []
  516. processes = []
  517. for i in range(nprocs):
  518. error_queue = mp.SimpleQueue()
  519. return_queue = mp.SimpleQueue()
  520. process = mp.Process(
  521. target=_func_wrapper,
  522. args=(
  523. func,
  524. args,
  525. error_queue,
  526. return_queue,
  527. procs_env_list[i],
  528. options['backend'],
  529. ),
  530. )
  531. process.daemon = daemon
  532. process.start()
  533. error_queues.append(error_queue)
  534. return_queues.append(return_queue)
  535. processes.append(process)
  536. context = MultiprocessContext(processes, error_queues, return_queues)
  537. if not join:
  538. return context
  539. # loop until all process end
  540. while not context.join():
  541. pass
  542. # finally return context
  543. return context