# luxe-demo / app.py
# singletongue's picture
# Change MAX_TEXT_FILE_LINES to 10, clean up entity names, modify some UI components
# fd70e43 verified
import csv
import re
import unicodedata
from collections import defaultdict
from itertools import chain
import gradio as gr
import torch
import torch.nn as nn
import torch.nn.functional as F
from bm25s.hf import BM25HF, TokenizerHF
from transformers import AutoModelForPreTraining, AutoTokenizer
# NOTE(review): ALIAS_SEP is not referenced anywhere in the visible part of this file.
ALIAS_SEP = "|"
# Entity names starting with this prefix are treated as category entities
# (see replace_entities); every other name is a normal entity.
CATEGORY_ENTITY_PREFIX = "Category:"
# Special entries of the entity vocabulary; replace_entities always keeps them,
# even when the default entities are discarded.
ENTITY_SPECIAL_TOKENS = ["[PAD]", "[UNK]", "[MASK]", "[MASK2]"]
# Maximum number of characters kept from an input text or an entity description.
MAX_TEXT_LENGTH = 800
# Maximum number of rows read from an uploaded text file.
MAX_TEXT_FILE_LINES = 10
# Maximum number of rows read from an uploaded entity CSV file.
MAX_ENTITY_FILE_LINES = 1000
# Hub repository and revision from which the model and tokenizer are loaded.
repo_id = "studio-ousia/luxe"
revision = "ja-v0.3.2"
# NOTE(review): not referenced in the visible code; the same repository name is
# written out literally where the BM25 tokenizer and retriever are loaded.
nayose_repo_id = "studio-ousia/luxe-nayose-bm25"
# Regular expressions matched (with re.search) against category entity names;
# matching categories are excluded from the predicted topic categories.
ignore_category_patterns = [
r"\d+年",
r"楽曲 [ぁ-ん]",
r"漫画作品 [ぁ-ん]",
r"アニメ作品 [ぁ-ん]",
# NOTE(review): this pattern is identical to the previous one — confirm
# whether a different pattern was intended here.
r"アニメ作品 [ぁ-ん]",
r"の一覧",
r"各国の",
r"各年の",
]
def clean_default_entity_vocab(tokenizer):
    """Strip the ``ja:`` language tag from the names in the tokenizer's entity vocabulary.

    ``ja:<name>`` becomes ``<name>`` and ``Category:ja:<name>`` becomes
    ``Category:<name>``; any other name is kept unchanged. The cleaned mapping
    replaces ``tokenizer.entity_vocab``; nothing is returned.
    """

    def strip_language_tag(name):
        if name.startswith("ja:"):
            return name.removeprefix("ja:")
        if name.startswith("Category:ja:"):
            return "Category:" + name.removeprefix("Category:ja:")
        return name

    tokenizer.entity_vocab = {
        strip_language_tag(name): entity_id for name, entity_id in tokenizer.entity_vocab.items()
    }
def normalize_text(text: str) -> str:
    """Return *text* in Unicode NFKC form with leading and trailing whitespace removed."""
    nfkc_text = unicodedata.normalize("NFKC", text)
    return nfkc_text.strip()
def get_texts_from_file(file_path: str | None):
texts = []
if file_path is not None:
try:
with open(file_path, newline="") as f:
reader = csv.DictReader(f, fieldnames=["text"])
for i, row in enumerate(reader):
if i >= MAX_TEXT_FILE_LINES:
gr.Info(f"{MAX_TEXT_FILE_LINES}行目までのデータを読み込みました。", duration=5)
break
text = row["text"]
if text.strip() != "":
texts.append(text[:MAX_TEXT_LENGTH])
except Exception as e:
gr.Warning("ファイルを正しく読み込めませんでした。", duration=5)
print(e)
texts = []
return texts
def _assign_gap_span(
    text: str, token_spans: list[tuple[int, int]], pending: list[int], gap_start: int, gap_end: int
) -> None:
    """Give every pending (unmatched) token the whitespace-trimmed span of ``text[gap_start:gap_end]``."""
    gap = text[gap_start:gap_end]
    start = gap_start + (len(gap) - len(gap.lstrip()))
    stop = gap_end - (len(gap) - len(gap.rstrip()))
    if stop < start:  # the gap is empty or whitespace only
        start = stop = gap_start
    for index in pending:
        token_spans[index] = (start, stop)
    pending.clear()


