|
import os |
|
import time |
|
from datetime import datetime, timedelta |
|
|
|
import pandas as pd |
|
import schedule |
|
from datasets import DatasetDict, load_dataset, Dataset |
|
from huggingface_hub import login |
|
|
|
from utilities.data_collator import merge_and_filter_data |
|
from utilities.my_logger import setup_logger |
|
from utilities.readme_update import update_readme |
|
|
|
|
|
subreddit = os.environ["SUBREDDIT"] |
|
username = os.environ["USERNAME"] |
|
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}" |
|
dataset_readme_path = "README.md" |
|
|
|
|
|
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"] |
|
login(auth_token, add_to_git_credential=True) |
|
|
|
logger = setup_logger(__name__) |
|
|
|
|
|
def get_dataset(): |
|
|
|
try: |
|
dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True) |
|
logger.debug("Loading existing dataset") |
|
if "__index_level_0__" in dataset["all_days"].column_names: |
|
dataset = dataset.remove_columns(["__index_level_0__"]) |
|
except FileNotFoundError: |
|
logger.warning("Creating new dataset") |
|
dataset = DatasetDict() |
|
return dataset |
|
|
|
|
|
def main(): |
|
date = datetime.now().strftime('%Y-%m-%d') |
|
logger.warning(f"Running main function for date: {date}") |
|
dataset = get_dataset() |
|
|
|
|
|
old_df = dataset['train'].to_pandas() if 'train' in dataset.keys() else pd.DataFrame() |
|
new_df = merge_and_filter_data(old_df=old_df) |
|
dataset['train'] = Dataset.from_pandas(new_df, preserve_index=False) |
|
|
|
|
|
update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=date) |
|
|
|
|
|
logger.debug(f"Pushing data for {date} to the Hugging Face hub") |
|
dataset.push_to_hub(dataset_name, token=auth_token) |
|
logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub") |
|
|
|
|
|
def schedule_daily_task(): |
|
""" |
|
Schedule the daily_task to run at the specific time every day. |
|
""" |
|
start_time = (datetime.now() + timedelta(seconds=5)).time().strftime('%H:%M') |
|
logger.info(f'Scheduling tasks to run every day at: {start_time}') |
|
main() |
|
schedule.every().day.at(start_time).do(main) |
|
|
|
while True: |
|
schedule.run_pending() |
|
time.sleep(1) |
|
|
|
|
|
if __name__ == "__main__": |
|
schedule_daily_task() |
|
|