import pandas as pd import streamlit as st import gspread from google.oauth2.service_account import Credentials import ast import requests from datetime import datetime import boto3 import json # Define the scope start = False starting_position = [] tradeHistory_positions = [] s3 = boto3.resource( service_name = 's3', region_name = 'ap-south-1', aws_access_key_id = 'AKIA3TD2SOLYZML62HJR', aws_secret_access_key ='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' ) bucket_name = 'usdsmcoinmdata' # File mapping for options file_mapping = { "usdm": "usdm_trade_history.csv", "coinm": "coinm_trade_history.csv", "copyLeaderboard": "copyLeaderboard_trade_history.csv", "okx": "okx_history.csv", "buybit": "buybit.csv" } # Streamlit App st.title("Trade History Viewer") # Dropdown to select the trade history option = st.selectbox("Choose the trade history to display:", list(file_mapping.keys())) # Fetch and display the corresponding trade history if option: file_key = file_mapping[option] try: # Fetch the file from S3 obj = s3.Bucket(bucket_name).Object(file_key).get() # Read the CSV into a DataFrame df = pd.read_csv(obj['Body'], index_col=False) # Display the DataFrame in Streamlit st.write(f"Displaying data for: **{option}**") except Exception as e: st.error(f"Error fetching the file: {str(e)}") df2 = pd.read_csv('df.csv') def convert_str_to_list_or_keep(value): if isinstance(value, str): try: return ast.literal_eval(value) except (SyntaxError, ValueError): return value else: return value df = df.apply(lambda col: col.map(convert_str_to_list_or_keep)) df2 = df2.apply(lambda col: col.map(convert_str_to_list_or_keep)) df['positionClosed'] = False if option =="copyLeaderboard": uid_input = int(st.text_input("Enter U_IDs to filter")) if option =="buybit": uid_input = (st.text_input("Enter U_IDs to filter")) else: uid_input = str.upper(st.text_input("Enter U_IDs to filter")) option2 = st.radio("Choose an option:", ["Show Position History", "Show Live Positions"]) if df is not None and uid_input: if option2 == "Show Position History": st.title("Position History Viewer") # Display starting positions with clickable rows st.header("Starting Positions") filtered_df = df[df['U_IDs'] == uid_input].copy() if not filtered_df.empty: trade_list = filtered_df['trade_history'].iloc[0] else: st.write("No data found for the provided U_ID.") unique_lists = [] def get_amounts_from_positions_and_closed_trades(data): # Check if 'Modified' key exists and extract amounts if 'Modified' in data: modified_positions = data['Modified'] # modified_positions = modified_positions[0] if isinstance(modified_positions, dict) and 'amount' in modified_positions: if option =="okx": print("reached") amount = modified_positions.get('amount')/modified_positions.get('markPrice') else: amount = modified_positions.get('amount') if isinstance(amount, (int, float)): # Check if amount is a number amounts =amount # Check if 'ClosedTrades' key exists and extract amounts if 'ClosedTrades' in data: closed_trades = data['ClosedTrades'] closed_trades =closed_trades[0] if isinstance(closed_trades, dict) and 'amount' in closed_trades: if option == "okx": amount = closed_trades.get('amount')/closed_trades.get('markPrice') else: amount = closed_trades.get('amount') if isinstance(amount, (int, float)): # Check if amount is a number amounts = amount return amounts def get_symbols_from_positions_and_closed_trades(data): # Check if 'Modified' key exists and extract symbols if 'Modified' in data: modified_positions = data['Modified'] # modified_positions =modified_positions if isinstance(modified_positions, dict) and 'symbol' in modified_positions: symbol = modified_positions['symbol'] # Check if 'ClosedTrades' key exists and extract symbols if 'ClosedTrades' in data: closed_trades = data['ClosedTrades'] closed_trades =closed_trades[0] if isinstance(closed_trades, dict) and 'symbol' in closed_trades: symbol = closed_trades['symbol'] return symbol for i in range(len(trade_list)): if trade_list[i]=="none": continue if not trade_list: # Check if the trade_list is empty st.header("No data found, this may not be in the leaderboard") if start ==False: st.subheader(f"Data is from {datetime.now()}") start =True foundCLosed = False changeInAmount = 0 if 'symbol' in trade_list[i]: symbol = trade_list[i]['symbol'] side ="buy" if trade_list[i]['amount']>0 else "sell" amount = trade_list[i]['amount'] if option =="okx": amount = trade_list[i]['amount']/trade_list[i]['markPrice'] else: symbol = trade_list[i]['symbol'] trade_list[i]['side'] =side trade_list[i]['changeInAmount'] = changeInAmount trade_list[i]['i'] = i unique_lists.append({"position":trade_list[i]}) trade_list[i] = "none" else: if 'positions' in trade_list[i]: reached = False # Collect necessary data first before modifying the dictionary for k, v in list(trade_list[i].items()): # Convert to a list to avoid modifying during iteration for entry in v: if 'NewPosition' in entry: new_position = entry.get('NewPosition', {}) # Extract symbol and amount symbol = new_position.get('symbol') if option =="okx": amount = new_position.get('amount')/new_position.get('markPrice') else: amount = new_position.get('amount') if option != "copyLeaderBoard": if start==False: start_time = new_position.get('updateTime') year = start_time[0] month = start_time[1] day = start_time[2] hour =start_time[3] minute =start_time[4] seconds = start_time[5] dt = datetime(year, month, day, hour, minute, seconds) human_readable_format = dt.strftime('%B %d, %Y, %I:%M:%S %p') st.subheader(f"Data from {human_readable_format}") start=True # if start==False: # # start =True side = "buy" if amount > 0 else "sell" new_position['side'] = side new_position['changeInAmount'] = changeInAmount new_position['i'] = i # Update the entry with the modified 'NewPosition' entry['NewPosition'] = new_position # Append the updated trade_list[i] to unique_lists unique_lists.append(trade_list[i]) reached = True # Now safely modify the dictionary after iteration is complete if reached: trade_list[i] = "none" # Now safely modify the dictionary after iteration is complete for j in range(i+1, len(trade_list)): if trade_list[j] == "none": continue if 'positions' in trade_list[j] and isinstance(trade_list[j]['positions'], list): for position in trade_list[j]['positions']: # Check if 'Modified' is in the position and is a dict if 'Modified' in position and isinstance(position['Modified'], dict): if option!="copyLeaderboard": if start==False: for k,v in position.items(): start_time = v['updateTime'] year = start_time[0] month = start_time[1] day = start_time[2] hour =start_time[3] minute =start_time[4] seconds = start_time[5] dt = datetime(year, month, day, hour, minute, seconds) human_readable_format = dt.strftime('%d-%m-%Y %H:%M:%S') st.subheader(f"Data from {human_readable_format}") start=True modified_amount = get_amounts_from_positions_and_closed_trades(position) modified_symbol = get_symbols_from_positions_and_closed_trades(position) if modified_amount > 0: modified_side = "buy" else: modified_side = "sell" if symbol == modified_symbol and side == modified_side: if start ==False: st.header(f"Data is from {datetime.now}") start =True position['Modified']['side'] = modified_side position['Modified']['changeInAmount'] = amount - modified_amount if modified_amount < 0 else modified_amount - amount position['Modified']['i'] = i amount = modified_amount unique_lists.append(trade_list[j]) trade_list[j] = "none" # Check if 'ClosedTrades' is in the position and is a tuple if 'ClosedTrades' in position and isinstance(position['ClosedTrades'], tuple): if start ==False: st.header(f"Data is from {datetime.now}") start =True foundCLosed = False closed_trades_tuple = position['ClosedTrades'] closed_trades_dict = { 'trade_info': closed_trades_tuple[0], 'side': closed_trades_tuple[1] } closed_amount = get_amounts_from_positions_and_closed_trades(position) closed_symbol = get_symbols_from_positions_and_closed_trades(position) if closed_amount > 0: closed_side = "buy" else: closed_side = "sell" if symbol == closed_symbol and side == closed_side: if option!= "copyLeaderboard": if start==False: for k,v in position.items(): start_time = v['updateTime'] start =True closed_trades_dict['side'] = closed_side trade_info = closed_trades_dict['trade_info'] trade_info['changeInAmount'] = amount - closed_amount if closed_amount < 0 else closed_amount - amount amount = closed_amount closed_trades_dict['trade_info']['i'] = i # Store index 'i' inside 'ClosedTrades' closed_trades_dict['trade_info']['closed'] = True # Append the updated trade_list[j] to unique_lists unique_lists.append(trade_list[j]) trade_list[j] = "none" foundCLosed = True break # Break the inner loop if a closed trade was found if foundCLosed: break for k in range(len(unique_lists)): data = unique_lists[k] if k ==0: if 'positions' in data: if isinstance(data['positions'], list): for a in data['positions']: if 'NewPosition' in a: position_data = a['NewPosition'] starting_position.append(position_data) tradeHistory_positions.append(position_data) else: if 'position' in data: position_data =data['position'] starting_position.append(position_data) tradeHistory_positions.append(position_data) if 'positions' in data: if isinstance(data['positions'],list): for a in data['positions']: if 'ClosedTrades' in a: position_data = a['ClosedTrades'][0] tradeHistory_positions.append(position_data) if 'positions' in data: if isinstance(data['positions'],list): for a in data['positions']: if 'Modified' in a: position_data = a['Modified'] tradeHistory_positions.append(position_data) unique_lists =[] elif option2 == "Show Live Positions": filtered_df2 = df2[df2['U_IDs'] == uid_input] if not filtered_df2.empty: positions_list = filtered_df2['Positions'].iloc[0] # Extract the first match # Convert the list of dictionaries to a DataFrame if isinstance(positions_list, list) and positions_list: positions_df = pd.DataFrame(positions_list) st.subheader("Live Positions") st.dataframe(positions_df) else: st.write("No live positions data available for the given U_ID.") # data3 = sheet3.get_all_values() # headers3 = data3.pop(0) # df3 = pd.DataFrame(data3, columns=headers3) # filtered_df3 = df3[df3['U_IDs'] == uid_input] # st.subheader("Performace") # st.dataframe(filtered_df3) def show_position_history(selected_position): st.header(f"History for {selected_position}") # Filter trade history for the selected position position_history = [pos for pos in tradeHistory_positions if pos['i'] == selected_position] if position_history: df_history = pd.DataFrame(position_history) if option =="copyLeaderboard": df_history['changeInAmount'] = pd.to_numeric(df_history['changeInAmount'], errors='coerce') df_history['markPrice'] = pd.to_numeric(df_history['markPrice'], errors='coerce') df_history['entryPrice'] = pd.to_numeric(df_history['entryPrice'], errors='coerce') df_history['amount'] = pd.to_numeric(df_history['amount'],errors='coerce') # Replace NaN with 0 or handle as required df_history.fillna(0, inplace=True) # Update the global timestamp with the last update from history columns_to_check = [ 'symbol', 'side', 'amount', 'changeInAmount', 'markPrice', 'entryPrice', 'pnl', 'roe', 'leverage', 'updateTime', 'tradeType', 'stopLossPrice', 'takeProfitPrice', 'weightedScoreRatio' ] # Adding missing columns with None as default for column in columns_to_check: if column not in df_history.columns: df_history[column] = None # Create a transformed DataFrame for display df_transformed = pd.DataFrame({ 'Pair/Asset': df_history['symbol'], 'is long': df_history['side'], 'Current size after change': df_history['amount'], 'Change in size in Asset': df_history['changeInAmount'], 'Change in size in USDT': df_history['changeInAmount'] * -(df_history['markPrice']), 'Entry price': df_history['entryPrice'], 'Exit price': df_history['markPrice'], 'pnl in usdt': df_history['pnl'], 'pnl in %': df_history['roe'], 'Leverage': df_history['leverage'], 'updatedTime': df_history['updateTime'], 'Trade Type': df_history['tradeType'], # New field 'Stop Loss Price': df_history['stopLossPrice'], # New field 'Take Profit Price': df_history['takeProfitPrice'], # New field 'Weighted Score Ratio': df_history['weightedScoreRatio'], # New field 'Transaction Value in USDT': df_history['amount'] * df_history['markPrice'], # New calculation 'Profit/Loss Ratio': (df_history['markPrice'] - df_history['entryPrice']) / df_history['entryPrice'] # New calculation }) if option == "okx": df_transformed = pd.DataFrame({ 'Pair/Asset': df_history['symbol'], 'is long': df_history['side'], 'Current size after change': df_history['usdAmount'], 'Change in size in Asset': df_history['changeInAmount'], 'Change in size in USDT': df_history['changeInAmount'] * -(df_history['markPrice']), 'Entry price': df_history['entryPrice'], 'Exit price': df_history['markPrice'], 'pnl in usdt': df_history['pnl'], 'pnl in %': df_history['roe'], 'Leverage': df_history['leverage'], 'updatedTime': df_history['updateTime'] }) if 'closed' in df_history.columns: df_transformed['Position closed'] = df_history['closed'] st.dataframe(df_transformed) # Add the update timestamp to the transformed DataFrame else: st.write("No history found for this position.") def lastUpdated(selected_position): position_history = [pos for pos in tradeHistory_positions if pos['i'] == selected_position] return position_history[-1]['updateTime'] def isClosed(selected_position): # Filter trade history for the selected position position_history = [pos for pos in tradeHistory_positions if pos['i'] == selected_position] # Check if there are any records for the selected position if not position_history: return False # Get the most recent entry for the selected position last_entry = position_history[-1] # Check if the 'closed' key exists and if it indicates the position is closed return last_entry.get('closed', False) def main(): df_starting = pd.DataFrame(starting_position) for index, row in df_starting.iterrows(): side = True if row['amount'] > 0 else False is_closed = isClosed(row['i']) # Generate a unique key for the button button_key = f"position_{row['i']}" # Display a button for each trade position if option == "copyLeaderboard": if st.button( f"{row['symbol']} : Long: {side}, Entry Price: {row['entryPrice']}, " f"Market Price: {row['markPrice']}, Amount: {row['amount']}, " f"Leverage: {row['leverage']}, " f" isClosed: {is_closed}", key=button_key ): show_position_history(row['i']) if option =="okx": if st.button( f"{row['symbol']} : Long: {side}, Entry Price: {row['entryPrice']}, " f"Market Price: {row['markPrice']}, Amount: {row['usdAmount']}, " f"Leverage: {row['leverage']}, TradeTakenAt: {row['updateTime']}, " f"lastUpdated: {lastUpdated(row['i'])}, isClosed: {is_closed}", key=button_key ): show_position_history(row['i']) else: if st.button( f"{row['symbol']} : Long: {side}, Entry Price: {row['entryPrice']}, " f"Market Price: {row['markPrice']}, Amount: {row['amount']}, " f"Leverage: {row['leverage']}, TradeTakenAt: {row['updateTime']}, " f"lastUpdated: {lastUpdated(row['i'])}, isClosed: {is_closed}", key=button_key ): show_position_history(row['i']) if __name__ == "__main__": main()