# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------

"""Fetch mech request/deliver events and build the tools dataset from them."""

import os.path
import json
import time
import random
from typing import (
    Optional,
    List,
    Dict,
    Union,
    Any,
)
import pandas as pd
import requests
from eth_typing import ChecksumAddress
from eth_utils import to_checksum_address
from requests.adapters import HTTPAdapter
from requests.exceptions import (
    ReadTimeout as RequestsReadTimeoutError,
    HTTPError as RequestsHTTPError,
)
from tqdm import tqdm
from urllib3 import Retry
from urllib3.exceptions import (
    ReadTimeoutError as Urllib3ReadTimeoutError,
    HTTPError as Urllib3HTTPError,
)
from web3 import Web3, HTTPProvider
from web3.exceptions import MismatchedABI
from markets import add_market_creator
from web3.types import BlockParams
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils import (
    clean,
    BLOCK_FIELD,
    gen_event_filename,
    read_abi,
    SLEEP,
    reduce_window,
    limit_text,
    DATA_DIR,
    JSON_DATA_DIR,
    REQUEST_ID_FIELD,
    MechEvent,
    MechEventName,
    MechRequest,
    MechResponse,
    EVENT_TO_MECH_STRUCT,
    REQUEST_ID,
    HTTP,
    HTTPS,
    REQUEST_SENDER,
    get_result_values,
    get_vote,
    get_win_probability,
    get_prediction_values,
)

CONTRACTS_PATH = "contracts"
MECH_TO_INFO = {
    # this block number is when the creator had its first tx ever, and after this mech's creation
    "0xff82123dfb52ab75c417195c5fdb87630145ae81": ("old_mech_abi.json", 28911547),
    # this block number is when this mech was created
    "0x77af31de935740567cf4ff1986d04b2c964a786a": ("new_mech_abi.json", 30776879),
}
# optionally set the latest block to stop searching for the delivered events
LATEST_BLOCK: Optional[int] = None
LATEST_BLOCK_NAME: BlockParams = "latest"
BLOCK_DATA_NUMBER = "number"
BLOCKS_CHUNK_SIZE = 10_000
EVENT_ARGUMENTS = "args"
DATA = "data"
IPFS_LINKS_SERIES_NAME = "ipfs_links"
BACKOFF_FACTOR = 1
STATUS_FORCELIST = [404, 500, 502, 503, 504]
DEFAULT_FILENAME = "tools.parquet"
RE_RPC_FILTER_ERROR = r"Filter with id: '\d+' does not exist."
ABI_ERROR = "The event signature did not match the provided ABI"
HTTP_TIMEOUT = 10
N_IPFS_RETRIES = 1
N_RPC_RETRIES = 100
RPC_POLL_INTERVAL = 0.05
IPFS_POLL_INTERVAL = 0.05
IRRELEVANT_TOOLS = [
    "openai-text-davinci-002",
    "openai-text-davinci-003",
    "openai-gpt-3.5-turbo",
    "openai-gpt-4",
    "stabilityai-stable-diffusion-v1-5",
    "stabilityai-stable-diffusion-xl-beta-v2-2-2",
    "stabilityai-stable-diffusion-512-v2-1",
    "stabilityai-stable-diffusion-768-v2-1",
    "deepmind-optimization-strong",
    "deepmind-optimization",
]
# this is how frequently we will keep a snapshot of the progress so far in terms of blocks' batches
# for example, the value 1 means that for every `BLOCKS_CHUNK_SIZE` blocks that we search,
# we also store the snapshot
SNAPSHOT_RATE = 10
NUM_WORKERS = 10
GET_CONTENTS_BATCH_SIZE = 1000


def _needs_smaller_window(exc: Exception) -> bool:
    """Tell whether `exc` means the queried block window should be reduced."""
    # Read timeouts are checked first: urllib3's `ReadTimeoutError` is itself a
    # subclass of urllib3's `HTTPError`, so testing `HTTPError` first would hide it.
    if isinstance(exc, (Urllib3ReadTimeoutError, RequestsReadTimeoutError)):
        return True
    if isinstance(exc, (RequestsHTTPError, Urllib3HTTPError)):
        # `str(exc)` instead of `exc.args[0]`: `args` may be empty or hold a non-string
        return "Request Entity Too Large" in str(exc)
    return False


