| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353 |
- # coding=utf-8
- # Copyright 2021 The Fairseq Authors and the HuggingFace Inc. team. All rights reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """PyTorch Wav2Vec2 model."""
- import math
- import warnings
- from dataclasses import dataclass
- from typing import Callable, Optional, Union
- import numpy as np
- import torch
- from safetensors.torch import load_file as safe_load_file
- from torch import nn
- from torch.nn import CrossEntropyLoss
- from ...activations import ACT2FN
- from ...integrations.deepspeed import is_deepspeed_zero3_enabled
- from ...integrations.fsdp import is_fsdp_managed_module
- from ...modeling_attn_mask_utils import (
- _prepare_4d_attention_mask,
- _prepare_4d_attention_mask_for_sdpa,
- )
- from ...modeling_flash_attention_utils import FlashAttentionKwargs
- from ...modeling_layers import GradientCheckpointingLayer
- from ...modeling_outputs import (
- BaseModelOutput,
- CausalLMOutput,
- MaskedLMOutput,
- SequenceClassifierOutput,
- TokenClassifierOutput,
- Wav2Vec2BaseModelOutput,
- XVectorOutput,
- )
- from ...modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel
- from ...processing_utils import Unpack
- from ...utils import (
- ModelOutput,
- auto_docstring,
- cached_file,
- check_torch_load_is_safe,
- is_peft_available,
- is_torch_flex_attn_available,
- logging,
- )
- from .configuration_wav2vec2 import Wav2Vec2Config
- WAV2VEC2_ADAPTER_PT_FILE = "adapter.{}.bin"
- WAV2VEC2_ADAPTER_SAFE_FILE = "adapter.{}.safetensors"
- if is_torch_flex_attn_available():
- from ...integrations.flex_attention import make_flex_block_causal_mask
- logger = logging.get_logger(__name__)
- _HIDDEN_STATES_START_POSITION = 2
- @dataclass
- @auto_docstring(
- custom_intro="""
- Output type of [`Wav2Vec2ForPreTraining`], with potential hidden states and attentions.
- """
- )
- class Wav2Vec2ForPreTrainingOutput(ModelOutput):
- r"""
- loss (*optional*, returned when `sample_negative_indices` are passed, `torch.FloatTensor` of shape `(1,)`):
- Total loss as the sum of the contrastive loss (L_m) and the diversity loss (L_d) as stated in the [official
- paper](https://huggingface.co/papers/2006.11477).
- projected_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.proj_codevector_dim)`):
- Hidden-states of the model projected to *config.proj_codevector_dim* that can be used to predict the masked
- projected quantized states.
- projected_quantized_states (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.proj_codevector_dim)`):
- Quantized extracted feature vectors projected to *config.proj_codevector_dim* representing the positive
- target vectors for contrastive loss.
- codevector_perplexity (`torch.FloatTensor` of shape `(1,)`):
- The perplexity of the codevector distribution, used to measure the diversity of the codebook.
- contrastive_loss (*optional*, returned when `sample_negative_indices` are passed, `torch.FloatTensor` of shape `(1,)`):
- The contrastive loss (L_m) as stated in the [official paper](https://huggingface.co/papers/2006.11477).
- diversity_loss (*optional*, returned when `sample_negative_indices` are passed, `torch.FloatTensor` of shape `(1,)`):
- The diversity loss (L_d) as stated in the [official paper](https://huggingface.co/papers/2006.11477).
- """
- loss: Optional[torch.FloatTensor] = None
- projected_states: Optional[torch.FloatTensor] = None
- projected_quantized_states: Optional[torch.FloatTensor] = None
- codevector_perplexity: Optional[torch.FloatTensor] = None
- hidden_states: Optional[tuple[torch.FloatTensor]] = None
- attentions: Optional[tuple[torch.FloatTensor]] = None
- contrastive_loss: Optional[torch.FloatTensor] = None
- diversity_loss: Optional[torch.FloatTensor] = None
- def _compute_mask_indices(
- shape: tuple[int, int],
- mask_prob: float,
- mask_length: int,
- attention_mask: Optional[torch.LongTensor] = None,
- min_masks: int = 0,
- ) -> np.ndarray:
- """
- Computes random mask spans for a given shape. Used to implement [SpecAugment: A Simple Data Augmentation Method for
- ASR](https://huggingface.co/papers/1904.08779). Note that this method is not optimized to run on TPU and should be run on
- CPU as part of the preprocessing during training.
- Args:
- shape: The shape for which to compute masks. This should be of a tuple of size 2 where
- the first element is the batch size and the second element is the length of the axis to span.
- mask_prob: The percentage of the whole axis (between 0 and 1) which will be masked. The number of
- independently generated mask spans of length `mask_length` is computed by
- `mask_prob*shape[1]/mask_length`. Note that due to overlaps, `mask_prob` is an upper bound and the
- actual percentage will be smaller.
- mask_length: size of the mask
- min_masks: minimum number of masked spans
- attention_mask: A (right-padded) attention mask which independently shortens the feature axis of
- each batch dimension.
- """
- batch_size, sequence_length = shape
- if mask_length < 1:
- raise ValueError("`mask_length` has to be bigger than 0.")
- if mask_length > sequence_length:
- raise ValueError(
- f"`mask_length` has to be smaller than `sequence_length`, but got `mask_length`: {mask_length}"
- f" and `sequence_length`: {sequence_length}`"
- )
- # epsilon is used for probabilistic rounding
- epsilon = np.random.rand(1).item()
- def compute_num_masked_span(input_length):
- """Given input length, compute how many spans should be masked"""
- num_masked_span = int(mask_prob * input_length / mask_length + epsilon)
- num_masked_span = max(num_masked_span, min_masks)
- # make sure num masked span <= sequence_length
- if num_masked_span * mask_length > sequence_length:
- num_masked_span = sequence_length // mask_length
- # make sure num_masked span is also <= input_length - (mask_length - 1)
- if input_length - (mask_length - 1) < num_masked_span:
- num_masked_span = max(input_length - (mask_length - 1), 0)
- return num_masked_span
- # compute number of masked spans in batch
- input_lengths = (
- attention_mask.detach().sum(-1).tolist()
- if attention_mask is not None
- else [sequence_length for _ in range(batch_size)]
- )
- # SpecAugment mask to fill
- spec_aug_mask = np.zeros((batch_size, sequence_length), dtype=bool)
- spec_aug_mask_idxs = []
- max_num_masked_span = compute_num_masked_span(sequence_length)
- if max_num_masked_span == 0:
- return spec_aug_mask
- for input_length in input_lengths:
- # compute num of masked spans for this input
- num_masked_span = compute_num_masked_span(input_length)
- # get random indices to mask
- spec_aug_mask_idx = np.random.choice(
- np.arange(input_length - (mask_length - 1)), num_masked_span, replace=False
- )
- # pick first sampled index that will serve as a dummy index to pad vector
- # to ensure same dimension for all batches due to probabilistic rounding
- # Picking first sample just pads those vectors twice.
- if len(spec_aug_mask_idx) == 0:
- # this case can only happen if `input_length` is strictly smaller then
- # `sequence_length` in which case the last token has to be a padding
- # token which we can use as a dummy mask id
- dummy_mask_idx = sequence_length - 1
- else:
- dummy_mask_idx = spec_aug_mask_idx[0]
- spec_aug_mask_idx = np.concatenate(
- [spec_aug_mask_idx, np.ones(max_num_masked_span - num_masked_span, dtype=np.int32) * dummy_mask_idx]
- )
- spec_aug_mask_idxs.append(spec_aug_mask_idx)
- spec_aug_mask_idxs = np.array(spec_aug_mask_idxs)
- # expand masked indices to masked spans
- spec_aug_mask_idxs = np.broadcast_to(
- spec_aug_mask_idxs[:, :, None], (batch_size, max_num_masked_span, mask_length)
- )
- spec_aug_mask_idxs = spec_aug_mask_idxs.reshape(batch_size, max_num_masked_span * mask_length)
- # add offset to the starting indexes so that indexes now create a span
- offsets = np.arange(mask_length)[None, None, :]
- offsets = np.broadcast_to(offsets, (batch_size, max_num_masked_span, mask_length)).reshape(
- batch_size, max_num_masked_span * mask_length
- )
- spec_aug_mask_idxs = spec_aug_mask_idxs + offsets
- # ensure that we cannot have indices larger than sequence_length
- if spec_aug_mask_idxs.max() > sequence_length - 1:
- spec_aug_mask_idxs[spec_aug_mask_idxs > sequence_length - 1] = sequence_length - 1
- # scatter indices to mask
- np.put_along_axis(spec_aug_mask, spec_aug_mask_idxs, 1, -1)
- return spec_aug_mask
- def _sample_negative_indices(
- features_shape: tuple, num_negatives: int, mask_time_indices: Optional[np.ndarray] = None
- ):
- """
- Sample `num_negatives` vectors from feature vectors.
- """
- batch_size, sequence_length = features_shape
- # generate indices of the positive vectors themselves, repeat them `num_negatives` times
- sequence_length_range = np.arange(sequence_length)
- # get `num_negatives` random vector indices from the same utterance
- sampled_negative_indices = np.zeros(shape=(batch_size, sequence_length, num_negatives), dtype=np.int32)
- mask_time_indices = (
- mask_time_indices.astype(bool) if mask_time_indices is not None else np.ones(features_shape, dtype=bool)
- )
- for batch_idx in range(batch_size):
- high = mask_time_indices[batch_idx].sum() - 1
- mapped_masked_indices = sequence_length_range[mask_time_indices[batch_idx]]
- feature_indices = np.broadcast_to(np.arange(high + 1)[:, None], (high + 1, num_negatives))
- sampled_indices = np.random.randint(0, high, size=(high + 1, num_negatives))
- # avoid sampling the same positive vector, but keep the distribution uniform
- sampled_indices[sampled_indices >= feature_indices] += 1
- # remap to actual indices
- sampled_negative_indices[batch_idx][mask_time_indices[batch_idx]] = mapped_masked_indices[sampled_indices]
- # correct for batch size
- sampled_negative_indices[batch_idx] += batch_idx * sequence_length
- return sampled_negative_indices
- class Wav2Vec2NoLayerNormConvLayer(GradientCheckpointingLayer):
- def __init__(self, config, layer_id=0):
- super().__init__()
- self.in_conv_dim = config.conv_dim[layer_id - 1] if layer_id > 0 else 1
- self.out_conv_dim = config.conv_dim[layer_id]
- self.conv = nn.Conv1d(
- self.in_conv_dim,
- self.out_conv_dim,
- kernel_size=config.conv_kernel[layer_id],
- stride=config.conv_stride[layer_id],
- bias=config.conv_bias,
- )
- self.activation = ACT2FN[config.feat_extract_activation]
- def forward(self, hidden_states):
- hidden_states = self.conv(hidden_states)
- hidden_states = self.activation(hidden_states)
- return hidden_states
- class Wav2Vec2LayerNormConvLayer(GradientCheckpointingLayer):
- def __init__(self, config, layer_id=0):
- super().__init__()
- self.in_conv_dim = config.conv_dim[layer_id - 1] if layer_id > 0 else 1
- self.out_conv_dim = config.conv_dim[layer_id]
- self.conv = nn.Conv1d(
- self.in_conv_dim,
- self.out_conv_dim,
- kernel_size=config.conv_kernel[layer_id],
- stride=config.conv_stride[layer_id],
- bias=config.conv_bias,
- )
- self.layer_norm = nn.LayerNorm(self.out_conv_dim, elementwise_affine=True)
- self.activation = ACT2FN[config.feat_extract_activation]
- def forward(self, hidden_states):
- hidden_states = self.conv(hidden_states)
- hidden_states = hidden_states.transpose(-2, -1)
- hidden_states = self.layer_norm(hidden_states)
- hidden_states = hidden_states.transpose(-2, -1)
- hidden_states = self.activation(hidden_states)
- return hidden_states
- class Wav2Vec2GroupNormConvLayer(GradientCheckpointingLayer):
- def __init__(self, config, layer_id=0):
- super().__init__()
- self.in_conv_dim = config.conv_dim[layer_id - 1] if layer_id > 0 else 1
- self.out_conv_dim = config.conv_dim[layer_id]
- self.conv = nn.Conv1d(
- self.in_conv_dim,
- self.out_conv_dim,
- kernel_size=config.conv_kernel[layer_id],
- stride=config.conv_stride[layer_id],
- bias=config.conv_bias,
- )
- self.activation = ACT2FN[config.feat_extract_activation]
- self.layer_norm = nn.GroupNorm(num_groups=self.out_conv_dim, num_channels=self.out_conv_dim, affine=True)
- def forward(self, hidden_states):
- hidden_states = self.conv(hidden_states)
- hidden_states = self.layer_norm(hidden_states)
- hidden_states = self.activation(hidden_states)
- return hidden_states
- class Wav2Vec2PositionalConvEmbedding(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.conv = nn.Conv1d(
- config.hidden_size,
- config.hidden_size,
- kernel_size=config.num_conv_pos_embeddings,
- padding=config.num_conv_pos_embeddings // 2,
- groups=config.num_conv_pos_embedding_groups,
- )
- weight_norm = nn.utils.weight_norm
- if hasattr(nn.utils.parametrizations, "weight_norm"):
- weight_norm = nn.utils.parametrizations.weight_norm
- if is_deepspeed_zero3_enabled():
- import deepspeed
- with deepspeed.zero.GatheredParameters(self.conv.weight, modifier_rank=0):
- self.conv = weight_norm(self.conv, name="weight", dim=2)
- if hasattr(self.conv, "parametrizations"):
- weight_g = self.conv.parametrizations.weight.original0
- weight_v = self.conv.parametrizations.weight.original1
- else:
- weight_g = self.conv.weight_g
- weight_v = self.conv.weight_v
- deepspeed.zero.register_external_parameter(self, weight_v)
- deepspeed.zero.register_external_parameter(self, weight_g)
- else:
- self.conv = weight_norm(self.conv, name="weight", dim=2)
- self.padding = Wav2Vec2SamePadLayer(config.num_conv_pos_embeddings)
- self.activation = ACT2FN[config.feat_extract_activation]
- def forward(self, hidden_states):
- hidden_states = hidden_states.transpose(1, 2)
- hidden_states = self.conv(hidden_states)
- hidden_states = self.padding(hidden_states)
- hidden_states = self.activation(hidden_states)
- hidden_states = hidden_states.transpose(1, 2)
- return hidden_states
- class Wav2Vec2SamePadLayer(nn.Module):
- def __init__(self, num_conv_pos_embeddings):
- super().__init__()
- self.num_pad_remove = 1 if num_conv_pos_embeddings % 2 == 0 else 0
- def forward(self, hidden_states):
- if self.num_pad_remove > 0:
- hidden_states = hidden_states[:, :, : -self.num_pad_remove]
- return hidden_states
- class Wav2Vec2FeatureEncoder(nn.Module):
- """Construct the features from raw audio waveform"""
- def __init__(self, config):
- super().__init__()
- if config.feat_extract_norm == "group":
- conv_layers = [Wav2Vec2GroupNormConvLayer(config, layer_id=0)] + [
- Wav2Vec2NoLayerNormConvLayer(config, layer_id=i + 1) for i in range(config.num_feat_extract_layers - 1)
- ]
- elif config.feat_extract_norm == "layer":
- conv_layers = [
- Wav2Vec2LayerNormConvLayer(config, layer_id=i) for i in range(config.num_feat_extract_layers)
- ]
- else:
- raise ValueError(
- f"`config.feat_extract_norm` is {config.feat_extract_norm}, but has to be one of ['group', 'layer']"
- )
- self.conv_layers = nn.ModuleList(conv_layers)
- self.gradient_checkpointing = False
- self._requires_grad = True
- def _freeze_parameters(self):
- for param in self.parameters():
- param.requires_grad = False
- self._requires_grad = False
- def forward(self, input_values):
- hidden_states = input_values[:, None]
- # make sure hidden_states require grad for gradient_checkpointing
- if self._requires_grad and self.training:
- hidden_states.requires_grad = True
- for conv_layer in self.conv_layers:
- hidden_states = conv_layer(hidden_states)
- return hidden_states
- class Wav2Vec2FeatureExtractor(Wav2Vec2FeatureEncoder):
- def __init__(self, config):
- super().__init__(config)
- warnings.warn(
- f"The class `{self.__class__.__name__}` has been depreciated "
- "and will be removed in Transformers v5. "
- f"Use `{self.__class__.__bases__[0].__name__}` instead.",
- FutureWarning,
- )
- class Wav2Vec2FeatureProjection(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.layer_norm = nn.LayerNorm(config.conv_dim[-1], eps=config.layer_norm_eps)
- self.projection = nn.Linear(config.conv_dim[-1], config.hidden_size)
- self.dropout = nn.Dropout(config.feat_proj_dropout)
- def forward(self, hidden_states):
- # non-projected hidden states are needed for quantization
- norm_hidden_states = self.layer_norm(hidden_states)
- hidden_states = self.projection(norm_hidden_states)
- hidden_states = self.dropout(hidden_states)
- return hidden_states, norm_hidden_states
- # Copied from transformers.models.bart.modeling_bart.eager_attention_forward
- def eager_attention_forward(
- module: nn.Module,
- query: torch.Tensor,
- key: torch.Tensor,
- value: torch.Tensor,
- attention_mask: Optional[torch.Tensor],
- scaling: Optional[float] = None,
- dropout: float = 0.0,
- head_mask: Optional[torch.Tensor] = None,
- **kwargs,
- ):
- if scaling is None:
- scaling = query.size(-1) ** -0.5
- attn_weights = torch.matmul(query, key.transpose(2, 3)) * scaling
- if attention_mask is not None:
- attn_weights = attn_weights + attention_mask
- attn_weights = nn.functional.softmax(attn_weights, dim=-1)
- if head_mask is not None:
- attn_weights = attn_weights * head_mask.view(1, -1, 1, 1)
- attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training)
- attn_output = torch.matmul(attn_weights, value)
- attn_output = attn_output.transpose(1, 2).contiguous()
- return attn_output, attn_weights
- class Wav2Vec2Attention(nn.Module):
- """Multi-headed attention from 'Attention Is All You Need' paper"""
- def __init__(
- self,
- embed_dim: int,
- num_heads: int,
- dropout: float = 0.0,
- is_decoder: bool = False,
- bias: bool = True,
- is_causal: bool = False,
- config: Optional[Wav2Vec2Config] = None,
- ):
- super().__init__()
- self.embed_dim = embed_dim
- self.num_heads = num_heads
- self.dropout = dropout
- self.head_dim = embed_dim // num_heads
- self.config = config
- if (self.head_dim * num_heads) != self.embed_dim:
- raise ValueError(
- f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim}"
- f" and `num_heads`: {num_heads})."
- )
- self.scaling = self.head_dim**-0.5
- self.is_decoder = is_decoder
- self.is_causal = is_causal
- self.k_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- self.v_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- self.q_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- self.out_proj = nn.Linear(embed_dim, embed_dim, bias=bias)
- def forward(
- self,
- hidden_states: torch.Tensor,
- key_value_states: Optional[torch.Tensor] = None,
- attention_mask: Optional[torch.Tensor] = None,
- layer_head_mask: Optional[torch.Tensor] = None,
- output_attentions: Optional[bool] = False,
- # TODO: we need a refactor so that the different attention modules can get their specific kwargs
- # ATM, we have mixed things encoder, decoder, and encoder-decoder attn
- **kwargs: Unpack[FlashAttentionKwargs],
- ) -> tuple[torch.Tensor, Optional[torch.Tensor], Optional[tuple[torch.Tensor]]]:
- """Input shape: Batch x Time x Channel"""
- # if key_value_states are provided this layer is used as a cross-attention layer
- # for the decoder
- is_cross_attention = key_value_states is not None
- # determine input shapes
- bsz, tgt_len = hidden_states.shape[:-1]
- src_len = key_value_states.shape[1] if is_cross_attention else tgt_len
- q_input_shape = (bsz, tgt_len, -1, self.head_dim)
- kv_input_shape = (bsz, src_len, -1, self.head_dim)
- # get query proj
- query_states = self.q_proj(hidden_states).view(*q_input_shape).transpose(1, 2)
- current_states = key_value_states if is_cross_attention else hidden_states
- key_states = self.k_proj(current_states).view(*kv_input_shape).transpose(1, 2)
- value_states = self.v_proj(current_states).view(*kv_input_shape).transpose(1, 2)
- attention_interface: Callable = eager_attention_forward
- if self.config._attn_implementation != "eager":
- attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]
- attn_output, attn_weights = attention_interface(
- self,
- query_states,
- key_states,
- value_states,
- attention_mask,
- dropout=0.0 if not self.training else self.dropout,
- scaling=self.scaling,
- output_attentions=output_attentions,
- head_mask=layer_head_mask,
- **kwargs,
- )
- attn_output = attn_output.reshape(bsz, tgt_len, -1).contiguous()
- attn_output = self.out_proj(attn_output)
- return attn_output, attn_weights, None
- class Wav2Vec2FeedForward(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.intermediate_dropout = nn.Dropout(config.activation_dropout)
- self.intermediate_dense = nn.Linear(config.hidden_size, config.intermediate_size)
- if isinstance(config.hidden_act, str):
- self.intermediate_act_fn = ACT2FN[config.hidden_act]
- else:
- self.intermediate_act_fn = config.hidden_act
- self.output_dense = nn.Linear(config.intermediate_size, config.hidden_size)
- self.output_dropout = nn.Dropout(config.hidden_dropout)
- def forward(self, hidden_states):
- hidden_states = self.intermediate_dense(hidden_states)
- hidden_states = self.intermediate_act_fn(hidden_states)
- hidden_states = self.intermediate_dropout(hidden_states)
- hidden_states = self.output_dense(hidden_states)
- hidden_states = self.output_dropout(hidden_states)
- return hidden_states
- class Wav2Vec2EncoderLayer(GradientCheckpointingLayer):
- def __init__(self, config):
- super().__init__()
- self.attention = Wav2Vec2Attention(
- embed_dim=config.hidden_size,
- num_heads=config.num_attention_heads,
- dropout=config.attention_dropout,
- is_decoder=False,
- config=config,
- )
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.feed_forward = Wav2Vec2FeedForward(config)
- self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- def forward(self, hidden_states, attention_mask=None, output_attentions=False):
- attn_residual = hidden_states
- hidden_states, attn_weights, _ = self.attention(
- hidden_states, attention_mask=attention_mask, output_attentions=output_attentions
- )
- hidden_states = self.dropout(hidden_states)
- hidden_states = attn_residual + hidden_states
- hidden_states = self.layer_norm(hidden_states)
- hidden_states = hidden_states + self.feed_forward(hidden_states)
- hidden_states = self.final_layer_norm(hidden_states)
- outputs = (hidden_states,)
- if output_attentions:
- outputs += (attn_weights,)
- return outputs
- class Wav2Vec2EncoderLayerStableLayerNorm(GradientCheckpointingLayer):
- def __init__(self, config):
- super().__init__()
- self.attention = Wav2Vec2Attention(
- embed_dim=config.hidden_size,
- num_heads=config.num_attention_heads,
- dropout=config.attention_dropout,
- is_decoder=False,
- config=config,
- )
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.feed_forward = Wav2Vec2FeedForward(config)
- self.final_layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- if getattr(config, "adapter_attn_dim", None) is not None:
- self.adapter_layer = Wav2Vec2AttnAdapterLayer(config)
- else:
- self.adapter_layer = None
- def forward(
- self,
- hidden_states: torch.Tensor,
- attention_mask: Optional[torch.Tensor] = None,
- output_attentions: bool = False,
- ):
- attn_residual = hidden_states
- hidden_states = self.layer_norm(hidden_states)
- hidden_states, attn_weights, _ = self.attention(
- hidden_states, attention_mask=attention_mask, output_attentions=output_attentions
- )
- hidden_states = self.dropout(hidden_states)
- hidden_states = attn_residual + hidden_states
- hidden_states = hidden_states + self.feed_forward(self.final_layer_norm(hidden_states))
- if self.adapter_layer is not None:
- hidden_states = hidden_states + self.adapter_layer(hidden_states)
- outputs = (hidden_states,)
- if output_attentions:
- outputs += (attn_weights,)
- return outputs
- class Wav2Vec2Encoder(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.config = config
- self.pos_conv_embed = Wav2Vec2PositionalConvEmbedding(config)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layers = nn.ModuleList([Wav2Vec2EncoderLayer(config) for _ in range(config.num_hidden_layers)])
- self.gradient_checkpointing = False
- def forward(
- self,
- hidden_states: torch.tensor,
- attention_mask: Optional[torch.Tensor] = None,
- output_attentions: bool = False,
- output_hidden_states: bool = False,
- return_dict: bool = True,
- ):
- all_hidden_states = () if output_hidden_states else None
- all_self_attentions = () if output_attentions else None
- if attention_mask is not None:
- # make sure padded tokens output 0
- expand_attention_mask = attention_mask.unsqueeze(-1).repeat(1, 1, hidden_states.shape[2])
- hidden_states[~expand_attention_mask] = 0
- attention_mask = self._update_full_mask(
- attention_mask,
- hidden_states,
- )
- position_embeddings = self.pos_conv_embed(hidden_states)
- hidden_states = hidden_states + position_embeddings
- hidden_states = self.layer_norm(hidden_states)
- hidden_states = self.dropout(hidden_states)
- synced_gpus = is_deepspeed_zero3_enabled() or is_fsdp_managed_module(self)
- for layer in self.layers:
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- # add LayerDrop (see https://huggingface.co/papers/1909.11556 for description)
- dropout_probability = torch.rand([])
- skip_the_layer = self.training and dropout_probability < self.config.layerdrop
- if not skip_the_layer or synced_gpus:
- # under fsdp or deepspeed zero3 all gpus must run in sync
- layer_outputs = layer(
- hidden_states, attention_mask=attention_mask, output_attentions=output_attentions
- )
- hidden_states = layer_outputs[0]
- if skip_the_layer:
- layer_outputs = (None, None)
- if output_attentions:
- all_self_attentions = all_self_attentions + (layer_outputs[1],)
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- if not return_dict:
- return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None)
- return BaseModelOutput(
- last_hidden_state=hidden_states,
- hidden_states=all_hidden_states,
- attentions=all_self_attentions,
- )
- # Copied from transformers.models.bart.modeling_bart.BartPreTrainedModel._update_full_mask
- def _update_full_mask(
- self,
- attention_mask: Union[torch.Tensor, None],
- inputs_embeds: torch.Tensor,
- ):
- if attention_mask is not None:
- if self.config._attn_implementation == "flash_attention_2":
- attention_mask = attention_mask if 0 in attention_mask else None
- elif self.config._attn_implementation == "sdpa":
- # output_attentions=True & head_mask can not be supported when using SDPA, fall back to
- # the manual implementation that requires a 4D causal mask in all cases.
- # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
- attention_mask = _prepare_4d_attention_mask_for_sdpa(attention_mask, inputs_embeds.dtype)
- elif self.config._attn_implementation == "flex_attention":
- if isinstance(attention_mask, torch.Tensor):
- attention_mask = make_flex_block_causal_mask(attention_mask, is_causal=False)
- else:
- # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
- attention_mask = _prepare_4d_attention_mask(attention_mask, inputs_embeds.dtype)
- return attention_mask
- class Wav2Vec2EncoderStableLayerNorm(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.config = config
- self.pos_conv_embed = Wav2Vec2PositionalConvEmbedding(config)
- self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
- self.dropout = nn.Dropout(config.hidden_dropout)
- self.layers = nn.ModuleList(
- [Wav2Vec2EncoderLayerStableLayerNorm(config) for _ in range(config.num_hidden_layers)]
- )
- self.gradient_checkpointing = False
- def forward(
- self,
- hidden_states,
- attention_mask=None,
- output_attentions=False,
- output_hidden_states=False,
- return_dict=True,
- ):
- all_hidden_states = () if output_hidden_states else None
- all_self_attentions = () if output_attentions else None
- if attention_mask is not None:
- # make sure padded tokens output 0
- expand_attention_mask = attention_mask.unsqueeze(-1).repeat(1, 1, hidden_states.shape[2])
- hidden_states[~expand_attention_mask] = 0
- attention_mask = self._update_full_mask(
- attention_mask,
- hidden_states,
- )
- position_embeddings = self.pos_conv_embed(hidden_states)
- hidden_states = hidden_states + position_embeddings
- hidden_states = self.dropout(hidden_states)
- synced_gpus = is_deepspeed_zero3_enabled() or is_fsdp_managed_module(self)
- for layer in self.layers:
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- # add LayerDrop (see https://huggingface.co/papers/1909.11556 for description)
- dropout_probability = torch.rand([])
- skip_the_layer = self.training and dropout_probability < self.config.layerdrop
- if not skip_the_layer or synced_gpus:
- # under fsdp or deepspeed zero3 all gpus must run in sync
- # XXX: could optimize this like synced_gpus in generate_utils but not sure if it's worth the code complication
- layer_outputs = layer(
- hidden_states, attention_mask=attention_mask, output_attentions=output_attentions
- )
- hidden_states = layer_outputs[0]
- if skip_the_layer:
- layer_outputs = (None, None)
- if output_attentions:
- all_self_attentions = all_self_attentions + (layer_outputs[1],)
- hidden_states = self.layer_norm(hidden_states)
- if output_hidden_states:
- all_hidden_states = all_hidden_states + (hidden_states,)
- if not return_dict:
- return tuple(v for v in [hidden_states, all_hidden_states, all_self_attentions] if v is not None)
- return BaseModelOutput(
- last_hidden_state=hidden_states,
- hidden_states=all_hidden_states,
- attentions=all_self_attentions,
- )
- # Copied from transformers.models.bart.modeling_bart.BartPreTrainedModel._update_full_mask
- def _update_full_mask(
- self,
- attention_mask: Union[torch.Tensor, None],
- inputs_embeds: torch.Tensor,
- ):
- if attention_mask is not None:
- if self.config._attn_implementation == "flash_attention_2":
- attention_mask = attention_mask if 0 in attention_mask else None
- elif self.config._attn_implementation == "sdpa":
- # output_attentions=True & head_mask can not be supported when using SDPA, fall back to
- # the manual implementation that requires a 4D causal mask in all cases.
- # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
- attention_mask = _prepare_4d_attention_mask_for_sdpa(attention_mask, inputs_embeds.dtype)
- elif self.config._attn_implementation == "flex_attention":
- if isinstance(attention_mask, torch.Tensor):
- attention_mask = make_flex_block_causal_mask(attention_mask, is_causal=False)
- else:
- # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
- attention_mask = _prepare_4d_attention_mask(attention_mask, inputs_embeds.dtype)
- return attention_mask
- class Wav2Vec2GumbelVectorQuantizer(nn.Module):
- """
- Vector quantization using gumbel softmax. See `[CATEGORICAL REPARAMETERIZATION WITH
- GUMBEL-SOFTMAX](https://huggingface.co/papers/1611.01144) for more information.
- """
- def __init__(self, config):
- super().__init__()
- self.num_groups = config.num_codevector_groups
- self.num_vars = config.num_codevectors_per_group
- if config.codevector_dim % self.num_groups != 0:
- raise ValueError(
- f"`config.codevector_dim {config.codevector_dim} must be divisible "
- f"by `config.num_codevector_groups` {self.num_groups} for concatenation"
- )
- # storage for codebook variables (codewords)
- self.codevectors = nn.Parameter(
- torch.FloatTensor(1, self.num_groups * self.num_vars, config.codevector_dim // self.num_groups)
- )
- self.weight_proj = nn.Linear(config.conv_dim[-1], self.num_groups * self.num_vars)
- # can be decayed for training
- self.temperature = 2
- @staticmethod
- def _compute_perplexity(probs, mask=None):
- if mask is not None:
- mask_extended = mask.flatten()[:, None, None].expand(probs.shape)
- probs = torch.where(mask_extended, probs, torch.zeros_like(probs))
- marginal_probs = probs.sum(dim=0) / mask.sum()
- else:
- marginal_probs = probs.mean(dim=0)
- perplexity = torch.exp(-torch.sum(marginal_probs * torch.log(marginal_probs + 1e-7), dim=-1)).sum()
- return perplexity
- def forward(self, hidden_states, mask_time_indices=None):
- batch_size, sequence_length, hidden_size = hidden_states.shape
- # project to codevector dim
- hidden_states = self.weight_proj(hidden_states)
- hidden_states = hidden_states.view(batch_size * sequence_length * self.num_groups, -1)
- if self.training:
- # sample code vector probs via gumbel in differentiateable way
- codevector_probs = nn.functional.gumbel_softmax(
- hidden_states.float(), tau=self.temperature, hard=True
- ).type_as(hidden_states)
- # compute perplexity
- codevector_soft_dist = torch.softmax(
- hidden_states.view(batch_size * sequence_length, self.num_groups, -1).float(), dim=-1
- )
- perplexity = self._compute_perplexity(codevector_soft_dist, mask_time_indices)
- else:
- # take argmax in non-differentiable way
- # comptute hard codevector distribution (one hot)
- codevector_idx = hidden_states.argmax(dim=-1)
- codevector_probs = hidden_states.new_zeros(hidden_states.shape).scatter_(
- -1, codevector_idx.view(-1, 1), 1.0
- )
- codevector_probs = codevector_probs.view(batch_size * sequence_length, self.num_groups, -1)
- perplexity = self._compute_perplexity(codevector_probs, mask_time_indices)
- codevector_probs = codevector_probs.view(batch_size * sequence_length, -1)
- # use probs to retrieve codevectors
- codevectors_per_group = codevector_probs.unsqueeze(-1) * self.codevectors
- codevectors = codevectors_per_group.view(batch_size * sequence_length, self.num_groups, self.num_vars, -1)
- codevectors = codevectors.sum(-2).view(batch_size, sequence_length, -1)
- return codevectors, perplexity
- class Wav2Vec2Adapter(nn.Module):
- def __init__(self, config):
- super().__init__()
- # feature dim might need to be down-projected
- if config.output_hidden_size != config.hidden_size:
- self.proj = nn.Linear(config.hidden_size, config.output_hidden_size)
- self.proj_layer_norm = nn.LayerNorm(config.output_hidden_size)
- else:
- self.proj = self.proj_layer_norm = None
- self.layers = nn.ModuleList(Wav2Vec2AdapterLayer(config) for _ in range(config.num_adapter_layers))
- self.layerdrop = config.layerdrop
- def forward(self, hidden_states):
- # down project hidden_states if necessary
- if self.proj is not None and self.proj_layer_norm is not None:
- hidden_states = self.proj(hidden_states)
- hidden_states = self.proj_layer_norm(hidden_states)
- hidden_states = hidden_states.transpose(1, 2)
- for layer in self.layers:
- layerdrop_prob = np.random.random()
- if not self.training or (layerdrop_prob > self.layerdrop):
- hidden_states = layer(hidden_states)
- hidden_states = hidden_states.transpose(1, 2)
- return hidden_states
- class Wav2Vec2AdapterLayer(nn.Module):
- def __init__(self, config):
- super().__init__()
- self.conv = nn.Conv1d(
- config.output_hidden_size,
- 2 * config.output_hidden_size,
- config.adapter_kernel_size,
- stride=config.adapter_stride,
- padding=1,
- )
- def forward(self, hidden_states):
- hidden_states = self.conv(hidden_states)
- hidden_states = nn.functional.glu(hidden_states, dim=1)
- return hidden_states
- class Wav2Vec2AttnAdapterLayer(nn.Module):
- def __init__(self, config):
- """
- Implements adapter modules directly with 3D tensor weight as parameters and without using ModuleList to speed
- up training throughput.
- """
- super().__init__()
- self.input_dim = config.adapter_attn_dim
- self.hidden_dim = config.hidden_size
- self.norm = nn.LayerNorm(self.hidden_dim)
- self.linear_1 = nn.Linear(self.hidden_dim, self.input_dim)
- self.act_fn = nn.ReLU()
- self.linear_2 = nn.Linear(self.input_dim, self.hidden_dim)
- def forward(self, hidden_states: torch.FloatTensor):
- hidden_states = self.norm(hidden_states)
- hidden_states = self.linear_1(hidden_states)
- hidden_states = self.act_fn(hidden_states)
- hidden_states = self.linear_2(hidden_states)
- return hidden_states
- @auto_docstring
- class Wav2Vec2PreTrainedModel(PreTrainedModel):
- config: Wav2Vec2Config
- base_model_prefix = "wav2vec2"
- main_input_name = "input_values"
- supports_gradient_checkpointing = True
- _supports_flash_attn = True
- _supports_sdpa = True
- _supports_flex_attn = True
- def _init_weights(self, module):
- """Initialize the weights"""
- # Wav2Vec2ForPreTraining last 2 linear layers need standard Linear init.
- if isinstance(module, Wav2Vec2ForPreTraining):
- module.project_hid.reset_parameters()
- module.project_q.reset_parameters()
- module.project_hid._is_hf_initialized = True
- module.project_q._is_hf_initialized = True
- # gumbel softmax requires special init
- elif isinstance(module, Wav2Vec2GumbelVectorQuantizer):
- module.weight_proj.weight.data.normal_(mean=0.0, std=1)
- module.weight_proj.bias.data.zero_()
- nn.init.uniform_(module.codevectors)
- elif isinstance(module, Wav2Vec2PositionalConvEmbedding):
- nn.init.normal_(
- module.conv.weight,
- mean=0,
- std=2 * math.sqrt(1 / (module.conv.kernel_size[0] * module.conv.in_channels)),
- )
- nn.init.constant_(module.conv.bias, 0)
- elif isinstance(module, Wav2Vec2FeatureProjection):
- k = math.sqrt(1 / module.projection.in_features)
- nn.init.uniform_(module.projection.weight, a=-k, b=k)
- nn.init.uniform_(module.projection.bias, a=-k, b=k)
- elif isinstance(module, nn.Linear):
- module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
- if module.bias is not None:
- module.bias.data.zero_()
- elif isinstance(module, (nn.LayerNorm, nn.GroupNorm)):
- module.bias.data.zero_()
- module.weight.data.fill_(1.0)
- elif isinstance(module, nn.Conv1d):
- nn.init.kaiming_normal_(module.weight)
- if module.bias is not None:
- k = math.sqrt(module.groups / (module.in_channels * module.kernel_size[0]))
- nn.init.uniform_(module.bias, a=-k, b=k)
- def _get_feat_extract_output_lengths(
- self, input_lengths: Union[torch.LongTensor, int], add_adapter: Optional[bool] = None
- ):
- """
- Computes the output length of the convolutional layers
- """
- add_adapter = self.config.add_adapter if add_adapter is None else add_adapter
- def _conv_out_length(input_length, kernel_size, stride):
- # 1D convolutional layer output length formula taken
- # from https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html
- return torch.div(input_length - kernel_size, stride, rounding_mode="floor") + 1
- for kernel_size, stride in zip(self.config.conv_kernel, self.config.conv_stride):
- input_lengths = _conv_out_length(input_lengths, kernel_size, stride)
- if add_adapter:
- for _ in range(self.config.num_adapter_layers):
- input_lengths = _conv_out_length(input_lengths, 1, self.config.adapter_stride)
- return input_lengths
- def _get_feature_vector_attention_mask(
- self, feature_vector_length: int, attention_mask: torch.LongTensor, add_adapter=None
- ):
- # Effectively attention_mask.sum(-1), but not inplace to be able to run
- # on inference mode.
- non_padded_lengths = attention_mask.cumsum(dim=-1)[:, -1]
- output_lengths = self._get_feat_extract_output_lengths(non_padded_lengths, add_adapter=add_adapter)
- output_lengths = output_lengths.to(torch.long)
- batch_size = attention_mask.shape[0]
- attention_mask = torch.zeros(
- (batch_size, feature_vector_length), dtype=attention_mask.dtype, device=attention_mask.device
- )
- # these two operations makes sure that all values before the output lengths idxs are attended to
- attention_mask[(torch.arange(attention_mask.shape[0], device=attention_mask.device), output_lengths - 1)] = 1
- attention_mask = attention_mask.flip([-1]).cumsum(-1).flip([-1]).bool()
- return attention_mask
- def _get_adapters(self):
- if self.config.adapter_attn_dim is None:
- raise ValueError(f"{self.__class__} has no adapter layers. Make sure to define `config.adapter_attn_dim`.")
- adapter_weights = {}
- for name, module in self.named_modules():
- if isinstance(module, Wav2Vec2AttnAdapterLayer):
- for param_name, param in module.named_parameters():
- adapter_weights[".".join([name, param_name])] = param
- if isinstance(self, Wav2Vec2ForCTC):
- for name, param in self.lm_head.named_parameters():
- adapter_weights[".".join(["lm_head", name])] = param
- return adapter_weights
- def init_adapter_layers(self):
- """
- (Re-)initialize attention adapter layers and lm head for adapter-only fine-tuning
- """
- # init attention adapters
- for module in self.modules():
- if isinstance(module, Wav2Vec2AttnAdapterLayer):
- self._init_weights(module)
- # init lm head
- if isinstance(self, Wav2Vec2ForCTC):
- self._init_weights(self.lm_head)
- def load_adapter(self, target_lang: str, force_load=True, **kwargs):
- r"""
- Load a language adapter model from a pre-trained adapter model.
- Parameters:
- target_lang (`str`):
- Has to be a language id of an existing adapter weight. Adapter weights are stored in the format
- adapter.<lang>.safetensors or adapter.<lang>.bin
- force_load (`bool`, defaults to `True`):
- Whether the weights shall be loaded even if `target_lang` matches `self.target_lang`.
- cache_dir (`Union[str, os.PathLike]`, *optional*):
- Path to a directory in which a downloaded pretrained model configuration should be cached if the
- standard cache should not be used.
- force_download (`bool`, *optional*, defaults to `False`):
- Whether or not to force the (re-)download of the model weights and configuration files, overriding the
- cached versions if they exist.
- resume_download:
- Deprecated and ignored. All downloads are now resumed by default when possible.
- Will be removed in v5 of Transformers.
- proxies (`dict[str, str]`, *optional*):
- A dictionary of proxy servers to use by protocol or endpoint, e.g., `{'http': 'foo.bar:3128',
- 'http://hostname': 'foo.bar:4012'}`. The proxies are used on each request.
- local_files_only(`bool`, *optional*, defaults to `False`):
- Whether or not to only look at local files (i.e., do not try to download the model).
- token (`str` or `bool`, *optional*):
- The token to use as HTTP bearer authorization for remote files. If `True`, or not specified, will use
- the token generated when running `hf auth login` (stored in `~/.huggingface`).
- revision (`str`, *optional*, defaults to `"main"`):
- The specific model version to use. It can be a branch name, a tag name, or a commit id, since we use a
- git-based system for storing models and other artifacts on huggingface.co, so `revision` can be any
- identifier allowed by git.
- <Tip>
- To test a pull request you made on the Hub, you can pass `revision="refs/pr/<pr_number>"`.
- </Tip>
- mirror (`str`, *optional*):
- Mirror source to accelerate downloads in China. If you are from China and have an accessibility
- problem, you can set this option to resolve it. Note that we do not guarantee the timeliness or safety.
- Please refer to the mirror site for more information.
- <Tip>
- Activate the special ["offline-mode"](https://huggingface.co/transformers/installation.html#offline-mode) to
- use this method in a firewalled environment.
- </Tip>
- Examples:
- ```python
- >>> from transformers import Wav2Vec2ForCTC, AutoProcessor
- >>> ckpt = "facebook/mms-1b-all"
- >>> processor = AutoProcessor.from_pretrained(ckpt)
- >>> model = Wav2Vec2ForCTC.from_pretrained(ckpt, target_lang="eng")
- >>> # set specific language
- >>> processor.tokenizer.set_target_lang("spa")
- >>> model.load_adapter("spa")
- ```
- """
- if self.config.adapter_attn_dim is None:
- raise ValueError(f"Cannot load_adapter for {target_lang} if `config.adapter_attn_dim` is not defined.")
- if target_lang == self.target_lang and not force_load:
- logger.warning(f"Adapter weights are already set to {target_lang}.")
- return
- cache_dir = kwargs.pop("cache_dir", None)
- force_download = kwargs.pop("force_download", False)
- resume_download = kwargs.pop("resume_download", None)
- proxies = kwargs.pop("proxies", None)
- local_files_only = kwargs.pop("local_files_only", False)
- token = kwargs.pop("token", None)
- use_auth_token = kwargs.pop("use_auth_token", None)
- revision = kwargs.pop("revision", None)
- use_safetensors = kwargs.pop("use_safetensors", None)
- if use_auth_token is not None:
- warnings.warn(
- "The `use_auth_token` argument is deprecated and will be removed in v5 of Transformers. Please use `token` instead.",
- FutureWarning,
- )
- if token is not None:
- raise ValueError(
- "`token` and `use_auth_token` are both specified. Please set only the argument `token`."
- )
- token = use_auth_token
- model_path_or_id = self.config._name_or_path
- state_dict = None
- # 1. Let's first try loading a safetensors adapter weight
- if use_safetensors is not False:
- filepath = WAV2VEC2_ADAPTER_SAFE_FILE.format(target_lang)
- try:
- weight_path = cached_file(
- model_path_or_id,
- filename=filepath,
- force_download=force_download,
- resume_download=resume_download,
- proxies=proxies,
- local_files_only=local_files_only,
- token=token,
- revision=revision,
- cache_dir=cache_dir,
- )
- state_dict = safe_load_file(weight_path)
- except OSError:
- if use_safetensors:
- # Raise any environment error raise by `cached_file`. It will have a helpful error message adapted
- # to the original exception.
- raise
- except Exception:
- # For any other exception, we throw a generic error.
- if use_safetensors:
- raise OSError(
- f"Can't load the model for '{model_path_or_id}'. If you were trying to load it"
- " from 'https://huggingface.co/models', make sure you don't have a local directory with the"
- f" same name. Otherwise, make sure '{model_path_or_id}' is the correct path to a"
- f" directory containing a file named {filepath}."
- )
- # 2. If this didn't work let's try loading a PyTorch adapter weight
- if state_dict is None:
- filepath = WAV2VEC2_ADAPTER_PT_FILE.format(target_lang)
- try:
- weight_path = cached_file(
- model_path_or_id,
- filename=filepath,
- force_download=force_download,
- resume_download=resume_download,
- proxies=proxies,
- local_files_only=local_files_only,
- token=token,
- revision=revision,
- cache_dir=cache_dir,
- )
- check_torch_load_is_safe()
- state_dict = torch.load(
- weight_path,
- map_location="cpu",
- weights_only=True,
- )
- except OSError:
- # Raise any environment error raise by `cached_file`. It will have a helpful error message adapted
- # to the original exception.
- raise
- except ValueError:
- raise
- except Exception:
- # For any other exception, we throw a generic error.
- raise OSError(
- f"Can't load the model for '{model_path_or_id}'. If you were trying to load it"
- " from 'https://huggingface.co/models', make sure you don't have a local directory with the"
- f" same name. Otherwise, make sure '{model_path_or_id}' is the correct path to a"
- f" directory containing a file named {filepath}."
- )
- adapter_weights = self._get_adapters()
- unexpected_keys = set(state_dict.keys()) - set(adapter_weights.keys())
- missing_keys = set(adapter_weights.keys()) - set(state_dict.keys())
- if len(unexpected_keys) > 0:
- raise ValueError(f"The adapter weights {weight_path} has unexpected keys: {', '.join(unexpected_keys)}.")
- elif len(missing_keys) > 0:
- raise ValueError(f"The adapter weights {weight_path} has missing keys: {', '.join(missing_keys)}.")
- # make sure now vocab size is correct
- target_vocab_size = state_dict["lm_head.weight"].shape[0]
- if target_vocab_size != self.config.vocab_size:
- self.lm_head = nn.Linear(
- self.config.output_hidden_size, target_vocab_size, device=self.device, dtype=self.dtype
- )
- self.config.vocab_size = target_vocab_size
- # make sure that adapter weights are put in exactly the same precision and device placement and overwritten adapter weights
- state_dict = {k: v.to(adapter_weights[k]) for k, v in state_dict.items()}
- self.load_state_dict(state_dict, strict=False)
- # set target language correctly
- self.target_lang = target_lang
- @auto_docstring
- class Wav2Vec2Model(Wav2Vec2PreTrainedModel):
- def __init__(self, config: Wav2Vec2Config):
- super().__init__(config)
- self.config = config
- self.feature_extractor = Wav2Vec2FeatureEncoder(config)
- self.feature_projection = Wav2Vec2FeatureProjection(config)
- # model only needs masking vector if mask prob is > 0.0
- if config.mask_time_prob > 0.0 or config.mask_feature_prob > 0.0:
- self.masked_spec_embed = nn.Parameter(torch.Tensor(config.hidden_size).uniform_())
- if config.do_stable_layer_norm:
- self.encoder = Wav2Vec2EncoderStableLayerNorm(config)
- else:
- self.encoder = Wav2Vec2Encoder(config)
- self.adapter = Wav2Vec2Adapter(config) if config.add_adapter else None
- # Initialize weights and apply final processing
- self.post_init()
- def freeze_feature_extractor(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameters will
- not be updated during training.
- """
- warnings.warn(
- "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
- "Please use the equivalent `freeze_feature_encoder` method instead.",
- FutureWarning,
- )
- self.freeze_feature_encoder()
- def freeze_feature_encoder(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- self.feature_extractor._freeze_parameters()
- def _mask_hidden_states(
- self,
- hidden_states: torch.FloatTensor,
- mask_time_indices: Optional[torch.FloatTensor] = None,
- attention_mask: Optional[torch.LongTensor] = None,
- ):
- """
- Masks extracted features along time axis and/or along feature axis according to
- [SpecAugment](https://huggingface.co/papers/1904.08779).
- """
- # `config.apply_spec_augment` can set masking to False
- if not getattr(self.config, "apply_spec_augment", True):
- return hidden_states
- # generate indices & apply SpecAugment along time axis
- batch_size, sequence_length, hidden_size = hidden_states.size()
- if mask_time_indices is not None:
- # apply SpecAugment along time axis with given mask_time_indices
- hidden_states[mask_time_indices] = self.masked_spec_embed.to(hidden_states.dtype)
- elif self.config.mask_time_prob > 0 and self.training:
- mask_time_indices = _compute_mask_indices(
- (batch_size, sequence_length),
- mask_prob=self.config.mask_time_prob,
- mask_length=self.config.mask_time_length,
- attention_mask=attention_mask,
- min_masks=self.config.mask_time_min_masks,
- )
- mask_time_indices = torch.tensor(mask_time_indices, device=hidden_states.device, dtype=torch.bool)
- hidden_states[mask_time_indices] = self.masked_spec_embed.to(hidden_states.dtype)
- if self.config.mask_feature_prob > 0 and self.training:
- # generate indices & apply SpecAugment along feature axis
- mask_feature_indices = _compute_mask_indices(
- (batch_size, hidden_size),
- mask_prob=self.config.mask_feature_prob,
- mask_length=self.config.mask_feature_length,
- min_masks=self.config.mask_feature_min_masks,
- )
- mask_feature_indices = torch.tensor(mask_feature_indices, device=hidden_states.device, dtype=torch.bool)
- mask_feature_indices = mask_feature_indices[:, None].expand(-1, sequence_length, -1)
- hidden_states[mask_feature_indices] = 0
- return hidden_states
- @auto_docstring
- def forward(
- self,
- input_values: Optional[torch.Tensor],
- attention_mask: Optional[torch.Tensor] = None,
- mask_time_indices: Optional[torch.FloatTensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[tuple, Wav2Vec2BaseModelOutput]:
- r"""
- mask_time_indices (`torch.BoolTensor` of shape `(batch_size, sequence_length)`, *optional*):
- Indices to mask extracted features for contrastive loss. When in training mode, model learns to predict
- masked extracted features in *config.proj_codevector_dim* space.
- """
- output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
- output_hidden_states = (
- output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
- )
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- extract_features = self.feature_extractor(input_values)
- extract_features = extract_features.transpose(1, 2)
- if attention_mask is not None:
- # compute reduced attention_mask corresponding to feature vectors
- attention_mask = self._get_feature_vector_attention_mask(
- extract_features.shape[1], attention_mask, add_adapter=False
- )
- hidden_states, extract_features = self.feature_projection(extract_features)
- hidden_states = self._mask_hidden_states(
- hidden_states, mask_time_indices=mask_time_indices, attention_mask=attention_mask
- )
- encoder_outputs = self.encoder(
- hidden_states,
- attention_mask=attention_mask,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- hidden_states = encoder_outputs[0]
- if self.adapter is not None:
- hidden_states = self.adapter(hidden_states)
- if not return_dict:
- return (hidden_states, extract_features) + encoder_outputs[1:]
- return Wav2Vec2BaseModelOutput(
- last_hidden_state=hidden_states,
- extract_features=extract_features,
- hidden_states=encoder_outputs.hidden_states,
- attentions=encoder_outputs.attentions,
- )
- @auto_docstring(
- custom_intro="""
- Wav2Vec2 Model with a quantizer and `VQ` head on top.
- """
- )
- class Wav2Vec2ForPreTraining(Wav2Vec2PreTrainedModel):
- def __init__(self, config: Wav2Vec2Config):
- super().__init__(config)
- self.wav2vec2 = Wav2Vec2Model(config)
- self.dropout_features = nn.Dropout(config.feat_quantizer_dropout)
- self.quantizer = Wav2Vec2GumbelVectorQuantizer(config)
- self.project_hid = nn.Linear(config.hidden_size, config.proj_codevector_dim)
- self.project_q = nn.Linear(config.codevector_dim, config.proj_codevector_dim)
- # Initialize weights and apply final processing
- self.post_init()
- def set_gumbel_temperature(self, temperature: int):
- """
- Set the Gumbel softmax temperature to a given value. Only necessary for training
- """
- self.quantizer.temperature = temperature
- def freeze_feature_extractor(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameters will
- not be updated during training.
- """
- warnings.warn(
- "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
- "Please use the equivalent `freeze_feature_encoder` method instead.",
- FutureWarning,
- )
- self.freeze_feature_encoder()
- def freeze_feature_encoder(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- self.wav2vec2.feature_extractor._freeze_parameters()
- @staticmethod
- def compute_contrastive_logits(
- target_features: torch.FloatTensor,
- negative_features: torch.FloatTensor,
- predicted_features: torch.FloatTensor,
- temperature: int = 0.1,
- ):
- """
- Compute logits for contrastive loss based using cosine similarity as the distance measure between
- `[positive_feature, negative_features]` and `[predicted_features]`. Additionally, temperature can be applied.
- """
- target_features = torch.cat([target_features, negative_features], dim=0)
- logits = torch.cosine_similarity(predicted_features.float(), target_features.float(), dim=-1).type_as(
- target_features
- )
- # apply temperature
- logits = logits / temperature
- return logits
- @auto_docstring
- def forward(
- self,
- input_values: Optional[torch.Tensor],
- attention_mask: Optional[torch.Tensor] = None,
- mask_time_indices: Optional[torch.BoolTensor] = None,
- sampled_negative_indices: Optional[torch.BoolTensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[tuple, Wav2Vec2ForPreTrainingOutput]:
- r"""
- mask_time_indices (`torch.BoolTensor` of shape `(batch_size, sequence_length)`, *optional*):
- Indices to mask extracted features for contrastive loss. When in training mode, model learns to predict
- masked extracted features in *config.proj_codevector_dim* space.
- sampled_negative_indices (`torch.BoolTensor` of shape `(batch_size, sequence_length, num_negatives)`, *optional*):
- Indices indicating which quantized target vectors are used as negative sampled vectors in contrastive loss.
- Required input for pre-training.
- Example:
- ```python
- >>> import torch
- >>> from transformers import AutoFeatureExtractor, Wav2Vec2ForPreTraining
- >>> from transformers.models.wav2vec2.modeling_wav2vec2 import _compute_mask_indices, _sample_negative_indices
- >>> from datasets import load_dataset
- >>> feature_extractor = AutoFeatureExtractor.from_pretrained("facebook/wav2vec2-base")
- >>> model = Wav2Vec2ForPreTraining.from_pretrained("facebook/wav2vec2-base")
- >>> ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
- >>> input_values = feature_extractor(ds[0]["audio"]["array"], return_tensors="pt").input_values # Batch size 1
- >>> # compute masked indices
- >>> batch_size, raw_sequence_length = input_values.shape
- >>> sequence_length = model._get_feat_extract_output_lengths(raw_sequence_length).item()
- >>> mask_time_indices = _compute_mask_indices(
- ... shape=(batch_size, sequence_length), mask_prob=0.2, mask_length=2
- ... )
- >>> sampled_negative_indices = _sample_negative_indices(
- ... features_shape=(batch_size, sequence_length),
- ... num_negatives=model.config.num_negatives,
- ... mask_time_indices=mask_time_indices,
- ... )
- >>> mask_time_indices = torch.tensor(data=mask_time_indices, device=input_values.device, dtype=torch.long)
- >>> sampled_negative_indices = torch.tensor(
- ... data=sampled_negative_indices, device=input_values.device, dtype=torch.long
- ... )
- >>> with torch.no_grad():
- ... outputs = model(input_values, mask_time_indices=mask_time_indices)
- >>> # compute cosine similarity between predicted (=projected_states) and target (=projected_quantized_states)
- >>> cosine_sim = torch.cosine_similarity(outputs.projected_states, outputs.projected_quantized_states, dim=-1)
- >>> # show that cosine similarity is much higher than random
- >>> cosine_sim[mask_time_indices.to(torch.bool)].mean() > 0.5
- tensor(True)
- >>> # for contrastive loss training model should be put into train mode
- >>> model = model.train()
- >>> loss = model(
- ... input_values, mask_time_indices=mask_time_indices, sampled_negative_indices=sampled_negative_indices
- ... ).loss
- ```"""
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- if mask_time_indices is not None:
- mask_time_indices = mask_time_indices.to(torch.bool)
- outputs = self.wav2vec2(
- input_values,
- attention_mask=attention_mask,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- mask_time_indices=mask_time_indices,
- return_dict=return_dict,
- )
- # 1. project all transformed features (including masked) to final vq dim
- transformer_features = self.project_hid(outputs[0])
- # 2. quantize all (unmasked) extracted features and project to final vq dim
- extract_features = self.dropout_features(outputs[1])
- if attention_mask is not None:
- # compute reduced attention_mask corresponding to feature vectors
- attention_mask = self._get_feature_vector_attention_mask(
- extract_features.shape[1], attention_mask, add_adapter=False
- )
- quantized_features, codevector_perplexity = self.quantizer(
- extract_features, mask_time_indices=mask_time_indices
- )
- quantized_features = quantized_features.to(self.project_q.weight.dtype)
- quantized_features = self.project_q(quantized_features)
- loss = contrastive_loss = diversity_loss = None
- if sampled_negative_indices is not None:
- batch_size, sequence_length, hidden_size = quantized_features.shape
- # for training, we sample negatives
- # 3. sample K negatives (distractors) quantized states for contrastive loss
- # if attention_mask is passed, make sure that padded feature vectors cannot be sampled
- # sample negative quantized vectors BTC => (BxT)C
- negative_quantized_features = quantized_features.view(-1, hidden_size)[
- sampled_negative_indices.long().view(-1)
- ]
- negative_quantized_features = negative_quantized_features.view(
- batch_size, sequence_length, -1, hidden_size
- ).permute(2, 0, 1, 3)
- # 4. compute logits, corresponding to `logs = sim(c_t, [q_t, \sim{q}_t]) / \kappa`
- # of equation (3) in https://huggingface.co/papers/2006.11477
- logits = self.compute_contrastive_logits(
- quantized_features[None, :],
- negative_quantized_features,
- transformer_features,
- self.config.contrastive_logits_temperature,
- )
- # 5. if a negative vector is identical to the positive (i.e. when codebook utilization is low),
- # its cosine similarity will be masked
- neg_is_pos = (quantized_features == negative_quantized_features).all(-1)
- if neg_is_pos.any():
- logits[1:][neg_is_pos] = float("-inf")
- # 6. compute contrastive loss \mathbf{L}_m = cross_entropy(logs) =
- # -log(exp(sim(c_t, q_t)/\kappa) / \sum_{\sim{q}} exp(sim(c_t, \sim{q})/\kappa))
- logits = logits.transpose(0, 2).reshape(-1, logits.size(0))
- target = ((1 - mask_time_indices.long()) * -100).transpose(0, 1).flatten()
- contrastive_loss = nn.functional.cross_entropy(logits.float(), target, reduction="sum")
- # 7. compute diversity loss: \mathbf{L}_d
- num_codevectors = self.config.num_codevectors_per_group * self.config.num_codevector_groups
- diversity_loss = ((num_codevectors - codevector_perplexity) / num_codevectors) * mask_time_indices.sum()
- # 8. \mathbf{L} = \mathbf{L}_m + \alpha * \mathbf{L}_d
- loss = contrastive_loss + self.config.diversity_loss_weight * diversity_loss
- if not return_dict:
- if loss is not None:
- return (loss, transformer_features, quantized_features, codevector_perplexity) + outputs[2:]
- return (transformer_features, quantized_features, codevector_perplexity) + outputs[2:]
- return Wav2Vec2ForPreTrainingOutput(
- loss=loss,
- projected_states=transformer_features,
- projected_quantized_states=quantized_features,
- codevector_perplexity=codevector_perplexity,
- hidden_states=outputs.hidden_states,
- attentions=outputs.attentions,
- contrastive_loss=contrastive_loss,
- diversity_loss=diversity_loss,
- )
- @auto_docstring
- class Wav2Vec2ForMaskedLM(Wav2Vec2PreTrainedModel):
- def __init__(self, config):
- super().__init__(config)
- warnings.warn(
- "The class `Wav2Vec2ForMaskedLM` is deprecated. Please use `Wav2Vec2ForCTC` instead.", FutureWarning
- )
- self.wav2vec2 = Wav2Vec2Model(config)
- self.dropout = nn.Dropout(config.final_dropout)
- self.lm_head = nn.Linear(config.hidden_size, config.vocab_size)
- # Initialize weights and apply final processing
- self.post_init()
- @auto_docstring
- def forward(
- self,
- input_values: torch.FloatTensor,
- attention_mask: Optional[torch.LongTensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- labels: Optional[torch.Tensor] = None,
- ) -> Union[tuple, MaskedLMOutput]:
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- outputs = self.wav2vec2(
- input_values,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- hidden_states = outputs[0]
- hidden_states = self.dropout(hidden_states)
- logits = self.lm_head(hidden_states)
- if not return_dict:
- output = (logits,) + outputs[2:]
- return output
- return MaskedLMOutput(logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions)
- @auto_docstring(
- custom_intro="""
- Wav2Vec2 Model with a `language modeling` head on top for Connectionist Temporal Classification (CTC).
- """
- )
- class Wav2Vec2ForCTC(Wav2Vec2PreTrainedModel):
- def __init__(self, config, target_lang: Optional[str] = None):
- r"""
- target_lang (`str`, *optional*):
- Language id of adapter weights. Adapter weights are stored in the format adapter.<lang>.safetensors or
- adapter.<lang>.bin. Only relevant when using an instance of [`Wav2Vec2ForCTC`] with adapters. Uses 'eng' by
- default.
- """
- super().__init__(config)
- self.wav2vec2 = Wav2Vec2Model(config)
- self.dropout = nn.Dropout(config.final_dropout)
- self.target_lang = target_lang
- if config.vocab_size is None:
- raise ValueError(
- f"You are trying to instantiate {self.__class__} with a configuration that "
- "does not define the vocabulary size of the language model head. Please "
- "instantiate the model as follows: `Wav2Vec2ForCTC.from_pretrained(..., vocab_size=vocab_size)`. "
- "or define `vocab_size` of your model's configuration."
- )
- output_hidden_size = (
- config.output_hidden_size if hasattr(config, "add_adapter") and config.add_adapter else config.hidden_size
- )
- self.lm_head = nn.Linear(output_hidden_size, config.vocab_size)
- # Initialize weights and apply final processing
- self.post_init()
- def tie_weights(self):
- """
- This method overwrites [`~PreTrainedModel.tie_weights`] so that adapter weights can be correctly loaded when
- passing `target_lang=...` to `from_pretrained(...)`.
- This method is **not** supposed to be called by the user and is prone to be changed in the future.
- """
- # Note that `tie_weights` is usually used to tie input and output embedding weights. The method is re-purposed to
- # correctly load adapter layers for Wav2Vec2 so that we do not have to introduce a new API to
- # [`PreTrainedModel`]. While slightly hacky, Wav2Vec2 never has to tie input and output embeddings, so that it is
- # ok to repurpose this function here.
- target_lang = self.target_lang
- if target_lang is not None and getattr(self.config, "adapter_attn_dim", None) is None:
- raise ValueError(f"Cannot pass `target_lang`: {target_lang} if `config.adapter_attn_dim` is not defined.")
- elif target_lang is None and getattr(self.config, "adapter_attn_dim", None) is not None:
- logger.info("By default `target_lang` is set to 'eng'.")
- elif target_lang is not None:
- self.load_adapter(target_lang, force_load=True)
- def freeze_feature_extractor(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- warnings.warn(
- "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
- "Please use the equivalent `freeze_feature_encoder` method instead.",
- FutureWarning,
- )
- self.freeze_feature_encoder()
- def freeze_feature_encoder(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- self.wav2vec2.feature_extractor._freeze_parameters()
- def freeze_base_model(self):
- """
- Calling this function will disable the gradient computation for the base model so that its parameters will not
- be updated during training. Only the classification head will be updated.
- """
- for param in self.wav2vec2.parameters():
- param.requires_grad = False
- @auto_docstring
- def forward(
- self,
- input_values: Optional[torch.Tensor],
- attention_mask: Optional[torch.Tensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- labels: Optional[torch.Tensor] = None,
- ) -> Union[tuple, CausalLMOutput]:
- r"""
- labels (`torch.LongTensor` of shape `(batch_size, target_length)`, *optional*):
- Labels for connectionist temporal classification. Note that `target_length` has to be smaller or equal to
- the sequence length of the output logits. Indices are selected in `[-100, 0, ..., config.vocab_size - 1]`.
- All labels set to `-100` are ignored (masked), the loss is only computed for labels in `[0, ...,
- config.vocab_size - 1]`.
- """
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- if labels is not None and labels.max() >= self.config.vocab_size:
- raise ValueError(f"Label values must be <= vocab_size: {self.config.vocab_size}")
- outputs = self.wav2vec2(
- input_values,
- attention_mask=attention_mask,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- hidden_states = outputs[0]
- hidden_states = self.dropout(hidden_states)
- logits = self.lm_head(hidden_states)
- loss = None
- if labels is not None:
- # retrieve loss input_lengths from attention_mask
- attention_mask = (
- attention_mask if attention_mask is not None else torch.ones_like(input_values, dtype=torch.long)
- )
- input_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(-1)).to(torch.long)
- # assuming that padded tokens are filled with -100
- # when not being attended to
- labels_mask = labels >= 0
- target_lengths = labels_mask.sum(-1)
- flattened_targets = labels.masked_select(labels_mask)
- # ctc_loss doesn't support fp16
- log_probs = nn.functional.log_softmax(logits, dim=-1, dtype=torch.float32).transpose(0, 1)
- with torch.backends.cudnn.flags(enabled=False):
- loss = nn.functional.ctc_loss(
- log_probs,
- flattened_targets,
- input_lengths,
- target_lengths,
- blank=self.config.pad_token_id,
- reduction=self.config.ctc_loss_reduction,
- zero_infinity=self.config.ctc_zero_infinity,
- )
- if not return_dict:
- output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:]
- return ((loss,) + output) if loss is not None else output
- return CausalLMOutput(
- loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions
- )
- @auto_docstring(
- custom_intro="""
- Wav2Vec2 Model with a sequence classification head on top (a linear layer over the pooled output) for tasks like
- SUPERB Keyword Spotting.
- """
- )
- class Wav2Vec2ForSequenceClassification(Wav2Vec2PreTrainedModel):
- def __init__(self, config):
- super().__init__(config)
- if hasattr(config, "add_adapter") and config.add_adapter:
- raise ValueError(
- "Sequence classification does not support the use of Wav2Vec2 adapters (config.add_adapter=True)"
- )
- self.wav2vec2 = Wav2Vec2Model(config)
- num_layers = config.num_hidden_layers + 1 # transformer layers + input embeddings
- if config.use_weighted_layer_sum:
- self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers)
- self.projector = nn.Linear(config.hidden_size, config.classifier_proj_size)
- self.classifier = nn.Linear(config.classifier_proj_size, config.num_labels)
- # Initialize weights and apply final processing
- self.post_init()
- def freeze_feature_extractor(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameters will
- not be updated during training.
- """
- warnings.warn(
- "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
- "Please use the equivalent `freeze_feature_encoder` method instead.",
- FutureWarning,
- )
- self.freeze_feature_encoder()
- def freeze_feature_encoder(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- self.wav2vec2.feature_extractor._freeze_parameters()
- def freeze_base_model(self):
- """
- Calling this function will disable the gradient computation for the base model so that its parameters will not
- be updated during training. Only the classification head will be updated.
- """
- for param in self.wav2vec2.parameters():
- param.requires_grad = False
- @auto_docstring
- def forward(
- self,
- input_values: Optional[torch.Tensor],
- attention_mask: Optional[torch.Tensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- labels: Optional[torch.Tensor] = None,
- ) -> Union[tuple, SequenceClassifierOutput]:
- r"""
- input_values (`torch.FloatTensor` of shape `(batch_size, sequence_length)`):
- Float values of input raw speech waveform. Values can be obtained by loading a `.flac` or `.wav` audio file
- into an array of type `list[float]`, a `numpy.ndarray` or a `torch.Tensor`, *e.g.* via the torchcodec library
- (`pip install torchcodec`) or the soundfile library (`pip install soundfile`).
- To prepare the array into `input_values`, the [`AutoProcessor`] should be used for padding and conversion
- into a tensor of type `torch.FloatTensor`. See [`Wav2Vec2Processor.__call__`] for details.
- labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
- Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
- config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
- `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
- """
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- output_hidden_states = True if self.config.use_weighted_layer_sum else output_hidden_states
- outputs = self.wav2vec2(
- input_values,
- attention_mask=attention_mask,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- if self.config.use_weighted_layer_sum:
- hidden_states = outputs[_HIDDEN_STATES_START_POSITION]
- hidden_states = torch.stack(hidden_states, dim=1)
- norm_weights = nn.functional.softmax(self.layer_weights, dim=-1)
- hidden_states = (hidden_states * norm_weights.view(-1, 1, 1)).sum(dim=1)
- else:
- hidden_states = outputs[0]
- hidden_states = self.projector(hidden_states)
- if attention_mask is None:
- pooled_output = hidden_states.mean(dim=1)
- else:
- padding_mask = self._get_feature_vector_attention_mask(hidden_states.shape[1], attention_mask)
- expand_padding_mask = padding_mask.unsqueeze(-1).repeat(1, 1, hidden_states.shape[2])
- hidden_states[~expand_padding_mask] = 0.0
- pooled_output = hidden_states.sum(dim=1) / padding_mask.sum(dim=1).view(-1, 1)
- logits = self.classifier(pooled_output)
- loss = None
- if labels is not None:
- loss_fct = CrossEntropyLoss()
- loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1))
- if not return_dict:
- output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:]
- return ((loss,) + output) if loss is not None else output
- return SequenceClassifierOutput(
- loss=loss,
- logits=logits,
- hidden_states=outputs.hidden_states,
- attentions=outputs.attentions,
- )
- @auto_docstring
- class Wav2Vec2ForAudioFrameClassification(Wav2Vec2PreTrainedModel):
- def __init__(self, config):
- super().__init__(config)
- if hasattr(config, "add_adapter") and config.add_adapter:
- raise ValueError(
- "Audio frame classification does not support the use of Wav2Vec2 adapters (config.add_adapter=True)"
- )
- self.wav2vec2 = Wav2Vec2Model(config)
- num_layers = config.num_hidden_layers + 1 # transformer layers + input embeddings
- if config.use_weighted_layer_sum:
- self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers)
- self.classifier = nn.Linear(config.hidden_size, config.num_labels)
- self.num_labels = config.num_labels
- self.init_weights()
- def freeze_feature_extractor(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- warnings.warn(
- "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
- "Please use the equivalent `freeze_feature_encoder` method instead.",
- FutureWarning,
- )
- self.freeze_feature_encoder()
- def freeze_feature_encoder(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- self.wav2vec2.feature_extractor._freeze_parameters()
- def freeze_base_model(self):
- """
- Calling this function will disable the gradient computation for the base model so that its parameters will not
- be updated during training. Only the classification head will be updated.
- """
- for param in self.wav2vec2.parameters():
- param.requires_grad = False
- @auto_docstring
- def forward(
- self,
- input_values: Optional[torch.Tensor],
- attention_mask: Optional[torch.Tensor] = None,
- labels: Optional[torch.Tensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- ) -> Union[tuple, TokenClassifierOutput]:
- r"""
- input_values (`torch.FloatTensor` of shape `(batch_size, sequence_length)`):
- Float values of input raw speech waveform. Values can be obtained by loading a `.flac` or `.wav` audio file
- into an array of type `list[float]`, a `numpy.ndarray` or a `torch.Tensor`, *e.g.* via the torchcodec library
- (`pip install torchcodec`) or the soundfile library (`pip install soundfile`).
- To prepare the array into `input_values`, the [`AutoProcessor`] should be used for padding and conversion
- into a tensor of type `torch.FloatTensor`. See [`Wav2Vec2Processor.__call__`] for details.
- labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
- Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
- config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
- `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
- """
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- output_hidden_states = True if self.config.use_weighted_layer_sum else output_hidden_states
- outputs = self.wav2vec2(
- input_values,
- attention_mask=attention_mask,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- if self.config.use_weighted_layer_sum:
- hidden_states = outputs[_HIDDEN_STATES_START_POSITION]
- hidden_states = torch.stack(hidden_states, dim=1)
- norm_weights = nn.functional.softmax(self.layer_weights, dim=-1)
- hidden_states = (hidden_states * norm_weights.view(-1, 1, 1)).sum(dim=1)
- else:
- hidden_states = outputs[0]
- logits = self.classifier(hidden_states)
- loss = None
- if labels is not None:
- loss_fct = CrossEntropyLoss()
- loss = loss_fct(logits.view(-1, self.num_labels), torch.argmax(labels.view(-1, self.num_labels), axis=1))
- if not return_dict:
- output = (logits,) + outputs[_HIDDEN_STATES_START_POSITION:]
- return output
- return TokenClassifierOutput(
- loss=loss,
- logits=logits,
- hidden_states=outputs.hidden_states,
- attentions=outputs.attentions,
- )
- class AMSoftmaxLoss(nn.Module):
- def __init__(self, input_dim, num_labels, scale=30.0, margin=0.4):
- super().__init__()
- self.scale = scale
- self.margin = margin
- self.num_labels = num_labels
- self.weight = nn.Parameter(torch.randn(input_dim, num_labels), requires_grad=True)
- self.loss = nn.CrossEntropyLoss()
- def forward(self, hidden_states, labels):
- labels = labels.flatten()
- weight = nn.functional.normalize(self.weight, dim=0)
- hidden_states = nn.functional.normalize(hidden_states, dim=1)
- cos_theta = torch.mm(hidden_states, weight)
- psi = cos_theta - self.margin
- onehot = nn.functional.one_hot(labels, self.num_labels)
- logits = self.scale * torch.where(onehot.bool(), psi, cos_theta)
- loss = self.loss(logits, labels)
- return loss
- class TDNNLayer(nn.Module):
- def __init__(self, config, layer_id=0):
- super().__init__()
- self.in_conv_dim = config.tdnn_dim[layer_id - 1] if layer_id > 0 else config.tdnn_dim[layer_id]
- self.out_conv_dim = config.tdnn_dim[layer_id]
- self.kernel_size = config.tdnn_kernel[layer_id]
- self.dilation = config.tdnn_dilation[layer_id]
- self.kernel = nn.Linear(self.in_conv_dim * self.kernel_size, self.out_conv_dim)
- self.activation = nn.ReLU()
- def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
- if is_peft_available():
- from peft.tuners.lora import LoraLayer
- if is_peft_available():
- if isinstance(self.kernel, LoraLayer):
- warnings.warn(
- "Detected LoRA on TDNNLayer. LoRA weights won't be applied due to optimization. "
- "You should exclude TDNNLayer from LoRA's target modules.",
- )
- # for backward compatibility, we keep nn.Linear but call F.conv1d for speed up
- hidden_states = hidden_states.transpose(1, 2)
- weight = self.kernel.weight.view(self.out_conv_dim, self.kernel_size, self.in_conv_dim).transpose(1, 2)
- hidden_states = nn.functional.conv1d(hidden_states, weight, self.kernel.bias, dilation=self.dilation)
- hidden_states = hidden_states.transpose(1, 2)
- hidden_states = self.activation(hidden_states)
- return hidden_states
- @auto_docstring(
- custom_intro="""
- Wav2Vec2 Model with an XVector feature extraction head on top for tasks like Speaker Verification.
- """
- )
- class Wav2Vec2ForXVector(Wav2Vec2PreTrainedModel):
- def __init__(self, config):
- super().__init__(config)
- self.wav2vec2 = Wav2Vec2Model(config)
- num_layers = config.num_hidden_layers + 1 # transformer layers + input embeddings
- if config.use_weighted_layer_sum:
- self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers)
- self.projector = nn.Linear(config.hidden_size, config.tdnn_dim[0])
- tdnn_layers = [TDNNLayer(config, i) for i in range(len(config.tdnn_dim))]
- self.tdnn = nn.ModuleList(tdnn_layers)
- self.feature_extractor = nn.Linear(config.tdnn_dim[-1] * 2, config.xvector_output_dim)
- self.classifier = nn.Linear(config.xvector_output_dim, config.xvector_output_dim)
- self.objective = AMSoftmaxLoss(config.xvector_output_dim, config.num_labels)
- self.init_weights()
- def freeze_feature_extractor(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- warnings.warn(
- "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
- "Please use the equivalent `freeze_feature_encoder` method instead.",
- FutureWarning,
- )
- self.freeze_feature_encoder()
- def freeze_feature_encoder(self):
- """
- Calling this function will disable the gradient computation for the feature encoder so that its parameter will
- not be updated during training.
- """
- self.wav2vec2.feature_extractor._freeze_parameters()
- def freeze_base_model(self):
- """
- Calling this function will disable the gradient computation for the base model so that its parameters will not
- be updated during training. Only the classification head will be updated.
- """
- for param in self.wav2vec2.parameters():
- param.requires_grad = False
- def _get_tdnn_output_lengths(self, input_lengths: Union[torch.LongTensor, int]):
- """
- Computes the output length of the TDNN layers
- """
- def _conv_out_length(input_length, kernel_size, stride):
- # 1D convolutional layer output length formula taken
- # from https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html
- return (input_length - kernel_size) // stride + 1
- for kernel_size in self.config.tdnn_kernel:
- input_lengths = _conv_out_length(input_lengths, kernel_size, 1)
- return input_lengths
- @auto_docstring
- def forward(
- self,
- input_values: Optional[torch.Tensor],
- attention_mask: Optional[torch.Tensor] = None,
- output_attentions: Optional[bool] = None,
- output_hidden_states: Optional[bool] = None,
- return_dict: Optional[bool] = None,
- labels: Optional[torch.Tensor] = None,
- ) -> Union[tuple, XVectorOutput]:
- r"""
- input_values (`torch.FloatTensor` of shape `(batch_size, sequence_length)`):
- Float values of input raw speech waveform. Values can be obtained by loading a `.flac` or `.wav` audio file
- into an array of type `list[float]`, a `numpy.ndarray` or a `torch.Tensor`, *e.g.* via the torchcodec library
- (`pip install torchcodec`) or the soundfile library (`pip install soundfile`).
- To prepare the array into `input_values`, the [`AutoProcessor`] should be used for padding and conversion
- into a tensor of type `torch.FloatTensor`. See [`Wav2Vec2Processor.__call__`] for details.
- labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
- Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
- config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
- `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
- """
- return_dict = return_dict if return_dict is not None else self.config.use_return_dict
- output_hidden_states = True if self.config.use_weighted_layer_sum else output_hidden_states
- outputs = self.wav2vec2(
- input_values,
- attention_mask=attention_mask,
- output_attentions=output_attentions,
- output_hidden_states=output_hidden_states,
- return_dict=return_dict,
- )
- if self.config.use_weighted_layer_sum:
- hidden_states = outputs[_HIDDEN_STATES_START_POSITION]
- hidden_states = torch.stack(hidden_states, dim=1)
- norm_weights = nn.functional.softmax(self.layer_weights, dim=-1)
- hidden_states = (hidden_states * norm_weights.view(-1, 1, 1)).sum(dim=1)
- else:
- hidden_states = outputs[0]
- hidden_states = self.projector(hidden_states)
- for tdnn_layer in self.tdnn:
- hidden_states = tdnn_layer(hidden_states)
- # Statistic Pooling
- if attention_mask is None:
- mean_features = hidden_states.mean(dim=1)
- std_features = hidden_states.std(dim=1)
- else:
- feat_extract_output_lengths = self._get_feat_extract_output_lengths(attention_mask.sum(dim=1))
- tdnn_output_lengths = self._get_tdnn_output_lengths(feat_extract_output_lengths)
- mean_features = []
- std_features = []
- for i, length in enumerate(tdnn_output_lengths):
- mean_features.append(hidden_states[i, :length].mean(dim=0))
- std_features.append(hidden_states[i, :length].std(dim=0))
- mean_features = torch.stack(mean_features)
- std_features = torch.stack(std_features)
- statistic_pooling = torch.cat([mean_features, std_features], dim=-1)
- output_embeddings = self.feature_extractor(statistic_pooling)
- logits = self.classifier(output_embeddings)
- loss = None
- if labels is not None:
- loss = self.objective(logits, labels)
- if not return_dict:
- output = (logits, output_embeddings) + outputs[_HIDDEN_STATES_START_POSITION:]
- return ((loss,) + output) if loss is not None else output
- return XVectorOutput(
- loss=loss,
- logits=logits,
- embeddings=output_embeddings,
- hidden_states=outputs.hidden_states,
- attentions=outputs.attentions,
- )
- __all__ = [
- "Wav2Vec2ForAudioFrameClassification",
- "Wav2Vec2ForCTC",
- "Wav2Vec2ForMaskedLM",
- "Wav2Vec2ForPreTraining",
- "Wav2Vec2ForSequenceClassification",
- "Wav2Vec2ForXVector",
- "Wav2Vec2Model",
- "Wav2Vec2PreTrainedModel",
- ]
|