logical-reasoning / llm_toolkit /logical_reasoning_utils.py
dh-mc's picture
ready for final run
687f68b
raw
history blame
23.8 kB
import os
import re
import pandas as pd
from tqdm import tqdm
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rcParams
from matplotlib.ticker import MultipleLocator
from datasets import load_dataset
import numpy as np
from sklearn.metrics import (
accuracy_score,
precision_score,
recall_score,
f1_score,
confusion_matrix,
)
from llm_toolkit.llm_utils import invoke_langchain
# Announce at import time which copy of this module is being loaded.
print(f"loading {__file__}")
# P1: Chinese prompt for the "logic game host" task. It lists the five allowed
# answers and has three positional `{}` slots which callers fill, in order,
# with the puzzle, the truth and the participant's question via `str.format`.
P1 = """你是一个逻辑游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜题。
2. 参与者可以通过提问来获取线索,尝试解开谜题。
3. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。
4. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
5. 参与者需要根据回答来推理,并最终找出谜题的正确答案。
请严格按照这些规则回答参与者提出的问题。
谜题: {}
实际情况: {}
参与者提出的问题: {}
"""
# P1_en: English version of P1 with the same three positional `{}` slots
# (puzzle, actual situation, question). Unlike P1 it has no trailing newline.
P1_en = """You are the host of a logic game. The rules of the game are as follows:
1. Participants will receive a puzzle.
2. Participants can ask questions to obtain clues and try to solve the puzzle.
3. For each question, the host will answer with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning.
4. The answer cannot include any additional information, nor can any word in the options be omitted. For example, “No” cannot be shortened to “N”.
5. Participants need to infer and ultimately find the correct answer to the puzzle based on the responses.
Please strictly adhere to these rules when answering participants’ questions.
Puzzle: {}
Actual situation: {}
Question from participants: {}"""
# P2: Chinese prompt for the "situational guessing game host" task. Compared
# with P1 it spells out the criteria for each of the five answers and ends
# with an explicit answer cue line. Same three positional `{}` slots.
P2 = """你是一个情景猜谜游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。
2. 主持人知道谜底,谜底是谜面的答案。
3. 参与者可以询问任何封闭式问题来找寻事件的真相。
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下:
- 若谜面和谜底能找到问题的答案,回答:是或者不是
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误
- 若参与者提问基本还原了谜底真相,回答:回答正确
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
请严格按照这些规则回答参与者提出的问题。
谜面: {}
谜底: {}
参与者提出的问题: {}
回答:
"""
# P2_en: English version of P2 (riddle, truth, question slots). Note that it
# does not end with an answer cue line the way P2 does.
P2_en = """You are the host of a situational guessing game. The rules of the game are as follows:
1. Participants will receive a riddle that describes a simple yet difficult to understand event.
2. The host knows the truth, which is the solution to the riddle.
3. Participants can ask any closed-ended questions to uncover the truth of the event.
4. For each question, the host will respond with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. The criteria for each response are as follows:
- If the riddle and answer can provide an answer to the question, respond with: Yes or No
- If the riddle and answer cannot directly or indirectly infer an answer to the question, respond with: Unimportant
- If the participant's question is not a closed-ended question or is difficult to understand, respond with: Incorrect questioning
- If the participant's question essentially reveals the truth of the answer, respond with: Correct answer
5. The response must not include any additional information, nor should any word be omitted from the options. For example, "No" cannot be abbreviated to "N".
Please strictly follow these rules when answering the participant's questions.
Riddle: {}
Truth: {}
Participant's question: {}
"""
# System message placed in front of the user prompt when a system role is used.
system_prompt = "You are an expert in logical reasoning."
# P2_few_shot: P2 plus a named `{examples}` placeholder. The placeholder is
# substituted with `str.replace` in get_few_shot_prompt_template, which leaves
# the three positional `{}` slots in place for a later `str.format` call.
P2_few_shot = """你是一个情景猜谜游戏的主持人。游戏规则如下:
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。
2. 主持人知道谜底,谜底是谜面的答案。
3. 参与者可以询问任何封闭式问题来找寻事件的真相。
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下:
- 若谜面和谜底能找到问题的答案,回答:是或者不是
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误
- 若参与者提问基本还原了谜底真相,回答:回答正确
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。
请严格按照这些规则回答参与者提出的问题。
示例输入和输出:
{examples}
谜面: {}
谜底: {}
参与者提出的问题: {}
回答:
"""
# Best-effort load of the Chinese -> English lookup used to relabel plots.
# Any failure (missing file, missing column, duplicate keys, ...) is printed
# and leaves `translation_dict` empty instead of aborting the import.
try:
    df_translation = pd.read_csv("datasets/mgtv/unique_translations.csv")
    translation_dict = {
        chinese: record["english"]
        for chinese, record in df_translation.set_index("chinese")
        .to_dict(orient="index")
        .items()
    }
