# app.py
# =============================================================================
# 🚀 IMPORTS
# =============================================================================
import base64 # 🔥 For encoding/decoding files
import glob # 🔍 For file searching
import hashlib # 🔒 For hashing
import json # 🧮 For JSON handling
import os # 📁 OS interaction
import pandas as pd # 🐼 For data frames
import pytz # ⏰ For timezone management
import random # 🎲 For randomness
import re # 🔍 For regex operations
import shutil # 🗑️ For file copying/removal
import streamlit as st # 💻 Streamlit UI framework
import time # ⏳ For time functions
import traceback # 🚨 For error traces
import uuid # 🆔 For unique IDs
import zipfile # 📦 For archiving files
from PIL import Image # 🖼️ For image processing
from azure.cosmos import CosmosClient, PartitionKey, exceptions # ☁️ Cosmos DB
from datetime import datetime # ⏰ For timestamps
from git import Repo # 🐙 For Git operations
from github import Github # 🔗 For GitHub API
from gradio_client import Client, handle_file # 🤖 For Gradio video generation
import tempfile # 📝 For temporary files
import io # 📡 For in-memory streams
import requests # 🌐 For HTTP requests
import numpy as np # 🔢 For numerical operations
from urllib.parse import quote # 🔗 For URL encoding
# =============================================================================
# 😎 EXTERNAL HELP LINKS (Always visible in sidebar)
# =============================================================================
# Static reference links shown in the sidebar and used by new_links_record()
# to build the "Portal Links" document. Each entry: title, url, display emoji.
external_links = [
    {"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "💻"},
    # NOTE(review): placeholder arXiv id below ("xxxx.xxxxx") — replace with the real paper URL.
    {"title": "MergeKit arXiv Paper", "url": "https://arxiv.org/abs/xxxx.xxxxx", "emoji": "📘"},
    {"title": "MergeKit Tutorial", "url": "https://huggingface.co/blog/mergekit-tutorial", "emoji": "✍️"},
    {"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "📚"},
    {"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "💻"},
    {"title": "DistillKit Announcing Blog Post", "url": "https://arcee.ai/blog/distillkit-announcement", "emoji": "✍️"},
    {"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "📚"},
    {"title": "Spectrum Hugging Face Blog Post", "url": "https://huggingface.co/blog/spectrum", "emoji": "✍️"},
    {"title": "Hugging Face Model Merging Docs", "url": "https://huggingface.co/docs/peft/model_merging", "emoji": "📚"},
    {"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "🌐"},
]
# =============================================================================
# 🎨 APP CONFIGURATION
# =============================================================================
# App identity strings consumed by st.set_page_config below.
Site_Name = '🐙 GitCosmos'
title = "🐙 GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = '🐙🌌💫'
# Must run before any other Streamlit call in the script.
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)
# Cosmos DB & App URLs
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
# Database/container names and the account key come from environment variables
# (space secrets); each is None when unset. Only Key is validated (in main()).
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
# Deep link straight into this account's Data Explorer in the Azure portal.
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'
# =============================================================================
# 💾 HELPER FUNCTIONS
# =============================================================================
# 🔗 Get a download link for a file
def get_download_link(file_path):
    """Return an HTML anchor that downloads *file_path* via a base64 data URI.

    Intended to be rendered with st.markdown(..., unsafe_allow_html=True), as
    the callers in this file do.
    """
    with open(file_path, "rb") as file:
        contents = file.read()
    b64 = base64.b64encode(contents).decode()
    file_name = os.path.basename(file_path)
    # Bug fix: the original computed b64 but returned plain text with no <a>
    # tag, so nothing was actually downloadable.
    return (
        f'<a href="data:application/octet-stream;base64,{b64}" '
        f'download="{file_name}">Download {file_name} 📂</a>'
    )
# 🆔 Generate a unique ID
def generate_unique_id():
    """Build a sortable unique id: UTC microsecond timestamp + a UUID4."""
    stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
    uid = f"{stamp}-{uuid.uuid4()}"
    st.write('New ID: ' + uid)
    return uid
# 📝 Generate a safe filename based on a prompt
def generate_filename(prompt, file_type):
    """Return a filesystem-safe name: MMDD_HHMM (US/Central) + slug + extension.

    The prompt is stripped of all non-word characters and truncated to 90
    characters before being appended to the timestamp.
    """
    # Stdlib zoneinfo produces the same wall-clock result as pytz here and
    # removes the third-party dependency from this function.
    from zoneinfo import ZoneInfo
    safe_date_time = datetime.now(ZoneInfo('US/Central')).strftime("%m%d_%H%M")
    safe_prompt = re.sub(r'\W+', '', prompt)[:90]
    return f"{safe_date_time}{safe_prompt}.{file_type}"
# 📄 Create a file with given content
def create_file(filename, prompt, response, should_save=True):
    """Write prompt and response (separated by a blank line) to *filename*.

    No-op when should_save is falsy.
    """
    if should_save:
        with open(filename, 'w', encoding='utf-8') as fh:
            fh.write(f"{prompt}\n\n{response}")
# 📂 Load file contents
def load_file(file_name):
    """Read and return the UTF-8 text contents of *file_name*."""
    with open(file_name, "r", encoding='utf-8') as fh:
        return fh.read()
# 🔗 Display a glossary entity with quick search links
def display_glossary_entity(k):
    """Render term *k* followed by emoji quick-search links.

    Links cover: in-app search, Wikipedia, Google, and YouTube.
    Bug fix: the original joined only the emoji and never called the URL
    lambdas, so the rendered "links" were dead text.
    """
    search_urls = {
        "🚀": lambda k: f"/?q={k}",
        "📖": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}",
        "🔍": lambda k: f"https://www.google.com/search?q={quote(k)}",
        "🎥": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}",
    }
    links_md = ' '.join([f"[{emoji}]({url(k)})" for emoji, url in search_urls.items()])
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)
# 📦 Create a ZIP archive of given files
def create_zip_of_files(files):
    """Bundle *files* into 'all_files.zip' in the CWD; return the archive name."""
    zip_name = "all_files.zip"
    with zipfile.ZipFile(zip_name, 'w') as archive:
        for path in files:
            archive.write(path)
    return zip_name
# 🎥 Get HTML to embed a video
def get_video_html(video_path, width="100%"):
    """Return an HTML5 <video> tag with the file inlined as a base64 data URI.

    Bug fix: the original built the data URI but returned an empty f-string —
    the markup had been lost, so nothing rendered.
    """
    with open(video_path, 'rb') as fh:
        video_url = f"data:video/mp4;base64,{base64.b64encode(fh.read()).decode()}"
    return f'''
    <video width="{width}" controls autoplay muted loop>
        <source src="{video_url}" type="video/mp4">
        Your browser does not support the video tag.
    </video>
    '''
# 🎵 Get HTML to embed audio
def get_audio_html(audio_path, width="100%"):
    """Return an HTML5 <audio> tag with the file inlined as a base64 data URI.

    Bug fix: the original built the data URI but returned an empty f-string —
    the markup had been lost, so nothing rendered.
    """
    with open(audio_path, 'rb') as fh:
        audio_url = f"data:audio/mpeg;base64,{base64.b64encode(fh.read()).decode()}"
    return f'''
    <audio controls style="width: {width};">
        <source src="{audio_url}" type="audio/mpeg">
        Your browser does not support the audio element.
    </audio>
    '''
# ✂️ Preprocess text (e.g., for JSON safety)
def preprocess_text(text):
    """Flatten *text* into a single JSON-safe ASCII line.

    Newlines (any flavour) become literal backslash-n sequences, double quotes
    are escaped, tabs collapse to spaces, and non-ASCII characters are dropped.
    """
    for raw in ('\r\n', '\r', '\n'):
        text = text.replace(raw, '\\n')
    text = text.replace('"', '\\"')
    text = re.sub(r'[\t]', ' ', text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    return text.strip()
# =============================================================================
# ☁️ COSMOS DB FUNCTIONS
# =============================================================================
# List all databases
def get_databases(client):
    """Return the id of every database visible to *client*."""
    return [entry['id'] for entry in client.list_databases()]
# List all containers in a database
def get_containers(database):
    """Return the id of every container in *database*."""
    return [entry['id'] for entry in database.list_containers()]
# Retrieve documents from a container
def get_documents(container, limit=None):
    """Return documents newest-first; at most *limit* items when limit is set.

    Bug fix: the original passed limit as max_item_count, which only controls
    the server page size in the Cosmos SDK and never caps the total number of
    results — the "limit" was silently ignored.
    """
    import itertools
    query = "SELECT * FROM c ORDER BY c._ts DESC"
    iterator = container.query_items(query=query, enable_cross_partition_query=True)
    if limit is None:
        return list(iterator)
    return list(itertools.islice(iterator, limit))
# Insert a record into a container
def insert_record(container, record):
    """Create *record* in *container*; return (ok, user-facing message)."""
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} 🚨"
    except Exception as e:
        return False, f"Error: {str(e)} 😱"
    return True, "Inserted! 🎉"
# Update (upsert) a record in a container
def update_record(container, updated_record):
    """Upsert *updated_record*; return (ok, user-facing message)."""
    try:
        container.upsert_item(body=updated_record)
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} 🚨"
    except Exception as e:
        # Full traceback in the message to aid debugging from the UI.
        return False, f"Error: {traceback.format_exc()} 😱"
    return True, f"Updated {updated_record['id']} 🛠️"
