import os
from datetime import datetime
from typing import Any, List, Tuple, Union

import numpy as np
import pandas as pd
import PyPDF2
from openai import OpenAI
from together import Together


def current_year() -> int:
    """Return the current calendar year."""
    return datetime.now().year


def read_and_textify(
    files: List[Any], chunk_size: int = 2
) -> Tuple[List[str], List[str]]:
    """
    Reads PDF files and extracts text from each page, breaking the text into
    fixed-size segments of consecutive sentences.

    This function iterates over a list of uploaded PDF files, extracts the text
    from each page, and compiles a list of text segments and corresponding
    source labels. Each segment contains up to 'chunk_size' sentences.

    Args:
        files (List[Any]): A list of uploaded PDF files; each element must be a
            binary file-like object with a 'name' attribute (e.g. Streamlit's
            UploadedFile).
        chunk_size (int): The number of sentences per text segment. Default is 2.

    Returns:
        Tuple[List[str], List[str]]: A tuple containing two lists:
            1. A list of strings, where each string is a segment of text
               extracted from a PDF page.
            2. A list of strings identifying the source of each segment
               (file name, page number, and segment number).
    """
    text_list = []
    sources_list = []

    for file in files:
        pdfReader = PyPDF2.PdfReader(file)
        for i in range(len(pdfReader.pages)):
            pageObj = pdfReader.pages[i]
            text = pageObj.extract_text()
            if text:
                # Split the page text into sentences and group them into
                # non-overlapping chunks of 'chunk_size' sentences each.
                sentences = text.split(". ")
                for j in range(0, len(sentences), chunk_size):
                    chunk = ". ".join(sentences[j : j + chunk_size]) + "."
                    text_list.append(chunk)
                    sources_list.append(
                        f"{file.name}_page_{i}_chunk_{j // chunk_size}"
                    )
            else:
                # Keep an empty placeholder so every page yields an entry.
                text_list.append("")
                sources_list.append(f"{file.name}_page_{i}_chunk_0")
            # Release the parsed page contents to keep memory use down.
            pageObj.clear()

    return text_list, sources_list
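
# Usage sketch (illustrative; "report.pdf" is a hypothetical file). Any binary
# file-like object with a 'name' attribute works here, e.g. Streamlit's
# UploadedFile:
#
#   with open("report.pdf", "rb") as f:
#       texts, sources = read_and_textify([f], chunk_size=2)
#   # sources[0] -> "report.pdf_page_0_chunk_0"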


def read_and_textify_advanced(
    files: List[Any], chunk_size: int = 2
) -> Tuple[List[str], List[str]]:
    """
    Reads PDF files and extracts text from each page, breaking the text into
    overlapping sliding-window segments centered on each sentence.

    This function iterates over a list of uploaded PDF files, extracts the text
    from each page, and compiles a list of text segments and corresponding
    source labels. For every sentence on a page it emits one segment containing
    that sentence plus up to 'chunk_size' sentences of context on each side, so
    consecutive segments overlap.

    Args:
        files (List[Any]): A list of uploaded PDF files; each element must be a
            binary file-like object with a 'name' attribute (e.g. Streamlit's
            UploadedFile).
        chunk_size (int): The number of context sentences on each side of the
            center sentence. Default is 2.

    Returns:
        Tuple[List[str], List[str]]: A tuple containing two lists:
            1. A list of strings, where each string is a segment of text
               extracted from a PDF page.
            2. A list of strings identifying the source of each segment
               (file name, page number, and segment number).
    """
    text_list = []
    sources_list = []

    for file in files:
        pdfReader = PyPDF2.PdfReader(file)
        for i in range(len(pdfReader.pages)):
            pageObj = pdfReader.pages[i]
            text = pageObj.extract_text()
            if text:
                sentences = text.split(". ")
                for j in range(len(sentences)):
                    # Sliding window: 'chunk_size' sentences of context on
                    # each side of sentence j, clipped at the page boundaries.
                    start = max(0, j - chunk_size)
                    end = min(len(sentences), j + chunk_size + 1)
                    chunk = ". ".join(sentences[start:end]) + "."
                    text_list.append(chunk)
                    sources_list.append(f"{file.name}_page_{i}_chunk_{j}")
            else:
                # Keep an empty placeholder so every page yields an entry.
                text_list.append("")
                sources_list.append(f"{file.name}_page_{i}_chunk_0")
            # Release the parsed page contents to keep memory use down.
            pageObj.clear()

    return text_list, sources_list
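
# Difference between the two chunkers on a page whose extracted text is
# "A. B. C. D" (illustrative):
#   read_and_textify          -> ["A. B.", "C. D."]            (disjoint pairs)
#   read_and_textify_advanced -> ["A. B. C.", "A. B. C. D.",
#                                 "A. B. C. D.", "B. C. D."]   (one window per
#                                 sentence, chunk_size=2 context on each side)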


# Requires the OPENAI_API_KEY environment variable to be set.
openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])


def list_to_nums(sentences: List[str]) -> List[List[float]]:
    """
    Converts a list of sentences into a list of numerical embeddings using
    OpenAI's embedding model.

    Args:
        sentences (List[str]): A list of sentences (strings).

    Returns:
        List[List[float]]: A list of numerical embeddings, one per sentence.
    """
    embeddings = []
    for sentence in sentences:
        # One API call per sentence; "text-embedding-3-small" returns a single
        # embedding vector for each input string.
        response = openai_client.embeddings.create(
            input=sentence, model="text-embedding-3-small"
        )
        embeddings.append(response.data[0].embedding)
    return embeddings
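
# Usage sketch (requires a valid OPENAI_API_KEY; makes live API calls):
#
#   vectors = list_to_nums(["The sky is blue.", "Water is wet."])
#   # len(vectors) == 2; each entry is a list of floats
#   # (1536 dimensions for text-embedding-3-small)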


def call_gpt(prompt: str, content: str) -> str:
    """
    Sends a structured conversation context, including a system prompt, a user
    prompt, and additional background content, to the GPT-3.5-turbo model and
    returns its response.

    This function generates an AI-powered response by interacting with the
    OpenAI API. It assembles a preset system message, a formatted user query,
    and the background information before requesting the completion from the
    model.

    Args:
        prompt (str): The main question or topic that the user wants to address.
        content (str): Additional background information or details relevant to the prompt.

    Returns:
        str: The generated response from the GPT model based on the given prompt and content.

    Note: 'openai_client' is assumed to be an already created and authenticated
    OpenAI client instance, set up prior to calling this function.
    """
    response = openai_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"I want to ask you a question: {prompt}"},
            {"role": "assistant", "content": "What is the background content?"},
            {"role": "user", "content": content},
        ],
    )
    return response.choices[0].message.content
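
# Usage sketch (requires a valid OPENAI_API_KEY; makes a live API call):
#
#   answer = call_gpt(
#       "What was the revenue trend?",
#       "The report concludes that revenue grew 12% year over year.",
#   )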


# Requires the TOGETHER_API_KEY environment variable to be set.
together_client = Together(api_key=os.environ["TOGETHER_API_KEY"])


def call_llama(prompt: str) -> str:
    """
    Sends a prompt to the Llama model via the Together API and returns the response.

    Args:
        prompt (str): The input prompt to send to the Llama model.

    Returns:
        str: The response from the Llama model.
    """
    response = together_client.chat.completions.create(
        model="meta-llama/Llama-3-8b-chat-hf",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
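
# Usage sketch (requires a valid TOGETHER_API_KEY; makes a live API call):
#
#   reply = call_llama("Summarize the benefits of vector search in one sentence.")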


def quantize_to_kbit(arr: Union[np.ndarray, Any], k: int = 16) -> np.ndarray:
    """Converts an array to a k-level representation by normalizing and scaling its values.

    Args:
        arr (Union[np.ndarray, Any]): The input array to be quantized.
        k (int): The number of quantization levels. Defaults to 16, i.e. 4-bit quantization.

    Returns:
        np.ndarray: The quantized array with integer values in the range 0 to k-1.
    """
    if not isinstance(arr, np.ndarray):
        arr = np.array(arr)
    arr_min = arr.min()
    arr_max = arr.max()
    if arr_max == arr_min:
        # A constant array has no spread to quantize; map it to level 0
        # rather than dividing by zero.
        return np.zeros(arr.shape, dtype=int)
    # Min-max normalize to [0, 1], then scale to the k discrete levels.
    normalized_arr = (arr - arr_min) / (arr_max - arr_min)
    return np.round(normalized_arr * (k - 1)).astype(int)
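
# Worked example (pure function, safe to run):
#
#   quantize_to_kbit([0.0, 0.5, 1.0], k=4)  # -> array([0, 2, 3])
#
# 0.5 normalizes to 0.5 and scales to 1.5; np.round's half-to-even rule
# then maps it to level 2.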


def quantized_influence(
    arr1: np.ndarray, arr2: np.ndarray, k: int = 16, use_dagger: bool = False
) -> Union[float, Tuple[float, List[float]]]:
    """
    Calculates a weighted measure of influence based on quantized versions of
    the input arrays and optionally returns values transformed to local averages.

    Args:
        arr1 (np.ndarray): First input array; quantized and used to group samples.
        arr2 (np.ndarray): Second input array; quantized and used for the influence measurement.
        k (int): The quantization level, defaults to 16 for 4-bit quantization.
        use_dagger (bool): Flag to also return arr1's values mapped to local averages of arr2, defaults to False.

    Returns:
        Union[float, Tuple[float, List[float]]]: The quantized influence measure,
        together with the list of locally averaged values when 'use_dagger' is True.
    """
    arr1_quantized = quantize_to_kbit(arr1, k)
    arr2_quantized = quantize_to_kbit(arr2, k)

    unique_values = np.unique(arr1_quantized)

    total_samples = len(arr2_quantized)
    y_bar_global = np.mean(arr2_quantized)

    # For each quantization level of arr1, weight the squared deviation of the
    # local mean of arr2 from its global mean by the squared group size.
    weighted_local_averages = [
        (np.mean(arr2_quantized[arr1_quantized == val]) - y_bar_global) ** 2
        * len(arr2_quantized[arr1_quantized == val]) ** 2
        for val in unique_values
    ]
    qim = np.sum(weighted_local_averages) / (total_samples * np.std(arr2_quantized))

    if use_dagger:
        # Map each quantized arr1 value to the local mean of arr2 in its group.
        local_estimates = [
            np.mean(arr2_quantized[arr1_quantized == val]) for val in unique_values
        ]
        daggers = {unique_values[i]: v for i, v in enumerate(local_estimates)}
        daggered_values = [daggers[val] for val in arr1_quantized]
        return qim, daggered_values
    return qim
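
# Worked example (pure function, safe to run): two perfectly aligned binary
# arrays score 1.0.
#
#   quantized_influence(np.array([0, 0, 1, 1]), np.array([0, 0, 1, 1]), k=2)
#   # -> 1.0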


def query_search(
    prompt: str,
    sentences: List[str],
    query_database: List[List[float]],
    sources: List[str],
    levels: int,
) -> pd.DataFrame:
    """
    Takes a text prompt and searches a predefined database by converting the
    prompt to an embedding and scoring every database entry with the quantized
    influence metric.

    Args:
        prompt (str): A text prompt to search for in the database.
        sentences (List[str]): The text segments corresponding to the database entries.
        query_database (List[List[float]]): Precomputed embeddings, one per segment.
        sources (List[str]): Source labels for the segments (file name, page, chunk).
        levels (int): The number of quantization levels used when scoring.

    Returns:
        pd.DataFrame: A DataFrame containing the segments, their source labels,
        and the computed scores, sorted by the quantized influence metric in
        descending order.
    """
    # Embed the prompt with the same model used to build the database.
    prompt_embed_ = list_to_nums([prompt])

    # Score each stored embedding against the prompt embedding.
    scores = [
        [
            sentences[i],
            sources[i],
            quantized_influence(
                prompt_embed_[0], query_database[i], k=levels, use_dagger=False
            ),
        ]
        for i in range(len(query_database))
    ]

    refs = pd.DataFrame(scores)
    # The "page no" column holds the full source label
    # (file name, page number, and chunk number).
    refs = refs.rename(columns={0: "sentences", 1: "page no", 2: "qim"})
    refs = refs.sort_values(by="qim", ascending=False)
    return refs
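
# End-to-end usage sketch, assuming a hypothetical "example.pdf" on disk and
# valid OPENAI_API_KEY / TOGETHER_API_KEY environment variables:
#
#   if __name__ == "__main__":
#       with open("example.pdf", "rb") as f:
#           texts, sources = read_and_textify([f], chunk_size=2)
#       database = list_to_nums(texts)  # embed every chunk
#       results = query_search(
#           "What is this document about?", texts, database, sources, levels=2
#       )
#       context = " ".join(results["sentences"].head(3))  # top-ranked chunks
#       print(call_gpt("What is this document about?", context))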