# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------

"""Profitability analysis of Omen (xDAI) trades made by quickstart and pearl traders."""

import datetime
import os
import time
from collections import defaultdict
from enum import Enum
from pathlib import Path
from string import Template
from typing import Any, Optional

import numpy as np
import pandas as pd
import requests
from tqdm import tqdm

from get_mech_info import DATETIME_60_DAYS_AGO
from queries import omen_xdai_trades_query, conditional_tokens_gc_user_query
from utils import SUBGRAPH_API_KEY, wei_to_unit, convert_hex_to_int, _to_content

QUERY_BATCH_SIZE = 1000
DUST_THRESHOLD = 10000000000000
INVALID_ANSWER = -1
FPMM_QS_CREATOR = "0x89c5cc945dd550bcffb72fe42bff002429f46fec"
FPMM_PEARL_CREATOR = "0xFfc8029154ECD55ABED15BD428bA596E7D23f557"
DEFAULT_FROM_DATE = "1970-01-01T00:00:00"
DEFAULT_TO_DATE = "2038-01-19T03:14:07"
DEFAULT_FROM_TIMESTAMP = 0
DEFAULT_60_DAYS_AGO_TIMESTAMP = (DATETIME_60_DAYS_AGO).timestamp()
DEFAULT_TO_TIMESTAMP = 2147483647  # around year 2038
WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"
DEFAULT_MECH_FEE = 0.01
# Upper bound (seconds) for a single subgraph HTTP request, so that a stalled
# connection cannot hang the whole analysis forever.
SUBGRAPH_REQUEST_TIMEOUT = 300
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"

# The API key is substituted into these templates at query time.
_OMEN_SUBGRAPH_URL = Template(
    """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"""
)
_CONDITIONAL_TOKENS_SUBGRAPH_URL = Template(
    """https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}/subgraphs/id/7s9rGBffUTL8kDZuxvvpuc46v44iuDarbrADBFw5uVp2"""
)


class MarketState(Enum):
    """Market state"""

    OPEN = 1
    PENDING = 2
    FINALIZING = 3
    ARBITRATING = 4
    CLOSED = 5

    def __str__(self) -> str:
        """Prints the market status."""
        return self.name.capitalize()


class MarketAttribute(Enum):
    """Attribute"""

    NUM_TRADES = "Num_trades"
    WINNER_TRADES = "Winner_trades"
    NUM_REDEEMED = "Num_redeemed"
    INVESTMENT = "Investment"
    FEES = "Fees"
    MECH_CALLS = "Mech_calls"
    MECH_FEES = "Mech_fees"
    EARNINGS = "Earnings"
    NET_EARNINGS = "Net_earnings"
    REDEMPTIONS = "Redemptions"
    ROI = "ROI"

    def __str__(self) -> str:
        """Prints the attribute."""
        return self.value

    def __repr__(self) -> str:
        """Prints the attribute representation."""
        return self.name

    @staticmethod
    def argparse(s: str) -> "MarketAttribute":
        """Performs string conversion to MarketAttribute."""
        try:
            return MarketAttribute[s.upper()]
        except KeyError as e:
            raise ValueError(f"Invalid MarketAttribute: {s}") from e


ALL_TRADES_STATS_DF_COLS = [
    "trader_address",
    "market_creator",
    "trade_id",
    "creation_timestamp",
    "title",
    "market_status",
    "collateral_amount",
    "outcome_index",
    "trade_fee_amount",
    "outcomes_tokens_traded",
    "current_answer",
    "is_invalid",
    "winning_trade",
    "earnings",
    "redeemed",
    "redeemed_amount",
    "num_mech_calls",
    "mech_fee_amount",
    "net_earnings",
    "roi",
]

SUMMARY_STATS_DF_COLS = [
    "trader_address",
    "num_trades",
    "num_winning_trades",
    "num_redeemed",
    "total_investment",
    "total_trade_fees",
    "num_mech_calls",
    "total_mech_fees",
    "total_earnings",
    "total_redeemed_amount",
    "total_net_earnings",
    "total_net_earnings_wo_mech_fees",
    "total_roi",
    "total_roi_wo_mech_fees",
    "mean_mech_calls_per_trade",
    "mean_mech_fee_amount_per_trade",
]

