LanceY2004's picture
Update README.md
bbd6a4c verified
|
raw
history blame
20.5 kB
metadata
license: mit
language:
  - en
base_model:
  - meta-llama/Llama-3.1-8B
pipeline_tag: reinforcement-learning

import argparse
import json
import os
import re
import tkinter as tk
from tkinter import filedialog, messagebox

import ollama
import PyPDF2
import torch
from openai import OpenAI

# ANSI escape codes for colors

# ANSI SGR escape sequences used to colorize console output.
# RESET_COLOR restores the terminal's default attributes.
PINK = '\033[95m'
CYAN = '\033[96m'
YELLOW = '\033[93m'
NEON_GREEN = '\033[92m'
RESET_COLOR = '\033[0m'

# Function to open a file and return its contents as a string

def open_file(filepath):
    """Return the full contents of *filepath* as a string.

    The file is opened in text mode and decoded as UTF-8.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file's contents as a single ``str``.
    """
    with open(filepath, 'r', encoding='utf-8') as infile:
        return infile.read()

# Function to convert PDF to text and append to vault.txt

# Marker appended to a freshly parsed file; process_text_files() looks for this
# exact line to decide whether embeddings still have to be computed.
_NOT_FINISHED_FLAG = "====================NOT FINISHED===================="


def chunk_text(text, max_chars=1000):
    """Split *text* into sentence-aligned chunks shorter than *max_chars*.

    Whitespace runs are collapsed to single spaces, the text is split after
    sentence-ending punctuation (``.``, ``!``, ``?``) followed by spaces, and
    consecutive sentences are joined with a single space while the joined
    chunk stays below *max_chars* characters.

    Args:
        text: Raw text to split.
        max_chars: Upper bound (exclusive) on the length of a chunk. A single
            sentence that is longer than this is kept whole as its own chunk.

    Returns:
        A list of non-empty chunk strings; empty if *text* is blank.
    """
    normalized = re.sub(r'\s+', ' ', text).strip()
    if not normalized:
        return []

    chunks = []
    current = []      # sentences collected for the chunk being built
    current_len = 0   # length of " ".join(current)
    for sentence in re.split(r'(?<=[.!?]) +', normalized):
        # +1 accounts for the space that would join the sentence to the chunk.
        if current and current_len + 1 + len(sentence) >= max_chars:
            chunks.append(" ".join(current))
            current, current_len = [], 0
        current_len += len(sentence) + (1 if current else 0)
        current.append(sentence)
    if current:
        chunks.append(" ".join(current))
    return chunks


def _ingest_text(source_path, text, source_label):
    """Chunk *text* and record it in temp.txt, vault.txt and text_parse/.

    Args:
        source_path: Path of the uploaded file; only its base name is used,
            to derive the ``<name>.txt`` output file name.
        text: Text extracted from the uploaded file.
        source_label: Prefix for the final console message
            (e.g. ``"PDF"``, ``"Text file"``, ``"JSON file"``).
    """
    base_directory = os.path.join("local-rag", "text_parse")
    output_file_name = os.path.splitext(os.path.basename(source_path))[0] + ".txt"
    file_output_path = os.path.join(base_directory, output_file_name)

    if not os.path.exists(base_directory):
        os.makedirs(base_directory)
        print(f"Directory '{base_directory}' created.")

    chunk_lines = [chunk + "\n" for chunk in chunk_text(text)]

    # temp.txt is overwritten: first line is the output file name, followed by
    # one chunk per line.
    with open(os.path.join("local-rag", "temp.txt"), "w", encoding="utf-8") as temp_file:
        temp_file.write(output_file_name + "\n")
        temp_file.writelines(chunk_lines)

    # vault.txt accumulates every upload; a blank line separates uploads.
    with open(os.path.join("local-rag", "vault.txt"), "a", encoding="utf-8") as vault_file:
        vault_file.write("\n")
        vault_file.writelines(chunk_lines)

    # The per-document file is only created once; the trailing flag tells
    # process_text_files() that embeddings have not been computed yet.
    if not os.path.exists(file_output_path):
        with open(file_output_path, "w", encoding="utf-8") as f:
            f.writelines(chunk_lines)
            f.write(_NOT_FINISHED_FLAG + "\n")
        print(f"File '{file_output_path}' created with NOT FINISHED flag at the end.")
    else:
        print(f"File '{file_output_path}' already exists.")

    print(f"{source_label} content appended to vault.txt with each chunk on a separate line.")


def _prompt_and_answer():
    """Read a question from stdin and answer it via process_text_files()."""
    input_value = input("Enter your question:")
    process_text_files(input_value)


