File size: 5,775 Bytes
4fd1faf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
from transformers import Pipeline
import numpy as np
import torch
from nltk.chunk import conlltags2tree
from nltk import pos_tag
from nltk.tree import Tree
import string
import torch.nn.functional as F
import re
def tokenize(text):
    """
    Split text into whitespace-delimited tokens, isolating punctuation.

    Every character listed in ``string.punctuation`` is padded with a space on
    both sides before splitting, so each punctuation mark becomes a token of
    its own.

    :param text: The string to tokenize.
    :return: A list of tokens (words and single punctuation characters).
    """
    # One translation table handles all punctuation marks in a single pass.
    padded_marks = {ord(mark): " " + mark + " " for mark in string.punctuation}
    return text.translate(padded_marks).split()
def find_entity_indices(article, entity):
    """
    Locate every literal occurrence of an entity string inside an article.

    The entity is escaped before searching, so regex metacharacters in it are
    matched literally. Occurrences are reported left to right and do not
    overlap.

    :param article: The complete article text.
    :param entity: The entity to search for.
    :return: A list of tuples (lArticleOffset, rArticleOffset), one per
        occurrence, where the right offset is exclusive.
    """
    literal_pattern = re.escape(entity)
    return [hit.span() for hit in re.finditer(literal_pattern, article)]
def get_entities(tokens, tags, confidences, text):
    """
    Group tagged tokens into entities and locate them in the original text.

    The tags are collapsed from IOBES to IOB, chunked with
    ``conlltags2tree``, and each resulting chunk is searched for in ``text``.
    The surface form of a chunk is its tokens joined with single spaces; if
    that exact string does not occur in ``text``, no record is produced for
    the chunk. Every occurrence found yields one record, but the same
    (label, start, end) combination is reported only once, even when the same
    surface form was predicted several times.

    :param tokens: The word tokens of the text, in order.
    :param tags: One tag per token (``O`` or a prefixed label such as
        ``B-X`` / ``I-X`` / ``S-X`` / ``E-X``).
    :param confidences: One confidence value per token.
    :param text: The original text in which entity offsets are looked up.
    :return: A list of dicts with the keys ``entity`` (chunk label), ``score``
        (mean confidence over the chunk's tokens), ``index`` (position of the
        chunk's first token in ``tokens``), ``word`` (surface form), ``start``
        and ``end`` (character offsets in ``text``, end exclusive).
    """
    # Collapse IOBES to IOB. Only the two-character prefix is rewritten so a
    # label that merely contains "S-" or "E-" further in is left untouched.
    iobes_to_iob = {"S-": "B-", "E-": "I-"}
    tags = [iobes_to_iob.get(tag[:2], tag[:2]) + tag[2:] for tag in tags]

    # conlltags2tree expects (token, pos, tag) triples; only the tokens of the
    # resulting leaves are read below.
    pos_tags = [pos for _, pos in pos_tag(tokens)]
    ne_tree = conlltags2tree(list(zip(tokens, pos_tags, tags)))

    entities = []
    reported = set()  # (label, start, end) combinations already emitted
    idx = 0  # index in `tokens` of the first token of the current tree child
    for subtree in ne_tree:
        if not isinstance(subtree, Tree):
            # A plain (token, pos) leaf, i.e. an 'O' token: just advance.
            idx += 1
            continue

        label = subtree.label()
        surface = " ".join(token for token, _ in subtree.leaves())
        # The score depends only on the chunk, not on where it is found.
        score = np.average(confidences[idx : idx + len(subtree)])

        for start, end in find_entity_indices(text, surface):
            key = (label, start, end)
            if key in reported:
                continue
            reported.add(key)
            entities.append(
                {
                    "entity": label,
                    "score": score,
                    "index": idx,
                    "word": surface,
                    "start": start,
                    "end": end,
                }
            )
        idx += len(subtree)
    return entities
