#from conf import *
#main_path = "/Volumes/TOSHIBA EXT/temp/kbqa_portable_prj"
#main_path = "/Users/svjack/temp/HP_kbqa"
#from conf import *
#main_path = model_path
import logging
import os
import sys
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
from datasets import ClassLabel, load_dataset, load_metric
import transformers
import transformers.adapters.composition as ac
from transformers import (
    AdapterConfig,
    AutoConfig,
    AutoModelForTokenClassification,
    AutoTokenizer,
    DataCollatorForTokenClassification,
    HfArgumentParser,
    MultiLingAdapterArguments,
    PreTrainedTokenizerFast,
    Trainer,
    TrainingArguments,
    set_seed,
)
from transformers.trainer_utils import get_last_checkpoint
from transformers.utils import check_min_version
from transformers.utils.versions import require_version
import pandas as pd
import pickle as pkl
from copy import deepcopy
import torch
from scipy.special import softmax
from functools import partial, reduce
import json
from io import StringIO
import re
from transformers import list_adapters, AutoModelWithHeads
from collections import defaultdict
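# This Space restores a token-classification adapter ("sel_ner") trained on top of
# bert-base-chinese and serves it as a Gradio demo that splits a Chinese question
# into entity and type/property spans.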
@dataclass
class ModelArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune from.
    """

    model_name_or_path: str = field(
        metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"}
    )
    config_name: Optional[str] = field(
        default=None, metadata={"help": "Pretrained config name or path if not the same as model_name"}
    )
    tokenizer_name: Optional[str] = field(
        default=None, metadata={"help": "Pretrained tokenizer name or path if not the same as model_name"}
    )
    cache_dir: Optional[str] = field(
        default=None,
        metadata={"help": "Where do you want to store the pretrained models downloaded from huggingface.co"},
    )
    model_revision: str = field(
        default="main",
        metadata={"help": "The specific model version to use (can be a branch name, tag name or commit id)."},
    )
    use_auth_token: bool = field(
        default=False,
        metadata={
            "help": "Will use the token generated when running `transformers-cli login` (necessary to use this script "
            "with private models)."
        },
    )
@dataclass
class DataTrainingArguments:
    """
    Arguments pertaining to what data we are going to input our model for training and eval.
    """

    task_name: Optional[str] = field(default="ner", metadata={"help": "The name of the task (ner, pos...)."})
    dataset_name: Optional[str] = field(
        default=None, metadata={"help": "The name of the dataset to use (via the datasets library)."}
    )
    dataset_config_name: Optional[str] = field(
        default=None, metadata={"help": "The configuration name of the dataset to use (via the datasets library)."}
    )
    train_file: Optional[str] = field(
        default=None, metadata={"help": "The input training data file (a csv or JSON file)."}
    )
    validation_file: Optional[str] = field(
        default=None,
        metadata={"help": "An optional input evaluation data file to evaluate on (a csv or JSON file)."},
    )
    test_file: Optional[str] = field(
        default=None,
        metadata={"help": "An optional input test data file to predict on (a csv or JSON file)."},
    )
    text_column_name: Optional[str] = field(
        default=None, metadata={"help": "The column name of text to input in the file (a csv or JSON file)."}
    )
    label_column_name: Optional[str] = field(
        default=None, metadata={"help": "The column name of label to input in the file (a csv or JSON file)."}
    )
    overwrite_cache: bool = field(
        default=False, metadata={"help": "Overwrite the cached training and evaluation sets"}
    )
    preprocessing_num_workers: Optional[int] = field(
        default=None,
        metadata={"help": "The number of processes to use for the preprocessing."},
    )
    pad_to_max_length: bool = field(
        default=False,
        metadata={
            "help": "Whether to pad all samples to model maximum sentence length. "
            "If False, will pad the samples dynamically when batching to the maximum length in the batch. More "
            "efficient on GPU but very bad for TPU."
        },
    )
    max_train_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": "For debugging purposes or quicker training, truncate the number of training examples to this "
            "value if set."
        },
    )
    max_eval_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": "For debugging purposes or quicker training, truncate the number of evaluation examples to this "
            "value if set."
        },
    )
    max_predict_samples: Optional[int] = field(
        default=None,
        metadata={
            "help": "For debugging purposes or quicker training, truncate the number of prediction examples to this "
            "value if set."
        },
    )
    label_all_tokens: bool = field(
        default=False,
        metadata={
            "help": "Whether to put the label for one word on all tokens generated by that word or just on the "
            "one (in which case the other tokens will have a padding index)."
        },
    )
    return_entity_level_metrics: bool = field(
        default=False,
        metadata={"help": "Whether to return all the entity levels during evaluation or just the overall ones."},
    )

    def __post_init__(self):
        if self.dataset_name is None and self.train_file is None and self.validation_file is None:
            raise ValueError("Need either a dataset name or a training/validation file.")
        else:
            if self.train_file is not None:
                extension = self.train_file.split(".")[-1]
                assert extension in ["csv", "json"], "`train_file` should be a csv or a json file."
            if self.validation_file is not None:
                extension = self.validation_file.split(".")[-1]
                assert extension in ["csv", "json"], "`validation_file` should be a csv or a json file."
        self.task_name = self.task_name.lower()
# Restore the four argument objects saved alongside the adapter at training time.
#p0 = os.path.join(main_path, "sel_ner/ner_data_args.pkl")
p0 = "sel_ner/ner_data_args.pkl"
assert os.path.exists(p0)
with open(p0, "rb") as f:
    t4 = pkl.load(f)
model_args, data_args, training_args, adapter_args = map(deepcopy, t4)
zh_model = AutoModelWithHeads.from_pretrained("bert-base-chinese")
#config_path = "/Users/svjack/temp/ner_trans/adapter_ner_data/test-sel-ner/sel_ner/adapter_config.json"
#adapter_path = "/Users/svjack/temp/ner_trans/adapter_ner_data/test-sel-ner/sel_ner"
config_path = "sel_ner/adapter_config.json"
adapter_path = "sel_ner"
#config_path = os.path.join(main_path, "sel_ner/adapter_config.json")
#adapter_path = os.path.join(main_path, "sel_ner")
config = AdapterConfig.load(config_path)
zh_model.load_adapter(adapter_path, config=config)
zh_model.set_active_adapters(["sel_ner"])
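# Once the adapter (and its saved prediction head) is active, zh_model scores each
# token over the three labels defined below; a forward pass returns logits of shape
# (batch, seq_len, num_labels).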
def single_sent_pred(input_text, tokenizer, model):
    input_ = tokenizer(input_text)
    input_ids = input_["input_ids"]
    output = model(torch.tensor([input_ids], dtype=torch.long))
    output_prob = softmax(output.logits.detach().numpy()[0], axis=-1)
    token_list = tokenizer.convert_ids_to_tokens(input_ids)
    assert len(token_list) == len(output_prob)
    return token_list, output_prob
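# Shapes, for illustration: a 4-character question tokenizes to [CLS] + 4 tokens +
# [SEP], so token_list has 6 entries and output_prob is a (6, 3) array of
# per-token label probabilities.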
def single_pred_to_df(token_list, output_prob, label_list):
    assert output_prob.shape[0] == len(token_list) and output_prob.shape[1] == len(label_list)
    pred_label_list = pd.Series(output_prob.argmax(axis=-1)).map(
        lambda idx: label_list[idx]
    ).tolist()
    return pd.concat(list(map(pd.Series, [token_list, pred_label_list])), axis=1)
def token_l_to_nest_l(token_l, prefix="##"):
    req = []
    #req.append([])
    #### token_l must start with [CLS]
    assert token_l[0] == "[CLS]"
    for ele in token_l:
        if not ele.startswith(prefix):
            req.append([ele])
        else:
            req[-1].append(ele)
    return req
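# E.g. ["[CLS]", "play", "##er", "[SEP]"] -> [["[CLS]"], ["play", "##er"], ["[SEP]"]],
# grouping WordPiece continuations back under the token that started the word.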
def list_window_collect(l, w_size=1, drop_NONE=False):
    assert len(l) >= w_size
    req = []
    for i in range(len(l)):
        l_slice = l[i: i + w_size]
        l_slice += [None] * (w_size - len(l_slice))
        req.append(l_slice)
    if drop_NONE:
        return list(filter(lambda x: None not in x, req))
    return req
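# E.g. list_window_collect([1, 2, 3], 2) -> [[1, 2], [2, 3], [3, None]];
# with drop_NONE=True the padded tail window is dropped: [[1, 2], [2, 3]].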
def same_pkt_l(l0, l1):
    l0_size_l = list(map(len, l0))
    assert sum(l0_size_l) == len(l1)
    cum_l0_size = np.cumsum(l0_size_l).tolist()
    slice_l = list_window_collect(cum_l0_size, 2, drop_NONE=True)
    slice_l = [[0, slice_l[0][0]]] + slice_l
    slice_df = pd.DataFrame(slice_l)
    return (l0, slice_df.apply(lambda s: l1[s[0]:s[1]], axis=1).tolist())
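# Re-buckets the flat list l1 to mirror the nesting of l0, e.g.
# same_pkt_l([["a"], ["b", "##c"]], ["O", "E", "E"])
#   -> ([["a"], ["b", "##c"]], [["O"], ["E", "E"]]).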
def cnt_backtrans_slice(token_list, label_list, prefix="##",
                        token_agg_func=lambda x: x[0] if len(x) == 1 else "".join([x[0]] + list(map(lambda y: y[len("##"):], x[1:]))),
                        label_agg_func=lambda x: x[0] if len(x) == 1 else pd.Series(x).value_counts().index.tolist()[0]
                        ):
    token_nest_list = token_l_to_nest_l(token_list, prefix=prefix)
    token_nest_list, label_nest_list = same_pkt_l(token_nest_list, label_list)
    token_list_req = list(map(token_agg_func, token_nest_list))
    label_list_req = list(map(label_agg_func, label_nest_list))
    return (token_list_req, label_list_req)
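# Merges each WordPiece group back into one surface token (stripping "##") and
# takes the group's majority label, so predictions line up with whole words.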
def from_text_to_final(input_text, tokenizer, model, label_list):
    token_list, output_prob = single_sent_pred(input_text, tokenizer, model)
    token_pred_df = single_pred_to_df(token_list, output_prob, label_list)
    token_list_, label_list_ = token_pred_df[0].tolist(), token_pred_df[1].tolist()
    token_pred_df_reduce = pd.DataFrame(list(zip(*cnt_backtrans_slice(token_list_, label_list_))))
    return token_pred_df_reduce
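# End-to-end helper: text in, two-column DataFrame out (column 0 = token,
# column 1 = predicted tag), e.g.
# from_text_to_final("宁波在哪个省?", tokenizer, zh_model, label_list).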
tokenizer_name_or_path = model_args.tokenizer_name if model_args.tokenizer_name else model_args.model_name_or_path
tokenizer = AutoTokenizer.from_pretrained(
    tokenizer_name_or_path,
    cache_dir=model_args.cache_dir,
    use_fast=True,
    revision=model_args.model_revision,
    use_auth_token=True if model_args.use_auth_token else None,
)
label_list = ['O-TAG', 'E-TAG', 'T-TAG']
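# Tag semantics assumed from the demo's purpose: O-TAG = outside, E-TAG = entity
# span, T-TAG = type/property span (the decomposition named in the Gradio title).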
### Fix English spans containing spaces: the NER model receives the question
### space-joined per character, so an English substring must be recovered from
### the original sentence with its internal spaces intact.
def fill_str(sent, str_):
    is_en = False
    en_match = re.findall("[a-zA-Z0-9 ]+", str_)
    if en_match and en_match[0] == str_:
        is_en = True
    if not is_en:
        return str_
    find_part = re.findall("([{} ]+)".format(str_), sent)
    assert find_part
    find_part = sorted(filter(lambda x: x.replace(" ", "") == str_.replace(" ", ""), find_part), key=len, reverse=True)[0]
    assert find_part in sent
    return find_part
def for_loop_detect(s, invalid_tag="O-TAG", sp_token="123454321"):
    assert isinstance(s, pd.Series)
    char_list = s.iloc[0]
    tag_list = s.iloc[1]
    assert len(char_list) == len(tag_list)
    req = defaultdict(list)
    pre_tag = ""
    for idx, tag in enumerate(tag_list):
        if tag == invalid_tag or tag != pre_tag:
            # close every open span with a separator (without stacking separators)
            for k in req.keys():
                if req[k][-1] != sp_token:
                    req[k].append(sp_token)
        if tag != invalid_tag:
            req[tag].append(char_list[idx])
        pre_tag = tag
    # join the collected characters, then split on the separator to recover spans
    req = dict(map(lambda t2: (
        t2[0],
        list(
            filter(lambda x: x.strip(), "".join(t2[1]).split(sp_token))
        )
    ), req.items()))
    return req
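# E.g. chars "宁波在哪个省" with tags [E, E, O, T, T, T] (names abbreviated) yields
# {"E-TAG": ["宁波"], "T-TAG": ["哪个省"]}.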
def ner_entity_type_predict_only(question):
    assert isinstance(question, str)
    question = question.replace(" ", "")
    ner_df = from_text_to_final(
        " ".join(list(question)),
        tokenizer,
        zh_model,
        label_list
    )
    assert ner_df.shape[0] == len(question) + 2
    ### restore the original characters ([UNK] tokens would otherwise mask them)
    ner_df[0] = ["[CLS]"] + list(question) + ["[SEP]"]
    et_dict = for_loop_detect(ner_df.T.apply(lambda x: x.tolist(), axis=1))
    return et_dict
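# Illustrative call (output depends on the trained adapter):
# ner_entity_type_predict_only("宁波在哪个省份?")
#   -> something like {"E-TAG": ["宁波"], "T-TAG": ["省份"]}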
import gradio as gr

example_sample = [
    "宁波在哪个省份?",  # "Which province is Ningbo in?"
    "美国的通货是什么?",  # "What is the currency of the United States?"
]

demo = gr.Interface(
    fn=ner_entity_type_predict_only,
    inputs="text",
    outputs="json",
    title="Chinese Question Entity Property decomposition 🌧️ demonstration",
    examples=example_sample if example_sample else None,
    cache_examples=False
)
demo.launch()
'''
Reference snippets (kept for documentation, not executed): an HTTP client call
and a Django view wrapping ner_entity_type_predict_only as a service.

rep = requests.post(
    url="http://localhost:8855/extract_et",
    data={
        "question": "哈利波特的作者是谁?"  # "Who is the author of Harry Potter?"
    }
)
json.loads(rep.content.decode())

@csrf_exempt
def extract_et(request):
    assert request.method == "POST"
    post_data = request.POST
    question = post_data["question"]
    assert isinstance(question, str)
    #question = "宁波在哪个省?"
    #abc = do_search(question)
    et_dict = ner_entity_type_predict_only(question)
    assert isinstance(et_dict, dict)
    return HttpResponse(json.dumps(et_dict))

if __name__ == "__main__":
    from_text_to_final("宁波在哪个省?",  # "Which province is Ningbo in?"
        tokenizer,
        zh_model,
        label_list
    )
    from_text_to_final("美国的通货是什么?",  # "What is the currency of the United States?"
        tokenizer,
        zh_model,
        label_list
    )
'''