"""Gradio demo for scholarly argumentation mining with cross-document ADU retrieval.

The app annotates a text with argumentative discourse units (ADUs) and relations,
renders the result, stores processed documents in a document store and allows to
retrieve similar / related ADUs from other indexed documents.
"""

import json
import logging
import os.path
import re
import tempfile
from functools import partial
from typing import List, Optional, Tuple, Union

import arxiv
import gradio as gr
import pandas as pd
import requests
import torch
from bs4 import BeautifulSoup
from document_store import DocumentStore, get_annotation_from_document
from embedding import EmbeddingModel
from model_utils import annotate_document, create_document, load_models
from pie_modules.taskmodules import PointerNetworkTaskModuleForEnd2EndRE
from pytorch_ie import Pipeline
from pytorch_ie.documents import (
    TextDocumentWithLabeledMultiSpansBinaryRelationsAndLabeledPartitions,
    TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions,
)
from rendering_utils import HIGHLIGHT_SPANS_JS, render_displacy, render_pretty_table
from transformers import PreTrainedModel, PreTrainedTokenizer
from vector_store import QdrantVectorStore, SimpleVectorStore

logger = logging.getLogger(__name__)

RENDER_WITH_DISPLACY = "displaCy + highlighted arguments"
RENDER_WITH_PRETTY_TABLE = "Pretty Table"

DEFAULT_MODEL_NAME = "ArneBinder/sam-pointer-bart-base-v0.3"
DEFAULT_MODEL_REVISION = "76300f8e534e2fcf695f00cb49bba166739b8d8a"
# local path
# DEFAULT_MODEL_NAME = "models/dataset-sciarg/task-ner_re/v0.3/2024-05-28_23-33-46"
# DEFAULT_MODEL_REVISION = None
DEFAULT_EMBEDDING_MODEL_NAME = "allenai/scibert_scivocab_uncased"
DEFAULT_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DEFAULT_EMBEDDING_MAX_LENGTH = 512
DEFAULT_EMBEDDING_BATCH_SIZE = 32
DEFAULT_SPLIT_REGEX = "\n\n\n+"
DEFAULT_ARXIV_ID = "1706.03762"

# Upper bound (in seconds) for fetching the HTML version of an arXiv paper, so that
# a stalled connection cannot block the request handler forever.
ARXIV_REQUEST_TIMEOUT_SECONDS = 30

# Whether to handle segmented entities in the document. If True, labeled_spans are converted
# to labeled_multi_spans and binary_relations with label "parts_of_same" are used to merge them.
HANDLE_PARTS_OF_SAME = True


def escape_regex(regex: str) -> str:
    """Return `regex` with special characters escaped for display (e.g. a newline as ``\\n``)."""
    # "double escape" the backslashes
    result = regex.encode("unicode_escape").decode("utf-8")
    return result


def unescape_regex(regex: str) -> str:
    """Reverse of :func:`escape_regex`: turn escape sequences back into the actual characters."""
    result = regex.encode("utf-8").decode("unicode_escape")
    return result


def _resolve_split_regex(split_regex_escaped: Optional[str]) -> Optional[str]:
    """Return the unescaped split regex, or None if no (or an empty) pattern was given."""
    return unescape_regex(split_regex_escaped) if split_regex_escaped else None


def render_annotated_document(
    document: Union[
        TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions,
        TextDocumentWithLabeledMultiSpansBinaryRelationsAndLabeledPartitions,
    ],
    render_with: str,
    render_kwargs_json: str,
) -> str:
    """Render an annotated document as HTML.

    Args:
        document: The annotated document to render.
        render_with: Either RENDER_WITH_PRETTY_TABLE or RENDER_WITH_DISPLACY.
        render_kwargs_json: JSON encoded keyword arguments forwarded to the renderer.

    Returns:
        The HTML produced by the selected rendering function.

    Raises:
        ValueError: If `render_with` is not one of the known render modes.
    """
    render_kwargs = json.loads(render_kwargs_json)
    if render_with == RENDER_WITH_PRETTY_TABLE:
        html = render_pretty_table(document, **render_kwargs)
    elif render_with == RENDER_WITH_DISPLACY:
        html = render_displacy(document, **render_kwargs)
    else:
        raise ValueError(f"Unknown render_with value: {render_with}")

    return html


