Spaces:
Running
Running
import os | |
import re | |
import tempfile | |
import requests | |
import gradio as gr | |
print(f"Gradio version: {gr.__version__}") | |
from PyPDF2 import PdfReader | |
import fitz # pymupdf | |
import logging | |
import webbrowser | |
from huggingface_hub import InferenceClient | |
from typing import Dict, List, Optional, Tuple | |
import time | |
from groq import Groq # Import the Groq client | |
# Set up logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Constants | |
CONTEXT_SIZES = { | |
"4K": 4096, | |
"8K": 8192, | |
"32K": 32768, | |
"64K": 65536, | |
"128K": 131072 | |
} | |
MODEL_CONTEXT_SIZES = { | |
"Clipboard only": 4096, | |
"OpenAI ChatGPT": { | |
"gpt-3.5-turbo": 16385, | |
"gpt-3.5-turbo-0125": 16385, | |
"gpt-3.5-turbo-1106": 16385, | |
"gpt-3.5-turbo-instruct": 4096, | |
"gpt-4": 8192, | |
"gpt-4-0314": 8192, | |
"gpt-4-0613": 8192, | |
"gpt-4-turbo": 128000, | |
"gpt-4-turbo-2024-04-09": 128000, | |
"gpt-4-turbo-preview": 128000, | |
"gpt-4-0125-preview": 128000, | |
"gpt-4-1106-preview": 128000, | |
"gpt-4o": 128000, | |
"gpt-4o-2024-11-20": 128000, | |
"gpt-4o-2024-08-06": 128000, | |
"gpt-4o-2024-05-13": 128000, | |
"chatgpt-4o-latest": 128000, | |
"gpt-4o-mini": 128000, | |
"gpt-4o-mini-2024-07-18": 128000, | |
"gpt-4o-realtime-preview": 128000, | |
"gpt-4o-realtime-preview-2024-10-01": 128000, | |
"gpt-4o-audio-preview": 128000, | |
"gpt-4o-audio-preview-2024-10-01": 128000, | |
"o1-preview": 128000, | |
"o1-preview-2024-09-12": 128000, | |
"o1-mini": 128000, | |
"o1-mini-2024-09-12": 128000, | |
}, | |
"HuggingFace Inference": { | |
"microsoft/phi-3-mini-4k-instruct": 4096, | |
"microsoft/Phi-3-mini-128k-instruct": 131072, # Added Phi-3 128k | |
"HuggingFaceH4/zephyr-7b-beta": 8192, | |
"deepseek-ai/DeepSeek-Coder-V2-Instruct": 8192, | |
"meta-llama/Llama-3-8b-Instruct": 8192, | |
"mistralai/Mistral-7B-Instruct-v0.3": 32768, | |
"NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO": 32768, | |
"microsoft/Phi-3.5-mini-instruct": 4096, | |
"HuggingFaceTB/SmolLM2-1.7B-Instruct": 2048, | |
"google/gemma-2-2b-it": 2048, | |
"openai-community/gpt2": 1024, | |
"microsoft/phi-2": 2048, | |
"TinyLlama/TinyLlama-1.1B-Chat-v1.0": 2048 | |
}, | |
"Groq API": { | |
"gemma2-9b-it": 8192, | |
"gemma-7b-it": 8192, | |
"llama-3.3-70b-versatile": 131072, | |
"llama-3.1-70b-versatile": 131072, # Deprecated | |
"llama-3.1-8b-instant": 131072, | |
"llama-guard-3-8b": 8192, | |
"llama3-70b-8192": 8192, | |
"llama3-8b-8192": 8192, | |
"mixtral-8x7b-32768": 32768, | |
"llama3-groq-70b-8192-tool-use-preview": 8192, | |
"llama3-groq-8b-8192-tool-use-preview": 8192, | |
"llama-3.3-70b-specdec": 131072, | |
"llama-3.1-70b-specdec": 131072, | |
"llama-3.2-1b-preview": 131072, | |
"llama-3.2-3b-preview": 131072, | |
}, | |
"Cohere API": { | |
"command-r-plus-08-2024": 131072, # 128k | |
"command-r-plus-04-2024": 131072, | |
"command-r-plus": 131072, | |
"command-r-08-2024": 131072, | |
"command-r-03-2024": 131072, | |
"command-r": 131072, | |
"command": 4096, | |
"command-nightly": 131072, | |
"command-light": 4096, | |
"command-light-nightly": 4096, | |
"c4ai-aya-expanse-8b": 8192, | |
"c4ai-aya-expanse-32b": 131072, | |
} | |
} | |
class ModelRegistry: | |
def __init__(self): | |
# HuggingFace Models | |
self.hf_models = { | |
"Phi-3 Mini 4K": "microsoft/phi-3-mini-4k-instruct", | |
"Phi-3 Mini 128k": "microsoft/Phi-3-mini-128k-instruct", # Added | |
"Zephyr 7B Beta": "HuggingFaceH4/zephyr-7b-beta", | |
"DeepSeek Coder V2": "deepseek-ai/DeepSeek-Coder-V2-Instruct", | |
"Meta Llama 3.1 8B": "meta-llama/Llama-3-8b-Instruct", | |
"Meta Llama 3.1 70B": "meta-llama/Meta-Llama-3.1-70B-Instruct", | |
"Mixtral 7B": "mistralai/Mistral-7B-Instruct-v0.3", | |
"Nous-Hermes": "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO", | |
"Cohere Command R+": "CohereForAI/c4ai-command-r-plus", | |
"Aya 23-35B": "CohereForAI/aya-23-35B", | |
"Phi-3.5 Mini": "microsoft/Phi-3.5-mini-instruct", # Added | |
"SmolLM2 1.7B": "HuggingFaceTB/SmolLM2-1.7B-Instruct", # Added | |
"Gemma 2 2B": "google/gemma-2-2b-it", # Added | |
"GPT2": "openai-community/gpt2", # Added | |
"Phi-2": "microsoft/phi-2", # Added | |
"TinyLlama 1.1B": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", # Added | |
"Custom Model": "" # Keep for custom models | |
} | |
# Default Groq Models | |
self.default_groq_models = { # Keep defaults in case fetching fails | |
"gemma2-9b-it": "gemma2-9b-it", | |
"gemma-7b-it": "gemma-7b-it", | |
"llama-3.3-70b-versatile": "llama-3.3-70b-versatile", | |
"llama-3.1-70b-versatile": "llama-3.1-70b-versatile", # Deprecated | |
"llama-3.1-8b-instant": "llama-3.1-8b-instant", | |
"llama-guard-3-8b": "llama-guard-3-8b", | |
"llama3-70b-8192": "llama3-70b-8192", | |
"llama3-8b-8192": "llama3-8b-8192", | |
"mixtral-8x7b-32768": "mixtral-8x7b-32768", | |
"llama3-groq-70b-8192-tool-use-preview": "llama3-groq-70b-8192-tool-use-preview", | |
"llama3-groq-8b-8192-tool-use-preview": "llama3-groq-8b-8192-tool-use-preview", | |
"llama-3.3-70b-specdec": "llama-3.3-70b-specdec", | |
"llama-3.1-70b-specdec": "llama-3.1-70b-specdec", | |
"llama-3.2-1b-preview": "llama-3.2-1b-preview", | |
"llama-3.2-3b-preview": "llama-3.2-3b-preview", | |
} | |
self.groq_models = self._fetch_groq_models() | |
def _fetch_groq_models(self) -> Dict[str, str]: | |
"""Fetch available Groq models with proper error handling""" | |
try: | |
groq_api_key = os.getenv('GROQ_API_KEY') | |
if not groq_api_key: | |
logging.warning("No GROQ_API_KEY found in environment") | |
return self.default_groq_models | |
headers = { | |
"Authorization": f"Bearer {groq_api_key}", | |
"Content-Type": "application/json" | |
} | |
response = requests.get( | |
"https://api.groq.com/openai/v1/models", | |
headers=headers, | |
timeout=10 | |
) | |
if response.status_code == 200: | |
models = response.json().get("data", []) | |
model_dict = {model["id"]: model["id"] for model in models} | |
# Merge with defaults to ensure all models are available | |
return {**self.default_groq_models, **model_dict} | |
else: | |
logging.error(f"Failed to fetch Groq models: {response.status_code}") | |
return self.default_groq_models | |
except requests.exceptions.Timeout: | |
logging.error("Timeout while fetching Groq models") | |
return self.default_groq_models | |
except Exception as e: | |
logging.error(f"Error fetching Groq models: {e}") | |
return self.default_groq_models | |
def _get_default_groq_models(self) -> Dict[str, str]: | |
"""Return default Groq models""" | |
return self.default_groq_models | |
def refresh_groq_models(self) -> Dict[str, str]: | |
"""Refresh the list of available Groq models""" | |
self.groq_models = self._fetch_groq_models() | |
return self.groq_models | |
class PDFProcessor: | |
"""Handles PDF conversion to text and markdown using different methods""" | |
def txt_convert(pdf_path: str) -> str: | |
"""Basic text extraction using PyPDF2""" | |
try: | |
reader = PdfReader(pdf_path) | |
text = "" | |
for page_num, page in enumerate(reader.pages, start=1): | |
page_text = page.extract_text() | |
if page_text: | |
text += page_text + "\n" | |
else: | |
logging.warning(f"No text found on page {page_num}.") | |
return text | |
except Exception as e: | |
logging.error(f"Error in txt conversion: {e}") | |
return f"Error: {str(e)}" | |
def md_convert_with_pymupdf(pdf_path: str) -> str: | |
"""Convert PDF to Markdown using pymupdf""" | |
try: | |
doc = fitz.open(pdf_path) | |
markdown_text = [] | |
for page in doc: | |
blocks = page.get_text("dict")["blocks"] | |
for block in blocks: | |
if "lines" in block: | |
for line in block["lines"]: | |
for span in line["spans"]: | |
font_size = span["size"] | |
content = span["text"] | |
font_flags = span["flags"] # Contains bold, italic info | |
# Handle headers based on font size | |
if font_size > 20: | |
markdown_text.append(f"# {content}\n") | |
elif font_size > 16: | |
markdown_text.append(f"## {content}\n") | |
elif font_size > 14: | |
markdown_text.append(f"### {content}\n") | |
else: | |
# Handle bold and italic | |
if font_flags & 2**4: # Bold | |
content = f"**{content}**" | |
if font_flags & 2**1: # Italic | |
content = f"*{content}*" | |
markdown_text.append(content) | |
markdown_text.append(" ") # Space between spans | |
markdown_text.append("\n") # Newline between lines | |
# Add extra newline between blocks for paragraphs | |
markdown_text.append("\n") | |
doc.close() | |
return "".join(markdown_text) | |
except Exception as e: | |
logging.error(f"Error in pymupdf conversion: {e}") | |
return f"Error: {str(e)}" | |
# Initialize model registry | |
model_registry = ModelRegistry() | |
def extract_text_from_pdf(pdf_path: str, format_type: str = "txt") -> str: | |
""" | |
Extract and format text from PDF using different processors based on format. | |
Args: | |
pdf_path: Path to PDF file | |
format_type: Either 'txt' or 'md' | |
Returns: | |
Formatted text content | |
""" | |
processor = PDFProcessor() | |
try: | |
if format_type == "txt": | |
return processor.txt_convert(pdf_path) | |
elif format_type == "md": | |
return processor.md_convert_with_pymupdf(pdf_path) | |
else: | |
return f"Error: Unsupported format type: {format_type}" | |
except Exception as e: | |
logging.error(f"Error in PDF conversion: {e}") | |
return f"Error: {str(e)}" | |
def format_content(text: str, format_type: str) -> str: | |
"""Format extracted text according to specified format.""" | |
if format_type == 'txt': | |
return text | |
elif format_type == 'md': | |
paragraphs = text.split('\n\n') | |
return '\n\n'.join(paragraphs) | |
elif format_type == 'html': | |
paragraphs = text.split('\n\n') | |
return ''.join([f'<p>{para.strip()}</p>' for para in paragraphs if para.strip()]) | |
else: | |
logging.error(f"Unsupported format: {format_type}") | |
return f"Unsupported format: {format_type}" | |
def split_into_snippets(text: str, context_size: int) -> List[str]: | |
"""Split text into manageable snippets based on context size.""" | |
sentences = re.split(r'(?<=[.!?]) +', text) | |
snippets = [] | |
current_snippet = "" | |
for sentence in sentences: | |
if len(current_snippet) + len(sentence) + 1 > context_size: | |
if current_snippet: | |
snippets.append(current_snippet.strip()) | |
current_snippet = sentence + " " | |
else: | |
snippets.append(sentence.strip()) | |
current_snippet = "" | |
else: | |
current_snippet += sentence + " " | |
if current_snippet.strip(): | |
snippets.append(current_snippet.strip()) | |
return snippets | |
def build_prompts(snippets: List[str], prompt_instruction: str, custom_prompt: Optional[str], snippet_num: Optional[int] = None) -> str: | |
"""Build formatted prompts from text snippets.""" | |
if snippet_num is not None: | |
if 1 <= snippet_num <= len(snippets): | |
selected_snippets = [snippets[snippet_num - 1]] | |
else: | |
return f"Error: Invalid snippet number. Please choose between 1 and {len(snippets)}." | |
else: | |
selected_snippets = snippets | |
prompts = [] | |
base_prompt = custom_prompt if custom_prompt else prompt_instruction | |
for idx, snippet in enumerate(selected_snippets, start=1): | |
if len(selected_snippets) > 1: | |
prompt_header = f"{base_prompt} Part {idx} of {len(selected_snippets)}: ---\n" | |
else: | |
prompt_header = f"{base_prompt} ---\n" | |
framed_prompt = f"{prompt_header}{snippet}\n---" | |
prompts.append(framed_prompt) | |
return "\n\n".join(prompts) | |
def send_to_model(prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key, | |
groq_model_choice, groq_api_key, openai_api_key, openai_model_choice): | |
"""Wrapper function for send_to_model_impl with comprehensive error handling.""" | |
logging.info("send to model starting...") | |
if not prompt or not prompt.strip(): | |
return "Error: No prompt provided", None | |
try: | |
logging.info("sending to model preparation.") | |
# Basic input validation | |
valid_selections = ["Clipboard only", "HuggingFace Inference", "Groq API", "OpenAI ChatGPT", "Cohere API"] | |
if model_selection not in valid_selections: | |
return "Error: Invalid model selection", None | |
# Model-specific validation | |
if model_selection == "Groq API" and not groq_api_key: | |
return "Error: Groq API key required", None | |
elif model_selection == "OpenAI ChatGPT" and not openai_api_key: | |
return "Error: OpenAI API key required", None | |
# Call implementation with error handling | |
try: | |
logging.info("calling send_to_model_impl.") | |
summary, download_file = send_to_model_impl( | |
prompt=prompt.strip(), | |
model_selection=model_selection, | |
hf_model_choice=hf_model_choice, | |
hf_custom_model=hf_custom_model, | |
hf_api_key=hf_api_key, | |
groq_model_choice=groq_model_choice, | |
groq_api_key=groq_api_key, | |
openai_api_key=openai_api_key, | |
openai_model_choice=openai_model_choice | |
) | |
logging.info("summary received:", summary) | |
if summary is None or not isinstance(summary, str): | |
return "Error: No response from model", None | |
return summary, download_file | |
except Exception as impl_error: | |
error_msg = str(impl_error) | |
if not error_msg: | |
error_msg = "Unknown error occurred in model implementation" | |
logging.error(f"Model implementation error: {error_msg}") | |
return f"Error: {error_msg}", None | |
except Exception as e: | |
error_msg = str(e) | |
if not error_msg: | |
error_msg = "Unknown error occurred" | |
logging.error(f"Error in send_to_model: {error_msg}") | |
return f"Error: {error_msg}", None | |
finally: | |
logging.info("send to model completed.") | |
def send_to_model_impl(prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key, | |
groq_model_choice, groq_api_key, openai_api_key, openai_model_choice): | |
"""Implementation of model sending with improved error handling.""" | |
logging.info("send to model impl commencing...") | |
try: | |
if model_selection == "Clipboard only": | |
return "Text copied to clipboard. Use paste for processing.", None | |
if model_selection == "HuggingFace Inference": | |
# First try without API key | |
model_id = hf_custom_model if hf_model_choice == "Custom Model" else model_registry.hf_models[hf_model_choice] | |
summary = send_to_hf_inference(prompt, model_id) | |
if summary.startswith("Error"): | |
if hf_api_key: # If first try failed and we have an API key, try with it | |
summary = send_to_hf_inference(prompt, model_id, hf_api_key) | |
elif model_selection == "Groq API": | |
summary = send_to_groq(prompt, groq_model_choice, groq_api_key) | |
elif model_selection == "OpenAI ChatGPT": | |
summary = send_to_openai(prompt, openai_api_key, model=openai_model_choice) | |
elif model_selection == "Cohere API": | |
summary = send_to_cohere(prompt) | |
else: | |
return "Error: Invalid model selection", None | |
# Validate response | |
if not summary or not isinstance(summary, str): | |
return "Error: Invalid response from model", None | |
# Create download file for valid responses | |
if not summary.startswith("Error"): | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: | |
f.write(summary) | |
return summary, f.name | |
return summary, None | |
except Exception as e: | |
error_msg = str(e) | |
if not error_msg: | |
error_msg = "Unknown error occurred" | |
logging.error(f"Error in send_to_model_impl: {error_msg}") | |
return f"Error: {error_msg}", None | |
def send_to_hf_inference(prompt: str, model_name: str, api_key: str = None) -> str: | |
"""Send prompt to HuggingFace Inference API with optional authentication.""" | |
try: | |
client = InferenceClient(token=api_key) if api_key else InferenceClient() | |
response = client.text_generation( | |
prompt, | |
model=model_name, | |
max_new_tokens=500, | |
temperature=0.7, | |
top_p=0.95, | |
repetition_penalty=1.1 | |
) | |
return str(response) | |
except Exception as e: | |
logging.error(f"HuggingFace inference error: {e}") | |
return f"Error with HuggingFace inference: {str(e)}" # Return error message instead of raising | |
def send_to_hf_inference_old(prompt: str, model_name: str, api_key: str = None) -> str: | |
"""Send prompt to HuggingFace Inference API with optional authentication.""" | |
try: | |
# First try without authentication | |
try: | |
client = InferenceClient() # No token | |
response = client.text_generation( | |
prompt, | |
model=model_name, | |
max_new_tokens=500, | |
temperature=0.7, | |
top_p=0.95, | |
repetition_penalty=1.1 | |
) | |
return str(response) | |
except Exception as public_error: | |
logging.info(f"Public inference failed: {public_error}") | |
# If that fails and we have an API key, try with authentication | |
if api_key: | |
client = InferenceClient(token=api_key) | |
response = client.text_generation( | |
prompt, | |
model=model_name, | |
max_new_tokens=500, | |
temperature=0.7, | |
top_p=0.95, | |
repetition_penalty=1.1 | |
) | |
return str(response) | |
else: | |
# If we don't have an API key, inform the user they need one | |
return "Error: This model requires authentication. Please enter your HuggingFace API key." | |
except Exception as e: | |
logging.error(f"HuggingFace inference error: {e}") | |
return f"Error with HuggingFace inference: {str(e)}" | |
def send_to_groq(prompt: str, model_name: str, api_key: str) -> str: | |
"""Send prompt to Groq API with better error handling.""" | |
try: | |
client = Groq(api_key=api_key) | |
response = client.chat.completions.create( | |
model=model_name, | |
messages=[{ | |
"role": "user", | |
"content": prompt | |
}], | |
temperature=0.7, | |
max_tokens=500, | |
top_p=0.95 | |
) | |
return response.choices[0].message.content | |
except Exception as e: | |
logging.error(f"Groq API error: {e}") | |
raise # Re-raise to be handled by caller | |
def send_to_openai(prompt: str, api_key: str, model: str = "gpt-3.5-turbo") -> str: | |
"""Send prompt to OpenAI API.""" | |
try: | |
from openai import OpenAI | |
client = OpenAI(api_key=api_key) | |
response = client.chat.completions.create( | |
model=model, | |
messages=[ | |
{"role": "system", "content": "You are a helpful assistant that provides detailed responses."}, | |
{"role": "user", "content": prompt} | |
], | |
temperature=0.7, | |
max_tokens=500, | |
top_p=0.95 | |
) | |
if response.choices and len(response.choices) > 0: | |
return response.choices[0].message.content | |
else: | |
raise Exception("No response generated") | |
except ImportError: | |
raise Exception("Please install the latest version of openai package (pip install --upgrade openai)") | |
except Exception as e: | |
logging.error(f"OpenAI API error: {e}") | |
raise # Re-raise to be handled by caller | |
def send_to_cohere(prompt: str, api_key: str = None) -> str: | |
"""Send prompt to Cohere API with optional authentication.""" | |
try: | |
import cohere | |
client = cohere.Client(api_key) if api_key else cohere.Client() | |
response = client.chat( | |
message=prompt, | |
temperature=0.7, | |
max_tokens=500, | |
) | |
if hasattr(response, 'text'): | |
return response.text | |
else: | |
return "Error: No response text from Cohere" | |
except Exception as e: | |
logging.error(f"Cohere API error: {e}") | |
return f"Error with Cohere API: {str(e)}" # Return error message instead of raising | |
def copy_text_js(element_id: str) -> str: | |
return f"""function() {{ | |
let textarea = document.getElementById('{element_id}'); | |
if (!textarea) return 'Element not found'; | |
textarea.select(); | |
try {{ | |
document.execCommand('copy'); | |
return 'Copied to clipboard!'; | |
}} catch(err) {{ | |
return 'Failed to copy: ' + err; | |
}} | |
}}""" | |
def open_chatgpt() -> str: | |
"""Open ChatGPT in new browser tab""" | |
return """window.open('https://chat.openai.com/', '_blank');""" | |
def process_pdf(pdf, fmt, ctx_size): | |
"""Process PDF and return text and snippets""" | |
try: | |
if not pdf: | |
return "Please upload a PDF file.", "", [], None | |
# Extract text | |
text = extract_text_from_pdf(pdf.name) | |
if text.startswith("Error"): | |
return text, "", [], None | |
# Format content | |
formatted_text = format_content(text, fmt) | |
# Split into snippets | |
snippets = split_into_snippets(formatted_text, ctx_size) | |
# Save full text for download | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as text_file: | |
text_file.write(formatted_text) | |
snippet_choices = [f"Snippet {i+1} of {len(snippets)}" for i in range(len(snippets))] | |
return ( | |
"PDF processed successfully!", | |
formatted_text, | |
snippets, | |
snippet_choices, | |
[text_file.name] | |
) | |
except Exception as e: | |
logging.error(f"Error processing PDF: {e}") | |
return f"Error processing PDF: {str(e)}", "", [], None | |
def generate_prompt(text, template, snippet_idx=None): | |
"""Generate prompt from text or selected snippet""" | |
try: | |
if not text: | |
return "No text available.", "", None | |
default_prompt = "Summarize the following text:" | |
prompt_template = template if template else default_prompt | |
if isinstance(text, list): | |
# If text is list of snippets | |
if snippet_idx is not None: | |
if 0 <= snippet_idx < len(text): | |
content = text[snippet_idx] | |
else: | |
return "Invalid snippet index.", "", None | |
else: | |
content = "\n\n".join(text) | |
else: | |
content = text | |
prompt = f"{prompt_template}\n---\n{content}\n---" | |
# Save prompt for download | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as prompt_file: | |
prompt_file.write(prompt) | |
return "Prompt generated!", prompt, [prompt_file.name] | |
except Exception as e: | |
logging.error(f"Error generating prompt: {e}") | |
return f"Error generating prompt: {str(e)}", "", None | |
# Main Interface | |
with gr.Blocks(css=""" | |
.gradio-container {max-width: 90%; margin: 0 auto;} | |
@media (max-width: 768px) {.gradio-container {max-width: 98%; padding: 10px;} .gr-row {flex-direction: column;} .gr-col {width: 100%; margin-bottom: 10px;}} | |
""") as demo: | |
# State variables | |
pdf_content = gr.State("") | |
snippets = gr.State([]) | |
# Header | |
gr.Markdown("# π Smart PDF Summarizer") | |
gr.Markdown("Upload a PDF document and get AI-powered summaries using various AI models.") | |
with gr.Tabs() as tabs: | |
# Tab 1: PDF Processing | |
with gr.Tab("1οΈβ£ PDF Processing"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
pdf_input = gr.File( | |
label="π Upload PDF", | |
file_types=[".pdf"] | |
) | |
format_type = gr.Radio( | |
choices=["txt", "md"], | |
value="txt", | |
label="π Output Format" | |
) | |
context_size = gr.Slider( | |
minimum=1000, | |
maximum=200000, | |
step=1000, | |
value=4096, | |
label="Context Size" | |
) | |
gr.Markdown("### Context Size") | |
with gr.Row(): | |
for size_name, size_value in CONTEXT_SIZES.items(): | |
gr.Button( | |
size_name, | |
size="sm", | |
scale=1 | |
).click( | |
lambda v=size_value: gr.update(value=v), | |
None, | |
context_size | |
) | |
process_button = gr.Button("π Process PDF", variant="primary") | |
with gr.Column(scale=1): | |
progress_status = gr.Textbox( | |
label="Status", | |
interactive=False, | |
show_label=True, | |
visible=True # Ensure error messages are always visible | |
) | |
processed_text = gr.Textbox( | |
label="Processed Text", | |
lines=10, | |
max_lines=50, | |
show_copy_button=True | |
) | |
download_full_text = gr.File(label="π₯ Download Full Text") | |
# Tab 2: Snippet Selection | |
with gr.Tab("2οΈβ£ Snippet Selection"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
snippet_selector = gr.Dropdown( | |
label="Select Snippet", | |
choices=[], | |
interactive=True | |
) | |
custom_prompt = gr.Textbox( | |
label="βοΈ Custom Prompt Template", | |
placeholder="Enter your custom prompt here...", | |
lines=2 | |
) | |
generate_prompt_btn = gr.Button("Generate Prompt", variant="primary") | |
with gr.Column(scale=1): | |
generated_prompt = gr.Textbox( | |
label="π Generated Prompt", | |
lines=10, | |
max_lines=50, | |
show_copy_button=True, | |
elem_id="generated_prompt" # Add this | |
) | |
with gr.Row(): | |
download_prompt = gr.File(label="π₯ Download Prompt") | |
download_snippet = gr.File(label="π₯ Download Selected Snippet") | |
# Tab 3: Model Processing | |
with gr.Tab("3οΈβ£ Model Processing"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
model_choice = gr.Radio( | |
choices=list(MODEL_CONTEXT_SIZES.keys()), | |
value="Clipboard only", | |
label="π€ Provider Selection" | |
) | |
with gr.Column(visible=False) as openai_options: | |
openai_model = gr.Dropdown( | |
choices=list(MODEL_CONTEXT_SIZES["OpenAI ChatGPT"].keys()), | |
value="gpt-3.5-turbo", | |
label="OpenAI Model" | |
) | |
openai_api_key = gr.Textbox( | |
label="π OpenAI API Key", | |
type="password" | |
) | |
with gr.Column(visible=False) as hf_options: | |
hf_model = gr.Dropdown( | |
choices=list(model_registry.hf_models.keys()), | |
label="π§ HuggingFace Model", | |
value="Phi-3 Mini 4K" | |
) | |
hf_custom_model = gr.Textbox( # This needs to be defined before being used | |
label="Custom Model ID", | |
placeholder="Enter custom model ID...", | |
visible=False | |
) | |
hf_api_key = gr.Textbox( | |
label="π HuggingFace API Key", | |
type="password" | |
) | |
with gr.Column(visible=False) as groq_options: | |
groq_model = gr.Dropdown( | |
choices=list(model_registry.groq_models.keys()), # Use model_registry.groq_models | |
value=list(model_registry.groq_models.keys())[0] if model_registry.groq_models else None, # Set a default value if available | |
label="Groq Model" | |
) | |
groq_api_key = gr.Textbox( | |
label="π Groq API Key", | |
type="password" | |
) | |
groq_refresh_btn = gr.Button("π Refresh Groq Models") # Add refresh button | |
send_to_model_btn = gr.Button("π Send to Model", variant="primary") | |
open_chatgpt_button = gr.Button("π Open ChatGPT") | |
with gr.Column(scale=1): | |
summary_output = gr.Textbox( | |
label="π Summary", | |
lines=15, | |
max_lines=50, | |
show_copy_button=True, | |
elem_id="summary_output" # Add this | |
) | |
with gr.Row(): | |
download_summary = gr.File(label="π₯ Download Summary") | |
# Hidden components for file handling | |
download_files = gr.Files(label="π₯ Downloads", visible=False) | |
# Event Handlers | |
def update_context_size(size: int) -> None: | |
"""Update context size slider with validation""" | |
if not isinstance(size, (int, float)): | |
size = 4096 # Default size | |
return gr.update(value=int(size)) | |
def get_model_context_size(choice: str, groq_model: str = None) -> int: | |
"""Get context size for model with better defaults""" | |
if choice == "Groq API" and groq_model: | |
return MODEL_CONTEXT_SIZES["Groq API"].get(groq_model, 4096) | |
elif choice == "OpenAI ChatGPT": | |
return 4096 | |
elif choice == "HuggingFace Inference": | |
return 4096 | |
return 32000 # Safe default | |
def update_snippet_choices(snippets_list: List[str]) -> List[str]: | |
"""Create formatted snippet choices""" | |
return [f"Snippet {i+1} of {len(snippets_list)}" for i in range(len(snippets_list))] | |
def get_snippet_index(choice: str) -> int: | |
"""Extract snippet index from choice string""" | |
if not choice: | |
return 0 | |
try: | |
return int(choice.split()[1]) - 1 | |
except: | |
return 0 | |
def toggle_model_options(choice): | |
return ( | |
gr.update(visible=choice == "HuggingFace Inference"), | |
gr.update(visible=choice == "Groq API"), | |
gr.update(visible=choice == "OpenAI ChatGPT") | |
) | |
def refresh_groq_models_list(): | |
try: | |
with gr.Progress() as progress: | |
progress(0, "Refreshing Groq models...") | |
updated_models = model_registry.refresh_groq_models() | |
progress(1, "Complete!") | |
return gr.update(choices=list(updated_models.keys())) | |
except Exception as e: | |
logging.error(f"Error refreshing models: {e}") | |
return gr.update() | |
def toggle_custom_model(model_name): | |
return gr.update(visible=model_name == "Custom Model") | |
def handle_groq_model_change(model_name): | |
"""Handle Groq model selection change""" | |
return update_context_size("Groq API", model_name) | |
def handle_model_selection(choice): | |
"""Handle model selection and update UI""" | |
ctx_size = MODEL_CONTEXT_SIZES.get(choice, {}) | |
if isinstance(ctx_size, dict): | |
first_model = list(ctx_size.keys())[0] | |
ctx_size = ctx_size[first_model] | |
# Prepare dropdown choices based on provider | |
if choice == "OpenAI ChatGPT": | |
model_choices = list(MODEL_CONTEXT_SIZES["OpenAI ChatGPT"].keys()) | |
return [ | |
gr.update(visible=False), # hf_options | |
gr.update(visible=False), # groq_options | |
gr.update(visible=True), # openai_options | |
gr.update(value=ctx_size), # context_size | |
gr.Dropdown(choices=model_choices, value=first_model) # openai_model | |
] | |
elif choice == "HuggingFace Inference": | |
model_choices = list(model_registry.hf_models.keys()) | |
return [ | |
gr.update(visible=True), # hf_options | |
gr.update(visible=False), # groq_options | |
gr.update(visible=False), # openai_options | |
gr.update(value=ctx_size), # context_size | |
gr.Dropdown(choices=model_choices, value="Phi-3 Mini 4K") # openai_model (not used) | |
] | |
elif choice == "Groq API": | |
model_choices = list(model_registry.groq_models.keys()) | |
return [ | |
gr.update(visible=False), # hf_options | |
gr.update(visible=True), # groq_options | |
gr.update(visible=False), # openai_options | |
gr.update(value=ctx_size), # context_size | |
gr.Dropdown(choices=model_choices, value=model_choices[0] if model_choices else None) # openai_model (not used) | |
] | |
# Default return for "Clipboard only" or other options | |
return [ | |
gr.update(visible=False), # hf_options | |
gr.update(visible=False), # groq_options | |
gr.update(visible=False), # openai_options | |
gr.update(value=4096), # context_size | |
gr.Dropdown(choices=[]) # openai_model (not used) | |
] | |
# PDF Processing Handlers | |
def handle_pdf_process(pdf, fmt, ctx_size): # Remove md_eng parameter | |
if not pdf: | |
return "Please upload a PDF file.", "", "", [], gr.update(choices=[], value=None), None | |
try: | |
text = extract_text_from_pdf(pdf.name, format_type=fmt) # Just use format_type | |
if text.startswith("Error"): | |
return text, "", "", [], gr.update(choices=[], value=None), None | |
# The important part: still do snippets processing | |
snippets_list = split_into_snippets(text, ctx_size) | |
snippet_choices = update_snippet_choices(snippets_list) | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=f'.{fmt}') as f: | |
f.write(text) | |
download_file = f.name | |
return ( | |
f"PDF processed successfully! Generated {len(snippets_list)} snippets.", | |
text, | |
text, | |
snippets_list, | |
gr.update(choices=snippet_choices, value=snippet_choices[0] if snippet_choices else None), | |
download_file | |
) | |
except Exception as e: | |
error_msg = f"Error processing PDF: {str(e)}" | |
logging.error(error_msg) | |
return error_msg, "", "", [], gr.update(choices=[], value=None), None | |
def handle_snippet_selection(choice, snippets_list): # Add download_snippet output | |
"""Handle snippet selection, update prompt, and provide snippet download.""" | |
if not snippets_list: | |
return "No snippets available.", "", None # Return None for download | |
try: | |
idx = get_snippet_index(choice) | |
selected_snippet = snippets_list[idx] | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: | |
f.write(selected_snippet) | |
snippet_download_file = f.name # Store the file path | |
return ( | |
f"Selected snippet {idx + 1}", | |
selected_snippet, | |
snippet_download_file # Return file for download | |
) | |
except Exception as e: | |
error_msg = f"Error selecting snippet: {str(e)}" | |
logging.error(error_msg) | |
return ( | |
error_msg, | |
"", | |
None | |
) | |
# Copy button handlers | |
def handle_prompt_generation(snippet_text, template, snippet_choice, snippets_list): | |
try: | |
if not snippets_list: | |
return "No text available.", "", None | |
idx = get_snippet_index(snippet_choice) | |
base_prompt = template if template else "Summarize the following text:" | |
content = snippets_list[idx] | |
prompt = f"{base_prompt}\n---\n{content}\n---" | |
# Save prompt for download | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: | |
f.write(prompt) | |
download_file = f.name | |
return "Prompt generated!", prompt, download_file # Return the file for download_prompt | |
except Exception as e: | |
logging.error(f"Error generating prompt: {e}") | |
return f"Error: {str(e)}", "", None | |
def handle_copy_action(text): | |
"""Handle copy to clipboard action""" | |
return { | |
progress_status: gr.update(value="Text copied to clipboard!", visible=True) | |
} | |
# Connect all event handlers | |
# Core event handlers | |
process_button.click( | |
handle_pdf_process, | |
inputs=[pdf_input, format_type, context_size], | |
outputs=[progress_status, processed_text, pdf_content, snippets, snippet_selector, download_full_text] | |
) | |
generate_prompt_btn.click( | |
handle_prompt_generation, | |
inputs=[generated_prompt, custom_prompt, snippet_selector, snippets], | |
outputs=[progress_status, generated_prompt, download_prompt] | |
) | |
# Snippet handling | |
snippet_selector.change( | |
handle_snippet_selection, | |
inputs=[snippet_selector, snippets], | |
outputs=[progress_status, generated_prompt, download_snippet] # Connect download_snippet | |
) | |
# Model selection | |
model_choice.change( | |
handle_model_selection, | |
inputs=[model_choice], | |
outputs=[ | |
hf_options, | |
groq_options, | |
openai_options, | |
context_size, | |
openai_model | |
] | |
) | |
hf_model.change( | |
toggle_custom_model, | |
inputs=[hf_model], | |
outputs=[hf_custom_model] | |
) | |
groq_model.change( | |
handle_groq_model_change, | |
inputs=[groq_model], | |
outputs=[context_size] | |
) | |
def download_file(content: str, prefix: str) -> List[str]: | |
if not content: | |
return [] | |
try: | |
filename = f"{prefix}_{int(time.time())}.txt" # Add timestamp | |
with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', prefix=filename) as f: | |
f.write(content) | |
return [f.name] | |
except Exception as e: | |
logging.error(f"Error creating download file: {e}") | |
return [] | |
# ChatGPT handler | |
open_chatgpt_button.click( | |
fn=lambda: "window.open('https://chat.openai.com/', '_blank'); return 'Opened ChatGPT in new tab';", | |
inputs=None, | |
outputs=progress_status, | |
js=True | |
) | |
# Model processing | |
send_to_model_btn.click( | |
send_to_model, | |
inputs=[ | |
generated_prompt, | |
model_choice, | |
hf_model, | |
hf_custom_model, | |
hf_api_key, | |
groq_model, | |
groq_api_key, | |
openai_api_key, | |
openai_model | |
], | |
outputs=[summary_output, download_summary] | |
) | |
groq_refresh_btn.click( | |
refresh_groq_models_list, | |
outputs=[groq_model] | |
) | |
# Instructions | |
gr.Markdown(""" | |
### π Instructions: | |
1. Upload a PDF document | |
2. Choose output format and context window size | |
3. Select snippet number (default: 1) or enter custom prompt | |
4. Select your preferred model in case you want to proceed directly (or continue with 5): | |
- OpenAI ChatGPT: Manual copy/paste workflow | |
- HuggingFace Inference: Direct API integration | |
- Groq API: High-performance inference | |
5. Click 'Process PDF' to generate summary | |
6. Use 'Copy Prompt' and, optionally, 'Open ChatGPT' for manual processing | |
7. Download generated files as needed | |
""") | |
# Launch the interface | |
if __name__ == "__main__": | |
demo.launch(share=False, debug=True) |