reduce.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import paddle
  15. from paddle import framework
  16. from paddle.distributed.communication import stream
  17. class ReduceOp:
  18. """
  19. Specify the type of operation used for element-wise reductions.
  20. It should be one of the following values:
  21. ReduceOp.SUM
  22. ReduceOp.MAX
  23. ReduceOp.MIN
  24. ReduceOp.PROD
  25. Examples:
  26. .. code-block:: python
  27. >>> # doctest: +REQUIRES(env: DISTRIBUTED)
  28. >>> import paddle
  29. >>> import paddle.distributed as dist
  30. >>> dist.init_parallel_env()
  31. >>> if dist.get_rank() == 0:
  32. ... data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
  33. >>> else:
  34. ... data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
  35. >>> dist.all_reduce(data, op=dist.ReduceOp.SUM)
  36. >>> print(data)
  37. >>> # [[5, 7, 9], [5, 7, 9]] (2 GPUs)
  38. """
  39. SUM = 0
  40. MAX = 1
  41. MIN = 2
  42. PROD = 3
  43. AVG = 4
  44. def _get_reduce_op(reduce_op, func_name):
  45. if framework.in_dynamic_mode():
  46. if reduce_op == ReduceOp.SUM:
  47. return framework.core.ReduceOp.SUM
  48. elif reduce_op == ReduceOp.MAX:
  49. return framework.core.ReduceOp.MAX
  50. elif reduce_op == ReduceOp.MIN:
  51. return framework.core.ReduceOp.MIN
  52. elif reduce_op == ReduceOp.PROD:
  53. return framework.core.ReduceOp.PRODUCT
  54. elif reduce_op == ReduceOp.AVG:
  55. return framework.core.ReduceOp.AVG
  56. else:
  57. if reduce_op == ReduceOp.SUM:
  58. return f'c_{func_name}_sum'
  59. elif reduce_op == ReduceOp.MAX:
  60. return f'c_{func_name}_max'
  61. elif reduce_op == ReduceOp.MIN:
  62. return f'c_{func_name}_min'
  63. elif reduce_op == ReduceOp.PROD:
  64. return f'c_{func_name}_prod'
  65. else:
  66. return f'c_{func_name}'
  67. raise ValueError(f"Unknown reduce_op type for {func_name}.")
  68. def _to_inplace_op(op_name):
  69. return f"{op_name}_"
  70. def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True):
  71. """
  72. Reduce a tensor to the destination from all others. As shown below, one process is started with a GPU and the data of this process is represented
  73. by its group rank. The destination of the reduce operator is GPU0 and the process is sum. Through reduce operator,
  74. the GPU0 will owns the sum of all data from all GPUs.
  75. .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/reduce.png
  76. :width: 800
  77. :alt: reduce
  78. :align: center
  79. Args:
  80. tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type
  81. should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
  82. dst (int): The destination rank id.
  83. op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD|ReduceOp.AVG, optional): The operation used. Default value is ReduceOp.SUM.
  84. group (Group, optional): The group instance return by new_group or None for global default group.
  85. sync_op (bool, optional): Whether this op is a sync op. The default value is True.
  86. Returns:
  87. Return a task object.
  88. Examples:
  89. .. code-block:: python
  90. >>> # doctest: +REQUIRES(env: DISTRIBUTED)
  91. >>> import paddle
  92. >>> import paddle.distributed as dist
  93. >>> dist.init_parallel_env()
  94. >>> if dist.get_rank() == 0:
  95. ... data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
  96. >>> else:
  97. ... data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
  98. >>> dist.reduce(data, dst=0)
  99. >>> print(data)
  100. >>> # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0)
  101. >>> # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1)
  102. """
  103. # AVG is only supported when nccl >= 2.10
  104. if op == ReduceOp.AVG and (not is_avg_reduce_op_supported()):
  105. group = (
  106. paddle.distributed.collective._get_global_group()
  107. if group is None
  108. else group
  109. )
  110. tensor.scale_(1.0 / group.nranks)
  111. return stream.reduce(
  112. tensor,
  113. dst=dst,
  114. op=ReduceOp.SUM,
  115. group=group,
  116. sync_op=sync_op,
  117. use_calc_stream=False,
  118. )
  119. return stream.reduce(
  120. tensor,
  121. dst=dst,
  122. op=op,
  123. group=group,
  124. sync_op=sync_op,
  125. use_calc_stream=False,
  126. )
  127. # code below will be removed after we remove the old dygraph
  128. if group is not None and not group.is_member():
  129. return
  130. use_calc_stream = sync_op
  131. ring_id = 0 if group is None else group.id
  132. gdst = dst if group is None else group.get_group_rank(dst)
  133. assert gdst >= 0, "dst rank out of group, need global rank"
  134. if op == ReduceOp.SUM:
  135. return paddle._legacy_C_ops.c_reduce_sum(
  136. tensor,
  137. tensor,
  138. 'use_calc_stream',
  139. use_calc_stream,
  140. 'ring_id',
  141. ring_id,
  142. 'root_id',
  143. gdst,
  144. )
  145. elif op == ReduceOp.MAX:
  146. return paddle._legacy_C_ops.c_reduce_max(
  147. tensor,
  148. tensor,
  149. 'use_calc_stream',
  150. use_calc_stream,
  151. 'ring_id',
  152. ring_id,
  153. 'root_id',
  154. gdst,
  155. )
  156. elif op == ReduceOp.MIN:
  157. return paddle._legacy_C_ops.c_reduce_min(
  158. tensor,
  159. tensor,
  160. 'use_calc_stream',
  161. use_calc_stream,
  162. 'ring_id',
  163. ring_id,
  164. 'root_id',
  165. gdst,
  166. )
  167. elif op == ReduceOp.PROD:
  168. return paddle._legacy_C_ops.c_reduce_prod(
  169. tensor,
  170. tensor,
  171. 'use_calc_stream',
  172. use_calc_stream,
  173. 'ring_id',
  174. ring_id,
  175. 'root_id',
  176. gdst,
  177. )
  178. else:
  179. raise ValueError(f"Unknown parameter: {op}.")
  180. def is_avg_reduce_op_supported():
  181. if paddle.is_compiled_with_cuda():
  182. return paddle.base.core.nccl_version() >= 21000
  183. else:
  184. return False