from datetime import datetime import pandas as pd from datasets import Dataset, DatasetDict, load_dataset from main import auth_token, dataset_name, logger from utilities.data_processing import data_processing from utilities.praw_downloader import praw_downloader from utilities.praw_processor import preprocess_praw_data # Dummy row for when we create a new repo dummy_data = { "id": ['id'], "content": ["This is a sample post content. Just for demonstration purposes!"], "poster": ["sampleUser123"], "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')], "flair": ["Discussion"], "title": ["Sample Post Title: How to Use Hugging Face?"], "score": [457], "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"], "updated": False, "new": False, } def load_or_create_dataset(): """ Loads an existing dataset from the Hugging Face hub or creates a new one if it doesn't exist. This function attempts to load a dataset specified by 'dataset_name'. If the dataset is not found, it creates a new dataset with 'dummy_data', pushes it to the Hugging Face hub, and then reloads it. After reloading, the dummy data is removed from the dataset. Returns: dataset (DatasetDict): The loaded or newly created dataset. Raises: FileNotFoundError: If the dataset cannot be loaded or created. """ # Load the existing dataset from the Hugging Face hub or create a new one try: dataset = load_dataset(dataset_name) logger.debug("Loading existing dataset") except FileNotFoundError: logger.warning("Creating new dataset") # Creating Initial Repo dataset = DatasetDict() dataset['train'] = Dataset.from_dict(dummy_data) dataset.push_to_hub(repo_id=dataset_name, token=auth_token) # Pulling from Initial Repo dataset = load_dataset(dataset_name) # Remove dummy data del dataset['train'] return dataset def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame: """ Merges two dataframes, sorts them by 'date_utc', and marks new IDs. The function first marks rows from the new dataframe, then concatenates the old and new dataframes. It sorts the resulting dataframe by the 'date_utc' column. Rows from the new dataframe that are not in the old dataframe are marked as 'new'. Args: - old_df (pd.DataFrame): The original dataframe. - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe. Returns: - pd.DataFrame: The merged, sorted, and marked dataframe. """ # Mark rows in old and new dataframes old_df['new'] = False new_df['new'] = True # Concatenate old and new dataframes, sort by 'date_utc', and reset index df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True) # Process data accordingly df = data_processing(df) # Identify new rows (present in new_df but not in old_df) df['new'] = df['new'] & ~df['id'].duplicated(keep=False) return df def get_latest_data(): submissions = praw_downloader() df = preprocess_praw_data(submissions=submissions) return df