"""Helpers to map on-chain service ids to their Safe, state and owner."""

import json
import os
import pickle
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, List, Optional

import pandas as pd
import requests
from tqdm import tqdm
from web3 import Web3

from utils import RPC, DATA_DIR

NUM_WORKERS = 10

# Seconds to wait for the Blockscout ABI endpoint before giving up, so that a
# stalled connection cannot block the script indefinitely.
_ABI_REQUEST_TIMEOUT_SECONDS = 30

# Address reported by getService() in the Safe slot when no Safe is set.
_NULL_ADDRESS = "0x0000000000000000000000000000000000000000"

DEPRECATED_STAKING_PROGRAMS = {
    "quickstart_alpha_everest": "0x5add592ce0a1B5DceCebB5Dcac086Cd9F9e3eA5C",
    "quickstart_alpha_alpine": "0x2Ef503950Be67a98746F484DA0bBAdA339DF3326",
    "quickstart_alpha_coastal": "0x43fB32f25dce34EB76c78C7A42C8F40F84BCD237",
}
STAKING_PROGRAMS_QS = {
    "quickstart_beta_hobbyist": "0x389B46c259631Acd6a69Bde8B6cEe218230bAE8C",
    "quickstart_beta_hobbyist_2": "0x238EB6993b90a978ec6AAD7530d6429c949C08DA",
    "quickstart_beta_expert": "0x5344B7DD311e5d3DdDd46A4f71481bD7b05AAA3e",
    "quickstart_beta_expert_2": "0xb964e44c126410df341ae04B13aB10A985fE3513",
    "quickstart_beta_expert_3": "0x80faD33Cadb5F53f9D29F02Db97D682E8b101618",
}
STAKING_PROGRAMS_PEARL = {
    "pearl_alpha": "0xEE9F19b5DF06c7E8Bfc7B28745dcf944C504198A",
    "pearl_beta": "0xeF44Fb0842DDeF59D37f85D61A1eF492bbA6135d",
    "pearl_beta_2": "0x1c2F82413666d2a3fD8bC337b0268e62dDF67434",
}

SERVICE_REGISTRY_ADDRESS = "0x9338b5153AE39BB89f50468E608eD9d764B755fD"


def _get_contract(address: str) -> Any:
    """Build a web3 contract object for ``address`` using the configured RPC.

    The ABI is downloaded on every call via ``_get_abi``.
    """
    w3 = Web3(Web3.HTTPProvider(RPC))
    abi = _get_abi(address)
    contract = w3.eth.contract(address=Web3.to_checksum_address(address), abi=abi)
    return contract


def _get_abi(address: str) -> List:
    """Download the ABI of ``address`` from the Gnosis Blockscout API.

    Returns the ABI as a list, or an empty list when the response carries no
    usable ABI. Exits the process with status 1 if the response has a
    ``result`` field that is not valid JSON.
    """
    contract_abi_url = (
        "https://gnosis.blockscout.com/api/v2/smart-contracts/{contract_address}"
    )
    # A timeout is mandatory here: requests waits forever by default.
    response = requests.get(
        contract_abi_url.format(contract_address=address),
        timeout=_ABI_REQUEST_TIMEOUT_SECONDS,
    ).json()

    if "result" in response:
        result = response["result"]
        try:
            abi = json.loads(result)
        except json.JSONDecodeError:
            print("Error: Failed to parse 'result' field as JSON")
            sys.exit(1)
    else:
        abi = response.get("abi")

    return abi if abi else []


def get_service_safe(service_id: int) -> str:
    """Gets the service Safe"""
    service_registry = _get_contract(SERVICE_REGISTRY_ADDRESS)
    # The Safe address is the second element of the getService() result.
    service_safe_address = service_registry.functions.getService(service_id).call()[1]
    return service_safe_address


def list_contract_functions(contract):
    """Return the names of all ABI entries of type ``function`` in ``contract``."""
    return [
        item.get("name") for item in contract.abi if item.get("type") == "function"
    ]


def get_service_data(service_registry: Any, service_id: int) -> Optional[dict]:
    """Read the Safe address, state and owner of one service from the registry.

    Returns:
        ``{service_id: {"safe_address", "state", "owner_address"}}`` when the
        service has a non-null Safe address, an empty dict when its Safe
        address is the null address, and ``None`` when the owner lookup fails.
    """
    data = service_registry.functions.getService(service_id).call()
    try:
        owner_data = service_registry.functions.ownerOf(service_id).call()
    except Exception:
        # Best effort: a service whose owner cannot be read is reported and
        # skipped instead of aborting the whole scan.
        tqdm.write(f"Error: no owner data infor from {service_id}")
        return None

    address = data[1]
    state = data[-1]
    if address == _NULL_ADDRESS:
        return {}

    return {
        service_id: {
            "safe_address": address,
            "state": state,
            "owner_address": owner_data,
        }
    }