# Delete a record from a container
def delete_record(container, record):
    """Delete *record* (matched by its 'id') from *container*.

    Returns (ok, message). A "not found" response is treated as success so
    repeated deletes are idempotent. Every step is appended to
    st.session_state.delete_log for display in the UI.
    """
    try:
        if "id" not in record:
            return False, "Record must contain an 'id' field. 🛑"
        doc_id = record["id"]
        if "delete_log" not in st.session_state:
            st.session_state.delete_log = []
        st.session_state.delete_log.append(f"Attempting to delete document: {json.dumps(record, indent=2)}")
        # Falls back to the id itself when no explicit partition key is stored;
        # this matches how records are created in this file (pk == id).
        partition_key_value = record.get("pk", doc_id)
        st.session_state.delete_log.append(f"Using ID and Partition Key: {partition_key_value}")
        container.delete_item(item=doc_id, partition_key=partition_key_value)
        success_msg = f"Record {doc_id} successfully deleted from Cosmos DB. 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg
    except exceptions.CosmosResourceNotFoundError:
        # Already gone — report success (idempotent delete).
        success_msg = f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg
    except exceptions.CosmosHttpResponseError as e:
        error_msg = f"HTTP error deleting {doc_id}: {str(e)}. 🚨"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg
    except Exception as e:
        error_msg = f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. 😱"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg
