| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251 |
- #
- # The implementation of this file is based on:
- # https://github.com/intel/neural-compressor/tree/master/neural_compressor
- #
- # Copyright (c) 2023 Intel Corporation
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """Class for ONNX model."""
- import copy
- import logging
- import os
- import sys
- from collections import deque
- from pathlib import Path
- import onnx
- import onnx.external_data_helper
- from .util import MAXIMUM_PROTOBUF, find_by_name
- logger = logging.getLogger("neural_compressor")
- # TODO: Check https://github.com/microsoft/onnxruntime/blob/main/onnxruntime/python/tools/quantization/onnx_model.py to see if we can integrate with it.
- class ONNXModel:
- """Build ONNX model."""
def __init__(self, model, **kwargs):
    """Wrap an ONNX model and build the lookup tables used by the helpers.

    Args:
        model (str or ModelProto): path to an ONNX file, or an already
            loaded ModelProto object.
        ignore_warning (bool): do not warn when a large model is passed as
            an object instead of a path. Default is False.
        load_external_data (bool): load the external tensor data of a large
            model that was given as a path. Default is True.
    """
    given_as_path = isinstance(model, str)
    if given_as_path:
        # Load the graph only; external tensor data is attached below on demand.
        self._model = onnx.load(model, load_external_data=False)
        self._model_path = model
    else:
        self._model = model
        self._model_path = None

    self.check_is_large_model()
    if self._is_large_model:
        if self._model_path is None and not kwargs.get("ignore_warning", False):
            logger.warning("Model size > 2GB. Please use model path instead of onnx model object to quantize")
        if given_as_path and kwargs.get("load_external_data", True):
            onnx.external_data_helper.load_external_data_for_model(self._model, os.path.dirname(self._model_path))

    # A config.json next to the model file is loaded as a Hugging Face config.
    self._config = None
    if given_as_path:
        model_dir = Path(model).parent
        if os.path.exists(model_dir.joinpath("config.json").as_posix()):
            from transformers import AutoConfig  # noqa: PLC0415

            self._config = AutoConfig.from_pretrained(model_dir.as_posix())

    self.node_name_counter = {}
    self._output_name_to_node = {}
    self._input_name_to_nodes = {}
    graph_nodes = self._model.graph.node
    self._get_input_name_to_nodes(graph_nodes)
    self._get_output_name_to_node(graph_nodes)
    self._graph_info = {}
    self._get_graph_info()
    self._q_config = None
def check_is_large_model(self):
    """Decide whether the model has to be handled as a large (> 2GB) model.

    The verdict is stored in ``self._is_large_model``; nothing is returned.
    The model counts as large when an initializer is stored as external
    data, when serializing an initializer fails with protobuf's 2GB error,
    or when the accumulated size of the serialized initializers exceeds
    ``MAXIMUM_PROTOBUF``.
    """
    accumulated = 0
    for tensor in self._model.graph.initializer:
        # An initializer kept in an external file marks the model as large.
        if tensor.HasField("data_location") and tensor.data_location == onnx.TensorProto.EXTERNAL:
            self._is_large_model = True
            return

        try:
            serialized = tensor.SerializeToString()
            accumulated += sys.getsizeof(serialized)
        except Exception as e:
            # Only protobuf's own size-limit failure is treated as "large".
            if "exceeds maximum protobuf size of 2GB" not in str(e):  # pragma: no cover
                raise e
            self._is_large_model = True
            return

        if accumulated > MAXIMUM_PROTOBUF:
            self._is_large_model = True
            return

    self._is_large_model = False
@property
def is_large_model(self):
    """Return the large-model flag computed by ``check_is_large_model``."""
    flag = self._is_large_model
    return flag
@property
def model_path(self):
    """Return the stored model path (``None`` when no path is known)."""
    current = self._model_path
    return current


@model_path.setter
def model_path(self, path):
    """Store ``path`` as the model path."""
    self._model_path = path
def framework(self):
    """Return the name of the backend framework, always ``"onnxruntime"``."""
    backend = "onnxruntime"
    return backend
@property
def q_config(self):
    """Return the stored quantization config (``None`` until one is set)."""
    current = self._q_config
    return current


@q_config.setter
def q_config(self, q_config):
    """Store ``q_config`` as the quantization config."""
    self._q_config = q_config
@property
def hf_config(self):
    """Return the Hugging Face config loaded next to the model, if any.

    The value is ``None`` when no ``config.json`` was found at construction.
    """
    config = self._config
    return config
@property
def model(self):
    """Return the wrapped ONNX model object."""
    wrapped = self._model
    return wrapped


@model.setter
def model(self, model):
    """Replace the wrapped model and rebuild every derived lookup table."""
    self._model = model
    # Rebuild the name -> op_type table first, then the edge maps.
    self._graph_info = {}
    self._get_graph_info()
    self._output_name_to_node = {}
    self._input_name_to_nodes = {}
    graph_nodes = self._model.graph.node
    self._get_input_name_to_nodes(graph_nodes)
    self._get_output_name_to_node(graph_nodes)
def input(self):
    """Return the names of the graph inputs, in graph order."""
    names = []
    for graph_input in self._model.graph.input:
        names.append(graph_input.name)
    return names
def output(self):
    """Return the names of the graph outputs, in graph order."""
    names = []
    for graph_output in self._model.graph.output:
        names.append(graph_output.name)
    return names
def update(self):
    """Rebuild the cached graph info and edge maps from the current graph.

    Call this after nodes were added to or removed from the model so that
    the name -> op_type table and both name -> node maps match the graph.
    """
    self._graph_info = {}
    self._get_graph_info()

    self._output_name_to_node = {}
    self._input_name_to_nodes = {}
    graph_nodes = self._model.graph.node
    self._get_input_name_to_nodes(graph_nodes)
    self._get_output_name_to_node(graph_nodes)
@property
def graph_info(self):
    """Return the dict that maps each node name to its op_type."""
    info = self._graph_info
    return info
def _get_graph_info(self):
    """Record ``node name -> op_type`` for every top-level node of the graph."""
    info = self.graph_info
    for node in self._model.graph.node:
        info[node.name] = node.op_type
