| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- import ast
- import base64
- import importlib
- import inspect
- import os
- from io import BytesIO
- from typing import Any
- from urllib.parse import urlparse
- import json
- import numpy as np
- from modelscope.hub.file_download import model_file_download
- from modelscope.outputs.outputs import (TASK_OUTPUTS, OutputKeys, OutputTypes,
- OutputTypeSchema)
- from modelscope.pipeline_inputs import (INPUT_TYPE, INPUT_TYPE_SCHEMA,
- TASK_INPUTS, InputType)
- from modelscope.pipelines import pipeline
- from modelscope.pipelines.base import Pipeline
- from modelscope.utils.config import Config
- from modelscope.utils.constant import ModelFile, Tasks
- from modelscope.utils.logger import get_logger
# Module-level logger used by the helpers in this file.
logger = get_logger()
- """Support webservice integration pipeline。
- This module provides a support library when webservice uses pipeline,
- converts webservice input into pipeline input, and converts pipeline
- output into webservice output, which automatically encodes and
- decodes relevant fields.
- Example:
- # create pipeline instance and pipeline information, save it to app
- pipeline_instance = create_pipeline('damo/cv_gpen_image-portrait-enhancement', 'v1.0.0')
- # get pipeline information, input,output, request example.
- pipeline_info = get_pipeline_information_by_pipeline(pipeline_instance)
- # save the pipeline and info to the app for use in subsequent request processing
- app.state.pipeline = pipeline_instance
- app.state.pipeline_info = pipeline_info
- # for inference request, use call_pipeline_with_json to decode input and
- # call pipeline, call pipeline_output_to_service_base64_output
- # to encode necessary fields, and return the result.
- # request and response are json format.
- @router.post('/call')
- async def inference(request: Request):
- pipeline_service = request.app.state.pipeline
- pipeline_info = request.app.state.pipeline_info
- request_json = await request.json()
- result = call_pipeline_with_json(pipeline_info,
- pipeline_service,
- request_json)
- # convert output to json, if binary field, we need encoded.
- output = pipeline_output_to_service_base64_output(pipeline_info.task_name, result)
- return output
- # Inference service input and output and sample information can be obtained through the docs interface
- @router.get('/describe')
- async def index(request: Request):
- pipeline_info = request.app.state.pipeline_info
- return pipeline_info.schema
- Todo:
- * Support more service input type, such as form.
- """
def create_pipeline(model_id: str,
                    revision: str,
                    external_engine_for_llm: bool = True):
    """Build the pipeline for a model, using the task from its configuration.

    Args:
        model_id (str): The model id on the hub.
        revision (str): The model revision to download and use.
        external_engine_for_llm (bool): Forwarded unchanged to ``pipeline``.

    Returns:
        The object created by ``modelscope.pipelines.pipeline``.
    """
    config_path = model_file_download(
        model_id=model_id,
        file_path=ModelFile.CONFIGURATION,
        revision=revision)
    # The task is not passed in; it is read from the model's own configuration.
    task = Config.from_file(config_path).task
    return pipeline(
        task=task,
        model=model_id,
        model_revision=revision,
        external_engine_for_llm=external_engine_for_llm)
def get_class_user_attributes(cls):
    """Return ``(name, value)`` pairs for the data attributes of ``cls``.

    Routines (functions, methods, builtins) and dunder names such as
    ``__doc__`` are excluded. Pairs come back in ``inspect.getmembers`` order,
    i.e. sorted by name.
    """

    def _is_dunder(name):
        return name.startswith('__') and name.endswith('__')

    members = inspect.getmembers(
        cls, lambda member: not inspect.isroutine(member))
    return [(name, value) for name, value in members if not _is_dunder(name)]
def get_input_type(task_inputs: Any):
    """Resolve a task input definition into its input type(s).

    Args:
        task_inputs (Any): An input definition as used in TASK_INPUTS: a type
            string, a dict mapping input keys to definitions, or a list/tuple
            of alternative definitions.

    Returns:
        For a type string, its ``INPUT_TYPE`` entry. For a dict, a dict
        mapping each key to its resolved type. For a list/tuple, the
        resolution of its first dict element, or None if it has none.

    Raises:
        ValueError: If the definition has an unsupported form.
    """
    if isinstance(task_inputs, str):  # no input key
        return INPUT_TYPE[task_inputs]
    if isinstance(task_inputs, (tuple, list)):
        # For list/tuple definitions the server only supports the dict format.
        for item in task_inputs:
            if isinstance(item, dict):
                return get_input_type(item)
        return None
    if isinstance(task_inputs, dict):
        # Key: input key, value: resolved input type.
        return {
            key: get_input_type(value)
            for key, value in task_inputs.items()
        }
    raise ValueError(f'invalid input_type definition {task_inputs}')