def get_token_spans(tokenizer, text: str) -> list[tuple[int, int]]:
    """Map every token of *text* to its ``(start, end)`` character offsets.

    Tokens are produced by ``tokenizer.tokenize(text)`` and located in *text*
    from left to right after removing a leading ``"##"``. A token that does not
    occur verbatim in the remaining text (for example an unknown-token
    placeholder) no longer raises ``ValueError``; it is assigned the stretch of
    text between the neighbouring tokens that were found.

    Args:
        tokenizer: Object providing ``tokenize(text)`` returning token strings.
        text: The text that was tokenized.

    Returns:
        One span per token, preceded by ``(0, 0)`` and followed by an empty
        span at the end of the last token, to account for "[CLS]" and "[SEP]".
    """
    token_spans: list[tuple[int, int]] = []
    pending: list[int] = []  # indices of tokens not found verbatim in the text
    end = 0

    for token in tokenizer.tokenize(text):
        token = token.removeprefix("##")
        start = text.find(token, end)
        if start < 0:
            # Resolve this token once the next matching token (or the end of
            # the text) tells us how far the unmatched stretch extends.
            pending.append(len(token_spans))
            token_spans.append((end, end))
            continue

        _assign_gap_span(text, token_spans, pending, end, start)
        end = start + len(token)
        token_spans.append((start, end))

    _assign_gap_span(text, token_spans, pending, end, len(text))
    if token_spans:
        end = token_spans[-1][1]

    return [(0, 0)] + token_spans + [(end, end)]  # count for "[CLS]" and "[SEP]"
def get_predicted_entity_spans(
    ner_logits: torch.Tensor, token_spans: list[tuple[int, int]], entity_span_sensitivity: float = 1.0
) -> list[tuple[int, int]]:
    """Greedily select non-overlapping entity spans from a matrix of span logits.

    ``ner_logits[i, j]`` scores the span starting at token ``i`` and ending at
    token ``j``; only the upper triangle is considered. Candidates are visited
    in order of decreasing sigmoid probability until the probability falls
    below ``10 ** -entity_span_sensitivity``. A candidate is accepted when it
    covers at least one character and does not overlap an accepted span.

    Args:
        ner_logits: Square, unbatched tensor of span logits.
        token_spans: ``(start, end)`` character offsets of each token.
        entity_span_sensitivity: Higher values lower the probability threshold;
            values ``<= 0`` disable span detection.

    Returns:
        The accepted ``(start, end)`` character spans in ascending order.
    """
    length = ner_logits.size(-1)
    assert ner_logits.size() == (length, length)  # not batched

    if entity_span_sensitivity <= 0.0:
        return []

    ner_probs = torch.sigmoid(ner_logits).triu()
    probs_sorted, sort_idxs = ner_probs.flatten().sort(descending=True)
    threshold = 10.0 ** (-1.0 * entity_span_sensitivity)

    predicted_entity_spans: list[tuple[int, int]] = []
    for p, i in zip(probs_sorted, sort_idxs.tolist()):
        if p < threshold:
            break

        start_idx, end_idx = divmod(i, length)
        start = token_spans[start_idx][0]
        end = token_spans[end_idx][1]

        # Reject zero-width spans (e.g. a span made only of "[CLS]"/"[SEP]").
        # Previously this was only enforced once another span had been
        # accepted, so the top-scoring candidate could be an empty span.
        if start >= end:
            continue

        if all(end <= ex_start or ex_end <= start for ex_start, ex_end in predicted_entity_spans):
            predicted_entity_spans.append((start, end))

    return sorted(predicted_entity_spans)
