| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- from typing import Any, Dict, List, Tuple, Union
- import torch
- from modelscope.metainfo import Preprocessors
- from modelscope.preprocessors import Preprocessor
- from modelscope.preprocessors.builder import PREPROCESSORS
- from modelscope.preprocessors.nlp.text_classification_preprocessor import \
- TextClassificationPreprocessorBase
- from modelscope.preprocessors.nlp.token_classification_preprocessor import (
- NLPTokenizerForLSTM, TokenClassificationPreprocessorBase)
- from modelscope.preprocessors.nlp.transformers_tokenizer import NLPTokenizer
- from modelscope.utils.constant import Fields, ModeKeys
- from modelscope.utils.hub import get_model_type, parse_label_mapping
- from modelscope.utils.logger import get_logger
- logger = get_logger()
@PREPROCESSORS.register_module(
    Fields.audio, module_name=Preprocessors.sen_cls_tokenizer)
class SpeakerDiarizationDialogueDetectionPreprocessor(
        TextClassificationPreprocessorBase):
    """Text-classification preprocessor used for dialogue detection.

    Registered in ``PREPROCESSORS`` under ``Fields.audio`` with the module
    name ``Preprocessors.sen_cls_tokenizer``. The tokenization itself is
    delegated to an ``NLPTokenizer`` built in ``__init__``.
    """

    def _tokenize_text(self, sequence1, sequence2=None, **kwargs):
        """Tokenize one sequence or a pair of sequences.

        Args:
            sequence1: The first sequence, passed to the tokenizer as is.
            sequence2: The optional second sequence.
            **kwargs: Extra arguments forwarded to ``self.nlp_tokenizer``.
                When ``return_tensors`` is not given it is set to ``'pt'``
                in inference mode and to ``None`` in every other mode.

        Returns:
            Whatever ``self.nlp_tokenizer`` returns for the given inputs.
        """
        if 'return_tensors' not in kwargs:
            in_inference = self.mode == ModeKeys.INFERENCE
            kwargs['return_tensors'] = 'pt' if in_inference else None
        return self.nlp_tokenizer(sequence1, sequence2, **kwargs)

    def __init__(self,
                 model_dir=None,
                 first_sequence: str = None,
                 second_sequence: str = None,
                 label: Union[str, List] = 'label',
                 label2id: Dict = None,
                 mode: str = ModeKeys.INFERENCE,
                 max_length: int = None,
                 use_fast: bool = None,
                 keep_original_columns=None,
                 **kwargs):
        """Build the tokenizer and initialize the base preprocessor.

        Args:
            model_dir: Model directory; when given, the model type is read
                from it with ``get_model_type``.
            first_sequence: Forwarded to the base class.
            second_sequence: Forwarded to the base class.
            label: Forwarded to the base class.
            label2id: Forwarded to the base class.
            mode: Forwarded to the base class.
            max_length: Tokenizer ``max_length``. When ``None`` the
                ``sequence_length`` keyword is used, falling back to 128.
            use_fast: Forwarded to ``NLPTokenizer``.
            keep_original_columns: Forwarded to the base class.
            **kwargs: Tokenizer keyword arguments. ``truncation`` defaults
                to ``True`` and ``padding`` to ``'max_length'``.
        """
        # Fill in tokenizer defaults without overriding caller-supplied values.
        kwargs.setdefault('truncation', True)
        kwargs.setdefault('padding', 'max_length')

        # `sequence_length` is only a fallback for `max_length`; it is always
        # removed so it never reaches the tokenizer.
        fallback_length = kwargs.pop('sequence_length', 128)
        kwargs['max_length'] = (
            fallback_length if max_length is None else max_length)

        model_type = get_model_type(model_dir) if model_dir is not None else None

        # The tokenizer is created before the base-class initializer runs,
        # matching the original statement order.
        self.nlp_tokenizer = NLPTokenizer(
            model_dir, model_type, use_fast=use_fast, tokenize_kwargs=kwargs)
        super().__init__(model_dir, first_sequence, second_sequence, label,
                         label2id, mode, keep_original_columns)
