# app.py
# =============================================================================
# 🚀 IMPORTS
# =============================================================================
import base64  # 🔥 For encoding/decoding files
import glob  # 🔍 For file searching
import hashlib  # 🔒 For hashing
import html  # 🧼 For escaping text placed inside HTML
import json  # 🧮 For JSON handling
import os  # 📁 OS interaction
import pandas as pd  # 🐼 For data frames
import pytz  # ⏰ For timezone management
import random  # 🎲 For randomness
import re  # 🔍 For regex operations
import shutil  # 🗑️ For file copying/removal
import streamlit as st  # 💻 Streamlit UI framework
import time  # ⏳ For time functions
import traceback  # 🚨 For error traces
import uuid  # 🆔 For unique IDs
import zipfile  # 📦 For archiving files
from PIL import Image  # 🖼️ For image processing
from azure.cosmos import CosmosClient, PartitionKey, exceptions  # ☁️ Cosmos DB
from datetime import datetime  # ⏰ For timestamps
from git import Repo  # 🐙 For Git operations
from github import Github  # 🔗 For GitHub API
from gradio_client import Client, handle_file  # 🤖 For Gradio video generation
import tempfile  # 📝 For temporary files
import io  # 📡 For in-memory streams
import requests  # 🌐 For HTTP requests
import numpy as np  # 🔢 For numerical operations
from urllib.parse import quote  # 🔗 For URL encoding

