|
from PyPDF2 import PdfReader |
|
from concurrent.futures import ThreadPoolExecutor |
|
import streamlit as st |
|
import io |
|
from anthropic import Anthropic |
|
import tiktoken |
|
import re |
|
# Anthropic SDK client, constructed with no explicit arguments. In this file
# it is only used to obtain the Anthropic tokenizer below.
client = Anthropic()

# One tokenizer per supported model family. Code that needs a tokenizer picks
# between these two based on the selected model name.
encoding_openAI = tiktoken.get_encoding("cl100k_base")
encoding_anthropic = client.get_tokenizer()

# Sidebar model selector, rendered on every step of the app.
model_choice = st.sidebar.selectbox(
    "Choose a Model",
    ["OpenAI", "Anthropic"],
)
|
|
|
|
|
# Matches every character that is NOT in the whitelist: ASCII letters, digits,
# space, CR, LF and the punctuation . , ; ! ? ( ) - ' " & + : % $ # @ *
# Compiled once at import time instead of on every call.
_DISALLOWED_CHARS = re.compile(r'[^a-zA-Z0-9 \r\n.,;!?()\-\'\"&+:%$#@*]')


def clean_text_content(text):
    """Return ``text`` with every non-whitelisted character removed.

    Characters outside the whitelist (for example tabs, underscores, slashes,
    brackets and all non-ASCII characters) are deleted, not replaced, so the
    characters on either side of a removed one become adjacent.

    Args:
        text: The string to clean.

    Returns:
        A new string containing only whitelisted characters.
    """
    return _DISALLOWED_CHARS.sub('', text)
|
|
|
|
|
def create_chunks(text, n, tokenizer_name):
    """Yield successive token chunks of roughly ``n`` tokens from ``text``.

    Each chunk is cut, when possible, at a point where the decoded chunk ends
    with ``"."`` or a newline. The cut point is searched backwards from
    ``1.5 * n`` tokens down to ``0.5 * n`` tokens; if no such boundary is
    found the chunk is cut at exactly ``n`` tokens. ``n`` is therefore a
    target size, not a hard cap: a chunk may hold up to ``int(1.5 * n)``
    tokens.

    Args:
        text: The text to tokenize and split.
        n: Target chunk size in tokens; must be at least 1.
        tokenizer_name: ``"OpenAI"`` selects ``encoding_openAI``; any other
            value selects ``encoding_anthropic``.

    Yields:
        Lists of token ids (slices of the encoded text), in order.

    Raises:
        ValueError: If ``n`` is smaller than 1. Because this is a generator,
            the error is raised when iteration starts.
    """
    # Validate before touching the tokenizer: with n < 1 the window never
    # advances and the loop below would spin forever yielding empty chunks.
    # The int() coercion keeps the slice bounds integral if a float is passed.
    n = int(n)
    if n < 1:
        raise ValueError(f"n must be a positive integer, got {n}")

    tokenizer = encoding_openAI if tokenizer_name == "OpenAI" else encoding_anthropic
    encoded = tokenizer.encode(text)

    # Some tokenizers return an object exposing the ids through ``.ids``;
    # others return the id list directly.
    tokens = encoded.ids if hasattr(encoded, "ids") else encoded

    total = len(tokens)
    lower_offset = int(0.5 * n)
    upper_offset = int(1.5 * n)

    start = 0
    while start < total:
        floor = start + lower_offset
        end = min(start + upper_offset, total)

        # Walk the cut point backwards until the decoded chunk ends on a
        # sentence or line boundary, or the lower bound is reached.
        # NOTE: this re-decodes the whole candidate chunk on every step, which
        # is quadratic in the worst case (text with no "." or newline).
        while end > floor:
            piece = tokenizer.decode(tokens[start:end])
            if piece.endswith((".", "\n")):
                break
            end -= 1

        # No boundary found in the search window: fall back to a plain cut.
        if end == floor:
            end = min(start + n, total)

        yield tokens[start:end]
        start = end
|
|
|
|
|
def convert_pdf_to_text(pdf_file_data, file_name):
    """Extract the text of one PDF and wrap it in a delimited section.

    Args:
        pdf_file_data: The PDF source handed to ``PdfReader``.
        file_name: Name written into the section header.

    Returns:
        A string of the form::

            \\n---\\nfile name: <file_name>\\n content: \\n<text>\\n---\\n

        where ``<text>`` is the extracted text of all pages, one page per
        line group.
    """
    pdf_reader = PdfReader(pdf_file_data)

    # Join pages with a newline so the last word of one page is not glued to
    # the first word of the next. A falsy extraction result is treated as
    # empty text so the join cannot fail.
    page_texts = (page.extract_text() or "" for page in pdf_reader.pages)
    body = "\n".join(page_texts)

    return f"\n---\nfile name: {file_name}\n content: \n{body}\n---\n"