except Exception as e:
    print(e)
    translation_dict = {}
def get_prompt_template(using_p1=True, chinese_prompt=True):
    """Return one of the four zero-shot prompt templates.

    Args:
        using_p1: Truthy selects the P1 family, falsy selects the P2 family.
        chinese_prompt: Truthy selects the Chinese text, falsy the English one.

    Returns:
        The selected template string (``P1``, ``P1_en``, ``P2`` or ``P2_en``).
    """
    if using_p1:
        chinese_template, english_template = P1, P1_en
    else:
        chinese_template, english_template = P2, P2_en
    return chinese_template if chinese_prompt else english_template
def get_few_shot_prompt_template(num_shots, train_dataset, debug=False):
    """Build the Chinese P2 prompt template with worked examples embedded.

    Examples are taken from ``train_dataset`` by scanning forward through its
    rows and cycling over the distinct labels, so consecutive examples carry
    different labels whenever more than one label exists.

    Args:
        num_shots: Number of examples to embed. ``0`` returns the plain
            Chinese P2 template.
        train_dataset: pandas DataFrame with ``puzzle``, ``truth``, ``text``
            and ``label`` columns.
        debug: When truthy, print the labels and the resulting template.

    Returns:
        A template string that still contains the three positional ``{}``
        slots (puzzle, truth, question) and is meant to be completed with
        ``str.format``. Braces inside the example text are doubled so that
        this later ``format`` call reproduces them literally instead of
        treating them as placeholders.
    """
    if num_shots == 0:
        return get_prompt_template(using_p1=False, chinese_prompt=True)

    label_values = train_dataset["label"]
    labels = label_values.unique()
    if debug:
        print("num_shots:", num_shots)
        print("labels:", labels)

    def _escape(value):
        # The returned template goes through str.format later on; unescaped
        # braces in a puzzle/answer would be parsed as replacement fields.
        return str(value).replace("{", "{{").replace("}", "}}")

    total_rows = len(train_dataset)
    example_blocks = []
    index = 0
    # An empty training set has no labels to cycle over; without this guard
    # the outer loop would never make progress.
    exhausted = total_rows == 0
    while num_shots > 0 and not exhausted:
        for label in labels:
            # Positional access throughout, so a non-default DataFrame index
            # cannot desynchronise the scan from the row lookup below.
            while index < total_rows and label_values.iloc[index] != label:
                index += 1
            if index >= total_rows:
                print(
                    "warning: training data exhausted; "
                    f"{num_shots} few-shot example(s) could not be added"
                )
                exhausted = True
                break
            row = train_dataset.iloc[index]
            example_blocks.append(
                f"谜面: {_escape(row['puzzle'])}\n"
                f"谜底: {_escape(row['truth'])}\n"
                f"参与者提出的问题: {_escape(row['text'])}\n"
                f"回答: {_escape(row['label'])}\n"
            )
            # Move past the row just used so it is never emitted twice.
            index += 1
            num_shots -= 1
            if num_shots == 0:
                break

    examples = "".join(example_blocks)
    prompt = P2_few_shot.replace("{examples}", examples)
    if debug:
        print("P2_few_shot:", prompt)
    return prompt
