|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import functools |
|
import warnings |
|
from string import Template |
|
from typing import Optional, Generator, Callable |
|
|
|
import pandas as pd |
|
import requests |
|
from tqdm import tqdm |
|
|
|
from typing import List, Dict |
|
from pathlib import Path |
|
from utils import SUBGRAPH_API_KEY |
|
from queries import ( |
|
FPMMS_QUERY, |
|
ID_FIELD, |
|
DATA_FIELD, |
|
ANSWER_FIELD, |
|
QUERY_FIELD, |
|
TITLE_FIELD, |
|
OUTCOMES_FIELD, |
|
ERROR_FIELD, |
|
QUESTION_FIELD, |
|
FPMMS_FIELD, |
|
) |
|
|
|
# A list of records, each record mapping field names to values.
# NOTE(review): `get_answer` indexes into a nested question value, so records
# evidently hold more than plain strings; `str` values is looser than reality.
ResponseItemType = List[Dict[str, str]]
# A mapping from a key to a list of such records.
SubgraphResponseType = Dict[str, List[Dict[str, str]]]
|
|
|
|
|
# Creator addresses whose markets are fetched; `fpmms_fetcher` picks
# PEARL_CREATOR for the "pearl" category and CREATOR otherwise.
CREATOR = "0x89c5cc945dd550BcFfb72Fe42BfF002429F46Fec"
PEARL_CREATOR = "0xFfc8029154ECD55ABED15BD428bA596E7D23f557"
# Number of records requested per subgraph query.
BATCH_SIZE = 1000

# The API key is filled in at runtime through `substitute(subgraph_api_key=...)`.
OMEN_SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}"
    "/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"
)

# Markets whose answer equals this value are discarded by `transform_fpmms`.
MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
DEFAULT_FILENAME = "fpmms.parquet"

# Directory layout: <root>/<scripts dir containing this file>, <root>/data.
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR / "data"
|
|
|
|
|
class RetriesExceeded(Exception):
    """Signals that every permitted attempt to fetch the data has failed.

    `hacky_retry` raises this once its retry budget is used up.
    """

    def __init__(
        self,
        msg: str = "Maximum retries were exceeded while trying to fetch the data!",
    ) -> None:
        # Forward the message so that str(err) and err.args expose it.
        super().__init__(msg)
|
|
|
|
|
def hacky_retry(func: Optional[Callable] = None, n_retries: int = 3) -> Callable:
    """Create a hacky retry strategy.

    Unfortunately, we cannot use `requests.packages.urllib3.util.retry.Retry`,
    because the subgraph does not return the appropriate status codes in case of failure.
    Instead, it always returns code 200. Thus, the wrapped request function (e.g.
    `query_subgraph`) raises `ValueError` / `ConnectionError` manually; this decorator
    catches those exceptions and tries again.
    Finally, if the allowed number of retries is exceeded, we raise a custom
    `RetriesExceeded` exception, chained to the last error seen.

    Usable bare (``@hacky_retry``), called directly (``hacky_retry(f, n_retries=5)``)
    or as a decorator factory (``@hacky_retry(n_retries=5)``).

    :param func: the input request function. When omitted, a decorator configured
        with ``n_retries`` is returned instead.
    :param n_retries: the maximum allowed number of retries; the wrapped function is
        attempted at most ``n_retries + 1`` times.
    :return: The request method with the hacky retry strategy applied.
    """
    if func is None:
        # Decorator-factory form: `@hacky_retry(n_retries=...)`.
        return functools.partial(hacky_retry, n_retries=n_retries)

    @functools.wraps(func)
    def wrapper_hacky_retry(*args, **kwargs):
        """The wrapper for the hacky retry.

        :return: whatever the wrapped function returns on its first successful attempt.
        :raises RetriesExceeded: when every attempt failed with ValueError or
            ConnectionError; the last such error is attached as ``__cause__``.
        """
        last_error: Optional[Exception] = None
        for attempt in range(n_retries + 1):
            if attempt > 0:
                warnings.warn(f"Retrying {attempt}/{n_retries}...")
            try:
                return func(*args, **kwargs)
            except (ValueError, ConnectionError) as e:
                # str(e) rather than e.args[0]: an exception raised without
                # arguments has empty `args`, and indexing it would raise
                # IndexError and abort the retry loop.
                warnings.warn(str(e))
                last_error = e

        raise RetriesExceeded() from last_error

    return wrapper_hacky_retry
