"""Daily job that refreshes a subreddit dataset on the Hugging Face Hub.

On import this module reads its configuration from the environment
(``SUBREDDIT``, ``USERNAME``, ``HUGGINGFACE_AUTH_TOKEN``) and logs in to the
Hugging Face Hub. Run as a script, it schedules ``main`` once a day.
"""
import os
import time
from datetime import datetime, timedelta

import pandas as pd
import schedule
from datasets import DatasetDict, load_dataset, Dataset
from huggingface_hub import login

from utilities.data_collator import merge_and_filter_data
from utilities.my_logger import setup_logger
from utilities.readme_update import update_readme

# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)


def get_dataset() -> DatasetDict:
    """Load the existing dataset from the Hub, or start a new empty one.

    Returns:
        The ``DatasetDict`` loaded from ``dataset_name``, with the
        ``__index_level_0__`` column removed when the ``train`` split has it.
        If ``load_dataset`` raises ``FileNotFoundError``, an empty
        ``DatasetDict`` is returned instead.
    """
    try:
        # NOTE(review): `ignore_verifications` is deprecated in newer `datasets`
        # releases in favour of `verification_mode` -- confirm against the
        # version this project pins before upgrading.
        dataset = load_dataset(
            dataset_name,
            download_mode="reuse_cache_if_exists",
            ignore_verifications=True,
        )
        logger.debug("Loading existing dataset")
        if "__index_level_0__" in dataset["train"].column_names:
            dataset = dataset.remove_columns(["__index_level_0__"])
    except FileNotFoundError:
        logger.warning("Creating new dataset")
        dataset = DatasetDict()
    return dataset


def main() -> None:
    """Merge the latest data into the dataset, update the README, and push.

    Steps, in order: load the current dataset, pass its ``train`` split (or an
    empty DataFrame when there is none) to ``merge_and_filter_data``, store the
    result as the new ``train`` split, call ``update_readme`` with the row
    delta, and push the dataset to the Hub.

    Any exception raised by those steps propagates to the caller.
    """
    date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {date}")
    dataset = get_dataset()

    # Get Latest Data and merge with historic data
    old_df = dataset['train'].to_pandas() if 'train' in dataset else pd.DataFrame()
    new_df = merge_and_filter_data(old_df=old_df)
    dataset['train'] = Dataset.from_pandas(new_df, preserve_index=False)

    # Update README
    new_rows = len(new_df) - len(old_df)
    update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=date, new_rows=new_rows)
    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")


def _run_main_safely() -> None:
    """Run ``main`` and log, rather than propagate, any failure.

    ``schedule`` does not catch exceptions raised by jobs, so an unhandled
    error in one daily run would escape ``schedule.run_pending()`` and end the
    scheduler loop permanently. Logging here keeps the next run alive.
    """
    try:
        main()
    except Exception:
        # Top-level boundary: record the traceback and wait for the next slot.
        logger.exception("Scheduled run failed; will try again at the next scheduled time")


def schedule_daily_task() -> None:
    """Schedule ``main`` to run at a fixed time every day and loop forever.

    This function does not return: after registering the job it polls the
    scheduler once per second. A failing run is logged and does not stop the
    loop.
    """
    # For local testing, start one minute from now instead of at a fixed time:
    # start_time = (datetime.now() + timedelta(minutes=1)).time().strftime('%H:%M')
    start_time = '05:00'
    logger.info(f'Scheduling tasks to run every day at: {start_time}')
    schedule.every().day.at(start_time).do(_run_main_safely)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_daily_task()