Spaces:
Sleeping
Sleeping
"""Module for fetching and parsing articles from PubMed and PMC using Entrez efetch."""
from __future__ import annotations

import html
import unicodedata
from abc import ABC, abstractmethod
from io import StringIO
from pathlib import Path
from typing import IO, Any, Dict, Generator, Union
from xml.etree.ElementTree import Element  # nosec
from zipfile import ZipFile

import requests
from defusedxml import ElementTree
_ENTREZ_EFETCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi" | |
def _db_parser(article_id:str) -> str|None: | |
"""Parse the article ID to ensure it is in the correct format.""" | |
db = None | |
if article_id.startswith('PMC') and article_id[3:].isdigit(): | |
db = "pmc" | |
elif article_id.isdigit(): | |
db = "pubmed" | |
return db | |
def _dl_article_xml(article_id: str, db: str | None) -> str | None:
    """Download the raw XML of an article via the Entrez efetch endpoint.

    Parameters
    ----------
    article_id
        PubMed PMID or PMC ID (e.g. "PMC123456").
    db
        Entrez database name ("pmc" or "pubmed").

    Returns
    -------
    str | None
        The XML document as text, or None when the request did not
        succeed with HTTP 200.
    """
    params = {"db": db, "id": article_id, "retmode": "xml"}
    # A timeout keeps the call from hanging indefinitely on a stalled
    # connection; the original request had none.
    response = requests.get(_ENTREZ_EFETCH_URL, params=params, timeout=30)
    if response.status_code == 200:
        return response.text
    # Original annotation claimed tuple[None|str, str], but only the XML
    # string (or None) was ever returned; the annotation is now honest.
    return None
def _parse_article(xml_string:str, db:str) -> Union[None,ArticleParser] : | |
parsed_article = None | |
if db == "pmc": | |
parsed_article = JATSXMLParser.from_string(xml_string) | |
elif db == "pubmed": | |
parsed_article = PubMedXMLParser(xml_string) | |
# check if parsing was successful | |
if not parsed_article.abstract and not parsed_article.paragraphs: | |
parsed_article = None | |
return parsed_article | |
def _reformat_article(parsed_article:ArticleParser) -> Dict[str,Any] : | |
reformatted_article = {"Title":[parsed_article.title]} | |
for sec_title,sentence in parsed_article.abstract : | |
sec_title = "Abstract" if sec_title is None else "Abstract - " + sec_title | |
reformatted_article[sec_title] = reformatted_article.get(sec_title,[]) + [sentence] | |
for sec_title,sentence in parsed_article.paragraphs : | |
reformatted_article[sec_title] = reformatted_article.get(sec_title,[]) + [sentence] | |
return reformatted_article | |
def dl_and_parse(article_id: str) -> Dict[str, Union[None, Any]]:
    """Download an article by ID from PubMed/PMC and parse it.

    Returns a dict with keys:
      - "db": the Entrez database the ID maps to ("pmc", "pubmed", or None),
      - "article_xml": raw XML text of the downloaded article (or None),
      - "article_sections": mapping of section title -> list of text
        content (or None when download or parsing failed).
    """
    result: Dict[str, Union[None, Any]] = {
        "db": None,
        "article_xml": None,
        "article_sections": None,
    }
    # Determine which Entrez database the ID format corresponds to.
    db = _db_parser(article_id)
    result["db"] = db
    if db is None:
        return result
    xml_text = _dl_article_xml(article_id, db)
    result["article_xml"] = xml_text
    parser = _parse_article(xml_text, db)
    if parser is not None:
        result["article_sections"] = _reformat_article(parser)
    return result
class ArticleParser(ABC):
    """An abstract base class for article parsers.

    Concrete parsers expose ``title``, ``abstract`` and ``paragraphs`` as
    read-only properties, matching how the module-level helpers use them
    (plain attribute access, e.g. ``parsed_article.title``).
    """

    @property
    @abstractmethod
    def title(self) -> str:
        """Get the article title.

        Returns
        -------
        str
            The article title.
        """

    @property
    @abstractmethod
    def abstract(self) -> list[tuple[str, str]]:
        """Get (section title, text) pairs from the article abstract.

        Returns
        -------
        list of (str, str)
            For each abstract paragraph, the section label (which may be
            None for unlabelled abstracts) and the paragraph text.
        """

    @property
    @abstractmethod
    def paragraphs(self) -> list[tuple[str, str]]:
        """Get all paragraphs and titles of sections they are part of.

        Returns
        -------
        list of (str, str)
            For each paragraph a tuple with two strings is returned. The first
            is the section title, the second the paragraph content.
        """