def convert_pdf_to_text():
    """Let the user pick a PDF, ingest its text, then ask a question about it.

    Text is extracted page by page with PyPDF2 (pages that yield no text are
    skipped), chunked, written to temp.txt / vault.txt / text_parse, and the
    user is then prompted on the console for a question.
    """
    file_path = filedialog.askopenfilename(filetypes=[("PDF Files", "*.pdf")])
    if not file_path:
        print("No file selected.")
        return

    with open(file_path, 'rb') as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        # Extract inside the `with` so the reader still has an open stream.
        page_texts = [page.extract_text() for page in pdf_reader.pages]
    text = " ".join(page_text for page_text in page_texts if page_text)

    _ingest_text(file_path, text, "PDF")
    _prompt_and_answer()


# Function to upload a text file and append to vault.txt
def upload_txtfile():
    """Let the user pick a .txt file, ingest it, then ask a question about it."""
    file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
    if not file_path:
        print("No file selected.")
        return

    with open(file_path, 'r', encoding="utf-8") as txt_file:
        text = txt_file.read()

    _ingest_text(file_path, text, "Text file")
    _prompt_and_answer()


# Function to upload a JSON file and append to vault.txt
def upload_jsonfile():
    """Let the user pick a .json file, ingest it, then ask a question about it.

    The parsed JSON document is re-serialized to a single string (non-ASCII
    characters kept as-is) and that string is chunked like plain text.
    """
    file_path = filedialog.askopenfilename(filetypes=[("JSON Files", "*.json")])
    if not file_path:
        print("No file selected.")
        return

    with open(file_path, 'r', encoding="utf-8") as json_file:
        data = json.load(json_file)
    text = json.dumps(data, ensure_ascii=False)

    _ingest_text(file_path, text, "JSON file")
    _prompt_and_answer()

def summarize():
    """Open the "Text Summarizer" window with the two summarization options.

    The window is a child of the main ``root`` window and offers one button
    to summarize an uploaded .txt file and one to summarize pasted text.
    """
    summary_window = tk.Toplevel(root)
    summary_window.title("Text Summarizer")
    summary_window.geometry("400x200")

    # Create a label for the window
    label = tk.Label(summary_window, text="Choose an option to summarize text:")
    label.pack(pady=10)

    # Create two buttons: one for uploading a .txt file, and one for pasting text directly
    upload_button = tk.Button(summary_window, text="Upload from .txt File", command=summarize_from_file)
    upload_button.pack(pady=5)

    paste_button = tk.Button(
        summary_window,
        text="Paste your text",
        command=lambda: open_paste_window(summary_window),
    )
    paste_button.pack(pady=5)

# Function to upload a .txt file and summarize

def summarize_from_file():
    """Let the user pick a .txt file and show a model-generated summary of it.

    Each non-blank line of the file is embedded with ``mxbai-embed-large``;
    the embeddings and lines are handed to ``ollama_chat`` together with a
    fixed "Summarize this paragraph" request, and the reply is shown in a
    message box. An error box is shown when no file is chosen or the file
    contains no text.
    """
    file_path = filedialog.askopenfilename(filetypes=[("Text Files", "*.txt")])
    if not file_path:
        messagebox.showerror("Error", "No file selected!")
        return

    # Define the base directory for summaries and create it if it doesn't exist
    base_directory = os.path.join("local-rag", "text_sum")
    if not os.path.exists(base_directory):
        os.makedirs(base_directory)
        print(f"Directory '{base_directory}' created.")

    # Read the file the user actually selected (the full path, not just its
    # base name, which would be resolved against the working directory).
    with open(file_path, "r", encoding='utf-8') as sum_file:
        summary_content = [line for line in sum_file if line.strip()]

    if not summary_content:
        messagebox.showerror("Error", "The selected file is empty!")
        return

    summary_embeddings = [
        ollama.embeddings(model='mxbai-embed-large', prompt=content)["embedding"]
        for content in summary_content
    ]

    summary_embeddings_tensor = torch.tensor(summary_embeddings)
    print("Embeddings for each line in the vault:")
    print(summary_embeddings_tensor)

    conversation_history = []
    system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
    user_input = "Summarize this paragraph"

    response = ollama_chat(
        user_input,
        system_message,
        summary_embeddings_tensor,
        summary_content,
        args.model,
        conversation_history,
    )

    messagebox.showinfo("Summary", response)

# Function to open a window for pasting text and summarizing

