arquiteturia / app.py
pierreguillou's picture
Update app.py
1f9a729 verified
raw
history blame
9.2 kB
import gradio as gr
import os
import shutil
import fitz
from PIL import Image
import numpy as np
import cv2
import pytesseract
from pytesseract import Output
import zipfile
from pdf2image import convert_from_path
import google.generativeai as genai
import json
# Helper Functions
def convert_to_rgb(image_path):
    """Load an image from disk and return a copy converted to RGB mode.

    Args:
        image_path: Location of the image, passed straight to
            ``PIL.Image.open``.

    Returns:
        A new ``PIL.Image.Image`` in ``"RGB"`` mode.
    """
    # Image.open keeps the underlying file open until the data is loaded;
    # the context manager guarantees the handle is released once the RGB
    # copy has been produced.
    with Image.open(image_path) as img:
        return img.convert("RGB")
def preprocess_image(image):
    """Binarise, denoise and upscale an image with OpenCV.

    The input is converted from BGR to grayscale, thresholded with the
    ``THRESH_BINARY + THRESH_OTSU`` flags, denoised with
    ``cv2.fastNlMeansDenoising`` and finally enlarged by a factor of two
    along both axes using cubic interpolation.

    Args:
        image: Image array accepted by ``cv2.cvtColor`` with
            ``COLOR_BGR2GRAY``.

    Returns:
        The processed single-channel image.
    """
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    otsu_flags = cv2.THRESH_BINARY + cv2.THRESH_OTSU
    # cv2.threshold returns (threshold_used, image); only the image matters.
    thresholded = cv2.threshold(grayscale, 0, 255, otsu_flags)[1]
    cleaned = cv2.fastNlMeansDenoising(thresholded, None, 30, 7, 21)
    return cv2.resize(cleaned, None, fx=2, fy=2, interpolation=cv2.INTER_CUBIC)
def extract_vertical_blocks(image):
    """Run OCR on *image* and group the recognised words into vertical blocks.

    Words are taken in the order Tesseract reports them. A new block is
    started whenever a word's top edge lies more than one "line height"
    below the bottom of the previously accepted word; the line height is
    1.2 times the height of the first accepted word.

    Args:
        image: Image convertible with ``np.array`` (e.g. a PIL image).

    Returns:
        A list of dicts, one per block, each with:
        ``"text"``   - the block's words joined by single spaces;
        ``"coords"`` - ``[left, top, right, bottom]`` bounding box of the
        block's words, in the coordinates reported by Tesseract.
    """
    image_np = np.array(image)
    data = pytesseract.image_to_data(image_np, lang='fra', output_type=Output.DICT)

    blocks = []
    words = []          # words of the block currently being built
    coords = None       # bounding box of the block currently being built
    last_bottom = -1
    line_height = 0

    rows = zip(
        data['text'], data['conf'],
        data['left'], data['top'], data['width'], data['height'],
    )
    for text, conf, x, y, w, h in rows:
        # Confidence may arrive as an int, a float or a string such as
        # "95.5" depending on the Tesseract/pytesseract versions in use;
        # go through float() so none of these raise.
        try:
            confidence = int(float(conf))
        except (TypeError, ValueError):
            continue
        if confidence <= 0:
            continue

        # Tokens that are empty or only whitespace carry no text; letting
        # them through would create blank blocks and distort the boxes.
        word = str(text).strip()
        if not word:
            continue

        if line_height == 0:
            line_height = h * 1.2

        # A large vertical gap closes the current block.
        if words and y > last_bottom + line_height:
            blocks.append({"text": " ".join(words), "coords": coords})
            words, coords = [], None

        if coords is None:
            coords = [x, y, x + w, y + h]
        else:
            coords[0] = min(coords[0], x)
            coords[1] = min(coords[1], y)
            coords[2] = max(coords[2], x + w)
            coords[3] = max(coords[3], y + h)
        words.append(word)
        last_bottom = y + h

    if words:
        blocks.append({"text": " ".join(words), "coords": coords})
    return blocks
