| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- import os
- import os.path as osp
- import random
- from abc import ABC, abstractmethod
- from functools import partial
- from multiprocessing import Pool
- from threading import Lock
- from typing import Any, Dict, Generator, List, Mapping, Optional, Union
- import numpy as np
- from packaging import version
- from modelscope.models.base import Model
- from modelscope.msdatasets import MsDataset
- from modelscope.outputs import TASK_OUTPUTS, ModelOutputBase
- from modelscope.pipeline_inputs import TASK_INPUTS, check_input_type
- from modelscope.preprocessors import Preprocessor
- from modelscope.utils.config import Config
- from modelscope.utils.constant import Frameworks, Invoke, ModelFile
- from modelscope.utils.device import (create_device, device_placement,
- verify_device)
- from modelscope.utils.hub import read_config, snapshot_download
- from modelscope.utils.import_utils import is_tf_available, is_torch_available
- from modelscope.utils.logger import get_logger
- from modelscope.utils.torch_utils import compile_model
- from ..utils.automodel_utils import check_model_from_owner_group
- from .util import is_model, is_official_hub_path
- if is_torch_available():
- import torch
- if is_tf_available():
- pass
- Tensor = Union['torch.Tensor', 'tf.Tensor']
- Input = Union[str, tuple, MsDataset, 'Image.Image', 'numpy.ndarray']
- InputModel = Union[str, Model, 'torch.nn.Module']
- logger = get_logger()
- class Pipeline(ABC):
- """Pipeline base.
- """
- def initiate_single_model(self, model, **kwargs):
- if self.trust_remote_code:
- kwargs['trust_remote_code'] = True
- if isinstance(model, str):
- logger.info(f'initiate model from {model}')
- if isinstance(model, str) and is_official_hub_path(model):
- logger.info(f'initiate model from location {model}.')
- # expecting model has been prefetched to local cache beforehand
- return Model.from_pretrained(
- model,
- device=self.device_name,
- model_prefetched=True,
- invoked_by=Invoke.PIPELINE,
- device_map=self.device_map,
- **kwargs) if is_model(model) else model
- else:
- return model
- def initiate_multiple_models(self, input_models: List[InputModel]):
- models = []
- for model in input_models:
- models.append(self.initiate_single_model(model))
- return models
- def __init__(self,
- config_file: str = None,
- model: Union[InputModel, List[InputModel]] = None,
- preprocessor: Union[Preprocessor, List[Preprocessor]] = None,
- device: str = 'gpu',
- auto_collate=True,
- device_map=None,
- **kwargs):
- """ Base class for pipeline.
- If config_file is provided, model and preprocessor will be
- instantiated from corresponding config. Otherwise, model
- and preprocessor will be constructed separately.
- Args:
- config_file(str, optional): Filepath to configuration file.
- model: (list of) Model name or model object
- preprocessor: (list of) Preprocessor object
- device (str): device str, should be either cpu, cuda, gpu, gpu:X or cuda:X
- auto_collate (bool): automatically to convert data to tensor or not.
- compile (bool, optional): Compile the model with torch 2.0, default False
- compile_options (dict, optional): The compile options if compile=True,
- default None to use the default params of 'TorchModel.compile'.
- """
- if device_map is not None:
- assert device == 'gpu', '`device` and `device_map` cannot be input at the same time!'
- self.device_map = device_map
- verify_device(device)
- self.device_name = device
- self.trust_remote_code = kwargs.get('trust_remote_code', False)
- if not isinstance(model, List):
- self.model = self.initiate_single_model(model, **kwargs)
- self.models = [self.model]
- else:
- self.model = None
- self.models = self.initiate_multiple_models(model)
- self.has_multiple_models = len(self.models) > 1
- if config_file is not None:
- self.cfg = Config.from_file(config_file)
- model_dir = os.path.dirname(config_file)
- elif not self.has_multiple_models:
- if isinstance(self.model, str):
- model_dir = self.model
- else:
- model_dir = self.model.model_dir
- self.cfg = read_config(model_dir)
- if preprocessor is None and not self.has_multiple_models:
- self.preprocessor = Preprocessor.from_pretrained(model_dir)
- else:
- self.preprocessor = preprocessor
- if self.model or (self.has_multiple_models and self.models[0]):
- self.framework = self._get_framework()
- else:
- self.framework = None
- if self.framework == Frameworks.torch:
- self.device = create_device(self.device_name)
- self._model_prepare = False
- self._model_prepare_lock = Lock()
- self._auto_collate = auto_collate
- self._compile = kwargs.get('compile', False)
- self._compile_options = kwargs.get('compile_options', {})
- def check_trust_remote_code(self,
- info_str: Optional[str] = None,
- model_dir: Optional[str] = None):
- """Check trust_remote_code if the pipeline needs to import extra libs
- Args:
- info_str(str): The info showed to user if trust_remote_code is `False`.
- model_dir(`Optional[str]`): The local model directory. If is a trusted model, check remote code will pass.
- """
- info_str = info_str or (
- 'This pipeline requires `trust_remote_code` to be `True` because it needs to '
- 'import extra libs or execute the code in the model repo, setting this to true '
- 'means you trust the files in it.')
- if not check_model_from_owner_group(model_dir=model_dir):
- assert self.trust_remote_code, info_str
- def prepare_model(self):
- """ Place model on certain device for pytorch models before first inference
- """
- self._model_prepare_lock.acquire(timeout=600)
- def _prepare_single(model):
- if not isinstance(model, torch.nn.Module) and hasattr(
- model, 'model'):
- model = model.model
- if not isinstance(model, torch.nn.Module):
- return
- model.eval()
- from modelscope.utils.torch_utils import is_on_same_device
- if is_on_same_device(model):
- model.to(self.device)
- if not self._model_prepare:
- # prepare model for pytorch
- if self.framework == Frameworks.torch:
- if self.has_multiple_models:
- for m in self.models:
- _prepare_single(m)
- if self._compile:
- self.models = [
- compile_model(m, **self._compile_options)
- for m in self.models
- ]
- else:
- _prepare_single(self.model)
- if self._compile:
- self.model = compile_model(self.model,
- **self._compile_options)
- self._model_prepare = True
- self._model_prepare_lock.release()
- def _get_framework(self) -> str:
- frameworks = []
- for m in self.models:
- if isinstance(m, str):
- model_dir = m
- else:
- model_dir = m.model_dir
- cfg_file = osp.join(model_dir, ModelFile.CONFIGURATION)
- cfg = Config.from_file(cfg_file)
- frameworks.append(cfg.framework)
- if not all(x == frameworks[0] for x in frameworks):
- logger.warning(
- f'got multiple models, but they are in different frameworks {frameworks}'
- )
- return None
- return frameworks[0]
- def __call__(self, input: Union[Input, List[Input]], *args,
- **kwargs) -> Union[Dict[str, Any], Generator]:
- # model provider should leave it as it is
- # modelscope library developer will handle this function
- # place model to cpu or gpu
- if (self.model or (self.has_multiple_models and self.models[0])):
- if not self._model_prepare:
- self.prepare_model()
- # simple showcase, need to support iterator type for both tensorflow and pytorch
- # input_dict = self._handle_input(input)
- # sanitize the parameters
- batch_size = kwargs.pop('batch_size', None)
- preprocess_params, forward_params, postprocess_params = self._sanitize_parameters(
- **kwargs)
- kwargs['preprocess_params'] = preprocess_params
- kwargs['forward_params'] = forward_params
- kwargs['postprocess_params'] = postprocess_params
- # for LLMPipeline, we shall support treating list of roles as a
- # one single 'messages' input
- if 'LLMPipeline' in type(self).__name__ and isinstance(input, list):
- input = {'messages': input}
- kwargs['is_message'] = True
- if isinstance(input, list):
- if batch_size is None:
- output = []
- for ele in input:
- output.append(self._process_single(ele, *args, **kwargs))
- else:
- output = self._process_batch(input, batch_size, **kwargs)
- elif isinstance(input, MsDataset):
- return self._process_iterator(input, *args, **kwargs)
- else:
- output = self._process_single(input, *args, **kwargs)
- return output
- def _sanitize_parameters(self, **pipeline_parameters):
- """
- this method should sanitize the keyword args to preprocessor params,
- forward params and postprocess params on '__call__' or '_process_single' method
- considered to be a normal classmethod with default implementation / output
- Default Returns:
- Dict[str, str]: preprocess_params = {}
- Dict[str, str]: forward_params = {}
- Dict[str, str]: postprocess_params = pipeline_parameters
- """
- return {}, {}, pipeline_parameters
- def _process_iterator(self, input: Input, *args, **kwargs):
- for ele in input:
- yield self._process_single(ele, *args, **kwargs)
- def _collate_fn(self, data):
- return collate_fn(data, self.device)
- def _process_single(self, input: Input, *args, **kwargs) -> Dict[str, Any]:
- preprocess_params = kwargs.get('preprocess_params', {})
- forward_params = kwargs.get('forward_params', {})
- postprocess_params = kwargs.get('postprocess_params', {})
- self._check_input(input)
- out = self.preprocess(input, **preprocess_params)
- with device_placement(self.framework, self.device_name):
- if self.framework == Frameworks.torch:
- with torch.no_grad():
- if self._auto_collate:
- out = self._collate_fn(out)
- out = self.forward(out, **forward_params)
- else:
- out = self.forward(out, **forward_params)
- out = self.postprocess(out, **postprocess_params)
- self._check_output(out)
- return out
- def _batch(self, data_list):
- batch_data = {}
- for sample_preprocessed in data_list:
- for k, v in sample_preprocessed.items():
- value_list = batch_data.get(k, [])
- value_list.append(v)
- batch_data[k] = value_list
- for k in batch_data.keys():
- if isinstance(batch_data[k][0], torch.Tensor):
- batch_data[k] = torch.cat(batch_data[k])
- return batch_data
- def _process_batch(self, input: List[Input], batch_size,
- **kwargs) -> Dict[str, Any]:
- preprocess_params = kwargs.get('preprocess_params')
- forward_params = kwargs.get('forward_params')
- postprocess_params = kwargs.get('postprocess_params')
- # batch data
- output_list = []
- for i in range(0, len(input), batch_size):
- end = min(i + batch_size, len(input))
- real_batch_size = end - i
- preprocessed_list = [
- self.preprocess(i, **preprocess_params) for i in input[i:end]
- ]
- with device_placement(self.framework, self.device_name):
- if self.framework == Frameworks.torch:
- with torch.no_grad():
- batched_out = self._batch(preprocessed_list)
- if self._auto_collate:
- batched_out = self._collate_fn(batched_out)
- batched_out = self.forward(batched_out,
- **forward_params)
- else:
- batched_out = self._batch(preprocessed_list)
- batched_out = self.forward(batched_out, **forward_params)
- for batch_idx in range(real_batch_size):
- out = {}
- for k, element in batched_out.items():
- if element is not None:
- if isinstance(element, (tuple, list)):
- if isinstance(element[0], torch.Tensor):
- out[k] = type(element)(
- e[batch_idx:batch_idx + 1]
- for e in element)
- else:
- # Compatible with traditional pipelines
- out[k] = element[batch_idx]
- else:
- out[k] = element[batch_idx:batch_idx + 1]
- out = self.postprocess(out, **postprocess_params)
- self._check_output(out)
- output_list.append(out)
- return output_list
- def _check_input(self, input):
- task_name = self.group_key
- if task_name in TASK_INPUTS:
- input_type = TASK_INPUTS[task_name]
- # if multiple input formats are defined, we first
- # found the one that match input data and check
- if isinstance(input_type, list):
- matched_type = None
- for t in input_type:
- if isinstance(input, (dict, tuple)):
- if type(t) == type(input):
- matched_type = t
- break
- elif isinstance(t, str):
- matched_type = t
- break
- if matched_type is None:
- err_msg = 'input data format for current pipeline should be one of following: \n'
- for t in input_type:
- err_msg += f'{t}\n'
- raise ValueError(err_msg)
- else:
- input_type = matched_type
- if isinstance(input_type, str):
- check_input_type(input_type, input)
- elif isinstance(input_type, tuple):
- assert isinstance(input, tuple), 'input should be a tuple'
- for t, input_ele in zip(input_type, input):
- check_input_type(t, input_ele)
- elif isinstance(input_type, dict):
- for k in input_type.keys():
- # allow single input for multi-modal models
- if isinstance(input, dict) and k in input:
- check_input_type(input_type[k], input[k])
- else:
- raise ValueError(f'invalid input_type definition {input_type}')
- elif not getattr(self, '_input_has_warned', False):
- logger.warning(f'task {task_name} input definition is missing')
- self._input_has_warned = True
- def _check_output(self, input):
- # this attribute is dynamically attached by registry
- # when cls is registered in registry using task name
- task_name = self.group_key
- if task_name not in TASK_OUTPUTS:
- if not getattr(self, '_output_has_warned', False):
- logger.warning(f'task {task_name} output keys are missing')
- self._output_has_warned = True
- return
- output_keys = TASK_OUTPUTS[task_name]
- missing_keys = []
- input = input.keys() if isinstance(input,
- (dict, ModelOutputBase)) else input
- for k in output_keys:
- if isinstance(k, (dict, ModelOutputBase)) and k not in input:
- missing_keys.append(k)
- if len(missing_keys) > 0:
- raise ValueError(f'expected output keys are {output_keys}, '
- f'those {missing_keys} are missing')
- def preprocess(self, inputs: Input, **preprocess_params) -> Dict[str, Any]:
- """ Provide default implementation based on preprocess_cfg and user can reimplement it
- """
- assert self.preprocessor is not None, 'preprocess method should be implemented'
- assert not isinstance(self.preprocessor, List),\
- 'default implementation does not support using multiple preprocessors.'
- return self.preprocessor(inputs, **preprocess_params)
- def forward(self, inputs: Dict[str, Any],
- **forward_params) -> Dict[str, Any]:
- """ Provide default implementation using self.model and user can reimplement it
- """
- assert self.model is not None, 'forward method should be implemented'
- assert not self.has_multiple_models, 'default implementation does not support multiple models in a pipeline.'
- return self.model(inputs, **forward_params)
- def postprocess(self, inputs: Dict[str, Any],
- **post_params) -> Dict[str, Any]:
- """ If current pipeline support model reuse, common postprocess
- code should be write here.
- Args:
- inputs: input data
- post_params: post process parameters
- Return:
- dict of results: a dict containing outputs of model, each
- output should have the standard output name.
- """
- raise NotImplementedError('postprocess')
- class DistributedPipeline(Pipeline):
- """This pipeline is used to load multi gpu models.
- What will this class do:
- 1. Read the global config from the configuration.json
- 2. Set the multiprocessing method to spawn
- 3. Open a multiprocessing pool of the world_size to instantiate model pieces.
- 4. Set the master port and ip
- 5. Call _instantiate_one to instantiate one model piece,
- This method should be implemented by the derived class.
- 6. After the forward method is called, do preprocess in main process and
- call _forward_one to collect results, and do post process in main process.
- NOTE: _instantiate_one and _forward_one are class methods, any derived class should implement them and
- store the model handler in the class field.
- """
- def __init__(self,
- model: str = None,
- preprocessor: Union[Preprocessor, List[Preprocessor]] = None,
- auto_collate=True,
- **kwargs):
- # DistributedPipeline uses classmethod to initialize model
- # without calling super().__init__ method
- self.preprocessor = preprocessor
- self._model_prepare = False
- self._model_prepare_lock = Lock()
- self._auto_collate = auto_collate
- if os.path.exists(model):
- self.model_dir = model
- else:
- self.model_dir = snapshot_download(model)
- self.cfg = read_config(self.model_dir)
- self.world_size = self._get_world_size(self.cfg)
- self.model_pool = None
- self.device_name = 'cpu'
- self.device = create_device(self.device_name)
- self.has_multiple_models = False
- self.framework = self.cfg.framework
- torch.multiprocessing.set_start_method('spawn', force=True)
- ranks = list(range(self.world_size))
- self.model_pool = Pool(self.world_size)
- if 'master_ip' not in kwargs:
- kwargs['master_ip'] = '127.0.0.1'
- master_port = int(kwargs['master_port']
- ) if 'master_port' in kwargs else random.randint(
- 29500, 39500)
- from modelscope.utils.torch_utils import _find_free_port, _is_free_port
- if not _is_free_port(master_port):
- master_port = _find_free_port()
- kwargs['master_port'] = str(master_port)
- # TODO: Pass ip and port to megatron_util for initialization
- os.environ['MASTER_ADDR'] = kwargs['master_ip']
- os.environ['MASTER_PORT'] = kwargs['master_port']
- self.model_pool.map(
- partial(
- self.__class__._instantiate_one,
- model_dir=self.model_dir,
- **self.cfg.model,
- **kwargs), ranks)
- self.models = []
- def __del__(self):
- if hasattr(self, 'model_pool') and self.model_pool is not None:
- try:
- self.model_pool.terminate()
- except AttributeError:
- pass
- def __getstate__(self):
- self_dict = self.__dict__.copy()
- del self_dict['model_pool']
- del self_dict['preprocessor']
- del self_dict['_model_prepare_lock']
- return self_dict
- @classmethod
- def _instantiate_one(cls, rank, model_dir, **kwargs):
- """Instantiate one model piece.
- Args:
- rank: The model rank.
- model_dir: The model_dir in the node.
- kwargs: Any extra args.
- Returns:
- None. The model handler should be kept in the class field.
- """
- pass
- def forward(self, inputs: Dict[str, Any],
- **forward_params) -> Dict[str, Any]:
- inputs = {
- 'inputs': inputs,
- 'forward_params': forward_params,
- }
- res = self.model_pool.map(self.__class__._forward_one,
- [inputs] * self.world_size)
- return res[0]
- @classmethod
- def _forward_one(cls, inputs):
- """Forward the inputs to one model piece.
- Use the model handler kept in the class field to forward.
- Args:
- inputs: The inputs after the preprocessing.
- Returns:
- The forward results.
- """
- pass
- def _get_world_size(self, cfg: Config) -> int:
- m_world_size = cfg.safe_get('megatron.world_size')
- if m_world_size is None:
- return cfg.safe_get('model.world_size')
- return m_world_size
- def collate_fn(data, device):
- """Prepare the input just before the forward function.
- This method will move the tensors to the right device.
- Usually this method does not need to be overridden.
- Args:
- data: The data out of the dataloader.
- device: The device to move data to.
- Returns: The processed data.
- """
- from torch.utils.data.dataloader import default_collate
- def get_class_name(obj):
- return obj.__class__.__name__
- if isinstance(data, dict) or isinstance(data, Mapping):
- # add compatibility for img_metas for mmlab models
- return type(data)({
- k: collate_fn(v, device) if k != 'img_metas' else v
- for k, v in data.items()
- })
- elif isinstance(data, (tuple, list)):
- if 0 == len(data):
- return torch.Tensor([])
- if isinstance(data[0], (int, float)):
- return default_collate(data).to(device)
- else:
- return type(data)(collate_fn(v, device) for v in data)
- elif isinstance(data, np.ndarray):
- if data.dtype.type is np.str_:
- return data
- else:
- return collate_fn(torch.from_numpy(data), device)
- elif isinstance(data, torch.Tensor):
- return data.to(device)
- elif isinstance(data, (bytes, str, int, float, bool, type(None))):
- return data
- elif get_class_name(data) == 'InputFeatures':
- # modelscope.preprocessors.nlp.InputFeatures
- return data
- elif get_class_name(data) == 'DataContainer':
- # mmcv.parallel.DataContainer
- return data
- else:
- raise ValueError(f'Unsupported data type {type(data)}')
|