# Copyright (c) Alibaba, Inc. and its affiliates.
"""Support webservice integration pipeline.

This module provides a support library for web services that use a pipeline:
it converts web-service input into pipeline input and pipeline output into
web-service output, automatically encoding and decoding the relevant
(binary) fields.

Example:
    # create pipeline instance and pipeline information, save it to app
    pipeline_instance = create_pipeline(
        'damo/cv_gpen_image-portrait-enhancement', 'v1.0.0')
    # get pipeline information: input, output, request example.
    pipeline_info = get_pipeline_information_by_pipeline(pipeline_instance)
    # save the pipeline and info to the app for use in subsequent
    # request processing
    app.state.pipeline = pipeline_instance
    app.state.pipeline_info = pipeline_info

    # For an inference request, use call_pipeline_with_json to decode the
    # input and call the pipeline, then call
    # pipeline_output_to_service_base64_output to encode the necessary
    # fields, and return the result.
    # Request and response are in json format.
    @router.post('/call')
    async def inference(request: Request):
        pipeline_service = request.app.state.pipeline
        pipeline_info = request.app.state.pipeline_info
        request_json = await request.json()
        result = call_pipeline_with_json(pipeline_info,
                                         pipeline_service,
                                         request_json)
        # convert output to json; binary fields need to be encoded.
        output = pipeline_output_to_service_base64_output(
            pipeline_info.task_name, result)
        return output

    # Inference service input, output and sample information can be
    # obtained through the docs interface
    @router.get('/describe')
    async def index(request: Request):
        pipeline_info = request.app.state.pipeline_info
        return pipeline_info.schema

Todo:
    * Support more service input types, such as form.
"""
import ast
import base64
import importlib
import importlib.util
import inspect
import json
import os
from io import BytesIO
from typing import Any
from urllib.parse import urlparse

import numpy as np

from modelscope.hub.file_download import model_file_download
from modelscope.outputs.outputs import (TASK_OUTPUTS, OutputKeys, OutputTypes,
                                        OutputTypeSchema)
from modelscope.pipeline_inputs import (INPUT_TYPE, INPUT_TYPE_SCHEMA,
                                        TASK_INPUTS, InputType)
from modelscope.pipelines import pipeline
from modelscope.pipelines.base import Pipeline
from modelscope.utils.config import Config
from modelscope.utils.constant import ModelFile, Tasks
from modelscope.utils.logger import get_logger

logger = get_logger()


def create_pipeline(model_id: str,
                    revision: str,
                    external_engine_for_llm: bool = True):
    """Create a pipeline for ``model_id`` using the task from its configuration.

    Args:
        model_id (str): The model id.
        revision (str): The model revision.
        external_engine_for_llm (bool): Forwarded to ``pipeline``.

    Returns:
        The object returned by ``modelscope.pipelines.pipeline``.
    """
    model_configuration_file = model_file_download(
        model_id=model_id,
        file_path=ModelFile.CONFIGURATION,
        revision=revision)
    cfg = Config.from_file(model_configuration_file)
    return pipeline(
        task=cfg.task,
        model=model_id,
        model_revision=revision,
        external_engine_for_llm=external_engine_for_llm)


def get_class_user_attributes(cls):
    """Return ``(name, value)`` pairs of the non-routine, non-dunder members of ``cls``."""
    attributes = inspect.getmembers(cls, lambda a: not inspect.isroutine(a))
    return [
        a for a in attributes
        if not (a[0].startswith('__') and a[0].endswith('__'))
    ]


def get_input_type(task_inputs: Any):
    """Resolve a task input definition into input type(s).

    Args:
        task_inputs (Any): A ``TASK_INPUTS``-style definition: an input type
            name (str), a dict of ``{input_key: definition}``, or a
            list/tuple of alternative definitions.

    Returns:
        ``INPUT_TYPE[task_inputs]`` for a str; a dict of resolved types for a
        dict; the resolution of the first dict item for a list/tuple, or
        None when it contains no dict.

    Raises:
        ValueError: If the definition has an unsupported type.
    """
    if isinstance(task_inputs, str):  # no input key
        return INPUT_TYPE[task_inputs]
    if isinstance(task_inputs, (tuple, list)):
        for item in task_inputs:
            # for list, server only support dict format.
            if isinstance(item, dict):
                return get_input_type(item)
        return None
    if isinstance(task_inputs, dict):
        # key: input key, value: input type
        return {k: get_input_type(v) for k, v in task_inputs.items()}
    raise ValueError(f'invalid input_type definition {task_inputs}')


