|
import sys |
|
import json |
|
import os |
|
import time |
|
from tqdm import tqdm |
|
from typing import List, Any, Optional, Union |
|
import numpy as np |
|
import pandas as pd |
|
import gc |
|
import re |
|
from dataclasses import dataclass |
|
from pathlib import Path |
|
from enum import Enum |
|
from json.decoder import JSONDecodeError |
|
|
|
REDUCE_FACTOR = 0.25 |
|
SLEEP = 0.5 |
|
REQUEST_ID_FIELD = "request_id" |
|
SCRIPTS_DIR = Path(__file__).parent |
|
ROOT_DIR = SCRIPTS_DIR.parent |
|
DATA_DIR = ROOT_DIR / "data" |
|
BLOCK_FIELD = "block" |
|
CID_PREFIX = "f01701220" |
|
REQUEST_ID = "requestId" |
|
REQUEST_SENDER = "sender" |
|
PROMPT_FIELD = "prompt" |
|
HTTP = "http://" |
|
HTTPS = HTTP[:4] + "s" + HTTP[4:] |
|
IPFS_ADDRESS = f"{HTTPS}gateway.autonolas.tech/ipfs/" |
|
FORMAT_UPDATE_BLOCK_NUMBER = 30411638 |
|
INVALID_ANSWER_HEX = ( |
|
"0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" |
|
) |
|
|
|
INC_TOOLS = [ |
|
"prediction-online", |
|
"prediction-offline", |
|
"claude-prediction-online", |
|
"claude-prediction-offline", |
|
"prediction-offline-sme", |
|
"prediction-online-sme", |
|
"prediction-request-rag", |
|
"prediction-request-reasoning", |
|
"prediction-url-cot-claude", |
|
"prediction-request-rag-claude", |
|
"prediction-request-reasoning-claude", |
|
] |
|
|
|
SUBGRAPH_API_KEY = os.environ.get("SUBGRAPH_API_KEY", None) |
|
RPC = os.environ.get("RPC", None) |
|
|
|
|
|
class MechEventName(Enum): |
|
"""The mech's event names.""" |
|
|
|
REQUEST = "Request" |
|
DELIVER = "Deliver" |
|
|
|
|
|
@dataclass |
|
class MechEvent: |
|
"""A mech's on-chain event representation.""" |
|
|
|
for_block: int |
|
requestId: int |
|
data: bytes |
|
sender: str |
|
|
|
def _ipfs_link(self) -> Optional[str]: |
|
"""Get the ipfs link for the data.""" |
|
return f"{IPFS_ADDRESS}{CID_PREFIX}{self.data.hex()}" |
|
|
|
@property |
|
def ipfs_request_link(self) -> Optional[str]: |
|
"""Get the IPFS link for the request.""" |
|
return f"{self._ipfs_link()}/metadata.json" |
|
|
|
@property |
|
def ipfs_deliver_link(self) -> Optional[str]: |
|
"""Get the IPFS link for the deliver.""" |
|
if self.requestId is None: |
|
return None |
|
return f"{self._ipfs_link()}/{self.requestId}" |
|
|
|
def ipfs_link(self, event_name: MechEventName) -> Optional[str]: |
|
"""Get the ipfs link based on the event.""" |
|
if event_name == MechEventName.REQUEST: |
|
if self.for_block < FORMAT_UPDATE_BLOCK_NUMBER: |
|
return self._ipfs_link() |
|
return self.ipfs_request_link |
|
if event_name == MechEventName.DELIVER: |
|
return self.ipfs_deliver_link |
|
return None |
|
|
|
|
|
@dataclass(init=False) |
|
class MechRequest: |
|
"""A structure for a request to a mech.""" |
|
|
|
request_id: Optional[int] |
|
request_block: Optional[int] |
|
prompt_request: Optional[str] |
|
tool: Optional[str] |
|
nonce: Optional[str] |
|
trader_address: Optional[str] |
|
|
|
def __init__(self, **kwargs: Any) -> None: |
|
"""Initialize the request ignoring extra keys.""" |
|
self.request_id = int(kwargs.pop(REQUEST_ID, 0)) |
|
self.request_block = int(kwargs.pop(BLOCK_FIELD, 0)) |
|
self.prompt_request = kwargs.pop(PROMPT_FIELD, None) |
|
self.tool = kwargs.pop("tool", None) |
|
self.nonce = kwargs.pop("nonce", None) |
|
self.trader_address = kwargs.pop("sender", None) |
|
|
|
|
|
@dataclass(init=False) |
|
class PredictionResponse: |
|
"""A response of a prediction.""" |
|
|
|
p_yes: float |
|
p_no: float |
|
confidence: float |
|
info_utility: float |
|
vote: Optional[str] |
|
win_probability: Optional[float] |
|
|
|
def __init__(self, **kwargs: Any) -> None: |
|
"""Initialize the mech's prediction ignoring extra keys.""" |
|
try: |
|
self.p_yes = float(kwargs.pop("p_yes")) |
|
self.p_no = float(kwargs.pop("p_no")) |
|
self.confidence = float(kwargs.pop("confidence")) |
|
self.info_utility = float(kwargs.pop("info_utility")) |
|
self.win_probability = 0 |
|
|
|
|
|
probabilities = { |
|
"p_yes": self.p_yes, |
|
"p_no": self.p_no, |
|
"confidence": self.confidence, |
|
"info_utility": self.info_utility, |
|
} |
|
|
|
for name, prob in probabilities.items(): |
|
if not 0 <= prob <= 1: |
|
raise ValueError(f"{name} probability is out of bounds: {prob}") |
|
|
|
if self.p_yes + self.p_no != 1: |
|
raise ValueError( |
|
f"Sum of p_yes and p_no is not 1: {self.p_yes} + {self.p_no}" |
|
) |
|
|
|
self.vote = self.get_vote() |
|
self.win_probability = self.get_win_probability() |
|
|
|
except KeyError as e: |
|
raise KeyError(f"Missing key in PredictionResponse: {e}") |
|
except ValueError as e: |
|
raise ValueError(f"Invalid value in PredictionResponse: {e}") |
|
|
|
def get_vote(self) -> Optional[str]: |
|
"""Return the vote.""" |
|
if self.p_no == self.p_yes: |
|
return None |
|
if self.p_no > self.p_yes: |
|
return "No" |
|
return "Yes" |
|
|
|
def get_win_probability(self) -> Optional[float]: |
|
"""Return the probability estimation for winning with vote.""" |
|
return max(self.p_no, self.p_yes) |
|
|
|
|
|
@dataclass(init=False) |
|
class MechResponse: |
|
"""A structure for the response of a mech.""" |
|
|
|
request_id: int |
|
deliver_block: Optional[int] |
|
result: Optional[PredictionResponse] |
|
error: Optional[str] |
|
error_message: Optional[str] |
|
prompt_response: Optional[str] |
|
mech_address: Optional[str] |
|
|
|
def __init__(self, **kwargs: Any) -> None: |
|
"""Initialize the mech's response ignoring extra keys.""" |
|
self.error = kwargs.get("error", None) |
|
self.request_id = int(kwargs.get(REQUEST_ID, 0)) |
|
self.deliver_block = int(kwargs.get(BLOCK_FIELD, 0)) |
|
self.result = kwargs.get("result", None) |
|
self.prompt_response = kwargs.get(PROMPT_FIELD, None) |
|
self.mech_address = kwargs.get("sender", None) |
|
|
|
if self.result != "Invalid response": |
|
self.error_message = kwargs.get("error_message", None) |
|
|
|
try: |
|
if isinstance(self.result, str): |
|
kwargs = json.loads(self.result) |
|
self.result = PredictionResponse(**kwargs) |
|
self.error = 0 |
|
|
|
except JSONDecodeError: |
|
self.error_message = "Response parsing error" |
|
self.error = 1 |
|
|
|
except Exception as e: |
|
self.error_message = str(e) |
|
self.error = 1 |
|
|
|
else: |
|
self.error_message = "Invalid response from tool" |
|
self.error = 1 |
|
self.result = None |
|
|
|
|
|
EVENT_TO_MECH_STRUCT = { |
|
MechEventName.REQUEST: MechRequest, |
|
MechEventName.DELIVER: MechResponse, |
|
} |
|
|
|
|
|
def parse_args() -> str: |
|
"""Parse the arguments and return the RPC.""" |
|
if len(sys.argv) != 2: |
|
raise ValueError("Expected the RPC as a positional argument.") |
|
return sys.argv[1] |
|
|
|
|
|
def read_abi(abi_path: str) -> str: |
|
"""Read and return the wxDAI contract's ABI.""" |
|
with open(abi_path) as abi_file: |
|
return abi_file.read() |
|
|
|
|
|
def reduce_window(contract_instance, event, from_block, batch_size, latest_block): |
|
"""Dynamically reduce the batch size window.""" |
|
keep_fraction = 1 - REDUCE_FACTOR |
|
events_filter = contract_instance.events[event].build_filter() |
|
events_filter.fromBlock = from_block |
|
batch_size = int(batch_size * keep_fraction) |
|
events_filter.toBlock = min(from_block + batch_size, latest_block) |
|
tqdm.write(f"RPC timed out! Resizing batch size to {batch_size}.") |
|
time.sleep(SLEEP) |
|
return events_filter, batch_size |
|
|
|
|
|
def limit_text(text: str, limit: int = 200) -> str: |
|
"""Limit the given text""" |
|
if len(text) > limit: |
|
return f"{text[:limit]}..." |
|
return text |
|
|
|
|
|
def check_for_dicts(df: pd.DataFrame) -> List[str]: |
|
"""Check for columns that contain dictionaries.""" |
|
dict_columns = [] |
|
for column in df.columns: |
|
if df[column].apply(lambda x: isinstance(x, dict)).any(): |
|
dict_columns.append(column) |
|
return dict_columns |
|
|
|
|
|
def drop_dict_rows(df: pd.DataFrame, dict_columns: List[str]) -> pd.DataFrame: |
|
"""Drop rows that contain dictionaries.""" |
|
for column in dict_columns: |
|
df = df[~df[column].apply(lambda x: isinstance(x, dict))] |
|
return df |
|
|
|
|
|
def clean(df: pd.DataFrame) -> pd.DataFrame: |
|
"""Clean the dataframe.""" |
|
dict_columns = check_for_dicts(df) |
|
df = drop_dict_rows(df, dict_columns) |
|
cleaned = df.drop_duplicates() |
|
cleaned[REQUEST_ID_FIELD] = cleaned[REQUEST_ID_FIELD].astype("str") |
|
return cleaned |
|
|
|
|
|
def gen_event_filename(event_name: MechEventName) -> str: |
|
"""Generate the filename of an event.""" |
|
return f"{event_name.value.lower()}s.parquet" |
|
|
|
|
|
def read_n_last_lines(filename: str, n: int = 1) -> str: |
|
"""Return the `n` last lines' content of a file.""" |
|
num_newlines = 0 |
|
with open(filename, "rb") as f: |
|
try: |
|
f.seek(-2, os.SEEK_END) |
|
while num_newlines < n: |
|
f.seek(-2, os.SEEK_CUR) |
|
if f.read(1) == b"\n": |
|
num_newlines += 1 |
|
except OSError: |
|
f.seek(0) |
|
last_line = f.readline().decode() |
|
return last_line |
|
|
|
|
|
def get_earliest_block(event_name: MechEventName) -> int: |
|
"""Get the earliest block number to use when filtering for events.""" |
|
filename = gen_event_filename(event_name) |
|
if not os.path.exists(DATA_DIR / filename): |
|
return 0 |
|
|
|
df = pd.read_parquet(DATA_DIR / filename) |
|
block_field = f"{event_name.value.lower()}_{BLOCK_FIELD}" |
|
earliest_block = int(df[block_field].max()) |
|
|
|
del df |
|
gc.collect() |
|
return earliest_block |
|
|
|
|
|
def get_question(text: str) -> str: |
|
"""Get the question from a text.""" |
|
|
|
pattern = r'"([^"]*)"' |
|
|
|
|
|
questions = re.findall(pattern, text) |
|
|
|
|
|
question = questions[0] if questions else None |
|
|
|
return question |
|
|
|
|
|
def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]: |
|
"""Get the current answer for a question.""" |
|
row = fpmms[fpmms["title"] == text] |
|
if row.shape[0] == 0: |
|
return None |
|
return row["currentAnswer"].values[0] |
|
|
|
|
|
def convert_hex_to_int(x: Union[str, float]) -> Union[int, float]: |
|
"""Convert hex to int""" |
|
if isinstance(x, float): |
|
return np.nan |
|
if isinstance(x, str): |
|
if x == INVALID_ANSWER_HEX: |
|
return -1 |
|
return int(x, 16) |
|
|
|
|
|
def wei_to_unit(wei: int) -> float: |
|
"""Converts wei to currency unit.""" |
|
return wei / 10**18 |
|
|
|
|
|
def measure_execution_time(func): |
|
def wrapper(*args, **kwargs): |
|
start_time = time.time() |
|
result = func(*args, **kwargs) |
|
end_time = time.time() |
|
execution_time = end_time - start_time |
|
print(f"Execution time: {execution_time:.6f} seconds") |
|
return result |
|
|
|
return wrapper |
|
|
|
|
|
def _to_content(q: str) -> dict[str, Any]: |
|
"""Convert the given query string to payload content, i.e., add it under a `queries` key and convert it to bytes.""" |
|
finalized_query = { |
|
"query": q, |
|
"variables": None, |
|
"extensions": {"headers": None}, |
|
} |
|
return finalized_query |
|
|