|
--- |
|
license: mit |
|
language: |
|
- en |
|
base_model: |
|
- meta-llama/Llama-3.1-8B |
|
pipeline_tag: reinforcement-learning |
|
--- |
|
|
|
import os |
|
import tkinter as tk |
|
from tkinter import filedialog, messagebox |
|
import PyPDF2 |
|
import re |
|
import json |
|
import torch |
|
import ollama |
|
from openai import OpenAI |
|
import argparse |
|
|
|
# ANSI escape codes for colors |
|
PINK = '\033[95m' |
|
CYAN = '\033[96m' |
|
YELLOW = '\033[93m' |
|
NEON_GREEN = '\033[92m' |
|
RESET_COLOR = '\033[0m' |
|
|
|
# Function to open a file and return its contents as a string |
|
def open_file(filepath): |
|
with open(filepath, 'r', encoding='utf-8') as infile: |
|
return infile.read() |
|
|
|
# Function to convert PDF to text and append to vault.txt |
|
def convert_pdf_to_text(): |
|
file_path = filedialog.askopenfilename(filetypes=[("PDF Files", "*.pdf")]) |
|
if file_path: |
|
base_directory = os.path.join("local-rag", "text_parse") |
|
file_name = os.path.basename(file_path) |
|
output_file_name = os.path.splitext(file_name)[0] + ".txt" |
|
file_output_path = os.path.join(base_directory, output_file_name) |
|
|
|
if not os.path.exists(base_directory): |
|
os.makedirs(base_directory) |
|
print(f"Directory '{base_directory}' created.") |
|
|
|
with open(file_path, 'rb') as pdf_file: |
|
pdf_reader = PyPDF2.PdfReader(pdf_file) |
|
text = '' |
|
for page_num in range(len(pdf_reader.pages)): |
|
page = pdf_reader.pages[page_num] |
|
if page.extract_text(): |
|
text += page.extract_text() + " " |
|
|
|
text = re.sub(r'\s+', ' ', text).strip() |
|
sentences = re.split(r'(?<=[.!?]) +', text) |
|
chunks = [] |
|
current_chunk = "" |
|
for sentence in sentences: |
|
if len(current_chunk) + len(sentence) + 1 < 1000: |
|
current_chunk += (sentence + " ").strip() |
|
else: |
|
chunks.append(current_chunk) |
|
current_chunk = sentence + " " |
|
if current_chunk: |
|
chunks.append(current_chunk) |
|
|
|
with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file: |
|
temp_file.write(output_file_name + "\n") |
|
for chunk in chunks: |
|
temp_file.write(chunk.strip() + "\n") |
|
|
|
with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file: |
|
vault_file.write("\n") |
|
for chunk in chunks: |
|
vault_file.write(chunk.strip() + "\n") |
|
|
|
if not os.path.exists(file_output_path): |
|
with open(file_output_path, "w", encoding="utf-8") as f: |
|
for chunk in chunks: |
|
f.write(chunk.strip() + "\n") |
|
f.write("====================NOT FINISHED====================\n") |
|
print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.") |
|
else: |
|
print(f"File '{file_output_path}' already exists.") |
|
|
|
print(f"PDF content appended to vault.txt with each chunk on a separate line.") |
|
# Call the second part after the PDF conversion is done |
|
|
|
input_value = input("Enter your question:") |
|
process_text_files(input_value) |
|
|
|
# Function to upload a text file and append to vault.txt |
|
def upload_txtfile(): |
|
file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")]) |
|
if file_path: |
|
# Define the base directory |
|
base_directory = os.path.join("local-rag", "text_parse") |
|
|
|
# Get the file name without the directory and extension |
|
file_name = os.path.basename(file_path) |
|
output_file_name = os.path.splitext(file_name)[0] + ".txt" # Convert PDF filename to .txt |
|
|
|
|
|
# Construct the output file path in the base directory |
|
file_output_path = os.path.join(base_directory, output_file_name) |
|
|
|
# Create base directory if it doesn't exist |
|
if not os.path.exists(base_directory): |
|
os.makedirs(base_directory) |
|
print(f"Directory '{base_directory}' created.") |
|
|
|
|
|
with open(file_path, 'r', encoding="utf-8") as txt_file: |
|
text = txt_file.read() |
|
|
|
# Normalize whitespace and clean up text |
|
text = re.sub(r'\s+', ' ', text).strip() |
|
|
|
# Split text into chunks by sentences, respecting a maximum chunk size |
|
sentences = re.split(r'(?<=[.!?]) +', text) # split on spaces following sentence-ending punctuation |
|
chunks = [] |
|
current_chunk = "" |
|
for sentence in sentences: |
|
# Check if the current sentence plus the current chunk exceeds the limit |
|
if len(current_chunk) + len(sentence) + 1 < 1000: # +1 for the space |
|
current_chunk += (sentence + " ").strip() |
|
else: |
|
# When the chunk exceeds 1000 characters, store it and start a new one |
|
chunks.append(current_chunk) |
|
current_chunk = sentence + " " |
|
if current_chunk: # Don't forget the last chunk! |
|
chunks.append(current_chunk) |
|
|
|
# Clear temp.txt and write the new content |
|
with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file: |
|
temp_file.write(output_file_name + "\n") # Write the output file name as the first line |
|
for chunk in chunks: |
|
# Write each chunk to its own line |
|
temp_file.write(chunk.strip() + "\n") # Each chunk on a new line |
|
|
|
with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file: |
|
vault_file.write("\n") # Add a new line to separate content |
|
for chunk in chunks: |
|
# Write each chunk to its own line |
|
vault_file.write(chunk.strip() + "\n") # Two newlines to separate chunks |
|
|
|
# Create the file in the directory if it doesn't exist |
|
if not os.path.exists(file_output_path): |
|
with open(file_output_path, "w") as f: |
|
f.write("") # Create an empty file |
|
f.write("====================NOT FINISHED====================\n") |
|
print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.") |
|
else: |
|
print(f"File '{file_output_path}' already exists.") |
|
|
|
print(f"Text file content appended to vault.txt with each chunk on a separate line.") |
|
|
|
input_value = input("Enter your question:") |
|
process_text_files(input_value) |
|
else: |
|
print("No file selected.") |
|
|
|
# Function to upload a JSON file and append to vault.txt |
|
def upload_jsonfile(): |
|
file_path = filedialog.askopenfilename(filetypes=[("JSON Files", "*.json")]) |
|
if file_path: |
|
|
|
# Define the base directory |
|
base_directory = os.path.join("local-rag", "text_parse") |
|
|
|
# Get the file name without the directory and extension |
|
file_name = os.path.basename(file_path) |
|
output_file_name = os.path.splitext(file_name)[0] + ".txt" # Convert PDF filename to .txt |
|
|
|
|
|
# Construct the output file path in the base directory |
|
file_output_path = os.path.join(base_directory, output_file_name) |
|
|
|
# Create base directory if it doesn't exist |
|
if not os.path.exists(base_directory): |
|
os.makedirs(base_directory) |
|
print(f"Directory '{base_directory}' created.") |
|
|
|
|
|
|
|
|
|
with open(file_path, 'r', encoding="utf-8") as json_file: |
|
data = json.load(json_file) |
|
|
|
# Flatten the JSON data into a single string |
|
text = json.dumps(data, ensure_ascii=False) |
|
|
|
# Normalize whitespace and clean up text |
|
text = re.sub(r'\s+', ' ', text).strip() |
|
|
|
# Split text into chunks by sentences, respecting a maximum chunk size |
|
sentences = re.split(r'(?<=[.!?]) +', text) # split on spaces following sentence-ending punctuation |
|
chunks = [] |
|
current_chunk = "" |
|
for sentence in sentences: |
|
# Check if the current sentence plus the current chunk exceeds the limit |
|
if len(current_chunk) + len(sentence) + 1 < 1000: # +1 for the space |
|
current_chunk += (sentence + " ").strip() |
|
else: |
|
# When the chunk exceeds 1000 characters, store it and start a new one |
|
chunks.append(current_chunk) |
|
current_chunk = sentence + " " |
|
if current_chunk: # Don't forget the last chunk! |
|
chunks.append(current_chunk) |
|
|
|
# Clear temp.txt and write the new content |
|
with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file: |
|
temp_file.write(output_file_name + "\n") # Write the output file name as the first line |
|
for chunk in chunks: |
|
# Write each chunk to its own line |
|
temp_file.write(chunk.strip() + "\n") # Each chunk on a new line |
|
|
|
with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file: |
|
vault_file.write("\n") # Add a new line to separate content |
|
for chunk in chunks: |
|
# Write each chunk to its own line |
|
vault_file.write(chunk.strip() + "\n") # Two newlines to separate chunks |
|
|
|
if not os.path.exists(file_output_path): |
|
with open(file_output_path, "w", encoding="utf-8") as f: |
|
for chunk in chunks: |
|
f.write(chunk.strip() + "\n") # Each chunk on a new line |
|
f.write("====================NOT FINISHED====================\n") |
|
print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.") |
|
else: |
|
print(f"File '{file_output_path}' already exists.") |
|
|
|
|
|
|
|
print(f"JSON file content appended to vault.txt with each chunk on a separate line.") |
|
|
|
input_value = input("Enter your question:") |
|
process_text_files(input_value) |
|
|
|
def summarize(): |
|
summary_window = tk.Toplevel(root) |
|
summary_window.title("Text Summarizer") |
|
summary_window.geometry("400x200") |
|
|
|
# Create a label for the window |
|
label = tk.Label(summary_window, text="Choose an option to summarize text:") |
|
label.pack(pady=10) |
|
|
|
# Create two buttons: one for uploading a .txt file, and one for pasting text directly |
|
upload_button = tk.Button(summary_window, text="Upload from .txt File", command=summarize_from_file) |
|
upload_button.pack(pady=5) |
|
|
|
paste_button = tk.Button(summary_window, text="Paste your text", command=lambda: open_paste_window(summary_window)) |
|
paste_button.pack(pady=5) |
|
|
|
# Function to upload a .txt file and summarize |
|
def summarize_from_file(): |
|
file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")]) |
|
if file_path: |
|
# Define the base directory where the file will be saved |
|
base_directory = os.path.join("local-rag", "text_sum") |
|
|
|
file_name = os.path.basename(file_path) |
|
|
|
# Create the directory if it doesn't exist |
|
if not os.path.exists(base_directory): |
|
os.makedirs(base_directory) |
|
print(f"Directory '{base_directory}' created.") |
|
|
|
summary_content = [] |
|
if os.path.exists(file_name): |
|
with open(file_name, "r", encoding='utf-8') as sum_file: |
|
summary_content = sum_file.readlines() |
|
|
|
summary_embeddings = [] |
|
for content in summary_content: |
|
response = ollama.embeddings(model='mxbai-embed-large', prompt=content) |
|
summary_embeddings.append(response["embedding"]) |
|
|
|
summary_embeddings_tensor = torch.tensor(summary_embeddings) |
|
print("Embeddings for each line in the vault:") |
|
print(summary_embeddings_tensor) |
|
|
|
conversation_history = [] |
|
system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document" |
|
user_input = "Summarize this paragraph" |
|
|
|
response = ollama_chat(user_input, system_message, summary_embeddings_tensor, summary_content, args.model, conversation_history) |
|
|
|
messagebox.showinfo("Summary", response) # Replace with actual summarizing logic |
|
else: |
|
messagebox.showerror("Error", "No file selected!") |
|
|
|
# Function to open a window for pasting text and summarizing |
|
def open_paste_window(parent_window): |
|
# Create a new window for pasting text |
|
paste_window = tk.Toplevel(parent_window) |
|
paste_window.title("Paste Your Text") |
|
paste_window.geometry("400x300") |
|
|
|
# Create a label and text box for the pasted text |
|
label = tk.Label(paste_window, text="Paste your text below:") |
|
label.pack(pady=5) |
|
|
|
input_textbox = tk.Text(paste_window, height=8, width=40) |
|
input_textbox.pack(pady=5) |
|
|
|
# Function to handle the "Submit" button click |
|
def submit_text(): |
|
pasted_text = input_textbox.get("1.0", tk.END).strip() |
|
if pasted_text: |
|
|
|
system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document" |
|
user_input = "Summarize this paragraph:" |
|
new_value = user_input + pasted_text |
|
messages = [ |
|
{ |
|
"system", |
|
system_message, |
|
}, |
|
{"human", new_value}, |
|
] |
|
response = client.chat.completions.create(model=args.model, messages=messages) |
|
|
|
response_value = response.choices[0].message.content |
|
|
|
|
|
messagebox.showinfo("Summary", response_value) # Replace with actual summarizing logic |
|
paste_window.destroy() # Close the window |
|
else: |
|
messagebox.showerror("Error", "No text entered!") |
|
|
|
# Add Submit and Cancel buttons |
|
submit_button = tk.Button(paste_window, text="Submit", command=submit_text) |
|
submit_button.pack(side=tk.LEFT, padx=10, pady=10) |
|
|
|
cancel_button = tk.Button(paste_window, text="Cancel", command=paste_window.destroy) |
|
cancel_button.pack(side=tk.RIGHT, padx=10, pady=10) |
|
|
|
|
|
# Function to get relevant context from the vault based on user input |
|
def get_relevant_context(rewritten_input, vault_embeddings, vault_content, top_k=3): |
|
if vault_embeddings.nelement() == 0: |
|
return [] |
|
input_embedding = ollama.embeddings(model='mxbai-embed-large', prompt=rewritten_input)["embedding"] |
|
cos_scores = torch.cosine_similarity(torch.tensor(input_embedding).unsqueeze(0), vault_embeddings) |
|
top_k = min(top_k, len(cos_scores)) |
|
top_indices = torch.topk(cos_scores, k=top_k)[1].tolist() |
|
relevant_context = [vault_content[idx].strip() for idx in top_indices] |
|
return relevant_context |
|
|
|
# Function to interact with the Ollama model |
|
def ollama_chat(user_input, system_message, vault_embeddings, vault_content, ollama_model, conversation_history): |
|
relevant_context = get_relevant_context(user_input, vault_embeddings, vault_content, top_k=3) |
|
if relevant_context: |
|
context_str = "\n".join(relevant_context) |
|
print("Context Pulled from Documents: \n\n" + CYAN + context_str + RESET_COLOR) |
|
else: |
|
print(CYAN + "No relevant context found." + RESET_COLOR) |
|
|
|
user_input_with_context = user_input |
|
if relevant_context: |
|
user_input_with_context = context_str + "\n\n" + user_input |
|
|
|
conversation_history.append({"role": "user", "content": user_input_with_context}) |
|
messages = [{"role": "system", "content": system_message}, *conversation_history] |
|
|
|
response = client.chat.completions.create(model=ollama_model, messages=messages) |
|
conversation_history.append({"role": "assistant", "content": response.choices[0].message.content}) |
|
|
|
return response.choices[0].message.content |
|
|
|
# Function to process text files, check for NOT FINISHED flag, and compute embeddings |
|
def process_text_files(user_input): |
|
text_parse_directory = os.path.join("local-rag", "text_parse") |
|
temp_file_path = os.path.join("local-rag", "temp.txt") |
|
|
|
if not os.path.exists(text_parse_directory): |
|
print(f"Directory '{text_parse_directory}' does not exist.") |
|
return False |
|
|
|
if not os.path.exists(temp_file_path): |
|
print("temp.txt does not exist.") |
|
return False |
|
|
|
with open(temp_file_path, 'r', encoding='utf-8') as temp_file: |
|
first_line = temp_file.readline().strip() |
|
|
|
text_files = [f for f in os.listdir(text_parse_directory) if f.endswith('.txt')] |
|
|
|
if f"{first_line}" not in text_files: |
|
print(f"No matching file found for '{first_line}.txt' in text_parse directory.") |
|
return False |
|
|
|
file_path = os.path.join(text_parse_directory, f"{first_line}") |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
lines = f.readlines() |
|
|
|
lines = [line.strip() for line in lines] |
|
|
|
if len(lines) >= 2 and lines[-1] == "====================NOT FINISHED====================": |
|
print(f"'{first_line}' contains the 'NOT FINISHED' flag. Computing embeddings.") |
|
|
|
vault_content = [] |
|
if os.path.exists(temp_file_path): |
|
with open(temp_file_path, "r", encoding='utf-8') as vault_file: |
|
vault_content = vault_file.readlines() |
|
|
|
vault_embeddings = [] |
|
for content in vault_content: |
|
response = ollama.embeddings(model='mxbai-embed-large', prompt=content) |
|
vault_embeddings.append(response["embedding"]) |
|
|
|
vault_embeddings_tensor = torch.tensor(vault_embeddings) |
|
print("Embeddings for each line in the vault:") |
|
print(vault_embeddings_tensor) |
|
|
|
with open(os.path.join(text_parse_directory, f"{first_line}_embedding.pt"), "wb") as tensor_file: |
|
torch.save(vault_embeddings_tensor, tensor_file) |
|
|
|
with open(file_path, 'w', encoding='utf-8') as f: |
|
f.writelines(lines[:-1]) |
|
|
|
else: |
|
print(f"'{first_line}' does not contain the 'NOT FINISHED' flag or is already complete. Loading tensor if it exists.") |
|
|
|
tensor_file_path = os.path.join(text_parse_directory, f"{first_line}_embedding.pt") |
|
if os.path.exists(tensor_file_path): |
|
vault_embeddings_tensor = torch.load(tensor_file_path) |
|
print("Loaded Vault Embedding Tensor:") |
|
print(vault_embeddings_tensor) |
|
|
|
vault_content = [] |
|
if os.path.exists(temp_file_path): |
|
with open(temp_file_path, "r", encoding='utf-8') as vault_file: |
|
vault_content = vault_file.readlines() |
|
|
|
conversation_history = [] |
|
system_message = "You are a helpful assistant that is an expert at extracting the most useful information from a given text" |
|
response = ollama_chat(user_input, system_message, vault_embeddings_tensor, vault_content, args.model, conversation_history) |
|
|
|
print (response) |
|
|
|
return response |
|
|
|
# Create the main window |
|
root = tk.Tk() |
|
root.title("Upload .pdf, .txt, or .json") |
|
|
|
# Create a button to open the file dialog for PDF |
|
pdf_button = tk.Button(root, text="Upload PDF", command=convert_pdf_to_text) |
|
pdf_button.pack(pady=15) |
|
|
|
# Create a button to open the file dialog for text file |
|
txt_button = tk.Button(root, text="Upload Text File", command=upload_txtfile) |
|
txt_button.pack(pady=15) |
|
|
|
# Create a button to open the file dialog for JSON file |
|
json_button = tk.Button(root, text="Upload JSON File", command=upload_jsonfile) |
|
json_button.pack(pady=15) |
|
|
|
# Create a button to open the summerizer |
|
json_button = tk.Button(root, text="Summarize This!", command=summarize) |
|
json_button.pack(pady=15) |
|
|
|
# Configuration for the Ollama API client |
|
client = OpenAI(base_url='http://localhost:11434/v1', api_key='llama3') |
|
|
|
# Parse command-line arguments |
|
parser = argparse.ArgumentParser(description="Ollama Chat") |
|
parser.add_argument("--model", default="llama3", help="Ollama model to use (default: llama3)") |
|
args = parser.parse_args() |
|
|
|
# Run the main event loop |
|
root.mainloop() |
|
|