def get_input_schema(task_name: str, input_type: type):
    """Build the JSON schema of a task input.

    Args:
        task_name (str): The task name; used to look up ``TASK_INPUTS`` when
            ``input_type`` is None.
        input_type (type): The input definition to convert (used by the
            recursive calls); None means "use the task's definition".

    Returns:
        dict: A JSON-schema fragment. None when the task's top-level
        definition is a list without any dict alternative.

    Raises:
        ValueError: If the definition has an unsupported type.
    """
    if input_type is None:
        task_inputs = TASK_INPUTS[task_name]
        if isinstance(task_inputs, str):
            # only one input field, key is task_inputs
            return {
                'type': 'object',
                'properties': {
                    task_inputs: INPUT_TYPE_SCHEMA[task_inputs]
                }
            }
    else:
        task_inputs = input_type

    if isinstance(task_inputs, str):  # no input key
        return INPUT_TYPE_SCHEMA[task_inputs]
    if input_type is None and isinstance(task_inputs, list):
        # Top-level list: alternative input formats; the server only
        # supports the dict format.
        for item in task_inputs:
            if isinstance(item, dict):
                return get_input_schema(None, item)
        return None
    if isinstance(task_inputs, (tuple, list)):
        # Only the first element determines the item schema.
        input_schema = {'type': 'array', 'items': {}}
        if task_inputs:
            first_item = task_inputs[0]
            if isinstance(first_item, dict):
                # The item schema itself (previously nested under 'type',
                # which is not valid JSON schema).
                input_schema['items'] = get_input_schema(None, first_item)
            else:
                input_schema['items'] = INPUT_TYPE_SCHEMA[first_item]
        return input_schema
    if isinstance(task_inputs, dict):
        # key: input key, value: input type
        return {
            'type': 'object',
            'properties': {
                k: get_input_schema(None, v)
                for k, v in task_inputs.items()
            }
        }
    raise ValueError(f'invalid input_type definition {task_inputs}')


def get_output_schema(task_name: str):
    """Build the JSON schema of a task output from ``TASK_OUTPUTS``.

    Args:
        task_name (str): The task name.

    Raises:
        ValueError: If ``TASK_OUTPUTS[task_name]`` is not a list.
    """
    task_outputs = TASK_OUTPUTS[task_name]
    if not isinstance(task_outputs, list):
        raise ValueError('TASK_OUTPUTS for %s is not list.' % task_name)
    return {
        'type': 'object',
        'properties': {key: OutputTypeSchema[key]
                       for key in task_outputs}
    }


def get_input_info(task_name: str):
    """Return the input type information of a task from ``TASK_INPUTS``.

    Args:
        task_name (str): The task name.

    Returns:
        The input type for a single-type task, the raw tuple for tuple
        definitions, ``{'input': ...}`` for dict/list definitions, or None
        for a list without a dict alternative.

    Raises:
        ValueError: If the definition has an unsupported type.
    """
    task_inputs = TASK_INPUTS[task_name]
    if isinstance(task_inputs, str):
        # no input key, default input key is 'input'
        return INPUT_TYPE[task_inputs]
    if isinstance(task_inputs, tuple):
        return task_inputs
    if isinstance(task_inputs, list):
        for item in task_inputs:
            # for list, server only support dict format.
            if isinstance(item, dict):
                return {'input': get_input_type(item)}
        return None
    if isinstance(task_inputs, dict):
        # key: input key, value: input type
        return {
            'input': {k: get_input_type(v)
                      for k, v in task_inputs.items()}
        }
    raise ValueError(f'invalid input_type definition {task_inputs}')


