# File size: 15,287 Bytes
# Commit hashes shown in the page header: aef7e33 b083e5f 4deb54c f0aa55b b617e3c 38f5a66 f17dec9 c6eb236
# (The 1-336 line-number gutter that followed was page residue, not part of the module.)
import torch
import pickle
from transformers import AutoTokenizer , DistilBertForSequenceClassification
from transformers import BatchEncoding, PreTrainedTokenizerBase
from typing import Optional
from torch import Tensor
import numpy as np
from random import shuffle
from Model import BERT
from Model import tokenizer , mult_token_id , cls_token_id , pad_token_id , max_pred , maxlen , sep_token_id
from transformers import pipeline
# Module-level torch device name; it is the default value of the `device`
# parameter of predict() further down in this file.
device = "cpu"
# Load the models
def load_models():
    """Load every model the application needs and return them.

    Returns:
        A ``(model, neptune, pipe)`` tuple:
        the DistilBERT sequence classifier loaded from its saved directory,
        the project ``BERT`` model with its state dict restored and moved to
        ``"cpu"``, and the Whisper speech-recognition pipeline created with
        ``device="cpu"``.
    """
    target_device = "cpu"
    neptune_weights_path = "Neptune/Neptune/model.pt"

    print("Loading DistilBERT model...")
    model = DistilBertForSequenceClassification.from_pretrained("DistillMDPI1/DistillMDPI1/saved_model")

    print("Loading BERT model...")
    neptune = BERT()
    state_dict = torch.load(neptune_weights_path, map_location=torch.device('cpu'))
    neptune.load_state_dict(state_dict)
    neptune.to(target_device)

    print("Loading speech recognition pipeline...")
    pipe = pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-tiny.en",
        chunk_length_s=30,
        device=target_device,
    )
    print(pipe)

    # Load the label encoder. It is read here but neither used nor returned.
    # NOTE(review): pickle.load can execute arbitrary code from the file, so
    # this path must only ever point at a trusted artifact.
    with open("DistillMDPI1/DistillMDPI1/label_encoder.pkl", "rb") as f:
        label_encoder = pickle.load(f)

    return model, neptune, pipe
# Maps a class index (as produced by the classifiers in this file) to a
# 3-tuple: (display name, style tag, hex colour).
# NOTE(review): the style tags look like CSS/theme class names consumed by a
# front end — confirm before renaming any of them.
class_labels = {
    16: ('vehicles', 'info', '#4f9ef8'),
    10: ('environments', 'success', '#0cbc87'),
    9: ('energies', 'danger', '#d6293e'),
    0: ('Physics', 'primary', '#0f6fec'),
    13: ('robotics', 'moss', '#B1E5F2'),
    3: ('agriculture', 'agri', '#a8c686'),
    11: ('ML', 'yellow', '#ffc107'),
    8: ('economies', 'warning', '#f7c32e'),
    15: ('technologies', 'vanila', '#FDF0D5'),
    12: ('mathematics', 'coffe', '#7f5539'),
    14: ('sports', 'orange', '#fd7e14'),
    4: ('AI', 'cyan', '#0dcaf0'),
    6: ('Innovation', 'rosy', '#BF98A0'),
    5: ('Science', 'picton', '#5fa8d3'),
    1: ('Societies', 'purple', '#6f42c1'),
    2: ('administration', 'pink', '#d63384'),
    7: ('biology', 'cambridge', '#88aa99'),
}
def predict_class(text, model):
    """Classify a text and rank every class by probability.

    Args:
        text: Either a list of strings or a single string. Only the first
            text of a list is classified. A single string is wrapped in a
            one-element list; without that, ``transform_list_of_texts`` would
            iterate over it character by character.
        model: A callable taking ``input_ids`` and ``attention_mask`` keyword
            arguments and returning an object with a ``logits`` attribute.

    Returns:
        A ``(predicted_class, sorted_percentages)`` tuple. ``predicted_class``
        is the ``class_labels`` entry with the highest probability.
        ``sorted_percentages`` maps every ``class_labels`` entry to its
        probability in percent, ordered from most to least likely.
    """
    if isinstance(text, str):
        text = [text]

    # Tokenize the text into chunks of 510 tokens (truncated at 2550 tokens).
    inputs = transform_list_of_texts(text, tokenizer, 510, 510, 1, 2550)

    # Take the chunk tensors that belong to the first text.
    input_ids_tensor = inputs["input_ids"][0]
    attention_mask_tensor = inputs["attention_mask"][0]

    # Run the chunks through the model without tracking gradients.
    with torch.no_grad():
        outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)

    # Softmax over the class dimension.
    # NOTE(review): `[0]` keeps only the first chunk's distribution even
    # though every chunk was run through the model — confirm this is intended.
    probabilities = torch.softmax(outputs.logits, dim=1)[0]

    # Pick the most likely class.
    predicted_class_index = torch.argmax(probabilities).item()
    predicted_class = class_labels[predicted_class_index]

    # Build the per-class percentages, then order them by probability.
    percentages = {class_labels[idx]: probabilities[idx].item() * 100 for idx in range(len(class_labels))}
    sorted_percentages = dict(sorted(percentages.items(), key=lambda item: item[1], reverse=True))
    return predicted_class, sorted_percentages
