Spaces:
Running
on
Zero
Running
on
Zero
File size: 8,352 Bytes
568e264 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
import math
from typing import Any, List, Optional, Tuple, Union
import torch
from wenet.transformer.search import DecodeResult
from wenet.utils.mask import (make_non_pad_mask, mask_finished_preds,
mask_finished_scores)
def _isChinese(ch: str):
    """Tell whether ``ch`` counts as a "Chinese" symbol for text beautifying.

    A string qualifies when it compares inside the U+4E00..U+9FFF range,
    inside the ASCII digit range '0'..'9', or is exactly '@'.  These are
    plain string comparisons, so a multi-character argument is compared
    lexicographically as a whole rather than character by character.

    Args:
        ch: the string (normally a single character) to classify.

    Returns:
        True if any of the three conditions holds, otherwise False.
    """
    in_cjk_range = '\u4e00' <= ch <= '\u9fff'
    in_digit_range = '\u0030' <= ch <= '\u0039'
    return in_cjk_range or in_digit_range or ch == '@'
def _isAllChinese(word: Union[List[Any], str]):
    """Check that every element of ``word`` passes ``_isChinese``.

    Each element has spaces and the markers '</s>', '<s>', '<unk>' and
    '<OOV>' removed before it is tested.  When ``word`` is a plain string
    the iteration is per character.

    Args:
        word: a list of token strings, or a single string.

    Returns:
        False for an empty ``word`` or as soon as one cleaned element fails
        ``_isChinese``; True otherwise.
    """
    markers = (' ', '</s>', '<s>', '<unk>', '<OOV>')
    cleaned = []
    for piece in word:
        # Strip the markers in the same fixed order for every element.
        for marker in markers:
            piece = piece.replace(marker, '')
        cleaned.append(piece)

    if not cleaned:
        return False
    return all(_isChinese(piece) for piece in cleaned)
def _isAllAlpha(word: Union[List[Any], str]):
    """Check that every element of ``word`` is alphabetic and not Chinese.

    Each element has spaces and the markers '</s>', '<s>', '<unk>' and
    '<OOV>' removed before it is tested.  A cleaned element is accepted
    when it is exactly an apostrophe, or when ``str.isalpha`` holds and
    ``_isChinese`` does not.  When ``word`` is a plain string the iteration
    is per character.

    Args:
        word: a list of token strings, or a single string.

    Returns:
        False for an empty ``word`` or as soon as one cleaned element is
        rejected; True otherwise.
    """
    markers = (' ', '</s>', '<s>', '<unk>', '<OOV>')
    cleaned = []
    for piece in word:
        for marker in markers:
            piece = piece.replace(marker, '')
        cleaned.append(piece)

    if not cleaned:
        return False

    for piece in cleaned:
        if not piece.isalpha():
            # The apostrophe is the only non-alphabetic element tolerated.
            if piece != "'":
                return False
        elif _isChinese(piece):
            # isalpha() is also true for CJK ideographs; those do not count.
            return False
    return True
def paraformer_beautify_result(tokens: List[str]) -> str:
    """Join decoded tokens into a readable string.

    '<sos>', '<eos>' and '<blank>' tokens are discarded first.  The
    remaining tokens are then handled in one of three ways:

    * all tokens pass ``_isAllChinese``: tokens are concatenated with
      their inner spaces removed;
    * all tokens pass ``_isAllAlpha``: every token is followed by a space;
    * otherwise (mixed): Chinese tokens are appended directly (removing
      the space left by an immediately preceding alphabetic word), tokens
      containing '@@' are accumulated as a word prefix with the '@@'
      removed, alphabetic tokens complete the pending word and are
      followed by a space, and anything else is appended unchanged.

    Text accumulated from '@@' tokens is only emitted when a later
    alphabetic token completes the word; a fragment left pending at the
    end is not part of the result.

    Args:
        tokens: decoded token strings.

    Returns:
        The joined text with leading and trailing whitespace stripped.
    """
    ignored = ('<sos>', '<eos>', '<blank>')
    kept = [tok for tok in tokens if tok not in ignored]

    pieces = []
    pending = ''  # word prefix collected from '@@' tokens

    if _isAllChinese(kept):
        pieces.extend(tok.replace(' ', '') for tok in kept)
    elif _isAllAlpha(kept):
        for tok in kept:
            if '@@' in tok:
                pending += tok.replace('@@', '')
                continue
            pieces.append(pending + tok)
            pieces.append(' ')
            pending = ''
    else:
        # True while the last thing appended is the space after a word.
        after_word_space = False
        for tok in kept:
            if _isAllChinese(tok):
                if after_word_space:
                    pieces.pop()
                pieces.append(tok)
                after_word_space = False
            elif '@@' in tok:
                pending += tok.replace('@@', '')
                after_word_space = False
            elif _isAllAlpha(tok):
                pieces.append(pending + tok)
                pieces.append(' ')
                pending = ''
                after_word_space = True
            else:
                pieces.append(tok)
                after_word_space = False

    return ''.join(pieces).strip()
