import gc
import logging
import os
import pickle
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from functools import partial
from pathlib import Path

import pandas as pd
from tqdm import tqdm
from web3 import Web3

from markets import (
    etl as mkt_etl,
    DEFAULT_FILENAME as MARKETS_FILENAME,
)
from tools import (
    etl as tools_etl,
    DEFAULT_FILENAME as TOOLS_FILENAME,
    update_tools_accuracy,
)
from profitability import run_profitability_analysis
from utils import get_question, current_answer, RPC
from get_mech_info import get_mech_info_last_60_days
from update_tools_accuracy import compute_tools_accuracy

logging.basicConfig(level=logging.INFO)

SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"


def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
    """Convert a block number to a UTC timestamp string."""
    block = web3.eth.get_block(block_number)
    # datetime.utcfromtimestamp is deprecated since Python 3.12; the
    # timezone-aware call below produces the same formatted string.
    timestamp = datetime.fromtimestamp(block["timestamp"], tz=timezone.utc)
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")


def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
    """Convert the block numbers in df["request_block"] to timestamps in parallel."""
    block_numbers = df["request_block"].tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(
            tqdm(executor.map(function, block_numbers), total=len(block_numbers))
        )
    return results


def add_current_answer():
    """Attach the market question and its current answer to each tools row."""
    # Get currentAnswer from the FPMMS data
    fpmms = pd.read_parquet(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_parquet(DATA_DIR / TOOLS_FILENAME)

    # Extract the question from each tool prompt, then look up its current answer
    logging.info("Getting the question and current answer for the tools")
    tools["title"] = tools["prompt_request"].apply(lambda x: get_question(x))
    tools["currentAnswer"] = tools["title"].apply(lambda x: current_answer(x, fpmms))
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("yes", "Yes")
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("no", "No")

    # Save the tools data after the updates on the content
    tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)
    del fpmms


def updating_timestamps(rpc: str):
    """Fill in request timestamps for the tools rows, reusing the cached
    block->timestamp map and falling back to RPC lookups for unseen blocks."""
    web3 = Web3(Web3.HTTPProvider(rpc))
    tools = pd.read_parquet(DATA_DIR / TOOLS_FILENAME)

    # Convert block numbers to timestamps, consulting the cached mapping first
    logging.info("Converting block number to timestamp")
    with open(DATA_DIR / "t_map.pkl", "rb") as f:
        t_map = pickle.load(f)
    tools["request_time"] = tools["request_block"].map(t_map)

    no_data = tools["request_time"].isna().sum()
    logging.info(f"Total rows with no request time info = {no_data}")

    # Identify rows with missing request_time and fill them via RPC
    missing_time_indices = tools[tools["request_time"].isna()].index
    if not missing_time_indices.empty:
        partial_block_number_to_timestamp = partial(
            block_number_to_timestamp, web3=web3
        )
        missing_timestamps = parallelize_timestamp_conversion(
            tools.loc[missing_time_indices], partial_block_number_to_timestamp
        )
        # Update the original DataFrame with the missing timestamps
        for i, timestamp in zip(missing_time_indices, missing_timestamps):
            tools.at[i, "request_time"] = timestamp

    tools["request_month_year"] = pd.to_datetime(tools["request_time"]).dt.strftime(
        "%Y-%m"
    )
    tools["request_month_year_week"] = (
        pd.to_datetime(tools["request_time"]).dt.to_period("W").astype(str)
    )

    # Save the tools data after the updates on the content
    tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)

    # Update t_map with the newly resolved timestamps and persist it
    new_timestamps = (
        tools[["request_block", "request_time"]]
        .dropna()
        .set_index("request_block")
        .to_dict()["request_time"]
    )
    t_map.update(new_timestamps)
    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(t_map, f)

    # Clean up and release memory
    del tools
    del t_map
    gc.collect()
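# Illustrative shape of the t_map cache (inferred from the functions above,
# not part of the original script): it memoizes block-number -> timestamp
# lookups on disk, so repeated weekly runs only query the RPC node for blocks
# that have not been seen before, e.g.:
#
#   t_map = {
#       30_000_000: "2023-09-01 12:34:56",
#       30_000_001: "2023-09-01 12:35:01",
#   }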
"t_map.pkl", "wb") as f: pickle.dump(t_map, f) # clean and release all memory del tools del t_map gc.collect() def weekly_analysis(): """Run weekly analysis for the FPMMS project.""" rpc = RPC # Run markets ETL logging.info("Running markets ETL") mkt_etl(MARKETS_FILENAME) logging.info("Markets ETL completed") # Run tools ETL logging.info("Running tools ETL") # This etl is saving already the tools parquet file tools_etl( rpcs=[rpc], mech_info=get_mech_info_last_60_days(), filename=TOOLS_FILENAME, ) logging.info("Tools ETL completed") # Run profitability analysis logging.info("Running profitability analysis") if os.path.exists(DATA_DIR / "fpmmTrades.parquet"): os.remove(DATA_DIR / "fpmmTrades.parquet") run_profitability_analysis( rpc=rpc, ) logging.info("Profitability analysis completed") add_current_answer() try: updating_timestamps(rpc) except Exception as e: logging.error("Error while updating timestamps of tools") print(e) compute_tools_accuracy() logging.info("Weekly analysis files generated and saved") if __name__ == "__main__": weekly_analysis() # rpc = RPC # updating_timestamps(rpc)