def get_output_info(task_name: str):
    """Return ``{output_key: OutputTypes[output_key]}`` for a task.

    Raises:
        ValueError: If ``TASK_OUTPUTS[task_name]`` is not a list.
    """
    output_keys = TASK_OUTPUTS[task_name]
    if not isinstance(output_keys, list):
        raise ValueError('TASK_OUTPUTS for %s is not list.' % task_name)
    return {output_key: OutputTypes[output_key] for output_key in output_keys}


def get_task_io_info(task_name: str):
    """Get task input and output information.

    Args:
        task_name (str): The task name; matched against both the attribute
            names and the values of ``Tasks``.

    Returns:
        tuple: ``(input_info, output_info)``, or ``(None, None)`` when the
        task is not declared in ``Tasks``.
    """
    tasks = get_class_user_attributes(Tasks)
    if not any(key == task_name or value == task_name
               for key, value in tasks):
        return None, None
    return get_input_info(task_name), get_output_info(task_name)


def _annotation_type_name(annotation):
    """Return the bare type name of an annotation AST node, or None if unsupported.

    ``int`` -> 'int'; ``np.ndarray`` -> 'ndarray'; ``Dict[str, Any]`` and
    ``typing.Dict[str, Any]`` -> 'Dict'; string annotations such as
    ``'Input'`` -> 'Input'; PEP 604 unions (``int | None``) -> 'Union'.
    """
    if isinstance(annotation, ast.Name):
        return annotation.id
    if isinstance(annotation, ast.Attribute):
        return annotation.attr
    if isinstance(annotation, ast.Subscript):
        # Recurse so that ``typing.List[int]`` (Attribute value) also works.
        return _annotation_type_name(annotation.value)
    if isinstance(annotation, ast.BinOp) and isinstance(
            annotation.op, ast.BitOr):
        return 'Union'
    try:
        value = ast.literal_eval(annotation)
    except (ValueError, TypeError):
        return None
    if isinstance(value, str):
        # Forward reference such as 'Input' or 'typing.List[int]'.
        return value.split('[', 1)[0].split('.')[-1].strip()
    return None


def process_arg_type_annotation(arg, default_value=None):
    """Return ``(name, type_name)`` for an ``ast.arg``.

    The type name comes from the annotation when present, otherwise from the
    type of ``default_value``; without either, 'object' is used and a
    warning is logged.

    Args:
        arg (ast.arg): The argument node.
        default_value (Any): The statically evaluated default, or None.

    Raises:
        Exception: If the annotation is of an unsupported form.
    """
    if arg.annotation is not None:
        type_name = _annotation_type_name(arg.annotation)
        if type_name is None:
            raise Exception('Invalid annotation: %s' % arg.annotation)
        return arg.arg, type_name
    if default_value is not None:
        return arg.arg, type(default_value).__name__
    # Irregular, assuming no type hint no default value type is object
    logger.warning('arg: %s has no data type annotation, use default!' %
                   (arg.arg))
    return arg.arg, 'object'


def convert_to_value(item):
    """Statically evaluate a default-value AST node.

    Literals (str, bytes, numbers including ``-1``, tuples, lists, dicts,
    sets, True/False/None) are evaluated with ``ast.literal_eval``. Anything
    that cannot be resolved without running the code (constant names,
    calls, attribute access, ...) yields None.
    """
    try:
        return ast.literal_eval(item)
    except (ValueError, TypeError, SyntaxError):
        return None


def process_args(args):
    """Convert an ``ast.arguments`` node into parameter tuples.

    Args:
        args (ast.arguments): The function's arguments node.

    Returns:
        list: ``(name, type, has_default, default)`` tuples for positional
        and keyword-only arguments; ``self`` and ``*args``/``**kwargs`` are
        skipped.
    """
    arguments = []  # name, type, has_default, default
    # Positional-only (py3.8+) and regular arguments share ``defaults``,
    # which is right-aligned with them.
    positional = list(getattr(args, 'posonlyargs', [])) + list(args.args)
    n_required = len(positional) - len(args.defaults)
    for arg in positional[:n_required]:
        if arg.arg == 'self':
            continue
        arg_name, arg_type = process_arg_type_annotation(arg, None)
        arguments.append((arg_name, arg_type, False, None))
    for arg, dft in zip(positional[n_required:], args.defaults):
        value = convert_to_value(dft)
        arg_name, arg_type = process_arg_type_annotation(arg, value)
        arguments.append((arg_name, arg_type, True, value))
    # ``kw_defaults`` is aligned 1:1 with ``kwonlyargs`` and holds None for
    # keyword-only arguments without a default.
    for kwarg, dft in zip(args.kwonlyargs, args.kw_defaults):
        if dft is None:
            arg_name, arg_type = process_arg_type_annotation(kwarg, None)
            arguments.append((arg_name, arg_type, False, None))
        else:
            value = convert_to_value(dft)
            arg_name, arg_type = process_arg_type_annotation(kwarg, value)
            arguments.append((arg_name, arg_type, True, value))
    return arguments


