Spaces:
Runtime error
Runtime error
""" | |
Common data structures and utilities. | |
""" | |
import ast | |
import dataclasses | |
import glob | |
import json | |
import os | |
import re | |
import time | |
from typing import Optional | |
import openai | |
import anthropic | |
from fastchat.model.model_adapter import ( | |
get_conversation_template, | |
ANTHROPIC_MODEL_LIST, | |
OPENAI_MODEL_LIST, | |
) | |
# API setting constants | |
API_MAX_RETRY = 16 | |
API_RETRY_SLEEP = 10 | |
API_ERROR_OUTPUT = "$ERROR$" | |
TIE_DELTA = 0.1 | |
# Categories that need reference answers | |
NEED_REF_CATS = ["math", "reasoning", "coding", "arena-hard-200"] | |
# Extract scores from judgments | |
two_score_pattern = re.compile("\[\[(\d+\.?\d*),\s?(\d+\.?\d*)\]\]") | |
two_score_pattern_backup = re.compile("\[(\d+\.?\d*),\s?(\d+\.?\d*)\]") | |
one_score_pattern = re.compile("\[\[(\d+\.?\d*)\]\]") | |
one_score_pattern_backup = re.compile("\[(\d+\.?\d*)\]") | |
# Sampling temperature configs for | |
temperature_config = { | |
"writing": 0.7, | |
"roleplay": 0.7, | |
"extraction": 0.0, | |
"math": 0.0, | |
"coding": 0.0, | |
"reasoning": 0.0, | |
"stem": 0.1, | |
"humanities": 0.1, | |
"arena-hard-200": 0.0, | |
} | |
reverse_model_map = { | |
"model_1": "model_2", | |
"model_2": "model_1", | |
} | |
class Judge: | |
model_name: str | |
prompt_template: dict | |
ref_based: bool = False | |
multi_turn: bool = False | |
class MatchSingle: | |
question: dict | |
model: str | |
answer: dict | |
judge: Judge | |
ref_answer: dict = None | |
multi_turn: bool = False | |
class MatchPair: | |
question: dict | |
model_1: str | |
model_2: str | |
answer_1: dict | |
answer_2: dict | |
judge: Judge | |
ref_answer: dict = None | |
multi_turn: bool = False | |
def load_questions(question_file: str, begin: Optional[int], end: Optional[int]): | |
"""Load questions from a file.""" | |
questions = [] | |
with open(question_file, "r") as ques_file: | |
for line in ques_file: | |
if line: | |
questions.append(json.loads(line)) | |
questions = questions[begin:end] | |
return questions | |
def load_model_answers(answer_dir: str): | |
"""Load model answers. | |
The return value is a python dict of type: | |
Dict[model_name: str -> Dict[question_id: int -> answer: dict]] | |
""" | |
filenames = glob.glob(os.path.join(answer_dir, "*.jsonl")) | |
filenames.sort() | |
model_answers = {} | |
for filename in filenames: | |
model_name = os.path.basename(filename)[:-6] | |
answer = {} | |
with open(filename) as fin: | |
for line in fin: | |
line = json.loads(line) | |
answer[line["question_id"]] = line | |
model_answers[model_name] = answer | |
return model_answers | |
def load_judge_prompts(prompt_file: str): | |
"""Load judge prompts. | |
The return value is a python dict of type: | |
Dict[judge_name: str -> dict] | |
""" | |
prompts = {} | |
with open(prompt_file) as fin: | |
for line in fin: | |
line = json.loads(line) | |
prompts[line["name"]] = line | |
return prompts | |
def run_judge_single(question, answer, judge, ref_answer, multi_turn=False): | |
kwargs = {} | |
model = judge.model_name | |
if ref_answer is not None: | |
kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0] | |
if multi_turn: | |
kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1] | |
if multi_turn: | |
user_prompt = judge.prompt_template["prompt_template"].format( | |
question_1=question["turns"][0], | |
question_2=question["turns"][1], | |
answer_1=answer["choices"][0]["turns"][0], | |
answer_2=answer["choices"][0]["turns"][1], | |
**kwargs, | |
) | |
else: | |
user_prompt = judge.prompt_template["prompt_template"].format( | |
question=question["turns"][0], | |
answer=answer["choices"][0]["turns"][0], | |
**kwargs, | |
) | |
rating = -1 | |
system_prompt = judge.prompt_template["system_prompt"] | |
conv = get_conversation_template(model) | |
conv.set_system_message(system_prompt) | |
conv.append_message(conv.roles[0], user_prompt) | |
conv.append_message(conv.roles[1], None) | |
if model in OPENAI_MODEL_LIST: | |
judgment = chat_completion_openai(model, conv, temperature=0, max_tokens=2048) | |
elif model in ANTHROPIC_MODEL_LIST: | |
judgment = chat_completion_anthropic( | |
model, conv, temperature=0, max_tokens=1024 | |
) | |
else: | |
raise ValueError(f"Invalid judge model name: {model}") | |
if judge.prompt_template["output_format"] == "[[rating]]": | |
match = re.search(one_score_pattern, judgment) | |
if not match: | |
match = re.search(one_score_pattern_backup, judgment) | |
if match: | |
rating = ast.literal_eval(match.groups()[0]) | |
else: | |
rating = -1 | |
else: | |
raise ValueError( | |
f"invalid output format: {judge.prompt_template['output_format']}" | |
) | |
return rating, user_prompt, judgment | |
def play_a_match_single(match: MatchSingle, output_file: str): | |
question, model, answer, judge, ref_answer, multi_turn = ( | |
match.question, | |
match.model, | |
match.answer, | |
match.judge, | |
match.ref_answer, | |
match.multi_turn, | |
) | |
if judge.prompt_template["type"] == "single": | |
score, user_prompt, judgment = run_judge_single( | |
question, answer, judge, ref_answer, multi_turn=multi_turn | |
) | |
question_id = question["question_id"] | |
turn = 1 if not multi_turn else 2 | |
result = { | |
"question_id": question_id, | |
"model": model, | |
"judge": (judge.model_name, judge.prompt_template["name"]), | |
"user_prompt": user_prompt, | |
"judgment": judgment, | |
"score": score, | |
"turn": turn, | |
"tstamp": time.time(), | |
} | |
print( | |
f"question: {question_id}, turn: {turn}, model: {model}, " | |
f"score: {score}, " | |
f"judge: {(judge.model_name, judge.prompt_template['name'])}" | |
) | |
else: | |
raise ValueError(f"invalid judge type: {judge['type']}") | |
if output_file: | |
os.makedirs(os.path.dirname(output_file), exist_ok=True) | |
with open(output_file, "a") as fout: | |
fout.write(json.dumps(result) + "\n") | |
return result | |
def run_judge_pair(question, answer_a, answer_b, judge, ref_answer, multi_turn=False): | |
kwargs = {} | |
model = judge.model_name | |
if ref_answer is not None: | |
kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0] | |
if multi_turn: | |
kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1] | |
if multi_turn: | |
system_prompt = judge.prompt_template["system_prompt"] | |
user_prompt = judge.prompt_template["prompt_template"].format( | |
question_1=question["turns"][0], | |
question_2=question["turns"][1], | |
answer_a_1=answer_a["choices"][0]["turns"][0], | |
answer_b_1=answer_b["choices"][0]["turns"][0], | |
answer_a_2=answer_a["choices"][0]["turns"][1], | |
answer_b_2=answer_b["choices"][0]["turns"][1], | |
**kwargs, | |
) | |
else: | |
system_prompt = judge.prompt_template["system_prompt"] | |
user_prompt = judge.prompt_template["prompt_template"].format( | |
question=question["turns"][0], | |
answer_a=answer_a["choices"][0]["turns"][0], | |
answer_b=answer_b["choices"][0]["turns"][0], | |
**kwargs, | |
) | |
winner = "error" | |
conv = get_conversation_template(model) | |
conv.append_message(conv.roles[0], user_prompt) | |
conv.append_message(conv.roles[1], None) | |
if model in OPENAI_MODEL_LIST: | |
conv.set_system_message(system_prompt) | |
judgment = chat_completion_openai(model, conv, temperature=0, max_tokens=2048) | |
elif model in ANTHROPIC_MODEL_LIST: | |
if system_prompt != "You are a helpful assistant.": | |
user_prompt = "[Instruction]\n" + system_prompt + "\n\n" + user_prompt | |
conv.messages[0][1] = user_prompt | |
judgment = chat_completion_anthropic( | |
model, conv, temperature=0, max_tokens=1024 | |
) | |
else: | |
raise ValueError(f"Invalid judge model name: {model}") | |
if judge.prompt_template["output_format"] == "[[A]]": | |
if "[[A]]" in judgment: | |
winner = "A" | |
elif "[[B]]" in judgment: | |
winner = "B" | |
elif "[[C]]" in judgment: | |
winner = "tie" | |
else: | |
winner = "error" | |
elif judge.prompt_template["output_format"] == "[[rating_a,rating_b]]": | |
match = re.search(two_score_pattern, judgment) | |
if not match: | |
match = re.search(two_score_pattern_backup, judgment) | |
if match: | |
scores = [ast.literal_eval(s.strip()) for s in match.groups()] | |
if abs(scores[0] - scores[1]) <= TIE_DELTA: | |
winner = "tie" | |
elif scores[0] > scores[1]: | |
winner = "A" | |
else: | |
winner = "B" | |
else: | |
winner = "error" | |
else: | |
raise ValueError( | |
f"invalid output format: {judge.prompt_template['output_format']}" | |
) | |
return winner, user_prompt, judgment | |
def play_a_match_pair(match: MatchPair, output_file: str): | |
question, model_1, model_2, answer_1, answer_2, judge, ref_answer, multi_turn = ( | |
match.question, | |
match.model_1, | |
match.model_2, | |
match.answer_1, | |
match.answer_2, | |
match.judge, | |
match.ref_answer, | |
match.multi_turn, | |
) | |
if judge.prompt_template["type"] == "pairwise": | |
g1_winner, g1_user_prompt, g1_judgment = run_judge_pair( | |
question, answer_1, answer_2, judge, ref_answer, multi_turn=multi_turn | |
) | |
g2_winner, g2_user_prompt, g2_judgment = run_judge_pair( | |
question, answer_2, answer_1, judge, ref_answer, multi_turn=multi_turn | |
) | |
g1_map = {"A": "model_1", "B": "model_2"} | |
g2_map = {"A": "model_2", "B": "model_1"} | |
g1_winner = g1_map.get(g1_winner, g1_winner) | |
g2_winner = g2_map.get(g2_winner, g2_winner) | |
question_id = question["question_id"] | |
turn = 1 if not multi_turn else 2 | |
result = { | |
"question_id": question_id, | |
"model_1": model_1, | |
"model_2": model_2, | |
"g1_winner": g1_winner, | |
"g2_winner": g2_winner, | |
"judge": (judge.model_name, judge.prompt_template["name"]), | |
"g1_user_prompt": g1_user_prompt, | |
"g1_judgment": g1_judgment, | |
"g2_user_prompt": g2_user_prompt, | |
"g2_judgment": g2_judgment, | |
"turn": turn, | |
"tstamp": time.time(), | |
} | |
print( | |
f"question: {question_id}, turn: {turn}, model_1: {model_1}, model_2: {model_2}, " | |
f"g1_winner: {g1_winner}, g2_winner: {g2_winner}, " | |
f"judge: {(judge.model_name, judge.prompt_template['name'])}" | |
) | |
elif judge.prompt_template["type"] == "single": | |
m1_score, m1_user_prompt, m1_judgment = run_judge_single( | |
question, answer_1, judge | |
) | |
m2_score, m2_user_prompt, m2_judgment = run_judge_single( | |
question, answer_2, judge | |
) | |
if abs(m1_score - m2_score) <= TIE_DELTA: | |
winner = "tie" | |
elif m1_score > m2_score: | |
winner = "model_1" | |
else: | |
winner = "model_2" | |
question_id = question["question_id"] | |
result = { | |
"question_id": question_id, | |
"model_1": model_1, | |
"model_2": model_2, | |
"g1_winner": winner, | |
"g2_winner": winner, | |
"judge": (judge.model_name, judge.prompt_template["name"]), | |
"g1_user_prompt": m1_user_prompt, | |
"g1_judgment": m1_judgment, | |
"g2_user_prompt": m2_user_prompt, | |
"g2_judgment": m2_judgment, | |
"m1_score": m1_score, | |
"m2_score": m2_score, | |
"tstamp": time.time(), | |
} | |
print( | |
f"question: {question_id}, model_1: {model_1}, model_2: {model_2}, " | |
f"winner: {winner}, m1_score: {m1_score}, m2_score: {m2_score}, " | |
f"judge: {(judge.model_name, judge.prompt_template['name'])}" | |
) | |
else: | |
raise ValueError(f"invalid judge type: {judge['type']}") | |
if output_file: | |
os.makedirs(os.path.dirname(output_file), exist_ok=True) | |
with open(output_file, "a") as fout: | |
fout.write(json.dumps(result) + "\n") | |
return result | |
def chat_completion_openai(model, conv, temperature, max_tokens, api_dict=None): | |
if api_dict is not None: | |
openai.api_base = api_dict["api_base"] | |
openai.api_key = api_dict["api_key"] | |
output = API_ERROR_OUTPUT | |
for _ in range(API_MAX_RETRY): | |
try: | |
messages = conv.to_openai_api_messages() | |
response = openai.ChatCompletion.create( | |
model=model, | |
messages=messages, | |
n=1, | |
temperature=temperature, | |
max_tokens=max_tokens, | |
) | |
output = response["choices"][0]["message"]["content"] | |
break | |
except openai.error.OpenAIError as e: | |
print(type(e), e) | |
time.sleep(API_RETRY_SLEEP) | |
return output | |
def chat_completion_openai_azure(model, conv, temperature, max_tokens, api_dict=None): | |
openai.api_type = "azure" | |
openai.api_version = "2023-07-01-preview" | |
if api_dict is not None: | |
openai.api_base = api_dict["api_base"] | |
openai.api_key = api_dict["api_key"] | |
else: | |
openai.api_base = os.environ["AZURE_OPENAI_ENDPOINT"] | |
openai.api_key = os.environ["AZURE_OPENAI_KEY"] | |
if "azure-" in model: | |
model = model[6:] | |
output = API_ERROR_OUTPUT | |
for _ in range(API_MAX_RETRY): | |
try: | |
messages = conv.to_openai_api_messages() | |
response = openai.ChatCompletion.create( | |
engine=model, | |
messages=messages, | |
n=1, | |
temperature=temperature, | |
max_tokens=max_tokens, | |
) | |
output = response["choices"][0]["message"]["content"] | |
break | |
except openai.error.OpenAIError as e: | |
print(type(e), e) | |
time.sleep(API_RETRY_SLEEP) | |
except openai.error.InvalidRequestError as e: | |
print(type(e), e) | |
break | |
except KeyError: | |
print(response) | |
break | |
return output | |
def chat_completion_anthropic(model, conv, temperature, max_tokens, api_dict=None): | |
if api_dict is not None and "api_key" in api_dict: | |
api_key = api_dict["api_key"] | |
else: | |
api_key = os.environ["ANTHROPIC_API_KEY"] | |
output = API_ERROR_OUTPUT | |
for _ in range(API_MAX_RETRY): | |
try: | |
c = anthropic.Anthropic(api_key=api_key) | |
prompt = conv.get_prompt() | |
response = c.completions.create( | |
model=model, | |
prompt=prompt, | |
stop_sequences=[anthropic.HUMAN_PROMPT], | |
max_tokens_to_sample=max_tokens, | |
temperature=temperature, | |
) | |
output = response.completion | |
break | |
except anthropic.APIError as e: | |
print(type(e), e) | |
time.sleep(API_RETRY_SLEEP) | |
return output.strip() | |
def chat_completion_palm(chat_state, model, conv, temperature, max_tokens): | |
from fastchat.serve.api_provider import init_palm_chat | |
assert model == "palm-2-chat-bison-001" | |
if chat_state is None: | |
chat_state = init_palm_chat("chat-bison@001") | |
parameters = { | |
"temperature": temperature, | |
"top_p": 0.8, | |
"top_k": 40, | |
"max_output_tokens": max_tokens, | |
} | |
output = API_ERROR_OUTPUT | |
for _ in range(API_MAX_RETRY): | |
try: | |
response = chat_state.send_message(conv.messages[-2][1], **parameters) | |
output = response.text | |
break | |
except Exception as e: | |
print(type(e), e) | |
time.sleep(API_RETRY_SLEEP) | |
return chat_state, output | |
def normalize_game_key_single(gamekey, result): | |
"""Make the model names sorted in a game key.""" | |
qid, model_1, model_2 = gamekey | |
if model_1 < model_2: | |
return gamekey, result | |
else: | |
new_gamekey = (qid, model_2, model_1) | |
new_result = { | |
"winners": tuple(reverse_model_map.get(x, x) for x in result["winners"]), | |
"g1_judgment": result["g2_judgment"], | |
"g2_judgment": result["g1_judgment"], | |
} | |
return new_gamekey, new_result | |
def normalize_game_key_dict(judgment_dict): | |
"""Make the model names sorted in the game keys.""" | |
ret = {} | |
for key, value in judgment_dict.items(): | |
new_key, new_value = normalize_game_key_single(key, value) | |
ret[new_key] = new_value | |
return ret | |
def load_pairwise_model_judgments(filename: str): | |
"""Load model judgments. | |
The return value is a dict of type: | |
Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict] | |
""" | |
judge_dict = {} | |
for line in open(filename): | |
obj = json.loads(line) | |
judge = tuple(obj["judge"]) | |
qid, model_1, model_2 = obj["question_id"], obj["model_1"], obj["model_2"] | |
if judge not in judge_dict: | |
judge_dict[judge] = {} | |
if "winner" in obj: | |
winner = obj["winner"] | |
elif "g1_winner" in obj and "g2_winner" in obj: | |
g1_winner, g2_winner = obj["g1_winner"], obj["g2_winner"] | |
if g1_winner == g2_winner: | |
winner = g1_winner | |
else: | |
winner = "inconsistent" | |
else: | |
raise ValueError(f"Invalid keys: {list(obj.keys())}") | |
gamekey = (qid, model_1, model_2) | |
winners = (winner,) | |
judge_dict[judge][gamekey] = { | |
"winners": winners, | |
"g1_judgment": obj["g1_judgment"], | |
"g2_judgment": obj["g2_judgment"], | |
} | |
# Make the model names sorted in the game keys | |
normalized = {} | |
for judge, value in judge_dict.items(): | |
normalized[judge] = normalize_game_key_dict(value) | |
return normalized | |
def load_single_model_judgments(filename: str): | |
"""Load model judgments. | |
The return value is a dict of type: | |
Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict] | |
""" | |
judge_dict = {} | |
for line in open(filename): | |
obj = json.loads(line) | |
judge = tuple(obj["judge"]) | |
qid, model = obj["question_id"], obj["model"] | |
if judge not in judge_dict: | |
judge_dict[judge] = {} | |
gamekey = (qid, model) | |
judge_dict[judge][gamekey] = { | |
"score": obj["score"], | |
"judgment": obj["judgment"], | |
} | |
return judge_dict | |
def resolve_pairwise_judgment_dict( | |
question, model_judgments_normal, model_judgments_math, multi_turn=False | |
): | |
"""Return the correct pairwise judge.""" | |
if multi_turn: | |
if question["category"] in NEED_REF_CATS: | |
return model_judgments_math[("gpt-4", "pair-math-v1-multi-turn")] | |
return model_judgments_normal[("gpt-4", "pair-v2-multi-turn")] | |
if question["category"] in NEED_REF_CATS: | |
return model_judgments_math[("gpt-4", "pair-math-v1")] | |
else: | |
return model_judgments_normal[("gpt-4", "pair-v2")] | |
def resolve_single_judgment_dict( | |
question, model_judgments_normal, model_judgments_math, multi_turn=False | |
): | |
"""Return the correct single answer grading judge.""" | |
if multi_turn: | |
if question["category"] in NEED_REF_CATS: | |
return model_judgments_math[("gpt-4", "single-math-v1-multi-turn")] | |
return model_judgments_normal[("gpt-4", "single-v1-multi-turn")] | |
if question["category"] in NEED_REF_CATS: | |
return model_judgments_math[("gpt-4", "single-math-v1")] | |
else: | |
return model_judgments_normal[("gpt-4", "single-v1")] | |
def get_pairwise_judge_explanation(gamekey, judgment_dict): | |
"""Get model judge explanation.""" | |
try: | |
qid, model_1, model_2 = gamekey | |
if model_1 < model_2: | |
res = judgment_dict[gamekey] | |
g1_judgment, g2_judgment = res["g1_judgment"], res["g2_judgment"] | |
else: | |
new_gamekey = (qid, model_2, model_1) | |
res = judgment_dict[new_gamekey] | |
model_1, model_2 = model_1, model_2 | |
g1_judgment, g2_judgment = res["g2_judgment"], res["g1_judgment"] | |
return ( | |
f"**Game 1**. **A**: {model_1}, **B**: {model_2}\n\n" | |
f"**Judgment**: {g1_judgment}" | |
+ f"\n\n`--------------------------`\n\n" | |
+ f"**Game 2**. **A**: {model_2}, **B**: {model_1}\n\n" | |
f"**Judgment**: {g2_judgment}" | |
) | |
except KeyError: | |
return "N/A" | |
def get_single_judge_explanation(gamekey, judgment_dict): | |
"""Get model judge explanation.""" | |
try: | |
qid, model = gamekey | |
res = judgment_dict[gamekey] | |
g1_judgment = res["judgment"] | |
g1_score = res["score"] | |
return ( | |
f"**Game 1**. **A**: {model}, **Score**: {g1_score}\n\n" | |
f"**Judgment**: {g1_judgment}" | |
) | |
except KeyError: | |
return "N/A" | |
def check_data(questions, model_answers, ref_answers, models, judges): | |
# check model answers | |
for m in models: | |
assert m in model_answers, f"Missing model answer for {m}" | |
m_answer = model_answers[m] | |
for q in questions: | |
assert ( | |
q["question_id"] in m_answer | |
), f"Missing model {m}'s answer to Question {q['question_id']}" | |
# check ref answers | |
for jg in judges.values(): | |
if not jg.ref_based: | |
continue | |
for q in questions: | |
if q["category"] not in NEED_REF_CATS: | |
continue | |
assert ( | |
q["question_id"] in ref_answers[jg.model_name] | |
), f"Missing reference answer to Question {q['question_id']} for judge {jg.model_name}" | |
def get_model_list(answer_dir): | |
file_paths = glob.glob(f"{answer_dir}/*.jsonl") | |
file_names = [os.path.splitext(os.path.basename(f))[0] for f in file_paths] | |
return file_names | |