PDF-Summarizer / functions.py
cstr's picture
Create functions.py
0ae08d5 verified
import logging
from pathlib import Path
from typing import List, Dict, Union, Optional
import re
import openai
import requests
from PyPDF2 import PdfReader
from gradio_client import Client
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def extract_text_from_pdf(file_path: str) -> str:
"""
Extract text from a PDF file with robust error handling.
Args:
file_path: Path to the PDF file
Returns:
Extracted text as a string
Raises:
ValueError: If file doesn't exist or isn't readable
RuntimeError: If text extraction fails
"""
try:
if not Path(file_path).exists():
raise ValueError(f"PDF file not found: {file_path}")
reader = PdfReader(file_path)
text_content = []
for page_num, page in enumerate(reader.pages, 1):
try:
text = page.extract_text()
if text.strip():
text_content.append(text)
else:
logger.warning(f"Page {page_num} appears to be empty or unreadable")
except Exception as e:
logger.error(f"Error extracting text from page {page_num}: {str(e)}")
continue
if not text_content:
raise RuntimeError("No readable text found in PDF")
return "\n\n".join(text_content)
except Exception as e:
logger.error(f"PDF extraction failed: {str(e)}")
raise RuntimeError(f"Failed to process PDF: {str(e)}")
def format_content(text: str, format_type: str) -> str:
"""
Format extracted text into the specified output format.
Args:
text: Raw text content
format_type: Output format ('txt', 'md', 'html')
Returns:
Formatted text string
Raises:
ValueError: If format type is invalid
"""
if not isinstance(text, str):
raise ValueError("Input text must be a string")
# Clean up common PDF extraction artifacts
text = re.sub(r'\s+', ' ', text) # Normalize whitespace
text = re.sub(r'(?<=[.!?])\s+', '\n\n', text) # Split sentences into paragraphs
text = text.strip()
if format_type.lower() == 'txt':
return text
elif format_type.lower() == 'md':
paragraphs = text.split('\n\n')
md_text = []
for para in paragraphs:
# Detect and format headers
if re.match(r'^[A-Z][^.!?]*$', para.strip()):
md_text.append(f"## {para.strip()}")
else:
md_text.append(para.strip())
return '\n\n'.join(md_text)
elif format_type.lower() == 'html':
paragraphs = text.split('\n\n')
html_parts = ['<!DOCTYPE html>', '<html>', '<body>']
for para in paragraphs:
if re.match(r'^[A-Z][^.!?]*$', para.strip()):
html_parts.append(f"<h2>{para.strip()}</h2>")
else:
html_parts.append(f"<p>{para.strip()}</p>")
html_parts.extend(['</body>', '</html>'])
return '\n'.join(html_parts)
else:
raise ValueError(f"Unsupported format type: {format_type}")
def split_into_snippets(text: str, chunk_size: int = 4000, overlap: int = 200) -> List[str]:
"""
Split text into overlapping chunks that fit within model context windows.
Args:
text: Input text to split
chunk_size: Maximum size of each chunk
overlap: Number of characters to overlap between chunks
Returns:
List of text snippets
Raises:
ValueError: If chunk_size is too small or text is empty
"""
if not text:
raise ValueError("Input text is empty")
if chunk_size < 1000:
raise ValueError("Chunk size must be at least 1000 characters")
# Split into paragraphs first
paragraphs = text.split('\n\n')
chunks = []
current_chunk = []
current_size = 0
for para in paragraphs:
para_size = len(para)
if current_size + para_size <= chunk_size:
current_chunk.append(para)
current_size += para_size + 2 # +2 for newlines
else:
if current_chunk:
chunks.append('\n\n'.join(current_chunk))
# Start new chunk with overlap
if chunks:
overlap_text = chunks[-1][-overlap:] if overlap > 0 else ""
current_chunk = [overlap_text, para]
current_size = len(overlap_text) + para_size + 2
else:
current_chunk = [para]
current_size = para_size
# Add the last chunk if it exists
if current_chunk:
chunks.append('\n\n'.join(current_chunk))
return chunks
def build_prompts(chunks: List[str], custom_prompt: Optional[str] = None) -> List[str]:
"""
Build formatted prompts for each text chunk.
Args:
chunks: List of text chunks
custom_prompt: Optional custom instruction
Returns:
List of formatted prompt strings
"""
default_prompt = """Please analyze and summarize the following text. Focus on:
1. Key points and main ideas
2. Important details and supporting evidence
3. Any conclusions or recommendations
Please maintain the original meaning while being concise."""
instruction = custom_prompt if custom_prompt else default_prompt
prompts = []
for i, chunk in enumerate(chunks, 1):
prompt = f"""### Instruction
{instruction}
### Input Text (Part {i} of {len(chunks)})
{chunk}
### End of Input Text
Please provide your summary below:"""
prompts.append(prompt)
return prompts
def process_with_model(
prompt: str,
model_choice: str,
api_key: Optional[str] = None,
oauth_token: Optional[str] = None
) -> str:
"""
Process text with selected model.
Args:
prompt: Input prompt
model_choice: Selected model name
api_key: OpenAI API key for GPT models
oauth_token: Hugging Face token for other models
Returns:
Generated summary
Raises:
ValueError: If required credentials are missing
RuntimeError: If model processing fails
"""
try:
if 'gpt' in model_choice.lower():
if not api_key:
raise ValueError("OpenAI API key required for GPT models")
openai.api_key = api_key
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo" if "3.5" in model_choice else "gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1500
)
return response.choices[0].message.content
else: # Hugging Face models
if not oauth_token:
raise ValueError("Hugging Face token required")
headers = {"Authorization": f"Bearer {oauth_token}"}
# Map model choice to actual model ID
model_map = {
"Claude-3": "anthropic/claude-3-opus-20240229",
"Mistral": "mistralai/Mixtral-8x7B-Instruct-v0.1"
}
model_id = model_map.get(model_choice)
if not model_id:
raise ValueError(f"Unknown model: {model_choice}")
response = requests.post(
f"https://api-inference.huggingface.co/models/{model_id}",
headers=headers,
json={"inputs": prompt}
)
if response.status_code != 200:
raise RuntimeError(f"Model API error: {response.text}")
return response.json()[0]["generated_text"]
except Exception as e:
logger.error(f"Model processing failed: {str(e)}")
raise RuntimeError(f"Failed to process with model: {str(e)}")
def validate_api_keys(openai_key: Optional[str] = None, hf_token: Optional[str] = None) -> Dict[str, bool]:
"""
Validate API keys for different services.
Args:
openai_key: OpenAI API key
hf_token: Hugging Face token
Returns:
Dictionary with validation results
"""
results = {"openai": False, "huggingface": False}
if openai_key:
try:
openai.api_key = openai_key
openai.Model.list()
results["openai"] = True
except:
pass
if hf_token:
try:
response = requests.get(
"https://huggingface.co/api/models",
headers={"Authorization": f"Bearer {hf_token}"}
)
results["huggingface"] = response.status_code == 200
except:
pass
return results