class PipelineClassAnalyzer(ast.NodeVisitor):
    """Collect the signatures of a pipeline class's ``__call__``,
    ``preprocess``, ``forward`` and ``postprocess`` methods.

    Visit the ``ast.ClassDef`` of the pipeline. Function bodies are not
    descended into, so nested functions are ignored.
    """

    def __init__(self) -> None:
        super().__init__()
        # Each *parameters list holds (name, type, has_default, default)
        # tuples as produced by ``process_args``.
        self.parameters = []  # __call__ parameters
        self.has_call = False
        self.preprocess_parameters = []
        self.has_preprocess = False
        self.has_postprocess = False
        self.has_forward = False
        self.forward_parameters = []
        self.postprocess_parameters = []
        # Not written by this class.
        self.lineno = 0
        self.end_lineno = 0

    def visit_FunctionDef(self, node: ast.FunctionDef) -> Any:
        if node.name == '__call__':
            self.parameters = process_args(node.args)
            self.has_call = True
        elif node.name == 'preprocess':
            self.preprocess_parameters = process_args(node.args)
            self.has_preprocess = True
        elif node.name == 'postprocess':
            self.postprocess_parameters = process_args(node.args)
            self.has_postprocess = True
        elif node.name == 'forward':
            self.forward_parameters = process_args(node.args)
            self.has_forward = True

    def get_input_parameters(self):
        """Return ``(call_inputs, parameters)``.

        With a custom ``__call__``, all its parameters are inputs:
        ``(call_parameters, None)``. Otherwise ``(None, parameters)`` where
        parameters concatenates the preprocess/forward/postprocess
        parameters, each without its first entry (after ``self``).
        """
        if self.has_call:
            # custom define __call__ inputs and parameter are control by the
            # custom __call__, all parameter is input.
            return self.parameters, None
        parameters = []
        if self.has_preprocess:
            parameters.extend(self.preprocess_parameters[1:])
        if self.has_forward:
            parameters.extend(self.forward_parameters[1:])
        if self.has_postprocess:
            parameters.extend(self.postprocess_parameters[1:])
        return None, parameters


class AnalysisSourceFileRegisterModules(ast.NodeVisitor):
    """Find the ``ast.ClassDef`` named ``class_name`` in a python source file.

    Class bodies are not descended into, so classes nested in another class
    are not matched. The result is stored in ``class_define`` (None when not
    found).

    Args:
        source_file_path (str): The source file path.
        class_name (str): The class name to look for.

    Examples:
        >>> with open(source_file_path, "rb") as f:
        ...     src = f.read()
        >>> analyzer = AnalysisSourceFileRegisterModules(
        ...     source_file_path, class_name)
        >>> analyzer.visit(ast.parse(src, filename=source_file_path))
        >>> analyzer.class_define  # ast.ClassDef or None
    """

    def __init__(self, source_file_path, class_name) -> None:
        super().__init__()
        self.source_file_path = source_file_path
        self.class_name = class_name
        self.class_define = None

    def visit_ClassDef(self, node: ast.ClassDef):
        if node.name == self.class_name:
            self.class_define = node