def save(self, root):
    """Save the ONNX model, and its Hugging Face config if present, to ``root``.

    A large model is written with all tensors in one external data file
    named ``<file name of root>_data`` placed next to ``root``. When a
    Hugging Face config was loaded, it is written as ``config.json`` into
    the directory of ``root``.

    Args:
        root (str): destination path of the ``.onnx`` file.

    Raises:
        ValueError: if the parent directory of ``root`` does not exist.
    """
    target_dir = os.path.split(root)[0]
    if target_dir != "" and not os.path.exists(target_dir):
        raise ValueError('"root" directory does not exists.')

    if self.is_large_model:
        # Pull the external tensors into memory before they are re-exported.
        onnx.external_data_helper.load_external_data_for_model(self._model, os.path.split(self._model_path)[0])
        onnx.save_model(
            self._model,
            root,
            save_as_external_data=True,
            all_tensors_to_one_file=True,
            # basename() honours the platform separators (and os.PathLike),
            # whereas splitting on "/" kept the whole path on Windows.
            location=os.path.basename(root) + "_data",
            size_threshold=1024,
            convert_attribute=False,
        )
    else:
        onnx.save(self._model, root)

    if self._config is not None:
        model_type = "" if not hasattr(self._config, "model_type") else self._config.model_type
        # The attribute is set on the config class before serializing.
        self._config.__class__.model_type = model_type
        output_config_file = Path(root).parent.joinpath("config.json").as_posix()
        self._config.to_json_file(output_config_file, use_diff=False)
def nodes(self):
    """Return the node container of the model graph (not a copy)."""
    graph = self._model.graph
    return graph.node
def initializer(self):
    """Return the initializer container of the model graph (not a copy)."""
    graph = self._model.graph
    return graph.initializer
def graph(self):
    """Return the graph object of the wrapped model."""
    wrapped = self._model
    return wrapped.graph
def ir_version(self):
    """Return the ``ir_version`` field of the wrapped model."""
    wrapped = self._model
    return wrapped.ir_version
def opset_import(self):
    """Return the ``opset_import`` field of the wrapped model."""
    wrapped = self._model
    return wrapped.opset_import
def remove_node(self, node):
    """Remove ``node`` from the graph; do nothing if it is not in the graph."""
    graph_nodes = self._model.graph.node
    if node not in graph_nodes:
        return
    graph_nodes.remove(node)
def remove_nodes(self, nodes_to_remove):
    """Remove every node of ``nodes_to_remove`` via ``remove_node``."""
    drop = self.remove_node
    for candidate in nodes_to_remove:
        drop(candidate)
def add_node(self, node):
    """Append a single node to the end of the graph's node list."""
    graph_nodes = self._model.graph.node
    graph_nodes.extend([node])
def add_nodes(self, nodes_to_add):
    """Append all nodes of ``nodes_to_add`` to the graph's node list."""
    graph_nodes = self._model.graph.node
    graph_nodes.extend(nodes_to_add)
def add_initializer(self, tensor):
    """Add ``tensor`` to the graph initializers unless its name is already used."""
    initializers = self._model.graph.initializer
    already_present = find_by_name(tensor.name, initializers) is not None
    if already_present:
        return
    initializers.extend([tensor])
def add_initializers(self, tensors):
    """Add every tensor of ``tensors`` via ``add_initializer``."""
    register = self.add_initializer
    for candidate in tensors:
        register(candidate)
def get_initializer(self, name):
    """Return the first initializer called ``name``, or ``None`` if absent."""
    matches = (tensor for tensor in self._model.graph.initializer if tensor.name == name)
    return next(matches, None)
def get_initializer_share_num(self, name):
    """Count the nodes that take the initializer ``name`` as an input.

    Returns 0 when no initializer with that name exists. A node is counted
    once even if it lists the initializer several times.
    """
    if self.get_initializer(name) is None:
        return 0
    return sum(1 for node in self.nodes() if name in node.input)
def get_node(self, name):
    """Return the first top-level node called ``name``, or ``None`` if absent."""
    matches = (node for node in self._model.graph.node if node.name == name)
    return next(matches, None)
def remove_initializer(self, tensor):
    """Remove ``tensor`` from the initializers; do nothing if it is not there."""
    initializers = self._model.graph.initializer
    if tensor not in initializers:
        return
    initializers.remove(tensor)
def remove_initializers(self, init_to_remove):
    """Remove every tensor of ``init_to_remove`` via ``remove_initializer``."""
    drop = self.remove_initializer
    for tensor in init_to_remove:
        drop(tensor)
def set_initializer(self, tensor, array, raw=False):
    """Replace the data of an existing initializer.

    The initializer keeps its name, dims and data type; only the values
    change. The old tensor is removed and a newly built one is added.

    Args:
        tensor (str): name of the initializer to replace.
        array: array with the new values; must provide ``flatten()`` and
            ``tobytes()`` (for example a NumPy array).
        raw (bool): store the values as raw bytes instead of a typed list.
    """
    old_tensor = self.get_initializer(tensor)
    self.remove_initializer(old_tensor)
    dims = old_tensor.dims
    data_type = old_tensor.data_type
    if raw:
        # tobytes() replaces ndarray.tostring(), which NumPy deprecated and
        # later removed; both return the same bytes.
        new_tensor = onnx.helper.make_tensor(tensor, data_type, dims, array.tobytes(), raw=raw)
    else:
        new_tensor = onnx.helper.make_tensor(tensor, data_type, dims, array.flatten().tolist())
    self.add_initializer(new_tensor)
@property
def input_name_to_nodes(self):
    """Return the map from a tensor name to the list of nodes consuming it."""
    consumers = self._input_name_to_nodes
    return consumers
def _get_input_name_to_nodes(self, nodes):
    """Index ``nodes`` (and the nodes of their subgraphs) by input tensor name.

    Each non-blank input name is mapped in ``self._input_name_to_nodes`` to
    the list of nodes that consume it. Subgraphs held in attributes are
    indexed before the node's own inputs.

    Args:
        nodes: iterable of nodes to index.
    """
    for node in nodes:
        for attr in node.attribute:
            if attr.type == onnx.AttributeProto.GRAPH:
                self._get_input_name_to_nodes(attr.g.node)
            elif attr.type == onnx.AttributeProto.GRAPHS:
                # A GRAPHS attribute keeps its subgraphs in ``graphs``;
                # ``g`` is unset for it, so each subgraph is visited here.
                for subgraph in attr.graphs:
                    self._get_input_name_to_nodes(subgraph.node)
        for input_name in node.input:
            # Blank names mark omitted optional inputs and are not indexed.
            if len(input_name.strip()) != 0:
                self._input_name_to_nodes.setdefault(input_name, []).append(node)
@property
def output_name_to_node(self):
    """Return the map from a tensor name to the node that produces it."""
    producers = self._output_name_to_node
    return producers