def get_input_schema(task_name: str, input_type: type):
    """Build the JSON schema of a task input.

    Args:
        task_name (str): The task name. Only used when ``input_type`` is None,
            in which case the input definition is read from TASK_INPUTS.
        input_type (type): An explicit input definition to describe (type
            string, dict, list or tuple), or None to use the task's one.

    Returns:
        dict: The JSON schema. Returns None when a list/tuple definition
        contains no usable element.

    Raises:
        ValueError: If the input definition has an unsupported form.
    """
    if input_type is None:
        task_inputs = TASK_INPUTS[task_name]
        if isinstance(task_inputs,
                      str):  # only one input field, key is task_inputs
            return {
                'type': 'object',
                'properties': {
                    task_inputs: INPUT_TYPE_SCHEMA[task_inputs]
                }
            }
    else:
        task_inputs = input_type

    if isinstance(task_inputs, str):  # no input key
        return INPUT_TYPE_SCHEMA[task_inputs]
    elif input_type is None and isinstance(task_inputs, list):
        # A top-level list enumerates alternative formats; the server only
        # supports the (first) dict format.
        for item in task_inputs:
            if isinstance(item, dict):
                return get_input_schema(None, item)
        return None
    elif isinstance(task_inputs, (tuple, list)):
        # The first element describes the array items.
        for item in task_inputs:
            if isinstance(item, dict):
                item_schema = get_input_schema(None, item)
            else:
                item_schema = INPUT_TYPE_SCHEMA[item]
            # The item schema belongs directly under 'items' (it used to be
            # nested under items['type'] for dicts, which is invalid).
            return {'type': 'array', 'items': item_schema}
        return None
    elif isinstance(task_inputs, dict):
        # Key: input key, value: schema of that input.
        return {
            'type': 'object',
            'properties': {
                key: get_input_schema(None, value)
                for key, value in task_inputs.items()
            }
        }
    else:
        raise ValueError(f'invalid input_type definition {task_inputs}')
def get_output_schema(task_name: str):
    """Build the JSON schema of a task's output from TASK_OUTPUTS.

    Args:
        task_name (str): The task name.

    Returns:
        dict: An object schema with one property per declared output key,
        taken from ``OutputTypeSchema``.

    Raises:
        ValueError: If ``TASK_OUTPUTS[task_name]`` is not a list.
    """
    task_outputs = TASK_OUTPUTS[task_name]
    if not isinstance(task_outputs, list):
        raise ValueError('TASK_OUTPUTS for %s is not list.' % task_name)
    properties = {key: OutputTypeSchema[key] for key in task_outputs}
    return {'type': 'object', 'properties': properties}
def get_input_info(task_name: str):
    """Describe the input types of a task from TASK_INPUTS.

    Args:
        task_name (str): The task name.

    Returns:
        For a single unnamed input, its ``INPUT_TYPE`` entry. For a tuple
        definition, the tuple itself. For a list definition, ``{'input': ...}``
        built from its first dict element, or None if it has none. For a dict
        definition, ``{'input': {key: type}}``.

    Raises:
        ValueError: If the definition has an unsupported form.
    """
    task_inputs = TASK_INPUTS[task_name]
    if isinstance(task_inputs, str):  # no input key: default input key
        return INPUT_TYPE[task_inputs]
    if isinstance(task_inputs, tuple):
        return task_inputs
    if isinstance(task_inputs, list):
        # For lists the server only supports the dict format.
        first_dict = next(
            (item for item in task_inputs if isinstance(item, dict)), None)
        if first_dict is None:
            return None
        return {'input': get_input_type(first_dict)}
    if isinstance(task_inputs, dict):
        return {
            'input': {
                key: get_input_type(value)
                for key, value in task_inputs.items()
            }
        }
    raise ValueError(f'invalid input_type definition {task_inputs}')
def get_output_info(task_name: str):
    """Map each output key declared for a task to its ``OutputTypes`` entry.

    Args:
        task_name (str): The task name.

    Raises:
        ValueError: If ``TASK_OUTPUTS[task_name]`` is not a list.
    """
    output_keys = TASK_OUTPUTS[task_name]
    if not isinstance(output_keys, list):
        raise ValueError('TASK_OUTPUTS for %s is not list.' % task_name)
    return {key: OutputTypes[key] for key in output_keys}