def get_events(
    w3: Web3,
    event: str,
    mech_address: ChecksumAddress,
    mech_abi_path: str,
    earliest_block: int,
    latest_block: int,
) -> List:
    """Get the delivered events.

    Scans `[earliest_block, latest_block]` in windows of `BLOCKS_CHUNK_SIZE`
    blocks. A window is shrunk through `reduce_window` on read timeouts and on
    "Request Entity Too Large" HTTP errors; any other error is retried with a
    linearly growing sleep, and the window is skipped after `N_RPC_RETRIES`
    failed attempts.

    :param w3: the web3 instance used to deploy the filters.
    :param event: the name of the contract event to search for.
    :param mech_address: the address of the mech contract.
    :param mech_abi_path: the path of the mech's ABI file.
    :param earliest_block: the block to start searching from.
    :param latest_block: the block to stop searching at.
    :return: the raw event entries that were found.
    """
    abi = read_abi(mech_abi_path)
    contract_instance = w3.eth.contract(address=mech_address, abi=abi)

    events = []
    from_block = earliest_block
    batch_size = BLOCKS_CHUNK_SIZE
    with tqdm(
        total=latest_block - from_block,
        desc=f"Searching {event} events for mech {mech_address}",
        unit="blocks",
    ) as pbar:
        while from_block < latest_block:
            # NOTE(review): the next window starts at this window's `toBlock`;
            # if the RPC treats both bounds as inclusive, events of the boundary
            # block may be returned twice - confirm and deduplicate if needed.
            events_filter = contract_instance.events[event].build_filter()
            events_filter.fromBlock = from_block
            events_filter.toBlock = min(from_block + batch_size, latest_block)

            entries = None
            retries = 0
            while entries is None:
                try:
                    entries = events_filter.deploy(w3).get_all_entries()
                except Exception as exc:
                    if _needs_smaller_window(exc):
                        events_filter, batch_size = reduce_window(
                            contract_instance,
                            event,
                            from_block,
                            batch_size,
                            latest_block,
                        )
                        continue

                    # every other failure is bounded, so that a persistent
                    # RPC error cannot keep this loop spinning forever
                    retries += 1
                    if retries == N_RPC_RETRIES:
                        tqdm.write(
                            f"Skipping events for blocks {events_filter.fromBlock} - {events_filter.toBlock} "
                            f"as the retries have been exceeded."
                        )
                        break

                    sleep = SLEEP * retries
                    tqdm.write(
                        f"An error was raised from the RPC: {exc}\n Retrying in {sleep} seconds."
                    )
                    if hasattr(exc, "message"):
                        tqdm.write(f"Error message: {exc.message}\n")
                    time.sleep(sleep)

            from_block += batch_size
            pbar.update(batch_size)

            if entries is None:
                # the window was skipped after exhausting the retries
                continue

            events.extend(entries)
            time.sleep(RPC_POLL_INTERVAL)

    return events


def _parse_event(event: Any) -> MechEvent:
    """Build a `MechEvent` out of a single raw event mapping."""
    args = event.get(EVENT_ARGUMENTS, {})
    return MechEvent(
        event.get("blockNumber", 0),
        args.get(REQUEST_ID, 0),
        args.get(DATA, b""),
        args.get(REQUEST_SENDER, ""),
    )


def parse_events(raw_events: List) -> List[MechEvent]:
    # TODO use dictionary instead of List
    """Parse all the specified MechEvents.

    :param raw_events: the raw events; each one must support `.get()`.
    :return: one `MechEvent` per raw event, in the given order.
    """
    return [_parse_event(event) for event in raw_events]


def parse_dict_events(events_dict: dict) -> List[MechEvent]:
    # TODO use dictionary instead of List
    """Parse all the specified MechEvents.

    :param events_dict: a mapping whose values are raw events.
    :return: one `MechEvent` per value, in the mapping's iteration order.
    """
    return [_parse_event(event) for event in events_dict.values()]


