Spaces:
Sleeping
Sleeping
import os | |
import re | |
import shutil | |
import time | |
import dotenv | |
import fitz # PyMuPDF | |
import pandas as pd | |
import requests | |
import schedule | |
import srsly | |
from bs4 import BeautifulSoup | |
from datasets import Dataset, Image, concatenate_datasets, load_dataset | |
from huggingface_hub import HfApi, create_repo, login, whoami | |
from PIL import Image as PILImage | |
from retry import retry | |
from tqdm.auto import tqdm | |
dotenv.load_dotenv() | |
login(token=os.environ.get("HF_TOKEN")) | |
api = HfApi() | |
hf_user = whoami(os.environ.get("HF_TOKEN"))["name"] | |
HF_REPO_ID_TXT = f"{hf_user}/zotero-answer-ai-texts" | |
HF_REPO_ID_IMG = f"{hf_user}/zotero-answer-ai-images" | |
######################################################## | |
### GET ZOTERO ITEMS | |
######################################################## | |
def _fetch_one_zotero_batch(url, headers, params): | |
""" | |
Fetch articles from Zotero API | |
""" | |
response = requests.get(url, headers=headers, params=params) | |
response.raise_for_status() | |
return response.json() | |
def get_zotero_items(debug=False): | |
""" | |
fetch items from zotero library | |
""" | |
GROUP_ID = os.getenv("GROUP_ID") | |
API_KEY = os.getenv("API_KEY") | |
BASE_URL = f"https://api.zotero.org/groups/{GROUP_ID}/items" | |
LIMIT = 100 | |
headers = {"Zotero-API-Key": API_KEY, "Content-Type": "application/json"} | |
items = [] | |
start = 0 | |
i = 1 | |
while True: | |
i += 1 | |
params = {"limit": LIMIT, "start": start} | |
page_items = _fetch_one_zotero_batch(BASE_URL, headers, params) | |
if not page_items: | |
break | |
items.extend(page_items) | |
start += LIMIT | |
print(f"# items fetched {len(items)}") | |
if debug: | |
if len(items) > 1600: | |
break | |
return items | |
######################################################## | |
### EXTRACT ARXIV LINKS AND PDFs | |
######################################################## | |
def get_arxiv_items(items): | |
visited = set() | |
arxiv_items = [] | |
arxiv_pattern = re.compile(r"arxiv.org/abs/(\d+\.\d+)") | |
for item in items: | |
data = item.get("data", {}) | |
attachments = item.get("links", {}).get("attachment", {}) | |
arxiv_url = None | |
pdf_url = None | |
if "url" in data and "arxiv.org" in data["url"]: | |
arxiv_match = arxiv_pattern.search(data["url"]) | |
if arxiv_match: | |
arxiv_url = data["url"] | |
if attachments: | |
pdf_url = attachments["href"] | |
if arxiv_url: | |
arxiv_id = arxiv_url.split("/")[-1] | |
if arxiv_id in visited: | |
continue | |
authors = [] | |
for author in data.get("creators", []): | |
authors.append(f"{author.get('firstName', '')} {author.get('lastName', '')}") | |
arxiv_items.append( | |
{ | |
"arxiv_id": arxiv_id, | |
"arxiv_url": arxiv_url, | |
"title": data.get("title", ""), | |
"authors": authors, | |
"pdf_url": pdf_url, | |
"date_published": data.get("date", ""), | |
"added_by": item["meta"]["createdByUser"]["username"], | |
"date_added": data.get("dateAdded", ""), | |
} | |
) | |
visited.add(arxiv_id) | |
return arxiv_items | |
def fetch_arxiv_html(arxiv_id): | |
url = f"https://ar5iv.labs.arxiv.org/html/{arxiv_id.split('v')[0]}" | |
response = requests.get(url) | |
return response.text if response.status_code == 200 else None | |
def fetch_arxiv_htmls(arxiv_items): | |
for item in tqdm(arxiv_items): | |
html = fetch_arxiv_html(item["arxiv_id"]) | |
if html: | |
item["raw_content"] = html | |
else: | |
print(f"failed to fetch html for {item['arxiv_id']}") | |
item["raw_content"] = "Error" | |
return arxiv_items | |
######################################################## | |
### PARSE CONTENT FROM ARXIV HTML # | |
######################################################## | |
def parse_html_content(html): | |
""" | |
Parse content from arxiv html | |
""" | |
arxiv_id_match = re.search(r"\[(\d+\.\d+(v\d+)?)\]", html) | |
arxiv_id = arxiv_id_match.group(1) if arxiv_id_match else None | |
soup = BeautifulSoup(html, "html.parser") | |
result = [] | |
# Extract paper title | |
try: | |
paper_title = soup.find("h1", class_="ltx_title ltx_title_document").get_text(strip=True) | |
except Exception: | |
paper_title = soup.find("title").get_text(strip=True) | |
paper_title = re.sub(r"^\[\d+\.\d+(v\d+)?\]\s*", "", paper_title) | |
for math in soup.find_all("math"): | |
math.decompose() | |
for cite in soup.find_all("cite"): | |
cite.decompose() | |
# Extract abstract | |
abstract = soup.find("div", class_="ltx_abstract") | |
if abstract: | |
result.append( | |
{ | |
"content": " ".join(p.get_text(strip=True) for p in abstract.find_all("p")).replace(")", ") "), | |
"title": "Abstract", | |
"paper_title": paper_title, | |
"content_type": "abstract", | |
} | |
) | |
# Extract sections | |
sections = soup.find_all("section", class_="ltx_section") | |
for index, section in enumerate(sections): | |
section_title = section.find("h2", class_="ltx_title ltx_title_section") | |
section_title = section_title.get_text(strip=True) if section_title else f"Section {index + 1}" | |
section_content = section.get_text(strip=True).replace(")", ") ") | |
content_type = "body" | |
if index == 0: | |
content_type = "introduction" | |
elif index == len(sections) - 1: | |
content_type = "conclusion" | |
result.append( | |
{ | |
"content": section_content, | |
"title": section_title, | |
"paper_title": paper_title, | |
"content_type": content_type, | |
} | |
) | |
for c in result: | |
c["arxiv_id"] = arxiv_id | |
return result | |
######################################################## | |
### GET TEXTS FROM PDF & PARSE | |
######################################################## | |
def get_pdf_text(arxiv_id): | |
url = "http://147.189.194.113:80/extract" # fix: currently down | |
try: | |
response = requests.get(url, params={"arxiv_id": arxiv_id}) | |
response = response.json() | |
if "text" in response: | |
return response["text"] | |
return None | |
except Exception as e: | |
print(e) | |
return None | |
def get_content_type(section_type, section_count): | |
"""Determine the content type based on the section type and count""" | |
if section_type == "abstract": | |
return "abstract" | |
elif section_type == "introduction" or section_count == 1: | |
return "introduction" | |
elif section_type == "conclusion" or section_type == "references": | |
return section_type | |
else: | |
return "body" | |
def get_section_type(title): | |
"""Determine the section type based on the title""" | |
title_lower = title.lower() | |
if "abstract" in title_lower: | |
return "abstract" | |
elif "introduction" in title_lower: | |
return "introduction" | |
elif "conclusion" in title_lower: | |
return "conclusion" | |
elif "reference" in title_lower: | |
return "references" | |
else: | |
return "body" | |
def parse_markdown_content(md_content, arxiv_id): | |
""" | |
Parses markdown content to identify and extract sections based on headers. | |
""" | |
lines = md_content.split("\n") | |
parsed = [] | |
current_section = None | |
content = [] | |
paper_title = None | |
current_title = None | |
# identify sections based on headers | |
for line in lines: | |
if line.startswith("#"): | |
if paper_title is None: | |
paper_title = line.lstrip("#").strip() | |
continue | |
if content: | |
if current_title: | |
parsed.append( | |
{ | |
"content": " ".join(content), | |
"title": current_title, | |
"paper_title": paper_title, | |
"content_type": get_content_type(current_section, len(parsed)), | |
"arxiv_id": arxiv_id, | |
} | |
) | |
content = [] | |
current_title = line.lstrip("#").lstrip("#").lstrip() | |
if "bit" not in current_title: | |
current_title = ( | |
current_title.lstrip("123456789") | |
.lstrip() | |
.lstrip(".") | |
.lstrip() | |
.lstrip("123456789") | |
.lstrip() | |
.lstrip(".") | |
.lstrip() | |
) | |
current_section = get_section_type(current_title) | |
else: | |
content.append(line) | |
# Add the last section | |
if content and current_title: | |
parsed.append( | |
{ | |
"content": " ".join(content).replace(")", ") "), | |
"title": current_title, | |
"paper_title": paper_title, | |
"content_type": get_content_type(current_section, len(parsed)), | |
"arxiv_id": arxiv_id, | |
} | |
) | |
return parsed | |
######################################################## | |
### Image Dataset | |
######################################################## | |
def download_arxiv_pdf(arxiv_id): | |
arxiv_id = arxiv_id.split("v")[0] | |
url = f"https://arxiv.org/pdf/{arxiv_id}.pdf" | |
response = requests.get(url) | |
if response.status_code == 200: | |
return response.content | |
else: | |
raise Exception(f"Failed to download PDF. Status code: {response.status_code}") | |
def pdf_to_jpegs(pdf_content, output_folder, max_pages=128): | |
# Create output folder if it doesn't exist | |
os.makedirs(output_folder, exist_ok=True) | |
# Open the PDF | |
doc = fitz.open(stream=pdf_content, filetype="pdf") | |
# Iterate through pages | |
for page_num in range(len(doc)): | |
page = doc.load_page(page_num) | |
# Convert page to image | |
pix = page.get_pixmap() | |
# Save image as JPEG | |
image_path = os.path.join(output_folder, f"page_{page_num + 1}.jpg") | |
pix.save(image_path) | |
# print(f"Saved {image_path}") | |
if page_num >= max_pages: | |
break | |
doc.close() | |
def save_arxiv_article_images(arxiv_id): | |
output_folder = os.path.join("data", "arxiv_images", arxiv_id) | |
try: | |
pdf_content = download_arxiv_pdf(arxiv_id) | |
pdf_to_jpegs(pdf_content, output_folder) | |
except Exception as e: | |
print(f"An error occurred: {str(e)}") | |
def create_hf_image_dataset(base_dir): | |
data = [] | |
# Walk through the directory | |
for root, dirs, files in os.walk(base_dir): | |
for file in files: | |
if file.endswith(".jpg"): | |
# Extract arxiv_id from the path | |
arxiv_id = os.path.basename(root) | |
# Extract page number from the filename | |
match = re.search(r"page_(\d+)", file) | |
if match: | |
page_number = int(match.group(1)) | |
else: | |
continue # Skip if page number can't be extracted | |
# Full path to the image | |
image_path = os.path.join(root, file) | |
# Open the image to get its size | |
with PILImage.open(image_path) as img: | |
width, height = img.size | |
# Add the data | |
data.append( | |
{"image": image_path, "arxiv_id": arxiv_id, "page_number": page_number, "width": width, "height": height} | |
) | |
# Create the dataset | |
dataset = Dataset.from_dict( | |
{ | |
"image": [d["image"] for d in data], | |
"arxiv_id": [d["arxiv_id"] for d in data], | |
"page_number": [d["page_number"] for d in data], | |
} | |
) | |
# Cast the image column to Image | |
dataset = dataset.cast_column("image", Image()) | |
return dataset | |
######################################################## | |
### HF UPLOAD | |
######################################################## | |
def upload_to_hf(abstract_df, contents_df, processed_arxiv_ids): | |
# repo_id = HF_REPO_ID | |
create_repo( | |
repo_id=HF_REPO_ID_TXT, | |
token=os.environ.get("HF_TOKEN"), | |
private=True, | |
repo_type="dataset", | |
exist_ok=True, | |
) | |
create_repo( | |
repo_id=HF_REPO_ID_IMG, | |
token=os.environ.get("HF_TOKEN"), | |
private=True, | |
repo_type="dataset", | |
exist_ok=True, | |
) | |
# upload image dataset | |
try: | |
img_ds = create_hf_image_dataset("data/arxiv_images") | |
try: | |
old_img_ds = load_dataset(HF_REPO_ID_IMG, "images")["train"] | |
img_ds = concatenate_datasets([old_img_ds, img_ds]) | |
except Exception as e: | |
print(e) | |
img_ds.push_to_hub(HF_REPO_ID_IMG, "images", token=os.environ.get("HF_TOKEN")) | |
except Exception as e: | |
print(e) | |
# upload first pages only | |
try: | |
img_ds = img_ds.filter(lambda x: x["page_number"] == 1) | |
img_ds.push_to_hub(HF_REPO_ID_IMG, "images_first_page", token=os.environ.get("HF_TOKEN")) | |
except Exception as e: | |
print(e) | |
try: | |
# push id_to_abstract | |
abstract_ds = Dataset.from_pandas(abstract_df) | |
abstract_ds.push_to_hub(HF_REPO_ID_TXT, "abstracts", token=os.environ.get("HF_TOKEN")) | |
# push arxiv_items | |
arxiv_ds = Dataset.from_pandas(contents_df) | |
arxiv_ds.push_to_hub(HF_REPO_ID_TXT, "articles", token=os.environ.get("HF_TOKEN")) | |
# push processed_arxiv_ids | |
processed_arxiv_ids = [{"arxiv_id": arxiv_id} for arxiv_id in processed_arxiv_ids] | |
processed_arxiv_ids_ds = Dataset.from_list(processed_arxiv_ids) | |
processed_arxiv_ids_ds.push_to_hub(HF_REPO_ID_TXT, "processed_arxiv_ids", token=os.environ.get("HF_TOKEN")) | |
except Exception as e: | |
print(e) | |
# trigger refresh of connected datasets | |
print("==" * 40) | |
print("Triggering refresh of connected datasets") | |
api.restart_space(repo_id="answerdotai/zotero-weekly") | |
print("==" * 40) | |
######################################################## | |
### MAIN | |
######################################################## | |
def main(): | |
# items = get_zotero_items(debug=True) | |
items = get_zotero_items(debug=False) | |
print(f"# of items fetched from zotero: {len(items)}") | |
arxiv_items = get_arxiv_items(items) | |
print(f"# of arxiv papers: {len(arxiv_items)}") | |
# get already processed arxiv ids from HF | |
try: | |
existing_arxiv_ids = load_dataset(HF_REPO_ID_TXT, "processed_arxiv_ids")["train"]["arxiv_id"] | |
except Exception as e: | |
print(e) | |
existing_arxiv_ids = [] | |
existing_arxiv_ids = set(existing_arxiv_ids) | |
print(f"# of existing arxiv ids: {len(existing_arxiv_ids)}") | |
# new arxiv items | |
arxiv_items = [item for item in arxiv_items if item["arxiv_id"] not in existing_arxiv_ids] | |
arxiv_items = fetch_arxiv_htmls(arxiv_items) | |
print(f"# of new arxiv items: {len(arxiv_items)}") | |
if len(arxiv_items) == 0: | |
print("No new arxiv items to process") | |
return | |
processed_arxiv_ids = set() | |
pbar = tqdm(range(len(arxiv_items))) | |
# remove "data" directory if it exists | |
if os.path.exists("data"): | |
try: | |
shutil.rmtree("data") | |
except Exception as e: | |
print(e) | |
for item in arxiv_items: | |
# download images -- | |
save_arxiv_article_images(item["arxiv_id"]) | |
# parse html | |
try: | |
item["contents"] = parse_html_content(item["raw_content"]) | |
except Exception as e: | |
print(f"Failed to parse html for {item['arxiv_id']}: {e}") | |
item["contents"] = [] | |
if len(item["contents"]) == 0: | |
print("Extracting from pdf...") | |
md_content = get_pdf_text(item["arxiv_id"]) # fix this | |
item["raw_content"] = md_content | |
if md_content: | |
item["contents"] = parse_markdown_content(md_content, item["arxiv_id"]) | |
else: | |
item["contents"] = [] | |
if len(item["contents"]) > 0: | |
processed_arxiv_ids.add(item["arxiv_id"]) | |
if len(item["authors"]) == 0: | |
item["authors"] = [] # ["unknown"] | |
item["title"] = item["contents"][0]["paper_title"] | |
pbar.update(1) | |
pbar.close() | |
# save contents --- | |
processed_arxiv_ids = list(processed_arxiv_ids) | |
print(f"# of processed arxiv ids: {len(processed_arxiv_ids)}") | |
# save abstracts --- | |
id_to_abstract = {} | |
for item in arxiv_items: | |
for entry in item["contents"]: | |
if entry["content_type"] == "abstract": | |
id_to_abstract[item["arxiv_id"]] = entry["content"] | |
break | |
print(f"# of abstracts: {len(id_to_abstract)}") | |
abstract_df = pd.Series(id_to_abstract).reset_index().rename(columns={"index": "arxiv_id", 0: "abstract"}) | |
print(abstract_df.head()) | |
# add to existing dataset | |
try: | |
old_abstract_df = load_dataset(HF_REPO_ID_TXT, "abstracts")["train"].to_pandas() | |
except Exception as e: | |
print(e) | |
old_abstract_df = pd.DataFrame(columns=abstract_df.columns) | |
print(old_abstract_df.head()) | |
abstract_df = pd.concat([old_abstract_df, abstract_df]).reset_index(drop=True) | |
abstract_df = abstract_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True) | |
# contents | |
contents_df = pd.DataFrame(arxiv_items) | |
print(contents_df.head()) | |
try: | |
old_contents_df = load_dataset(HF_REPO_ID_TXT, "articles")["train"].to_pandas() | |
except Exception as e: | |
print(e) | |
old_contents_df = pd.DataFrame(columns=contents_df.columns) | |
if len(old_contents_df) > 0: | |
print(old_contents_df.sample().T) | |
contents_df = pd.concat([old_contents_df, contents_df]).reset_index(drop=True) | |
contents_df = contents_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True) | |
# upload to hf | |
processed_arxiv_ids = list(set(processed_arxiv_ids + list(existing_arxiv_ids))) | |
upload_to_hf(abstract_df, contents_df, processed_arxiv_ids) | |
# save as local copy | |
os.makedirs("data", exist_ok=True) | |
abstract_df.to_parquet("data/abstracts.parquet") | |
contents_df.to_parquet("data/contents.parquet") | |
srsly.write_json("data/processed_arxiv_ids.json", processed_arxiv_ids) | |
def schedule_periodic_task(): | |
""" | |
Schedule the main task to run at the user-defined frequency | |
""" | |
# main() # run once initially | |
frequency = "daily" # TODO: env | |
if frequency == "hourly": | |
print("Scheduling tasks to run every hour at the top of the hour") | |
schedule.every().hour.at(":00").do(main) | |
elif frequency == "daily": | |
start_time = "10:00" | |
print("Scheduling tasks to run every day at: {start_time} UTC+00") | |
schedule.every().day.at(start_time).do(main) | |
while True: | |
schedule.run_pending() | |
time.sleep(1) | |
if __name__ == "__main__": | |
schedule_periodic_task() | |