# Source: Hugging Face Hub file view (raw / history / blame), file size 4.82 kB.
# Last commit: ed3130d "Updating README" by derek-thomas (HF staff).
import os
import time
from datetime import datetime, timedelta
import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login
from my_logger import setup_logger
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe
from utilities.readme_update import update_readme
# Set dataset name, path to README.md, and existing dataset details.
# SUBREDDIT and USERNAME are required environment variables: a missing one
# raises KeyError as soon as this module is imported.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
# Hub repository id that main() loads from and pushes to.
dataset_name = f"{username}/dataset-creator-{subreddit}"
# Not referenced anywhere else in the visible part of this file.
dataset_readme_path = "README.md"
# Authenticate with Hugging Face using an auth token (also a required
# environment variable). NOTE: the login call runs at import time, so merely
# importing this module performs the authentication.
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)
# Module-level logger used by main() and run_main_continuously().
logger = setup_logger(__name__)
def main(date_to_fetch):
    """
    Fetch one day of subreddit submissions and merge them into the hub dataset.

    Loads the existing dataset from the Hugging Face Hub (or starts an empty one),
    scrapes the submissions for ``date_to_fetch``, appends them to the ``all_days``
    split, drops duplicate ``id`` rows and pushes the result back to the Hub.
    Nothing is pushed when the merge does not add any new rows.

    Args:
        date_to_fetch (datetime.date): The date to fetch subreddit data for

    Returns:
        most_recent_date (datetime.date): The later of ``date_to_fetch`` and the
            newest date already stored in the dataset. It is always a
            ``datetime.date`` so the caller can add a ``timedelta`` to it.
    """
    # Load the existing dataset from the Hugging Face hub or create a new one
    try:
        # NOTE(review): `ignore_verifications` appears to be deprecated in newer
        # `datasets` releases in favour of `verification_mode` - confirm against
        # the pinned version before upgrading.
        dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True)
        logger.debug("Loading existing dataset")
        # Guard the split lookup so a hub dataset without 'all_days' does not
        # raise KeyError; the split is then created further down.
        if "all_days" in dataset and "__index_level_0__" in dataset["all_days"].column_names:
            dataset = dataset.remove_columns(["__index_level_0__"])
    except FileNotFoundError:
        logger.warning("Creating new dataset")
        dataset = DatasetDict()

    # Scrape the requested day and turn the submissions into a DataFrame
    logger.info(f"Fetching data for {date_to_fetch}")
    submissions = scrape_submissions_by_day(subreddit, str(date_to_fetch))
    df = submissions_to_dataframe(submissions)
    logger.debug(f"Data fetched for {date_to_fetch}")
    most_recent_date = date_to_fetch

    # Append DataFrame to split 'all_days' or create new split
    if "all_days" in dataset:
        logger.debug("Appending data to split 'all_days'")
        # Merge the new submissions
        old_data = dataset["all_days"].to_pandas()
        new_data = pd.concat([old_data, df], ignore_index=True)
        if "__index_level_0__" in new_data.columns:
            new_data = new_data.drop("__index_level_0__", axis=1)
        # Drop duplicates just in case
        new_data = new_data.drop_duplicates(subset=["id"], keep="first")

        # Figure out dates when we restart. The stored 'date' column may come back
        # as strings, Timestamps or date objects, none of which can safely be
        # compared with a datetime.date via max(); normalise to datetime.date first.
        newest_stored = pd.to_datetime(old_data["date"]).max()
        if not pd.isna(newest_stored):  # NaT when the stored split has no rows
            most_recent_date = max(newest_stored.date(), most_recent_date)

        # Same row count after de-duplication means the hub already holds this day
        if len(old_data) == len(new_data):
            logger.warning("Data in hub is much more recent, using that next!")
            return most_recent_date

        # Convert back to dataset
        dataset["all_days"] = Dataset.from_pandas(new_data)
    else:
        logger.debug("Creating new split 'all_days'")
        dataset["all_days"] = Dataset.from_pandas(df)

    # Log appending or creating split 'all'
    logger.debug("Appended or created split 'all_days'")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date_to_fetch} to the Hugging Face hub")
    readme_text = update_readme(dataset_name, subreddit, date_to_fetch)
    dataset.description = readme_text
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub")
    return most_recent_date
def run_main_continuously():
    """
    Run `main` for every day from START_DATE up to two days ago, then keep polling.

    The start date is read from the environment variable "START_DATE" (format
    YYYY-MM-DD). While the next date to process is at least two days old, `main`
    is called for it and the next date becomes one day after the date `main`
    returns. Once caught up, the function sleeps until tomorrow at the same
    wall-clock time at which it was originally started, then checks again.
    This function never returns.

    Raises:
        ValueError: If START_DATE is unset, empty, or not in YYYY-MM-DD format.
    """
    start_date_str = os.environ.get("START_DATE")
    # Fail with a clear message instead of the opaque TypeError that
    # strptime(None, ...) would raise when the variable is missing.
    if not start_date_str:
        raise ValueError("Environment variable START_DATE must be set (format: YYYY-MM-DD)")
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()

    # Remember the time of day we were started; daily wake-ups reuse it.
    start_time = datetime.now().time()

    while True:
        today = datetime.now().date()
        two_days_ago = today - timedelta(days=2)

        if start_date <= two_days_ago:
            logger.warning(f"Running main function for date: {start_date}")
            most_recent_date = main(start_date)
            # Resume after whatever is newest, which may be data already on the hub
            start_date = most_recent_date + timedelta(days=1)
        else:
            # Caught up: sleep until tomorrow at the original start time
            tomorrow = today + timedelta(days=1)
            now = datetime.now()
            start_of_tomorrow = datetime.combine(tomorrow, start_time)
            wait_until_tomorrow = (start_of_tomorrow - now).total_seconds()
            logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds")
            time.sleep(wait_until_tomorrow)
# Start the endless fetch loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    run_main_continuously()