import functools
import traceback
import gradio as gr
import bittensor as bt
from typing import Dict, List, Any, Optional, Tuple
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import wandb
import math
import os
import datetime
import time
import json
import pandas as pd
from dotenv import load_dotenv
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
import pandas as pd
# Load environment variables (presumably including H4_TOKEN and WANDB_API_KEY,
# which are read below) from a local .env file when one is present.
load_dotenv()

# HTML snippet rendered first on the page by main(); currently an empty string.
FONT = (
    """"""
)

# Page title rendered by main().
TITLE = """
Subnet 9 Leaderboard
"""

# Introductory blurb rendered under the title.
HEADER = """
Subnet 9 is a Bittensor subnet that rewards miners for producing pretrained Foundation-Models on the Falcon Refined Web dataset. It acts like a continuous benchmark whereby miners are rewarded for attaining the best losses on randomly sampled pages of Falcon. The models with the best head-to-head loss on the evaluation data receive a steady emission of TAO."""

# Legend rendered below the evaluation table.
# NOTE(review): the column names described here ("Rewards / Day",
# "Last Average Loss") do not match the table headers used in main()
# ("Win Rate", "Average Loss", "Weight") — confirm which one is stale.
EVALUATION_DETAILS = """
Name: the 🤗 Hugging Face model name (click to go to the model card)
Rewards / Day: the expected rewards per day based on current ranking.
Last Average Loss: the last loss value on the evaluation data for the model as calculated by a validator (lower is better)
UID: the Bittensor UID of the miner
Block: the Bittensor block that the model was submitted in
More stats on taostats."""

# Caption rendered at the top of the "Evaluation Stats" accordion.
EVALUATION_HEADER = """
Shows the latest internal evaluation statistics as calculated by the Opentensor validator
"""

# Weights & Biases projects queried for validator runs and benchmark tables.
VALIDATOR_WANDB_PROJECT = "opentensor-dev/pretraining-subnet"
BENCHMARK_WANDB_PROJECT = "raofoundation/pretraining-leaderboard-data"

# Hugging Face token and API client; both are used by restart_space().
H4_TOKEN = os.environ.get("H4_TOKEN", None)
API = HfApi(token=H4_TOKEN)

# API key handed to wandb.Api in get_wandb_runs(); None when the variable is unset.
WANDB_TOKEN = os.environ.get("WANDB_API_KEY", None)

# Repository id passed to API.restart_space().
REPO_ID = "RaoFoundation/pretraining-leaderboard"

# NOTE(review): not referenced anywhere in the visible part of this file.
MAX_AVG_LOSS_POINTS = 1

# Retry policy used by run_with_retries(): number of attempts and pause between them.
RETRIES = 5
DELAY_SECS = 3

# Subnet id used for all metagraph / hyperparameter queries.
NETUID = 9

# Block time used to turn "blocks to go" into a wall-clock estimate.
SECONDS_PER_BLOCK = 12
@dataclass
class ModelData:
    """A miner's model commitment together with the stats attached to it."""

    # UID of the miner that committed the model.
    uid: int
    # Hotkey of the miner that committed the model.
    hotkey: str
    # First and second ":"-separated tokens of the commitment; used by the
    # dashboard to build the "<namespace>/<name>" Hugging Face repo path.
    namespace: str
    name: str
    # Third token of the commitment, or None when the commitment says "None".
    commit: Optional[str]
    # Fourth token of the commitment, or None when the commitment says "None".
    hash: Optional[str]
    # Block value supplied by the caller alongside the commitment.
    block: int
    # Incentive and emission figures supplied by the caller.
    incentive: float
    emission: float

    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ):
        """Returns an instance of this class from a compressed string representation.

        Args:
            uid: UID of the miner.
            hotkey: Hotkey of the miner.
            cs: Compressed string of the form "namespace:name:commit:hash".
                The literal text "None" in the commit or hash position is
                mapped to ``None``.
            block: Block value to store on the instance.
            incentive: Incentive value to store on the instance.
            emission: Emission value to store on the instance.

        Raises:
            IndexError: If ``cs`` has fewer than four ":"-separated tokens.
        """
        tokens = cs.split(":")

        def _none_if_literal(token: str) -> Optional[str]:
            # A missing value is spelled as the literal text "None".
            return None if token == "None" else token

        # Build via ``cls`` so that subclasses get instances of themselves.
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=_none_if_literal(tokens[2]),
            hash=_none_if_literal(tokens[3]),
            block=block,
            incentive=incentive,
            emission=emission,
        )
