import gradio as gr import bittensor as bt import typing from bittensor.extrinsics.serving import get_metadata from dataclasses import dataclass import datetime import time import functools import multiprocessing from tqdm import tqdm from collections import Counter, defaultdict FONT = """""" TITLE = """

Subnet 6 Duplicate Radar

""" REPO_ID = "pawkanarek/sn6_dups" METAGRAPH_RETRIES = 10 METAGRAPH_DELAY_SECS = 30 METADATA_TTL = 10 NETUID = 6 def run_in_subprocess(func: functools.partial, ttl: int) -> typing.Any: """Runs the provided function on a subprocess with 'ttl' seconds to complete. Args: func (functools.partial): Function to be run. ttl (int): How long to try for in seconds. Returns: Any: The value returned by 'func' """ def wrapped_func(func: functools.partial, queue: multiprocessing.Queue): try: result = func() queue.put(result) except (Exception, BaseException) as e: # Catch exceptions here to add them to the queue. queue.put(e) # Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem # to work on "spawn". ctx = multiprocessing.get_context("fork") queue = ctx.Queue() process = ctx.Process(target=wrapped_func, args=[func, queue]) process.start() process.join(timeout=ttl) if process.is_alive(): process.terminate() process.join() raise TimeoutError(f"Failed to {func.func.__name__} after {ttl} seconds") # Raises an error if the queue is empty. This is fine. It means our subprocess timed out. result = queue.get(block=False) # If we put an exception on the queue then raise instead of returning. if isinstance(result, Exception): raise result if isinstance(result, BaseException): raise Exception(f"BaseException raised in subprocess: {str(result)}") return result def get_subtensor_and_metagraph() -> typing.Tuple[bt.subtensor, bt.metagraph]: for i in range(0, METAGRAPH_RETRIES): try: print("Connecting to subtensor...") subtensor: bt.subtensor = bt.subtensor("finney") print("Pulling metagraph...") metagraph: bt.metagraph = subtensor.metagraph(NETUID, lite=False) return subtensor, metagraph except Exception as e: print(e) if i == METAGRAPH_RETRIES - 1: raise print(f"Error connecting to subtensor or pulling metagraph, retry {i + 1} of {METAGRAPH_RETRIES} in {METAGRAPH_DELAY_SECS} seconds...") time.sleep(METAGRAPH_DELAY_SECS) raise RuntimeError() @dataclass class ModelData: uid: int hotkey: str namespace: str name: str commit: str hash: str block: int incentive: float emission: float competition: str @classmethod def from_compressed_str(cls, uid: int, hotkey: str, cs: str, block: int, incentive: float, emission: float): """Returns an instance of this class from a compressed string representation""" tokens = cs.split(":") return ModelData( uid=uid, hotkey=hotkey, namespace=tokens[0], name=tokens[1], commit=tokens[2] if tokens[2] != "None" else "", hash=tokens[3] if tokens[3] != "None" else "", competition=tokens[4] if len(tokens) > 4 and tokens[4] != "None" else "", block=block, incentive=incentive, emission=emission ) def get_subnet_data() -> typing.List[ModelData]: subtensor, metagraph = get_subtensor_and_metagraph() result = [] for uid in tqdm(metagraph.uids.tolist(), desc="Metadata for hotkeys"): hotkey = metagraph.hotkeys[uid] try: # Wrap calls to the subtensor in a subprocess with a timeout to handle potential hangs. partial = functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey) metadata = run_in_subprocess(partial, METADATA_TTL) except KeyboardInterrupt: raise except: metadata = None if not metadata: continue commitment = metadata["info"]["fields"][0] hex_data = commitment[list(commitment.keys())[0]][2:] chain_str = bytes.fromhex(hex_data).decode() block = metadata["block"] incentive = metagraph.incentive[uid].nan_to_num().item() emission = metagraph.emission[uid].nan_to_num().item() * 20 # convert to daily TAO model_data = None try: model_data = ModelData.from_compressed_str(uid, hotkey, chain_str, block, incentive, emission) except: continue result.append(model_data) return result def get_next_update(): now = datetime.datetime.now() delta = now + datetime.timedelta(minutes=20) formatted_time = delta.strftime('%H:%S') return f"""
Next update: {formatted_time}
""" last_load = datetime.datetime.now() last_result = [] last_time = "" def load_lb() -> typing.List[typing.List[str]]: global last_load, last_result, last_time if last_load + datetime.timedelta(minutes=20) > datetime.datetime.now(): print("cannot load yet") return last_result, last_time last_load = datetime.datetime.now() raw_data = [ ( c.namespace, c.name, c.commit, f'{c.namespace}/{c.name}', c.uid, c.block ) for c in get_subnet_data() ] counter = Counter((namespace, name, commit) for namespace, name, commit, _, _, _ in raw_data) duplicates = {k for k, v in counter.items() if v > 1} block_mapping = defaultdict(set) for namespace, name, commit, _, _, block in raw_data: block_mapping[(namespace, name, commit)].add(block) result = [] for namespace, name, commit, url, uid, block in raw_data: key = (namespace, name, commit) if key in duplicates: blocks_equal = len(block_mapping[key]) == 1 result.append([url, commit, uid, block, blocks_equal]) result = sorted(result, key=lambda x: x[3]) # Sorting mainresult list according to the block last_result = result last_time = get_next_update() return last_result, last_time with gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}") as demo: gr.HTML(FONT) gr.HTML(TITLE) update = gr.HTML() leaderboard_table = gr.components.Dataframe( value=[], headers=["Name", "Commit", "UID", "Block", "blocks equal?"], datatype=["str", "str", "number", "number", "bool"], elem_id="leaderboard-table", interactive=False, visible=True, ) demo.load(load_lb, None, [leaderboard_table, update], every=20*60) demo.queue().launch()