# sn6_dups / app.py — Hugging Face Space by pawkanarek (commit 885c476, "use globals")
import gradio as gr
import bittensor as bt
import typing
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import datetime
import time
import functools
import multiprocessing
from tqdm import tqdm
from collections import Counter, defaultdict
FONT = """<link href="https://fonts.cdnfonts.com/css/jmh-typewriter" rel="stylesheet">"""  # typewriter webfont used by the title
TITLE = """<h1 align="center" id="space-title" class="typewriter">Subnet 6 Duplicate Radar</h1>"""
REPO_ID = "pawkanarek/sn6_dups"  # Hugging Face Space id (not read elsewhere in this file)
METAGRAPH_RETRIES = 10  # connection attempts before giving up on the chain
METAGRAPH_DELAY_SECS = 30  # pause between retries, in seconds
METADATA_TTL = 10  # per-hotkey metadata fetch timeout, in seconds
NETUID = 6  # Bittensor subnet id this radar watches
def run_in_subprocess(func: functools.partial, ttl: int) -> typing.Any:
    """Runs the provided function on a subprocess with 'ttl' seconds to complete.

    Args:
        func (functools.partial): Function to be run.
        ttl (int): How long to try for in seconds.

    Returns:
        Any: The value returned by 'func'.

    Raises:
        TimeoutError: If 'func' did not complete within 'ttl' seconds.
        Exception: Any exception raised by 'func' inside the subprocess is
            re-raised here in the parent.
    """

    def wrapped_func(func: functools.partial, queue: multiprocessing.Queue):
        try:
            queue.put(func())
        except BaseException as e:
            # BaseException alone already covers Exception; ship the failure
            # back through the queue so the parent can re-raise it.
            queue.put(e)

    # Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem
    # to work on "spawn".
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    process = ctx.Process(target=wrapped_func, args=[func, queue])
    process.start()
    process.join(timeout=ttl)
    if process.is_alive():
        process.terminate()
        process.join()
        raise TimeoutError(f"Failed to {func.func.__name__} after {ttl} seconds")

    # Raises queue.Empty if nothing was produced. This is fine. It means our
    # subprocess died without putting a result on the queue.
    result = queue.get(block=False)

    # If we put an exception on the queue then raise instead of returning.
    if isinstance(result, Exception):
        raise result
    if isinstance(result, BaseException):
        raise Exception(f"BaseException raised in subprocess: {str(result)}")
    return result
def get_subtensor_and_metagraph() -> typing.Tuple[bt.subtensor, bt.metagraph]:
    """Connect to the "finney" chain and download the full (non-lite) metagraph.

    Makes up to METAGRAPH_RETRIES attempts, sleeping METAGRAPH_DELAY_SECS
    between them; the final failure is re-raised to the caller.
    """
    for attempt in range(1, METAGRAPH_RETRIES + 1):
        try:
            print("Connecting to subtensor...")
            chain: bt.subtensor = bt.subtensor("finney")
            print("Pulling metagraph...")
            graph: bt.metagraph = chain.metagraph(NETUID, lite=False)
            return chain, graph
        except Exception as err:
            print(err)
            if attempt == METAGRAPH_RETRIES:
                raise
            print(f"Error connecting to subtensor or pulling metagraph, retry {attempt} of {METAGRAPH_RETRIES} in {METAGRAPH_DELAY_SECS} seconds...")
            time.sleep(METAGRAPH_DELAY_SECS)
    raise RuntimeError()  # unreachable: the final failed attempt re-raises above
@dataclass
class ModelData:
    """One miner's on-chain model registration plus its metagraph stats."""

    uid: int
    hotkey: str
    namespace: str
    name: str
    commit: str
    hash: str
    block: int
    incentive: float
    emission: float
    competition: str

    @classmethod
    def from_compressed_str(cls, uid: int, hotkey: str, cs: str, block: int, incentive: float, emission: float):
        """Returns an instance of this class from a compressed string representation"""
        parts = cs.split(":")
        # Tokens are namespace:name:commit:hash[:competition]; the literal
        # string "None" marks a missing commit/hash/competition value.
        commit_tok = parts[2]
        hash_tok = parts[3]
        comp_tok = parts[4] if len(parts) > 4 else "None"
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=parts[0],
            name=parts[1],
            commit="" if commit_tok == "None" else commit_tok,
            hash="" if hash_tok == "None" else hash_tok,
            competition="" if comp_tok == "None" else comp_tok,
            block=block,
            incentive=incentive,
            emission=emission,
        )