# Save data to Cosmos DB (for AI responses)
def save_to_cosmos_db(container, query, response1, response2):
    """Persist an AI query/response pair as a new 'ai_response' document.

    On success the cached document list in session state is refreshed; all
    failures surface as st.error messages.
    """
    try:
        if not container:
            st.error("Cosmos container not initialized.")
            return
        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
        new_id = f"{stamp}-{uuid.uuid4()}"
        record = {
            "id": new_id,
            "pk": new_id,  # partition key mirrors the id
            "name": new_id,
            "query": query,
            "response1": response1,
            "response2": response2,
            "timestamp": datetime.utcnow().isoformat(),
            "type": "ai_response",
            "version": "1.0"
        }
        container.create_item(body=record)
        st.success(f"Saved: {record['id']}")
        st.session_state.documents = get_documents(container)
    except Exception as e:
        st.error(f"Save error: {str(e)}")
# Archive current container data into a ZIP file
def archive_current_container(database_name, container_name, client):
    """Dump every document in the container to JSON files and zip them up.

    Returns an HTML download link for the archive on success, or an error
    message string on failure.
    """
    try:
        base_dir = "./cosmos_archive_current_container"
        # Start from a clean staging directory each run.
        if os.path.exists(base_dir):
            shutil.rmtree(base_dir)
        os.makedirs(base_dir)
        container_client = client.get_database_client(database_name).get_container_client(container_name)
        container_dir = os.path.join(base_dir, container_name)
        os.makedirs(container_dir)
        for item in list(container_client.read_all_items()):
            item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
            with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
                json.dump(item, f, indent=2)
        archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        shutil.make_archive(archive_name, 'zip', base_dir)
        return get_download_link(f"{archive_name}.zip")
    except Exception as e:
        return f"Archive error: {str(e)} 😢"
# =============================================================================
# 🚀 ADVANCED COSMOS FUNCTIONS
# =============================================================================
def create_new_container(database, container_id, partition_key_path,
                         analytical_storage_ttl=None, indexing_policy=None, vector_embedding_policy=None):
    """Create a container on *database*, or fetch it if it already exists.

    Tries with analytical_storage_ttl first, retrying without it when the
    account rejects the analytical-store setting. Returns the container
    client, or None on failure (with the error shown via st.error).
    """
    try:
        if analytical_storage_ttl is not None:
            container = database.create_container(
                id=container_id,
                partition_key=PartitionKey(path=partition_key_path),
                analytical_storage_ttl=analytical_storage_ttl,
                indexing_policy=indexing_policy,
                vector_embedding_policy=vector_embedding_policy
            )
        else:
            container = database.create_container(
                id=container_id,
                partition_key=PartitionKey(path=partition_key_path),
                indexing_policy=indexing_policy,
                vector_embedding_policy=vector_embedding_policy
            )
    except exceptions.CosmosHttpResponseError as e:
        # Account doesn't support the analytical store: retry without the TTL.
        if analytical_storage_ttl is not None and "analyticalStorageTtl" in str(e):
            try:
                container = database.create_container(
                    id=container_id,
                    partition_key=PartitionKey(path=partition_key_path),
                    indexing_policy=indexing_policy,
                    vector_embedding_policy=vector_embedding_policy
                )
            except Exception as e2:
                st.error(f"Error creating container without analytical_storage_ttl: {str(e2)}")
                return None
        # CosmosResourceExistsError subclasses CosmosHttpResponseError, so it
        # is caught above and distinguished here: reuse the existing container.
        elif isinstance(e, exceptions.CosmosResourceExistsError):
            container = database.get_container_client(container_id)
        else:
            st.error(f"Error creating container: {str(e)}")
            return None
    return container
def advanced_insert_item(container, item):
    """Upsert *item* into *container*; return (ok, message)."""
    try:
        container.upsert_item(item)
    except Exception as e:
        return False, str(e)
    return True, f"Item {item.get('id', '')} inserted. ➕"
def advanced_update_item(container, item):
    """Upsert *item* (update semantics); return (ok, message)."""
    try:
        container.upsert_item(item)
    except Exception as e:
        return False, str(e)
    return True, f"Item {item.get('id', '')} updated. ✏️"
def advanced_delete_item(container, item_id, partition_key_value):
    """Delete item *item_id* (scoped by partition key); return (ok, message)."""
    try:
        container.delete_item(item=item_id, partition_key=partition_key_value)
    except Exception as e:
        return False, str(e)
    return True, f"Item {item_id} deleted. 🗑️"
def vector_search(container, query_vector, vector_field, top=10, exact_search=False):
    """Run a Cosmos VectorDistance query; return ids with similarity scores.

    Uses cosine distance over float32 embeddings stored in *vector_field*;
    exact_search toggles exact vs. approximate scoring.
    """
    vec_json = json.dumps(query_vector)
    query = f"""SELECT TOP {top} c.id, VectorDistance(c.{vector_field}, {vec_json}, {str(exact_search).lower()},
    {{'dataType':'float32','distanceFunction':'cosine'}}) AS SimilarityScore
    FROM c ORDER BY SimilarityScore"""
    return list(container.query_items(query=query, enable_cross_partition_query=True))