def get_topk_entities_from_texts(
    models,
    texts: str | list[str],
    k: int = 5,
    entity_span_sensitivity: float = 1.0,
    nayose_coef: float = 1.0,
    entity_replaced_counts: int = 0,
) -> tuple[
    list[str],
    list[list[tuple[int, int]]],
    list[list[str]],
    list[list[str]],
    list[list[list[str]]],
]:
    """Predict entity spans and top-k entities/categories for each input text.

    Every text is normalized, passed through the model once to detect entity
    spans, and a second time with those spans to score entities.

    Args:
        models: Tuple ``(model, tokenizer, bm25_tokenizer, bm25_retriever)``.
        texts: A single text or a list of texts.
        k: Number of entities/categories to return per prediction.
        entity_span_sensitivity: Forwarded to ``get_predicted_entity_spans``.
        nayose_coef: Weight of the BM25 string-match scores added to the
            per-span entity logits; ``0`` disables them.
        entity_replaced_counts: How many times the entity vocabulary has been
            replaced. The BM25 scores are only applied while this is ``0``.

    Returns:
        A 5-tuple of lists with one item per text: the normalized texts, the
        predicted character spans, the top-k topic entities, the top-k topic
        categories, and for every span its top-k entities.
    """
    gr.Info("LUXEによる予測を実行しています。", duration=5)

    if isinstance(texts, str):
        texts = [texts]

    model, tokenizer, bm25_tokenizer, bm25_retriever = models
    num_normal_entities = model.config.num_normal_entities

    normalized_texts: list[str] = []
    batch_entity_spans: list[list[tuple[int, int]]] = []
    topk_normal_entities: list[list[str]] = []
    topk_category_entities: list[list[str]] = []
    topk_span_entities: list[list[list[str]]] = []

    # Entity ids below num_normal_entities are normal entities; the remaining
    # ids are categories, re-indexed from zero to match the category logits.
    id2normal_entity: dict[int, str] = {}
    id2category_entity: dict[int, str] = {}
    ignore_category_entity_ids: list[int] = []
    for entity, entity_id in tokenizer.entity_vocab.items():
        if entity_id < num_normal_entities:
            id2normal_entity[entity_id] = entity
            continue
        category_id = entity_id - num_normal_entities
        id2category_entity[category_id] = entity
        if any(re.search(pattern, entity) for pattern in ignore_category_patterns):
            ignore_category_entity_ids.append(category_id)

    # The value may arrive from a numeric UI field as a float; topk needs an int.
    k = max(0, int(k))
    entity_k = min(k, len(id2normal_entity))
    category_k = min(k, len(id2category_entity))

    # Pure inference: no autograd graph is needed.
    with torch.no_grad():
        for raw_text in texts:
            text = normalize_text(raw_text)
            # The spans below are offsets into the *normalized* text, so that is
            # the text that must be returned for rendering.
            normalized_texts.append(text)

            tokenized_examples = tokenizer(text, return_tensors="pt")
            model_outputs = model(**tokenized_examples)
            token_spans = get_token_spans(tokenizer, text)
            entity_spans = get_predicted_entity_spans(
                model_outputs.ner_logits[0], token_spans, entity_span_sensitivity
            )
            batch_entity_spans.append(entity_spans)

            tokenized_examples = tokenizer(text, entity_spans=entity_spans or None, return_tensors="pt")
            model_outputs = model(**tokenized_examples)

            if model_outputs.topic_entity_logits is not None:
                _, topk_normal_entity_ids = model_outputs.topic_entity_logits[0].topk(entity_k)
                topk_normal_entities.append([id2normal_entity[id_] for id_ in topk_normal_entity_ids.tolist()])
            else:
                topk_normal_entities.append([])

            if model_outputs.topic_category_logits is not None:
                model_outputs.topic_category_logits[:, ignore_category_entity_ids] = float("-inf")
                _, topk_category_entity_ids = model_outputs.topic_category_logits[0].topk(category_k)
                topk_category_entities.append(
                    [id2category_entity[id_] for id_ in topk_category_entity_ids.tolist()]
                )
            else:
                topk_category_entities.append([])

            if model_outputs.entity_logits is not None:
                # Restrict to normal entities using the current vocabulary size
                # rather than a fixed 500000, so that a replaced vocabulary is
                # sliced at the right boundary.
                span_entity_logits = model_outputs.entity_logits[0, :, :num_normal_entities]

                if nayose_coef > 0.0 and entity_replaced_counts == 0 and entity_spans:
                    nayose_queries = ["ja:" + text[start:end] for start, end in entity_spans]
                    nayose_query_tokens = bm25_tokenizer.tokenize(nayose_queries)
                    nayose_scores = torch.vstack(
                        [torch.from_numpy(bm25_retriever.get_scores(tokens)) for tokens in nayose_query_tokens]
                    )
                    # NOTE(review): assumes BM25 document i corresponds to entity id i — confirm.
                    # Only the shared prefix is updated in case the two sizes differ.
                    num_scored = min(span_entity_logits.size(-1), nayose_scores.size(-1))
                    span_entity_logits[:, :num_scored] += nayose_coef * nayose_scores[:, :num_scored]

                _, topk_span_entity_ids = span_entity_logits.topk(entity_k)
                topk_span_entities.append(
                    [[id2normal_entity[id_] for id_ in ids] for ids in topk_span_entity_ids.tolist()]
                )
            else:
                topk_span_entities.append([])

    return normalized_texts, batch_entity_spans, topk_normal_entities, topk_category_entities, topk_span_entities