def wrapped_process_text(
    text: str,
    doc_id: str,
    models: Tuple[Pipeline, Optional[EmbeddingModel]],
    document_store: DocumentStore,
    split_regex_escaped: str,
    handle_parts_of_same: bool = False,
) -> Tuple[
    dict,
    Union[
        TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions,
        TextDocumentWithLabeledMultiSpansBinaryRelationsAndLabeledPartitions,
    ],
]:
    """Create a document from `text`, annotate it and add it to the document store.

    Args:
        text: The raw text to analyse.
        doc_id: The id to assign to the created document.
        models: Tuple of the annotation pipeline and the (optional) embedding model.
        document_store: The store the annotated document is added to.
        split_regex_escaped: Escaped regex used to partition the text; empty means no splitting.
        handle_parts_of_same: Forwarded to `annotate_document`.

    Returns:
        The document as dict and the document itself.

    Raises:
        gr.Error: If creating, annotating or storing the document fails.
    """
    try:
        document = create_document(
            text=text,
            doc_id=doc_id,
            split_regex=_resolve_split_regex(split_regex_escaped),
        )
        document = annotate_document(
            document=document,
            annotation_pipeline=models[0],
            embedding_model=models[1],
            handle_parts_of_same=handle_parts_of_same,
        )
        document_store.add_document(document)
    except Exception as e:
        # chain the cause so that the original traceback is kept in the logs
        raise gr.Error(f"Failed to process text: {e}") from e
    # remove the embeddings because they are very large
    if document.metadata.get("embeddings"):
        document.metadata = {k: v for k, v in document.metadata.items() if k != "embeddings"}
    # Return as dict and document to avoid serialization issues
    return document.asdict(), document


def process_uploaded_files(
    file_names: List[str],
    models: Tuple[Pipeline, Optional[EmbeddingModel]],
    document_store: DocumentStore,
    split_regex_escaped: str,
    show_max_cross_doc_sims: bool = False,
    min_similarity: float = 0.95,
    handle_parts_of_same: bool = False,
) -> pd.DataFrame:
    """Annotate the uploaded ``.txt`` files and add them to the document store.

    Args:
        file_names: Paths of the uploaded files. Only files ending with ".txt" are supported.
        models: Tuple of the annotation pipeline and the (optional) embedding model.
        document_store: The store the annotated documents are added to.
        split_regex_escaped: Escaped regex used to partition the texts; empty means no splitting.
        show_max_cross_doc_sims: Forwarded to `DocumentStore.overview`.
        min_similarity: Forwarded to `DocumentStore.overview`.
        handle_parts_of_same: Forwarded to `annotate_document`.

    Returns:
        The overview of the document store after adding the new documents.

    Raises:
        gr.Error: If a file has an unsupported format or processing fails. Documents are
            only added to the store if all files could be processed.
    """
    try:
        # the pattern is the same for all files, so resolve it only once
        split_regex = _resolve_split_regex(split_regex_escaped)
        new_documents = []
        for file_name in file_names:
            if not file_name.lower().endswith(".txt"):
                raise gr.Error(f"Unsupported file format: {file_name}")

            # read the file content
            with open(file_name, "r", encoding="utf-8") as f:
                text = f.read()
            base_file_name = os.path.basename(file_name)
            gr.Info(f"Processing file '{base_file_name}' ...")
            new_document = create_document(
                text=text,
                doc_id=base_file_name,
                split_regex=split_regex,
            )
            new_document = annotate_document(
                document=new_document,
                annotation_pipeline=models[0],
                embedding_model=models[1],
                handle_parts_of_same=handle_parts_of_same,
            )
            new_documents.append(new_document)
        document_store.add_documents(new_documents)
    except Exception as e:
        raise gr.Error(f"Failed to process uploaded files: {e}") from e

    return document_store.overview(
        with_max_cross_doc_sims=show_max_cross_doc_sims, min_similarity=min_similarity
    )


