import gradio as gr
import numpy as np
import pdfplumber
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
import torch
from groq import Groq
import os
from fpdf import FPDF
import PyPDF2
from dotenv import load_dotenv
import pickle
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.prompts import PromptTemplate
from langchain_together import Together

load_dotenv()
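
# ---- PDF Similarity Checker helpers ----
# Extract raw text with pdfplumber, embed sentence-sized chunks with a MiniLM
# sentence-transformer, and score the two documents with cosine similarity.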
def extract_text_from_pdf(pdf_path):
    try:
        with pdfplumber.open(pdf_path) as pdf:
            text = "\n".join(page.extract_text() for page in pdf.pages if page.extract_text())
        return text
    except Exception as e:
        return f"Error extracting text: {str(e)}"
def get_huggingface_embeddings(text_chunks):
    model_name = "sentence-transformers/all-MiniLM-L6-v2"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    inputs = tokenizer(text_chunks, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
    embeddings = outputs.last_hidden_state.mean(dim=1).numpy()
    return embeddings

def compute_similarity(embeddings1, embeddings2):
    return cosine_similarity(embeddings1, embeddings2)
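
# Turn the similarity score and the LLM-extracted key terms into a plain-text
# summary and a downloadable PDF report.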
def generate_comparison_summary(similarity_score, similar_terms):
    summary = f"Similarity Score: {similarity_score:.2f}%\n"
    summary += "Important terms/phrases:\n"
    summary += "\n".join(similar_terms)
    summary += "\nThis comparison highlights the key related points between the documents."
    return summary

def generate_pdf_report(similarity_score, similar_terms):
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Arial", style='B', size=16)
    pdf.cell(200, 10, "PDF Similarity Report", ln=True, align='C')
    pdf.ln(10)
    pdf.set_font("Arial", size=12)
    pdf.multi_cell(0, 10, f"Similarity Score: {similarity_score:.2f}%\n\n")
    pdf.multi_cell(0, 10, "Important terms/phrases:")
    for term in similar_terms:
        pdf.multi_cell(0, 8, f"- {term}")
    pdf.multi_cell(0, 10, "\nThis comparison highlights the key related points between the documents.")
    pdf_path = "similarity_report.pdf"
    pdf.output(pdf_path)
    return pdf_path
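
# Compare two uploaded PDFs: split each into sentence-level chunks, embed
# them, average the pairwise cosine similarities into a single percentage,
# and (if an API_KEY is configured) ask the Groq LLM to extract the
# distinctive terms shared by the two texts.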
def compare_documents(file1, file2):
    try:
        file1_path = file1.name
        file2_path = file2.name
        text1 = extract_text_from_pdf(file1_path)
        text2 = extract_text_from_pdf(file2_path)
        chunks1 = text1.split(". ")
        chunks2 = text2.split(". ")
        embeddings1 = get_huggingface_embeddings(chunks1)
        embeddings2 = get_huggingface_embeddings(chunks2)
        similarity_scores = compute_similarity(embeddings1, embeddings2)
        overall_similarity = np.mean(similarity_scores) * 100
        groq_api_key = os.getenv("API_KEY")
        if groq_api_key:
            client = Groq(api_key=groq_api_key)
            response = client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[
                    {"role": "system",
                     "content": "You are a helpful assistant. Return only what is asked for, without outside context or meta commentary. Stay on task."},
                    {"role": "user",
                     "content": f"Here is the text of two documents. Extract only the important terms or phrases from both, filtering out common terms like 'court', 'judge', etc.\n\nDocument 1:\n{text1}\n\nDocument 2:\n{text2}"}
                ]
            )
            groq_response = response.choices[0].message.content
            similar_terms = groq_response.split('\n')
        else:
            similar_terms = ["API key not found. Skipping term extraction."]
        comparison_summary = generate_comparison_summary(overall_similarity, similar_terms)
        pdf_report_path = generate_pdf_report(overall_similarity, similar_terms)
        return comparison_summary, pdf_report_path
    except Exception as e:
        return f"Error: {str(e)}", None
def extract_text_from_pdf_translator(file):
    reader = PyPDF2.PdfReader(file)
    text = ""
    for page in reader.pages:
        # extract_text() can return None for pages with no extractable text
        text += page.extract_text() or ""
    return text

def split_text_into_chunks(text, chunk_size=1000):
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    return chunks

def translate_text_with_groq(text, target_language):
    api_key = os.getenv("API_KEY")
    if not api_key:
        raise ValueError("API key is missing. Please set the 'API_KEY' environment variable.")
    client = Groq(api_key=api_key)
    prompt_text = f"Translate the following text to {target_language}:\n{text}"
    try:
        completion = client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[{
                "role": "user",
                "content": prompt_text
            }],
            temperature=1,
            max_tokens=1024,
            top_p=1,
            stream=True,
            stop=None,
        )
        translated_text = ""
        for chunk in completion:
            translated_text += chunk.choices[0].delta.content or ""
        return translated_text
    except Exception as e:
        return f"Translation error: {str(e)}. Please try again later."
def get_font_for_language(language):
    font_map = {
        "Tamil": "NotoSansTamil-Regular.ttf",
        "Telugu": "NotoSansTelugu-Regular.ttf",
        "Malayalam": "NotoSansMalayalam-Regular.ttf",
        "Hindi": "NotoSansDevanagari-Regular.ttf"
    }
    return font_map.get(language, "DejaVuSans.ttf")

def save_translated_text_as_pdf(translated_text, target_language, output_filename="translated_text.pdf"):
    pdf = FPDF()
    pdf.add_page()
    font_path = get_font_for_language(target_language)
    try:
        pdf.add_font("CustomFont", "", font_path, uni=True)
        pdf.set_font("CustomFont", size=12)
    except Exception as e:
        return f"Error loading font: {e}"
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.set_left_margin(15)
    pdf.set_right_margin(15)
    width = pdf.w - 2 * pdf.l_margin
    pdf.multi_cell(width, 10, translated_text, align='L')
    pdf.output(output_filename)
    return output_filename

def process_pdf_and_translate(file, target_language):
    text = extract_text_from_pdf_translator(file)
    chunks = split_text_into_chunks(text)
    translated_text = ""
    for chunk in chunks:
        translated_text += translate_text_with_groq(chunk, target_language)
    pdf_file = save_translated_text_as_pdf(translated_text, target_language)
    return translated_text, pdf_file
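
# ---- Summarizer helpers ----
# Extract text, trim it to fit the model's context budget, summarize it with
# the Groq chat API, and offer the summary as a downloadable PDF.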
def extract_text_from_pdf_summarizer(file):
    reader = PyPDF2.PdfReader(file)
    text = ""
    for page in reader.pages:
        # extract_text() can return None for pages with no extractable text
        text += page.extract_text() or ""
    return text

def trim_text_for_groq(text, limit=3000):
    if len(text) > 2 * limit:
        return text[:limit] + '\n...\n' + text[-limit:]
    return text

def summarize_text_with_groq(text):
    api_key = os.getenv("API_KEY")
    if not api_key:
        raise ValueError("API key is missing. Please set the 'API_KEY' environment variable.")
    client = Groq(api_key=api_key)
    completion = client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {
                "role": "user",
                "content": f"Summarize the following document concisely:\n\n{text}"
            }
        ],
        temperature=1,
        max_tokens=1024,
        top_p=1,
        stream=True,
        stop=None,
    )
    summary = ""
    for chunk in completion:
        summary += chunk.choices[0].delta.content or ""
    return summary

def save_summary_as_pdf(summary, output_filename="summary.pdf"):
    pdf = FPDF()
    pdf.add_page()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.set_font("Arial", size=12)
    pdf.multi_cell(0, 10, summary)
    pdf.output(output_filename)
    return output_filename

def process_pdf_and_summarize(file):
    text = extract_text_from_pdf_summarizer(file)
    trimmed_text = trim_text_for_groq(text)
    summary = summarize_text_with_groq(trimmed_text)
    pdf_file = save_summary_as_pdf(summary)
    return summary, pdf_file
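
# ---- Law Chatbot setup ----
# Load nomic-embed-text embeddings and a prebuilt FAISS index ("law_vector_db",
# assumed to ship with the app), then answer questions with a retrieval-
# augmented prompt sent to Mistral-7B-Instruct via Together AI.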
def load_embeddings():
    try:
        embeddings = HuggingFaceEmbeddings(
            model_name="nomic-ai/nomic-embed-text-v1",
            model_kwargs={"trust_remote_code": True, "revision": "289f532e14dbbbd5a04753fa58739e9ba766f3c7"}
        )
        print("Embeddings loaded successfully.")
        return embeddings
    except Exception as e:
        raise RuntimeError(f"Error loading embeddings: {e}")

embeddings = load_embeddings()

def load_db():
    try:
        db = FAISS.load_local("law_vector_db", embeddings, allow_dangerous_deserialization=True)
        print("FAISS index loaded successfully.")
        with open('law_vector_db/index.pkl', 'rb') as pkl_file:
            metadata = pickle.load(pkl_file)
        print("Pickle file loaded successfully.")
        return db, metadata
    except Exception as e:
        raise RuntimeError(f"Error loading FAISS index or pickle file: {e}")

db, metadata = load_db()
db_retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": 4})

prompt_template = """<s>[INST] As a legal chatbot specializing in Indian Penal Code queries, your primary objective is to provide accurate and concise information based on the user's questions.
Do not generate your own questions and answers. Adhere strictly to the instructions provided, offering relevant context from the knowledge base while avoiding unnecessary details.
Your responses should be brief, to the point, and in compliance with the established format.
If a question falls outside the given context, do not rely on the chat history; instead, use your own knowledge base to generate an appropriate response.
Prioritize the user's query and refrain from posing additional questions.
The aim is to deliver professional, precise, and contextually relevant information pertaining to the Indian Penal Code.
CONTEXT: {context}
CHAT HISTORY: {chat_history}
QUESTION: {question}
ANSWER: [/INST]
"""

prompt = PromptTemplate(template=prompt_template, input_variables=['context', 'question', 'chat_history'])
TOGETHER_AI_API = os.getenv("T_API_KEY")
llm = Together(
    model="mistralai/Mistral-7B-Instruct-v0.2",
    temperature=0.5,
    max_tokens=1024,
    together_api_key=TOGETHER_AI_API
)
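
# Answer a question by retrieving the top-4 passages from the FAISS index,
# filling them into the prompt template, and calling the Together LLM.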
def ask_question(user_question, chat_history=[]):
    try:
        context_docs = db_retriever.get_relevant_documents(user_question)
        context = "\n".join(
            [doc.page_content for doc in context_docs]) if context_docs else "No relevant context found."
        input_data = {
            "context": context,
            "question": user_question,
            "chat_history": "\n".join(chat_history)
        }
        response = llm(prompt.format(**input_data))
        return response
    except Exception as e:
        return f"Error: {e}"

def chat_bot_interface(user_message, chat_history=[]):
    if not user_message:
        return chat_history, chat_history
    chat_history.append(("User", user_message))
    response = ask_question(user_message, [msg[1] for msg in chat_history if msg[0] == "User"])
    chat_history.append(("Assistant", response))
    return chat_history, chat_history
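
# ---- Gradio UI: one tab per tool ----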
with gr.Blocks() as demo:
    with gr.Tabs():
        with gr.Tab("Summarizer"):
            gr.Markdown("## PDF Summarizer")
            with gr.Row():
                with gr.Column():
                    pdf_input_summary = gr.File(label="Upload PDF", file_types=[".pdf"])
                    summarize_button = gr.Button("Summarize")
                with gr.Column():
                    summary_output = gr.Textbox(label="Summary", lines=10, interactive=False)
                    download_link_summary = gr.File(label="Download Summary as PDF", interactive=False)
            summarize_button.click(process_pdf_and_summarize, inputs=[pdf_input_summary], outputs=[summary_output, download_link_summary])
        with gr.Tab("Translator"):
            gr.Markdown("## Document Translation using Groq")
            with gr.Row():
                with gr.Column():
                    pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"])
                    language_input = gr.Dropdown(
                        label="Select Target Language",
                        choices=["Tamil", "Malayalam", "Telugu", "Hindi"],
                        value="Tamil"
                    )
                    translate_button = gr.Button("Translate")
                with gr.Column():
                    translated_output = gr.Textbox(label="Translated Text", lines=10, interactive=False)
                    download_link = gr.File(label="Download Translated PDF", interactive=False)
            translate_button.click(process_pdf_and_translate, inputs=[pdf_input, language_input], outputs=[translated_output, download_link])
        with gr.Tab("PDF Similarity Checker"):
            gr.Markdown("## PDF Similarity Checker")
            with gr.Row():
                with gr.Column():
                    file1 = gr.File(label="Upload PDF 1")
                    file2 = gr.File(label="Upload PDF 2")
                    compare_button = gr.Button("Compare")
                with gr.Column():
                    result = gr.Textbox(label="Results")
                    report = gr.File(label="Download Report")
            compare_button.click(compare_documents, inputs=[file1, file2], outputs=[result, report])
        with gr.Tab("Law Chatbot"):
            gr.Markdown("<h1 style='text-align: center;'>Legal Chatbot</h1>")
            chatbot = gr.Chatbot(label="Chatbot Interface")
            user_input = gr.Textbox(label="Ask a Question", placeholder="Type your question here...", lines=1)
            clear_button = gr.Button("Clear")
            chat_history = gr.State([])

            def clear_chat():
                return [], []

            user_input.submit(chat_bot_interface, inputs=[user_input, chat_history], outputs=[chatbot, chat_history])
            clear_button.click(clear_chat, outputs=[chatbot, chat_history])

# Launch the app
if __name__ == "__main__":
    demo.launch()