def extract_answer(text, debug=False):
    """Reduce a raw model generation to its short answer string.

    The clean-up runs in five steps: strip everything up to an ``assistant``
    or ``[/INST]`` marker, cut the text at the first ``<...>`` tag, strip a
    leading chat header ending in ``end_header_id|>``, keep only the first
    sentence/line and remove the "回答: " cue, and finally strip a leading
    ``Response:`` prefix.

    Args:
        text: Raw generation. Anything that is empty or not a ``str`` yields
            an empty string.
        debug: When truthy, print the intermediate text after every step.

    Returns:
        The cleaned, whitespace-stripped answer, or ``""``.
    """
    if not (text and isinstance(text, str)):
        return ""

    def _trace(header, value):
        if debug:
            print(header, value)

    regex_flags = re.DOTALL | re.MULTILINE

    # Remove the begin and end tokens
    answer = re.sub(
        r".*?(assistant|\[/INST\]).+?\b", "", text, flags=regex_flags
    )
    _trace("--------\nstep 1:", answer)

    # Drop the first tag-like token and everything that follows it.
    answer = re.sub(r"<.+?>.*", "", answer, flags=regex_flags)
    _trace("--------\nstep 2:", answer)

    answer = re.sub(r".*?end_header_id\|>\n\n", "", answer, flags=regex_flags)
    _trace("--------\nstep 3:", answer)

    # Keep only what precedes the first full stop / line break, in this order.
    for separator in (".", "\n", "。"):
        answer = answer.partition(separator)[0].strip()
    answer = answer.replace("回答: ", "").strip()
    _trace("--------\nstep 4:", answer)

    answer = re.sub(r"^Response:.+?\b", "", answer, flags=regex_flags)
    _trace("--------\nstep 5:", answer)

    return answer.strip()
def extract_answer_from_text(text, question):
    """Extract the answer from a generation that may echo the prompt.

    Only the part after the last "回答:" is considered. If one of its lines
    contains ``question``, the first following line that does not contain it
    is taken as the answer line. The result is cleaned with
    ``extract_answer`` and stripped of "Human: " / "Assistant: " role
    prefixes.

    Args:
        text: Raw model output. Non-string values (for example NaN read from
            an empty CSV cell) yield ``""``, matching ``extract_answer``.
        question: The participant's question. The echo search is skipped when
            it is not a string.

    Returns:
        The extracted answer string. It is not guaranteed to be one of the
        five valid labels.
    """
    if not isinstance(text, str):
        return ""

    candidate = text.split("回答:")[-1]

    # `question in line` needs a str on the left-hand side; a NaN question
    # would otherwise raise TypeError.
    if isinstance(question, str):
        found_question = False
        for line in candidate.split("\n"):
            if question in line:
                found_question = True
            elif found_question:
                candidate = line
                break

    answer = extract_answer(candidate)
    # A valid label never contains these prefixes, so stripping them
    # unconditionally leaves already-valid answers untouched.
    return answer.replace("Human: ", "").replace("Assistant: ", "")
def calc_metrics(references, predictions, questions=None, debug=False):
    """Compute classification metrics for raw model outputs.

    Args:
        references: Ground-truth labels (list, array or pandas Series).
        predictions: Raw model outputs, same length as ``references``.
        questions: Optional questions aligned with ``predictions``. When
            given, answers are extracted with ``extract_answer_from_text``;
            otherwise with ``extract_answer``.
        debug: When truthy, also report the positions of wrong predictions.

    Returns:
        A dict with ``accuracy``, ``precision``, ``recall`` and ``f1`` (the
        last three weighted over the labels found in ``references``),
        ``ratio_valid_classifications`` (share of *raw* predictions that are
        exactly one of those labels) and, in debug mode, ``incorrect_ids``.

    Raises:
        AssertionError: If the two inputs differ in length.
    """
    assert len(references) == len(
        predictions
    ), f"lengths are difference: {len(references)} != {len(predictions)}"

    labels = np.unique(references)
    # Measured before any clean-up: how often the model answered verbatim.
    valid_classifications = [1 if p in labels else 0 for p in predictions]

    if questions is None:
        predictions = [extract_answer(text) for text in predictions]
    else:
        predictions = [
            extract_answer_from_text(text, question)
            for text, question in zip(predictions, questions)
        ]

    accuracy = accuracy_score(references, predictions)
    results = {"accuracy": accuracy}

    if debug:
        # Pair items positionally. `references[i]` would be a label-based
        # lookup on a pandas Series and breaks for a non-default index.
        results["incorrect_ids"] = [
            i
            for i, (reference, prediction) in enumerate(
                zip(references, predictions)
            )
            if prediction != reference
        ]

    precision = precision_score(
        references, predictions, average="weighted", labels=labels
    )
    results["precision"] = float(precision)

    recall = recall_score(references, predictions, average="weighted", labels=labels)
    results["recall"] = float(recall)

    f1 = f1_score(references, predictions, average="weighted", labels=labels)
    results["f1"] = float(f1)

    results["ratio_valid_classifications"] = sum(valid_classifications) / len(
        valid_classifications
    )
    return results
