from datasets import Dataset, DatasetDict
import pandas as pd
import numpy as np
import glob
from sklearn.model_selection import train_test_split
import re
datapath = '/cluster/work/lawecon/Work/penghao/dataset/stories/'
pairpath = '../../../work/lawecon/Work/penghao/pairs.csv'
# 3600 -> time lags
class StoryPairDataset(Dataset):
    '''
    Builds train/test preference (rm/dpo) or SFT datasets from story CSVs and a
    precomputed pair file, exposed as a Hugging Face DatasetDict.
    '''
    def __init__(self, datapath, pairpath, tokenizer, task, used_dataset_size=-1, train_test_split=0.1,
                 split_by='random',
                 max_len=4096 * 2, mode='m3', max_time_window=3000, least_likes=5, margin=True):
        self.datapath = datapath
        print(self.datapath)
        self.train_test_split = train_test_split
        self.pairpath = pairpath
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.split_by = split_by
        self.least_likes = least_likes
        self.used_dataset_size = used_dataset_size
        # mode 'm2' ignores the provided window and uses a much larger fixed time window
        if mode == 'm2':
            self.max_time_window = 12009600
        else:
            self.max_time_window = max_time_window
        self.pair = self.load_pair()
        self.task = task
        self.margin = margin
        self.stories = self.load_stories(self.datapath)
        print(self.stories.columns)
        print(len(self.stories))
        # turn df into dataset
        # self.dataset = datasets.Dataset.from_pandas(self.df)
        self.train, self.test = self.train_test_split__()
        self.train = self.marginInclude(self.train)
        self.test = self.marginInclude(self.test)
        # combine train and test into a single DatasetDict
        self.dataset = self.make_dataset()
        print('current setting mode is ', mode)
        print('current setting split_by is ', split_by)
        print('current setting least_likes is ', least_likes)
    def load_stories(self, path):
        stories = pd.DataFrame()
        # print(f"Reading stories from {path}...")
        for file in glob.glob(path + '*.csv'):
            # print(f"Reading {file}...")
            try:
                # Read the CSV file into a DataFrame
                df = pd.read_csv(file)
                # Check if the DataFrame is empty or not
                if df.empty:
                    print(f"Warning: {file} is empty or not readable.")
                    continue
                # Concatenate the DataFrames
                stories = pd.concat([stories, df], ignore_index=True)
            except pd.errors.EmptyDataError:
                # print(f"Error: {file} is empty or not readable.")
                pass
            except pd.errors.ParserError:
                print(f"Error: {file} cannot be parsed.")
            except Exception as e:
                print(f"Error: An unexpected error occurred while processing {file}. Details: {str(e)}")
        # columns: Index(['prompt_id', 'prompt', 'story_id', 'story_title', 'story_author', 'story_url', 'link', 'genre', 'is_sensitive', 'categories', 'likes', 'story_text', 'posted_date', 'comments'], dtype='object')
        return stories
    def load_pair(self):
        pair = pd.read_csv(self.pairpath)
        # contains the columns prompt_id, story1_id, story2_id, rel, time_lag, least_likes
        pair = pair[pair['time_lag'] <= self.max_time_window]
        print('the max of time lag is ', pair['time_lag'].max())
        pair = pair[pair['least_likes'] >= self.least_likes]
        # swap the order of story1 and story2 if rel is negative, and make rel positive
        pair.loc[pair['rel'] < 0, ['story1_id', 'story2_id']] = pair.loc[
            pair['rel'] < 0, ['story2_id', 'story1_id']].values
        pair['rel'] = abs(pair['rel'])
        # filter out pairs that compare a story with itself
        pair = pair[pair['story1_id'] != pair['story2_id']]
        if self.used_dataset_size == -1:
            self.used_dataset_size = len(pair)
        else:
            pair = pair.sample(n=self.used_dataset_size)
        print('the total number of pairs is ', len(pair))
        # remove the duplicate pairs
        pair = pair.drop_duplicates(subset=['story1_id', 'story2_id'])
        # remove pairs with rel = 0
        pair = pair[pair['rel'] != 0]
        print('the number of effective pairs is ', len(pair))
        return pair
    def marginInclude(self, df):
        if self.margin:
            # keep the relevance score as the margin column (consumed later in make_dataset)
            df = df.rename(columns={'rel': 'margin'})
        else:
            # margins are not used, so drop the relevance column
            df = df.drop(columns=['rel'])
        return df
    def train_test_split__(self):
        '''
        split the pairs into train and test set
        :return:
        '''
        test_size = round(len(self.pair) * self.train_test_split)
        if self.split_by == 'time':
            # attach a comparable date to each story according to its story_id
            self.stories['posted_date'] = pd.to_datetime(self.stories['posted_date'])
            # convert datetime64[ns] to a comparable format, e.g. 2021-04-27 23:29:00 -> 20210427
            self.stories['posted_date'] = self.stories['posted_date'].dt.strftime('%Y%m%d')
            # stories posted after 2022 form the test set
            test = self.pair[self.pair['story1_id'].apply(
                lambda x: int(self.stories[self.stories['story_id'] == x]['posted_date'].values[0]) > 20220000)]
            train = self.pair[self.pair['story1_id'].apply(
                lambda x: int(self.stories[self.stories['story_id'] == x]['posted_date'].values[0]) <= 20220000)]
            print('the number of test set is ', len(test))
            print('the number of train set is ', len(train))
            print('the ratio of test set is ', len(test) / (len(test) + len(train)))
        elif self.split_by == 'random':
            train, test = train_test_split(self.pair, test_size=self.train_test_split)
        elif self.split_by == 'genre':
            # attach the genre of story1 to each pair according to the story_id
            self.pair['genre'] = self.pair['story1_id'].apply(
                lambda x: self.stories[self.stories['story_id'] == x]['genre'].values[0])
            # count the number of pairs for each genre
            genre = {}
            for c in self.pair['genre'].unique():
                genre[c] = len(self.pair[self.pair['genre'] == c])
            # sort the genres by the number of pairs, from high to low
            genre = dict(sorted(genre.items(), key=lambda item: item[1], reverse=True))
            print(genre)
            total = sum(genre.values())
            # greedily pick genres until the test set is close to the target fraction of the total
            test_genre = []
            test_count = 0
            while genre and test_count < total * self.train_test_split:
                test_genre.append(list(genre.keys())[0])
                test_count += genre[list(genre.keys())[0]]
                del genre[list(genre.keys())[0]]
                if not genre or test_count + genre[list(genre.keys())[0]] > total * self.train_test_split:
                    break
            test = self.pair[self.pair['genre'].apply(lambda x: x in test_genre)]
            train = self.pair[self.pair['genre'].apply(lambda x: x not in test_genre)]
            print('the genre of test set is ', test_genre)
            print('the percentage of test set is ', test_count / total, 'where total is ', total)
        elif self.split_by == 'chaos':
            # instead of using the real pairs, randomly reassign each story id to another story from the same prompt
            self.pair = self.pair.reset_index(drop=True)
            for i in range(len(self.pair)):
                self.pair.at[i, 'story1_id'] = np.random.choice(
                    self.stories[self.stories['prompt_id'] == self.pair.at[i, 'prompt_id']]['story_id'].values)
                self.pair.at[i, 'story2_id'] = np.random.choice(
                    self.stories[self.stories['prompt_id'] == self.pair.at[i, 'prompt_id']]['story_id'].values)
            train, test = train_test_split(self.pair, test_size=self.train_test_split)
        return train, test
    def apply_template_to_text(self, row):
        # Ensure proper access to columns in pair
        prompt_id, story1_id, story2_id = row[['prompt_id', 'story1_id', 'story2_id']]
        # Extract text based on IDs
        chosen_prompt = self.stories[self.stories['prompt_id'] == prompt_id]['prompt'].values[0]
        chosen_story = self.stories[self.stories['story_id'] == story1_id]['story_title'].values[0] + '\n' + \
                       self.stories[self.stories['story_id'] == story1_id]['story_text'].values[0]
        rejected_prompt = self.stories[self.stories['prompt_id'] == prompt_id]['prompt'].values[0]
        rejected_story = self.stories[self.stories['story_id'] == story2_id]['story_title'].values[0] + '\n' + \
                         self.stories[self.stories['story_id'] == story2_id]['story_text'].values[0]
        # Create chosen and rejected chat messages
        chosen_text = [{'role': 'user', 'content': chosen_prompt},
                       {'role': 'assistant', 'content': chosen_story}]
        rejected_text = [{'role': 'user', 'content': rejected_prompt},
                         {'role': 'assistant', 'content': rejected_story}]
        # Apply the tokenizer's chat template to chosen and rejected messages
        chosen_text = self.tokenizer.apply_chat_template(chosen_text, tokenize=False)
        rejected_text = self.tokenizer.apply_chat_template(rejected_text, tokenize=False)
        res = {}
        res['chosen_text'] = chosen_text
        res['rejected_text'] = rejected_text
        # add eos and bos tokens
        res['chosen_text'] = self.tokenizer.bos_token + res['chosen_text'] + self.tokenizer.eos_token
        res['rejected_text'] = self.tokenizer.bos_token + res['rejected_text'] + self.tokenizer.eos_token
        res['text'] = chosen_text
        # add eos and bos tokens
        res['text'] = self.tokenizer.bos_token + res['text'] + self.tokenizer.eos_token
        # marker separating the prompt from the assistant response in the templated text
        if 'gemma' in self.tokenizer.name_or_path:
            split_words = '<|im_start|>assistant\n'
        elif 'mistral' in self.tokenizer.name_or_path or 'llama' in self.tokenizer.name_or_path:
            split_words = '[/INST]'
        else:
            raise ValueError(f'no prompt/response split marker defined for tokenizer {self.tokenizer.name_or_path}')
        chosen_text_tmp = chosen_text.split(split_words)[-1]
        prompt_text = chosen_text.replace(chosen_text_tmp, '')
        chosen_text = chosen_text_tmp
        rejected_text = rejected_text.split(split_words)[-1]
        res['prompt'] = prompt_text
        res['chosen'] = chosen_text
        res['rejected'] = rejected_text
        # add bos and eos tokens
        res['prompt'] = self.tokenizer.bos_token + res['prompt']
        res['chosen'] = res['chosen'] + self.tokenizer.eos_token
        res['rejected'] = res['rejected'] + self.tokenizer.eos_token
        return res
    def convert_sft(self, df):
        # collect all the story ids that appear in the pairs
        story_ids = list(set(df['story1_id'].values) | set(df['story2_id'].values))
        # build a new frame with each story appearing once (story2_id mirrors story1_id)
        df = pd.DataFrame()
        df['story1_id'] = story_ids
        df['story2_id'] = df['story1_id']
        # reload stories
        # self.stories = self.load_stories(self.datapath)
        # look up the prompt_id for each story
        def get_prompt_id(x):
            return self.stories[self.stories['story_id'] == x]['prompt_id'].values[0]
        df['prompt_id'] = df['story1_id'].apply(lambda x: get_prompt_id(x))
        return df
    def make_dataset(self):
        # reset the index
        self.train.reset_index(drop=True, inplace=True)
        self.test.reset_index(drop=True, inplace=True)
        entries = []
        if self.task == 'rm':
            entries = ['chosen_text', 'rejected_text']
        elif self.task == 'dpo':
            entries = ['prompt', 'chosen', 'rejected']
        elif self.task == 'sft':
            self.train = self.convert_sft(self.train)
            self.test = self.convert_sft(self.test)
            entries = ['text']
        print('the columns of train is ', self.train.columns)
        for index, row in self.train.iterrows():
            res = self.apply_template_to_text(row)
            for e in entries:
                self.train.at[index, e] = res[e]
        for index, row in self.test.iterrows():
            res = self.apply_template_to_text(row)
            for e in entries:
                self.test.at[index, e] = res[e]
        print('the first example of train is ', self.train.iloc[0])
        # since we agreed on max_len = 8192, this still needs to be filtered by length
        if self.margin and 'margin' in self.train.columns:
            # the margin column only exists for pairwise tasks (rm/dpo), not for sft
            entries.append('margin')
        train_dataset = Dataset.from_pandas(self.train[entries])
        test_dataset = Dataset.from_pandas(self.test[entries])
        return DatasetDict({'train': train_dataset, 'test': test_dataset})
    def save_dataset(self, path):
        '''
        save the dataset to the readsy folder
        :param path:
        :return:
        '''
        self.dataset.save_to_disk('../' + path)
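

# Minimal usage sketch (illustrative only, not part of the training pipeline). It assumes a
# chat-capable tokenizer such as 'mistralai/Mistral-7B-Instruct-v0.2' is available from the
# Hugging Face Hub, and that the cluster-specific datapath/pairpath constants above point to
# the story CSVs and the precomputed pairs file.
if __name__ == '__main__':
    from transformers import AutoTokenizer

    # any tokenizer whose name_or_path contains 'mistral', 'llama', or 'gemma' is supported
    tokenizer = AutoTokenizer.from_pretrained('mistralai/Mistral-7B-Instruct-v0.2')
    loader = StoryPairDataset(datapath, pairpath, tokenizer, task='dpo',
                              used_dataset_size=1000, split_by='random')
    print(loader.dataset)                   # DatasetDict with 'train' and 'test' splits
    loader.save_dataset('story_pairs_dpo')  # written to '../story_pairs_dpo'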