def get_pipeline_input_parameters(
    source_file_path: str,
    class_name: str,
):
    """Get pipeline inputs and parameters by analyzing its source code.

    Args:
        source_file_path (str): The pipeline source code path.
        class_name (str): The pipeline class name.

    Returns:
        tuple: ``(call_inputs, parameters)`` as returned by
        ``PipelineClassAnalyzer.get_input_parameters``; ``(None, [])`` if
        the class is not defined in the file.
    """
    with open(source_file_path, 'rb') as f:
        src = f.read()
    analyzer = AnalysisSourceFileRegisterModules(source_file_path, class_name)
    # python3.7 has no type_comments parameter, so it is not passed.
    analyzer.visit(ast.parse(src, filename=source_file_path))
    if analyzer.class_define is None:
        logger.warning('Class: %s is not defined in %s, skip analysis.',
                       class_name, source_file_path)
        return None, []
    clz = PipelineClassAnalyzer()
    clz.visit(analyzer.class_define)
    return clz.get_input_parameters()


# Python type name -> JSON schema type, used for parameters. Unknown names
# map to 'object' in generate_pipeline_parameters_schema.
meta_type_schema_map = {
    'str': 'string',
    'int': 'integer',
    'float': 'number',
    'bool': 'boolean',
    'Dict': 'object',
    'dict': 'object',
    'list': 'array',
    'List': 'array',
    'Union': 'object',
    'Input': 'object',
    'object': 'object',
    'tuple': 'array',
    'Tuple': 'array',
}


def generate_pipeline_parameters_schema(parameters):
    """Build an object JSON schema from ``(name, type, has_default, default)`` tuples.

    Returns ``{}`` when ``parameters`` is None or empty. Type names without
    a mapping in ``meta_type_schema_map`` become 'object'.
    """
    if not parameters:
        return {}
    properties = {}
    for name, param_type, has_default, default_value in parameters:
        prop = {'type': meta_type_schema_map.get(param_type, 'object')}
        if has_default:
            prop['default'] = default_value
        properties[name] = prop
    return {'type': 'object', 'properties': properties}


def _build_task_io_schemas(task_name, parameters):
    """Return ``(input_schema, parameters_schema, output_schema)`` of a task
    that uses the base pipeline ``__call__``.

    Input/output schemas come from ``TASK_INPUTS``/``TASK_OUTPUTS``; a
    missing definition yields None for that schema and logs a warning.
    """
    parameters_schema = generate_pipeline_parameters_schema(parameters)
    if task_name in TASK_INPUTS and task_name in TASK_OUTPUTS:
        return (get_input_schema(task_name, None), parameters_schema,
                get_output_schema(task_name))
    logger.warning(
        'Task: %s input is defined: %s, output is defined: %s which is not completed'
        % (task_name, task_name in TASK_INPUTS, task_name in TASK_OUTPUTS))
    input_schema = None
    output_schema = None
    if task_name in TASK_INPUTS:
        input_schema = get_input_schema(task_name, None)
    if task_name in TASK_OUTPUTS:
        output_schema = get_output_schema(task_name)
    return input_schema, parameters_schema, output_schema


def _compose_schema(input_schema, parameters_schema, output_schema):
    """Combine partial schemas into the service schema dict.

    Without an input schema every parameter is treated as input; a missing
    output schema becomes a generic object.
    """
    return {
        'input': input_schema if input_schema else parameters_schema,
        'parameters': parameters_schema if input_schema else {},
        'output': output_schema if output_schema else {
            'type': 'object',
        },
    }


def get_pipeline_information_by_pipeline(pipeline: Pipeline, ):
    """Get pipeline input/output schema information.

    Args:
        pipeline (Pipeline): The pipeline object; its ``group_key`` is used
            as the task name and its class source file is analyzed.

    Returns:
        PipelineInfomation: The analyzed pipeline information.
    """
    task_name = pipeline.group_key
    pipeline_class = pipeline.__class__.__name__
    spec = importlib.util.find_spec(pipeline.__module__)
    pipeline_file_path = spec.origin
    return PipelineInfomation(task_name, pipeline_class, pipeline_file_path)