class JATSXMLParser(ArticleParser):
    """Parser for full-text articles in JATS XML (the PMC format)."""

    def __init__(self, xml_stream: IO[Any]) -> None:
        """Parse the XML stream, unwrapping a pmc-articleset wrapper if present."""
        super().__init__()
        self.content = ElementTree.parse(xml_stream)
        if self.content.getroot().tag == "pmc-articleset":
            self.content = self.content.find("article")

    # @classmethod is required: the call site uses
    # JATSXMLParser.from_string(xml_string), which would otherwise pass the
    # string as `cls`.
    @classmethod
    def from_string(cls, xml_string: str) -> JATSXMLParser:
        """Build a parser from an XML document held in a string."""
        with StringIO(xml_string) as stream:
            obj = cls(stream)
        return obj

    @classmethod
    def from_zip(cls, path: str | Path) -> JATSXMLParser:
        """Build a parser from a zip archive holding exactly one content/*.xml.

        Raises
        ------
        ValueError
            When the archive does not contain exactly one .xml file under
            content/.
        """
        with ZipFile(path) as myzip:
            xml_files = [
                x
                for x in myzip.namelist()
                if x.startswith("content/") and x.endswith(".xml")
            ]
            if len(xml_files) != 1:
                raise ValueError(
                    "There needs to be exactly one .xml file inside of content/"
                )
            xml_file = xml_files[0]
            # Parsing logic
            with myzip.open(xml_file, "r") as fh:
                obj = cls(fh)
        return obj

    @property
    def title(self) -> str:
        """The article title, or "" when the title element is absent."""
        titles = self.content.find("./front/article-meta/title-group/article-title")
        return self._element_to_str(titles)

    @property
    def abstract(self) -> list[tuple[str, str]]:
        """(section title, text) pairs extracted from the abstract."""
        abstract = self.content.find("./front/article-meta/abstract")
        abstract_list: list[tuple[str, str]] = []
        # Compare with None explicitly: an Element with no children is
        # falsy, so `if abstract:` would wrongly skip such abstracts.
        if abstract is not None:
            abstract_list.extend(self.parse_section(abstract))
        return abstract_list

    @property
    def paragraphs(self) -> list[tuple[str, str]]:
        """(section title, text) pairs from the body, plus figure and table captions."""
        paragraph_list: list[tuple[str, str]] = []
        # Paragraphs of text body.
        body = self.content.find("./body")
        if body is not None:  # explicit None check, see `abstract`
            paragraph_list.extend(self.parse_section(body, ""))
        # Figure captions.
        for fig in self.content.findall("./body//fig"):
            # findall always returns a list (never None), so test emptiness.
            fig_captions = fig.findall("caption")
            if not fig_captions:
                continue
            caption = " ".join(self._element_to_str(c) for c in fig_captions)
            if caption:
                paragraph_list.append(("Figure Caption", caption))
        # Table captions.
        for table in self.content.findall("./body//table-wrap"):
            caption_elements = table.findall("./caption/p") or table.findall(
                "./caption/title"
            )
            caption = " ".join(self._element_to_str(c) for c in caption_elements)
            if caption:
                paragraph_list.append(("Table Caption", caption))
        return paragraph_list

    def parse_section(self, section: Element, sec_title_path: str = "") -> Generator[tuple[str, str], None, None]:
        """Recursively yield (section title path, text) pairs from *section*.

        Nested <sec> elements extend the title path with " - ". The
        "Author contributions" section is skipped entirely.
        """
        sec_title = self._element_to_str(section.find("title"))
        if sec_title == "Author contributions":
            return
        sec_title_path = sec_title_path + " - " + sec_title if sec_title_path else sec_title
        for element in section:
            if element.tag == "sec":
                yield from self.parse_section(element, sec_title_path)
            elif element.tag in {"title", "caption", "fig", "table-wrap", "label"}:
                # Titles are already folded into the path; captions, figures
                # and tables are collected separately.
                continue
            else:
                text = self._element_to_str(element)
                if text:
                    yield sec_title_path, text

    def _inner_text(self, element: Element) -> str:
        """Return the NFKC-normalized text of *element* including all children."""
        text_parts = [html.unescape(element.text or "")]
        for sub_element in element:
            # Recursively render the sub-element.
            text_parts.append(self._element_to_str(sub_element))
            # Don't forget the text after the sub-element.
            text_parts.append(html.unescape(sub_element.tail or ""))
        return unicodedata.normalize("NFKC", "".join(text_parts)).strip()

    def _element_to_str(self, element: Element | None) -> str:
        """Render a single element to plain text; "" when *element* is None."""
        if element is None:
            return ""
        if element.tag in {
            "bold",
            "italic",
            "monospace",
            "p",
            "sc",
            "styled-content",
            "underline",
            "xref",
        }:
            # Mostly styling tags for which getting the inner text is enough.
            # Currently this is the same as the default handling. Writing it out
            # explicitly here to decouple from the default handling, which may
            # change in the future.
            return self._inner_text(element)
        elif element.tag == "sub":
            return f"_{self._inner_text(element)}"
        elif element.tag == "sup":
            return f"^{self._inner_text(element)}"
        elif element.tag in {
            "disp-formula",
            "email",
            "ext-link",
            "inline-formula",
            "uri",
        }:
            # Formulas and link-like elements are dropped from the text.
            return ""
        else:
            # Default handling for all other element tags.
            return self._inner_text(element)
class PubMedXMLParser(ArticleParser):
    """Parser for PubMed abstract."""

    def __init__(self, data: str | bytes) -> None:
        """Parse a PubmedArticleSet XML document given as a string or bytes."""
        super().__init__()
        self.content = ElementTree.fromstring(data)

    @property
    def title(self) -> str:
        """The article title, or "" when the title element is missing."""
        title = self.content.find("./PubmedArticle/MedlineCitation/Article/ArticleTitle")
        if title is None:
            return ""
        return "".join(title.itertext())

    @property
    def abstract(self) -> list[tuple[str | None, str]]:
        """(label, text) pairs from the abstract; label is None when unlabelled."""
        abstract = self.content.find("./PubmedArticle/MedlineCitation/Article/Abstract")
        if abstract is None:
            # No abstract to parse: return an empty list.
            return []
        abstract_list: list[tuple[str | None, str]] = []
        # Element.iter never returns None, so no extra guard is needed.
        for paragraph in abstract.iter("AbstractText"):
            # "Label" is absent on unstructured abstracts; None is handled
            # downstream by _reformat_article.
            sec_title = paragraph.get("Label")
            abstract_list.append((sec_title, "".join(paragraph.itertext())))
        return abstract_list

    @property
    def paragraphs(self) -> list[tuple[str, str]]:
        """PubMed records carry no body text, so this is always empty."""
        return []