| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- import base64
- import mimetypes
- from io import BytesIO
- import json
- import numpy as np
- import requests
- from modelscope.outputs import TASK_OUTPUTS, OutputKeys
- from modelscope.pipeline_inputs import TASK_INPUTS, InputType
- # service data decoder func decodes data from network and convert it to pipeline's input
- # for example
- def ExampleDecoder(data):
- # Assuming the pipeline inputs is a dict contains an image and a text,
- # to decode the data from network we decode the image as base64
- data_json = json.loads(data)
- # data: {"image": "xxxxxxxx=="(base64 str), "text": "a question"}
- # pipeline(inputs) as follows:
- # pipeline({'image': image, 'text': text})
- inputs = {
- 'image': decode_base64_to_image(data_json.get('image')),
- 'text': data_json.get('text')
- }
- return inputs
- # service data encoder func encodes data from pipeline outputs and convert to network response (such as json)
- # for example
- def ExampleEncoder(data):
- # Assuming the pipeline outputs is a dict contains an image and a text,
- # and transmit it through network, this func encode image to base64 and dumps into json
- # data (for e.g. python dict):
- # {"image": a numpy array represents a image, "text": "output"}
- image = data['image']
- text = data['text']
- data = {'image': encode_array_to_img_base64(image), 'text': text}
- return json.dumps(data, cls=NumpyEncoder)
- CustomEncoder = {
- # Tasks.visual_question_answering: ExampleEncoder
- }
- CustomDecoder = {
- # Tasks.visual_question_answering: ExampleDecoder
- }
- class NumpyEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, np.ndarray):
- return obj.tolist()
- if isinstance(obj, np.floating):
- return float(obj)
- if isinstance(obj, np.integer):
- return int(obj)
- return json.JSONEncoder.default(self, obj)
- def get_extension(encoding):
- encoding = encoding.replace('audio/wav', 'audio/x-wav')
- tp = mimetypes.guess_type(encoding)[0]
- if tp == 'audio/flac': # flac is not supported by mimetypes
- return 'flac'
- extension = mimetypes.guess_extension(tp)
- if extension is not None and extension.startswith('.'):
- extension = extension[1:]
- return extension
- def get_mimetype(filename):
- mimetype = mimetypes.guess_type(filename)[0]
- if mimetype is not None:
- mimetype = mimetype.replace('x-wav', 'wav').replace('x-flac', 'flac')
- return mimetype
- def decode_base64_to_binary(encoding):
- extension = get_extension(encoding)
- data = encoding.split(',')[1]
- return base64.b64decode(data), extension
- def decode_base64_to_image(encoding):
- from PIL import Image
- content = encoding.split(';')[1]
- image_encoded = content.split(',')[1]
- return Image.open(BytesIO(base64.b64decode(image_encoded)))
- def encode_array_to_img_base64(image_array):
- from PIL import Image
- with BytesIO() as output_bytes:
- pil_image = Image.fromarray(image_array.astype(np.uint8))
- pil_image.save(output_bytes, 'PNG')
- bytes_data = output_bytes.getvalue()
- base64_str = str(base64.b64encode(bytes_data), 'utf-8')
- return 'data:image/png;base64,' + base64_str
- def encode_pcm_to_base64(bytes_data):
- from scipy.io.wavfile import write
- with BytesIO() as out_mem_file:
- write(out_mem_file, 16000, bytes_data)
- base64_str = str(base64.b64encode(out_mem_file.getvalue()), 'utf-8')
- return 'data:audio/pcm;base64,' + base64_str
- def encode_url_to_base64(url):
- encoded_string = base64.b64encode(requests.get(url).content)
- base64_str = str(encoded_string, 'utf-8')
- mimetype = get_mimetype(url)
- return ('data:' + (mimetype if mimetype is not None else '') + ';base64,'
- + base64_str)
- def encode_file_to_base64(f):
- with open(f, 'rb') as file:
- encoded_string = base64.b64encode(file.read())
- base64_str = str(encoded_string, 'utf-8')
- mimetype = get_mimetype(f)
- return ('data:' + (mimetype if mimetype is not None else '')
- + ';base64,' + base64_str)
- def encode_url_or_file_to_base64(path):
- try:
- requests.get(path)
- return encode_url_to_base64(path)
- except (requests.exceptions.MissingSchema,
- requests.exceptions.InvalidSchema):
- return encode_file_to_base64(path)
- def service_data_decoder(task, data):
- if CustomDecoder.get(task) is not None:
- return CustomDecoder[task](data)
- input_data = data.decode('utf-8')
- input_type = TASK_INPUTS[task]
- if isinstance(input_type, list):
- input_type = input_type[0]
- if input_type == InputType.IMAGE:
- return decode_base64_to_image(input_data)
- elif input_type == InputType.AUDIO:
- return decode_base64_to_binary(input_data)[0]
- elif input_type == InputType.TEXT:
- return input_data
- elif isinstance(input_type, dict):
- input_data = {}
- data = json.loads(data)
- for key, val in input_type.items():
- if val == InputType.IMAGE:
- input_data[key] = decode_base64_to_image(data[key])
- elif val == InputType.AUDIO:
- input_data[key] = decode_base64_to_binary(data[key])[0]
- elif val == InputType.TEXT:
- input_data[key] = data[key]
- else:
- return data
- return input_data
- def service_data_encoder(task, data):
- if CustomEncoder.get(task) is not None:
- return CustomEncoder[task](data)
- output_keys = TASK_OUTPUTS[task]
- result = data
- for output_key in output_keys:
- if output_key == OutputKeys.OUTPUT_IMG:
- result[OutputKeys.OUTPUT_IMG] = encode_array_to_img_base64(
- data[OutputKeys.OUTPUT_IMG][..., ::-1])
- elif output_key == OutputKeys.OUTPUT_PCM:
- result[OutputKeys.OUTPUT_PCM] = encode_pcm_to_base64(
- data[OutputKeys.OUTPUT_PCM])
- result = bytes(json.dumps(result, cls=NumpyEncoder), encoding='utf8')
- return result
|