headers = {
    "Accept": "application/json, multipart/mixed",
    "Content-Type": "application/json",
}


def _post_subgraph_query(url: str, payload: Any) -> dict[str, Any]:
    """POST a GraphQL payload to a subgraph and return its ``data`` object.

    :param url: full subgraph endpoint (already containing the API key).
    :param payload: JSON-serialisable request body.
    :return: the ``data`` member of the response, or ``{}`` when it is
        missing or ``null`` (as happens when the response only carries errors).
    :raises requests.HTTPError: when the server answers with an error status.
    :raises requests.Timeout: when the request exceeds SUBGRAPH_REQUEST_TIMEOUT.
    """
    res = requests.post(
        url, headers=headers, json=payload, timeout=SUBGRAPH_REQUEST_TIMEOUT
    )
    res.raise_for_status()
    result_json = res.json()
    if result_json.get("errors"):
        # Surface the problem instead of silently treating it as "no more data".
        print(f"Subgraph returned errors: {result_json['errors']}")
    # "data" may be present but null, so `.get("data", {})` is not enough.
    return result_json.get("data") or {}


def _query_omen_xdai_subgraph(
    trader_category: str,
    from_timestamp: float,
    to_timestamp: float,
    fpmm_from_timestamp: float,
    fpmm_to_timestamp: float,
) -> dict[str, Any]:
    """Query the Omen xDAI subgraph for every trade of one market creator.

    Results are fetched in pages of QUERY_BATCH_SIZE, using the id of the last
    trade of each page as the cursor for the next one.

    :param trader_category: "quickstart" selects FPMM_QS_CREATOR; any other
        value selects FPMM_PEARL_CREATOR.
    :param from_timestamp: lower bound for the trade creation timestamp.
    :param to_timestamp: upper bound for the trade creation timestamp.
    :param fpmm_from_timestamp: lower bound for the market creation timestamp.
    :param fpmm_to_timestamp: upper bound for the market creation timestamp.
    :return: ``{"data": {"fpmmTrades": [...]}}`` with the trades ordered so
        that trades of the same market are contiguous.
    """
    omen_subgraph = _OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    # Do not print the URL itself: it embeds the subgraph API key.
    print("Querying the Omen xDAI subgraph")

    if trader_category == "quickstart":
        creator_id = FPMM_QS_CREATOR.lower()
    else:  # pearl
        creator_id = FPMM_PEARL_CREATOR.lower()

    grouped_results = defaultdict(list)
    id_gt = ""
    while True:
        query = omen_xdai_trades_query.substitute(
            fpmm_creator=creator_id,
            creationTimestamp_gte=int(from_timestamp),
            creationTimestamp_lte=int(to_timestamp),
            fpmm_creationTimestamp_gte=int(fpmm_from_timestamp),
            fpmm_creationTimestamp_lte=int(fpmm_to_timestamp),
            first=QUERY_BATCH_SIZE,
            id_gt=id_gt,
        )
        data = _post_subgraph_query(omen_subgraph, _to_content(query))
        user_trades = data.get("fpmmTrades") or []
        if not user_trades:
            break

        for trade in user_trades:
            fpmm_id = (trade.get("fpmm") or {}).get("id")
            grouped_results[fpmm_id].append(trade)

        id_gt = user_trades[-1]["id"]

    # Flatten the per-market groups back into a single list of trades.
    return {
        "data": {
            "fpmmTrades": [
                trade
                for trades_list in grouped_results.values()
                for trade in trades_list
            ]
        }
    }


