# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
#
#   Copyright 2023 Valory AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# ------------------------------------------------------------------------------

"""Per-trade and per-trader profitability analysis built on fpmmTrades/tools parquet files."""

import time
import datetime
import pandas as pd
from typing import Any
from enum import Enum
from tqdm import tqdm
import numpy as np
import os

from web3_utils import query_conditional_tokens_gc_subgraph
from get_mech_info import (
    DATETIME_60_DAYS_AGO,
    update_fpmmTrades_parquet,
    update_tools_parquet,
    update_all_trades_parquet,
)
from utils import (
    wei_to_unit,
    convert_hex_to_int,
    JSON_DATA_DIR,
    DATA_DIR,
    DEFAULT_MECH_FEE,
)
from staking import label_trades_by_staking
from nr_mech_calls import (
    create_unknown_traders_df,
    transform_to_datetime,
    compute_mech_calls_based_on_timestamps,
)

# Balances (in wei) at or below this threshold are treated as dust.
# NOTE(review): previously defined twice with the same value; duplicate removed.
DUST_THRESHOLD = 10000000000000
# Sentinel the oracle uses for an invalid market resolution (after hex->int conversion).
INVALID_ANSWER = -1
DEFAULT_60_DAYS_AGO_TIMESTAMP = DATETIME_60_DAYS_AGO.timestamp()
WXDAI_CONTRACT_ADDRESS = "0xe91D153E0b41518A2Ce8Dd3D7944Fa863463a97d"


class MarketState(Enum):
    """Lifecycle state of a prediction market."""

    OPEN = 1
    PENDING = 2
    FINALIZING = 3
    ARBITRATING = 4
    CLOSED = 5

    def __str__(self) -> str:
        """Prints the market status."""
        return self.name.capitalize()


class MarketAttribute(Enum):
    """Per-market statistic attribute, usable as an argparse argument type."""

    NUM_TRADES = "Num_trades"
    WINNER_TRADES = "Winner_trades"
    NUM_REDEEMED = "Num_redeemed"
    INVESTMENT = "Investment"
    FEES = "Fees"
    MECH_CALLS = "Mech_calls"
    MECH_FEES = "Mech_fees"
    EARNINGS = "Earnings"
    NET_EARNINGS = "Net_earnings"
    REDEMPTIONS = "Redemptions"
    ROI = "ROI"

    def __str__(self) -> str:
        """Prints the attribute."""
        return self.value

    def __repr__(self) -> str:
        """Prints the attribute representation."""
        return self.name

    @staticmethod
    def argparse(s: str) -> "MarketAttribute":
        """Performs string conversion to MarketAttribute.

        Raises:
            ValueError: if ``s`` does not name a member (case-insensitive).
        """
        try:
            return MarketAttribute[s.upper()]
        except KeyError as e:
            raise ValueError(f"Invalid MarketAttribute: {s}") from e


# Column layout of the per-trade output DataFrame produced by analyse_trader().
ALL_TRADES_STATS_DF_COLS = [
    "trader_address",
    "market_creator",
    "trade_id",
    "creation_timestamp",
    "title",
    "market_status",
    "collateral_amount",
    "outcome_index",
    "trade_fee_amount",
    "outcomes_tokens_traded",
    "current_answer",
    "is_invalid",
    "winning_trade",
    "earnings",
    "redeemed",
    "redeemed_amount",
    "num_mech_calls",
    "mech_fee_amount",
    "net_earnings",
    "roi",
]

# Column layout of the per-trader summary DataFrame produced by summary_analyse().
SUMMARY_STATS_DF_COLS = [
    "trader_address",
    "num_trades",
    "num_winning_trades",
    "num_redeemed",
    "total_investment",
    "total_trade_fees",
    "num_mech_calls",
    "total_mech_fees",
    "total_earnings",
    "total_redeemed_amount",
    "total_net_earnings",
    "total_net_earnings_wo_mech_fees",
    "total_roi",
    "total_roi_wo_mech_fees",
    "mean_mech_calls_per_trade",
    "mean_mech_fee_amount_per_trade",
]


def _is_redeemed(user_json: dict[str, Any], fpmmTrade: dict[str, Any]) -> bool:
    """Returns whether the user has redeemed the position.

    A position matching the trade's condition id with a zero balance means the
    user already redeemed it; a non-zero balance means it is still held.
    """
    user_positions = user_json["data"]["user"]["userPositions"]
    condition_id = fpmmTrade["fpmm.condition.id"]
    for position in user_positions:
        position_condition_ids = position["position"]["conditionIds"]
        balance = int(position["balance"])
        if condition_id in position_condition_ids:
            if balance == 0:
                return True
            # return early: the matching position is still held, so not redeemed
            return False
    return False