|
|
|
|
|
@hacky_retry
def query_subgraph(
    url: str, query: str, key: str, timeout: float = 60.0
) -> ResponseItemType:
    """Query a subgraph.

    Failures are signalled by raising, so that the `hacky_retry` decorator retries:
    transport problems and non-200 status codes raise ``ConnectionError``; a GraphQL
    error field or a response without the requested data raises ``ValueError``.

    Args:
        url: the subgraph's URL.
        query: the query to be used.
        key: the key to use in order to access the required data.
        timeout: seconds to wait for the HTTP request. Without a timeout,
            `requests` may block indefinitely on an unresponsive server.

    Returns:
        the records stored under ``key`` in the response's data field.
    """
    content = {QUERY_FIELD: query}
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    try:
        res = requests.post(url, json=content, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        # requests' own exceptions (DNS failure, refused connection, timeout) are
        # not the builtin ConnectionError that `hacky_retry` catches; translate
        # them so transient network failures are retried too.
        raise ConnectionError(
            f"Something went wrong while trying to reach the subgraph: {e}"
        ) from e

    if res.status_code != 200:
        raise ConnectionError(
            "Something went wrong while trying to communicate with the subgraph "
            f"(Error: {res.status_code})!\n{res.text}"
        )

    body = res.json()
    if ERROR_FIELD in body:
        raise ValueError(f"The given query is not correct: {body[ERROR_FIELD]}")

    # `or {}` also covers a data field that is present but null; the former
    # `body.get(DATA_FIELD, {})` returned None there and crashed with a
    # non-retried AttributeError on `.get`.
    data = (body.get(DATA_FIELD) or {}).get(key)
    if data is None:
        raise ValueError(f"Unknown error encountered!\nRaw response: \n{body}")

    return data
|
|
|
|
|
def fpmms_fetcher(
    trader_category: str,
) -> Generator[Optional[ResponseItemType], str, None]:
    """An indefinite fetcher for the FPMMs.

    Driven with `next` + `send`: advancing the generator stops at a bare `yield`
    (producing None); `send(fpmm_id)` then queries the subgraph with that id and
    produces the returned batch of records.

    :param trader_category: "pearl" selects PEARL_CREATOR; anything else selects CREATOR.
    """
    omen_subgraph = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key=SUBGRAPH_API_KEY)
    # Print a redacted URL: the real one embeds the secret API key, which must
    # not end up in console output.
    redacted_url = OMEN_SUBGRAPH_URL.substitute(subgraph_api_key="<redacted>")
    print(f"omen_subgraph = {redacted_url}")

    creator_id = PEARL_CREATOR if trader_category == "pearl" else CREATOR

    while True:
        fpmm_id = yield
        fpmms_query = FPMMS_QUERY.substitute(
            creator=creator_id,
            fpmm_id=fpmm_id,
            fpmms_field=FPMMS_FIELD,
            first=BATCH_SIZE,
            id_field=ID_FIELD,
            answer_field=ANSWER_FIELD,
            question_field=QUESTION_FIELD,
            outcomes_field=OUTCOMES_FIELD,
            title_field=TITLE_FIELD,
        )
        print(f"markets query = {fpmms_query}")
        yield query_subgraph(omen_subgraph, fpmms_query, FPMMS_FIELD)
|
|
|
|
|
def _fetch_fpmms(trader_category: str) -> pd.DataFrame:
    """Page through all the fpmms of the creator selected by ``trader_category``.

    Raises ValueError if a returned record lacks an id, since paging cannot continue.
    """
    latest_id = ""
    fpmms = []
    print(f"Getting markets for {trader_category}")
    fetcher = fpmms_fetcher(trader_category)
    # Protocol with `fpmms_fetcher`: each loop step advances the generator to its
    # bare `yield`, then `send` supplies the last id seen and returns the next batch
    # (presumably records after that id — depends on FPMMS_QUERY).
    for _ in tqdm(fetcher, unit="fpmms", unit_scale=BATCH_SIZE):
        batch = fetcher.send(latest_id)
        if len(batch) == 0:
            break

        latest_id = batch[-1].get(ID_FIELD, "")
        if latest_id == "":
            raise ValueError(f"Unexpected data format retrieved: {batch}")

        fpmms.extend(batch)

    return pd.DataFrame(fpmms)