def _get_output_name_to_node(self, nodes):
    """Index ``nodes`` (and the nodes of their subgraphs) by output tensor name.

    Each non-blank output name is mapped in ``self._output_name_to_node``
    to the node that produces it. Subgraphs held in attributes are indexed
    before the node's own outputs.

    Args:
        nodes: iterable of nodes to index.
    """
    for node in nodes:
        for attr in node.attribute:
            if attr.type == onnx.AttributeProto.GRAPH:
                self._get_output_name_to_node(attr.g.node)
            elif attr.type == onnx.AttributeProto.GRAPHS:
                # A GRAPHS attribute keeps its subgraphs in ``graphs``;
                # ``g`` is unset for it, so each subgraph is visited here.
                for subgraph in attr.graphs:
                    self._get_output_name_to_node(subgraph.node)
        for output_name in node.output:
            # Blank names mark omitted optional outputs and are not indexed.
            if len(output_name.strip()) != 0:
                self._output_name_to_node[output_name] = node
def get_siblings(self, node):
    """Return the other children of ``node``'s parents.

    Children are compared by name; a node reachable through several parents
    or several edges appears once per occurrence.
    """
    return [
        child
        for parent in self.get_parents(node)
        for child in self.get_children(parent)
        if child.name != node.name
    ]
def get_children(self, node, input_name_to_nodes=None):
    """Return the nodes that consume any output of ``node``.

    Args:
        node: node whose consumers are wanted.
        input_name_to_nodes (dict): map from tensor name to consuming nodes;
            the cached map of the model is used when omitted.

    Returns:
        list: consumers, grouped in the order of ``node.output``.
    """
    consumers_by_name = self._input_name_to_nodes if input_name_to_nodes is None else input_name_to_nodes
    children = []
    for out_name in node.output:
        if out_name in consumers_by_name:
            children.extend(consumers_by_name[out_name])
    return children
def get_parents(self, node, output_name_to_node=None):
    """Return the nodes that produce the inputs of ``node``.

    Args:
        node: node whose producers are wanted.
        output_name_to_node (dict): map from tensor name to producing node;
            the cached map of the model is used when omitted.

    Returns:
        list: one producer per input that has one, in input order.
    """
    producer_of = self._output_name_to_node if output_name_to_node is None else output_name_to_node
    return [producer_of[name] for name in node.input if name in producer_of]
def get_parent(self, node, idx, output_name_to_node=None):
    """Return the node producing input number ``idx`` of ``node``.

    Args:
        node: node whose producer is wanted.
        idx (int): position in ``node.input``.
        output_name_to_node (dict): map from tensor name to producing node;
            the cached map of the model is used when omitted.

    Returns:
        The producing node, or ``None`` when ``idx`` is past the last input
        or the input is not produced by any known node.
    """
    producer_of = self._output_name_to_node if output_name_to_node is None else output_name_to_node
    if idx >= len(node.input):
        return None
    wanted = node.input[idx]
    return producer_of[wanted] if wanted in producer_of else None
def find_node_by_name(self, node_name, new_nodes_list, graph):
    """Look up ``node_name`` among the graph's nodes and ``new_nodes_list``.

    The search runs over a new list holding the graph nodes followed by the
    new nodes, so neither input sequence is modified.
    """
    # A shallow list copy: the node objects themselves are shared.
    candidates = [*graph.node, *new_nodes_list]
    return find_by_name(node_name, candidates)
def find_nodes_by_initializer(self, graph, initializer):
    """Return the nodes of ``graph`` that take ``initializer`` as an input.

    A node is listed once for every input slot that refers to the
    initializer, so it can appear more than once.
    """
    wanted = initializer.name
    return [node for node in graph.node for node_input in node.input if node_input == wanted]
def get_scale_zero(self, tensor):
    """Look up the scale and zero-point initializers of a quantized tensor.

    Args:
        tensor (str): name of a tensor in the quantized graph. It must be
            consumed by at least one node.

    Returns:
        tuple: ``(scale_tensor, zero_point_tensor)``. ``(None, None)`` is
        returned when the name does not end with ``"_quantized"``, or when
        the tensor is the last input of a QLinearConv node or the
        third-from-last input of a QGemm node.

    Raises:
        AssertionError: if the scale or the zero point cannot be found.
    """
    if not tensor.endswith("_quantized"):
        logger.debug(f"Find {tensor} in the quantized graph is not quantized.")
        return None, None

    def _searcher(tensor_name):
        """Search scale and zero point tensor recursively."""
        # First consumer and (optional) producer of the tensor being resolved.
        node = self._input_name_to_nodes[tensor_name][0]
        parent = self._output_name_to_node.get(tensor_name, None)
        # For a producer of one of these op types the name is derived from
        # the producer's first input instead of from the tensor itself.
        direct_int8 = ["Reshape", "Transpose", "Squeeze", "Unsqueeze", "MaxPool", "Pad", "Split"]
        if parent is not None and parent.op_type in direct_int8:
            fp32_tensor_name = (
                parent.input[0]
                .replace("_quantized", "")
                .replace("_QuantizeLinear", "")
                .replace("_QuantizeInput", "")
            )
        elif node.op_type in ["Gather"]:  # pragma: no cover
            # For a Gather consumer the name is derived from its first output.
            fp32_tensor_name = (
                node.output[0]
                .replace("_quantized", "")
                .replace("_QuantizeLinear", "")
                .replace("_QuantizeInput", "")
            )
        else:
            fp32_tensor_name = (
                tensor_name.replace("_quantized", "").replace("_QuantizeLinear", "").replace("_QuantizeInput", "")
            )
        # Both initializers are named after the stripped tensor name.
        scale = fp32_tensor_name + "_scale"
        scale_tensor = self.get_initializer(scale)
        zo = fp32_tensor_name + "_zero_point"
        zo_tensor = self.get_initializer(zo)
        if scale_tensor is None or zo_tensor is None:
            # Not found here: retry one step upstream, from the producer's first input.
            if parent is not None:
                scale_tensor, zo_tensor = _searcher(parent.input[0])
        return scale_tensor, zo_tensor

    node = self._input_name_to_nodes[tensor][0]
    # TODO check if scale_tensor and zero_point is needed
    # for bias of qlinearconv, scale and zero_point is not needed
    if (node.op_type == "QLinearConv" and tensor == node.input[-1]) or (
        node.op_type == "QGemm" and tensor == node.input[-3]
    ):
        return None, None
    else:
        scale_tensor, zo_tensor = _searcher(tensor)
        assert scale_tensor, f"missing scale for tensor {tensor}"
        assert zo_tensor, f"missing zero point for tensor {tensor}"
        return scale_tensor, zo_tensor
