Wolverine-Code-Companion / test_product.py
LanceY2004's picture
Upload test_product.py
5a7671c verified
import os
import tkinter as tk
from tkinter import filedialog, messagebox
import PyPDF2
import re
import json
import torch
import ollama
from openai import OpenAI
import argparse
# ANSI escape codes for colors
PINK = '\033[95m'
CYAN = '\033[96m'
YELLOW = '\033[93m'
NEON_GREEN = '\033[92m'
RESET_COLOR = '\033[0m'
# Function to open a file and return its contents as a string
def open_file(filepath):
with open(filepath, 'r', encoding='utf-8') as infile:
return infile.read()
# Function to convert PDF to text and append to vault.txt
def convert_pdf_to_text():
file_path = filedialog.askopenfilename(filetypes=[("PDF Files", "*.pdf")])
if file_path:
base_directory = os.path.join("local-rag", "text_parse")
file_name = os.path.basename(file_path)
output_file_name = os.path.splitext(file_name)[0] + ".txt"
file_output_path = os.path.join(base_directory, output_file_name)
if not os.path.exists(base_directory):
os.makedirs(base_directory)
print(f"Directory '{base_directory}' created.")
with open(file_path, 'rb') as pdf_file:
pdf_reader = PyPDF2.PdfReader(pdf_file)
text = ''
for page_num in range(len(pdf_reader.pages)):
page = pdf_reader.pages[page_num]
if page.extract_text():
text += page.extract_text() + " "
text = re.sub(r'\s+', ' ', text).strip()
sentences = re.split(r'(?<=[.!?]) +', text)
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 < 1000:
current_chunk += (sentence + " ").strip()
else:
chunks.append(current_chunk)
current_chunk = sentence + " "
if current_chunk:
chunks.append(current_chunk)
with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
temp_file.write(output_file_name + "\n")
for chunk in chunks:
temp_file.write(chunk.strip() + "\n")
with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
vault_file.write("\n")
for chunk in chunks:
vault_file.write(chunk.strip() + "\n")
if not os.path.exists(file_output_path):
with open(file_output_path, "w", encoding="utf-8") as f:
for chunk in chunks:
f.write(chunk.strip() + "\n")
f.write("====================NOT FINISHED====================\n")
print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
else:
print(f"File '{file_output_path}' already exists.")
print(f"PDF content appended to vault.txt with each chunk on a separate line.")
# Call the second part after the PDF conversion is done
input_value = input("Enter your question:")
process_text_files(input_value)
# Function to upload a text file and append to vault.txt
def upload_txtfile():
file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
if file_path:
# Define the base directory
base_directory = os.path.join("local-rag", "text_parse")
# Get the file name without the directory and extension
file_name = os.path.basename(file_path)
output_file_name = os.path.splitext(file_name)[0] + ".txt" # Convert PDF filename to .txt
# Construct the output file path in the base directory
file_output_path = os.path.join(base_directory, output_file_name)
# Create base directory if it doesn't exist
if not os.path.exists(base_directory):
os.makedirs(base_directory)
print(f"Directory '{base_directory}' created.")
with open(file_path, 'r', encoding="utf-8") as txt_file:
text = txt_file.read()
# Normalize whitespace and clean up text
text = re.sub(r'\s+', ' ', text).strip()
# Split text into chunks by sentences, respecting a maximum chunk size
sentences = re.split(r'(?<=[.!?]) +', text) # split on spaces following sentence-ending punctuation
chunks = []
current_chunk = ""
for sentence in sentences:
# Check if the current sentence plus the current chunk exceeds the limit
if len(current_chunk) + len(sentence) + 1 < 1000: # +1 for the space
current_chunk += (sentence + " ").strip()
else:
# When the chunk exceeds 1000 characters, store it and start a new one
chunks.append(current_chunk)
current_chunk = sentence + " "
if current_chunk: # Don't forget the last chunk!
chunks.append(current_chunk)
# Clear temp.txt and write the new content
with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
temp_file.write(output_file_name + "\n") # Write the output file name as the first line
for chunk in chunks:
# Write each chunk to its own line
temp_file.write(chunk.strip() + "\n") # Each chunk on a new line
with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
vault_file.write("\n") # Add a new line to separate content
for chunk in chunks:
# Write each chunk to its own line
vault_file.write(chunk.strip() + "\n") # Two newlines to separate chunks
# Create the file in the directory if it doesn't exist
if not os.path.exists(file_output_path):
with open(file_output_path, "w") as f:
f.write("") # Create an empty file
f.write("====================NOT FINISHED====================\n")
print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
else:
print(f"File '{file_output_path}' already exists.")
print(f"Text file content appended to vault.txt with each chunk on a separate line.")
input_value = input("Enter your question:")
process_text_files(input_value)
else:
print("No file selected.")
# Function to upload a JSON file and append to vault.txt
def upload_jsonfile():
file_path = filedialog.askopenfilename(filetypes=[("JSON Files", "*.json")])
if file_path:
# Define the base directory
base_directory = os.path.join("local-rag", "text_parse")
# Get the file name without the directory and extension
file_name = os.path.basename(file_path)
output_file_name = os.path.splitext(file_name)[0] + ".txt" # Convert PDF filename to .txt
# Construct the output file path in the base directory
file_output_path = os.path.join(base_directory, output_file_name)
# Create base directory if it doesn't exist
if not os.path.exists(base_directory):
os.makedirs(base_directory)
print(f"Directory '{base_directory}' created.")
with open(file_path, 'r', encoding="utf-8") as json_file:
data = json.load(json_file)
# Flatten the JSON data into a single string
text = json.dumps(data, ensure_ascii=False)
# Normalize whitespace and clean up text
text = re.sub(r'\s+', ' ', text).strip()
# Split text into chunks by sentences, respecting a maximum chunk size
sentences = re.split(r'(?<=[.!?]) +', text) # split on spaces following sentence-ending punctuation
chunks = []
current_chunk = ""
for sentence in sentences:
# Check if the current sentence plus the current chunk exceeds the limit
if len(current_chunk) + len(sentence) + 1 < 1000: # +1 for the space
current_chunk += (sentence + " ").strip()
else:
# When the chunk exceeds 1000 characters, store it and start a new one
chunks.append(current_chunk)
current_chunk = sentence + " "
if current_chunk: # Don't forget the last chunk!
chunks.append(current_chunk)
# Clear temp.txt and write the new content
with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
temp_file.write(output_file_name + "\n") # Write the output file name as the first line
for chunk in chunks:
# Write each chunk to its own line
temp_file.write(chunk.strip() + "\n") # Each chunk on a new line
with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
vault_file.write("\n") # Add a new line to separate content
for chunk in chunks:
# Write each chunk to its own line
vault_file.write(chunk.strip() + "\n") # Two newlines to separate chunks
if not os.path.exists(file_output_path):
with open(file_output_path, "w", encoding="utf-8") as f:
for chunk in chunks:
f.write(chunk.strip() + "\n") # Each chunk on a new line
f.write("====================NOT FINISHED====================\n")
print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
else:
print(f"File '{file_output_path}' already exists.")
print(f"JSON file content appended to vault.txt with each chunk on a separate line.")
input_value = input("Enter your question:")
process_text_files(input_value)
def summarize():
summary_window = tk.Toplevel(root)
summary_window.title("Text Summarizer")
summary_window.geometry("400x200")
# Create a label for the window
label = tk.Label(summary_window, text="Choose an option to summarize text:")
label.pack(pady=10)
# Create two buttons: one for uploading a .txt file, and one for pasting text directly
upload_button = tk.Button(summary_window, text="Upload from .txt File", command=summarize_from_file)
upload_button.pack(pady=5)
paste_button = tk.Button(summary_window, text="Paste your text", command=lambda: open_paste_window(summary_window))
paste_button.pack(pady=5)
# Function to upload a .txt file and summarize
def summarize_from_file():
file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
if file_path:
# Define the base directory where the file will be saved
base_directory = os.path.join("local-rag", "text_sum")
file_name = os.path.basename(file_path)
# Create the directory if it doesn't exist
if not os.path.exists(base_directory):
os.makedirs(base_directory)
print(f"Directory '{base_directory}' created.")
summary_content = []
if os.path.exists(file_name):
with open(file_name, "r", encoding='utf-8') as sum_file:
summary_content = sum_file.readlines()
summary_embeddings = []
for content in summary_content:
response = ollama.embeddings(model='mxbai-embed-large', prompt=content)
summary_embeddings.append(response["embedding"])
summary_embeddings_tensor = torch.tensor(summary_embeddings)
print("Embeddings for each line in the vault:")
print(summary_embeddings_tensor)
conversation_history = []
system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
user_input = "Summarize this paragraph"
response = ollama_chat(user_input, system_message, summary_embeddings_tensor, summary_content, args.model, conversation_history)
messagebox.showinfo("Summary", response) # Replace with actual summarizing logic
else:
messagebox.showerror("Error", "No file selected!")
# Function to open a window for pasting text and summarizing
def open_paste_window(parent_window):
# Create a new window for pasting text
paste_window = tk.Toplevel(parent_window)
paste_window.title("Paste Your Text")
paste_window.geometry("400x300")
# Create a label and text box for the pasted text
label = tk.Label(paste_window, text="Paste your text below:")
label.pack(pady=5)
input_textbox = tk.Text(paste_window, height=8, width=40)
input_textbox.pack(pady=5)
# Function to handle the "Submit" button click
def submit_text():
pasted_text = input_textbox.get("1.0", tk.END).strip()
if pasted_text:
system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
user_input = "Summarize this paragraph:"
new_value = user_input + pasted_text
messages = [
{
"system",
system_message,
},
{"human", new_value},
]
response = client.chat.completions.create(model=args.model, messages=messages)
response_value = response.choices[0].message.content
messagebox.showinfo("Summary", response_value) # Replace with actual summarizing logic
paste_window.destroy() # Close the window
else:
messagebox.showerror("Error", "No text entered!")
# Add Submit and Cancel buttons
submit_button = tk.Button(paste_window, text="Submit", command=submit_text)
submit_button.pack(side=tk.LEFT, padx=10, pady=10)
cancel_button = tk.Button(paste_window, text="Cancel", command=paste_window.destroy)
cancel_button.pack(side=tk.RIGHT, padx=10, pady=10)
# Function to get relevant context from the vault based on user input
def get_relevant_context(rewritten_input, vault_embeddings, vault_content, top_k=3):
if vault_embeddings.nelement() == 0:
return []
input_embedding = ollama.embeddings(model='mxbai-embed-large', prompt=rewritten_input)["embedding"]
cos_scores = torch.cosine_similarity(torch.tensor(input_embedding).unsqueeze(0), vault_embeddings)
top_k = min(top_k, len(cos_scores))
top_indices = torch.topk(cos_scores, k=top_k)[1].tolist()
relevant_context = [vault_content[idx].strip() for idx in top_indices]
return relevant_context
# Function to interact with the Ollama model
def ollama_chat(user_input, system_message, vault_embeddings, vault_content, ollama_model, conversation_history):
relevant_context = get_relevant_context(user_input, vault_embeddings, vault_content, top_k=3)
if relevant_context:
context_str = "\n".join(relevant_context)
print("Context Pulled from Documents: \n\n" + CYAN + context_str + RESET_COLOR)
else:
print(CYAN + "No relevant context found." + RESET_COLOR)
user_input_with_context = user_input
if relevant_context:
user_input_with_context = context_str + "\n\n" + user_input
conversation_history.append({"role": "user", "content": user_input_with_context})
messages = [{"role": "system", "content": system_message}, *conversation_history]
response = client.chat.completions.create(model=ollama_model, messages=messages)
conversation_history.append({"role": "assistant", "content": response.choices[0].message.content})
return response.choices[0].message.content
# Function to process text files, check for NOT FINISHED flag, and compute embeddings
def process_text_files(user_input):
text_parse_directory = os.path.join("local-rag", "text_parse")
temp_file_path = os.path.join("local-rag", "temp.txt")
if not os.path.exists(text_parse_directory):
print(f"Directory '{text_parse_directory}' does not exist.")
return False
if not os.path.exists(temp_file_path):
print("temp.txt does not exist.")
return False
with open(temp_file_path, 'r', encoding='utf-8') as temp_file:
first_line = temp_file.readline().strip()
text_files = [f for f in os.listdir(text_parse_directory) if f.endswith('.txt')]
if f"{first_line}" not in text_files:
print(f"No matching file found for '{first_line}.txt' in text_parse directory.")
return False
file_path = os.path.join(text_parse_directory, f"{first_line}")
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
lines = [line.strip() for line in lines]
if len(lines) >= 2 and lines[-1] == "====================NOT FINISHED====================":
print(f"'{first_line}' contains the 'NOT FINISHED' flag. Computing embeddings.")
vault_content = []
if os.path.exists(temp_file_path):
with open(temp_file_path, "r", encoding='utf-8') as vault_file:
vault_content = vault_file.readlines()
vault_embeddings = []
for content in vault_content:
response = ollama.embeddings(model='mxbai-embed-large', prompt=content)
vault_embeddings.append(response["embedding"])
vault_embeddings_tensor = torch.tensor(vault_embeddings)
print("Embeddings for each line in the vault:")
print(vault_embeddings_tensor)
with open(os.path.join(text_parse_directory, f"{first_line}_embedding.pt"), "wb") as tensor_file:
torch.save(vault_embeddings_tensor, tensor_file)
with open(file_path, 'w', encoding='utf-8') as f:
f.writelines(lines[:-1])
else:
print(f"'{first_line}' does not contain the 'NOT FINISHED' flag or is already complete. Loading tensor if it exists.")
tensor_file_path = os.path.join(text_parse_directory, f"{first_line}_embedding.pt")
if os.path.exists(tensor_file_path):
vault_embeddings_tensor = torch.load(tensor_file_path)
print("Loaded Vault Embedding Tensor:")
print(vault_embeddings_tensor)
vault_content = []
if os.path.exists(temp_file_path):
with open(temp_file_path, "r", encoding='utf-8') as vault_file:
vault_content = vault_file.readlines()
conversation_history = []
system_message = "You are a helpful assistant that is an expert at extracting the most useful information from a given text"
response = ollama_chat(user_input, system_message, vault_embeddings_tensor, vault_content, args.model, conversation_history)
print (response)
return response
# Create the main window
root = tk.Tk()
root.title("Upload .pdf, .txt, or .json")
# Create a button to open the file dialog for PDF
pdf_button = tk.Button(root, text="Upload PDF", command=convert_pdf_to_text)
pdf_button.pack(pady=15)
# Create a button to open the file dialog for text file
txt_button = tk.Button(root, text="Upload Text File", command=upload_txtfile)
txt_button.pack(pady=15)
# Create a button to open the file dialog for JSON file
json_button = tk.Button(root, text="Upload JSON File", command=upload_jsonfile)
json_button.pack(pady=15)
# Create a button to open the summerizer
json_button = tk.Button(root, text="Summarize This!", command=summarize)
json_button.pack(pady=15)
# Configuration for the Ollama API client
client = OpenAI(base_url='http://localhost:11434/v1', api_key='llama3')
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Ollama Chat")
parser.add_argument("--model", default="llama3", help="Ollama model to use (default: llama3)")
args = parser.parse_args()
# Run the main event loop
root.mainloop()