# utilities/user_defined_functions.py
import os
from datetime import datetime
import json
import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset, DownloadMode
from huggingface_hub import login
from utilities.data_processing import data_processing
from utilities.my_logger import setup_logger
from utilities.praw_downloader import praw_downloader
from utilities.praw_processor import preprocess_praw_data, preprocess_praw_comment_data
# Build the post and comment dataset repo IDs from environment variables
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"
# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]
login(auth_token, add_to_git_credential=True)
logger = setup_logger(__name__)
# Dummy row used when bootstrapping a new repo; every value must be wrapped in a list for Dataset.from_dict
dummy_data = {
"id": ['id'],
"content": ["This is a sample post content. Just for demonstration purposes!"],
"poster": ["sampleUser123"],
"date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
"flair": ["Discussion"],
"title": ["Sample Post Title: How to Use Hugging Face?"],
"score": [457],
"permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
"updated": [False],
"new": [False],
"nsfw": [False]
}
# Dummy comment row: same idea, plus the comment-specific fields (ups, depth, link_id, parent_id)
dummy_comment_data = {
"id": ['id'],
"content": ["This is a sample post content. Just for demonstration purposes!"],
"poster": ["sampleUser123"],
"date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
"flair": ["Discussion"],
"title": ["Sample Post Title: How to Use Hugging Face?"],
"ups": [457],
"score": [457],
"permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
"updated": [False],
"new": [False],
"depth": [2],
"link_id": ["eqrkhgbjeh"],
"parent_id": ["eqrkhgbjeh"]
}
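
# Illustrative sketch (assumption: this helper is not part of the original pipeline). It shows
# how the dummy dicts above become the one-row bootstrap split used when a repo does not exist
# yet: Dataset.from_dict expects every column to be given as a list of values.
def _bootstrap_dataset_from_dummy(dummy: dict) -> DatasetDict:
    bootstrap = DatasetDict()
    bootstrap['train'] = Dataset.from_dict(dummy)
    return bootstrap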
def load_or_create_dataset():
"""
    Load the existing dataset from the Hugging Face Hub, or create a new one if it doesn't exist.

    The function tries to load the dataset named by 'dataset_name' (config 'year_<current year>').
    If the dataset is not found, it creates a new dataset seeded with 'dummy_data', pushes it to
    the Hub, and reloads it; the dummy 'train' split is then removed before returning.

    Returns:
        dataset (DatasetDict): The loaded or newly created dataset.
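
    Example (illustrative; requires the SUBREDDIT, USERNAME and HF_TOKEN environment variables):
        ds = load_or_create_dataset()
        if "train" in ds:
            posts_df = ds["train"].to_pandas()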
"""
subset = f"year_{datetime.now().year}"
# Load the existing dataset from the Hugging Face hub or create a new one
try:
logger.debug(f"Trying to download {dataset_name}")
dataset = load_dataset(dataset_name, subset, download_mode=DownloadMode.FORCE_REDOWNLOAD)
logger.debug("Loading existing dataset")
    # A missing repo or a missing yearly config both mean the dataset needs to be bootstrapped
    except (ValueError, FileNotFoundError):
logger.warning("Creating new dataset")
# Creating Initial Repo
dataset = DatasetDict()
dataset['train'] = Dataset.from_dict(dummy_data)
dataset.push_to_hub(dataset_name, subset, token=auth_token)
# Pulling from Initial Repo
dataset = load_dataset(dataset_name, subset)
# Remove dummy data
del dataset['train']
return dataset
def load_or_create_comment_dataset():
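    """
    Same as load_or_create_dataset, but for the comment dataset ('comment_dataset_name'):
    load the current year's config from the Hugging Face Hub, or bootstrap the repo with
    'dummy_comment_data' and remove the dummy 'train' split before returning.
    """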
subset = f"year_{datetime.now().year}"
# Load the existing dataset from the Hugging Face hub or create a new one
try:
logger.debug(f"Trying to download {comment_dataset_name}")
dataset = load_dataset(comment_dataset_name, subset, download_mode=DownloadMode.FORCE_REDOWNLOAD)
logger.debug("Loading existing comment dataset")
except (ValueError, FileNotFoundError):
logger.warning("Creating new comment dataset")
# Creating Initial Repo
dataset = DatasetDict()
dataset['train'] = Dataset.from_dict(dummy_comment_data)
dataset.push_to_hub(comment_dataset_name, subset, token=auth_token)
# Pulling from Initial Repo
dataset = load_dataset(comment_dataset_name, subset)
# Remove dummy data
del dataset['train']
return dataset
def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
"""
    Merge two dataframes of posts, sort by 'date_utc', and flag newly seen IDs.

    The 'new' and 'updated' bookkeeping columns are dropped from the old dataframe, the two
    dataframes are concatenated, sorted by 'date_utc', and passed through data_processing.
    Rows whose IDs appear in new_df but not in old_df are flagged as 'new'.

    Args:
        old_df (pd.DataFrame): The original dataframe.
        new_df (pd.DataFrame): The new dataframe to merge into the original.

    Returns:
        pd.DataFrame: The merged, sorted, and flagged dataframe.
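
    Example (illustrative; assumes data_processing preserves the 'id' and 'date_utc' columns):
        old = pd.DataFrame({"id": ["a"], "date_utc": [datetime(2023, 1, 1)],
                            "new": [False], "updated": [False]})
        new = pd.DataFrame({"id": ["a", "b"],
                            "date_utc": [datetime(2023, 1, 1), datetime(2023, 1, 2)]})
        merged = merge_data(old, new)  # the row with id "b" is flagged new == True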
"""
    # Drop the bookkeeping columns from the previous snapshot; 'new' is recomputed below
    old_df = old_df.drop(columns=['new', 'updated'])
# Concatenate old and new dataframes, sort by 'date_utc', and reset index
df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)
# Process data accordingly
df = data_processing(df)
# Identify new rows (present in new_df but not in old_df)
    new_ids = set(new_df['id']) - set(old_df['id'])
    df['new'] = df['id'].isin(new_ids)
return df
def remove_filtered_rows(df: pd.DataFrame) -> pd.DataFrame:
"""
Removes rows from the DataFrame where the 'id' is present in filter_ids.json.
:param df: Input DataFrame to be filtered.
:return: DataFrame with rows containing IDs present in filter_ids.json removed.
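
    Assumed format of filter_ids.json (not defined in this module): a flat JSON array of
    Reddit post IDs, e.g. ["abc123", "def456"].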
"""
# Load filter IDs from JSON file
with open('filter_ids.json', 'r') as file:
filter_ids = json.load(file)
# Remove the rows with IDs present in filter_ids
filtered_df = df[~df['id'].isin(filter_ids)]
logger.info(f"Filtered {len(df) - len(filtered_df)} rows from the DataFrame")
return filtered_df
def get_latest_data():
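    """
    Download the latest posts and comments for the subreddit via praw_downloader and return
    them as two preprocessed dataframes (submissions first, then comments).
    """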
submissions, comments = praw_downloader()
df = preprocess_praw_data(submissions=submissions)
df_comments = preprocess_praw_comment_data(comments=comments)
return df, df_comments
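

# ---------------------------------------------------------------------------
# Illustrative usage sketch (assumptions flagged inline): the real orchestration
# lives elsewhere in this Space, but the helpers above are typically chained
# roughly as follows: load or bootstrap the dataset, pull fresh submissions,
# merge, filter, and push the result back to the Hub.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    subset = f"year_{datetime.now().year}"

    dataset = load_or_create_dataset()
    new_df, _new_comments_df = get_latest_data()

    if "train" in dataset:
        # Existing data on the Hub: merge the previous snapshot with the fresh pull.
        merged_df = merge_data(old_df=dataset["train"].to_pandas(), new_df=new_df)
    else:
        # Freshly bootstrapped repo (dummy split already removed): start from the fresh pull.
        merged_df = new_df

    merged_df = remove_filtered_rows(merged_df)

    # Assumption: pushing back mirrors the repo/config layout used in load_or_create_dataset;
    # preserve_index=False keeps the pandas index out of the dataset schema.
    Dataset.from_pandas(merged_df, preserve_index=False).push_to_hub(
        dataset_name, subset, token=auth_token
    )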