Zscore_Crypto / zscore_backtest.py
gjin10969
initialize
e97cf97
import pandas as pd
import ccxt
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import json
from datetime import datetime, timedelta
import pytz
import os
# Directory that receives the generated plot images; it is created on start-up
# if it does not exist yet.
images_folder = '/home/gjin/Documents/zscore/images'
os.makedirs(images_folder, exist_ok=True)

# Ask the user which base symbol and candle time frame to process.
symbols = input('Please input Symbol: ')
timeframe = input("Please input time frame: ")

# ccxt Binance client whose default market type is set to futures.
binance = ccxt.binance({'options': {'defaultType': 'future'}})
from tqdm import tqdm
import pandas as pd
def fetch_and_calculate_zscore(symbol, timeframe, since, limit=200, rolling_window=30,
                               entry_threshold=2, exit_threshold=1):
    """Fetch OHLCV candles and annotate them with a rolling Z-score and signals.

    Candles are requested from the module-level ``binance`` client and loaded
    into a DataFrame with the columns ``timestamp`` (converted from epoch
    milliseconds to UTC datetimes), ``open``, ``high``, ``low``, ``close`` and
    ``volume``. The following columns are then added:

    * ``mean`` / ``std`` -- rolling mean and standard deviation of ``close``.
    * ``z_score`` -- ``(close - mean) / std``.
    * ``sell_signal`` -- 1 on bars where, after the Z-score has exceeded
      ``entry_threshold``, it lies in ``[exit_threshold, entry_threshold]``.
    * ``buy_signal`` -- the mirror image: 1 on bars where, after the Z-score
      has dropped below ``-entry_threshold``, it lies in
      ``[-entry_threshold, -exit_threshold]``.

    A sell episode ends once the Z-score falls below ``exit_threshold``; a buy
    episode ends once it rises above ``-exit_threshold``. Only one episode can
    be active at a time.

    Args:
        symbol: Market symbol forwarded to ``fetch_ohlcv`` (e.g. ``'BTC/USDT'``).
        timeframe: Candle time frame forwarded to ``fetch_ohlcv``.
        since: Start of the requested window, forwarded to ``fetch_ohlcv``.
        limit: Maximum number of candles to request.
        rolling_window: Number of bars in the rolling mean/std window.
        entry_threshold: Absolute Z-score that must be exceeded to arm a
            signal. Defaults to 2, the value that used to be hard-coded.
        exit_threshold: Absolute Z-score at which an armed signal is released.
            Defaults to 1, the value that used to be hard-coded.

    Returns:
        The annotated ``pandas.DataFrame``.
    """
    candles = binance.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=limit)
    df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

    # Convert timestamp to UTC datetime format
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)

    # Rolling statistics and Z-score; the window object is built once and reused.
    rolling_close = df['close'].rolling(window=rolling_window)
    df['mean'] = rolling_close.mean()
    df['std'] = rolling_close.std()
    df['z_score'] = (df['close'] - df['mean']) / df['std']

    # Signal columns start out all-zero and are switched on bar by bar below.
    df['buy_signal'] = 0
    df['sell_signal'] = 0

    # State of the signal tracker.
    in_sell_signal = False
    in_buy_signal = False
    crossed_threshold = False  # True while either side is armed

    # The frame has the default integer index, so ``i`` is both position and
    # label. Row 0 is not inspected (the loop has always started at 1).
    # NaN Z-scores (warm-up rows) fail every comparison and so never arm,
    # mark, or release a signal.
    for i in tqdm(range(1, len(df)), desc="Processing Z-score signals"):
        current_z = df.at[i, 'z_score']

        # Arm a signal when the Z-score breaks out beyond the entry threshold.
        # NOTE(review): arming is checked before release, so the bar that
        # releases one side cannot arm the opposite side on that same bar.
        if current_z > entry_threshold and not crossed_threshold:
            crossed_threshold = True
            in_sell_signal = True
        elif current_z < -entry_threshold and not crossed_threshold:
            crossed_threshold = True
            in_buy_signal = True

        # Sell side: mark bars inside the band, release below the exit level.
        if in_sell_signal:
            if exit_threshold <= current_z <= entry_threshold:
                df.at[i, 'sell_signal'] = 1
            elif current_z < exit_threshold:
                in_sell_signal = False
                crossed_threshold = False

        # Buy side: mark bars inside the band, release above the exit level.
        if in_buy_signal:
            if -entry_threshold <= current_z <= -exit_threshold:
                df.at[i, 'buy_signal'] = 1
            elif current_z > -exit_threshold:
                in_buy_signal = False
                crossed_threshold = False

    return df
