Spaces:
Runtime error
Runtime error
import json | |
import os | |
import numpy as np | |
import pandas as pd | |
from transformers import (AutoModel, AutoModelForMaskedLM, AutoTokenizer, | |
RobertaModel, pipeline) | |
class MLMTest(): | |
def __init__(self, config_file="mlm_test_config.csv", full_text_file="mlm_full_text.csv", targeted_text_file="mlm_targeted_text.csv"): | |
self.config_df = pd.read_csv(os.path.join(os.path.dirname(os.path.realpath(__file__)), config_file)) | |
self.config_df.fillna("", inplace=True) | |
self.full_text_df = pd.read_csv(os.path.join(os.path.dirname(os.path.realpath(__file__)), full_text_file)) | |
self.targeted_text_df = pd.read_csv(os.path.join(os.path.dirname(os.path.realpath(__file__)), targeted_text_file)) | |
self.full_text_results = [] | |
self.targeted_text_results = [] | |
def _run_full_test_row(self, text, print_debug=False): | |
return_data = [] | |
data = text.split() | |
for i in range(0, len(data)): | |
masked_text = " ".join(data[:i]) + " "+self.nlp.tokenizer.mask_token+" " + " ".join(data[i+1:]) | |
expected_result = data[i] | |
result = self.nlp(masked_text) | |
self.full_text_results.append({"text": masked_text, "result": result[0]["token_str"], "true_output": expected_result}) | |
if print_debug: | |
print(masked_text) | |
print([x["token_str"] for x in result]) | |
print("-"*20) | |
return_data.append({"prediction": result[0]["token_str"], "true_output": expected_result}) | |
return return_data | |
def _run_targeted_test_row(self, text, expected_result, print_debug=False): | |
return_data = [] | |
result = self.nlp(text.replace("<mask>", self.nlp.tokenizer.mask_token)) | |
self.targeted_text_results.append({"text": text, "result": result[0]["token_str"], "true_output": expected_result}) | |
if print_debug: | |
print(text) | |
print([x["token_str"] for x in result]) | |
print("-"*20) | |
return_data.append({"prediction": result[0]["token_str"], "true_output": expected_result}) | |
return return_data | |
def _compute_acc(self, results): | |
ctr = 0 | |
for row in results: | |
try: | |
z = json.loads(row["true_output"]) | |
if isinstance(z, list): | |
if row["prediction"] in z: | |
ctr+=1 | |
except: | |
if row["prediction"] == row["true_output"]: | |
ctr+=1 | |
return float(ctr/len(results)) | |
def run_full_test(self, exclude_user_ids=[], print_debug=False): | |
df = pd.DataFrame() | |
for idx, row in self.config_df.iterrows(): | |
self.full_text_results = [] | |
model_name = row["model_name"] | |
display_name = row["display_name"] if row["display_name"] else row["model_name"] | |
revision = row["revision"] if row["revision"] else "main" | |
from_flax = row["from_flax"] | |
if from_flax: | |
model = AutoModelForMaskedLM.from_pretrained(model_name, from_flax=True, revision=revision) | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
tokenizer.save_pretrained('exported_pytorch_model') | |
model.save_pretrained('exported_pytorch_model') | |
self.nlp = pipeline('fill-mask', model="exported_pytorch_model") | |
else: | |
self.nlp = pipeline('fill-mask', model=model_name) | |
accs = [] | |
try: | |
for idx, row in self.full_text_df.iterrows(): | |
if row["user_id"] in exclude_user_ids: | |
continue | |
results = self._run_full_test_row(row["text"], print_debug=print_debug) | |
acc = self._compute_acc(results) | |
accs.append(acc) | |
except: | |
print("Error for", display_name) | |
continue | |
print(display_name, " Average acc:", sum(accs)/len(accs)) | |
if df.empty: | |
df = pd.DataFrame(self.full_text_results) | |
df.rename(columns={"result": display_name}, inplace=True) | |
else: | |
preds = [x["result"] for x in self.full_text_results] | |
df[display_name] = preds | |
df.to_csv("full_text_results.csv", index=False) | |
print("Results saved to full_text_results.csv") | |
def run_targeted_test(self, exclude_user_ids=[], print_debug=False): | |
df = pd.DataFrame() | |
for idx, row in self.config_df.iterrows(): | |
self.targeted_text_results = [] | |
model_name = row["model_name"] | |
display_name = row["display_name"] if row["display_name"] else row["model_name"] | |
revision = row["revision"] if row["revision"] else "main" | |
from_flax = row["from_flax"] | |
if from_flax: | |
model = AutoModelForMaskedLM.from_pretrained(model_name, from_flax=True, revision=revision) | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
tokenizer.save_pretrained('exported_pytorch_model') | |
model.save_pretrained('exported_pytorch_model') | |
self.nlp = pipeline('fill-mask', model="exported_pytorch_model") | |
else: | |
self.nlp = pipeline('fill-mask', model=model_name) | |
accs = [] | |
try: | |
for idx, row2 in self.targeted_text_df.iterrows(): | |
if row2["user_id"] in exclude_user_ids: | |
continue | |
results = self._run_targeted_test_row(row2["text"], row2["output"], print_debug=print_debug) | |
acc = self._compute_acc(results) | |
accs.append(acc) | |
except: | |
import traceback | |
print(traceback.format_exc()) | |
print("Error for", display_name) | |
continue | |
print(display_name, " Average acc:", sum(accs)/len(accs)) | |
if df.empty: | |
df = pd.DataFrame(self.targeted_text_results) | |
df.rename(columns={"result": display_name}, inplace=True) | |
else: | |
preds = [x["result"] for x in self.targeted_text_results] | |
df[display_name] = preds | |
df.to_csv("targeted_text_results.csv", index=False) | |
print("Results saved to targeted_text_results.csv") | |