# "Spaces: Sleeping" — Hugging Face Space status banner captured during page
# extraction; not part of the application source.
import gradio as gr | |
import bittensor as bt | |
import typing | |
from bittensor.extrinsics.serving import get_metadata | |
from dataclasses import dataclass | |
import datetime | |
import time | |
import functools | |
import multiprocessing | |
from tqdm import tqdm | |
from collections import Counter, defaultdict | |
# Web font for the typewriter-style page title.
FONT = """<link href="https://fonts.cdnfonts.com/css/jmh-typewriter" rel="stylesheet">"""
TITLE = """<h1 align="center" id="space-title" class="typewriter">Subnet 6 Duplicate Radar</h1>"""
# Hugging Face repo identifier for this Space (not referenced elsewhere in this file).
REPO_ID = "pawkanarek/sn6_dups"
METAGRAPH_RETRIES = 10  # Attempts to connect to subtensor / pull the metagraph.
METAGRAPH_DELAY_SECS = 30  # Seconds to sleep between metagraph retries.
METADATA_TTL = 10  # Per-hotkey timeout (seconds) for the metadata subprocess call.
NETUID = 6  # Bittensor subnet this app monitors.
def run_in_subprocess(func: functools.partial, ttl: int) -> typing.Any:
    """Runs the provided function on a subprocess with 'ttl' seconds to complete.

    Args:
        func (functools.partial): Function to be run.
        ttl (int): How long to try for in seconds.

    Returns:
        Any: The value returned by 'func'.

    Raises:
        TimeoutError: If 'func' does not finish within 'ttl' seconds.
        Exception: Re-raised if 'func' raised inside the subprocess.
    """

    def wrapped_func(func: functools.partial, queue: multiprocessing.Queue):
        try:
            result = func()
            queue.put(result)
        except BaseException as e:
            # Ship the exception back to the parent so it can be re-raised there.
            # (BaseException alone covers everything; "(Exception, BaseException)"
            # was redundant.)
            queue.put(e)

    # Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem
    # to work on "spawn".
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    process = ctx.Process(target=wrapped_func, args=[func, queue])

    process.start()
    process.join(timeout=ttl)

    if process.is_alive():
        process.terminate()
        process.join()
        raise TimeoutError(f"Failed to {func.func.__name__} after {ttl} seconds")

    # Raises an error if the queue is empty. This is fine. It means our subprocess timed out.
    result = queue.get(block=False)

    # If we put an exception on the queue then raise instead of returning.
    if isinstance(result, Exception):
        raise result

    if isinstance(result, BaseException):
        raise Exception(f"BaseException raised in subprocess: {str(result)}")

    return result
def get_subtensor_and_metagraph() -> typing.Tuple[bt.subtensor, bt.metagraph]:
    """Connect to the finney network and pull the subnet metagraph.

    Retries up to METAGRAPH_RETRIES times, sleeping METAGRAPH_DELAY_SECS between
    attempts; the last failure is re-raised if every attempt fails.
    """
    for attempt in range(METAGRAPH_RETRIES):
        try:
            print("Connecting to subtensor...")
            chain: bt.subtensor = bt.subtensor("finney")
            print("Pulling metagraph...")
            graph: bt.metagraph = chain.metagraph(NETUID, lite=False)
        except Exception as err:
            print(err)
            # Out of retries: surface the last error to the caller.
            if attempt == METAGRAPH_RETRIES - 1:
                raise
            print(f"Error connecting to subtensor or pulling metagraph, retry {attempt + 1} of {METAGRAPH_RETRIES} in {METAGRAPH_DELAY_SECS} seconds...")
            time.sleep(METAGRAPH_DELAY_SECS)
        else:
            return chain, graph
    raise RuntimeError()  # Unreachable: the loop either returns or re-raises.
@dataclass
class ModelData:
    """One miner's committed model metadata joined with its metagraph stats."""

    uid: int  # UID of the miner in the metagraph.
    hotkey: str  # Hotkey that made the chain commitment.
    namespace: str  # First token of the compressed string (repo namespace).
    name: str  # Second token (repo name).
    commit: str  # Third token; "" when the chain stored the literal "None".
    hash: str  # Fourth token; "" when the chain stored the literal "None".
    block: int  # Block at which the metadata was committed.
    incentive: float  # Miner incentive taken from the metagraph.
    emission: float  # Miner emission (caller converts it to daily TAO).
    competition: str  # Optional fifth token; "" when absent or "None".

    # NOTE: @classmethod is required — callers invoke ModelData.from_compressed_str(...)
    # directly on the class; without it the first argument would bind to `cls`.
    @classmethod
    def from_compressed_str(cls, uid: int, hotkey: str, cs: str, block: int, incentive: float, emission: float):
        """Returns an instance of this class from a compressed string representation"""
        tokens = cs.split(":")
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2] if tokens[2] != "None" else "",
            hash=tokens[3] if tokens[3] != "None" else "",
            competition=tokens[4] if len(tokens) > 4 and tokens[4] != "None" else "",
            block=block,
            incentive=incentive,
            emission=emission
        )