# =============================================================================
# 🐙 GITHUB FUNCTIONS
# =============================================================================
def download_github_repo(url, local_path):
    """Clone the Git repository at *url* into *local_path*.

    Any existing directory at *local_path* is removed first, so the result is
    always a fresh clone.
    """
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Zip the contents of *source_dir* into '<output_filename>.zip'."""
    shutil.make_archive(output_filename, 'zip', source_dir)
def create_repo(g, repo_name):
    """Create and return a repo named *repo_name* for the authenticated user."""
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Push the working tree at *local_path* to *repo* on GitHub.

    Points an 'origin' remote at a token-authenticated URL, commits any
    pending changes, and pushes the current branch (creating 'main' for a
    fresh repository with no branches yet).

    NOTE(review): the token is embedded in the remote URL and will persist in
    .git/config — consider a credential helper instead.
    """
    repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)
    # Reuse an existing 'origin' (repointing its URL) rather than failing on
    # create_remote for a repo that already has one.
    if 'origin' in [remote.name for remote in local_repo.remotes]:
        origin = local_repo.remote('origin')
        origin.set_url(repo_url)
    else:
        origin = local_repo.create_remote('origin', repo_url)
    # A freshly-initialized repo has no heads until the first commit.
    if not local_repo.heads:
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'
    else:
        current_branch = local_repo.active_branch.name
    local_repo.git.add(A=True)
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')
    origin.push(refspec=f'{current_branch}:{current_branch}')
# =============================================================================
# 📁 FILE & MEDIA MANAGEMENT FUNCTIONS
# =============================================================================
def display_saved_files_in_sidebar():
    """List saved .md files (newest first, READMEs excluded) in the sidebar,
    each with a download button and a delete button."""
    markdown_files = sorted(
        (f for f in glob.glob("*.md") if not f.lower().startswith('readme')),
        reverse=True,
    )
    st.sidebar.markdown("## 📁 Files")
    for file in markdown_files:
        col1, col2, col3 = st.sidebar.columns([6, 2, 1])
        with col1:
            st.markdown(f"📄 {file}")
        with col2:
            st.sidebar.download_button(
                label="⬇️",
                data=open(file, 'rb').read(),
                file_name=file
            )
        with col3:
            if st.sidebar.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                st.rerun()