def realign(
    text_sentence, out_label_preds, softmax_scores, tokenizer, reverted_label_map
):
    """
    Map sub-token level predictions back onto the words of a sentence.

    The words are re-tokenized with ``is_split_into_words=True`` and each word
    receives the prediction made for its first sub-token.

    :param text_sentence: The list of words of the sentence.
    :param out_label_preds: Predicted label id for every sub-token position.
    :param softmax_scores: For every sub-token position, the sequence of
        per-label scores; the maximum is used as the word's confidence.
    :param tokenizer: Callable tokenizer whose result exposes ``word_ids()``.
    :param reverted_label_map: Mapping from label id to label string.
    :return: A tuple ``(words_list, preds_list, confidence_list)`` of three
        lists with exactly one entry per word. A word whose prediction cannot
        be looked up (no sub-token of its own, position beyond the available
        predictions or scores, or unknown label id) gets ``"O"`` and ``0.0``.
    """
    word_ids = tokenizer(text_sentence, is_split_into_words=True).word_ids()

    # Position of the first sub-token of each word, collected in one pass
    # instead of a linear search per word.
    first_subtoken = {}
    for position, word_id in enumerate(word_ids):
        if word_id is not None:
            first_subtoken.setdefault(word_id, position)

    words_list, preds_list, confidence_list = [], [], []
    for idx, word in enumerate(text_sentence):
        label, confidence = "O", 0.0
        position = first_subtoken.get(idx)
        if position is not None:
            try:
                label = reverted_label_map[out_label_preds[position]]
                confidence = max(softmax_scores[position])
            except (IndexError, KeyError):
                # e.g. the sentence was longer than max_length. Reset both
                # values so a half-completed lookup never leaks through.
                label, confidence = "O", 0.0
        # Append once per word so the three lists always stay in step.
        words_list.append(word)
        preds_list.append(label)
        confidence_list.append(confidence)
    return words_list, preds_list, confidence_list
class MultitaskTokenClassificationPipeline(Pipeline):
    """
    Token-classification pipeline for a model that returns logits per task.

    The model output is expected to expose ``logits`` as a mapping from task
    name to a logits tensor. ``label_map`` maps each task name to a
    ``{label: id}`` dict; its inverse is kept in ``id2label`` for decoding.
    """

    def __init__(self, model, tokenizer, label_map, **kwargs):
        """
        :param model: The model to run.
        :param tokenizer: The tokenizer matching the model.
        :param label_map: Mapping ``task -> {label: id}``.
        :param kwargs: Forwarded unchanged to ``Pipeline.__init__``.
        """
        super().__init__(model=model, tokenizer=tokenizer, **kwargs)
        self.label_map = label_map
        # Inverted per-task mapping (id -> label) used when decoding.
        self.id2label = {
            task: {id_: label for label, id_ in labels.items()}
            for task, labels in label_map.items()
        }

    def _sanitize_parameters(self, **kwargs):
        """
        Route call-time keyword arguments to the pipeline stages.

        :return: A tuple of three dicts; all keyword arguments go into the
            first one and the other two are empty.
        """
        # Add any additional parameter handling if necessary
        return kwargs, {}, {}

    def preprocess(self, text, **kwargs):
        """
        Tokenize the raw text for the model and split it into words.

        :param text: The raw input string.
        :return: A tuple ``(tokenized_inputs, text_sentence, text)`` where
            ``tokenized_inputs`` is the tokenizer output padded/truncated to
            512 tokens and ``text_sentence`` is the result of ``tokenize``.
        """
        # NOTE(review): the model input is tokenized from the raw string,
        # while realign() re-tokenizes the pre-split words. The alignment
        # presumably relies on both producing the same sub-token sequence;
        # confirm for the tokenizer in use.
        tokenized_inputs = self.tokenizer(
            text, padding="max_length", truncation=True, max_length=512
        )
        text_sentence = tokenize(text)
        return tokenized_inputs, text_sentence, text

    def _forward(self, inputs):
        """
        Run the model on a single preprocessed example.

        :param inputs: The tuple returned by ``preprocess``.
        :return: A tuple ``(outputs, text_sentence, text)``.
        """
        inputs, text_sentence, text = inputs
        device = self.model.device
        # Wrap in a list to add the batch dimension (batch of one).
        input_ids = torch.tensor([inputs["input_ids"]], dtype=torch.long).to(device)
        attention_mask = torch.tensor(
            [inputs["attention_mask"]], dtype=torch.long
        ).to(device)
        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask)
        return outputs, text_sentence, text

    def postprocess(self, outputs, **kwargs):
        """
        Turn the model outputs into entities for every task.

        :param outputs: The tuple returned by ``_forward``.
        :param kwargs: Unused.
        :return: A dict mapping each task name to the list of entity dicts
            produced by ``get_entities``.
        """
        tokens_result, text_sentence, text = outputs

        entities = {}
        for task, logits in tokens_result.logits.items():
            # Only the first (and only) sequence of the batch is decoded.
            sequence_logits = logits[0]
            label_ids = torch.argmax(sequence_logits, dim=-1).tolist()
            label_scores = F.softmax(sequence_logits, dim=-1).tolist()

            words_list, preds_list, confidence_list = realign(
                text_sentence,
                label_ids,
                label_scores,
                self.tokenizer,
                self.id2label[task],
            )
            entities[task] = get_entities(words_list, preds_list, confidence_list, text)
        return entities
|