def save_results(model_name, results_path, dataset, predictions, debug=False):
    """Store one model's predictions as a column of a results CSV.

    If ``results_path`` does not exist yet, the file is created from
    ``dataset`` (minus the ``answer``, ``prompt`` and ``train_text`` helper
    columns). Otherwise the existing CSV is read and ``dataset`` is ignored.
    In both cases ``predictions`` is written to the column ``model_name``.

    Args:
        model_name: Name of the column receiving the predictions.
        results_path: Path of the CSV file to create or update.
        dataset: pandas DataFrame, or an object exposing ``to_pandas()``.
            A DataFrame passed here is modified in place.
        predictions: One prediction per row.
        debug: When truthy, print the first row of the resulting frame.
    """
    if not os.path.exists(results_path):
        # Get the directory part of the file path
        dir_path = os.path.dirname(results_path)
        # A bare file name has no directory part, and os.makedirs("") raises
        # FileNotFoundError, so only create directories when there are any.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)

        if isinstance(dataset, pd.DataFrame):
            df = dataset
        else:
            df = dataset.to_pandas()
        df.drop(
            columns=["answer", "prompt", "train_text"], inplace=True, errors="ignore"
        )
    else:
        df = pd.read_csv(results_path, on_bad_lines="warn")

    df[model_name] = predictions

    if debug:
        print(df.head(1))

    df.to_csv(results_path, index=False)
def load_logical_reasoning_dataset(
    data_path,
    tokenizer=None,
    using_p1=True,
    chinese_prompt=True,
    test_data=None,
    num_shots=0,
    format_test_only=False,
):
    """Load the train/test CSV files and optionally render chat prompts.

    Args:
        data_path: Directory holding ``train.csv`` and the test CSV (or their
            ``_en`` variants when ``chinese_prompt`` is falsy).
        tokenizer: Optional tokenizer providing ``apply_chat_template`` and
            ``eos_token``. When given, ``prompt`` and ``train_text`` columns
            are added; when omitted the raw data is returned.
        using_p1: Choose the P1 (truthy) or P2 (falsy) zero-shot template.
        chinese_prompt: Choose Chinese (truthy) or English (falsy) data files
            and zero-shot template.
        test_data: Base name of the test CSV; defaults to ``dev``.
        num_shots: Number of few-shot examples. Any non-zero value switches
            to the few-shot template built from the training split.
        format_test_only: When truthy, only the test split is formatted.

    Returns:
        The object returned by ``datasets.load_dataset`` with ``train`` and
        ``test`` splits, formatted as described above.
    """
    postfix = "" if chinese_prompt else "_en"
    train_data_file = data_path + f"/train{postfix}.csv"
    test_data_file = data_path + f"/{test_data if test_data else 'dev'}{postfix}.csv"

    print("loading train/test data files")
    datasets = load_dataset(
        "csv",
        data_files={"train": train_data_file, "test": test_data_file},
    )

    if tokenizer:
        reasoning_prompt = (
            get_prompt_template(using_p1, chinese_prompt)
            if num_shots == 0
            else get_few_shot_prompt_template(num_shots, datasets["train"].to_pandas())
        )

        def formatting_prompts_func(examples):
            # Receives a batch (dict of column -> list) and returns the two
            # new columns for that batch.
            inputs = examples["text"]
            outputs = examples["label"]
            puzzles = examples["puzzle"]
            truths = examples["truth"]

            # The trailing slot is overwritten with the user turn per row.
            messages = [
                {
                    "role": "system",
                    "content": system_prompt,
                },
                None,
            ]

            # MODEL_NAME may be unset; treat that as "not a gemma model"
            # instead of failing on None.lower().
            model_name = os.getenv("MODEL_NAME") or ""
            if "gemma" in model_name.lower():
                # Send only the user turn for gemma models.
                messages = messages[1:]

            texts = []
            prompts = []
            for question, output, puzzle, truth in zip(
                inputs, outputs, puzzles, truths
            ):
                prompt = reasoning_prompt.format(puzzle, truth, question)
                messages[-1] = {"role": "user", "content": prompt}
                prompt = tokenizer.apply_chat_template(
                    messages, tokenize=False, add_generation_prompt=True
                )
                prompts.append(prompt)
                # Rows without a label get an empty training text.
                texts.append(
                    (prompt + output + tokenizer.eos_token) if output else ""
                )
            return {"train_text": texts, "prompt": prompts}

        if format_test_only:
            datasets["test"] = datasets["test"].map(
                formatting_prompts_func,
                batched=True,
            )
        else:
            datasets = datasets.map(
                formatting_prompts_func,
                batched=True,
            )

    print(datasets)
    return datasets