# Capture the current time once, at start-up, and convert it to Philippine
# local time. ``datetime.utcnow()`` is deprecated since Python 3.12, so a
# timezone-aware UTC timestamp is taken directly instead.
utc_time = datetime.now(pytz.utc)
philippine_tz = pytz.timezone('Asia/Manila')
philippine_time = utc_time.astimezone(philippine_tz)

# Format the time in your preferred format
formatted_ph_time = philippine_time.strftime("%Y-%m-%d %H:%M:%S")

# Holds the most recent row handled by update_signal_json(); plot_data()
# reads its timestamp to build the image filename.
latest_data = []
def update_signal_json(symbol, df, json_data):
    """Append a summary of the last row of *df* to *json_data*.

    The entry records the symbol, the module-level ``timeframe``, the row's
    timestamp, the module-level ``formatted_ph_time``, the closing price, the
    Z-score, and whether the row carries a buy or sell signal (stored as the
    string ``"True"`` or ``"False"``).

    As a side effect the module-level ``latest_data`` is set to that last row.

    Args:
        symbol: Symbol name stored in the entry.
        df: DataFrame with ``timestamp``, ``close``, ``z_score``,
            ``buy_signal`` and ``sell_signal`` columns.
        json_data: List of previously collected entries; mutated in place.

    Returns:
        The same ``json_data`` list. It is returned unchanged (and
        ``latest_data`` is left untouched) when *df* has no rows.
    """
    global latest_data

    # An empty frame has no "latest" row; df.iloc[-1] would raise IndexError.
    if df.empty:
        return json_data

    latest_data = df.iloc[-1]

    # Format the row's timestamp; fall back to str() for non-Timestamp values.
    raw_timestamp = latest_data['timestamp']
    if isinstance(raw_timestamp, pd.Timestamp):
        timestamp = raw_timestamp.strftime("%Y-%m-%d %H:%M:%S")
    else:
        timestamp = str(raw_timestamp)

    has_signal = latest_data['buy_signal'] == 1 or latest_data['sell_signal'] == 1

    signal_entry = {
        "symbol": symbol,
        "time_frame": timeframe,  # module-level value entered by the user
        "date_and_time": timestamp,
        "realtime_ph_time": formatted_ph_time,  # computed once at start-up
        "current_price": latest_data['close'],
        "zscore": latest_data['z_score'],
        "detection": "True" if has_signal else "False",
    }

    json_data.append(signal_entry)
    return json_data
