cloud_utils.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. # Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. from paddle.distributed.utils.launch_utils import (
  16. get_cluster,
  17. get_cluster_from_args,
  18. get_gpus,
  19. logger,
  20. )
  21. __all__ = []
  22. def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_devices):
  23. """
  24. args_node_ips:string, args_node_ip:string, args_port: int, selected_devices:list
  25. """
  26. # you can automatically get ip info while using paddlecloud multi nodes mode.
  27. node_ips = os.getenv("PADDLE_TRAINERS")
  28. assert node_ips is not None, "PADDLE_TRAINERS should not be None"
  29. node_ip = os.getenv("POD_IP")
  30. assert node_ip is not None, "POD_IP should not be None"
  31. node_rank = os.getenv("PADDLE_TRAINER_ID")
  32. assert node_rank is not None, "PADDLE_TRAINER_ID should not be None"
  33. paddle_ports_num = int(os.getenv("TRAINER_PORTS_NUM"))
  34. assert paddle_ports_num is not None, "TRAINER_PORTS_NUM should not be None"
  35. node_ips = node_ips.split(",")
  36. num_nodes = len(node_ips)
  37. node_rank = int(node_rank)
  38. if node_ip != "127.0.0.1" and node_ip != args_node_ip:
  39. logger.warning(
  40. f"Please NOTE: When using paddlecloud, node_ip is \
  41. automatically got from POD_IP. Your input node_ip: {args_node_ip} doesn't equals to \
  42. node_ip: {node_ip} from paddlecloud environment."
  43. )
  44. if args_node_ips != "127.0.0.1" and args_node_ips != ",".join(node_ips):
  45. logger.warning(
  46. f"Please NOTE: When using paddlecloud, cluster_node_ips is \
  47. automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node).\
  48. Your input cluster_node_ips: {args_node_ips} doesn't equals to IPs: {node_ips} from \
  49. paddlecloud environment."
  50. )
  51. # DISTRIBUTED_TRAINER_ENDPOINTS: new environment since paddlecloud 1.8.4
  52. # e.g: DISTRIBUTED_TRAINER_ENDPOINTS="ip1:port1,ip1:port2,ip1:port3,ip1:port4,ip2:port5,ip2:port6,ip2:port7,ip2:port8"
  53. trainer_endpoints = os.getenv("DISTRIBUTED_TRAINER_ENDPOINTS")
  54. if trainer_endpoints is None:
  55. started_port = args_port
  56. if num_nodes > 1:
  57. try:
  58. paddle_port = int(os.getenv("PADDLE_PORT", ""))
  59. if (
  60. paddle_ports_num >= len(selected_devices)
  61. and paddle_port != args_port
  62. ):
  63. logger.warning(f"Use Cloud specified port:{paddle_port}.")
  64. started_port = paddle_port
  65. except Exception as e:
  66. print(e)
  67. if started_port is None:
  68. started_port = 6170
  69. ports = list(range(started_port, started_port + len(selected_devices)))
  70. trainer_endpoints = []
  71. for ip in node_ips:
  72. trainer_endpoints.append(["%s:%d" % (ip, port) for port in ports])
  73. else:
  74. trainer_endpoints_ori = trainer_endpoints.split(",")
  75. trainer_endpoints = []
  76. assert num_nodes * paddle_ports_num == len(trainer_endpoints_ori)
  77. for i in range(num_nodes):
  78. trainer_endpoints.append(
  79. trainer_endpoints_ori[
  80. i * paddle_ports_num : (i + 1) * paddle_ports_num
  81. ]
  82. )
  83. logger.debug(
  84. f"parsed from args: node_ips:{node_ips} \
  85. node_ip:{node_ip} node_rank:{node_rank} trainer_endpoints:{trainer_endpoints}"
  86. )
  87. cluster, pod = get_cluster(
  88. node_ips, node_ip, trainer_endpoints, selected_devices
  89. )
  90. return cluster, cluster.pods[node_rank]
  91. def _get_trainers_num():
  92. return int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
  93. def get_cluster_and_pod(args):
  94. # parse arguments, used for cloud-single-machine and local
  95. selected_devices = get_gpus(args.selected_devices)
  96. trainers_num = _get_trainers_num()
  97. logger.debug(
  98. f"parsed from args trainerss_num:{trainers_num} selected_devices:{selected_devices}"
  99. )
  100. cluster = None
  101. pod = None
  102. if args.use_paddlecloud and trainers_num != 1:
  103. cluster, pod = get_cloud_cluster(
  104. args.cluster_node_ips,
  105. args.node_ip,
  106. args.started_port,
  107. selected_devices,
  108. )
  109. logger.info(f"get cluster from cloud:{cluster}")
  110. else:
  111. cluster, pod = get_cluster_from_args(args, selected_devices)
  112. logger.info(f"get cluster from args:{cluster}")
  113. return cluster, pod