def get_subnet_data() -> typing.List[ModelData]:
    """Fetch and decode the chain commitment for every hotkey in the metagraph.

    Returns:
        A list of ModelData, one per hotkey whose metadata could be fetched
        and parsed. Hotkeys that time out or have malformed commitments are
        silently skipped (best-effort scraping, not validation).
    """
    subtensor, metagraph = get_subtensor_and_metagraph()
    result = []
    for uid in tqdm(metagraph.uids.tolist(), desc="Metadata for hotkeys"):
        hotkey = metagraph.hotkeys[uid]
        try:
            # Wrap calls to the subtensor in a subprocess with a timeout to handle potential hangs.
            partial = functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            metadata = run_in_subprocess(partial, METADATA_TTL)
        except KeyboardInterrupt:
            raise
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/GeneratorExit still propagate.
            metadata = None
        if not metadata:
            continue
        commitment = metadata["info"]["fields"][0]
        # [2:] presumably strips a "0x" prefix from the hex payload — TODO confirm.
        hex_data = commitment[list(commitment.keys())[0]][2:]
        chain_str = bytes.fromhex(hex_data).decode()
        block = metadata["block"]
        incentive = metagraph.incentive[uid].nan_to_num().item()
        emission = metagraph.emission[uid].nan_to_num().item() * 20  # convert to daily TAO
        try:
            model_data = ModelData.from_compressed_str(uid, hotkey, chain_str, block, incentive, emission)
        except Exception:
            # Malformed commitment string (too few tokens, etc.) -> skip this miner.
            continue
        result.append(model_data)
    return result
def get_next_update():
    """Return an HTML banner announcing when the next refresh is due.

    The refresh cadence is 20 minutes (matching load_lb's cooldown).
    """
    now = datetime.datetime.now()
    next_refresh = now + datetime.timedelta(minutes=20)
    # Bug fix: the original used '%H:%S' (hour:SECONDS); '%H:%M' is hour:minute.
    formatted_time = next_refresh.strftime('%H:%M')
    return f"""<div align="center" style="font-size: larger;">Next update: <b>{formatted_time}</b></div>"""
# Module-level cache so the periodic Gradio poll doesn't hammer the chain.
last_load = datetime.datetime.now()  # timestamp of the last refresh attempt
last_result = []  # most recent duplicate table rows
last_time = ""  # most recent "next update" HTML snippet
def load_lb() -> typing.Tuple[typing.List[typing.List], str]:
    """Refresh the duplicate-model table, at most once every 20 minutes.

    Returns:
        A (rows, html) pair: the table rows for the Dataframe component and
        the "next update" HTML banner. When called again within the
        20-minute cooldown the cached pair is returned unchanged.
        (The original annotation said List[List[str]], but the function has
        always returned this 2-tuple.)
    """
    global last_load, last_result, last_time
    # Cooldown: Gradio polls this via `every=`, so short-circuit to the cache.
    if last_load + datetime.timedelta(minutes=20) > datetime.datetime.now():
        print("cannot load yet")
        return last_result, last_time
    last_load = datetime.datetime.now()
    raw_data = [
        (
            c.namespace,
            c.name,
            c.commit,
            f'{c.namespace}/{c.name}',
            c.uid,
            c.block
        ) for c in get_subnet_data()
    ]
    # A model is a duplicate when (namespace, name, commit) appears more than once.
    counter = Counter((namespace, name, commit) for namespace, name, commit, _, _, _ in raw_data)
    duplicates = {k for k, v in counter.items() if v > 1}
    # Collect every registration block seen for each model key.
    block_mapping = defaultdict(set)
    for namespace, name, commit, _, _, block in raw_data:
        block_mapping[(namespace, name, commit)].add(block)
    result = []
    for namespace, name, commit, url, uid, block in raw_data:
        key = (namespace, name, commit)
        if key in duplicates:
            # "blocks equal" = all duplicate registrations share one block.
            blocks_equal = len(block_mapping[key]) == 1
            result.append([url, commit, uid, block, blocks_equal])
    result.sort(key=lambda row: row[3])  # order rows by registration block
    last_result = result
    last_time = get_next_update()
    return last_result, last_time
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}") as demo:
    gr.HTML(FONT)
    gr.HTML(TITLE)
    # Placeholder for the "Next update: HH:MM" banner, filled by load_lb.
    update = gr.HTML()
    leaderboard_table = gr.components.Dataframe(
        value=[],
        headers=["Name", "Commit", "UID", "Block", "blocks equal?"],
        datatype=["str", "str", "number", "number", "bool"],
        elem_id="leaderboard-table",
        interactive=False,
        visible=True,
    )
    # Poll every 20 minutes; load_lb itself enforces the same cooldown window.
    demo.load(load_lb, None, [leaderboard_table, update], every=20*60)
demo.queue().launch()