# rbiswasfc's picture
# pipe
# 0889949
import os
import re
import shutil
import time
import dotenv
import fitz # PyMuPDF
import pandas as pd
import requests
import schedule
import srsly
from bs4 import BeautifulSoup
from datasets import Dataset, Image, concatenate_datasets, load_dataset
from huggingface_hub import HfApi, create_repo, login, whoami
from PIL import Image as PILImage
from retry import retry
from tqdm.auto import tqdm
# Load variables from a .env file (if any) into the process environment.
dotenv.load_dotenv()
# Authenticate against the Hugging Face Hub with the HF_TOKEN environment variable.
login(token=os.environ.get("HF_TOKEN"))
api = HfApi()
# Account name that HF_TOKEN resolves to; used as the namespace of the dataset repos below.
hf_user = whoami(os.environ.get("HF_TOKEN"))["name"]
# Dataset repo ids: one for the text configs, one for the page-image configs.
HF_REPO_ID_TXT = f"{hf_user}/zotero-answer-ai-texts"
HF_REPO_ID_IMG = f"{hf_user}/zotero-answer-ai-images"
########################################################
### GET ZOTERO ITEMS
########################################################
@retry(tries=3, delay=8)
def _fetch_one_zotero_batch(url, headers, params, timeout=30):
    """
    Fetch one page of items from the Zotero API.

    Args:
        url: endpoint to query.
        headers: HTTP headers sent with the request.
        params: query parameters (the caller passes ``limit`` and ``start``).
        timeout: seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the response status is an error (after the retries are exhausted).
    """
    # Without a timeout a stalled connection would block forever and never reach the retry logic.
    response = requests.get(url, headers=headers, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()
def get_zotero_items(debug=False):
    """
    Fetch every item of the Zotero group library, one page at a time.

    The group id is read from the ``GROUP_ID`` environment variable and the API key
    from ``API_KEY``.

    Args:
        debug: when true, stop paging once more than 1600 items have been collected.

    Returns:
        A list with the items of all fetched pages, in the order they were returned.
    """
    GROUP_ID = os.getenv("GROUP_ID")
    API_KEY = os.getenv("API_KEY")
    BASE_URL = f"https://api.zotero.org/groups/{GROUP_ID}/items"
    LIMIT = 100
    headers = {"Zotero-API-Key": API_KEY, "Content-Type": "application/json"}

    items = []
    start = 0
    while True:
        params = {"limit": LIMIT, "start": start}
        page_items = _fetch_one_zotero_batch(BASE_URL, headers, params)
        # An empty page means the previous page was the last one.
        if not page_items:
            break
        items.extend(page_items)
        start += LIMIT
        print(f"# items fetched {len(items)}")
        if debug and len(items) > 1600:
            break
    return items
########################################################
### EXTRACT ARXIV LINKS AND PDFs
########################################################
def get_arxiv_items(items):
    """
    Keep the Zotero items that point at an arXiv abstract page and flatten them.

    Items are de-duplicated on the arXiv id; the first occurrence wins.

    Args:
        items: list of item dicts as returned by the Zotero API.

    Returns:
        A list of dicts with the keys ``arxiv_id``, ``arxiv_url``, ``title``, ``authors``,
        ``pdf_url``, ``date_published``, ``added_by`` and ``date_added``.
    """
    # Capture the id together with an optional version suffix (e.g. "2401.12345v2").
    # Taking the id from the match, rather than from the last path segment of the URL,
    # keeps it correct when the URL has a trailing slash, a query string or a fragment.
    arxiv_pattern = re.compile(r"arxiv\.org/abs/(\d+\.\d+(?:v\d+)?)")
    visited = set()
    arxiv_items = []
    for item in items:
        data = item.get("data", {})
        arxiv_url = data.get("url") or ""
        arxiv_match = arxiv_pattern.search(arxiv_url)
        if not arxiv_match:
            continue
        arxiv_id = arxiv_match.group(1)
        if arxiv_id in visited:
            continue
        visited.add(arxiv_id)

        attachment = item.get("links", {}).get("attachment", {})
        pdf_url = attachment.get("href") if attachment else None
        authors = [
            f"{author.get('firstName', '')} {author.get('lastName', '')}"
            for author in data.get("creators", [])
        ]
        # Fall back to an empty user name instead of raising when the item carries no creator metadata.
        created_by = (item.get("meta") or {}).get("createdByUser") or {}
        arxiv_items.append(
            {
                "arxiv_id": arxiv_id,
                "arxiv_url": arxiv_url,
                "title": data.get("title", ""),
                "authors": authors,
                "pdf_url": pdf_url,
                "date_published": data.get("date", ""),
                "added_by": created_by.get("username", ""),
                "date_added": data.get("dateAdded", ""),
            }
        )
    return arxiv_items
@retry(tries=3, delay=15, backoff=2)
def fetch_arxiv_html(arxiv_id, timeout=60):
    """
    Download the ar5iv HTML rendering of a paper.

    Args:
        arxiv_id: arXiv identifier; everything from the first "v" onwards is dropped
            before building the URL.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The page text when the server answers with status 200, otherwise ``None``.
    """
    url = f"https://ar5iv.labs.arxiv.org/html/{arxiv_id.split('v')[0]}"
    # A timeout turns a stalled connection into an exception, which the retry decorator can act on.
    response = requests.get(url, timeout=timeout)
    return response.text if response.status_code == 200 else None
def fetch_arxiv_htmls(arxiv_items):
    """
    Attach the HTML rendering of each paper to its item under ``raw_content``.

    Items whose HTML cannot be fetched get the placeholder string "Error" instead.

    Args:
        arxiv_items: list of dicts carrying an ``arxiv_id`` key; modified in place.

    Returns:
        The same list that was passed in.
    """
    for item in tqdm(arxiv_items):
        try:
            html = fetch_arxiv_html(item["arxiv_id"])
        except requests.RequestException as e:
            # One unreachable paper must not abort the whole batch; treat it like a non-200 answer.
            print(e)
            html = None
        if html:
            item["raw_content"] = html
        else:
            print(f"failed to fetch html for {item['arxiv_id']}")
            item["raw_content"] = "Error"
    return arxiv_items
########################################################
### PARSE CONTENT FROM ARXIV HTML #
########################################################
def parse_html_content(html):
    """
    Split an arXiv HTML page into one record for the abstract and one per section.

    Each record is a dict with the keys ``content``, ``title``, ``paper_title``,
    ``content_type`` and ``arxiv_id``. The arXiv id is taken from the first
    "[<id>]" marker found in the raw HTML and is ``None`` when there is none.
    """
    id_match = re.search(r"\[(\d+\.\d+(v\d+)?)\]", html)
    arxiv_id = None if not id_match else id_match.group(1)

    soup = BeautifulSoup(html, "html.parser")

    # Paper title: prefer the document heading, otherwise fall back to the <title> tag
    # with its leading "[<id>]" marker removed.
    try:
        document_heading = soup.find("h1", class_="ltx_title ltx_title_document")
        paper_title = document_heading.get_text(strip=True)
    except Exception:
        page_title = soup.find("title").get_text(strip=True)
        paper_title = re.sub(r"^\[\d+\.\d+(v\d+)?\]\s*", "", page_title)

    # Drop formulas first, then citations, so neither ends up in the extracted text.
    for tag_name in ("math", "cite"):
        for node in soup.find_all(tag_name):
            node.decompose()

    records = []

    abstract_block = soup.find("div", class_="ltx_abstract")
    if abstract_block:
        paragraphs = [p.get_text(strip=True) for p in abstract_block.find_all("p")]
        records.append(
            {
                "content": " ".join(paragraphs).replace(")", ") "),
                "title": "Abstract",
                "paper_title": paper_title,
                "content_type": "abstract",
                "arxiv_id": arxiv_id,
            }
        )

    section_nodes = soup.find_all("section", class_="ltx_section")
    last_position = len(section_nodes) - 1
    for position, node in enumerate(section_nodes):
        heading = node.find("h2", class_="ltx_title ltx_title_section")
        if heading:
            heading_text = heading.get_text(strip=True)
        else:
            heading_text = f"Section {position + 1}"

        # The first section counts as the introduction and the last one as the conclusion.
        if position == 0:
            kind = "introduction"
        elif position == last_position:
            kind = "conclusion"
        else:
            kind = "body"

        records.append(
            {
                "content": node.get_text(strip=True).replace(")", ") "),
                "title": heading_text,
                "paper_title": paper_title,
                "content_type": kind,
                "arxiv_id": arxiv_id,
            }
        )
    return records
########################################################
### GET TEXTS FROM PDF & PARSE
########################################################
def get_pdf_text(arxiv_id, timeout=60):
    """
    Ask the PDF extraction service for the text of a paper.

    This is a best-effort call: any failure is printed and reported as ``None``.

    Args:
        arxiv_id: arXiv identifier, sent as the ``arxiv_id`` query parameter.
        timeout: seconds to wait for the service before giving up.

    Returns:
        The value of the ``text`` field of the JSON answer, or ``None`` when the
        request fails or the answer has no such field.
    """
    url = "http://147.189.194.113:80/extract"  # fix: currently down
    try:
        # The timeout keeps an unreachable service from stalling the whole pipeline.
        response = requests.get(url, params={"arxiv_id": arxiv_id}, timeout=timeout)
        payload = response.json()
        if isinstance(payload, dict) and "text" in payload:
            return payload["text"]
        return None
    except Exception as e:
        print(e)
        return None
def get_content_type(section_type, section_count):
    """Map a section type and the number of sections seen so far to a content type."""
    if section_type == "abstract":
        return "abstract"
    # Whatever its heading says, a non-abstract section seen at count 1 is the introduction.
    if section_type == "introduction" or section_count == 1:
        return "introduction"
    if section_type in ("conclusion", "references"):
        return section_type
    return "body"
def get_section_type(title):
    """Classify a section heading by the first known keyword it contains."""
    lowered = title.lower()
    # Checked in order: the first keyword found in the heading decides the type.
    keyword_to_type = (
        ("abstract", "abstract"),
        ("introduction", "introduction"),
        ("conclusion", "conclusion"),
        ("reference", "references"),
    )
    for keyword, section_type in keyword_to_type:
        if keyword in lowered:
            return section_type
    return "body"
def _make_markdown_section(lines, title, paper_title, section_type, section_count, arxiv_id):
    """Build the record for one markdown section from its buffered lines."""
    return {
        # A space is inserted after every ")" in every section, the same way the HTML parser does it.
        "content": " ".join(lines).replace(")", ") "),
        "title": title,
        "paper_title": paper_title,
        "content_type": get_content_type(section_type, section_count),
        "arxiv_id": arxiv_id,
    }


def parse_markdown_content(md_content, arxiv_id):
    """
    Parses markdown content to identify and extract sections based on headers.

    The first header line is taken as the paper title. Every later header starts a
    new section; the lines up to the next header form its content. Text that appears
    before the first section header is discarded, and so are sections without content.

    Args:
        md_content: markdown text of the paper.
        arxiv_id: identifier stored on every returned record.

    Returns:
        A list of dicts with the keys ``content``, ``title``, ``paper_title``,
        ``content_type`` and ``arxiv_id``.
    """
    digits = "0123456789"
    parsed = []
    content = []
    paper_title = None
    current_title = None
    current_section = None

    for line in md_content.split("\n"):
        if not line.startswith("#"):
            content.append(line)
            continue

        if paper_title is None:
            paper_title = line.lstrip("#").strip()
            continue

        # A new header closes the section collected so far.
        if content and current_title:
            parsed.append(
                _make_markdown_section(content, current_title, paper_title, current_section, len(parsed), arxiv_id)
            )
        content = []

        current_title = line.lstrip("#").lstrip()
        # Strip up to two levels of numbering ("3 ", "3.1 ", "10. ") from the heading.
        # Headings containing "bit" are left alone, so their leading number is kept.
        if "bit" not in current_title:
            for _ in range(2):
                current_title = current_title.lstrip(digits).lstrip().lstrip(".").lstrip()
        current_section = get_section_type(current_title)

    # Add the last section
    if content and current_title:
        parsed.append(
            _make_markdown_section(content, current_title, paper_title, current_section, len(parsed), arxiv_id)
        )
    return parsed
########################################################
### Image Dataset
########################################################
def download_arxiv_pdf(arxiv_id, timeout=60):
    """
    Download the PDF of a paper from arxiv.org.

    Args:
        arxiv_id: arXiv identifier; everything from the first "v" onwards is dropped
            before building the URL.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The raw bytes of the PDF.

    Raises:
        Exception: if the server answers with a status other than 200.
    """
    arxiv_id = arxiv_id.split("v")[0]
    url = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
    # Without a timeout a stalled download would block the pipeline indefinitely.
    response = requests.get(url, timeout=timeout)
    if response.status_code == 200:
        return response.content
    raise Exception(f"Failed to download PDF. Status code: {response.status_code}")
def pdf_to_jpegs(pdf_content, output_folder, max_pages=128):
    """
    Render the pages of a PDF to ``page_<n>.jpg`` files, numbered from 1.

    Args:
        pdf_content: raw bytes of the PDF.
        output_folder: directory the images are written to; created if missing.
        max_pages: upper bound on the number of pages rendered.
    """
    os.makedirs(output_folder, exist_ok=True)
    doc = fitz.open(stream=pdf_content, filetype="pdf")
    try:
        # Bound the range up front so that at most max_pages pages are written.
        for page_num in range(min(len(doc), max_pages)):
            page = doc.load_page(page_num)
            pix = page.get_pixmap()
            image_path = os.path.join(output_folder, f"page_{page_num + 1}.jpg")
            pix.save(image_path)
    finally:
        # Release the document even when rendering or saving a page fails.
        doc.close()
def save_arxiv_article_images(arxiv_id):
    """
    Download a paper and store its page images under ``data/arxiv_images/<arxiv_id>``.

    Failures are printed and otherwise ignored, so the caller never sees an exception.
    """
    target_dir = os.path.join("data", "arxiv_images", arxiv_id)
    try:
        pdf_to_jpegs(download_arxiv_pdf(arxiv_id), target_dir)
    except Exception as err:
        print(f"An error occurred: {str(err)}")
def create_hf_image_dataset(base_dir):
    """
    Build an image dataset from the ``page_<n>.jpg`` files found below *base_dir*.

    The name of the directory holding a file is used as its arXiv id. The resulting
    dataset has the columns ``image``, ``arxiv_id`` and ``page_number``.
    """
    page_pattern = re.compile(r"page_(\d+)")
    image_paths = []
    arxiv_ids = []
    page_numbers = []

    for folder, _subfolders, filenames in os.walk(base_dir):
        for filename in filenames:
            if not filename.endswith(".jpg"):
                continue
            found = page_pattern.search(filename)
            if not found:
                # No page number in the file name: nothing to index it by.
                continue

            full_path = os.path.join(folder, filename)
            # The size is read but not exported; opening the file still fails on unreadable images.
            with PILImage.open(full_path) as picture:
                _width, _height = picture.size

            image_paths.append(full_path)
            arxiv_ids.append(os.path.basename(folder))
            page_numbers.append(int(found.group(1)))

    columns = {
        "image": image_paths,
        "arxiv_id": arxiv_ids,
        "page_number": page_numbers,
    }
    # The paths are cast to the Image feature so the column holds images rather than strings.
    return Dataset.from_dict(columns).cast_column("image", Image())
########################################################
### HF UPLOAD
########################################################
def upload_to_hf(abstract_df, contents_df, processed_arxiv_ids):
    """
    Push the text and image datasets to the Hugging Face Hub, then restart the connected Space.

    Every upload step is best-effort: a failure is printed and the remaining steps still run.

    Args:
        abstract_df: DataFrame pushed as the "abstracts" config of the text repo.
        contents_df: DataFrame pushed as the "articles" config of the text repo.
        processed_arxiv_ids: iterable of ids pushed as the "processed_arxiv_ids" config.
    """
    token = os.environ.get("HF_TOKEN")
    for repo_id in (HF_REPO_ID_TXT, HF_REPO_ID_IMG):
        create_repo(repo_id=repo_id, token=token, private=True, repo_type="dataset", exist_ok=True)

    # upload image dataset
    img_ds = None
    try:
        img_ds = create_hf_image_dataset("data/arxiv_images")
        try:
            # Append to what is already on the Hub; if it cannot be loaded, push the new images alone.
            old_img_ds = load_dataset(HF_REPO_ID_IMG, "images")["train"]
            img_ds = concatenate_datasets([old_img_ds, img_ds])
        except Exception as e:
            print(e)
        img_ds.push_to_hub(HF_REPO_ID_IMG, "images", token=token)
    except Exception as e:
        print(e)

    # upload first pages only
    if img_ds is None:
        # Nothing to filter when the image dataset could not even be built.
        print("Skipping first-page image upload: image dataset could not be built")
    else:
        try:
            first_pages_ds = img_ds.filter(lambda x: x["page_number"] == 1)
            first_pages_ds.push_to_hub(HF_REPO_ID_IMG, "images_first_page", token=token)
        except Exception as e:
            print(e)

    try:
        # push id_to_abstract
        abstract_ds = Dataset.from_pandas(abstract_df)
        abstract_ds.push_to_hub(HF_REPO_ID_TXT, "abstracts", token=token)
        # push arxiv_items
        arxiv_ds = Dataset.from_pandas(contents_df)
        arxiv_ds.push_to_hub(HF_REPO_ID_TXT, "articles", token=token)
        # push processed_arxiv_ids
        id_rows = [{"arxiv_id": arxiv_id} for arxiv_id in processed_arxiv_ids]
        processed_arxiv_ids_ds = Dataset.from_list(id_rows)
        processed_arxiv_ids_ds.push_to_hub(HF_REPO_ID_TXT, "processed_arxiv_ids", token=token)
    except Exception as e:
        print(e)

    # trigger refresh of connected datasets
    print("==" * 40)
    print("Triggering refresh of connected datasets")
    api.restart_space(repo_id="answerdotai/zotero-weekly")
    print("==" * 40)
########################################################
### MAIN
########################################################
def _load_remote_frame(config_name, columns):
    """Load the train split of a text-repo config as a DataFrame; empty frame with *columns* on failure."""
    try:
        return load_dataset(HF_REPO_ID_TXT, config_name)["train"].to_pandas()
    except Exception as e:
        print(e)
        return pd.DataFrame(columns=columns)


def _append_and_dedupe(old_df, new_df):
    """Append *new_df* to *old_df*, keeping the newest row for every arxiv_id."""
    merged = pd.concat([old_df, new_df]).reset_index(drop=True)
    return merged.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True)