class PipelineInfomation():
    """Analyze pipeline information: task name and input/parameters/output schema.

    Args:
        task_name (str): The task name.
        class_name (str): The pipeline class name.
        source_path (str): The source file defining ``class_name``.
    """

    def __init__(self, task_name: str, class_name, source_path):
        self._task_name = task_name
        self._class_name = class_name
        self._source_path = source_path
        self._is_custom_call_method = False
        self._analyze()

    def _analyze(self):
        call_inputs, parameters = get_pipeline_input_parameters(
            self._source_path, self._class_name)
        task_io_defined = (self._task_name in TASK_INPUTS
                           and self._task_name in TASK_OUTPUTS)
        if call_inputs is not None and not task_io_defined:
            # custom pipeline implemented its own __call__ method
            self._is_custom_call_method = True
            self._input_schema = generate_pipeline_parameters_schema(
                call_inputs)
            self._input_schema[
                'description'] = 'For binary input such as image audio video, only url is supported.'
            self._parameters_schema = {}
            if self._task_name in TASK_OUTPUTS:
                self._output_schema = get_output_schema(self._task_name)
            else:
                self._output_schema = {
                    'type': 'object',
                }
        else:
            # Base pipeline __call__: inputs and outputs are defined by the
            # task in the modelscope lib (possibly only partially).
            (self._input_schema, self._parameters_schema,
             self._output_schema) = _build_task_io_schemas(
                 self._task_name, parameters)

    @property
    def task_name(self):
        return self._task_name

    @property
    def is_custom_call(self):
        return self._is_custom_call_method

    @property
    def input_schema(self):
        return self._input_schema

    @property
    def output_schema(self):
        return self._output_schema

    @property
    def parameters_schema(self):
        return self._parameters_schema

    @property
    def schema(self):
        return _compose_schema(self._input_schema, self._parameters_schema,
                               self._output_schema)

    def __getitem__(self, key):
        # e.g. info['task_name'] -> info._task_name (None if unknown)
        return self.__dict__.get('_%s' % key)


# Correctly spelled alias; ``PipelineInfomation`` is kept for compatibility.
PipelineInformation = PipelineInfomation


def is_url(url: str):
    """Check whether the input is an http, https or oss url.

    Args:
        url (str): The url

    Returns:
        bool: True if the url scheme is http, https or oss, otherwise False.
    """
    return urlparse(url).scheme in ('http', 'https', 'oss')


def _is_url_or_path(content):
    """True if ``content`` starts with 'http'/'oss' or names an existing local path."""
    return content.startswith(('http', 'oss')) or os.path.exists(content)


def _decode_base64(content):
    """Decode standard or URL-safe ('-_') base64 ``content`` into bytes."""
    return base64.b64decode(content, '-_')


def decode_base64_to_image(content):
    """Return urls/paths unchanged, otherwise decode base64 into a PIL image."""
    if _is_url_or_path(content):
        return content
    from PIL import Image
    return Image.open(BytesIO(_decode_base64(content)))


def decode_base64_to_audio(content):
    """Return urls/paths unchanged, otherwise decode base64 into bytes."""
    if _is_url_or_path(content):
        return content
    return _decode_base64(content)


def decode_base64_to_video(content):
    """Return urls/paths unchanged, otherwise decode base64 into bytes."""
    if _is_url_or_path(content):
        return content
    return _decode_base64(content)


def return_origin(content):
    return content


def decode_box(content):
    """Box inputs need no decoding; pass them through (previously returned None)."""
    return content


def service_multipart_input_to_pipeline_input(body):
    """Convert multipart data to pipeline input.

    Not implemented yet (returns None).

    Args:
        body (dict): The multipart data body
    """
    pass


def pipeline_output_to_service_multipart_output(output):
    """Convert pipeline output to service multipart output.

    Not implemented yet (returns None).

    Args:
        output (dict): Multipart body.
    """
    pass


base64_decoder_map = {
    InputType.IMAGE: decode_base64_to_image,
    InputType.TEXT: return_origin,
    InputType.AUDIO: decode_base64_to_audio,
    InputType.VIDEO: decode_base64_to_video,
    InputType.BOX: decode_box,
    InputType.DICT: return_origin,
    InputType.LIST: return_origin,
    InputType.NUMBER: return_origin,
}