|
|
|
|
|
def pdf_to_text(pdf_files_data, file_names):
    """Convert several PDFs to text concurrently.

    The two arguments are paired positionally and each pair is passed to
    ``convert_pdf_to_text`` on a thread pool.

    Args:
        pdf_files_data: Iterable of PDF sources, one per file.
        file_names: Iterable of the matching file names.

    Returns:
        A list with one converted text section per input pair, in input
        order.

    Raises:
        Any exception raised by ``convert_pdf_to_text`` for one of the files.
    """
    with ThreadPoolExecutor() as executor:
        # Materialise the results while the pool is still open, so failures
        # surface here and the caller gets a reusable list, not a one-shot
        # iterator.
        return list(executor.map(convert_pdf_to_text, pdf_files_data, file_names))
|
|
|
|
|
st.title("PDF Utility")

# Labels of the three workflow steps; the same strings are used as the
# sidebar radio options and as the values compared when dispatching.
step01 = "Step 01: Upload Files"
step02 = "Step 02: Edit Knowledge Base"
step03 = "Step 03: Split text"
tabs = [step01, step02, step03]

# A new session starts on the upload step.
if "selected_tab" not in st.session_state:
    st.session_state.selected_tab = step01

# The radio's initial position follows the step stored in session state.
selected_tab = st.sidebar.radio(
    "Choose a tab",
    tabs,
    index=tabs.index(st.session_state.selected_tab),
)

# The knowledge base text lives in session state; it starts out empty.
if "text_content" not in st.session_state:
    st.session_state.text_content = ""
|
|
|
|
|
# Render whichever workflow step is selected in the sidebar.
if selected_tab == step02:
    # Step 02: edit the accumulated knowledge base and, on request, count its
    # tokens with the tokenizer of the model chosen in the sidebar.
    st.subheader("Knowledge Base Text Area")
    st.session_state.text_content = st.text_area(
        "Knowledge Text Area", st.session_state.text_content, height=400)

    if st.button("Compute Tokens"):
        if model_choice == "OpenAI":
            num_tokens = len(encoding_openAI.encode(
                st.session_state.text_content))
            st.write(f"Total number of tokens (OpenAI): {num_tokens}")
        else:
            tokens_count = len(encoding_anthropic.encode(
                st.session_state.text_content))
            st.write(f"Total number of tokens (Anthropic): {tokens_count}")

elif selected_tab == step01:
    # Step 01: upload PDFs and append their extracted text to the knowledge
    # base, then jump to the editing step.
    st.subheader("Upload PDFs to Append to Knowledge Base")

    uploaded_files = st.file_uploader(
        "Upload PDF files", type="pdf", accept_multiple_files=True)

    if uploaded_files:
        if st.button('Convert to text'):
            converting_message = st.text("Converting PDFs...")

            # Read the uploads only when a conversion was requested, not on
            # every rerun. getvalue() returns the full content regardless of
            # the current read position of the upload buffer.
            pdf_files_data = [io.BytesIO(uploaded_file.getvalue())
                              for uploaded_file in uploaded_files]
            file_names = [uploaded_file.name for uploaded_file in uploaded_files]

            converted_text = "\n".join(pdf_to_text(pdf_files_data, file_names))
            st.session_state.text_content += converted_text
            converting_message.empty()

            st.session_state.selected_tab = step02
            # st.experimental_rerun is deprecated in favour of st.rerun; use
            # whichever one the installed Streamlit provides.
            rerun = getattr(st, "rerun", None) or st.experimental_rerun
            rerun()

elif selected_tab == step03:
    # Step 03: split the knowledge base into token chunks and show each chunk
    # wrapped in the chosen prefix and postfix.
    st.subheader("Splitting Options")

    # This step has its own model selector; from here on it rebinds
    # ``model_choice``, replacing the sidebar value for the rest of the run.
    model_choice = st.selectbox(
        "Choose a Model", ["OpenAI", "Anthropic"], key="model_choice_selectbox")
    max_tokens = st.number_input(
        "Max number of tokens per chunk", min_value=100, value=8000, key="max_tokens_input")
    clean_text = st.checkbox("Clean text before encoding and splitting?")

    prefix = st.text_area("Prefix for each chunk:", "")
    postfix = st.text_area("Postfix for each chunk:", "")

    # Clean a local copy only: the stored knowledge base must stay intact so
    # that unticking the checkbox restores the original text.
    text_to_split = st.session_state.text_content
    if clean_text:
        text_to_split = clean_text_content(text_to_split)

    chunks_generator = create_chunks(text_to_split, max_tokens, model_choice)
    chunks = [encoding_openAI.decode(chunk_tokens) if model_choice == "OpenAI" else encoding_anthropic.decode(
        chunk_tokens) for chunk_tokens in chunks_generator]

    for i, chunk in enumerate(chunks, 1):
        chunk_with_affixes = f"{prefix}{chunk}{postfix}"
        st.text_area(f"Chunk {i} content:", chunk_with_affixes, height=200)
|
|