AzureCosmosDBUI / app20.py
awacke1's picture
Update app20.py
c18d3c9 verified
app.py
# =============================================================================
# ───────────── IMPORTS ─────────────
# =============================================================================
import base64
import glob
import hashlib
import json
import os
import pandas as pd
import pytz
import random
import re
import shutil
import streamlit as st
import time
import traceback
import uuid
import zipfile
from PIL import Image
from azure.cosmos import CosmosClient, PartitionKey, exceptions
from datetime import datetime
from git import Repo
from github import Github
from gradio_client import Client, handle_file
import tempfile
import io
import requests
import numpy as np
from urllib.parse import quote
# =============================================================================
# ───────────── EXTERNAL HELP LINKS (Always visible in sidebar) ─────────────
# =============================================================================
external_links = [
{"title": "CosmosDB GenAI Full Text Search", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search", "emoji": "πŸ’»"},
{"title": "CosmosDB SQL API Client Library", "url": "https://learn.microsoft.com/en-us/python/api/overview/azure/cosmos-readme?view=azure-python", "emoji": "πŸ’»"},
{"title": "CosmosDB Index and Query Vectors", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-python-vector-index-query", "emoji": "πŸ’»"},
{"title": "CosmosDB NoSQL Materialized Views", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/materialized-views", "emoji": "πŸ’»"},
{"title": "LangChain Vector Store Guide", "url": "https://python.langchain.com/docs/integrations/vectorstores/azure_cosmos_db_no_sql/", "emoji": "πŸ’»"},
{"title": "Vector Database Prompt Engineering RAG for Python", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/vector-database?source=recommendations", "emoji": "πŸ’»"},
{"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "πŸ’»"},
{"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "πŸ“š"},
{"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "πŸ’»"},
{"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "πŸ“š"},
{"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "🌐"},
]
# =============================================================================
# ───────────── APP CONFIGURATION ─────────────
# =============================================================================
Site_Name = 'πŸ™ GitCosmos'
title = "πŸ™ GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = 'πŸ™πŸŒŒπŸ’«'
st.set_page_config(
page_title=title,
page_icon=icons,
layout="wide",
initial_sidebar_state="auto",
menu_items={
'Get Help': helpURL,
'Report a bug': bugURL,
'About': title
}
)
# Cosmos DB & App URLs
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'
# =============================================================================
# ───────────── HELPER FUNCTIONS ─────────────
# =============================================================================
def get_download_link(file_path):
with open(file_path, "rb") as file:
contents = file.read()
b64 = base64.b64encode(contents).decode()
file_name = os.path.basename(file_path)
return f'<a href="data:file/txt;base64,{b64}" download="{file_name}">Download {file_name} πŸ“‚</a>'
def generate_unique_id():
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
unique_uuid = str(uuid.uuid4())
return_value = f"{timestamp}-{unique_uuid}"
st.write('New ID: ' + return_value)
return return_value
def generate_filename(prompt, file_type):
central = pytz.timezone('US/Central')
safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
safe_prompt = re.sub(r'\W+', '', prompt)[:90]
return f"{safe_date_time}{safe_prompt}.{file_type}"
def create_file(filename, prompt, response, should_save=True):
if not should_save:
return
with open(filename, 'w', encoding='utf-8') as file:
file.write(prompt + "\n\n" + response)
def load_file(file_name):
with open(file_name, "r", encoding='utf-8') as file:
content = file.read()
return content
def display_glossary_entity(k):
search_urls = {
"πŸš€": lambda k: f"/?q={k}",
"πŸ“–": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}",
"πŸ”": lambda k: f"https://www.google.com/search?q={quote(k)}",
"πŸŽ₯": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}",
}
links_md = ' '.join([f"<a href='{url(k)}' target='_blank'>{emoji}</a>" for emoji, url in search_urls.items()])
st.markdown(f"{k} {links_md}", unsafe_allow_html=True)
def create_zip_of_files(files):
zip_name = "all_files.zip"
with zipfile.ZipFile(zip_name, 'w') as zipf:
for file in files:
zipf.write(file)
return zip_name
def get_video_html(video_path, width="100%"):
video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}"
return f'''
<video width="{width}" controls autoplay loop>
<source src="{video_url}" type="video/mp4">
Your browser does not support video.
</video>
'''
def get_audio_html(audio_path, width="100%"):
audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}"
return f'''
<audio controls style="width:{width}">
<source src="{audio_url}" type="audio/mpeg">
Your browser does not support audio.
</audio>
'''
def preprocess_text(text):
text = text.replace('\r\n', '\\n').replace('\r', '\\n').replace('\n', '\\n')
text = text.replace('"', '\\"')
text = re.sub(r'[\t]', ' ', text)
text = re.sub(r'[^\x00-\x7F]+', '', text)
return text.strip()
# NEW: Sanitize JSON text before saving (remove problematic control characters)
def sanitize_json_text(text):
# Remove control characters except newline, carriage return, and tab
text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text)
# Escape newline, tab, and carriage return
text = text.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
return text
# =============================================================================
# ───────────── COSMOS DB FUNCTIONS ─────────────
# =============================================================================
def get_databases(client):
return [db['id'] for db in client.list_databases()]
def get_containers(database):
return [container['id'] for container in database.list_containers()]
def get_documents(container, limit=None):
query = "SELECT * FROM c ORDER BY c._ts DESC"
items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
return items
def insert_record(container, record):
try:
container.create_item(body=record)
return True, "Inserted! πŸŽ‰"
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error: {str(e)} 🚨"
except Exception as e:
return False, f"Error: {str(e)} 😱"
def update_record(container, updated_record):
try:
container.upsert_item(body=updated_record)
return True, f"Updated {updated_record['id']} πŸ› οΈ"
except exceptions.CosmosHttpResponseError as e:
return False, f"HTTP error: {str(e)} 🚨"
except Exception as e:
return False, f"Error: {traceback.format_exc()} 😱"
def delete_record(container, record):
try:
if "id" not in record:
return False, "Record must contain an 'id' field. πŸ›‘"
doc_id = record["id"]
if "delete_log" not in st.session_state:
st.session_state.delete_log = []
st.session_state.delete_log.append(f"Attempting to delete document: {json.dumps(record, indent=2)}")
partition_key_value = record.get("pk", doc_id)
st.session_state.delete_log.append(f"Using ID and Partition Key: {partition_key_value}")
container.delete_item(item=doc_id, partition_key=partition_key_value)
success_msg = f"Record {doc_id} successfully deleted from Cosmos DB. πŸ—‘οΈ"
st.session_state.delete_log.append(success_msg)
return True, success_msg
except exceptions.CosmosResourceNotFoundError:
success_msg = f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). πŸ—‘οΈ"
st.session_state.delete_log.append(success_msg)
return True, success_msg
except exceptions.CosmosHttpResponseError as e:
error_msg = f"HTTP error deleting {doc_id}: {str(e)}. 🚨"
st.session_state.delete_log.append(error_msg)
return False, error_msg
except Exception as e:
error_msg = f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. 😱"
st.session_state.delete_log.append(error_msg)
return False, error_msg
def save_to_cosmos_db(container, query, response1, response2):
try:
if container:
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
unique_uuid = str(uuid.uuid4())
new_id = f"{timestamp}-{unique_uuid}"
record = {
"id": new_id,
"pk": new_id,
"name": new_id,
"query": query,
"response1": response1,
"response2": response2,
"timestamp": datetime.utcnow().isoformat(),
"type": "ai_response",
"version": "1.0"
}
container.create_item(body=record)
st.success(f"Saved: {record['id']}")
st.session_state.documents = get_documents(container)
else:
st.error("Cosmos container not initialized.")
except Exception as e:
st.error(f"Save error: {str(e)}")
def archive_current_container(database_name, container_name, client):
try:
base_dir = "./cosmos_archive_current_container"
if os.path.exists(base_dir):
shutil.rmtree(base_dir)
os.makedirs(base_dir)
db_client = client.get_database_client(database_name)
container_client = db_client.get_container_client(container_name)
items = list(container_client.read_all_items())
container_dir = os.path.join(base_dir, container_name)
os.makedirs(container_dir)
for item in items:
item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
json.dump(item, f, indent=2)
archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
shutil.make_archive(archive_name, 'zip', base_dir)
return get_download_link(f"{archive_name}.zip")
except Exception as e:
return f"Archive error: {str(e)} 😒"
# =============================================================================
# ───────────── ADVANCED COSMOS FUNCTIONS ─────────────
# =============================================================================
def create_new_container(database, container_id, partition_key_path,
analytical_storage_ttl=None, indexing_policy=None, vector_embedding_policy=None):
try:
if analytical_storage_ttl is not None:
container = database.create_container(
id=container_id,
partition_key=PartitionKey(path=partition_key_path),
analytical_storage_ttl=analytical_storage_ttl,
indexing_policy=indexing_policy,
vector_embedding_policy=vector_embedding_policy
)
else:
container = database.create_container(
id=container_id,
partition_key=PartitionKey(path=partition_key_path),
indexing_policy=indexing_policy,
vector_embedding_policy=vector_embedding_policy
)
except exceptions.CosmosHttpResponseError as e:
if analytical_storage_ttl is not None and "analyticalStorageTtl" in str(e):
try:
container = database.create_container(
id=container_id,
partition_key=PartitionKey(path=partition_key_path),
indexing_policy=indexing_policy,
vector_embedding_policy=vector_embedding_policy
)
except Exception as e2:
st.error(f"Error creating container without analytical_storage_ttl: {str(e2)}")
return None
elif isinstance(e, exceptions.CosmosResourceExistsError):
container = database.get_container_client(container_id)
else:
st.error(f"Error creating container: {str(e)}")
return None
return container
def advanced_insert_item(container, item):
try:
container.upsert_item(item)
return True, f"Item {item.get('id', '')} inserted. βž•"
except Exception as e:
return False, str(e)
def advanced_update_item(container, item):
try:
container.upsert_item(item)
return True, f"Item {item.get('id', '')} updated. ✏️"
except Exception as e:
return False, str(e)
def advanced_delete_item(container, item_id, partition_key_value):
try:
container.delete_item(item=item_id, partition_key=partition_key_value)
return True, f"Item {item_id} deleted. πŸ—‘οΈ"
except Exception as e:
return False, str(e)
def vector_search(container, query_vector, vector_field, top=10, exact_search=False):
query_vector_str = json.dumps(query_vector)
query = f"""SELECT TOP {top} c.id, VectorDistance(c.{vector_field}, {query_vector_str}, {str(exact_search).lower()},
{{'dataType':'float32','distanceFunction':'cosine'}}) AS SimilarityScore
FROM c ORDER BY SimilarityScore"""
results = list(container.query_items(query=query, enable_cross_partition_query=True))
return results
# =============================================================================
# ───────────── GITHUB FUNCTIONS ─────────────
# =============================================================================
def download_github_repo(url, local_path):
if os.path.exists(local_path):
shutil.rmtree(local_path)
Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
shutil.make_archive(output_filename, 'zip', source_dir)
def create_repo(g, repo_name):
user = g.get_user()
return user.create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
local_repo = Repo(local_path)
if 'origin' in [remote.name for remote in local_repo.remotes]:
origin = local_repo.remote('origin')
origin.set_url(repo_url)
else:
origin = local_repo.create_remote('origin', repo_url)
if not local_repo.heads:
local_repo.git.checkout('-b', 'main')
current_branch = 'main'
else:
current_branch = local_repo.active_branch.name
local_repo.git.add(A=True)
if local_repo.is_dirty():
local_repo.git.commit('-m', 'Initial commit')
origin.push(refspec=f'{current_branch}:{current_branch}')
# =============================================================================
# ───────────── FILE & MEDIA MANAGEMENT FUNCTIONS ─────────────
# =============================================================================
def display_saved_files_in_sidebar():
all_files = sorted([f for f in glob.glob("*.md") if not f.lower().startswith('readme')], reverse=True)
st.sidebar.markdown("## πŸ“ Files")
for file in all_files:
col1, col2, col3 = st.sidebar.columns([6, 2, 1])
with col1:
st.markdown(f"πŸ“„ {file}")
with col2:
st.sidebar.download_button(
label="⬇️",
data=open(file, 'rb').read(),
file_name=file
)
with col3:
if st.sidebar.button("πŸ—‘", key=f"delete_{file}"):
os.remove(file)
st.rerun()
def display_file_viewer(file_path):
content = load_file(file_path)
if content:
st.markdown("### πŸ“„ File Viewer")
st.markdown(f"**{file_path}**")
file_stats = os.stat(file_path)
st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes")
st.markdown("---")
st.markdown(content)
st.download_button("⬇️", data=content, file_name=os.path.basename(file_path), mime="text/markdown")
def display_file_editor(file_path):
if 'file_content' not in st.session_state:
st.session_state.file_content = {}
if file_path not in st.session_state.file_content:
content = load_file(file_path)
if content is not None:
st.session_state.file_content[file_path] = content
else:
return
st.markdown("### ✏️ Edit File")
st.markdown(f"**Editing:** {file_path}")
md_tab, code_tab = st.tabs(["Markdown", "Code"])
with md_tab:
st.markdown(st.session_state.file_content[file_path])
with code_tab:
new_content = st.text_area("Edit:", value=st.session_state.file_content[file_path], height=400, key=f"editor_{hash(file_path)}", on_change=lambda: auto_save_edit())
col1, col2 = st.columns([1, 5])
with col1:
if st.button("πŸ’Ύ Save"):
sanitized = sanitize_json_text(new_content)
try:
json.loads(sanitized)
except Exception as e:
st.error(f"Sanitization failed: {str(e)}")
return
if save_file_content(file_path, sanitized):
st.session_state.file_content[file_path] = sanitized
st.success("Saved! πŸŽ‰")
time.sleep(1)
st.rerun()
with col2:
st.download_button("⬇️", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown")
def save_file_content(file_path, content):
try:
with open(file_path, 'w', encoding='utf-8') as file:
file.write(content)
return True
except Exception as e:
st.error(f"Save error: {str(e)}")
return False
def update_file_management_section():
if 'file_view_mode' not in st.session_state:
st.session_state.file_view_mode = None
if 'current_file' not in st.session_state:
st.session_state.current_file = None
if 'file_content' not in st.session_state:
st.session_state.file_content = {}
all_files = sorted(glob.glob("*.md"), reverse=True)
st.sidebar.title("πŸ“ Files")
if st.sidebar.button("πŸ—‘ Delete All"):
for file in all_files:
os.remove(file)
st.session_state.file_content = {}
st.session_state.current_file = None
st.session_state.file_view_mode = None
st.rerun()
if st.sidebar.button("⬇️ Download All"):
zip_file = create_zip_of_files(all_files)
st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)
for file in all_files:
col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
with col1:
if st.button("🌐", key=f"view_{file}"):
st.session_state.current_file = file
st.session_state.file_view_mode = 'view'
if file not in st.session_state.file_content:
content = load_file(file)
if content is not None:
st.session_state.file_content[file] = content
st.rerun()
with col2:
st.markdown(get_download_link(file), unsafe_allow_html=True)
with col3:
if st.button("πŸ“‚", key=f"edit_{file}"):
st.session_state.current_file = file
st.session_state.file_view_mode = 'edit'
if file not in st.session_state.file_content:
content = load_file(file)
if content is not None:
st.session_state.file_content[file] = content
st.rerun()
with col4:
if st.button("πŸ—‘", key=f"delete_{file}"):
os.remove(file)
if file in st.session_state.file_content:
del st.session_state.file_content[file]
if st.session_state.current_file == file:
st.session_state.current_file = None
st.session_state.file_view_mode = None
st.rerun()
st.sidebar.markdown("---")
st.sidebar.title("External Help Links")
for link in external_links:
st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True)
if st.session_state.current_file:
if st.session_state.file_view_mode == 'view':
display_file_viewer(st.session_state.current_file)
elif st.session_state.file_view_mode == 'edit':
display_file_editor(st.session_state.current_file)
# =============================================================================
# ───────────── SIDEBAR DATA GRID (Records with formatted timestamps) ─────────────
# =============================================================================
def show_sidebar_data_grid():
if st.session_state.get("current_container"):
try:
records = get_documents(st.session_state.current_container)
data = []
for rec in records:
ts = rec.get("timestamp", "")
try:
dt = datetime.fromisoformat(ts)
formatted = dt.strftime("%I:%M %p %m/%d/%Y")
except Exception:
formatted = ts
data.append({
"ID": rec.get("id", ""),
"Name": rec.get("name", ""),
"Timestamp": formatted
})
df = pd.DataFrame(data)
st.sidebar.markdown("### πŸ“Š Data Grid")
st.sidebar.dataframe(df[["Name", "Timestamp"]])
except Exception as e:
st.sidebar.error(f"Data grid error: {str(e)}")
else:
st.sidebar.info("No container selected for data grid.")
# =============================================================================
# ───────────── SEARCH RESULTS DISPLAY (Editable Code Editors)
# =============================================================================
def display_search_results(keyword, container):
results = vector_keyword_search(keyword, container)
st.markdown("### πŸ” Search Results")
for res in results:
doc_id = res.get("id", "")
exp = st.expander(f"Result {doc_id}")
with exp:
edited = st.text_area("Edit Document", value=json.dumps(res, indent=2), key=f"search_{doc_id}")
if st.button(f"πŸ’Ύ Save changes for {doc_id}", key=f"save_search_{doc_id}"):
try:
updated_doc = json.loads(edited)
container.upsert_item(body=updated_doc)
st.success(f"Updated {doc_id}!")
st.experimental_rerun()
except Exception as e:
st.error(f"Error saving {doc_id}: {str(e)}")
# =============================================================================
# ───────────── DOCUMENTS LIST VIEW (Editable List with Sorting)
# =============================================================================
def edit_documents_list(container):
records = get_documents(container)
sort_option = st.selectbox("Sort by", ["Timestamp", "Name"], key="sort_option")
if sort_option == "Name":
records.sort(key=lambda r: r.get("name", "").lower())
else:
records.sort(key=lambda r: r.get("timestamp", ""), reverse=True)
data = []
for rec in records:
ts = rec.get("timestamp", "")
try:
dt = datetime.fromisoformat(ts)
formatted = dt.strftime("%I:%M %p %m/%d/%Y")
except Exception:
formatted = ts
data.append({
"ID": rec.get("id", ""),
"Name": rec.get("name", ""),
"Content": rec.get("content", "")[:100] + "..." if rec.get("content", "") else "",
"Timestamp": formatted
})
df = pd.DataFrame(data)
edited_df = st.data_editor(df[["Name", "Content", "Timestamp"]], key="docs_editor", num_rows="dynamic")
if st.button("πŸ’Ύ Save List Changes"):
for idx, row in edited_df.iterrows():
original = data[idx]
if row["Name"] != original["Name"] or row["Content"] != original["Content"]:
doc_id = original["ID"]
doc = next((r for r in records if r.get("id") == doc_id), None)
if doc:
doc["name"] = row["Name"]
doc["content"] = row["Content"]
success, message = update_record(container, doc)
if success:
st.success(f"Updated {doc_id} πŸ‘")
else:
st.error(f"Error updating {doc_id}: {message}")
st.experimental_rerun()
# =============================================================================
# ───────────── VIDEO & AUDIO UI FUNCTIONS ─────────────
# =============================================================================
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
try:
st.write("Preprocessing image...")
if isinstance(file_data, bytes):
img = Image.open(io.BytesIO(file_data))
elif hasattr(file_data, 'read'):
if hasattr(file_data, 'seek'):
file_data.seek(0)
img = Image.open(file_data)
elif isinstance(file_data, Image.Image):
img = file_data
else:
raise ValueError(f"Unsupported input: {type(file_data)}")
if img.mode != 'RGB':
img = img.convert('RGB')
aspect_ratio = img.size[0] / img.size[1]
if aspect_ratio > target_size[0] / target_size[1]:
new_width = target_size[0]
new_height = int(new_width / aspect_ratio)
else:
new_height = target_size[1]
new_width = int(new_height * aspect_ratio)
new_width = (new_width // 2) * 2
new_height = (new_height // 2) * 2
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
final_img = Image.new('RGB', target_size, (255, 255, 255))
paste_x = (target_size[0] - new_width) // 2
paste_y = (target_size[1] - new_height) // 2
final_img.paste(resized_img, (paste_x, paste_y))
return final_img
except Exception as e:
st.error(f"Image error: {str(e)}")
return None
def add_video_generation_ui(container):
st.markdown("### πŸŽ₯ Video Gen")
col1, col2 = st.columns([2, 1])
with col1:
uploaded_file = st.file_uploader("Upload Image πŸ–ΌοΈ", type=['png', 'jpg', 'jpeg'])
with col2:
st.markdown("#### Params")
motion = st.slider("🌊 Motion", 1, 255, 127)
fps = st.slider("🎬 FPS", 1, 30, 6)
with st.expander("Advanced"):
use_custom = st.checkbox("Custom Seed")
seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
if uploaded_file is not None:
try:
file_data = uploaded_file.read()
preview1, preview2 = st.columns(2)
with preview1:
st.write("Original")
st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
with preview2:
proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
if proc_img:
st.write("Processed")
st.image(proc_img, use_column_width=True)
else:
st.error("Preprocess failed")
return
if st.button("πŸŽ₯ Generate"):
with st.spinner("Generating video..."):
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
proc_img.save(temp_file.name, format='PNG')
try:
client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
result = client.predict(
image=temp_file.name,
seed=seed if seed is not None else int(time.time() * 1000),
randomize_seed=seed is None,
motion_bucket_id=motion,
fps_id=fps,
api_name="/video"
)
if result and isinstance(result, tuple) and len(result) >= 1:
video_path = result[0].get('video') if isinstance(result[0], dict) else None
if video_path and os.path.exists(video_path):
video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
shutil.copy(video_path, video_filename)
st.success(f"Video generated! πŸŽ‰")
st.video(video_filename)
if container:
video_record = {
"id": generate_unique_id(),
"pk": generate_unique_id(),
"type": "generated_video",
"filename": video_filename,
"seed": seed if seed is not None else "random",
"motion": motion,
"fps": fps,
"timestamp": datetime.now().isoformat()
}
success, message = insert_record(container, video_record)
if success:
st.success("DB record saved!")
else:
st.error(f"DB error: {message}")
else:
st.error("Invalid result format")
else:
st.error("No result returned")
except Exception as e:
st.error(f"Video gen error: {str(e)}")
finally:
try:
os.unlink(temp_file.name)
st.write("Temp file removed")
except Exception as e:
st.warning(f"Cleanup error: {str(e)}")
except Exception as e:
st.error(f"Upload error: {str(e)}")
# =============================================================================
# ───────────── NEW ITEM & FIELD FUNCTIONS
# =============================================================================
def new_item_default(container):
new_id = generate_unique_id()
default_doc = {
"id": new_id,
"pk": new_id,
"name": "New Sample Document",
"content": "Start editing your document here...",
"timestamp": datetime.now().isoformat(),
"type": "sample"
}
success, message = insert_record(container, default_doc)
if success:
st.success("New sample document created! ✨")
return default_doc
else:
st.error("Error creating new item: " + message)
return None
def auto_save_edit():
try:
edited_str = st.session_state.doc_editor
try:
json.loads(edited_str)
except Exception:
edited_str = sanitize_json_text(edited_str)
edited_doc = json.loads(edited_str)
container = st.session_state.current_container
container.upsert_item(edited_doc)
st.success("Auto-saved! πŸ’Ύ")
except Exception as e:
st.error(f"Auto-save error: {str(e)}")
def add_field_to_doc():
key = st.session_state.new_field_key
value = st.session_state.new_field_value
try:
doc = json.loads(st.session_state.doc_editor)
doc[key] = value
st.session_state.doc_editor = json.dumps(doc, indent=2)
auto_save_edit()
st.success(f"Added field {key} πŸ‘")
except Exception as e:
st.error(f"Error adding field: {str(e)}")
# =============================================================================
# ───────────── VECTOR SEARCH INTERFACE (Simple keyword search)
# =============================================================================
def vector_keyword_search(keyword, container):
try:
query = f"SELECT * FROM c WHERE CONTAINS(c.content, '{keyword}')"
results = list(container.query_items(query=query, enable_cross_partition_query=True))
return results
except Exception as e:
st.error(f"Vector search error: {str(e)}")
return []
# =============================================================================
# ───────────── NEW AI MODALITY RECORD TEMPLATES
# =============================================================================
def new_ai_record(container):
new_id = generate_unique_id()
default_doc = {
"id": new_id,
"pk": new_id,
"name": "AI Modality Record",
"function_url": "https://example.com/function",
"input_text": "### Input (markdown)\n\nType your input here.",
"output_text": "### Output (markdown)\n\nResult will appear here.",
"timestamp": datetime.now().isoformat(),
"type": "ai_modality"
}
success, message = insert_record(container, default_doc)
if success:
st.success("New AI modality record created! πŸ’‘")
return default_doc
else:
st.error("Error creating AI record: " + message)
return None
def new_links_record(container):
new_id = generate_unique_id()
links_md = "\n".join([f"- {link['emoji']} [{link['title']}]({link['url']})" for link in external_links])
default_doc = {
"id": new_id,
"pk": new_id,
"name": "Portal Links Record",
"function_url": "",
"input_text": links_md,
"output_text": "",
"timestamp": datetime.now().isoformat(),
"type": "ai_modality"
}
success, message = insert_record(container, default_doc)
if success:
st.success("New Portal Links record created! πŸ”—")
return default_doc
else:
st.error("Error creating links record: " + message)
return None
# =============================================================================
# ───────────── SIDEBAR DATA GRID (Editable Names Grid)
# =============================================================================
def edit_names_grid(container):
records = get_documents(container)
data = []
for rec in records:
ts = rec.get("timestamp", "")
try:
dt = datetime.fromisoformat(ts)
formatted = dt.strftime("%I:%M %p %m/%d/%Y")
except Exception:
formatted = ts
data.append({
"ID": rec.get("id", ""),
"Name": rec.get("name", ""),
"Timestamp": formatted
})
df = pd.DataFrame(data)
edited_df = st.sidebar.data_editor(df[["Name", "Timestamp"]], key="names_editor", num_rows="dynamic")
if st.sidebar.button("πŸ’Ύ Save Name Changes"):
for idx, row in edited_df.iterrows():
original = df.iloc[idx]
if row["Name"] != original["Name"]:
doc_id = original["ID"]
doc = next((r for r in records if r.get("id") == doc_id), None)
if doc:
doc["name"] = row["Name"]
success, message = update_record(container, doc)
if success:
st.sidebar.success(f"Updated Name for {doc_id} to '{row['Name']}'")
else:
st.sidebar.error(f"Update error for {doc_id}: {message}")
st.experimental_rerun()
# =============================================================================
# ───────────── SEARCH RESULTS DISPLAY (Editable Code Editors)
# =============================================================================
def display_search_results(keyword, container):
results = vector_keyword_search(keyword, container)
st.markdown("### πŸ” Search Results")
for res in results:
doc_id = res.get("id", "")
exp = st.expander(f"Result {doc_id}")
with exp:
edited = st.text_area("Edit Document", value=json.dumps(res, indent=2), key=f"search_{doc_id}")
if st.button(f"πŸ’Ύ Save changes for {doc_id}", key=f"save_search_{doc_id}"):
try:
updated_doc = json.loads(edited)
container.upsert_item(body=updated_doc)
st.success(f"Updated {doc_id}!")
st.experimental_rerun()
except Exception as e:
st.error(f"Error saving {doc_id}: {str(e)}")
# =============================================================================
# ───────────── DOCUMENTS LIST VIEW (Editable List with Sorting)
# =============================================================================
def edit_documents_list(container):
records = get_documents(container)
sort_option = st.selectbox("Sort by", ["Timestamp", "Name"], key="sort_option")
if sort_option == "Name":
records.sort(key=lambda r: r.get("name", "").lower())
else:
records.sort(key=lambda r: r.get("timestamp", ""), reverse=True)
data = []
for rec in records:
ts = rec.get("timestamp", "")
try:
dt = datetime.fromisoformat(ts)
formatted = dt.strftime("%I:%M %p %m/%d/%Y")
except Exception:
formatted = ts
data.append({
"ID": rec.get("id", ""),
"Name": rec.get("name", ""),
"Content": rec.get("content", "")[:100] + "..." if rec.get("content", "") else "",
"Timestamp": formatted
})
df = pd.DataFrame(data)
edited_df = st.data_editor(df[["Name", "Content", "Timestamp"]], key="docs_editor", num_rows="dynamic")
if st.button("πŸ’Ύ Save List Changes"):
for idx, row in edited_df.iterrows():
original = data[idx]
if row["Name"] != original["Name"] or row["Content"] != original["Content"]:
doc_id = original["ID"]
doc = next((r for r in records if r.get("id") == doc_id), None)
if doc:
doc["name"] = row["Name"]
doc["content"] = row["Content"]
success, message = update_record(container, doc)
if success:
st.success(f"Updated {doc_id} πŸ‘")
else:
st.error(f"Error updating {doc_id}: {message}")
st.experimental_rerun()
# =============================================================================
# ───────────── VIDEO & AUDIO UI FUNCTIONS ─────────────
# =============================================================================
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
try:
st.write("Preprocessing image...")
if isinstance(file_data, bytes):
img = Image.open(io.BytesIO(file_data))
elif hasattr(file_data, 'read'):
if hasattr(file_data, 'seek'):
file_data.seek(0)
img = Image.open(file_data)
elif isinstance(file_data, Image.Image):
img = file_data
else:
raise ValueError(f"Unsupported input: {type(file_data)}")
if img.mode != 'RGB':
img = img.convert('RGB')
aspect_ratio = img.size[0] / img.size[1]
if aspect_ratio > target_size[0] / target_size[1]:
new_width = target_size[0]
new_height = int(new_width / aspect_ratio)
else:
new_height = target_size[1]
new_width = int(new_height * aspect_ratio)
new_width = (new_width // 2) * 2
new_height = (new_height // 2) * 2
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
final_img = Image.new('RGB', target_size, (255, 255, 255))
paste_x = (target_size[0] - new_width) // 2
paste_y = (target_size[1] - new_height) // 2
final_img.paste(resized_img, (paste_x, paste_y))
return final_img
except Exception as e:
st.error(f"Image error: {str(e)}")
return None
def add_video_generation_ui(container):
st.markdown("### πŸŽ₯ Video Gen")
col1, col2 = st.columns([2, 1])
with col1:
uploaded_file = st.file_uploader("Upload Image πŸ–ΌοΈ", type=['png', 'jpg', 'jpeg'])
with col2:
st.markdown("#### Params")
motion = st.slider("🌊 Motion", 1, 255, 127)
fps = st.slider("🎬 FPS", 1, 30, 6)
with st.expander("Advanced"):
use_custom = st.checkbox("Custom Seed")
seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
if uploaded_file is not None:
try:
file_data = uploaded_file.read()
preview1, preview2 = st.columns(2)
with preview1:
st.write("Original")
st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
with preview2:
proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
if proc_img:
st.write("Processed")
st.image(proc_img, use_column_width=True)
else:
st.error("Preprocess failed")
return
if st.button("πŸŽ₯ Generate"):
with st.spinner("Generating video..."):
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
proc_img.save(temp_file.name, format='PNG')
try:
client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
result = client.predict(
image=temp_file.name,
seed=seed if seed is not None else int(time.time() * 1000),
randomize_seed=seed is None,
motion_bucket_id=motion,
fps_id=fps,
api_name="/video"
)
if result and isinstance(result, tuple) and len(result) >= 1:
video_path = result[0].get('video') if isinstance(result[0], dict) else None
if video_path and os.path.exists(video_path):
video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
shutil.copy(video_path, video_filename)
st.success(f"Video generated! πŸŽ‰")
st.video(video_filename)
if container:
video_record = {
"id": generate_unique_id(),
"pk": generate_unique_id(),
"type": "generated_video",
"filename": video_filename,
"seed": seed if seed is not None else "random",
"motion": motion,
"fps": fps,
"timestamp": datetime.now().isoformat()
}
success, message = insert_record(container, video_record)
if success:
st.success("DB record saved!")
else:
st.error(f"DB error: {message}")
else:
st.error("Invalid result format")
else:
st.error("No result returned")
except Exception as e:
st.error(f"Video gen error: {str(e)}")
finally:
try:
os.unlink(temp_file.name)
st.write("Temp file removed")
except Exception as e:
st.warning(f"Cleanup error: {str(e)}")
except Exception as e:
st.error(f"Upload error: {str(e)}")
# =============================================================================
# ───────────── NEW ITEM & FIELD FUNCTIONS
# =============================================================================
def new_item_default(container):
new_id = generate_unique_id()
default_doc = {
"id": new_id,
"pk": new_id,
"name": "New Sample Document",
"content": "Start editing your document here...",
"timestamp": datetime.now().isoformat(),
"type": "sample"
}
success, message = insert_record(container, default_doc)
if success:
st.success("New sample document created! ✨")
return default_doc
else:
st.error("Error creating new item: " + message)
return None
def auto_save_edit():
try:
edited_str = st.session_state.doc_editor
try:
json.loads(edited_str)
except Exception:
edited_str = sanitize_json_text(edited_str)
edited_doc = json.loads(edited_str)
container = st.session_state.current_container
container.upsert_item(edited_doc)
st.success("Auto-saved! πŸ’Ύ")
except Exception as e:
st.error(f"Auto-save error: {str(e)}")
def add_field_to_doc():
key = st.session_state.new_field_key
value = st.session_state.new_field_value
try:
doc = json.loads(st.session_state.doc_editor)
doc[key] = value
st.session_state.doc_editor = json.dumps(doc, indent=2)
auto_save_edit()
st.success(f"Added field {key} πŸ‘")
except Exception as e:
st.error(f"Error adding field: {str(e)}")
# =============================================================================
# ───────────── SEARCH RESULTS DISPLAY (Editable Code Editors)
# =============================================================================
def display_search_results(keyword, container):
results = vector_keyword_search(keyword, container)
st.markdown("### πŸ” Search Results")
for res in results:
doc_id = res.get("id", "")
exp = st.expander(f"Result {doc_id}")
with exp:
edited = st.text_area("Edit Document", value=json.dumps(res, indent=2), key=f"search_{doc_id}")
if st.button(f"πŸ’Ύ Save changes for {doc_id}", key=f"save_search_{doc_id}"):
try:
updated_doc = json.loads(edited)
container.upsert_item(body=updated_doc)
st.success(f"Updated {doc_id}!")
st.experimental_rerun()
except Exception as e:
st.error(f"Error saving {doc_id}: {str(e)}")
# =============================================================================
# ───────────── DOCUMENTS LIST VIEW (Editable List with Sorting)
# =============================================================================
def edit_documents_list(container):
records = get_documents(container)
sort_option = st.selectbox("Sort by", ["Timestamp", "Name"], key="sort_option")
if sort_option == "Name":
records.sort(key=lambda r: r.get("name", "").lower())
else:
records.sort(key=lambda r: r.get("timestamp", ""), reverse=True)
data = []
for rec in records:
ts = rec.get("timestamp", "")
try:
dt = datetime.fromisoformat(ts)
formatted = dt.strftime("%I:%M %p %m/%d/%Y")
except Exception:
formatted = ts
data.append({
"ID": rec.get("id", ""),
"Name": rec.get("name", ""),
"Content": rec.get("content", "")[:100] + "..." if rec.get("content", "") else "",
"Timestamp": formatted
})
df = pd.DataFrame(data)
edited_df = st.data_editor(df[["Name", "Content", "Timestamp"]], key="docs_editor", num_rows="dynamic")
if st.button("πŸ’Ύ Save List Changes"):
for idx, row in edited_df.iterrows():
original = data[idx]
if row["Name"] != original["Name"] or row["Content"] != original["Content"]:
doc_id = original["ID"]
doc = next((r for r in records if r.get("id") == doc_id), None)
if doc:
doc["name"] = row["Name"]
doc["content"] = row["Content"]
success, message = update_record(container, doc)
if success:
st.success(f"Updated {doc_id} πŸ‘")
else:
st.error(f"Error updating {doc_id}: {message}")
st.experimental_rerun()
# =============================================================================
# ───────────── SEARCH DOCUMENTS UI (Enter Key triggers search)
# =============================================================================
def search_documents_ui(container):
with st.sidebar.form("search_form"):
keyword = st.text_input("Search Keyword", key="search_keyword")
submitted = st.form_submit_button("πŸ” Search")
if submitted and keyword:
display_search_results(keyword, container)
# =============================================================================
# ───────────── MAIN FUNCTION ─────────────
# =============================================================================
def main():
st.markdown("### πŸ™ GitCosmos - Cosmos & Git Hub")
st.markdown(f"[πŸ”— Portal]({CosmosDBUrl})")
if "chat_history" not in st.session_state:
st.session_state.chat_history = []
st.session_state.setdefault("current_container", None)
if Key:
st.session_state.primary_key = Key
st.session_state.logged_in = True
else:
st.error("Missing Cosmos Key πŸ”‘βŒ")
return
st.sidebar.markdown("## πŸ› οΈ Item Management")
if st.sidebar.button("New Item"):
if st.session_state.get("current_container"):
new_doc = new_item_default(st.session_state.current_container)
if new_doc:
st.session_state.doc_editor = json.dumps(new_doc, indent=2)
else:
st.warning("No container selected!")
st.sidebar.text_input("New Field Key", key="new_field_key")
st.sidebar.text_input("New Field Value", key="new_field_value")
if st.sidebar.button("Add Field"):
if "doc_editor" in st.session_state:
add_field_to_doc()
else:
st.warning("No document loaded to add a field.")
if st.sidebar.button("New AI Record"):
if st.session_state.get("current_container"):
new_ai_record(st.session_state.current_container)
else:
st.warning("No container selected!")
if st.sidebar.button("New Links Record"):
if st.session_state.get("current_container"):
new_links_record(st.session_state.current_container)
else:
st.warning("No container selected!")
st.sidebar.markdown("## πŸ” Vector Search")
search_documents_ui(st.session_state.get("current_container"))
show_sidebar_data_grid()
if st.session_state.get("current_container"):
edit_names_grid(st.session_state.current_container)
try:
if st.session_state.get("client") is None:
st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
st.sidebar.title("πŸ™ Navigator")
databases = get_databases(st.session_state.client)
selected_db = st.sidebar.selectbox("πŸ—ƒοΈ DB", databases)
if selected_db != st.session_state.get("selected_database"):
st.session_state.selected_database = selected_db
st.session_state.selected_container = None
st.session_state.selected_document_id = None
st.session_state.current_index = 0
st.rerun()
if st.session_state.selected_database:
database = st.session_state.client.get_database_client(st.session_state.selected_database)
if "show_new_container_form" not in st.session_state:
st.session_state.show_new_container_form = False
if st.sidebar.button("πŸ†• New Container"):
st.session_state.show_new_container_form = True
if st.session_state.show_new_container_form:
with st.sidebar.form("new_container_form"):
new_container_id = st.text_input("Container ID", value="aiml-container")
new_partition_key = st.text_input("Partition Key", value="/pk")
new_analytical = st.checkbox("Enable Analytical Store", value=True)
submitted = st.form_submit_button("Create Container")
if submitted:
analytical_ttl = -1 if new_analytical else None
new_container = create_new_container(database, new_container_id, new_partition_key, analytical_storage_ttl=analytical_ttl)
if new_container:
st.success(f"Container '{new_container_id}' created.")
default_id = generate_unique_id()
default_item = {
"id": default_id,
"pk": default_id,
"name": "Default Image Prompt",
"prompt": "Enter your image prompt here",
"timestamp": datetime.now().isoformat(),
"type": "image_prompt"
}
insert_success, insert_message = insert_record(new_container, default_item)
if insert_success:
st.info("Default templated item created in new container.")
else:
st.error(f"Default item insertion error: {insert_message}")
st.session_state.show_new_container_form = False
st.session_state.new_container_created = new_container_id
st.rerun()
containers = get_containers(database)
if "new_container_created" in st.session_state and st.session_state.new_container_created not in containers:
containers.append(st.session_state.new_container_created)
selected_container = st.sidebar.selectbox("πŸ“ Container", containers)
if selected_container != st.session_state.get("selected_container"):
st.session_state.selected_container = selected_container
st.session_state.selected_document_id = None
st.session_state.current_index = 0
st.rerun()
if st.session_state.selected_container:
container = database.get_container_client(st.session_state.selected_container)
st.session_state.current_container = container
if st.sidebar.button("πŸ“¦ Export"):
download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
if download_link.startswith('<a'):
st.markdown(download_link, unsafe_allow_html=True)
else:
st.error(download_link)
documents = get_documents(container)
total_docs = len(documents)
num_docs = st.slider("Docs", 1, 20, 1)
documents_to_display = documents[:num_docs] if total_docs > num_docs else documents
st.sidebar.info(f"Showing {len(documents_to_display)} docs")
view_options = ['List', 'Markdown', 'Code', 'Run AI', 'Clone', 'New']
selected_view = st.sidebar.selectbox("View", view_options, index=1)
if selected_view == 'List':
edit_documents_list(container)
elif selected_view == 'Markdown':
st.markdown("#### πŸ“„ Markdown")
if documents:
doc = documents[st.session_state.current_index]
content = json.dumps(doc, indent=2)
st.markdown(f"```json\n{content}\n```")
col_prev, col_next = st.columns(2)
with col_prev:
if st.button("⬅️") and st.session_state.current_index > 0:
st.session_state.current_index -= 1
st.rerun()
with col_next:
if st.button("➑️") and st.session_state.current_index < total_docs - 1:
st.session_state.current_index += 1
st.rerun()
elif selected_view == 'Code':
st.markdown("#### πŸ’» Code Editor")
if documents:
doc = documents[st.session_state.current_index]
if "doc_editor" not in st.session_state:
st.session_state.doc_editor = json.dumps(doc, indent=2)
edited = st.text_area("Edit JSON", value=st.session_state.doc_editor, height=300, key="doc_editor", on_change=lambda: auto_save_edit())
col_prev, col_next = st.columns(2)
with col_prev:
if st.button("⬅️") and st.session_state.current_index > 0:
st.session_state.current_index -= 1
st.rerun()
with col_next:
if st.button("➑️") and st.session_state.current_index < total_docs - 1:
st.session_state.current_index += 1
st.rerun()
col_save, col_delete = st.columns(2)
with col_save:
if st.button("πŸ’Ύ Save", key=f'save_{st.session_state.current_index}'):
try:
updated_doc = json.loads(edited)
container.upsert_item(body=updated_doc)
st.success(f"Saved {updated_doc['id']}")
st.rerun()
except Exception as e:
st.error(f"Save err: {str(e)}")
with col_delete:
if st.button("πŸ—‘οΈ Delete", key=f'delete_{st.session_state.current_index}'):
try:
current_doc = json.loads(edited)
success, message = delete_record(container, current_doc)
if success:
st.success(message)
st.rerun()
else:
st.error(message)
except Exception as e:
st.error(f"Delete err: {str(e)}")
if "delete_log" in st.session_state and st.session_state.delete_log:
st.subheader("Delete Log")
for log_entry in st.session_state.delete_log[-5:]:
st.write(log_entry)
elif selected_view == 'Run AI':
st.markdown("#### πŸ€– Run AI")
ai_query = st.text_area("Enter your query for ArXiv search:", key="arxiv_query", height=100)
if st.button("Send"):
st.session_state.last_query = ai_query
perform_ai_lookup(ai_query, vocal_summary=True, extended_refs=False, titles_summary=True, full_audio=True, useArxiv=True, useArxivAudio=False)
elif selected_view == 'Clone':
st.markdown("#### πŸ“„ Clone")
if documents:
doc = documents[st.session_state.current_index]
st.markdown(f"Original ID: {doc.get('id', '')}")
new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id')
new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name')
new_doc = {'id': new_id, 'pk': new_id, 'name': new_name, **{k: v for k, v in doc.items() if k not in ['id', 'name', 'pk', '_rid', '_self', '_etag', '_attachments', '_ts']}}
doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2), height=300, key='clone_preview')
col1, col2 = st.columns(2)
with col1:
if st.button("πŸ”„ Regenerate"):
new_id = generate_unique_id()
st.session_state.new_clone_id = new_id
st.rerun()
with col2:
if st.button("πŸ’Ύ Save Clone"):
try:
final_doc = json.loads(doc_str)
for field in ['_rid', '_self', '_etag', '_attachments', '_ts']:
final_doc.pop(field, None)
container.create_item(body=final_doc)
st.success(f"Cloned {final_doc['id']}")
st.rerun()
except Exception as e:
st.error(f"Clone err: {str(e)}")
col_prev, col_next = st.columns(2)
with col_prev:
if st.button("⬅️") and st.session_state.current_index > 0:
st.session_state.current_index -= 1
st.rerun()
with col_next:
if st.button("➑️") and st.session_state.current_index < total_docs - 1:
st.session_state.current_index += 1
st.rerun()
elif selected_view == 'New':
st.markdown("#### βž• New Doc")
if st.button("πŸ€– Auto-Gen"):
auto_doc = {
"id": generate_unique_id(),
"pk": generate_unique_id(),
"name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"content": "Auto-generated record.",
"timestamp": datetime.now().isoformat()
}
success, message = insert_record(container, auto_doc)
if success:
st.success(message)
st.rerun()
else:
st.error(message)
else:
new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
default_doc = {
"id": new_id,
"pk": new_id,
"name": "New Doc",
"content": "",
"timestamp": datetime.now().isoformat()
}
new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300)
if st.button("βž• Create"):
try:
cleaned = preprocess_text(new_doc_str)
new_doc = json.loads(cleaned)
new_doc['id'] = new_id
new_doc['pk'] = new_id
success, message = insert_record(container, new_doc)
if success:
st.success(f"Created {new_doc['id']}")
st.rerun()
else:
st.error(message)
except Exception as e:
st.error(f"Create err: {str(e)}")
st.subheader(f"πŸ“Š {st.session_state.selected_container}")
if documents_to_display:
df = pd.DataFrame(documents_to_display)
st.dataframe(df)
else:
st.info("No docs.")
update_file_management_section()
except exceptions.CosmosHttpResponseError as e:
st.error(f"Cosmos error: {str(e)} 🚨")
except Exception as e:
st.error(f"Error: {str(e)} 😱")
if st.session_state.logged_in and st.sidebar.button("πŸšͺ Logout"):
st.markdown("#### πŸšͺ Logout")
st.session_state.logged_in = False
st.session_state.selected_records = []
st.session_state.client = None
st.session_state.selected_database = None
st.session_state.selected_container = None
st.session_state.selected_document_id = None
st.session_state.current_index = 0
st.rerun()
show_sidebar_data_grid()
if __name__ == "__main__":
main()