MatteoScript's picture
Update app.py
d5a81fe verified
import streamlit as st
import time
import google.generativeai as genai
from pydantic import ValidationError
import mimetypes
import os
import settings_ai
from settings_ai import Documento, Articolo
import pandas as pd
from PyPDF2 import PdfReader, PdfWriter
import json
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from streamlit_pdf_viewer import pdf_viewer
import io
from PyPDF2 import PdfReader, PdfWriter
import fitz
import re
import io
from collections import Counter
import secrets
st.set_page_config(page_title="Import Fatture AI✨")
from streamlit_google_auth import Authenticate
google_auth_str = os.getenv("JSON_GOOGLE_SECRET")
google_auth_data = json.loads(google_auth_str)
with open("google_credentials.json", "w") as f:
json.dump(google_auth_data, f, indent=4)
authenticator = Authenticate(
secret_credentials_path='google_credentials.json',
cookie_name=f"llm_pdf_digest_{secrets.token_hex(4)}",
cookie_key=secrets.token_hex(32),
redirect_uri=os.getenv("URL_REDIRECT"),
)
st.title("Import Fatture AI ✨")
with st.expander("Guida completa"):
st.write("""Questa applicazione Python, basata su Streamlit, integra servizi di intelligenza artificiale di Gemini per automatizzare l'estrazione e la validazione dei dati dalle fatture. Il sistema gestisce documenti in vari formati (PDF, immagini) e li elabora in maniera modulare per facilitare la conversione e la verifica delle informazioni.
## Funzionalità Principali
- **Caricamento e Gestione dei Documenti**
- Supporta il caricamento di file PDF, JPG, JPEG e PNG tramite un’interfaccia Streamlit.
- Se il file è un PDF con più pagine, viene suddiviso in sezioni (configurabile tramite uno slider) per una gestione più efficace. Più il numero è basso più il risultato è preciso.
- **Conversione dei Dati**
- **Upload e Inoltro a Gemini**: I file vengono caricati e inviati al rispettivo servizio AI.
- **Estrazione dei Dati**: Il sistema invia il documento a un modello di generazione AI per ottenere una rappresentazione JSON contenente i dati (ad es. numero di documento, data, totale imponibile e articoli).
- **Validazione e Verifica**
- **Validazione JSON**: Utilizza Pydantic per verificare la correttezza della struttura e dei dati estratti. In caso di errori, il documento viene riprocessato fino a 3 volte per cercare di correggere le anomalie.
- **Verifica Incrociata dei Contenuti**: Per i PDF, viene estratto il testo con PyPDF2 e confrontato con i codici articolo per assicurarsi che i dati siano effettivamente presenti nel documento.
- **Filtraggio Articoli**: Vengono mantenuti solo gli articoli compatibili con i criteri specifici (codici articolo e importi non nulli).
- **Visualizzazione e Highlighting**
- I dati validati vengono mostrati in formato tabellare e in JSON.
- Se il documento è un PDF, il sistema evidenzia graficamente (con rettangoli rossi) i testi relativi agli articoli compatibili, semplificando il controllo visivo.
## Avvertenze per l'Operatore
- **Controllo Manuale Obbligatorio**:
L'operatore **deve sempre verificare** i dati elaborati.
Nonostante il riprocessamento automatico in caso di errori, è fondamentale controllare la correttezza dei dati estratti e la validità degli articoli.
- **Validazione dei Dati**:
L'operazione di validazione tramite Pydantic e la verifica incrociata sul contenuto del PDF sono cruciali.
Assicurarsi che non vi siano discrepanze, specialmente nei casi in cui alcuni articoli risultino non verificati.
""")
st.write("📄 **Legge più PDF:** Carica più file PDF contemporaneamente")
st.write("🤖 **Sfrutta l'AI di Gemini:** Per ogni documento, estrae i dati in formato JSON e in formato tabellare.")
st.write("✅ **Mostra Articoli Compatibili:** Filtra e visualizza solo gli articoli che rispettano i criteri richiesti.")
st.write("🔍 **Anteprima Documento:** Visualizza un'anteprima del documento evidenziando gli articoli compatibili.")
authenticator.check_authentification()
authenticator.login()
if not st.session_state.get('connected'):
with st.sidebar:
st.title("Login")
st.write("Seleziona l'account aziendale per accedere")
st.stop()
GENERATION_CONFIG = settings_ai.GENERATION_CONFIG
SYSTEM_INSTRUCTION = settings_ai.SYSTEM_INSTRUCTION
USER_MESSAGE = settings_ai.USER_MESSAGE
API_KEY_GEMINI = settings_ai.API_KEY_GEMINI
# Configura il modello Gemini
genai.configure(api_key=API_KEY_GEMINI)
model = genai.GenerativeModel(
model_name="gemini-2.0-flash",
generation_config=GENERATION_CONFIG,
system_instruction=SYSTEM_INSTRUCTION
)
# Upload File a GEMINI
def upload_to_gemini(path: str, mime_type: str = None):
"""Carica un file su Gemini e ne ritorna l'oggetto file."""
file = genai.upload_file(path, mime_type=mime_type)
print(f"Uploaded file '{file.display_name}' as: {file.uri}")
return file
# Attesa Upload Files
def wait_for_files_active(files):
"""Attende che i file siano nello stato ACTIVE su Gemini."""
print("Waiting for file processing...")
for name in (f.name for f in files):
file_status = genai.get_file(name)
while file_status.state.name == "PROCESSING":
print(".", end="", flush=True)
time.sleep(10)
file_status = genai.get_file(name)
if file_status.state.name != "ACTIVE":
raise Exception(f"File {file_status.name} failed to process")
print("\n...all files ready")
# Chiamata API Gemini
def send_message_to_gemini(chat_session, message, max_attempts=3):
"""Tenta di inviare il messaggio tramite la chat_session, riprovando fino a max_attempts in caso di eccezioni, con un delay di 10 secondi tra i tentativi. """
for attempt in range(max_attempts):
try:
print(f"Generazione AI con PROMPT: {message}")
response_local = chat_session.send_message(message)
return response_local
except Exception as e:
print(f"Errore in send_message (tentativo {attempt+1}/{max_attempts}): {e}")
if attempt < max_attempts - 1:
print("Riprovo tra 10 secondi...")
time.sleep(10)
raise RuntimeError(f"Invio messaggio fallito dopo {max_attempts} tentativi.")
# Unisce i rettangoli evidenziati (se codice e descrizione stanno su linee diverse viene mostrato un solo rettangolo evidenziato)
def merge_intervals(intervals):
"""Unisce gli intervalli sovrapposti. Gli intervalli sono tuple (y0, y1)."""
if not intervals:
return []
intervals.sort(key=lambda x: x[0])
merged = [intervals[0]]
for current in intervals[1:]:
last = merged[-1]
if current[0] <= last[1]:
merged[-1] = (last[0], max(last[1], current[1]))
else:
merged.append(current)
return merged
# Evidenzia le corrispondenze
def highlight_text_in_pdf(input_pdf_bytes, text_list):
"""Crea rettangoli rossi che evidenziano i testi trovati, unendo gli intervalli sovrapposti in un unico rettangolo. """
pdf_document = fitz.open(stream=input_pdf_bytes, filetype="pdf")
patterns = []
for text in text_list:
text_pattern = re.escape(text).replace(r'\ ', r'\s*')
patterns.append(text_pattern)
pattern = r'\b(' + '|'.join(patterns) + r')\b'
regex = re.compile(pattern, re.IGNORECASE)
highlight_color = (1, 0, 0) # rosso
for page in pdf_document:
page_text = page.get_text()
matches = list(regex.finditer(page_text))
intervals = []
for match in matches:
match_text = match.group(0)
text_instances = page.search_for(match_text)
for inst in text_instances:
text_height = inst.y1 - inst.y0
new_y0 = inst.y0 - 0.1 * text_height
new_y1 = inst.y1 + 0.1 * text_height
intervals.append((new_y0, new_y1))
merged_intervals = merge_intervals(intervals)
for y0, y1 in merged_intervals:
full_width_rect = fitz.Rect(page.rect.x0, y0, page.rect.x1, y1)
rect_annot = page.add_rect_annot(full_width_rect)
rect_annot.set_colors(stroke=highlight_color, fill=highlight_color)
rect_annot.set_opacity(0.15)
rect_annot.update()
output_stream = io.BytesIO()
pdf_document.save(output_stream)
pdf_document.close()
output_stream.seek(0)
return output_stream
# Formattazione Euro
def format_euro(amount):
formatted = f"{amount:,.2f}"
formatted = formatted.replace(",", "X").replace(".", ",").replace("X", ".")
return f"€ {formatted}"
# Testo da PDF
def pdf_to_text(path_file: str) -> str:
""" Estrae e concatena il testo da tutte le pagine del PDF. """
reader = PdfReader(path_file)
full_text = ""
for page in reader.pages:
page_text = page.extract_text() or ""
full_text += page_text + "\n"
return full_text
# Funzione che verifica se gli articoli sono corretti (facendo un partsing PDF to TEXT)
def verify_articles(file_path: str, chunk_document):
''' La funzione trasforma il PDF in TESTO e cerca se ogni articolo è presente (al netto degli spazi) '''
if not file_path.lower().endswith(".pdf"):
for articolo in chunk_document.Articoli:
articolo.Verificato = 2
return None
pdf_text = pdf_to_text(file_path)
if '□' in pdf_text:
for articolo in chunk_document.Articoli:
articolo.Verificato = 2
return None
for articolo in chunk_document.Articoli:
articolo.Verificato = 1 if articolo.CodiceArticolo and (articolo.CodiceArticolo in pdf_text) else 0
if articolo.Verificato == 0:
articolo.Verificato = 1 if articolo.CodiceArticolo and (articolo.CodiceArticolo.replace(" ", "") in pdf_text.replace(" ", "")) else 0
if not any(articolo.Verificato == 0 for articolo in chunk_document.Articoli):
return None
unverified_articles = [articolo for articolo in chunk_document.Articoli if articolo.Verificato == 0]
json_unverified_articles = json.dumps([articolo.model_dump() for articolo in unverified_articles])
return json_unverified_articles
# Funzione ausiliaria che elabora un file (o un chunk) inviandolo tramite send_message_to_gemini
def process_document_splitted(file_path: str, chunk_label: str, use_azure: bool = False) -> Documento:
""" Elabora il file (o il chunk) inviandolo a Gemini e processando la risposta:
- Determina il mime type.
- Effettua l'upload tramite upload_to_gemini e attende che il file sia attivo.
- Avvia una chat con il file caricato e invia il messaggio utente.
- Tenta fino a 3 volte di validare il JSON ottenuto, filtrando gli Articoli.
- Se presenti errori, RIPROCESSA il documento 3 volta passando il risultato precedente, In questo modo riesce a gestire gli errori in modo più preciso!
Ritorna l'istanza di Documento validata. """
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type is None:
mime_type = "application/octet-stream"
if not use_azure:
files = [upload_to_gemini(file_path, mime_type=mime_type)]
wait_for_files_active(files)
chat_history = [{ "role": "user","parts": [files[0]]}]
chat_session = model.start_chat(history=chat_history)
max_validation_attempts = 3
max_number_reprocess = 3
chunk_document = None
for i in range(max_number_reprocess):
print(f"Reprocessamento {i+1} di {max_number_reprocess} per il chunk {chunk_label}")
response = None
for attempt in range(max_validation_attempts):
message = USER_MESSAGE
if i > 0:
message += f". Attenzione, RIPROVA perché i seguenti articoli sono da ESCLUDERE in quanto ERRATI! {json_unverified_articles}"
if not use_azure:
response = send_message_to_gemini(chat_session, message)
else:
chunk_document = analyze_invoice_azure(file_path)
try:
if not use_azure:
chunk_document = Documento.model_validate_json(response.text)
chunk_document.Articoli = [
art for art in chunk_document.Articoli
if art.CodiceArticolo.startswith(("AVE", "AV", "3V", "44"))
and art.TotaleNonIvato != 0
and art.CodiceArticolo not in ("AVE", "AV", "3V", "44")
]
break
except ValidationError as ve:
print(f"Errore di validazione {chunk_label} (tentativo {attempt+1}/{max_validation_attempts}): {ve}")
if attempt < max_validation_attempts - 1:
print("Riprovo tra 5 secondi...")
time.sleep(5)
else:
raise RuntimeError(f"Superato il numero massimo di tentativi di validazione {chunk_label}.")
json_unverified_articles = verify_articles(file_path, chunk_document)
if not json_unverified_articles:
return chunk_document
return chunk_document
# Funzione principale che elabora il documento
def process_document(path_file: str, number_pages_split: int, use_azure: bool = False) -> Documento:
""" Elabora il documento in base al tipo di file:
1. Se il file non è un PDF, lo tratta "così com'è" (ad esempio, come immagine) e ne processa il contenuto tramite process_document_splitted.
2. Se il file è un PDF e contiene più di 5 pagine, lo divide in chunk da 5 pagine.
Per ogni chunk viene effettuato l’upload, viene elaborato il JSON e validato.
I chunk successivi vengono aggregati nel Documento finale e, se il PDF ha più
di 5 pagine, al termine il campo TotaleMerce viene aggiornato con il valore riportato dall’ultimo chunk. """
mime_type, _ = mimetypes.guess_type(path_file)
if mime_type is None:
mime_type = "application/octet-stream"
if use_azure:
number_pages_split = 2
if not path_file.lower().endswith(".pdf"):
print("File non PDF: elaborazione come immagine.")
documento_finale = process_document_splitted(path_file, chunk_label="(immagine)", use_azure=use_azure)
return documento_finale
reader = PdfReader(path_file)
total_pages = len(reader.pages)
documento_finale = None
ultimo_totale_merce = None
for chunk_index in range(0, total_pages, number_pages_split):
writer = PdfWriter()
for page in reader.pages[chunk_index:chunk_index + number_pages_split]:
writer.add_page(page)
temp_filename = "temp_chunk_"
if use_azure:
temp_filename+="azure_"
temp_filename += f"{chunk_index // number_pages_split}.pdf"
with open(temp_filename, "wb") as temp_file:
writer.write(temp_file)
chunk_label = f"(chunk {chunk_index // number_pages_split})"
chunk_document = process_document_splitted(temp_filename, chunk_label=chunk_label, use_azure=use_azure)
if hasattr(chunk_document, "TotaleImponibile"):
ultimo_totale_merce = chunk_document.TotaleImponibile
if documento_finale is None:
documento_finale = chunk_document
else:
documento_finale.Articoli.extend(chunk_document.Articoli)
os.remove(temp_filename)
if total_pages > number_pages_split and ultimo_totale_merce is not None:
documento_finale.TotaleImponibile = ultimo_totale_merce
if documento_finale is None:
raise RuntimeError("Nessun documento elaborato.")
# Controlli aggiuntivi: Se esiste un AVE non possono esistere altri articoli non ave. Se articoli DOPPI segnalo!
if any(articolo.CodiceArticolo.startswith("AVE") for articolo in documento_finale.Articoli):
documento_finale.Articoli = [articolo for articolo in documento_finale.Articoli if articolo.CodiceArticolo.startswith("AVE")]
combinazioni = [(articolo.CodiceArticolo, articolo.TotaleNonIvato) for articolo in documento_finale.Articoli]
conta_combinazioni = Counter(combinazioni)
for articolo in documento_finale.Articoli:
if conta_combinazioni[(articolo.CodiceArticolo, articolo.TotaleNonIvato)] > 1:
articolo.Verificato = False
return documento_finale
# Analizza Fattura con AZURE
def analyze_invoice_azure(file_path: str):
"""Invia il file (dal percorso specificato) al servizio prebuilt-invoice e restituisce il risultato dell'analisi."""
# Apri il file in modalità binaria e leggi il contenuto
with open(file_path, "rb") as file:
file_data = file.read()
client = DocumentIntelligenceClient(endpoint=settings_ai.ENDPOINT_AZURE, credential=AzureKeyCredential(settings_ai.API_AZURE))
poller = client.begin_analyze_document("prebuilt-invoice", body=file_data)
result = poller.result()
return parse_invoice_to_documento_azure(result)
# Parsing Fattura con AZURE
def parse_invoice_to_documento_azure(result) -> Documento:
""" Parssa il risultato dell'analisi e mappa i campi rilevanti nel modello Pydantic Documento. """
if not result.documents:
raise ValueError("Nessun documento analizzato trovato.")
invoice = result.documents[0]
invoice_id_field = invoice.fields.get("InvoiceId")
numero_documento = invoice_id_field.value_string if invoice_id_field and invoice_id_field.value_string else ""
invoice_date_field = invoice.fields.get("InvoiceDate")
data_str = invoice_date_field.value_date.isoformat() if invoice_date_field and invoice_date_field.value_date else ""
subtotal_field = invoice.fields.get("SubTotal")
if subtotal_field and subtotal_field.value_currency:
totale_imponibile = subtotal_field.value_currency.amount
else:
invoice_total_field = invoice.fields.get("InvoiceTotal")
totale_imponibile = invoice_total_field.value_currency.amount if invoice_total_field and invoice_total_field.value_currency else 0.0
articoli = []
items_field = invoice.fields.get("Items")
if items_field and items_field.value_array:
for item in items_field.value_array:
product_code_field = item.value_object.get("ProductCode")
codice_articolo = product_code_field.value_string if product_code_field and product_code_field.value_string else ""
amount_field = item.value_object.get("Amount")
totale_non_ivato = amount_field.value_currency.amount if amount_field and amount_field.value_currency else 0.0
articolo = Articolo(
CodiceArticolo=codice_articolo,
TotaleNonIvato=totale_non_ivato,
Verificato=None
)
articoli.append(articolo)
documento = Documento(
TipoDocumento="Fattura",
NumeroDocumento=numero_documento,
Data=data_str,
TotaleImponibile=totale_imponibile,
Articoli=articoli
)
return documento
# Front-End con Streamlit
def main():
#st.set_page_config(page_title="Import Fatture AI", page_icon="✨")
st.sidebar.title("Caricamento File")
uploaded_files = st.sidebar.file_uploader("Seleziona uno o più PDF", type=["pdf", "jpg", "jpeg", "png"], accept_multiple_files=True)
model_ai = st.sidebar.selectbox("Modello", ['Gemini Flash 2.0']) # 'Azure Intelligence'])
use_azure = True if model_ai == 'Azure Intelligence' else False
number_pages_split = st.sidebar.slider('Split Pagine', 1, 30, 2, help="Numero suddivisione pagine del PDF. Più il numero è basso e più il modello AI è preciso, più è alto più è veloce")
if st.sidebar.button("Importa", type="primary", use_container_width=True):
if not uploaded_files:
st.warning("Nessun file caricato!")
else:
for uploaded_file in uploaded_files:
st.subheader(f"📄 {uploaded_file.name}")
file_path = uploaded_file.name
with open(file_path, "wb") as f:
f.write(uploaded_file.getbuffer())
with st.spinner(f"Elaborazione in corso"):
try:
doc = process_document(uploaded_file.name, number_pages_split, use_azure=use_azure)
totale_non_ivato_verificato = sum(articolo.TotaleNonIvato for articolo in doc.Articoli if articolo.Verificato == 1)
totale_non_ivato_non_verificato = sum(articolo.TotaleNonIvato for articolo in doc.Articoli if articolo.Verificato != 1)
totale_non_ivato = totale_non_ivato_verificato + totale_non_ivato_non_verificato
st.write(
f"- **Tipo**: {doc.TipoDocumento}\n"
f"- **Numero**: {doc.NumeroDocumento}\n"
f"- **Data**: {doc.Data}\n"
f"- **Articoli Compatibili**: {len(doc.Articoli)}\n"
f"- **Totale Documento**: {format_euro(doc.TotaleImponibile)}\n"
)
if totale_non_ivato_non_verificato > 0:
st.error(f"Totale Ave Non Verificato: {format_euro(totale_non_ivato_verificato)}")
elif totale_non_ivato != 0:
st.success(f"Totale Ave Verificato: {format_euro(totale_non_ivato_verificato)}")
df = pd.DataFrame([{k: v for k, v in Articolo.model_dump().items() if k != ""} for Articolo in doc.Articoli])
if 'Verificato' in df.columns:
df['Verificato'] = df['Verificato'].apply(lambda x: "✅" if x == 1 else "❌" if x == 0 else "❓" if x == 2 else x)
if totale_non_ivato > 0:
st.dataframe(df, use_container_width=True ,column_config={"TotaleNonIvato": st.column_config.NumberColumn("Totale non Ivato",format="€ %.2f")})
st.json(doc.model_dump(), expanded=False)
if totale_non_ivato == 0:
st.info(f"Non sono presenti articoli 'AVE'")
if uploaded_file and file_path.lower().endswith(".pdf"):
list_art = list_art = [articolo.CodiceArticolo for articolo in doc.Articoli] + [articolo.DescrizioneArticolo for articolo in doc.Articoli]
if list_art:
new_pdf = highlight_text_in_pdf(uploaded_file.getvalue(), list_art)
pdf_viewer(input=new_pdf.getvalue(), width=1200)
else:
pdf_viewer(input=uploaded_file.getvalue(), width=1200)
else:
st.image(file_path)
st.divider()
except Exception as e:
st.error(f"Errore durante l'elaborazione di {uploaded_file.name}: {e}")
finally:
if os.path.exists(file_path):
os.remove(file_path)
if __name__ == "__main__":
st.divider()
main()