def get_task_io_info(task_name: str):
    """Get a task's input and output type information.

    Args:
        task_name (str): A ``Tasks`` attribute name or value.

    Returns:
        tuple: ``(get_input_info(task_name), get_output_info(task_name))``,
        or ``(None, None)`` when ``task_name`` is not a known task.
    """
    known_task = any(
        name == task_name or value == task_name
        for name, value in get_class_user_attributes(Tasks))
    if not known_task:
        return None, None
    inputs = get_input_info(task_name)
    outputs = get_output_info(task_name)
    return inputs, outputs
def _annotation_type_name(annotation):
    """Return the bare type name of an annotation node, or None if unsupported."""
    if isinstance(annotation, ast.Subscript):
        # List[int] / typing.Dict[str, Any] -> name of the subscripted type.
        return _annotation_type_name(annotation.value)
    if isinstance(annotation, ast.Name):
        return annotation.id
    if isinstance(annotation, ast.Attribute):
        # np.ndarray -> 'ndarray'
        return annotation.attr
    if isinstance(annotation, ast.Constant) and isinstance(
            annotation.value, str):
        # String (forward-reference) annotation such as 'Dict[str, Any]'.
        try:
            parsed = ast.parse(annotation.value, mode='eval').body
        except SyntaxError:
            return None
        return _annotation_type_name(parsed)
    return None


def process_arg_type_annotation(arg, default_value=None):
    """Return ``(name, type_name)`` for an ``ast.arg``.

    The type name comes from the annotation when there is one. Otherwise it
    is the class name of ``default_value`` when that is not None, or
    ``'object'`` (with a warning) as a last resort.

    Args:
        arg (ast.arg): The argument node.
        default_value: The already-converted default value, or None.

    Raises:
        Exception: If the annotation has an unsupported form.
    """
    if arg.annotation is not None:
        type_name = _annotation_type_name(arg.annotation)
        if type_name is None:
            raise Exception('Invalid annotation: %s' % arg.annotation)
        return arg.arg, type_name
    if default_value is not None:
        return arg.arg, type(default_value).__name__
    # Irregular, assuming no type hint no default value type is object
    logger.warning('arg: %s has no data type annotation, use default!' %
                   (arg.arg))
    return arg.arg, 'object'
def convert_to_value(item):
    """Convert a default-value AST node into a Python value.

    Literal expressions are evaluated with ``ast.literal_eval``. This covers
    strings, bytes, numbers (including negative ones), tuples, lists, dicts,
    sets, True/False/None.

    Non-literal defaults cannot be evaluated statically: names, calls,
    attribute accesses and the like. A tuple mixing literals and non-literals
    is converted element by element. The legacy names ``True``/``False``/
    ``None`` map to their values. Any other node yields None.

    Args:
        item (ast.AST): The default-value expression node.
    """
    try:
        return ast.literal_eval(item)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        pass
    if isinstance(item, ast.Tuple):
        return tuple(convert_to_value(element) for element in item.elts)
    if isinstance(item, ast.Name):
        # None, True, False are names in python2; other names are unknown.
        return {'True': True, 'False': False, 'None': None}.get(item.id)
    # Unsupported expression (call, attribute, ...): no static value.
    return None
def process_args(args):
    """Describe the arguments of a function definition.

    Args:
        args (ast.arguments): The ``args`` of an ``ast.FunctionDef``.

    Returns:
        list: ``(name, type_name, has_default, default)`` tuples for the
        positional(-only) and keyword-only arguments, in declaration order.
        ``self`` is skipped; ``*args`` and ``**kwargs`` are not reported.
    """
    arguments = []
    # Positional-only args (python3.8+) and regular args share ``defaults``,
    # which is aligned with the END of the combined positional sequence.
    positional = list(getattr(args, 'posonlyargs', [])) + list(args.args)
    n_required = len(positional) - len(args.defaults)
    # no default
    for arg in positional[:n_required]:
        if arg.arg == 'self':
            continue
        arg_name, arg_type = process_arg_type_annotation(arg, None)
        arguments.append((arg_name, arg_type, False, None))
    # process defaults arg.
    for arg, dft in zip(positional[n_required:], args.defaults):
        # compatible with python3.7 ast.Num no value.
        value = convert_to_value(dft)
        arg_name, arg_type = process_arg_type_annotation(arg, value)
        arguments.append((arg_name, arg_type, True, value))
    # Keyword-only args: ``kw_defaults`` has one entry per kw-only argument,
    # where None marks an argument without a default.
    for kwarg, dft in zip(args.kwonlyargs, args.kw_defaults):
        if dft is None:
            arg_name, arg_type = process_arg_type_annotation(kwarg, None)
            arguments.append((arg_name, arg_type, False, None))
        else:
            value = convert_to_value(dft)
            arg_name, arg_type = process_arg_type_annotation(kwarg, value)
            arguments.append((arg_name, arg_type, True, value))
    return arguments
