import os
import re

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from datasets import load_dataset
from tqdm import tqdm

print(f"loading {__file__}")
P1 = """你是一个逻辑游戏的主持人。游戏规则如下: | |
1. 参与者会得到一个谜题。 | |
2. 参与者可以通过提问来获取线索,尝试解开谜题。 | |
3. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。 | |
4. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 | |
5. 参与者需要根据回答来推理,并最终找出谜题的正确答案。 | |
请严格按照这些规则回答参与者提出的问题。 | |
谜题: {} | |
实际情况: {} | |
参与者提出的问题: {} | |
""" | |
P2 = """你是一个情景猜谜游戏的主持人。游戏规则如下: | |
1. 参与者会得到一个谜面,谜面会描述一个简单又难以理解的事件。 | |
2. 主持人知道谜底,谜底是谜面的答案。 | |
3. 参与者可以询问任何封闭式问题来找寻事件的真相。 | |
4. 对于每个问题,主持人将根据实际情况回答以下五个选项之一:是、不是、不重要、回答正确、问法错误。各回答的判断标准如下: | |
- 若谜面和谜底能找到问题的答案,回答:是或者不是 | |
- 若谜面和谜底不能直接或者间接推断出问题的答案,回答:不重要 | |
- 若参与者提问不是一个封闭式问题或者问题难以理解,回答:问法错误 | |
- 若参与者提问基本还原了谜底真相,回答:回答正确 | |
5. 回答中不能添加任何其它信息,也不能省略选项中的任何一个字。例如,不可以把“不是”省略成“不”。 | |
请严格按照这些规则回答参与者提出的问题。 | |
**谜面:** {} | |
**谜底:** {} | |
**参与者提出的问题:** {} | |
""" | |


def extract_answer(text, debug=False):
    """Strip chat-template wrappers and special tokens from a generation,
    keeping only the model's short answer."""
    if text:
        # Remove everything up to and including the assistant / [/INST] marker
        text = re.sub(
            r".*?(assistant|\[/INST\]).+?\b",
            "",
            text,
            flags=re.DOTALL | re.MULTILINE,
        )
        if debug:
            print("--------\nstep 1:", text)

        # Drop the first remaining special token (e.g. <|eot_id|>) and everything after it
        text = re.sub(r"<.+?>.*", "", text, flags=re.DOTALL | re.MULTILINE)
        if debug:
            print("--------\nstep 2:", text)

        # Remove anything left before the end of the assistant header
        text = re.sub(
            r".*?end_header_id\|>\n\n", "", text, flags=re.DOTALL | re.MULTILINE
        )
        if debug:
            print("--------\nstep 3:", text)

        # Keep only the first sentence
        text = text.split(".")[0].strip()
        if debug:
            print("--------\nstep 4:", text)

        # Strip a leading "Response:" prefix if present
        text = re.sub(
            r"^Response:.+?\b",
            "",
            text,
            flags=re.DOTALL | re.MULTILINE,
        )
        if debug:
            print("--------\nstep 5:", text)
    return text
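
# Illustrative example (assumes a Llama-3-style chat template in the decoded
# output; the token strings below are placeholders, not defined in this module):
#   extract_answer("<|start_header_id|>assistant<|end_header_id|>\n\n不是<|eot_id|>")
#   -> "不是"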


def calc_metrics(references, predictions, debug=False):
    assert len(references) == len(
        predictions
    ), f"lengths differ: {len(references)} != {len(predictions)}"

    predictions = [extract_answer(text) for text in predictions]
    correct = [1 if ref == pred else 0 for ref, pred in zip(references, predictions)]
    accuracy = sum(correct) / len(references)

    results = {"accuracy": accuracy}
    if debug:
        incorrect_ids = [i for i, c in enumerate(correct) if c == 0]
        results["incorrect_ids"] = incorrect_ids

    return results
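
# Illustrative: predictions pass through extract_answer before comparison, so
# calc_metrics(["不是"], ["不是<|eot_id|>"]) evaluates to {"accuracy": 1.0}.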


def save_results(model_name, results_path, dataset, predictions, debug=False):
    if not os.path.exists(results_path):
        # Create the parent directories of the results file if they don't exist
        dir_path = os.path.dirname(results_path)
        os.makedirs(dir_path, exist_ok=True)

        df = dataset.to_pandas()
        df.drop(columns=["answer", "prompt", "train_text"], inplace=True)
    else:
        df = pd.read_csv(results_path, on_bad_lines="warn")

    # Each model's predictions are stored as a separate column
    df[model_name] = predictions
    if debug:
        print(df.head(1))

    df.to_csv(results_path, index=False)


def load_logical_reasoning_dataset(
    data_path, tokenizer=None, using_p1=True, chinese_prompt=True
):
    postfix = "" if chinese_prompt else "_en"
    train_data_file = data_path + f"/train{postfix}.csv"
    test_data_file = data_path + f"/dev{postfix}.csv"

    print("loading train/test data files")
    datasets = load_dataset(
        "csv",
        data_files={"train": train_data_file, "test": test_data_file},
    )

    if tokenizer:
        reasoning_prompt = (
            (P1 if using_p1 else P2)
            if chinese_prompt
            else """You are the host of a situational guessing game. The rules of the game are as follows:
1. Participants will receive a riddle that describes a simple yet difficult to understand event.
2. The host knows the answer, which is the solution to the riddle.
3. Participants can ask any closed-ended questions to uncover the truth of the event.
4. For each question, the host will respond with one of the following five options based on the actual situation: Yes, No, Unimportant, Correct answer, or Incorrect questioning. The criteria for each response are as follows:
- If the riddle and answer can provide an answer to the question, respond with: Yes or No
- If the riddle and answer cannot directly or indirectly infer an answer to the question, respond with: Unimportant
- If the participant's question is not a closed-ended question or is difficult to understand, respond with: Incorrect questioning
- If the participant's question essentially reveals the truth of the answer, respond with: Correct answer
5. The response must not include any additional information, nor should any word be omitted from the options. For example, "No" cannot be abbreviated to "N".
Please strictly follow these rules when answering the participant's questions.
**Riddle:** {}
**Answer:** {}
**Participant's question:** {}
"""
        )

        def formatting_prompts_func(examples):
            inputs = examples["text"]
            outputs = examples["label"]
            puzzles = examples["puzzle"]
            truths = examples["truth"]

            messages = [
                {
                    "role": "system",
                    "content": "You are an expert in logical reasoning.",
                },
                None,
            ]

            model_name = os.getenv("MODEL_NAME")
            if "mistral" in model_name.lower():
                # Mistral-style chat templates typically reject a separate
                # system message, so drop it and keep only the user turn.
                messages = messages[1:]

            texts = []
            prompts = []
            for input, output, puzzle, truth in zip(inputs, outputs, puzzles, truths):
                prompt = reasoning_prompt.format(puzzle, truth, input)
                messages[-1] = {"role": "user", "content": prompt}
                prompt = tokenizer.apply_chat_template(
                    messages, tokenize=False, add_generation_prompt=True
                )
                prompts.append(prompt)
                texts.append(prompt + output + tokenizer.eos_token)
            return {"train_text": texts, "prompt": prompts}

        datasets = datasets.map(
            formatting_prompts_func,
            batched=True,
        )

    print(datasets)
    return datasets


def eval_model(model, tokenizer, eval_dataset):
    total = len(eval_dataset)
    predictions = []
    for i in tqdm(range(total)):
        inputs = tokenizer(
            eval_dataset["prompt"][i : i + 1],
            return_tensors="pt",
        ).to("cuda")

        outputs = model.generate(**inputs, max_new_tokens=4096, use_cache=False)
        decoded_output = tokenizer.batch_decode(outputs)

        # Only print the extraction steps for the first example
        debug = i == 0
        decoded_output = [
            extract_answer(output, debug=debug) for output in decoded_output
        ]
        predictions.extend(decoded_output)
    return predictions


def save_model(
    model,
    tokenizer,
    include_gguf=True,
    include_merged=True,
    publish=True,
):
    try:
        token = os.getenv("HF_TOKEN") or None
        model_name = os.getenv("MODEL_NAME")

        save_method = "lora"
        quantization_method = "q5_k_m"

        # get_model_names is expected to be provided elsewhere in the project;
        # it maps the base model name to local/hub paths for each artifact.
        model_names = get_model_names(
            model_name, save_method=save_method, quantization_method=quantization_method
        )

        model.save_pretrained(model_names["local"])
        tokenizer.save_pretrained(model_names["local"])

        if publish:
            model.push_to_hub(
                model_names["hub"],
                token=token,
            )
            tokenizer.push_to_hub(
                model_names["hub"],
                token=token,
            )

        if include_merged:
            model.save_pretrained_merged(
                model_names["local"] + "-merged", tokenizer, save_method=save_method
            )
            if publish:
                model.push_to_hub_merged(
                    model_names["hub"] + "-merged",
                    tokenizer,
                    save_method=save_method,
                    token=token,
                )

        if include_gguf:
            model.save_pretrained_gguf(
                model_names["local-gguf"],
                tokenizer,
                quantization_method=quantization_method,
            )
            if publish:
                model.push_to_hub_gguf(
                    model_names["hub-gguf"],
                    tokenizer,
                    quantization_method=quantization_method,
                    token=token,
                )
    except Exception as e:
        print(e)


def get_metrics(df):
    # Columns after the first two are assumed to hold one model's predictions
    # each; df["english"] holds the reference answers.
    metrics_df = pd.DataFrame(df.columns.T)[2:]
    metrics_df.rename(columns={0: "model"}, inplace=True)
    metrics_df["model"] = metrics_df["model"].apply(lambda x: x.split("/")[-1])
    metrics_df.reset_index(inplace=True)
    metrics_df = metrics_df.drop(columns=["index"])

    accuracy = []
    # Placeholders for additional metrics (not computed in this version)
    meteor = []
    bleu_1 = []
    rouge_l = []
    all_metrics = []

    for col in df.columns[2:]:
        metrics = calc_metrics(df["english"], df[col], debug=True)
        print(f"{col}: {metrics}")

        accuracy.append(metrics["accuracy"])
        all_metrics.append(metrics)

    metrics_df["accuracy"] = accuracy
    metrics_df["all_metrics"] = all_metrics
    return metrics_df


def load_alpaca_data(data_path, using_p1=True, use_english_datasets=False):
    alpaca_data_path = (
        "llama-factory/data/alpaca_mgtv_p1.json"
        if using_p1
        else "llama-factory/data/alpaca_mgtv_p2.json"
    )

    if os.path.exists(alpaca_data_path):
        print("loading existing data from:", alpaca_data_path)
        data = pd.read_json(alpaca_data_path, orient="records", lines=False)
        return data

    print("creating new data and saving to:", alpaca_data_path)
    datasets = load_logical_reasoning_dataset(
        data_path, chinese_prompt=not use_english_datasets
    )
    prompt_template = P1 if using_p1 else P2

    df_train = datasets["train"].to_pandas()
    df_train["instruction"] = df_train.apply(
        lambda x: prompt_template.format(x["puzzle"], x["truth"], x["text"]), axis=1
    )

    df_alpaca = pd.DataFrame(
        {"instruction": [""] * len(df_train), "input": [""] * len(df_train)}
    )
    df_alpaca["instruction"] = df_train["instruction"]
    df_alpaca["output"] = df_train["label"]

    df_alpaca.to_json(alpaca_data_path, orient="records", lines=False, indent=2)
    return df_alpaca
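

# A minimal end-to-end sketch of how these helpers fit together. Loading the
# model/tokenizer via transformers is an assumption (any causal LM with a chat
# template should work), and the dataset/results paths and the default model
# name are placeholders, not part of this module.
if __name__ == "__main__":
    from transformers import AutoModelForCausalLM, AutoTokenizer

    # The module reads MODEL_NAME from the environment, so make sure it is set.
    model_name = os.environ.setdefault("MODEL_NAME", "placeholder/model-name")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name).to("cuda")

    datasets = load_logical_reasoning_dataset("datasets/mgtv", tokenizer=tokenizer)
    predictions = eval_model(model, tokenizer, datasets["test"])
    save_results(model_name, "results/mgtv-results.csv", datasets["test"], predictions)
    print(calc_metrics(datasets["test"]["label"], predictions, debug=True))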