# Hugging Face Space (status: Running)
import gradio as gr | |
import os | |
import shutil | |
import fitz | |
from PIL import Image | |
import numpy as np | |
import cv2 | |
import pytesseract | |
from pytesseract import Output | |
import zipfile | |
from pdf2image import convert_from_path | |
# Helper functions: image conversion, OCR block extraction, annotation, and text export
def convert_to_rgb(image_path):
    """Open the image stored at *image_path* and return it in RGB mode.

    Args:
        image_path: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        A PIL image converted with ``convert("RGB")``.
    """
    return Image.open(image_path).convert("RGB")
def preprocess_image(image):
    """Binarize, denoise and upscale an image.

    The steps are applied in this order: BGR -> grayscale conversion, Otsu
    thresholding, non-local-means denoising, and a 2x bicubic resize.

    Args:
        image: Image array passed to ``cv2.cvtColor`` with ``COLOR_BGR2GRAY``.

    Returns:
        The processed single-channel image, twice the input width and height.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # With THRESH_OTSU the threshold argument (0) is ignored and chosen
    # automatically; only the thresholded image (second result) is needed.
    otsu_mode = cv2.THRESH_BINARY + cv2.THRESH_OTSU
    thresholded = cv2.threshold(grayscale, 0, 255, otsu_mode)[1]

    cleaned = cv2.fastNlMeansDenoising(thresholded, None, 30, 7, 21)
    return cv2.resize(cleaned, None, fx=2, fy=2, interpolation=cv2.INTER_CUBIC)
def extract_vertical_blocks(image):
    """Run French OCR on *image* and group the words into vertical blocks.

    Words are visited in the order Tesseract reports them. A new block starts
    whenever a word's top edge lies more than one "line height" below the
    bottom edge of the previous word. The line height is fixed once, as 1.2x
    the height of the first accepted word.

    Args:
        image: Anything ``np.array`` can turn into an image array (the caller
            in this file passes a PIL image).

    Returns:
        A list of dicts, one per block, each with:
            "text":   the block's words joined by single spaces, stripped;
            "coords": ``[left, top, right, bottom]`` bounding box of the block.
    """
    image_np = np.array(image)
    data = pytesseract.image_to_data(image_np, lang='fra', output_type=Output.DICT)

    blocks = []
    words = []
    coords = [float('inf'), float('inf'), 0, 0]
    last_bottom = -1
    line_height = 0

    rows = zip(
        data['text'], data['conf'],
        data['left'], data['top'], data['width'], data['height'],
    )
    for text, conf, x, y, w, h in rows:
        # Confidence may arrive as an int, a float, or a decimal string such
        # as '96.33'; a plain int() would raise ValueError on the latter.
        try:
            confidence = int(float(conf))
        except (TypeError, ValueError):
            continue
        if confidence <= 0:
            # Non-word rows (page/block/line entries) and rejected words.
            continue

        if line_height == 0:
            line_height = h * 1.2

        # A large enough vertical gap closes the current block.
        if y > last_bottom + line_height:
            if words:
                blocks.append({
                    "text": " ".join(words).strip(),
                    "coords": coords
                })
            words = []
            coords = [float('inf'), float('inf'), 0, 0]

        # str() guards against cells that are not already strings.
        words.append(str(text))
        coords[0] = min(coords[0], x)
        coords[1] = min(coords[1], y)
        coords[2] = max(coords[2], x + w)
        coords[3] = max(coords[3], y + h)
        last_bottom = y + h

    # Flush the trailing block, if any word was accepted after the last gap.
    if words:
        blocks.append({
            "text": " ".join(words).strip(),
            "coords": coords
        })
    return blocks
def draw_blocks_on_image(image_path, blocks, output_path):
    """Draw each block's bounding box on the image and save the result.

    Args:
        image_path: Path of the image to read with ``cv2.imread``.
        blocks: Iterable of dicts whose "coords" entry is
            ``[left, top, right, bottom]``.
        output_path: Where the annotated image is written.

    Returns:
        ``output_path``, unchanged.
    """
    canvas = cv2.imread(image_path)
    outline_color = (0, 0, 255)  # red in OpenCV's BGR channel order
    outline_thickness = 2

    for block in blocks:
        box = block['coords']
        top_left = tuple(box[:2])
        bottom_right = tuple(box[2:4])
        cv2.rectangle(canvas, top_left, bottom_right, outline_color, outline_thickness)

    cv2.imwrite(output_path, canvas)
    return output_path
def process_image(image, output_folder, page_number):
    """OCR one page image and write its plain and annotated PNG copies.

    Args:
        image: Path of the page image (handed to ``convert_to_rgb``).
        output_folder: Directory that receives ``page_<n>.png`` and
            ``annotated_page_<n>.png``.
        page_number: Zero-based page index; file names use ``page_number + 1``.

    Returns:
        A ``(blocks, annotated_image_path)`` tuple, where ``blocks`` is the
        result of ``extract_vertical_blocks`` for this page.
    """
    rgb_page = convert_to_rgb(image)
    blocks = extract_vertical_blocks(rgb_page)

    page_filename = f'page_{page_number + 1}.png'
    saved_page_path = os.path.join(output_folder, page_filename)
    rgb_page.save(saved_page_path)

    # The annotated copy is drawn from the PNG that was just saved.
    annotated_target = os.path.join(output_folder, f'annotated_{page_filename}')
    annotated_result = draw_blocks_on_image(saved_page_path, blocks, annotated_target)
    return blocks, annotated_result
def save_extracted_text(blocks, page_number, output_folder):
    """Append one page's block texts to ``extracted_text.txt``.

    The page is framed by ``[PAGE n]`` and ``[FIN DE PAGE n]`` marker lines,
    with one block per line in between and a blank line after the end marker.
    The file is opened in append mode, so successive calls accumulate pages.

    Args:
        blocks: Iterable of dicts carrying a "text" entry.
        page_number: Number printed in the page markers.
        output_folder: Directory containing (or receiving) the text file.

    Returns:
        Path of the text file that was written.
    """
    text_file_path = os.path.join(output_folder, 'extracted_text.txt')
    opening_marker = f"[PAGE {page_number}]\n"
    closing_marker = f"[FIN DE PAGE {page_number}]\n\n"

    with open(text_file_path, 'a', encoding='utf-8') as out:
        out.write(opening_marker)
        out.writelines(block['text'] + "\n" for block in blocks)
        out.write(closing_marker)

    return text_file_path
# Gradio callback: PDF in, (text file, zip of annotated pages) out.
def process_pdf(pdf_file):
    """Convert an uploaded PDF to page images, OCR them and package the results.

    Each page is rendered to a PNG, processed with ``process_image`` and its
    text appended to a shared ``extracted_text.txt``. The annotated page
    images are then bundled into ``annotated_images.zip``.

    Args:
        pdf_file: The uploaded PDF, either as a filesystem path (``str`` or
            ``os.PathLike``, which is what ``gr.File(type="filepath")``
            delivers) or as an object exposing the path through ``.name``.

    Returns:
        A ``(text_file_path, zip_path)`` tuple of paths to the generated files.

    Raises:
        gr.Error: If no file was supplied, the PDF has no pages, or any step
            of the conversion/OCR pipeline fails.
    """
    if pdf_file is None:
        raise gr.Error("Error processing PDF: no file was uploaded")

    # Accept both a plain path and a file-like wrapper carrying `.name`;
    # reading `.name` unconditionally fails on the string the UI passes in.
    if isinstance(pdf_file, (str, os.PathLike)):
        pdf_path = os.fspath(pdf_file)
    else:
        pdf_path = pdf_file.name

    # Fixed working directory under the current working directory.
    temp_dir = os.path.join(os.getcwd(), "temp_processing")
    output_dir = os.path.join(temp_dir, 'output_images')

    # Start clean: the text file is written in append mode, so leftovers from
    # a previous run would otherwise leak into this run's output.
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(output_dir, exist_ok=True)

    try:
        images = convert_from_path(pdf_path)
        if not images:
            # Without pages no text file is ever created, so the returned
            # path would point at nothing.
            raise ValueError("the PDF contains no pages")

        annotated_images = []
        for i, img in enumerate(images):
            # Persist the rendered page so process_image can open it by path.
            temp_img_path = os.path.join(temp_dir, f'temp_page_{i}.png')
            img.save(temp_img_path)

            blocks, annotated_image_path = process_image(temp_img_path, output_dir, i)
            annotated_images.append(annotated_image_path)
            save_extracted_text(blocks, i + 1, output_dir)

        # Bundle the annotated pages, stored flat under their base names.
        zip_path = os.path.join(temp_dir, "annotated_images.zip")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for img_path in annotated_images:
                zipf.write(img_path, os.path.basename(img_path))

        text_file_path = os.path.join(output_dir, 'extracted_text.txt')

        # The outputs stay on disk because the caller receives paths, not
        # contents; they are removed at the start of the next call.
        return (text_file_path, zip_path)
    except Exception as e:
        raise gr.Error(f"Error processing PDF: {str(e)}") from e
# Custom stylesheet handed to gr.Interface(css=...): sets the container font
# and gives buttons a rounded, borderless purple-to-pink gradient look.
css = """
.gradio-container {
font-family: 'IBM Plex Sans', sans-serif;
}
.gr-button {
color: white;
border-radius: 8px;
background: linear-gradient(45deg, #7928CA, #FF0080);
border: none;
}
"""
# Assemble the Gradio UI: one PDF upload in, two downloadable files out.
_pdf_upload = gr.File(
    label="Upload PDF Document",
    file_types=[".pdf"],
    type="filepath",
)
_result_files = [
    gr.File(label="Extracted Text (TXT)"),
    gr.File(label="Annotated Images (ZIP)"),
]
_interface_description = """
Upload a PDF document to:
1. Extract text content
2. Get annotated images showing detected text blocks
Supports multiple pages and French language text.
"""

demo = gr.Interface(
    fn=process_pdf,
    inputs=[_pdf_upload],
    outputs=_result_files,
    title="PDF Text Extraction and Annotation",
    description=_interface_description,
    article="Created by [Your Name] - [Your GitHub/Profile Link]",
    css=css,
    examples=[],  # no bundled example PDFs
    cache_examples=False,
    theme=gr.themes.Soft(),
)
# Start the Gradio server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demo.launch()