roberta-hindi / mlm_custom /test_mlm.py
hassiahk's picture
Model changes and code formatting
666b7aa
raw
history blame
6.47 kB
import json
import os
import numpy as np
import pandas as pd
from transformers import (AutoModel, AutoModelForMaskedLM, AutoTokenizer,
RobertaModel, pipeline)
class MLMTest():
    """Run fill-mask accuracy checks for every model listed in a config CSV.

    Three CSV files are read at construction time (paths are resolved
    relative to the directory containing this module):

    * the config file, with columns ``model_name``, ``display_name``,
      ``revision`` and ``from_flax``;
    * the full-text file, with columns ``user_id`` and ``text``;
    * the targeted-text file, with columns ``user_id``, ``text`` and
      ``output``.

    ``run_full_test`` masks every whitespace-separated word of each text in
    turn; ``run_targeted_test`` evaluates texts that already contain a
    ``<mask>`` placeholder. Both write one CSV with a prediction column per
    model into the current working directory.
    """

    def __init__(self, config_file="mlm_test_config.csv", full_text_file="mlm_full_text.csv", targeted_text_file="mlm_targeted_text.csv"):
        """Load the model configuration and both test sets.

        Args:
            config_file: CSV listing the models to evaluate.
            full_text_file: CSV of texts for the word-by-word masking test.
            targeted_text_file: CSV of pre-masked texts and expected outputs.
        """
        base_dir = os.path.dirname(os.path.realpath(__file__))
        self.config_df = pd.read_csv(os.path.join(base_dir, config_file))
        # Empty cells become "" so optional columns can be tested by truthiness.
        self.config_df.fillna("", inplace=True)
        self.full_text_df = pd.read_csv(os.path.join(base_dir, full_text_file))
        self.targeted_text_df = pd.read_csv(os.path.join(base_dir, targeted_text_file))
        self.full_text_results = []
        self.targeted_text_results = []
        # Fill-mask pipeline of the model currently being evaluated.
        self.nlp = None

    def _run_full_test_row(self, text, print_debug=False):
        """Mask each word of ``text`` in turn and record the first prediction.

        Args:
            text: Sentence to test; it is split on whitespace.
            print_debug: When true, print each masked text and all candidates.

        Returns:
            A list with one ``{"prediction", "true_output"}`` dict per word.
            Each attempt is also appended to ``self.full_text_results``.
        """
        return_data = []
        words = text.split()
        mask_token = self.nlp.tokenizer.mask_token
        for i, expected_result in enumerate(words):
            # The surrounding spaces are kept even at the sentence edges so the
            # masked text stays identical to what earlier runs produced.
            masked_text = " ".join(words[:i]) + " " + mask_token + " " + " ".join(words[i + 1:])
            result = self.nlp(masked_text)
            prediction = result[0]["token_str"]
            self.full_text_results.append({"text": masked_text, "result": prediction, "true_output": expected_result})
            if print_debug:
                print(masked_text)
                print([x["token_str"] for x in result])
                print("-"*20)
            return_data.append({"prediction": prediction, "true_output": expected_result})
        return return_data

    def _run_targeted_test_row(self, text, expected_result, print_debug=False):
        """Predict the ``<mask>`` placeholder in ``text``.

        Args:
            text: Sentence containing the literal placeholder ``<mask>``; it is
                replaced by the current tokenizer's own mask token.
            expected_result: Expected token, or a JSON list of accepted tokens.
            print_debug: When true, print the text and all candidates.

        Returns:
            A single-element list holding a ``{"prediction", "true_output"}``
            dict. The attempt is also appended to
            ``self.targeted_text_results``.
        """
        result = self.nlp(text.replace("<mask>", self.nlp.tokenizer.mask_token))
        prediction = result[0]["token_str"]
        self.targeted_text_results.append({"text": text, "result": prediction, "true_output": expected_result})
        if print_debug:
            print(text)
            print([x["token_str"] for x in result])
            print("-"*20)
        return [{"prediction": prediction, "true_output": expected_result}]

    def _compute_acc(self, results):
        """Return the fraction of ``results`` whose prediction is correct.

        ``true_output`` may be a plain string or a JSON-encoded list of
        accepted answers. Anything that is not a JSON list (including values
        such as ``"2020"`` that happen to parse as JSON scalars) is compared
        to the prediction directly.

        Args:
            results: Iterable of ``{"prediction", "true_output"}`` dicts.

        Returns:
            Accuracy as a float in ``[0, 1]``; ``0.0`` for empty ``results``.
        """
        if not results:
            return 0.0
        correct = 0
        for row in results:
            prediction = row["prediction"]
            expected = row["true_output"]
            try:
                accepted = json.loads(expected)
            except (TypeError, ValueError):
                # Not JSON (ValueError) or not a string at all (TypeError).
                accepted = None
            if isinstance(accepted, list):
                if prediction in accepted:
                    correct += 1
            elif prediction == expected:
                correct += 1
        return correct / len(results)

    def _load_pipeline(self, model_name, revision, from_flax):
        """Build the fill-mask pipeline for one configured model.

        Flax checkpoints are converted by loading them with ``from_flax=True``
        and saving model and tokenizer to ``exported_pytorch_model`` in the
        current working directory; the pipeline is then built from that
        directory. The configured revision is applied to every hub download.
        """
        if from_flax:
            model = AutoModelForMaskedLM.from_pretrained(model_name, from_flax=True, revision=revision)
            tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
            tokenizer.save_pretrained('exported_pytorch_model')
            model.save_pretrained('exported_pytorch_model')
            return pipeline('fill-mask', model="exported_pytorch_model")
        return pipeline('fill-mask', model=model_name, revision=revision)

    def _evaluate_models(self, text_df, results_attr, run_row, output_file, exclude_user_ids, print_debug):
        """Run one test set against every configured model and save a CSV.

        Args:
            text_df: Test-set frame; must have a ``user_id`` column.
            results_attr: Name of the attribute that collects per-attempt
                results (reset for every model).
            run_row: Callable ``(row, print_debug) -> list`` evaluating one row.
            output_file: Path of the CSV to write.
            exclude_user_ids: User ids whose rows are skipped, or ``None``.
            print_debug: Forwarded to ``run_row``.
        """
        excluded = [] if exclude_user_ids is None else exclude_user_ids
        df = pd.DataFrame()
        for _, config_row in self.config_df.iterrows():
            setattr(self, results_attr, [])
            model_name = config_row["model_name"]
            display_name = config_row["display_name"] or model_name
            revision = config_row["revision"] or "main"
            self.nlp = self._load_pipeline(model_name, revision, config_row["from_flax"])
            accs = []
            try:
                for _, text_row in text_df.iterrows():
                    if text_row["user_id"] in excluded:
                        continue
                    results = run_row(text_row, print_debug)
                    # A text without any words yields no attempts to score.
                    if results:
                        accs.append(self._compute_acc(results))
            except Exception:
                # Best effort: report the failure and move on to the next model.
                import traceback
                print(traceback.format_exc())
                print("Error for", display_name)
                continue
            if accs:
                print(display_name, " Average acc:", sum(accs)/len(accs))
            else:
                print(display_name, " Average acc: n/a (no rows evaluated)")
            model_results = getattr(self, results_attr)
            if df.empty:
                # The first successful model also supplies the shared columns.
                df = pd.DataFrame(model_results)
                df.rename(columns={"result": display_name}, inplace=True)
            else:
                df[display_name] = [x["result"] for x in model_results]
        df.to_csv(output_file, index=False)
        print("Results saved to " + output_file)

    def run_full_test(self, exclude_user_ids=None, print_debug=False):
        """Run the word-by-word masking test for all configured models.

        Args:
            exclude_user_ids: Optional collection of ``user_id`` values whose
                rows are skipped.
            print_debug: When true, print every masked text and its candidates.

        Writes ``full_text_results.csv`` to the current working directory.
        """
        self._evaluate_models(
            self.full_text_df,
            "full_text_results",
            lambda row, debug: self._run_full_test_row(row["text"], print_debug=debug),
            "full_text_results.csv",
            exclude_user_ids,
            print_debug,
        )

    def run_targeted_test(self, exclude_user_ids=None, print_debug=False):
        """Run the targeted ``<mask>`` test for all configured models.

        Args:
            exclude_user_ids: Optional collection of ``user_id`` values whose
                rows are skipped.
            print_debug: When true, print every text and its candidates.

        Writes ``targeted_text_results.csv`` to the current working directory.
        """
        self._evaluate_models(
            self.targeted_text_df,
            "targeted_text_results",
            lambda row, debug: self._run_targeted_test_row(row["text"], row["output"], print_debug=debug),
            "targeted_text_results.csv",
            exclude_user_ids,
            print_debug,
        )