class PipelineClassAnalyzer(ast.NodeVisitor):
    """Collect argument lists of a pipeline class's entry-point methods.

    Visiting a class records ``process_args`` output for whichever of
    ``__call__``, ``preprocess``, ``forward`` and ``postprocess`` it defines.
    ``get_input_parameters`` then turns them into inputs/parameters.
    """

    def __init__(self) -> None:
        super().__init__()
        # Arguments of a custom __call__ (when present, all of them are input).
        self.parameters = []
        self.has_call = False
        # Arguments of the standard preprocess/forward/postprocess stages.
        self.preprocess_parameters = []
        self.has_preprocess = False
        self.has_postprocess = False
        self.has_forward = False
        self.forward_parameters = []
        self.postprocess_parameters = []
        # Not read by any method of this class.
        self.lineno = 0
        self.end_lineno = 0

    def visit_FunctionDef(self, node: ast.FunctionDef) -> Any:
        method_name = node.name
        if method_name == '__call__':
            self.parameters = process_args(node.args)
            self.has_call = True
        elif method_name in ('preprocess', 'postprocess', 'forward'):
            setattr(self, f'{method_name}_parameters',
                    process_args(node.args))
            setattr(self, f'has_{method_name}', True)

    def get_input_parameters(self):
        """Return ``(inputs, parameters)``.

        With a custom ``__call__`` the pipeline controls its own arguments,
        so they are all reported as inputs: ``(call_args, None)``. Otherwise
        the result is ``(None, extra_args)``, where ``extra_args`` gathers the
        stage arguments after each stage's first one (the data input).
        """
        if self.has_call:
            return self.parameters, None
        stages = (
            (self.has_preprocess, self.preprocess_parameters),
            (self.has_forward, self.forward_parameters),
            (self.has_postprocess, self.postprocess_parameters),
        )
        collected = [
            param for enabled, stage_params in stages if enabled
            for param in stage_params[1:]
        ]
        return None, collected
class AnalysisSourceFileRegisterModules(ast.NodeVisitor):
    """Find the ``ast.ClassDef`` of a named class in a parsed source file.

    Despite its name, this visitor does not inspect ``register_module``
    calls. It records the ClassDef whose name equals ``class_name``. Class
    bodies are not descended into, so classes nested inside another class
    are not found. If several definitions match, the last one visited wins.

    Args:
        source_file_path (str): Path of the parsed file. It is stored but not
            otherwise used by the visitor.
        class_name (str): Name of the class to look for.

    Attributes:
        class_define: The matching ``ast.ClassDef``, or None if not found.

    Examples:
        >>> with open(source_file_path, 'rb') as f:
        ...     src = f.read()
        >>> analyzer = AnalysisSourceFileRegisterModules(source_file_path,
        ...                                              class_name)
        >>> analyzer.visit(ast.parse(src, filename=source_file_path))
        >>> analyzer.class_define  # ast.ClassDef or None
    """

    def __init__(self, source_file_path, class_name) -> None:
        super().__init__()
        self.source_file_path = source_file_path
        self.class_name = class_name
        # Set by visit_ClassDef when a class named class_name is visited.
        self.class_define = None

    def visit_ClassDef(self, node: ast.ClassDef):
        # No generic_visit here: bodies of visited classes are not searched.
        if node.name == self.class_name:
            self.class_define = node
def get_pipeline_input_parameters(
    source_file_path: str,
    class_name: str,
):
    """Statically analyze a pipeline class for its inputs and parameters.

    Args:
        source_file_path (str): The pipeline source code path.
        class_name (str): The pipeline class name.

    Returns:
        tuple: ``(inputs, parameters)`` as returned by
        ``PipelineClassAnalyzer.get_input_parameters``.

    Raises:
        ValueError: If ``class_name`` is not defined in the source file.
    """
    with open(source_file_path, 'rb') as f:
        src = f.read()
    analyzer = AnalysisSourceFileRegisterModules(source_file_path,
                                                 class_name)
    # python3.7 has no type_comments parameter, so it is not passed.
    analyzer.visit(ast.parse(src, filename=source_file_path))
    if analyzer.class_define is None:
        # Visiting None would fail with an obscure AttributeError.
        raise ValueError('Class %s is not defined in %s' %
                         (class_name, source_file_path))
    clz = PipelineClassAnalyzer()
    clz.visit(analyzer.class_define)
    call_inputs, pipeline_parameters = clz.get_input_parameters()
    return call_inputs, pipeline_parameters
