File size: 3,290 Bytes
749d1d8
 
5d9e0b8
749d1d8
 
285612d
61f9cd0
749d1d8
67a3546
 
 
 
 
 
 
 
285612d
5ec6657
749d1d8
 
 
32235fd
8ddb605
67a3546
 
32235fd
749d1d8
5d9e0b8
fe10af2
5d9e0b8
 
749d1d8
47ad458
749d1d8
 
 
67a3546
285612d
a40bda5
67a3546
513505c
 
 
 
 
 
 
 
 
 
99ec3d4
e77b07f
5d9e0b8
285612d
 
b65cbe6
285612d
 
67a3546
99ec3d4
67a3546
 
5ec6657
285612d
 
67a3546
 
 
 
 
 
 
 
d08f251
 
 
 
67a3546
 
 
 
5d9e0b8
285612d
5d9e0b8
285612d
5d9e0b8
 
 
 
 
 
e69ab4e
3976b91
 
 
285612d
 
 
 
 
 
 
5d9e0b8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import time
from datetime import datetime

import pandas as pd
import schedule
from datasets import Dataset

from utilities.user_defined_functions import (
    get_latest_data,
    merge_data,
    load_or_create_dataset,
    remove_filtered_rows,
    load_or_create_comment_dataset
)

from utilities.my_logger import setup_logger
from utilities.readme_update import update_dataset_readme

# Dataset configuration, read from the environment at import time.
# SUBREDDIT and USERNAME are required: a missing variable raises KeyError.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"

dataset_readme_path = "README.md"

# Run frequency. Every accepted value has a matching branch in
# schedule_periodic_task(); an unset variable becomes '' and is rejected here.
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly", "custom"]:
    raise ValueError(
        "FREQUENCY environment variable must be 'daily', 'hourly' or 'custom' "
        f"(got {frequency!r})"
    )

# Hugging Face auth token, passed to push_to_hub() when uploading (required).
auth_token = os.environ["HF_TOKEN"]

logger = setup_logger(__name__)

def upload(new_df, dataset, hf_dataset_name):
    """
    Fold ``new_df`` into ``dataset['train']``, push the result to the hub and
    refresh the dataset README.

    Args:
        new_df: pandas DataFrame holding the newly fetched rows. NOTE: when
            ``dataset`` has no 'train' split yet, this frame is modified in
            place (``new`` and ``updated`` columns are added) rather than
            copied.
        dataset: mapping of split name to dataset that also provides
            ``push_to_hub`` (presumably a ``datasets.DatasetDict`` - confirm
            against ``load_or_create_dataset``). Its 'train' entry is replaced.
        hf_dataset_name: hub repository id to push to; also forwarded to
            ``update_dataset_readme``.

    Returns:
        None.
    """
    # Take a single timestamp so the logged date and the yearly subset name
    # cannot disagree if the call straddles a year boundary.
    now = datetime.now()
    date = now.strftime('%Y-%m-%d')
    subset = f"year_{now.year}"

    if 'train' in dataset:
        # Existing dataset: merge the new rows into what is already stored.
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    else:
        # New dataset: every row is flagged as new and not updated.
        df = new_df
        df['new'] = True
        df['updated'] = False
        new_rows = len(new_df)

    # new_rows is computed before filtering, so it may include rows that
    # remove_filtered_rows() drops below.
    df = remove_filtered_rows(df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to {hf_dataset_name}")
    dataset.push_to_hub(hf_dataset_name, subset, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to {hf_dataset_name}")

    # Update README
    update_dataset_readme(dataset_name=hf_dataset_name, subreddit=subreddit, new_rows=new_rows)
    logger.info("Updated README.")


def main():
    """
    Run one update cycle: fetch the latest data once, then upload the first
    frame to ``dataset_name`` and the second to ``comment_dataset_name``.
    """
    import gc

    today = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {today}")

    main_dataset = load_or_create_dataset()
    main_df, comment_df = get_latest_data()
    upload(main_df, main_dataset, dataset_name)

    # Release the first dataset and frame before the comment dataset is
    # loaded, so both are not held in memory at the same time.
    del main_dataset, main_df
    gc.collect()

    upload(comment_df, load_or_create_comment_dataset(), comment_dataset_name)


def schedule_periodic_task():
    """
    Register ``main`` with the scheduler for the configured ``frequency``,
    then poll the scheduler forever. This function never returns.
    """
    job = None
    if frequency == 'daily':
        start_time = '05:00'
        # NOTE(review): the message says UTC+00, but no timezone is passed to
        # the scheduler here - confirm the host clock runs in UTC.
        logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
        job = schedule.every().day.at(start_time)
    elif frequency == 'custom':
        logger.info('Scheduling tasks to run every 3 hours at the top of the hour')
        job = schedule.every(3).hours.at(":00")
    elif frequency == 'hourly':
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        job = schedule.every().hour.at(":00")

    # Any other frequency value registers nothing; the loop below still runs.
    if job is not None:
        job.do(main)

    # Check for due jobs once per second.
    while True:
        schedule.run_pending()
        time.sleep(1)


# Script entry point: start the scheduling loop only when this file is run
# directly, not when it is imported as a module.
if __name__ == "__main__":
    schedule_periodic_task()