def create_session() -> requests.Session:
    """Create a session with a retry strategy.

    :return: a session whose `HTTP` and `HTTPS` prefixes use a retrying adapter.
    """
    session = requests.Session()
    retry_strategy = Retry(
        total=N_IPFS_RETRIES + 1,
        backoff_factor=BACKOFF_FACTOR,
        status_forcelist=STATUS_FORCELIST,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    for protocol in (HTTP, HTTPS):
        session.mount(protocol, adapter)

    return session


def request(
    session: requests.Session, url: str, timeout: int = HTTP_TIMEOUT
) -> Optional[requests.Response]:
    """Perform a request with a session.

    :param session: the session to perform the GET request with.
    :param url: the url to fetch.
    :param timeout: the timeout of the request, in seconds.
    :return: the response, or `None` if the request failed for any reason.
    """
    try:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.HTTPError as exc:
        tqdm.write(f"HTTP error occurred: {exc}.")
    except Exception as exc:
        tqdm.write(f"Unexpected error occurred: {exc}.")
    else:
        return response
    return None


def parse_ipfs_response(
    session: requests.Session,
    url: str,
    event: MechEvent,
    event_name: MechEventName,
    response: requests.Response,
) -> Optional[Dict[str, str]]:
    """Parse a response from IPFS.

    :param session: the session used if the content has to be re-fetched.
    :param url: the url the given response was fetched from.
    :param event: the event the response belongs to.
    :param event_name: the kind of the event.
    :param response: the response to parse.
    :return: the decoded json content, or `None` if it could not be decoded.
    """
    try:
        return response.json()
    except requests.exceptions.JSONDecodeError:
        # this is a workaround because the `metadata.json` file was introduced and removed multiple times
        # the comparison uses the `MechEventName` enum; `MechEvent` is the event record type
        if event_name == MechEventName.REQUEST and url != event.ipfs_request_link:
            url = event.ipfs_request_link
            response = request(session, url)
            if response is None:
                tqdm.write(f"Skipping {event=}.")
                return None

            try:
                return response.json()
            except requests.exceptions.JSONDecodeError:
                # fall through to the common failure message below
                pass

    tqdm.write(f"Failed to parse response into json for {url=}.")
    return None


def parse_ipfs_tools_content(
    raw_content: Dict[str, str], event: MechEvent, event_name: MechEventName
) -> Optional[Union[MechRequest, MechResponse]]:
    """Parse tools content from IPFS.

    The event's request id, block and sender are written into `raw_content`
    (which is modified in place) before the struct is built.

    :param raw_content: the decoded IPFS content.
    :param event: the event that the content belongs to.
    :param event_name: the kind of the event, used to pick the struct to build.
    :return: the built struct, or `None` if the content cannot be parsed
        or if it is a request for one of the `IRRELEVANT_TOOLS`.
    """
    struct = EVENT_TO_MECH_STRUCT.get(event_name)
    raw_content[REQUEST_ID] = str(event.requestId)
    raw_content[BLOCK_FIELD] = str(event.for_block)
    raw_content["sender"] = str(event.sender)

    try:
        mech_response = struct(**raw_content)
    except (ValueError, TypeError, KeyError):
        tqdm.write(f"Could not parse {limit_text(str(raw_content))}")
        return None

    if event_name == MechEventName.REQUEST and mech_response.tool in IRRELEVANT_TOOLS:
        return None

    return mech_response


def get_contents(
    session: requests.Session, events: List[MechEvent], event_name: MechEventName
) -> pd.DataFrame:
    """Fetch the tools' responses.

    :param session: the session to fetch the IPFS contents with.
    :param events: the events whose contents should be fetched.
    :param event_name: the kind of the given events.
    :return: a dataframe with one row per successfully parsed content.
    """
    contents = []
    for event in tqdm(events, desc="Tools' results", unit="results"):
        url = event.ipfs_link(event_name)
        response = request(session, url)
        if response is None:
            tqdm.write(f"Skipping {event=}.")
            continue

        raw_content = parse_ipfs_response(session, url, event, event_name, response)
        if raw_content is None:
            continue

        mech_response = parse_ipfs_tools_content(raw_content, event, event_name)
        if mech_response is None:
            continue
        contents.append(mech_response)
        time.sleep(IPFS_POLL_INTERVAL)

    return pd.DataFrame(contents)


def parse_json_events(json_events: dict, keys_to_traverse: List[int]) -> pd.DataFrame:
    """Function to parse the mech info in a json format

    :param json_events: the mech events, indexed by key.
    :param keys_to_traverse: the keys of `json_events` to parse.
    :return: a dataframe with one row per key. A key that fails to parse keeps
        the fields read before the failure and is flagged with `error = 1`.
    """
    all_records = []
    for key in keys_to_traverse:
        # created before the `try`, so that the error branch always works on
        # this key's own record, even if the very first lookup fails
        output: Dict[str, Any] = {}
        try:
            json_input = json_events[key]
            deliver = json_input["deliver"]
            output["request_id"] = json_input["requestId"]
            output["request_block"] = json_input["blockNumber"]
            output["prompt_request"] = json_input["ipfsContents"]["prompt"]
            output["tool"] = json_input["ipfsContents"]["tool"]
            output["nonce"] = json_input["ipfsContents"]["nonce"]
            output["trader_address"] = json_input["sender"]
            output["deliver_block"] = deliver["blockNumber"]
            error_value, error_message, prediction_params = get_result_values(
                deliver["ipfsContents"]["result"]
            )
            error_message_value = json_input.get("error_message", error_message)
            output["error"] = error_value
            output["error_message"] = error_message_value
            output["prompt_response"] = deliver["ipfsContents"]["prompt"]
            output["mech_address"] = deliver["sender"]
            p_yes_value, p_no_value, confidence_value, info_utility_value = (
                get_prediction_values(prediction_params)
            )
            output["p_yes"] = p_yes_value
            output["p_no"] = p_no_value
            output["confidence"] = confidence_value
            output["info_utility"] = info_utility_value
            output["vote"] = get_vote(p_yes_value, p_no_value)
            output["win_probability"] = get_win_probability(p_yes_value, p_no_value)
            all_records.append(output)
        except Exception as e:
            print(e)
            print(f"Error parsing the key ={key}. Noted as error")
            output["error"] = 1
            output["error_message"] = "Response parsing error"
            output["p_yes"] = None
            output["p_no"] = None
            output["confidence"] = None
            output["info_utility"] = None
            output["vote"] = None
            output["win_probability"] = None
            all_records.append(output)

    return pd.DataFrame.from_dict(all_records, orient="columns")


def transform_request(contents: pd.DataFrame) -> pd.DataFrame:
    """Transform the requests dataframe."""
    return clean(contents)


def transform_deliver(contents: pd.DataFrame) -> pd.DataFrame:
    """Transform the delivers dataframe.

    The `result` column is unpacked into columns of its own; the `result` and
    `prompt` columns are then removed, both from the unpacked data and from
    the original contents.

    :param contents: the delivers dataframe; it must have a `result` column.
    :return: the cleaned dataframe.
    """
    unpacked_result = pd.json_normalize(contents.result)
    # `errors="ignore"` makes the drop a no-op for the columns that do not exist
    unpacked_result = unpacked_result.drop(
        columns=["result", "prompt"], errors="ignore"
    )

    contents = pd.concat((contents, unpacked_result), axis=1)
    contents = contents.drop(columns=["result", "prompt"], errors="ignore")

    return clean(contents)


def store_progress(
    filename: Optional[str],
    event_to_contents: Dict[str, pd.DataFrame],
    tools: pd.DataFrame,
) -> None:
    """Store the given progress.

    Nothing is written if `filename` is empty or `None`. Write failures are
    printed and do not interrupt the rest of the writes.

    :param filename: the name of the tools file, inside `DATA_DIR`.
    :param event_to_contents: the contents per event, each stored in a file of its own.
    :param tools: the tools dataframe.
    """
    print("storing given progress")
    if not filename:
        return

    DATA_DIR.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists
    for event_name, content in event_to_contents.items():
        event_filename = gen_event_filename(event_name)
        try:
            if "result" in content.columns:
                # `drop` returns a copy; the caller's dataframe is left untouched
                content = content.drop(columns=["result"])
            content.to_parquet(DATA_DIR / event_filename, index=False)
        except Exception as e:
            print(f"Failed to write {event_name} data: {e}")

    # Drop result columns for tools DataFrame
    try:
        if "result" in tools.columns:
            tools = tools.drop(columns=["result"])
        tools.to_parquet(DATA_DIR / filename, index=False)
    except Exception as e:
        print(f"Failed to write tools data: {e}")


def etl(
    rpcs: List[str],
    mech_info: dict[str, Any],
    filename: Optional[str] = None,
) -> pd.DataFrame:
    """Fetch from on-chain events, process, store and return the tools' results on all the questions as a Dataframe.

    :param rpcs: the RPC endpoints; one is picked at random for every batch of blocks.
    :param mech_info: a mapping of the mech addresses to `(abi filename, earliest block)`,
        such as `MECH_TO_INFO`.
    :param filename: the name of the file to store the tools in. Nothing is stored if omitted.
    :return: the requests merged with the delivers on `REQUEST_ID_FIELD`.
    """
    w3s = [Web3(HTTPProvider(r)) for r in rpcs]
    session = create_session()
    event_to_transformer = {
        MechEventName.REQUEST: transform_request,
        MechEventName.DELIVER: transform_deliver,
    }
    mech_to_info = {
        to_checksum_address(address): (
            os.path.join(CONTRACTS_PATH, abi_filename),
            earliest_block,
        )
        for address, (abi_filename, earliest_block) in mech_info.items()
    }
    event_to_contents = {}

    latest_block = LATEST_BLOCK
    if latest_block is None:
        latest_block = w3s[0].eth.get_block(LATEST_BLOCK_NAME)[BLOCK_DATA_NUMBER]

    blocks_per_task = BLOCKS_CHUNK_SIZE * SNAPSHOT_RATE

    # Loop through events in event_to_transformer
    for event_name, transformer in event_to_transformer.items():
        # Loop through mech addresses in mech_to_info
        events = []
        for address, (abi, earliest_block) in mech_to_info.items():
            print(
                f"Searching for {event_name.value} events for mech {address} from block {earliest_block} to {latest_block}."
            )

            # parallelize the fetching of events
            with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
                futures = [
                    executor.submit(
                        get_events,
                        random.choice(w3s),
                        event_name.value,
                        address,
                        abi,
                        start,
                        min(start + blocks_per_task, latest_block),
                    )
                    for start in range(earliest_block, latest_block, blocks_per_task)
                ]

                for future in tqdm(
                    as_completed(futures),
                    total=len(futures),
                    desc=f"Fetching {event_name.value} Events",
                ):
                    events.extend(future.result())

        print("Parsing events")
        parsed = parse_events(events)

        contents = []
        with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
            futures = [
                executor.submit(
                    get_contents,
                    session,
                    parsed[i : i + GET_CONTENTS_BATCH_SIZE],
                    event_name,
                )
                for i in range(0, len(parsed), GET_CONTENTS_BATCH_SIZE)
            ]

            for future in tqdm(
                as_completed(futures),
                total=len(futures),
                desc=f"Fetching {event_name.value} Contents",
            ):
                contents.append(future.result())

        contents = pd.concat(contents, ignore_index=True)

        transformed = transformer(contents)

        event_to_contents[event_name] = transformed.copy()

    # Store progress
    tools = pd.merge(*event_to_contents.values(), on=REQUEST_ID_FIELD)
    print(tools.info())

    store_progress(filename, event_to_contents, tools)

    return tools


def parse_store_json_events_parallel(
    json_events: Dict[str, Any], filename: str = DEFAULT_FILENAME
):
    """Parse the json mech events in parallel and store them as a parquet file.

    :param json_events: the mech events, indexed by key.
    :param filename: the name of the parquet file to write, inside `DATA_DIR`.
    :return: the tools dataframe, after `add_market_creator` has been applied to it.
    """
    total_nr_events = len(json_events)
    ids_to_traverse = list(json_events.keys())
    print(f"Parsing {total_nr_events} events")
    contents = []
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [
            executor.submit(
                parse_json_events,
                json_events,
                ids_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE],
            )
            for i in range(0, total_nr_events, GET_CONTENTS_BATCH_SIZE)
        ]

        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Fetching json contents",
        ):
            contents.append(future.result())

    tools = pd.concat(contents, ignore_index=True)
    # report the length, as the message says, instead of dumping the dataframe
    print(f"Adding market creators info. Length of the tools file = {len(tools)}")
    tools = add_market_creator(tools)
    print(
        f"Length of the tools dataframe after adding market creators info= {len(tools)}"
    )
    print(tools.info())
    try:
        if "result" in tools.columns:
            tools = tools.drop(columns=["result"])
        tools.to_parquet(DATA_DIR / filename, index=False)
    except Exception as e:
        print(f"Failed to write tools data: {e}")

    return tools


def generate_tools_file():
    """Function to parse the json mech events and generate the parquet tools file"""
    try:
        with open(JSON_DATA_DIR / "tools_info.json", "r") as file:
            file_contents = json.load(file)
        parse_store_json_events_parallel(file_contents)
    except Exception as e:
        print(f"An Exception happened while parsing the json events {e}")


if __name__ == "__main__":
    RPCs = [
        "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a",
    ]
    filename = DEFAULT_FILENAME

    # `mech_info` is a required argument of `etl`
    tools = etl(rpcs=RPCs, mech_info=MECH_TO_INFO, filename=filename)