# Olive_Whisper_ASR / utils / pun_predictor.py
# sam2ai's picture
# Synced repo using 'sync_with_huggingface' Github Action
# 6de3e11
import json
import os
import re
import numpy as np
import paddle.inference as paddle_infer
from paddlenlp.transformers import ErnieTokenizer
__all__ = ['PunctuationExecutor']
class PunctuationExecutor:
    """Restore punctuation in plain text with a Paddle Inference model.

    The executor loads an exported model (``model.pdmodel`` and
    ``model.pdiparams``) plus a ``vocab.txt`` label file from ``model_dir``,
    tokenizes the cleaned input character by character with an
    ``ErnieTokenizer`` and appends the predicted ``vocab.txt`` entry after
    every token whose predicted label is non-zero.
    """

    def __init__(self, model_dir, use_gpu=True, gpu_mem=500, num_threads=4):
        """Build the predictor, the label vocabulary and the tokenizer.

        Args:
            model_dir: Directory containing ``model.pdmodel``,
                ``model.pdiparams``, ``vocab.txt`` and optionally
                ``info.json``.
            use_gpu: When true, run on GPU device 0; otherwise run on CPU.
            gpu_mem: First argument forwarded to ``Config.enable_use_gpu``
                (the initial GPU memory pool size).
            num_threads: Thread count given to the CPU math library when
                running on CPU.

        Raises:
            FileNotFoundError: If the model files or ``vocab.txt`` are missing.
        """
        # Locate the exported model and fail early with a readable message.
        model_path = os.path.join(model_dir, 'model.pdmodel')
        params_path = os.path.join(model_dir, 'model.pdiparams')
        if not os.path.exists(model_path) or not os.path.exists(params_path):
            raise FileNotFoundError(
                "Model files not found: {} / {}".format(model_path, params_path))
        self.config = paddle_infer.Config(model_path, params_path)

        # Name of the pretrained tokenizer; ``info.json`` may override it.
        pretrained_token = 'ernie-1.0'
        info_path = os.path.join(model_dir, 'info.json')
        if os.path.exists(info_path):
            with open(info_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            pretrained_token = data['pretrained_token']

        if use_gpu:
            self.config.enable_use_gpu(gpu_mem, 0)
        else:
            self.config.disable_gpu()
            self.config.set_cpu_math_library_num_threads(num_threads)
        # enable memory optim
        self.config.enable_memory_optim()
        self.config.disable_glog_info()

        # config predictor
        self.predictor = paddle_infer.create_predictor(self.config)

        # Input handles are fetched once and reused for every request.
        self.input_ids_handle = self.predictor.get_input_handle('input_ids')
        self.token_type_ids_handle = self.predictor.get_input_handle('token_type_ids')

        # Only the first output is read later, in ``infer``.
        self.output_names = self.predictor.get_output_names()

        # One label per line; the line index is the class id predicted by the
        # model.  The existence check must use ``os.path.exists`` -- testing
        # the joined path string itself is always truthy.
        self._punc_list = []
        vocab_path = os.path.join(model_dir, 'vocab.txt')
        if not os.path.exists(vocab_path):
            raise FileNotFoundError(
                "Punctuation vocabulary not found: {}".format(vocab_path))
        with open(vocab_path, 'r', encoding='utf-8') as f:
            for line in f:
                self._punc_list.append(line.strip())

        self.tokenizer = ErnieTokenizer.from_pretrained(pretrained_token)

        # Run one throw-away prediction at construction time (presumably so
        # the first real request does not pay the start-up cost).  The sample
        # must be non-empty: empty text returns before reaching the predictor.
        self('warm up')

    def _clean_text(self, text):
        """Lower-case ``text`` and drop everything the model should not see.

        Only ASCII letters, digits and characters in the range
        U+4E00..U+9FA5 are kept; any character found in the vocabulary
        entries after the first one is removed as well.
        """
        text = text.lower()
        text = re.sub('[^A-Za-z0-9\u4e00-\u9fa5]', '', text)
        # Escape the vocabulary characters so entries such as ']' or '\\'
        # cannot corrupt the character class, and skip the substitution when
        # there is nothing to strip (an empty class '[]' is a regex error).
        punc_chars = ''.join(self._punc_list[1:])
        if punc_chars:
            text = re.sub(f'[{re.escape(punc_chars)}]', '', text)
        return text

    def preprocess(self, text: str):
        """Tokenize ``text`` for the model.

        Returns:
            ``(input_ids, token_type_ids, seq_len)`` as produced by the
            tokenizer, or ``None`` when nothing is left after cleaning.
        """
        clean_text = self._clean_text(text)
        if not clean_text:
            return None
        # Every character is passed as its own pre-split "word".
        tokenized_input = self.tokenizer(
            list(clean_text), return_length=True, is_split_into_words=True)
        input_ids = tokenized_input['input_ids']
        seg_ids = tokenized_input['token_type_ids']
        seq_len = tokenized_input['seq_len']
        return input_ids, seg_ids, seq_len

    def infer(self, input_ids: list, seg_ids: list):
        """Run the predictor on one sequence and return its first output.

        Args:
            input_ids: Token ids of a single sequence.
            seg_ids: Token type ids of the same sequence.

        Returns:
            The first model output copied to host memory.
        """
        # Inputs are fed as a batch of size one.
        self.input_ids_handle.reshape([1, len(input_ids)])
        self.token_type_ids_handle.reshape([1, len(seg_ids)])
        self.input_ids_handle.copy_from_cpu(np.array([input_ids]).astype('int64'))
        self.token_type_ids_handle.copy_from_cpu(np.array([seg_ids]).astype('int64'))

        # predictor
        self.predictor.run()

        output_handle = self.predictor.get_output_handle(self.output_names[0])
        output_data = output_handle.copy_to_cpu()
        return output_data

    def postprocess(self, input_ids, seq_len, preds):
        """Interleave tokens with their predicted punctuation marks.

        Args:
            input_ids: Token ids returned by ``preprocess``.
            seq_len: Sequence length returned by ``preprocess``.
            preds: Per-position label ids for one sequence; must support
                slicing followed by ``.tolist()``.

        Returns:
            The tokens joined into one string, each followed by its
            vocabulary entry when the predicted label is non-zero.
        """
        # The first and last positions are skipped (presumably the
        # tokenizer's special start/end tokens -- confirm against the model).
        tokens = self.tokenizer.convert_ids_to_tokens(input_ids[1:seq_len - 1])
        labels = preds[1:seq_len - 1].tolist()
        assert len(tokens) == len(labels)
        pieces = []
        for token, label in zip(tokens, labels):
            pieces.append(token)
            # Label 0 means "no punctuation after this token".
            if label != 0:
                pieces.append(self._punc_list[label])
        return ''.join(pieces)

    def __call__(self, text: str) -> str:
        """Return ``text`` cleaned and with predicted punctuation inserted.

        An empty string is returned when ``text`` contains no usable
        characters after cleaning.
        """
        features = self.preprocess(text)
        if features is None:
            # Nothing to punctuate; avoid unpacking ``None``.
            return ''
        input_ids, seg_ids, seq_len = features
        preds = self.infer(input_ids=input_ids, seg_ids=seg_ids)
        # Drop the batch axis when the model returns a 2-D array.
        if len(preds.shape) == 2:
            preds = preds[0]
        return self.postprocess(input_ids, seq_len, preds)