"""Script for retrieving mech requests and their delivers.""" |
|
import json
import pickle
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Tuple

import requests
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from tqdm import tqdm

from markets import PEARL_CREATOR, CREATOR
from tools import (
    IPFS_POLL_INTERVAL,
    GET_CONTENTS_BATCH_SIZE,
    IRRELEVANT_TOOLS,
    create_session,
    request,
)
|
|
|
NUM_WORKERS = 10 |
|
BLOCKS_CHUNK_SIZE = 10000 |
|
TEXT_ALIGNMENT = 30 |
|
MINIMUM_WRITE_FILE_DELAY_SECONDS = 20 |
|
MECH_FROM_BLOCK_RANGE = 50000 |
|
SCRIPTS_DIR = Path(__file__).parent |
|
ROOT_DIR = SCRIPTS_DIR.parent |
|
JSON_DATA_DIR = ROOT_DIR / "json_data" |
|
DATA_DIR = ROOT_DIR / "data" |
|
IPFS_ADDRESS = "https://gateway.autonolas.tech/ipfs/" |
|
THEGRAPH_ENDPOINT = "https://api.studio.thegraph.com/query/57238/mech/0.0.2" |
|
last_write_time = 0.0 |
|
|
|
REQUESTS_QUERY_FILTER = """ |
|
query requests_query($sender_not_in: [Bytes!], $id_gt: Bytes, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) { |
|
requests(where: {sender_not_in: $sender_not_in, id_gt: $id_gt, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: id, first: 1000) { |
|
blockNumber |
|
blockTimestamp |
|
id |
|
ipfsHash |
|
requestId |
|
sender |
|
transactionHash |
|
} |
|
} |
|
""" |
|
|
|
DELIVERS_QUERY_NO_FILTER = """ |
|
query delivers_query($id_gt: Bytes, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) { |
|
delivers(where: {id_gt: $id_gt, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: id, first: 1000) { |
|
blockNumber |
|
blockTimestamp |
|
id |
|
ipfsHash |
|
requestId |
|
sender |
|
transactionHash |
|
} |
|
} |
|
|
|
""" |
|
DELIVERS_QUERY = """ |
|
query delivers_query($requestId: BigInt, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) { |
|
delivers(where: {requestId: $requestId, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: blockNumber, first: 1000) { |
|
blockNumber |
|
blockTimestamp |
|
id |
|
ipfsHash |
|
requestId |
|
sender |
|
transactionHash |
|
} |
|
} |
|
""" |
|
|
|
MISSING_DELIVERS_QUERY = """ |
|
query delivers_query($requestId: BigInt, $blockNumber_gte: BigInt, $blockNumber_lte: BigInt) { |
|
delivers(where: {requestId: $requestId, blockNumber_gte: $blockNumber_gte, blockNumber_lte: $blockNumber_lte}, orderBy: blockNumber, first: 1000) { |
|
blockNumber |
|
blockTimestamp |
|
id |
|
ipfsHash |
|
requestId |
|
sender |
|
transactionHash |
|
} |
|
} |
|
""" |
|
|
|
|
|
def collect_all_mech_requests(from_block: int, to_block: int) -> Tuple:
    """Collect all mech requests from the subgraph within the given block range."""
    print(f"Fetching all mech requests from {from_block} to {to_block}")
|
mech_requests = {} |
|
duplicated_reqIds = [] |
|
transport = RequestsHTTPTransport(url=THEGRAPH_ENDPOINT) |
|
client = Client(transport=transport, fetch_schema_from_transport=True) |
|
|
|
id_gt = "0x00" |
|
while True: |
|
variables = { |
|
"sender_not_in": [CREATOR, PEARL_CREATOR], |
|
"id_gt": id_gt, |
|
"blockNumber_gte": str(from_block), |
|
"blockNumber_lte": str(to_block), |
|
} |
|
try: |
|
response = client.execute( |
|
gql(REQUESTS_QUERY_FILTER), variable_values=variables |
|
) |
|
|
|
items = response.get("requests", []) |
|
|
|
if not items: |
|
break |
|
|
|
for mech_request in items: |
|
if mech_request["id"] not in mech_requests: |
|
mech_requests[mech_request["id"]] = mech_request |
|
else: |
|
duplicated_reqIds.append(mech_request["id"]) |
|
        except Exception as e:
            print(f"Error while getting the response: {e}")
            # stop paging on errors; "items" may be stale or undefined at this point
            break

        id_gt = items[-1]["id"]
        time.sleep(IPFS_POLL_INTERVAL)
        print(f"New execution for id_gt = {id_gt}")
|
if len(duplicated_reqIds) > 0: |
|
print(f"Number of duplicated req Ids = {len(duplicated_reqIds)}") |
|
save_json_file(mech_requests, "mech_requests.json") |
|
|
|
print(f"Number of requests = {len(mech_requests)}") |
|
print(f"Number of duplicated req Ids = {len(duplicated_reqIds)}") |
|
save_json_file(mech_requests, "mech_requests.json") |
|
return mech_requests, duplicated_reqIds |
|
|
|
|
|
def collect_all_mech_delivers(from_block: int, to_block: int) -> Tuple:
    """Collect all mech delivers from the subgraph within the given block range."""
    print(f"Fetching all mech delivers from {from_block} to {to_block}")
|
mech_delivers = {} |
|
duplicated_requestIds = [] |
|
transport = RequestsHTTPTransport(url=THEGRAPH_ENDPOINT) |
|
client = Client(transport=transport, fetch_schema_from_transport=True) |
|
    # delivers can arrive well after the request, so extend the upper bound of the search window
    to_block = to_block + MECH_FROM_BLOCK_RANGE
|
id_gt = "" |
|
while True: |
|
variables = { |
|
"id_gt": id_gt, |
|
"blockNumber_gte": str(from_block), |
|
"blockNumber_lte": str(to_block), |
|
} |
|
try: |
|
response = client.execute( |
|
gql(DELIVERS_QUERY_NO_FILTER), variable_values=variables |
|
) |
|
items = response.get("delivers", []) |
|
|
|
if not items: |
|
break |
|
|
|
for mech_deliver in items: |
|
if mech_deliver["requestId"] not in mech_delivers: |
|
mech_delivers[mech_deliver["requestId"]] = [mech_deliver] |
|
else: |
|
duplicated_requestIds.append(mech_deliver["requestId"]) |
|
|
|
mech_delivers[mech_deliver["requestId"]].append(mech_deliver) |
|
except Exception as e: |
|
print(f"Error while getting the response: {e}") |
|
return |
|
|
|
id_gt = items[-1]["id"] |
|
time.sleep(IPFS_POLL_INTERVAL) |
|
print(f"New execution for id_gt = {id_gt}") |
|
if len(duplicated_requestIds) > 0: |
|
print(f"Number of duplicated request id = {len(duplicated_requestIds)}") |
|
save_json_file(mech_delivers, "mech_delivers.json") |
|
print(f"Number of delivers = {len(mech_delivers)}") |
|
print(f"Number of duplicated request id = {len(duplicated_requestIds)}") |
|
save_json_file(mech_delivers, "mech_delivers.json") |
|
return mech_delivers, duplicated_requestIds |
|
|
|
|
|
def collect_missing_delivers(request_id: int, block_number: int) -> Dict[str, Any]:
    """Try to find the deliver for a request that has no matching entry in the delivers file."""
    # search from the request's block up to MECH_FROM_BLOCK_RANGE blocks after it
    to_block = block_number + MECH_FROM_BLOCK_RANGE
    print(f"Fetching all missing delivers from {block_number} to {to_block}")
|
mech_delivers = {} |
|
transport = RequestsHTTPTransport(url=THEGRAPH_ENDPOINT) |
|
client = Client(transport=transport, fetch_schema_from_transport=True) |
|
|
|
variables = { |
|
"requestId": request_id, |
|
"blockNumber_gte": str(block_number), |
|
"blockNumber_lte": str(to_block), |
|
} |
|
try: |
|
response = client.execute( |
|
gql(MISSING_DELIVERS_QUERY), variable_values=variables |
|
) |
|
items = response.get("delivers", []) |
|
|
|
|
|
|
|
|
|
if items: |
|
return items[0] |
|
except Exception as e: |
|
print(f"Error while getting the response: {e}") |
|
|
|
return mech_delivers |
|
|
|
|
|
def populate_requests_ipfs_contents(
    session: requests.Session, mech_requests: Dict[str, Any], keys_to_traverse: list
) -> dict:
    """Fetch the IPFS metadata of each mech request and attach it under the 'ipfsContents' key."""
    updated_dict = {}
|
wrong_response_count = 0 |
|
for k in tqdm( |
|
keys_to_traverse, |
|
desc="Fetching IPFS contents for requests", |
|
position=1, |
|
unit="results", |
|
): |
|
mech_request = mech_requests[k] |
|
|
|
if "ipfsContents" not in mech_request: |
|
ipfs_hash = mech_request["ipfsHash"] |
|
url = f"{IPFS_ADDRESS}{ipfs_hash}/metadata.json" |
|
response = request(session, url) |
|
if response is None: |
|
tqdm.write(f"Skipping {mech_request=}. because response was None") |
|
wrong_response_count += 1 |
|
continue |
|
try: |
|
contents = response.json() |
|
if contents["tool"] in IRRELEVANT_TOOLS: |
|
continue |
|
mech_request["ipfsContents"] = contents |
|
except requests.exceptions.JSONDecodeError: |
|
tqdm.write( |
|
f"Skipping {mech_request} because of JSONDecodeError when parsing response" |
|
) |
|
wrong_response_count += 1 |
|
continue |
|
updated_dict[k] = mech_request |
|
time.sleep(IPFS_POLL_INTERVAL) |
|
|
|
return updated_dict |
|
|
|
|
|
def populate_delivers_ipfs_contents( |
|
session: requests.Session, mech_requests: Dict[str, Any], keys_to_traverse: list |
|
) -> dict: |
|
"""Function to complete the delivers content info from ipfs""" |
|
updated_dict = {} |
|
for k in tqdm( |
|
keys_to_traverse, |
|
desc="Fetching IPFS contents for delivers", |
|
position=1, |
|
unit="results", |
|
): |
|
mech_request = mech_requests[k] |
|
if "deliver" not in mech_request or len(mech_request["deliver"]) == 0: |
|
print(f"Skipping mech request {mech_request} because of no delivers info") |
|
continue |
|
|
|
deliver = mech_request["deliver"] |
|
if "ipfsContents" not in deliver: |
|
ipfs_hash = deliver["ipfsHash"] |
|
request_id = deliver["requestId"] |
|
url = f"{IPFS_ADDRESS}{ipfs_hash}/{request_id}" |
|
response = request(session, url) |
|
if response is None: |
|
tqdm.write(f"Skipping {mech_request=}.") |
|
continue |
|
try: |
|
contents = response.json() |
|
metadata = contents.get("metadata", None) |
|
if metadata and contents["metadata"]["tool"] in IRRELEVANT_TOOLS: |
|
continue |
|
contents.pop("cost_dict", None) |
|
deliver["ipfsContents"] = contents |
|
except requests.exceptions.JSONDecodeError: |
|
tqdm.write(f"Skipping {mech_request} because of JSONDecodeError") |
|
continue |
|
except Exception: |
|
tqdm.write( |
|
f"Skipping {mech_request} because of error parsing the response" |
|
) |
|
continue |
|
updated_dict[k] = mech_request |
|
time.sleep(IPFS_POLL_INTERVAL) |
|
|
|
return updated_dict |
|
|
|
|
|
def write_mech_events_to_file(
    mech_requests: Dict[str, Any],
    filename: str,
    force_write: bool = False,
) -> None:
    """Write the mech events to a file, throttled by MINIMUM_WRITE_FILE_DELAY_SECONDS."""
    global last_write_time
|
now = time.time() |
|
|
|
if len(mech_requests) == 0: |
|
return |
|
|
|
filename_path = DATA_DIR / filename |
|
if force_write or (now - last_write_time) >= MINIMUM_WRITE_FILE_DELAY_SECONDS: |
|
with open(filename_path, "w", encoding="utf-8") as file: |
|
json.dump(mech_requests, file, indent=2) |
|
last_write_time = now |
|
|
|
|
|
def save_json_file(data: Dict[str, Any], filename: str) -> None:
    """Save the given content into a JSON file."""
|
filename_path = JSON_DATA_DIR / filename |
|
with open(filename_path, "w", encoding="utf-8") as file: |
|
json.dump(data, file, indent=2) |
|
|
|
|
|
def clean_mech_delivers() -> None: |
|
"""Function to remove from the delivers json file the request Ids that are not in the mech requests""" |
|
|
|
with open(JSON_DATA_DIR / "mech_requests.json", "r") as file: |
|
mech_requests = json.load(file) |
|
|
|
list_reqIds = [mech_requests[k].get("requestId") for k in mech_requests.keys()] |
|
|
|
list_reqIds = list(set(list_reqIds)) |
|
|
|
|
|
with open(JSON_DATA_DIR / "mech_delivers.json", "r") as file: |
|
mech_delivers = json.load(file) |
|
|
|
print(f"original size of the file {len(mech_delivers)}") |
|
to_delete = [] |
|
for r in mech_delivers.keys(): |
|
if r not in list_reqIds: |
|
to_delete.append(r) |
|
|
|
for r in to_delete: |
|
mech_delivers.pop(r, None) |
|
print(f"final size of the file {len(mech_delivers)}") |
|
save_json_file(mech_delivers, "mech_delivers.json") |
|
|
|
|
|
def get_request_block_numbers(
    mech_requests: Dict[str, Any], target_req_id: int
) -> list:
    """Return the block numbers of all requests that match the given request id."""
    block_numbers = []
|
|
|
for entry in mech_requests.values(): |
|
if entry["requestId"] == target_req_id: |
|
block_numbers.append(entry["blockNumber"]) |
|
|
|
return block_numbers |
|
|
|
|
|
def update_block_request_map(block_request_id_map: dict) -> None:
    """Persist the (block number, request id) -> deliver mapping as a pickle file."""
    print("Saving block request id map info")
|
with open(JSON_DATA_DIR / "block_request_id_map.pickle", "wb") as handle: |
|
pickle.dump(block_request_id_map, handle, protocol=pickle.HIGHEST_PROTOCOL) |
|
|
|
|
|
def fix_duplicate_requestIds() -> dict:
    """Resolve request ids with multiple delivers by keeping the deliver closest in block number."""
    with open(JSON_DATA_DIR / "mech_delivers.json", "r") as file:
        data_delivers = json.load(file)
|
|
|
with open(JSON_DATA_DIR / "mech_requests.json", "r") as file: |
|
mech_requests = json.load(file) |
|
list_request_Ids = list(data_delivers.keys()) |
|
|
|
list_duplicated_reqIds = [] |
|
for req_Id in list_request_Ids: |
|
if len(data_delivers.get(req_Id)) > 1: |
|
list_duplicated_reqIds.append(req_Id) |
|
|
|
    print(f"Number of duplicated request ids = {len(list_duplicated_reqIds)}")
|
block_request_id_map = {} |
|
|
|
for req_Id in list_duplicated_reqIds: |
|
|
|
block_nrs = get_request_block_numbers(mech_requests, req_Id) |
|
|
|
mech_delivers_list = data_delivers.get(req_Id) |
|
if len(block_nrs) > 1: |
|
print("More than one block number was found") |
|
for block_nr in block_nrs: |
|
key = (block_nr, req_Id) |
|
min_difference_request = min( |
|
mech_delivers_list, |
|
key=lambda x: abs(int(x["blockNumber"]) - int(block_nr)), |
|
) |
|
block_request_id_map[key] = min_difference_request |
|
|
|
update_block_request_map(block_request_id_map) |
|
|
|
return block_request_id_map |
|
|
|
|
|
def merge_requests_delivers() -> None: |
|
"""Function to map requests and delivers""" |
|
with open(JSON_DATA_DIR / "mech_delivers.json", "r") as file: |
|
mech_delivers = json.load(file) |
|
|
|
with open(JSON_DATA_DIR / "mech_requests.json", "r") as file: |
|
mech_requests = json.load(file) |
|
|
|
|
|
with open(JSON_DATA_DIR / "block_request_id_map.pickle", "rb") as handle: |
|
|
|
block_request_id_map = pickle.load(handle) |
|
for _, mech_req in tqdm( |
|
mech_requests.items(), |
|
desc=f"Merging delivers data into the mech requests", |
|
): |
|
if "deliver" in mech_req: |
|
continue |
|
|
|
block_number_req = mech_req["blockNumber"] |
|
req_Id = mech_req["requestId"] |
|
|
|
key = (block_number_req, req_Id) |
|
if key in block_request_id_map.keys(): |
|
deliver_dict = block_request_id_map[key] |
|
elif req_Id in mech_delivers.keys(): |
|
deliver_dict = mech_delivers.get(req_Id)[0] |
|
else: |
|
print("No deliver entry found for this request Id") |
|
deliver_dict = collect_missing_delivers( |
|
request_id=req_Id, block_number=int(block_number_req) |
|
) |
|
|
|
|
|
mech_req["deliver"] = deliver_dict |
|
save_json_file(mech_requests, "merged_requests.json") |
|
return |
|
|
|
|
|
def get_ipfs_data() -> None:
    """Populate the IPFS contents of both requests and delivers and save the result."""
|
with open(JSON_DATA_DIR / "merged_requests.json", "r") as file: |
|
mech_requests = json.load(file) |
|
|
|
total_keys_to_traverse = list(mech_requests.keys()) |
|
updated_mech_requests = dict() |
|
session = create_session() |
|
print("UPDATING IPFS CONTENTS OF REQUESTS") |
|
|
|
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: |
|
futures = [] |
|
for i in range(0, len(mech_requests), GET_CONTENTS_BATCH_SIZE): |
|
futures.append( |
|
executor.submit( |
|
populate_requests_ipfs_contents, |
|
session, |
|
mech_requests, |
|
total_keys_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE], |
|
) |
|
) |
|
|
|
for future in tqdm( |
|
as_completed(futures), |
|
total=len(futures), |
|
desc=f"Fetching all ipfs contents from requests ", |
|
): |
|
partial_dict = future.result() |
|
updated_mech_requests.update(partial_dict) |
|
|
|
save_json_file(updated_mech_requests, "tools_info.json") |
|
|
|
|
|
print("UPDATING IPFS CONTENTS OF DELIVERS") |
|
total_keys_to_traverse = list(updated_mech_requests.keys()) |
|
final_tools_content = {} |
|
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: |
|
futures = [] |
|
for i in range(0, len(updated_mech_requests), GET_CONTENTS_BATCH_SIZE): |
|
futures.append( |
|
executor.submit( |
|
populate_delivers_ipfs_contents, |
|
session, |
|
updated_mech_requests, |
|
total_keys_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE], |
|
) |
|
) |
|
|
|
for future in tqdm( |
|
as_completed(futures), |
|
total=len(futures), |
|
desc=f"Fetching all ipfs contents from delivers ", |
|
): |
|
partial_dict = future.result() |
|
final_tools_content.update(partial_dict) |
|
|
|
save_json_file(final_tools_content, "tools_info.json") |
|
|
|
|
|
def only_delivers_loop() -> None:
    """Re-run only the delivers IPFS-content step over an existing tools_info.json file."""
|
with open(DATA_DIR / "tools_info.json", "r") as file: |
|
updated_mech_requests = json.load(file) |
|
|
|
|
|
session = create_session() |
|
print("UPDATING IPFS CONTENTS OF DELIVERS") |
|
total_keys_to_traverse = list(updated_mech_requests.keys()) |
|
final_tools_content = {} |
|
with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor: |
|
futures = [] |
|
for i in range(0, len(updated_mech_requests), GET_CONTENTS_BATCH_SIZE): |
|
futures.append( |
|
executor.submit( |
|
populate_delivers_ipfs_contents, |
|
session, |
|
updated_mech_requests, |
|
total_keys_to_traverse[i : i + GET_CONTENTS_BATCH_SIZE], |
|
) |
|
) |
|
|
|
for future in tqdm( |
|
as_completed(futures), |
|
total=len(futures), |
|
desc=f"Fetching all ipfs contents from delivers ", |
|
): |
|
partial_dict = future.result() |
|
final_tools_content.update(partial_dict) |
|
|
|
save_json_file(final_tools_content, "tools_info.json") |
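

# A minimal usage sketch (an assumption, not part of the original script's entry point):
# it chains the collection steps in their intended order. FROM_BLOCK and TO_BLOCK are
# placeholder values; substitute the real block range you want to index.
if __name__ == "__main__":
    FROM_BLOCK = 30_000_000  # hypothetical start block
    TO_BLOCK = 30_100_000  # hypothetical end block

    # 1. Collect raw requests and delivers from the subgraph.
    collect_all_mech_requests(from_block=FROM_BLOCK, to_block=TO_BLOCK)
    collect_all_mech_delivers(from_block=FROM_BLOCK, to_block=TO_BLOCK)

    # 2. Drop delivers with unknown request ids and disambiguate duplicated ids.
    clean_mech_delivers()
    fix_duplicate_requestIds()

    # 3. Merge delivers into their requests and fetch the IPFS payloads.
    merge_requests_delivers()
    get_ipfs_data()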
|
|