|
import gc
import logging
import os
import pickle
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from functools import partial
from pathlib import Path

import pandas as pd
from tqdm import tqdm
from web3 import Web3

from get_mech_info import (
    get_mech_events_last_60_days,
)
from markets import (
    etl as mkt_etl,
    DEFAULT_FILENAME as MARKETS_FILENAME,
)
from profitability import run_profitability_analysis
from tools import (
    DEFAULT_FILENAME as TOOLS_FILENAME,
    generate_tools_file,
)
from update_tools_accuracy import compute_tools_accuracy
from utils import get_question, current_answer, RPC
|
|
|
# Root logger at INFO so the pipeline's progress messages are visible.
logging.basicConfig(level=logging.INFO)



# Directory layout: this script lives in a scripts directory; the data
# directory holding all parquet/pickle artifacts is a sibling under the
# repository root.
SCRIPTS_DIR = Path(__file__).parent

ROOT_DIR = SCRIPTS_DIR.parent

DATA_DIR = ROOT_DIR / "data"
|
|
|
|
|
def block_number_to_timestamp(block_number: int, web3: "Web3") -> str:
    """Convert a block number to a UTC timestamp string.

    :param block_number: on-chain block number to look up.
    :param web3: connected Web3 client used for the ``eth.get_block`` call.
    :return: the block's timestamp formatted as ``"%Y-%m-%d %H:%M:%S"`` (UTC).
    """
    block = web3.eth.get_block(block_number)
    # datetime.utcfromtimestamp is deprecated since Python 3.12; build an
    # aware UTC datetime instead — the formatted string is identical.
    timestamp = datetime.fromtimestamp(block["timestamp"], tz=timezone.utc)
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
    """Apply *function* to every ``request_block`` value using a thread pool.

    Uses 10 worker threads (the work is I/O-bound RPC lookups) and reports
    progress via tqdm. The returned list preserves row order.
    """
    blocks = df["request_block"].tolist()
    with ThreadPoolExecutor(max_workers=10) as pool:
        mapped = pool.map(function, blocks)
        converted = list(tqdm(mapped, total=len(blocks)))
    return converted
|
|
|
|
|
def add_current_answer():
    """Attach market question titles and current answers to the tools data.

    Reads the markets and tools parquet files from ``DATA_DIR``, derives each
    tool's question title from its prompt, looks up the market's current
    answer, normalises the answer capitalisation, and writes the tools
    parquet file back in place.
    """
    fpmms = pd.read_parquet(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_parquet(DATA_DIR / TOOLS_FILENAME)

    logging.info("Getting the question and current answer for the tools")
    # Apply the helper directly — the original lambda wrapper was a no-op.
    tools["title"] = tools["prompt_request"].apply(get_question)
    tools["currentAnswer"] = tools["title"].apply(lambda t: current_answer(t, fpmms))

    # Normalise capitalisation so downstream grouping sees "Yes"/"No" only.
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("yes", "Yes")
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("no", "No")

    tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)
    # Drop the markets frame promptly; it can be large.
    del fpmms
|
|
|
|
|
def updating_timestamps(rpc: str):
    """Fill in missing request timestamps for the tools data.

    Loads the cached block-number -> timestamp map (``t_map.pkl``), maps it
    over the tools' request blocks, resolves any still-missing timestamps via
    RPC in parallel, derives month and week period columns, then persists both
    the updated tools parquet file and the refreshed timestamp cache.

    :param rpc: HTTP RPC endpoint used to query block timestamps.
    """
    web3 = Web3(Web3.HTTPProvider(rpc))

    tools = pd.read_parquet(DATA_DIR / TOOLS_FILENAME)

    logging.info("Converting block number to timestamp")
    # FIX: the pickle file handle was previously opened without ever being
    # closed; use a context manager so it is released deterministically.
    with open(DATA_DIR / "t_map.pkl", "rb") as f:
        t_map = pickle.load(f)
    tools["request_time"] = tools["request_block"].map(t_map)

    no_data = tools["request_time"].isna().sum()
    logging.info(f"Total rows with no request time info = {no_data}")

    # Resolve blocks missing from the cache over RPC (threaded, 10 at a time).
    missing_time_indices = tools[tools["request_time"].isna()].index
    if not missing_time_indices.empty:
        partial_block_number_to_timestamp = partial(
            block_number_to_timestamp, web3=web3
        )
        missing_timestamps = parallelize_timestamp_conversion(
            tools.loc[missing_time_indices], partial_block_number_to_timestamp
        )

        # Write results back row-by-row; order matches missing_time_indices.
        for i, timestamp in zip(missing_time_indices, missing_timestamps):
            tools.at[i, "request_time"] = timestamp

    # Derive "YYYY-MM" and weekly-period columns for downstream aggregation.
    tools["request_month_year"] = pd.to_datetime(tools["request_time"]).dt.strftime(
        "%Y-%m"
    )
    tools["request_month_year_week"] = (
        pd.to_datetime(tools["request_time"]).dt.to_period("W").astype(str)
    )

    tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)

    # Merge the newly resolved timestamps back into the cache and persist it.
    new_timestamps = (
        tools[["request_block", "request_time"]]
        .dropna()
        .set_index("request_block")
        .to_dict()["request_time"]
    )
    t_map.update(new_timestamps)

    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(t_map, f)

    # Release the large objects before the caller's next big allocation.
    del tools
    del t_map
    gc.collect()
|
|
|
|
|
def weekly_analysis():
    """Run the weekly analysis pipeline for the FPMMS project.

    Steps: markets ETL -> mech event extraction -> tools ETL ->
    profitability analysis -> current-answer enrichment -> (best-effort)
    timestamp backfill -> tools accuracy computation.
    """
    rpc = RPC

    logging.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logging.info("Markets ETL completed")

    logging.info("Generating the mech json files")
    get_mech_events_last_60_days()
    logging.info("Finished generating the mech json files")

    logging.info("Generate and parse the tools content")
    generate_tools_file()
    logging.info("Tools ETL completed")

    logging.info("Running profitability analysis")
    # Remove any stale trades file so the analysis starts from scratch.
    if os.path.exists(DATA_DIR / "fpmmTrades.parquet"):
        os.remove(DATA_DIR / "fpmmTrades.parquet")
    run_profitability_analysis(
        rpc=rpc,
    )
    logging.info("Profitability analysis completed")

    add_current_answer()
    try:
        updating_timestamps(rpc)
    except Exception:
        # Timestamp backfill is best-effort; FIX: logging.exception keeps
        # the traceback in the log (the old error + print(e) discarded it).
        logging.exception("Error while updating timestamps of tools")

    compute_tools_accuracy()

    logging.info("Weekly analysis files generated and saved")
|
|
|
|
|
# Script entry point: regenerate all weekly analysis data files.
if __name__ == "__main__":

    weekly_analysis()
|
|
|
|
|
|
|
|