def get_metrics(df):
    """Score every prediction column of ``df`` and tabulate the results.

    All columns from the third one onwards are treated as model outputs and
    scored against ``df["label"]``, using ``df["text"]`` as the questions.

    Args:
        df: DataFrame with ``label`` and ``text`` columns plus one column per
            model.

    Returns:
        DataFrame with one row per model: ``model`` (the column name after
        its last ``/``), ``accuracy`` and ``all_metrics`` (the full metrics
        dict from ``calc_metrics`` in debug mode).
    """
    prediction_columns = df.columns[2:]
    metrics_df = pd.DataFrame(
        {"model": [name.split("/")[-1] for name in prediction_columns]}
    )

    all_metrics = []
    for col in prediction_columns:
        metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=True)
        print(f"{col}: {metrics}")
        all_metrics.append(metrics)

    metrics_df["accuracy"] = [metrics["accuracy"] for metrics in all_metrics]
    metrics_df["all_metrics"] = all_metrics
    return metrics_df
def load_alpaca_data(data_path, using_p1=True, use_english_datasets=False):
    """Load the training data in Alpaca format, generating it if missing.

    The JSON file lives under ``llama-factory/data``; its name depends on the
    prompt family and language. If it already exists it is simply read back;
    otherwise it is built from the training split in ``data_path`` and saved.

    Args:
        data_path: Directory with the source CSV files (used only when the
            JSON file has to be generated).
        using_p1: Choose the P1 (truthy) or P2 (falsy) prompt family.
        use_english_datasets: Truthy selects the English data and prompt.

    Returns:
        DataFrame with ``instruction``, ``input`` and ``output`` columns.
    """
    if using_p1:
        alpaca_data_path = "llama-factory/data/alpaca_mgtv_p1.json"
    else:
        alpaca_data_path = "llama-factory/data/alpaca_mgtv_p2.json"
    if use_english_datasets:
        alpaca_data_path = alpaca_data_path.replace(".json", "_en.json")

    if os.path.exists(alpaca_data_path):
        print("loading existing data from:", alpaca_data_path)
        return pd.read_json(alpaca_data_path, orient="records", lines=False)

    print("loading new data from:", alpaca_data_path)
    chinese_prompt = not use_english_datasets
    datasets = load_logical_reasoning_dataset(
        data_path, using_p1=using_p1, chinese_prompt=chinese_prompt
    )
    prompt_template = get_prompt_template(using_p1, chinese_prompt)

    def _render_instruction(record):
        return prompt_template.format(
            record["puzzle"], record["truth"], record["text"]
        )

    df_train = datasets["train"].to_pandas()
    df_train["instruction"] = df_train.apply(_render_instruction, axis=1)

    # The Alpaca "input" field is left empty: the whole prompt is the
    # instruction and the label is the expected output.
    blanks = [""] * len(df_train)
    df_alpaca = pd.DataFrame({"instruction": blanks, "input": blanks})
    df_alpaca["instruction"] = df_train["instruction"]
    df_alpaca["output"] = df_train["label"]

    df_alpaca.to_json(alpaca_data_path, orient="records", lines=False, indent=2)
    return df_alpaca
