Spaces:
Running
Running
# Import_Functionality.py | |
# Functionality to import content into the DB | |
# | |
# Imports | |
from time import sleep | |
import logging | |
import re | |
import shutil | |
import tempfile | |
import os | |
import traceback | |
import zipfile | |
# | |
# External Imports | |
import gradio as gr | |
# | |
# Local Imports | |
from App_Function_Libraries.DB.DB_Manager import insert_prompt_to_db, load_preset_prompts, import_obsidian_note_to_db, \ | |
add_media_to_database | |
from App_Function_Libraries.Prompt_Handling import import_prompt_from_file, import_prompts_from_zip# | |
from App_Function_Libraries.Summarization.Summarization_General_Lib import perform_summarization | |
################################################################################################################### | |
# | |
# Functions: | |
logger = logging.getLogger() | |
def import_data(file, title, author, keywords, custom_prompt, summary, auto_summarize, api_name, api_key): | |
logging.debug(f"Starting import_data with file: {file} / Title: {title} / Author: {author} / Keywords: {keywords}") | |
if file is None: | |
return "No file uploaded. Please upload a file." | |
try: | |
logging.debug(f"File object type: {type(file)}") | |
logging.debug(f"File object attributes: {dir(file)}") | |
if hasattr(file, 'name'): | |
file_name = file.name | |
else: | |
file_name = 'unknown_file' | |
# Create a temporary file | |
with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt', encoding='utf-8') as temp_file: | |
if isinstance(file, str): | |
# If file is a string, it's likely file content | |
temp_file.write(file) | |
elif hasattr(file, 'read'): | |
# If file has a 'read' method, it's likely a file-like object | |
content = file.read() | |
if isinstance(content, bytes): | |
content = content.decode('utf-8') | |
temp_file.write(content) | |
else: | |
# If it's neither a string nor a file-like object, try converting it to a string | |
temp_file.write(str(file)) | |
temp_file.seek(0) | |
file_content = temp_file.read() | |
logging.debug(f"File name: {file_name}") | |
logging.debug(f"File content (first 100 chars): {file_content[:100]}") | |
# Create info_dict | |
info_dict = { | |
'title': title or 'Untitled', | |
'uploader': author or 'Unknown', | |
} | |
# FIXME - Add chunking support... I added chapter chunking specifically for this... | |
# Create segments (assuming one segment for the entire content) | |
segments = [{'Text': file_content}] | |
# Process keywords | |
keyword_list = [kw.strip() for kw in keywords.split(',') if kw.strip()] if keywords else [] | |
# Handle summarization | |
if auto_summarize and api_name and api_key: | |
summary = perform_summarization(api_name, file_content, custom_prompt, api_key) | |
elif not summary: | |
summary = "No summary provided" | |
# Add to database | |
result = add_media_to_database( | |
url=file_name, # Using filename as URL | |
info_dict=info_dict, | |
segments=segments, | |
summary=summary, | |
keywords=keyword_list, | |
custom_prompt_input=custom_prompt, | |
whisper_model="Imported", # Indicating this was an imported file | |
media_type="document", | |
overwrite=False # Set this to True if you want to overwrite existing entries | |
) | |
# Clean up the temporary file | |
os.unlink(temp_file.name) | |
return f"File '{file_name}' import attempt complete. Database result: {result}" | |
except Exception as e: | |
logging.exception(f"Error importing file: {str(e)}") | |
return f"Error importing file: {str(e)}" | |
def process_obsidian_zip(zip_file): | |
with tempfile.TemporaryDirectory() as temp_dir: | |
try: | |
with zipfile.ZipFile(zip_file, 'r') as zip_ref: | |
zip_ref.extractall(temp_dir) | |
imported_files, total_files, errors = import_obsidian_vault(temp_dir) | |
return imported_files, total_files, errors | |
except zipfile.BadZipFile: | |
error_msg = "The uploaded file is not a valid zip file." | |
logger.error(error_msg) | |
return 0, 0, [error_msg] | |
except Exception as e: | |
error_msg = f"Error processing zip file: {str(e)}\n{traceback.format_exc()}" | |
logger.error(error_msg) | |
return 0, 0, [error_msg] | |
finally: | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
def scan_obsidian_vault(vault_path): | |
markdown_files = [] | |
for root, dirs, files in os.walk(vault_path): | |
for file in files: | |
if file.endswith('.md'): | |
markdown_files.append(os.path.join(root, file)) | |
return markdown_files | |
def parse_obsidian_note(file_path): | |
with open(file_path, 'r', encoding='utf-8') as file: | |
content = file.read() | |
frontmatter = {} | |
frontmatter_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL) | |
if frontmatter_match: | |
frontmatter_text = frontmatter_match.group(1) | |
import yaml | |
frontmatter = yaml.safe_load(frontmatter_text) | |
content = content[frontmatter_match.end():] | |
tags = re.findall(r'#(\w+)', content) | |
links = re.findall(r'\[\[(.*?)\]\]', content) | |
return { | |
'title': os.path.basename(file_path).replace('.md', ''), | |
'content': content, | |
'frontmatter': frontmatter, | |
'tags': tags, | |
'links': links, | |
'file_path': file_path # Add this line | |
} | |
def create_import_single_prompt_tab(): | |
with gr.TabItem("Import a Prompt"): | |
gr.Markdown("# Import a prompt into the database") | |
with gr.Row(): | |
with gr.Column(): | |
import_file = gr.File(label="Upload file for import", file_types=["txt", "md"]) | |
title_input = gr.Textbox(label="Title", placeholder="Enter the title of the content") | |
author_input = gr.Textbox(label="Author", placeholder="Enter the author's name") | |
system_input = gr.Textbox(label="System", placeholder="Enter the system message for the prompt", lines=3) | |
user_input = gr.Textbox(label="User", placeholder="Enter the user message for the prompt", lines=3) | |
keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords separated by commas") | |
import_button = gr.Button("Import Prompt") | |
with gr.Column(): | |
import_output = gr.Textbox(label="Import Status") | |
save_button = gr.Button("Save to Database") | |
save_output = gr.Textbox(label="Save Status") | |
def handle_import(file): | |
result = import_prompt_from_file(file) | |
if isinstance(result, tuple) and len(result) == 5: | |
title, author, system, user, keywords = result | |
return gr.update(value="File successfully imported. You can now edit the content before saving."), \ | |
gr.update(value=title), gr.update(value=author), gr.update(value=system), \ | |
gr.update(value=user), gr.update(value=", ".join(keywords)) | |
else: | |
return gr.update(value=result), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() | |
import_button.click( | |
fn=handle_import, | |
inputs=[import_file], | |
outputs=[import_output, title_input, author_input, system_input, user_input, keywords_input] | |
) | |
def save_prompt_to_db(title, author, system, user, keywords): | |
keyword_list = [k.strip() for k in keywords.split(',') if k.strip()] | |
return insert_prompt_to_db(title, author, system, user, keyword_list) | |
save_button.click( | |
fn=save_prompt_to_db, | |
inputs=[title_input, author_input, system_input, user_input, keywords_input], | |
outputs=save_output | |
) | |
def update_prompt_dropdown(): | |
return gr.update(choices=load_preset_prompts()) | |
save_button.click( | |
fn=update_prompt_dropdown, | |
inputs=[], | |
outputs=[gr.Dropdown(label="Select Preset Prompt")] | |
) | |
def create_import_item_tab(): | |
with gr.TabItem("Import Markdown/Text Files"): | |
gr.Markdown("# Import a markdown file or text file into the database") | |
gr.Markdown("...and have it tagged + summarized") | |
with gr.Row(): | |
with gr.Column(): | |
import_file = gr.File(label="Upload file for import", file_types=["txt", "md"]) | |
title_input = gr.Textbox(label="Title", placeholder="Enter the title of the content") | |
author_input = gr.Textbox(label="Author", placeholder="Enter the author's name") | |
keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords, comma-separated") | |
custom_prompt_input = gr.Textbox(label="Custom Prompt", | |
placeholder="Enter a custom prompt for summarization (optional)") | |
summary_input = gr.Textbox(label="Summary", | |
placeholder="Enter a summary or leave blank for auto-summarization", lines=3) | |
auto_summarize_checkbox = gr.Checkbox(label="Auto-summarize", value=False) | |
api_name_input = gr.Dropdown( | |
choices=[None, "Local-LLM", "OpenAI", "Anthropic", "Cohere", "Groq", "DeepSeek", "Mistral", "OpenRouter", | |
"Llama.cpp", "Kobold", "Ooba", "Tabbyapi", "VLLM","ollama", "HuggingFace", "Custom-OpenAI-API"], | |
label="API for Auto-summarization" | |
) | |
api_key_input = gr.Textbox(label="API Key", type="password") | |
with gr.Column(): | |
import_button = gr.Button("Import Data") | |
import_output = gr.Textbox(label="Import Status") | |
import_button.click( | |
fn=import_data, | |
inputs=[import_file, title_input, author_input, keywords_input, custom_prompt_input, | |
summary_input, auto_summarize_checkbox, api_name_input, api_key_input], | |
outputs=import_output | |
) | |
def create_import_multiple_prompts_tab(): | |
with gr.TabItem("Import Multiple Prompts"): | |
gr.Markdown("# Import multiple prompts into the database") | |
gr.Markdown("Upload a zip file containing multiple prompt files (txt or md)") | |
with gr.Row(): | |
with gr.Column(): | |
zip_file = gr.File(label="Upload zip file for import", file_types=["zip"]) | |
import_button = gr.Button("Import Prompts") | |
prompts_dropdown = gr.Dropdown(label="Select Prompt to Edit", choices=[]) | |
title_input = gr.Textbox(label="Title", placeholder="Enter the title of the content") | |
author_input = gr.Textbox(label="Author", placeholder="Enter the author's name") | |
system_input = gr.Textbox(label="System", placeholder="Enter the system message for the prompt", | |
lines=3) | |
user_input = gr.Textbox(label="User", placeholder="Enter the user message for the prompt", lines=3) | |
keywords_input = gr.Textbox(label="Keywords", placeholder="Enter keywords separated by commas") | |
with gr.Column(): | |
import_output = gr.Textbox(label="Import Status") | |
save_button = gr.Button("Save to Database") | |
save_output = gr.Textbox(label="Save Status") | |
prompts_display = gr.Textbox(label="Identified Prompts") | |
def handle_zip_import(zip_file): | |
result = import_prompts_from_zip(zip_file) | |
if isinstance(result, list): | |
prompt_titles = [prompt['title'] for prompt in result] | |
return gr.update( | |
value="Zip file successfully imported. Select a prompt to edit from the dropdown."), prompt_titles, gr.update( | |
value="\n".join(prompt_titles)), result | |
else: | |
return gr.update(value=result), [], gr.update(value=""), [] | |
def handle_prompt_selection(selected_title, prompts): | |
selected_prompt = next((prompt for prompt in prompts if prompt['title'] == selected_title), None) | |
if selected_prompt: | |
return ( | |
selected_prompt['title'], | |
selected_prompt.get('author', ''), | |
selected_prompt['system'], | |
selected_prompt.get('user', ''), | |
", ".join(selected_prompt.get('keywords', [])) | |
) | |
else: | |
return "", "", "", "", "" | |
zip_import_state = gr.State([]) | |
import_button.click( | |
fn=handle_zip_import, | |
inputs=[zip_file], | |
outputs=[import_output, prompts_dropdown, prompts_display, zip_import_state] | |
) | |
prompts_dropdown.change( | |
fn=handle_prompt_selection, | |
inputs=[prompts_dropdown, zip_import_state], | |
outputs=[title_input, author_input, system_input, user_input, keywords_input] | |
) | |
def save_prompt_to_db(title, author, system, user, keywords): | |
keyword_list = [k.strip() for k in keywords.split(',') if k.strip()] | |
return insert_prompt_to_db(title, author, system, user, keyword_list) | |
save_button.click( | |
fn=save_prompt_to_db, | |
inputs=[title_input, author_input, system_input, user_input, keywords_input], | |
outputs=save_output | |
) | |
def update_prompt_dropdown(): | |
return gr.update(choices=load_preset_prompts()) | |
save_button.click( | |
fn=update_prompt_dropdown, | |
inputs=[], | |
outputs=[gr.Dropdown(label="Select Preset Prompt")] | |
) | |
def create_import_obsidian_vault_tab(): | |
with gr.TabItem("Import Obsidian Vault"): | |
gr.Markdown("## Import Obsidian Vault") | |
with gr.Row(): | |
with gr.Column(): | |
vault_path_input = gr.Textbox(label="Obsidian Vault Path (Local)") | |
vault_zip_input = gr.File(label="Upload Obsidian Vault (Zip)") | |
with gr.Column(): | |
import_vault_button = gr.Button("Import Obsidian Vault") | |
import_status = gr.Textbox(label="Import Status", interactive=False) | |
def import_vault(vault_path, vault_zip): | |
if vault_zip: | |
imported, total, errors = process_obsidian_zip(vault_zip.name) | |
elif vault_path: | |
imported, total, errors = import_obsidian_vault(vault_path) | |
else: | |
return "Please provide either a local vault path or upload a zip file." | |
status = f"Imported {imported} out of {total} files.\n" | |
if errors: | |
status += f"Encountered {len(errors)} errors:\n" + "\n".join(errors) | |
return status | |
import_vault_button.click( | |
fn=import_vault, | |
inputs=[vault_path_input, vault_zip_input], | |
outputs=[import_status], | |
) | |
def import_obsidian_vault(vault_path, progress=gr.Progress()): | |
try: | |
markdown_files = scan_obsidian_vault(vault_path) | |
total_files = len(markdown_files) | |
imported_files = 0 | |
errors = [] | |
for i, file_path in enumerate(markdown_files): | |
try: | |
note_data = parse_obsidian_note(file_path) | |
success, error_msg = import_obsidian_note_to_db(note_data) | |
if success: | |
imported_files += 1 | |
else: | |
errors.append(error_msg) | |
except Exception as e: | |
error_msg = f"Error processing {file_path}: {str(e)}" | |
logger.error(error_msg) | |
errors.append(error_msg) | |
progress((i + 1) / total_files, f"Imported {imported_files} of {total_files} files") | |
sleep(0.1) # Small delay to prevent UI freezing | |
return imported_files, total_files, errors | |
except Exception as e: | |
error_msg = f"Error scanning vault: {str(e)}\n{traceback.format_exc()}" | |
logger.error(error_msg) | |
return 0, 0, [error_msg] |