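"""Weekly analysis runner.

Refreshes the markets and tools parquet files, runs the profitability analysis,
and enriches the tools data with market questions, current answers, and request
timestamps.
"""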
import gc
import logging
import os
import pickle
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Optional

import pandas as pd
from tqdm import tqdm
from web3 import Web3

from markets import (
    etl as mkt_etl,
    DEFAULT_FILENAME as MARKETS_FILENAME,
)
from tools import (
    etl as tools_etl,
    DEFAULT_FILENAME as TOOLS_FILENAME,
)
from profitability import run_profitability_analysis

logging.basicConfig(level=logging.INFO)

SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"


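# Illustrative example (the prompt text below is hypothetical): for a request such as
#   '... please answer "Will the event happen by the end of 2024?" ...'
# get_question returns the quoted question; prompts with no double-quoted text yield None.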
def get_question(text: str) -> Optional[str]:
    """Get the question from a text."""
    # Regex to find text within double quotes
    pattern = r'"([^"]*)"'

    # Find all occurrences
    questions = re.findall(pattern, text)

    # Assuming you want the first question if there are multiple
    question = questions[0] if questions else None
    return question


def current_answer(text: str, fpmms: pd.DataFrame) -> Optional[str]:
    """Get the current answer for a question."""
    row = fpmms[fpmms['title'] == text]
    if row.shape[0] == 0:
        return None
    return row['currentAnswer'].values[0]


def block_number_to_timestamp(block_number: int, web3: Web3) -> str:
    """Convert a block number to a timestamp."""
    block = web3.eth.get_block(block_number)
    timestamp = datetime.utcfromtimestamp(block['timestamp'])
    return timestamp.strftime('%Y-%m-%d %H:%M:%S')


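# Block-to-timestamp lookups are network-bound RPC calls, so a thread pool is a
# reasonable way to overlap them; the cap of 10 workers is an arbitrary choice.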
def parallelize_timestamp_conversion(df: pd.DataFrame, function: callable) -> list:
    """Parallelize the timestamp conversion."""
    block_numbers = df['request_block'].tolist()
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(
            tqdm(executor.map(function, block_numbers), total=len(block_numbers))
        )
    return results


def weekly_analysis():
    """Run weekly analysis for the FPMMS project."""
    rpc = "https://lb.nodies.app/v1/406d8dcc043f4cb3959ed7d6673d311a"
    web3 = Web3(Web3.HTTPProvider(rpc))

    # Run markets ETL
    logging.info("Running markets ETL")
    mkt_etl(MARKETS_FILENAME)
    logging.info("Markets ETL completed")

    # Run tools ETL
    logging.info("Running tools ETL")
    tools_etl(
        rpcs=[rpc],
        filename=TOOLS_FILENAME,
        full_contents=True,
    )
    logging.info("Tools ETL completed")

    # Run profitability analysis
    logging.info("Running profitability analysis")
    if os.path.exists(DATA_DIR / "fpmmTrades.parquet"):
        os.remove(DATA_DIR / "fpmmTrades.parquet")
    run_profitability_analysis(
        rpc=rpc,
    )
    logging.info("Profitability analysis completed")

    # Get currentAnswer from FPMMS
    fpmms = pd.read_parquet(DATA_DIR / MARKETS_FILENAME)
    tools = pd.read_parquet(DATA_DIR / TOOLS_FILENAME)

    # Get the question from the tools
    logging.info("Getting the question and current answer for the tools")
    tools['title'] = tools['prompt_request'].apply(get_question)
    tools['currentAnswer'] = tools['title'].apply(lambda x: current_answer(x, fpmms))

    tools['currentAnswer'] = tools['currentAnswer'].str.replace('yes', 'Yes')
    tools['currentAnswer'] = tools['currentAnswer'].str.replace('no', 'No')
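    # The two replacements above normalise answer casing to 'Yes'/'No'; the assumption
    # is that the markets data may store answers in lowercase while later steps expect
    # title case.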

    # Convert block number to timestamp
    logging.info("Converting block number to timestamp")
    with open(DATA_DIR / "t_map.pkl", "rb") as f:
        t_map = pickle.load(f)
    tools['request_time'] = tools['request_block'].map(t_map)

    # Identify tools with missing request_time and fill them
    missing_time_indices = tools[tools['request_time'].isna()].index
    if not missing_time_indices.empty:
        partial_block_number_to_timestamp = partial(block_number_to_timestamp, web3=web3)
        missing_timestamps = parallelize_timestamp_conversion(
            tools.loc[missing_time_indices], partial_block_number_to_timestamp
        )
        # Update the original DataFrame with the missing timestamps
        for i, timestamp in zip(missing_time_indices, missing_timestamps):
            tools.at[i, 'request_time'] = timestamp

    tools['request_month_year'] = pd.to_datetime(tools['request_time']).dt.strftime('%Y-%m')
    tools['request_month_year_week'] = (
        pd.to_datetime(tools['request_time']).dt.to_period('W').astype(str)
    )
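    # Note: weekly pandas periods stringify as a start/end range,
    # e.g. '2024-01-01/2024-01-07' (dates shown are only illustrative).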

    # Save the tools
    tools.to_parquet(DATA_DIR / TOOLS_FILENAME, index=False)

    # Update t_map with new timestamps
    new_timestamps = (
        tools[['request_block', 'request_time']]
        .dropna()
        .set_index('request_block')
        .to_dict()['request_time']
    )
    t_map.update(new_timestamps)
    with open(DATA_DIR / "t_map.pkl", "wb") as f:
        pickle.dump(t_map, f)

    # Clean up and release all memory
    del tools
    del fpmms
    del t_map
    gc.collect()

    logging.info("Weekly analysis files generated and saved")


if __name__ == "__main__":
    weekly_analysis()