def draw_blocks_on_image(image_path, blocks, output_path):
    """Draw a rectangle around every block and save the annotated image.

    Args:
        image_path: Path of the image to annotate.
        blocks: Iterable of dicts whose ``"coords"`` entry is
            ``[left, top, right, bottom]``.
        output_path: Where the annotated image is written.

    Returns:
        ``output_path``, unchanged.

    Raises:
        FileNotFoundError: if the source image cannot be read.
        OSError: if the annotated image cannot be written.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports failure by returning None instead of raising,
        # which would otherwise surface later as an obscure drawing error.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    for block in blocks:
        # cv2.rectangle needs plain integer points.
        left, top, right, bottom = (int(value) for value in block['coords'])
        cv2.rectangle(image, (left, top), (right, bottom), (0, 0, 255), 2)

    # cv2.imwrite returns False when nothing was written.
    if not cv2.imwrite(output_path, image):
        raise OSError(f"Could not write annotated image: {output_path}")
    return output_path
def process_image(image, output_folder, page_number):
    """OCR one page image and write its plain and annotated PNG versions.

    Args:
        image: Path of the page image (forwarded to ``convert_to_rgb``).
        output_folder: Directory receiving ``page_<n>.png`` and
            ``annotated_page_<n>.png``.
        page_number: Zero-based page index; file names use
            ``page_number + 1``.

    Returns:
        ``(blocks, annotated_image_path)`` - the blocks returned by
        ``extract_vertical_blocks`` and the path returned by
        ``draw_blocks_on_image``.
    """
    rgb_page = convert_to_rgb(image)
    detected_blocks = extract_vertical_blocks(rgb_page)

    page_filename = f'page_{page_number + 1}.png'
    page_path = os.path.join(output_folder, page_filename)
    rgb_page.save(page_path)

    annotated_target = os.path.join(output_folder, f'annotated_{page_filename}')
    annotated_result = draw_blocks_on_image(page_path, detected_blocks, annotated_target)
    return detected_blocks, annotated_result
def save_extracted_text(blocks, page_number, output_folder):
    """Append one page of OCR text to ``extracted_text.txt``.

    The page is framed by a ``[PAGE <n>]`` header line and a
    ``[FIN DE PAGE]`` footer followed by an empty line; each block's text
    goes on its own line in between.

    Args:
        blocks: Iterable of dicts with a ``"text"`` entry.
        page_number: Number written in the page header.
        output_folder: Directory containing (or receiving) the text file.

    Returns:
        Path of the text file.
    """
    destination = os.path.join(output_folder, 'extracted_text.txt')
    # Append mode: successive pages accumulate in the same file.
    with open(destination, 'a', encoding='utf-8') as out:
        out.write(f"[PAGE {page_number}]\n")
        out.writelines(entry['text'] + "\n" for entry in blocks)
        out.write("[FIN DE PAGE]\n\n")
    return destination
# Gemini Functions
def initialize_gemini():
    """Configure the Gemini client and return a ``gemini-1.5-pro`` model.

    The API key passed to ``genai.configure`` is read from the
    ``GEMINI_API_KEY`` environment variable.

    Returns:
        A ``genai.GenerativeModel`` set up for plain-text responses.

    Raises:
        gr.Error: if configuring the client or building the model fails.
    """
    try:
        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        sampling_settings = dict(
            temperature=1,
            top_p=0.95,
            top_k=40,
            max_output_tokens=8192,
            response_mime_type="text/plain",
        )
        return genai.GenerativeModel(
            model_name="gemini-1.5-pro",
            generation_config=sampling_settings,
        )
    except Exception as exc:
        raise gr.Error(f"Error initializing Gemini: {str(exc)}")
def create_prompt(extracted_text: str, path_to_data_to_extract: str) -> str:
    """Build the French extraction prompt for a court document.

    Args:
        extracted_text: OCR text of the document; surrounding whitespace is
            stripped before insertion.
        path_to_data_to_extract: Path of a UTF-8 JSON file describing the
            fields to extract; its content is embedded pretty-printed.

    Returns:
        The complete prompt string.
    """
    with open(path_to_data_to_extract, 'r', encoding='utf-8') as schema_file:
        fields_to_extract = json.load(schema_file)

    fields_json = json.dumps(fields_to_extract, indent=2, ensure_ascii=False)
    document_body = extracted_text.strip()

    # The literal's lines are intentionally flush-left: any indentation
    # added here would become part of the prompt text.
    full_prompt = f"""Tu es un assistant juridique expert en analyse de documents judiciaires français.
