from pathlib import Path from arxiv_public_data.fulltext import convert_directory_parallel from arxiv_public_data import internal_citations import torch import os from summarizer import Summarizer from sentence_transformers import SentenceTransformer import spacy import numpy as np from keybert import KeyBERT import shutil, joblib from distutils.dir_util import copy_tree try: from transformers import * except: from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoConfig, AutoModel, LEDTokenizer, \ LEDForConditionalGeneration from src.defaults import DEFAULTS_CPU_COMPAT, DEFAULTS_HIGH_GPU class Surveyor: ''' A class to abstract all nlp and data mining helper functions as well as workflows required to generate the survey from a single query, with absolute configurability ''' def __init__( self, models_dir=None, title_model_name=None, ex_summ_model_name=None, ledmodel_name=None, embedder_name=None, nlp_name=None, similarity_nlp_name=None, kw_model_name=None, high_gpu=False, refresh_models=False, no_save_models=False, print_fn=None, survey_print_fn=None ): ''' Initializes models and directory structure for the surveyor Optional Params: - models_dir: String, directory to save to huge models - title_model_name: String, title model name/tag in hugging-face, defaults to `Callidior/bert2bert-base-arxiv-titlegen` - ex_summ_model_name: String, extractive summary model name/tag in hugging-face, defaults to `allenai/scibert_scivocab_uncased` - ledmodel_name: String, led model(for abstractive summary) name/tag in hugging-face, defaults to `allenai/led-large-16384-arxiv` - embedder_name: String, sentence embedder name/tag in hugging-face, defaults to `paraphrase-MiniLM-L6-v2` - nlp_name: String, spacy model name/tag in hugging-face (if changed - needs to be spacy-installed prior), defaults to `en_core_sci_scibert` - similarity_nlp_name: String, spacy downstream trained model(for similarity) name/tag in hugging-face (if changed - needs to be spacy-installed prior), defaults to `en_core_sci_lg` - kw_model_name: String, keyword extraction model name/tag in hugging-face, defaults to `distilbert-base-nli-mean-tokens` - high_gpu: Bool, High GPU usage permitted, defaults to False - refresh_models: Bool, Refresh model downloads with given names (needs atleast one model name param above), defaults to False - no_save_models: forces refresh models - max_search: int maximium number of papers to gaze at - defaults to 100 - num_papers: int maximium number of papers to download and analyse - defaults to 25 ''' self.print_fn = print if print_fn is not None: self.print_fn = print_fn self.survey_print_fn = self.print_fn if survey_print_fn is not None: self.survey_print_fn = survey_print_fn self.torch_device = 'cpu' self.print_fn("\nTorch_device: " + self.torch_device) if torch.cuda.is_available(): self.torch_device = 'cuda' spacy.require_gpu() self.high_gpu = high_gpu self.DEFAULTS = DEFAULTS_CPU_COMPAT if self.high_gpu: self.DEFAULTS = DEFAULTS_HIGH_GPU if not kw_model_name: kw_model_name = self.DEFAULTS["kw_model_name"] self.num_papers = self.DEFAULTS['num_papers'] self.max_search = self.DEFAULTS['max_search'] if not models_dir: models_dir = self.DEFAULTS['models_dir'] models_found = False if os.path.exists(models_dir) and not no_save_models: if len(os.listdir(models_dir)) > 3: models_found = True if not title_model_name: title_model_name = self.DEFAULTS["title_model_name"] if not ex_summ_model_name: ex_summ_model_name = self.DEFAULTS["ex_summ_model_name"] if not ledmodel_name: ledmodel_name = self.DEFAULTS["ledmodel_name"] if not embedder_name: embedder_name = self.DEFAULTS["embedder_name"] if not nlp_name: nlp_name = self.DEFAULTS["nlp_name"] if not similarity_nlp_name: similarity_nlp_name = self.DEFAULTS["similarity_nlp_name"] if refresh_models or not models_found: self.print_fn(f'\nInitializing models {"and saving (about 5GB)" if not no_save_models else ""}') if not no_save_models: self.clean_dirs([models_dir]) self.title_tokenizer = AutoTokenizer.from_pretrained(title_model_name, trust_remote_code=True) self.title_model = AutoModelForSeq2SeqLM.from_pretrained(title_model_name, trust_remote_code=True).to(self.torch_device) self.title_model.eval() if not no_save_models: self.title_model.save_pretrained(models_dir + "/title_model") #self.title_tokenizer.save_pretrained(models_dir + "/title_tokenizer") # summary model self.custom_config = AutoConfig.from_pretrained(ex_summ_model_name) self.custom_config.output_hidden_states = True self.summ_tokenizer = AutoTokenizer.from_pretrained(ex_summ_model_name) self.summ_model = AutoModel.from_pretrained(ex_summ_model_name, config=self.custom_config).to( self.torch_device) self.summ_model.eval() if not no_save_models: self.summ_model.save_pretrained(models_dir + "/summ_model") #self.summ_tokenizer.save_pretrained(models_dir + "/summ_tokenizer") self.model = Summarizer(custom_model=self.summ_model, custom_tokenizer=self.summ_tokenizer) if 'led' in ledmodel_name: self.ledtokenizer = LEDTokenizer.from_pretrained(ledmodel_name) self.ledmodel = LEDForConditionalGeneration.from_pretrained(ledmodel_name).to(self.torch_device) elif 't5' in ledmodel_name: self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) self.ledmodel = T5ForConditionalGeneration.from_pretrained(ledmodel_name).to(self.torch_device) elif 'bart' in ledmodel_name: self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) self.ledmodel = BartForConditionalGeneration.from_pretrained(ledmodel_name).to(self.torch_device) self.ledmodel.eval() if not no_save_models: self.ledmodel.save_pretrained(models_dir + "/ledmodel") #self.ledtokenizer.save_pretrained(models_dir + "/ledtokenizer") self.embedder = SentenceTransformer(embedder_name) self.embedder.eval() if not no_save_models: self.embedder.save(models_dir + "/embedder") else: self.print_fn("\n- Initializing from previously saved models at" + models_dir) self.title_tokenizer = AutoTokenizer.from_pretrained(title_model_name) self.title_model = AutoModelForSeq2SeqLM.from_pretrained(models_dir + "/title_model").to(self.torch_device) self.title_model.eval() # summary model #self.summ_config = AutoConfig.from_pretrained(ex_summ_model_name) #self.summ_config.output_hidden_states = True self.summ_tokenizer = AutoTokenizer.from_pretrained(ex_summ_model_name) self.summ_model = AutoModel.from_pretrained(models_dir + "/summ_model").to( self.torch_device) self.summ_model.eval() self.model = Summarizer(custom_model=self.summ_model, custom_tokenizer=self.summ_tokenizer) if 'led' in ledmodel_name: self.ledtokenizer = LEDTokenizer.from_pretrained(ledmodel_name) self.ledmodel = LEDForConditionalGeneration.from_pretrained(models_dir + "/ledmodel").to(self.torch_device) elif 't5' in ledmodel_name: self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) self.ledmodel = T5ForConditionalGeneration.from_pretrained(models_dir + "/ledmodel").to(self.torch_device) elif 'bart' in ledmodel_name: self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) self.ledmodel = BartForConditionalGeneration.from_pretrained(models_dir + "/ledmodel").to(self.torch_device) self.ledmodel.eval() self.embedder = SentenceTransformer(models_dir + "/embedder") self.embedder.eval() self.nlp = spacy.load(nlp_name) self.similarity_nlp = spacy.load(similarity_nlp_name) self.kw_model = KeyBERT(kw_model_name) def define_structure(self, pdf_dir=None, txt_dir=None, img_dir=None, tab_dir=None, dump_dir=None): if pdf_dir: survey_pdf_dir = pdf_dir else: survey_pdf_dir = self.DEFAULTS["pdf_dir"] if txt_dir: survey_txt_dir = txt_dir else: survey_txt_dir = self.DEFAULTS["txt_dir"] if img_dir: survey_img_dir = img_dir else: survey_img_dir = self.DEFAULTS["img_dir"] if tab_dir: survey_tab_dir = tab_dir else: survey_tab_dir = self.DEFAULTS["tab_dir"] if dump_dir: survey_dump_dir = dump_dir else: survey_dump_dir = self.DEFAULTS["dump_dir"] dirs = [survey_pdf_dir, survey_txt_dir, survey_img_dir, survey_tab_dir, survey_dump_dir] if sum([True for dir in dirs if 'arxiv_data/' in dir]): base = os.path.dirname("arxiv_data/") if not os.path.exists(base): os.mkdir(base) self.clean_dirs(dirs) return dirs def clean_dirs(self, dirs): import shutil for d in dirs: if os.path.exists(d): shutil.rmtree(d) os.mkdir(d) def pdf_route(self, pdf_dir, txt_dir, img_dir, tab_dir, dump_dir, papers_meta): ## Data prep import joblib # test full again - check images - check dfs !! self.clean_dirs([pdf_dir, txt_dir, img_dir, tab_dir, dump_dir]) papers = papers_meta[:self.num_papers] selected_papers = papers ids_none, papers, cites = self.fetch_papers(dump_dir, img_dir, papers, pdf_dir, tab_dir, txt_dir) self.print_fn("\n- First stage paper collection complete, papers collected: \n" + ', '.join([p['id'] for p in papers])) new_papers = papers_meta[self.num_papers : self.num_papers + len(ids_none)] # _ = self.get_freq_cited(cites) ''' filtered_idlist = [] for c in self.get_freq_cited(cites): if c in _, new_searched_papers = self.search(filtered_idlist) new_papers.extend(new_searched_papers) ''' selected_papers.extend(new_papers) _, new_papers, _ = self.fetch_papers(dump_dir, img_dir, new_papers, pdf_dir, tab_dir, txt_dir, repeat=True) self.print_fn("\n- Second stage paper collection complete, new papers collected: \n" + ', '.join([p['id'] for p in new_papers])) papers.extend(new_papers) joblib.dump(papers, dump_dir + 'papers_extracted_pdf_route.dmp') copy_tree(img_dir, dump_dir + os.path.basename(img_dir)) copy_tree(tab_dir, dump_dir + os.path.basename(tab_dir)) self.print_fn("\n- Extracting section-wise highlights.. ") papers = self.extract_highlights(papers) return papers, selected_papers, cites def get_freq_cited(self, cites_dict, k=5): cites_list = [] for k, v in cites_dict.items(): cites_list.append(k) [cites_list.append(val) for val in v] cite_freqs = {cite: cites_list.count(cite) for cite in set(cites_list)} sorted_cites = dict(sorted(cite_freqs.items(), key=lambda item: item[1], reverse=True)[:5]) return sorted_cites.keys() def fetch_papers(self, dump_dir, img_dir, papers, pdf_dir, tab_dir, txt_dir, repeat=False): import tempfile if repeat: with tempfile.TemporaryDirectory() as dirpath: self.print_fn("\n- downloading extra pdfs.. ") # full text preparation of selected papers self.download_pdfs(papers, dirpath) dirpath_pdfs = os.listdir(dirpath) for file_name in dirpath_pdfs: full_file_name = os.path.join(dirpath, file_name) if os.path.isfile(full_file_name): shutil.copy(full_file_name, pdf_dir) self.print_fn("\n- converting extra pdfs.. ") self.convert_pdfs(dirpath, txt_dir) else: self.print_fn("\n- downloading pdfs.. ") # full text preparation of selected papers self.download_pdfs(papers, pdf_dir) self.print_fn("\n- converting pdfs.. ") self.convert_pdfs(pdf_dir, txt_dir) # plugging citations to our papers object self.print_fn("\n- plugging in citation network.. ") papers, cites = self.cocitation_network(papers, txt_dir) joblib.dump(papers, dump_dir + 'papers_selected_pdf_route.dmp') from distutils.dir_util import copy_tree copy_tree(txt_dir, dump_dir + os.path.basename(txt_dir)) copy_tree(pdf_dir, dump_dir + os.path.basename(pdf_dir)) self.print_fn("\n- extracting structure.. ") papers, ids_none = self.extract_structure(papers, pdf_dir, txt_dir, img_dir, dump_dir, tab_dir) return ids_none, papers, cites def tar_route(self, pdf_dir, txt_dir, img_dir, tab_dir, papers): ## Data prep import joblib # test full again - check images - check dfs !! self.clean_dirs([pdf_dir, txt_dir, img_dir, tab_dir]) # full text preparation of selected papers self.download_sources(papers, pdf_dir) self.convert_pdfs(pdf_dir, txt_dir) # plugging citations to our papers object papers, cites = self.cocitation_network(papers, txt_dir) joblib.dump(papers, 'papers_selected_tar_route.dmp') papers = self.extract_structure(papers, pdf_dir, txt_dir, img_dir, tab_dir) joblib.dump(papers, 'papers_extracted_tar_route.dmp') return papers def build_doc(self, research_sections, papers, query=None, filename='survey.txt'): import arxiv2bib self.print_fn("\n- building bibliography entries.. ") bibentries = arxiv2bib.arxiv2bib([p['id'] for p in papers]) bibentries = [r.bibtex() for r in bibentries] self.print_fn("\n- building final survey file .. at "+ filename) file = open(filename, 'w+') if query is None: query = 'Internal(existing) research' self.survey_print_fn("#### Generated_survey:") file.write("----------------------------------------------------------------------") file.write("Title: A survey on " + query) self.survey_print_fn("") self.survey_print_fn("----------------------------------------------------------------------") self.survey_print_fn("Title: A survey on " + query) file.write("Author: Auto-Research (github.com/sidphbot/Auto-Research)") self.survey_print_fn("Author: Auto-Research (github.com/sidphbot/Auto-Research)") file.write("Dev: Auto-Research (github.com/sidphbot/Auto-Research)") self.survey_print_fn("Dev: Auto-Research (github.com/sidphbot/Auto-Research)") file.write("Disclaimer: This survey is intended to be a research starter. This Survey is Machine-Summarized, "+ "\nhence some sentences might be wrangled or grammatically incorrect. However all sentences are "+ "\nmined with proper citations. As All of the text is practically quoted texted, hence to "+ "\nimprove visibility, all the papers are duly cited in the Bibiliography section. as bibtex "+ "\nentries(only to avoid LaTex overhead). ") self.survey_print_fn("Disclaimer: This survey is intended to be a research starter. This Survey is Machine-Summarized, "+ "\nhence some sentences might be wrangled or grammatically incorrect. However all sentences are "+ "\nmined with proper citations. As All of the text is practically quoted texted, hence to "+ "\nimprove visibility, all the papers are duly cited in the Bibiliography section. as bibtex "+ "\nentries(only to avoid LaTex overhead). ") file.write("----------------------------------------------------------------------") self.survey_print_fn("----------------------------------------------------------------------") file.write("") self.survey_print_fn("") file.write('ABSTRACT') self.survey_print_fn('ABSTRACT') self.survey_print_fn("=================================================") file.write("=================================================") file.write("") self.survey_print_fn("") file.write(research_sections['abstract']) self.survey_print_fn(research_sections['abstract']) file.write("") self.survey_print_fn("") file.write('INTRODUCTION') self.survey_print_fn('INTRODUCTION') self.survey_print_fn("=================================================") file.write("=================================================") file.write("") self.survey_print_fn("") file.write(research_sections['introduction']) self.survey_print_fn(research_sections['introduction']) file.write("") self.survey_print_fn("") for k, v in research_sections.items(): if k not in ['abstract', 'introduction', 'conclusion']: file.write(k.upper()) self.survey_print_fn(k.upper()) self.survey_print_fn("=================================================") file.write("=================================================") file.write("") self.survey_print_fn("") file.write(v) self.survey_print_fn(v) file.write("") self.survey_print_fn("") file.write('CONCLUSION') self.survey_print_fn('CONCLUSION') self.survey_print_fn("=================================================") file.write("=================================================") file.write("") self.survey_print_fn("") file.write(research_sections['conclusion']) self.survey_print_fn(research_sections['conclusion']) file.write("") self.survey_print_fn("") file.write('REFERENCES') self.survey_print_fn('REFERENCES') self.survey_print_fn("=================================================") file.write("=================================================") file.write("") self.survey_print_fn("") for entry in bibentries: file.write(entry) self.survey_print_fn(entry) file.write("") self.survey_print_fn("") self.survey_print_fn("========================XXX=========================") file.write("========================XXX=========================") file.close() def build_basic_blocks(self, corpus_known_sections, corpus): research_blocks = {} for head, textarr in corpus_known_sections.items(): if 'cuda' in self.torch_device: torch.cuda.empty_cache() # self.print_fn(head.upper()) with torch.no_grad(): summtext = self.model(" ".join([l.lower() for l in textarr]), ratio=0.5) res = self.nlp(summtext) res = set([str(sent) for sent in list(res.sents)]) summtext = ''.join([line for line in res]) # pself.print_fn(summtext) research_blocks[head] = summtext return research_blocks def abstractive_summary(self, longtext): ''' faulty method input_ids = ledtokenizer(longtext, return_tensors="pt").input_ids global_attention_mask = torch.zeros_like(input_ids) # set global_attention_mask on first token global_attention_mask[:, 0] = 1 sequences = ledmodel.generate(input_ids, global_attention_mask=global_attention_mask).sequences summary = ledtokenizer.batch_decode(sequences) ''' if 'cuda' in self.torch_device: torch.cuda.empty_cache() inputs = self.ledtokenizer.prepare_seq2seq_batch(longtext, truncation=True, padding='longest', return_tensors='pt').to(self.torch_device) with torch.no_grad(): summary_ids = self.ledmodel.generate(**inputs) summary = self.ledtokenizer.batch_decode(summary_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True) res = self.nlp(summary[0]) res = set([str(sent) for sent in list(res.sents)]) summtext = ''.join([line for line in res]) #self.print_fn("abstractive summary type:" + str(type(summary))) return summtext def get_abstract(self, abs_lines, corpus_known_sections, research_blocks): # abs_lines = " ".join(abs_lines) abs_lines = "" abs_lines += " ".join([l.lower() for l in corpus_known_sections['abstract']]) abs_lines += research_blocks['abstract'] # self.print_fn(abs_lines) try: return self.abstractive_summary(abs_lines) except: highlights = self.extractive_summary(abs_lines) return self.abstractive_summary(highlights) def get_corpus_lines(self, corpus): abs_lines = [] types = set() for k, v in corpus.items(): # self.print_fn(v) types.add(type(v)) abstext = k + '. ' + v.replace('\n', ' ') abstext = self.nlp(abstext) abs_lines.extend([str(sent).lower() for sent in list(abstext.sents)]) #self.print_fn("unique corpus value types:" + str(types)) # abs_lines = '\n'.join([str(sent) for sent in abs_lines.sents]) return abs_lines def get_sectioned_docs(self, papers, papers_meta): import random docs = [] for p in papers: for section in p['sections']: if len(section['highlights']) > 0: content = self.extractive_summary(''.join(section['highlights'])) docs.append(content) selected_pids = [p['id'] for p in papers] meta_abs = [] for p in papers_meta: if p['id'] not in selected_pids: meta_abs.append(self.generate_title(p['abstract'])) docs.extend(meta_abs) #self.print_fn("meta_abs num"+str(len(meta_abs))) #self.print_fn("selected_pids num"+str(len(selected_pids))) #self.print_fn("papers_meta num"+str(len(papers_meta))) #assert (len(meta_abs) + len(selected_pids) == len(papers_meta)) assert ('str' in str(type(random.sample(docs, 1)[0]))) return [doc for doc in docs if doc != ''] def cluster_lines(self, abs_lines): from sklearn.cluster import KMeans # from bertopic import BERTopic # topic_model = BERTopic(embedding_model=embedder) if 'cuda' in self.torch_device: torch.cuda.empty_cache() corpus_embeddings = self.embedder.encode(abs_lines) # Normalize the embeddings to unit length corpus_embeddings = corpus_embeddings / np.linalg.norm(corpus_embeddings, axis=1, keepdims=True) with torch.no_grad(): optimal_k = self.model.calculate_optimal_k(' '.join(abs_lines), k_max=10) # Perform kmean clustering clustering_model = KMeans(n_clusters=optimal_k, n_init=20) # clustering_model = AgglomerativeClustering(n_clusters=optimal_k, affinity='cosine', linkage='average') #, affinity='cosine', linkage='average', distance_threshold=0.4) clustering_model.fit(corpus_embeddings) cluster_assignment = clustering_model.labels_ clustered_sentences = {} dummy_count = 0 for sentence_id, cluster_id in enumerate(cluster_assignment): if cluster_id not in clustered_sentences: clustered_sentences[cluster_id] = [] ''' if dummy_count < 5: self.print_fn("abs_line: "+abs_lines[sentence_id]) self.print_fn("cluster_ID: "+str(cluster_id)) self.print_fn("embedding: "+str(corpus_embeddings[sentence_id])) dummy_count += 1 ''' clustered_sentences[cluster_id].append(abs_lines[sentence_id]) # for i, cluster in clustered_sentences.items(): # self.print_fn("Cluster ", i+1) # self.print_fn(cluster) # self.print_fn("") return self.get_clustered_sections(clustered_sentences), clustered_sentences def get_clusters(self, papers, papers_meta): from sklearn.cluster import KMeans # from bertopic import BERTopic # topic_model = BERTopic(embedding_model=embedder) if 'cuda' in self.torch_device: torch.cuda.empty_cache() abs_lines = self.get_sectioned_docs(papers, papers_meta) corpus_embeddings = self.embedder.encode(abs_lines) # Normalize the embeddings to unit length corpus_embeddings = corpus_embeddings / np.linalg.norm(corpus_embeddings, axis=1, keepdims=True) with torch.no_grad(): optimal_k = self.model.calculate_optimal_k(' '.join(abs_lines), k_max=10) # Perform kmean clustering clustering_model = KMeans(n_clusters=optimal_k, n_init=20) # clustering_model = AgglomerativeClustering(n_clusters=optimal_k, affinity='cosine', linkage='average') #, affinity='cosine', linkage='average', distance_threshold=0.4) clustering_model.fit(corpus_embeddings) cluster_assignment = clustering_model.labels_ clustered_sentences = {} dummy_count = 0 for sentence_id, cluster_id in enumerate(cluster_assignment): if cluster_id not in clustered_sentences: clustered_sentences[cluster_id] = [] ''' if dummy_count < 5: self.print_fn("abs_line: "+abs_lines[sentence_id]) self.print_fn("cluster_ID: "+str(cluster_id)) self.print_fn("embedding: "+str(corpus_embeddings[sentence_id])) dummy_count += 1 ''' clustered_sentences[cluster_id].append(abs_lines[sentence_id]) # for i, cluster in clustered_sentences.items(): # self.print_fn("Cluster ", i+1) # self.print_fn(cluster) # self.print_fn("") return self.get_clustered_sections(clustered_sentences), clustered_sentences def generate_title(self, longtext): if 'cuda' in self.torch_device: torch.cuda.empty_cache() inputs = self.title_tokenizer.prepare_seq2seq_batch(longtext, truncation=True, padding='longest', return_tensors='pt').to(self.torch_device) with torch.no_grad(): summary_ids = self.title_model.generate(**inputs) summary = self.title_tokenizer.batch_decode(summary_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True) return str(summary[0]) def get_clustered_sections(self, clustered_lines): clusters_dict = {} for i, cluster in clustered_lines.items(): # self.print_fn(cluster) try: clusters_dict[self.generate_title(str(" ".join(cluster)))] = self.abstractive_summary( str(" ".join(cluster)).lower()) except: clusters_dict[self.generate_title(str(" ".join(cluster)))] = self.abstractive_summary( self.extractive_summary(str(" ".join(cluster)).lower())) return clusters_dict def get_intro(self, corpus_known_sections, research_blocks): intro_lines = "" intro_lines += str(" ".join([l.lower() for l in corpus_known_sections['introduction']])) + str( " ".join([l.lower() for l in corpus_known_sections['conclusion']])) intro_lines += research_blocks['introduction'] + research_blocks['conclusion'] try: return self.abstractive_summary(intro_lines) except: return self.abstractive_summary(self.extractive_summary(intro_lines)) def get_conclusion(self, research_sections): paper_body = "" for k, v in research_sections.items(): paper_body += v try: return self.abstractive_summary(paper_body) except: return self.abstractive_summary(self.extractive_summary(paper_body)) def build_corpus_sectionwise(self, papers): known = ['abstract', 'introduction', 'conclusion'] corpus_known_sections = {} for kh in known: khtext = [] for p in papers: for section in p['sections']: if kh in section['heading']: khtext.extend(section['highlights']) # self.print_fn(khtext) corpus_known_sections[kh] = khtext return corpus_known_sections def standardize_headings(self, papers): known = ['abstract', 'introduction', 'discussion', 'relatedwork', 'contribution', 'analysis', 'experiments', 'conclusion'] for p in papers: # self.print_fn("================================") headings = [section['heading'] for section in p['sections'] if len(section['heading'].split()) < 3] # self.print_fn("id: "+ str(p['id'])+"\nHeadings: \n"+str('\n'.join(headings))) for kh in known: for section in p['sections']: if len(section['heading'].split()) < 3: # self.print_fn(section['heading']) if kh in ''.join(filter(str.isalpha, section['heading'].replace(' ', '').lower())): # self.print_fn("orig head: "+ section['heading'] +", plain head:" + kh) section['heading'] = kh return papers def build_corpus(self, papers, papers_meta): corpus = self.build_meta_corpus(papers_meta) for p in papers: ph = [] for sid, section in enumerate(p['sections']): ph.extend(section['highlights']) for pid, ls in corpus.items(): if pid == p['id']: corpus[pid] = p['abstract'] + str(' '.join(ph)) ''' self.print_fn("================== final corpus ====================") self.print_fn('\n'.join([str("paper: "+ get_by_pid(pid, papers_meta)['title']+" \nhighlight count: " + str(len(phs))) for pid, phs in corpus.items()])) self.print_fn("======== sample point ========") p = random.choice(list(papers)) self.print_fn("paper: "+ p['title']+" \nhighlights: " + str(corpus[p['id']])) self.print_fn("======== sample meta point ========") p = random.choice(list(papers_meta)) self.print_fn("meta paper: "+ p['title']+" \nhighlights: " + str(corpus[p['id']])) ''' return corpus def get_by_pid(self, pid, papers): for p in papers: if p['id'] == pid: return p def build_meta_corpus(self, papers): meta_corpus = {} for p in papers: # pself.print_fn(p) pid = p['id'] ptext = p['title'] + ". " + p['abstract'] doc = self.nlp(ptext) phs, _, _ = self.extractive_highlights([str(sent) for sent in list(doc.sents)]) meta_corpus[pid] = str(' '.join(phs)) ''' self.print_fn("================== meta corpus ====================") self.print_fn('\n'.join([str("paper: "+ get_by_pid(pid, papers)['title']+" \nhighlight count: " + str(len(phs))) for pid, phs in meta_corpus.items()])) self.print_fn("======== sample point ========") p = random.choice(list(papers)) self.print_fn("paper: "+ p['title']+" \nhighlights: " + str(meta_corpus[p['id']])) ''' return meta_corpus def select_papers(self, papers, query, num_papers=20): import numpy as np # self.print_fn("paper sample: ") # self.print_fn(papers) meta_corpus = self.build_meta_corpus(papers) scores = [] pids = [] for id, highlights in meta_corpus.items(): score = self.text_para_similarity(query, highlights) scores.append(score) pids.append(id) # self.print_fn("corpus item: " + str(self.get_by_pid(id, papers)['title'])) idx = np.argsort(scores)[:num_papers] #for i in range(len(scores)): # self.print_fn("paper: " + str(self.get_by_pid(pids[i], papers)['title'])) # self.print_fn("score: " + str(scores[i])) # self.print_fn("argsort ids("+str(num_papers)+" papers): "+ str(idx)) idx = [pids[i] for i in idx] # self.print_fn("argsort pids("+str(num_papers)+" papers): "+ str(idx)) papers_selected = [p for p in papers if p['id'] in idx] # assert(len(papers_selected)==num_papers) self.print_fn("num papers selected: " + str(len(papers_selected))) for p in papers_selected: self.print_fn("Selected Paper: " + p['title']) #self.print_fn("constrast with natural selection: forward") #for p in papers[:4]: # self.print_fn("Selected Paper: " + p['title']) #self.print_fn("constrast with natural selection: backward") #for p in papers[-4:]: # self.print_fn("Selected Paper: " + p['title']) # arxiv search producing better relevnce return papers_selected def extractive_summary(self, text): if 'cuda' in self.torch_device: torch.cuda.empty_cache() with torch.no_grad(): res = self.model(text, ratio=0.5) res_doc = self.nlp(res) return " ".join(set([str(sent) for sent in list(res_doc.sents)])) def extractive_highlights(self, lines): # text = " ".join(lines) # text_doc = nlp(" ".join([l.lower() for l in lines])) # text = ' '.join([ str(sent) for sent in list(text_doc.sents)]) if 'cuda' in self.torch_device: torch.cuda.empty_cache() with torch.no_grad(): res = self.model(" ".join([l.lower() for l in lines]), ratio=0.5, ) res_doc = self.nlp(res) res_lines = set([str(sent) for sent in list(res_doc.sents)]) # self.print_fn("\n- ".join(res_sents)) with torch.no_grad(): keywords = self.kw_model.extract_keywords(str(" ".join([l.lower() for l in lines])), stop_words='english') keyphrases = self.kw_model.extract_keywords(str(" ".join([l.lower() for l in lines])), keyphrase_ngram_range=(4, 4), stop_words='english', use_mmr=True, diversity=0.7) return res_lines, keywords, keyphrases def extract_highlights(self, papers): for p in papers: sid = 0 p['sections'] = [] for heading, lines in p['body_text'].items(): hs, kws, kps = self.extractive_highlights(lines) p['sections'].append({ 'sid': sid, 'heading': heading, 'text': lines, 'highlights': hs, 'keywords': kws, 'keyphrases': kps, }) sid += 1 return papers def extract_structure(self, papers, pdf_dir, txt_dir, img_dir, dump_dir, tab_dir, tables=False): self.print_fn("\n- extracting sections.. ") papers, ids_none = self.extract_parts(papers, txt_dir, dump_dir) self.print_fn("\n- extracting images.. for future correlation use-cases ") papers = self.extract_images(papers, pdf_dir, img_dir) if tables: self.print_fn("\n- extracting tables.. for future correlation use-cases ") papers = self.extract_tables(papers, pdf_dir, tab_dir) return papers, ids_none def extract_parts(self, papers, txt_dir, dump_dir): headings_all = {} # refined = [] # model = build_summarizer() #for file in glob.glob(txt_dir + '/*.txt'): for p in papers: file = txt_dir + '/'+ p['id'] +'.txt' refined, headings_extracted = self.extract_headings(file) sections = self.extract_sections(headings_extracted, refined) # highlights = {k: extract_highlights(model,v) for k, v in sections.items()} #p = self.get_by_file(file, papers) #if len(headings_extracted) > 3: p['body_text'] = sections # p['body_highlights'] = highlights headings_all[p['id']] = headings_extracted ids_none = {i: h for i, h in headings_all.items() if len(h) < 3} ''' for f, h in headings_all.items(): if len(h) < 4: self.print_fn("=================headings almost undetected================") self.print_fn(f) self.print_fn(h) ''' # from pprint import pprint # pself.print_fn({f: len(h) for f,h in headings_all.items()}) papers_none = [p for p in papers if p['id'] in ids_none] for p in papers_none: os.remove(txt_dir + '/'+ p['id'] + '.txt') papers.remove(p) return papers, ids_none def check_para(self, df): size = 0 for col in df.columns: size += df[col].apply(lambda x: len(str(x))).median() return size / len(df.columns) > 25 def scan_blocks(self, lines): lines_mod = [line.strip().replace('\n', '') for line in lines if len(line.strip().replace('\n', '')) > 3] for i in range(len(lines_mod)): yield lines_mod[i:i + 3] def extract_sections(self, headings, lines, min_part_length=2): sections = {} self.check_list_elems_in_list(headings, lines) head_len = len(headings) for i in range(len(headings) - 1): start = headings[i] end = headings[i + 1] section = self.get_section(start, end, lines) # self.print_fn(start + " : "+ str(len(section)) +" lines") ''' if i > 0: old = headings[i-1] if len(section) < min_part_length + 1: sections[old].extend(start) sections[old].extend(section) else: sections[start] = section else: sections[start] = section ''' sections[start] = section return {k: v for k, v in sections.items()} def is_rubbish(self, s, rubbish_tolerance=0.2, min_char_len=4): # numbers = sum(c.isdigit() for c in s) letters = sum(c.isalpha() for c in s) spaces = sum(c.isspace() for c in s) # others = len(s) - numbers - letters - spaces if len(s) == 0: return False if ((len(s) - (letters + spaces)) / len(s) >= rubbish_tolerance) or self.alpha_length(s) < min_char_len: return True else: return False def get_section(self, first, last, lines): try: assert (first in lines) assert (last in lines) # start = lines.index( first ) + len( first ) # end = lines.index( last, start ) start = [i for i in range(len(lines)) if first is lines[i]][0] end = [i for i in range(len(lines)) if last is lines[i]][0] section_lines = lines[start + 1:end] # self.print_fn("heading: " + str(first)) # self.print_fn("section_lines: "+ str(section_lines)) # self.print_fn(section_lines) return section_lines except ValueError: self.print_fn("value error :") self.print_fn("first heading :" + str(first) + ", second heading :" + str(last)) self.print_fn("first index :" + str(start) + ", second index :" + str(end)) return "" def check_list_elems_in_list(self, headings, lines): import numpy as np # [self.print_fn(head) for head in headings if head not in lines ] return np.all([True if head in lines else False for head in headings]) def check_first_char_upper(self, text): for c in text: if c.isspace(): continue elif c.isalpha(): return c.isupper() def extract_headings(self, txt_file): import re fulltext = self.read_paper(txt_file) lines = self.clean_lines(fulltext) refined, headings = self.scan_text(lines) assert (self.check_list_elems_in_list(headings, refined)) headings = self.check_duplicates(headings) # self.print_fn('===========================================') # self.print_fn(txt_file +": first scan: \n"+str(len(headings))+" headings") # self.print_fn('\n'.join(headings)) # scan_failed - rescan with first match for abstract hook if len(headings) == 0: # self.print_fn('===================') # self.print_fn("run 1 failed") abs_cans = [line for line in lines if 'abstract' in re.sub("\s+", "", line.strip().lower())] if len(abs_cans) != 0: abs_head = abs_cans[0] refined, headings = self.scan_text(lines, abs_head=abs_head) self.check_list_elems_in_list(headings, refined) headings = self.check_duplicates(headings) # self.print_fn('===================') # self.print_fn(txt_file +": second scan: \n"+str(len(headings))+" headings") # if len(headings) == 0: # self.print_fn("heading scan failed completely") return refined, headings def check_duplicates(self, my_list): my_finallist = [] dups = [s for s in my_list if my_list.count(s) > 1] if len(dups) > 0: [my_finallist.append(n) for n in my_list if n not in my_finallist] # self.print_fn("original: "+str(len(my_list))+" new: "+str(len(my_finallist))) return my_finallist def clean_lines(self, text): import numpy as np import re # doc = nlp(text) # lines = [str(sent) for sent in doc.sents] lines = text.replace('\r', '').split('\n') lines = [line for line in lines if not self.is_rubbish(line)] lines = [line for line in lines if re.match("^[a-zA-Z1-9\.\[\]\(\):\-,\"\"\s]*$", line) and not 'Figure' in line and not 'Table' in line] lengths_cleaned = [self.alpha_length(line) for line in lines] mean_length_cleaned = np.median(lengths_cleaned) lines_standardized = [] for line in lines: if len(line) >= (1.8 * mean_length_cleaned): first_half = line[0:len(line) // 2] second_half = line[len(line) // 2 if len(line) % 2 == 0 else ((len(line) // 2) + 1):] lines_standardized.append(first_half) lines_standardized.append(second_half) else: lines_standardized.append(line) return lines def scan_text(self, lines, abs_head=None): import re # self.print_fn('\n'.join(lines)) record = False headings = [] refined = [] for i in range(1, len(lines) - 4): line = lines[i] line = line.replace('\n', '').strip() if 'abstract' in re.sub("\s+", "", line.strip().lower()) and len(line) - len('abstract') < 5 or ( abs_head is not None and abs_head in line): record = True headings.append(line) refined.append(line) if 'references' in re.sub("\s+", "", line.strip().lower()) and len(line) - len('references') < 5: headings.append(line) refined.append(line) break elif 'bibliography' in re.sub("\s+", "", line.strip().lower()) and len(line) - len('bibliography') < 5: headings.append(line) refined.append(line) break refined, headings = self.scanline(record, headings, refined, i, lines) # self.print_fn('=========in scan_text loop i : '+str(i)+' heading count : '+str(len(headings))+' =========') return refined, headings def scanline(self, record, headings, refined, id, lines): import numpy as np import re line = lines[id] if not len(line) == 0: # self.print_fn("in scanline") # self.print_fn(line) if record: refined.append(line) if len(lines[id - 1]) == 0 or len(lines[id + 1]) == 0 or re.match( "^[1-9XVIABCD]{0,4}(\.{0,1}[1-9XVIABCD]{0,4}){0, 3}\s{0,2}[A-Z][a-zA-Z\:\-\s]*$", line) and self.char_length(line) > 7: # self.print_fn("candidate") # self.print_fn(line) if np.mean([len(s) for s in lines[id + 2:id + 6]]) > 40 and self.check_first_char_upper( line) and re.match("^[a-zA-Z1-9\.\:\-\s]*$", line) and len(line.split()) < 10: # if len(line) < 20 and np.mean([len(s) for s in lines[i+1:i+5]]) > 30 : headings.append(line) assert (line in refined) # self.print_fn("selected") # self.print_fn(line) else: known_headings = ['introduction', 'conclusion', 'abstract', 'references', 'bibliography'] missing = [h for h in known_headings if not np.any([True for head in headings if h in head])] # for h in missing: head = [line for h in missing if h in re.sub("\s+", "", line.strip().lower())] # head = [line for known] if len(head) > 0: headings.append(head[0]) assert (head[0] in refined) return refined, headings def char_length(self, s): # numbers = sum(c.isdigit() for c in s) letters = sum(c.isalpha() for c in s) # spaces = sum(c.isspace() for c in s) # others = len(s) - numbers - letters - spaces return letters def get_by_file(self, file, papers): import os pid = os.path.basename(file) pid = pid.replace('.txt', '').replace('.pdf', '') for p in papers: if p['id'] == pid: return p self.print_fn("\n- paper not found by file, \nfile: "+file+"\nall papers: "+', '.join([p['id'] for p in papers])) def alpha_length(self, s): # numbers = sum(c.isdigit() for c in s) letters = sum(c.isalpha() for c in s) spaces = sum(c.isspace() for c in s) # others = len(s) - numbers - letters - spaces return letters + spaces def check_append(self, baselist, addstr): check = False for e in baselist: if addstr in e: check = True if not check: baselist.append(addstr) return baselist def extract_images(self, papers, pdf_dir, img_dir): import fitz # self.print_fn("in images") for p in papers: file = pdf_dir + p['id'] + ".pdf" print(Path(pdf_dir).resolve().glob('*')) pdf_file = fitz.open(file) images = [] for page_index in range(len(pdf_file)): page = pdf_file[page_index] images.extend(page.getImageList()) images_files = [self.save_image(pdf_file.extractImage(img[0]), i, p['id'], img_dir) for i, img in enumerate(set(images)) if img[0]] # self.print_fn(len(images_per_paper)) p['images'] = images_files # self.print_fn(len(p.keys())) # self.print_fn(papers[0].keys()) return papers def extract_images_from_file(self, pdf_file_name, img_dir): import fitz pdf_file = fitz.open(pdf_file_name) images = [] for page_index in range(len(pdf_file)): page = pdf_file[page_index] images.extend(page.getImageList()) images_files = [self.save_image(pdf_file.extractImage(img[0]), i, pdf_file_name.replace('.pdf', ''), img_dir) for i, img in enumerate(set(images)) if img[0]] return images_files def save_image(self, base_image, img_index, pid, img_dir): from PIL import Image import io image_bytes = base_image["image"] # get the image extension image_ext = base_image["ext"] # load it to PIL image = Image.open(io.BytesIO(image_bytes)) # save it to local disk fname = img_dir + "/" + str(pid) + "_" + str(img_index + 1) + "." + image_ext image.save(open(f"{fname}", "wb")) # self.print_fn(fname) return fname def save_tables(self, dfs, pid, tab_dir): # todo dfs = [df for df in dfs if not self.check_para(df)] files = [] for df in dfs: filename = tab_dir + "/" + str(pid) + ".csv" files.append(filename) df.to_csv(filename, index=False) return files def extract_tables(self, papers, pdf_dir, tab_dir): import tabula check = True # for file in glob.glob(pdf_dir+'/*.pdf'): for p in papers: dfs = tabula.read_pdf(pdf_dir + p['id'] + ".pdf", pages='all', multiple_tables=True, silent=True) p['tables'] = self.save_tables(dfs, p['id'], tab_dir) # self.print_fn(papers[0].keys()) return papers def extract_tables_from_file(self, pdf_file_name, tab_dir): import tabula check = True # for file in glob.glob(pdf_dir+'/*.pdf'): dfs = tabula.read_pdf(pdf_file_name, pages='all', multiple_tables=True, silent=True) return self.save_tables(dfs, pdf_file_name.replace('.pdf', ''), tab_dir) def search(self, query_text=None, id_list=None, max_search=100): import arxiv from urllib.parse import urlparse if query_text: search = arxiv.Search( query=query_text, max_results=max_search, sort_by=arxiv.SortCriterion.Relevance ) else: id_list = [id for id in id_list if '.' in id] search = arxiv.Search( id_list=id_list ) results = [result for result in search.get()] searched_papers = [] discarded_ids = [] for result in results: id = urlparse(result.entry_id).path.split('/')[-1].split('v')[0] if '.' in id: paper = { 'id': id, 'title': result.title, 'comments': result.comment if result.journal_ref else "None", 'journal-ref': result.journal_ref if result.journal_ref else "None", 'doi': str(result.doi), 'primary_category': result.primary_category, 'categories': result.categories, 'license': None, 'abstract': result.summary, 'published': result.published, 'pdf_url': result.pdf_url, 'links': [str(l) for l in result.links], 'update_date': result.updated, 'authors': [str(a.name) for a in result.authors], } searched_papers.append(paper) else: discarded_ids.append(urlparse(result.entry_id).path.split('/')[-1].split('v')[0]) self.print_fn("\n- Papers discarded due to id error [arxiv api bug: #74] :\n" + str(discarded_ids)) return results, searched_papers def download_pdfs(self, papers, pdf_dir): import arxiv from urllib.parse import urlparse ids = [p['id'] for p in papers] # asert(False) papers_filtered = arxiv.Search(id_list=ids).get() for p in papers_filtered: p_id = str(urlparse(p.entry_id).path.split('/')[-1]).split('v')[0] download_file = pdf_dir + "/" + p_id + ".pdf" p.download_pdf(filename=download_file) def download_sources(self, papers, src_dir): import arxiv from urllib.parse import urlparse ids = [p['id'] for p in papers] # asert(False) papers_filtered = arxiv.Search(id_list=ids).get() for p in papers_filtered: p_id = str(urlparse(p.entry_id).path.split('/')[-1]).split('v')[0] download_file = src_dir + "/" + p_id + ".tar.gz" p.download_source(filename=download_file) def convert_pdfs(self, pdf_dir, txt_dir): import glob, shutil import multiprocessing # import arxiv_public_data convert_directory_parallel(pdf_dir, multiprocessing.cpu_count()) for file in glob.glob(pdf_dir + '/*.txt'): shutil.move(file, txt_dir) def read_paper(self, path): f = open(path, 'r', encoding="utf-8") text = str(f.read()) f.close() return text def cocitation_network(self, papers, txt_dir): import multiprocessing cites = internal_citations.citation_list_parallel(N=multiprocessing.cpu_count(), directory=txt_dir) for p in papers: p['cites'] = cites[p['id']] return papers, cites def lookup_author(self, author_query): from scholarly import scholarly import operator # Retrieve the author's data, fill-in, and print self.print_fn("Searching Author: " + author_query) search_result = next(scholarly.search_author(author_query), None) if search_result is not None: author = scholarly.fill(search_result) author_stats = { 'name': author_query, 'affiliation': author['affiliation'] if author['affiliation'] else None, 'citedby': author['citedby'] if 'citedby' in author.keys() else 0, 'most_cited_year': max(author['cites_per_year'].items(), key=operator.itemgetter(1))[0] if len( author['cites_per_year']) > 0 else None, 'coauthors': [c['name'] for c in author['coauthors']], 'hindex': author['hindex'], 'impact': author['i10index'], 'interests': author['interests'], 'publications': [{'title': p['bib']['title'], 'citations': p['num_citations']} for p in author['publications']], 'url_picture': author['url_picture'], } else: self.print_fn("author not found") author_stats = { 'name': author_query, 'affiliation': "", 'citedby': 0, 'most_cited_year': None, 'coauthors': [], 'hindex': 0, 'impact': 0, 'interests': [], 'publications': [], 'url_picture': "", } # pself.print_fn(author_stats) return author_stats def author_stats(self, papers): all_authors = [] for p in papers: paper_authors = [a for a in p['authors']] all_authors.extend(paper_authors) searched_authors = [self.lookup_author(a) for a in set(all_authors)] return searched_authors def text_similarity(self, text1, text2): doc1 = self.similarity_nlp(text1) doc2 = self.similarity_nlp(text2) return doc1.similarity(doc2) def text_para_similarity(self, text, lines): doc1 = self.similarity_nlp(text) doc2 = self.similarity_nlp(" ".join(lines)) return doc1.similarity(doc2) def para_para_similarity(self, lines1, lines2): doc1 = self.similarity_nlp(" ".join(lines1)) doc2 = self.similarity_nlp(" ".join(lines2)) return doc1.similarity(doc2) def text_image_similarity(self, text, image): pass def ask(self, corpus, question): text = " ".join(corpus) import torch inputs = self.qatokenizer(question, text, return_tensors='pt') start_positions = torch.tensor([1]) end_positions = torch.tensor([3]) outputs = self.qamodel(**inputs, start_positions=start_positions, end_positions=end_positions) self.print_fn("context: " + text) self.print_fn("question: " + question) self.print_fn("outputs: " + outputs) return outputs def zip_outputs(self, dump_dir, zip_name): import zipfile def zipdir(path, ziph): # ziph is zipfile handle for root, dirs, files in os.walk(path): for file in files: ziph.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.join(path, '../..'))) zipf = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) zipdir(dump_dir, zipf) def survey(self, query=None, id_list=None, max_search=None, num_papers=None, debug=False, weigh_authors=False, pdf_dir=None, txt_dir=None, img_dir=None, tab_dir=None, dump_dir=None): import joblib import os, shutil dirs = self.define_structure(pdf_dir=pdf_dir, txt_dir=txt_dir, img_dir=img_dir, tab_dir=tab_dir, dump_dir=dump_dir) [survey_pdf_dir, survey_txt_dir, survey_img_dir, survey_tab_dir, survey_dump_dir] = dirs if not max_search: max_search = self.DEFAULTS['max_search'] if not num_papers: num_papers = self.DEFAULTS['num_papers'] if (query is None) and (id_list is None): raise ValueError('please provide a base to survey on: list of arxiv IDs or a few research keywords') # arxiv api relevance search and data preparation self.print_fn("\n- searching arXiv for top 100 papers.. ") results, searched_papers = self.search(query, id_list, max_search=max_search) joblib.dump(searched_papers, survey_dump_dir + 'papers_metadata.dmp') self.print_fn("\n- found " + str(len(searched_papers)) + " papers") # paper selection by scibert vector embedding relevance scores # papers_selected = select_papers(searched_papers, query, num_papers=num_papers) papers_highlighted, papers_selected, cites = self.pdf_route(survey_pdf_dir, survey_txt_dir, survey_img_dir, survey_tab_dir, survey_dump_dir, searched_papers) if weigh_authors: authors = self.author_stats(papers_highlighted) joblib.dump(papers_highlighted, survey_dump_dir + 'papers_highlighted.dmp') self.print_fn("\n- Standardizing known section headings per paper.. ") papers_standardized = self.standardize_headings(papers_highlighted) joblib.dump(papers_standardized, survey_dump_dir + 'papers_standardized.dmp') self.print_fn("\n- Building paper-wise corpus.. ") corpus = self.build_corpus(papers_highlighted, searched_papers) joblib.dump(corpus, survey_dump_dir + 'corpus.dmp') self.print_fn("\n- Building section-wise corpus.. ") corpus_sectionwise = self.build_corpus_sectionwise(papers_standardized) joblib.dump(corpus_sectionwise, survey_dump_dir + 'corpus_sectionwise.dmp') self.print_fn("\n- Building basic research highlights.. ") research_blocks = self.build_basic_blocks(corpus_sectionwise, corpus) joblib.dump(research_blocks, survey_dump_dir + 'research_blocks.dmp') self.print_fn("\n- Reducing corpus to lines.. ") corpus_lines = self.get_corpus_lines(corpus) joblib.dump(corpus_lines, survey_dump_dir + 'corpus_lines.dmp') # temp # searched_papers = joblib.load(dump_dir + 'papers_metadata.dmp') ''' papers_highlighted = joblib.load(dump_dir + 'papers_highlighted.dmp') corpus = joblib.load(dump_dir + 'corpus.dmp') papers_standardized = joblib.load(dump_dir + 'papers_standardized.dmp') corpus_sectionwise = joblib.load(dump_dir + 'corpus_sectionwise.dmp') research_blocks = joblib.load(dump_dir + 'research_blocks.dmp') corpus_lines = joblib.load(dump_dir + 'corpus_lines.dmp') ''' ''' self.print_fn("papers_highlighted types:"+ str(np.unique([str(type(p['sections'][0]['highlights'])) for p in papers_highlighted]))) self.print_fn("papers_highlighted example:") self.print_fn(random.sample(list(papers_highlighted), 1)[0]['sections'][0]['highlights']) self.print_fn("corpus types:"+ str(np.unique([str(type(txt)) for k,txt in corpus.items()]))) self.print_fn("corpus example:") self.print_fn(random.sample(list(corpus.items()), 1)[0]) self.print_fn("corpus_lines types:"+ str(np.unique([str(type(txt)) for txt in corpus_lines]))) self.print_fn("corpus_lines example:") self.print_fn(random.sample(list(corpus_lines), 1)[0]) self.print_fn("corpus_sectionwise types:"+ str(np.unique([str(type(txt)) for k,txt in corpus_sectionwise.items()]))) self.print_fn("corpus_sectionwise example:") self.print_fn(random.sample(list(corpus_sectionwise.items()), 1)[0]) self.print_fn("research_blocks types:"+ str(np.unique([str(type(txt)) for k,txt in research_blocks.items()]))) self.print_fn("research_blocks example:") self.print_fn(random.sample(list(research_blocks.items()), 1)[0]) ''' # self.print_fn("corpus types:"+ str(np.unique([type(txt) for k,txt in corpus.items()]))) self.print_fn("\n- Building abstract.. ") abstract_block = self.get_abstract(corpus_lines, corpus_sectionwise, research_blocks) joblib.dump(abstract_block, survey_dump_dir + 'abstract_block.dmp') ''' self.print_fn("abstract_block type:"+ str(type(abstract_block))) self.print_fn("abstract_block:") self.print_fn(abstract_block) ''' self.print_fn("\n- Building introduction.. ") intro_block = self.get_intro(corpus_sectionwise, research_blocks) joblib.dump(intro_block, survey_dump_dir + 'intro_block.dmp') ''' self.print_fn("intro_block type:"+ str(type(intro_block))) self.print_fn("intro_block:") self.print_fn(intro_block) ''' self.print_fn("\n- Building custom sections.. ") clustered_sections, clustered_sentences = self.get_clusters(papers_standardized, searched_papers) joblib.dump(clustered_sections, survey_dump_dir + 'clustered_sections.dmp') joblib.dump(clustered_sentences, survey_dump_dir + 'clustered_sentences.dmp') ''' self.print_fn("clusters extracted") self.print_fn("clustered_sentences types:"+ str(np.unique([str(type(txt)) for k,txt in clustered_sentences.items()]))) self.print_fn("clustered_sentences example:") self.print_fn(random.sample(list(clustered_sections.items()), 1)[0]) self.print_fn("clustered_sections types:"+ str(np.unique([str(type(txt)) for k,txt in clustered_sections.items()]))) self.print_fn("clustered_sections example:") self.print_fn(random.sample(list(clustered_sections.items()), 1)[0]) ''' clustered_sections['abstract'] = abstract_block clustered_sections['introduction'] = intro_block joblib.dump(clustered_sections, survey_dump_dir + 'research_sections.dmp') self.print_fn("\n- Building conclusion.. ") conclusion_block = self.get_conclusion(clustered_sections) joblib.dump(conclusion_block, survey_dump_dir + 'conclusion_block.dmp') clustered_sections['conclusion'] = conclusion_block ''' self.print_fn("conclusion_block type:"+ str(type(conclusion_block))) self.print_fn("conclusion_block:") self.print_fn(conclusion_block) ''' if query is None: query = self.generate_title(' '.join([v for v in clustered_sections.values()])) survey_file = 'A_Survey_on_' + query.replace(' ', '_') + '.txt' survey_file = Path(survey_dump_dir).resolve() / survey_file self.build_doc(clustered_sections, papers_standardized, query=query, filename=str(survey_file)) self.survey_print_fn("\n-citation-network: ") self.survey_print_fn(cites) for d in [survey_pdf_dir, survey_txt_dir, survey_img_dir, survey_tab_dir]: shutil.copytree(str(d), survey_dump_dir + f'/{Path(d).stem}/') assert (os.path.exists(survey_file)) zip_name = 'arxiv_dumps_'+query.replace(' ', '_')+'.zip' zip_name = Path(survey_dump_dir).parent.resolve() / zip_name self.zip_outputs(survey_dump_dir, str(zip_name)) self.print_fn("\n- Survey complete.. \nSurvey file path :" + str(survey_file) + "\nAll outputs zip path :" + str(zip_name)) return str(zip_name.resolve()), str(survey_file.resolve())