def call_pipeline_with_json(pipeline_info: PipelineInfomation,
                            pipeline: Pipeline, body: dict):
    """Call a pipeline with a decoded json request body.

    Args:
        pipeline_info (PipelineInfomation): The pipeline information object.
        pipeline (Pipeline): The pipeline object.
        body (dict): The request object, with 'input' and optional
            'parameters'.

    Returns:
        The pipeline result.
    """
    # TODO: is_custom_call misjudges some pipelines, so pipelines with a
    # custom __call__ (which would be called as pipeline(**body['input']))
    # are not special-cased yet.
    pipeline_inputs, parameters = service_base64_input_to_pipeline_input(
        pipeline_info['task_name'], body)
    return pipeline(pipeline_inputs, **parameters)


def _decode_input_fields(field_types, service_input):
    """Decode each field of a dict service input according to ``field_types``.

    Fields whose declared type is not a str (nested definitions) are passed
    through unchanged.

    Raises:
        KeyError: If a field is not declared in ``field_types``.
        ValueError: If a declared type is not a known input type.
    """
    pipeline_input = {}
    for key, value in service_input.items():
        # task input has no nesting field; get input field type.
        input_type = field_types[key]
        # TODO recursion for list, dict if need.
        if not isinstance(input_type, str):
            pipeline_input[key] = value
            continue
        if input_type not in INPUT_TYPE:
            raise ValueError('Invalid input field: %s' % input_type)
        pipeline_input[key] = base64_decoder_map[input_type](value)
    return pipeline_input


def service_base64_input_to_pipeline_input(task_name, body):
    """Convert service base64 input to pipeline input and parameters.

    Args:
        task_name (str): The task name.
        body (Dict): The input object, with 'input' and optional
            'parameters' (a null 'parameters' is treated as empty).

    Returns:
        tuple: ``(pipeline_input, parameters)``.

    Raises:
        ValueError: If there is no input data or a field type is invalid.
    """
    if 'input' not in body:
        raise ValueError('No input data!')
    service_input = body['input']
    parameters = body.get('parameters') or {}

    if isinstance(service_input, (str, int, float)):
        return service_input, parameters

    task_input_info = TASK_INPUTS.get(task_name, None)
    if isinstance(task_input_info, str):
        # Single input without key; a dict carries it as its first value.
        if isinstance(service_input, dict):
            if not service_input:
                raise ValueError('No input data!')
            service_input = next(iter(service_input.values()))
        return base64_decoder_map[task_input_info](service_input), parameters
    if isinstance(task_input_info, tuple):
        return tuple(service_input), parameters
    if isinstance(task_input_info, dict):
        return _decode_input_fields(task_input_info,
                                    service_input), parameters
    if isinstance(task_input_info, list) and isinstance(service_input, dict):
        # One of several input formats; the service uses the dict one.
        for item in task_input_info:
            if isinstance(item, dict):
                return _decode_input_fields(item, service_input), parameters
    return service_input, parameters


def encode_numpy_image_to_base64(image):
    """Encode an image array as PNG and return it as a base64 str."""
    import cv2
    success, img_encode = cv2.imencode('.png', image)
    if not success:
        raise ValueError('Failed to encode image as png.')
    return str(base64.b64encode(img_encode.tobytes()), 'utf-8')


def encode_video_to_base64(video):
    return encode_bytes_to_base64(video)


def encode_pcm_to_base64(pcm):
    return encode_bytes_to_base64(pcm)


def encode_wav_to_base64(wav):
    return encode_bytes_to_base64(wav)


def encode_bytes_to_base64(bts):
    """Return the standard base64 encoding of ``bts`` as a str."""
    return str(base64.b64encode(bts), 'utf-8')


base64_encoder_map = {
    'image': encode_numpy_image_to_base64,
    'video': encode_video_to_base64,
    'pcm': encode_pcm_to_base64,
    'wav': encode_wav_to_base64,
    'bytes': encode_bytes_to_base64,
}

# convert numpy etc type to python type; exact-type hooks applied first.
type_to_python_type = {
    np.int64: int,
}


def _convert_to_python_type(inputs):
    """Recursively convert numpy values into JSON-serializable python values.

    Lists and tuples become lists; dicts are converted value-wise.
    """
    converter = type_to_python_type.get(type(inputs))
    if converter is not None:
        return converter(inputs)
    if isinstance(inputs, (list, tuple)):
        return [_convert_to_python_type(item) for item in inputs]
    if isinstance(inputs, dict):
        return {k: _convert_to_python_type(v) for k, v in inputs.items()}
    if isinstance(inputs, np.ndarray):
        return inputs.tolist()
    if isinstance(inputs, np.bool_):
        return bool(inputs)
    if isinstance(inputs, np.floating):
        return float(inputs)
    if isinstance(inputs, np.integer):
        return int(inputs)
    return inputs