def transform_list_of_texts(
    texts: list[str],
    tokenizer: PreTrainedTokenizerBase,
    chunk_size: int,
    stride: int,
    minimal_chunk_length: int,
    maximal_text_length: Optional[int] = None,
) -> BatchEncoding:
    """Turn every text into chunked model input and bundle the results.

    Each text goes through ``transform_single_text``. The returned
    ``BatchEncoding`` holds two parallel lists, ``"input_ids"`` and
    ``"attention_mask"``, with one stacked tensor per input text.
    """
    per_text = [
        transform_single_text(single_text, tokenizer, chunk_size, stride, minimal_chunk_length, maximal_text_length)
        for single_text in texts
    ]
    all_input_ids = []
    all_attention_masks = []
    for encoded in per_text:
        all_input_ids.append(encoded[0])
        all_attention_masks.append(encoded[1])
    return BatchEncoding({"input_ids": all_input_ids, "attention_mask": all_attention_masks})
def transform_single_text(
    text: str,
    tokenizer: PreTrainedTokenizerBase,
    chunk_size: int,
    stride: int,
    minimal_chunk_length: int,
    maximal_text_length: Optional[int],
) -> tuple[Tensor, Tensor]:
    """Convert one text into stacked, chunked model input.

    The text is tokenized (truncated when ``maximal_text_length`` is truthy),
    split into chunks, wrapped in CLS/SEP tokens, padded, and stacked.

    Returns:
        ``(input_ids, attention_mask)`` as produced by
        ``stack_tokens_from_all_chunks``.
    """
    tokens = (
        tokenize_text_with_truncation(text, tokenizer, maximal_text_length)
        if maximal_text_length
        else tokenize_whole_text(text, tokenizer)
    )
    id_chunks, attention_chunks = split_tokens_into_smaller_chunks(
        tokens, chunk_size, stride, minimal_chunk_length
    )
    # Both helpers below modify the chunk lists in place.
    add_special_tokens_at_beginning_and_end(id_chunks, attention_chunks)
    add_padding_tokens(id_chunks, attention_chunks, chunk_size)
    return stack_tokens_from_all_chunks(id_chunks, attention_chunks)
def tokenize_whole_text(text: str, tokenizer: PreTrainedTokenizerBase) -> BatchEncoding:
    """Tokenize the full text: no truncation, no special tokens, PyTorch tensors."""
    return tokenizer(text, add_special_tokens=False, truncation=False, return_tensors="pt")
def tokenize_text_with_truncation(
    text: str, tokenizer: PreTrainedTokenizerBase, maximal_text_length: int
) -> BatchEncoding:
    """Tokenize the text without special tokens, truncating it to ``maximal_text_length`` tokens."""
    tokenizer_options = {
        "add_special_tokens": False,
        "max_length": maximal_text_length,
        "truncation": True,
        "return_tensors": "pt",
    }
    return tokenizer(text, **tokenizer_options)
def split_tokens_into_smaller_chunks(
    tokens: BatchEncoding,
    chunk_size: int,
    stride: int,
    minimal_chunk_length: int,
) -> tuple[list[Tensor], list[Tensor]]:
    """Cut the first row of the ids and of the attention mask into chunks.

    Both tensors are split with the same ``chunk_size``, ``stride`` and
    ``minimal_chunk_length`` by ``split_overlapping``.
    """
    input_id_chunks, mask_chunks = (
        split_overlapping(tokens[key][0], chunk_size, stride, minimal_chunk_length)
        for key in ("input_ids", "attention_mask")
    )
    return input_id_chunks, mask_chunks
