import gradio as gr
import fitz  # PyMuPDF
import os
import requests
from huggingface_hub import HfApi
import urllib.parse
import tempfile
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Read the secrets from environment variables
HF_WRITE = os.getenv("HF_WRITE")
HF_READ = os.getenv("HF_READ")

# CONSTANTS
REPO_ID = "alexkueck/kkg_suche"
REPO_TYPE = "space"
SAVE_DIR = "kkg_dokumente"

# Create an HfApi instance
api = HfApi()

# Extract the text of every page from a PDF file
def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    return [page.get_text() for page in doc]

# Build the document list dynamically and extract the page texts
documents = []
for file_name in os.listdir(SAVE_DIR):
    if file_name.endswith(".pdf"):
        pdf_path = os.path.join(SAVE_DIR, file_name)
        documents.append({"file": file_name, "pages": extract_text_from_pdf(pdf_path)})

# Fit the TF-IDF vectorizer over all pages of all documents (one matrix row per page)
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform([page for doc in documents for page in doc['pages']])


####################################################
def search_documents(query):
    if not query:
        return [doc['file'] for doc in documents], "", []

    query_vector = vectorizer.transform([query])
    cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten()
    related_docs_indices = cosine_similarities.argsort()[::-1]

    results = []
    relevant_text = ""
    relevant_pdfs = []

    # Cumulative page counts allow mapping a flat TF-IDF row index back to
    # a (document, page) pair.
    num_pages_per_doc = [len(doc['pages']) for doc in documents]
    cumulative_pages = [sum(num_pages_per_doc[:i + 1]) for i in range(len(num_pages_per_doc))]

    for i in related_docs_indices:
        if cosine_similarities[i] > 0:
            doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative)
            page_index = i if doc_index == 0 else i - cumulative_pages[doc_index - 1]
            doc = documents[doc_index]
            results.append(doc['file'])
            page_content = doc['pages'][page_index]
            index = page_content.lower().find(query.lower())
            if index != -1:
                # Snippet of up to 400 characters on either side of the match
                start = max(0, index - 400)
                end = min(len(page_content), index + 400)
                relevant_text += f"From {doc['file']} (page {page_index + 1}):\n...{page_content[start:end]}...\n\n"
                relevant_pdfs.append((doc['file'], page_index))
    return results, relevant_text, relevant_pdfs

def update_dropdown():
    return gr.update(choices=list_pdfs())

def search_and_update(query):
    results, rel_text, relevant_pdfs = search_documents(query)
    images = []
    temp_dir = tempfile.mkdtemp()
    for pdf, page_index in relevant_pdfs:
        pdf_path = os.path.join(SAVE_DIR, pdf)
        document = fitz.open(pdf_path)
        # Render the matching page as a PNG for the gallery
        page = document.load_page(int(page_index))
        pix = page.get_pixmap()
        img_path = os.path.join(temp_dir, f"{pdf}_page_{page_index}.png")
        pix.save(img_path)
        images.append(img_path)
    return images, rel_text

def upload_pdf(file):
    if file is None:
        return "No file uploaded."

    # Extract the file name from the full path
    filename = os.path.basename(file.name)

    # Upload the file to the Hugging Face Space repository
    upload_path = f"kkg_dokumente/{filename}"
    api.upload_file(
        path_or_fileobj=file.name,
        path_in_repo=upload_path,
        repo_id=REPO_ID,
        repo_type=REPO_TYPE,
        token=HF_WRITE,
    )
    return f"PDF '{filename}' uploaded successfully."
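# For reference, the cumulative-pages lookup in search_documents() maps a flat
# TF-IDF row index back to a (document, page) pair. The same mapping can be
# written with the standard-library bisect module; a minimal sketch (the
# helper below is illustrative only and is not called anywhere in this app):
import bisect

def flat_index_to_doc_page(i, cumulative_pages):
    # The first document whose cumulative page count exceeds i holds row i.
    doc_index = bisect.bisect_right(cumulative_pages, i)
    # Offset of row i within that document's pages.
    page_index = i - (cumulative_pages[doc_index - 1] if doc_index > 0 else 0)
    return doc_index, page_index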
def list_pdfs():
    if not os.path.exists(SAVE_DIR):
        return []
    return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')]

def display_pdf(selected_pdf):
    pdf_path = os.path.join(SAVE_DIR, selected_pdf)

    # URL of the PDF inside the Hugging Face Space repository
    encoded_pdf_name = urllib.parse.quote(selected_pdf)
    pdf_url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/kkg_dokumente/{encoded_pdf_name}"

    # Download the PDF from the URL
    headers = {"Authorization": f"Bearer {HF_READ}"}
    response = requests.get(pdf_url, headers=headers)
    if response.status_code == 200:
        with open(pdf_path, 'wb') as f:
            f.write(response.content)
    else:
        return None, f"Error downloading the PDF file from {pdf_url}"

    # Convert the PDF to an image: render only the first page
    document = fitz.open(pdf_path)
    temp_dir = tempfile.mkdtemp()
    page = document.load_page(0)
    pix = page.get_pixmap()
    img_path = os.path.join(temp_dir, "page_0.png")
    pix.save(img_path)

    status = f"PDF '{selected_pdf}' loaded and processed successfully."
    return img_path, status


##############################################################
with gr.Blocks() as demo:
    with gr.Tab("Upload PDF"):
        upload_pdf_file = gr.File(label="Upload PDF file")
        upload_status = gr.Textbox(label="Status")
        upload_button = gr.Button("Upload")
        upload_button.click(upload_pdf, inputs=upload_pdf_file, outputs=upload_status)

    with gr.Tab("PDF selection and display"):
        pdf_dropdown = gr.Dropdown(label="Select a PDF file", choices=list_pdfs())
        query = gr.Textbox(label="Search query", type="text")
        display_status = gr.Textbox(label="Status")
        display_button = gr.Button("Display")
        with gr.Row():
            pdf_image = gr.Image(label="PDF page as image", type="filepath")
            relevant_text = gr.Textbox(label="Relevant text", lines=10)
        display_button.click(display_pdf, inputs=[pdf_dropdown], outputs=[pdf_image, display_status])

    with gr.Tab("Search"):
        search_query = gr.Textbox(label="Search query")
        search_button = gr.Button("Search")
        with gr.Row():
            search_results = gr.Gallery(label="Relevant PDFs", type="filepath")
            search_text = gr.Textbox(label="Relevant text", lines=10)
        search_button.click(search_and_update, inputs=search_query, outputs=[search_results, search_text])

    # Automatically refresh the dropdown list after uploading a PDF file
    #upload_button.click(update_dropdown, inputs=None, outputs=pdf_dropdown)
    #upload_button.click(lambda: pdf_dropdown.update(choices=list_pdfs()), outputs=pdf_dropdown)

demo.launch(share=True)
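# A sketch of how the dropdown refresh hinted at in the commented-out lines
# above could be wired up, assuming a Gradio version that supports gr.update()
# and chained .then() events; refresh_pdf_choices is a hypothetical helper,
# not part of the original app. Note that these lines would need to live
# inside the gr.Blocks() context, before demo.launch():
#
# def refresh_pdf_choices():
#     return gr.update(choices=list_pdfs())
#
# upload_button.click(upload_pdf, inputs=upload_pdf_file, outputs=upload_status) \
#              .then(refresh_pdf_choices, inputs=None, outputs=pdf_dropdown)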
"""
Legacy version of this app: it uploads the PDF and embeds it directly via its
repository URL instead of rendering pages to images.

import gradio as gr
import os
from huggingface_hub import HfApi
import time

# Read the secret from an environment variable
HF_TOKEN = os.getenv("HF_WRITE")

# Check that the secret was loaded
if HF_TOKEN is None:
    raise ValueError("HF_TOKEN environment variable not set. Please set the secret in your Hugging Face Space.")

# Repository name and type
repo_id = "alexkueck/kkg_suche"
repo_type = "space"

# Create an HfApi instance
api = HfApi()

def upload_and_display_pdf(file):
    if file is None:
        return None, "No file uploaded."

    # Extract the file name from the full path
    filename = os.path.basename(file.name)

    # Upload the file to the Hugging Face Space repository
    upload_path = f"kkg_dokumente/{filename}"
    api.upload_file(
        path_or_fileobj=file.name,
        path_in_repo=upload_path,
        repo_id=repo_id,
        repo_type=repo_type,
        token=HF_TOKEN,
    )

    # Short delay to make sure the file is available
    time.sleep(2)

    # Build the URL of the uploaded PDF file
    pdf_url = f"https://huggingface.co/spaces/{repo_id}/resolve/main/{upload_path}"

    # Build HTML with the embedded PDF (the embed markup was lost from this
    # copy; an <iframe> pointing at pdf_url is the obvious shape)
    html_content = f'<iframe src="{pdf_url}" width="100%" height="600px"></iframe>'
    return html_content, f"PDF '{filename}' uploaded successfully."

def display_document(doc_name):
    # (The header of this legacy function was lost; reconstructed from its body.)
    file_path = os.path.join(DOCS_DIR, doc_name)
    if not os.path.exists(file_path):
        return f"Error: file not found - {file_path}"
    # Build the URL for the PDF
    file_url = f"file://{file_path}"
    # (Embed markup lost from this copy; an <iframe> on file_url is the
    # obvious shape)
    return f'<iframe src="{file_url}" width="100%" height="600px"></iframe>'

# The legacy search_documents matched the live version above, except the
# snippet window was 100 characters around the match instead of 400.

def search_and_update(query):
    results, rel_text, relevant_pdfs = search_documents(query)
    pdf_html = ""
    for pdf, page in relevant_pdfs:
        pdf_path = os.path.join(DOCS_DIR, pdf)
        if not os.path.exists(pdf_path):
            pdf_html += f"Error: file not found - {pdf_path}<br>"
        else:
            file_url = f"file://{pdf_path}"
            pdf_html += f'<iframe src="{file_url}" width="100%" height="300px"></iframe>'
    return pdf_html, rel_text
"""
" else: file_url = f"file://{pdf_path}" pdf_html += f"Fehler: Datei nicht gefunden - {file_path}
" # Generieren Sie die URL für das PDF file_url = f"file://{file_path}" return f'' def search_documents(query): if not query: return [doc['file'] for doc in documents], "", [] query_vector = vectorizer.transform([query]) cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten() related_docs_indices = cosine_similarities.argsort()[::-1] results = [] relevant_text = "" relevant_pdfs = [] num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if cosine_similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] results.append(doc['file']) page_content = doc['pages'][page_index] index = page_content.lower().find(query.lower()) if index != -1: start = max(0, index - 100) end = min(len(page_content), index + 100) relevant_text += f"Aus {doc['file']} (Seite {page_index + 1}):\n...{page_content[start:end]}...\n\n" relevant_pdfs.append((doc['file'], page_index)) return results, relevant_text, relevant_pdfs def update_display(doc_name): return display_document(doc_name) def search_and_update(query): results, rel_text, relevant_pdfs = search_documents(query) pdf_html = "" for pdf, page in relevant_pdfs: pdf_path = os.path.join(DOCS_DIR, pdf) if not os.path.exists(pdf_path): pdf_html += f"Fehler: Datei nicht gefunden - {pdf_path}
" else: file_url = f"file://{pdf_path}" pdf_html += f"Fehler: Datei nicht gefunden - {file_path}
" # Generieren Sie die URL für das PDF file_url = f"file://{file_path}" return f'' def search_documents(query): if not query: return [doc['file'] for doc in documents], "", [] query_vector = vectorizer.transform([query]) cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten() related_docs_indices = cosine_similarities.argsort()[::-1] results = [] relevant_text = "" relevant_pdfs = [] num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if cosine_similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] results.append(doc['file']) page_content = doc['pages'][page_index] index = page_content.lower().find(query.lower()) if index != -1: start = max(0, index - 100) end = min(len(page_content), index + 100) relevant_text += f"Aus {doc['file']} (Seite {page_index + 1}):\n...{page_content[start:end]}...\n\n" relevant_pdfs.append((doc['file'], page_index)) return results, relevant_text, relevant_pdfs def update_display(doc_name): return display_document(doc_name) def search_and_update(query): results, rel_text, relevant_pdfs = search_documents(query) pdf_html = "" for pdf, page in relevant_pdfs: pdf_path = os.path.join(DOCS_DIR, pdf) if not os.path.exists(pdf_path): pdf_html += f"Fehler: Datei nicht gefunden - {pdf_path}
" else: file_url = f"file://{pdf_path}" pdf_html += f"Dies ist ein Beispieltext für die Anzeige des Dokuments {doc_name}.
return hardcoded_html def search_documents(query): if not query: return [doc['file'] for doc in documents], "", [] query_vector = vectorizer.transform([query]) cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten() related_docs_indices = cosine_similarities.argsort()[::-1] results = [] relevant_text = "" relevant_pdfs = [] num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if cosine_similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] results.append(doc['file']) page_content = doc['pages'][page_index] index = page_content.lower().find(query.lower()) if index != -1: start = max(0, index - 100) end = min(len(page_content), index + 100) relevant_text += f"Aus {doc['file']} (Seite {page_index + 1}):\n...{page_content[start:end]}...\n\n" relevant_pdfs.append((doc['file'], page_index)) return results, relevant_text, relevant_pdfs def update_display(doc_name): return display_document(doc_name) def search_and_update(query): results, rel_text, relevant_pdfs = search_documents(query) pdf_html = "" for pdf, page in relevant_pdfs: # Hartcodierter HTML-Inhalt zur Anzeige der Suchergebnisse pdf_html += f"Fehler: Datei nicht gefunden - {file_path}
" # Generieren Sie die URL für das PDF file_url = f"{DOCS_DIR}/{doc_name}" return f'' def search_documents(query): if not query: return [doc['file'] for doc in documents], "", [] query_vector = vectorizer.transform([query]) cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten() related_docs_indices = cosine_similarities.argsort()[::-1] results = [] relevant_text = "" relevant_pdfs = [] num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if cosine_similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] results.append(doc['file']) page_content = doc['pages'][page_index] index = page_content.lower().find(query.lower()) if index != -1: start = max(0, index - 100) end = min(len(page_content), index + 100) relevant_text += f"Aus {doc['file']} (Seite {page_index + 1}):\n...{page_content[start:end]}...\n\n" relevant_pdfs.append((doc['file'], page_index)) return results, relevant_text, relevant_pdfs def update_display(doc_name): return display_document(doc_name) def search_and_update(query): results, rel_text, relevant_pdfs = search_documents(query) pdf_html = "" for pdf, page in relevant_pdfs: pdf_path = os.path.join(DOCS_DIR, pdf) if not os.path.exists(pdf_path): pdf_html += f"Fehler: Datei nicht gefunden - {pdf_path}
" else: file_url = f"{DOCS_DIR}/{pdf}" pdf_html += f"