Spaces:
Running
Running
import streamlit as st | |
import os | |
from streamlit_chat import message | |
import numpy as np | |
import pandas as pd | |
from io import StringIO | |
import io | |
import PyPDF2 | |
import pymupdf | |
import tempfile | |
import base64 | |
# from tqdm.auto import tqdm | |
import math | |
# from transformers import pipeline | |
from collections import Counter | |
import nltk | |
nltk.download('stopwords') | |
from nltk.corpus import stopwords | |
import re | |
from streamlit_image_zoom import image_zoom | |
from PIL import Image | |
from sentence_transformers import SentenceTransformer | |
import torch | |
from langchain_community.llms.ollama import Ollama | |
from langchain.prompts import ChatPromptTemplate | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.llms import HuggingFaceHub | |
# from langchain.vectorstores import faiss | |
# from langchain.vectorstores import FAISS | |
import time | |
from time import sleep | |
from stqdm import stqdm | |
from dotenv import load_dotenv | |
# Load environment variables from .env file
load_dotenv()
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")

# Use the GPU when torch reports one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# if device != 'cuda':
#     st.markdown(f"you are using {device}. This is much slower than using "
#                 "a CUDA-enabled GPU. If on colab you can change this by "
#                 "clicking Runtime > change runtime type > GPU.")

st.set_page_config(page_title="Vedic Scriptures",page_icon='π')


@st.cache_resource
def _load_sentence_model(device_name):
    """Build the sentence-embedding model once and reuse it across reruns.

    Streamlit re-executes the script on every interaction; without the
    cache the model would be constructed again each time.

    Args:
        device_name: Device string handed to ``SentenceTransformer``.

    Returns:
        The ``SentenceTransformer`` instance for all-mpnet-base-v2.
    """
    return SentenceTransformer("sentence-transformers/all-mpnet-base-v2", device=device_name)


model = _load_sentence_model(device)
def display_title():
    """Render a header naming the scripture stored under the ``"value"`` session key."""
    scripture = st.session_state["value"]
    heading = f'Vedic Scriptures: {scripture} :blue[book] :books:'
    st.header(heading)
# Default text for the chat prompt.
question = "ask anything about scriptures"


def open_chat():
    """Read the FAQ entry stored under the ``"faq"`` session key.

    NOTE(review): the value is only bound to a function-local name (there is
    no ``global`` statement), so the module-level ``question`` is not changed
    by this callback. Confirm whether updating the prompt text was intended.
    """
    selected_faq = st.session_state["faq"]
# Make sure both session keys exist before anything reads them.
for _state_key in ("value", "faq"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = None

# Links used in the "source reference" line below.
url1 = "https://vedabase.io/en/library/bg/"
url2 = "https://docs.google.com/file/d/0B5WZMlc4xl-8NThSSDJnTmE5N2M/view?resourcekey=0-CupZPMHFLx-54g_UDTOTYA"

st.write("ππ» :rainbow[slide to ask bhagvatgeetha questions]")
st.write("choose FAQ or ask your own doubts")
st.markdown(
    ":rainbow[checkout source reference]: :blue-background[ISKCON] "
    f"[1]({url1}), [2]({url2}) "
    "— :tulip::cherry_blossom::rose::hibiscus::sunflower::blossom:"
)
# st.divider()
def upload_file():
    """Show a PDF uploader and echo the chosen file's name.

    Returns:
        The uploaded file's ``name``, or ``None`` when nothing is uploaded.
    """
    pdf = st.file_uploader("Upload a file", type=["pdf"])
    if pdf is None:
        return None
    st.write(pdf.name)
    return pdf.name
def create_pickle_file(filepath):
    """Embed a PDF's pages into a FAISS index and pickle the index to disk.

    The output is written to ``./<filepath without extension>_<timestamp>.pkl``.

    Args:
        filepath: Path of the PDF to load with ``PyMuPDFLoader``.
    """
    from datetime import datetime

    from langchain_community.document_loaders import PyMuPDFLoader
    from langchain_community.embeddings import HuggingFaceEmbeddings

    pages = PyMuPDFLoader(filepath).load()

    # Sentence-transformer embeddings, computed on the CPU without normalisation.
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-mpnet-base-v2",
        model_kwargs={'device': 'cpu'},
        encode_kwargs={'normalize_embeddings': False},
    )

    # Drop only the final extension. Splitting on the first "." returned an
    # empty name for "./book.pdf" and truncated names such as "my.book.pdf".
    filename = os.path.splitext(filepath)[0]
    print(filename)

    # Timestamp with millisecond precision (microseconds trimmed to 3 digits).
    # NOTE: the ":" characters are not valid in Windows file names.
    formatted_datetime = datetime.now().strftime("%Y-%m-%d_%H:%M:%S.%f")[:-3]
    print(formatted_datetime)

    # Create FAISS index with the HuggingFace embeddings
    faiss_index = FAISS.from_documents(pages, embeddings)
    with open(f"./{filename}_{formatted_datetime}.pkl", "wb") as f:
        pickle.dump(faiss_index, f)
