# chat-with-docs / app.py
# Author: Kushwanth Chowday Kandala
# Commit 23247c4 ("text input sample"), unverified; file size 5.93 kB.
import streamlit as st
import os
from streamlit_chat import message
import numpy as np
import pandas as pd
from io import StringIO
import PyPDF2
from tqdm import tqdm
import math
# import json
# st.config(PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION="python")
# from datasets import load_dataset
# dataset = load_dataset("wikipedia", "20220301.en", split="train[240000:250000]")
# wikidata = []
# for record in dataset:
# wikidata.append(record["text"])
# wikidata = list(set(wikidata))
# # print("\n".join(wikidata[:5]))
# # print(len(wikidata))
from sentence_transformers import SentenceTransformer
import torch
# Prefer the GPU when torch reports one; otherwise fall back to the CPU and
# tell the user why the app is slow.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device != 'cuda':
    st.markdown(f"you are using {device}. This is much slower than using "
                "a CUDA-enabled GPU. If on colab you can change this by "
                "clicking Runtime > change runtime type > GPU.")


@st.cache_resource(show_spinner=False)
def _load_embedding_model(device_name):
    """Build the sentence-embedding model for ``device_name``.

    Streamlit re-executes this script on every interaction. Wrapping the
    constructor in ``st.cache_resource`` lets reruns reuse the already loaded
    model instead of constructing it again each time.

    Args:
        device_name: Device string handed to ``SentenceTransformer``.

    Returns:
        The ``SentenceTransformer`` instance for "all-MiniLM-L6-v2".
    """
    return SentenceTransformer("all-MiniLM-L6-v2", device=device_name)


# Module-level name used by the rest of the file to embed text.
model = _load_embedding_model(device)
st.divider()
# Creating an Index (Pinecone Vector Database)
import os
# import pinecone
from pinecone.grpc import PineconeGRPC
# Pinecone settings come from the process environment; each constant is None
# when its variable is not set.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_ENV = os.environ.get("PINECONE_ENV")
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT")
# pc = PineconeGRPC( api_key=os.environ.get("PINECONE_API_KEY") ) # Now do stuff if 'my_index' not in pc.list_indexes().names(): pc.create_index( name='my_index', dimension=1536, metric='euclidean', spec=ServerlessSpec( cloud='aws', region='us-west-2' ) )
def connect_pinecone():
    """Create a Pinecone gRPC client from the module-level settings.

    Returns:
        The ``PineconeGRPC`` client built with ``PINECONE_API_KEY`` as the API
        key and ``PINECONE_ENV`` as the environment.
    """
    client_options = {
        "api_key": PINECONE_API_KEY,
        "environment": PINECONE_ENV,
    }
    return PineconeGRPC(**client_options)
def get_pinecone_semantic_index(pinecone):
    """Return a handle to the semantic-search index, creating it when absent.

    Args:
        pinecone: Pinecone client exposing ``list_indexes()``,
            ``create_index()`` and ``Index()``.

    Returns:
        Whatever ``pinecone.Index`` returns for the index name.
    """
    # The spelling is kept exactly as-is: changing it would target a
    # different index.
    index_name = "sematic-search"

    # Only create the index if it does not exist yet.
    if index_name not in pinecone.list_indexes().names():
        # ServerlessSpec is not imported anywhere at file level, so it is
        # brought into scope here, right where it is needed.
        from pinecone import ServerlessSpec

        # `description` is not a parameter of the client's `create_index`,
        # so it is no longer passed.
        pinecone.create_index(
            name=index_name,
            # The index dimension must match the embedding size of the model.
            dimension=model.get_sentence_embedding_dimension(),
            metric="cosine",
            spec=ServerlessSpec(cloud='gcp', region='us-central1'),
        )

    # Now connect to the index.
    return pinecone.Index(index_name)