def open_paste_window(parent_window):
    """Open a window where the user can paste text and request a summary.

    Args:
        parent_window: The Tk window that owns the new paste window.
    """
    # Create a new window for pasting text
    paste_window = tk.Toplevel(parent_window)
    paste_window.title("Paste Your Text")
    paste_window.geometry("400x300")

    # Create a label and text box for the pasted text
    label = tk.Label(paste_window, text="Paste your text below:")
    label.pack(pady=5)

    input_textbox = tk.Text(paste_window, height=8, width=40)
    input_textbox.pack(pady=5)

    # Function to handle the "Submit" button click
    def submit_text():
        """Send the pasted text to the chat model and display the summary."""
        pasted_text = input_textbox.get("1.0", tk.END).strip()
        if not pasted_text:
            messagebox.showerror("Error", "No text entered!")
            return

        system_message = "You are a helpful assistant that is an expert at summarizing the text from a given document"
        user_input = "Summarize this paragraph:"
        # Chat messages must be dicts with "role" and "content" keys; the
        # end-user role in the OpenAI-compatible API is "user".
        messages = [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_input + "\n\n" + pasted_text},
        ]
        response = client.chat.completions.create(model=args.model, messages=messages)
        response_value = response.choices[0].message.content

        messagebox.showinfo("Summary", response_value)
        paste_window.destroy()  # Close the window

    # Add Submit and Cancel buttons
    submit_button = tk.Button(paste_window, text="Submit", command=submit_text)
    submit_button.pack(side=tk.LEFT, padx=10, pady=10)

    cancel_button = tk.Button(paste_window, text="Cancel", command=paste_window.destroy)
    cancel_button.pack(side=tk.RIGHT, padx=10, pady=10)

# Function to get relevant context from the vault based on user input

def get_relevant_context(rewritten_input, vault_embeddings, vault_content, top_k=3):
    """Return the vault lines most similar to *rewritten_input*.

    The input is embedded with ``mxbai-embed-large`` and compared by cosine
    similarity against every row of *vault_embeddings*.

    Args:
        rewritten_input: Query text to embed.
        vault_embeddings: Tensor of embeddings; row ``i`` is expected to
            correspond to ``vault_content[i]``.
        vault_content: Sequence of text lines, indexed by embedding row.
        top_k: Maximum number of lines to return.

    Returns:
        A list of up to *top_k* stripped lines, best match first, or an
        empty list when *vault_embeddings* has no elements.
    """
    if vault_embeddings.nelement() == 0:
        return []

    input_embedding = ollama.embeddings(model='mxbai-embed-large', prompt=rewritten_input)["embedding"]
    cos_scores = torch.cosine_similarity(torch.tensor(input_embedding).unsqueeze(0), vault_embeddings)

    # Never ask topk for more entries than there are scores.
    top_k = min(top_k, len(cos_scores))
    top_indices = torch.topk(cos_scores, k=top_k)[1].tolist()
    return [vault_content[idx].strip() for idx in top_indices]

# Function to interact with the Ollama model

def ollama_chat(user_input, system_message, vault_embeddings, vault_content, ollama_model, conversation_history):
    """Answer *user_input* with the chat model, prepending retrieved context.

    Up to three relevant lines are pulled from the vault and placed before
    the user's text. Both the user turn and the assistant's reply are
    appended to *conversation_history*, which is modified in place.

    Args:
        user_input: The user's question or instruction.
        system_message: System prompt sent as the first message.
        vault_embeddings: Embedding tensor passed to get_relevant_context.
        vault_content: Text lines passed to get_relevant_context.
        ollama_model: Model name passed to the chat-completions endpoint.
        conversation_history: List of prior ``{"role", "content"}`` messages.

    Returns:
        The assistant's reply text.
    """
    relevant_context = get_relevant_context(user_input, vault_embeddings, vault_content, top_k=3)

    if relevant_context:
        context_str = "\n".join(relevant_context)
        print("Context Pulled from Documents: \n\n" + CYAN + context_str + RESET_COLOR)
        user_input_with_context = context_str + "\n\n" + user_input
    else:
        print(CYAN + "No relevant context found." + RESET_COLOR)
        user_input_with_context = user_input

    conversation_history.append({"role": "user", "content": user_input_with_context})
    messages = [{"role": "system", "content": system_message}, *conversation_history]

    response = client.chat.completions.create(model=ollama_model, messages=messages)
    reply = response.choices[0].message.content
    conversation_history.append({"role": "assistant", "content": reply})

    return reply

# Function to process text files, check for NOT FINISHED flag, and compute embeddings

