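"""Build a personal vocabulary database from an uploaded PDF file or raw text.

Pipeline (as implemented below): load and split the document, ask an LLM to pick
out vocabulary words from each slice, filter them against the NLTK English word
corpus, and persist the results in a Chroma vector database for similarity lookups.
"""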
import os
import sys

# Add the project root to the import path so sibling packages (config, llm, embedding) can be imported.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

import tempfile
import config
import nltk

from typing import List
from nltk.corpus import words as nltk_words
from loguru import logger
from llm.call_llm import get_completion_from_messages
from embedding.call_embedding import get_embedding
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import PyMuPDFLoader
from langchain.vectorstores import Chroma

from prompts import system_message_select

WORDS_DB_PATH = "../words_db"
VECTOR_DB_PATH = "./vector_db/chroma"

def parse_file(file_path):
    """Parse a PDF file, extract words from it, and store them in the vector database."""
    docs = []
    # check file type by extension
    file_type = file_path.split('.')[-1]
    if file_type == 'pdf':
        loader = PyMuPDFLoader(file_path)
        content = loader.load()
        docs.extend(content)
    else:
        return "File type not supported"
    if len(docs) > 5:
        return "File too large, please select a pdf file with no more than 5 pages"

    slices = split_text(docs)  # split content into slices
    extracted_words = extract_words(slices)  # extract words from slices
    try:
        vectorize_words(extracted_words)  # store words in the vector database
    except Exception as e:
        logger.error(e)
        raise  # let the caller report the failure to the user

    return ""

def parse_text(input_text: str):
    """Parse raw text input (currently returned unchanged)."""
    return input_text

def split_text(docs: List[object]):
    """Split documents into overlapping text slices."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1500,
        chunk_overlap=150
    )
    splits = text_splitter.split_documents(docs)
    logger.info(f"Split a {len(docs)}-page document into {len(splits)} slices")
    return splits

def extract_words(splits: List[object]):
    """Ask the LLM to extract vocabulary words from each slice."""
    all_words = []
    for chunk in splits:
        tmp_content = chunk.page_content
        messages = [
            {'role': 'system',
             'content': system_message_select},
            {'role': 'user',
             'content': f"{tmp_content}"},
        ]
        response = get_completion_from_messages(messages, api_key=config.api_key)
        words_list = response.split(", ")
        if words_list:
            all_words.extend(words_list)
    all_words = wash_words(all_words)
    logger.info(f"Extracted {len(all_words)} words from slices")
    return all_words

def wash_words(input_words: List[str]):
    """Filter raw words down to a deduplicated list of valid English words."""
    words_list = [word for word in input_words
                  if 3 <= len(word) <= 30]
    nltk.download('words', quiet=True)  # the corpus is cached after the first download
    english_words = set(nltk_words.words())
    filtered_words = [word.lower() for word in words_list if word.lower() in english_words]
    filtered_words = list(set(filtered_words))
    logger.info(f"Washed raw words into {len(filtered_words)} valid English words")
    return filtered_words

def get_words_from_text(input_text: str):
    """Split raw text into whitespace-separated words."""
    return input_text.split()

def store_words(input_words, db_path=WORDS_DB_PATH):
    """Store words in the words database (not implemented yet)."""
    pass

def vectorize_words(input_words: List[str], embedding=None):
    """Embed words and persist them in the Chroma vector database."""
    model = embedding or get_embedding("openai", embedding_key=config.api_key)
    persist_path = VECTOR_DB_PATH
    vectordb = Chroma.from_texts(
        texts=input_words,
        embedding=model,
        persist_directory=persist_path
    )
    vectordb.persist()
    logger.info(f"Vectorized {len(input_words)} words")
    return vectordb

def get_similar_k_words(query_word, k=3) -> List[str]:
    """Get the k most similar words from the vector database via max marginal relevance search."""
    model = get_embedding("openai", embedding_key=config.api_key)
    vectordb = Chroma(persist_directory=VECTOR_DB_PATH, embedding_function=model)
    similar_words = vectordb.max_marginal_relevance_search(query_word, k=k)
    similar_words = [word.page_content for word in similar_words]
    logger.info(f"Got {k} similar words {similar_words} from the DB")
    return similar_words

def create_db(input, chat_history):
    """Add words to the database from either an uploaded file or raw text."""
    action_msg = ""  # description of the user action: put a file or text into the database
    # 1. file upload
    if isinstance(input, tempfile._TemporaryFileWrapper):
        tmp_file_path = input.name
        file_name = os.path.basename(tmp_file_path)
        action_msg = f"Add words from my file: {file_name} to database"
        try:
            parse_file(tmp_file_path)  # TODO
            output = f"Words from your file: {file_name} have been added to the database"
        except Exception as e:
            logger.error(e)
            output = f"Error: failed to generate a dictionary from your file: {file_name}"
    # 2. text input
    elif isinstance(input, str):
        action_msg = f"Add words from my text: {input} to database"
        try:
            parse_text(input)  # TODO
            output = f"Words from your text: {input} have been added to the database"
        except Exception as e:
            logger.error(e)
            output = f"Error: failed to generate a dictionary from your text: {input}"
    else:
        action_msg = "Add words to database"
        output = "Error: unsupported input type"
    chat_history.append((action_msg, output))

    return chat_history


if __name__ == "__main__":
    # simple smoke test: add words from a short piece of text to the database
    print(create_db("hello world", []))