def plot_value_counts(df, column_name, offset=0.1, title=None, preprocess_func=None):
    """Draw a bar chart of the value counts of one column.

    When ``preprocess_func`` is given, the column is temporarily replaced by
    its processed values, a confusion matrix against ``df["label"]`` is drawn
    as well, and the original column is restored afterwards - even if
    plotting fails part-way through.

    Args:
        df: DataFrame to plot from. It is modified temporarily while plotting
            and restored before the function returns or raises.
        column_name: Column whose values are counted.
        offset: Vertical gap between the top of a bar and its count label.
        title: X-axis label; defaults to ``column_name``.
        preprocess_func: Optional clean-up function. ``extract_answer`` is
            applied to each value of the column; any other callable is
            applied to each row of ``df``.
    """
    # NOTE(review): a SimHei font override for Chinese glyphs used to be
    # sketched here in commented-out form; the bar labels are instead mapped
    # through `translation_dict` below.
    if preprocess_func:
        df["backup"] = df[column_name]

    try:
        if preprocess_func:
            df[column_name] = (
                df[column_name].apply(preprocess_func)
                if preprocess_func == extract_answer
                else df.apply(preprocess_func, axis=1)
            )

        plt.figure(figsize=(8, 4))

        value_counts = df[column_name].value_counts()
        value_counts = value_counts.rename(index=translation_dict)
        value_counts.plot(kind="bar")

        # add values on top of bars
        for i, v in enumerate(value_counts):
            plt.text(i, v + offset, str(v), ha="center")

        plt.xlabel(title or column_name)
        plt.show()

        if preprocess_func:
            plot_confusion_matrix(df["label"], df[column_name])
    finally:
        # Undo the temporary mutation no matter how the plotting went, so the
        # caller never ends up with processed values or a stray column.
        if preprocess_func:
            df[column_name] = df["backup"]
            df.drop(columns=["backup"], inplace=True)
def calc_metrics_for_col(df, col, debug=True):
    """Score one prediction column of ``df`` against ``df["label"]``.

    Args:
        df: DataFrame with ``label`` and ``text`` columns.
        col: Name of the column holding the raw model outputs.
        debug: Forwarded to ``calc_metrics``.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``.
    """
    scores = calc_metrics(df["label"], df[col], questions=df["text"], debug=debug)
    return tuple(scores[key] for key in ("accuracy", "precision", "recall", "f1"))
def get_metrics_df(df, variant="epoch", sort_columns=True):
    """Compute metrics for a family of runs stored as columns of ``df``.

    Candidate columns are those from the sixth column onwards. For
    ``variant == "index"`` all of them are used; otherwise only those whose
    name contains ``variant`` (and, for ``"epoch"``, also those containing
    ``"_torch."``).

    Args:
        df: Results DataFrame with ``label`` and ``text`` columns.
        variant: ``"epoch"``, ``"shots"``, ``"index"`` or another substring
            used to select columns; also the name of the first output column.
        sort_columns: When truthy, order the runs by the integer found after
            the last ``-`` in the (normalised) column name.

    Returns:
        DataFrame with one row per run. Columns start with ``variant``,
        ``model``, ``run``, ``accuracy``, ``precision``, ``recall``, ``f1``,
        followed by any further metrics returned by ``calc_metrics``.

    Raises:
        ValueError: If sorting is requested and a column name does not end
            in a parseable integer.
    """
    base_columns = [variant, "model", "run", "accuracy", "precision", "recall", "f1"]

    candidates = df.columns[5:].tolist()
    if variant == "index":
        columns = candidates
    else:
        columns = [
            col
            for col in candidates
            # Parentheses spell out the precedence the expression always had.
            if variant in col or (variant == "epoch" and "_torch." in col)
        ]

    def _run_number(name):
        # Normalise known suffixes so the trailing token parses as an int.
        normalised = (
            name.lower()
            .replace("-1m", "")
            .replace("chat", "0")
            .replace("instruct", "0")
        )
        return int(normalised.split("-")[-1].split("_")[0])

    if sort_columns:
        columns = sorted(columns, key=_run_number)

    print("columns:", columns)

    rows = []
    for i, col in enumerate(columns):
        metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=False)
        new_model_metrics = {
            variant: i / 5 if variant == "epoch" else i + 1,
            "model": col if "/" not in col else col.split("/")[1].split("_torch")[0],
            "run": col,
        }
        if variant == "shots":
            parts = col.split("/shots-")
            new_model_metrics["shots"] = int(parts[1])
            if variant in new_model_metrics["model"]:
                new_model_metrics["model"] = parts[0]

        new_model_metrics.update(metrics)
        rows.append(new_model_metrics)

    if not rows:
        return pd.DataFrame(columns=base_columns)

    # Build the frame once instead of concatenating inside the loop: this is
    # linear and gives the numeric columns proper numeric dtypes.
    perf_df = pd.DataFrame(rows)
    extra_columns = [name for name in perf_df.columns if name not in base_columns]
    return perf_df.reindex(columns=base_columns + extra_columns)
