|
import hvplot.streamz |
|
import pandas as pd |
|
import numpy as np |
|
from streamz import Stream |
|
from streamz.dataframe import DataFrame |
|
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message |
|
import datetime |
|
import queue |
|
import threading |
|
import time |
|
import os |
|
import json |
|
from huggingface_hub import CommitScheduler, HfApi, hf_hub_download |
|
import uuid |
|
from pathlib import Path |
|
import panel as pn |
|
|
|
|
|
# Initialise Panel with the "material" design before any component is built.
pn.extension(design="material")

# Queue created at start-up; it is not referenced elsewhere in the code
# visible here.
post_queue = queue.Queue()

# Running tally of posts seen since the last emit; the firehose handler
# increments it and the emit loop resets it.
post_count = 0

# Source stream that every per-second count DataFrame is pushed into.
stream = Stream()

# NOTE(review): the purpose of this one-second pause is not evident from the
# surrounding code — confirm it is still needed.
time.sleep(1)

# One-row sample frame that tells streamz which columns and dtypes to expect.
example = pd.DataFrame(
    data={"timestamp": [pd.Timestamp.now()], "post_count": [post_count]},
    index=[0],
)

# Streaming DataFrame view over `stream`; the plot is built from this.
df = DataFrame(stream, example=example)
|
|
|
|
|
# Number of seconds in a 31-day month; used as the cap on stored/plotted
# one-second samples.
MONTH_IN_SECONDS = int(datetime.timedelta(days=31).total_seconds())

# Hub destination and credentials, each overridable through the environment.
REPO_ID = os.getenv("HF_REPO_ID", "davanstrien/bluesky-counts")
REPO_TYPE = os.getenv("HF_REPO_TYPE", "dataset")
HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # None when the variable is unset

# Local staging folder plus a per-process file name made unique with a
# random UUID.
DATA_FOLDER = Path("bluesky_data")
DATA_FILE = f"bluesky_counts_{uuid.uuid4()}.json"
|
|
|
|
|
def load_hub_data():
    """Load previously saved per-second counts from the Hub repo.

    Lists the repo's files, keeps those whose path starts with
    ``data/bluesky_counts_``, downloads the one that sorts last by name and
    parses it as JSON Lines (one JSON document per line).

    Returns:
        list: The parsed records, capped at the last ``MONTH_IN_SECONDS``
        entries. Empty when no matching file exists or when any step fails;
        failures are printed rather than raised so start-up can continue
        without history.
    """
    try:
        api = HfApi(token=HF_TOKEN)

        files = api.list_repo_files(REPO_ID, repo_type=REPO_TYPE)
        data_files = [f for f in files if f.startswith("data/bluesky_counts_")]
        if not data_files:
            return []

        # NOTE(review): this picks the lexicographically greatest name.
        # DATA_FILE embeds a random uuid4, so "last by name" is not
        # necessarily the most recently uploaded file — confirm intent.
        latest_file = max(data_files)

        local_path = hf_hub_download(
            repo_id=REPO_ID, filename=latest_file, repo_type=REPO_TYPE, token=HF_TOKEN
        )

        # Skip blank lines (e.g. a trailing newline): json.loads("") raises,
        # and the broad handler below would then throw away the whole file.
        # JSON text is UTF-8, so do not depend on the locale's default codec.
        with open(local_path, "r", encoding="utf-8") as f:
            data = [json.loads(text) for raw in f if (text := raw.strip())]

        return data[-MONTH_IN_SECONDS:]
    except Exception as e:
        # Deliberate best-effort: history is optional.
        print(f"Error loading data from Hub: {e}")
        return []
|
|
|
|
|
|
|
# Make sure the local staging folder exists (no error if it already does).
DATA_FOLDER.mkdir(exist_ok=True)

