import os
from datetime import datetime
import json

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset, DownloadMode
from huggingface_hub import login

from utilities.data_processing import data_processing
from utilities.my_logger import setup_logger
from utilities.praw_downloader import praw_downloader
from utilities.praw_processor import preprocess_praw_data, preprocess_praw_comment_data

# Set the submission and comment dataset names from the configured subreddit and username
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)

# Dummy row used when creating a new repo; every value must be wrapped in a list
# so that Dataset.from_dict() treats it as a one-row column.
dummy_data = {
    "id": ['id'],
    "content": ["This is a sample post content. Just for demonstration purposes!"],
    "poster": ["sampleUser123"],
    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
    "flair": ["Discussion"],
    "title": ["Sample Post Title: How to Use Hugging Face?"],
    "score": [457],
    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    "updated": [False],
    "new": [False],
    "nsfw": [False]
}

# Dummy row for the comments dataset; same list-wrapped convention as above.
dummy_comment_data = {
    "id": ['id'],
    "content": ["This is a sample comment. Just for demonstration purposes!"],
    "poster": ["sampleUser123"],
    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
    "flair": ["Discussion"],
    "title": ["Sample Post Title: How to Use Hugging Face?"],
    "ups": [457],
    "score": [457],
    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    "updated": [False],
    "new": [False],
    "depth": [2],
    "link_id": ["eqrkhgbjeh"],
    "parent_id": ["eqrkhgbjeh"]
}
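
# Note: Dataset.from_dict() treats each dict value as a column, so the one-element
# lists above yield a single dummy row when a brand-new repo is bootstrapped.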


def load_or_create_dataset():
    """
    Loads an existing dataset from the Hugging Face hub or creates a new one if it doesn't exist.

    This function attempts to load a dataset specified by 'dataset_name'. If the dataset is not found,
    it creates a new dataset with 'dummy_data', pushes it to the Hugging Face hub, and then reloads it.
    After reloading, the dummy data is removed from the dataset.

    Returns:
        dataset (DatasetDict): The loaded or newly created dataset.

    Raises:
        FileNotFoundError: If the dataset cannot be loaded or created.
    """
    subset = f"year_{datetime.now().year}"

    # Load the existing dataset from the Hugging Face hub or create a new one
    try:
        logger.debug(f"Trying to download {dataset_name}")
        dataset = load_dataset(dataset_name, subset, download_mode=DownloadMode.FORCE_REDOWNLOAD)
        logger.debug("Loading existing dataset")
    except FileNotFoundError:
        logger.warning("Creating new dataset")

        # Creating Initial Repo
        dataset = DatasetDict()
        dataset['train'] = Dataset.from_dict(dummy_data)
        dataset.push_to_hub(dataset_name, subset, token=auth_token)

        # Pulling from Initial Repo
        dataset = load_dataset(dataset_name, subset)

        # Remove dummy data
        del dataset['train']
    return dataset
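

# Usage sketch (illustrative, not part of this module's pipeline): a caller could
# turn the loaded subset into a dataframe before merging in freshly downloaded
# posts. The "train" split name and the empty-frame fallback for a brand-new repo
# are assumptions here.
#
#   dataset = load_or_create_dataset()
#   if "train" in dataset:
#       old_df = dataset["train"].to_pandas()
#   else:
#       old_df = pd.DataFrame(columns=list(dummy_data.keys()))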


def load_or_create_comment_dataset():
    """
    Loads the existing comment dataset from the Hugging Face hub or creates a new one if it doesn't exist.

    Mirrors load_or_create_dataset: if 'comment_dataset_name' cannot be found, a new dataset is
    bootstrapped from 'dummy_comment_data', pushed to the hub, reloaded, and the dummy split removed.

    Returns:
        dataset (DatasetDict): The loaded or newly created comment dataset.
    """
    subset = f"year_{datetime.now().year}"

    # Load the existing dataset from the Hugging Face hub or create a new one
    try:
        logger.debug(f"Trying to download {comment_dataset_name}")
        dataset = load_dataset(comment_dataset_name, subset, download_mode=DownloadMode.FORCE_REDOWNLOAD)
        logger.debug("Loading existing comment dataset")
    except (ValueError, FileNotFoundError):
        logger.warning("Creating new comment dataset")

        # Creating Initial Repo
        dataset = DatasetDict()
        dataset['train'] = Dataset.from_dict(dummy_comment_data)
        dataset.push_to_hub(comment_dataset_name, subset, token=auth_token)

        # Pulling from Initial Repo
        dataset = load_dataset(comment_dataset_name, subset)

        # Remove dummy data
        del dataset['train']
    return dataset


def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merges two dataframes, sorts them by 'date_utc', and marks new IDs.

    The function drops the bookkeeping columns ('new', 'updated') from the old dataframe in place,
    concatenates the old and new dataframes, sorts the result by 'date_utc', and runs the shared
    data_processing step. Rows whose IDs appear in the new dataframe but not in the old one are
    marked as 'new'.

    Args:
    - old_df (pd.DataFrame): The original dataframe.
    - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe.

    Returns:
    - pd.DataFrame: The merged, sorted, and marked dataframe.
    """
    old_df.drop(columns=['new', 'updated'], inplace=True)

    # Concatenate old and new dataframes, sort by 'date_utc', and reset index
    df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)

    # Process data accordingly
    df = data_processing(df)

    # Identify new rows (IDs present in new_df but not in old_df)
    new_ids = set(new_df['id']) - set(old_df['id'])
    df['new'] = df['id'].isin(new_ids)

    return df
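

# Tiny example of the expected behaviour (values invented for illustration): with
# old_df ids == {"a1"} and new_df ids == {"a1", "b2"}, rows with id "b2" come back
# with new == True and rows with id "a1" with new == False. Also note that old_df
# is mutated in place: its 'new' and 'updated' columns are dropped before the merge.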


def remove_filtered_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes rows from the DataFrame where the 'id' is present in filter_ids.json.

    :param df: Input DataFrame to be filtered.
    :return: DataFrame with rows containing IDs present in filter_ids.json removed.
    """
    # Load filter IDs from JSON file
    with open('filter_ids.json', 'r') as file:
        filter_ids = json.load(file)

    # Remove the rows with IDs present in filter_ids
    filtered_df = df[~df['id'].isin(filter_ids)]
    logger.info(f"Filtered {len(df) - len(filtered_df)} rows from the DataFrame")

    return filtered_df
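

# Assumed filter_ids.json layout (inferred from the json.load + Series.isin usage
# above): a flat JSON array of IDs to exclude, e.g.
#   ["abc123", "def456"]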


def get_latest_data():
    """Downloads the latest submissions and comments via PRAW and preprocesses them into dataframes."""
    submissions, comments = praw_downloader()
    df = preprocess_praw_data(submissions=submissions)
    df_comments = preprocess_praw_comment_data(comments=comments)
    return df, df_comments
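

if __name__ == "__main__":
    # Minimal local sketch, not the production pipeline: fetch the latest
    # submissions and comments, drop filtered submission IDs, and report counts.
    # Assumes the Reddit credentials required by praw_downloader are available in
    # the environment alongside SUBREDDIT, USERNAME and HF_TOKEN.
    latest_df, latest_comments_df = get_latest_data()
    latest_df = remove_filtered_rows(latest_df)
    logger.info(f"Fetched {len(latest_df)} submissions and {len(latest_comments_df)} comments")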