@PREPROCESSORS.register_module(
    Fields.audio, module_name=Preprocessors.token_cls_tokenizer)
class SpeakerDiarizationSemanticSpeakerTurnDetectionPreprocessor(
        TokenClassificationPreprocessorBase):
    """Token-classification preprocessor for semantic speaker-turn detection.

    Registered in ``PREPROCESSORS`` under ``Fields.audio`` with the module
    name ``Preprocessors.token_cls_tokenizer``. Besides the tokenizer output
    it produces a ``label_mask`` (``True`` on the first sub-token of every
    word) and an ``offset_mapping`` (one span per word).
    """

    def __init__(self,
                 model_dir: str = None,
                 first_sequence: str = 'text',
                 label: str = 'label',
                 label2id: Dict = None,
                 label_all_tokens: bool = False,
                 mode: str = ModeKeys.INFERENCE,
                 max_length=None,
                 use_fast=None,
                 keep_original_columns=None,
                 return_text=True,
                 **kwargs):
        """Initialize the base preprocessor and build the tokenizer.

        Args:
            model_dir: Model directory; when given, the model type is read
                from it with ``get_model_type``.
            first_sequence: Forwarded to the base class.
            label: Forwarded to the base class.
            label2id: Forwarded to the base class.
            label_all_tokens: Forwarded to the base class.
            mode: Forwarded to the base class.
            max_length: Tokenizer ``max_length``. When ``None`` the
                ``sequence_length`` keyword is used, falling back to 128.
            use_fast: Forwarded to ``NLPTokenizerForLSTM``.
            keep_original_columns: Forwarded to the base class.
            return_text: Forwarded to the base class.
            **kwargs: Tokenizer keyword arguments. ``truncation`` defaults
                to ``True`` and ``padding`` to ``'max_length'``;
                ``add_special_tokens`` is always overwritten.
        """
        super().__init__(model_dir, first_sequence, label, label2id,
                         label_all_tokens, mode, keep_original_columns,
                         return_text)
        model_type = None
        if model_dir is not None:
            model_type = get_model_type(model_dir)

        kwargs.setdefault('truncation', True)
        kwargs.setdefault('padding', 'max_length')
        # `sequence_length` is only a fallback for `max_length`; it is always
        # removed so it never reaches the tokenizer.
        fallback_length = kwargs.pop('sequence_length', 128)
        kwargs['max_length'] = (
            fallback_length if max_length is None else max_length)
        # LSTM models are tokenized without [CLS]/[SEP].
        kwargs['add_special_tokens'] = model_type != 'lstm'

        # `_tokenize_text` reads `self.is_lstm_model`. Keep any value the base
        # class already provides; otherwise derive it from the model type so
        # the attribute always exists.
        if not hasattr(self, 'is_lstm_model'):
            self.is_lstm_model = model_type == 'lstm'

        self.nlp_tokenizer = NLPTokenizerForLSTM(
            model_dir=model_dir,
            model_type=model_type,
            use_fast=use_fast,
            tokenize_kwargs=kwargs)

    def _tokenize_text(self, text: Union[str, List[str]], **kwargs):
        """Tokenize ``text`` and build the label mask.

        Args:
            text: A string, or a list of words. Outside inference mode a
                list is required.
            **kwargs: Extra arguments forwarded to the selected tokenizer
                routine.

        Returns:
            A tuple ``(encodings, word_ids)``. In inference mode every value
            of ``encodings`` is converted to a tensor with a leading
            dimension of size 1; in other modes ``offset_mapping`` is
            removed from ``encodings``.

        Raises:
            AssertionError: If ``text`` is not a list outside inference mode.
        """
        tokens = text
        # Explicit raise (instead of `assert`) so the check survives `-O`.
        if self.mode != ModeKeys.INFERENCE and not isinstance(tokens, list):
            raise AssertionError(
                'Input needs to be lists in training and evaluating,'
                'because the length of the words and the labels need to be equal.'
            )

        is_split_into_words = self.nlp_tokenizer.get_tokenizer_kwarg(
            'is_split_into_words', False)
        # Only a raw string needs the character-level split; a list is
        # already split into words (and has no `.find`).
        if is_split_into_words and isinstance(tokens, str):
            # To support a prompt separator the text is split twice:
            # characters before and after '[SEP]', with '[SEP]' kept whole.
            sep_idx = tokens.find('[SEP]')
            if sep_idx == -1 or self.is_lstm_model:
                tokens = list(tokens)
            else:
                tokens = (
                    list(tokens[:sep_idx]) + ['[SEP]']
                    + list(tokens[sep_idx + len('[SEP]'):]))

        if is_split_into_words and self.mode == ModeKeys.INFERENCE:
            encodings, word_ids = self._tokenize_text_by_words(
                tokens, **kwargs)
        elif self.nlp_tokenizer.tokenizer.is_fast:
            encodings, word_ids = self._tokenize_text_with_fast_tokenizer(
                tokens, **kwargs)
        else:
            encodings, word_ids = self._tokenize_text_with_slow_tokenizer(
                tokens, **kwargs)

        # Clear the label mask from the first separator token onwards.
        sep_token_id = self.nlp_tokenizer.tokenizer.sep_token_id
        sep_idx = -1
        for idx, token_id in enumerate(encodings['input_ids']):
            if token_id == sep_token_id:
                sep_idx = idx
                break
        if sep_idx != -1:
            label_mask = encodings['label_mask']
            for i in range(sep_idx, len(label_mask)):
                label_mask[i] = False

        if self.mode == ModeKeys.INFERENCE:
            # Snapshot the keys so reassigning values never interferes with
            # the iteration.
            for key in list(encodings.keys()):
                encodings[key] = torch.tensor(encodings[key]).unsqueeze(0)
        else:
            encodings.pop('offset_mapping', None)
        return encodings, word_ids

    def _tokenize_text_by_words(self, tokens, **kwargs):
        """Encode pre-split words one by one and assemble the model inputs.

        Args:
            tokens: Iterable of words; each one is encoded separately
                without special tokens.
            **kwargs: May override ``padding`` and ``max_length`` (or
                ``sequence_length``) of the tokenizer configuration.

        Returns:
            A tuple ``(encodings, None)`` where ``encodings`` is a dict with
            ``input_ids``, ``attention_mask``, ``label_mask`` and
            ``offset_mapping`` lists.
        """
        tokenizer = self.nlp_tokenizer.tokenizer
        input_ids = []
        label_mask = []
        offset_mapping = []
        attention_mask = []
        for offset, token in enumerate(tokens):
            subtoken_ids = tokenizer.encode(token, add_special_tokens=False)
            if not subtoken_ids:
                # Keep one position per word even if it encodes to nothing.
                subtoken_ids = [tokenizer.unk_token_id]
            input_ids.extend(subtoken_ids)
            attention_mask.extend([1] * len(subtoken_ids))
            # Only the first sub-token of a word carries a label.
            label_mask.extend([True] + [False] * (len(subtoken_ids) - 1))
            offset_mapping.append((offset, offset + 1))

        padding = kwargs.get('padding',
                             self.nlp_tokenizer.get_tokenizer_kwarg('padding'))
        max_length = kwargs.get(
            'max_length',
            kwargs.get('sequence_length',
                       self.nlp_tokenizer.get_tokenizer_kwarg('max_length')))
        special_token = 1 if self.nlp_tokenizer.get_tokenizer_kwarg(
            'add_special_tokens') else 0

        # Room left for real sub-tokens once [CLS]/[SEP] are accounted for.
        budget = max_length - 2 * special_token
        if len(label_mask) > budget:
            label_mask = label_mask[:budget]
            input_ids = input_ids[:budget]
            # The attention mask must be truncated together with input_ids,
            # otherwise it ends up longer than the ids it describes.
            attention_mask = attention_mask[:budget]
            offset_mapping = offset_mapping[:sum(label_mask)]

        cls_part = [tokenizer.cls_token_id] * special_token
        sep_part = [tokenizer.sep_token_id] * special_token
        if padding == 'max_length':
            pad_count = max_length - len(input_ids) - 2 * special_token
            label_mask = [False] * special_token + label_mask + \
                [False] * (max_length - len(label_mask) - special_token)
            offset_mapping = offset_mapping + [(0, 0)] * (
                max_length - len(offset_mapping))
            input_ids = cls_part + input_ids + sep_part + \
                [tokenizer.pad_token_id] * pad_count
            attention_mask = attention_mask + [1] * (
                special_token * 2) + [0] * pad_count
        else:
            label_mask = [False] * special_token + label_mask + \
                [False] * special_token
            input_ids = cls_part + input_ids + sep_part
            attention_mask = attention_mask + [1] * (special_token * 2)

        encodings = {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'label_mask': label_mask,
            'offset_mapping': offset_mapping,
        }
        return encodings, None

    def _tokenize_text_with_fast_tokenizer(self, tokens, **kwargs):
        """Tokenize with a fast tokenizer and derive mask/offsets from word ids.

        Args:
            tokens: A string, or a list of words (then tokenized with
                ``is_split_into_words=True``).
            **kwargs: Extra arguments forwarded to ``self.nlp_tokenizer``.

        Returns:
            A tuple ``(encodings, word_ids)``; ``encodings`` gains a
            ``label_mask`` and has its ``offset_mapping`` replaced by one
            span per word.
        """
        is_split_into_words = isinstance(tokens, list)
        encodings = self.nlp_tokenizer(
            tokens,
            return_offsets_mapping=True,
            is_split_into_words=is_split_into_words,
            **kwargs)
        label_mask = []
        word_ids = encodings.word_ids()
        offset_mapping = []
        # Track the previous word id explicitly: indexing `word_ids[i - 1]`
        # at i == 0 would wrap around to the last element.
        previous_word_id = None
        for i, word_id in enumerate(word_ids):
            if word_id is None:
                label_mask.append(False)
            elif word_id == previous_word_id:
                # Continuation of the current word.
                label_mask.append(False)
                if not is_split_into_words:
                    # Stretch the word span to the end of this sub-token.
                    offset_mapping[-1] = (offset_mapping[-1][0],
                                          encodings['offset_mapping'][i][1])
            else:
                label_mask.append(True)
                if is_split_into_words:
                    offset_mapping.append((word_id, word_id + 1))
                else:
                    offset_mapping.append(encodings['offset_mapping'][i])
            previous_word_id = word_id

        padding = self.nlp_tokenizer.get_tokenizer_kwarg('padding')
        if padding == 'max_length':
            offset_mapping = offset_mapping + [(0, 0)] * (
                len(label_mask) - len(offset_mapping))
        encodings['offset_mapping'] = offset_mapping
        encodings['label_mask'] = label_mask
        return encodings, word_ids

    def _tokenize_text_with_slow_tokenizer(self, tokens, **kwargs):
        """Tokenize a string with a slow tokenizer (inference only).

        The label mask and offsets come from a tokenizer-specific method
        named ``get_label_mask_and_offset_mapping_<TokenizerClass>``.

        Args:
            tokens: The input string.
            **kwargs: May override ``padding``, ``max_length`` and
                ``add_special_tokens``; also forwarded to the tokenizer.

        Returns:
            A tuple ``(encodings, None)``.

        Raises:
            AssertionError: If not in inference mode or ``tokens`` is not a
                string.
            RuntimeError: If no helper exists for the tokenizer class.
        """
        # Explicit raise (instead of `assert`) so the check survives `-O`.
        if self.mode != ModeKeys.INFERENCE or not isinstance(tokens, str):
            raise AssertionError(
                'Slow tokenizer now only support str input in inference mode. If you are training models, '
                'please consider using the fast tokenizer.')
        word_ids = None
        encodings = self.nlp_tokenizer(
            tokens, is_split_into_words=False, **kwargs)
        tokenizer_name = self.nlp_tokenizer.get_tokenizer_class()
        method = 'get_label_mask_and_offset_mapping_' + tokenizer_name
        if not hasattr(self, method):
            raise RuntimeError(
                f'No `{method}` method defined for '
                f'tokenizer {tokenizer_name}, please use a fast tokenizer instead, or '
                f'try to implement a `{method}` method')
        label_mask, offset_mapping = getattr(self, method)(tokens)

        padding = kwargs.get('padding',
                             self.nlp_tokenizer.get_tokenizer_kwarg('padding'))
        max_length = kwargs.get(
            'max_length', self.nlp_tokenizer.get_tokenizer_kwarg('max_length'))
        special_token = 1 if kwargs.get(
            'add_special_tokens',
            self.nlp_tokenizer.get_tokenizer_kwarg(
                'add_special_tokens')) else 0

        budget = max_length - 2 * special_token
        if len(label_mask) > budget:
            label_mask = label_mask[:budget]
            offset_mapping = offset_mapping[:sum(label_mask)]
        if padding == 'max_length':
            label_mask = [False] * special_token + label_mask + \
                [False] * (max_length - len(label_mask) - special_token)
            offset_mapping = offset_mapping + [(0, 0)] * (
                max_length - len(offset_mapping))
        else:
            label_mask = [False] * special_token + label_mask + \
                [False] * special_token
        encodings['offset_mapping'] = offset_mapping
        encodings['label_mask'] = label_mask
        return encodings, word_ids

    def get_label_mask_and_offset_mapping_BertTokenizer(self, text):
        """Compute label mask and word offsets for WordPiece tokens.

        Tokens starting with ``'##'`` are treated as continuations of the
        previous word.

        Args:
            text: The string that is tokenized.

        Returns:
            A tuple ``(label_mask, offset_mapping)``: one boolean per token
            and one ``(start, end)`` character span per word.

        Raises:
            ValueError: If a token cannot be located in ``text`` at or after
                the current offset.
        """
        label_mask = []
        offset_mapping = []
        tokens = self.nlp_tokenizer.tokenizer.tokenize(text)
        offset = 0
        for token in tokens:
            is_start = not token.startswith('##')
            if not is_start:
                token = token[2:]
            label_mask.append(is_start)
            # Search forward from the end of the previous token.
            start = text.index(token, offset)
            end = start + len(token)
            if is_start:
                offset_mapping.append((start, end))
            else:
                offset_mapping[-1] = (offset_mapping[-1][0], end)
            offset = end
        return label_mask, offset_mapping

    def get_label_mask_and_offset_mapping_XLMRobertaTokenizer(self, text):
        """Compute label mask and word offsets for SentencePiece tokens.

        A piece starting with the SentencePiece word-boundary marker
        (U+2581) starts a new word. A piece consisting of the marker alone
        is counted as a word start, and its span is taken from the next
        piece.

        Args:
            text: The string that is tokenized.

        Returns:
            A tuple ``(label_mask, offset_mapping)``: one boolean per token
            and one ``(start, end)`` character span per word.

        Raises:
            ValueError: If a piece cannot be located in ``text`` at or after
                the current offset.
        """
        # SentencePiece marks word starts with U+2581 (LOWER ONE EIGHTH
        # BLOCK), not with the ASCII underscore.
        word_boundary = '\u2581'
        label_mask = []
        offset_mapping = []
        tokens = self.nlp_tokenizer.tokenizer.tokenize(text)
        offset = 0
        last_is_blank = False
        for token in tokens:
            is_start = token.startswith(word_boundary)
            if is_start:
                token = token[1:]
                label_mask.append(True)
                if not token:
                    # Bare marker: the following piece supplies the span.
                    last_is_blank = True
                    continue
            else:
                label_mask.append(False)
            start = text.index(token, offset)
            end = start + len(token)
            if last_is_blank or is_start:
                offset_mapping.append((start, end))
            else:
                offset_mapping[-1] = (offset_mapping[-1][0], end)
            offset = end
            last_is_blank = False
        return label_mask, offset_mapping
|