import chromadb
from chromadb import Documents, EmbeddingFunction, Embeddings
from transformers import AutoModel
import json
from numpy.linalg import norm
import sqlite3
import urllib.request
from django.conf import settings

# This module acts as a singleton: the embedding model, embedding function,
# and database handles below are created once at import time and shared.

class JinaAIEmbeddingFunction(EmbeddingFunction):
    """Wrap a HuggingFace embedding model in Chroma's EmbeddingFunction interface."""
    def __init__(self, model):
        super().__init__()
        self.model = model

    def __call__(self, input: Documents) -> Embeddings:
        embeddings = self.model.encode(input)
        return embeddings.tolist()

# instance of embedding_model
embedding_model = AutoModel.from_pretrained('jinaai/jina-embeddings-v2-base-en',
                                            trust_remote_code=True,
                                            cache_dir='models')

# instance of JinaAIEmbeddingFunction
ef = JinaAIEmbeddingFunction(embedding_model)

# list of topics and their pre-computed description embeddings
topic_descriptions = json.load(open("topic_descriptions.txt"))
topics = list(topic_descriptions.keys())
embeddings = [embedding_model.encode(topic_descriptions[key]) for key in topic_descriptions]

cos_sim = lambda a, b: (a @ b.T) / (norm(a) * norm(b))

def choose_topic(summary):
    """Return the topic whose description embedding is most similar to the summary."""
    embed = embedding_model.encode(summary)
    topic = ""
    max_sim = 0.
    for i, key in enumerate(topics):
        sim = cos_sim(embed, embeddings[i])
        if sim > max_sim:
            topic = key
            max_sim = sim
    return topic

def authors_list_to_str(authors):
    """Input a list of authors, return a comma-separated string of authors."""
    text = ""
    for author in authors:
        text += author + ", "
    return text[:-2]  # drop the trailing ", "

def authors_str_to_list(string):
    """Input an 'and'-separated string of authors, return a list of authors."""
    authors = []
    list_auth = string.split("and")  # note: splits on the literal substring "and"
    for author in list_auth:
        author = author.strip()
        if author != "et al.":
            authors.append(author)
    return authors
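# Usage sketch for the helpers above (illustrative only: the topic returned by
# choose_topic depends entirely on the contents of topic_descriptions.txt):
#
#   choose_topic("We present a transformer model for protein folding.")
#   # -> e.g. "Machine Learning"
#   authors_list_to_str(["A. Turing", "J. von Neumann"])
#   # -> "A. Turing, J. von Neumann"
#   authors_str_to_list("A. Turing and J. von Neumann and et al.")
#   # -> ["A. Turing", "J. von Neumann"]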
# """ # chunks = [] # current_chunk = "" # words = text.split() # for word in words: # if len(current_chunk) + len(word) + 1 >= max_char: # chunks.append(current_chunk) # current_chunk = " " # else: # current_chunk += " " + word # chunks.append(current_chunk.strip()) # return chunks # def trimming(txt): # start = txt.find("{") # end = txt.rfind("}") # return txt[start:end+1].replace("\n"," ") # # crawl data # def extract_tag(txt,tagname): # return txt[txt.find("<"+tagname+">")+len(tagname)+2:txt.find("")] # def get_record(extract): # id = extract_tag(extract,"id") # updated = extract_tag(extract,"updated") # published = extract_tag(extract,"published") # title = extract_tag(extract,"title").replace("\n ","").strip() # summary = extract_tag(extract,"summary").replace("\n","").strip() # authors = [] # while extract.find("")!=-1: # author = extract_tag(extract,"name") # extract = extract[extract.find("")+9:] # authors.append(author) # pattern = '") != -1: # extract = xml[xml.find("")+7:xml.find("")] # xml = xml[xml.find("")+8:] # extract = get_record(extract) # topic = choose_topic(extract[6]) # records.append([topic,*extract]) # return records # except Exception as e: # return "Error: "+str(e) # def crawl_arxiv(keyword_list, max_results=100): # baseurl = 'http://export.arxiv.org/api/query?search_query=' # records = [] # for i,keyword in enumerate(keyword_list): # if i ==0: # url = baseurl + 'all:' + keyword # else: # url = url + '+OR+' + 'all:' + keyword # url = url+ '&max_results=' + str(max_results) # url = url.replace(' ', '%20') # try: # arxiv_page = urllib.request.urlopen(url,timeout=100).read() # xml = str(arxiv_page,encoding="utf-8") # while xml.find("") != -1: # extract = xml[xml.find("")+7:xml.find("")] # xml = xml[xml.find("")+8:] # extract = get_record(extract) # topic = choose_topic(extract[6]) # records.append([topic,*extract]) # return records # except Exception as e: # return "Error: "+str(e) # class ArxivSQL: # def __init__(self, table="arxivsql", name="db.sqlite3"): # self.con = sqlite3.connect(name) # self.cur = self.con.cursor() # self.table = table # def query(self, title="", author=[]): # if len(title)>0: # query_title = 'title like "%{}%"'.format(title) # else: # query_title = "True" # if len(author)>0: # query_author = 'authors like ' # for auth in author: # query_author += "'%{}%' or ".format(auth) # query_author = query_author[:-4] # else: # query_author = "True" # query = "select * from {} where {} and {}".format(self.table,query_title,query_author) # result = self.cur.execute(query) # return result.fetchall() # def query_id(self, ids=[]): # try: # if len(ids) == 0: # return None # query = "select * from {} where id in (".format(self.table) # for id in ids: # query+="'"+id+"'," # query = query[:-1] + ")" # result = self.cur.execute(query) # return result.fetchall() # except Exception as e: # print(e) # print("Error query: ",query) # def add(self, crawl_records): # """ # Add crawl_records (list) obtained from arxiv_crawlers # A record is a list of 8 columns: # [topic, id, updated, published, title, author, link, summary] # Return the final length of the database table # """ # results = "" # for record in crawl_records: # try: # query = """insert into arxivsql values("{}","{}","{}","{}","{}","{}","{}")""".format( # record[1][21:], # record[0], # record[4].replace('"',"'"), # authors_list_to_str(record[5]), # record[2][:10], # record[3][:10], # record[6] # ) # self.cur.execute(query) # self.con.commit() # except Exception as e: # result+=str(e) # result+="\n" + query + 
"\n" # finally: # return results # # instance of ArxivSQL # sqldb = ArxivSQL() # class ArxivChroma: # """ # Create an interface to arxivdb, which only support query and addition. # This interface do not support edition and deletion procedures. # """ # def __init__(self, table="arxiv_records", name="arxivdb/"): # self.client = chromadb.PersistentClient(name) # self.model = embedding_model # self.collection = self.client.get_or_create_collection(table, # embedding_function=JinaAIEmbeddingFunction( # model = self.model # )) # def query_relevant(self, keywords, query_texts, n_results=3): # """ # Perform a query using a list of keywords (str), # or using a relavant string # """ # contains = [] # for keyword in keywords: # contains.append({"$contains":keyword.lower()}) # return self.collection.query( # query_texts=query_texts, # where_document={ # "$or":contains # }, # n_results=n_results, # ) # def query_exact(self, id): # ids = ["{}_{}".format(id,j) for j in range(0,10)] # return self.collection.get(ids=ids) # def add(self, crawl_records): # """ # Add crawl_records (list) obtained from arxiv_crawlers # A record is a list of 8 columns: # [topic, id, updated, published, title, author, link, summary] # Return the final length of the database table # """ # for record in crawl_records: # embed_text = """ # Topic: {}, # Title: {}, # Summary: {} # """.format(record[0],record[4],record[7]) # chunks = chunk_texts(embed_text) # ids = [record[1][21:]+"_"+str(j) for j in range(len(chunks))] # paper_ids = [{"paper_id":record[1][21:]} for _ in range(len(chunks))] # self.collection.add( # documents = chunks, # metadatas=paper_ids, # ids = ids # ) # return self.collection.count() # # instance of ArxivChroma # db = ArxivChroma()