def save_model_to_file(self, output_path, use_external_data_format=False):
    """Save the model to ``output_path``, optionally with external data.

    Args:
        output_path (str): destination path of the model file.
        use_external_data_format (bool): when True, convert the tensors to
            external data kept in a single file named
            ``<file name of output_path>.data`` before saving. This is
            needed for models larger than 2GB.
    """
    if use_external_data_format:
        data_file_name = Path(output_path).name + ".data"
        onnx.external_data_helper.convert_model_to_external_data(
            self._model, all_tensors_to_one_file=True, location=data_file_name
        )
    onnx.save_model(self._model, output_path)
- @staticmethod
- def replace_node_input(node, old_input_name, new_input_name):
- """Replace input of a node."""
- assert isinstance(old_input_name, str) and isinstance(new_input_name, str)
- for j in range(len(node.input)):
- if node.input[j] == old_input_name:
- node.input[j] = new_input_name
def replace_input_of_all_nodes(self, old_input_name, new_input_name, white_optype=None, black_optype=None):
    """Rename an input on the selected top-level nodes of the graph.

    Args:
        old_input_name (str): input name to replace.
        new_input_name (str): replacement name.
        white_optype (list): when non-empty, only nodes whose op_type is in
            this list are edited and ``black_optype`` is ignored.
        black_optype (list): when ``white_optype`` is empty, nodes whose
            op_type is in this list are skipped.
    """
    allowed = [] if white_optype is None else white_optype
    blocked = [] if black_optype is None else black_optype
    use_allow_list = len(allowed) > 0
    for node in self.model.graph.node:
        if use_allow_list:
            selected = node.op_type in allowed
        else:
            selected = node.op_type not in blocked
        if selected:
            ONNXModel.replace_node_input(node, old_input_name, new_input_name)
- @staticmethod
- def replace_node_output(node, old_output_name, new_output_name):
- """Replace output of a node."""
- assert isinstance(old_output_name, str) and isinstance(new_output_name, str)
- for j in range(len(node.output)):
- if node.output[j] == old_output_name:
- node.output[j] = new_output_name
def replace_output_of_all_nodes(self, old_output_name, new_output_name, white_optype=None, black_optype=None):
    """Rename an output on the selected top-level nodes of the graph.

    Args:
        old_output_name (str): output name to replace.
        new_output_name (str): replacement name.
        white_optype (list): when non-empty, only nodes whose op_type is in
            this list are edited and ``black_optype`` is ignored.
        black_optype (list): when ``white_optype`` is empty, nodes whose
            op_type is in this list are skipped.
    """
    allowed = [] if white_optype is None else white_optype
    blocked = [] if black_optype is None else black_optype
    use_allow_list = len(allowed) > 0
    for node in self.model.graph.node:
        if use_allow_list:
            selected = node.op_type in allowed
        else:
            selected = node.op_type not in blocked
        if selected:
            ONNXModel.replace_node_output(node, old_output_name, new_output_name)
def remove_unused_nodes(self):
    """Remove nodes and initializers that nothing in the graph depends on.

    A node is dropped when it is:

    * a ``Constant`` whose first output is neither consumed by a node nor a
      graph output;
    * a ``QuantizeLinear`` whose input is not produced by any node and
      whose single child is a ``DequantizeLinear`` with an unconsumed
      output (the child is dropped as well);
    * any other node that has no consumed or exported output and whose
      non-initializer inputs come neither from a node nor from a graph
      input.

    Afterwards initializers that are neither consumed nor graph outputs
    are removed, together with graph inputs of the same name, and the
    cached lookup tables are rebuilt.
    """
    # graph.output / graph.input hold value-info objects, so membership has
    # to be tested on their names, not on the containers themselves.
    graph_output_names = set(self.output())
    graph_input_names = set(self.input())

    unused_nodes = []
    for node in self.nodes():
        if (
            node.op_type == "Constant"
            and node.output[0] not in graph_output_names
            and node.output[0] not in self._input_name_to_nodes
        ):
            unused_nodes.append(node)
            continue

        if node.op_type == "QuantizeLinear":
            children = self.get_children(node)
            if (
                len(children) == 1
                and children[0].op_type == "DequantizeLinear"
                and node.input[0] not in self._output_name_to_node
                and children[0].output[0] not in self._input_name_to_nodes
            ):
                unused_nodes.append(node)
                unused_nodes.extend(children)
                continue

        # remove the node if it does not serve as the input or output of any other nodes
        used = any(
            output in self._input_name_to_nodes or output in graph_output_names for output in node.output
        )
        if not used:
            for input_name in node.input:
                if self.get_initializer(input_name) is not None:
                    continue
                if input_name in self._output_name_to_node or input_name in graph_input_names:
                    used = True
                    break
        if not used:
            unused_nodes.append(node)
    self.remove_nodes(unused_nodes)

    # The consumer map still describes the graph before the removals above.
    unused_weights = []
    for weight in self._model.graph.initializer:
        if weight.name in self._input_name_to_nodes or weight.name in graph_output_names:
            continue
        unused_weights.append(weight)
        # Remove from graph.input; iterate a snapshot because the container is edited.
        for graph_input in list(self.graph().input):
            if graph_input.name == weight.name:
                self.graph().input.remove(graph_input)
    self.remove_initializers(unused_weights)
    self.update()
def topological_sort(self, enable_subgraph=False):
    """Reorder the graph's nodes so that producers come before consumers.

    Args:
        enable_subgraph (bool): when False the edge maps are rebuilt from
            the top-level nodes only; when True the cached maps of the
            model (which also cover subgraph nodes) are used.

    Raises:
        AssertionError: if the sorted result does not contain the same set
            of node names as the graph.
    """
    if not enable_subgraph:
        input_name_to_nodes = {}
        output_name_to_node = {}
        for node in self.model.graph.node:
            for input_name in node.input:
                if len(input_name.strip()) != 0:
                    input_name_to_nodes.setdefault(input_name, []).append(node)
            for output_name in node.output:
                if len(output_name.strip()) != 0:
                    output_name_to_node[output_name] = node
    else:  # pragma: no cover
        input_name_to_nodes = self._input_name_to_nodes
        output_name_to_node = self._output_name_to_node

    graph_input_names = set(self.input())
    all_nodes = {}
    q = deque()
    wait = deque()
    # Seed with the consumers of the graph inputs; an input that no node
    # consumes simply contributes nothing.
    for inp in self.model.graph.input:
        q.extend(input_name_to_nodes.get(inp.name, []))
    # ... and with nodes fed only by initializers / constants.
    for n in self.model.graph.node:
        if all(i not in output_name_to_node and i not in graph_input_names for i in n.input):
            q.append(n)

    while q:
        n = q.popleft()
        if not all(output_name_to_node[i].name in all_nodes for i in n.input if i in output_name_to_node):
            # Some producer is not scheduled yet: park the node.
            if n not in wait:
                wait.append(n)
            continue

        all_nodes[n.name] = n
        for out in n.output:
            if out in input_name_to_nodes:
                q.extend([i for i in input_name_to_nodes[out] if i.name not in all_nodes and i not in q])
        if len(q) == 0 and len(wait) != 0:
            # Retry the parked nodes; the queue takes them over as they are,
            # no deep copy of the node objects is needed.
            q, wait = wait, deque()

    nodes = list(all_nodes.values())
    assert len({n.name for n in nodes}) == len({n.name for n in self.model.graph.node})
    self.model.graph.ClearField("node")
    self.model.graph.node.extend(nodes)
