|
import gym |
|
from gym import spaces |
|
from gym.utils import seeding |
|
import numpy as np |
|
from enum import Enum |
|
import matplotlib.pyplot as plt |
|
|
|
|
|
class Actions(Enum): |
|
Sell = 0 |
|
Buy = 1 |
|
Do_nothing = 2 |
|
|
|
|
|
|
|
class TradingEnv(gym.Env): |
|
|
|
metadata = {'render.modes': ['human']} |
|
|
|
def __init__(self, df, window_size, frame_bound): |
|
assert df.ndim == 2 |
|
|
|
assert len(frame_bound) == 2 |
|
self.frame_bound = frame_bound |
|
|
|
self.seed() |
|
self.df = df |
|
self.window_size = window_size |
|
self.prices, self.signal_features = self._process_data() |
|
self.shape = (window_size, self.signal_features.shape[1]) |
|
|
|
|
|
self.action_space = spaces.Discrete(len(Actions)) |
|
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float64) |
|
|
|
|
|
self._start_tick = self.window_size |
|
self._end_tick = len(self.prices) - 1 |
|
self._done = None |
|
self._current_tick = None |
|
self._last_trade_tick = None |
|
self._position = None |
|
self._position_history = None |
|
self._total_reward = None |
|
self._total_profit = None |
|
self._first_rendering = None |
|
self.history = None |
|
|
|
|
|
self.trade_fee_bid_percent = 0.0005 |
|
self.trade_fee_ask_percent = 0.0005 |
|
|
|
|
|
def seed(self, seed=None): |
|
self.np_random, seed = seeding.np_random(seed) |
|
return [seed] |
|
|
|
|
|
def reset(self): |
|
self._done = False |
|
self._current_tick = self._start_tick |
|
self._last_trade_tick = self._current_tick - 1 |
|
self._position = 0 |
|
self._position_history = (self.window_size * [None]) |
|
|
|
self._total_reward = 0. |
|
self._total_profit = 0. |
|
self.history = {} |
|
return self._get_observation() |
|
|
|
|
|
def _calculate_reward(self, action): |
|
step_reward = 0 |
|
|
|
current_price = self.prices[self._current_tick] |
|
last_price = self.prices[self._current_tick - 1] |
|
price_diff = current_price - last_price |
|
|
|
|
|
if action == Actions.Buy.value and self._position == 0: |
|
self._position = 1 |
|
step_reward += price_diff |
|
self._last_trade_tick = self._current_tick - 1 |
|
self._position_history.append(1) |
|
|
|
elif action == Actions.Buy.value and self._position > 0: |
|
step_reward += 0 |
|
self._position_history.append(-1) |
|
|
|
elif action == Actions.Buy.value and self._position < 0: |
|
self._position = 0 |
|
step_reward += -1 * (self.prices[self._current_tick -1] - self.prices[self._last_trade_tick]) |
|
self._total_profit += step_reward |
|
self._position_history.append(4) |
|
|
|
|
|
elif action == Actions.Sell.value and self._position == 0: |
|
self._position = -1 |
|
step_reward += -1 * price_diff |
|
self._last_trade_tick = self._current_tick - 1 |
|
self._position_history.append(3) |
|
|
|
elif action == Actions.Sell.value and self._position > 0: |
|
self._position = 0 |
|
step_reward += self.prices[self._current_tick -1] - self.prices[self._last_trade_tick] |
|
self._total_profit += step_reward |
|
self._position_history.append(2) |
|
elif action == Actions.Sell.value and self._position < 0: |
|
step_reward += 0 |
|
self._position_history.append(-1) |
|
|
|
|
|
elif action == Actions.Do_nothing.value and self._position > 0: |
|
step_reward += price_diff |
|
self._position_history.append(0) |
|
elif action == Actions.Do_nothing.value and self._position < 0: |
|
step_reward += -1 * price_diff |
|
self._position_history.append(0) |
|
elif action == Actions.Do_nothing.value and self._position == 0: |
|
step_reward += -1 * abs(price_diff) |
|
self._position_history.append(0) |
|
|
|
return step_reward |
|
|
|
|
|
def step(self, action): |
|
self._done = False |
|
self._current_tick += 1 |
|
|
|
if self._current_tick == self._end_tick: |
|
self._done = True |
|
|
|
step_reward = self._calculate_reward(action) |
|
self._total_reward += step_reward |
|
|
|
observation = self._get_observation() |
|
info = dict( |
|
total_reward = self._total_reward, |
|
total_profit = self._total_profit, |
|
position = self._position |
|
) |
|
self._update_history(info) |
|
|
|
return observation, step_reward, self._done, info |
|
|
|
|
|
def _get_observation(self): |
|
return self.signal_features[(self._current_tick-self.window_size+1):self._current_tick+1] |
|
|
|
|
|
def _update_history(self, info): |
|
if not self.history: |
|
self.history = {key: [] for key in info.keys()} |
|
|
|
for key, value in info.items(): |
|
self.history[key].append(value) |
|
|
|
|
|
def render(self, mode='human'): |
|
window_ticks = np.arange(len(self._position_history)) |
|
plt.plot(self.prices) |
|
|
|
open_buy = [] |
|
close_buy = [] |
|
open_sell = [] |
|
close_sell = [] |
|
do_nothing = [] |
|
|
|
for i, tick in enumerate(window_ticks): |
|
if self._position_history[i] is None: |
|
continue |
|
|
|
if self._position_history[i] == 1: |
|
open_buy.append(tick) |
|
elif self._position_history[i] == 2 : |
|
close_buy.append(tick) |
|
elif self._position_history[i] == 3 : |
|
open_sell.append(tick) |
|
elif self._position_history[i] == 4 : |
|
close_sell.append(tick) |
|
elif self._position_history[i] == 0 : |
|
do_nothing.append(tick) |
|
|
|
plt.plot(open_buy, self.prices[open_buy], 'go', marker="^") |
|
plt.plot(close_buy, self.prices[close_buy], 'go', marker="v") |
|
plt.plot(open_sell, self.prices[open_sell], 'ro', marker="v") |
|
plt.plot(close_sell, self.prices[close_sell], 'ro', marker="^") |
|
|
|
plt.plot(do_nothing, self.prices[do_nothing], 'yo') |
|
|
|
plt.suptitle( |
|
"Total Reward: %.6f" % self._total_reward + ' ~ ' + |
|
"Total Profit: %.6f" % self._total_profit |
|
) |
|
|
|
|
|
def close(self): |
|
plt.close() |
|
|
|
|
|
def save_rendering(self, filepath): |
|
plt.savefig(filepath) |
|
|
|
|
|
def pause_rendering(self): |
|
plt.show() |
|
|
|
|
|
def _process_data(self): |
|
prices = self.df.loc[:, 'Close'].to_numpy() |
|
|
|
prices[self.frame_bound[0] - self.window_size] |
|
prices = prices[self.frame_bound[0]-self.window_size:self.frame_bound[1]] |
|
|
|
diff = np.insert(np.diff(prices), 0, 0) |
|
signal_features = np.column_stack((prices, diff)) |
|
|
|
return prices, signal_features |
|
|
|
|
|
def _update_profit(self, action): |
|
trade = False |
|
if ((action == Actions.Buy.value and self._position == Positions.Short) or |
|
(action == Actions.Sell.value and self._position == Positions.Long)): |
|
trade = True |
|
|
|
if trade or self._done: |
|
current_price = self.prices[self._current_tick] |
|
last_trade_price = self.prices[self._last_trade_tick] |
|
|
|
if self._position == Positions.Long: |
|
shares = (self._total_profit * (1 - self.trade_fee_ask_percent)) / last_trade_price |
|
self._total_profit = (shares * (1 - self.trade_fee_bid_percent)) * current_price |
|
|
|
|
|
def max_possible_profit(self): |
|
current_tick = self._start_tick |
|
last_trade_tick = current_tick - 1 |
|
profit = 1. |
|
|
|
while current_tick <= self._end_tick: |
|
position = None |
|
if self.prices[current_tick] < self.prices[current_tick - 1]: |
|
while (current_tick <= self._end_tick and |
|
self.prices[current_tick] < self.prices[current_tick - 1]): |
|
current_tick += 1 |
|
position = Positions.Short |
|
else: |
|
while (current_tick <= self._end_tick and |
|
self.prices[current_tick] >= self.prices[current_tick - 1]): |
|
current_tick += 1 |
|
position = Positions.Long |
|
|
|
if position == Positions.Long: |
|
current_price = self.prices[current_tick - 1] |
|
last_trade_price = self.prices[last_trade_tick] |
|
shares = profit / last_trade_price |
|
profit = shares * current_price |
|
last_trade_tick = current_tick - 1 |
|
|
|
return profit |
|
|