"""Shared constants, data structures and helpers for processing mech events."""

import functools
import gc
import json
import math
import os
import re
import sys
import time
from dataclasses import dataclass
from enum import Enum
from json.decoder import JSONDecodeError
from pathlib import Path
from typing import List, Any, Optional, Union, Tuple

import numpy as np
import pandas as pd
from tqdm import tqdm

REDUCE_FACTOR = 0.25
SLEEP = 0.5
REQUEST_ID_FIELD = "request_id"
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"
JSON_DATA_DIR = ROOT_DIR / "json_data"
BLOCK_FIELD = "block"
CID_PREFIX = "f01701220"
REQUEST_ID = "requestId"
REQUEST_SENDER = "sender"
PROMPT_FIELD = "prompt"
HTTP = "http://"
HTTPS = HTTP[:4] + "s" + HTTP[4:]
IPFS_ADDRESS = f"{HTTPS}gateway.autonolas.tech/ipfs/"
FORMAT_UPDATE_BLOCK_NUMBER = 30411638
INVALID_ANSWER_HEX = (
    "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
)
INC_TOOLS = [
    "prediction-online",
    "prediction-offline",
    "claude-prediction-online",
    "claude-prediction-offline",
    "prediction-offline-sme",
    "prediction-online-sme",
    "prediction-request-rag",
    "prediction-request-reasoning",
    "prediction-url-cot-claude",
    "prediction-request-rag-claude",
    "prediction-request-reasoning-claude",
    "superforcaster",
]
SUBGRAPH_API_KEY = os.environ.get("SUBGRAPH_API_KEY", None)
RPC = os.environ.get("RPC", None)


class MechEventName(Enum):
    """The mech's event names."""

    REQUEST = "Request"
    DELIVER = "Deliver"


@dataclass
class MechEvent:
    """A mech's on-chain event representation."""

    for_block: int
    requestId: int
    data: bytes
    sender: str

    def _ipfs_link(self) -> Optional[str]:
        """Get the ipfs link for the data (gateway + CID prefix + hex of `data`)."""
        return f"{IPFS_ADDRESS}{CID_PREFIX}{self.data.hex()}"

    @property
    def ipfs_request_link(self) -> Optional[str]:
        """Get the IPFS link for the request."""
        return f"{self._ipfs_link()}/metadata.json"

    @property
    def ipfs_deliver_link(self) -> Optional[str]:
        """Get the IPFS link for the deliver, or `None` without a request id."""
        if self.requestId is None:
            return None
        return f"{self._ipfs_link()}/{self.requestId}"

    def ipfs_link(self, event_name: MechEventName) -> Optional[str]:
        """Get the ipfs link based on the event.

        Requests from blocks before `FORMAT_UPDATE_BLOCK_NUMBER` use the bare
        link; later requests use the `metadata.json` link. Any event name
        other than request/deliver yields `None`.
        """
        if event_name == MechEventName.REQUEST:
            if self.for_block < FORMAT_UPDATE_BLOCK_NUMBER:
                return self._ipfs_link()
            return self.ipfs_request_link
        if event_name == MechEventName.DELIVER:
            return self.ipfs_deliver_link
        return None


@dataclass(init=False)
class MechRequest:
    """A structure for a request to a mech."""

    request_id: Optional[int]
    request_block: Optional[int]
    prompt_request: Optional[str]
    tool: Optional[str]
    nonce: Optional[str]
    trader_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the request ignoring extra keys.

        Missing ids/blocks default to 0; the other fields default to `None`.
        """
        self.request_id = int(kwargs.pop(REQUEST_ID, 0))
        self.request_block = int(kwargs.pop(BLOCK_FIELD, 0))
        self.prompt_request = kwargs.pop(PROMPT_FIELD, None)
        self.tool = kwargs.pop("tool", None)
        self.nonce = kwargs.pop("nonce", None)
        self.trader_address = kwargs.pop("sender", None)


@dataclass(init=False)
class PredictionResponse:
    """A response of a prediction."""

    p_yes: float
    p_no: float
    confidence: float
    info_utility: float
    vote: Optional[str]
    win_probability: Optional[float]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's prediction ignoring extra keys.

        :raises KeyError: if one of `p_yes`, `p_no`, `confidence` or
            `info_utility` is missing.
        :raises ValueError: if a value cannot be converted to float, lies
            outside `[0, 1]`, or `p_yes + p_no` does not sum to 1.
        """
        try:
            self.p_yes = float(kwargs.pop("p_yes"))
            self.p_no = float(kwargs.pop("p_no"))
            self.confidence = float(kwargs.pop("confidence"))
            self.info_utility = float(kwargs.pop("info_utility"))

            # Validate probabilities
            probabilities = {
                "p_yes": self.p_yes,
                "p_no": self.p_no,
                "confidence": self.confidence,
                "info_utility": self.info_utility,
            }
            for name, prob in probabilities.items():
                if not 0 <= prob <= 1:
                    raise ValueError(f"{name} probability is out of bounds: {prob}")

            # Compare with a tolerance: an exact `!= 1` check rejects sums
            # that are off by nothing more than a floating-point rounding error.
            total = self.p_yes + self.p_no
            if not math.isclose(total, 1.0, rel_tol=0.0, abs_tol=1e-9):
                raise ValueError(
                    f"Sum of p_yes and p_no is not 1: {self.p_yes} + {self.p_no}"
                )

            self.vote = self.get_vote()
            self.win_probability = self.get_win_probability()
        except KeyError as e:
            raise KeyError(f"Missing key in PredictionResponse: {e}") from e
        except ValueError as e:
            raise ValueError(f"Invalid value in PredictionResponse: {e}") from e

    def get_vote(self) -> Optional[str]:
        """Return the vote, or `None` when both outcomes are equally likely."""
        if self.p_no == self.p_yes:
            return None
        if self.p_no > self.p_yes:
            return "No"
        return "Yes"

    def get_win_probability(self) -> Optional[float]:
        """Return the probability estimation for winning with vote."""
        return max(self.p_no, self.p_yes)