def get_nodes_chain(self, start, stop, result_chain=None):
    """Collect the names of the nodes reached by walking upstream from ``start``.

    The walk goes from each start node to its parents and does not pass
    through nodes listed in ``stop``.

    Args:
        start (list): start nodes, given as names or NodeProto objects.
        stop (list): nodes at which the walk stops, as names or NodeProto.
        result_chain (list): list extended in place with the visited names;
            a new list is used when omitted.

    Returns:
        list: names of the visited nodes in visiting order.
    """
    chain = [] if result_chain is None else result_chain

    # process start node list
    pending = deque()
    for item in start:
        if isinstance(item, str):
            pending.append(item)
        elif isinstance(item, onnx.NodeProto):
            pending.append(item.name)
        else:
            assert False, "'get_nodes_chain' function only support list[string]or list[NodeProto] params"  # noqa: B011

    # process stop node list
    stop_names = []
    for item in stop:
        if isinstance(item, str):
            stop_names.append(item)
        elif isinstance(item, onnx.NodeProto):
            stop_names.append(item.name)
        else:
            assert False, "'get_nodes_chain' function only support list[string]or list[NodeProto] params"  # noqa: B011

    while pending:
        current_name = pending.popleft()
        # Stop nodes and already visited nodes are not expanded.
        if current_name in stop_names or current_name in chain:
            continue
        chain.append(current_name)
        current = find_by_name(current_name, list(self.model.graph.node))
        pending.extend(parent.name for parent in self.get_parents(current))
    return chain
def find_split_node_for_layer_wise_quantization(self):
    """Find the nodes at which the model is split for layer-wise quantization.

    The split nodes sit between decoder blocks:
    embed -> decoder.0 -(split_node)-> ... -(split_node)-> decoder.n -(split_node)-> norm -> head
    After splitting: embed -> decoder.0, decoder.1, ..., decoder.n, norm -> head

    A ``SkipLayerNormalization`` or ``Add`` node is reported when at least
    one of the known parent-path patterns matches it.

    Returns:
        list: the matching nodes in graph order.
    """
    match = self.match_parent_path
    split_nodes = []
    for node in self._model.graph.node:
        if node.op_type == "SkipLayerNormalization":
            candidates = [
                match(node, ["MatMul", "Reshape", "Transpose", "Reshape", "MatMul"], [None, 0, 0, 0, 0]),
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "MatMul"], [1, 1, 0, 0, 0]),
            ]
        elif node.op_type == "Add":
            candidates = [
                # match base attention structure
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "MatMul"], [0, None, 0, 0, 0]),
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "MatMul"], [1, None, 0, 0, 0]),
                # match gpt attention no past structure
                match(
                    node,
                    ["Reshape", "Gemm", "Reshape", "Reshape", "Transpose", "MatMul"],
                    [None, 0, 0, 0, 0, 0],
                    output_name_to_node=self.output_name_to_node,
                    return_indice=[],
                ),
                # match bart attention structure
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "Reshape", "MatMul"], [0, None, 0, 0, 0, 0]),
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "Reshape", "MatMul"], [1, None, 0, 0, 0, 0]),
                match(node, ["MatMul", "Mul", "MatMul", "Mul", "Div", "Add"], [None, 0, None, 0, None, 0]),
                match(
                    node,
                    ["MatMul", "Mul", "MatMul", "SimplifiedLayerNormalization", "Add"],
                    [None, 0, None, 0, 0],
                ),
            ]
        else:
            continue

        if not node or not any(candidates):
            continue
        split_nodes.append(node)
    return split_nodes
def find_qkv_in_attention(self, find_all=False):
    """Find the query/key/value MatMul nodes of the attention blocks.

    An ``Attention`` node is reported by its own name. For a
    ``SkipLayerNormalization`` or ``Add`` node that matches a known
    attention pattern, the other node-produced input of that node is taken
    as the block input; if exactly three MatMul nodes consume it, their
    names are reported.

    Args:
        find_all (bool, optional): keep searching after the first MatMul
            triple was found. Defaults to False.

    Returns:
        list: one list of node names per attention block found.
    """
    match = self.match_parent_path
    found = []
    for node in self._model.graph.node:
        if node.op_type == "Attention":
            found.append([node.name])
            continue

        if node.op_type == "SkipLayerNormalization":
            candidates = [
                match(node, ["MatMul", "Reshape", "Transpose", "Reshape", "MatMul"], [None, 0, 0, 0, 0]),
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "MatMul"], [1, 1, 0, 0, 0]),
            ]
        elif node.op_type == "Add":
            candidates = [
                # match base attention structure
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "MatMul"], [0, None, 0, 0, 0]),
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "MatMul"], [1, None, 0, 0, 0]),
                # match gpt attention no past structure
                match(
                    node,
                    ["Reshape", "Gemm", "Reshape", "Reshape", "Transpose", "MatMul"],
                    [None, 0, 0, 0, 0, 0],
                    output_name_to_node=self.output_name_to_node,
                    return_indice=[],
                ),
                # match bart attention structure
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "Reshape", "MatMul"], [0, None, 0, 0, 0, 0]),
                match(node, ["Add", "MatMul", "Reshape", "Transpose", "Reshape", "MatMul"], [1, None, 0, 0, 0, 0]),
            ]
        else:
            continue

        if not node or not any(candidates):
            continue

        # The last pattern that matched wins.
        attention_path = [path for path in candidates if path is not None][-1]
        # Inputs produced by a node, other than the attention branch itself.
        residual_inputs = [
            name
            for name in node.input
            if name in self.output_name_to_node and name != attention_path[0].output[0]
        ]
        if len(residual_inputs) != 1:
            continue

        consumers = self.input_name_to_nodes[residual_inputs[0]]
        matmul_names = [consumer.name for consumer in consumers if consumer.op_type == "MatMul"]
        if len(matmul_names) == 3:
            found.append(matmul_names)
            if not find_all:
                break
    return found
