distributed_gpt3_pipeline.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. # Copyright (c) Alibaba, Inc. and its affiliates.
  2. from typing import Any, Dict, Generator, Optional
  3. import torch
  4. from modelscope.metainfo import Pipelines
  5. from modelscope.models.nlp import DistributedGPT3
  6. from modelscope.pipelines.base import DistributedPipeline
  7. from modelscope.pipelines.builder import PIPELINES
  8. from modelscope.preprocessors import TextGenerationJiebaPreprocessor
  9. from modelscope.utils.constant import Frameworks, Tasks
  10. from modelscope.utils.device import device_placement
  11. from modelscope.utils.streaming_output import PipelineStreamingOutputMixin
  @PIPELINES.register_module(
      Tasks.text_generation, module_name=Pipelines.gpt3_generation)
  class DistributedGPT3Pipeline(DistributedPipeline,
                                PipelineStreamingOutputMixin):
      """Text-generation pipeline built around ``DistributedGPT3``.

      The pipeline is registered for ``Tasks.text_generation`` under the
      module name ``Pipelines.gpt3_generation``.

      The generation model and the active streaming generator are stored on
      the class (``cls.model`` / ``cls._stream``) by the ``*_one``
      classmethods, which this pipeline dispatches through
      ``self.model_pool.map``.
      """

      # Set per rank by `_instantiate_one`; the instance attribute of the same
      # name is overwritten in `__init__`.
      model = None

      def __init__(self, model, preprocessor=None, **kwargs):
          """Build the pipeline and make sure a tokenizer is available.

          Args:
              model: Forwarded to the parent constructor and, when no
                  preprocessor is given, to ``TextGenerationJiebaPreprocessor``.
                  (The previous docstring noted that a ``str`` is not
                  supported -- not verifiable from this file.)
              preprocessor: The preprocessor matched with the model. When
                  ``None``, a ``TextGenerationJiebaPreprocessor`` is created.
              kwargs (dict, `optional`):
                  Extra keyword arguments forwarded to the parent constructor.
          """
          chosen_preprocessor = (
              TextGenerationJiebaPreprocessor(model)
              if preprocessor is None else preprocessor)
          super().__init__(model, preprocessor=chosen_preprocessor, **kwargs)
          # `postprocess` detokenizes through `preprocessor.tokenizer`.
          assert hasattr(chosen_preprocessor, 'tokenizer')
          # NOTE(review): the instance-level `model` becomes a bare mixin
          # object; presumably a placeholder so base-class streaming checks
          # pass while the real model lives on the class -- confirm.
          self.model = PipelineStreamingOutputMixin()
          # NOTE(review): presumably marks model preparation as already done
          # for the base pipeline -- confirm.
          self._model_prepare = True

      @classmethod
      def _instantiate_one(cls, rank, model_dir, **kwargs):
          """Create the ``DistributedGPT3`` model for ``rank`` and call ``eval()``.

          The model is stored on the class as ``cls.model``.
          """
          rank_model = DistributedGPT3(model_dir, rank, **kwargs)
          cls.model = rank_model
          rank_model.eval()

      @classmethod
      def _forward_one(cls, inputs: Dict[str, Any]) -> Dict[str, Any]:
          """Run ``cls.model.generate`` on the packed request.

          Args:
              inputs: Dict with ``'inputs'`` (containing ``'input_ids'``) and
                  ``'forward_params'`` (keyword arguments for ``generate``).

          Returns:
              Whatever ``cls.model.generate`` returns.
          """
          input_ids = inputs['inputs']['input_ids']
          # Move the ids onto the current CUDA device before generating.
          tokens = input_ids.cuda(torch.cuda.current_device())
          generate_kwargs = inputs['forward_params']
          return cls.model.generate(tokens, **generate_kwargs)

      def postprocess(self, inputs: Dict[str, Any],
                      **postprocess_params) -> Dict[str, str]:
          """Turn a generation result into output text.

          Args:
              inputs: Generation result exposing ``.sequences``; only the
                  first sequence is detokenized.
              postprocess_params: Accepted but not used here.

          Returns:
              Dict[str, str]: ``{OutputKeys.TEXT: <detokenized text>}``
          """
          from modelscope.outputs import OutputKeys

          detokenize = self.preprocessor.tokenizer.detokenize
          first_sequence_ids = inputs.sequences[0].tolist()
          text = detokenize(first_sequence_ids)
          return {OutputKeys.TEXT: text}

      def _sanitize_parameters(self, **pipeline_parameters):
          """Return a 3-tuple with every keyword argument in the middle slot.

          The first and last slots are empty dicts.
          NOTE(review): presumably (preprocess, forward, postprocess) params
          as expected by the base pipeline -- confirm.
          """
          first_slot = {}
          last_slot = {}
          return first_slot, pipeline_parameters, last_slot

      def _stream_single(self, model_input: Dict[str, Any],
                         forward_params: Dict[str, Any],
                         postprocess_params: Dict[str, Any]) -> Generator:
          """Start streaming generation on every rank and yield decoded chunks.

          Args:
              model_input: Preprocessed input; collated first when
                  ``self._auto_collate`` is set.
              forward_params: Keyword arguments passed to ``stream_generate``.
              postprocess_params: Keyword arguments passed to ``postprocess``.

          Yields:
              The post-processed output built from the first rank's result,
              until that result is ``None``.
          """
          pipeline_cls = self.__class__

          with device_placement(self.framework, self.device_name):
              if self._auto_collate:
                  model_input = self._collate_fn(model_input)
              request = {
                  'inputs': model_input,
                  'forward_params': forward_params,
              }
              # Every rank receives the same request object.
              per_rank_requests = [request for _ in range(self.world_size)]
              self.model_pool.map(pipeline_cls._stream_one, per_rank_requests)

          while True:
              # Advance every rank by one step; only rank 0's item is used.
              step_results = self.model_pool.map(pipeline_cls._next_one,
                                                 range(self.world_size))
              head = step_results[0]
              if head is None:
                  break
              chunk = self.postprocess(head, **postprocess_params)
              self._check_output(chunk)
              yield chunk

      @classmethod
      def _stream_one(cls, inputs: Dict[str, Any]) -> None:
          """Create the streaming generator and keep it in ``cls._stream``.

          Args:
              inputs: Dict with ``'inputs'`` (containing ``'input_ids'``) and
                  ``'forward_params'`` (keyword arguments for
                  ``stream_generate``).
          """
          input_ids = inputs['inputs']['input_ids']
          tokens = input_ids.cuda(torch.cuda.current_device())
          stream_kwargs = inputs['forward_params']
          cls._stream = cls.model.stream_generate(tokens, **stream_kwargs)

      @classmethod
      def _next_one(cls, idx: int) -> Optional[Dict[str, Any]]:
          """Return the next item of ``cls._stream``, or ``None`` when exhausted.

          Args:
              idx: Unused in the body; it is the element supplied by
                  ``model_pool.map``.
          """
          # The default form of next() maps StopIteration to None.
          return next(cls._stream, None)