@dataclass(init=False)
class MechResponse:
    """A structure for the response of a mech."""

    request_id: int
    deliver_block: Optional[int]
    result: Optional[PredictionResponse]
    error: Optional[str]
    error_message: Optional[str]
    prompt_response: Optional[str]
    mech_address: Optional[str]

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the mech's response ignoring extra keys.

        `error` ends up as 0 when `result` was parsed into a
        `PredictionResponse` and as 1 otherwise, with `error_message`
        describing the failure. When parsing fails, `result` keeps the raw
        value that was passed in.
        """
        self.error = kwargs.get("error", None)
        self.request_id = int(kwargs.get(REQUEST_ID, 0))
        self.deliver_block = int(kwargs.get(BLOCK_FIELD, 0))
        self.result = kwargs.get("result", None)
        self.prompt_response = kwargs.get(PROMPT_FIELD, None)
        self.mech_address = kwargs.get("sender", None)

        if self.result == "Invalid response":
            self.error_message = "Invalid response from tool"
            self.error = 1
            self.result = None
            return

        self.error_message = kwargs.get("error_message", None)
        try:
            # A string result is the JSON-encoded prediction; for any other
            # value the prediction is built from the keyword arguments.
            params = json.loads(self.result) if isinstance(self.result, str) else kwargs
            self.result = PredictionResponse(**params)
            self.error = 0
        except JSONDecodeError:
            self.error_message = "Response parsing error"
            self.error = 1
        except Exception as e:  # best effort: record any failure on the instance
            self.error_message = str(e)
            self.error = 1


EVENT_TO_MECH_STRUCT = {
    MechEventName.REQUEST: MechRequest,
    MechEventName.DELIVER: MechResponse,
}


def measure_execution_time(func):
    """Decorate `func` so that every call prints its wall-clock duration."""

    # `wraps` keeps the decorated function's name and docstring intact.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Execution time: {execution_time:.6f} seconds")
        return result

    return wrapper


def parse_args() -> str:
    """Parse the arguments and return the RPC.

    :raises ValueError: unless exactly one positional argument was given.
    """
    if len(sys.argv) != 2:
        raise ValueError("Expected the RPC as a positional argument.")
    return sys.argv[1]


def read_abi(abi_path: str) -> str:
    """Read and return the content of the ABI file at `abi_path`."""
    with open(abi_path) as abi_file:
        return abi_file.read()


def reduce_window(contract_instance, event, from_block, batch_size, latest_block):
    """Dynamically reduce the batch size window.

    Shrinks `batch_size` by `REDUCE_FACTOR`, builds a new filter for `event`
    covering `from_block` up to the shrunk window (capped at `latest_block`),
    sleeps for `SLEEP` seconds and returns `(events_filter, batch_size)`.
    """
    keep_fraction = 1 - REDUCE_FACTOR
    events_filter = contract_instance.events[event].build_filter()
    events_filter.fromBlock = from_block
    batch_size = int(batch_size * keep_fraction)
    events_filter.toBlock = min(from_block + batch_size, latest_block)
    tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.")
    time.sleep(SLEEP)
    return events_filter, batch_size


def limit_text(text: str, limit: int = 200) -> str:
    """Limit the given text to `limit` characters, appending `...` if cut."""
    if len(text) > limit:
        return f"{text[:limit]}..."
    return text


def check_for_dicts(df: pd.DataFrame) -> List[str]:
    """Check for columns that contain dictionaries."""
    dict_columns = []
    for column in df.columns:
        if df[column].apply(lambda x: isinstance(x, dict)).any():
            dict_columns.append(column)
    return dict_columns


def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame:
    """Drop rows that contain dictionaries in any of `dict_columns`."""
    for column in dict_columns:
        df = df[~df[column].apply(lambda x: isinstance(x, dict))]
    return df


def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Clean the dataframe.

    Rows holding dictionaries are dropped, duplicates are removed and the
    `REQUEST_ID_FIELD` column is cast to string.
    """
    dict_columns = check_for_dicts(df)
    df = drop_dict_rows(df, dict_columns)
    cleaned = df.drop_duplicates()
    cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str")
    return cleaned