# uploaded_file_name = upload_file() | |
# if uploaded_file_name is not None: | |
# create_pickle_file(uploaded_file_name) | |
def highlight_pdf(file_path, text_to_highlight, page_numbers):
    """Highlight a text's words on selected pages and save those pages as a new PDF.

    Args:
        file_path: Path of the PDF to open.
        text_to_highlight: Text that is split on whitespace; every word that is
            not an English stopword is searched for on each selected page.
        page_numbers: 1-based page numbers of ``file_path`` to include.

    Returns:
        A ``(temp_pdf_path, new_page_numbers)`` tuple: the path of a temporary
        PDF holding only the selected pages, and the 1-based page numbers of
        those pages inside the new PDF. The temporary file is created with
        ``delete=False``, so the caller is responsible for removing it.
    """
    stop_words = set(stopwords.words("english"))
    # dict.fromkeys removes repeated words (keeping first-seen order) so each
    # word is searched and annotated once per page instead of once per repeat.
    words = list(dict.fromkeys(
        word for word in text_to_highlight.split() if word.lower() not in stop_words
    ))

    # Context managers close both documents even if a step below raises.
    with pymupdf.open(file_path) as doc, pymupdf.open() as new_doc:
        pages_to_display = [doc.load_page(page_number - 1) for page_number in page_numbers]

        for page in pages_to_display:
            for word in words:
                for quad in page.search_for(word, quads=True):
                    page.add_highlight_annot(quad)

        # Copy only the selected pages into the new document.
        new_page_numbers = []
        for page in pages_to_display:
            new_doc.insert_pdf(doc, from_page=page.number, to_page=page.number)
            new_page_numbers.append(new_doc.page_count)  # position in the new PDF

        # Reserve a temp path and release the handle before saving, so the
        # save does not write to a file that is still open here.
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
            temp_pdf_path = temp_file.name
        new_doc.save(temp_pdf_path)
        new_doc.save("example_highlighted.pdf")

    return temp_pdf_path, new_page_numbers
# Module-level defaults used by the PDF highlighting helpers.
sources: list = []
text_to_highlight: str = ""
file_path: str = "Bhagavad-Gita-As-It-Is.pdf"
def pdf_to_images(pdf_path, page_numbers):
    """Render selected pages of a PDF as Pillow images.

    Args:
        pdf_path: Path of the PDF to open.
        page_numbers: 1-based page numbers to render.

    Returns:
        A list of ``PIL.Image.Image`` objects, one per requested page, in the
        order given.
    """
    images = []
    # The context manager closes the document once rendering is done; the
    # original left the handle open on every call.
    with pymupdf.open(pdf_path) as doc:
        for page_number in page_numbers:
            pix = doc.load_page(page_number - 1).get_pixmap()
            # NOTE(review): "RGB" assumes the default pixmap has no alpha
            # channel — confirm if get_pixmap() arguments ever change.
            images.append(Image.frombytes("RGB", [pix.width, pix.height], pix.samples))
    return images