def gen_timestamps_from_peak(cif_peaks: List[int],
                             num_frames: int,
                             frame_rate=0.02,
                             *,
                             max_token_duration=14,
                             start_end_threshold=5,
                             force_time_shift=-0.5):
    """Turn CIF peak positions into ``[start, end]`` time spans.

    Every peak is shifted by ``force_time_shift`` frames.  Each pair of
    consecutive shifted peaks produces one span, capped at
    ``max_token_duration`` frames.  The tail between the last peak and
    ``num_frames`` is either attached to the last span or, when longer
    than ``start_end_threshold`` frames, split at its midpoint into an
    extension of the last span plus one extra trailing span.

    Args:
        cif_peaks: frame indices of the peaks, in increasing order.
        num_frames: total number of frames.
        frame_rate: factor that converts a frame index into a time value
            (presumably seconds per frame; confirm against the caller).
        max_token_duration: maximum span length in frames; a negative
            value disables the cap.
        start_end_threshold: tail length in frames above which the tail
            becomes its own span.
        force_time_shift: offset in frames added to every peak.

    Returns:
        A list of ``[start, end]`` pairs.  Empty when fewer than two peaks
        are given, because no consecutive pair exists.
    """
    fire_place = [peak + force_time_shift for peak in cif_peaks]

    times = []
    for begin, following in zip(fire_place, fire_place[1:]):
        if max_token_duration < 0 or following - begin <= max_token_duration:
            finish = following
        else:
            # Gap is too long for one token: cut the span at the cap.
            finish = begin + max_token_duration
        times.append([begin * frame_rate, finish * frame_rate])

    if times:
        last_fire = fire_place[-1]
        if num_frames - last_fire > start_end_threshold:
            # Long tail: split it halfway between the last peak and the end.
            end = (num_frames + last_fire) * 0.5
            times[-1][1] = end * frame_rate
            times.append([end * frame_rate, num_frames * frame_rate])
        else:
            times[-1][1] = num_frames * frame_rate
    return times
def paraformer_greedy_search(
        decoder_out: torch.Tensor,
        decoder_out_lens: torch.Tensor,
        cif_peaks: Optional[torch.Tensor] = None) -> List[DecodeResult]:
    """Pick the top-1 token at every position of each utterance.

    Args:
        decoder_out: scores indexed as (batch, maxlen, vocab); the top-1
            value along dim 2 is exponentiated, i.e. it is treated as a
            log-probability.
        decoder_out_lens: number of valid positions per utterance.
        cif_peaks: optional per-utterance sequence of per-frame values.
            Frames whose value exceeds ``1 - 1e-4`` are recorded, in
            order, as the time of the next token.

    Returns:
        One ``DecodeResult`` per utterance holding the token ids cut to
        the valid length, the per-token confidences, and an overall
        confidence equal to exp of the mean top-1 value.  ``times`` is set
        only when ``cif_peaks`` is given.
    """
    num_utts = decoder_out.shape[0]
    max_len = decoder_out.size(1)
    best_logp, best_token = decoder_out.topk(1, dim=2)
    token_rows = best_token.view(num_utts, max_len).cpu().tolist()
    logp_rows = best_logp.view(num_utts, max_len).cpu().tolist()
    valid_lens = decoder_out_lens.cpu().numpy()

    results: List[DecodeResult] = []
    for utt, token_row in enumerate(token_rows):
        valid_len = valid_lens[utt]
        valid_logps = logp_rows[utt][:valid_len]
        per_token = [math.exp(logp) for logp in valid_logps]
        # Plain left-to-right accumulation of the log values.
        logp_total = 0.0
        for logp in valid_logps:
            logp_total += logp
        results.append(
            DecodeResult(token_row[:valid_len],
                         tokens_confidence=per_token,
                         confidence=math.exp(logp_total / valid_len)))

    if cif_peaks is None:
        return results

    for utt, peaks in enumerate(cif_peaks):
        result = results[utt]
        fired_frames = []
        for frame, weight in enumerate(peaks):
            # Stop once every token has been given a frame.
            if len(fired_frames) >= len(result.tokens):
                break
            if weight > 1 - 1e-4:
                fired_frames.append(frame)
        result.times = fired_frames
        assert len(result.times) == len(result.tokens)
    return results