def plot_metrics(perf_df, model_name, variant="epoch", offset=0.01):
    """Plot accuracy and F1 of one model against ``variant``.

    Args:
        perf_df: Metrics DataFrame with ``model``, ``run``, ``accuracy``,
            ``f1`` and ``variant`` columns.
        model_name: Only rows whose ``model`` equals this value are plotted.
        variant: Column used for the x-axis. ``"epoch"`` uses an epoch axis
            label and 0.2 tick spacing; anything else uses "Number of Shots"
            and a spacing of 5.
        offset: Extra margin added below and above the automatic y-limits.
    """
    _, ax = plt.subplots(1, 1, figsize=(8, 4))

    selected = perf_df[perf_df["model"] == model_name]

    # Ensure the plotted series all have the same length
    min_length = min(len(selected[name]) for name in (variant, "accuracy", "f1"))
    selected = selected.iloc[:min_length]

    x_values = selected[variant]
    accuracy_values = selected["accuracy"]
    f1_values = selected["f1"]

    # Accuracy and F1 share one chart, told apart by marker and colour
    ax.plot(x_values, accuracy_values, marker="o", label="Accuracy", color="r")
    ax.plot(x_values, f1_values, marker="s", label="F1 Score", color="b")

    def _label_point(x, y, colour, vertical_alignment, y_shift):
        ax.annotate(
            f"{y*100:.2f}%",
            (x, y),
            ha="center",
            va=vertical_alignment,
            xytext=(0, y_shift),
            textcoords="offset points",
            fontsize=10,
            color=colour,
        )

    # Print which run each x position stands for, then label both points
    for i in range(min_length):
        x = x_values.iloc[i]
        print(f"{x}: {selected['run'].iloc[i]}")
        # Accuracy text is shifted 15 points down, F1 text 15 points up
        _label_point(x, accuracy_values.iloc[i], "r", "bottom", -15)
        _label_point(x, f1_values.iloc[i], "b", "top", 15)

    # Widen the automatic y-range by `offset` on both sides
    lower, upper = ax.get_ylim()
    ax.set_ylim(lower - offset, upper + offset)

    if variant == "epoch":
        x_label = "Epoch (0: base model, 0.2 - 2: fine-tuned models)"
        tick_spacing = 0.2
    else:
        x_label = "Number of Shots"
        tick_spacing = 5
    ax.set_xlabel(x_label)
    ax.set_ylabel("Accuracy and F1 Score")
    ax.xaxis.set_major_locator(MultipleLocator(tick_spacing))
    ax.set_title(f"Performance Analysis Across Checkpoints for the {model_name} Model")

    plt.xticks(rotation=0)
    plt.grid(True)

    # Legend goes to the right of the axes so it cannot cover the lines
    plt.legend(loc="center left", bbox_to_anchor=(1.0, 0.5))
    plt.show()
def reasoning_with_openai(
    row,
    user_prompt,
    max_tokens=None,
    model="gpt-4o-mini",
    base_url=None,
    temperature=0,
    using_system_prompt=True,
    is_openai=True,
):
    """Ask a hosted chat model to answer one puzzle question.

    Args:
        row: Mapping with ``puzzle``, ``truth`` and ``text`` entries.
        user_prompt: Template with three positional ``{}`` slots.
        max_tokens: Forwarded to ``invoke_langchain``.
        model: Forwarded to ``invoke_langchain``.
        base_url: Forwarded to ``invoke_langchain``.
        temperature: Forwarded to ``invoke_langchain``.
        using_system_prompt: When falsy, no system prompt is sent.
        is_openai: Forwarded to ``invoke_langchain``.

    Returns:
        Whatever ``invoke_langchain`` returns for the rendered prompt.
    """
    rendered_prompt = user_prompt.format(row["puzzle"], row["truth"], row["text"])
    chosen_system_prompt = system_prompt if using_system_prompt else None
    return invoke_langchain(
        rendered_prompt,
        system_prompt=chosen_system_prompt,
        max_tokens=max_tokens,
        model=model,
        base_url=base_url,
        temperature=temperature,
        is_openai=is_openai,
    )
