PDF-Summarizer / functions.py
cstr's picture
Create functions.py
0ae08d5 verified
raw
history blame
9.09 kB
import logging
from pathlib import Path
from typing import List, Dict, Union, Optional
import re
import openai
import requests
from PyPDF2 import PdfReader
from gradio_client import Client
# Logging setup: basicConfig installs the root handler/format (a no-op when the
# root logger already has handlers); everything in this module logs via `logger`.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def extract_text_from_pdf(file_path: str) -> str:
    """
    Extract text from a PDF file with robust error handling.

    Pages are read one by one; a page that fails to extract or yields no
    text is logged and skipped. The text of the remaining pages is joined
    with blank lines.

    Args:
        file_path: Path to the PDF file

    Returns:
        Extracted text as a string

    Raises:
        RuntimeError: If the file is missing, cannot be parsed, or contains
            no readable text. Every failure is reported as RuntimeError
            (with the original exception chained as the cause where one
            exists), so callers only need to handle one exception type.
    """
    # Validate the path before touching the PDF parser. A non-path value
    # such as None counts as "not found" instead of escaping as TypeError.
    try:
        pdf_found = Path(file_path).is_file()
    except (TypeError, ValueError):
        pdf_found = False
    if not pdf_found:
        raise RuntimeError(f"Failed to process PDF: PDF file not found: {file_path}")

    try:
        reader = PdfReader(file_path)
        text_content = []
        for page_num, page in enumerate(reader.pages, 1):
            try:
                # Guard against extract_text() returning None so an empty
                # page is reported as empty rather than as a failure.
                text = page.extract_text() or ""
            except Exception as e:
                logger.error(f"Error extracting text from page {page_num}: {str(e)}")
                continue
            if text.strip():
                text_content.append(text)
            else:
                logger.warning(f"Page {page_num} appears to be empty or unreadable")
    except Exception as e:
        logger.error(f"PDF extraction failed: {str(e)}")
        # Chain the cause so the parser's traceback is not lost.
        raise RuntimeError(f"Failed to process PDF: {str(e)}") from e

    if not text_content:
        logger.error("PDF extraction failed: No readable text found in PDF")
        raise RuntimeError("Failed to process PDF: No readable text found in PDF")

    return "\n\n".join(text_content)
def format_content(text: str, format_type: str) -> str:
    """
    Format extracted text into the specified output format.

    The text is first cleaned: every run of whitespace collapses to one
    space, then each sentence (ending in '.', '!' or '?') becomes its own
    paragraph. A paragraph that starts with an uppercase ASCII letter and
    contains no sentence punctuation is treated as a heading in the 'md'
    and 'html' outputs.

    Args:
        text: Raw text content
        format_type: Output format ('txt', 'md', 'html'), case-insensitive

    Returns:
        Formatted text string. For 'html' the paragraph text is
        HTML-escaped so characters such as '<' and '&' from the PDF cannot
        break or inject markup.

    Raises:
        ValueError: If text is not a string or format type is invalid
    """
    # Local import: `html` is only needed here and is not imported at file level.
    import html

    if not isinstance(text, str):
        raise ValueError("Input text must be a string")
    if not isinstance(format_type, str):
        raise ValueError(f"Unsupported format type: {format_type}")

    fmt = format_type.lower()
    if fmt not in ('txt', 'md', 'html'):
        raise ValueError(f"Unsupported format type: {format_type}")

    # Clean up common PDF extraction artifacts
    text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
    text = re.sub(r'(?<=[.!?])\s+', '\n\n', text)  # Split sentences into paragraphs
    text = text.strip()

    if fmt == 'txt':
        return text

    heading_pattern = re.compile(r'^[A-Z][^.!?]*$')
    paragraphs = [para.strip() for para in text.split('\n\n')]

    if fmt == 'md':
        return '\n\n'.join(
            f"## {para}" if heading_pattern.match(para) else para
            for para in paragraphs
        )

    # fmt == 'html'
    html_parts = ['<!DOCTYPE html>', '<html>', '<body>']
    for para in paragraphs:
        # Heading detection runs on the raw text; escaping is applied only
        # to what is emitted between the tags.
        tag = 'h2' if heading_pattern.match(para) else 'p'
        html_parts.append(f"<{tag}>{html.escape(para, quote=False)}</{tag}>")
    html_parts.extend(['</body>', '</html>'])
    return '\n'.join(html_parts)