# JSON-schema type for each parameter type name; for parameters only these
# types are currently supported.
meta_type_schema_map = dict(
    str='string',
    int='integer',
    float='number',
    bool='boolean',
    Dict='object',
    dict='object',
    list='array',
    List='array',
    Union='object',
    Input='object',
    object='object',
)
def generate_pipeline_parameters_schema(parameters):
    """Build an object JSON schema from ``process_args``-style tuples.

    Args:
        parameters (list | None): ``(name, type_name, has_default, default)``
            tuples, e.g. ``('max_length', 'int', True, 1024)``.

    Returns:
        dict: ``{}`` when there are no parameters, otherwise an object schema
        with one property per parameter. Type names missing from
        ``meta_type_schema_map`` are described as ``'object'`` (with a
        warning) instead of raising KeyError.
    """
    if not parameters:
        return {}
    parameters_schema = {'type': 'object', 'properties': {}}
    for name, param_type, has_default, default_value in parameters:
        schema_type = meta_type_schema_map.get(param_type)
        if schema_type is None:
            logger.warning('Unsupported parameter type: %s of %s, use object.'
                           % (param_type, name))
            schema_type = 'object'
        prop = {'type': schema_type}
        if has_default:
            prop['default'] = default_value
        parameters_schema['properties'][name] = prop
    return parameters_schema
def get_pipeline_information_by_pipeline(pipeline: Pipeline, ):
    """Get pipeline input output schema.

    Args:
        pipeline (Pipeline): The pipeline object.

    Returns:
        PipelineInfomation: Built from the pipeline's ``group_key`` (task),
        its class name and the file that defines its module.
    """
    # `import importlib` alone does not guarantee that the `util` submodule
    # is loaded, so import it explicitly before using importlib.util.
    import importlib.util
    task_name = pipeline.group_key
    pipeline_class = pipeline.__class__.__name__
    spec = importlib.util.find_spec(pipeline.__module__)
    pipeline_file_path = spec.origin
    return PipelineInfomation(task_name, pipeline_class, pipeline_file_path)
class PipelineInfomation():
    """Task name plus input/parameters/output JSON schemas of a pipeline.

    The schemas are derived at construction time by statically analyzing the
    pipeline class source together with TASK_INPUTS / TASK_OUTPUTS.

    Args:
        task_name (str): The task the pipeline serves.
        class_name (str): The pipeline class name.
        source_path (str): Path of the file defining the pipeline class.
    """

    def __init__(self, task_name: str, class_name, source_path):
        self._task_name = task_name
        self._class_name = class_name
        self._source_path = source_path
        self._is_custom_call_method = False
        self._analyze()

    def _analyze(self):
        call_inputs, parameters = get_pipeline_input_parameters(
            self._source_path, self._class_name)
        task = self._task_name
        has_task_inputs = task in TASK_INPUTS
        has_task_outputs = task in TASK_OUTPUTS

        if has_task_inputs and has_task_outputs:
            # Inputs and outputs are defined in modelscope, so the base
            # Pipeline __call__ is used; the task defines the main input.
            self._parameters_schema = (
                {} if parameters is None else
                generate_pipeline_parameters_schema(parameters))
            self._input_schema = get_input_schema(task, None)
            self._output_schema = get_output_schema(task)
            return

        if call_inputs is not None:
            # Custom pipeline implementing its own __call__ method.
            self._is_custom_call_method = True
            self._input_schema = generate_pipeline_parameters_schema(
                call_inputs)
            self._input_schema['description'] = (
                'For binary input such as image audio video, '
                'only url is supported.')
            self._parameters_schema = {}
            self._output_schema = (
                get_output_schema(task) if has_task_outputs else {
                    'type': 'object',
                })
            return

        logger.warning(
            'Task: %s input is defined: %s, output is defined: %s which is not completed'
            % (task, has_task_inputs, has_task_outputs))
        self._input_schema = (get_input_schema(task, None)
                              if has_task_inputs else None)
        self._output_schema = (get_output_schema(task)
                               if has_task_outputs else None)
        self._parameters_schema = generate_pipeline_parameters_schema(
            parameters)

    @property
    def task_name(self):
        return self._task_name

    @property
    def is_custom_call(self):
        return self._is_custom_call_method

    @property
    def input_schema(self):
        return self._input_schema

    @property
    def output_schema(self):
        return self._output_schema

    @property
    def parameters_schema(self):
        return self._parameters_schema

    @property
    def schema(self):
        """Service description: ``{'input', 'parameters', 'output'}``.

        Without an input schema, all parameters are presented as the input.
        """
        has_input = bool(self._input_schema)
        return {
            'input':
            self._input_schema if has_input else self._parameters_schema,
            'parameters': self._parameters_schema if has_input else {},
            'output': self._output_schema or {
                'type': 'object',
            },
        }

    def __getitem__(self, key):
        # info['task_name'] reads the private attribute `_task_name`.
        attr_name = '_%s' % key
        return vars(self).get(attr_name)