def add_special_tokens_at_beginning_and_end(
    input_id_chunks: list[Tensor],
    mask_chunks: list[Tensor],
    cls_id: int = 101,
    sep_id: int = 102,
) -> None:
    """Wrap every chunk in CLS ... SEP tokens, in place.

    Args:
        input_id_chunks: Token-id chunks; each entry is replaced by
            ``[cls_id, *chunk, sep_id]``.
        mask_chunks: Matching attention-mask chunks; each entry gets a ``1``
            prepended and appended for the two special tokens.
        cls_id: Id of the CLS token (default 101).
        sep_id: Id of the SEP token (default 102).

    The added values are created with the dtype and device of the chunk they
    are attached to, so integer token ids are not promoted to float.
    """
    for i, ids in enumerate(input_id_chunks):
        mask = mask_chunks[i]
        # Add the CLS and SEP tokens around the chunk.
        input_id_chunks[i] = torch.cat([ids.new_tensor([cls_id]), ids, ids.new_tensor([sep_id])])
        # The special tokens must be attended to, hence mask value 1.
        attended = mask.new_tensor([1])
        mask_chunks[i] = torch.cat([attended, mask, attended])
def add_padding_tokens(
    input_id_chunks: list[Tensor],
    mask_chunks: list[Tensor],
    chunk_size: int,
    pad_id: int = 0,
) -> None:
    """Pad every chunk, in place, up to ``chunk_size + 2`` tokens.

    The extra two positions account for the CLS and SEP tokens added around
    each chunk. Chunks that already have that length or more are untouched.

    Args:
        input_id_chunks: Token-id chunks; short ones get ``pad_id`` appended.
        mask_chunks: Matching attention masks; padded positions get ``0``.
        chunk_size: Number of content tokens per chunk (without CLS/SEP).
        pad_id: Id of the padding token (default 0).

    Padding is created with the dtype and device of the chunk it extends, so
    integer token ids are not promoted to float.
    """
    target_length = chunk_size + 2
    for i, ids in enumerate(input_id_chunks):
        pad_len = target_length - ids.shape[0]
        if pad_len > 0:
            mask = mask_chunks[i]
            input_id_chunks[i] = torch.cat([ids, ids.new_full((pad_len,), pad_id)])
            mask_chunks[i] = torch.cat([mask, mask.new_zeros(pad_len)])