def open_accordion():
    """Return an accordion update that opens the target accordion."""
    return gr.Accordion(open=True)


def close_accordion():
    """Return an accordion update that closes the target accordion."""
    return gr.Accordion(open=False)


def select_processed_document(
    evt: gr.SelectData,
    processed_documents_df: pd.DataFrame,
    document_store: DocumentStore,
) -> Union[
    TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions,
    TextDocumentWithLabeledMultiSpansBinaryRelationsAndLabeledPartitions,
]:
    """Return the document that belongs to the cell selected in the overview table.

    If the selected column name ends with "doc_id", the id is taken from that column,
    otherwise from the "doc_id" column of the selected row.
    """
    row_idx, col_idx = evt.index
    col_name = processed_documents_df.columns[col_idx]
    if not col_name.endswith("doc_id"):
        col_name = "doc_id"
    doc_id = processed_documents_df.iloc[row_idx][col_name]
    doc = document_store.get_document(doc_id, with_embeddings=False)
    return doc


def set_relation_types(
    models: Tuple[Pipeline, Optional[EmbeddingModel]],
    default: Optional[List[str]] = None,
) -> gr.Dropdown:
    """Create the relation type dropdown from the labels of the annotation pipeline.

    Args:
        models: Tuple of the annotation pipeline and the (optional) embedding model;
            only the pipeline (first entry) is used.
        default: The relation types that are selected initially.

    Returns:
        A multi-select dropdown with the relation labels of the pipeline's taskmodule.

    Raises:
        gr.Error: If the taskmodule is not a PointerNetworkTaskModuleForEnd2EndRE.
    """
    arg_pipeline = models[0]
    if isinstance(arg_pipeline.taskmodule, PointerNetworkTaskModuleForEnd2EndRE):
        relation_types = arg_pipeline.taskmodule.labels_per_layer["binary_relations"]
    else:
        raise gr.Error("Unsupported taskmodule for relation types")

    return gr.Dropdown(
        choices=relation_types,
        label="Argumentative Relation Types",
        value=default,
        multiselect=True,
    )


def download_processed_documents(
    document_store: DocumentStore,
    file_name: str = "processed_documents.json",
) -> str:
    """Save the document store to `file_name` in the temp directory and return the file path."""
    file_path = os.path.join(tempfile.gettempdir(), file_name)
    document_store.save_to_file(file_path, indent=2)
    return file_path


def upload_processed_documents(
    file_name: str,
    document_store: DocumentStore,
) -> pd.DataFrame:
    """Add the documents from a previously saved file to the store and return its overview."""
    document_store.add_documents_from_file(file_name)
    return document_store.overview()


def clean_spaces(text: str) -> str:
    """Collapse runs of spaces, limit consecutive newlines to two and strip the text."""
    # replace all multiple spaces with a single space
    text = re.sub(" +", " ", text)
    # reduce more than two newlines to two newlines
    text = re.sub("\n\n+", "\n\n", text)
    # remove leading and trailing whitespaces
    text = text.strip()
    return text


def get_cleaned_arxiv_paper_text(html_content: str) -> str:
    """Extract the cleaned plain text of the ``<article>`` element from an arXiv HTML page.

    Raises:
        gr.Error: If the HTML content does not contain an ``<article>`` element.
    """
    # parse the HTML content with BeautifulSoup
    soup = BeautifulSoup(html_content, "html.parser")
    # get the "article" html element
    article = soup.find("article")
    if article is None:
        # find() returns None if there is no match; fail with a clear message
        # instead of an AttributeError on None
        raise gr.Error("Could not find an 'article' element in the HTML content")
    article_text = article.get_text()
    # cleanup the text
    article_text_clean = clean_spaces(article_text)
    return article_text_clean


