import os import re import pandas as pd from tqdm import tqdm import seaborn as sns import matplotlib.pyplot as plt from matplotlib import rcParams from matplotlib.ticker import MultipleLocator from datasets import load_dataset import numpy as np from sklearn.metrics import ( accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, ) from llm_toolkit.llm_utils import invoke_langchain print(f"loading {__file__}") P1 = """你是一个逻辑游戏的主持人。游戏规则如下: 1. 参与者会得到一个谜题。 2. 参与者可以通过提问来获取线索,尝试解开谜题。 3. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。 4. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 5. 参与者需要根据回答来推理,并最终找出谜题的正确答案。 请严格按照这些规则回答参与者提出的问题。 谜题: {} 实际情况: {} 参与者提出的问题: {} """ P1_en = """You are the host of a logic game. The rules of the game are as follows: 1. Participants will receive a puzzle. 2. Participants can ask questions to obtain clues and try to solve the puzzle. 3. For each question, the host will answer with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. 4. The answer cannot include any additional information, nor can any word in the options be omitted. For example, “No” cannot be shortened to “N”. 5. Participants need to infer and ultimately find the correct answer to the puzzle based on the responses. Please strictly adhere to these rules when answering participants’ questions. Puzzle: {} Actual situation: {} Question from participants: {}""" P2 = """你是一个情景猜谜游戏的主持人。游戏规则如下: 1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。 2. 主持人知道谜底,谜底是谜面的答案。 3. 参与者可以询问任何封闭式问题来找寻事件的真相。 4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下: - 若谜面和谜底能找到问题的答案,回答:是或者不是 - 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要 - 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误 - 若参与者提问基本还原了谜底真相,回答:回答正确 5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 请严格按照这些规则回答参与者提出的问题。 谜面: {} 谜底: {} 参与者提出的问题: {} 回答: """ P2_en = """You are the host of a situational guessing game. The rules of the game are as follows: 1. Participants will receive a riddle that describes a simple yet difficult to understand event. 2. The host knows the truth, which is the solution to the riddle. 3. Participants can ask any closed-ended questions to uncover the truth of the event. 4. For each question, the host will respond with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. The criteria for each response are as follows: - If the riddle and answer can provide an answer to the question, respond with: Yes or No - If the riddle and answer cannot directly or indirectly infer an answer to the question, respond with: Unimportant - If the participant's question is not a closed-ended question or is difficult to understand, respond with: Incorrect questioning - If the participant's question essentially reveals the truth of the answer, respond with: Correct answer 5. The response must not include any additional information, nor should any word be omitted from the options. For example, "No" cannot be abbreviated to "N". Please strictly follow these rules when answering the participant's questions. Riddle: {} Truth: {} Participant's question: {} """ system_prompt = "You are an expert in logical reasoning." P2_few_shot = """你是一个情景猜谜游戏的主持人。游戏规则如下: 1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。 2. 主持人知道谜底,谜底是谜面的答案。 3. 参与者可以询问任何封闭式问题来找寻事件的真相。 4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下: - 若谜面和谜底能找到问题的答案,回答:是或者不是 - 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要 - 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误 - 若参与者提问基本还原了谜底真相,回答:回答正确 5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 请严格按照这些规则回答参与者提出的问题。 示例输入和输出: {examples} 谜面: {} 谜底: {} 参与者提出的问题: {} 回答: """ try: df_translation = pd.read_csv("datasets/mgtv/unique_translations.csv") translation_dict = df_translation.set_index("chinese").to_dict(orient="index") translation_dict = {k: v["english"] for k, v in translation_dict.items()} except Exception as e: print(e) translation_dict = {} def get_prompt_template(using_p1=True, chinese_prompt=True): if using_p1: return P1 if chinese_prompt else P1_en else: return P2 if chinese_prompt else P2_en def get_few_shot_prompt_template(num_shots, train_dataset, debug=False): if num_shots == 0: return get_prompt_template(using_p1=False, chinese_prompt=True) labels = train_dataset["label"].unique() if debug: print("num_shots:", num_shots) print("labels:", labels) examples = "" index = 0 while num_shots > 0: for label in labels: while train_dataset["label"][index] != label: index += 1 row = train_dataset.iloc[index] examples += f"""谜面: {row["puzzle"]} 谜底: {row["truth"]} 参与者提出的问题: {row["text"]} 回答: {row["label"]} """ num_shots -= 1 if num_shots == 0: break prompt = P2_few_shot.replace("{examples}", examples) if debug: print("P2_few_shot:", prompt) return prompt def extract_answer(text, debug=False): if text and isinstance(text, str): # Remove the begin and end tokens text = re.sub( r".*?(assistant|\[/INST\]).+?\b", "", text, flags=re.DOTALL | re.MULTILINE, ) if debug: print("--------\nstep 1:", text) text = re.sub(r"<.+?>.*", "", text, flags=re.DOTALL | re.MULTILINE) if debug: print("--------\nstep 2:", text) text = re.sub( r".*?end_header_id\|>\n\n", "", text, flags=re.DOTALL | re.MULTILINE ) if debug: print("--------\nstep 3:", text) text = text.split(".")[0].strip() text = text.split("\n")[0].strip() text = text.split("。")[0].strip() text = text.replace("回答: ", "").strip() if debug: print("--------\nstep 4:", text) text = re.sub( r"^Response:.+?\b", "", text, flags=re.DOTALL | re.MULTILINE, ) if debug: print("--------\nstep 5:", text) return text.strip() return "" def extract_answer_from_text(text, question): labels = ['不是', '是', '不重要', '回答正确', '问法错误'] original_text = text text = text.split("回答:")[-1] found_question = False for line in text.split("\n"): if question in line: found_question = True elif found_question: text = line break text = extract_answer(text) if text in labels: return text text = text.replace("Human: ", "") text = text.replace("Assistant: ", "") if text in labels: return text # print(f"not found: {question} | {original_text} | {text}") return text def calc_metrics(references, predictions, questions=None, debug=False): assert len(references) == len( predictions ), f"lengths are difference: {len(references)} != {len(predictions)}" labels = np.unique(references) valid_classifications = [1 if p in labels else 0 for p in predictions] predictions = [extract_answer(text) for text in predictions] if questions is None else [extract_answer_from_text(text, question) for text, question in zip(predictions, questions)] accuracy = accuracy_score(references, predictions) results = {"accuracy": accuracy} if debug: incorrect_ids = [i for i, p in enumerate(predictions) if p != references[i]] results["incorrect_ids"] = incorrect_ids precision = precision_score( references, predictions, average="weighted", labels=labels ) results["precision"] = float(precision) recall = recall_score(references, predictions, average="weighted", labels=labels) results["recall"] = float(recall) f1 = f1_score(references, predictions, average="weighted", labels=labels) results["f1"] = float(f1) results["ratio_valid_classifications"] = sum(valid_classifications) / len( valid_classifications ) return results def save_results(model_name, results_path, dataset, predictions, debug=False): if not os.path.exists(results_path): # Get the directory part of the file path dir_path = os.path.dirname(results_path) # Create all directories in the path (if they don't exist) os.makedirs(dir_path, exist_ok=True) if isinstance(dataset, pd.DataFrame): df = dataset else: df = dataset.to_pandas() df.drop( columns=["answer", "prompt", "train_text"], inplace=True, errors="ignore" ) else: df = pd.read_csv(results_path, on_bad_lines="warn") df[model_name] = predictions if debug: print(df.head(1)) df.to_csv(results_path, index=False) def load_logical_reasoning_dataset( data_path, tokenizer=None, using_p1=True, chinese_prompt=True, test_data=None, num_shots=0, format_test_only=False, ): postfix = "" if chinese_prompt else "_en" train_data_file = data_path + f"/train{postfix}.csv" test_data_file = data_path + f"/{test_data if test_data else 'dev'}{postfix}.csv" print("loading train/test data files") datasets = load_dataset( "csv", data_files={"train": train_data_file, "test": test_data_file}, ) if tokenizer: reasoning_prompt = ( get_prompt_template(using_p1, chinese_prompt) if num_shots == 0 else get_few_shot_prompt_template(num_shots, datasets["train"].to_pandas()) ) def formatting_prompts_func(examples): inputs = examples["text"] outputs = examples["label"] puzzles = examples["puzzle"] truths = examples["truth"] messages = [ { "role": "system", "content": system_prompt, }, None, ] model_name = os.getenv("MODEL_NAME") if "gemma" in model_name.lower(): messages = messages[1:] texts = [] prompts = [] for input, output, puzzle, truth in zip(inputs, outputs, puzzles, truths): prompt = reasoning_prompt.format(puzzle, truth, input) messages[-1] = {"role": "user", "content": prompt} prompt = tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) prompts.append(prompt) texts.append(prompt + output + tokenizer.eos_token if output else "") return {"train_text": texts, "prompt": prompts} if format_test_only: datasets["test"] = datasets["test"].map( formatting_prompts_func, batched=True, ) else: datasets = datasets.map( formatting_prompts_func, batched=True, ) print(datasets) return datasets def get_metrics(df): metrics_df = pd.DataFrame(df.columns.T)[2:] metrics_df.rename(columns={0: "model"}, inplace=True) metrics_df["model"] = metrics_df["model"].apply(lambda x: x.split("/")[-1]) metrics_df.reset_index(inplace=True) metrics_df = metrics_df.drop(columns=["index"]) accuracy = [] all_metrics = [] for col in df.columns[2:]: metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=True) print(f"{col}: {metrics}") accuracy.append(metrics["accuracy"]) all_metrics.append(metrics) metrics_df["accuracy"] = accuracy metrics_df["all_metrics"] = all_metrics return metrics_df def load_alpaca_data(data_path, using_p1=True, use_english_datasets=False): alpaca_data_path = ( "llama-factory/data/alpaca_mgtv_p1.json" if using_p1 else "llama-factory/data/alpaca_mgtv_p2.json" ) if use_english_datasets: alpaca_data_path = alpaca_data_path.replace(".json", "_en.json") if os.path.exists(alpaca_data_path): print("loading existing data from:", alpaca_data_path) data = pd.read_json(alpaca_data_path, orient="records", lines=False) return data print("loading new data from:", alpaca_data_path) chinese_prompt = not use_english_datasets datasets = load_logical_reasoning_dataset( data_path, using_p1=using_p1, chinese_prompt=chinese_prompt ) prompt_template = get_prompt_template(using_p1, chinese_prompt) df_train = datasets["train"].to_pandas() df_train["instruction"] = df_train.apply( lambda x: prompt_template.format(x["puzzle"], x["truth"], x["text"]), axis=1 ) df_alpaca = pd.DataFrame( {"instruction": [""] * len(df_train), "input": [""] * len(df_train)} ) df_alpaca["instruction"] = df_train["instruction"] df_alpaca["output"] = df_train["label"] df_alpaca.to_json(alpaca_data_path, orient="records", lines=False, indent=2) return df_alpaca def plot_value_counts(df, column_name, offset=0.1, title=None, preprocess_func=None): # font_family = rcParams["font.family"] # # Set the font to SimHei to support Chinese characters # rcParams["font.family"] = "SimHei" # rcParams["axes.unicode_minus"] = ( # False # This is to support the minus sign in Chinese. # ) if preprocess_func: df["backup"] = df[column_name] df[column_name] = df[column_name].apply(preprocess_func) if preprocess_func == extract_answer else df.apply( preprocess_func, axis=1 ) plt.figure(figsize=(8, 4)) value_counts = df[column_name].value_counts() value_counts = value_counts.rename(index=translation_dict) value_counts.plot(kind="bar") # add values on top of bars for i, v in enumerate(value_counts): plt.text(i, v + offset, str(v), ha="center") plt.xlabel(title or column_name) plt.show() # rcParams["font.family"] = font_family if preprocess_func: plot_confusion_matrix(df["label"], df[column_name]) df[column_name] = df["backup"] df.drop(columns=["backup"], inplace=True) def calc_metrics_for_col(df, col, debug=True): metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=debug) return metrics["accuracy"], metrics["precision"], metrics["recall"], metrics["f1"] def get_metrics_df(df, variant="epoch", sort_columns=True): perf_df = pd.DataFrame( columns=[variant, "model", "run", "accuracy", "precision", "recall", "f1"] ) columns = ( df.columns[5:].tolist() if variant == "index" else [ col for col in df.columns[5:] if variant in col or variant == "epoch" and "_torch." in col ] ) if sort_columns: columns = sorted(columns, key=lambda x: int(x.lower().replace("-1m", "").replace("chat", "0").replace("instruct", "0").split("-")[-1].split("_")[0])) print("columns:", columns) for i, col in enumerate(columns): metrics = calc_metrics(df["label"], df[col], questions=df["text"], debug=False) new_model_metrics = { variant: i / 5 if variant == "epoch" else i + 1, "model": col if "/" not in col else col.split("/")[1].split("_torch")[0], "run": col, } if variant == "shots": parts = col.split("/shots-") new_model_metrics["shots"] = int(parts[1]) if variant in new_model_metrics["model"]: new_model_metrics["model"] = parts[0] new_model_metrics.update(metrics) # Convert the dictionary to a DataFrame and concatenate it with the existing DataFrame perf_df = pd.concat( [perf_df, pd.DataFrame([new_model_metrics])], ignore_index=True ) return perf_df def plot_metrics(perf_df, model_name, variant="epoch", offset=0.01): fig, ax = plt.subplots(1, 1, figsize=(8, 4)) perf_df = perf_df[perf_df["model"] == model_name] # Ensure the lengths of perf_df["epoch"], perf_df["accuracy"], and perf_df["f1"] are the same min_length = min( len(perf_df[variant]), len(perf_df["accuracy"]), len(perf_df["f1"]) ) perf_df = perf_df.iloc[:min_length] # Plot accuracy and f1 on the same chart with different markers ax.plot( perf_df[variant], perf_df["accuracy"], marker="o", label="Accuracy", color="r" ) ax.plot( perf_df[variant], perf_df["f1"], marker="s", label="F1 Score", color="b" ) # Square marker for F1 Score # Add values on top of points for i in range(min_length): print(f"{perf_df[variant].iloc[i]}: {perf_df['run'].iloc[i]}") ax.annotate( f"{perf_df['accuracy'].iloc[i]*100:.2f}%", (perf_df[variant].iloc[i], perf_df["accuracy"].iloc[i]), ha="center", va="bottom", # Move accuracy numbers below the points xytext=(0, -15), textcoords="offset points", fontsize=10, color="r", ) ax.annotate( f"{perf_df['f1'].iloc[i]*100:.2f}%", (perf_df[variant].iloc[i], perf_df["f1"].iloc[i]), ha="center", va="top", # Move F1 score numbers above the points xytext=(0, 15), # Offset by 15 points vertically textcoords="offset points", fontsize=10, color="b", ) # Set y-axis limit ylimits = ax.get_ylim() ax.set_ylim(ylimits[0] - offset, ylimits[1] + offset) # Add title and labels ax.set_xlabel( "Epoch (0: base model, 0.2 - 2: fine-tuned models)" if variant == "epoch" else "Number of Shots" ) ax.set_ylabel("Accuracy and F1 Score") ax.xaxis.set_major_locator(MultipleLocator(0.2 if variant == "epoch" else 5)) ax.set_title(f"Performance Analysis Across Checkpoints for the {model_name} Model") # Rotate x labels plt.xticks(rotation=0) plt.grid(True) # plt.tight_layout() # Set legend at the right to avoid overlapping with lines plt.legend(loc="center left", bbox_to_anchor=(1.0, 0.5)) plt.show() def reasoning_with_openai( row, user_prompt, max_tokens=None, model="gpt-4o-mini", base_url=None, temperature=0, using_system_prompt=True, is_openai=True, ): return invoke_langchain( user_prompt.format(row["puzzle"], row["truth"], row["text"]), system_prompt=system_prompt if using_system_prompt else None, max_tokens=max_tokens, model=model, base_url=base_url, temperature=temperature, is_openai=is_openai, ) def eval_openai( eval_dataset, model="gpt-4o-mini", max_new_tokens=300, num_shots=0, train_dataset=None, ): user_prompt = ( get_prompt_template(using_p1=False, chinese_prompt=True) if num_shots == 0 else get_few_shot_prompt_template(num_shots, train_dataset) ) print("user_prompt:", user_prompt) total = len(eval_dataset) predictions = [] is_using_o1 = "o1" in model is_openai = "claude" not in model for i in tqdm(range(total)): output = reasoning_with_openai( eval_dataset.iloc[i], user_prompt, model=model, max_tokens=None if is_using_o1 else max_new_tokens, temperature=1 if is_using_o1 else 0, using_system_prompt=not is_using_o1, is_openai=is_openai, ) predictions.append(output) return predictions def plot_confusion_matrix(y_true, y_pred, title="Confusion Matrix"): # font_family = rcParams["font.family"] # # Set the font to SimHei to support Chinese characters # rcParams["font.family"] = "SimHei" # rcParams["axes.unicode_minus"] = ( # False # This is to support the minus sign in Chinese. # ) labels = np.unique(y_true) y_pred = [extract_answer(text) for text in y_pred] cm = confusion_matrix(y_true, y_pred, labels=labels) cm = cm.astype("float") / cm.sum(axis=1)[:, np.newaxis] fig, ax = plt.subplots(figsize=(8, 8)) labels = [translation_dict[x] for x in labels] sns.heatmap( cm, annot=True, fmt=".4f", cmap="Blues", xticklabels=labels, yticklabels=labels, ) ax.set_title(title) ax.set_xlabel("Predicted labels") ax.set_ylabel("True labels") plt.show() # rcParams["font.family"] = font_family def majority_vote(r1, r2, r3): label = r2 if r1 == r3: label = r1 return label def load_openai_batch_data(data_path, num_shots=10, model="o1-mini", debug=True): openai_data_path = f"{data_path}/{model}.jsonl" if os.path.exists(openai_data_path): print("loading existing data from:", openai_data_path) data = pd.read_json(openai_data_path, orient="records", lines=True) return data datasets = load_logical_reasoning_dataset(data_path) df_train = datasets["train"].to_pandas() prompt = get_few_shot_prompt_template(num_shots, df_train, debug=debug) messages = [] df_test = datasets["test"].to_pandas() for i, row in df_test.iterrows(): content = prompt.format(row["puzzle"], row["truth"], row["text"]) messages.append( { "custom_id": f"request-{i + 1}", "method": "POST", "url": "/v1/chat/completions", "body": { "model": model, "messages": [ {"role": "user", "content": content}, ], }, } ) df_openai = pd.DataFrame(messages) df_openai.to_json(openai_data_path, orient="records", lines=True) return df_openai