def fetch_qs_fpmms() -> pd.DataFrame:
    """Fetch all the fpmms of the quickstart creator."""
    return _fetch_fpmms("quickstart")


def fetch_pearl_fpmms() -> pd.DataFrame:
    """Fetch all the fpmms of the pearl creator."""
    return _fetch_fpmms("pearl")
|
|
|
|
|
def get_answer(fpmm: pd.Series) -> str:
    """Return the outcome selected by an FPMM row's answer index.

    The question field of the row is indexed by the outcomes field (presumably
    the list of outcome labels — confirm against FPMMS_QUERY), and that
    collection is indexed by the row's answer value.
    """
    outcomes = fpmm[QUESTION_FIELD][OUTCOMES_FIELD]
    answer_index = fpmm[ANSWER_FIELD]
    return outcomes[answer_index]
|
|
|
|
|
def transform_fpmms(fpmms: pd.DataFrame) -> pd.DataFrame:
    """Transform an FPMMS dataframe.

    Drops rows with missing values, de-duplicates by id, discards markets whose
    answer equals MAX_UINT_HEX, replaces the hex-encoded answer index with the
    corresponding outcome (via `get_answer`) and drops the question column.

    :param fpmms: raw markets as returned by the fetchers.
    :return: a new dataframe; the input is left untouched.
    """
    if fpmms.empty:
        # A creator with no markets yields `pd.DataFrame([])`, which has no
        # columns at all; the column-based steps below would raise KeyError.
        return fpmms.drop(columns=[QUESTION_FIELD], errors="ignore")

    transformed = fpmms.dropna().drop_duplicates([ID_FIELD])
    # Explicit copy so the column assignments below operate on an independent
    # frame (no SettingWithCopyWarning).
    transformed = transformed.loc[transformed[ANSWER_FIELD] != MAX_UINT_HEX].copy()
    if transformed.empty:
        # Row-wise `apply` on an empty frame returns a DataFrame, not a Series.
        return transformed.drop(columns=[QUESTION_FIELD])

    # Parse the whole hex string as the outcome index. Reading only the last
    # character with base-10 `int` fails for indices >= 10 ("...0a") and
    # silently maps index 16 ("...10") to 0.
    transformed[ANSWER_FIELD] = transformed[ANSWER_FIELD].map(lambda h: int(h, 16))
    transformed[ANSWER_FIELD] = transformed.apply(get_answer, axis=1)

    return transformed.drop(columns=[QUESTION_FIELD])
|
|
|
|
|
def etl(filename: Optional[str] = None) -> pd.DataFrame:
    """Fetch, process, store and return the markets as a Dataframe.

    Markets of the quickstart and pearl creators are fetched, transformed and
    tagged with a ``market_creator`` column, then concatenated.

    :param filename: when given, the result is also written as parquet to
        ``DATA_DIR / filename`` (parent directories are created if missing).
    :return: the combined markets dataframe.
    """
    per_creator = []
    for creator, fetch in (
        ("quickstart", fetch_qs_fpmms),
        ("pearl", fetch_pearl_fpmms),
    ):
        markets = transform_fpmms(fetch())
        markets["market_creator"] = creator
        print(f"Results for the market creator {creator}. Len = {len(markets)}")
        per_creator.append(markets)

    fpmms = pd.concat(per_creator, ignore_index=True)

    if filename:
        output_path = DATA_DIR / filename
        # Writing parquet does not create missing directories.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        fpmms.to_parquet(output_path, index=False)

    return fpmms
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the combined markets table and save it as
    # DATA_DIR / "all_fpmms.parquet".
    etl(filename="all_fpmms.parquet")
|
|