def get_new_entity_text_pairs_from_file(file_path: str | None) -> list[list[str]]:
new_entity_text_pairs = []
if file_path is not None:
try:
with open(file_path, newline="") as f:
reader = csv.DictReader(f, fieldnames=["entity", "text"])
for i, row in enumerate(reader):
if i >= MAX_ENTITY_FILE_LINES:
gr.Info(f"{MAX_ENTITY_FILE_LINES}行目までのデータを読み込みました。", duration=5)
break
entity = normalize_text(row["entity"]).strip()
text = normalize_text(row["text"]).strip()
if entity != "" and text != "":
new_entity_text_pairs.append([entity, text])
except Exception as e:
gr.Warning("ファイルを正しく読み込めませんでした。", duration=5)
print(e)
new_entity_text_pairs = []
return new_entity_text_pairs
def replace_entities(
    models, new_entity_text_pairs: list[tuple[str, str]], entity_replaced_counts: int, preserve_default_entities: bool
) -> int:
    """Rebuild the entity vocabulary of the model and tokenizer from entity descriptions.

    Each new entity gets an embedding computed from its description text
    (first-position hidden state passed through
    ``model.entity_predictions.transform``). Several descriptions of the same
    entity, or a new description of an entity that is kept from the current
    vocabulary, are averaged. The tokenizer vocabulary, the entity embedding
    table, the entity decoder/bias and the related config fields of the model
    are replaced in place.

    Args:
        models: Tuple ``(model, tokenizer, bm25_tokenizer, bm25_retriever)``.
        new_entity_text_pairs: Rows of ``(entity, description)``. Rows with an
            empty entity or description are ignored.
        entity_replaced_counts: Number of replacements performed so far.
        preserve_default_entities: Keep the entities of the current vocabulary.
            The special tokens are always kept.

    Returns:
        ``entity_replaced_counts + 1`` after a replacement, or the unchanged
        count when there is no usable row.
    """
    # The table is user-editable, so rows may be blank or un-normalized.
    # Clean them the same way as rows loaded from a CSV file.
    cleaned_pairs: list[tuple[str, str]] = []
    for entity, text in new_entity_text_pairs:
        entity = "" if entity is None else normalize_text(str(entity))
        text = "" if text is None else normalize_text(str(text))
        if entity and text:
            cleaned_pairs.append((entity, text))

    if not cleaned_pairs:
        return entity_replaced_counts

    gr.Info("LUXEのモデルとトークナイザのエンティティ語彙を更新しています。完了までお待ちください。", duration=5)

    model, tokenizer, bm25_tokenizer, bm25_retriever = models

    current_weight = model.luke.entity_embeddings.entity_embeddings.weight
    device, dtype = current_weight.device, current_weight.dtype
    current_counts = model.config.entity_counts

    normal_entity_embeddings = defaultdict(list)  # entity -> list of embeddings
    category_entity_embeddings = defaultdict(list)  # entity -> list of embeddings
    normal_entity_counts = {}  # entity -> count (int)
    category_entity_counts = {}  # entity -> count (int)

    # Carry over the entities to keep, in their current id order.
    for entity, entity_id in sorted(tokenizer.entity_vocab.items(), key=lambda x: x[1]):
        if not (entity in ENTITY_SPECIAL_TOKENS or preserve_default_entities):
            continue

        entity_embedding = current_weight.data[entity_id]
        entity_count = current_counts[entity_id] if current_counts is not None else 1
        if entity.startswith(CATEGORY_ENTITY_PREFIX):
            category_entity_embeddings[entity].append(entity_embedding)
            category_entity_counts[entity] = entity_count
        else:
            normal_entity_embeddings[entity].append(entity_embedding)
            normal_entity_counts[entity] = entity_count

    # Embed the new entities; only the values are needed, not a gradient graph.
    with torch.no_grad():
        for entity, text in cleaned_pairs:
            tokenized_inputs = tokenizer(text[:MAX_TEXT_LENGTH], return_tensors="pt")
            model_outputs = model(**tokenized_inputs)
            entity_embedding = model.entity_predictions.transform(model_outputs.last_hidden_state[:, 0])[0]

            if entity.startswith(CATEGORY_ENTITY_PREFIX):
                category_entity_embeddings[entity].append(entity_embedding)
                category_entity_counts.setdefault(entity, 1)
            else:
                normal_entity_embeddings[entity].append(entity_embedding)
                normal_entity_counts.setdefault(entity, 1)

    num_normal_entities = len(normal_entity_embeddings)
    num_category_entities = len(category_entity_embeddings)

    # New ids: all normal entities first, then all category entities.
    entity_embeddings = {
        entity: sum(embeddings) / len(embeddings)
        for entity, embeddings in chain(normal_entity_embeddings.items(), category_entity_embeddings.items())
    }
    entity_vocab = {entity: entity_id for entity_id, entity in enumerate(entity_embeddings.keys())}
    entity_counts = [
        category_entity_counts[entity] if entity.startswith(CATEGORY_ENTITY_PREFIX) else normal_entity_counts[entity]
        for entity in entity_vocab.keys()
    ]

    tokenizer.entity_vocab = entity_vocab
    tokenizer.entity_pad_token_id = entity_vocab["[PAD]"]
    tokenizer.entity_unk_token_id = entity_vocab["[UNK]"]
    tokenizer.entity_mask_token_id = entity_vocab["[MASK]"]
    tokenizer.entity_mask2_token_id = entity_vocab["[MASK2]"]

    entity_embeddings_tensor = torch.vstack(list(entity_embeddings.values()))
    if model.config.normalize_entity_embeddings:
        entity_embeddings_tensor = F.normalize(entity_embeddings_tensor)

    entity_vocab_size, entity_emb_size = entity_embeddings_tensor.size()

    entity_embeddings_module = nn.Embedding(
        entity_vocab_size,
        entity_emb_size,
        padding_idx=tokenizer.entity_pad_token_id,
        device=device,
        dtype=dtype,
    )
    entity_embeddings_module.weight.data = entity_embeddings_tensor.data
    model.luke.entity_embeddings.entity_embeddings = entity_embeddings_module

    # Create the decoder and bias on the same device/dtype as the embeddings so
    # the replaced modules stay consistent with the rest of the model.
    entity_decoder_module = nn.Linear(entity_emb_size, entity_vocab_size, bias=False, device=device, dtype=dtype)
    model.entity_predictions.decoder = entity_decoder_module
    model.entity_predictions.bias = nn.Parameter(torch.zeros(entity_vocab_size, device=device, dtype=dtype))
    model.tie_weights()

    if model.config.entity_counts is not None:
        # Log prior of each entity within its own group (normal / category).
        total_normal_entity_count = sum(entity_counts[:num_normal_entities])
        total_category_entity_count = sum(entity_counts[num_normal_entities:])
        entity_counts_tensor = torch.tensor(entity_counts, dtype=model.dtype, device=model.device)
        total_entity_counts = torch.tensor(
            [total_normal_entity_count] * num_normal_entities + [total_category_entity_count] * num_category_entities,
            dtype=model.dtype,
            device=model.device,
        )
        entity_log_probs = torch.log(entity_counts_tensor / total_entity_counts)
        model.entity_log_probs = entity_log_probs

    model.config.entity_vocab_size = entity_vocab_size
    model.config.num_normal_entities = num_normal_entities
    model.config.num_category_entities = num_category_entities
    if model.config.entity_counts is not None:
        model.config.entity_counts = entity_counts

    gr.Info("LUXEのモデルとトークナイザのエンティティ語彙の更新が完了しました。", duration=5)

    return entity_replaced_counts + 1