- def find_ffn_matmul(self, attention_index, attention_matmul_list, block_len):
- """Find MatMul in FFN.
- Args:
- attention_index (list): index of Attention
- attention_matmul_list (list): list of Attention and MatMul nodes
- block_len (int): block length
- Returns:
- list: list of MatMul in FFN
- """
- ffn_matmul = []
- for idx in range(len(attention_index)):
- if idx != len(attention_index) - 1:
- index = attention_index[idx + 1]
- if index - 2 >= 0:
- ffn_matmul.append([attention_matmul_list[index - 2], attention_matmul_list[index - 1]])
- else:
- index = attention_index[idx]
- if index + block_len - 1 < len(attention_matmul_list):
- ffn_matmul.append(
- [attention_matmul_list[index + block_len - 2], attention_matmul_list[index + block_len - 1]]
- )
- return ffn_matmul
def export(self, save_path, conf):
    """Export the QLinear model as a QDQ model and save it.

    Args:
        save_path (str): destination path handed to ``save``.
        conf: export config; only ``ONNXQlinear2QDQConfig`` is supported.

    Raises:
        SystemExit: with exit code 0 when ``conf`` is of an unsupported
            type (after a warning has been logged).
    """
    from neural_compressor.config import ONNXQlinear2QDQConfig  # noqa: PLC0415
    from neural_compressor.utils.export import onnx_qlinear_to_qdq  # noqa: PLC0415

    if isinstance(conf, ONNXQlinear2QDQConfig):
        add_nodes, remove_nodes, inits = onnx_qlinear_to_qdq(self._model, self._input_name_to_nodes)
        self.add_nodes(add_nodes)
        self.remove_nodes(remove_nodes)
        self.add_initializers(inits)
        # Refresh the lookup tables before pruning and sorting the new graph.
        self.update()
        self.remove_unused_nodes()
        self.topological_sort()
        self.save(save_path)
    else:
        logger.warning("Unsupported config for export, only ONNXQlinear2QDQConfig is supported!")
        # sys.exit() raises the same SystemExit as the interactive exit()
        # helper, but also exists when the site module is not loaded.
        sys.exit(0)
def add_tensors_to_outputs(self, tensor_names):
    """Add the tensors to the model outputs to get their values.

    Names that are already graph outputs are skipped, and a name listed
    more than once in ``tensor_names`` is added only once.

    Args:
        tensor_names: The names of tensors to be dumped.
    """
    # Track the names seen so far so that no duplicate graph output is created.
    known_outputs = set(self.output())
    added_outputs = []
    for tensor in tensor_names:
        if tensor in known_outputs:
            continue
        known_outputs.add(tensor)
        added_tensor = onnx.helper.ValueInfoProto()
        added_tensor.name = tensor
        added_outputs.append(added_tensor)
    self._model.graph.output.extend(added_outputs)  # pylint: disable=no-member
def remove_tensors_from_outputs(self, tensor_names):
    """Remove the tensors from the model outputs.

    Names that are not graph outputs are ignored.

    Args:
        tensor_names: The names of tensors to be removed.
    """
    # Collect the entries first, then delete, so positions stay valid.
    doomed = []
    for name in tensor_names:
        current_names = self.output()
        if name in current_names:
            doomed.append(self._model.graph.output[current_names.index(name)])
    graph_outputs = self._model.graph.output
    for entry in doomed:
        graph_outputs.remove(entry)
def match_first_parent(self, node, parent_op_type, output_name_to_node, exclude=None):
    """Find the first parent of ``node`` that has the requested op_type.

    Args:
        node: current node; its inputs are scanned in order.
        parent_op_type (str): constraint of parent node op_type.
        output_name_to_node (dict): dictionary with output name as key, and node as value.
        exclude (list): list of nodes that are excluded (not allowed to match as parent).

    Returns:
        parent: The matched parent node. None if not found.
        index: The input index of matched parent node. None if not found.
    """
    excluded = [] if exclude is None else exclude
    for index, input_name in enumerate(node.input):
        if input_name not in output_name_to_node:
            continue
        candidate = output_name_to_node[input_name]
        if candidate.op_type == parent_op_type and candidate not in excluded:
            return candidate, index
    return None, None
def match_parent(
    self,
    node,
    parent_op_type,
    input_index=None,
    output_name_to_node=None,
    exclude=None,
    return_indice=None,
):
    """Find a parent of ``node`` constrained by op_type and input index.

    Args:
        node: current node (must not be None).
        parent_op_type (str): constraint of parent node op_type.
        input_index (int or None): only check the parent given input index of current node.
        output_name_to_node (dict): dictionary with output name as key, and node as value.
            The cached map of the model is used when omitted.
        exclude (list): list of nodes that are excluded (not allowed to match as parent).
        return_indice (list): a list to append the input index when input_index is None.

    Returns:
        parent: The matched parent node, or None when nothing matches.
    """
    assert node is not None
    assert input_index is None or input_index >= 0

    excluded = [] if exclude is None else exclude
    producers = self._output_name_to_node if output_name_to_node is None else output_name_to_node

    if input_index is None:
        # No index constraint: take the first parent with the right op_type
        # and report the index it was found at.
        parent, index = self.match_first_parent(node, parent_op_type, producers, excluded)
        if return_indice is not None:
            return_indice.append(index)
        return parent

    if input_index >= len(node.input):
        return None

    candidate = self.get_parent(node, input_index, producers)
    if candidate is None or candidate.op_type != parent_op_type or candidate in excluded:
        return None
    return candidate
def match_parent_path(
    self,
    node,
    parent_op_types,
    parent_input_index,
    output_name_to_node=None,
    return_indice=None,
):
    """Walk up a chain of parents, matching one op_type (and input index) per step.

    Args:
        node: node the walk starts from.
        parent_op_types (list): constraint of parent node op_type of each input edge.
        parent_input_index (list): constraint of input index of each input edge.
            None means no constraint. Must have the same length as parent_op_types.
        output_name_to_node (dict): dictionary with output name as key, and node as value.
            Defaults to the mapping stored on this model.
        return_indice (list): a list to append the input index when there is
            no constraint on input index of an edge.

    Returns:
        parents: a list of matched parent nodes, nearest parent first, or None as
            soon as one edge cannot be matched.
    """
    assert len(parent_input_index) == len(parent_op_types)

    producers = self._output_name_to_node if output_name_to_node is None else output_name_to_node

    path = []
    cursor = node
    for depth, wanted_type in enumerate(parent_op_types):
        # Each step continues from the parent matched in the previous step.
        cursor = self.match_parent(
            cursor,
            wanted_type,
            parent_input_index[depth],
            producers,
            exclude=[],
            return_indice=return_indice,
        )
        if cursor is None:
            return None
        path.append(cursor)
    return path
