import json import os import re import numpy as np import paddle.inference as paddle_infer from paddlenlp.transformers import ErnieTokenizer __all__ = ['PunctuationExecutor'] class PunctuationExecutor: def __init__(self, model_dir, use_gpu=True, gpu_mem=500, num_threads=4): # config model_path = os.path.join(model_dir, 'model.pdmodel') params_path = os.path.join(model_dir, 'model.pdiparams') if not os.path.exists(model_path) or not os.path.exists(params_path): raise Exception("{}{}".format(model_path, params_path)) self.config = paddle_infer.Config(model_path, params_path) # pretrained_token = 'ernie-1.0' if os.path.exists(os.path.join(model_dir, 'info.json')): with open(os.path.join(model_dir, 'info.json'), 'r', encoding='utf-8') as f: data = json.load(f) pretrained_token = data['pretrained_token'] if use_gpu: self.config.enable_use_gpu(gpu_mem, 0) else: self.config.disable_gpu() self.config.set_cpu_math_library_num_threads(num_threads) # enable memory optim self.config.enable_memory_optim() self.config.disable_glog_info() # config predictor self.predictor = paddle_infer.create_predictor(self.config) # self.input_ids_handle = self.predictor.get_input_handle('input_ids') self.token_type_ids_handle = self.predictor.get_input_handle('token_type_ids') # self.output_names = self.predictor.get_output_names() self._punc_list = [] if not os.path.join(model_dir, 'vocab.txt'): raise Exception("{}".format(os.path.join(model_dir, 'vocab.txt'))) with open(os.path.join(model_dir, 'vocab.txt'), 'r', encoding='utf-8') as f: for line in f: self._punc_list.append(line.strip()) self.tokenizer = ErnieTokenizer.from_pretrained(pretrained_token) # self('') def _clean_text(self, text): text = text.lower() text = re.sub('[^A-Za-z0-9\u4e00-\u9fa5]', '', text) text = re.sub(f'[{"".join([p for p in self._punc_list][1:])}]', '', text) return text # def preprocess(self, text: str): clean_text = self._clean_text(text) if len(clean_text) == 0: return None tokenized_input = self.tokenizer(list(clean_text), return_length=True, is_split_into_words=True) input_ids = tokenized_input['input_ids'] seg_ids = tokenized_input['token_type_ids'] seq_len = tokenized_input['seq_len'] return input_ids, seg_ids, seq_len def infer(self, input_ids: list, seg_ids: list): # self.input_ids_handle.reshape([1, len(input_ids)]) self.token_type_ids_handle.reshape([1, len(seg_ids)]) self.input_ids_handle.copy_from_cpu(np.array([input_ids]).astype('int64')) self.token_type_ids_handle.copy_from_cpu(np.array([seg_ids]).astype('int64')) # predictor self.predictor.run() # output_handle = self.predictor.get_output_handle(self.output_names[0]) output_data = output_handle.copy_to_cpu() return output_data # def postprocess(self, input_ids, seq_len, preds): tokens = self.tokenizer.convert_ids_to_tokens(input_ids[1:seq_len - 1]) labels = preds[1:seq_len - 1].tolist() assert len(tokens) == len(labels) text = '' for t, l in zip(tokens, labels): text += t if l != 0: text += self._punc_list[l] return text def __call__(self, text: str) -> str: # input_ids, seg_ids, seq_len = self.preprocess(text) preds = self.infer(input_ids=input_ids, seg_ids=seg_ids) if len(preds.shape) == 2: preds = preds[0] text = self.postprocess(input_ids, seq_len, preds) return text