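"""Scheduled job for building a Reddit dataset on the Hugging Face Hub.

On an hourly or daily schedule it pulls the latest data for a subreddit, merges it
into the existing dataset (or creates a new one), updates the README, and pushes
the result to the Hub.
"""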
import os
import time
from datetime import datetime
import pandas as pd
import schedule
from datasets import Dataset
from utilities.user_defined_functions import get_latest_data, merge_data, load_or_create_dataset
from utilities.my_logger import setup_logger
from utilities.readme_update import update_readme
# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")
# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
logger = setup_logger(__name__)
def main():
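    """Fetch the latest subreddit data, merge it into the dataset, and push it to the Hub."""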
    date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {date}")

    # Load the existing dataset (or create a new one) and fetch the latest subreddit data
    dataset = load_or_create_dataset()
    new_df = get_latest_data()

    # Using dataset from hub
    if 'train' in dataset.keys():
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    # New dataset
    else:
        df = new_df
        new_rows = len(new_df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    # Update README
    update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=date, new_rows=new_rows)
    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")
def schedule_periodic_task():
"""
Schedule the main task to run at the user-defined frequency
"""
if frequency == 'hourly':
logger.info(f'Scheduling tasks to run every hour at the top of the hour')
schedule.every().hour.at(":00").do(main)
elif frequency == 'daily':
start_time = '05:00'
logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
schedule.every().day.at(start_time).do(main)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
    schedule_periodic_task()