def plot_data(btcdom_df, pair_df, btc_df):
    """Plot the three Z-score series with signal markers and save a PNG.

    Draws the Z-score lines for BTCDOM/USDT, the user-selected pair and
    BTC/USDT, the +2 / -2 threshold lines, and buy/sell markers for BTCDOM and
    the selected pair (BTC/USDT gets a line only). The image is written into
    ``images_folder`` under a name built from the module-level ``symbols`` and
    the timestamp of the module-level ``latest_data`` row, so
    ``update_signal_json`` must have run before this function is called.

    Args:
        btcdom_df: Annotated DataFrame for BTCDOM/USDT.
        pair_df: Annotated DataFrame for the user-selected pair.
        btc_df: Annotated DataFrame for BTC/USDT.
    """
    fig, ax = plt.subplots(figsize=(14, 7))

    try:
        # Z-score lines for all pairs
        ax.plot(btcdom_df['timestamp'], btcdom_df['z_score'], label="BTCDOM/USDT Z-Score", color='blue', linestyle='-')
        ax.plot(pair_df['timestamp'], pair_df['z_score'], label=f"{symbols}/USDT Z-Score", color='orange', linestyle='-')
        ax.plot(btc_df['timestamp'], btc_df['z_score'], label="BTC/USDT Z-Score", color='gray', linestyle='-')

        # Thresholds
        ax.axhline(y=2, color='red', linestyle='--', label='Overbought Threshold')
        ax.axhline(y=-2, color='green', linestyle='--', label='Oversold Threshold')

        # Buy/sell markers: BTCDOM fully opaque (alpha=None is the matplotlib
        # default), the selected pair semi-transparent. Each mask is
        # evaluated once and reused for both the x and y values.
        for frame, name, alpha in ((btcdom_df, "BTCDOM", None), (pair_df, symbols, 0.5)):
            buys = frame[frame['buy_signal'] == 1]
            sells = frame[frame['sell_signal'] == 1]
            ax.scatter(buys['timestamp'], buys['z_score'],
                       marker='^', color='green', alpha=alpha, label=f"{name} Buy Signal")
            ax.scatter(sells['timestamp'], sells['z_score'],
                       marker='v', color='red', alpha=alpha, label=f"{name} Sell Signal")

        # Format plot
        ax.set_title(f"Z-Scores Signals {timeframe} for {symbols}/USDT Futures", fontsize=16)
        ax.set_xlabel("Time (UTC)", fontsize=12)
        ax.set_ylabel("Z-Score", fontsize=12)
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m-%d %H:%M"))
        ax.legend(loc="upper left")
        ax.grid(True)
        plt.xticks(rotation=45)

        # One file per call, named after the symbol and the latest row's
        # timestamp; stored in the shared images folder rather than a second
        # hard-coded copy of the same path.
        stamp = latest_data['timestamp'].strftime('%Y%m%d_%H%M%S')
        plot_filename = os.path.join(images_folder, f"zscore_plot_{symbols}_{stamp}.png")
        fig.savefig(plot_filename)
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)
# Function to run historical data processing
def run_historical(start_date=datetime(2024, 9, 1), end_date=datetime(2024, 11, 27),
                   step=timedelta(hours=4)):
    """Walk a historical date range and record/plot the signals of each chunk.

    Starting at *start_date* and advancing by *step* until *end_date* is
    reached, each iteration fetches candles for BTCDOM/USDT, the user-selected
    pair and BTC/USDT beginning at the current position, appends the latest
    row of each to the signal list, rewrites the signals JSON file, and saves
    a plot.

    Args:
        start_date: First ``since`` position. It is formatted with a trailing
            ``Z``, so a naive datetime is treated as UTC.
        end_date: Exclusive upper bound for the ``since`` position.
        step: Distance between consecutive ``since`` positions.

    Raises:
        ValueError: If *step* is not positive (the loop would never finish).
    """
    if step <= timedelta(0):
        raise ValueError("step must be a positive timedelta")

    # Read and write the SAME file. Previously the loader looked for
    # 'signals_<symbol>.json' while the writer produced 'signals<symbol>.json',
    # so earlier results were never picked up. The written name is kept so the
    # output location does not change.
    signals_path = f'signals{symbols}.json'
    try:
        with open(signals_path, 'r') as file:
            json_data = json.load(file)
    except FileNotFoundError:
        json_data = []

    btcdom_symbol = 'BTCDOM/USDT'
    pair_symbol = f'{symbols}/USDT'
    btc_symbol = 'BTC/USDT'

    current_date = start_date
    while current_date < end_date:
        # Start of this chunk, converted by the exchange client's ISO-8601 parser.
        since = binance.parse8601(current_date.strftime('%Y-%m-%dT%H:%M:%SZ'))

        # Fetch and process data
        btcdom_df = fetch_and_calculate_zscore(btcdom_symbol, timeframe, since)
        pair_df = fetch_and_calculate_zscore(pair_symbol, timeframe, since)
        btc_df = fetch_and_calculate_zscore(btc_symbol, timeframe, since)

        # Record the latest row of each frame. BTCDOM goes last, so the
        # module-level latest_data that plot_data() reads is the BTCDOM row.
        json_data = update_signal_json(pair_symbol, pair_df, json_data)
        json_data = update_signal_json(btc_symbol, btc_df, json_data)
        json_data = update_signal_json(btcdom_symbol, btcdom_df, json_data)

        # Rewritten on every iteration, so the file always reflects the
        # chunks processed so far.
        with open(signals_path, 'w') as file:
            json.dump(json_data, file, indent=4)

        # Plot the data and save each plot separately
        plot_data(btcdom_df, pair_df, btc_df)

        # Advance to the next chunk (4 hours by default).
        current_date += step
# Start historical data processing. The call is not wrapped in an
# `if __name__ == "__main__":` guard, so it also runs when the module is imported.
run_historical()