import logging
import os
import re
import shutil
import sqlite3
import tempfile
import zipfile
from typing import Dict, List, Optional, Tuple

import gradio as gr

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Database connection (you'll need to set this up).
# The object is used as `db.get_connection()` inside a `with` statement and
# the yielded connection must provide `cursor()`.
db = None  # Replace with your actual database connection

# Fixed SQL statements for the single-column search modes of browse_items().
_SEARCH_QUERIES = {
    'Title': "SELECT id, title, url FROM Media WHERE title LIKE ?",
    'URL': "SELECT id, title, url FROM Media WHERE url LIKE ?",
    'Content': "SELECT id, title, url FROM Media WHERE content LIKE ?",
}

# Anything that is not a word character, dash, dot or space is replaced when
# user/database text is used as part of a file name.
_UNSAFE_FILENAME_CHARS = re.compile(r'[^\w\-. ]+')


class DatabaseError(Exception):
    """Raised when a database query fails or no database is configured."""


# Internal helpers
def _open_connection():
    """Return the connection context manager from the module-level ``db``.

    Raises:
        DatabaseError: If ``db`` has not been configured (is still ``None``).
    """
    if db is None:
        raise DatabaseError("Database connection is not configured")
    return db.get_connection()


def _rows_to_items(rows) -> List[Dict]:
    """Convert ``(id, title, url)`` rows into dictionaries."""
    return [{"id": row[0], "title": row[1], "url": row[2]} for row in rows]


def _safe_filename_part(text: object, max_length: int = 50) -> str:
    """Return *text* reduced to characters that are safe inside a file name.

    Path separators and other special characters become ``_`` so that a title
    or keyword can never escape the export directory. ``None`` or an empty
    result yields ``"untitled"``.
    """
    raw = "" if text is None else str(text)
    cleaned = _UNSAFE_FILENAME_CHARS.sub("_", raw)[:max_length].strip(" ._")
    return cleaned or "untitled"


def _format_item_markdown(title, prompt: str, summary: str, content: str, level: int = 1) -> str:
    """Render one item as Markdown with headings starting at *level*."""
    heading = "#" * level
    return (
        f"{heading} {title}\n\n"
        f"{heading}# Prompt\n{prompt}\n\n"
        f"{heading}# Summary\n{summary}\n\n"
        f"{heading}# Content\n{content}"
    )


# Database functions
def fetch_items_by_keyword(search_query: str) -> List[Dict]:
    """Return media items that have a keyword containing *search_query*.

    Returns:
        A list of ``{"id", "title", "url"}`` dictionaries. Each media item
        appears once even when several of its keywords match.

    Raises:
        DatabaseError: If the query fails or no database is configured.
    """
    try:
        with _open_connection() as conn:
            cursor = conn.cursor()
            # DISTINCT: the join yields one row per matching keyword, so an
            # item with several matching keywords would otherwise repeat.
            cursor.execute(
                """
                SELECT DISTINCT m.id, m.title, m.url
                FROM Media m
                JOIN MediaKeywords mk ON m.id = mk.media_id
                JOIN Keywords k ON mk.keyword_id = k.id
                WHERE k.keyword LIKE ?
                """,
                (f'%{search_query}%',),
            )
            return _rows_to_items(cursor.fetchall())
    except sqlite3.Error as e:
        logger.error(f"Error fetching items by keyword: {e}")
        raise DatabaseError(f"Error fetching items by keyword: {e}") from e


def fetch_item_details(media_id: int) -> Tuple[str, str, str]:
    """Return ``(content, prompt, summary)`` for a media item.

    The prompt and summary come from the most recent ``MediaModifications``
    row for the item; the content comes from ``Media``. Any part that is not
    found is returned as an empty string. On an ``sqlite3`` error the problem
    is logged and three empty strings are returned.

    Raises:
        DatabaseError: If no database is configured.
    """
    try:
        with _open_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT prompt, summary
                FROM MediaModifications
                WHERE media_id = ?
                ORDER BY modification_date DESC
                LIMIT 1
                """,
                (media_id,),
            )
            prompt_summary_result = cursor.fetchone()
            cursor.execute("SELECT content FROM Media WHERE id = ?", (media_id,))
            content_result = cursor.fetchone()

            prompt = prompt_summary_result[0] if prompt_summary_result else ""
            summary = prompt_summary_result[1] if prompt_summary_result else ""
            content = content_result[0] if content_result else ""
            return content, prompt, summary
    except sqlite3.Error as e:
        logger.error(f"Error fetching item details: {e}")
        return "", "", ""


def browse_items(search_query: str, search_type: str) -> List[Dict]:
    """Search media items by ``'Title'``, ``'URL'``, ``'Keyword'`` or ``'Content'``.

    Returns:
        A list of ``{"id", "title", "url"}`` dictionaries.

    Raises:
        ValueError: If *search_type* is not one of the supported values.
        DatabaseError: If the query fails or no database is configured.
    """
    # Keyword search manages its own connection, so delegate before opening one.
    if search_type == 'Keyword':
        return fetch_items_by_keyword(search_query)

    query = _SEARCH_QUERIES.get(search_type)
    if query is None:
        raise ValueError(f"Invalid search type: {search_type}")

    try:
        with _open_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, (f'%{search_query}%',))
            return _rows_to_items(cursor.fetchall())
    except sqlite3.Error as e:
        logger.error(f"Error fetching items by {search_type}: {e}")
        raise DatabaseError(f"Error fetching items by {search_type}: {e}") from e


# Export functions
def export_item_as_markdown(media_id: int) -> Optional[str]:
    """Write one item to ``export_item_<media_id>.md`` in the working directory.

    Returns:
        The file name written, or ``None`` if the export failed (the error is
        logged).
    """
    try:
        content, prompt, summary = fetch_item_details(media_id)
        title = f"Item {media_id}"  # You might want to fetch the actual title
        markdown_content = _format_item_markdown(title, prompt, summary, content)

        filename = f"export_item_{media_id}.md"
        with open(filename, "w", encoding='utf-8') as f:
            f.write(markdown_content)

        logger.info(f"Successfully exported item {media_id} to {filename}")
        return filename
    except Exception as e:
        logger.error(f"Error exporting item {media_id}: {str(e)}")
        return None


def export_items_by_keyword(keyword: str) -> Optional[str]:
    """Export every item matching *keyword* as Markdown files inside one zip.

    Each item is written to its own Markdown file in a temporary folder, the
    folder contents are zipped, and the archive is moved into the current
    working directory so that Gradio can serve it.

    Returns:
        The path of the zip archive, or ``None`` when no items match or the
        export failed (both cases are logged).
    """
    try:
        items = fetch_items_by_keyword(keyword)
        if not items:
            logger.warning(f"No items found for keyword: {keyword}")
            return None

        # Create a temporary directory to store individual markdown files
        with tempfile.TemporaryDirectory() as temp_dir:
            # The keyword is user input: sanitize it before it becomes a path.
            folder_name = f"export_keyword_{_safe_filename_part(keyword)}"
            export_folder = os.path.join(temp_dir, folder_name)
            os.makedirs(export_folder)

            for item in items:
                content, prompt, summary = fetch_item_details(item['id'])
                markdown_content = _format_item_markdown(item['title'], prompt, summary, content)

                # Create individual markdown file for each item. The id prefix
                # keeps names unique; the title part is sanitized and limited
                # in length.
                file_name = f"{item['id']}_{_safe_filename_part(item['title'])}.md"
                file_path = os.path.join(export_folder, file_name)
                with open(file_path, "w", encoding='utf-8') as f:
                    f.write(markdown_content)

            # Create a zip file containing all markdown files
            zip_filename = f"{folder_name}.zip"
            archive_path = shutil.make_archive(
                os.path.join(temp_dir, folder_name), 'zip', export_folder
            )

            # Move the zip file to a location accessible by Gradio
            final_zip_path = os.path.join(os.getcwd(), zip_filename)
            shutil.move(archive_path, final_zip_path)

        logger.info(f"Successfully exported {len(items)} items for keyword '{keyword}' to {zip_filename}")
        return final_zip_path
    except Exception as e:
        logger.error(f"Error exporting items for keyword '{keyword}': {str(e)}")
        return None


def export_selected_items(selected_items: List[Dict]) -> Optional[str]:
    """Write the given items into a single ``export_selected_items.md`` file.

    Args:
        selected_items: Dictionaries with at least ``"id"`` and ``"title"``.

    Returns:
        The file name written, or ``None`` when nothing was selected or the
        export failed (both cases are logged).
    """
    try:
        if not selected_items:
            logger.warning("No items selected for export")
            return None

        sections = []
        for item in selected_items:
            content, prompt, summary = fetch_item_details(item['id'])
            section = _format_item_markdown(item['title'], prompt, summary, content, level=2)
            sections.append(f"{section}\n\n---\n\n")
        markdown_content = "# Selected Items\n\n" + "".join(sections)

        filename = "export_selected_items.md"
        with open(filename, "w", encoding='utf-8') as f:
            f.write(markdown_content)

        logger.info(f"Successfully exported {len(selected_items)} selected items to {filename}")
        return filename
    except Exception as e:
        logger.error(f"Error exporting selected items: {str(e)}")
        return None


# Gradio interface functions
def display_search_results(search_query: str, search_type: str) -> List[Dict]:
    """Run a search and wrap each hit as ``{"name": label, "value": item}``.

    Database errors are logged and reported as an empty result list. A
    ``ValueError`` from an unsupported *search_type* is not caught here.
    """
    try:
        results = browse_items(search_query, search_type)
        return [{"name": f"{item['title']} ({item['url']})", "value": item} for item in results]
    except DatabaseError as e:
        logger.error(f"Error in display_search_results: {str(e)}")
        return []


def _handle_search(query: str, kind: Optional[str]):
    """Search button callback.

    Returns the checkbox update, the label -> item lookup kept in state, and a
    status message (one value per wired output).
    """
    no_results = gr.update(choices=[], value=[])
    if not kind:
        return no_results, {}, "Please choose a search type"
    try:
        entries = display_search_results(query or "", kind)
    except ValueError as e:
        return no_results, {}, str(e)

    # Checkbox choices are plain strings; the id prefix keeps labels unique so
    # each label maps back to exactly one item.
    lookup = {f"{entry['value']['id']}: {entry['name']}": entry["value"] for entry in entries}
    status = f"Found {len(lookup)} item(s)"
    return gr.update(choices=list(lookup), value=[]), lookup, status


def _handle_export_selected(selected_labels: Optional[List[str]], lookup: Optional[Dict]):
    """Export the ticked search results into one Markdown file."""
    lookup = lookup or {}
    items = [lookup[label] for label in (selected_labels or []) if label in lookup]
    if not items:
        return None, "No items selected"
    path = export_selected_items(items)
    if path is None:
        return None, "Failed to export selected items (see log for details)"
    return path, "Exported selected items"


def _handle_export_keyword(keyword: str):
    """Export all items matching the entered keyword as a zip archive."""
    if not keyword:
        return None, "No keyword provided"
    path = export_items_by_keyword(keyword)
    if path is None:
        return None, f"Nothing exported for keyword: {keyword} (see log for details)"
    return path, f"Exported items for keyword: {keyword}"


def _handle_result_select(lookup: Optional[Dict], evt: gr.SelectData):
    """Export a single item when its checkbox is ticked."""
    item = (lookup or {}).get(evt.value) if evt.selected else None
    if not item:
        return None, "No item selected"
    path = export_item_as_markdown(item['id'])
    if path is None:
        return None, f"Failed to export item: {item['title']}"
    return path, f"Exported item: {item['title']}"


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Content Export Interface")

    with gr.Tab("Search and Export"):
        search_query = gr.Textbox(label="Search Query")
        search_type = gr.Radio(["Title", "URL", "Keyword", "Content"], label="Search By")
        search_button = gr.Button("Search")

        search_results = gr.CheckboxGroup(label="Search Results")
        export_selected_button = gr.Button("Export Selected Items")

        keyword_input = gr.Textbox(label="Enter keyword for export")
        export_by_keyword_button = gr.Button("Export items by keyword")

        export_output = gr.File(label="Download Exported File")
        error_output = gr.Textbox(label="Status/Error Messages", interactive=False)

        # Maps each checkbox label to the item dictionary it stands for.
        result_lookup = gr.State({})

    search_button.click(
        fn=_handle_search,
        inputs=[search_query, search_type],
        outputs=[search_results, result_lookup, error_output],
    )

    export_selected_button.click(
        fn=_handle_export_selected,
        inputs=[search_results, result_lookup],
        outputs=[export_output, error_output],
    )

    export_by_keyword_button.click(
        fn=_handle_export_keyword,
        inputs=[keyword_input],
        outputs=[export_output, error_output],
    )

    # Add functionality to export individual items
    search_results.select(
        fn=_handle_result_select,
        inputs=[result_lookup],
        outputs=[export_output, error_output],
    )

demo.launch()

# Notes on export_items_by_keyword:
#
# - Creates a temporary directory to store individual markdown files.
# - For each item associated with the keyword, it creates a separate markdown file.
# - Places all markdown files in a folder named export_keyword_{keyword}
#   (the keyword is sanitized before it is used in the name).
# - Creates a zip file containing all markdown files from that folder.
# - Moves the zip file to a location accessible by Gradio for download.