Je vais te fournir le contenu d'un document judiciaire extrait d'un PDF.
Ta tâche est d'analyser ce texte et d'en extraire les informations suivantes de manière précise :
{fields_json}
Voici quelques règles à suivre :
- Si une information n'est pas présente dans le texte, indique "Non spécifié" pour cette catégorie.
- Pour les noms des parties (demandeurs et défendeurs, et leurs avocats), liste tous ceux que tu trouves
- Assure-toi de différencier correctement les demandeurs des défendeurs.
- Si tu n'es pas sûr d'une information, indique-le clairement.
Présente tes résultats sous forme de JSON, en utilisant les catégories mentionnées ci-dessus.
Voici le contenu du document :
{document_body}
Analyse ce texte et fournis-moi les informations demandées au format JSON uniquement."""
    return full_prompt.strip()
def _parse_json_reply(reply_text):
    """Parse the JSON document contained in a model reply.

    Handles a reply that is bare JSON, JSON wrapped in a Markdown code
    fence (with or without a ``json`` language tag, in any letter case),
    and JSON surrounded by explanatory prose.

    Raises:
        ValueError: if no parseable JSON can be found.
    """
    candidate = reply_text.strip()

    fence = "```"
    fence_start = candidate.find(fence)
    if fence_start != -1:
        inner_start = fence_start + len(fence)
        fence_end = candidate.find(fence, inner_start)
        if fence_end == -1:
            candidate = candidate[inner_start:]
        else:
            candidate = candidate[inner_start:fence_end]
        candidate = candidate.lstrip()
        # Drop the optional language tag that follows the opening fence.
        if candidate[:4].lower() == "json":
            candidate = candidate[4:]
        candidate = candidate.strip()

    try:
        return json.loads(candidate)
    except ValueError:
        # Last resort: take the outermost {...} span out of the prose.
        first_brace = candidate.find("{")
        last_brace = candidate.rfind("}")
        if first_brace == -1 or last_brace <= first_brace:
            raise
        return json.loads(candidate[first_brace:last_brace + 1])


def extract_data_with_gemini(text_file_path: str, path_to_data_to_extract: str) -> dict:
    """Send the extracted text to Gemini and return the parsed JSON answer.

    Args:
        text_file_path: Path of the UTF-8 text file holding the OCR output.
        path_to_data_to_extract: Path of the JSON file describing the
            fields to extract (forwarded to ``create_prompt``).

    Returns:
        The decoded JSON answer. If the answer cannot be parsed, a dict
        with the keys ``"error"`` and ``"raw_response"`` is returned
        instead.

    Raises:
        gr.Error: on any other failure (model initialisation, file access,
            the API call, or reading the response text).
    """
    try:
        # Initialize Gemini
        model = initialize_gemini()

        # Read the extracted text
        with open(text_file_path, 'r', encoding='utf-8') as f:
            extracted_text = f.read()

        # Create prompt and get response
        prompt = create_prompt(extracted_text, path_to_data_to_extract)
        response = model.generate_content(prompt)
        raw_text = response.text

        # Parse the JSON response; json.JSONDecodeError is a ValueError, so
        # only genuine parse failures are turned into the error payload.
        try:
            return _parse_json_reply(raw_text)
        except ValueError:
            return {"error": "Failed to parse JSON response", "raw_response": raw_text}
    except Exception as e:
        raise gr.Error(f"Error in Gemini processing: {str(e)}") from e
# Main Processing Function
def process_pdf(pdf_file):
    """Run the full pipeline on an uploaded PDF.

    Each page is rendered to an image, OCR'd and annotated; the text of all
    pages is collected in one file, the annotated images are zipped, and
    the text is sent to Gemini for structured extraction.

    Working files live in ``temp_processing`` under the current working
    directory, which is deleted and recreated at the start of every call.

    Args:
        pdf_file: Either the PDF's path as a string, or an object exposing
            that path through a ``name`` attribute.

    Returns:
        ``(text_file_path, zip_path, json_path)`` - paths of the extracted
        text, the ZIP of annotated page images and the JSON file holding
        the extraction result.

    Raises:
        gr.Error: if no file was provided or any processing step fails.
    """
    if pdf_file is None:
        raise gr.Error("Error processing PDF: no file was provided")
    # Accept both a plain path string and an object carrying the path in
    # its ``name`` attribute.
    pdf_path = getattr(pdf_file, "name", pdf_file)

    template_dir = os.path.join(os.getcwd(), "templates")
    temp_dir = os.path.join(os.getcwd(), "temp_processing")
    output_dir = os.path.join(temp_dir, 'output_images')

    # Start from a clean working directory so that results of a previous
    # call (in particular the appended-to text file) cannot leak in.
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    os.makedirs(output_dir, exist_ok=True)

    # JSON of the data to extract, with descriptions
    path_to_data_to_extract = os.path.join(template_dir, "data_to_extract.json")

    try:
        # Convert PDF to images and process
        images = convert_from_path(pdf_path)
        if not images:
            # Without pages no text file is produced and the later steps
            # would fail with a misleading "file not found" error.
            raise ValueError("the PDF contains no pages")

        annotated_images = []
        for i, img in enumerate(images):
            temp_img_path = os.path.join(temp_dir, f'temp_page_{i}.png')
            img.save(temp_img_path)
            blocks, annotated_image_path = process_image(temp_img_path, output_dir, i)
            annotated_images.append(annotated_image_path)
            save_extracted_text(blocks, i + 1, output_dir)

        # Create ZIP file
        zip_path = os.path.join(temp_dir, "annotated_images.zip")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            for img_path in annotated_images:
                zipf.write(img_path, os.path.basename(img_path))

        # Get the text file
        text_file_path = os.path.join(output_dir, 'extracted_text.txt')

        # Process with Gemini
        extracted_data = extract_data_with_gemini(text_file_path, path_to_data_to_extract)

        # Save extracted data to JSON file
        json_path = os.path.join(temp_dir, "extracted_data.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(extracted_data, f, ensure_ascii=False, indent=2)

        return text_file_path, zip_path, json_path
    except Exception as e:
        raise gr.Error(f"Error processing PDF: {str(e)}") from e
# Gradio Interface
# Custom stylesheet handed to the interface through its ``css`` argument:
# sets the container font and the button colours/shape.
css = """
.gradio-container {
font-family: 'IBM Plex Sans', sans-serif;
}
.gr-button {
color: white;
border-radius: 8px;
background: linear-gradient(45deg, #7928CA, #FF0080);
border: none;
}
"""
# Interface wiring: one PDF upload goes into process_pdf, and its three
# returned paths (text, ZIP, JSON) fill the three file outputs in order.
demo = gr.Interface(
    fn=process_pdf,
    inputs=[
        # Upload restricted to .pdf files.
        gr.File(
            label="Upload PDF Document",
            file_types=[".pdf"],
            type="filepath"
        )
    ],
    outputs=[
        gr.File(label="Extracted Text (TXT)"),
        gr.File(label="Annotated Images (ZIP)"),
        gr.File(label="Extracted Data (JSON)")
    ],
    title="PDF Text Extraction and Analysis",
    description="""
Upload a PDF document to:
1. Extract text content
2. Get annotated images showing detected text blocks
3. Extract structured data using AI analysis
Supports multiple pages and French legal documents.
""",
    # NOTE(review): the article text still contains template placeholders.
    article="Created by [Your Name] - [Your GitHub/Profile Link]",
    css=css,
    examples=[], # Add example PDFs if you have any
    cache_examples=False,
    theme=gr.themes.Soft()
)
# Launch the app (only when this file is executed as a script)
if __name__ == "__main__":
    demo.launch()