inference.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. # Copyright (c) OpenMMLab. All rights reserved.
  2. # Copyright (c) Alibaba, Inc. and its affiliates.
  3. import logging
  4. import os
  5. import pickle
  6. import shutil
  7. from collections.abc import Mapping
  8. import torch
  9. from torch import distributed as dist
  10. from tqdm import tqdm
  11. from modelscope.utils.data_utils import to_device
  12. from modelscope.utils.torch_utils import (broadcast, get_dist_info, is_dist,
  13. is_master, make_tmp_dir)
  14. def single_gpu_test(trainer,
  15. data_loader,
  16. device,
  17. metric_classes=None,
  18. vis_closure=None,
  19. data_loader_iters=None):
  20. """Test model in EpochBasedTrainer with a single gpu.
  21. Args:
  22. trainer (modelscope.trainers.EpochBasedTrainer): Trainer to be tested.
  23. data_loader (nn.Dataloader): Pytorch data loader.
  24. device (str | torch.device): The target device for the data.
  25. metric_classes (List): List of Metric class that uses to collect metrics.
  26. vis_closure (Callable): Collect data for TensorboardHook.
  27. data_loader_iters (int): Used when dataset has no attribute __len__ or only load part of dataset.
  28. Returns:
  29. list: The prediction results.
  30. """
  31. dataset = data_loader.dataset
  32. progress_with_iters = False
  33. if data_loader_iters is None:
  34. try:
  35. data_len = len(dataset)
  36. except Exception as e:
  37. logging.error(e)
  38. raise ValueError(
  39. 'Please implement ``__len__`` method for your dataset, or provide ``data_loader_iters``'
  40. )
  41. desc = 'Total test samples'
  42. else:
  43. progress_with_iters = True
  44. data_len = data_loader_iters
  45. desc = 'Test iterations'
  46. with tqdm(total=data_len, desc=desc) as pbar:
  47. for i, data in enumerate(data_loader):
  48. data = to_device(data, device)
  49. evaluate_batch(trainer, data, metric_classes, vis_closure)
  50. if progress_with_iters:
  51. batch_size = 1 # iteration count
  52. else:
  53. if isinstance(data, Mapping):
  54. if 'nsentences' in data:
  55. batch_size = data['nsentences']
  56. else:
  57. try:
  58. batch_size = len(next(iter(data.values())))
  59. except Exception:
  60. batch_size = data_loader.batch_size
  61. else:
  62. batch_size = len(data)
  63. for _ in range(batch_size):
  64. pbar.update()
  65. if progress_with_iters and (i + 1) >= data_len:
  66. break
  67. return get_metric_values(metric_classes)
  68. def multi_gpu_test(trainer,
  69. data_loader,
  70. device,
  71. metric_classes=None,
  72. vis_closure=None,
  73. tmpdir=None,
  74. gpu_collect=False,
  75. data_loader_iters_per_gpu=None):
  76. """Test model in EpochBasedTrainer with multiple gpus.
  77. This method tests model with multiple gpus and collects the results
  78. under two different modes: gpu and cpu modes. By setting
  79. ``gpu_collect=True``, it encodes results to gpu tensors and use gpu
  80. communication for results collection. On cpu mode it saves the results on
  81. different gpus to ``tmpdir`` and collects them by the rank 0 worker.
  82. Args:
  83. trainer (modelscope.trainers.EpochBasedTrainer): Trainer to be tested.
  84. data_loader (nn.Dataloader): Pytorch data loader.
  85. device: (str | torch.device): The target device for the data.
  86. tmpdir (str): Path of directory to save the temporary results from
  87. different gpus under cpu mode.
  88. gpu_collect (bool): Option to use either gpu or cpu to collect results.
  89. data_loader_iters_per_gpu (int): Used when dataset has no attribute __len__ or only load part of dataset.
  90. Returns:
  91. list: The prediction results.
  92. """
  93. dataset = data_loader.dataset
  94. rank, world_size = get_dist_info(trainer.dp_group)
  95. progress_with_iters = False
  96. if data_loader_iters_per_gpu is None:
  97. try:
  98. data_len = len(dataset)
  99. total_samples = data_len
  100. except Exception as e:
  101. logging.error(e)
  102. raise ValueError(
  103. 'Please implement ``__len__`` method for your dataset, or provide ``data_loader_iters_per_gpu``'
  104. )
  105. desc = 'Total test samples with multi gpus'
  106. else:
  107. total_samples = 0
  108. progress_with_iters = True
  109. data_len = data_loader_iters_per_gpu * world_size
  110. desc = 'Total test iterations with multi gpus'
  111. count = 0
  112. with tqdm(total=data_len, desc=desc) as pbar:
  113. for i, data in enumerate(data_loader):
  114. data = to_device(data, device)
  115. evaluate_batch(trainer, data, metric_classes, vis_closure)
  116. if isinstance(data, Mapping):
  117. if 'nsentences' in data:
  118. batch_size = data['nsentences']
  119. else:
  120. batch_size = len(next(iter(data.values())))
  121. else:
  122. batch_size = len(data)
  123. if i >= (data_len // world_size) - 1:
  124. total_samples = torch.LongTensor([batch_size
  125. ]).to(trainer.model.device)
  126. dist.all_reduce(total_samples, op=dist.reduce_op.SUM)
  127. total_samples = total_samples.item()
  128. else:
  129. total_samples = batch_size * world_size
  130. if progress_with_iters:
  131. iter_cnt_all = world_size
  132. else:
  133. iter_cnt_all = total_samples
  134. count += iter_cnt_all
  135. if rank == 0:
  136. if count > data_len:
  137. iter_cnt_all = data_len - (count - iter_cnt_all)
  138. for _ in range(iter_cnt_all):
  139. pbar.update()
  140. if progress_with_iters and (i + 1) >= data_len:
  141. break
  142. # collect results and data from all ranks
  143. if gpu_collect:
  144. metric_classes_list = collect_results_gpu(metric_classes,
  145. trainer.dp_group)
  146. else:
  147. if tmpdir is None:
  148. tmpdir = make_tmp_dir()
  149. metric_classes_list = collect_results_cpu(
  150. metric_classes, trainer, os.path.join(tmpdir, 'metrics'))
  151. metric_classes = merge_metrics(metric_classes_list)
  152. return get_metric_values(metric_classes)
  153. def evaluate_batch(trainer, data, metric_classes, vis_closure):
  154. batch_result = trainer.evaluation_step(data)
  155. if metric_classes is not None:
  156. for metric_cls in metric_classes:
  157. metric_cls.add(batch_result, data)
  158. if vis_closure is not None:
  159. # trainer.visualization
  160. vis_closure(batch_result)
  161. def get_metric_values(metric_classes):
  162. metric_values = {}
  163. if is_master():
  164. for metric_cls in metric_classes:
  165. metric_values.update(metric_cls.evaluate())
  166. if is_dist():
  167. metric_values = broadcast(metric_values, 0)
  168. return metric_values
  169. def collect_results_cpu(result_part, trainer, tmpdir=None):
  170. """Collect results under cpu mode.
  171. On cpu mode, this function will save the results on different gpus to
  172. ``tmpdir`` and collect them by the rank 0 worker.
  173. Args:
  174. result_part (list): Result list containing result parts
  175. to be collected.
  176. trainer(`EpochBasedTrainer`): The trainer instance to get the parallel groups.
  177. tmpdir (str | None): temporal directory for collected results to
  178. store. If set to None, it will create a random temporal directory
  179. for it.
  180. Returns:
  181. list: The collected results.
  182. """
  183. rank, world_size = get_dist_info(trainer.dp_group)
  184. if tmpdir is None:
  185. tmpdir = make_tmp_dir()
  186. if not os.path.exists(tmpdir):
  187. os.makedirs(tmpdir, exist_ok=True)
  188. dist.barrier()
  189. # dump the part result to the dir
  190. if (not trainer.is_tp_group_available() or is_master(trainer.tp_group)) \
  191. and (not trainer.is_pp_group_available() or is_master(trainer.pp_group)):
  192. with open(os.path.join(tmpdir, f'part_{rank}.pkl'), 'wb') as f:
  193. pickle.dump(result_part, f)
  194. dist.barrier()
  195. # collect all parts
  196. if not is_master():
  197. return None
  198. else:
  199. # load results of all parts from tmp dir
  200. part_list = []
  201. for i in range(world_size):
  202. part_file = os.path.join(tmpdir, f'part_{i}.pkl')
  203. with open(part_file, 'rb') as f:
  204. part_result = pickle.load(f)
  205. # When data is severely insufficient, an empty part_result
  206. # on a certain gpu could makes the overall outputs empty.
  207. if part_result:
  208. part_list.append(part_result)
  209. # remove tmp dir
  210. shutil.rmtree(tmpdir)
  211. return part_list
  212. def collect_results_gpu(result_part, dp_group=None):
  213. """Collect results under gpu mode.
  214. On gpu mode, this function will encode results to gpu tensors and use gpu
  215. communication for results collection.
  216. Args:
  217. result_part (list): Result list containing result parts
  218. to be collected.
  219. dp_group(`ProcessGroup` or None): The data parallel group, default None for global group.
  220. Returns:
  221. list: The collected results.
  222. """
  223. _, world_size = get_dist_info(dp_group)
  224. # dump result part to tensor with pickle
  225. part_tensor = torch.tensor(
  226. bytearray(pickle.dumps(result_part)), dtype=torch.uint8, device='cuda')
  227. # gather all result part tensor shape
  228. shape_tensor = torch.tensor(part_tensor.shape, device='cuda')
  229. shape_list = [shape_tensor.clone() for _ in range(world_size)]
  230. dist.all_gather(shape_list, shape_tensor, dp_group)
  231. # padding result part tensor to max length
  232. shape_max = torch.tensor(shape_list).max()
  233. part_send = torch.zeros(shape_max, dtype=torch.uint8, device='cuda')
  234. part_send[:shape_tensor[0]] = part_tensor
  235. part_recv_list = [
  236. part_tensor.new_zeros(shape_max) for _ in range(world_size)
  237. ]
  238. # gather all result part
  239. dist.all_gather(part_recv_list, part_send, dp_group)
  240. if is_master():
  241. part_list = []
  242. for recv, shape in zip(part_recv_list, shape_list):
  243. part_result = pickle.loads(recv[:shape[0]].cpu().numpy().tobytes())
  244. # When data is severely insufficient, an empty part_result
  245. # on a certain gpu could makes the overall outputs empty.
  246. if part_result:
  247. part_list.append(part_result)
  248. return part_list
  249. def merge_metrics(metric_classes_list):
  250. if metric_classes_list is None:
  251. return None
  252. metric_classes_0 = metric_classes_list[0]
  253. for metric_classes_i in metric_classes_list[1:]:
  254. for cls_0, cls_i in zip(metric_classes_0, metric_classes_i):
  255. cls_0.merge(cls_i)
  256. return metric_classes_0