# Scheduler that commits the contents of DATA_FOLDER to the "data" directory
# of the Hub repo.
# NOTE(review): huggingface_hub documents `every` in minutes, so 600 means
# roughly ten hours between commits rather than ten minutes — confirm this
# interval is intended.
scheduler = CommitScheduler(
    folder_path=DATA_FOLDER,
    path_in_repo="data",
    repo_id=REPO_ID,
    repo_type=REPO_TYPE,
    token=HF_TOKEN,
    every=600,
)
|
|
|
|
|
def on_message_handler(message):
    """Count the newly created Bluesky posts carried by one firehose message.

    Parses the raw message and adds the number of post-creation operations
    it contains to the module-level ``post_count``.

    Args:
        message: Raw firehose message, passed unchanged to
            ``parse_subscribe_repos_message``.
    """
    global post_count

    commit = parse_subscribe_repos_message(message)

    # Parsed messages without an ``ops`` attribute (or with an empty/None
    # one) carry nothing to count.
    ops = getattr(commit, "ops", None)
    if not ops:
        return

    # Record paths have the form "<collection>/<record key>". Compare the
    # collection segment exactly: a substring test would also match sibling
    # collections whose names merely begin with "app.bsky.feed.post", such as
    # "app.bsky.feed.postgate", and inflate the post count.
    new_posts = sum(
        1
        for op in ops
        if op.action == "create"
        and op.path.split("/", 1)[0] == "app.bsky.feed.post"
    )

    # Touch the shared counter only when there is something to add, so a
    # message without posts never writes to it.
    if new_posts:
        post_count += new_posts
|
|
|
|
|
def emit_counts():
    """Replay saved history, then emit the live post count once per second.

    First pushes up to the 100 most recent records returned by
    ``load_hub_data()`` into ``stream``. Then loops forever: each iteration
    emits a one-row DataFrame holding the current timestamp and the number
    of posts tallied since the previous iteration, and sleeps for a second.

    Never returns; it is used as the target of a background thread.

    NOTE(review): nothing in this function writes to ``DATA_FOLDER``, so the
    counts emitted here are not persisted by this code path — confirm
    whether persistence is handled elsewhere.
    """
    global post_count

    if saved_data := load_hub_data():
        print(f"Loaded {len(saved_data)} historical data points from Hub")

        # Seed the stream with at most the 100 most recent saved points.
        for point in saved_data[-100:]:
            history_frame = pd.DataFrame(
                {
                    "timestamp": [pd.Timestamp(point["timestamp"])],
                    "post_count": [point["post_count"]],
                }
            )
            stream.emit(history_frame)

    # Let a first one-second window of posts accumulate before emitting.
    time.sleep(1)

    while True:
        now = pd.Timestamp.now()

        # Snapshot and reset the shared tally back-to-back, *before* building
        # and emitting the frame. Resetting only after the emit would silently
        # drop every post the firehose thread counted while the frame was
        # being built and pushed to the plot. This is not a lock; it only
        # shrinks the window in which an increment can be lost.
        count, post_count = post_count, 0

        stream.emit(pd.DataFrame({"timestamp": [now], "post_count": [count]}))

        time.sleep(1)
|
|
|
|
|
|
|
# Line chart of post_count over timestamp, built from the streaming
# DataFrame. `backlog` is set to one 31-day month of one-second samples.
plot = df.hvplot.line(
    x="timestamp",
    y="post_count",
    title="Bluesky Posts per Second",
    backlog=MONTH_IN_SECONDS,
    width=800,
    height=400,
)
|
|
|
|
|
|
|
def run_firehose():
    """Subscribe to the firehose and hand each message to the handler.

    Creates a ``FirehoseSubscribeReposClient`` with default settings and
    starts it with ``on_message_handler`` as the message callback. Used as
    the target of a background thread.
    """
    firehose = FirehoseSubscribeReposClient()
    firehose.start(on_message_handler)
|
|
|
|
|
# Both workers are daemon threads, so they do not keep the interpreter alive
# once the main thread exits.

# Worker that consumes the firehose and tallies new posts.
firehose_thread = threading.Thread(target=run_firehose, daemon=True)
firehose_thread.start()

# Worker that pushes the tally into the stream.
emit_thread = threading.Thread(target=emit_counts, daemon=True)
emit_thread.start()
|
|
|
|
|
if __name__ == "__main__":
    import panel as pn

    pn.extension()

    # Single-column layout holding the streaming plot.
    dashboard = pn.Column(pn.pane.HoloViews(plot))

    # Server settings: bind to every interface on port 7860, accept
    # websocket connections from any origin, and pass show=False.
    serve_options = {
        "address": "0.0.0.0",
        "port": 7860,
        "allow_websocket_origin": ["*"],
        "show": False,
    }
    pn.serve(dashboard, **serve_options)
|
|