from PyPDF2 import PdfReader
from concurrent.futures import ThreadPoolExecutor
import streamlit as st
import io
from anthropic import Anthropic
import tiktoken
import re

# Requires the ANTHROPIC_API_KEY environment variable to be set.
client = Anthropic()
encoding_openAI = tiktoken.get_encoding("cl100k_base")
# get_tokenizer() comes from older anthropic SDK releases; its encode()
# returns an Encoding object whose ids live on the .ids attribute.
encoding_anthropic = client.get_tokenizer()
# Model choice in the sidebar
model_choice = st.sidebar.selectbox("Choose a Model", ["OpenAI", "Anthropic"])
def clean_text_content(text):
    # Keep only English letters, numbers, spaces, line breaks,
    # and common punctuation/symbols
    cleaned_text = re.sub(r'[^a-zA-Z0-9 \r\n.,;!?()\-\'\"&+:%$#@*]', '', text)
    return cleaned_text
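
# A minimal sketch of what the whitelist filter does (hypothetical input;
# accented characters and em-dashes fall outside the character class above
# and are simply dropped):
#   clean_text_content("café — naïve text!")  ->  "caf  nave text!"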
def create_chunks(text, n, tokenizer_name):
    """Yield successive chunks of roughly n tokens from the provided text."""
    tokenizer = encoding_openAI if tokenizer_name == "OpenAI" else encoding_anthropic
    encoded = tokenizer.encode(text)
    # tiktoken returns a plain list of ids; the Anthropic tokenizer returns
    # an Encoding object, so unwrap .ids when present
    tokens = encoded.ids if hasattr(encoded, "ids") else encoded
    i = 0
    while i < len(tokens):
        # Find the nearest end of sentence within a range of 0.5 * n and 1.5 * n tokens
        j = min(i + int(1.5 * n), len(tokens))
        while j > i + int(0.5 * n):
            # Decode the candidate chunk and check for a full stop or newline
            chunk = tokenizer.decode(tokens[i:j])
            if chunk.endswith(".") or chunk.endswith("\n"):
                break
            j -= 1
        # If no end of sentence was found, fall back to n tokens as the chunk size
        if j == i + int(0.5 * n):
            j = min(i + n, len(tokens))
        yield tokens[i:j]
        i = j
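
# A minimal usage sketch (not executed here; some_text is a placeholder):
# each yielded item is a list of token ids, so decode it with the matching
# tokenizer to get text back.
#   for token_chunk in create_chunks(some_text, 500, "OpenAI"):
#       print(encoding_openAI.decode(token_chunk))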
def convert_pdf_to_text(pdf_file_data, file_name):
    text = "\n---\n"
    text += f"file name: {file_name}\ncontent:\n"
    pdf_reader = PdfReader(pdf_file_data)
    # extract_text() can return None for pages with no extractable text,
    # so substitute an empty string to keep the join from raising
    text += "".join([page.extract_text() or "" for page in pdf_reader.pages])
    text += "\n---\n"
    return text
def pdf_to_text(pdf_files_data, file_names):
    # Convert PDFs in parallel; materialize the results while the executor
    # is still open so worker errors surface here rather than at iteration
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(convert_pdf_to_text, pdf_files_data, file_names))
    return results
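
# A minimal usage sketch (hypothetical file name, not executed here):
#   with open("a.pdf", "rb") as f:
#       texts = pdf_to_text([io.BytesIO(f.read())], ["a.pdf"])
#   combined = "\n".join(texts)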
st.title("PDF Utility")
# Create tabs
step01 = "Step 01: Upload Files"
step02 = "Step 02: Edit Knowledge Base"
step03 = "Step 03: Split text"
tabs = [step01, step02, step03]
if "selected_tab" not in st.session_state:
    st.session_state.selected_tab = step01
selected_tab = st.sidebar.radio(
    "Choose a tab", tabs, index=tabs.index(st.session_state.selected_tab))
if "text_content" not in st.session_state:
    st.session_state.text_content = ""
# Define content for each tab
if selected_tab == step02:
    st.subheader("Knowledge Base Text Area")
    st.session_state.text_content = st.text_area(
        "Knowledge Text Area", st.session_state.text_content, height=400)
    if st.button("Compute Tokens"):
        if model_choice == "OpenAI":
            num_tokens = len(encoding_openAI.encode(
                st.session_state.text_content))
            st.write(f"Total number of tokens (OpenAI): {num_tokens}")
        else:
            tokens_count = len(encoding_anthropic.encode(
                st.session_state.text_content))
            st.write(f"Total number of tokens (Anthropic): {tokens_count}")
elif selected_tab == step01:
    st.subheader("Upload PDFs to Append to Knowledge Base")
    uploaded_files = st.file_uploader(
        "Upload PDF files", type="pdf", accept_multiple_files=True)
    if uploaded_files:
        pdf_files_data = [io.BytesIO(uploaded_file.read())
                          for uploaded_file in uploaded_files]
        file_names = [uploaded_file.name for uploaded_file in uploaded_files]
        if st.button('Convert to text'):
            converting_message = st.text("Converting PDFs...")
            converted_text = "\n".join(pdf_to_text(pdf_files_data, file_names))
            st.session_state.text_content += converted_text
            converting_message.empty()
            st.session_state.selected_tab = step02
            st.experimental_rerun()
elif selected_tab == step03:
    st.subheader("Splitting Options")
    model_choice = st.selectbox(
        "Choose a Model", ["OpenAI", "Anthropic"], key="model_choice_selectbox")
    max_tokens = st.number_input(
        "Max number of tokens per chunk", min_value=100, value=8000, key="max_tokens_input")
    clean_text = st.checkbox("Clean text before encoding and splitting?")
    # Add prefix and postfix input options
    prefix = st.text_area("Prefix for each chunk:", "")
    postfix = st.text_area("Postfix for each chunk:", "")
    if clean_text:
        st.session_state.text_content = clean_text_content(
            st.session_state.text_content)
    chunks_generator = create_chunks(
        st.session_state.text_content, max_tokens, model_choice)
    chunks = [encoding_openAI.decode(chunk_tokens) if model_choice == "OpenAI"
              else encoding_anthropic.decode(chunk_tokens)
              for chunk_tokens in chunks_generator]
    for i, chunk in enumerate(chunks, 1):
        # Add prefix and postfix to each chunk
        chunk_with_affixes = f"{prefix}{chunk}{postfix}"
        chunk_content = st.text_area(
            f"Chunk {i} content:", chunk_with_affixes, height=200)