def run_with_retries(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying on failure.

    Up to RETRIES attempts are made, sleeping DELAY_SECS seconds after each
    failed attempt except the last. The exception from the final attempt is
    re-raised unchanged.

    Returns:
        Whatever ``func`` returns on its first successful call.
    """
    final_attempt = RETRIES - 1
    for attempt in range(RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception:
            if attempt == final_attempt:
                # Out of attempts: surface the original error to the caller.
                raise
            time.sleep(DELAY_SECS)
    # Only reachable when RETRIES is not positive.
    raise RuntimeError("Should never happen")
def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Create the "finney" subtensor and the full (non-lite) NETUID metagraph.

    Construction goes through run_with_retries, so failures are retried.
    """

    def _connect() -> Tuple[bt.subtensor, bt.metagraph]:
        # The subtensor is created first, then the metagraph.
        chain = bt.subtensor("finney")
        graph = bt.metagraph(NETUID, lite=False)
        return chain, graph

    return run_with_retries(_connect)
def get_validator_weights(
    metagraph: bt.metagraph,
) -> Dict[int, Tuple[float, int, Dict[int, float]]]:
    """Returns a dictionary of validator UIDs to (vtrust, stake, {uid: weight}).

    Only UIDs whose validator trust is greater than zero are included. The
    inner mapping holds the weights that validator sets on every other UID,
    rounded to 4 decimals, keeping only those that are still positive.
    """
    every_uid = metagraph.uids.tolist()
    validators = {}
    for validator_uid in every_uid:
        vtrust = metagraph.validator_trust[validator_uid].item()
        if not vtrust > 0:
            continue
        assigned = {}
        validators[validator_uid] = (
            vtrust,
            metagraph.S[validator_uid].item(),
            assigned,
        )
        for target_uid in every_uid:
            # A validator's weight on itself is not reported.
            if target_uid == validator_uid:
                continue
            rounded = round(metagraph.weights[validator_uid][target_uid].item(), 4)
            if rounded > 0:
                assigned[target_uid] = rounded
    return validators
def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Collect a ModelData entry for every UID that has a parseable commitment.

    For each UID in the metagraph the commitment metadata is fetched (with
    retries), decoded from hex and parsed. UIDs whose metadata cannot be
    fetched, is empty, or cannot be decoded/parsed are skipped so that a
    single bad commitment never aborts the whole scan.

    Args:
        subtensor: Chain connection passed to ``get_metadata``.
        metagraph: Metagraph supplying UIDs, hotkeys, incentive and emission.

    Returns:
        One ModelData per successfully parsed UID, in metagraph UID order.
    """
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")
            continue
        if not metadata:
            continue

        incentive = metagraph.incentive[uid].nan_to_num().item()
        emission = (
            metagraph.emission[uid].nan_to_num().item() * 20
        )  # convert to daily TAO (presumably 20 epochs per day — TODO confirm)

        try:
            commitment = metadata["info"]["fields"][0]
            # The first value of the commitment is a hex string; drop its
            # two-character prefix (presumably "0x") before decoding.
            hex_data = next(iter(commitment.values()))[2:]
            chain_str = bytes.fromhex(hex_data).decode()
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, metadata["block"], incentive, emission
            )
        except Exception:
            # Malformed or foreign commitments are expected; skip this UID only.
            print(f"Failed to parse metadata for UID {uid}: {traceback.format_exc()}")
            continue
        result.append(model_data)
    return result
def is_floatable(x) -> bool:
    """Return True for any int, or for a float that is neither NaN nor infinite."""
    if isinstance(x, int):
        return True
    return isinstance(x, float) and math.isfinite(x)
def get_wandb_runs(project: str, filters: Dict[str, Any]) -> List:
    """Fetch the runs of a Wandb project, polling until at least one is returned.

    An empty result is treated as a transient failure: a message is printed
    and the query is repeated after 60 seconds. Exceptions raised by the
    Wandb client are not caught here.

    Args:
        project: Wandb project path to query.
        filters: Filter mapping forwarded to ``api.runs``.

    Returns:
        A non-empty list of runs.
    """
    while True:
        client = wandb.Api(api_key=WANDB_TOKEN)
        fetched = list(client.runs(project, filters=filters))
        if fetched:
            return fetched
        # The Wandb API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)
def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Find the most recent evaluation stats for each requested UID.

    Runs are scanned in the order given (expected newest first). Runs without
    an "original_format_json" summary entry are ignored. For every UID the
    first run that mentions it wins; scanning stops once all UIDs are covered.

    Args:
        uids: UIDs to look up.
        wandb_runs: Runs exposing a ``summary`` mapping.

    Returns:
        Mapping of UID to its "avg_loss", "win_rate", "win_total" and "weight"
        values (None when absent) plus a "fresh" flag, which is True only when
        the data came from the very first run in ``wandb_runs``.

    Raises:
        AssertionError: If the run timestamps are not strictly descending.
    """
    scores = {}
    newer_timestamp = None
    # Walk the runs until every uid has been resolved.
    for position, run in enumerate(wandb_runs):
        summary = run.summary
        if "original_format_json" not in summary:
            continue
        payload = json.loads(summary["original_format_json"])
        per_uid = payload["uid_data"]
        timestamp = payload["timestamp"]
        # Runs must really be ordered from newest to oldest.
        assert (
            newer_timestamp is None or timestamp < newer_timestamp
        ), f"Timestamps are not in descending order: {timestamp} >= {newer_timestamp}"
        newer_timestamp = timestamp

        for uid in uids:
            uid_key = str(uid)
            if uid in scores or uid_key not in per_uid:
                continue
            stats = per_uid[uid_key]
            scores[uid] = {
                "avg_loss": stats.get("average_loss", None),
                "win_rate": stats.get("win_rate", None),
                "win_total": stats.get("win_total", None),
                "weight": stats.get("weight", None),
                # Only data from the first run in the list counts as fresh.
                "fresh": position == 0,
            }
        if len(scores) == len(uids):
            break
    return scores
def get_losses_over_time(wandb_runs: List) -> pd.DataFrame:
    """Returns a dataframe of the best average model loss over time.

    Each run that carries an "original_format_json" summary entry contributes
    at most one row: its timestamp and the lowest "average_loss" among its
    UIDs. Entries whose loss is missing, null or otherwise non-numeric are
    ignored, and runs with no usable loss contribute no row.

    Args:
        wandb_runs: Runs exposing a ``summary`` mapping.

    Returns:
        DataFrame with columns "timestamp" and "best_loss", in run order.
    """
    # Losses of 2.5 or below recorded up to this date came from an exploit.
    exploit_cutoff = datetime.datetime(2024, 2, 8)

    timestamps = []
    best_losses = []
    for run in wandb_runs:
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        timestamp = datetime.datetime.fromtimestamp(data["timestamp"])
        after_exploit = timestamp > exploit_cutoff

        best_loss = math.inf
        for uid_data in data["uid_data"].values():
            loss = uid_data.get("average_loss")
            # A null/absent/non-numeric loss cannot be compared; skip it.
            if not isinstance(loss, (int, float)):
                continue
            # Filter out the numbers from the exploit.
            if loss < best_loss and (loss > 2.5 or after_exploit):
                best_loss = loss
        if best_loss != math.inf:
            timestamps.append(timestamp)
            best_losses.append(best_loss)

    return pd.DataFrame({"timestamp": timestamps, "best_loss": best_losses})
def format_score(uid: int, scores, key) -> Optional[float]:
    """Return ``scores[uid][key]`` rounded to 4 decimals.

    None is returned when the UID or key is missing, or when the stored value
    does not pass ``is_floatable``.
    """
    if uid not in scores:
        return None
    entry = scores[uid]
    if key not in entry:
        return None
    value = entry[key]
    return round(value, 4) if is_floatable(value) else None
def next_epoch(subtensor: bt.subtensor, block: int) -> int:
    """Return the block at which the next epoch starts for NETUID.

    Computed as ``block`` plus the subnet tempo minus the number of blocks
    already elapsed in the current epoch.
    """
    tempo = subtensor.get_subnet_hyperparameters(NETUID).tempo
    elapsed = subtensor.blocks_since_epoch(NETUID, block)
    return block + tempo - elapsed
def get_next_update_div(current_block: int, next_update_block: int) -> str:
    """Build the status text shown at the top of the dashboard.

    Args:
        current_block: Block the displayed data was taken at.
        next_update_block: Block at which the next reward update is expected.

    Returns:
        Text stating how many blocks (and roughly how many minutes, using
        SECONDS_PER_BLOCK) remain until the next update, followed by the
        current UTC time as the "Last Updated" stamp.
    """
    blocks_to_go = next_update_block - current_block
    remaining = datetime.timedelta(seconds=blocks_to_go * SECONDS_PER_BLOCK)
    minutes_to_go = int(remaining.total_seconds() // 60)
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    last_updated = datetime.datetime.now(datetime.timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    return f"""
Next reward update: {blocks_to_go} blocks (~{minutes_to_go} minutes)
Last Updated: {last_updated} (UTC)
"""
def leaderboard_data(
    leaderboard: List[ModelData],
    scores: Dict[int, Dict[str, Optional[float]]],
    show_stale: bool,
) -> List[List[Any]]:
    """Returns the leaderboard data, based on models data and UID scores.

    Args:
        leaderboard: Models to list, in display order.
        scores: Per-UID stats; each entry carries a "fresh" flag.
        show_stale: When False, only models whose score entry is fresh are
            listed; when True, every model is listed.

    Returns:
        One row per listed model: markdown link, win rate, average loss,
        weight, UID and block.
    """
    rows = []
    for c in leaderboard:
        is_fresh = c.uid in scores and scores[c.uid]["fresh"]
        if not (is_fresh or show_stale):
            continue

        repo = f"{c.namespace}/{c.name}"
        if c.commit:
            name_cell = (
                f"[{repo} ({c.commit[0:8]})]"
                f"(https://huggingface.co/{repo}/commit/{c.commit})"
            )
        else:
            # ModelData.commit may be None; link to the repo instead of
            # failing on the slice.
            name_cell = f"[{repo}](https://huggingface.co/{repo})"

        rows.append(
            [
                name_cell,
                format_score(c.uid, scores, "win_rate"),
                format_score(c.uid, scores, "avg_loss"),
                format_score(c.uid, scores, "weight"),
                c.uid,
                c.block,
            ]
        )
    return rows
def get_benchmarks() -> Tuple[pd.DataFrame, datetime.datetime]:
    """Returns the latest benchmarks and the time they were run.

    The runs of BENCHMARK_WANDB_PROJECT are scanned in the order returned; the
    first run whose last logged artifact contains a "benchmarks" table wins.
    When no run qualifies an error is logged and ``(None, None)`` is returned.
    """
    for run in get_wandb_runs(project=BENCHMARK_WANDB_PROJECT, filters=None):
        artifacts = list(run.logged_artifacts())
        if not artifacts:
            continue
        table = artifacts[-1].get("benchmarks")
        if not table:
            continue
        frame = table.get_dataframe()
        started_at = datetime.datetime.strptime(
            run.metadata["startedAt"], "%Y-%m-%dT%H:%M:%S.%f"
        )
        return frame, started_at
    bt.logging.error("Failed to get benchmarks from Wandb.")
    return None, None
def restart_space() -> None:
    """Ask the Hugging Face API to restart the REPO_ID space.

    Scheduled periodically from main(); presumably the restart is what
    refreshes the data shown on the dashboard, since main() only builds the
    page once — TODO confirm.
    """
    API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
def main():
    """Fetch chain and Wandb data, then build and launch the Gradio dashboard.

    Data collection is retried every 30 seconds until every piece needed by
    the page is available. After the page is built, a background job that
    calls ``restart_space`` every 30 minutes is scheduled, and the Gradio app
    is launched.
    """
    # To avoid leaderboard failures, infinitely try until we get all data
    # needed to populate the dashboard
    while True:
        try:
            subtensor, metagraph = get_subtensor_and_metagraph()
            model_data: List[ModelData] = get_subnet_data(subtensor, metagraph)
            model_data.sort(key=lambda x: x.incentive, reverse=True)
            vali_runs = get_wandb_runs(project=VALIDATOR_WANDB_PROJECT, filters={"config.type": "validator", "config.uid": 238})
            scores = get_scores([x.uid for x in model_data], vali_runs)
            current_block = metagraph.block.item()
            next_epoch_block = next_epoch(subtensor, current_block)
            validator_df = get_validator_weights(metagraph)
            benchmarks, benchmark_timestamp = get_benchmarks()
            break
        except Exception as e:
            print(f"Failed to get data: {e}")
            time.sleep(30)

    def model_label(c) -> str:
        # ModelData.commit may be None; omit the short hash instead of
        # failing on the slice.
        repo = f"{c.namespace}/{c.name}"
        return f"{repo} ({c.commit[0:8]})" if c.commit else repo

    # Models with a non-zero incentive: shown in the top label and used as
    # the per-model columns of the validator table.
    incentivized = [c for c in model_data if c.incentive]
    # Validator UIDs ordered by stake, largest first.
    validators_by_stake = sorted(
        validator_df, key=lambda v: validator_df[v][1], reverse=True
    )

    demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
    with demo:
        gr.HTML(FONT)
        gr.HTML(TITLE)
        gr.HTML(HEADER)

        gr.HTML(value=get_next_update_div(current_block, next_epoch_block))

        gr.Label(
            value={
                f"{model_label(c)} · (τ{round(c.emission, 2):,})": c.incentive
                for c in incentivized
            },
            num_top_classes=10,
        )

        if benchmarks is not None:
            with gr.Accordion("Top Model Benchmarks"):
                gr.components.Dataframe(benchmarks)
                gr.HTML("""
PPL computed using a stride of 512. See here for the full code.
""")
                gr.HTML(f"""
Last Updated: {benchmark_timestamp.strftime("%Y-%m-%d %H:%M:%S")} (UTC)
""")

        with gr.Accordion("Evaluation Stats"):
            gr.HTML(EVALUATION_HEADER)
            show_stale = gr.Checkbox(label="Show Stale", interactive=True)
            leaderboard_table = gr.components.Dataframe(
                value=leaderboard_data(model_data, scores, show_stale.value),
                headers=["Name", "Win Rate", "Average Loss", "Weight", "UID", "Block"],
                datatype=["markdown", "number", "number", "number", "number", "number"],
                elem_id="leaderboard-table",
                interactive=False,
                visible=True,
            )
            gr.HTML(EVALUATION_DETAILS)
            # Re-filter the table whenever the checkbox is toggled.
            show_stale.change(
                lambda stale: leaderboard_data(model_data, scores, stale),
                inputs=[show_stale],
                outputs=leaderboard_table,
            )

            gr.LinePlot(
                get_losses_over_time(vali_runs),
                x="timestamp",
                x_title="Date",
                y="best_loss",
                y_title="Average Loss",
                tooltip="best_loss",
                interactive=True,
                visible=True,
                width=1024,
                title="Best Average Loss Over Time",
            )

        with gr.Accordion("Validator Stats"):
            gr.components.Dataframe(
                value=[
                    [uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)]
                    + [validator_df[uid][-1].get(c.uid) for c in incentivized]
                    for uid in validators_by_stake
                ],
                headers=["UID", "Stake (τ)", "V-Trust"]
                + [model_label(c) for c in incentivized],
                datatype=["number", "number", "number"]
                + ["number" for _ in incentivized],
                interactive=False,
                visible=True,
            )

        # Footer timestamp, built inline because this file defines no
        # dedicated helper for it.
        last_updated = datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        gr.HTML(value=f"Last Updated: {last_updated} (UTC)")

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        restart_space, "interval", seconds=60 * 30
    )  # restart every 30 minutes
    scheduler.start()

    demo.launch()


main()