def is_url(url: str):
    """Tell whether ``url`` uses an http, https or oss scheme.

    Args:
        url (str): The url.

    Returns:
        bool: True for http/https/oss urls, False otherwise.
    """
    scheme = urlparse(url).scheme
    return scheme in ('http', 'https', 'oss')
def decode_base64_to_image(content):
    """Decode a base64 image field for the pipeline.

    Strings starting with 'http'/'oss', or naming an existing path, are
    returned unchanged. Anything else is decoded as base64 (url-safe '-_'
    alternative characters accepted) and opened with PIL.
    """
    is_reference = content.startswith(('http', 'oss')) or os.path.exists(
        content)
    if is_reference:
        return content
    from PIL import Image
    raw_bytes = base64.b64decode(content, '-_')
    return Image.open(BytesIO(raw_bytes))
def decode_base64_to_audio(content):
    """Return 'http'/'oss'-prefixed strings or existing paths unchanged;
    otherwise base64-decode ``content`` to raw bytes."""
    if not (content.startswith(('http', 'oss')) or os.path.exists(content)):
        return base64.b64decode(content)
    return content
def decode_base64_to_video(content):
    """Return 'http'/'oss'-prefixed strings or existing paths unchanged;
    otherwise base64-decode ``content`` to raw bytes."""
    passthrough = content.startswith(('http', 'oss')) or os.path.exists(
        content)
    return content if passthrough else base64.b64decode(content)
def return_origin(content):
    """Identity decoder: the field value is passed to the pipeline as-is."""
    unchanged = content
    return unchanged
def decode_box(content):
    """Placeholder decoder for box inputs: not implemented, returns None."""
    return None
def service_multipart_input_to_pipeline_input(body):
    """Convert multipart data to pipeline input.

    Not implemented yet: always returns None.

    Args:
        body (dict): The multipart data body.
    """
    return None
def pipeline_output_to_service_multipart_output(output):
    """Convert pipeline output to service multipart output.

    Not implemented yet: always returns None.

    Args:
        output (dict): The pipeline output.
    """
    return None
# Decoder applied to a service input field, keyed by its declared InputType.
# NOTE(review): decode_box is a stub that returns None, so BOX fields decode
# to None.
base64_decoder_map = dict([
    (InputType.IMAGE, decode_base64_to_image),
    (InputType.TEXT, return_origin),
    (InputType.AUDIO, decode_base64_to_audio),
    (InputType.VIDEO, decode_base64_to_video),
    (InputType.BOX, decode_box),
    (InputType.DICT, return_origin),
    (InputType.LIST, return_origin),
    (InputType.NUMBER, return_origin),
])
def call_pipeline_with_json(pipeline_info: PipelineInfomation,
                            pipeline: Pipeline, body: dict):
    """Call pipeline with json input.

    Args:
        pipeline_info (PipelineInfomation): The pipeline information object.
        pipeline (Pipeline): The pipeline object.
        body (dict): The decoded JSON request, with an ``input`` field and an
            optional ``parameters`` field.

    Returns:
        The raw pipeline result.
    """
    # TODO: pipeline_info.is_custom_call can misjudge custom pipelines, so
    # every request currently goes through the base64 conversion path.
    pipeline_inputs, parameters = service_base64_input_to_pipeline_input(
        pipeline_info['task_name'], body)
    # A JSON `"parameters": null` must not break keyword expansion.
    return pipeline(pipeline_inputs, **(parameters or {}))
def _decode_service_fields(service_input, field_types):
    """Decode each field of a dict service input by its declared input type."""
    decoded = {}
    for key, value in service_input.items(
    ):  # task input has no nesting field.
        # get input field type
        input_type = field_types[key]
        # TODO recursion for list, dict if need.
        if not isinstance(input_type, str):
            decoded[key] = value
            continue
        if input_type not in INPUT_TYPE:
            raise ValueError('Invalid input field: %s' % input_type)
        decoded[key] = base64_decoder_map[input_type](value)
    return decoded