def _split_oversized(paragraph: str, limit: int) -> List[str]:
    """Break one paragraph into pieces of at most `limit` characters, cutting at whitespace when possible."""
    if len(paragraph) <= limit:
        return [paragraph]

    pieces = []
    rest = paragraph
    while len(rest) > limit:
        # Last space/newline that still leaves the piece within the limit.
        cut = max(rest.rfind(' ', 0, limit + 1), rest.rfind('\n', 0, limit + 1))
        if cut <= 0:
            cut = limit  # No usable whitespace: fall back to a hard cut.
        piece = rest[:cut].rstrip()
        if piece:
            pieces.append(piece)
        rest = rest[cut:].lstrip()
    if rest:
        pieces.append(rest)
    return pieces


def split_into_snippets(text: str, chunk_size: int = 4000, overlap: int = 200) -> List[str]:
    """
    Split text into overlapping chunks that fit within model context windows.

    The text is split on blank lines into paragraphs, which are packed
    greedily into chunks. Each chunk after the first starts with the last
    `overlap` characters of the previous chunk. A paragraph too long to fit
    in a chunk is broken into smaller pieces (at whitespace when possible),
    so no returned chunk is longer than `chunk_size`.

    Args:
        text: Input text to split
        chunk_size: Maximum size of each chunk, in characters
        overlap: Number of characters to overlap between chunks. Negative
            values are treated as 0 and values above half of `chunk_size`
            are capped there, so each chunk always has room for new text.

    Returns:
        List of text snippets

    Raises:
        ValueError: If chunk_size is too small or text is empty
    """
    if not text:
        raise ValueError("Input text is empty")
    if chunk_size < 1000:
        raise ValueError("Chunk size must be at least 1000 characters")

    separator = '\n\n'
    overlap = max(0, min(overlap, chunk_size // 2))
    # Largest piece that still fits after an overlap prefix and a separator.
    piece_limit = chunk_size - overlap - len(separator)

    pieces = []
    for para in text.split(separator):
        pieces.extend(_split_oversized(para, piece_limit))

    chunks = []
    current = []
    current_len = 0  # Length of separator.join(current)
    for piece in pieces:
        needed = len(piece) + (len(separator) if current else 0)
        if current and current_len + needed > chunk_size:
            finished = separator.join(current)
            chunks.append(finished)
            # Seed the next chunk with the tail of the one just closed. With
            # no overlap the chunk starts empty, so it does not begin with a
            # stray separator.
            tail = finished[-overlap:] if overlap > 0 else ""
            current = [tail] if tail else []
            current_len = len(tail)
            needed = len(piece) + (len(separator) if current else 0)
        current.append(piece)
        current_len += needed

    # Add the last chunk if it exists
    if current:
        chunks.append(separator.join(current))
    return chunks
def build_prompts(chunks: List[str], custom_prompt: Optional[str] = None) -> List[str]:
    """
    Wrap every text chunk in an instruction/input prompt template.

    Args:
        chunks: Text chunks, one prompt is produced per chunk
        custom_prompt: Instruction to use instead of the built-in
            summarization instruction; ignored when empty or None

    Returns:
        Prompts in the same order as the chunks, each labelled with its
        1-based part number and the total number of parts
    """
    fallback_instruction = """Please analyze and summarize the following text. Focus on:
1. Key points and main ideas
2. Important details and supporting evidence
3. Any conclusions or recommendations
Please maintain the original meaning while being concise."""
    instruction = custom_prompt or fallback_instruction

    return [
        f"""### Instruction
{instruction}
### Input Text (Part {part_number} of {len(chunks)})
{chunk_text}
### End of Input Text
Please provide your summary below:"""
        for part_number, chunk_text in enumerate(chunks, 1)
    ]
def process_with_model(
    prompt: str,
    model_choice: str,
    api_key: Optional[str] = None,
    oauth_token: Optional[str] = None,
    timeout: float = 120.0
) -> str:
    """
    Process text with selected model.

    Model names containing "gpt" (case-insensitive) are sent to OpenAI;
    any other name is looked up in the Hugging Face model map and sent to
    the Hugging Face inference endpoint.

    Args:
        prompt: Input prompt
        model_choice: Selected model name
        api_key: OpenAI API key for GPT models
        oauth_token: Hugging Face token for other models
        timeout: Seconds to wait for the Hugging Face HTTP request before
            giving up, so a stalled connection cannot hang the caller

    Returns:
        Generated summary

    Raises:
        RuntimeError: If processing fails for any reason, including
            missing credentials, an unknown model name, an HTTP error, or
            an unexpected response shape. The original exception is
            chained as the cause.
    """
    try:
        if 'gpt' in model_choice.lower():
            if not api_key:
                raise ValueError("OpenAI API key required for GPT models")
            # Sets the key on the openai module itself, i.e. process-wide.
            openai.api_key = api_key
            # NOTE(review): ChatCompletion looks like the pre-1.0 openai SDK
            # interface - confirm against the pinned openai version.
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo" if "3.5" in model_choice else "gpt-4",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1500
            )
            return response.choices[0].message.content

        # Hugging Face models
        if not oauth_token:
            raise ValueError("Hugging Face token required")
        headers = {"Authorization": f"Bearer {oauth_token}"}
        # Map model choice to actual model ID
        model_map = {
            "Claude-3": "anthropic/claude-3-opus-20240229",
            "Mistral": "mistralai/Mixtral-8x7B-Instruct-v0.1"
        }
        model_id = model_map.get(model_choice)
        if not model_id:
            raise ValueError(f"Unknown model: {model_choice}")
        response = requests.post(
            f"https://api-inference.huggingface.co/models/{model_id}",
            headers=headers,
            json={"inputs": prompt},
            timeout=timeout
        )
        if response.status_code != 200:
            raise RuntimeError(f"Model API error: {response.text}")
        payload = response.json()
        try:
            return payload[0]["generated_text"]
        except (KeyError, IndexError, TypeError) as e:
            # Name the actual problem instead of surfacing a bare KeyError/IndexError.
            raise RuntimeError(f"Unexpected model response: {payload!r}") from e
    except Exception as e:
        logger.error(f"Model processing failed: {str(e)}")
        raise RuntimeError(f"Failed to process with model: {str(e)}") from e
def validate_api_keys(openai_key: Optional[str] = None, hf_token: Optional[str] = None) -> Dict[str, bool]:
    """
    Validate API keys for different services.

    Each supplied credential is checked with one live request. A
    credential that is not supplied is reported as False without any
    network call. Failures are logged and reported as False rather than
    raised.

    Note: checking the OpenAI key assigns it to `openai.api_key`, which is
    a module-wide setting.

    Args:
        openai_key: OpenAI API key
        hf_token: Hugging Face token

    Returns:
        Dictionary with validation results, keyed "openai" and "huggingface"
    """
    results = {"openai": False, "huggingface": False}

    if openai_key:
        try:
            openai.api_key = openai_key
            openai.Model.list()
            results["openai"] = True
        except Exception as e:
            # Catch Exception, not a bare except, so KeyboardInterrupt and
            # SystemExit still propagate. Only the exception type is logged,
            # in case the message echoes part of the key.
            logger.warning("OpenAI key validation failed: %s", type(e).__name__)

    if hf_token:
        try:
            response = requests.get(
                "https://huggingface.co/api/models",
                headers={"Authorization": f"Bearer {hf_token}"},
                timeout=10  # Do not let a stalled connection block validation forever
            )
            results["huggingface"] = response.status_code == 200
        except Exception as e:
            logger.warning("Hugging Face token validation failed: %s", type(e).__name__)

    return results