def _query_conditional_tokens_gc_subgraph(creator: str) -> dict[str, Any]:
    """Query the conditional tokens subgraph for all positions of a user.

    :param creator: address of the user; it is lowercased before querying.
    :return: ``{"data": {"user": {"userPositions": [...]}}}`` or, when no
        position was found, ``{"data": {"user": None}}``.
    """
    subgraph = _CONDITIONAL_TOKENS_SUBGRAPH_URL.substitute(
        subgraph_api_key=SUBGRAPH_API_KEY
    )
    collected_positions: list[Any] = []
    userPositions_id_gt = ""
    while True:
        query = conditional_tokens_gc_user_query.substitute(
            id=creator.lower(),
            first=QUERY_BATCH_SIZE,
            userPositions_id_gt=userPositions_id_gt,
        )
        print("sending query to subgraph")
        data = _post_subgraph_query(subgraph, {"query": query})
        user_data = data.get("user") or {}
        user_positions = user_data.get("userPositions") or []
        if not user_positions:
            break

        collected_positions.extend(user_positions)
        userPositions_id_gt = user_positions[-1]["id"]

    if not collected_positions:
        return {"data": {"user": None}}
    return {"data": {"user": {"userPositions": collected_positions}}}


def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
    """Returns whether the user has redeemed the position.

    The first position whose ``conditionIds`` contains the trade's condition id
    decides the result: it counts as redeemed when its balance is zero.

    :param user_json: response of `_query_conditional_tokens_gc_subgraph`.
    :param fpmmTrade: trade record exposing the "fpmm.condition.id" key.
    :return: True if the matching position has a zero balance; False otherwise,
        including when the user has no positions at all.
    """
    # The query function returns {"data": {"user": None}} for users without
    # positions; such a user cannot have redeemed anything.
    user = (user_json.get("data") or {}).get("user")
    if not user:
        return False

    condition_id = fpmmTrade["fpmm.condition.id"]
    for position in user.get("userPositions") or []:
        if condition_id in position["position"]["conditionIds"]:
            return int(position["balance"]) == 0
    return False


def transform_fpmmTrades(df: pd.DataFrame) -> pd.DataFrame:
    """Flatten the raw subgraph trades dataframe.

    Replaces the nested "creator" object by its id (renamed to
    "trader_address") and expands the nested "fpmm" object into "fpmm."-prefixed
    columns.

    :param df: dataframe built from the "fpmmTrades" list of the subgraph.
    :return: the flattened dataframe; an empty input is returned unchanged.
    """
    print("Transforming trades dataframe")
    if df.empty:
        # No trades for this category: there are no nested columns to expand.
        return df

    # convert creator to address
    df["creator"] = df["creator"].apply(lambda x: x["id"])

    # normalize fpmm column
    fpmm = pd.json_normalize(df["fpmm"])
    fpmm.columns = [f"fpmm.{col}" for col in fpmm.columns]
    df = pd.concat([df, fpmm], axis=1)

    # drop fpmm column and change creator to trader_address
    df = df.drop(["fpmm"], axis=1)
    df = df.rename(columns={"creator": "trader_address"})
    print(df.head())
    # DataFrame.info() prints by itself and returns None.
    df.info()
    return df


def create_fpmmTrades(rpc: str, from_timestamp: float = DEFAULT_FROM_TIMESTAMP):
    """Create fpmmTrades for all trades.

    :param rpc: unused by this function; kept for interface compatibility.
    :param from_timestamp: lower bound applied to both the trade and the
        market creation timestamps.
    :return: dataframe with the quickstart trades followed by the pearl
        trades, each tagged in the "market_creator" column.
    """
    category_frames = []
    for category, label in (("quickstart", "qs"), ("pearl", "pearl")):
        trades_json = _query_omen_xdai_subgraph(
            trader_category=category,
            from_timestamp=from_timestamp,
            to_timestamp=DEFAULT_TO_TIMESTAMP,
            fpmm_from_timestamp=from_timestamp,
            fpmm_to_timestamp=DEFAULT_TO_TIMESTAMP,
        )
        trades = trades_json["data"]["fpmmTrades"]
        # Report the number of trades, not the size of the wrapper dict.
        print(f"length of the {label}_trades_json dataset {len(trades)}")

        # convert to dataframe
        category_df = pd.DataFrame(trades)
        category_df["market_creator"] = category
        category_frames.append(transform_fpmmTrades(category_df))

    return pd.concat(category_frames, ignore_index=True)