def is_smoothquant_model(self):
    """Check the model is smooth quantized or not.

    The check looks for an initializer whose name contains ``_smooth_scale``.

    Returns:
        bool: the model is smooth quantized or not.
    """
    return any("_smooth_scale" in tensor.name for tensor in self.model.graph.initializer)
def find_split_nodes(self):
    """Find split nodes for layer-wise quantization.

    Returns:
        Whatever ``find_split_node_for_layer_wise_quantization`` returns.
    """
    return self.find_split_node_for_layer_wise_quantization()
def split_model_with_node(
    self, split_node_name, path_of_model_to_split, shape_infer=True, save_both_split_models=True
):
    """Split model into two parts at a given node.

    Nodes are assigned in graph order: every node up to and including the
    split node goes to the first part, the rest to the second part. Tensors
    produced in the first part and consumed in the second are added as
    outputs of part 1 and inputs of part 2.

    When ``shape_infer`` is True, ``self._model`` is replaced by the
    shape-inferred model.

    Args:
        split_node_name (str): name of the node where the model is split at.
        path_of_model_to_split (str): path of model to be split. The split models
            are saved in the same directory.
        shape_infer (bool): do shape inference. Default is True.
        save_both_split_models (bool): whether to save the two split models.
            False means only save the first split model.
            True means save both the two split models.
            Default is True.

    Returns:
        tuple: the first split model, the second split model. The second model
            is returned even when it was not saved.

    Raises:
        ValueError: if no node named ``split_node_name`` exists in the graph.
        AssertionError: if the split node does not have exactly one output.
    """
    # origin model : ... -> node_1 -> split_node -> node_2 -> ...
    # split model 1: ... -> node_1 -> split_node
    # split model 2: node_2 -> ...
    split_model_part_1 = onnx.ModelProto()
    split_model_part_1.CopyFrom(self._model)
    split_model_part_1.graph.ClearField("node")

    split_model_part_2 = onnx.ModelProto()
    split_model_part_2.CopyFrom(self._model)
    split_model_part_2.graph.ClearField("node")

    split_node_output = None
    part_idx = 1
    for node in self._model.graph.node:
        if part_idx == 1:
            split_model_part_1.graph.node.append(node)
        elif part_idx == 2:
            split_model_part_2.graph.node.append(node)

        # The split node itself stays in part 1; everything after it goes to part 2.
        if node.name == split_node_name:
            split_node_output = node.output
            part_idx = 2

    # Without this check an unknown node name fails below with an opaque
    # ``TypeError`` from ``len(None)``.
    if split_node_output is None:
        raise ValueError(f"Cannot split the model: node {split_node_name} was not found in the graph")
    assert len(split_node_output) == 1, (
        f"Only support split at node with 1 output tensor, while current split node {split_node_name} has {len(split_node_output)} output tensors"
    )
    split_tensor_name = split_node_output[0]

    # infer shape of the model to be split
    if shape_infer:
        try:
            from neural_compressor.adaptor.ox_utils.util import infer_shapes  # noqa: PLC0415

            self._model = infer_shapes(self._model, auto_merge=True, base_dir=os.path.dirname(self._model_path))
        except Exception:  # pragma: no cover
            logger.error(
                "Shape infer fails for layer-wise quantization. "
                "We would recommend checking the graph optimization level of your model "
                "and setting it to 'DISABLE_ALL' or 'ENABLE_BASIC', "
                "as this may help avoid this error."
            )
            raise

    split_tensor_type, split_tensor_shape = self._get_output_type_shape_by_tensor_name(split_tensor_name)
    split_tensor = onnx.helper.make_tensor_value_info(split_tensor_name, split_tensor_type, split_tensor_shape)

    split_model_part_1 = ONNXModel(split_model_part_1, ignore_warning=True)
    split_model_part_2 = ONNXModel(split_model_part_2, ignore_warning=True)

    # remove unused input & output
    split_model_part_1._remove_unused_input_output()
    split_model_part_2._remove_unused_input_output()

    # The split tensor is the boundary between the two parts.
    split_model_part_1.model.graph.output.append(split_tensor)
    split_model_part_2.model.graph.input.append(split_tensor)

    # Collect any other tensor that crosses the boundary (produced in part 1,
    # consumed in part 2) and is not already exposed.
    insert_output_for_model_1 = []
    insert_input_for_model_2 = []
    for output in split_model_part_1.output_name_to_node:
        if output in split_model_part_2.input_name_to_nodes:
            output_type, output_shape = self._get_output_type_shape_by_tensor_name(output)
            output_tensor = onnx.helper.make_tensor_value_info(output, output_type, output_shape)
            if output_tensor not in split_model_part_1.model.graph.output:
                insert_output_for_model_1.append(output_tensor)
            if output_tensor not in split_model_part_2.model.graph.input:
                insert_input_for_model_2.append(output_tensor)

    # insert model 1 output
    split_model_part_1.model.graph.output.extend(insert_output_for_model_1)

    # insert model 2 input
    split_model_part_2.model.graph.input.extend(insert_input_for_model_2)

    # remove unused init
    split_model_part_1.remove_unused_init()
    split_model_part_2.remove_unused_init()

    split_model_part_1.update()
    split_model_part_2.update()

    dir_of_model_to_split = os.path.dirname(path_of_model_to_split)

    split_model_part_1.load_model_initializer_by_tensor(dir_of_model_to_split)
    split_model_part_1_path = os.path.join(dir_of_model_to_split, "split_model_part_1.onnx")
    split_model_part_1.model_path = split_model_part_1_path
    split_model_part_1._save_split_model(split_model_part_1_path)
    split_model_part_1.check_is_large_model()
    logger.debug(f"save split model part 1 to {split_model_part_1_path} for layer wise quantization")

    if save_both_split_models:
        split_model_part_2.load_model_initializer_by_tensor(dir_of_model_to_split)
        split_model_part_2_path = os.path.join(dir_of_model_to_split, "split_model_part_2.onnx")
        split_model_part_2.model_path = split_model_part_2_path
        split_model_part_2._save_split_model(split_model_part_2_path)
        split_model_part_2.check_is_large_model()
        logger.debug(f"save split model part 2 to {split_model_part_2_path} for layer wise quantization")

    # Both parts are returned whether or not part 2 was written to disk.
    return split_model_part_1, split_model_part_2