def prepare_profitalibity_data(
    rpc: str,
    tools_filename: str,
    trades_filename: str,
) -> pd.DataFrame:
    """Prepare data for profitability analysis.

    Deduplicates and normalizes the tools parquet in place, then loads and
    normalizes the trades parquet.

    Returns:
        The normalized fpmmTrades DataFrame, or ``None`` if either input
        parquet file is missing.
    """
    # Check if tools.parquet is in the same directory
    try:
        tools = pd.read_parquet(DATA_DIR / tools_filename)

        # make sure creator_address is in the columns
        assert "trader_address" in tools.columns, "trader_address column not found"

        # lowercase and strip creator_address
        tools["trader_address"] = tools["trader_address"].str.lower().str.strip()

        # keep only the latest record per (request_id, request_block)
        tools.drop_duplicates(
            subset=["request_id", "request_block"], keep="last", inplace=True
        )
        tools.to_parquet(DATA_DIR / tools_filename)
        print(f"{tools_filename} loaded")
    except FileNotFoundError:
        print("tools.parquet not found. Please run tools.py first.")
        return

    # Check if fpmmTrades.parquet is in the same directory
    print("Reading the trades file")
    try:
        fpmmTrades = pd.read_parquet(DATA_DIR / trades_filename)
    except FileNotFoundError:
        print(f"Error reading {trades_filename} file .")
        # FIX: previously fell through and used the unbound fpmmTrades (NameError)
        return None

    # make sure trader_address is in the columns
    assert "trader_address" in fpmmTrades.columns, "trader_address column not found"

    # lowercase and strip creator_address
    fpmmTrades["trader_address"] = fpmmTrades["trader_address"].str.lower().str.strip()

    return fpmmTrades


def determine_market_status(trade, current_answer):
    """Determine the market status of a trade.

    ``current_answer`` is considered missing when it is ``None`` or NaN.
    """
    # FIX: `is np.nan` identity checks miss float('nan') values coming out of a
    # DataFrame; use an explicit None/NaN test instead.
    answer_missing = current_answer is None or (
        isinstance(current_answer, float) and np.isnan(current_answer)
    )
    if answer_missing and time.time() >= int(trade["fpmm.openingTimestamp"]):
        return MarketState.PENDING
    elif answer_missing:
        return MarketState.OPEN
    elif trade["fpmm.isPendingArbitration"]:
        return MarketState.ARBITRATING
    elif time.time() < int(trade["fpmm.answerFinalizedTimestamp"]):
        return MarketState.FINALIZING
    return MarketState.CLOSED