def process_text_files(user_input):
    """Answer *user_input* using the document most recently written to temp.txt.

    The first line of ``local-rag/temp.txt`` names a file in
    ``local-rag/text_parse``. If that file ends with the NOT FINISHED flag,
    or no usable saved tensor exists, every line of temp.txt is embedded, the
    tensor is saved next to the file as ``<name>_embedding.pt``, and the flag
    is removed. Otherwise the saved tensor is loaded. The question is then
    answered with ``ollama_chat``.

    Args:
        user_input: The user's question.

    Returns:
        The model's reply, or ``False`` when the text_parse directory,
        temp.txt, or the file named in temp.txt is missing.
    """
    not_finished_flag = "====================NOT FINISHED===================="
    text_parse_directory = os.path.join("local-rag", "text_parse")
    temp_file_path = os.path.join("local-rag", "temp.txt")

    if not os.path.exists(text_parse_directory):
        print(f"Directory '{text_parse_directory}' does not exist.")
        return False

    if not os.path.exists(temp_file_path):
        print("temp.txt does not exist.")
        return False

    # temp.txt holds the document name on its first line followed by the
    # chunks; all of its lines are used as the retrievable content.
    with open(temp_file_path, 'r', encoding='utf-8') as temp_file:
        vault_content = temp_file.readlines()
    first_line = vault_content[0].strip() if vault_content else ""

    text_files = [f for f in os.listdir(text_parse_directory) if f.endswith('.txt')]
    if first_line not in text_files:
        print(f"No matching file found for '{first_line}' in text_parse directory.")
        return False

    file_path = os.path.join(text_parse_directory, first_line)
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = [line.strip() for line in f]

    tensor_file_path = os.path.join(text_parse_directory, f"{first_line}_embedding.pt")
    needs_embedding = bool(lines) and lines[-1] == not_finished_flag

    vault_embeddings_tensor = None
    if needs_embedding:
        print(f"'{first_line}' contains the 'NOT FINISHED' flag. Computing embeddings.")
    else:
        print(f"'{first_line}' does not contain the 'NOT FINISHED' flag or is already complete. Loading tensor if it exists.")
        if os.path.exists(tensor_file_path):
            # NOTE(review): torch.load unpickles the file; it is only safe
            # because the .pt file is produced locally by this function.
            loaded = torch.load(tensor_file_path)
            # A saved tensor is only usable when it has one row per line of
            # temp.txt; otherwise retrieved indices would point at the wrong
            # (or a non-existent) line.
            if loaded.dim() > 0 and loaded.shape[0] == len(vault_content):
                vault_embeddings_tensor = loaded
                print("Loaded Vault Embedding Tensor:")
                print(vault_embeddings_tensor)
            else:
                print("Saved embeddings do not match temp.txt. Recomputing embeddings.")
        else:
            print("No saved embeddings found. Computing embeddings.")

    if vault_embeddings_tensor is None:
        vault_embeddings = [
            ollama.embeddings(model='mxbai-embed-large', prompt=content)["embedding"]
            for content in vault_content
        ]
        vault_embeddings_tensor = torch.tensor(vault_embeddings)
        print("Embeddings for each line in the vault:")
        print(vault_embeddings_tensor)

        with open(tensor_file_path, "wb") as tensor_file:
            torch.save(vault_embeddings_tensor, tensor_file)

        if needs_embedding:
            # Drop the flag line; the lines were stripped above, so the
            # newlines have to be put back when rewriting the file.
            with open(file_path, 'w', encoding='utf-8') as f:
                f.writelines(line + "\n" for line in lines[:-1])

    conversation_history = []
    system_message = "You are a helpful assistant that is an expert at extracting the most useful information from a given text"
    response = ollama_chat(user_input, system_message, vault_embeddings_tensor, vault_content, args.model, conversation_history)

    print(response)

    return response

# Parse command-line arguments first, so that `--help` or an invalid option
# exits before any window is created.
parser = argparse.ArgumentParser(description="Ollama Chat")
parser.add_argument("--model", default="llama3", help="Ollama model to use (default: llama3)")
args = parser.parse_args()

# Configuration for the Ollama API client (OpenAI-compatible endpoint on localhost)
client = OpenAI(base_url='http://localhost:11434/v1', api_key='llama3')

# Create the main window
root = tk.Tk()
root.title("Upload .pdf, .txt, or .json")

# Create a button to open the file dialog for PDF
pdf_button = tk.Button(root, text="Upload PDF", command=convert_pdf_to_text)
pdf_button.pack(pady=15)

# Create a button to open the file dialog for text file
txt_button = tk.Button(root, text="Upload Text File", command=upload_txtfile)
txt_button.pack(pady=15)

# Create a button to open the file dialog for JSON file
json_button = tk.Button(root, text="Upload JSON File", command=upload_jsonfile)
json_button.pack(pady=15)

# Create a button to open the summarizer
summarize_button = tk.Button(root, text="Summarize This!", command=summarize)
summarize_button.pack(pady=15)

# Run the main event loop
root.mainloop()