Spaces:
Runtime error
Runtime error
import calendar | |
from datetime import datetime | |
from typing import List | |
import ccxt | |
import numpy as np | |
import pandas as pd | |
from meta.data_processors._base import _Base | |
# from basic_processor import _Base | |
class Ccxt(_Base):
    """Data processor that downloads OHLCV candles from Binance through ccxt.

    ``download_data`` fills ``self.dataframe`` with one
    ``(ticker, field)`` column per ticker and OHLCV field, indexed by candle
    time.  ``df_to_ary`` additionally reads technical-indicator columns from
    ``self.dataframe``; this class does not compute them, so they must have
    been added to the frame before ``df_to_ary`` is called.
    """

    # Per-ticker columns, in the order they follow the timestamp in each
    # candle row returned by ``fetch_ohlcv``.
    _OHLCV_FIELDS = ["open", "high", "low", "close", "volume"]

    # Project period label -> (ccxt timeframe string, candles per request).
    # The limits match the window step used in ``download_data``:
    # 720 one-minute candles per 12-hour step, otherwise one day per step.
    _PERIOD_SETTINGS = {
        "1Min": ("1m", 720),
        "5Min": ("5m", 288),
        "1H": ("1h", 24),
        "1D": ("1d", 1),
    }

    # Request size used for any period not listed in ``_PERIOD_SETTINGS``.
    _DEFAULT_LIMIT = 1000

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Forward the configuration to ``_Base`` and create the Binance client."""
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        self.binance = ccxt.binance()

    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ):
        """Download candles for every ticker and store them in ``self.dataframe``.

        ``self.start_date`` and ``self.end_date`` are parsed with the format
        ``"%Y%m%d %H:%M:%S"`` and treated as UTC.  The range is cut into
        request windows (12 hours for ``"1Min"``, one day otherwise) and
        ``ohlcv`` is called once per ticker with those windows.

        The resulting frame is indexed by the candle times of the first
        ticker; every other ticker must return the same number of rows,
        because its values are assigned positionally.

        Args:
            ticker_list: Trading pairs to download.
            save_path: Destination passed to ``self.save_data`` (not defined
                in this class; presumably provided by ``_Base``).

        Raises:
            ValueError: If ``ticker_list`` is empty.
        """
        if len(ticker_list) == 0:
            raise ValueError("ticker_list must contain at least one ticker")

        # The request windows depend only on the configured date range and
        # interval, so they are built once rather than once per ticker.
        start_dt = datetime.strptime(self.start_date, "%Y%m%d %H:%M:%S")
        end_dt = datetime.strptime(self.end_date, "%Y%m%d %H:%M:%S")
        start_timestamp = calendar.timegm(start_dt.utctimetuple())
        end_timestamp = calendar.timegm(end_dt.utctimetuple())
        step_seconds = 60 * 720 if self.time_interval == "1Min" else 60 * 1440
        date_list = [
            datetime.utcfromtimestamp(float(ts))
            for ts in range(start_timestamp, end_timestamp, step_seconds)
        ]

        crypto_column = pd.MultiIndex.from_product([ticker_list, self._OHLCV_FIELDS])
        dataset = None
        for ticker in ticker_list:
            df = self.ohlcv(date_list, ticker, self.time_interval)
            if dataset is None:
                # The first ticker's candle times define the shared index.
                dataset = pd.DataFrame(columns=crypto_column, index=df["time"].values)
            temp_col = pd.MultiIndex.from_product([[ticker], self._OHLCV_FIELDS])
            dataset[temp_col] = df[self._OHLCV_FIELDS].values
            print("Actual end time: " + str(df["time"].values[-1]))

        self.dataframe = dataset
        self.save_data(save_path)
        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )

    def df_to_ary(self, pair_list, tech_indicator_list=None):
        """Split ``self.dataframe`` into price, indicator and date arrays.

        Rows containing any NaN are dropped first.

        Args:
            pair_list: Tickers whose columns are extracted.
            tech_indicator_list: Indicator column names to extract for each
                ticker.  Defaults to macd, boll_ub, boll_lb, rsi_30, dx_30,
                close_30_sma and close_60_sma.  These columns must already
                exist in ``self.dataframe``.

        Returns:
            A tuple ``(price_array, tech_array, date_ary)``: the ``close``
            columns, the indicator columns, and the index values of the
            NaN-free frame.
        """
        if tech_indicator_list is None:
            tech_indicator_list = [
                "macd",
                "boll_ub",
                "boll_lb",
                "rsi_30",
                "dx_30",
                "close_30_sma",
                "close_60_sma",
            ]
        df = self.dataframe.dropna()
        date_ary = df.index.values
        price_array = df[pd.MultiIndex.from_product([pair_list, ["close"]])].values
        tech_array = df[
            pd.MultiIndex.from_product([pair_list, tech_indicator_list])
        ].values
        return price_array, tech_array, date_ary

    def min_ohlcv(self, dt, pair, limit):
        """Fetch up to ``limit`` one-minute candles for ``pair`` starting at ``dt``.

        ``dt`` is treated as UTC and converted to the millisecond timestamp
        passed as ``since``.
        """
        since = calendar.timegm(dt.utctimetuple()) * 1000
        return self.binance.fetch_ohlcv(
            symbol=pair, timeframe="1m", since=since, limit=limit
        )

    def ohlcv(self, dt, pair, period="1d"):
        """Fetch candles for ``pair`` over several start times into one frame.

        Args:
            dt: Iterable of ``datetime`` window starts, treated as UTC.
            pair: Trading pair symbol passed to ``fetch_ohlcv``.
            period: One of the labels ``"1Min"``, ``"5Min"``, ``"1H"``,
                ``"1D"``, which are translated to the matching ccxt
                timeframe.  Any other value is passed to ccxt unchanged with
                a request size of 1000 candles.

        Returns:
            A DataFrame with columns ``time`` (naive UTC datetimes), ``open``,
            ``high``, ``low``, ``close`` and ``volume`` (all float64).
        """
        timeframe, limit = self._PERIOD_SETTINGS.get(
            period, (period, self._DEFAULT_LIMIT)
        )

        candles = []
        for start_dt in dt:
            if period == "1Min":
                candles.extend(self.min_ohlcv(start_dt, pair, limit))
            else:
                since = calendar.timegm(start_dt.utctimetuple()) * 1000
                candles.extend(
                    self.binance.fetch_ohlcv(
                        symbol=pair, timeframe=timeframe, since=since, limit=limit
                    )
                )

        df = pd.DataFrame(candles, columns=["time"] + self._OHLCV_FIELDS)
        # Candle timestamps are epoch milliseconds; convert them as UTC so
        # they line up with the UTC request windows instead of shifting by
        # the host's local timezone offset.
        df["time"] = [datetime.utcfromtimestamp(float(ts) / 1000) for ts in df["time"]]
        for field in self._OHLCV_FIELDS:
            df[field] = df[field].astype(np.float64)
        return df