arshy committed
Commit b7308a9 · 1 Parent(s): b7433f5

chunk based

Files changed (1)
  1. app.py +33 -9
app.py CHANGED
@@ -42,24 +42,48 @@ logger = get_logger()
 
 def get_last_one_month_data():
     con = duckdb.connect(':memory:')
-    one_month_ago = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
+    # one_month_ago = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
 
-    query1 = f"""
-        SELECT *
-        FROM read_parquet('./data/tools.parquet')
-        WHERE request_time >= '{one_month_ago}'
-    """
+    # Query to fetch data from all_trades_profitability.parquet
     query2 = f"""
         SELECT *
         FROM read_parquet('./data/all_trades_profitability.parquet')
-        WHERE creation_timestamp >= '{one_month_ago}'
     """
-    logger.info("Query 1: " + query1)
+
     logger.info("Query 2: " + query2)
     df2 = con.execute(query2).fetchdf()
     logger.info("here2")
-    df1 = con.execute(query1).fetchdf()
+
+    # Read tools.parquet in chunks to keep peak memory usage bounded
+    parquet_file_path = './data/tools.parquet'
+    chunk_size = 100000  # Adjust chunk size based on your memory capacity
+    offset = 0
+    all_filtered_data = []
+
+    while True:
+        # Read one chunk of rows via LIMIT/OFFSET pagination
+        query1 = f"""
+            SELECT *
+            FROM read_parquet('{parquet_file_path}') LIMIT {chunk_size} OFFSET {offset}
+        """
+
+        logger.info("Query 1: " + query1)
+        chunk_df = con.execute(query1).fetchdf()
+
+        # Break if the chunk is empty (end of file)
+        if chunk_df.empty:
+            break
+
+        # Collect the chunk
+        all_filtered_data.append(chunk_df)
+
+        # Advance the offset to the next chunk
+        offset += chunk_size
+
+    # Concatenate all chunks into a single DataFrame
+    df1 = pd.concat(all_filtered_data, ignore_index=True)
     logger.info("here3")
+
     con.close()
     return df1, df2
 
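
For reference, a minimal standalone sketch of the chunked-read pattern this commit introduces, assuming duckdb and pandas are installed; the helper name read_parquet_in_chunks, the example path, and the default chunk size are illustrative and not part of the commit:

    import duckdb
    import pandas as pd

    def read_parquet_in_chunks(path: str, chunk_size: int = 100_000) -> pd.DataFrame:
        """Read a Parquet file into a DataFrame one chunk at a time via DuckDB."""
        con = duckdb.connect(':memory:')
        chunks = []
        offset = 0
        while True:
            # LIMIT/OFFSET pagination: each pass materializes at most chunk_size rows
            chunk = con.execute(
                f"SELECT * FROM read_parquet('{path}') LIMIT {chunk_size} OFFSET {offset}"
            ).fetchdf()
            # An empty chunk means the end of the file has been reached
            if chunk.empty:
                break
            chunks.append(chunk)
            offset += chunk_size
        con.close()
        # Guard against an empty file, where pd.concat([]) would raise ValueError
        return pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame()

    # Usage (hypothetical path):
    # df = read_parquet_in_chunks('./data/tools.parquet', chunk_size=100_000)

Note that since all chunks are ultimately concatenated, the final DataFrame still holds the whole table; the chunking only bounds the per-query working set during each scan.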