def display_file_viewer(file_path):
    """Render *file_path* as markdown with mtime/size metadata and a download
    button. Does nothing when the file is empty."""
    content = load_file(file_path)
    if not content:
        return
    stats = os.stat(file_path)
    modified = datetime.fromtimestamp(stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
    st.markdown("### 📄 File Viewer")
    st.markdown(f"**{file_path}**")
    st.markdown(f"**Mod:** {modified} | **Size:** {stats.st_size} bytes")
    st.markdown("---")
    st.markdown(content)
    st.download_button("⬇️", data=content, file_name=os.path.basename(file_path), mime="text/markdown")
def display_file_editor(file_path):
    """Show a text editor for *file_path* with manual save and download.

    File text is cached in st.session_state.file_content; the text area's
    on_change hook auto-saves edits to Cosmos via auto_save_edit().
    """
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    if file_path not in st.session_state.file_content:
        content = load_file(file_path)
        if content is not None:
            st.session_state.file_content[file_path] = content
        else:
            return
    st.markdown("### ✏️ Edit File")
    st.markdown(f"**Editing:** {file_path}")
    # Use on_change callback for auto-save via doc_editor
    # NOTE(review): the label says "Edit JSON" but the download below uses
    # mime "text/markdown" — confirm which format these files actually hold.
    new_content = st.text_area("Edit JSON", value=st.session_state.file_content[file_path], height=400, key="doc_editor", on_change=lambda: auto_save_edit())
    col1, col2 = st.columns([1, 5])
    with col1:
        if st.button("💾 Save"):
            if save_file_content(file_path, new_content):
                st.session_state.file_content[file_path] = new_content
                st.success("Saved! 🎉")
                time.sleep(1)
                st.rerun()
    with col2:
        st.download_button("⬇️", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown")
def save_file_content(file_path, content):
    """Write *content* to *file_path* as UTF-8; return True on success."""
    try:
        with open(file_path, 'w', encoding='utf-8') as fh:
            fh.write(content)
    except Exception as e:
        st.error(f"Save error: {str(e)}")
        return False
    return True
def update_file_management_section():
    """Sidebar file manager for local .md files plus the main-area viewer/editor.

    Maintains three session keys: file_view_mode ('view'/'edit'/None),
    current_file, and file_content (path -> cached text). Renders per-file
    view/download/edit/delete controls, bulk delete/download, the external
    help links, then dispatches to the viewer or editor for the current file.
    """
    if 'file_view_mode' not in st.session_state:
        st.session_state.file_view_mode = None
    if 'current_file' not in st.session_state:
        st.session_state.current_file = None
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    all_files = sorted(glob.glob("*.md"), reverse=True)
    st.sidebar.title("📁 Files")
    if st.sidebar.button("🗑 Delete All"):
        for file in all_files:
            os.remove(file)
        # Reset all cached state so stale entries don't resurrect deleted files.
        st.session_state.file_content = {}
        st.session_state.current_file = None
        st.session_state.file_view_mode = None
        st.rerun()
    if st.sidebar.button("⬇️ Download All"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)
    for file in all_files:
        col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
        with col1:
            # 🌐 = open in viewer; cache the file text before rerunning.
            if st.button("🌐", key=f"view_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'view'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col2:
            st.markdown(get_download_link(file), unsafe_allow_html=True)
        with col3:
            # 📂 = open in editor.
            if st.button("📂", key=f"edit_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'edit'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col4:
            # 🗑 = delete the file and drop any session references to it.
            if st.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                if file in st.session_state.file_content:
                    del st.session_state.file_content[file]
                if st.session_state.current_file == file:
                    st.session_state.current_file = None
                    st.session_state.file_view_mode = None
                st.rerun()
    st.sidebar.markdown("---")
    st.sidebar.title("External Help Links")
    for link in external_links:
        st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True)
    if st.session_state.current_file:
        if st.session_state.file_view_mode == 'view':
            display_file_viewer(st.session_state.current_file)
        elif st.session_state.file_view_mode == 'edit':
            display_file_editor(st.session_state.current_file)
# =============================================================================
# 🎥 VIDEO & AUDIO UI FUNCTIONS
# =============================================================================
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
    """Normalize arbitrary image input to an RGB letterboxed frame of target_size.

    Accepts raw bytes, a file-like object, or a PIL Image. The image keeps its
    aspect ratio, is centred on a white canvas, and both scaled dimensions are
    rounded down to even numbers. Returns the PIL Image, or None on failure.
    """
    try:
        st.write("Preprocessing image...")
        if isinstance(file_data, bytes):
            img = Image.open(io.BytesIO(file_data))
        elif hasattr(file_data, 'read'):
            # Rewind seekable streams so a previously-read buffer still works.
            if hasattr(file_data, 'seek'):
                file_data.seek(0)
            img = Image.open(file_data)
        elif isinstance(file_data, Image.Image):
            img = file_data
        else:
            raise ValueError(f"Unsupported input: {type(file_data)}")
        if img.mode != 'RGB':
            img = img.convert('RGB')
        # Fit inside the target box while preserving aspect ratio.
        aspect_ratio = img.size[0] / img.size[1]
        if aspect_ratio > target_size[0] / target_size[1]:
            new_width = target_size[0]
            new_height = int(new_width / aspect_ratio)
        else:
            new_height = target_size[1]
            new_width = int(new_height * aspect_ratio)
        # Round both dimensions down to even values.
        new_width -= new_width % 2
        new_height -= new_height % 2
        resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
        canvas = Image.new('RGB', target_size, (255, 255, 255))
        offset = ((target_size[0] - new_width) // 2, (target_size[1] - new_height) // 2)
        canvas.paste(resized, offset)
        return canvas
    except Exception as e:
        st.error(f"Image error: {str(e)}")
        return None
def add_video_generation_ui(container):
    """Image-to-video UI backed by a remote Gradio stable-video-diffusion Space.

    Lets the user upload an image, tune motion/FPS/seed, preview the
    preprocessed frame, generate a video remotely, save it locally, and log a
    metadata record to *container* (when one is provided).
    """
    st.markdown("### 🎥 Video Gen")
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_file = st.file_uploader("Upload Image 🖼️", type=['png', 'jpg', 'jpeg'])
    with col2:
        st.markdown("#### Params")
        motion = st.slider("🌊 Motion", 1, 255, 127)
        fps = st.slider("🎬 FPS", 1, 30, 6)
        with st.expander("Advanced"):
            use_custom = st.checkbox("Custom Seed")
            # Millisecond clock as the default custom seed; None means randomize.
            seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
    if uploaded_file is not None:
        try:
            file_data = uploaded_file.read()
            preview1, preview2 = st.columns(2)
            with preview1:
                st.write("Original")
                st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
            with preview2:
                proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
                if proc_img:
                    st.write("Processed")
                    st.image(proc_img, use_column_width=True)
                else:
                    st.error("Preprocess failed")
                    return
            if st.button("🎥 Generate"):
                with st.spinner("Generating video..."):
                    # The Gradio client needs a real file on disk, not a buffer.
                    with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
                        proc_img.save(temp_file.name, format='PNG')
                        try:
                            client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
                            result = client.predict(
                                image=temp_file.name,
                                seed=seed if seed is not None else int(time.time() * 1000),
                                randomize_seed=seed is None,
                                motion_bucket_id=motion,
                                fps_id=fps,
                                api_name="/video"
                            )
                            # Expected result: a tuple whose first element is a
                            # dict holding a local 'video' file path.
                            if result and isinstance(result, tuple) and len(result) >= 1:
                                video_path = result[0].get('video') if isinstance(result[0], dict) else None
                                if video_path and os.path.exists(video_path):
                                    video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                                    shutil.copy(video_path, video_filename)
                                    st.success(f"Video generated! 🎉")
                                    st.video(video_filename)
                                    if container:
                                        # NOTE(review): id and pk get two *different*
                                        # generated values here, unlike every other
                                        # record in this file (pk == id) — confirm
                                        # intentional; it affects later deletes.
                                        video_record = {
                                            "id": generate_unique_id(),
                                            "pk": generate_unique_id(),
                                            "type": "generated_video",
                                            "filename": video_filename,
                                            "seed": seed if seed is not None else "random",
                                            "motion": motion,
                                            "fps": fps,
                                            "timestamp": datetime.now().isoformat()
                                        }
                                        success, message = insert_record(container, video_record)
                                        if success:
                                            st.success("DB record saved!")
                                        else:
                                            st.error(f"DB error: {message}")
                                else:
                                    st.error("Invalid result format")
                            else:
                                st.error("No result returned")
                        except Exception as e:
                            st.error(f"Video gen error: {str(e)}")
                        finally:
                            # Always remove the temp PNG handed to the Space.
                            try:
                                os.unlink(temp_file.name)
                                st.write("Temp file removed")
                            except Exception as e:
                                st.warning(f"Cleanup error: {str(e)}")
        except Exception as e:
            st.error(f"Upload error: {str(e)}")
# =============================================================================
# 🤖 NEW ITEM & FIELD FUNCTIONS
# =============================================================================
# 🆕 Create a new sample document with default values
def new_item_default(container):
    """Insert a fresh sample document (id == pk) and return it, or None on error."""
    new_id = generate_unique_id()
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "New Sample Document",
        "content": "Start editing your document here...",
        "timestamp": datetime.now().isoformat(),
        "type": "sample"
    }
    ok, message = insert_record(container, default_doc)
    if not ok:
        st.error("Error creating new item: " + message)
        return None
    st.success("New sample document created! ✨")
    return default_doc
# 🔄 Auto-save changes from the editor
def auto_save_edit():
    """Parse the editor buffer as JSON and upsert it to the current container.

    Reads st.session_state.doc_editor and st.session_state.current_container;
    any parse or upsert failure surfaces as an st.error message.
    """
    try:
        document = json.loads(st.session_state.doc_editor)
        st.session_state.current_container.upsert_item(document)
        st.success("Auto-saved! 💾")
    except Exception as e:
        st.error(f"Auto-save error: {str(e)}")
# ➕ Add a new field (key/value) to the current document
def add_field_to_doc():
    """Add the sidebar's key/value pair to the edited document and auto-save.

    Reads new_field_key / new_field_value from session state, merges them into
    the JSON held in the doc_editor buffer, then persists via auto_save_edit().
    """
    key = st.session_state.new_field_key
    value = st.session_state.new_field_value
    try:
        doc = json.loads(st.session_state.doc_editor)
        doc[key] = value
        st.session_state.doc_editor = json.dumps(doc, indent=2)
        auto_save_edit()
        st.success(f"Added field {key} 👍")
    except Exception as e:
        st.error(f"Error adding field: {str(e)}")
# =============================================================================
# 🔍 VECTOR SEARCH INTERFACE (Simple keyword search)
# =============================================================================
def vector_keyword_search(keyword, container):
    """Find documents whose 'content' field contains *keyword*.

    Returns the matching documents, or [] on error (shown via st.error).
    Security fix: the keyword is now bound as the @keyword query parameter
    instead of being interpolated into the SQL string, which allowed query
    injection (and broke on keywords containing a single quote).
    """
    try:
        query = "SELECT * FROM c WHERE CONTAINS(c.content, @keyword)"
        results = list(container.query_items(
            query=query,
            parameters=[{"name": "@keyword", "value": keyword}],
            enable_cross_partition_query=True
        ))
        return results
    except Exception as e:
        st.error(f"Vector search error: {str(e)}")
        return []
# =============================================================================
# 🤖 NEW AI MODALITY RECORD TEMPLATES
# =============================================================================
def new_ai_record(container):
    """Insert a default AI-modality document template; return it, or None on error."""
    new_id = generate_unique_id()
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "AI Modality Record",
        "function_url": "https://example.com/function",
        "input_text": "### Input (markdown)\n\nType your input here.",
        "output_text": "### Output (markdown)\n\nResult will appear here.",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality"
    }
    ok, message = insert_record(container, default_doc)
    if not ok:
        st.error("Error creating AI record: " + message)
        return None
    st.success("New AI modality record created! 💡")
    return default_doc
def new_links_record(container):
    """Insert a document whose input_text is the markdown list of portal links.

    Returns the new document, or None on error.
    """
    new_id = generate_unique_id()
    links_md = "\n".join(f"- {link['emoji']} [{link['title']}]({link['url']})" for link in external_links)
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "Portal Links Record",
        "function_url": "",
        "input_text": links_md,
        "output_text": "",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality"
    }
    ok, message = insert_record(container, default_doc)
    if not ok:
        st.error("Error creating links record: " + message)
        return None
    st.success("New Portal Links record created! 🔗")
    return default_doc
# =============================================================================
# 🤖 LANGCHAIN FUNCTIONS (Witty emoji comments)
# =============================================================================
def display_langchain_functions():
    """List a few Langchain helpers (name + description) in the sidebar."""
    functions = [
        {"name": "OpenAIEmbeddings", "comment": "🔮 Creates embeddings using OpenAI – pure magic!"},
        {"name": "AzureCosmosDBNoSqlVectorSearch", "comment": "🚀 Performs vector search on Cosmos DB – superfast and smart!"},
        {"name": "RecursiveCharacterTextSplitter", "comment": "✂️ Slices text into manageable chunks – like a pro chef!"}
    ]
    st.sidebar.markdown("### 🤖 Langchain Functions")
    for entry in functions:
        st.sidebar.write(f"{entry['name']}: {entry['comment']}")
# =============================================================================
# 📝 MAIN FUNCTION
# =============================================================================
def main():
    """Streamlit entry point: Cosmos DB navigator, document viewer/editor.

    Renders the sidebar tooling (item management, vector search, Langchain
    helper list, DB/container navigator, container creation/export) and the
    main document views (Markdown / Code editor / Run AI stub / Clone / New).
    All selections live in ``st.session_state`` so they survive reruns.
    """
    st.markdown("### 🐙 GitCosmos - Cosmos & Git Hub")
    # Friendly portal link (with emoji) for Cosmos DB.
    st.markdown(f"[🔗 Portal]({CosmosDBUrl})")
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    # Initialize the auto-save container handle only once. The original reset
    # it on every rerun, which clobbered the container assigned further down
    # and made the get("current_container") guards below always fail early in
    # the script run.
    if "current_container" not in st.session_state:
        st.session_state.current_container = None
    # Auth & Cosmos client initialization — bail out without a key.
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    else:
        st.error("Missing Cosmos Key 🔑❌")
        return
    # Sidebar: New Item, Add Field, Vector Search, New AI Record buttons.
    st.sidebar.markdown("## 🛠️ Item Management")
    # New Item button.
    if st.sidebar.button("New Item"):
        if st.session_state.get("current_container"):
            new_doc = new_item_default(st.session_state.current_container)
            if new_doc:
                st.session_state.doc_editor = json.dumps(new_doc, indent=2)
        else:
            st.warning("No container selected!")
    # Add Field UI inputs (read by add_field_to_doc via their widget keys).
    st.sidebar.text_input("New Field Key", key="new_field_key")
    st.sidebar.text_input("New Field Value", key="new_field_value")
    if st.sidebar.button("Add Field"):
        if "doc_editor" in st.session_state:
            add_field_to_doc()
        else:
            st.warning("No document loaded to add a field.")
    # New AI Record button.
    if st.sidebar.button("New AI Record"):
        if st.session_state.get("current_container"):
            new_ai_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    # New Portal Links Record button.
    if st.sidebar.button("New Links Record"):
        if st.session_state.get("current_container"):
            new_links_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    # Vector Search UI.
    st.sidebar.markdown("## 🔍 Vector Search")
    search_keyword = st.sidebar.text_input("Search Keyword", key="vector_search_keyword")
    if st.sidebar.button("Search"):
        if st.session_state.get("current_container"):
            results = vector_keyword_search(search_keyword, st.session_state.current_container)
            st.sidebar.write(f"Found {len(results)} results:")
            for res in results:
                st.sidebar.code(json.dumps(res, indent=2), language="json")
        else:
            st.warning("No container selected for search!")
    # Display Langchain functions.
    display_langchain_functions()
    # Navigator and container selection.
    try:
        if st.session_state.get("client") is None:
            st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        st.sidebar.title("🐙 Navigator")
        databases = get_databases(st.session_state.client)
        selected_db = st.sidebar.selectbox("🗃️ DB", databases)
        # Switching DB invalidates container/document selection; rerun to refresh.
        if selected_db != st.session_state.get("selected_database"):
            st.session_state.selected_database = selected_db
            st.session_state.selected_container = None
            st.session_state.selected_document_id = None
            st.session_state.current_index = 0
            st.rerun()
        if st.session_state.selected_database:
            database = st.session_state.client.get_database_client(st.session_state.selected_database)
            # New Container creation UI.
            if "show_new_container_form" not in st.session_state:
                st.session_state.show_new_container_form = False
            if st.sidebar.button("🆕 New Container"):
                st.session_state.show_new_container_form = True
            if st.session_state.show_new_container_form:
                with st.sidebar.form("new_container_form"):
                    new_container_id = st.text_input("Container ID", value="aiml-container")
                    new_partition_key = st.text_input("Partition Key", value="/pk")
                    new_analytical = st.checkbox("Enable Analytical Store", value=True)
                    submitted = st.form_submit_button("Create Container")
                    if submitted:
                        # -1 = analytical store with infinite retention; None = disabled.
                        analytical_ttl = -1 if new_analytical else None
                        new_container = create_new_container(
                            database,
                            new_container_id,
                            new_partition_key,
                            analytical_storage_ttl=analytical_ttl
                        )
                        if new_container:
                            st.success(f"Container '{new_container_id}' created.")
                            # Insert default templated item for image prompts.
                            default_id = generate_unique_id()
                            default_item = {
                                "id": default_id,
                                "pk": default_id,
                                "name": "Default Image Prompt",
                                "prompt": "Enter your image prompt here",
                                "timestamp": datetime.now().isoformat(),
                                "type": "image_prompt"
                            }
                            insert_success, insert_message = insert_record(new_container, default_item)
                            if insert_success:
                                st.info("Default templated item created in new container.")
                            else:
                                st.error(f"Default item insertion error: {insert_message}")
                            st.session_state.show_new_container_form = False
                            st.session_state.new_container_created = new_container_id
                            st.rerun()
            # Update container list (freshly created container may lag the listing).
            containers = get_containers(database)
            if "new_container_created" in st.session_state and st.session_state.new_container_created not in containers:
                containers.append(st.session_state.new_container_created)
            selected_container = st.sidebar.selectbox("📁 Container", containers)
            if selected_container != st.session_state.get("selected_container"):
                st.session_state.selected_container = selected_container
                st.session_state.selected_document_id = None
                st.session_state.current_index = 0
                st.rerun()
            if st.session_state.selected_container:
                container = database.get_container_client(st.session_state.selected_container)
                st.session_state.current_container = container  # For auto-save functions
                if st.sidebar.button("📦 Export"):
                    download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
                    # NOTE(review): the original source was corrupted here
                    # ("if download_link.startswith(' num_docs else documents").
                    # Reconstructed from the surviving fragments: render the
                    # archive link on success (helper returns an <a> download
                    # link) or show the error text, then load and page the
                    # container's documents. Confirm against VCS history.
                    if download_link.startswith('<a'):
                        st.sidebar.markdown(download_link, unsafe_allow_html=True)
                    else:
                        st.sidebar.error(download_link)
                # Load documents and page them for display. Helper name and
                # slider bounds are presumed from surrounding code — TODO confirm.
                documents = get_documents(container)
                total_docs = len(documents)
                num_docs = st.slider("Docs", 1, 20, 10)
                documents_to_display = documents[:num_docs] if total_docs > num_docs else documents
                st.sidebar.info(f"Showing {len(documents_to_display)} docs")
                # Document Viewer / Editor.
                view_options = ['Markdown', 'Code', 'Run AI', 'Clone', 'New']
                selected_view = st.sidebar.selectbox("View", view_options, index=1)
                if selected_view == 'Markdown':
                    st.markdown("#### 📄 Markdown")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        content = json.dumps(doc, indent=2)
                        st.markdown(f"```json\n{content}\n```")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                elif selected_view == 'Code':
                    st.markdown("#### 💻 Code Editor")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        # Seed the editor with the current doc unless New Item
                        # already loaded one into doc_editor.
                        if "doc_editor" not in st.session_state:
                            st.session_state.doc_editor = json.dumps(doc, indent=2)
                        # key="doc_editor" is pre-seeded above, so passing
                        # value= as well triggers a Streamlit warning and the
                        # value is ignored — let the session-state key drive
                        # the widget. on_change takes the callable directly.
                        edited = st.text_area("Edit JSON", height=300, key="doc_editor", on_change=auto_save_edit)
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                        col_save, col_delete = st.columns(2)
                        with col_save:
                            if st.button("💾 Save", key=f'save_{st.session_state.current_index}'):
                                try:
                                    updated_doc = json.loads(edited)
                                    container.upsert_item(body=updated_doc)
                                    st.success(f"Saved {updated_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Save err: {str(e)}")
                        with col_delete:
                            if st.button("🗑️ Delete", key=f'delete_{st.session_state.current_index}'):
                                try:
                                    current_doc = json.loads(edited)
                                    success, message = delete_record(container, current_doc)
                                    if success:
                                        st.success(message)
                                        st.rerun()
                                    else:
                                        st.error(message)
                                except Exception as e:
                                    st.error(f"Delete err: {str(e)}")
                        # Show the most recent delete-log entries, if any.
                        if "delete_log" in st.session_state and st.session_state.delete_log:
                            st.subheader("Delete Log")
                            for log_entry in st.session_state.delete_log[-5:]:
                                st.write(log_entry)
                elif selected_view == 'Run AI':
                    st.markdown("#### 🤖 Run AI (stub)")
                    st.info("AI functionality not implemented.")
                elif selected_view == 'Clone':
                    st.markdown("#### 📄 Clone")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        st.markdown(f"Original ID: {doc.get('id', '')}")
                        new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id')
                        new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name')
                        # Copy every field except identity and Cosmos system fields.
                        new_doc = {'id': new_id, 'pk': new_id, 'name': new_name, **{k: v for k, v in doc.items() if k not in ['id', 'name', 'pk', '_rid', '_self', '_etag', '_attachments', '_ts']}}
                        doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2), height=300, key='clone_preview')
                        col1, col2 = st.columns(2)
                        with col1:
                            if st.button("🔄 Regenerate"):
                                new_id = generate_unique_id()
                                st.session_state.new_clone_id = new_id
                                st.rerun()
                        with col2:
                            if st.button("💾 Save Clone"):
                                try:
                                    final_doc = json.loads(doc_str)
                                    # Strip system fields again in case the user
                                    # pasted them back into the editor.
                                    for field in ['_rid', '_self', '_etag', '_attachments', '_ts']:
                                        final_doc.pop(field, None)
                                    container.create_item(body=final_doc)
                                    st.success(f"Cloned {final_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Clone err: {str(e)}")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                elif selected_view == 'New':
                    st.markdown("#### ➕ New Doc")
                    if st.button("🤖 Auto-Gen"):
                        auto_doc = {
                            "id": generate_unique_id(),
                            "pk": generate_unique_id(),
                            "name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                            "content": "Auto-generated record.",
                            "timestamp": datetime.now().isoformat()
                        }
                        success, message = insert_record(container, auto_doc)
                        if success:
                            st.success(message)
                            st.rerun()
                        else:
                            st.error(message)
                    else:
                        new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
                        default_doc = {
                            "id": new_id,
                            "pk": new_id,
                            "name": "New Doc",
                            "content": "",
                            "timestamp": datetime.now().isoformat()
                        }
                        new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300)
                        if st.button("➕ Create"):
                            try:
                                cleaned = preprocess_text(new_doc_str)
                                new_doc = json.loads(cleaned)
                                # Force id/pk to the widget value regardless of
                                # what the user typed in the JSON body.
                                new_doc['id'] = new_id
                                new_doc['pk'] = new_id
                                success, message = insert_record(container, new_doc)
                                if success:
                                    st.success(f"Created {new_doc['id']}")
                                    st.rerun()
                                else:
                                    st.error(message)
                            except Exception as e:
                                st.error(f"Create err: {str(e)}")
                # Tabular overview of the page of documents.
                st.subheader(f"📊 {st.session_state.selected_container}")
                if documents_to_display:
                    df = pd.DataFrame(documents_to_display)
                    st.dataframe(df)
                else:
                    st.info("No docs.")
                update_file_management_section()
    except exceptions.CosmosHttpResponseError as e:
        st.error(f"Cosmos error: {str(e)} 🚨")
    except Exception as e:
        st.error(f"Error: {str(e)} 😱")
    # Logout clears the client and every selection so the next run starts clean.
    if st.session_state.logged_in and st.sidebar.button("🚪 Logout"):
        st.markdown("#### 🚪 Logout")
        st.session_state.logged_in = False
        st.session_state.selected_records = []
        st.session_state.client = None
        st.session_state.selected_database = None
        st.session_state.selected_container = None
        st.session_state.selected_document_id = None
        st.session_state.current_index = 0
        st.rerun()
# Standard script entry guard — launch the Streamlit app when run directly.
if __name__ == "__main__":
    main()
# =============================================================================
# ───────────── Additional Blank Lines for Spacing (~1500 lines total) ─────────────
# =============================================================================
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
#
# End of app.py