import os
import re
import json
import argparse

import tkinter as tk
from tkinter import filedialog, messagebox

import PyPDF2
import torch
import ollama
from openai import OpenAI

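# Local RAG demo: a small tkinter GUI for uploading .pdf, .txt, or .json files,
# chunking their text into local-rag/vault.txt, embedding the chunks with
# Ollama's mxbai-embed-large model, and answering questions about them through
# Ollama's OpenAI-compatible chat endpoint.
#
# Assumed environment (not enforced by the code): an Ollama server running on
# localhost:11434 with the embedding model and the chat model (default: llama3)
# already pulled. Example invocation, where the script name is whatever this
# file is saved as:
#   python local_rag.py --model llama3
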
# ANSI escape codes for coloring terminal output.
PINK = '\033[95m'
CYAN = '\033[96m'
YELLOW = '\033[93m'
NEON_GREEN = '\033[92m'
RESET_COLOR = '\033[0m'

def open_file(filepath):
    # Read a UTF-8 text file and return its contents. (Currently unused helper.)
    with open(filepath, 'r', encoding='utf-8') as infile:
        return infile.read()

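# The three upload handlers below share one pipeline: read the file, collapse
# whitespace, split into sentences on ., !, or ?, then greedily pack sentences
# into chunks of under 1000 characters. Each chunk is written on its own line
# to local-rag/temp.txt (prefixed with the source file name), appended to
# local-rag/vault.txt, and mirrored into local-rag/text_parse/<name>.txt with a
# "NOT FINISHED" flag that process_text_files() uses to decide whether
# embeddings still need to be computed.
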
def convert_pdf_to_text():
    file_path = filedialog.askopenfilename(filetypes=[("PDF Files", "*.pdf")])
    if file_path:
        base_directory = os.path.join("local-rag", "text_parse")
        file_name = os.path.basename(file_path)
        output_file_name = os.path.splitext(file_name)[0] + ".txt"
        file_output_path = os.path.join(base_directory, output_file_name)

        if not os.path.exists(base_directory):
            os.makedirs(base_directory)
            print(f"Directory '{base_directory}' created.")

        with open(file_path, 'rb') as pdf_file:
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            text = ''
            for page in pdf_reader.pages:
                page_text = page.extract_text()  # extract once instead of twice per page
                if page_text:
                    text += page_text + " "

        # Normalize whitespace, then pack sentences into chunks of < 1000 characters.
        text = re.sub(r'\s+', ' ', text).strip()
        sentences = re.split(r'(?<=[.!?]) +', text)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 1 < 1000:
                current_chunk += sentence + " "  # keep the separating space between sentences
            else:
                chunks.append(current_chunk)
                current_chunk = sentence + " "
        if current_chunk:
            chunks.append(current_chunk)

        with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
            temp_file.write(output_file_name + "\n")
            for chunk in chunks:
                temp_file.write(chunk.strip() + "\n")

        with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
            vault_file.write("\n")
            for chunk in chunks:
                vault_file.write(chunk.strip() + "\n")

        if not os.path.exists(file_output_path):
            with open(file_output_path, "w", encoding="utf-8") as f:
                for chunk in chunks:
                    f.write(chunk.strip() + "\n")
                f.write("====================NOT FINISHED====================\n")
            print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
        else:
            print(f"File '{file_output_path}' already exists.")

        print("PDF content appended to vault.txt with each chunk on a separate line.")

        input_value = input("Enter your question: ")
        process_text_files(input_value)
    else:
        print("No file selected.")

def upload_txtfile():
    file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
    if file_path:
        base_directory = os.path.join("local-rag", "text_parse")
        file_name = os.path.basename(file_path)
        output_file_name = os.path.splitext(file_name)[0] + ".txt"
        file_output_path = os.path.join(base_directory, output_file_name)

        if not os.path.exists(base_directory):
            os.makedirs(base_directory)
            print(f"Directory '{base_directory}' created.")

        with open(file_path, 'r', encoding="utf-8") as txt_file:
            text = txt_file.read()

        text = re.sub(r'\s+', ' ', text).strip()
        sentences = re.split(r'(?<=[.!?]) +', text)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 1 < 1000:
                current_chunk += sentence + " "
            else:
                chunks.append(current_chunk)
                current_chunk = sentence + " "
        if current_chunk:
            chunks.append(current_chunk)

        with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
            temp_file.write(output_file_name + "\n")
            for chunk in chunks:
                temp_file.write(chunk.strip() + "\n")

        with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
            vault_file.write("\n")
            for chunk in chunks:
                vault_file.write(chunk.strip() + "\n")

        if not os.path.exists(file_output_path):
            # Write the chunks themselves (not an empty file) so the parse copy
            # matches the PDF and JSON handlers, then mark it as pending embedding.
            with open(file_output_path, "w", encoding="utf-8") as f:
                for chunk in chunks:
                    f.write(chunk.strip() + "\n")
                f.write("====================NOT FINISHED====================\n")
            print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
        else:
            print(f"File '{file_output_path}' already exists.")

        print("Text file content appended to vault.txt with each chunk on a separate line.")

        input_value = input("Enter your question: ")
        process_text_files(input_value)
    else:
        print("No file selected.")

def upload_jsonfile():
    file_path = filedialog.askopenfilename(filetypes=[("JSON Files", "*.json")])
    if file_path:
        base_directory = os.path.join("local-rag", "text_parse")
        file_name = os.path.basename(file_path)
        output_file_name = os.path.splitext(file_name)[0] + ".txt"
        file_output_path = os.path.join(base_directory, output_file_name)

        if not os.path.exists(base_directory):
            os.makedirs(base_directory)
            print(f"Directory '{base_directory}' created.")

        with open(file_path, 'r', encoding="utf-8") as json_file:
            data = json.load(json_file)

        # Flatten the JSON structure to a single string before chunking.
        text = json.dumps(data, ensure_ascii=False)
        text = re.sub(r'\s+', ' ', text).strip()
        sentences = re.split(r'(?<=[.!?]) +', text)
        chunks = []
        current_chunk = ""
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 1 < 1000:
                current_chunk += sentence + " "
            else:
                chunks.append(current_chunk)
                current_chunk = sentence + " "
        if current_chunk:
            chunks.append(current_chunk)

        with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
            temp_file.write(output_file_name + "\n")
            for chunk in chunks:
                temp_file.write(chunk.strip() + "\n")

        with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
            vault_file.write("\n")
            for chunk in chunks:
                vault_file.write(chunk.strip() + "\n")

        if not os.path.exists(file_output_path):
            with open(file_output_path, "w", encoding="utf-8") as f:
                for chunk in chunks:
                    f.write(chunk.strip() + "\n")
                f.write("====================NOT FINISHED====================\n")
            print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
        else:
            print(f"File '{file_output_path}' already exists.")

        print("JSON file content appended to vault.txt with each chunk on a separate line.")

        input_value = input("Enter your question: ")
        process_text_files(input_value)
    else:
        print("No file selected.")

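# The summarizer UI below offers two paths: summarize_from_file() runs the
# chosen file's lines through the same embedding + ollama_chat pipeline used
# for question answering, while open_paste_window() sends pasted text straight
# to the chat endpoint with no retrieval step.
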
def summarize():
    summary_window = tk.Toplevel(root)
    summary_window.title("Text Summarizer")
    summary_window.geometry("400x200")

    label = tk.Label(summary_window, text="Choose an option to summarize text:")
    label.pack(pady=10)

    upload_button = tk.Button(summary_window, text="Upload from .txt File", command=summarize_from_file)
    upload_button.pack(pady=5)

    paste_button = tk.Button(summary_window, text="Paste your text", command=lambda: open_paste_window(summary_window))
    paste_button.pack(pady=5)

def summarize_from_file():
    file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
    if file_path:
        base_directory = os.path.join("local-rag", "text_sum")
        file_name = os.path.basename(file_path)

        if not os.path.exists(base_directory):
            os.makedirs(base_directory)
            print(f"Directory '{base_directory}' created.")

        # Read the selected file by its full path; each line is embedded individually.
        summary_content = []
        if os.path.exists(file_path):
            with open(file_path, "r", encoding='utf-8') as sum_file:
                summary_content = sum_file.readlines()

        summary_embeddings = []
        for content in summary_content:
            response = ollama.embeddings(model='mxbai-embed-large', prompt=content)
            summary_embeddings.append(response["embedding"])

        summary_embeddings_tensor = torch.tensor(summary_embeddings)
        print("Embeddings for each line in the file:")
        print(summary_embeddings_tensor)

        conversation_history = []
        system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
        user_input = "Summarize this paragraph"

        response = ollama_chat(user_input, system_message, summary_embeddings_tensor, summary_content, args.model, conversation_history)

        messagebox.showinfo("Summary", response)
    else:
        messagebox.showerror("Error", "No file selected!")

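# open_paste_window() bypasses retrieval entirely: the pasted text goes out as
# a single chat completion through the OpenAI-compatible `client` configured at
# the bottom of this file, using the standard role/content message schema.
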
def open_paste_window(parent_window):
    paste_window = tk.Toplevel(parent_window)
    paste_window.title("Paste Your Text")
    paste_window.geometry("400x300")

    label = tk.Label(paste_window, text="Paste your text below:")
    label.pack(pady=5)

    input_textbox = tk.Text(paste_window, height=8, width=40)
    input_textbox.pack(pady=5)

    def submit_text():
        pasted_text = input_textbox.get("1.0", tk.END).strip()
        if pasted_text:
            system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
            user_input = "Summarize this paragraph: "
            new_value = user_input + pasted_text
            # The chat API expects role/content dictionaries with the roles
            # "system" and "user".
            messages = [
                {"role": "system", "content": system_message},
                {"role": "user", "content": new_value},
            ]
            response = client.chat.completions.create(model=args.model, messages=messages)
            response_value = response.choices[0].message.content

            messagebox.showinfo("Summary", response_value)
            paste_window.destroy()
        else:
            messagebox.showerror("Error", "No text entered!")

    submit_button = tk.Button(paste_window, text="Submit", command=submit_text)
    submit_button.pack(side=tk.LEFT, padx=10, pady=10)

    cancel_button = tk.Button(paste_window, text="Cancel", command=paste_window.destroy)
    cancel_button.pack(side=tk.RIGHT, padx=10, pady=10)

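# Retrieval: the query is embedded with the same mxbai-embed-large model used
# for the vault, scored against every stored chunk with cosine similarity, and
# the top_k best-scoring chunks are returned. This relies on ollama.embeddings()
# returning a dict whose "embedding" key holds a flat list of floats, which is
# what the Python ollama client provides for this call.
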
def get_relevant_context(rewritten_input, vault_embeddings, vault_content, top_k=3):
    if vault_embeddings.nelement() == 0:  # nothing has been embedded yet
        return []
    input_embedding = ollama.embeddings(model='mxbai-embed-large', prompt=rewritten_input)["embedding"]
    cos_scores = torch.cosine_similarity(torch.tensor(input_embedding).unsqueeze(0), vault_embeddings)
    top_k = min(top_k, len(cos_scores))  # never request more results than exist
    top_indices = torch.topk(cos_scores, k=top_k)[1].tolist()
    relevant_context = [vault_content[idx].strip() for idx in top_indices]
    return relevant_context

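# Chat: retrieved chunks are prepended to the user's message, the running
# conversation history is replayed after the system prompt, and the reply is
# appended back into the history so follow-up questions keep their context.
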
def ollama_chat(user_input, system_message, vault_embeddings, vault_content, ollama_model, conversation_history):
    relevant_context = get_relevant_context(user_input, vault_embeddings, vault_content, top_k=3)
    if relevant_context:
        context_str = "\n".join(relevant_context)
        print("Context Pulled from Documents: \n\n" + CYAN + context_str + RESET_COLOR)
    else:
        print(CYAN + "No relevant context found." + RESET_COLOR)

    user_input_with_context = user_input
    if relevant_context:
        user_input_with_context = context_str + "\n\n" + user_input

    conversation_history.append({"role": "user", "content": user_input_with_context})
    messages = [{"role": "system", "content": system_message}, *conversation_history]

    response = client.chat.completions.create(model=ollama_model, messages=messages)
    conversation_history.append({"role": "assistant", "content": response.choices[0].message.content})

    return response.choices[0].message.content

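# Embedding bookkeeping: local-rag/temp.txt names the most recently uploaded
# file on its first line, followed by that file's chunks. While the parse copy
# in text_parse/ still ends with the "NOT FINISHED" flag, its chunks are
# embedded line by line and the resulting tensor is cached next to it as
# <name>_embedding.pt; afterwards the flag is stripped so later runs can load
# the cached tensor instead of re-embedding.
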
def process_text_files(user_input):
    text_parse_directory = os.path.join("local-rag", "text_parse")
    temp_file_path = os.path.join("local-rag", "temp.txt")

    if not os.path.exists(text_parse_directory):
        print(f"Directory '{text_parse_directory}' does not exist.")
        return False

    if not os.path.exists(temp_file_path):
        print("temp.txt does not exist.")
        return False

    with open(temp_file_path, 'r', encoding='utf-8') as temp_file:
        first_line = temp_file.readline().strip()

    text_files = [f for f in os.listdir(text_parse_directory) if f.endswith('.txt')]

    # first_line already carries the .txt extension.
    if first_line not in text_files:
        print(f"No matching file found for '{first_line}' in text_parse directory.")
        return False

    file_path = os.path.join(text_parse_directory, first_line)
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    lines = [line.strip() for line in lines]

    if len(lines) >= 2 and lines[-1] == "====================NOT FINISHED====================":
        print(f"'{first_line}' contains the 'NOT FINISHED' flag. Computing embeddings.")

        vault_content = []
        if os.path.exists(temp_file_path):
            with open(temp_file_path, "r", encoding='utf-8') as vault_file:
                vault_content = vault_file.readlines()

        vault_embeddings = []
        for content in vault_content:
            response = ollama.embeddings(model='mxbai-embed-large', prompt=content)
            vault_embeddings.append(response["embedding"])

        vault_embeddings_tensor = torch.tensor(vault_embeddings)
        print("Embeddings for each line in the vault:")
        print(vault_embeddings_tensor)

        with open(os.path.join(text_parse_directory, f"{first_line}_embedding.pt"), "wb") as tensor_file:
            torch.save(vault_embeddings_tensor, tensor_file)

        # Rewrite the file without the flag line, restoring the newlines that
        # strip() removed above.
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write("\n".join(lines[:-1]) + "\n")

    else:
        print(f"'{first_line}' does not contain the 'NOT FINISHED' flag or is already complete. Loading tensor if it exists.")

        tensor_file_path = os.path.join(text_parse_directory, f"{first_line}_embedding.pt")
        if os.path.exists(tensor_file_path):
            vault_embeddings_tensor = torch.load(tensor_file_path)
            print("Loaded Vault Embedding Tensor:")
            print(vault_embeddings_tensor)
        else:
            # Fall back to an empty tensor so the chat call below still works;
            # get_relevant_context() treats it as "no context".
            vault_embeddings_tensor = torch.tensor([])
            print(f"No cached tensor found at '{tensor_file_path}'.")

        vault_content = []
        if os.path.exists(temp_file_path):
            with open(temp_file_path, "r", encoding='utf-8') as vault_file:
                vault_content = vault_file.readlines()

    conversation_history = []
    system_message = "You are a helpful assistant that is an expert at extracting the most useful information from a given text"
    response = ollama_chat(user_input, system_message, vault_embeddings_tensor, vault_content, args.model, conversation_history)

    print(response)

    return response

root = tk.Tk()
root.title("Upload .pdf, .txt, or .json")

pdf_button = tk.Button(root, text="Upload PDF", command=convert_pdf_to_text)
pdf_button.pack(pady=15)

txt_button = tk.Button(root, text="Upload Text File", command=upload_txtfile)
txt_button.pack(pady=15)

json_button = tk.Button(root, text="Upload JSON File", command=upload_jsonfile)
json_button.pack(pady=15)

summarize_button = tk.Button(root, text="Summarize This!", command=summarize)
summarize_button.pack(pady=15)

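# Ollama exposes an OpenAI-compatible API under /v1; the api_key value is a
# placeholder, since the local server does not check it.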
client = OpenAI(base_url='http://localhost:11434/v1', api_key='llama3')

parser = argparse.ArgumentParser(description="Ollama Chat")
parser.add_argument("--model", default="llama3", help="Ollama model to use (default: llama3)")
args = parser.parse_args()

root.mainloop()