def service_base64_input_to_pipeline_input(task_name, body):
    """Convert service base64 input to pipeline input and parameters.

    Args:
        task_name (str): The task name.
        body (dict): The request, with an ``input`` field and an optional
            ``parameters`` field (a missing or null value means no parameters).

    Returns:
        tuple: ``(pipeline_input, parameters)``.

    Raises:
        ValueError: If ``input`` is missing or a field type is not a valid
            input type.
    """
    if 'input' not in body:
        raise ValueError('No input data!')
    service_input = body['input']
    parameters = body.get('parameters')
    if parameters is None:
        parameters = {}

    # Scalars are passed through as-is, before any TASK_INPUTS-based decoding.
    if isinstance(service_input, (str, int, float)):
        return service_input, parameters

    task_input_info = TASK_INPUTS.get(task_name, None)
    if isinstance(task_input_info, str):  # no input key default
        if isinstance(service_input, dict):
            service_input = list(service_input.values())[0]
        return base64_decoder_map[task_input_info](service_input), parameters
    if isinstance(task_input_info, tuple):
        if isinstance(service_input, dict):
            # tuple(dict) would give the keys; the inputs are the values.
            return tuple(service_input.values()), parameters
        return tuple(service_input), parameters
    if isinstance(task_input_info, dict):
        return _decode_service_fields(service_input,
                                      task_input_info), parameters
    if isinstance(task_input_info, list):
        # One of several input formats; the (first) dict format is used.
        for item in task_input_info:
            if isinstance(item, dict):
                return _decode_service_fields(service_input,
                                              item), parameters
        # No dict format declared: fall through and pass the input as-is.
    return service_input, parameters
def encode_numpy_image_to_base64(image):
    """Encode an image array as PNG with OpenCV and return base64 text."""
    import cv2
    encoded = cv2.imencode('.png', image)
    # imencode returns (success_flag, buffer); only the buffer is used.
    png_bytes = encoded[1].tobytes()
    return base64.b64encode(png_bytes).decode('utf-8')
def encode_video_to_base64(video):
    """Return the base64 text of the raw video bytes."""
    return base64.b64encode(video).decode('utf-8')
def encode_pcm_to_base64(pcm):
    """Return the base64 text of raw PCM bytes."""
    return base64.b64encode(pcm).decode('utf-8')
def encode_wav_to_base64(wav):
    """Return the base64 text of WAV file bytes."""
    return base64.b64encode(wav).decode('utf-8')
def encode_bytes_to_base64(bts):
    """Return the base64 text of arbitrary bytes."""
    return base64.b64encode(bts).decode('utf-8')
# Encoder applied to a binary output value, keyed by its OutputTypes name.
base64_encoder_map = dict(
    image=encode_numpy_image_to_base64,
    video=encode_video_to_base64,
    pcm=encode_pcm_to_base64,
    wav=encode_wav_to_base64,
    bytes=encode_bytes_to_base64,
)
# Python type used to convert values of the given numpy type.
type_to_python_type = dict([(np.int64, int)])
- def _convert_to_python_type(inputs):
- if isinstance(inputs, (list, tuple)):
- res = []
- for item in inputs:
- res.append(_convert_to_python_type(item))
- return res
- elif isinstance(inputs, dict):
- res = {}
- for k, v in inputs.items():
- if type(v) in type_to_python_type:
- res[k] = type_to_python_type[type(v)](v)
- else:
- res[k] = _convert_to_python_type(v)
- return res
- elif isinstance(inputs, np.ndarray):
- return inputs.tolist()
- elif isinstance(inputs, np.floating):
- return float(inputs)
- elif isinstance(inputs, np.integer):
- return int(inputs)
- else:
- return inputs
def _tensor_to_list_if_needed(value):
    """Return ``value`` as nested Python lists if it is a torch.Tensor."""
    try:
        import torch
    except ImportError:
        # Without torch installed no value can be a tensor.
        return value
    if isinstance(value, torch.Tensor):
        # detach/tolist also work for tensors that require grad or use
        # dtypes numpy cannot represent (e.g. bfloat16).
        return value.detach().cpu().tolist()
    return value