def load_text_from_arxiv(arxiv_id: str, abstract_only: bool = False) -> Tuple[str, str]:
    """Load the text of an arXiv paper.

    Args:
        arxiv_id: The arXiv paper id. If empty (after stripping), DEFAULT_ARXIV_ID is used.
        abstract_only: If True, return just the abstract instead of the full text.

    Returns:
        A tuple of the text and its source: the abstract (newlines replaced by spaces)
        with the arXiv entry id if `abstract_only` is set, otherwise the cleaned text
        of the HTML version of the paper with the HTML url.

    Raises:
        gr.Error: If the paper can not be found or its content can not be fetched.
    """
    arxiv_id = arxiv_id.strip()
    if not arxiv_id:
        arxiv_id = DEFAULT_ARXIV_ID

    search_by_id = arxiv.Search(id_list=[arxiv_id])
    try:
        result = list(arxiv.Client().results(search_by_id))
    except arxiv.HTTPError as e:
        raise gr.Error(f"Failed to fetch arXiv data: {e}") from e
    if len(result) == 0:
        raise gr.Error(f"Could not find any paper with arXive ID '{arxiv_id}'")
    first_result = result[0]
    if abstract_only:
        abstract_clean = first_result.summary.replace("\n", " ")
        return abstract_clean, first_result.entry_id
    if "/abs/" not in first_result.entry_id:
        raise gr.Error(
            f"Could not create the HTML URL for arXive ID '{arxiv_id}' because its entry ID has "
            f"an unexpected format: {first_result.entry_id}"
        )
    html_url = first_result.entry_id.replace("/abs/", "/html/")
    try:
        # always use a timeout: without it, a stalled connection blocks forever
        request_result = requests.get(html_url, timeout=ARXIV_REQUEST_TIMEOUT_SECONDS)
    except requests.RequestException as e:
        raise gr.Error(
            f"Could not fetch the HTML content for arXive ID '{arxiv_id}': {e}"
        ) from e
    if request_result.status_code != 200:
        raise gr.Error(
            f"Could not fetch the HTML content for arXive ID '{arxiv_id}', status code: "
            f"{request_result.status_code}"
        )
    html_content = request_result.text
    text_clean = get_cleaned_arxiv_paper_text(html_content)
    return text_clean, html_url