def chat_actions():
    """Handle a submitted chat message.

    Records the user's text in the chat history, embeds it with the
    module-level ``model``, queries the Pinecone index for the five closest
    entries, shows them as a table in the sidebar, and appends the text of the
    first match to the chat history as the assistant reply. When the query
    returns no matches, no assistant reply is added.
    """
    pinecone = connect_pinecone()
    index = get_pinecone_semantic_index(pinecone)

    user_text = st.session_state["chat_input"]
    st.session_state["chat_history"].append(
        {"role": "user", "content": user_text},
    )

    # Embed the question and convert it to a plain list for the query.
    query_vector = model.encode(user_text).tolist()

    # Pass the vector by keyword: that is the documented call form and does
    # not depend on the positional parameter order of the index client.
    result = index.query(vector=query_vector, top_k=5, include_metadata=True)
    matches = result['matches']

    # One table row per match: 1-based rank, similarity score, stored text.
    rows = [
        [f"{rank}⭐", match['score'], match['metadata']['text']]
        for rank, match in enumerate(matches, start=1)
    ]
    resdf = pd.DataFrame(rows, columns=['TopRank', 'Score', 'Text'])

    with st.sidebar:
        st.markdown("*:red[semantic search results]* with **:green[Retrieval Augmented Generation]** ***(RAG)***.")
        st.dataframe(resdf)

    # Only the first match is used as the reply.
    top_match = next(iter(matches), None)
    if top_match is not None:
        st.session_state["chat_history"].append(
            {
                "role": "assistant",
                # This can be replaced with your chat response logic.
                "content": f"{top_match['metadata']['text']}",
            },
        )
# Make sure the conversation store exists before anything reads it.
if "chat_history" not in st.session_state:
    st.session_state["chat_history"] = []

# Submitting the input runs chat_actions, which reads the text from
# st.session_state["chat_input"].
st.chat_input(
    "show me the contents of ML paper published on xxx with article no. xx?",
    key="chat_input",
    on_submit=chat_actions,
)

# Replay the stored conversation, one chat bubble per entry.
for entry in st.session_state["chat_history"]:
    speaker, content = entry["role"], entry["content"]
    with st.chat_message(name=speaker):
        st.write(content)
### Creating an Index (Pinecone Vector Database)
# %%writefile .env
# PINECONE_API_KEY=os.getenv("PINECONE_API_KEY")
# PINECONE_ENV=os.getenv("PINECONE_ENV")
# PINECONE_ENVIRONMENT=os.getenv("PINECONE_ENVIRONMENT")
# import os
# import pinecone
# from pinecone import Index, GRPCIndex
# pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENV)
# st.text(pinecone)
def print_out(pages):
    """Write the extracted text of each page to the app.

    Args:
        pages: Sequence of page objects providing ``extract_text()``.
            Pages are labelled with their zero-based position.
    """
    for page_number, page in enumerate(pages):
        page_text = page.extract_text().strip()
        st.write(f"Page {page_number} : {page_text}")
def combine_text(pages):
    """Report the character count and UTF-8 size of the combined page text.

    The stripped text of every page is joined with no separator. The
    result is only reported through ``st.write``; nothing is returned.

    Args:
        pages: Iterable of page objects providing ``extract_text()``.
    """
    # tqdm wraps the iteration so progress is reported while pages are read.
    full_text = "".join(page.extract_text().strip() for page in tqdm(pages))

    size_in_bytes = len(full_text.encode("utf-8"))
    bytes_per_mb = math.pow(1024, 2)
    mbsize = round(size_in_bytes / bytes_per_mb, 2)
    st.write(f"There are {len(full_text)} characters in the pdf with {mbsize}MB size")
# Sidebar: usage instructions plus the PDF upload flow.
# NOTE(review): the upload loop is assumed to be nested inside the sidebar
# block — confirm against the intended layout.
with st.sidebar:
    st.markdown("""
***Follow this steps***
- upload pdf file to train the model on your own docs
- wait see success message on train completion
- Takes couple of mins after upload the pdf
- Now Chat with model to get the summarized info or Generative reponse
""")
    uploaded_files = st.file_uploader(
        'Choose your .pdf file',
        type="pdf",
        accept_multiple_files=True,
    )
    # Each uploaded file is a file-like object, so it can be handed to the
    # PDF reader directly.
    for uploaded_file in uploaded_files:
        reader = PyPDF2.PdfReader(uploaded_file)
        pages = reader.pages
        # Show every page's text, then the overall size summary.
        print_out(pages)
        combine_text(pages)