def stack_tokens_from_all_chunks(input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> tuple[Tensor, Tensor]:
    """Stack the chunk lists into two 2-D tensors for the model.

    Returns:
        ``(input_ids, attention_mask)`` where the ids are cast to 64-bit
        integers and the mask to 32-bit integers.
    """
    stacked_ids = torch.stack(input_id_chunks).to(dtype=torch.int64)
    stacked_mask = torch.stack(mask_chunks).to(dtype=torch.int32)
    return stacked_ids, stacked_mask
def split_overlapping(tensor: Tensor, chunk_size: int, stride: int, minimal_chunk_length: int) -> list[Tensor]:
    """Divide a 1-dimensional tensor into (possibly overlapping) chunks.

    Args:
        tensor: The 1-dimensional tensor to split.
        chunk_size: Maximum number of elements per chunk; must be positive.
        stride: Distance between the starts of consecutive chunks; must be
            positive. A stride smaller than ``chunk_size`` makes chunks overlap.
        minimal_chunk_length: When more than one chunk is produced, chunks
            shorter than this are dropped.

    Returns:
        The list of chunks (slices of ``tensor``). An empty tensor yields an
        empty list; a tensor that fits in one chunk is returned as that single
        chunk regardless of ``minimal_chunk_length``.

    Raises:
        ValueError: If ``chunk_size`` or ``stride`` is not positive.
    """
    # A non-positive stride would either make range() fail with an unrelated
    # message (0) or silently produce no chunks at all (negative).
    if stride <= 0:
        raise ValueError(f"stride must be a positive integer, got {stride}")
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    result = [tensor[start : start + chunk_size] for start in range(0, len(tensor), stride)]
    if len(result) > 1:
        # Ignore chunks with fewer than minimal_chunk_length tokens.
        result = [chunk for chunk in result if len(chunk) >= minimal_chunk_length]
    return result
## Voice part
def stack_tokens_from_all_chunks_for_inference(input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> tuple[Tensor, Tensor]:
    """Stack the chunk lists into two 2-D tensors for the voice/inference path.

    This does exactly what ``stack_tokens_from_all_chunks`` does, so it
    delegates to that function rather than repeating its body.

    Returns:
        ``(input_ids, attention_mask)``: the stacked ids cast to long and the
        stacked mask cast to int.
    """
    return stack_tokens_from_all_chunks(input_id_chunks, mask_chunks)
def transform_for_inference_text(text: str,
                                 tokenizer: PreTrainedTokenizerBase,
                                 chunk_size: int,
                                 stride: int,
                                 minimal_chunk_length: int,
                                 maximal_text_length: Optional[int],) -> dict[str, Tensor]:
    """Tokenize a text and split it into padded chunks for the inference path.

    Args:
        text: The text to encode.
        tokenizer: Tokenizer used to produce the token ids.
        chunk_size: Length every chunk is padded to.
        stride: Distance between the starts of consecutive chunks.
        minimal_chunk_length: Minimum length for a chunk to be kept when the
            text produces more than one chunk.
        maximal_text_length: When truthy, the text is truncated to this many
            tokens; otherwise the whole text is tokenized.

    Returns:
        A plain dict (not a ``BatchEncoding``) with the stacked ``"input_ids"``
        and ``"attention_mask"`` tensors, one row per chunk.
    """
    if maximal_text_length:
        tokens = tokenize_text_with_truncation(text, tokenizer, maximal_text_length)
    else:
        tokens = tokenize_whole_text(text, tokenizer)
    input_id_chunks, mask_chunks = split_tokens_into_smaller_chunks(tokens, chunk_size, stride, minimal_chunk_length)
    # Both helpers below modify the chunk lists in place.
    add_special_tokens_at_beginning_and_end_inference(input_id_chunks, mask_chunks)
    add_padding_tokens_inference(input_id_chunks, mask_chunks, chunk_size)
    input_ids, attention_mask = stack_tokens_from_all_chunks_for_inference(input_id_chunks, mask_chunks)
    return {"input_ids": input_ids, "attention_mask": attention_mask}
def add_special_tokens_at_beginning_and_end_inference(input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> None:
    """Re-wrap every chunk through ``torch.cat`` without adding any token.

    Despite its name, this function adds no special tokens: each chunk is
    concatenated with nothing else, so its values are unchanged. In this file
    the CLS, MULT and SEP ids are inserted later, by ``prepare_text``.

    Both lists are updated in place; nothing is returned.
    """
    for i, ids in enumerate(input_id_chunks):
        # Single-element concatenation: same values, no tokens added.
        input_id_chunks[i] = torch.cat([ids])
        mask_chunks[i] = torch.cat([mask_chunks[i]])
def add_padding_tokens_inference(
    input_id_chunks: list[Tensor],
    mask_chunks: list[Tensor],
    chunk_size: int,
    pad_id: Optional[int] = None,
) -> None:
    """Pad every chunk, in place, up to exactly ``chunk_size`` tokens.

    Args:
        input_id_chunks: Token-id chunks; short ones get the pad id appended.
        mask_chunks: Matching attention masks; padded positions get ``0``.
        chunk_size: Length every chunk is padded to. Chunks that are already
            at least this long are left untouched.
        pad_id: Padding token id. When omitted, the module-level
            ``pad_token_id`` is used. ``prepare_text`` strips padding with that
            same value, so the two steps cannot disagree.
    """
    if pad_id is None:
        pad_id = pad_token_id
    for i, ids in enumerate(input_id_chunks):
        pad_len = chunk_size - ids.shape[0]
        if pad_len > 0:
            mask = mask_chunks[i]
            # Padding shares the dtype and device of the chunk it extends.
            input_id_chunks[i] = torch.cat([ids, ids.new_full((pad_len,), pad_id)])
            mask_chunks[i] = torch.cat([mask, mask.new_zeros(pad_len)])
def prepare_text(tokens_splitted: BatchEncoding):
    """Pair up chunks and build one model input per pair.

    Chunks are consumed two at a time (A, B). When the number of chunks is
    odd, the last chunk is used as A with an empty B. For every pair the
    sequence ``[CLS, MULT, *A, SEP, MULT, *B, SEP]`` is built, some non-special
    positions are replaced by the tokenizer's mask token, and the sequence is
    padded up to ``maxlen``.

    Args:
        tokens_splitted: Mapping with an ``'input_ids'`` entry holding the
            padded chunks, as returned by ``transform_for_inference_text``.

    Returns:
        A ``(batch, sentences)`` tuple. ``batch`` has one
        ``[input_ids, segment_ids, masked_pos]`` entry per pair. ``sentences``
        holds the decoded text of every chunk, in chunk order.

    Masked positions are chosen with ``random.shuffle``, so the result varies
    between calls unless the ``random`` module is seeded.
    """

    def strip_padding(chunk):
        # Flatten the chunk to a plain list of ids and drop the padding ids.
        return [token for token in chunk.view(-1).tolist() if token != pad_token_id]

    special_ids = (cls_token_id, sep_token_id, mult_token_id)
    batch = []
    sentences = []
    input_ids_list = tokens_splitted['input_ids']
    n_chunks = len(input_ids_list)

    for i in range(0, n_chunks, 2):
        k = i + 1
        # False only for the trailing chunk of an odd-length list.
        has_second = k < n_chunks

        input_ids_a = strip_padding(input_ids_list[i])
        input_ids_b = strip_padding(input_ids_list[k]) if has_second else []

        input_ids = (
            [cls_token_id] + [mult_token_id] + input_ids_a + [sep_token_id]
            + [mult_token_id] + input_ids_b + [sep_token_id]
        )
        # Segment 0 covers CLS, MULT, A and SEP; segment 1 covers MULT, B and SEP.
        segment_ids = [0] * (1 + 1 + len(input_ids_a) + 1) + [1] * (1 + len(input_ids_b) + 1)

        # Decode before masking so the sentences hold the original tokens.
        sentences.append(tokenizer.decode(input_ids_a))
        if has_second:
            sentences.append(tokenizer.decode(input_ids_b))

        # MASK LM: mask about 15% of the sequence, at least 1 and at most max_pred.
        n_pred = min(max_pred, max(1, int(round(len(input_ids) * 0.15))))
        cand_masked_pos = [idx for idx, token in enumerate(input_ids) if token not in special_ids]
        shuffle(cand_masked_pos)
        masked_pos = cand_masked_pos[:n_pred]
        for pos in masked_pos:
            input_ids[pos] = tokenizer.mask_token_id

        # Pad the sequence up to maxlen (a non-positive n_pad adds nothing).
        n_pad = maxlen - len(input_ids)
        input_ids.extend([pad_token_id] * n_pad)
        segment_ids.extend([0] * n_pad)

        # Pad the masked positions with zeros.
        if max_pred > n_pred:
            masked_pos.extend([0] * (max_pred - n_pred))

        batch.append([input_ids, segment_ids, masked_pos])

    return batch, sentences
def inference(text: str):
    """Encode a text for the voice path and build its model inputs.

    The text is truncated to 2550 tokens and split into 125-token chunks with
    stride 125 before being handed to ``prepare_text``.

    Returns:
        The ``(batch, sentences)`` pair produced by ``prepare_text``.
    """
    chunked_tokens = transform_for_inference_text(text, tokenizer, 125, 125, 1, 2550)
    model_inputs, chunk_texts = prepare_text(chunked_tokens)
    return model_inputs, chunk_texts
def predict(inference_batch, neptune, device=device):
    """Run the model over every prepared input and collect its class predictions.

    Args:
        inference_batch: Iterable of ``[input_ids, segment_ids, masked_pos]``
            entries, as built by ``prepare_text``.
        neptune: Model called with the three tensors; the third and fourth
            values it returns are treated as classification logits.
        device: Device the input tensors are created on.

    Returns:
        A flat list holding, for every entry, the argmax over ``dim=1`` of the
        first logits followed by the argmax of the second logits.
    """

    def as_batch_of_one(values):
        # Build a long tensor on the target device and add a leading batch axis.
        return torch.tensor(values, device=device, dtype=torch.long).unsqueeze(0)

    predictions = []
    neptune.eval()
    with torch.no_grad():
        for entry in inference_batch:
            input_ids = as_batch_of_one(entry[0])
            segment_ids = as_batch_of_one(entry[1])
            masked_pos = as_batch_of_one(entry[2])
            outputs = neptune(input_ids, segment_ids, masked_pos)
            _, _, logits_first, logits_second = outputs
            first_labels = torch.argmax(logits_first, dim=1).cpu().detach().numpy()
            second_labels = torch.argmax(logits_second, dim=1).cpu().detach().numpy()
            predictions.extend(first_labels)
            predictions.extend(second_labels)
    return predictions
def align_predictions_with_sentences(sentences, preds):
    """Map every sentence to the label of its prediction.

    Sentences and predictions are paired positionally; pairing stops at the
    shorter of the two. A prediction with no entry in ``class_labels`` maps to
    the string ``"Unknown"``. Repeated sentences keep their last label.
    """
    return {
        sentence: class_labels.get(pred, "Unknown")
        for sentence, pred in zip(sentences, preds)
    }