def analyse_trader(
    trader_address: str,
    fpmmTrades: pd.DataFrame,
    tools: pd.DataFrame,
    trader_estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse a trader's trades.

    Builds one row per trade (columns ``ALL_TRADES_STATS_DF_COLS``) computing
    earnings, fees, mech-call costs, redemption status and ROI. When
    ``daily_info`` is False, only trades in CLOSED markets are kept.
    """
    fpmmTrades["creation_timestamp"] = pd.to_datetime(fpmmTrades["creationTimestamp"])
    fpmmTrades["creation_date"] = fpmmTrades["creation_timestamp"].dt.date

    # Filter trades and tools for the given trader
    trades = fpmmTrades[fpmmTrades["trader_address"] == trader_address]

    # Prepare the DataFrame
    trades_df = pd.DataFrame(columns=ALL_TRADES_STATS_DF_COLS)
    if trades.empty:
        return trades_df

    # Fetch user's conditional tokens gc graph
    try:
        user_json = query_conditional_tokens_gc_subgraph(trader_address)
    except Exception as e:
        print(f"Error fetching user data: {e}")
        return trades_df

    # Iterate over the trades
    for i, trade in tqdm(trades.iterrows(), total=len(trades), desc="Analysing trades"):
        try:
            market_answer = trade["fpmm.currentAnswer"]
            trading_day = trade["creation_date"]
            trade_id = trade["id"]
            if not daily_info and not market_answer:
                print(f"Skipping trade {i} because currentAnswer is NaN")
                continue

            # Parsing and computing shared values
            collateral_amount = wei_to_unit(float(trade["collateralAmount"]))
            fee_amount = wei_to_unit(float(trade["feeAmount"]))
            outcome_tokens_traded = wei_to_unit(float(trade["outcomeTokensTraded"]))
            earnings, winner_trade = (0, False)
            redemption = _is_redeemed(user_json, trade)
            current_answer = market_answer if market_answer else None
            market_creator = trade["market_creator"]

            # Determine market status
            market_status = determine_market_status(trade, current_answer)

            # Skip non-closed markets
            if not daily_info and market_status != MarketState.CLOSED:
                print(
                    f"Skipping trade {i} because market is not closed. Market Status: {market_status}"
                )
                continue
            if current_answer is not None:
                current_answer = convert_hex_to_int(current_answer)

            # Compute invalidity
            is_invalid = current_answer == INVALID_ANSWER

            # Compute earnings and winner trade status
            if current_answer is None:
                earnings = 0.0
                winner_trade = None
            elif is_invalid:
                # Invalid market: the collateral is returned, no win/loss
                earnings = collateral_amount
                winner_trade = False
            elif int(trade["outcomeIndex"]) == current_answer:
                earnings = outcome_tokens_traded
                winner_trade = True

            # Compute mech calls using the title, and trade id
            if daily_info:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["trading_day"] == trading_day),
                    "total_mech_calls",
                ].iloc[0]
            else:
                total_mech_calls = trader_estimated_mech_calls.loc[
                    (trader_estimated_mech_calls["market"] == trade["title"])
                    & (trader_estimated_mech_calls["trade_id"] == trade_id),
                    "mech_calls_per_trade",
                ].iloc[0]

            net_earnings = (
                earnings
                - fee_amount
                - (total_mech_calls * DEFAULT_MECH_FEE)
                - collateral_amount
            )

            # Assign values to DataFrame
            trades_df.loc[i] = {
                "trader_address": trader_address,
                "market_creator": market_creator,
                "trade_id": trade["id"],
                "market_status": market_status.name,
                "creation_timestamp": trade["creationTimestamp"],
                "title": trade["title"],
                "collateral_amount": collateral_amount,
                "outcome_index": trade["outcomeIndex"],
                "trade_fee_amount": fee_amount,
                "outcomes_tokens_traded": outcome_tokens_traded,
                "current_answer": current_answer,
                "is_invalid": is_invalid,
                "winning_trade": winner_trade,
                "earnings": earnings,
                "redeemed": redemption,
                "redeemed_amount": earnings if redemption else 0,
                "num_mech_calls": total_mech_calls,
                "mech_fee_amount": total_mech_calls * DEFAULT_MECH_FEE,
                "net_earnings": net_earnings,
                "roi": net_earnings
                / (
                    collateral_amount
                    + fee_amount
                    + total_mech_calls * DEFAULT_MECH_FEE
                ),
            }

        except Exception as e:
            print(f"Error processing trade {i}: {e}")
            print(trade)
            continue

    return trades_df


def analyse_all_traders(
    trades: pd.DataFrame,
    tools: pd.DataFrame,
    estimated_mech_calls: pd.DataFrame,
    daily_info: bool = False,
) -> pd.DataFrame:
    """Analyse all creators."""
    all_traders = []
    for trader in tqdm(
        trades["trader_address"].unique(),
        total=len(trades["trader_address"].unique()),
        desc="Analysing creators",
    ):
        trader_estimated_mech_calls = estimated_mech_calls.loc[
            estimated_mech_calls["trader_address"] == trader
        ]
        all_traders.append(
            analyse_trader(
                trader, trades, tools, trader_estimated_mech_calls, daily_info
            )
        )

    # concat all creators
    all_creators_df = pd.concat(all_traders)

    return all_creators_df


def summary_analyse(df):
    """Summarise profitability analysis.

    Aggregates the per-trade DataFrame per trader and derives ROI and
    per-trade mech-call/fee averages.
    """
    # Ensure DataFrame is not empty
    if df.empty:
        return pd.DataFrame(columns=SUMMARY_STATS_DF_COLS)

    # Group by trader_address
    grouped = df.groupby("trader_address")

    # Create summary DataFrame
    summary_df = grouped.agg(
        num_trades=("trader_address", "size"),
        num_winning_trades=("winning_trade", lambda x: float((x).sum())),
        num_redeemed=("redeemed", lambda x: float(x.sum())),
        total_investment=("collateral_amount", "sum"),
        total_trade_fees=("trade_fee_amount", "sum"),
        num_mech_calls=("num_mech_calls", "sum"),
        total_mech_fees=("mech_fee_amount", "sum"),
        total_earnings=("earnings", "sum"),
        total_redeemed_amount=("redeemed_amount", "sum"),
        total_net_earnings=("net_earnings", "sum"),
    )

    # Calculating additional columns
    summary_df["total_roi"] = (
        summary_df["total_net_earnings"] / summary_df["total_investment"]
    )
    summary_df["mean_mech_calls_per_trade"] = (
        summary_df["num_mech_calls"] / summary_df["num_trades"]
    )
    summary_df["mean_mech_fee_amount_per_trade"] = (
        summary_df["total_mech_fees"] / summary_df["num_trades"]
    )
    summary_df["total_net_earnings_wo_mech_fees"] = (
        summary_df["total_net_earnings"] + summary_df["total_mech_fees"]
    )
    summary_df["total_roi_wo_mech_fees"] = (
        summary_df["total_net_earnings_wo_mech_fees"] / summary_df["total_investment"]
    )

    # Resetting index to include trader_address
    summary_df.reset_index(inplace=True)

    return summary_df


def run_profitability_analysis(
    rpc: str,
    tools_filename: str = "tools.parquet",
    trades_filename: str = "fpmmTrades.parquet",
    merge: bool = False,
):
    """Create all trades analysis.

    FIX: tools_filename/trades_filename now have defaults matching the files
    used elsewhere in this module — the ``__main__`` entry point calls this
    with only ``rpc`` and previously raised a TypeError.

    Returns:
        Tuple of (all_trades_df, summary_df).
    """
    # load dfs from data folder for analysis
    print(f"Preparing data with {tools_filename} and {trades_filename}")
    fpmmTrades = prepare_profitalibity_data(rpc, tools_filename, trades_filename)
    if fpmmTrades is None:
        raise ValueError("Could not prepare the profitability data")
    if merge:
        update_tools_parquet(rpc, tools_filename)
    tools = pd.read_parquet(DATA_DIR / "tools.parquet")

    fpmmTrades["creationTimestamp"] = fpmmTrades["creationTimestamp"].apply(
        transform_to_datetime
    )
    print("Computing the estimated mech calls dataset")
    trade_mech_calls = compute_mech_calls_based_on_timestamps(
        fpmmTrades=fpmmTrades, tools=tools
    )
    print(trade_mech_calls.total_mech_calls.describe())
    print("Analysing trades...")
    all_trades_df = analyse_all_traders(fpmmTrades, tools, trade_mech_calls)

    # merge previous files if requested
    if merge:
        update_fpmmTrades_parquet(trades_filename)
        all_trades_df = update_all_trades_parquet(all_trades_df)

    # debugging purposes
    all_trades_df.to_parquet(JSON_DATA_DIR / "all_trades_df.parquet", index=False)

    # filter invalid markets. Condition: "is_invalid" is True
    invalid_trades = all_trades_df.loc[all_trades_df["is_invalid"]]
    if len(invalid_trades) == 0:
        print("No new invalid trades")
    else:
        if merge:
            try:
                print("Merging invalid trades parquet file")
                old_invalid_trades = pd.read_parquet(
                    DATA_DIR / "invalid_trades.parquet"
                )
                merge_df = pd.concat(
                    [old_invalid_trades, invalid_trades], ignore_index=True
                )
                invalid_trades = merge_df.drop_duplicates()
            except Exception as e:
                # best-effort merge: keep only the new invalid trades on failure
                print(f"Error updating the invalid trades parquet {e}")
        invalid_trades.to_parquet(DATA_DIR / "invalid_trades.parquet", index=False)

    all_trades_df = all_trades_df.loc[~all_trades_df["is_invalid"]]

    # add staking labels
    label_trades_by_staking(trades_df=all_trades_df)

    # create the unknown traders dataset
    unknown_traders_df, all_trades_df = create_unknown_traders_df(
        trades_df=all_trades_df
    )
    unknown_traders_df.to_parquet(DATA_DIR / "unknown_traders.parquet", index=False)

    # save to parquet
    all_trades_df.to_parquet(DATA_DIR / "all_trades_profitability.parquet", index=False)

    # summarize profitability df
    print("Summarising trades...")
    summary_df = summary_analyse(all_trades_df)
    summary_df.to_parquet(DATA_DIR / "summary_profitability.parquet", index=False)

    print("Done!")

    return all_trades_df, summary_df


if __name__ == "__main__":
    rpc = "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a"
    # Start from a fresh trades file so the analysis re-downloads it.
    if os.path.exists(DATA_DIR / "fpmmTrades.parquet"):
        os.remove(DATA_DIR / "fpmmTrades.parquet")
    run_profitability_analysis(rpc)