def prepare_profitalibity_data(
    rpc: str,
    tools_filename: str = "tools.parquet",
    trades_filename: str = "fpmmTrades.parquet",
    from_timestamp: float = DEFAULT_60_DAYS_AGO_TIMESTAMP,
) -> Optional[tuple[pd.DataFrame, pd.DataFrame]]:
    """Prepare data for profitability analysis.

    :param rpc: forwarded to `create_fpmmTrades` when trades must be created.
    :param tools_filename: parquet file (inside DATA_DIR) with the tools data.
    :param trades_filename: parquet file (inside DATA_DIR) with the trades;
        it is created from the subgraph when missing.
    :param from_timestamp: lower timestamp bound used when creating trades.
    :return: ``(fpmmTrades, tools)`` or None when the tools file is missing.
    """
    # Check if the tools file is in the data directory
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)
    except FileNotFoundError:
        print(f"{tools_filename} not found. Please run tools.py first.")
        return None

    # make sure trader_address is in the columns
    assert "trader_address" in tools.columns, "trader_address column not found"

    # lowercase and strip trader_address
    tools["trader_address"] = tools["trader_address"].str.lower().str.strip()

    # drop duplicates
    tools.drop_duplicates(inplace=True)
    print(f"{tools_filename} loaded")

    # Check if the trades file is in the data directory
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
        print(f"{trades_filename} loaded")
    except FileNotFoundError:
        print(f"{trades_filename} not found. Creating {trades_filename}...")
        fpmmTrades = create_fpmmTrades(rpc, from_timestamp=from_timestamp)
        # Save under the requested name so that the next run finds the file.
        fpmmTrades.to_parquet(DATA_DIR / trades_filename, index=False)

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip trader_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()

    return fpmmTrades, tools


def determine_market_status(trade: Any, current_answer: Any) -> MarketState:
    """Determine the market status of a trade.

    :param trade: trade record exposing "fpmm.openingTimestamp",
        "fpmm.isPendingArbitration" and "fpmm.answerFinalizedTimestamp".
    :param current_answer: current answer of the market; None/NaN when the
        market has not been answered yet.
    :return: the MarketState of the market the trade belongs to.
    """
    # NaN never compares equal to itself and is not guaranteed to be the
    # np.nan singleton, so missing answers must be detected with pd.isna.
    if pd.isna(current_answer):
        if time.time() >= int(trade["fpmm.openingTimestamp"]):
            return MarketState.PENDING
        return MarketState.OPEN
    if trade["fpmm.isPendingArbitration"]:
        return MarketState.ARBITRATING
    if time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
        return MarketState.FINALIZING
    return MarketState.CLOSED