# =============================================================================
# 😎 EXTERNAL HELP LINKS (Always visible in sidebar)
# =============================================================================
external_links = [
    {"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "💻"},
    {"title": "MergeKit arXiv Paper", "url": "https://arxiv.org/abs/xxxx.xxxxx", "emoji": "📘"},
    {"title": "MergeKit Tutorial", "url": "https://huggingface.co/blog/mergekit-tutorial", "emoji": "✍️"},
    {"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "📚"},
    {"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "💻"},
    {"title": "DistillKit Announcing Blog Post", "url": "https://arcee.ai/blog/distillkit-announcement", "emoji": "✍️"},
    {"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "📚"},
    {"title": "Spectrum Hugging Face Blog Post", "url": "https://huggingface.co/blog/spectrum", "emoji": "✍️"},
    {"title": "Hugging Face Model Merging Docs", "url": "https://huggingface.co/docs/peft/model_merging", "emoji": "📚"},
    {"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "🌐"},
]

# =============================================================================
# 🎨 APP CONFIGURATION
# =============================================================================
Site_Name = '🐙 GitCosmos'
title = "🐙 GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = '🐙🌌💫'
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)

# Cosmos DB & App URLs
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'


# =============================================================================
# 💾 HELPER FUNCTIONS
# =============================================================================
def _read_base64(path):
    """Return the contents of the file at *path* as a base64 text string.

    The file is opened in a ``with`` block so the handle is always closed.
    """
    with open(path, "rb") as handle:
        return base64.b64encode(handle.read()).decode()


# 🔗 Get a download link for a file
def get_download_link(file_path):
    """Return an HTML anchor that embeds *file_path* as a base64 data URI.

    The result is meant to be rendered with
    ``st.markdown(..., unsafe_allow_html=True)``. It always starts with
    ``<a`` so callers can tell a link apart from a plain error message.

    Args:
        file_path: Path of the file to embed.

    Returns:
        An ``<a ... download="...">`` element as a string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    b64 = _read_base64(file_path)
    # Escape the name: it is interpolated into an attribute and into body text.
    file_name = html.escape(os.path.basename(file_path), quote=True)
    # NOTE(review): the anchor markup was missing from the version under
    # review (the encoded payload was computed but never used); confirm the
    # MIME type against what the UI expects.
    return (
        f'<a href="data:application/octet-stream;base64,{b64}" '
        f'download="{file_name}">Download {file_name} 📂</a>'
    )


# 🆔 Generate a unique ID
def generate_unique_id():
    """Return a ``<UTC timestamp>-<uuid4>`` identifier and echo it to the page.

    The timestamp uses the ``%Y%m%d%H%M%S%f`` format. The new ID is also
    written to the Streamlit page via ``st.write``.
    """
    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    timestamp = datetime.now(pytz.utc).strftime('%Y%m%d%H%M%S%f')
    unique_uuid = str(uuid.uuid4())
    return_value = f"{timestamp}-{unique_uuid}"
    st.write('New ID: ' + return_value)
    return return_value


# 📝 Generate a safe filename based on a prompt
def generate_filename(prompt, file_type):
    """Build ``<MMDD_HHMM><sanitised prompt>.<file_type>``.

    The time is taken in the US/Central zone. Every non-word character is
    removed from *prompt* and the remainder is truncated to 90 characters.
    """
    central = pytz.timezone('US/Central')
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
    safe_prompt = re.sub(r'\W+', '', prompt)[:90]
    return f"{safe_date_time}{safe_prompt}.{file_type}"


# 📄 Create a file with given content
def create_file(filename, prompt, response, should_save=True):
    """Write *prompt*, a blank line and *response* to *filename* as UTF-8.

    Does nothing when *should_save* is false.
    """
    if not should_save:
        return
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(prompt + "\n\n" + response)


# 📂 Load file contents
def load_file(file_name):
    """Return the full text of *file_name*, decoded as UTF-8."""
    with open(file_name, "r", encoding='utf-8') as file:
        return file.read()


# 🔗 Display a glossary entity with quick search links
def display_glossary_entity(k):
    """Render the term *k* followed by one emoji link per search target.

    Each emoji links to a search for *k* (in-app query, Wikipedia, Google,
    YouTube). The term is percent-encoded in every URL.
    """
    search_urls = {
        "🚀": lambda term: f"/?q={quote(term)}",
        "📖": lambda term: f"https://en.wikipedia.org/wiki/{quote(term)}",
        "🔍": lambda term: f"https://www.google.com/search?q={quote(term)}",
        "🎥": lambda term: f"https://www.youtube.com/results?search_query={quote(term)}",
    }
    # NOTE(review): previously the URL builders were never called and only the
    # bare emoji were emitted; the anchor markup here is a reconstruction.
    links_md = ' '.join(
        f'<a href="{build_url(k)}" target="_blank">{emoji}</a>'
        for emoji, build_url in search_urls.items()
    )
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)


# 📦 Create a ZIP archive of given files
def create_zip_of_files(files):
    """Write every path in *files* into ``all_files.zip`` and return that name.

    The archive is created in the current working directory and overwrites
    any existing file of the same name.
    """
    zip_name = "all_files.zip"
    with zipfile.ZipFile(zip_name, 'w') as zipf:
        for file in files:
            zipf.write(file)
    return zip_name


# 🎥 Get HTML to embed a video
def get_video_html(video_path, width="100%"):
    """Return a ``<video>`` element embedding *video_path* as a data URI.

    Args:
        video_path: Path to an MP4 file.
        width: Value for the element's ``width`` attribute.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    video_url = f"data:video/mp4;base64,{_read_base64(video_path)}"
    # NOTE(review): the original markup was missing (the function returned a
    # blank string); this is a minimal player, confirm the desired attributes.
    # Kept on one unindented line so Markdown does not treat it as code.
    return (
        f'<video width="{width}" controls>'
        f'<source src="{video_url}" type="video/mp4">'
        'Your browser does not support the video tag.'
        '</video>'
    )


# 🎵 Get HTML to embed audio
def get_audio_html(audio_path, width="100%"):
    """Return an ``<audio>`` element embedding *audio_path* as a data URI.

    Args:
        audio_path: Path to an MPEG audio file.
        width: CSS width applied to the player.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    audio_url = f"data:audio/mpeg;base64,{_read_base64(audio_path)}"
    # NOTE(review): the original markup was missing (the function returned a
    # blank string); this is a minimal player, confirm the desired attributes.
    return (
        f'<audio controls style="width: {width};">'
        f'<source src="{audio_url}" type="audio/mpeg">'
        'Your browser does not support the audio element.'
        '</audio>'
    )


# ✂️ Preprocess
text (e.g., for JSON safety) def preprocess_text(text): text = text.replace('\r\n', '\\n').replace('\r', '\\n').replace('\n', '\\n') text = text.replace('"', '\\"') text = re.sub(r'[\t]', ' ', text) text = re.sub(r'[^\x00-\x7F]+', '', text) return text.strip() # ============================================================================= # ☁️ COSMOS DB FUNCTIONS # ============================================================================= # List all databases def get_databases(client): return [db['id'] for db in client.list_databases()] # List all containers in a database def get_containers(database): return [container['id'] for container in database.list_containers()] # Retrieve documents from a container def get_documents(container, limit=None): query = "SELECT * FROM c ORDER BY c._ts DESC" items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit)) return items # Insert a record into a container def insert_record(container, record): try: container.create_item(body=record) return True, "Inserted! 🎉" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error: {str(e)} 🚨" except Exception as e: return False, f"Error: {str(e)} 😱" # Update (upsert) a record in a container def update_record(container, updated_record): try: container.upsert_item(body=updated_record) return True, f"Updated {updated_record['id']} 🛠️" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error: {str(e)} 🚨" except Exception as e: return False, f"Error: {traceback.format_exc()} 😱" # Delete a record from a container def delete_record(container, record): try: if "id" not in record: return False, "Record must contain an 'id' field. 
🛑" doc_id = record["id"] if "delete_log" not in st.session_state: st.session_state.delete_log = [] st.session_state.delete_log.append(f"Attempting to delete document: {json.dumps(record, indent=2)}") partition_key_value = record.get("pk", doc_id) st.session_state.delete_log.append(f"Using ID and Partition Key: {partition_key_value}") container.delete_item(item=doc_id, partition_key=partition_key_value) success_msg = f"Record {doc_id} successfully deleted from Cosmos DB. 🗑️" st.session_state.delete_log.append(success_msg) return True, success_msg except exceptions.CosmosResourceNotFoundError: success_msg = f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). 🗑️" st.session_state.delete_log.append(success_msg) return True, success_msg except exceptions.CosmosHttpResponseError as e: error_msg = f"HTTP error deleting {doc_id}: {str(e)}. 🚨" st.session_state.delete_log.append(error_msg) return False, error_msg except Exception as e: error_msg = f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. 
😱" st.session_state.delete_log.append(error_msg) return False, error_msg # Save data to Cosmos DB (for AI responses) def save_to_cosmos_db(container, query, response1, response2): try: if container: timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') unique_uuid = str(uuid.uuid4()) new_id = f"{timestamp}-{unique_uuid}" record = { "id": new_id, "pk": new_id, "name": new_id, "query": query, "response1": response1, "response2": response2, "timestamp": datetime.utcnow().isoformat(), "type": "ai_response", "version": "1.0" } container.create_item(body=record) st.success(f"Saved: {record['id']}") st.session_state.documents = get_documents(container) else: st.error("Cosmos container not initialized.") except Exception as e: st.error(f"Save error: {str(e)}") # Archive current container data into a ZIP file def archive_current_container(database_name, container_name, client): try: base_dir = "./cosmos_archive_current_container" if os.path.exists(base_dir): shutil.rmtree(base_dir) os.makedirs(base_dir) db_client = client.get_database_client(database_name) container_client = db_client.get_container_client(container_name) items = list(container_client.read_all_items()) container_dir = os.path.join(base_dir, container_name) os.makedirs(container_dir) for item in items: item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}") with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f: json.dump(item, f, indent=2) archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}" shutil.make_archive(archive_name, 'zip', base_dir) return get_download_link(f"{archive_name}.zip") except Exception as e: return f"Archive error: {str(e)} 😢" # ============================================================================= # 🚀 ADVANCED COSMOS FUNCTIONS # ============================================================================= def create_new_container(database, container_id, partition_key_path, analytical_storage_ttl=None, 
indexing_policy=None, vector_embedding_policy=None): try: if analytical_storage_ttl is not None: container = database.create_container( id=container_id, partition_key=PartitionKey(path=partition_key_path), analytical_storage_ttl=analytical_storage_ttl, indexing_policy=indexing_policy, vector_embedding_policy=vector_embedding_policy ) else: container = database.create_container( id=container_id, partition_key=PartitionKey(path=partition_key_path), indexing_policy=indexing_policy, vector_embedding_policy=vector_embedding_policy ) except exceptions.CosmosHttpResponseError as e: if analytical_storage_ttl is not None and "analyticalStorageTtl" in str(e): try: container = database.create_container( id=container_id, partition_key=PartitionKey(path=partition_key_path), indexing_policy=indexing_policy, vector_embedding_policy=vector_embedding_policy ) except Exception as e2: st.error(f"Error creating container without analytical_storage_ttl: {str(e2)}") return None elif isinstance(e, exceptions.CosmosResourceExistsError): container = database.get_container_client(container_id) else: st.error(f"Error creating container: {str(e)}") return None return container def advanced_insert_item(container, item): try: container.upsert_item(item) return True, f"Item {item.get('id', '')} inserted. ➕" except Exception as e: return False, str(e) def advanced_update_item(container, item): try: container.upsert_item(item) return True, f"Item {item.get('id', '')} updated. ✏️" except Exception as e: return False, str(e) def advanced_delete_item(container, item_id, partition_key_value): try: container.delete_item(item=item_id, partition_key=partition_key_value) return True, f"Item {item_id} deleted. 
🗑️" except Exception as e: return False, str(e) def vector_search(container, query_vector, vector_field, top=10, exact_search=False): query_vector_str = json.dumps(query_vector) query = f"""SELECT TOP {top} c.id, VectorDistance(c.{vector_field}, {query_vector_str}, {str(exact_search).lower()}, {{'dataType':'float32','distanceFunction':'cosine'}}) AS SimilarityScore FROM c ORDER BY SimilarityScore""" results = list(container.query_items(query=query, enable_cross_partition_query=True)) return results # ============================================================================= # 🐙 GITHUB FUNCTIONS # ============================================================================= def download_github_repo(url, local_path): if os.path.exists(local_path): shutil.rmtree(local_path) Repo.clone_from(url, local_path) def create_zip_file(source_dir, output_filename): shutil.make_archive(output_filename, 'zip', source_dir) def create_repo(g, repo_name): user = g.get_user() return user.create_repo(repo_name) def push_to_github(local_path, repo, github_token): repo_url = f"https://{github_token}@github.com/{repo.full_name}.git" local_repo = Repo(local_path) if 'origin' in [remote.name for remote in local_repo.remotes]: origin = local_repo.remote('origin') origin.set_url(repo_url) else: origin = local_repo.create_remote('origin', repo_url) if not local_repo.heads: local_repo.git.checkout('-b', 'main') current_branch = 'main' else: current_branch = local_repo.active_branch.name local_repo.git.add(A=True) if local_repo.is_dirty(): local_repo.git.commit('-m', 'Initial commit') origin.push(refspec=f'{current_branch}:{current_branch}') # ============================================================================= # 📁 FILE & MEDIA MANAGEMENT FUNCTIONS # ============================================================================= def display_saved_files_in_sidebar(): all_files = sorted([f for f in glob.glob("*.md") if not f.lower().startswith('readme')], reverse=True) 
st.sidebar.markdown("## 📁 Files") for file in all_files: col1, col2, col3 = st.sidebar.columns([6, 2, 1]) with col1: st.markdown(f"📄 {file}") with col2: st.sidebar.download_button( label="⬇️", data=open(file, 'rb').read(), file_name=file ) with col3: if st.sidebar.button("🗑", key=f"delete_{file}"): os.remove(file) st.rerun() def display_file_viewer(file_path): content = load_file(file_path) if content: st.markdown("### 📄 File Viewer") st.markdown(f"**{file_path}**") file_stats = os.stat(file_path) st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes") st.markdown("---") st.markdown(content) st.download_button("⬇️", data=content, file_name=os.path.basename(file_path), mime="text/markdown") def display_file_editor(file_path): if 'file_content' not in st.session_state: st.session_state.file_content = {} if file_path not in st.session_state.file_content: content = load_file(file_path) if content is not None: st.session_state.file_content[file_path] = content else: return st.markdown("### ✏️ Edit File") st.markdown(f"**Editing:** {file_path}") # Use on_change callback for auto-save via doc_editor new_content = st.text_area("Edit JSON", value=st.session_state.file_content[file_path], height=400, key="doc_editor", on_change=lambda: auto_save_edit()) col1, col2 = st.columns([1, 5]) with col1: if st.button("💾 Save"): if save_file_content(file_path, new_content): st.session_state.file_content[file_path] = new_content st.success("Saved! 
🎉") time.sleep(1) st.rerun() with col2: st.download_button("⬇️", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown") def save_file_content(file_path, content): try: with open(file_path, 'w', encoding='utf-8') as file: file.write(content) return True except Exception as e: st.error(f"Save error: {str(e)}") return False def update_file_management_section(): if 'file_view_mode' not in st.session_state: st.session_state.file_view_mode = None if 'current_file' not in st.session_state: st.session_state.current_file = None if 'file_content' not in st.session_state: st.session_state.file_content = {} all_files = sorted(glob.glob("*.md"), reverse=True) st.sidebar.title("📁 Files") if st.sidebar.button("🗑 Delete All"): for file in all_files: os.remove(file) st.session_state.file_content = {} st.session_state.current_file = None st.session_state.file_view_mode = None st.rerun() if st.sidebar.button("⬇️ Download All"): zip_file = create_zip_of_files(all_files) st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True) for file in all_files: col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1]) with col1: if st.button("🌐", key=f"view_{file}"): st.session_state.current_file = file st.session_state.file_view_mode = 'view' if file not in st.session_state.file_content: content = load_file(file) if content is not None: st.session_state.file_content[file] = content st.rerun() with col2: st.markdown(get_download_link(file), unsafe_allow_html=True) with col3: if st.button("📂", key=f"edit_{file}"): st.session_state.current_file = file st.session_state.file_view_mode = 'edit' if file not in st.session_state.file_content: content = load_file(file) if content is not None: st.session_state.file_content[file] = content st.rerun() with col4: if st.button("🗑", key=f"delete_{file}"): os.remove(file) if file in st.session_state.file_content: del st.session_state.file_content[file] if st.session_state.current_file == file: st.session_state.current_file 
= None st.session_state.file_view_mode = None st.rerun() st.sidebar.markdown("---") st.sidebar.title("External Help Links") for link in external_links: st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True) if st.session_state.current_file: if st.session_state.file_view_mode == 'view': display_file_viewer(st.session_state.current_file) elif st.session_state.file_view_mode == 'edit': display_file_editor(st.session_state.current_file) # ============================================================================= # 🎥 VIDEO & AUDIO UI FUNCTIONS # ============================================================================= def validate_and_preprocess_image(file_data, target_size=(576, 1024)): try: st.write("Preprocessing image...") if isinstance(file_data, bytes): img = Image.open(io.BytesIO(file_data)) elif hasattr(file_data, 'read'): if hasattr(file_data, 'seek'): file_data.seek(0) img = Image.open(file_data) elif isinstance(file_data, Image.Image): img = file_data else: raise ValueError(f"Unsupported input: {type(file_data)}") if img.mode != 'RGB': img = img.convert('RGB') aspect_ratio = img.size[0] / img.size[1] if aspect_ratio > target_size[0] / target_size[1]: new_width = target_size[0] new_height = int(new_width / aspect_ratio) else: new_height = target_size[1] new_width = int(new_height * aspect_ratio) new_width = (new_width // 2) * 2 new_height = (new_height // 2) * 2 resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) final_img = Image.new('RGB', target_size, (255, 255, 255)) paste_x = (target_size[0] - new_width) // 2 paste_y = (target_size[1] - new_height) // 2 final_img.paste(resized_img, (paste_x, paste_y)) return final_img except Exception as e: st.error(f"Image error: {str(e)}") return None def add_video_generation_ui(container): st.markdown("### 🎥 Video Gen") col1, col2 = st.columns([2, 1]) with col1: uploaded_file = st.file_uploader("Upload Image 🖼️", type=['png', 'jpg', 'jpeg']) with 
col2: st.markdown("#### Params") motion = st.slider("🌊 Motion", 1, 255, 127) fps = st.slider("🎬 FPS", 1, 30, 6) with st.expander("Advanced"): use_custom = st.checkbox("Custom Seed") seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None if uploaded_file is not None: try: file_data = uploaded_file.read() preview1, preview2 = st.columns(2) with preview1: st.write("Original") st.image(Image.open(io.BytesIO(file_data)), use_column_width=True) with preview2: proc_img = validate_and_preprocess_image(io.BytesIO(file_data)) if proc_img: st.write("Processed") st.image(proc_img, use_column_width=True) else: st.error("Preprocess failed") return if st.button("🎥 Generate"): with st.spinner("Generating video..."): with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file: proc_img.save(temp_file.name, format='PNG') try: client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN")) result = client.predict( image=temp_file.name, seed=seed if seed is not None else int(time.time() * 1000), randomize_seed=seed is None, motion_bucket_id=motion, fps_id=fps, api_name="/video" ) if result and isinstance(result, tuple) and len(result) >= 1: video_path = result[0].get('video') if isinstance(result[0], dict) else None if video_path and os.path.exists(video_path): video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4" shutil.copy(video_path, video_filename) st.success(f"Video generated! 
🎉") st.video(video_filename) if container: video_record = { "id": generate_unique_id(), "pk": generate_unique_id(), "type": "generated_video", "filename": video_filename, "seed": seed if seed is not None else "random", "motion": motion, "fps": fps, "timestamp": datetime.now().isoformat() } success, message = insert_record(container, video_record) if success: st.success("DB record saved!") else: st.error(f"DB error: {message}") else: st.error("Invalid result format") else: st.error("No result returned") except Exception as e: st.error(f"Video gen error: {str(e)}") finally: try: os.unlink(temp_file.name) st.write("Temp file removed") except Exception as e: st.warning(f"Cleanup error: {str(e)}") except Exception as e: st.error(f"Upload error: {str(e)}") # ============================================================================= # 🤖 NEW ITEM & FIELD FUNCTIONS # ============================================================================= # 🆕 Create a new sample document with default values def new_item_default(container): new_id = generate_unique_id() default_doc = { "id": new_id, "pk": new_id, "name": "New Sample Document", "content": "Start editing your document here...", "timestamp": datetime.now().isoformat(), "type": "sample" } success, message = insert_record(container, default_doc) if success: st.success("New sample document created! ✨") return default_doc else: st.error("Error creating new item: " + message) return None # 🔄 Auto-save changes from the editor def auto_save_edit(): try: edited_str = st.session_state.doc_editor edited_doc = json.loads(edited_str) container = st.session_state.current_container container.upsert_item(edited_doc) st.success("Auto-saved! 
💾") except Exception as e: st.error(f"Auto-save error: {str(e)}") # ➕ Add a new field (key/value) to the current document def add_field_to_doc(): key = st.session_state.new_field_key value = st.session_state.new_field_value try: doc = json.loads(st.session_state.doc_editor) doc[key] = value st.session_state.doc_editor = json.dumps(doc, indent=2) auto_save_edit() st.success(f"Added field {key} 👍") except Exception as e: st.error(f"Error adding field: {str(e)}") # ============================================================================= # 🔍 VECTOR SEARCH INTERFACE (Simple keyword search) # ============================================================================= def vector_keyword_search(keyword, container): try: query = f"SELECT * FROM c WHERE CONTAINS(c.content, '{keyword}')" results = list(container.query_items(query=query, enable_cross_partition_query=True)) return results except Exception as e: st.error(f"Vector search error: {str(e)}") return [] # ============================================================================= # 🤖 NEW AI MODALITY RECORD TEMPLATES # ============================================================================= def new_ai_record(container): new_id = generate_unique_id() default_doc = { "id": new_id, "pk": new_id, "name": "AI Modality Record", "function_url": "https://example.com/function", "input_text": "### Input (markdown)\n\nType your input here.", "output_text": "### Output (markdown)\n\nResult will appear here.", "timestamp": datetime.now().isoformat(), "type": "ai_modality" } success, message = insert_record(container, default_doc) if success: st.success("New AI modality record created! 
💡") return default_doc else: st.error("Error creating AI record: " + message) return None def new_links_record(container): new_id = generate_unique_id() links_md = "\n".join([f"- {link['emoji']} [{link['title']}]({link['url']})" for link in external_links]) default_doc = { "id": new_id, "pk": new_id, "name": "Portal Links Record", "function_url": "", "input_text": links_md, "output_text": "", "timestamp": datetime.now().isoformat(), "type": "ai_modality" } success, message = insert_record(container, default_doc) if success: st.success("New Portal Links record created! 🔗") return default_doc else: st.error("Error creating links record: " + message) return None # ============================================================================= # 🤖 LANGCHAIN FUNCTIONS (Witty emoji comments) # ============================================================================= def display_langchain_functions(): functions = [ {"name": "OpenAIEmbeddings", "comment": "🔮 Creates embeddings using OpenAI – pure magic!"}, {"name": "AzureCosmosDBNoSqlVectorSearch", "comment": "🚀 Performs vector search on Cosmos DB – superfast and smart!"}, {"name": "RecursiveCharacterTextSplitter", "comment": "✂️ Slices text into manageable chunks – like a pro chef!"} ] st.sidebar.markdown("### 🤖 Langchain Functions") for func in functions: st.sidebar.write(f"{func['name']}: {func['comment']}") # ============================================================================= # 📝 MAIN FUNCTION # ============================================================================= def main(): st.markdown("### 🐙 GitCosmos - Cosmos & Git Hub") # Set a friendly portal link (with emoji) for Cosmos DB st.markdown(f"[🔗 Portal]({CosmosDBUrl})") if "chat_history" not in st.session_state: st.session_state.chat_history = [] # NEW: Initialize container for auto-save editor access st.session_state.current_container = None # Auth & Cosmos client initialization if Key: st.session_state.primary_key = Key st.session_state.logged_in 
= True else: st.error("Missing Cosmos Key 🔑❌") return # Sidebar: New Item, Add Field, Vector Search, New AI Record buttons st.sidebar.markdown("## 🛠️ Item Management") # New Item button if st.sidebar.button("New Item"): if st.session_state.get("current_container"): new_doc = new_item_default(st.session_state.current_container) if new_doc: st.session_state.doc_editor = json.dumps(new_doc, indent=2) else: st.warning("No container selected!") # Add Field UI inputs st.sidebar.text_input("New Field Key", key="new_field_key") st.sidebar.text_input("New Field Value", key="new_field_value") if st.sidebar.button("Add Field"): if "doc_editor" in st.session_state: add_field_to_doc() else: st.warning("No document loaded to add a field.") # New AI Record button if st.sidebar.button("New AI Record"): if st.session_state.get("current_container"): new_ai_record(st.session_state.current_container) else: st.warning("No container selected!") # New Portal Links Record button if st.sidebar.button("New Links Record"): if st.session_state.get("current_container"): new_links_record(st.session_state.current_container) else: st.warning("No container selected!") # Vector Search UI st.sidebar.markdown("## 🔍 Vector Search") search_keyword = st.sidebar.text_input("Search Keyword", key="vector_search_keyword") if st.sidebar.button("Search"): if st.session_state.get("current_container"): results = vector_keyword_search(search_keyword, st.session_state.current_container) st.sidebar.write(f"Found {len(results)} results:") for res in results: st.sidebar.code(json.dumps(res, indent=2), language="json") else: st.warning("No container selected for search!") # Display Langchain functions display_langchain_functions() # Navigator and container selection try: if st.session_state.get("client") is None: st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) st.sidebar.title("🐙 Navigator") databases = get_databases(st.session_state.client) selected_db = 
st.sidebar.selectbox("🗃️ DB", databases) # Friendly portal link already shown above if selected_db != st.session_state.get("selected_database"): st.session_state.selected_database = selected_db st.session_state.selected_container = None st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if st.session_state.selected_database: database = st.session_state.client.get_database_client(st.session_state.selected_database) # New Container creation UI if "show_new_container_form" not in st.session_state: st.session_state.show_new_container_form = False if st.sidebar.button("🆕 New Container"): st.session_state.show_new_container_form = True if st.session_state.show_new_container_form: with st.sidebar.form("new_container_form"): new_container_id = st.text_input("Container ID", value="aiml-container") new_partition_key = st.text_input("Partition Key", value="/pk") new_analytical = st.checkbox("Enable Analytical Store", value=True) submitted = st.form_submit_button("Create Container") if submitted: analytical_ttl = -1 if new_analytical else None new_container = create_new_container( database, new_container_id, new_partition_key, analytical_storage_ttl=analytical_ttl ) if new_container: st.success(f"Container '{new_container_id}' created.") # Insert default templated item for image prompts default_id = generate_unique_id() default_item = { "id": default_id, "pk": default_id, "name": "Default Image Prompt", "prompt": "Enter your image prompt here", "timestamp": datetime.now().isoformat(), "type": "image_prompt" } insert_success, insert_message = insert_record(new_container, default_item) if insert_success: st.info("Default templated item created in new container.") else: st.error(f"Default item insertion error: {insert_message}") st.session_state.show_new_container_form = False st.session_state.new_container_created = new_container_id st.rerun() # Update container list containers = get_containers(database) if "new_container_created" in 
st.session_state and st.session_state.new_container_created not in containers: containers.append(st.session_state.new_container_created) selected_container = st.sidebar.selectbox("📁 Container", containers) if selected_container != st.session_state.get("selected_container"): st.session_state.selected_container = selected_container st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if st.session_state.selected_container: container = database.get_container_client(st.session_state.selected_container) st.session_state.current_container = container # For auto-save functions if st.sidebar.button("📦 Export"): download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client) if download_link.startswith(' num_docs else documents st.sidebar.info(f"Showing {len(documents_to_display)} docs") # Document Viewer / Editor view_options = ['Markdown', 'Code', 'Run AI', 'Clone', 'New'] selected_view = st.sidebar.selectbox("View", view_options, index=1) if selected_view == 'Markdown': st.markdown("#### 📄 Markdown") if documents: doc = documents[st.session_state.current_index] content = json.dumps(doc, indent=2) st.markdown(f"```json\n{content}\n```") col_prev, col_next = st.columns(2) with col_prev: if st.button("⬅️") and st.session_state.current_index > 0: st.session_state.current_index -= 1 st.rerun() with col_next: if st.button("➡️") and st.session_state.current_index < total_docs - 1: st.session_state.current_index += 1 st.rerun() elif selected_view == 'Code': st.markdown("#### 💻 Code Editor") if documents: doc = documents[st.session_state.current_index] # If a new document is loaded via New Item, load it into doc_editor if "doc_editor" not in st.session_state: st.session_state.doc_editor = json.dumps(doc, indent=2) edited = st.text_area("Edit JSON", value=st.session_state.doc_editor, height=300, key="doc_editor", on_change=lambda: auto_save_edit()) col_prev, col_next = 
st.columns(2) with col_prev: if st.button("⬅️") and st.session_state.current_index > 0: st.session_state.current_index -= 1 st.rerun() with col_next: if st.button("➡️") and st.session_state.current_index < total_docs - 1: st.session_state.current_index += 1 st.rerun() col_save, col_delete = st.columns(2) with col_save: if st.button("💾 Save", key=f'save_{st.session_state.current_index}'): try: updated_doc = json.loads(edited) container.upsert_item(body=updated_doc) st.success(f"Saved {updated_doc['id']}") st.rerun() except Exception as e: st.error(f"Save err: {str(e)}") with col_delete: if st.button("🗑️ Delete", key=f'delete_{st.session_state.current_index}'): try: current_doc = json.loads(edited) success, message = delete_record(container, current_doc) if success: st.success(message) st.rerun() else: st.error(message) except Exception as e: st.error(f"Delete err: {str(e)}") if "delete_log" in st.session_state and st.session_state.delete_log: st.subheader("Delete Log") for log_entry in st.session_state.delete_log[-5:]: st.write(log_entry) elif selected_view == 'Run AI': st.markdown("#### 🤖 Run AI (stub)") st.info("AI functionality not implemented.") elif selected_view == 'Clone': st.markdown("#### 📄 Clone") if documents: doc = documents[st.session_state.current_index] st.markdown(f"Original ID: {doc.get('id', '')}") new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id') new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name') new_doc = {'id': new_id, 'pk': new_id, 'name': new_name, **{k: v for k, v in doc.items() if k not in ['id', 'name', 'pk', '_rid', '_self', '_etag', '_attachments', '_ts']}} doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2), height=300, key='clone_preview') col1, col2 = st.columns(2) with col1: if st.button("🔄 Regenerate"): new_id = generate_unique_id() st.session_state.new_clone_id = new_id st.rerun() with col2: if st.button("💾 Save Clone"): try: final_doc = 
json.loads(doc_str) for field in ['_rid', '_self', '_etag', '_attachments', '_ts']: final_doc.pop(field, None) container.create_item(body=final_doc) st.success(f"Cloned {final_doc['id']}") st.rerun() except Exception as e: st.error(f"Clone err: {str(e)}") col_prev, col_next = st.columns(2) with col_prev: if st.button("⬅️") and st.session_state.current_index > 0: st.session_state.current_index -= 1 st.rerun() with col_next: if st.button("➡️") and st.session_state.current_index < total_docs - 1: st.session_state.current_index += 1 st.rerun() elif selected_view == 'New': st.markdown("#### ➕ New Doc") if st.button("🤖 Auto-Gen"): auto_doc = { "id": generate_unique_id(), "pk": generate_unique_id(), "name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "content": "Auto-generated record.", "timestamp": datetime.now().isoformat() } success, message = insert_record(container, auto_doc) if success: st.success(message) st.rerun() else: st.error(message) else: new_id = st.text_input("ID", value=generate_unique_id(), key='new_id') default_doc = { "id": new_id, "pk": new_id, "name": "New Doc", "content": "", "timestamp": datetime.now().isoformat() } new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300) if st.button("➕ Create"): try: cleaned = preprocess_text(new_doc_str) new_doc = json.loads(cleaned) new_doc['id'] = new_id new_doc['pk'] = new_id success, message = insert_record(container, new_doc) if success: st.success(f"Created {new_doc['id']}") st.rerun() else: st.error(message) except Exception as e: st.error(f"Create err: {str(e)}") st.subheader(f"📊 {st.session_state.selected_container}") if documents_to_display: df = pd.DataFrame(documents_to_display) st.dataframe(df) else: st.info("No docs.") update_file_management_section() except exceptions.CosmosHttpResponseError as e: st.error(f"Cosmos error: {str(e)} 🚨") except Exception as e: st.error(f"Error: {str(e)} 😱") if st.session_state.logged_in and st.sidebar.button("🚪 Logout"): 
st.markdown("#### 🚪 Logout") st.session_state.logged_in = False st.session_state.selected_records = [] st.session_state.client = None st.session_state.selected_database = None st.session_state.selected_container = None st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if __name__ == "__main__": main() # ============================================================================= # ───────────── Additional Blank Lines for Spacing (~1500 lines total) ───────────── # ============================================================================= # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # End of app.py