def get_subnet_data() -> typing.List[ModelData]:
    """Fetch and parse the chain commitment for every hotkey on the subnet.

    Returns:
        List[ModelData]: one entry per hotkey whose metadata could be fetched
        and whose compressed string parsed successfully; others are skipped.
    """
    subtensor, metagraph = get_subtensor_and_metagraph()

    result = []
    for uid in tqdm(metagraph.uids.tolist(), desc="Metadata for hotkeys"):
        hotkey = metagraph.hotkeys[uid]
        try:
            # Wrap calls to the subtensor in a subprocess with a timeout to handle potential hangs.
            partial = functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            metadata = run_in_subprocess(partial, METADATA_TTL)
        except KeyboardInterrupt:
            raise
        except Exception:
            # Best-effort: skip hotkeys whose metadata cannot be fetched.
            # (Was a bare `except:`, which would also swallow SystemExit.)
            metadata = None

        if not metadata:
            continue

        commitment = metadata["info"]["fields"][0]
        hex_data = commitment[list(commitment.keys())[0]][2:]  # strip the "0x" prefix
        chain_str = bytes.fromhex(hex_data).decode()

        block = metadata["block"]
        incentive = metagraph.incentive[uid].nan_to_num().item()
        emission = metagraph.emission[uid].nan_to_num().item() * 20  # convert to daily TAO

        try:
            model_data = ModelData.from_compressed_str(uid, hotkey, chain_str, block, incentive, emission)
        except Exception:
            # Malformed commitment strings are skipped rather than aborting the scan.
            continue
        result.append(model_data)
    return result
def get_next_update() -> str:
    """Return an HTML banner announcing when the next refresh will happen.

    The next refresh is 20 minutes from now. Note: the original format string
    was '%H:%S' (hour:second), which rendered a nonsensical clock time;
    '%H:%M' gives hour:minute.
    """
    now = datetime.datetime.now()
    delta = now + datetime.timedelta(minutes=20)
    formatted_time = delta.strftime('%H:%M')
    return f"""<div align="center" style="font-size: larger;">Next update: <b>{formatted_time}</b></div>"""
# Module-level cache for the Gradio refresh loop: load_lb refuses to hit the
# chain more than once per 20 minutes and serves these values instead.
last_load = datetime.datetime.now()  # Timestamp of the last chain load.
last_result = []  # Last duplicates table returned by load_lb.
last_time = ""  # Last "next update" HTML banner returned by load_lb.
def load_lb() -> typing.Tuple[typing.List[typing.List[typing.Any]], str]:
    """Build the duplicate-model table, throttled to once per 20 minutes.

    Returns:
        Tuple of (rows, next_update_html), where each row is
        [repo_url, commit, uid, block, blocks_equal]. (The original annotation
        claimed List[List[str]], but the function returns a 2-tuple.)
    """
    global last_load, last_result, last_time
    # Throttle: serve the cached result if the last load was under 20 minutes ago.
    if last_load + datetime.timedelta(minutes=20) > datetime.datetime.now():
        print("cannot load yet")
        return last_result, last_time
    last_load = datetime.datetime.now()

    raw_data = [
        (
            c.namespace,
            c.name,
            c.commit,
            f'{c.namespace}/{c.name}',
            c.uid,
            c.block
        ) for c in get_subnet_data()
    ]
    # A (namespace, name, commit) triple seen more than once is a duplicate.
    counter = Counter((namespace, name, commit) for namespace, name, commit, _, _, _ in raw_data)
    duplicates = {k for k, v in counter.items() if v > 1}

    # Record every block at which each triple was committed.
    block_mapping = defaultdict(set)
    for namespace, name, commit, _, _, block in raw_data:
        block_mapping[(namespace, name, commit)].add(block)

    result = []
    for namespace, name, commit, url, uid, block in raw_data:
        key = (namespace, name, commit)
        if key in duplicates:
            # True when every copy of this model was committed at the same block.
            blocks_equal = len(block_mapping[key]) == 1
            result.append([url, commit, uid, block, blocks_equal])

    result = sorted(result, key=lambda x: x[3])  # Sort result rows by block.
    last_result = result
    last_time = get_next_update()
    return last_result, last_time
# Gradio UI: single page with a title, a "next update" banner, and the
# duplicates table, refreshed every 20 minutes (load_lb also self-throttles).
with gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}") as demo:
    gr.HTML(FONT)
    gr.HTML(TITLE)
    # Placeholder filled by load_lb's second return value (the banner HTML).
    update = gr.HTML()
    leaderboard_table = gr.components.Dataframe(
        value=[],
        headers=["Name", "Commit", "UID", "Block", "blocks equal?"],
        datatype=["str", "str", "number", "number", "bool"],
        elem_id="leaderboard-table",
        interactive=False,
        visible=True,
    )
    # load_lb returns (rows, banner_html) mapped onto the two components below.
    demo.load(load_lb, None, [leaderboard_table, update], every=20*60)
demo.queue().launch()