# Function to display PDF in Streamlit | |
def display_highlighted_pdf(file_path, text_to_highlight, sources):
    """Highlight text on the given pages and show them in a two-column grid.

    Args:
        file_path: Path of the source PDF.
        text_to_highlight: Text whose words are highlighted by ``highlight_pdf``.
        sources: Page numbers passed to ``highlight_pdf`` as ``page_numbers``.
    """
    highlighted_pdf_path, new_page_numbers = highlight_pdf(
        file_path=file_path, text_to_highlight=text_to_highlight, page_numbers=sources
    )
    try:
        images = pdf_to_images(highlighted_pdf_path, new_page_numbers)
    finally:
        # highlight_pdf creates its temp file with delete=False; remove it
        # here so one file is not left behind per query.
        try:
            os.remove(highlighted_pdf_path)
        except OSError:
            pass  # best-effort cleanup; rendering matters more than the unlink

    num_cols = 2  # Number of columns
    # Walk the images one grid row at a time.
    for row_start in range(0, len(images), num_cols):
        cols = st.columns(num_cols)
        for column, img in zip(cols, images[row_start:row_start + num_cols]):
            if isinstance(img, Image.Image):
                with column:
                    st.image(img, use_column_width=True)
                    st.write("")  # Add spacing
            else:
                st.error("The provided image is not a valid Pillow Image object.")
# Load the pre-built FAISS vector index from its pickle file
import os | |
# import pinecone | |
import pickle | |
def get_faiss_semantic_index():
    """Load the pickled FAISS index from ``./HuggingFaceEmbeddings.pkl``.

    Returns:
        The unpickled index object, or ``None`` when loading fails (the error
        is reported through ``st.error``).
    """
    index_path = "./HuggingFaceEmbeddings.pkl"
    print(index_path)
    try:
        # SECURITY: pickle.load can execute arbitrary code from the file;
        # only ever point this at an index file you created yourself.
        # Load once — the original re-read the pickle on every progress tick.
        with open(index_path, "rb") as f:
            faiss_index = pickle.load(f)
        # Cosmetic progress bar kept from the original UI.
        for _ in stqdm(range(5)):
            sleep(0.1)
        return faiss_index
    except Exception as e:
        st.error(f"Error loading embeddings: {e}")
        return None
