import streamlit as st import time import google.generativeai as genai from pydantic import ValidationError import mimetypes import os import settings_ai from settings_ai import Documento, Articolo import pandas as pd from PyPDF2 import PdfReader, PdfWriter import json from azure.core.credentials import AzureKeyCredential from azure.ai.documentintelligence import DocumentIntelligenceClient from azure.ai.documentintelligence.models import AnalyzeDocumentRequest from streamlit_pdf_viewer import pdf_viewer import io from PyPDF2 import PdfReader, PdfWriter import fitz import re import io from collections import Counter st.set_page_config(page_title="Import Fatture AI✨") st.title("Import Fatture AI ✨") # Gestionione LOGIN if "logged" not in st.session_state: st.session_state.logged = False if st.session_state.logged == False: login_placeholder = st.empty() with login_placeholder.container(): container = st.container(border=True) username = container.text_input('Username') password = container.text_input('Passowrd', type='password') login = container.button(' Login ', type='primary') if not login or username != os.getenv("LOGIN_USER") or password != os.getenv("LOGIN_PASSWORD"): if login: st.error('Password Errata') st.stop() st.session_state.logged = True login_placeholder.empty() with st.expander("Guida completa"): st.write("""Questa applicazione Python, basata su Streamlit, integra servizi di intelligenza artificiale di Gemini per automatizzare l'estrazione e la validazione dei dati dalle fatture. Il sistema gestisce documenti in vari formati (PDF, immagini) e li elabora in maniera modulare per facilitare la conversione e la verifica delle informazioni. ## Funzionalità Principali - **Caricamento e Gestione dei Documenti** - Supporta il caricamento di file PDF, JPG, JPEG e PNG tramite un’interfaccia Streamlit. - Se il file è un PDF con più pagine, viene suddiviso in sezioni (configurabile tramite uno slider) per una gestione più efficace. Più il numero è basso più il risultato è preciso. - **Conversione dei Dati** - **Upload e Inoltro a Gemini**: I file vengono caricati e inviati al rispettivo servizio AI. - **Estrazione dei Dati**: Il sistema invia il documento a un modello di generazione AI per ottenere una rappresentazione JSON contenente i dati (ad es. numero di documento, data, totale imponibile e articoli). - **Validazione e Verifica** - **Validazione JSON**: Utilizza Pydantic per verificare la correttezza della struttura e dei dati estratti. In caso di errori, il documento viene riprocessato fino a 3 volte per cercare di correggere le anomalie. - **Verifica Incrociata dei Contenuti**: Per i PDF, viene estratto il testo con PyPDF2 e confrontato con i codici articolo per assicurarsi che i dati siano effettivamente presenti nel documento. - **Filtraggio Articoli**: Vengono mantenuti solo gli articoli compatibili con i criteri specifici (codici articolo e importi non nulli). - **Visualizzazione e Highlighting** - I dati validati vengono mostrati in formato tabellare e in JSON. - Se il documento è un PDF, il sistema evidenzia graficamente (con rettangoli rossi) i testi relativi agli articoli compatibili, semplificando il controllo visivo. ## Avvertenze per l'Operatore - **Controllo Manuale Obbligatorio**: L'operatore **deve sempre verificare** i dati elaborati. Nonostante il riprocessamento automatico in caso di errori, è fondamentale controllare la correttezza dei dati estratti e la validità degli articoli. - **Validazione dei Dati**: L'operazione di validazione tramite Pydantic e la verifica incrociata sul contenuto del PDF sono cruciali. Assicurarsi che non vi siano discrepanze, specialmente nei casi in cui alcuni articoli risultino non verificati. """) st.write("📄 **Legge più PDF:** Carica più file PDF contemporaneamente") st.write("🤖 **Sfrutta l'AI di Gemini:** Per ogni documento, estrae i dati in formato JSON e in formato tabellare.") st.write("✅ **Mostra Articoli Compatibili:** Filtra e visualizza solo gli articoli che rispettano i criteri richiesti.") st.write("🔍 **Anteprima Documento:** Visualizza un'anteprima del documento evidenziando gli articoli compatibili.") GENERATION_CONFIG = settings_ai.GENERATION_CONFIG SYSTEM_INSTRUCTION = settings_ai.SYSTEM_INSTRUCTION USER_MESSAGE = settings_ai.USER_MESSAGE API_KEY_GEMINI = settings_ai.API_KEY_GEMINI # Configura il modello Gemini genai.configure(api_key=API_KEY_GEMINI) model = genai.GenerativeModel( model_name="gemini-2.0-flash", generation_config=GENERATION_CONFIG, system_instruction=SYSTEM_INSTRUCTION ) # Upload File a GEMINI def upload_to_gemini(path: str, mime_type: str = None): """Carica un file su Gemini e ne ritorna l'oggetto file.""" file = genai.upload_file(path, mime_type=mime_type) print(f"Uploaded file '{file.display_name}' as: {file.uri}") return file # Attesa Upload Files def wait_for_files_active(files): """Attende che i file siano nello stato ACTIVE su Gemini.""" print("Waiting for file processing...") for name in (f.name for f in files): file_status = genai.get_file(name) while file_status.state.name == "PROCESSING": print(".", end="", flush=True) time.sleep(10) file_status = genai.get_file(name) if file_status.state.name != "ACTIVE": raise Exception(f"File {file_status.name} failed to process") print("\n...all files ready") # Chiamata API Gemini def send_message_to_gemini(chat_session, message, max_attempts=5): """Tenta di inviare il messaggio tramite la chat_session, riprovando fino a max_attempts in caso di eccezioni, con un delay di 10 secondi tra i tentativi. """ for attempt in range(max_attempts): try: print(f"Generazione AI con PROMPT: {message}") response_local = chat_session.send_message(message) return response_local except Exception as e: print(f"Errore in send_message (tentativo {attempt+1}/{max_attempts}): {e}") if attempt < max_attempts - 1: print("Riprovo tra 10 secondi...") time.sleep(10) raise RuntimeError(f"Invio messaggio fallito dopo {max_attempts} tentativi.") # Unisce i rettangoli evidenziati (se codice e descrizione stanno su linee diverse viene mostrato un solo rettangolo evidenziato) def merge_intervals(intervals): """Unisce gli intervalli sovrapposti. Gli intervalli sono tuple (y0, y1).""" if not intervals: return [] intervals.sort(key=lambda x: x[0]) merged = [intervals[0]] for current in intervals[1:]: last = merged[-1] if current[0] <= last[1]: merged[-1] = (last[0], max(last[1], current[1])) else: merged.append(current) return merged # Evidenzia le corrispondenze def highlight_text_in_pdf(input_pdf_bytes, text_list): """Crea rettangoli rossi che evidenziano i testi trovati, unendo gli intervalli sovrapposti in un unico rettangolo. """ pdf_document = fitz.open(stream=input_pdf_bytes, filetype="pdf") patterns = [] for text in text_list: text_pattern = re.escape(text).replace(r'\ ', r'\s*') patterns.append(text_pattern) pattern = r'\b(' + '|'.join(patterns) + r')\b' regex = re.compile(pattern, re.IGNORECASE) highlight_color = (1, 0, 0) # rosso for page in pdf_document: page_text = page.get_text() matches = list(regex.finditer(page_text)) intervals = [] for match in matches: match_text = match.group(0) text_instances = page.search_for(match_text) for inst in text_instances: text_height = inst.y1 - inst.y0 new_y0 = inst.y0 - 0.1 * text_height new_y1 = inst.y1 + 0.1 * text_height intervals.append((new_y0, new_y1)) merged_intervals = merge_intervals(intervals) for y0, y1 in merged_intervals: full_width_rect = fitz.Rect(page.rect.x0, y0, page.rect.x1, y1) rect_annot = page.add_rect_annot(full_width_rect) rect_annot.set_colors(stroke=highlight_color, fill=highlight_color) rect_annot.set_opacity(0.15) rect_annot.update() output_stream = io.BytesIO() pdf_document.save(output_stream) pdf_document.close() output_stream.seek(0) return output_stream # Formattazione Euro def format_euro(amount): formatted = f"{amount:,.2f}" formatted = formatted.replace(",", "X").replace(".", ",").replace("X", ".") return f"€ {formatted}" # Testo da PDF def pdf_to_text(path_file: str) -> str: """ Estrae e concatena il testo da tutte le pagine del PDF. """ reader = PdfReader(path_file) full_text = "" for page in reader.pages: page_text = page.extract_text() or "" full_text += page_text + "\n" return full_text # Funzione che verifica se gli articoli sono corretti (facendo un partsing PDF to TEXT) def verify_articles(file_path: str, chunk_document): ''' La funzione trasforma il PDF in TESTO e cerca se ogni articolo è presente (al netto degli spazi) ''' if not file_path.lower().endswith(".pdf"): for articolo in chunk_document.Articoli: articolo.Verificato = 2 return None pdf_text = pdf_to_text(file_path) if '□' in pdf_text: for articolo in chunk_document.Articoli: articolo.Verificato = 2 return None for articolo in chunk_document.Articoli: articolo.Verificato = 1 if articolo.CodiceArticolo and (articolo.CodiceArticolo in pdf_text) else 0 if articolo.Verificato == 0: articolo.Verificato = 1 if articolo.CodiceArticolo and (articolo.CodiceArticolo.replace(" ", "") in pdf_text.replace(" ", "")) else 0 if not any(articolo.Verificato == 0 for articolo in chunk_document.Articoli): return None unverified_articles = [articolo for articolo in chunk_document.Articoli if articolo.Verificato == 0] json_unverified_articles = json.dumps([articolo.model_dump() for articolo in unverified_articles]) return json_unverified_articles # Funzione ausiliaria che elabora un file (o un chunk) inviandolo tramite send_message_to_gemini def process_document_splitted(file_path: str, chunk_label: str, use_azure: bool = False) -> Documento: """ Elabora il file (o il chunk) inviandolo a Gemini e processando la risposta: - Determina il mime type. - Effettua l'upload tramite upload_to_gemini e attende che il file sia attivo. - Avvia una chat con il file caricato e invia il messaggio utente. - Tenta fino a 3 volte di validare il JSON ottenuto, filtrando gli Articoli. - Se presenti errori, RIPROCESSA il documento 3 volta passando il risultato precedente, In questo modo riesce a gestire gli errori in modo più preciso! Ritorna l'istanza di Documento validata. """ mime_type, _ = mimetypes.guess_type(file_path) if mime_type is None: mime_type = "application/octet-stream" if not use_azure: files = [upload_to_gemini(file_path, mime_type=mime_type)] wait_for_files_active(files) chat_history = [{ "role": "user","parts": [files[0]]}] for attempt in range(3): try: chat_session = model.start_chat(history=chat_history) break except Exception as e: print(f"Errore nello Start chat") time.sleep(10) max_validation_attempts = 3 max_number_reprocess = 3 chunk_document = None for i in range(max_number_reprocess): print(f"Reprocessamento {i+1} di {max_number_reprocess} per il chunk {chunk_label}") response = None for attempt in range(max_validation_attempts): message = USER_MESSAGE if i > 0: message += f". Attenzione, RIPROVA perché i seguenti articoli sono da ESCLUDERE in quanto ERRATI! {json_unverified_articles}" if not use_azure: response = send_message_to_gemini(chat_session, message) else: chunk_document = analyze_invoice_azure(file_path) try: if not use_azure: chunk_document = Documento.model_validate_json(response.text) chunk_document.Articoli = [ art for art in chunk_document.Articoli if art.CodiceArticolo.startswith(("AVE", "AV", "3V", "44")) and art.TotaleNonIvato != 0 and art.CodiceArticolo not in ("AVE", "AV", "3V", "44") ] break except ValidationError as ve: print(f"Errore di validazione {chunk_label} (tentativo {attempt+1}/{max_validation_attempts}): {ve}") if attempt < max_validation_attempts - 1: print("Riprovo tra 5 secondi...") time.sleep(5) else: raise RuntimeError(f"Superato il numero massimo di tentativi di validazione {chunk_label}.") json_unverified_articles = verify_articles(file_path, chunk_document) if not json_unverified_articles: return chunk_document return chunk_document # Funzione principale che elabora il documento def process_document(path_file: str, number_pages_split: int, use_azure: bool = False) -> Documento: """ Elabora il documento in base al tipo di file: 1. Se il file non è un PDF, lo tratta "così com'è" (ad esempio, come immagine) e ne processa il contenuto tramite process_document_splitted. 2. Se il file è un PDF e contiene più di 5 pagine, lo divide in chunk da 5 pagine. Per ogni chunk viene effettuato l’upload, viene elaborato il JSON e validato. I chunk successivi vengono aggregati nel Documento finale e, se il PDF ha più di 5 pagine, al termine il campo TotaleMerce viene aggiornato con il valore riportato dall’ultimo chunk. """ mime_type, _ = mimetypes.guess_type(path_file) if mime_type is None: mime_type = "application/octet-stream" if use_azure: number_pages_split = 2 if not path_file.lower().endswith(".pdf"): print("File non PDF: elaborazione come immagine.") documento_finale = process_document_splitted(path_file, chunk_label="(immagine)", use_azure=use_azure) return documento_finale reader = PdfReader(path_file) total_pages = len(reader.pages) documento_finale = None ultimo_totale_merce = None for chunk_index in range(0, total_pages, number_pages_split): writer = PdfWriter() for page in reader.pages[chunk_index:chunk_index + number_pages_split]: writer.add_page(page) temp_filename = "temp_chunk_" if use_azure: temp_filename+="azure_" temp_filename += f"{chunk_index // number_pages_split}.pdf" with open(temp_filename, "wb") as temp_file: writer.write(temp_file) chunk_label = f"(chunk {chunk_index // number_pages_split})" chunk_document = process_document_splitted(temp_filename, chunk_label=chunk_label, use_azure=use_azure) if hasattr(chunk_document, "TotaleImponibile"): ultimo_totale_merce = chunk_document.TotaleImponibile if documento_finale is None: documento_finale = chunk_document else: documento_finale.Articoli.extend(chunk_document.Articoli) os.remove(temp_filename) if total_pages > number_pages_split and ultimo_totale_merce is not None: documento_finale.TotaleImponibile = ultimo_totale_merce if documento_finale is None: raise RuntimeError("Nessun documento elaborato.") # Controlli aggiuntivi: Se esiste un AVE non possono esistere altri articoli non ave. Se articoli DOPPI segnalo! if any(articolo.CodiceArticolo.startswith("AVE") for articolo in documento_finale.Articoli): documento_finale.Articoli = [articolo for articolo in documento_finale.Articoli if articolo.CodiceArticolo.startswith("AVE")] combinazioni = [(articolo.CodiceArticolo, articolo.TotaleNonIvato) for articolo in documento_finale.Articoli] conta_combinazioni = Counter(combinazioni) for articolo in documento_finale.Articoli: if conta_combinazioni[(articolo.CodiceArticolo, articolo.TotaleNonIvato)] > 1: articolo.Verificato = False return documento_finale # Analizza Fattura con AZURE def analyze_invoice_azure(file_path: str): """Invia il file (dal percorso specificato) al servizio prebuilt-invoice e restituisce il risultato dell'analisi.""" # Apri il file in modalità binaria e leggi il contenuto with open(file_path, "rb") as file: file_data = file.read() client = DocumentIntelligenceClient(endpoint=settings_ai.ENDPOINT_AZURE, credential=AzureKeyCredential(settings_ai.API_AZURE)) poller = client.begin_analyze_document("prebuilt-invoice", body=file_data) result = poller.result() return parse_invoice_to_documento_azure(result) # Parsing Fattura con AZURE def parse_invoice_to_documento_azure(result) -> Documento: """ Parssa il risultato dell'analisi e mappa i campi rilevanti nel modello Pydantic Documento. """ if not result.documents: raise ValueError("Nessun documento analizzato trovato.") invoice = result.documents[0] invoice_id_field = invoice.fields.get("InvoiceId") numero_documento = invoice_id_field.value_string if invoice_id_field and invoice_id_field.value_string else "" invoice_date_field = invoice.fields.get("InvoiceDate") data_str = invoice_date_field.value_date.isoformat() if invoice_date_field and invoice_date_field.value_date else "" subtotal_field = invoice.fields.get("SubTotal") if subtotal_field and subtotal_field.value_currency: totale_imponibile = subtotal_field.value_currency.amount else: invoice_total_field = invoice.fields.get("InvoiceTotal") totale_imponibile = invoice_total_field.value_currency.amount if invoice_total_field and invoice_total_field.value_currency else 0.0 articoli = [] items_field = invoice.fields.get("Items") if items_field and items_field.value_array: for item in items_field.value_array: product_code_field = item.value_object.get("ProductCode") codice_articolo = product_code_field.value_string if product_code_field and product_code_field.value_string else "" amount_field = item.value_object.get("Amount") totale_non_ivato = amount_field.value_currency.amount if amount_field and amount_field.value_currency else 0.0 articolo = Articolo( CodiceArticolo=codice_articolo, TotaleNonIvato=totale_non_ivato, Verificato=None ) articoli.append(articolo) documento = Documento( TipoDocumento="Fattura", NumeroDocumento=numero_documento, Data=data_str, TotaleImponibile=totale_imponibile, Articoli=articoli ) return documento # Front-End con Streamlit def main(): #st.set_page_config(page_title="Import Fatture AI", page_icon="✨") st.sidebar.title("Caricamento File") uploaded_files = st.sidebar.file_uploader("Seleziona uno o più PDF", type=["pdf", "jpg", "jpeg", "png"], accept_multiple_files=True) model_ai = st.sidebar.selectbox("Modello", ['Gemini Flash 2.0']) # 'Azure Intelligence']) use_azure = True if model_ai == 'Azure Intelligence' else False number_pages_split = st.sidebar.slider('Split Pagine', 1, 30, 2, help="Numero suddivisione pagine del PDF. Più il numero è basso e più il modello AI è preciso, più è alto più è veloce") if st.sidebar.button("Importa", type="primary", use_container_width=True): if not uploaded_files: st.warning("Nessun file caricato!") else: for uploaded_file in uploaded_files: st.subheader(f"📄 {uploaded_file.name}") file_path = uploaded_file.name with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) with st.spinner(f"Elaborazione in corso"): try: doc = process_document(uploaded_file.name, number_pages_split, use_azure=use_azure) totale_non_ivato_verificato = sum(articolo.TotaleNonIvato for articolo in doc.Articoli if articolo.Verificato == 1) totale_non_ivato_non_verificato = sum(articolo.TotaleNonIvato for articolo in doc.Articoli if articolo.Verificato != 1) totale_non_ivato = totale_non_ivato_verificato + totale_non_ivato_non_verificato st.write( f"- **Tipo**: {doc.TipoDocumento}\n" f"- **Numero**: {doc.NumeroDocumento}\n" f"- **Data**: {doc.Data}\n" f"- **Articoli Compatibili**: {len(doc.Articoli)}\n" f"- **Totale Documento**: {format_euro(doc.TotaleImponibile)}\n" ) if totale_non_ivato_non_verificato > 0: st.error(f"Totale Ave Non Verificato: {format_euro(totale_non_ivato_verificato)}") elif totale_non_ivato != 0: st.success(f"Totale Ave Verificato: {format_euro(totale_non_ivato_verificato)}") df = pd.DataFrame([{k: v for k, v in Articolo.model_dump().items() if k != ""} for Articolo in doc.Articoli]) if 'Verificato' in df.columns: df['Verificato'] = df['Verificato'].apply(lambda x: "✅" if x == 1 else "❌" if x == 0 else "❓" if x == 2 else x) if totale_non_ivato > 0: st.dataframe(df, use_container_width=True ,column_config={"TotaleNonIvato": st.column_config.NumberColumn("Totale non Ivato",format="€ %.2f")}) st.json(doc.model_dump(), expanded=False) if totale_non_ivato == 0: st.info(f"Non sono presenti articoli 'AVE'") if uploaded_file and file_path.lower().endswith(".pdf"): list_art = list_art = [articolo.CodiceArticolo for articolo in doc.Articoli] + [articolo.DescrizioneArticolo for articolo in doc.Articoli] if list_art: new_pdf = highlight_text_in_pdf(uploaded_file.getvalue(), list_art) pdf_viewer(input=new_pdf.getvalue(), width=1200) else: pdf_viewer(input=uploaded_file.getvalue(), width=1200) else: st.image(file_path) st.divider() except Exception as e: st.error(f"Errore durante l'elaborazione di {uploaded_file.name}: {e}") finally: if os.path.exists(file_path): os.remove(file_path) if __name__ == "__main__": st.divider() main()