def eval_openai(
    eval_dataset,
    model="gpt-4o-mini",
    max_new_tokens=300,
    num_shots=0,
    train_dataset=None,
):
    """Run a hosted chat model over every row of ``eval_dataset``.

    Models whose name contains ``o1`` are called without a token limit,
    without a system prompt and with temperature 1; all others use
    ``max_new_tokens`` and temperature 0. Models whose name contains
    ``claude`` are flagged as non-OpenAI.

    Args:
        eval_dataset: DataFrame with ``puzzle``, ``truth`` and ``text``.
        model: Name of the model to query.
        max_new_tokens: Token limit for models other than ``o1``.
        num_shots: Number of few-shot examples (``0`` uses the plain Chinese
            P2 prompt).
        train_dataset: Source of the few-shot examples; needed when
            ``num_shots`` is non-zero.

    Returns:
        List with one raw model output per row, in row order.
    """
    if num_shots == 0:
        user_prompt = get_prompt_template(using_p1=False, chinese_prompt=True)
    else:
        user_prompt = get_few_shot_prompt_template(num_shots, train_dataset)
    print("user_prompt:", user_prompt)

    is_using_o1 = "o1" in model
    request_options = {
        "model": model,
        "max_tokens": None if is_using_o1 else max_new_tokens,
        "temperature": 1 if is_using_o1 else 0,
        "using_system_prompt": not is_using_o1,
        "is_openai": "claude" not in model,
    }

    total = len(eval_dataset)
    return [
        reasoning_with_openai(eval_dataset.iloc[i], user_prompt, **request_options)
        for i in tqdm(range(total))
    ]
def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"):
    """Draw a row-normalised confusion matrix as a heat map.

    Args:
        y_true: Ground-truth labels.
        y_pred: Raw model outputs; each is cleaned with ``extract_answer``
            before being compared.
        title: Title of the plot.
    """
    # NOTE(review): a SimHei font override for Chinese glyphs used to be
    # sketched here in commented-out form; tick labels are instead mapped
    # through `translation_dict`.
    labels = np.unique(y_true)

    y_pred = [extract_answer(text) for text in y_pred]

    cm = confusion_matrix(y_true, y_pred, labels=labels)
    # Normalise each row so that cells show the share of that true label.
    cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis]

    fig, ax = plt.subplots(figsize=(8, 8))

    # Fall back to the original label when no translation is available
    # (`translation_dict` is empty if its CSV could not be loaded).
    labels = [translation_dict.get(x, x) for x in labels]

    sns.heatmap(
        cm,
        annot=True,
        fmt=".4f",
        cmap="Blues",
        xticklabels=labels,
        yticklabels=labels,
    )
    ax.set_title(title)
    ax.set_xlabel("Predicted labels")
    ax.set_ylabel("True labels")
    plt.show()
def majority_vote(r1, r2, r3):
    """Return the value that at least two of the three arguments share.

    If all three differ, ``r2`` is returned.
    """
    # Whenever r1 and r3 disagree, r2 either sides with one of them or is the
    # tie-break.
    return r1 if r1 == r3 else r2
def load_openai_batch_data(data_path, num_shots=10, model="o1-mini", debug=True):
    """Load, or create and save, a JSONL file of chat-completion requests.

    The file is ``{data_path}/{model}.jsonl``. If it exists it is read back
    unchanged. Otherwise one request per test row is generated from the
    few-shot prompt and written to that path.

    Args:
        data_path: Directory holding the dataset CSVs and the JSONL file.
        num_shots: Number of few-shot examples in each prompt.
        model: Model name used in the request body and in the file name.
        debug: Forwarded to ``get_few_shot_prompt_template``.

    Returns:
        DataFrame with one request per row (``custom_id``, ``method``,
        ``url`` and ``body``).
    """
    openai_data_path = f"{data_path}/{model}.jsonl"

    if os.path.exists(openai_data_path):
        print("loading existing data from:", openai_data_path)
        return pd.read_json(openai_data_path, orient="records", lines=True)

    datasets = load_logical_reasoning_dataset(data_path)
    prompt = get_few_shot_prompt_template(
        num_shots, datasets["train"].to_pandas(), debug=debug
    )

    def _build_request(row_id, row):
        return {
            "custom_id": f"request-{row_id + 1}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [
                    {
                        "role": "user",
                        "content": prompt.format(
                            row["puzzle"], row["truth"], row["text"]
                        ),
                    },
                ],
            },
        }

    df_test = datasets["test"].to_pandas()
    messages = [_build_request(i, row) for i, row in df_test.iterrows()]

    df_openai = pd.DataFrame(messages)
    df_openai.to_json(openai_data_path, orient="records", lines=True)
    return df_openai