def _extract_item_contents(item):
    """Fill item["contents"] from the HTML, falling back to the PDF text when that yields nothing."""
    try:
        item["contents"] = parse_html_content(item["raw_content"])
    except Exception as e:
        print(f"Failed to parse html for {item['arxiv_id']}: {e}")
        item["contents"] = []

    if not item["contents"]:
        print("Extracting from pdf...")
        md_content = get_pdf_text(item["arxiv_id"])  # fix this
        # The raw content is replaced by whatever the PDF service returned (possibly None).
        item["raw_content"] = md_content
        item["contents"] = parse_markdown_content(md_content, item["arxiv_id"]) if md_content else []


def main():
    """
    Run one pass of the pipeline.

    Fetches the Zotero items, keeps the arXiv papers that are not yet recorded as
    processed on the Hub, extracts their page images and text, merges the results
    with the datasets already on the Hub, uploads everything and writes a local
    copy under ``data/``. Returns early when there is nothing new.
    """
    # items = get_zotero_items(debug=True)
    items = get_zotero_items(debug=False)
    print(f"# of items fetched from zotero: {len(items)}")
    arxiv_items = get_arxiv_items(items)
    print(f"# of arxiv papers: {len(arxiv_items)}")

    # get already processed arxiv ids from HF
    try:
        existing_arxiv_ids = load_dataset(HF_REPO_ID_TXT, "processed_arxiv_ids")["train"]["arxiv_id"]
    except Exception as e:
        print(e)
        existing_arxiv_ids = []
    existing_arxiv_ids = set(existing_arxiv_ids)
    print(f"# of existing arxiv ids: {len(existing_arxiv_ids)}")

    # new arxiv items
    arxiv_items = [item for item in arxiv_items if item["arxiv_id"] not in existing_arxiv_ids]
    arxiv_items = fetch_arxiv_htmls(arxiv_items)
    print(f"# of new arxiv items: {len(arxiv_items)}")
    if not arxiv_items:
        print("No new arxiv items to process")
        return

    # remove "data" directory if it exists, so only images from this run get uploaded
    if os.path.exists("data"):
        try:
            shutil.rmtree("data")
        except Exception as e:
            print(e)

    processed_arxiv_ids = set()
    for item in tqdm(arxiv_items):
        # download images --
        save_arxiv_article_images(item["arxiv_id"])
        # parse html, with the pdf text as fallback
        _extract_item_contents(item)
        if item["contents"]:
            processed_arxiv_ids.add(item["arxiv_id"])
            # Prefer the title found in the paper itself over the one stored in Zotero.
            item["title"] = item["contents"][0]["paper_title"]
    print(f"# of processed arxiv ids: {len(processed_arxiv_ids)}")

    # save abstracts ---
    id_to_abstract = {}
    for item in arxiv_items:
        for entry in item["contents"]:
            if entry["content_type"] == "abstract":
                id_to_abstract[item["arxiv_id"]] = entry["content"]
                break
    print(f"# of abstracts: {len(id_to_abstract)}")
    abstract_df = pd.Series(id_to_abstract).reset_index().rename(columns={"index": "arxiv_id", 0: "abstract"})
    print(abstract_df.head())

    # add to existing dataset
    old_abstract_df = _load_remote_frame("abstracts", abstract_df.columns)
    print(old_abstract_df.head())
    abstract_df = _append_and_dedupe(old_abstract_df, abstract_df)

    # contents
    contents_df = pd.DataFrame(arxiv_items)
    print(contents_df.head())
    old_contents_df = _load_remote_frame("articles", contents_df.columns)
    if len(old_contents_df) > 0:
        print(old_contents_df.sample().T)
    contents_df = _append_and_dedupe(old_contents_df, contents_df)

    # upload to hf
    processed_arxiv_ids = list(processed_arxiv_ids | existing_arxiv_ids)
    upload_to_hf(abstract_df, contents_df, processed_arxiv_ids)

    # save as local copy
    os.makedirs("data", exist_ok=True)
    abstract_df.to_parquet("data/abstracts.parquet")
    contents_df.to_parquet("data/contents.parquet")
    srsly.write_json("data/processed_arxiv_ids.json", processed_arxiv_ids)
def schedule_periodic_task(frequency="daily", start_time="10:00"):
    """
    Schedule the main task to run at the user-defined frequency, then run the scheduler forever.

    Args:
        frequency: "hourly" (at minute :00 of every hour) or "daily" (at *start_time*).
        start_time: "HH:MM" time of day used by the daily schedule.

    Raises:
        ValueError: if *frequency* is neither "hourly" nor "daily".
    """
    # main() is not run on start-up; the first run happens at the first scheduled slot.
    if frequency == "hourly":
        print("Scheduling tasks to run every hour at the top of the hour")
        schedule.every().hour.at(":00").do(main)
    elif frequency == "daily":
        # NOTE(review): no timezone is passed to at(), so the job presumably fires in the
        # host's local time; the message is only accurate if the host runs in UTC - confirm.
        print(f"Scheduling tasks to run every day at: {start_time} UTC+00")
        schedule.every().day.at(start_time).do(main)
    else:
        # Without this the loop below would spin forever with nothing scheduled.
        raise ValueError(f"Unsupported frequency: {frequency!r} (expected 'hourly' or 'daily')")

    while True:
        schedule.run_pending()
        time.sleep(1)
# Start the scheduler only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    schedule_periodic_task()