import os import re import shutil import time import dotenv import fitz # PyMuPDF import pandas as pd import requests import schedule import srsly from bs4 import BeautifulSoup from datasets import Dataset, Image, concatenate_datasets, load_dataset from huggingface_hub import HfApi, create_repo, login, whoami from PIL import Image as PILImage from retry import retry from tqdm.auto import tqdm dotenv.load_dotenv() login(token=os.environ.get("HF_TOKEN")) api = HfApi() hf_user = whoami(os.environ.get("HF_TOKEN"))["name"] HF_REPO_ID_TXT = f"{hf_user}/zotero-answer-ai-texts" HF_REPO_ID_IMG = f"{hf_user}/zotero-answer-ai-images" ######################################################## ### GET ZOTERO ITEMS ######################################################## @retry(tries=3, delay=8) def _fetch_one_zotero_batch(url, headers, params): """ Fetch articles from Zotero API """ response = requests.get(url, headers=headers, params=params) response.raise_for_status() return response.json() def get_zotero_items(debug=False): """ fetch items from zotero library """ GROUP_ID = os.getenv("GROUP_ID") API_KEY = os.getenv("API_KEY") BASE_URL = f"https://api.zotero.org/groups/{GROUP_ID}/items" LIMIT = 100 headers = {"Zotero-API-Key": API_KEY, "Content-Type": "application/json"} items = [] start = 0 i = 1 while True: i += 1 params = {"limit": LIMIT, "start": start} page_items = _fetch_one_zotero_batch(BASE_URL, headers, params) if not page_items: break items.extend(page_items) start += LIMIT print(f"# items fetched {len(items)}") if debug: if len(items) > 1600: break return items ######################################################## ### EXTRACT ARXIV LINKS AND PDFs ######################################################## def get_arxiv_items(items): visited = set() arxiv_items = [] arxiv_pattern = re.compile(r"arxiv.org/abs/(\d+\.\d+)") for item in items: data = item.get("data", {}) attachments = item.get("links", {}).get("attachment", {}) arxiv_url = None pdf_url = None if "url" in data and "arxiv.org" in data["url"]: arxiv_match = arxiv_pattern.search(data["url"]) if arxiv_match: arxiv_url = data["url"] if attachments: pdf_url = attachments["href"] if arxiv_url: arxiv_id = arxiv_url.split("/")[-1] if arxiv_id in visited: continue authors = [] for author in data.get("creators", []): authors.append(f"{author.get('firstName', '')} {author.get('lastName', '')}") arxiv_items.append( { "arxiv_id": arxiv_id, "arxiv_url": arxiv_url, "title": data.get("title", ""), "authors": authors, "pdf_url": pdf_url, "date_published": data.get("date", ""), "added_by": item["meta"]["createdByUser"]["username"], "date_added": data.get("dateAdded", ""), } ) visited.add(arxiv_id) return arxiv_items @retry(tries=3, delay=15, backoff=2) def fetch_arxiv_html(arxiv_id): url = f"https://ar5iv.labs.arxiv.org/html/{arxiv_id.split('v')[0]}" response = requests.get(url) return response.text if response.status_code == 200 else None def fetch_arxiv_htmls(arxiv_items): for item in tqdm(arxiv_items): html = fetch_arxiv_html(item["arxiv_id"]) if html: item["raw_content"] = html else: print(f"failed to fetch html for {item['arxiv_id']}") item["raw_content"] = "Error" return arxiv_items ######################################################## ### PARSE CONTENT FROM ARXIV HTML # ######################################################## def parse_html_content(html): """ Parse content from arxiv html """ arxiv_id_match = re.search(r"\[(\d+\.\d+(v\d+)?)\]", html) arxiv_id = arxiv_id_match.group(1) if arxiv_id_match else None soup = BeautifulSoup(html, "html.parser") result = [] # Extract paper title try: paper_title = soup.find("h1", class_="ltx_title ltx_title_document").get_text(strip=True) except Exception: paper_title = soup.find("title").get_text(strip=True) paper_title = re.sub(r"^\[\d+\.\d+(v\d+)?\]\s*", "", paper_title) for math in soup.find_all("math"): math.decompose() for cite in soup.find_all("cite"): cite.decompose() # Extract abstract abstract = soup.find("div", class_="ltx_abstract") if abstract: result.append( { "content": " ".join(p.get_text(strip=True) for p in abstract.find_all("p")).replace(")", ") "), "title": "Abstract", "paper_title": paper_title, "content_type": "abstract", } ) # Extract sections sections = soup.find_all("section", class_="ltx_section") for index, section in enumerate(sections): section_title = section.find("h2", class_="ltx_title ltx_title_section") section_title = section_title.get_text(strip=True) if section_title else f"Section {index + 1}" section_content = section.get_text(strip=True).replace(")", ") ") content_type = "body" if index == 0: content_type = "introduction" elif index == len(sections) - 1: content_type = "conclusion" result.append( { "content": section_content, "title": section_title, "paper_title": paper_title, "content_type": content_type, } ) for c in result: c["arxiv_id"] = arxiv_id return result ######################################################## ### GET TEXTS FROM PDF & PARSE ######################################################## def get_pdf_text(arxiv_id): url = "http://147.189.194.113:80/extract" # fix: currently down try: response = requests.get(url, params={"arxiv_id": arxiv_id}) response = response.json() if "text" in response: return response["text"] return None except Exception as e: print(e) return None def get_content_type(section_type, section_count): """Determine the content type based on the section type and count""" if section_type == "abstract": return "abstract" elif section_type == "introduction" or section_count == 1: return "introduction" elif section_type == "conclusion" or section_type == "references": return section_type else: return "body" def get_section_type(title): """Determine the section type based on the title""" title_lower = title.lower() if "abstract" in title_lower: return "abstract" elif "introduction" in title_lower: return "introduction" elif "conclusion" in title_lower: return "conclusion" elif "reference" in title_lower: return "references" else: return "body" def parse_markdown_content(md_content, arxiv_id): """ Parses markdown content to identify and extract sections based on headers. """ lines = md_content.split("\n") parsed = [] current_section = None content = [] paper_title = None current_title = None # identify sections based on headers for line in lines: if line.startswith("#"): if paper_title is None: paper_title = line.lstrip("#").strip() continue if content: if current_title: parsed.append( { "content": " ".join(content), "title": current_title, "paper_title": paper_title, "content_type": get_content_type(current_section, len(parsed)), "arxiv_id": arxiv_id, } ) content = [] current_title = line.lstrip("#").lstrip("#").lstrip() if "bit" not in current_title: current_title = ( current_title.lstrip("123456789") .lstrip() .lstrip(".") .lstrip() .lstrip("123456789") .lstrip() .lstrip(".") .lstrip() ) current_section = get_section_type(current_title) else: content.append(line) # Add the last section if content and current_title: parsed.append( { "content": " ".join(content).replace(")", ") "), "title": current_title, "paper_title": paper_title, "content_type": get_content_type(current_section, len(parsed)), "arxiv_id": arxiv_id, } ) return parsed ######################################################## ### Image Dataset ######################################################## def download_arxiv_pdf(arxiv_id): arxiv_id = arxiv_id.split("v")[0] url = f"https://arxiv.org/pdf/{arxiv_id}.pdf" response = requests.get(url) if response.status_code == 200: return response.content else: raise Exception(f"Failed to download PDF. Status code: {response.status_code}") def pdf_to_jpegs(pdf_content, output_folder, max_pages=128): # Create output folder if it doesn't exist os.makedirs(output_folder, exist_ok=True) # Open the PDF doc = fitz.open(stream=pdf_content, filetype="pdf") # Iterate through pages for page_num in range(len(doc)): page = doc.load_page(page_num) # Convert page to image pix = page.get_pixmap() # Save image as JPEG image_path = os.path.join(output_folder, f"page_{page_num + 1}.jpg") pix.save(image_path) # print(f"Saved {image_path}") if page_num >= max_pages: break doc.close() def save_arxiv_article_images(arxiv_id): output_folder = os.path.join("data", "arxiv_images", arxiv_id) try: pdf_content = download_arxiv_pdf(arxiv_id) pdf_to_jpegs(pdf_content, output_folder) except Exception as e: print(f"An error occurred: {str(e)}") def create_hf_image_dataset(base_dir): data = [] # Walk through the directory for root, dirs, files in os.walk(base_dir): for file in files: if file.endswith(".jpg"): # Extract arxiv_id from the path arxiv_id = os.path.basename(root) # Extract page number from the filename match = re.search(r"page_(\d+)", file) if match: page_number = int(match.group(1)) else: continue # Skip if page number can't be extracted # Full path to the image image_path = os.path.join(root, file) # Open the image to get its size with PILImage.open(image_path) as img: width, height = img.size # Add the data data.append( {"image": image_path, "arxiv_id": arxiv_id, "page_number": page_number, "width": width, "height": height} ) # Create the dataset dataset = Dataset.from_dict( { "image": [d["image"] for d in data], "arxiv_id": [d["arxiv_id"] for d in data], "page_number": [d["page_number"] for d in data], } ) # Cast the image column to Image dataset = dataset.cast_column("image", Image()) return dataset ######################################################## ### HF UPLOAD ######################################################## def upload_to_hf(abstract_df, contents_df, processed_arxiv_ids): # repo_id = HF_REPO_ID create_repo( repo_id=HF_REPO_ID_TXT, token=os.environ.get("HF_TOKEN"), private=True, repo_type="dataset", exist_ok=True, ) create_repo( repo_id=HF_REPO_ID_IMG, token=os.environ.get("HF_TOKEN"), private=True, repo_type="dataset", exist_ok=True, ) # upload image dataset try: img_ds = create_hf_image_dataset("data/arxiv_images") try: old_img_ds = load_dataset(HF_REPO_ID_IMG, "images")["train"] img_ds = concatenate_datasets([old_img_ds, img_ds]) except Exception as e: print(e) img_ds.push_to_hub(HF_REPO_ID_IMG, "images", token=os.environ.get("HF_TOKEN")) except Exception as e: print(e) # upload first pages only try: img_ds = img_ds.filter(lambda x: x["page_number"] == 1) img_ds.push_to_hub(HF_REPO_ID_IMG, "images_first_page", token=os.environ.get("HF_TOKEN")) except Exception as e: print(e) try: # push id_to_abstract abstract_ds = Dataset.from_pandas(abstract_df) abstract_ds.push_to_hub(HF_REPO_ID_TXT, "abstracts", token=os.environ.get("HF_TOKEN")) # push arxiv_items arxiv_ds = Dataset.from_pandas(contents_df) arxiv_ds.push_to_hub(HF_REPO_ID_TXT, "articles", token=os.environ.get("HF_TOKEN")) # push processed_arxiv_ids processed_arxiv_ids = [{"arxiv_id": arxiv_id} for arxiv_id in processed_arxiv_ids] processed_arxiv_ids_ds = Dataset.from_list(processed_arxiv_ids) processed_arxiv_ids_ds.push_to_hub(HF_REPO_ID_TXT, "processed_arxiv_ids", token=os.environ.get("HF_TOKEN")) except Exception as e: print(e) # trigger refresh of connected datasets print("==" * 40) print("Triggering refresh of connected datasets") api.restart_space(repo_id="answerdotai/zotero-weekly") print("==" * 40) ######################################################## ### MAIN ######################################################## def main(): # items = get_zotero_items(debug=True) items = get_zotero_items(debug=False) print(f"# of items fetched from zotero: {len(items)}") arxiv_items = get_arxiv_items(items) print(f"# of arxiv papers: {len(arxiv_items)}") # get already processed arxiv ids from HF try: existing_arxiv_ids = load_dataset(HF_REPO_ID_TXT, "processed_arxiv_ids")["train"]["arxiv_id"] except Exception as e: print(e) existing_arxiv_ids = [] existing_arxiv_ids = set(existing_arxiv_ids) print(f"# of existing arxiv ids: {len(existing_arxiv_ids)}") # new arxiv items arxiv_items = [item for item in arxiv_items if item["arxiv_id"] not in existing_arxiv_ids] arxiv_items = fetch_arxiv_htmls(arxiv_items) print(f"# of new arxiv items: {len(arxiv_items)}") if len(arxiv_items) == 0: print("No new arxiv items to process") return processed_arxiv_ids = set() pbar = tqdm(range(len(arxiv_items))) # remove "data" directory if it exists if os.path.exists("data"): try: shutil.rmtree("data") except Exception as e: print(e) for item in arxiv_items: # download images -- save_arxiv_article_images(item["arxiv_id"]) # parse html try: item["contents"] = parse_html_content(item["raw_content"]) except Exception as e: print(f"Failed to parse html for {item['arxiv_id']}: {e}") item["contents"] = [] if len(item["contents"]) == 0: print("Extracting from pdf...") md_content = get_pdf_text(item["arxiv_id"]) # fix this item["raw_content"] = md_content if md_content: item["contents"] = parse_markdown_content(md_content, item["arxiv_id"]) else: item["contents"] = [] if len(item["contents"]) > 0: processed_arxiv_ids.add(item["arxiv_id"]) if len(item["authors"]) == 0: item["authors"] = [] # ["unknown"] item["title"] = item["contents"][0]["paper_title"] pbar.update(1) pbar.close() # save contents --- processed_arxiv_ids = list(processed_arxiv_ids) print(f"# of processed arxiv ids: {len(processed_arxiv_ids)}") # save abstracts --- id_to_abstract = {} for item in arxiv_items: for entry in item["contents"]: if entry["content_type"] == "abstract": id_to_abstract[item["arxiv_id"]] = entry["content"] break print(f"# of abstracts: {len(id_to_abstract)}") abstract_df = pd.Series(id_to_abstract).reset_index().rename(columns={"index": "arxiv_id", 0: "abstract"}) print(abstract_df.head()) # add to existing dataset try: old_abstract_df = load_dataset(HF_REPO_ID_TXT, "abstracts")["train"].to_pandas() except Exception as e: print(e) old_abstract_df = pd.DataFrame(columns=abstract_df.columns) print(old_abstract_df.head()) abstract_df = pd.concat([old_abstract_df, abstract_df]).reset_index(drop=True) abstract_df = abstract_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True) # contents contents_df = pd.DataFrame(arxiv_items) print(contents_df.head()) try: old_contents_df = load_dataset(HF_REPO_ID_TXT, "articles")["train"].to_pandas() except Exception as e: print(e) old_contents_df = pd.DataFrame(columns=contents_df.columns) if len(old_contents_df) > 0: print(old_contents_df.sample().T) contents_df = pd.concat([old_contents_df, contents_df]).reset_index(drop=True) contents_df = contents_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True) # upload to hf processed_arxiv_ids = list(set(processed_arxiv_ids + list(existing_arxiv_ids))) upload_to_hf(abstract_df, contents_df, processed_arxiv_ids) # save as local copy os.makedirs("data", exist_ok=True) abstract_df.to_parquet("data/abstracts.parquet") contents_df.to_parquet("data/contents.parquet") srsly.write_json("data/processed_arxiv_ids.json", processed_arxiv_ids) def schedule_periodic_task(): """ Schedule the main task to run at the user-defined frequency """ # main() # run once initially frequency = "daily" # TODO: env if frequency == "hourly": print("Scheduling tasks to run every hour at the top of the hour") schedule.every().hour.at(":00").do(main) elif frequency == "daily": start_time = "10:00" print("Scheduling tasks to run every day at: {start_time} UTC+00") schedule.every().day.at(start_time).do(main) while True: schedule.run_pending() time.sleep(1) if __name__ == "__main__": schedule_periodic_task()