| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- # Copyright (c) Alibaba, Inc. and its affiliates.
- import ast
- import io
- from typing import Any, Dict, List, Union
- import numpy as np
- import soundfile as sf
- import torch
- import torchaudio
- from modelscope.fileio import File
- from modelscope.metainfo import Pipelines
- from modelscope.outputs import OutputKeys
- from modelscope.pipelines import pipeline
- from modelscope.pipelines.base import InputModel, Pipeline
- from modelscope.pipelines.builder import PIPELINES
- from modelscope.utils.constant import Tasks
- from modelscope.utils.logger import get_logger
# Module-wide logger; the pipeline below reports its progress through it.
logger = get_logger()

# The only public name this module exports.
__all__ = ['SegmentationClusteringPipeline']
@PIPELINES.register_module(
    Tasks.speaker_diarization, module_name=Pipelines.segmentation_clustering)
class SegmentationClusteringPipeline(Pipeline):
    """Segmentation and Clustering Pipeline for speaker diarization.

    Use `model` to create a Segmentation and Clustering Pipeline. The input
    audio is reduced to voiced regions (VAD), cut into fixed-length
    overlapping chunks, embedded with a speaker-verification pipeline,
    clustered by `model`, and finally merged into per-speaker time spans.

    Args:
        model (InputModel): A model instance, or a model local dir, or a model
            id in the model hub.
        kwargs (dict, `optional`):
            Extra kwargs passed into the pipeline's constructor.

    Example:
    >>> from modelscope.pipelines import pipeline
    >>> from modelscope.utils.constant import Tasks
    >>> p = pipeline(
    >>>    task=Tasks.speaker_diarization, model='damo/speech_campplus_speaker-diarization_common')
    >>> print(p(audio))
    """

    def __init__(self, model: InputModel, **kwargs):
        """Use `model` to create a speaker diarization pipeline for prediction.

        Args:
            model (InputModel): a model instance, a local model dir, or a
                valid official model id.
        """
        super().__init__(model=model, **kwargs)
        # NOTE: this aliases the model's own config object. __call__ never
        # updates it in place; per-call params go into a temporary copy.
        self.config = self.model.other_config
        # Chunking parameters (in seconds) are fixed by the pipeline and
        # override any values shipped with the model config.
        self.config.update({
            'seg_dur': 1.5,
            'seg_shift': 0.75,
        })
        self.fs = self.config['sample_rate']
        self.sv_pipeline = pipeline(
            task='speaker-verification', model=self.config['speaker_model'])

    def __call__(self, audio: Union[str, np.ndarray, list],
                 **params) -> Dict[str, Any]:
        """Extract the speaker embeddings of input audio and cluster them.

        Args:
            audio (str, np.ndarray, list): If it is represented as a str or a
                np.ndarray, it should be a complete speech signal and requires
                VAD preprocessing. If the audio is represented as a list, it
                should contain only the effective speech segments obtained
                through VAD preprocessing. The list should be formatted as
                [[0(s), 3.2, np.ndarray], [5.3, 9.1, np.ndarray], ...]. Each
                element is a sublist that contains the start time, end time,
                and the numpy array of the speech segment respectively.
            params: extra options overlaid on the pipeline config for this
                call only. The merged config is forwarded to the clustering
                model as keyword arguments.

        Returns:
            dict: `{OutputKeys.TEXT: [[start(s), end(s), speaker_id], ...]}`.
        """
        # Overlay per-call params on a temporary copy. Updating self.config in
        # place would make them persist into every later call, and into the
        # model's own config, which self.config aliases.
        base_config = self.config
        self.config = {**base_config, **params}
        try:
            # vad
            logger.info('Doing VAD...')
            vad_segments = self.preprocess(audio)
            # check input data
            self.check_audio_list(vad_segments)
            # segmentation
            logger.info('Doing segmentation...')
            segments = self.chunk(vad_segments)
            # embedding
            logger.info('Extracting embeddings...')
            embeddings = self.forward(segments)
            # clustering
            logger.info('Clustering...')
            labels = self.clustering(embeddings)
            # post processing
            logger.info('Post processing...')
            output = self.postprocess(segments, vad_segments, labels,
                                      embeddings)
        finally:
            self.config = base_config
        return {OutputKeys.TEXT: output}

    def forward(self, input: list) -> np.ndarray:
        """Embed every chunk with the speaker-verification pipeline.

        Args:
            input (list): chunks as produced by `chunk`, each
                [start(s), end(s), np.ndarray].

        Returns:
            np.ndarray: the per-chunk embeddings concatenated row-wise, one
            row per chunk, in the same order as `input`.

        Raises:
            ValueError: if the speaker pipeline returns an embedding whose
                shape is not (1, 192). Skipping such a chunk would leave
                fewer embeddings than chunks and break the chunk/label
                alignment that `postprocess` relies on.
        """
        expected_shape = (1, 192)
        embeddings = []
        for seg in input:
            embs = self.sv_pipeline([seg[2]], output_emb=True)['embs']
            if embs.shape != expected_shape:
                raise ValueError(
                    'modelscope error: Unexpected speaker embedding shape '
                    f'{tuple(embs.shape)}, expected {expected_shape}.')
            embeddings.append(embs)
        return np.concatenate(embeddings)

    def clustering(self, embeddings: np.ndarray) -> np.ndarray:
        """Cluster the chunk embeddings with `self.model`.

        The whole (possibly per-call) config is forwarded as keyword
        arguments. The result is expected to hold one label per embedding
        row; `postprocess` asserts that.
        """
        return self.model(embeddings, **self.config)

    def postprocess(self, segments: list, vad_segments: list,
                    labels: np.ndarray, embeddings: np.ndarray) -> list:
        """Turn per-chunk labels into per-speaker time spans.

        Args:
            segments (list): chunks from `chunk`, [start(s), end(s), data].
            vad_segments (list): VAD segments, used to cut audio for the
                optional change locator.
            labels (np.ndarray): one cluster label per chunk.
            embeddings (np.ndarray): one embedding row per chunk.

        Returns:
            list: [start(s), end(s), speaker_id] entries. Speaker ids are
            renumbered in order of first appearance, and adjacent entries of
            the same speaker are merged.
        """
        assert len(segments) == len(labels)
        labels = self.correct_labels(labels)
        distribute_res = [[seg[0], seg[1], label]
                          for seg, label in zip(segments, labels)]
        # merge the same speakers chronologically
        distribute_res = self.merge_seque(distribute_res)
        # acquire speaker center: the mean embedding of every speaker id
        spk_embs = np.stack([
            embeddings[labels == spk].mean(0)
            for spk in range(labels.max() + 1)
        ])

        def is_overlapped(t1, t2):
            # Small tolerance so that touching boundaries are not overlaps.
            return t1 > t2 + 1e-4

        # distribute the overlap region between consecutive speakers
        for i in range(1, len(distribute_res)):
            prev_seg, cur_seg = distribute_res[i - 1], distribute_res[i]
            if not is_overlapped(prev_seg[1], cur_seg[0]):
                continue
            # Default boundary: the middle of the overlap.
            p = (cur_seg[0] + prev_seg[1]) / 2
            if 'change_locator' in self.config:
                if not hasattr(self, 'change_locator_pipeline'):
                    self.change_locator_pipeline = pipeline(
                        task=Tasks.speaker_diarization,
                        model=self.config['change_locator'])
                # Look at most 1.5 s on either side of the midpoint, clipped
                # to the two neighbouring entries.
                short_utt_st = max(p - 1.5, prev_seg[0])
                short_utt_ed = min(p + 1.5, cur_seg[1])
                if short_utt_ed - short_utt_st > 1:
                    audio_data = self.cut_audio(short_utt_st, short_utt_ed,
                                                vad_segments)
                    spk1 = prev_seg[2]
                    spk2 = cur_seg[2]
                    _, ct = self.change_locator_pipeline(
                        audio_data, [spk_embs[spk1], spk_embs[spk2]],
                        output_res=True)
                    if ct is not None:
                        # ct is added to the clip start, so it is presumably
                        # an offset in seconds within the cut clip.
                        p = short_utt_st + ct
            cur_seg[0] = p
            prev_seg[1] = p
        # smooth the result
        return self.smooth(distribute_res)

    def preprocess(self, audio: Union[str, np.ndarray, list]) -> list:
        """Convert the raw input into [start(s), end(s), data] segments.

        A list input is treated as already VAD-segmented. It is returned as a
        new list sorted by start time, and the caller's list is not modified.
        A str is read as an audio file: only the first channel is kept, and
        it is resampled to `self.fs` when needed. Str and np.ndarray inputs
        are then converted to float32 and split by the VAD pipeline.
        """
        if isinstance(audio, list):
            return sorted(audio, key=lambda x: x[0])
        if isinstance(audio, str):
            file_bytes = File.read(audio)
            audio, fs = sf.read(io.BytesIO(file_bytes), dtype='float32')
            if len(audio.shape) == 2:
                # Multi-channel file: keep the first channel only.
                audio = audio[:, 0]
            if fs != self.fs:
                logger.info(
                    f'[WARNING]: The sample rate of audio is not {self.fs}, resample it.'
                )
                audio, fs = torchaudio.sox_effects.apply_effects_tensor(
                    torch.from_numpy(audio).unsqueeze(0),
                    fs,
                    effects=[['rate', str(self.fs)]])
                audio = audio.squeeze(0).numpy()
        assert len(audio.shape) == 1, 'modelscope error: Wrong audio format.'
        if audio.dtype in ['int16', 'int32', 'int64']:
            # NOTE(review): every integer dtype is scaled by 2**15, i.e. the
            # samples are assumed to be in int16 PCM range.
            audio = (audio / (1 << 15)).astype('float32')
        else:
            audio = audio.astype('float32')
        if not hasattr(self, 'vad_pipeline'):
            self.vad_pipeline = pipeline(
                task=Tasks.voice_activity_detection,
                model=self.config['vad_model'],
                model_revision='v2.0.2')
        vad_time = self.vad_pipeline(
            audio, fs=self.fs, is_final=True)[0]['value']
        if isinstance(vad_time, str):
            # literal_eval only parses Python literals; it never runs code.
            vad_time_list = ast.literal_eval(vad_time)
        elif isinstance(vad_time, list):
            vad_time_list = vad_time
        else:
            raise ValueError('Incorrect vad result. Get %s' % (type(vad_time)))
        vad_segments = []
        for t in vad_time_list:
            # VAD boundaries are divided by 1000 to obtain seconds
            # (presumably they are reported in milliseconds).
            st = int(t[0]) / 1000
            ed = int(t[1]) / 1000
            vad_segments.append(
                [st, ed, audio[int(st * self.fs):int(ed * self.fs)]])
        return vad_segments

    def check_audio_list(self, audio: list):
        """Validate a list of [start(s), end(s), np.ndarray] segments.

        Raises:
            AssertionError: if a segment ends before it starts or starts
                before the previous one ends, if its data is not an
                np.ndarray, if its sample count disagrees with its timestamps
                at `self.fs`, or if the total duration is not above 5 s.
        """
        audio_dur = 0
        for i, seg in enumerate(audio):
            assert seg[1] >= seg[0], 'modelscope error: Wrong time stamps.'
            assert isinstance(seg[2],
                              np.ndarray), 'modelscope error: Wrong data type.'
            assert int(seg[1] * self.fs) - int(
                seg[0] * self.fs
            ) == seg[2].shape[
                0], 'modelscope error: audio data in list is inconsistent with time length.'
            if i > 0:
                assert seg[0] >= audio[
                    i - 1][1], 'modelscope error: Wrong time stamps.'
            audio_dur += seg[1] - seg[0]
        assert audio_dur > 5, 'modelscope error: The effective audio duration is too short.'

    def chunk(self, vad_segments: list) -> list:
        """Cut every VAD segment into fixed-length, overlapping chunks.

        Chunks are `seg_dur` seconds long and start every `seg_shift`
        seconds. The final chunk of a segment is moved back so that it ends
        exactly at the segment end. A segment shorter than `seg_dur` yields
        one zero-padded chunk whose timestamps cover only the real audio.

        Returns:
            list: [start(s), end(s), np.ndarray] chunks in input order.
        """
        chunk_len = int(self.config['seg_dur'] * self.fs)
        chunk_shift = int(self.config['seg_shift'] * self.fs)

        def seg_chunk(seg_data):
            seg_st = seg_data[0]
            data = seg_data[2]
            last_chunk_ed = 0
            seg_res = []
            for chunk_st in range(0, data.shape[0], chunk_shift):
                chunk_ed = min(chunk_st + chunk_len, data.shape[0])
                if chunk_ed <= last_chunk_ed:
                    # The previous chunk already reached the segment end.
                    break
                last_chunk_ed = chunk_ed
                chunk_st = max(0, chunk_ed - chunk_len)
                chunk_data = data[chunk_st:chunk_ed]
                if chunk_data.shape[0] < chunk_len:
                    chunk_data = np.pad(chunk_data,
                                        (0, chunk_len - chunk_data.shape[0]),
                                        'constant')
                seg_res.append([
                    chunk_st / self.fs + seg_st, chunk_ed / self.fs + seg_st,
                    chunk_data
                ])
            return seg_res

        segs = []
        for seg in vad_segments:
            segs.extend(seg_chunk(seg))
        return segs

    def cut_audio(self, cut_st: float, cut_ed: float,
                  audio: Union[np.ndarray, list]) -> np.ndarray:
        """Collect the audio samples between `cut_st` and `cut_ed` (seconds).

        Args:
            cut_st (float): range start in seconds.
            cut_ed (float): range end in seconds.
            audio: either a full signal (np.ndarray), which is sliced
                directly at `self.fs`, or a sorted list of
                [start(s), end(s), np.ndarray] segments. For a list, the
                covered parts of the overlapping segments are concatenated
                and the gaps between segments are skipped.

        Raises:
            ValueError: if `audio` has an unsupported type, or if the range
                does not overlap any segment of a list input.
        """
        if isinstance(audio, np.ndarray):
            return audio[int(cut_st * self.fs):int(cut_ed * self.fs)]
        if not isinstance(audio, list):
            raise ValueError('modelscope error: Wrong audio format.')
        st_i = ed_i = None
        last = len(audio) - 1
        for i in range(len(audio)):
            # st_i: first segment ending after cut_st (cut_st may lie in the
            # gap before it).
            if i == 0:
                if cut_st < audio[i][1]:
                    st_i = i
            elif audio[i - 1][1] <= cut_st < audio[i][1]:
                st_i = i
            # ed_i: last segment starting before cut_ed.
            if i == last:
                if cut_ed > audio[i][0]:
                    ed_i = i
            elif audio[i][0] < cut_ed <= audio[i + 1][0]:
                ed_i = i
        if st_i is None or ed_i is None or ed_i < st_i:
            raise ValueError(
                'modelscope error: The cut range does not overlap the audio segments.'
            )
        cut_data = []
        for s_st, s_ed, data in audio[st_i:ed_i + 1]:
            cut_data.append(
                data[int((max(cut_st, s_st) - s_st)
                         * self.fs):int((min(cut_ed, s_ed) - s_st)
                                        * self.fs)])
        return np.concatenate(cut_data)

    def correct_labels(self, labels):
        """Renumber labels 0, 1, 2, ... in order of first appearance."""
        id2id = {}
        # len(id2id) is evaluated before insertion, so a new label receives
        # the next unused id.
        return np.array(
            [id2id.setdefault(label, len(id2id)) for label in labels])

    def merge_seque(self, distribute_res):
        """Merge consecutive same-label entries that touch or overlap.

        Entries are [start, end, label] lists. When an entry is merged, the
        kept entry's end is replaced in place by the merged entry's end. An
        empty input returns [].
        """
        if not distribute_res:
            return []
        res = [distribute_res[0]]
        for seg in distribute_res[1:]:
            if seg[2] != res[-1][2] or seg[0] > res[-1][1]:
                res.append(seg)
            else:
                res[-1][1] = seg[1]
        return res

    def smooth(self, res, mindur=1):
        """Round times to 2 decimals and relabel entries shorter than `mindur`.

        A short entry takes the label of the neighbour with the smaller gap
        (ties go to the previous one), or of its only neighbour when it is
        the first or last entry. A lone entry keeps its label. Same-label
        neighbours are merged afterwards. Entries are modified in place.
        """
        for i in range(len(res)):
            res[i][0] = round(res[i][0], 2)
            res[i][1] = round(res[i][1], 2)
            if res[i][1] - res[i][0] >= mindur or len(res) == 1:
                continue
            # short segments are assigned to nearest speakers.
            if i == 0:
                res[i][2] = res[i + 1][2]
            elif i == len(res) - 1:
                res[i][2] = res[i - 1][2]
            elif res[i][0] - res[i - 1][1] <= res[i + 1][0] - res[i][1]:
                res[i][2] = res[i - 1][2]
            else:
                res[i][2] = res[i + 1][2]
        # merge the speakers
        return self.merge_seque(res)
|