# Build the Gradio UI: load the models once, declare the session state, wire
# the input/hyperparameter/vocabulary controls, and render the predictions.
with gr.Blocks() as demo:
    model = AutoModelForPreTraining.from_pretrained(repo_id, revision=revision, trust_remote_code=True)
    tokenizer = AutoTokenizer.from_pretrained(repo_id, revision=revision, trust_remote_code=True)

    # Use the module-level repository constant instead of repeating the literal.
    bm25_tokenizer = TokenizerHF(lower=True, splitter=tokenizer.tokenize, stopwords=None, stemmer=None)
    bm25_tokenizer.load_vocab_from_hub(nayose_repo_id)
    bm25_retriever = BM25HF.load_from_hub(nayose_repo_id)

    clean_default_entity_vocab(tokenizer)

    # Hint: when a callable is passed to gr.State, it is treated as a function
    # that produces the initial state value and is called without arguments.
    # Passing the model or tokenizer on its own would therefore fail.
    # Passing a tuple of all models (which is not callable) avoids that error.
    # cf. https://www.gradio.app/docs/gradio/state#param-state-value
    models = gr.State((model, tokenizer, bm25_tokenizer, bm25_retriever))

    texts_input = gr.State([])

    entity_replaced_counts = gr.State(0)

    topk = gr.State(5)
    entity_span_sensitivity = gr.State(1.0)
    nayose_coef = gr.State(1.0)

    texts = gr.State([])
    batch_entity_spans = gr.State([])
    topk_normal_entities = gr.State([])
    topk_category_entities = gr.State([])
    topk_span_entities = gr.State([])

    gr.Markdown("# 📝 LUXE Demo (β版)")
    gr.Markdown(
        """Studio Ousia で開発中の次世代知識強化言語モデル **LUXE** の動作デモです。
入力されたテキストに対して、テキスト中に出現するエンティティ(事物)と、テキスト全体の主題となるエンティティおよびカテゴリを予測します。
デフォルトのLUXEは、エンティティおよびカテゴリとして、それぞれ日本語 Wikipedia における被リンク数上位50万件および10万件の項目を使用しています。
予測対象のエンティティを任意のものに置き換えて推論を行うことも可能です(下記「LUXE のエンティティ語彙を置き換える」を参照してください)。""",
        line_breaks=True,
    )

    gr.Markdown("## 入力テキスト")

    with gr.Tab(label="直接入力"):
        text_input = gr.Textbox(label=f"入力テキスト(最大{MAX_TEXT_LENGTH}文字)", max_length=MAX_TEXT_LENGTH)
        text_submit_button = gr.Button(value="予測実行", variant="huggingface")
    with gr.Tab(label="ファイルアップロード"):
        gr.Markdown(
            f"""1行1事例のテキストファイル(最大{MAX_TEXT_FILE_LINES}行)をアップロードできます。
アップロードされたテキストのそれぞれに対して推論が実行されます。""",
            line_breaks=True,
        )
        texts_file = gr.File(label="入力テキストファイル")
        texts_submit_button = gr.Button(value="予測実行", variant="huggingface")

    # All three prediction triggers share the same handler and outputs and
    # differ only in where the text comes from.
    prediction_outputs = [texts, batch_entity_spans, topk_normal_entities, topk_category_entities, topk_span_entities]
    direct_prediction_inputs = [
        models, text_input, topk, entity_span_sensitivity, nayose_coef, entity_replaced_counts
    ]
    file_prediction_inputs = [
        models, texts_input, topk, entity_span_sensitivity, nayose_coef, entity_replaced_counts
    ]

    text_input.submit(
        fn=get_topk_entities_from_texts, inputs=direct_prediction_inputs, outputs=prediction_outputs
    )
    text_submit_button.click(
        fn=get_topk_entities_from_texts, inputs=direct_prediction_inputs, outputs=prediction_outputs
    )
    texts_file.change(fn=get_texts_from_file, inputs=texts_file, outputs=texts_input)
    texts_submit_button.click(
        fn=get_topk_entities_from_texts, inputs=file_prediction_inputs, outputs=prediction_outputs
    )

    gr.Markdown("---")

    with gr.Accordion(label="ハイパーパラメータ", open=False):
        # precision=0 makes the component deliver an int, which is what the
        # top-k selection in the prediction handler requires.
        topk_input = gr.Number(5, label="予測するエンティティの件数 (Top K)", precision=0, interactive=True)
        entity_span_sensitivity_input = gr.Slider(
            minimum=0.0, maximum=5.0, value=1.0, step=0.1, label="エンティティ検出の積極度", interactive=True
        )
        nayose_coef_input = gr.Slider(
            minimum=0.0, maximum=2.0, value=1.0, step=0.1, label="文字列一致の優先度", interactive=True
        )

    # Mirror the widget values into the state objects read by the handlers.
    topk_input.change(fn=lambda val: val, inputs=topk_input, outputs=topk)
    entity_span_sensitivity_input.change(
        fn=lambda val: val, inputs=entity_span_sensitivity_input, outputs=entity_span_sensitivity
    )
    nayose_coef_input.change(fn=lambda val: val, inputs=nayose_coef_input, outputs=nayose_coef)

    with gr.Accordion(label="LUXE のエンティティ語彙を置き換える", open=False):
        gr.Markdown(
            """LUXE のモデルとトークナイザのエンティティ語彙を任意のエンティティ集合に置き換えます。
エンティティとともに与えられるエンティティの説明文から、エンティティの埋め込みが計算され、LUXE の推論に利用されます。""",
            line_breaks=True,
        )
        gr.Markdown(
            f"「エンティティ」と「エンティティの説明文」の2列からなる CSV ファイル(最大{MAX_ENTITY_FILE_LINES}行)をアップロードできます。"
        )
        new_entity_text_pairs_file = gr.File(label="エンティティと説明文の CSV ファイル", height="128px")
        gr.Markdown("CSV ファイルから読み込まれた項目が以下の表に表示されます。表の内容を直接編集することも可能です。")
        new_entity_text_pairs_input = gr.Dataframe(
            headers=["entity", "text"],
            col_count=(2, "fixed"),
            type="array",
            label="エンティティと説明文",
            interactive=True,
        )
        preserve_default_entities_checkbox = gr.Checkbox(label="既存のエンティティを保持する", value=True)
        replace_entity_button = gr.Button(value="エンティティ語彙を置き換える")
        gr.Markdown("LUXE のモデルのエンティティ語彙は、デモページの再読み込み時にリセットされます。")

    new_entity_text_pairs_file.change(
        fn=get_new_entity_text_pairs_from_file, inputs=new_entity_text_pairs_file, outputs=new_entity_text_pairs_input
    )
    replace_entity_button.click(
        fn=replace_entities,
        inputs=[models, new_entity_text_pairs_input, entity_replaced_counts, preserve_default_entities_checkbox],
        outputs=entity_replaced_counts,
    )

    gr.Markdown("---")
    gr.Markdown("## 予測されたエンティティとカテゴリ")

    @gr.render(inputs=[texts, batch_entity_spans, topk_normal_entities, topk_category_entities, topk_span_entities])
    def render_topk_entities(
        texts, batch_entity_spans, topk_normal_entities, topk_category_entities, topk_span_entities
    ):
        """Render, for every text, its highlighted spans and the predicted entities/categories."""
        for text, entity_spans, normal_entities, category_entities, span_entities in zip(
            texts, batch_entity_spans, topk_normal_entities, topk_category_entities, topk_span_entities
        ):
            # Split the text into alternating plain and "Entity" segments.
            highlighted_text_value = []
            cur = 0
            for start, end in entity_spans:
                if cur < start:
                    highlighted_text_value.append((text[cur:start], None))

                highlighted_text_value.append((text[start:end], "Entity"))
                cur = end

            if cur < len(text):
                highlighted_text_value.append((text[cur:], None))

            gr.HighlightedText(
                value=highlighted_text_value,
                color_map={"Entity": "green"},
                combine_adjacent=False,
                label="予測されたエンティティのスパン",
            )

            if normal_entities:
                gr.Dataset(
                    label="テキスト全体に関連するエンティティ",
                    components=["text"],
                    samples=[[entity] for entity in normal_entities],
                )
            if category_entities:
                gr.Dataset(
                    label="テキスト全体に関連するカテゴリ",
                    components=["text"],
                    samples=[[entity] for entity in category_entities],
                )

            # Expanded by default only when a single text was submitted.
            with gr.Accordion(label="テキスト中のスパンに対応するエンティティ", open=len(texts) == 1):
                span_texts = [text[start:end] for start, end in entity_spans]
                for span_text, entities in zip(span_texts, span_entities):
                    gr.Dataset(
                        label=f"「{span_text}」に対応するエンティティ",
                        components=["text"],
                        samples=[[entity] for entity in entities],
                    )

demo.launch()