def analyse_trader(
    trader_address: str, fpmmTrades: pd.DataFrame, tools: pd.DataFrame
) -> pd.DataFrame:
    """Analyse a trader's trades.

    Only trades on closed markets are reported. A trade that raises while
    being processed is logged and skipped, so one bad record does not abort
    the whole analysis.

    :param trader_address: address of the trader to analyse.
    :param fpmmTrades: flattened trades of all traders.
    :param tools: tools dataframe; its "prompt_request" column is searched for
        the market title to count the mech calls of each trade.
    :return: dataframe with ALL_TRADES_STATS_DF_COLS columns, one row per
        analysed trade, indexed like the corresponding rows of `fpmmTrades`.
    """
    # Filter trades and tools for the given trader
    trades = fpmmTrades[fpmmTrades["trader_address"] == trader_address]
    tools_usage = tools[tools["trader_address"] == trader_address]

    # Prepare the DataFrame
    trades_df = pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)
    if trades.empty:
        return trades_df

    # Fetch user's conditional tokens gc graph
    try:
        user_json = _query_conditional_tokens_gc_subgraph(trader_address)
    except Exception as e:
        print(f"Error fetching user data: {e}")
        return trades_df

    # Iterate over the trades
    for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
        try:
            current_answer = trade["fpmm.currentAnswer"]
            # NaN is truthy, so it has to be checked explicitly.
            if pd.isna(current_answer) or not current_answer:
                print(f"Skipping trade {i} because currentAnswer is NaN")
                continue

            # Parsing and computing shared values
            creation_timestamp_utc = datetime.datetime.fromtimestamp(
                int(trade["creationTimestamp"]), tz=datetime.timezone.utc
            )
            # NOTE(review): wei_to_unit presumably converts wei amounts to
            # whole token units; it is defined in utils - confirm there.
            collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
            fee_amount = wei_to_unit(float(trade["feeAmount"]))
            outcome_tokens_traded = wei_to_unit(float(trade["outcomeTokensTraded"]))

            earnings, winner_trade = (0, False)
            redemption = _is_redeemed(user_json, trade)
            market_creator = trade["market_creator"]

            # Determine market status
            market_status = determine_market_status(trade, current_answer)

            # Skip non-closed markets
            if market_status != MarketState.CLOSED:
                print(
                    f"Skipping trade {i} because market is not closed. Market Status: {market_status}"
                )
                continue
            current_answer = convert_hex_to_int(current_answer)

            # Compute invalidity
            is_invalid = current_answer == INVALID_ANSWER

            # Compute earnings and winner trade status
            if is_invalid:
                # The collateral is counted as the earnings of an invalid market.
                earnings = collateral_amount
                winner_trade = False
            elif int(trade["outcomeIndex"]) == current_answer:
                earnings = outcome_tokens_traded
                winner_trade = True

            # Compute mech calls
            try:
                num_mech_calls = (
                    tools_usage["prompt_request"]
                    .apply(lambda x: trade["title"] in x)
                    .sum()
                )
            except Exception:
                print("Error while getting the number of mech calls")
                num_mech_calls = 2  # Average value

            mech_fee_amount = num_mech_calls * DEFAULT_MECH_FEE
            net_earnings = earnings - fee_amount - mech_fee_amount - collateral_amount

            # Assign values to DataFrame
            trades_df.loc[i] = {
                "trader_address": trader_address,
                "market_creator": market_creator,
                "trade_id": trade["id"],
                "market_status": market_status.name,
                "creation_timestamp": creation_timestamp_utc,
                "title": trade["title"],
                "collateral_amount": collateral_amount,
                "outcome_index": trade["outcomeIndex"],
                "trade_fee_amount": fee_amount,
                "outcomes_tokens_traded": outcome_tokens_traded,
                "current_answer": current_answer,
                "is_invalid": is_invalid,
                "winning_trade": winner_trade,
                "earnings": earnings,
                "redeemed": redemption,
                "redeemed_amount": earnings if redemption else 0,
                "num_mech_calls": num_mech_calls,
                "mech_fee_amount": mech_fee_amount,
                "net_earnings": net_earnings,
                "roi": net_earnings
                / (collateral_amount + fee_amount + mech_fee_amount),
            }

        except Exception as e:
            print(f"Error processing trade {i}: {e}")
            continue

    return trades_df


def analyse_all_traders(trades: pd.DataFrame, tools: pd.DataFrame) -> pd.DataFrame:
    """Analyse all creators.

    :param trades: flattened trades of all traders.
    :param tools: tools dataframe forwarded to `analyse_trader`.
    :return: concatenation of the per-trader results; an empty dataframe with
        ALL_TRADES_STATS_DF_COLS columns when there are no traders.
    """
    trader_addresses = trades["trader_address"].unique()
    all_traders = [
        analyse_trader(trader, trades, tools)
        for trader in tqdm(
            trader_addresses,
            total=len(trader_addresses),
            desc="Analysing creators",
        )
    ]
    if not all_traders:
        # pd.concat raises on an empty list.
        return pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)

    # concat all creators
    return pd.concat(all_traders)


