import os
import re

import pandas as pd
from tqdm import tqdm
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.ticker import MultipleLocator
from datasets import load_dataset
import numpy as np
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
)

from llm_toolkit.llm_utils import invoke_langchain

print(f"loading {__file__}")
P1 = """你是一个逻辑游戏的主持人。游戏规则如下: | |
1. 参与者会得到一个谜题。 | |
2. 参与者可以通过提问来获取线索,尝试解开谜题。 | |
3. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。 | |
4. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 | |
5. 参与者需要根据回答来推理,并最终找出谜题的正确答案。 | |
请严格按照这些规则回答参与者提出的问题。 | |
谜题: {} | |
实际情况: {} | |
参与者提出的问题: {} | |
""" | |
P1_en = """You are the host of a logic game. The rules of the game are as follows: | |
1. Participants will receive a puzzle. | |
2. Participants can ask questions to obtain clues and try to solve the puzzle. | |
3. For each question, the host will answer with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. | |
4. The answer cannot include any additional information, nor can any word in the options be omitted. For example, “No” cannot be shortened to “N”. | |
5. Participants need to infer and ultimately find the correct answer to the puzzle based on the responses. | |
Please strictly adhere to these rules when answering participants’ questions. | |
Puzzle: {} | |
Actual situation: {} | |
Question from participants: {}""" | |
P2 = """你是一个情景猜谜游戏的主持人。游戏规则如下: | |
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。 | |
2. 主持人知道谜底,谜底是谜面的答案。 | |
3. 参与者可以询问任何封闭式问题来找寻事件的真相。 | |
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下: | |
- 若谜面和谜底能找到问题的答案,回答:是或者不是 | |
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要 | |
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误 | |
- 若参与者提问基本还原了谜底真相,回答:回答正确 | |
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 | |
请严格按照这些规则回答参与者提出的问题。 | |
谜面: {} | |
谜底: {} | |
参与者提出的问题: {} | |
回答: | |
""" | |
P2_en = """You are the host of a situational guessing game. The rules of the game are as follows: | |
1. Participants will receive a riddle that describes a simple yet difficult to understand event. | |
2. The host knows the truth, which is the solution to the riddle. | |
3. Participants can ask any closed-ended questions to uncover the truth of the event. | |
4. For each question, the host will respond with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. The criteria for each response are as follows: | |
- If the riddle and answer can provide an answer to the question, respond with: Yes or No | |
- If the riddle and answer cannot directly or indirectly infer an answer to the question, respond with: Unimportant | |
- If the participant's question is not a closed-ended question or is difficult to understand, respond with: Incorrect questioning | |
- If the participant's question essentially reveals the truth of the answer, respond with: Correct answer | |
5. The response must not include any additional information, nor should any word be omitted from the options. For example, "No" cannot be abbreviated to "N". | |
Please strictly follow these rules when answering the participant's questions. | |
Riddle: {} | |
Truth: {} | |
Participant's question: {} | |
""" | |
system_prompt = "You are an expert in logical reasoning." | |
P2_few_shot = """你是一个情景猜谜游戏的主持人。游戏规则如下: | |
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。 | |
2. 主持人知道谜底,谜底是谜面的答案。 | |
3. 参与者可以询问任何封闭式问题来找寻事件的真相。 | |
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下: | |
- 若谜面和谜底能找到问题的答案,回答:是或者不是 | |
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要 | |
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误 | |
- 若参与者提问基本还原了谜底真相,回答:回答正确 | |
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 | |
请严格按照这些规则回答参与者提出的问题。 | |
示例输入和输出: | |
{examples} | |
谜面: {} | |
谜底: {} | |
参与者提出的问题: {} | |
回答: | |
""" | |
# Load the Chinese-to-English translations used for plot labels; fall back to an
# empty mapping if the CSV is not available.
try:
    df_translation = pd.read_csv("datasets/mgtv/unique_translations.csv")
    translation_dict = df_translation.set_index("chinese").to_dict(orient="index")
    translation_dict = {k: v["english"] for k, v in translation_dict.items()}
except Exception as e:
    print(e)
    translation_dict = {}
def get_prompt_template(using_p1=True, chinese_prompt=True):
    if using_p1:
        return P1 if chinese_prompt else P1_en
    else:
        return P2 if chinese_prompt else P2_en
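
# Illustrative usage sketch (kept as a comment so importing this module stays
# side-effect free; the literal strings below are made-up placeholders):
#
#   template = get_prompt_template(using_p1=False, chinese_prompt=True)
#   prompt = template.format("<puzzle>", "<truth>", "<participant question>")
#   print(prompt)
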
def get_few_shot_prompt_template(num_shots, train_dataset, debug=False):
    if num_shots == 0:
        return get_prompt_template(using_p1=False, chinese_prompt=True)

    labels = train_dataset["label"].unique()

    if debug:
        print("num_shots:", num_shots)
        print("labels:", labels)

    examples = ""
    index = 0
    # Cycle through the label set, taking the next training row of each label in
    # turn, until the requested number of examples has been collected.
    while num_shots > 0:
        for label in labels:
            while train_dataset["label"][index] != label:
                index += 1
            row = train_dataset.iloc[index]
            examples += f"""谜面: {row["puzzle"]}
谜底: {row["truth"]}
参与者提出的问题: {row["text"]}
回答: {row["label"]}
"""
            num_shots -= 1
            if num_shots == 0:
                break

    prompt = P2_few_shot.replace("{examples}", examples)
    if debug:
        print("P2_few_shot:", prompt)
    return prompt
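
# Illustrative usage sketch (assumes a train split with "label", "puzzle", "truth"
# and "text" columns; the CSV path is an assumption, not taken from this module):
#
#   df_train = pd.read_csv("datasets/mgtv/train.csv")
#   few_shot_prompt = get_few_shot_prompt_template(5, df_train, debug=True)
#   # The returned template still has three {} slots for puzzle, truth and question.
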
def extract_answer(text, debug=False):
    if text and isinstance(text, str):
        # Remove the begin and end tokens
        text = re.sub(
            r".*?(assistant|\[/INST\]).+?\b",
            "",
            text,
            flags=re.DOTALL | re.MULTILINE,
        )
        if debug:
            print("--------\nstep 1:", text)

        text = re.sub(r"<.+?>.*", "", text, flags=re.DOTALL | re.MULTILINE)
        if debug:
            print("--------\nstep 2:", text)

        text = re.sub(
            r".*?end_header_id\|>\n\n", "", text, flags=re.DOTALL | re.MULTILINE
        )
        if debug:
            print("--------\nstep 3:", text)

        text = text.split(".")[0].strip()
        text = text.split("\n")[0].strip()
        text = text.split("。")[0].strip()
        text = text.replace("回答: ", "").strip()
        if debug:
            print("--------\nstep 4:", text)

        text = re.sub(
            r"^Response:.+?\b",
            "",
            text,
            flags=re.DOTALL | re.MULTILINE,
        )
        if debug:
            print("--------\nstep 5:", text)

        return text.strip()

    return ""
def extract_answer_from_text(text, question):
    labels = ["不是", "是", "不重要", "回答正确", "问法错误"]
    original_text = text
    text = text.split("回答:")[-1]

    # If the model echoed the question, take the first line after it as the answer.
    found_question = False
    for line in text.split("\n"):
        if question in line:
            found_question = True
        elif found_question:
            text = line
            break

    text = extract_answer(text)
    if text in labels:
        return text

    text = text.replace("Human: ", "")
    text = text.replace("Assistant: ", "")
    if text in labels:
        return text

    # print(f"not found: {question} | {original_text} | {text}")
    return text
def calc_metrics(references, predictions, questions=None, debug=False):
    assert len(references) == len(
        predictions
    ), f"lengths differ: {len(references)} != {len(predictions)}"

    labels = np.unique(references)
    # Measured on the raw outputs, before answer extraction.
    valid_classifications = [1 if p in labels else 0 for p in predictions]

    predictions = (
        [extract_answer(text) for text in predictions]
        if questions is None
        else [
            extract_answer_from_text(text, question)
            for text, question in zip(predictions, questions)
        ]
    )

    accuracy = accuracy_score(references, predictions)
    results = {"accuracy": accuracy}
    if debug:
        incorrect_ids = [i for i, p in enumerate(predictions) if p != references[i]]
        results["incorrect_ids"] = incorrect_ids

    precision = precision_score(
        references, predictions, average="weighted", labels=labels
    )
    results["precision"] = float(precision)

    recall = recall_score(references, predictions, average="weighted", labels=labels)
    results["recall"] = float(recall)

    f1 = f1_score(references, predictions, average="weighted", labels=labels)
    results["f1"] = float(f1)

    results["ratio_valid_classifications"] = sum(valid_classifications) / len(
        valid_classifications
    )

    return results
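
# Illustrative sketch with toy data (values fabricated for demonstration):
#
#   refs = ["是", "不是", "不重要"]
#   preds = ["是", "不是", "是"]
#   calc_metrics(refs, preds)
#   # -> accuracy 2/3, plus weighted precision/recall/f1 and the ratio of raw
#   #    outputs that already match a valid label (here 1.0).
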
def save_results(model_name, results_path, dataset, predictions, debug=False):
    if not os.path.exists(results_path):
        # Get the directory part of the file path
        dir_path = os.path.dirname(results_path)

        # Create all directories in the path (if they don't exist)
        os.makedirs(dir_path, exist_ok=True)

        if isinstance(dataset, pd.DataFrame):
            df = dataset
        else:
            df = dataset.to_pandas()
            df.drop(
                columns=["answer", "prompt", "train_text"],
                inplace=True,
                errors="ignore",
            )
    else:
        df = pd.read_csv(results_path, on_bad_lines="warn")

    df[model_name] = predictions
    if debug:
        print(df.head(1))

    df.to_csv(results_path, index=False)
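
# Illustrative sketch (the column name and results path are placeholders, not taken
# from this module): append one model's predictions to a shared results CSV.
#
#   save_results(
#       "Qwen2-7B-Instruct",         # hypothetical column name for this run
#       "results/mgtv-results.csv",  # hypothetical results file
#       datasets["test"],
#       predictions,
#   )
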
def load_logical_reasoning_dataset(
    data_path,
    tokenizer=None,
    using_p1=True,
    chinese_prompt=True,
    test_data=None,
    num_shots=0,
    format_test_only=False,
):
    postfix = "" if chinese_prompt else "_en"
    train_data_file = data_path + f"/train{postfix}.csv"
    test_data_file = data_path + f"/{test_data if test_data else 'dev'}{postfix}.csv"

    print("loading train/test data files")
    datasets = load_dataset(
        "csv",
        data_files={"train": train_data_file, "test": test_data_file},
    )

    if tokenizer:
        reasoning_prompt = (
            get_prompt_template(using_p1, chinese_prompt)
            if num_shots == 0
            else get_few_shot_prompt_template(num_shots, datasets["train"].to_pandas())
        )

        def formatting_prompts_func(examples):
            inputs = examples["text"]
            outputs = examples["label"]
            puzzles = examples["puzzle"]
            truths = examples["truth"]

            messages = [
                {
                    "role": "system",
                    "content": system_prompt,
                },
                None,
            ]

            model_name = os.getenv("MODEL_NAME", "")
            if "gemma" in model_name.lower():
                # Gemma chat templates do not accept a system message.
                messages = messages[1:]

            texts = []
            prompts = []
            for input, output, puzzle, truth in zip(inputs, outputs, puzzles, truths):
                prompt = reasoning_prompt.format(puzzle, truth, input)
                messages[-1] = {"role": "user", "content": prompt}
                prompt = tokenizer.apply_chat_template(
                    messages, tokenize=False, add_generation_prompt=True
                )
                prompts.append(prompt)
                texts.append(
                    (prompt + output + tokenizer.eos_token) if output else ""
                )
            return {"train_text": texts, "prompt": prompts}

        if format_test_only:
            datasets["test"] = datasets["test"].map(
                formatting_prompts_func,
                batched=True,
            )
        else:
            datasets = datasets.map(
                formatting_prompts_func,
                batched=True,
            )

    print(datasets)
    return datasets
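
# Illustrative sketch (model id and data path are assumptions): render chat-formatted
# prompts for the dev split. MODEL_NAME is read inside formatting_prompts_func, so it
# must be set before mapping.
#
#   from transformers import AutoTokenizer
#
#   os.environ["MODEL_NAME"] = "Qwen/Qwen2-7B-Instruct"  # hypothetical model id
#   tokenizer = AutoTokenizer.from_pretrained(os.environ["MODEL_NAME"])
#   datasets = load_logical_reasoning_dataset(
#       "datasets/mgtv",  # assumed data path
#       tokenizer=tokenizer,
#       using_p1=False,
#   )
#   print(datasets["test"][0]["prompt"])
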
def get_metrics(df):
    metrics_df = pd.DataFrame(df.columns.T)[2:]
    metrics_df.rename(columns={0: "model"}, inplace=True)
    metrics_df["model"] = metrics_df["model"].apply(lambda x: x.split("/")[-1])
    metrics_df.reset_index(inplace=True)
    metrics_df = metrics_df.drop(columns=["index"])

    accuracy = []
    all_metrics = []
    for col in df.columns[2:]:
        metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=True)
        print(f"{col}: {metrics}")

        accuracy.append(metrics["accuracy"])
        all_metrics.append(metrics)

    metrics_df["accuracy"] = accuracy
    metrics_df["all_metrics"] = all_metrics
    return metrics_df
def load_alpaca_data(data_path, using_p1=True, use_english_datasets=False):
    alpaca_data_path = (
        "llama-factory/data/alpaca_mgtv_p1.json"
        if using_p1
        else "llama-factory/data/alpaca_mgtv_p2.json"
    )

    if use_english_datasets:
        alpaca_data_path = alpaca_data_path.replace(".json", "_en.json")

    if os.path.exists(alpaca_data_path):
        print("loading existing data from:", alpaca_data_path)
        data = pd.read_json(alpaca_data_path, orient="records", lines=False)
        return data

    print("loading new data from:", alpaca_data_path)
    chinese_prompt = not use_english_datasets
    datasets = load_logical_reasoning_dataset(
        data_path, using_p1=using_p1, chinese_prompt=chinese_prompt
    )
    prompt_template = get_prompt_template(using_p1, chinese_prompt)

    df_train = datasets["train"].to_pandas()
    df_train["instruction"] = df_train.apply(
        lambda x: prompt_template.format(x["puzzle"], x["truth"], x["text"]), axis=1
    )

    df_alpaca = pd.DataFrame(
        {"instruction": [""] * len(df_train), "input": [""] * len(df_train)}
    )
    df_alpaca["instruction"] = df_train["instruction"]
    df_alpaca["output"] = df_train["label"]

    df_alpaca.to_json(alpaca_data_path, orient="records", lines=False, indent=2)
    return df_alpaca
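
# Illustrative sketch (the data path is an assumption): build or reload the
# Alpaca-format JSON consumed by LLaMA-Factory for the P2 prompt.
#
#   df_alpaca = load_alpaca_data("datasets/mgtv", using_p1=False)
#   print(df_alpaca[["instruction", "output"]].head(1))
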
def plot_value_counts(df, column_name, offset=0.1, title=None, preprocess_func=None):
    # font_family = rcParams["font.family"]
    # # Set the font to SimHei to support Chinese characters
    # rcParams["font.family"] = "SimHei"
    # rcParams["axes.unicode_minus"] = (
    #     False  # This is to support the minus sign in Chinese.
    # )

    if preprocess_func:
        df["backup"] = df[column_name]
        df[column_name] = (
            df[column_name].apply(preprocess_func)
            if preprocess_func == extract_answer
            else df.apply(preprocess_func, axis=1)
        )

    plt.figure(figsize=(8, 4))
    value_counts = df[column_name].value_counts()
    value_counts = value_counts.rename(index=translation_dict)
    value_counts.plot(kind="bar")
    # add values on top of bars
    for i, v in enumerate(value_counts):
        plt.text(i, v + offset, str(v), ha="center")
    plt.xlabel(title or column_name)
    plt.show()

    # rcParams["font.family"] = font_family

    if preprocess_func:
        plot_confusion_matrix(df["label"], df[column_name])
        df[column_name] = df["backup"]
        df.drop(columns=["backup"], inplace=True)
def calc_metrics_for_col(df, col, debug=True):
    metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=debug)
    return metrics["accuracy"], metrics["precision"], metrics["recall"], metrics["f1"]
def get_metrics_df(df, variant="epoch", sort_columns=True):
    perf_df = pd.DataFrame(
        columns=[variant, "model", "run", "accuracy", "precision", "recall", "f1"]
    )
    columns = (
        df.columns[5:].tolist()
        if variant == "index"
        else [
            col
            for col in df.columns[5:]
            if variant in col or (variant == "epoch" and "_torch." in col)
        ]
    )
    if sort_columns:
        columns = sorted(
            columns,
            key=lambda x: int(
                x.lower()
                .replace("-1m", "")
                .replace("chat", "0")
                .replace("instruct", "0")
                .split("-")[-1]
                .split("_")[0]
            ),
        )

    print("columns:", columns)

    for i, col in enumerate(columns):
        metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=False)
        new_model_metrics = {
            variant: i / 5 if variant == "epoch" else i + 1,
            "model": col if "/" not in col else col.split("/")[1].split("_torch")[0],
            "run": col,
        }
        if variant == "shots":
            parts = col.split("/shots-")
            new_model_metrics["shots"] = int(parts[1])
            if variant in new_model_metrics["model"]:
                new_model_metrics["model"] = parts[0]

        new_model_metrics.update(metrics)

        # Convert the dictionary to a DataFrame and concatenate it with the existing DataFrame
        perf_df = pd.concat(
            [perf_df, pd.DataFrame([new_model_metrics])], ignore_index=True
        )

    return perf_df
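
# Illustrative sketch (the CSV path and column layout are assumptions): the first
# five columns are expected to be dataset fields and the remaining columns model
# runs, e.g. "model/checkpoint-xxx_torch.bfloat16_...".
#
#   df_results = pd.read_csv("results/mgtv-results.csv")  # hypothetical path
#   perf_df = get_metrics_df(df_results, variant="epoch")
#   perf_df.head()
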
def plot_metrics(perf_df, model_name, variant="epoch", offset=0.01):
    fig, ax = plt.subplots(1, 1, figsize=(8, 4))
    perf_df = perf_df[perf_df["model"] == model_name]

    # Ensure the lengths of perf_df["epoch"], perf_df["accuracy"], and perf_df["f1"] are the same
    min_length = min(
        len(perf_df[variant]), len(perf_df["accuracy"]), len(perf_df["f1"])
    )
    perf_df = perf_df.iloc[:min_length]

    # Plot accuracy and f1 on the same chart with different markers
    ax.plot(
        perf_df[variant], perf_df["accuracy"], marker="o", label="Accuracy", color="r"
    )
    ax.plot(
        perf_df[variant], perf_df["f1"], marker="s", label="F1 Score", color="b"
    )  # Square marker for F1 Score

    # Add values on top of points
    for i in range(min_length):
        print(f"{perf_df[variant].iloc[i]}: {perf_df['run'].iloc[i]}")
        ax.annotate(
            f"{perf_df['accuracy'].iloc[i]*100:.2f}%",
            (perf_df[variant].iloc[i], perf_df["accuracy"].iloc[i]),
            ha="center",
            va="bottom",  # Move accuracy numbers below the points
            xytext=(0, -15),
            textcoords="offset points",
            fontsize=10,
            color="r",
        )
        ax.annotate(
            f"{perf_df['f1'].iloc[i]*100:.2f}%",
            (perf_df[variant].iloc[i], perf_df["f1"].iloc[i]),
            ha="center",
            va="top",  # Move F1 score numbers above the points
            xytext=(0, 15),  # Offset by 15 points vertically
            textcoords="offset points",
            fontsize=10,
            color="b",
        )

    # Set y-axis limit
    ylimits = ax.get_ylim()
    ax.set_ylim(ylimits[0] - offset, ylimits[1] + offset)

    # Add title and labels
    ax.set_xlabel(
        "Epoch (0: base model, 0.2 - 2: fine-tuned models)"
        if variant == "epoch"
        else "Number of Shots"
    )
    ax.set_ylabel("Accuracy and F1 Score")
    ax.xaxis.set_major_locator(MultipleLocator(0.2 if variant == "epoch" else 5))
    ax.set_title(f"Performance Analysis Across Checkpoints for the {model_name} Model")

    # Rotate x labels
    plt.xticks(rotation=0)
    plt.grid(True)
    # plt.tight_layout()
    # Set legend at the right to avoid overlapping with lines
    plt.legend(loc="center left", bbox_to_anchor=(1.0, 0.5))
    plt.show()
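
# Illustrative sketch (the model name is a placeholder and must match a value in
# perf_df["model"]):
#
#   perf_df = get_metrics_df(df_results, variant="epoch")
#   plot_metrics(perf_df, "Qwen2-7B-Instruct", variant="epoch")
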
def reasoning_with_openai(
    row,
    user_prompt,
    max_tokens=None,
    model="gpt-4o-mini",
    base_url=None,
    temperature=0,
    using_system_prompt=True,
    is_openai=True,
):
    return invoke_langchain(
        user_prompt.format(row["puzzle"], row["truth"], row["text"]),
        system_prompt=system_prompt if using_system_prompt else None,
        max_tokens=max_tokens,
        model=model,
        base_url=base_url,
        temperature=temperature,
        is_openai=is_openai,
    )
def eval_openai(
    eval_dataset,
    model="gpt-4o-mini",
    max_new_tokens=300,
    num_shots=0,
    train_dataset=None,
):
    user_prompt = (
        get_prompt_template(using_p1=False, chinese_prompt=True)
        if num_shots == 0
        else get_few_shot_prompt_template(num_shots, train_dataset)
    )
    print("user_prompt:", user_prompt)

    total = len(eval_dataset)
    predictions = []

    # o1 models are called without a system prompt, with default max tokens,
    # and with temperature fixed to 1.
    is_using_o1 = "o1" in model
    is_openai = "claude" not in model

    for i in tqdm(range(total)):
        output = reasoning_with_openai(
            eval_dataset.iloc[i],
            user_prompt,
            model=model,
            max_tokens=None if is_using_o1 else max_new_tokens,
            temperature=1 if is_using_o1 else 0,
            using_system_prompt=not is_using_o1,
            is_openai=is_openai,
        )
        predictions.append(output)

    return predictions
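
# Illustrative sketch (the data frames and OpenAI model name are assumptions):
# zero-shot evaluation over the dev split, then scoring of the raw outputs.
#
#   df_dev = datasets["test"].to_pandas()
#   predictions = eval_openai(df_dev, model="gpt-4o-mini")
#   calc_metrics(df_dev["label"], predictions, questions=df_dev["text"])
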
def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    # font_family = rcParams["font.family"]
    # # Set the font to SimHei to support Chinese characters
    # rcParams["font.family"] = "SimHei"
    # rcParams["axes.unicode_minus"] = (
    #     False  # This is to support the minus sign in Chinese.
    # )

    labels = np.unique(y_true)
    y_pred = [extract_answer(text) for text in y_pred]

    cm = confusion_matrix(y_true, y_pred, labels=labels)
    # Normalize each row so cells show the fraction of true labels.
    cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]

    fig, ax = plt.subplots(figsize=(8, 8))
    # Fall back to the original label when no translation is available.
    labels = [translation_dict.get(x, x) for x in labels]
    sns.heatmap(
        cm,
        annot=True,
        fmt=".4f",
        cmap="Blues",
        xticklabels=labels,
        yticklabels=labels,
    )
    ax.set_title(title)
    ax.set_xlabel("Predicted labels")
    ax.set_ylabel("True labels")
    plt.show()

    # rcParams["font.family"] = font_family
def majority_vote(r1, r2, r3):
    # Two-out-of-three vote: if r1 and r3 agree, take them; otherwise fall back to r2.
    label = r2
    if r1 == r3:
        label = r1
    return label
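
# Illustrative sketch with fabricated values:
#
#   majority_vote("是", "不是", "是")      # -> "是"    (r1 == r3)
#   majority_vote("是", "不重要", "不是")  # -> "不重要" (r1 != r3, fall back to r2)
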
def load_openai_batch_data(data_path, num_shots=10, model="o1-mini", debug=True):
    openai_data_path = f"{data_path}/{model}.jsonl"
    if os.path.exists(openai_data_path):
        print("loading existing data from:", openai_data_path)
        data = pd.read_json(openai_data_path, orient="records", lines=True)
        return data

    datasets = load_logical_reasoning_dataset(data_path)
    df_train = datasets["train"].to_pandas()
    prompt = get_few_shot_prompt_template(num_shots, df_train, debug=debug)

    messages = []
    df_test = datasets["test"].to_pandas()
    for i, row in df_test.iterrows():
        content = prompt.format(row["puzzle"], row["truth"], row["text"])
        messages.append(
            {
                "custom_id": f"request-{i + 1}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": model,
                    "messages": [
                        {"role": "user", "content": content},
                    ],
                },
            }
        )

    df_openai = pd.DataFrame(messages)
    df_openai.to_json(openai_data_path, orient="records", lines=True)
    return df_openai
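
# Illustrative sketch (the data path is an assumption): build the JSONL payload for
# the OpenAI Batch API with 10-shot prompts; uploading the file is done separately
# with the OpenAI SDK or CLI.
#
#   df_batch = load_openai_batch_data("datasets/mgtv", num_shots=10, model="o1-mini")
#   print(df_batch.iloc[0]["body"]["messages"][0]["content"][:200])
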