def pipeline_output_to_service_base64_output(task_name, pipeline_output):
    """Convert pipeline output to service output,
    convert binary fields to base64 encoding.

    Args:
        task_name (str): The output task name.
        pipeline_output (object): The pipeline output dict, or a list of
            them (only the first element is used; an empty list gives {}).

    Returns:
        dict: A JSON-serializable dict.
    """
    json_serializable_output = {}
    task_outputs = TASK_OUTPUTS.get(task_name, [])
    # TODO: for batch
    if isinstance(pipeline_output, list):
        if not pipeline_output:
            return json_serializable_output
        pipeline_output = pipeline_output[0]
    binary_keys = [
        OutputKeys.OUTPUT_IMG, OutputKeys.OUTPUT_IMGS,
        OutputKeys.OUTPUT_VIDEO, OutputKeys.OUTPUT_PCM,
        OutputKeys.OUTPUT_PCM_LIST, OutputKeys.OUTPUT_WAV
    ]
    for key, value in pipeline_output.items():
        if key not in task_outputs:
            # Output not declared for the task: only tensors are converted.
            json_serializable_output[key] = _tensor_to_list_if_needed(value)
            continue
        if key in binary_keys:
            if isinstance(value, list):
                # Lists of images use the image encoder; all other binary
                # lists are encoded with the pcm encoder.
                if key == OutputKeys.OUTPUT_IMGS:
                    output_item_type = OutputKeys.OUTPUT_IMG
                else:
                    output_item_type = OutputKeys.OUTPUT_PCM
                encoder = base64_encoder_map[OutputTypes[output_item_type]]
                json_serializable_output[key] = [
                    encoder(item) for item in value
                ]
            else:
                json_serializable_output[key] = base64_encoder_map[
                    OutputTypes[key]](
                        value)
        elif isinstance(value, np.ndarray):
            json_serializable_output[key] = value.tolist()
        else:
            json_serializable_output[key] = value
    return _convert_to_python_type(json_serializable_output)
def get_task_input_examples(task):
    """Return the example input of ``task`` from ``pipeline_inputs.json``.

    The JSON file is read from this module's directory on every call.

    Returns:
        The example for ``task``, or None if the file has no entry for it.
    """
    examples_path = os.path.join(
        os.path.dirname(__file__), 'pipeline_inputs.json')
    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(examples_path, 'r', encoding='utf-8') as f:
        input_examples = json.load(f)
    return input_examples.get(task)
def get_task_schemas(task):
    """Return the precomputed schema of ``task`` from ``pipeline_schema.json``.

    The JSON file is read from this module's directory on every call.

    Returns:
        The schema for ``task``, or None if the file has no entry for it.
    """
    schema_path = os.path.join(
        os.path.dirname(__file__), 'pipeline_schema.json')
    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    with open(schema_path, 'r', encoding='utf-8') as f:
        schema = json.load(f)
    return schema.get(task)
if __name__ == '__main__':
    # Export the schema of every registered pipeline to ./task_schema.json.
    from modelscope.utils.ast_utils import load_index
    index = load_index()
    task_schemas = {}
    for key, value in index['index'].items():
        reg, task_name, class_name = key
        if reg != 'PIPELINES' or task_name == 'default':
            continue
        print(
            f"value['filepath']: {value['filepath']}, class_name: {class_name}"
        )
        try:
            # Parsing happens inside the try so one broken pipeline source
            # file is skipped instead of aborting the whole export.
            _, parameters = get_pipeline_input_parameters(
                value['filepath'], class_name)
            if task_name in TASK_INPUTS and task_name in TASK_OUTPUTS:
                parameters_schema = generate_pipeline_parameters_schema(
                    parameters)
                input_schema = get_input_schema(task_name, None)
                output_schema = get_output_schema(task_name)
                schema = {
                    'input': input_schema,
                    'parameters': parameters_schema,
                    'output': output_schema
                }
            else:
                logger.warning(
                    'Task: %s input is defined: %s, output is defined: %s which is not completed'
                    % (task_name, task_name in TASK_INPUTS, task_name
                       in TASK_OUTPUTS))
                input_schema = None
                output_schema = None
                if task_name in TASK_INPUTS:
                    input_schema = get_input_schema(task_name, None)
                if task_name in TASK_OUTPUTS:
                    output_schema = get_output_schema(task_name)
                parameters_schema = generate_pipeline_parameters_schema(
                    parameters)
                schema = {
                    'input': input_schema if input_schema else
                    parameters_schema,  # all parameter is input
                    'parameters':
                    parameters_schema if input_schema else {},
                    'output': output_schema if output_schema else {
                        'type': 'object',
                    },
                }
        except Exception as e:
            # Skip this pipeline but record why; KeyboardInterrupt still stops.
            logger.warning('Skip task: %s, class: %s, error: %s' %
                           (task_name, class_name, e))
            continue
        task_schemas[task_name] = schema
    s = json.dumps(task_schemas)
    with open('./task_schema.json', 'w', encoding='utf-8') as f:
        f.write(s)
|