# Load the index at import time; it is None when loading failed.
faiss_index = get_faiss_semantic_index()
print(faiss_index)
# def promt_engineer(text): | |
# Prompt sent to the LLM. ``{context}`` and ``{question}`` are filled in per
# query; the final line is also used to cut the echoed prompt out of the
# model's response, so keep its wording in sync with that code.
PROMPT_TEMPLATE: str = """
Instructions:
-------------------------------------------------------------------------------------------------------------------------------
Answer the question only based on the below context:
- You're a Vedic AI expert in the Hindu Vedic scriptures.
- Questions with out-of-context replay with The question is out of context.
- Always try to provide Keep it simple answers in nice format without incomplete sentence.
- Give the answer atleast 5 seperate lines addition to the title info.
- Only If question is relevent to context provide Title: <title> Chapter: <chapter> Text No: <textnumber> Page No: <pagenumber>
-------------------------------------------------------------------------------------------------------------------------------
{context}
-------------------------------------------------------------------------------------------------------------------------------
Answer the question based on the above context: {question}
"""
# # Load the summarization pipeline with the specified model | |
# summarizer = pipeline("summarization", model="facebook/bart-large-cnn") | |
# # Generate the prompt | |
# prompt = prompt_template.format(text=text) | |
# # Generate the summary | |
# summary = summarizer(prompt, max_length=1024, min_length=50)[0]["summary_text"] | |
# with st.sidebar: | |
# st.divider() | |
# st.markdown("*:red[Text Summary Generation]* from above Top 5 **:green[similarity search results]**.") | |
# st.write(summary) | |
# st.divider() | |
def chat_actions():
    """Handle a submitted chat message.

    Records the user's message, retrieves the six most similar chunks from
    the FAISS index, asks the hosted LLM to answer from that context, records
    the answer, and, when the answer is in context, shows the source pages
    with the answer's words highlighted.
    """
    query = st.session_state["chat_input"]
    st.session_state["chat_history"].append({"role": "user", "content": query})

    if faiss_index is None:
        # Nothing to search. The original fell through to a NameError on
        # ``docs`` after showing this message.
        st.error("Failed to load embeddings.")
        return

    docs = faiss_index.similarity_search(query, k=6)
    pages = [doc.metadata.get("page") for doc in docs]
    for page, doc in zip(pages, docs):
        print("\n")
        print(f"{'?' if page is None else page + 1}:", doc.page_content)

    context_text = "\n\n---\n\n".join(doc.page_content for doc in docs)
    # The metadata page is treated as 0-based (it is printed as page + 1
    # above), while highlight_pdf subtracts 1 from each number it receives.
    # Convert to 1-based, drop missing pages, and de-duplicate in order so
    # page 0 no longer turns into index -1.
    sources = list(dict.fromkeys(page + 1 for page in pages if page is not None))

    prompt = ChatPromptTemplate.from_template(PROMPT_TEMPLATE).format(
        context=context_text, question=query
    )

    result = ""
    answered = False
    try:
        llm = HuggingFaceHub(
            repo_id="meta-llama/Meta-Llama-3-8B-Instruct",
            model_kwargs={"temperature": 0.1, "max_new_tokens": 256, "task": "text-generation"},
        )
        response_text = llm.invoke(prompt)
        # The response may echo the prompt; keep only what follows its last line.
        marker = f"Answer the question based on the above context: {query}\n"
        result = response_text.split(marker)[-1]
        st.write(result)
        answered = True
    except Exception as e:
        st.error(f"Error invoke: {e}")

    print(f"Response: {result}\nSources: {sources}")
    st.session_state["chat_history"].append({"role": "assistant", "content": result})

    if not answered:
        # The LLM call failed and the error is already shown; do not claim
        # that helpful pages were found.
        return

    if "out of context" in result or not sources:
        st.error("Unfortunately, the question is out of context, and we couldn't find relevant pages for you.")
        return

    st.success("We found some helpful pages related to your question. Please refer to the highlighted sections below.")
    display_highlighted_pdf("Bhagavad-Gita-As-It-Is.pdf", result, sources)
# Choices offered by the two sidebar selectboxes.
_SCRIPTURE_CHOICES = ("Bhagvatgeetha", "Bhagavatham", "Ramayanam")
_FAQ_CHOICES = (
    "what is jeevathma and paramathma?",
    "who am I?",
    "who are you?",
    "what is this book all about?",
    "who is supreme god head and why?",
    "what is the most spoken topic by Krishna?",
    "What Krishna says to Arjuna?",
    "What are the key points from Krishna?",
    "Why does atheism exist even when all questions are answered in BhagavadGita?",
    "Why donβt all souls surrender to Lord Krishna, although he has demonstrated that everyone is part and parcel of Him, and all can be liberated from all sufferings by surrendering to Him?",
    "Why do souls misuse their independence by rebelling against Lord Krishna?",
    "How do I put an end to my suffering in this world?",
    "what is the reason behind Krishna decided to go far battle?",
)

# NOTE(review): the original indentation was lost; placing the chat input
# inside the sidebar is inferred from the on-screen hints — confirm.
with st.sidebar:
    # Scripture picker; its value lives under the "value" session key.
    option = st.selectbox(
        "Select Your Favorite Scriptures",
        _SCRIPTURE_CHOICES,
        key="value",
        on_change=display_title,
    )
    st.write("You selected:", option)

    # FAQ picker; its value lives under the "faq" session key.
    faq = st.selectbox(
        "Check FAQ'S",
        _FAQ_CHOICES,
        key="faq",
        on_change=open_chat,
    )
    st.write("You selected:", faq)
    st.write("Copy FAQ or ask your Query belowππ»")

    # chat_actions appends to this list, so it must exist before a submit.
    if "chat_history" not in st.session_state:
        st.session_state["chat_history"] = []

    st.chat_input(question, on_submit=chat_actions, key="chat_input")
    st.write(":rainbow[side to read script] ππ»")
# for i in st.session_state["chat_history"]: | |
# with st.chat_message(name=i["role"]): | |
# st.write(i["content"]) | |