def _save_split_model(self, save_path):
    """Save split model as external data for layer wise quantization.

    Tensor data is written to one side file named after the model file with
    a ``_data`` suffix. An existing file at ``save_path + "_data"`` is
    deleted before saving.

    Args:
        save_path (str): the path to save the split model
    """
    external_data_path = save_path + "_data"
    # NOTE(review): presumably removed so stale tensor data is not kept around
    # from an earlier save -- confirm against onnx external data behaviour.
    if os.path.exists(external_data_path):
        os.remove(external_data_path)
    onnx.save_model(
        self._model,
        save_path,
        save_as_external_data=True,
        all_tensors_to_one_file=True,
        # ``location`` is the bare file name. ``os.path.basename`` understands the
        # platform's separators, whereas splitting on "/" keeps the whole path
        # when it was built with backslashes.
        location=os.path.basename(save_path) + "_data",
        size_threshold=1024,
        convert_attribute=False,
    )
def _get_output_type_shape_by_tensor_name(self, tensor_name):
    """Get output type and shape with a tensor name.

    Only the graph's ``value_info`` entries are searched and the first entry
    with a matching name is used.

    Args:
        tensor_name (str): name of a tensor

    Returns:
        tuple: output type and shape. The shape is a list with -1 for every
            dimension that has no ``dim_value``. When the tensor is not found
            the result is ``(onnx.TensorProto.FLOAT, None)``.
    """
    matches = (info for info in self._model.graph.value_info if info.name == tensor_name)
    found = next(matches, None)
    if found is None:
        return onnx.TensorProto.FLOAT, None

    tensor_type = found.type.tensor_type
    dims = []
    for dim in tensor_type.shape.dim:
        # Dimensions without a concrete value are reported as -1.
        dims.append(dim.dim_value if dim.HasField("dim_value") else -1)
    return tensor_type.elem_type, dims
def _remove_unused_input_output(self):
    """Remove unused input & output for split model.

    A graph output is dropped when its name is not a key of
    ``self.output_name_to_node``; a graph input is dropped when its name is
    not a key of ``self.input_name_to_nodes``.
    """
    graph = self._model.graph
    # Collect first, remove afterwards, so the containers are not mutated while iterated.
    dangling_outputs = [out for out in graph.output if out.name not in self.output_name_to_node]
    dangling_inputs = [inp for inp in graph.input if inp.name not in self.input_name_to_nodes]
    for out in dangling_outputs:
        graph.output.remove(out)
    for inp in dangling_inputs:
        graph.input.remove(inp)
def remove_unused_init(self):
    """Remove unused init.

    An initializer counts as unused when its name is not a key of
    ``self.input_name_to_nodes``; the unused ones are handed to
    ``self.remove_initializers``.
    """
    unused = [tensor for tensor in self._model.graph.initializer if tensor.name not in self.input_name_to_nodes]
    self.remove_initializers(unused)
def load_model_initializer_by_tensor(self, data_path=None):
    """Load model initializer by tensor.

    Only initializers whose ``data_location`` is set to external are loaded.

    Args:
        data_path (str, optional): the directory of saved initializer. Defaults to None,
            in which case the directory of the model path is used.
    """
    base_dir = os.path.dirname(self._model_path) if data_path is None else data_path
    for tensor in self._model.graph.initializer:
        if not tensor.HasField("data_location"):
            continue
        if tensor.data_location != onnx.TensorProto.EXTERNAL:
            continue
        onnx.external_data_helper.load_external_data_for_tensor(tensor, base_dir)
def write_external_data_to_new_location(self, external_data_location="external.data", overwrite=False):
    """Write external data of merged quantized model to new location to save memory.

    The external data file is placed in the directory of the model path.

    Args:
        external_data_location (str, optional): external data location of merged quantized model.
            Defaults to "external.data".
        overwrite (bool, optional): if True, remove existing external data. Defaults to False.
    """
    if overwrite:
        stale_file = os.path.join(os.path.dirname(self._model_path), external_data_location)
        if os.path.exists(stale_file):
            os.remove(stale_file)

    # Pull externally stored initializers in before pointing them at the new location.
    self.load_model_initializer_by_tensor()
    onnx.external_data_helper.convert_model_to_external_data(self._model, location=external_data_location)
    # TODO : if init is already saved, skip write it
    onnx.external_data_helper.write_external_data_tensors(self._model, filepath=os.path.dirname(self._model_path))
def merge_split_models(self, to_merge_model):
    """Merge two split model into final model.

    The nodes and initializers of ``to_merge_model`` are added to this
    model, then the graph outputs and inputs are reconciled.

    Args:
        to_merge_model: the split model to merge into this one. Its external
            data is written to a new location first.
    """
    to_merge_model.write_external_data_to_new_location()
    self.add_nodes(list(to_merge_model.nodes()))
    self.add_initializers(list(to_merge_model.initializer()))
    self.update()

    graph = self._model.graph

    # add new output
    for candidate in to_merge_model.graph().output:
        # Names are re-read every iteration so outputs appended here count too.
        if candidate.name in self.output():
            continue
        graph.output.append(candidate)

    # remove unused output: anything the merged-in model takes as input
    consumed = [out for out in graph.output if out.name in to_merge_model.input()]
    for out in consumed:
        graph.output.remove(out)

    # add new input
    for candidate in to_merge_model.graph().input:
        name = candidate.name
        already_known = name in self.input() or name in self.output() or name in self.output_name_to_node
        if not already_known:
            graph.input.append(candidate)
def re_org_output(self, origin_output):
    """Re-org output of merged model for layer-wise quantization.

    The graph outputs are replaced by the outputs named in
    ``origin_output``, in that order. Current outputs that are not named
    are dropped.

    Args:
        origin_output: names of the outputs to keep, in the desired order.

    Raises:
        KeyError: if a name in ``origin_output`` is not a current graph output.
            The graph outputs are left untouched in that case.
    """
    graph_outputs = self._model.graph.output
    current = list(graph_outputs)
    by_name = {output.name: output for output in current}

    # Resolve every requested name before mutating anything, so an unknown
    # name cannot leave the graph with its outputs partially rebuilt.
    reordered = [by_name[out_name] for out_name in origin_output]

    for output in current:
        graph_outputs.remove(output)
    for output in reordered:
        graph_outputs.append(output)
|