# -*- coding: utf-8 -*- # ------------------------------------------------------------------------------ # # Copyright 2023 Valory AG # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # ------------------------------------------------------------------------------ import os.path import re import time import random from typing import ( Optional, List, Dict, Union, ) import pandas as pd import requests from eth_typing import ChecksumAddress from eth_utils import to_checksum_address from requests.adapters import HTTPAdapter from requests.exceptions import ( ReadTimeout as RequestsReadTimeoutError, HTTPError as RequestsHTTPError, ) from tqdm import tqdm from urllib3 import Retry from urllib3.exceptions import ( ReadTimeoutError as Urllib3ReadTimeoutError, HTTPError as Urllib3HTTPError, ) from web3 import Web3, HTTPProvider from web3.exceptions import MismatchedABI from web3.types import BlockParams from concurrent.futures import ThreadPoolExecutor, as_completed from get_mech_info import get_mech_info_last_60_days from utils import ( clean, BLOCK_FIELD, gen_event_filename, read_abi, SLEEP, reduce_window, limit_text, DATA_DIR, REQUEST_ID_FIELD, MechEvent, MechEventName, MechRequest, MechResponse, EVENT_TO_MECH_STRUCT, REQUEST_ID, HTTP, HTTPS, REQUEST_SENDER, ) CONTRACTS_PATH = "contracts" MECH_TO_INFO = { # this block number is when the creator had its first tx ever, and after this mech's creation "0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547), # this block number is when this mech was created "0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879), } # optionally set the latest block to stop searching for the delivered events LATEST_BLOCK: Optional[int] = None LATEST_BLOCK_NAME: BlockParams = "latest" BLOCK_DATA_NUMBER = "number" BLOCKS_CHUNK_SIZE = 10_000 EVENT_ARGUMENTS = "args" DATA = "data" IPFS_LINKS_SERIES_NAME = "ipfs_links" BACKOFF_FACTOR = 1 STATUS_FORCELIST = [404, 500, 502, 503, 504] DEFAULT_FILENAME = "tools.parquet" RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist." ABI_ERROR = "The event signature did not match the provided ABI" HTTP_TIMEOUT = 10 N_IPFS_RETRIES = 1 N_RPC_RETRIES = 100 RPC_POLL_INTERVAL = 0.05 IPFS_POLL_INTERVAL = 0.05 IRRELEVANT_TOOLS = [ "openai-text-davinci-002", "openai-text-davinci-003", "openai-gpt-3.5-turbo", "openai-gpt-4", "stabilityai-stable-diffusion-v1-5", "stabilityai-stable-diffusion-xl-beta-v2-2-2", "stabilityai-stable-diffusion-512-v2-1", "stabilityai-stable-diffusion-768-v2-1", "deepmind-optimization-strong", "deepmind-optimization", ] # this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches # for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search, # we also store the snapshot SNAPSHOT_RATE = 10 NUM_WORKERS = 10 GET_CONTENTS_BATCH_SIZE = 1000 def get_events( w3: Web3, event: str, mech_address: ChecksumAddress, mech_abi_path: str, earliest_block: int, latest_block: int, ) -> List: """Get the delivered events.""" abi = read_abi(mech_abi_path) contract_instance = w3.eth.contract(address=mech_address, abi=abi) events = [] from_block = earliest_block batch_size = BLOCKS_CHUNK_SIZE with tqdm( total=latest_block - from_block, desc=f"Searching {event} events for mech {mech_address}", unit="blocks", ) as pbar: while from_block < latest_block: events_filter = contract_instance.events[event].build_filter() events_filter.fromBlock = from_block events_filter.toBlock = min(from_block + batch_size, latest_block) entries = None retries = 0 while entries is None: try: entries = events_filter.deploy(w3).get_all_entries() retries = 0 except (RequestsHTTPError, Urllib3HTTPError) as exc: if "Request Entity Too Large" in exc.args[0]: events_filter, batch_size = reduce_window( contract_instance, event, from_block, batch_size, latest_block, ) except (Urllib3ReadTimeoutError, RequestsReadTimeoutError): events_filter, batch_size = reduce_window( contract_instance, event, from_block, batch_size, latest_block ) except Exception as exc: retries += 1 if retries == N_RPC_RETRIES: tqdm.write( f"Skipping events for blocks {events_filter.fromBlock} - {events_filter.toBlock} " f"as the retries have been exceeded." ) break sleep = SLEEP * retries if ( ( isinstance(exc, ValueError) and re.match( RE_RPC_FILTER_ERROR, exc.args[0].get("message", "") ) is None ) and not isinstance(exc, ValueError) and not isinstance(exc, MismatchedABI) ): tqdm.write( f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds." ) time.sleep(sleep) from_block += batch_size pbar.update(batch_size) if entries is None: continue chunk = list(entries) events.extend(chunk) time.sleep(RPC_POLL_INTERVAL) return events def parse_events(raw_events: List) -> List[MechEvent]: """Parse all the specified MechEvents.""" parsed_events = [] for event in raw_events: for_block = event.get("blockNumber", 0) args = event.get(EVENT_ARGUMENTS, {}) request_id = args.get(REQUEST_ID, 0) data = args.get(DATA, b"") sender = args.get(REQUEST_SENDER, "") parsed_event = MechEvent(for_block, request_id, data, sender) parsed_events.append(parsed_event) return parsed_events def create_session() -> requests.Session: """Create a session with a retry strategy.""" session = requests.Session() retry_strategy = Retry( total=N_IPFS_RETRIES + 1, backoff_factor=BACKOFF_FACTOR, status_forcelist=STATUS_FORCELIST, ) adapter = HTTPAdapter(max_retries=retry_strategy) for protocol in (HTTP, HTTPS): session.mount(protocol, adapter) return session def request( session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT ) -> Optional[requests.Response]: """Perform a request with a session.""" try: response = session.get(url, timeout=timeout) response.raise_for_status() except requests.exceptions.HTTPError as exc: tqdm.write(f"HTTP error occurred: {exc}.") except Exception as exc: tqdm.write(f"Unexpected error occurred: {exc}.") else: return response return None def parse_ipfs_response( session: requests.Session, url: str, event: MechEvent, event_name: MechEventName, response: requests.Response, ) -> Optional[Dict[str, str]]: """Parse a response from IPFS.""" try: return response.json() except requests.exceptions.JSONDecodeError: # this is a workaround because the `metadata.json` file was introduced and removed multiple times if event_name == MechEventName.REQUEST and url != event.ipfs_request_link: url = event.ipfs_request_link response = request(session, url) if response is None: tqdm.write(f"Skipping {event=}.") return None try: return response.json() except requests.exceptions.JSONDecodeError: pass tqdm.write(f"Failed to parse response into json for {url=}.") return None def parse_ipfs_tools_content( raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName ) -> Optional[Union[MechRequest, MechResponse]]: """Parse tools content from IPFS.""" struct = EVENT_TO_MECH_STRUCT.get(event_name) raw_content[REQUEST_ID] = str(event.requestId) raw_content[BLOCK_FIELD] = str(event.for_block) raw_content["sender"] = str(event.sender) try: mech_response = struct(**raw_content) except (ValueError, TypeError, KeyError): tqdm.write(f"Could not parse {limit_text(str(raw_content))}") return None if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS: return None return mech_response def get_contents( session: requests.Session, events: List[MechEvent], event_name: MechEventName ) -> pd.DataFrame: """Fetch the tools' responses.""" contents = [] for event in tqdm(events, desc=f"Tools' results", unit="results"): url = event.ipfs_link(event_name) response = request(session, url) if response is None: tqdm.write(f"Skipping {event=}.") continue raw_content = parse_ipfs_response(session, url, event, event_name, response) if raw_content is None: continue mech_response = parse_ipfs_tools_content(raw_content, event, event_name) if mech_response is None: continue contents.append(mech_response) time.sleep(IPFS_POLL_INTERVAL) return pd.DataFrame(contents) def transform_request(contents: pd.DataFrame) -> pd.DataFrame: """Transform the requests dataframe.""" return clean(contents) def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame: """Transform the delivers dataframe.""" unpacked_result = pd.json_normalize(contents.result) # # drop result column if it exists if "result" in unpacked_result.columns: unpacked_result.drop(columns=["result"], inplace=True) # drop prompt column if it exists if "prompt" in unpacked_result.columns: unpacked_result.drop(columns=["prompt"], inplace=True) # rename prompt column to prompt_deliver unpacked_result.rename(columns={"prompt": "prompt_deliver"}, inplace=True) contents = pd.concat((contents, unpacked_result), axis=1) if "result" in contents.columns: contents.drop(columns=["result"], inplace=True) if "prompt" in contents.columns: contents.drop(columns=["prompt"], inplace=True) return clean(contents) def store_progress( filename: str, event_to_contents: Dict[str, pd.DataFrame], tools: pd.DataFrame, ) -> None: """Store the given progress.""" print("storing given progress") if filename: DATA_DIR.mkdir(parents=True, exist_ok=True) # Ensure the directory exists for event_name, content in event_to_contents.items(): event_filename = gen_event_filename( event_name ) # Ensure this function returns a valid filename string try: if "result" in content.columns: content = content.drop( columns=["result"] ) # Avoid in-place modification content.to_parquet(DATA_DIR / event_filename, index=False) except Exception as e: print(f"Failed to write {event_name} data: {e}") # Drop result and error columns for tools DataFrame try: if "result" in tools.columns: tools = tools.drop(columns=["result"]) tools.to_parquet(DATA_DIR / filename, index=False) except Exception as e: print(f"Failed to write tools data: {e}") def etl( rpcs: List[str], filename: Optional[str] = None, ) -> pd.DataFrame: """Fetch from on-chain events, process, store and return the tools' results on all the questions as a Dataframe.""" w3s = [Web3(HTTPProvider(r)) for r in rpcs] session = create_session() event_to_transformer = { MechEventName.REQUEST: transform_request, MechEventName.DELIVER: transform_deliver, } mech_to_info = { to_checksum_address(address): ( os.path.join(CONTRACTS_PATH, filename), earliest_block, ) for address, (filename, earliest_block) in get_mech_info_last_60_days().items() } event_to_contents = {} latest_block = LATEST_BLOCK if latest_block is None: latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER] next_start_block = None # Loop through events in event_to_transformer for event_name, transformer in event_to_transformer.items(): # if next_start_block is None: # next_start_block_base = get_earliest_block(event_name) # Loop through mech addresses in mech_to_info events = [] for address, (abi, earliest_block) in mech_to_info.items(): next_start_block = earliest_block print( f"Searching for {event_name.value} events for mech {address} from block {next_start_block} to {latest_block}." ) # parallelize the fetching of events with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: futures = [] for i in range( next_start_block, latest_block, BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE ): futures.append( executor.submit( get_events, random.choice(w3s), event_name.value, address, abi, i, min(i + BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE, latest_block), ) ) for future in tqdm( as_completed(futures), total=len(futures), desc=f"Fetching {event_name.value} Events", ): current_mech_events = future.result() events.extend(current_mech_events) print("Parsing events") parsed = parse_events(events) contents = [] with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: futures = [] for i in range(0, len(parsed), GET_CONTENTS_BATCH_SIZE): futures.append( executor.submit( get_contents, session, parsed[i : i + GET_CONTENTS_BATCH_SIZE], event_name, ) ) for future in tqdm( as_completed(futures), total=len(futures), desc=f"Fetching {event_name.value} Contents", ): current_mech_contents = future.result() contents.append(current_mech_contents) contents = pd.concat(contents, ignore_index=True) transformed = transformer(contents) event_to_contents[event_name] = transformed.copy() # Store progress tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD) print(tools.info()) store_progress(filename, event_to_contents, tools) return tools def update_tools_accuracy( tools_acc: pd.DataFrame, tools_df: pd.DataFrame, inc_tools: List[str] ) -> pd.DataFrame: """To compute/update the latest accuracy information for the different mech tools""" # computation of the accuracy information tools_inc = tools_df[tools_df["tool"].isin(inc_tools)] # filtering errors tools_non_error = tools_inc[tools_inc["error"] != 1] tools_non_error.loc[:, "currentAnswer"] = tools_non_error["currentAnswer"].replace( {"no": "No", "yes": "Yes"} ) tools_non_error = tools_non_error[ tools_non_error["currentAnswer"].isin(["Yes", "No"]) ] tools_non_error = tools_non_error[tools_non_error["vote"].isin(["Yes", "No"])] tools_non_error["win"] = ( tools_non_error["currentAnswer"] == tools_non_error["vote"] ).astype(int) tools_non_error.columns = tools_non_error.columns.astype(str) print("Tools dataset after filtering") print(tools_non_error.head()) wins = tools_non_error.groupby(["tool", "win"]).size().unstack().fillna(0) wins["tool_accuracy"] = (wins[1] / (wins[0] + wins[1])) * 100 wins.reset_index(inplace=True) wins["total_requests"] = wins[0] + wins[1] wins.columns = wins.columns.astype(str) wins = wins[["tool", "tool_accuracy", "total_requests"]] print("Wins dataset") print(wins.head()) timeline = tools_non_error.groupby(["tool"])["request_time"].agg(["min", "max"]) print("timeline dataset") print(timeline.head()) acc_info = wins.merge(timeline, how="left", on="tool") if tools_acc is None: print("Creating accuracy file for the first time") return acc_info # update the old information print("Updating accuracy information") tools_to_update = list(acc_info["tool"].values) existing_tools = list(tools_acc["tool"].values) for tool in tools_to_update: if tool in existing_tools: new_accuracy = acc_info[acc_info["tool"] == tool]["tool_accuracy"].values[0] new_volume = acc_info[acc_info["tool"] == tool]["total_requests"].values[0] new_min_timeline = acc_info[acc_info["tool"] == tool]["min"].values[0] new_max_timeline = acc_info[acc_info["tool"] == tool]["max"].values[0] tools_acc.loc[tools_acc["tool"] == tool, "tool_accuracy"] = new_accuracy tools_acc.loc[tools_acc["tool"] == tool, "total_requests"] = new_volume tools_acc.loc[tools_acc["tool"] == tool, "min"] = new_min_timeline tools_acc.loc[tools_acc["tool"] == tool, "max"] = new_max_timeline print(tools_acc) return tools_acc if __name__ == "__main__": RPCs = [ "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a", ] filename = DEFAULT_FILENAME tools = etl(rpcs=RPCs, filename=filename)