def paraformer_beam_search(decoder_out: torch.Tensor,
                           decoder_out_lens: torch.Tensor,
                           beam_size: int = 10,
                           eos: int = -1) -> List[DecodeResult]:
    """Run batch beam search and keep the first beam of each utterance.

    Args:
        decoder_out: scores passed to ``_batch_beam_search``, documented
            there as (batch_size, seq_length, vocab_size).
        decoder_out_lens: number of valid positions per utterance; used to
            build the non-pad mask and to cut each hypothesis.
        beam_size: number of beams kept by the search.
        eos: id forwarded to ``_batch_beam_search``.

    Returns:
        One ``DecodeResult`` per utterance containing only the token ids
        of beam 0, cut to the utterance's valid length.
    """
    non_pad_mask = make_non_pad_mask(decoder_out_lens)
    beams, _ = _batch_beam_search(decoder_out,
                                  non_pad_mask,
                                  beam_size=beam_size,
                                  eos=eos)
    top_hyps = beams[:, 0, :].cpu().tolist()
    valid_lens = decoder_out_lens.cpu().numpy()
    # TODO(Mddct): scores, times etc
    return [
        DecodeResult(hyp[:valid_lens[utt]])
        for utt, hyp in enumerate(top_hyps)
    ]
def _batch_beam_search(
    logit: torch.Tensor,
    masks: torch.Tensor,
    beam_size: int = 10,
    eos: int = -1,
) -> Tuple[torch.Tensor, torch.Tensor]:
    """ Perform batch beam search

    Args:
        logit: shape (batch_size, seq_length, vocab_size)
        masks: shape (batch_size, seq_length); it is inverted here before
            being used as the per-step "finished" flag
        beam_size: beam size
        eos: id passed to ``mask_finished_preds`` for finished sequences

    Returns:
        indices: shape (batch_size, beam_size, seq_length)
        log_prob: shape (batch_size, beam_size)
    """
    batch_size, seq_length, vocab_size = logit.shape
    pad_masks = ~masks

    with torch.no_grad():
        # b,t,v
        log_post = torch.nn.functional.log_softmax(logit, dim=-1)
        # b,k
        log_prob, first_step = log_post[:, 0, :].topk(beam_size, sorted=True)
        finished = torch.eq(pad_masks[:, 0], 1).view(-1, 1)
        # mask scores and predictions of sequences that already ended
        log_prob = mask_finished_scores(log_prob, finished)
        per_step = [mask_finished_preds(first_step, finished, eos)]

        # NOTE(review): the chosen flat index is appended without
        # re-gathering earlier steps by parent beam (flat // vocab_size);
        # the caller in this file only reads beam 0 - confirm before
        # relying on the other beams.
        for step in range(1, seq_length):
            # b,v
            step_scores = mask_finished_scores(log_post[:, step, :],
                                               finished)
            # b,v -> b,k,v
            tiled_scores = step_scores.unsqueeze(1).repeat(1, beam_size, 1)
            # b,k,1 + b,k,v -> b,k,v
            joint_logp = log_prob.unsqueeze(-1) + tiled_scores
            # b,k,v -> b,k*v -> b,k
            flat_logp = joint_logp.view(batch_size, -1)
            log_prob, flat_index = flat_logp.topk(beam_size, sorted=True)
            per_step.append(mask_finished_preds(flat_index, finished, eos))
            finished = torch.eq(pad_masks[:, step], 1).view(-1, 1)

        # list of (b,k) -> b,k,t ; then flat k*v index -> vocab id
        indices = torch.fmod(torch.stack(per_step, dim=-1), vocab_size)
    return indices, log_prob
|