Spaces:
Running
Running
File size: 9,094 Bytes
0ae08d5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 |
import html
import logging
import re
from pathlib import Path
from typing import List, Dict, Union, Optional

import openai
import requests
from PyPDF2 import PdfReader
from gradio_client import Client
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def extract_text_from_pdf(file_path: str) -> str:
    """
    Extract text from a PDF file with robust error handling.

    Args:
        file_path: Path to the PDF file

    Returns:
        Extracted text as a string, pages joined by blank lines

    Raises:
        ValueError: If the file doesn't exist
        RuntimeError: If text extraction fails or no readable text is found
    """
    # Validate outside the try block so ValueError reaches the caller as
    # documented, instead of being re-wrapped into RuntimeError below.
    if not Path(file_path).exists():
        raise ValueError(f"PDF file not found: {file_path}")
    try:
        reader = PdfReader(file_path)
        text_content = []
        for page_num, page in enumerate(reader.pages, 1):
            try:
                # extract_text() can return None for image-only pages;
                # coalesce to "" so .strip() below cannot raise.
                text = page.extract_text() or ""
                if text.strip():
                    text_content.append(text)
                else:
                    logger.warning(f"Page {page_num} appears to be empty or unreadable")
            except Exception as e:
                # One bad page shouldn't abort the whole document.
                logger.error(f"Error extracting text from page {page_num}: {str(e)}")
                continue
        if not text_content:
            raise RuntimeError("No readable text found in PDF")
        return "\n\n".join(text_content)
    except RuntimeError:
        # Our own "no readable text" error — don't double-wrap it.
        raise
    except Exception as e:
        logger.error(f"PDF extraction failed: {str(e)}")
        raise RuntimeError(f"Failed to process PDF: {str(e)}") from e
def format_content(text: str, format_type: str) -> str:
    """
    Format extracted text into the specified output format.

    Args:
        text: Raw text content
        format_type: Output format ('txt', 'md', 'html'), case-insensitive

    Returns:
        Formatted text string

    Raises:
        ValueError: If format type is invalid or text is not a string
    """
    if not isinstance(text, str):
        raise ValueError("Input text must be a string")

    # Clean up common PDF extraction artifacts.
    text = re.sub(r'\s+', ' ', text)               # normalize whitespace
    text = re.sub(r'(?<=[.!?])\s+', '\n\n', text)  # one paragraph per sentence
    text = text.strip()

    fmt = format_type.lower()  # hoisted: was recomputed for every branch
    if fmt == 'txt':
        return text

    # A paragraph that starts with a capital letter and contains no sentence
    # punctuation is treated as a heading. Compiled once, used per paragraph.
    heading_re = re.compile(r'^[A-Z][^.!?]*$')
    paragraphs = text.split('\n\n')

    if fmt == 'md':
        md_text = []
        for para in paragraphs:
            para = para.strip()
            md_text.append(f"## {para}" if heading_re.match(para) else para)
        return '\n\n'.join(md_text)

    if fmt == 'html':
        html_parts = ['<!DOCTYPE html>', '<html>', '<body>']
        for para in paragraphs:
            para = para.strip()
            # Bug fix: PDF text is untrusted — escape it so stray <, >, &
            # in the document can't inject markup into the generated page.
            safe = html.escape(para)
            tag = 'h2' if heading_re.match(para) else 'p'
            html_parts.append(f"<{tag}>{safe}</{tag}>")
        html_parts.extend(['</body>', '</html>'])
        return '\n'.join(html_parts)

    raise ValueError(f"Unsupported format type: {format_type}")
def split_into_snippets(text: str, chunk_size: int = 4000, overlap: int = 200) -> List[str]:
    """
    Break text into paragraph-aligned chunks that fit a model context window.

    Each finished chunk seeds the next one with its last ``overlap``
    characters so that context carries across chunk boundaries.

    Args:
        text: Input text to split
        chunk_size: Maximum size of each chunk
        overlap: Number of characters to overlap between chunks

    Returns:
        List of text snippets

    Raises:
        ValueError: If chunk_size is too small or text is empty
    """
    if not text:
        raise ValueError("Input text is empty")
    if chunk_size < 1000:
        raise ValueError("Chunk size must be at least 1000 characters")

    chunks: List[str] = []
    buffer: List[str] = []  # paragraphs accumulated for the open chunk
    used = 0                # approximate character count of the open chunk

    for para in text.split('\n\n'):
        length = len(para)
        if used + length <= chunk_size:
            # Paragraph still fits in the open chunk; +2 accounts for the
            # '\n\n' separator added when the chunk is joined.
            buffer.append(para)
            used += length + 2
            continue
        # Close out the open chunk, then start a new one seeded with the
        # tail of the previous chunk (when there is one) for continuity.
        if buffer:
            chunks.append('\n\n'.join(buffer))
        if chunks:
            tail = chunks[-1][-overlap:] if overlap > 0 else ""
            buffer = [tail, para]
            used = len(tail) + length + 2
        else:
            buffer = [para]
            used = length

    # Flush whatever is left in the open chunk.
    if buffer:
        chunks.append('\n\n'.join(buffer))
    return chunks
