import os
from datetime import datetime
from typing import Any, Dict, List, Tuple, Union

import numpy as np
import pandas as pd
import PyPDF2
from openai import OpenAI


def current_year() -> int:
    """Return the current calendar year."""
    now = datetime.now()
    return now.year


def read_and_textify(
    files: List[Any], chunk_size: int = 50
) -> Tuple[List[str], List[str]]:
    """
    Reads PDF files and extracts text from each page, breaking the text into segments.

    This function iterates over a list of uploaded PDF files, extracts the text from each
    page, and compiles a list of text segments of roughly ``chunk_size`` words each,
    together with a parallel list of source labels.

    Args:
        files (List[Any]): A list of file-like PDF objects (e.g., Streamlit uploads or
            open binary file handles) that expose a ``.name`` attribute.
        chunk_size (int): The number of words per text segment. Defaults to 50.

    Returns:
        Tuple[List[str], List[str]]: A tuple containing two lists:
            1. A list of strings, where each string is a segment of text extracted from a PDF page.
            2. A list of strings identifying the source of each segment
               (file name, page number, and segment number).
    """
    text_list = []
    sources_list = []

    for file in files:
        pdfReader = PyPDF2.PdfReader(file)

        for i in range(len(pdfReader.pages)):
            pageObj = pdfReader.pages[i]
            text = pageObj.extract_text()
            if text:
                # Split the page text into chunks of roughly chunk_size words.
                words = text.split()
                for j in range(0, len(words), chunk_size):
                    chunk = " ".join(words[j : j + chunk_size])
                    text_list.append(chunk)
                    sources_list.append(f"{file.name}_page_{i}_chunk_{j // chunk_size}")
            else:
                # Keep an empty placeholder so every page is represented in the output.
                text_list.append("")
                sources_list.append(f"{file.name}_page_{i}_chunk_0")
            # Release the parsed page dictionary to keep memory usage down.
            pageObj.clear()

    return text_list, sources_list

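
# Example (sketch): chunk a local PDF. "paper.pdf" is a hypothetical path; an open
# binary file handle works here because it exposes the ``.name`` attribute the
# function uses to label sources.
#
#   with open("paper.pdf", "rb") as fh:
#       chunks, chunk_sources = read_and_textify([fh], chunk_size=50)
#   # chunk_sources entries look like "paper.pdf_page_0_chunk_1"
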
# A module-level OpenAI client; assumes the OPENAI_API_KEY environment variable is set.
client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

def list_to_nums(sentences: List[str]) -> List[List[float]]:
    """
    Converts a list of sentences into numerical embeddings using OpenAI's embedding model.

    Args:
        sentences (List[str]): A list of sentences (strings).

    Returns:
        List[List[float]]: A list of embedding vectors, one per input sentence.
    """
    embeddings = []
    for sentence in sentences:
        # One API call per sentence; each response carries a single embedding vector.
        response = client.embeddings.create(
            input=sentence, model="text-embedding-3-small"
        )
        embeddings.append(response.data[0].embedding)
    return embeddings

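
# Example (sketch): embed two illustrative chunks. Requires a valid OPENAI_API_KEY;
# with text-embedding-3-small each returned vector has 1536 dimensions by default.
#
#   vectors = list_to_nums(["first chunk of text", "second chunk of text"])
#   len(vectors), len(vectors[0])  # -> (2, 1536)
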
def quantize_to_kbit(arr: Union[np.ndarray, Any], k: int = 16) -> np.ndarray:
    """Converts an array to a k-level representation by normalizing and scaling its values.

    Args:
        arr (Union[np.ndarray, Any]): The input array to be quantized.
        k (int): The number of quantization levels. Defaults to 16 (i.e., 4-bit).

    Returns:
        np.ndarray: The quantized array with integer values in the range 0 to k-1.
    """
    if not isinstance(arr, np.ndarray):
        arr = np.array(arr)
    arr_min = arr.min()
    arr_max = arr.max()
    # Min-max normalize to [0, 1]; assumes the array is not constant,
    # otherwise the denominator is zero.
    normalized_arr = (arr - arr_min) / (arr_max - arr_min)
    # Scale to k evenly spaced levels and round to the nearest integer level.
    return np.round(normalized_arr * (k - 1)).astype(int)

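
# Worked example (sketch): four evenly spaced values map exactly onto four levels.
#
#   quantize_to_kbit(np.array([0.0, 1.0, 2.0, 3.0]), k=4)
#   # -> array([0, 1, 2, 3])
#   quantize_to_kbit([0.1, 0.4, 0.8], k=16)
#   # -> levels 0, 6, 15
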
def quantized_influence(
    arr1: np.ndarray, arr2: np.ndarray, k: int = 16, use_dagger: bool = False
) -> Union[float, Tuple[float, List[float]]]:
    """
    Calculates a weighted measure of influence based on quantized versions of the input
    arrays and optionally applies a transformation based on local averages.

    Args:
        arr1 (np.ndarray): First input array, quantized and used to define groups.
        arr2 (np.ndarray): Second input array, quantized and used for the influence measurement.
        k (int): The quantization level, defaults to 16 for 4-bit quantization.
        use_dagger (bool): Whether to also return per-element local-average estimates. Defaults to False.

    Returns:
        Union[float, Tuple[float, List[float]]]: The quantized influence measure, or a tuple of
        the measure and a list of transformed values based on local estimates when
        ``use_dagger`` is True.
    """
    # Quantize both arrays to k levels.
    arr1_quantized = quantize_to_kbit(arr1, k)
    arr2_quantized = quantize_to_kbit(arr2, k)

    unique_values = np.unique(arr1_quantized)

    total_samples = len(arr2_quantized)
    y_bar_global = np.mean(arr2_quantized)

    # For each quantization level of arr1, weight the squared deviation of the local
    # mean of arr2 from its global mean by the squared group size.
    weighted_local_averages = [
        (np.mean(arr2_quantized[arr1_quantized == val]) - y_bar_global) ** 2
        * len(arr2_quantized[arr1_quantized == val]) ** 2
        for val in unique_values
    ]
    qim = np.sum(weighted_local_averages) / (total_samples * np.std(arr2_quantized))

    if use_dagger:
        # Map each quantized value of arr1 to the local mean of arr2 within its group.
        local_estimates = [
            np.mean(arr2_quantized[arr1_quantized == val]) for val in unique_values
        ]
        daggers = {unique_values[i]: v for i, v in enumerate(local_estimates)}

        def find_val_(i: int) -> float:
            """Helper function to map quantized values to their local estimates."""
            return daggers[i]

        daggered_values = list(map(find_val_, arr1_quantized))
        return qim, daggered_values
    else:
        return qim

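
# Example (sketch): score two short illustrative vectors. The exact value depends on
# the quantization level k; with use_dagger=False only the scalar score is returned.
#
#   score = quantized_influence(
#       np.array([0.1, 0.2, 0.8, 0.9]), np.array([0.0, 0.1, 0.9, 1.0]), k=4
#   )
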
def query_search(
    prompt: str,
    sentences: List[str],
    query_database: List[List[float]],
    sources: List[str],
    levels: int,
) -> pd.DataFrame:
    """
    Takes a text prompt and searches a predefined database by converting the prompt to an
    embedding and scoring each database entry with the quantized influence metric.

    Args:
        prompt (str): A text prompt to search for in the database.
        sentences (List[str]): The text segments corresponding to the database entries.
        query_database (List[List[float]]): Precomputed embeddings for the text segments.
        sources (List[str]): Source identifiers for the text segments.
        levels (int): The quantization level passed to ``quantized_influence``.

    Returns:
        pd.DataFrame: A DataFrame sorted by the quantized influence metric in descending order,
        containing the original sentences, their embeddings, their sources, and the computed scores.
    """
    # Embed the prompt once and reuse it for every comparison.
    prompt_embed_ = list_to_nums([prompt])

    # Score each stored embedding against the prompt embedding.
    scores = [
        [
            sentences[i],
            query_database[i],
            sources[i],
            quantized_influence(
                prompt_embed_[0], query_database[i], k=levels, use_dagger=False
            ),
        ]
        for i in range(len(query_database))
    ]

    refs = pd.DataFrame(scores)
    refs = refs.rename(
        columns={0: "sentences", 1: "query_embeddings", 2: "page no", 3: "qim"}
    )
    refs = refs.sort_values(by="qim", ascending=False)

    return refs
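

# A minimal end-to-end sketch tying the pieces together. The PDF path and the question
# are placeholders, and running it requires OPENAI_API_KEY to be set in the environment.
if __name__ == "__main__":
    with open("example.pdf", "rb") as fh:  # hypothetical input file
        chunks, chunk_sources = read_and_textify([fh], chunk_size=50)

    # Embed every chunk once, then rank the chunks against a single question.
    chunk_embeddings = list_to_nums(chunks)
    results = query_search(
        prompt="What is the main finding of this document?",
        sentences=chunks,
        query_database=chunk_embeddings,
        sources=chunk_sources,
        levels=16,
    )
    print(results[["page no", "qim"]].head())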