# Output keys whose values are binary and must be base64 encoded.
_BINARY_OUTPUT_KEYS = (
    OutputKeys.OUTPUT_IMG,
    OutputKeys.OUTPUT_IMGS,
    OutputKeys.OUTPUT_VIDEO,
    OutputKeys.OUTPUT_PCM,
    OutputKeys.OUTPUT_PCM_LIST,
    OutputKeys.OUTPUT_WAV,
)


def _tensor_to_list(value):
    """Convert a torch tensor to nested python lists; other values unchanged."""
    import torch
    if isinstance(value, torch.Tensor):
        # detach() so tensors requiring grad convert as well.
        return value.detach().cpu().tolist()
    return value


def _encode_binary_output(key, value):
    """Base64-encode a binary output value (or each item of a list value)."""
    if isinstance(value, list):
        # Image lists use the image encoder, other lists the pcm encoder.
        if key in (OutputKeys.OUTPUT_IMG, OutputKeys.OUTPUT_IMGS):
            item_key = OutputKeys.OUTPUT_IMG
        else:
            item_key = OutputKeys.OUTPUT_PCM
        encoder = base64_encoder_map[OutputTypes[item_key]]
        return [encoder(item) for item in value]
    return base64_encoder_map[OutputTypes[key]](value)


def pipeline_output_to_service_base64_output(task_name, pipeline_output):
    """Convert pipeline output to service output, encoding binary fields as base64.

    Args:
        task_name (str): The output task name.
        pipeline_output (object): The pipeline output; for a list only the
            first element is converted.

    Returns:
        dict: JSON-serializable output.
    """
    json_serializable_output = {}
    task_outputs = TASK_OUTPUTS.get(task_name, [])
    # TODO: for batch
    if isinstance(pipeline_output, list):
        pipeline_output = pipeline_output[0]
    for key, value in pipeline_output.items():
        if key not in task_outputs:
            # Outputs not declared for the task are passed through; torch
            # tensors are converted to lists.
            json_serializable_output[key] = _tensor_to_list(value)
            continue
        if key in _BINARY_OUTPUT_KEYS:
            json_serializable_output[key] = _encode_binary_output(key, value)
        elif isinstance(value, np.ndarray):
            json_serializable_output[key] = value.tolist()
        else:
            json_serializable_output[key] = value

    return _convert_to_python_type(json_serializable_output)


def get_task_input_examples(task):
    """Return the input example of ``task`` from pipeline_inputs.json next to this module, or None."""
    module_dir = os.path.dirname(__file__)
    with open(
            os.path.join(module_dir, 'pipeline_inputs.json'),
            'r',
            encoding='utf-8') as f:
        input_examples = json.load(f)
    return input_examples.get(task)


def get_task_schemas(task):
    """Return the schema of ``task`` from pipeline_schema.json next to this module, or None."""
    module_dir = os.path.dirname(__file__)
    with open(
            os.path.join(module_dir, 'pipeline_schema.json'),
            'r',
            encoding='utf-8') as f:
        schema = json.load(f)
    return schema.get(task)


if __name__ == '__main__':
    from modelscope.utils.ast_utils import load_index
    index = load_index()
    task_schemas = {}
    for key, value in index['index'].items():
        reg, task_name, class_name = key
        if reg != 'PIPELINES' or task_name == 'default':
            continue
        print(f"value['filepath']: {value['filepath']}, class_name: {class_name}")
        try:
            _, parameters = get_pipeline_input_parameters(
                value['filepath'], class_name)
            schema = _compose_schema(
                *_build_task_io_schemas(task_name, parameters))
        except Exception as e:
            logger.warning('Skip task: %s, class: %s, error: %s', task_name,
                           class_name, e)
            continue
        task_schemas[task_name] = schema

    s = json.dumps(task_schemas)
    with open('./task_schema.json', 'w') as f:
        f.write(s)