def build_prompts(chunks: List[str], custom_prompt: Optional[str] = None) -> List[str]:
    """
    Wrap each text chunk in a numbered summarization prompt.

    Args:
        chunks: List of text chunks
        custom_prompt: Optional custom instruction; when omitted, a default
            summarization instruction is used

    Returns:
        List of formatted prompt strings, one per chunk
    """
    default_prompt = """Please analyze and summarize the following text. Focus on:
1. Key points and main ideas
2. Important details and supporting evidence
3. Any conclusions or recommendations
Please maintain the original meaning while being concise."""
    instruction = custom_prompt or default_prompt
    total = len(chunks)
    return [
        f"""### Instruction
{instruction}
### Input Text (Part {idx} of {total})
{chunk}
### End of Input Text
Please provide your summary below:"""
        for idx, chunk in enumerate(chunks, 1)
    ]
def process_with_model(
    prompt: str,
    model_choice: str,
    api_key: Optional[str] = None,
    oauth_token: Optional[str] = None
) -> str:
    """
    Process text with the selected model.

    Args:
        prompt: Input prompt
        model_choice: Selected model name
        api_key: OpenAI API key for GPT models
        oauth_token: Hugging Face token for other models

    Returns:
        Generated summary

    Raises:
        ValueError: If required credentials are missing or the model is unknown
        RuntimeError: If model processing fails
    """
    is_gpt = 'gpt' in model_choice.lower()

    # Validate inputs BEFORE the try block so ValueError reaches the caller
    # as documented, instead of being re-wrapped into RuntimeError below.
    model_id = None
    if is_gpt:
        if not api_key:
            raise ValueError("OpenAI API key required for GPT models")
    else:
        if not oauth_token:
            raise ValueError("Hugging Face token required")
        # Map model choice to actual model ID
        model_map = {
            "Claude-3": "anthropic/claude-3-opus-20240229",
            "Mistral": "mistralai/Mixtral-8x7B-Instruct-v0.1"
        }
        model_id = model_map.get(model_choice)
        if not model_id:
            raise ValueError(f"Unknown model: {model_choice}")

    try:
        if is_gpt:
            openai.api_key = api_key
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo" if "3.5" in model_choice else "gpt-4",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1500
            )
            return response.choices[0].message.content

        # Hugging Face inference API path.
        response = requests.post(
            f"https://api-inference.huggingface.co/models/{model_id}",
            headers={"Authorization": f"Bearer {oauth_token}"},
            json={"inputs": prompt},
            timeout=120,  # bug fix: without a timeout a stuck request hangs forever
        )
        if response.status_code != 200:
            raise RuntimeError(f"Model API error: {response.text}")
        return response.json()[0]["generated_text"]
    except Exception as e:
        logger.error(f"Model processing failed: {str(e)}")
        raise RuntimeError(f"Failed to process with model: {str(e)}") from e
def validate_api_keys(openai_key: Optional[str] = None, hf_token: Optional[str] = None) -> Dict[str, bool]:
    """
    Validate API keys for different services.

    Best-effort: a credential that is missing, invalid, or unreachable
    simply stays False — this function never raises.

    Args:
        openai_key: OpenAI API key
        hf_token: Hugging Face token

    Returns:
        Dictionary mapping "openai" and "huggingface" to validity flags
    """
    results = {"openai": False, "huggingface": False}
    if openai_key:
        try:
            openai.api_key = openai_key
            openai.Model.list()  # cheap authenticated call; raises on a bad key
            results["openai"] = True
        except Exception:  # bug fix: bare except also swallowed KeyboardInterrupt
            logger.debug("OpenAI key validation failed", exc_info=True)
    if hf_token:
        try:
            response = requests.get(
                "https://huggingface.co/api/models",
                headers={"Authorization": f"Bearer {hf_token}"},
                timeout=10,  # bug fix: without a timeout a stuck request hangs forever
            )
            results["huggingface"] = response.status_code == 200
        except Exception:  # bug fix: was a bare except that hid all diagnostics
            logger.debug("Hugging Face token validation failed", exc_info=True)
    return results