def summary_analyse(df):
    """Summarise profitability analysis.

    :param df: per-trade dataframe as produced by `analyse_all_traders`.
    :return: one row per trader with the aggregated statistics; an empty
        dataframe with SUMMARY_STATS_DF_COLS columns when `df` is empty.
    """
    # Ensure DataFrame is not empty
    if df.empty:
        return pd.DataFrame(columns=SUMMARY_STATS_DF_COLS)

    # Group by trader_address
    grouped = df.groupby("trader_address")

    # Create summary DataFrame
    summary_df = grouped.agg(
        num_trades=("trader_address", "size"),
        num_winning_trades=("winning_trade", lambda x: float((x).sum())),
        num_redeemed=("redeemed", lambda x: float(x.sum())),
        total_investment=("collateral_amount", "sum"),
        total_trade_fees=("trade_fee_amount", "sum"),
        num_mech_calls=("num_mech_calls", "sum"),
        total_mech_fees=("mech_fee_amount", "sum"),
        total_earnings=("earnings", "sum"),
        total_redeemed_amount=("redeemed_amount", "sum"),
        total_net_earnings=("net_earnings", "sum"),
    )

    # Calculating additional columns
    summary_df["total_roi"] = (
        summary_df["total_net_earnings"] / summary_df["total_investment"]
    )
    summary_df["mean_mech_calls_per_trade"] = (
        summary_df["num_mech_calls"] / summary_df["num_trades"]
    )
    summary_df["mean_mech_fee_amount_per_trade"] = (
        summary_df["total_mech_fees"] / summary_df["num_trades"]
    )
    summary_df["total_net_earnings_wo_mech_fees"] = (
        summary_df["total_net_earnings"] + summary_df["total_mech_fees"]
    )
    summary_df["total_roi_wo_mech_fees"] = (
        summary_df["total_net_earnings_wo_mech_fees"] / summary_df["total_investment"]
    )

    # Resetting index to include trader_address
    summary_df.reset_index(inplace=True)

    return summary_df


def run_profitability_analysis(
    rpc: str,
    tools_filename: str = "tools.parquet",
    trades_filename: str = "fpmmTrades.parquet",
    from_timestamp: float = DEFAULT_60_DAYS_AGO_TIMESTAMP,
):
    """Create all trades analysis.

    Writes "invalid_trades.parquet", "all_trades_profitability.parquet" and
    "summary_profitability.parquet" into DATA_DIR.

    :param rpc: forwarded to `prepare_profitalibity_data`.
    :param tools_filename: parquet file (inside DATA_DIR) with the tools data.
    :param trades_filename: parquet file (inside DATA_DIR) with the trades.
    :param from_timestamp: lower timestamp bound used when creating trades.
    :return: ``(all_trades_df, summary_df)`` where `all_trades_df` excludes
        the trades on invalid markets.
    :raises FileNotFoundError: when the tools file is not available.
    """
    # load dfs from data folder for analysis
    print(f"Preparing data with {tools_filename} and {trades_filename}")
    prepared = prepare_profitalibity_data(
        rpc, tools_filename, trades_filename, from_timestamp
    )
    if prepared is None:
        # Fail with a clear message instead of an opaque unpacking TypeError.
        raise FileNotFoundError(
            f"{tools_filename} not found in {DATA_DIR}. Please run tools.py first."
        )
    fpmmTrades, tools = prepared
    tools["trader_address"] = tools["trader_address"].str.lower()

    # all trades profitability df
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, tools)

    # filter invalid markets. Condition: "is_invalid" is True
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"].eq(True)]
    invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)

    all_trades_df = all_trades_df.loc[all_trades_df["is_invalid"].eq(False)]

    # summarize profitability df
    print("Summarising trades...")
    summary_df = summary_analyse(all_trades_df)

    # save to parquet
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)
    summary_df.to_parquet(DATA_DIR / "summary_profitability.parquet", index=False)

    print("Done!")

    return all_trades_df, summary_df


if __name__ == "__main__":
    # NOTE(review): this URL looks like it embeds an access token; consider
    # loading it from the environment instead of committing it - confirm.
    rpc = "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a"
    # Always start from freshly downloaded trades.
    (DATA_DIR / "fpmmTrades.parquet").unlink(missing_ok=True)
    run_profitability_analysis(rpc)