"""Weekly analysis pipeline for the FPMMS project: runs the markets and tools
ETLs plus the profitability analysis, then enriches the tools data with the
market question, current answer and request timestamps."""

import gc
import logging
import os
import pickle
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Callable, Optional

import pandas as pd
from tqdm import tqdm
from web3 import Web3

from markets import (
    etl as mkt_etl,
    DEFAULT_FILENAME as MARKETS_FILENAME,
)
from tools import (
    etl as tools_etl,
    DEFAULT_FILENAME as TOOLS_FILENAME,
)
from profitability import run_profitability_analysis

logging.basicConfig(level=logging.INFO)

SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"


def get_question(text: str) -> Optional[str]:
    """Extract the question (the first double-quoted string) from a prompt."""
    pattern = r'"([^"]*)"'
    questions = re.findall(pattern, text)
    return questions[0] if questions else None


def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
    """Get the current answer for a question."""
    row = fpmms[fpmms["title"] == text]
    if row.shape[0] == 0:
        return None
    return row["currentAnswer"].values[0]


def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
    """Convert a block number to a 'YYYY-MM-DD HH:MM:SS' UTC timestamp."""
    block = web3.eth.get_block(block_number)
    timestamp = datetime.utcfromtimestamp(block["timestamp"])
    return timestamp.strftime("%Y-%m-%d %H:%M:%S")


def parallelize_timestamp_conversion(df: pd.DataFrame, function: Callable) -> list:
    """Resolve the timestamps of the dataframe's request blocks in parallel."""
    block_numbers = df["request_block"].tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(
            tqdm(executor.map(function, block_numbers), total=len(block_numbers))
        )
    return results


def weekly_analysis():
    """Run the weekly analysis for the FPMMS project."""
    rpc = "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a"
    web3 = Web3(Web3.HTTPProvider(rpc))

    logging.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logging.info("Markets ETL completed")

    logging.info("Running tools ETL")
    tools_etl(
        rpcs=[rpc],
        filename=TOOLS_FILENAME,
        full_contents=True,
    )
    logging.info("Tools ETL completed")

    logging.info("Running profitability analysis")
    # Remove any stale trades file before rerunning the profitability analysis.
    if os.path.exists(DATA_DIR / "fpmmTrades.csv"):
        os.remove(DATA_DIR / "fpmmTrades.csv")
    run_profitability_analysis(
        rpc=rpc,
    )
    logging.info("Profitability analysis completed")

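    # Assumption: the ETL steps above have written MARKETS_FILENAME and
    # TOOLS_FILENAME into DATA_DIR. The rest of this function joins the two
    # datasets on the market question (title) extracted from each tool prompt.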
    fpmms = pd.read_csv(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_csv(DATA_DIR / TOOLS_FILENAME)

    logging.info("Getting the question and current answer for the tools")
    tools["title"] = tools["prompt_request"].apply(get_question)
    tools["currentAnswer"] = tools["title"].apply(lambda x: current_answer(x, fpmms))

    # Normalise the answer casing ("yes"/"no" -> "Yes"/"No").
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("yes", "Yes")
    tools["currentAnswer"] = tools["currentAnswer"].str.replace("no", "No")

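    # t_map.pkl is a locally cached {block_number: timestamp} mapping; only the
    # blocks missing from the cache are resolved through the RPC below.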
logging.info("Converting block number to timestamp") |
|
t_map = pickle.load(open(DATA_DIR / "t_map.pkl", "rb")) |
|
tools['request_time'] = tools['request_block'].map(t_map) |
|
|
|
|
|
missing_time_indices = tools[tools['request_time'].isna()].index |
|
if not missing_time_indices.empty: |
|
partial_block_number_to_timestamp = partial(block_number_to_timestamp, web3=web3) |
|
missing_timestamps = parallelize_timestamp_conversion(tools.loc[missing_time_indices], partial_block_number_to_timestamp) |
|
|
|
|
|
for i, timestamp in zip(missing_time_indices, missing_timestamps): |
|
tools.at[i, 'request_time'] = timestamp |
|
|
|
tools['request_month_year'] = pd.to_datetime(tools['request_time']).dt.strftime('%Y-%m') |
|
tools['request_month_year_week'] = pd.to_datetime(tools['request_time']).dt.to_period('W').astype(str) |
|
|
|
|
|
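    # Persist the enriched tools dataset, then fold the newly resolved
    # timestamps back into the block->timestamp cache so later runs can skip
    # those RPC lookups.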
    tools.to_csv(DATA_DIR / TOOLS_FILENAME, index=False)

    new_timestamps = (
        tools[["request_block", "request_time"]]
        .dropna()
        .set_index("request_block")
        .to_dict()["request_time"]
    )
    t_map.update(new_timestamps)
    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(t_map, f)

    # Release the large dataframes and the cache before exiting.
    del tools
    del fpmms
    del t_map
    gc.collect()

    logging.info("Weekly analysis files generated and saved")


if __name__ == "__main__":
    weekly_analysis()