def gen_event_filename(event_name: MechEventName) -> str:
    """Generate the filename of an event."""
    return f"{event_name.value.lower()}s.parquet"


def read_n_last_lines(filename: str, n: int = 1) -> str:
    """Return the line that starts `n` lines before the end of a file.

    Only a single line is returned: the last one for `n=1`, the one before it
    for `n=2`, and so on. If the file has fewer than `n` lines, the first line
    is returned.
    """
    num_newlines = 0
    with open(filename, "rb") as f:
        try:
            # Start on the byte before the final one so that a terminating
            # newline is not counted as a line separator.
            f.seek(-2, os.SEEK_END)
            while True:
                # Inspect the current byte *before* stepping back; otherwise
                # the separator in front of a very short last line is skipped.
                if f.read(1) == b"\n":
                    num_newlines += 1
                    if num_newlines >= n:
                        break
                f.seek(-2, os.SEEK_CUR)
        except OSError:
            # Ran past the beginning of the file: fall back to the first line.
            f.seek(0)
        last_line = f.readline().decode()
    return last_line


def get_earliest_block(event_name: MechEventName) -> int:
    """Get the earliest block number to use when filtering for events.

    This is the highest block already stored in the event's parquet file, or
    0 when the file does not exist or holds no rows.
    """
    filename = gen_event_filename(event_name)
    if not os.path.exists(DATA_DIR / filename):
        return 0

    df = pd.read_parquet(DATA_DIR / filename)
    block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}"
    # `max()` of an empty column is NaN, which cannot be converted to int.
    earliest_block = 0 if df.empty else int(df[block_field].max())
    # clean and release all memory
    del df
    gc.collect()
    return earliest_block


def get_question(text: str) -> Optional[str]:
    """Get the question from a text.

    The question is the first double-quoted span; `None` if there is none.
    """
    # Regex to find text within double quotes
    pattern = r'"([^"]*)"'
    # Find all occurrences
    questions = re.findall(pattern, text)
    # Assuming you want the first question if there are multiple
    question = questions[0] if questions else None
    return question


def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
    """Get the current answer for a question.

    Looks up the first row of `fpmms` whose `title` equals `text`.
    """
    row = fpmms[fpmms["title"] == text]
    if row.shape[0] == 0:
        return None
    return row["currentAnswer"].values[0]


def convert_hex_to_int(x: Union[str, float]) -> Optional[Union[int, float]]:
    """Convert hex to int.

    Floats map to NaN, `INVALID_ANSWER_HEX` maps to -1 and values of any
    other type yield `None`.
    """
    if isinstance(x, float):
        return np.nan
    if isinstance(x, str):
        if x == INVALID_ANSWER_HEX:
            return -1
        return int(x, 16)
    return None


def wei_to_unit(wei: int) -> float:
    """Converts wei to currency unit."""
    return wei / 10**18


def get_vote(p_yes, p_no) -> Optional[str]:
    """Return the vote, or `None` when both outcomes are equally likely."""
    if p_no == p_yes:
        return None
    if p_no > p_yes:
        return "No"
    return "Yes"


def get_win_probability(p_yes, p_no) -> Optional[float]:
    """Return the probability estimation for winning with vote."""
    return max(p_no, p_yes)


def get_result_values(result: str) -> Tuple:
    """Parse a raw mech result into `(error_value, error_message, params)`.

    `error_value` is 1 for an invalid or unparsable result and 0 otherwise.
    `params` is the decoded JSON for a string result and `None` in every
    other case.
    """
    if result == "Invalid response":
        return 1, "Invalid response from tool", None
    error_message = None
    params = None
    try:
        if isinstance(result, str):
            params = json.loads(result)
        error_value = 0
    except JSONDecodeError:
        error_message = "Response parsing error"
        error_value = 1
    except Exception as e:  # best effort: report any failure to the caller
        error_message = str(e)
        error_value = 1
    return error_value, error_message, params


def get_prediction_values(params: dict) -> Tuple:
    """Pop the prediction values from `params` and return them as floats.

    Returns `(p_yes, p_no, confidence, info_utility)`. The four keys are
    removed from `params`.
    """
    p_yes = float(params.pop("p_yes"))
    p_no = float(params.pop("p_no"))
    confidence = float(params.pop("confidence"))
    info_utility = float(params.pop("info_utility"))
    return p_yes, p_no, confidence, info_utility


def _to_content(q: str) -> dict[str, Any]:
    """Wrap the given query string into a payload dictionary.

    The query is stored under the `query` key, next to empty `variables` and
    `extensions` entries.
    """
    finalized_query = {
        "query": q,
        "variables": None,
        "extensions": {"headers": None},
    }
    return finalized_query