import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Union

import openai
import requests
from gradio_client import Client
from PyPDF2 import PdfReader

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def extract_text_from_pdf(file_path: str) -> str:
    """
    Extract text from a PDF file with robust error handling.

    Args:
        file_path: Path to the PDF file

    Returns:
        Extracted text as a string

    Raises:
        ValueError: If the file doesn't exist or isn't readable
        RuntimeError: If text extraction fails
    """
    # Validate the path outside the try block so the ValueError is not
    # re-wrapped as a RuntimeError by the generic handler below.
    if not Path(file_path).exists():
        raise ValueError(f"PDF file not found: {file_path}")

    try:
        reader = PdfReader(file_path)
        text_content = []
        for page_num, page in enumerate(reader.pages, 1):
            try:
                text = page.extract_text()
                if text and text.strip():  # extract_text() may return an empty string
                    text_content.append(text)
                else:
                    logger.warning(f"Page {page_num} appears to be empty or unreadable")
            except Exception as e:
                logger.error(f"Error extracting text from page {page_num}: {str(e)}")
                continue
        if not text_content:
            raise RuntimeError("No readable text found in PDF")
        return "\n\n".join(text_content)
    except Exception as e:
        logger.error(f"PDF extraction failed: {str(e)}")
        raise RuntimeError(f"Failed to process PDF: {str(e)}") from e
def format_content(text: str, format_type: str) -> str:
    """
    Format extracted text into the specified output format.

    Args:
        text: Raw text content
        format_type: Output format ('txt', 'md', 'html')

    Returns:
        Formatted text string

    Raises:
        ValueError: If format type is invalid
    """
    if not isinstance(text, str):
        raise ValueError("Input text must be a string")

    # Clean up common PDF extraction artifacts
    text = re.sub(r'\s+', ' ', text)               # Normalize whitespace
    text = re.sub(r'(?<=[.!?])\s+', '\n\n', text)  # Split sentences into paragraphs
    text = text.strip()

    fmt = format_type.lower()
    if fmt == 'txt':
        return text
    elif fmt == 'md':
        paragraphs = text.split('\n\n')
        md_text = []
        for para in paragraphs:
            # Heuristic: a paragraph that starts with a capital letter and has
            # no sentence-ending punctuation is treated as a header
            if re.match(r'^[A-Z][^.!?]*$', para.strip()):
                md_text.append(f"## {para.strip()}")
            else:
                md_text.append(para.strip())
        return '\n\n'.join(md_text)
    elif fmt == 'html':
        paragraphs = text.split('\n\n')
        html_parts = ['<!DOCTYPE html>', '<html>', '<body>']
        for para in paragraphs:
            if re.match(r'^[A-Z][^.!?]*$', para.strip()):
                html_parts.append(f"<h2>{para.strip()}</h2>")
            else:
                html_parts.append(f"<p>{para.strip()}</p>")
        html_parts.extend(['</body>', '</html>'])
        return '\n'.join(html_parts)
    else:
        raise ValueError(f"Unsupported format type: {format_type}")
def split_into_snippets(text: str, chunk_size: int = 4000, overlap: int = 200) -> List[str]:
    """
    Split text into overlapping chunks that fit within model context windows.

    Args:
        text: Input text to split
        chunk_size: Maximum size of each chunk
        overlap: Number of characters to overlap between chunks

    Returns:
        List of text snippets

    Raises:
        ValueError: If chunk_size is too small or text is empty
    """
    if not text:
        raise ValueError("Input text is empty")
    if chunk_size < 1000:
        raise ValueError("Chunk size must be at least 1000 characters")

    # Split into paragraphs first so chunk boundaries fall between paragraphs.
    # Note: a single paragraph longer than chunk_size still becomes one
    # (oversized) chunk; paragraphs themselves are never split.
    paragraphs = text.split('\n\n')
    chunks = []
    current_chunk = []
    current_size = 0
    for para in paragraphs:
        para_size = len(para) + 2  # +2 for the '\n\n' joiner
        if current_size + para_size <= chunk_size:
            current_chunk.append(para)
            current_size += para_size
        else:
            if current_chunk:
                chunks.append('\n\n'.join(current_chunk))
            # Start the new chunk with the tail of the previous one as overlap
            if chunks and overlap > 0:
                overlap_text = chunks[-1][-overlap:]
                current_chunk = [overlap_text, para]
                current_size = len(overlap_text) + para_size
            else:
                current_chunk = [para]
                current_size = para_size
    # Add the last chunk if it exists
    if current_chunk:
        chunks.append('\n\n'.join(current_chunk))
    return chunks
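
# Illustrative numbers (assumptions, not measurements): a 10,000-character
# document with the defaults chunk_size=4000 and overlap=200 yields roughly
# three chunks, each beginning with the last 200 characters of its predecessor:
#     chunks = split_into_snippets(raw_text, chunk_size=4000, overlap=200)
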
def build_prompts(chunks: List[str], custom_prompt: Optional[str] = None) -> List[str]:
    """
    Build formatted prompts for each text chunk.

    Args:
        chunks: List of text chunks
        custom_prompt: Optional custom instruction

    Returns:
        List of formatted prompt strings
    """
    default_prompt = """Please analyze and summarize the following text. Focus on:
1. Key points and main ideas
2. Important details and supporting evidence
3. Any conclusions or recommendations
Please maintain the original meaning while being concise."""

    instruction = custom_prompt if custom_prompt else default_prompt
    prompts = []
    for i, chunk in enumerate(chunks, 1):
        prompt = f"""### Instruction
{instruction}

### Input Text (Part {i} of {len(chunks)})
{chunk}

### End of Input Text
Please provide your summary below:"""
        prompts.append(prompt)
    return prompts
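
# Example: with no custom_prompt the default summarization instruction is used;
# any other task can be substituted (the instruction below is hypothetical):
#     prompts = build_prompts(chunks, custom_prompt="List all action items.")
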
def process_with_model(
    prompt: str,
    model_choice: str,
    api_key: Optional[str] = None,
    oauth_token: Optional[str] = None
) -> str:
    """
    Process text with the selected model.

    Args:
        prompt: Input prompt
        model_choice: Selected model name
        api_key: OpenAI API key for GPT models
        oauth_token: Hugging Face token for other models

    Returns:
        Generated summary

    Raises:
        ValueError: If required credentials are missing
        RuntimeError: If model processing fails
    """
    try:
        if 'gpt' in model_choice.lower():
            if not api_key:
                raise ValueError("OpenAI API key required for GPT models")
            # Note: openai.ChatCompletion is the legacy interface and
            # requires the pre-1.0 OpenAI SDK (openai<1.0).
            openai.api_key = api_key
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo" if "3.5" in model_choice else "gpt-4",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1500
            )
            return response.choices[0].message.content
        else:  # Hugging Face models
            if not oauth_token:
                raise ValueError("Hugging Face token required")
            headers = {"Authorization": f"Bearer {oauth_token}"}
            # Map the UI-facing model choice to a model ID. Whether a given ID
            # is actually served by the HF Inference API depends on the hub.
            model_map = {
                "Claude-3": "anthropic/claude-3-opus-20240229",
                "Mistral": "mistralai/Mixtral-8x7B-Instruct-v0.1"
            }
            model_id = model_map.get(model_choice)
            if not model_id:
                raise ValueError(f"Unknown model: {model_choice}")
            response = requests.post(
                f"https://api-inference.huggingface.co/models/{model_id}",
                headers=headers,
                json={"inputs": prompt},
                timeout=120
            )
            if response.status_code != 200:
                raise RuntimeError(f"Model API error: {response.text}")
            return response.json()[0]["generated_text"]
    except Exception as e:
        logger.error(f"Model processing failed: {str(e)}")
        raise RuntimeError(f"Failed to process with model: {str(e)}") from e
def validate_api_keys(openai_key: Optional[str] = None, hf_token: Optional[str] = None) -> Dict[str, bool]:
    """
    Validate API keys for different services.

    Args:
        openai_key: OpenAI API key
        hf_token: Hugging Face token

    Returns:
        Dictionary with validation results
    """
    results = {"openai": False, "huggingface": False}
    if openai_key:
        try:
            # Legacy pre-1.0 OpenAI SDK call: a cheap authenticated request
            # that fails fast on a bad key.
            openai.api_key = openai_key
            openai.Model.list()
            results["openai"] = True
        except Exception:
            pass
    if hf_token:
        try:
            response = requests.get(
                "https://huggingface.co/api/models",
                headers={"Authorization": f"Bearer {hf_token}"},
                timeout=30
            )
            results["huggingface"] = response.status_code == 200
        except requests.RequestException:
            pass
    return results
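
# A minimal end-to-end sketch of how the helpers above compose. The input
# file name, model choice, and environment variable name are assumptions for
# illustration only; the GPT path also requires the legacy openai<1.0 SDK.
if __name__ == "__main__":
    import os

    raw_text = extract_text_from_pdf("example.pdf")  # hypothetical input file
    snippets = split_into_snippets(raw_text)
    prompts = build_prompts(snippets)
    summaries = [
        process_with_model(p, "GPT-3.5", api_key=os.environ.get("OPENAI_API_KEY"))
        for p in prompts
    ]
    print(format_content("\n\n".join(summaries), "md"))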