def main():
    """Load the models, build the Gradio UI, wire up all events and launch the demo.

    NOTE(review): the nesting of the layout context managers (rows, columns, accordions)
    was reconstructed from the order of the statements; please confirm that each widget
    lives in the intended container (this affects the layout only, not the event wiring).
    """

    example_text = "Scholarly Argumentation Mining (SAM) has recently gained attention due to its potential to help scholars with the rapid growth of published scientific literature. It comprises two subtasks: argumentative discourse unit recognition (ADUR) and argumentative relation extraction (ARE), both of which are challenging since they require e.g. the integration of domain knowledge, the detection of implicit statements, and the disambiguation of argument structure. While previous work focused on dataset construction and baseline methods for specific document sections, such as abstract or results, full-text scholarly argumentation mining has seen little progress. In this work, we introduce a sequential pipeline model combining ADUR and ARE for full-text SAM, and provide a first analysis of the performance of pretrained language models (PLMs) on both subtasks. We establish a new SotA for ADUR on the Sci-Arg corpus, outperforming the previous best reported result by a large margin (+7% F1). We also present the first results for ARE, and thus for the full AM pipeline, on this benchmark dataset. Our detailed error analysis reveals that non-contiguous ADUs as well as the interpretation of discourse connectors pose major challenges and that data annotation needs to be more consistent."

    print("Loading models ...")
    argumentation_model, embedding_model = load_models(
        model_name=DEFAULT_MODEL_NAME,
        revision=DEFAULT_MODEL_REVISION,
        embedding_model_name=DEFAULT_EMBEDDING_MODEL_NAME,
        embedding_max_length=DEFAULT_EMBEDDING_MAX_LENGTH,
        embedding_batch_size=DEFAULT_EMBEDDING_BATCH_SIZE,
        device=DEFAULT_DEVICE,
    )

    default_render_kwargs = {
        "entity_options": {
            # we need to convert the keys to uppercase because the spacy rendering function expects them in uppercase
            "colors": {
                "own_claim".upper(): "#009933",
                "background_claim".upper(): "#99ccff",
                "data".upper(): "#993399",
            }
        },
        "colors_hover": {
            "selected": "#ffa",
            # "tail": "#aff",
            "tail": {
                # green
                "supports": "#9f9",
                # red
                "contradicts": "#f99",
                # do not highlight
                "parts_of_same": None,
            },
            "head": None,  # "#faf",
            "other": None,
        },
    }

    # the span layer depends on whether segmented entities are merged
    span_layer_name = "labeled_spans" if not HANDLE_PARTS_OF_SAME else "labeled_multi_spans"

    with gr.Blocks() as demo:
        document_store_state = gr.State(
            DocumentStore(
                span_annotation_caption="adu",
                relation_annotation_caption="relation",
                vector_store=QdrantVectorStore(),
                document_type=TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions
                if not HANDLE_PARTS_OF_SAME
                else TextDocumentWithLabeledMultiSpansBinaryRelationsAndLabeledPartitions,
                span_layer_name=span_layer_name,
            )
        )
        # wrap the pipeline and the embedding model/tokenizer in a tuple to avoid that it gets called
        models_state = gr.State((argumentation_model, embedding_model))
        with gr.Row():
            with gr.Column(scale=1):
                doc_id = gr.Textbox(
                    label="Document ID",
                    value="user_input",
                )
                doc_text = gr.Textbox(
                    label="Text",
                    lines=20,
                    value=example_text,
                )

                with gr.Accordion("Load Text from arXiv", open=False):
                    arxiv_id = gr.Textbox(
                        label="arXiv paper ID",
                        placeholder=f"e.g. {DEFAULT_ARXIV_ID}",
                        max_lines=1,
                    )
                    load_arxiv_only_abstract = gr.Checkbox(label="abstract only", value=False)
                    load_arxiv_btn = gr.Button("Load Text from arXiv", variant="secondary")

                load_arxiv_btn.click(
                    fn=load_text_from_arxiv,
                    inputs=[arxiv_id, load_arxiv_only_abstract],
                    outputs=[doc_text, doc_id],
                )

                with gr.Accordion("Model Configuration", open=False):
                    model_name = gr.Textbox(
                        label="Model Name",
                        value=DEFAULT_MODEL_NAME,
                    )
                    model_revision = gr.Textbox(
                        label="Model Revision",
                        value=DEFAULT_MODEL_REVISION,
                    )
                    embedding_model_name = gr.Textbox(
                        label=f"Embedding Model Name (e.g. {DEFAULT_EMBEDDING_MODEL_NAME})",
                        value=DEFAULT_EMBEDDING_MODEL_NAME,
                    )
                    embedding_max_length = gr.Slider(
                        label="Embedding Model Max Length",
                        minimum=16,
                        maximum=2048,
                        step=8,
                        value=DEFAULT_EMBEDDING_MAX_LENGTH,
                    )
                    embedding_batch_size = gr.Slider(
                        label="Embedding Model Batch Size",
                        minimum=1,
                        maximum=128,
                        step=1,
                        value=DEFAULT_EMBEDDING_BATCH_SIZE,
                    )
                    device = gr.Textbox(
                        label="Device (e.g. 'cuda' or 'cpu')",
                        value=DEFAULT_DEVICE,
                    )
                    load_models_btn = gr.Button("Load Models")
                    load_models_btn.click(
                        fn=load_models,
                        inputs=[
                            model_name,
                            model_revision,
                            embedding_model_name,
                            embedding_max_length,
                            embedding_batch_size,
                            device,
                        ],
                        outputs=models_state,
                    )

                    split_regex_escaped = gr.Textbox(
                        label="Regex to partition the text",
                        placeholder="Regular expression pattern to split the text into partitions",
                        value=escape_regex(DEFAULT_SPLIT_REGEX),
                    )

                predict_btn = gr.Button("Analyse")

                document_state = gr.State()

            with gr.Column(scale=1):

                with gr.Accordion("See plain result ...", open=False) as output_accordion:
                    document_json = gr.JSON(label="Model Output")

                with gr.Accordion("Render Options", open=False):
                    render_as = gr.Dropdown(
                        label="Render with",
                        choices=[RENDER_WITH_PRETTY_TABLE, RENDER_WITH_DISPLACY],
                        value=RENDER_WITH_DISPLACY,
                    )
                    render_kwargs = gr.Textbox(
                        label="Render Arguments",
                        lines=5,
                        value=json.dumps(default_render_kwargs, indent=2),
                    )
                    render_btn = gr.Button("Re-render")

                rendered_output = gr.HTML(label="Rendered Output")

            with gr.Column(scale=1):
                with gr.Accordion(
                    "Indexed Documents", open=False
                ) as processed_documents_accordion:
                    processed_documents_df = gr.DataFrame(
                        headers=["id", "num_adus", "num_relations"],
                        interactive=False,
                    )
                    show_max_cross_docu_sims = gr.Checkbox(
                        label="Show max cross-document similarities", value=False
                    )
                    gr.Markdown("Data Snapshot:")
                    with gr.Row():
                        download_processed_documents_btn = gr.DownloadButton("Download")
                        upload_processed_documents_btn = gr.UploadButton(
                            "Upload", file_types=["json"]
                        )

                upload_btn = gr.UploadButton(
                    "Upload & Analyse Reference Documents",
                    file_types=["text"],
                    file_count="multiple",
                )

                with gr.Accordion("Selected ADU", open=False):
                    selected_adu_id = gr.Textbox(label="ID", elem_id="selected_adu_id")
                    selected_adu_text = gr.Textbox(label="Text")

                with gr.Accordion("Retrieval Configuration", open=False):
                    min_similarity = gr.Slider(
                        label="Minimum Similarity",
                        minimum=0.0,
                        maximum=1.0,
                        step=0.01,
                        value=0.95,
                    )
                    top_k = gr.Slider(
                        label="Top K",
                        minimum=2,
                        maximum=50,
                        step=1,
                        value=20,
                    )
                retrieve_similar_adus_btn = gr.Button("Retrieve similar ADUs")
                similar_adus = gr.DataFrame(headers=["doc_id", "adu_id", "score", "text"])

                all2all_adu_similarities_button = gr.Button(
                    "Compute all ADU-to-ADU similarities"
                )
                all2all_adu_similarities = gr.DataFrame(
                    headers=["sim_score", "doc_id", "other_doc_id", "text", "other_text"]
                )

                relation_types = set_relation_types(
                    models_state.value, default=["supports", "contradicts"]
                )

                # retrieve_relevant_adus_btn = gr.Button("Retrieve relevant ADUs")
                relevant_adus = gr.DataFrame(
                    label="Relevant ADUs from other documents",
                    headers=[
                        "relation",
                        "adu",
                        "reference_adu",
                        "doc_id",
                        "sim_score",
                        "rel_score",
                    ],
                    interactive=False,
                )

        # shared event configurations that are used by multiple triggers
        render_event_kwargs = dict(
            fn=render_annotated_document,
            inputs=[document_state, render_as, render_kwargs],
            outputs=rendered_output,
        )

        show_overview_kwargs = dict(
            fn=lambda document_store, show_max_sims, min_sim: document_store.overview(
                with_max_cross_doc_sims=show_max_sims, min_similarity=min_sim
            ),
            inputs=[document_store_state, show_max_cross_docu_sims, min_similarity],
            outputs=[processed_documents_df],
        )
        predict_btn.click(fn=open_accordion, inputs=[], outputs=[output_accordion]).then(
            fn=partial(wrapped_process_text, handle_parts_of_same=HANDLE_PARTS_OF_SAME),
            inputs=[
                doc_text,
                doc_id,
                models_state,
                document_store_state,
                split_regex_escaped,
            ],
            outputs=[document_json, document_state],
            api_name="predict",
        ).success(**show_overview_kwargs)
        render_btn.click(**render_event_kwargs, api_name="render")

        document_state.change(
            fn=lambda doc: doc.asdict(),
            inputs=[document_state],
            outputs=[document_json],
        ).success(close_accordion, inputs=[], outputs=[output_accordion]).then(
            **render_event_kwargs
        )

        upload_btn.upload(
            fn=open_accordion, inputs=[], outputs=[processed_documents_accordion]
        ).then(
            fn=partial(process_uploaded_files, handle_parts_of_same=HANDLE_PARTS_OF_SAME),
            inputs=[
                upload_btn,
                models_state,
                document_store_state,
                split_regex_escaped,
                show_max_cross_docu_sims,
                min_similarity,
            ],
            outputs=[processed_documents_df],
        )
        processed_documents_df.select(
            select_processed_document,
            inputs=[processed_documents_df, document_store_state],
            outputs=[document_state],
        )
        show_max_cross_docu_sims.change(**show_overview_kwargs)

        download_processed_documents_btn.click(
            fn=partial(download_processed_documents, file_name="processed_documents.zip"),
            inputs=[document_store_state],
            outputs=[download_processed_documents_btn],
        )
        upload_processed_documents_btn.upload(
            fn=upload_processed_documents,
            inputs=[upload_processed_documents_btn, document_store_state],
            outputs=[processed_documents_df],
        )

        retrieve_relevant_adus_event_kwargs = dict(
            fn=partial(
                DocumentStore.get_related_annotations_from_other_documents_df,
                columns=relevant_adus.headers,
            ),
            inputs=[
                document_store_state,
                selected_adu_id,
                document_state,
                min_similarity,
                top_k,
                relation_types,
            ],
            outputs=[relevant_adus],
        )

        selected_adu_id.change(
            fn=partial(
                get_annotation_from_document,
                annotation_layer=span_layer_name,
                use_predictions=True,
            ),
            inputs=[document_state, selected_adu_id],
            outputs=[selected_adu_text],
        ).success(**retrieve_relevant_adus_event_kwargs)

        retrieve_similar_adus_btn.click(
            fn=lambda document_store, ann_id, document, min_sim, k: document_store.get_similar_annotations_df(
                ref_annotation_id=ann_id,
                ref_document=document,
                min_similarity=min_sim,
                top_k=k,
                annotation_layer=span_layer_name,
            ),
            inputs=[
                document_store_state,
                selected_adu_id,
                document_state,
                min_similarity,
                top_k,
            ],
            outputs=[similar_adus],
        )

        models_state.change(
            fn=set_relation_types,
            inputs=[models_state],
            outputs=[relation_types],
        )

        all2all_adu_similarities_button.click(
            fn=partial(
                DocumentStore.get_all2all_adu_similarities,
                columns=all2all_adu_similarities.headers,
            ),
            inputs=[document_store_state, min_similarity],
            outputs=[all2all_adu_similarities],
        )

        # retrieve_relevant_adus_btn.click(
        #    **retrieve_relevant_adus_event_kwargs
        # )

        rendered_output.change(fn=None, js=HIGHLIGHT_SPANS_JS, inputs=[], outputs=[])

    demo.launch()


if __name__ == "__main__":
    # configure logging
    logging.basicConfig()

    main()