import re
import string
from collections import Counter

import numpy as np
import pandas as pd
import tqdm
from langchain.evaluation.qa import QAEvalChain
from langchain.llms import OpenAI

from algos.PWS import PWS_Base, PWS_Extra
from algos.notool import CoT, IO
from algos.react import ReactBase
def normalize_answer(s):
    """Lowercase, strip punctuation and articles, and collapse whitespace."""

    def remove_articles(text):
        return re.sub(r"\b(a|an|the)\b", " ", text)

    def white_space_fix(text):
        return " ".join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return "".join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))
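
# Editorial example (not in the original file) of how the normalization behaves:
#   normalize_answer("The Quick, Brown Fox!")  ->  "quick brown fox"
# Punctuation and articles are dropped, case is lowered, and whitespace collapses.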
def f1_score(prediction, ground_truth):
    normalized_prediction = normalize_answer(prediction)
    normalized_ground_truth = normalize_answer(ground_truth)

    # Yes/no/no-answer predictions only score when they match the label exactly.
    if normalized_prediction in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth:
        return 0
    if normalized_ground_truth in ['yes', 'no', 'noanswer'] and normalized_prediction != normalized_ground_truth:
        return 0

    prediction_tokens = normalized_prediction.split()
    ground_truth_tokens = normalized_ground_truth.split()
    common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
    num_same = sum(common.values())
    if num_same == 0:
        return 0
    precision = 1.0 * num_same / len(prediction_tokens)
    recall = 1.0 * num_same / len(ground_truth_tokens)
    f1 = (2 * precision * recall) / (precision + recall)
    return f1
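
# Editorial worked example (not in the original file) of the token-level F1:
#   f1_score("The quick brown fox", "A quick fox")
#   normalized tokens: ["quick", "brown", "fox"] vs ["quick", "fox"], overlap = 2
#   precision = 2/3, recall = 2/2, f1 = 2 * (2/3) * 1 / ((2/3) + 1) = 0.8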
def llm_accuracy_score(query, prediction, ground_truth):
    data = [{
        'query': query,
        'answer': ground_truth,
    }]
    pred = [{
        'query': query,
        'answer': ground_truth,
        'result': prediction,
    }]
    eval_chain = QAEvalChain.from_llm(OpenAI(temperature=0))
    graded_outputs = eval_chain.evaluate(data, pred)
    return 1 if graded_outputs[0]['text'].strip() == 'CORRECT' else 0
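
# Note (editorial): llm_accuracy_score relies on the grading chain returning a
# 'text' field; anything other than a stripped 'CORRECT' is scored as 0. Each
# call issues an OpenAI request, so it adds latency and cost per example.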
class Evaluator:
    """Runs an algorithm over a benchmark DataFrame and aggregates QA metrics."""

    def __init__(self, task, dataset, algo, maxtry=3):
        assert task in ["hotpot_qa", "trivia_qa", "gsm8k", "physics_question", "disfl_qa",
                        "sports_understanding", "strategy_qa", "sotu_qa", "fever"]
        assert isinstance(dataset, pd.DataFrame)
        assert isinstance(algo, (PWS_Base, PWS_Extra, ReactBase, IO, CoT))
        self.task = task
        self.dataset = dataset
        self.algo = algo
        self.maxtry = maxtry
        self.failed_response = self._failed_response()
        self.eval_data = self._initialize_eval_dict()
    def run(self):
        print("\n******************* Start Evaluation *******************\n")
        if self.task in ["hotpot_qa", "sotu_qa"]:
            for i in tqdm.tqdm(range(len(self.dataset))):
                question = self.dataset["question"][i]
                label = self.dataset["answer"][i]
                for _ in range(self.maxtry):
                    try:
                        response = self.algo.run(question)
                        break
                    except Exception:
                        response = self.failed_response
                self._update_eval_dict(question, label, response)
        elif self.task == "fever":
            for i in tqdm.tqdm(range(len(self.dataset))):
                question = self.dataset["claim"][i]
                label = self.dataset["label"][i]
                for _ in range(self.maxtry):
                    try:
                        response = self.algo.run(question)
                        break
                    except Exception:
                        response = self.failed_response
                self._update_eval_dict(question, label, response)
        elif self.task == "trivia_qa":
            for i in tqdm.tqdm(range(len(self.dataset))):
                question = self.dataset["question"][i]
                label = self.dataset["answer"][i]["value"]
                for _ in range(self.maxtry):
                    try:
                        response = self.algo.run(question)
                        break
                    except Exception:
                        response = self.failed_response
                self._update_eval_dict(question, label, response)
        elif self.task == "gsm8k":
            for i in tqdm.tqdm(range(len(self.dataset))):
                question = self.dataset["question"][i]
                label = self.dataset["answer"][i].split("#### ")[1]
                for _ in range(self.maxtry):
                    try:
                        response = self.algo.run(question)
                        break
                    except Exception:
                        response = self.failed_response
                self._update_eval_dict(question, label, response)
        elif self.task in ["physics_question", "sports_understanding", "strategy_qa"]:
            for i in tqdm.tqdm(range(len(self.dataset))):
                question = self.dataset["input"][i]
                label = self.dataset["target"][i]
                for _ in range(self.maxtry):
                    try:
                        response = self.algo.run(question)
                        break
                    except Exception:
                        response = self.failed_response
                self._update_eval_dict(question, label, response)
        else:
            raise NotImplementedError
        return self._get_avg_results(), self.eval_data
    def _initialize_eval_dict(self):
        data = {}
        for d in ["label", "preds", "em", "f1", "acc", "wall_time", "total_tokens", "total_cost", "steps",
                  "token_cost", "tool_cost", "planner_log", "solver_log"]:
            data[d] = []
        return data
    def _update_eval_dict(self, question, label, response):
        pred = self._parse_prediction(response["output"])
        # Compute metrics once: llm_accuracy_score makes an LLM call per example.
        metrics = self.get_metrics(question, label, pred)
        self.eval_data["label"] += [label]
        self.eval_data["preds"] += [pred]
        self.eval_data["em"] += [metrics["em"]]
        self.eval_data["f1"] += [metrics["f1"]]
        self.eval_data["acc"] += [metrics["acc"]]
        self.eval_data["wall_time"] += [response["wall_time"]]
        self.eval_data["total_tokens"] += [response["total_tokens"]]
        self.eval_data["total_cost"] += [response["total_cost"]]
        self.eval_data["steps"] += [response["steps"]]
        self.eval_data["token_cost"] += [response["token_cost"]]
        self.eval_data["tool_cost"] += [response["tool_cost"]]
        if "planner_log" in response:
            self.eval_data["planner_log"] += [response["planner_log"]]
        if "solver_log" in response:
            self.eval_data["solver_log"] += [response["solver_log"]]
    def _get_avg_results(self):
        result = {}
        result["avg_em"] = np.nanmean(self.eval_data["em"])
        result["avg_f1"] = np.nanmean(self.eval_data["f1"])
        result["avg_acc"] = np.nanmean(self.eval_data["acc"])
        result["avg_wall_time"] = np.nanmean(self.eval_data["wall_time"])
        result["avg_total_tokens"] = np.nanmean(self.eval_data["total_tokens"])
        result["avg_total_cost"] = np.nanmean(self.eval_data["total_cost"])
        result["avg_steps"] = np.nanmean(self.eval_data["steps"])
        result["avg_token_cost"] = np.nanmean(self.eval_data["token_cost"])
        result["avg_tool_cost"] = np.nanmean(self.eval_data["tool_cost"])
        return result
    def get_metrics(self, query, label, pred):
        if pred is None:
            return {'em': 0, 'f1': 0, 'acc': 0}
        norm_label = normalize_answer(label)
        norm_pred = normalize_answer(pred)
        em = int(norm_pred == norm_label)
        f1 = f1_score(norm_pred, norm_label)
        acc = llm_accuracy_score(query, pred, label)
        return {'em': em, 'f1': f1, 'acc': acc}
    def _parse_prediction(self, output):
        if isinstance(self.algo, IO):
            return str(output).strip("\n")
        elif isinstance(self.algo, CoT):
            # CoT answers end with a final "Answer: ..." line.
            return str(output).split("\n")[-1].replace("Answer:", "")
        elif isinstance(self.algo, ReactBase):
            return str(output).strip("\n")
        elif isinstance(self.algo, PWS_Base):
            return str(output).strip("\n")
        elif isinstance(self.algo, PWS_Extra):
            return str(output).strip("\n")
    def _failed_response(self):
        response = {}
        for key in ["input", "output", "wall_time", "total_tokens", "total_cost", "steps", "token_cost", "tool_cost"]:
            response[key] = np.nan
        return response
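

# --- Illustrative usage sketch (editorial addition, not part of the original
# module). The data path and the no-argument PWS_Base() constructor below are
# assumptions for illustration only; adapt them to the actual repo layout.
if __name__ == "__main__":
    dataset = pd.read_json("data/hotpot_qa_sample.json")  # hypothetical sample file
    algo = PWS_Base()  # constructor arguments are repo-specific
    evaluator = Evaluator(task="hotpot_qa", dataset=dataset, algo=algo)
    avg_results, per_example_data = evaluator.run()
    print(avg_results)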