def update_service_map(start: int = 1, end: int = 1000):
    """Fetch services ``start`` .. ``end - 1`` and merge them into the pickled map.

    The existing ``service_map.pkl`` in ``DATA_DIR`` is loaded if present
    (otherwise an empty map is used), updated with the data fetched
    concurrently by ``NUM_WORKERS`` threads, and written back.
    """
    service_map_path = DATA_DIR / "service_map.pkl"
    if os.path.exists(service_map_path):
        # NOTE: pickle must only ever be loaded from files this project wrote.
        with open(service_map_path, "rb") as f:
            service_map = pickle.load(f)
    else:
        service_map = {}

    # we do not know which is the last service id right now
    service_registry = _get_contract(SERVICE_REGISTRY_ADDRESS)
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = [
            executor.submit(get_service_data, service_registry, service_id)
            for service_id in range(start, end)
        ]

        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Fetching all service data from contracts",
        ):
            partial_dict = future.result()
            # Both None (owner lookup failed) and {} (no Safe) are skipped.
            if partial_dict:
                service_map.update(partial_dict)

    with open(service_map_path, "wb") as f:
        pickle.dump(service_map, f)


def check_owner_staking_contract(owner_address: str) -> str:
    """Classify a service owner address by the staking program it belongs to.

    The comparison is case-insensitive. Returns ``"quickstart"`` for current
    and deprecated quickstart staking contracts, ``"pearl"`` for pearl staking
    contracts, and ``"non_staking"`` for any other address.
    """
    owner_address = owner_address.lower()
    # Checked in order: quickstart, pearl, then the legacy (deprecated)
    # quickstart contracts.
    programs_by_label = (
        ("quickstart", STAKING_PROGRAMS_QS),
        ("pearl", STAKING_PROGRAMS_PEARL),
        ("quickstart", DEPRECATED_STAKING_PROGRAMS),
    )
    for label, programs in programs_by_label:
        if any(owner_address == address.lower() for address in programs.values()):
            return label
    return "non_staking"


def get_trader_address_staking(trader_address: str, service_map: dict) -> str:
    """Return the staking label of the service whose Safe is ``trader_address``.

    ``service_map`` maps service ids to dicts with at least ``safe_address``
    and ``owner_address``. The first service whose Safe matches
    (case-insensitively) decides the label; ``"non_agent"`` is returned when
    no service is linked to the address.
    """
    target = trader_address.lower()
    for service in service_map.values():
        if service["safe_address"].lower() == target:
            return check_owner_staking_contract(
                owner_address=service["owner_address"]
            )
    return "non_agent"


def _load_service_map() -> dict:
    """Load the pickled service map from ``DATA_DIR``."""
    # NOTE: pickle must only ever be loaded from files this project wrote.
    with open(DATA_DIR / "service_map.pkl", "rb") as f:
        return pickle.load(f)


def label_trades_by_staking(trades_df: pd.DataFrame) -> pd.DataFrame:
    """Add a ``staking`` column to ``trades_df`` based on each trader address.

    The stored service map is first extended starting from its highest known
    service id, then reloaded so that the freshly fetched services are used
    for labeling.

    Args:
        trades_df: trades with a ``trader_address`` column. Modified in place.

    Returns:
        The same ``trades_df`` object, with the ``staking`` column filled.
    """
    service_map = _load_service_map()
    # Resume from the last known service id; with an empty map fall back to 1,
    # which is update_service_map's own default start.
    last_key = max(service_map, default=1)
    # NOTE(review): update_service_map's default ``end`` is 1000, so nothing
    # new is fetched once last_key reaches that value.
    update_service_map(start=last_key)
    # update_service_map only writes to disk, so reload to avoid labeling
    # with the stale pre-update map.
    service_map = _load_service_map()

    all_traders = trades_df.trader_address.unique()
    trades_df["staking"] = ""
    for trader in tqdm(all_traders, desc="Labeling traders by staking", unit="trader"):
        staking_label = get_trader_address_staking(trader, service_map)
        if staking_label:
            trades_df.loc[trades_df["trader_address"] == trader, "staking"] = (
                staking_label
            )
    return trades_df


if __name__ == "__main__":
    # create_service_map()
    trades_df = pd.read_parquet(DATA_DIR / "all_trades_profitability.parquet")
    trades_df = label_trades_by_staking(trades_df=trades_df)
    print(
        trades_df[
            [
                "trader_address",
                "creation_timestamp",
                "market_creator",
                "staking",
                "collateral_amount",
            ]
        ]
    )
    print(trades_df.staking.value_counts())
    trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)