# %% ───────────── IMPORTS ─────────────
import base64
import glob
import hashlib
import json
import os
import pandas as pd
import pytz
import random
import re
import shutil
import streamlit as st
import time
import traceback
import uuid
import zipfile
from PIL import Image
from azure.cosmos import CosmosClient, exceptions
from datetime import datetime
from git import Repo
from github import Github
from gradio_client import Client, handle_file
import tempfile
import io
import requests
import numpy as np
from urllib.parse import quote
# %% ───────────── APP CONFIGURATION ─────────────
# Branding strings used for the page title, icon and menu entries.
Site_Name = '🐙 GitCosmos'
title = "🐙 GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = '🐙🌌💫'

# Page setup; this is the first Streamlit call made by this file.
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title,
    },
)

# Cosmos DB & App URLs
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
# Database/container names and the account key are read from the environment.
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'
# %% ───────────── HELPER FUNCTIONS ─────────────
# 📎 Get download link from file
def get_download_link(file_path):
    """Return an HTML anchor that downloads ``file_path`` through a data URI.

    The file is read fully into memory and embedded base64-encoded in the
    ``href``, so the link keeps working even if the file is removed later.

    Args:
        file_path: Path of the file to expose for download.

    Returns:
        str: An ``<a ...>`` element meant to be rendered with
        ``st.markdown(..., unsafe_allow_html=True)``.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    import html  # local import keeps this block self-contained

    with open(file_path, "rb") as file:
        contents = file.read()
    b64 = base64.b64encode(contents).decode()
    file_name = os.path.basename(file_path)
    # The name ends up inside an attribute and inside element text: escape it.
    safe_name = html.escape(file_name, quote=True)
    return (
        f'<a href="data:application/octet-stream;base64,{b64}" '
        f'download="{safe_name}">Download {safe_name} 📂</a>'
    )
# 🆔 Generate a unique ID
def generate_unique_id():
    """Create a new ID made of a UTC timestamp and a UUID4, and echo it.

    Returns:
        str: ``"<YYYYmmddHHMMSSffffff>-<uuid4>"``. The value is also written
        to the Streamlit page as ``New ID: ...``.
    """
    stamp = f"{datetime.utcnow():%Y%m%d%H%M%S%f}"
    new_id = "-".join((stamp, str(uuid.uuid4())))
    st.write('New ID: ' + new_id)
    return new_id
# 📄 Generate a filename from a prompt
def generate_filename(prompt, file_type):
    """Derive a file name from the current US/Central time and a prompt.

    Args:
        prompt: Free text; every non-word character is removed and the
            remainder is cut to 90 characters.
        file_type: Extension to append, without the leading dot.

    Returns:
        str: ``"<mmdd_HHMM><cleaned prompt>.<file_type>"``.
    """
    stamp = datetime.now(pytz.timezone('US/Central')).strftime("%m%d_%H%M")
    slug = re.sub(r'\W+', '', prompt)[:90]
    return "{}{}.{}".format(stamp, slug, file_type)
# 💾 Create and save a file with prompt & response
def create_file(filename, prompt, response, should_save=True):
    """Write ``prompt``, a blank line, then ``response`` to ``filename``.

    Args:
        filename: Destination path; overwritten if it already exists.
        prompt: Text written first.
        response: Text written after a blank line.
        should_save: When false nothing is written.

    Returns:
        None
    """
    if should_save:
        with open(filename, 'w', encoding='utf-8') as out:
            out.write(prompt + "\n\n" + response)
# 📖 Load file content from disk
def load_file(file_name):
    """Read a UTF-8 text file and return its full content as a string.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(file_name, "r", encoding='utf-8') as handle:
        return handle.read()
# 🔍 Display glossary entity with quick links
def display_glossary_entity(k):
    """Render a glossary term followed by emoji quick-links for searching it.

    Each emoji becomes an anchor whose target is produced by the matching URL
    builder (in-app query, Wikipedia, Google, YouTube). The term is
    URL-quoted in every target, including the in-app ``/?q=`` link.

    Args:
        k: The glossary term (a string).
    """
    search_urls = {
        "🚀": lambda term: f"/?q={quote(term)}",
        "📖": lambda term: f"https://en.wikipedia.org/wiki/{quote(term)}",
        "🔍": lambda term: f"https://www.google.com/search?q={quote(term)}",
        "🎥": lambda term: f"https://www.youtube.com/results?search_query={quote(term)}",
    }
    # Call each builder: without this the emojis are plain text, not links.
    links_md = ' '.join(
        f'<a href="{build_url(k)}" target="_blank">{emoji}</a>'
        for emoji, build_url in search_urls.items()
    )
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)
# 🗜️ Create zip of multiple files
def create_zip_of_files(files):
    """Bundle the given paths into ``all_files.zip`` in the working directory.

    Args:
        files: Iterable of file paths to add to the archive.

    Returns:
        str: The archive name, always ``"all_files.zip"``.
    """
    archive_path = "all_files.zip"
    with zipfile.ZipFile(archive_path, 'w') as archive:
        for member in files:
            archive.write(member)
    return archive_path
# 🎬 Get HTML for video playback
def get_video_html(video_path, width="100%"):
    """Return an HTML ``<video>`` element that plays ``video_path`` inline.

    The file is embedded as a base64 ``data:video/mp4`` URI, so the returned
    markup does not depend on the file staying on disk.

    Args:
        video_path: Path to the video file.
        width: Value for the element's ``width`` attribute.

    Returns:
        str: HTML markup for the player.

    Raises:
        OSError: If the file cannot be read.
    """
    # Context manager closes the handle; the one-liner open() leaked it.
    with open(video_path, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read()).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return f'''
<video width="{width}" controls autoplay muted loop>
<source src="{video_url}" type="video/mp4">
Your browser does not support the video tag.
</video>
'''
# 🎵 Get HTML for audio playback
def get_audio_html(audio_path, width="100%"):
    """Return an HTML ``<audio>`` element that plays ``audio_path`` inline.

    The file is embedded as a base64 ``data:audio/mpeg`` URI.

    Args:
        audio_path: Path to the audio file.
        width: CSS width applied to the player.

    Returns:
        str: HTML markup for the player.

    Raises:
        OSError: If the file cannot be read.
    """
    # Context manager closes the handle; the one-liner open() leaked it.
    with open(audio_path, 'rb') as audio_file:
        encoded = base64.b64encode(audio_file.read()).decode()
    audio_url = f"data:audio/mpeg;base64,{encoded}"
    return f'''
<audio controls style="width: {width};">
<source src="{audio_url}" type="audio/mpeg">
Your browser does not support the audio element.
</audio>
'''
# 📝 Preprocess text for JSON safety
def preprocess_text(text):
    """Flatten free text so it can sit inside a JSON string literal.

    Line breaks (``\\r\\n``, ``\\r``, ``\\n``) become the two characters
    backslash + ``n``, double quotes get a leading backslash, tabs become
    spaces, non-ASCII characters are dropped, and the result is stripped.

    Args:
        text: Input string.

    Returns:
        str: The transformed text.
    """
    # Alternation order matters: "\r\n" must be consumed as one line break.
    flattened = re.sub(r'\r\n|\r|\n', lambda _match: '\\n', text)
    escaped = flattened.replace('"', '\\"').replace('\t', ' ')
    ascii_only = ''.join(ch for ch in escaped if ord(ch) < 128)
    return ascii_only.strip()
# %% ───────────── COSMOS DB FUNCTIONS ─────────────
# 📚 List all databases in Cosmos
def get_databases(client):
    """Return the ``id`` of every database reported by ``client.list_databases()``."""
    names = []
    for database in client.list_databases():
        names.append(database['id'])
    return names
# 📦 List all containers in a database
def get_containers(database):
    """Return the ``id`` of every container reported by ``database.list_containers()``."""
    names = []
    for entry in database.list_containers():
        names.append(entry['id'])
    return names
# 📄 Query documents from a container (most recent first)
def get_documents(container, limit=None):
    """Return documents from ``container``, most recently modified first.

    Args:
        container: Object exposing ``query_items`` (a Cosmos container client).
        limit: Maximum number of documents to return; ``None`` returns all.

    Returns:
        list: The documents, ordered by ``_ts`` descending.
    """
    from itertools import islice  # local import keeps this block self-contained

    query = "SELECT * FROM c ORDER BY c._ts DESC"
    results = container.query_items(
        query=query,
        enable_cross_partition_query=True,
        max_item_count=limit,  # page-size hint only; it does not cap the total
    )
    if limit is None:
        return list(results)
    # Stop iterating after `limit` items so further pages are never fetched.
    return list(islice(results, max(limit, 0)))
# 📥 Insert a new record into Cosmos
def insert_record(container, record):
    """Create ``record`` in ``container`` and report the outcome.

    Args:
        container: Object exposing ``create_item(body=...)``.
        record: The document to insert.

    Returns:
        tuple: ``(True, message)`` on success, ``(False, message)`` when the
        call raises; the exception text is included in the message.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error: {str(http_err)} 🚨"
    except Exception as err:
        return False, f"Error: {str(err)} 😱"
    else:
        return True, "Inserted! 🎉"
# 🔄 Update an existing Cosmos record
def update_record(container, updated_record):
    """Upsert ``updated_record`` into ``container`` and report the outcome.

    Args:
        container: Object exposing ``upsert_item(body=...)``.
        updated_record: The document to write; its ``'id'`` is echoed in the
            success message.

    Returns:
        tuple: ``(True, message)`` on success, otherwise ``(False, message)``
        carrying the HTTP error text or a formatted traceback.
    """
    try:
        container.upsert_item(body=updated_record)
        # Built inside the try so a missing 'id' is reported, not raised.
        outcome = (True, f"Updated {updated_record['id']} 🛠️")
    except exceptions.CosmosHttpResponseError as http_err:
        outcome = (False, f"HTTP error: {str(http_err)} 🚨")
    except Exception:
        outcome = (False, f"Error: {traceback.format_exc()} 😱")
    return outcome
# 🗑️ Delete a record from Cosmos DB using /id as partition key
def delete_record(container, record):
    """
    Delete a record from Cosmos DB using its 'id' field as both the item ID and partition key.

    Every step is appended to ``st.session_state.delete_log`` so the caller
    can show the history after a rerun.

    Args:
        container: Cosmos DB container client
        record: Dictionary containing at least 'id'
    Returns:
        tuple: (success: bool, message: str). A "not found" response counts as
        success because the record is gone either way.
    """
    # Both are bound before the try so the except handlers can always use
    # them, even when the failure happens on the very first statement.
    doc_id = None
    if "delete_log" not in st.session_state:
        st.session_state.delete_log = []
    try:
        # Ensure record has an ID
        if "id" not in record:
            return False, "Record must contain an 'id' field. 🛑"
        doc_id = record["id"]
        # Log the document being deleted (persistent in session state)
        st.session_state.delete_log.append(f"Attempting to delete document: {json.dumps(record, indent=2)}")
        # Use 'id' as the partition key since container is partitioned on /id
        partition_key_value = doc_id
        st.session_state.delete_log.append(f"Using ID and Partition Key: {partition_key_value}")
        # Perform the deletion
        container.delete_item(item=doc_id, partition_key=partition_key_value)
        success_msg = f"Record {doc_id} successfully deleted from Cosmos DB. 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg
    except exceptions.CosmosResourceNotFoundError:
        success_msg = f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg  # Treat as success since the goal is removal
    except exceptions.CosmosHttpResponseError as e:
        error_msg = f"HTTP error deleting {doc_id}: {str(e)}. 🚨"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg
    except Exception:
        error_msg = f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. 😱"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg
# 💾 Save a new document to Cosmos DB with extra fields
def save_to_cosmos_db(container, query, response1, response2):
    """Store a query and its two responses as a new ``ai_response`` document.

    The document ID is a UTC timestamp joined to a UUID4. On success the
    cached ``st.session_state.documents`` list is reloaded from the container.
    All failures are reported with ``st.error`` instead of being raised.

    Args:
        container: Cosmos container client; a falsy value aborts with an error.
        query: The prompt text to store.
        response1: First response to store.
        response2: Second response to store.
    """
    try:
        if not container:
            st.error("Cosmos container not initialized.")
            return
        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
        new_id = f"{stamp}-{str(uuid.uuid4())}"
        record = dict([
            ("id", new_id),
            ("name", new_id),
            ("query", query),
            ("response1", response1),
            ("response2", response2),
            ("timestamp", datetime.utcnow().isoformat()),
            ("type", "ai_response"),
            ("version", "1.0"),
        ])
        container.create_item(body=record)
        st.success(f"Saved: {record['id']}")
        st.session_state.documents = get_documents(container)
    except Exception as err:
        st.error(f"Save error: {str(err)}")
# 🗄️ Archive all documents in a container and provide a download link
def archive_current_container(database_name, container_name, client):
    """Dump every item of a container to JSON files, zip them, return a link.

    The staging folder ``./cosmos_archive_current_container`` is recreated on
    each call. The zip is written to the working directory with a
    timestamped name.

    Args:
        database_name: Name of the database holding the container.
        container_name: Name of the container to export.
        client: Cosmos client used to reach the database.

    Returns:
        str: The result of ``get_download_link`` for the zip, or a string
        starting with ``"Archive error:"`` if anything fails.
    """
    try:
        staging_root = "./cosmos_archive_current_container"
        if os.path.exists(staging_root):
            shutil.rmtree(staging_root)
        os.makedirs(staging_root)

        container_client = client.get_database_client(database_name).get_container_client(container_name)
        items = list(container_client.read_all_items())

        container_dir = os.path.join(staging_root, container_name)
        os.makedirs(container_dir)
        for item in items:
            # Items without an 'id' fall back to a timestamp-based name.
            fallback_id = f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}"
            item_id = item.get('id', fallback_id)
            target = os.path.join(container_dir, f"{item_id}.json")
            with open(target, 'w') as out:
                json.dump(item, out, indent=2)

        archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        shutil.make_archive(archive_name, 'zip', staging_root)
        return get_download_link(f"{archive_name}.zip")
    except Exception as err:
        return f"Archive error: {str(err)} 😢"
# %% ───────────── GITHUB FUNCTIONS ─────────────
# 📥 Clone a GitHub repository locally
def download_github_repo(url, local_path):
    """Clone ``url`` into ``local_path``, replacing any existing directory there."""
    stale_checkout = os.path.exists(local_path)
    if stale_checkout:
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
# 🗜️ Zip a directory
def create_zip_file(source_dir, output_filename):
    """Zip the contents of ``source_dir`` into ``<output_filename>.zip``.

    Args:
        source_dir: Directory whose contents are archived.
        output_filename: Archive path WITHOUT the ``.zip`` extension.
    """
    shutil.make_archive(base_name=output_filename, format='zip', root_dir=source_dir)
# 🏗️ Create a new GitHub repo via API
def create_repo(g, repo_name):
    """Create ``repo_name`` under the account returned by ``g.get_user()``.

    Returns:
        Whatever the user's ``create_repo`` call returns.
    """
    return g.get_user().create_repo(repo_name)
# 🚀 Push local repo changes to GitHub
def push_to_github(local_path, repo, github_token):
    """Commit everything in a local repo and push it to ``repo`` on GitHub.

    The ``origin`` remote is created or re-pointed at a token-authenticated
    HTTPS URL. A repo with no branches gets a new ``main`` branch; otherwise
    the active branch is pushed under its own name.

    Args:
        local_path: Path of the local git working copy.
        repo: Target repository object; only ``full_name`` is read.
        github_token: Token embedded in the remote URL.
    """
    authed_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    working_copy = Repo(local_path)

    remote_names = [remote.name for remote in working_copy.remotes]
    if 'origin' not in remote_names:
        origin = working_copy.create_remote('origin', authed_url)
    else:
        origin = working_copy.remote('origin')
        origin.set_url(authed_url)

    if working_copy.heads:
        branch = working_copy.active_branch.name
    else:
        working_copy.git.checkout('-b', 'main')
        branch = 'main'

    working_copy.git.add(A=True)
    if working_copy.is_dirty():
        working_copy.git.commit('-m', 'Initial commit')
    origin.push(refspec=f'{branch}:{branch}')
# %% ───────────── FILE & MEDIA MANAGEMENT FUNCTIONS ─────────────
# 📂 List saved Markdown files in sidebar with actions
def display_saved_files_in_sidebar():
    """List saved Markdown files in the sidebar with download/delete buttons.

    Files whose name starts with ``readme`` (any case) are skipped. Each file
    gets one row of three columns: name, download button, delete button.
    """
    all_files = sorted(
        (f for f in glob.glob("*.md") if not f.lower().startswith('readme')),
        reverse=True,
    )
    st.sidebar.markdown("## 📁 Files")
    for file in all_files:
        col1, col2, col3 = st.sidebar.columns([6, 2, 1])
        with col1:
            st.markdown(f"📄 {file}")
        with col2:
            # Read through a context manager so the handle is closed.
            with open(file, 'rb') as handle:
                payload = handle.read()
            # Plain st.* inside `with col2:` lands in the column; the former
            # st.sidebar.* call bypassed the column and broke the row layout.
            st.download_button(
                label="⬇️",
                data=payload,
                file_name=file,
                key=f"sidebar_download_{file}",
            )
        with col3:
            # Key is distinct from the "delete_<file>" key used by the file
            # management section, so both can render in the same run.
            if st.button("🗑", key=f"sidebar_delete_{file}"):
                os.remove(file)
                st.rerun()
# 👀 Display file viewer in main area
def display_file_viewer(file_path):
    """Show a read-only view of a file: name, mtime, size, content, download.

    Nothing is rendered when the file content is empty.

    Args:
        file_path: Path of the file to display.
    """
    text = load_file(file_path)
    if not text:
        return
    st.markdown("### 📄 File Viewer")
    st.markdown(f"**{file_path}**")
    info = os.stat(file_path)
    modified = datetime.fromtimestamp(info.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
    st.markdown(f"**Mod:** {modified} | **Size:** {info.st_size} bytes")
    st.markdown("---")
    st.markdown(text)
    st.download_button(
        "⬇️",
        data=text,
        file_name=os.path.basename(file_path),
        mime="text/markdown",
    )
# ✏️ Display file editor (Markdown & Code)
def display_file_editor(file_path):
    """Render a two-tab editor (Markdown preview / raw text) for a file.

    The content is cached per path in ``st.session_state.file_content``.
    Saving writes through ``save_file_content``, refreshes the cache, pauses
    one second so the success message is visible, then reruns the script.

    Args:
        file_path: Path of the file being edited.
    """
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    if file_path not in st.session_state.file_content:
        loaded = load_file(file_path)
        if loaded is None:
            return
        st.session_state.file_content[file_path] = loaded

    st.markdown("### ✏️ Edit File")
    st.markdown(f"**Editing:** {file_path}")
    preview_tab, source_tab = st.tabs(["Markdown", "Code"])
    with preview_tab:
        st.markdown(st.session_state.file_content[file_path])
    with source_tab:
        new_content = st.text_area(
            "Edit:",
            value=st.session_state.file_content[file_path],
            height=400,
            key=f"editor_{hash(file_path)}",
        )
        save_col, download_col = st.columns([1, 5])
        with save_col:
            if st.button("💾 Save"):
                if save_file_content(file_path, new_content):
                    st.session_state.file_content[file_path] = new_content
                    st.success("Saved! 🎉")
                    time.sleep(1)
                    st.rerun()
        with download_col:
            st.download_button(
                "⬇️",
                data=new_content,
                file_name=os.path.basename(file_path),
                mime="text/markdown",
            )
# 💾 Save content to a file (with error handling)
def save_file_content(file_path, content):
    """Write ``content`` to ``file_path`` as UTF-8.

    Returns:
        bool: ``True`` when the write succeeds; ``False`` after showing the
        error with ``st.error`` when it does not.
    """
    try:
        with open(file_path, 'w', encoding='utf-8') as out:
            out.write(content)
    except Exception as err:
        st.error(f"Save error: {str(err)}")
        return False
    else:
        return True
# 🗂️ Update file management UI section (view, edit, delete)
def update_file_management_section():
    """Render the sidebar file manager and the viewer/editor it drives.

    The sidebar lists saved ``*.md`` files with per-file view, download, edit
    and delete buttons, plus "Delete All" and "Download All". Files whose
    name starts with ``readme`` (any case) are excluded, matching
    ``display_saved_files_in_sidebar``, so "Delete All" can no longer remove
    the project README. The selected file and mode are kept in
    ``st.session_state`` (``current_file``, ``file_view_mode``,
    ``file_content``).
    """
    if 'file_view_mode' not in st.session_state:
        st.session_state.file_view_mode = None
    if 'current_file' not in st.session_state:
        st.session_state.current_file = None
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}

    all_files = sorted(
        (f for f in glob.glob("*.md") if not f.lower().startswith('readme')),
        reverse=True,
    )
    st.sidebar.title("📁 Files")
    if st.sidebar.button("🗑 Delete All"):
        for file in all_files:
            os.remove(file)
        st.session_state.file_content = {}
        st.session_state.current_file = None
        st.session_state.file_view_mode = None
        st.rerun()
    if st.sidebar.button("⬇️ Download All"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)

    for file in all_files:
        col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
        with col1:
            if st.button("🌐", key=f"view_{file}"):
                _open_file_in_mode(file, 'view')
        with col2:
            st.markdown(get_download_link(file), unsafe_allow_html=True)
        with col3:
            if st.button("📂", key=f"edit_{file}"):
                _open_file_in_mode(file, 'edit')
        with col4:
            if st.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                # Drop cached state that refers to the deleted file.
                if file in st.session_state.file_content:
                    del st.session_state.file_content[file]
                if st.session_state.current_file == file:
                    st.session_state.current_file = None
                    st.session_state.file_view_mode = None
                st.rerun()

    if st.session_state.current_file:
        if st.session_state.file_view_mode == 'view':
            display_file_viewer(st.session_state.current_file)
        elif st.session_state.file_view_mode == 'edit':
            display_file_editor(st.session_state.current_file)


def _open_file_in_mode(file, mode):
    """Select ``file`` for ``mode`` ('view' or 'edit'), cache its text, rerun."""
    st.session_state.current_file = file
    st.session_state.file_view_mode = mode
    if file not in st.session_state.file_content:
        content = load_file(file)
        if content is not None:
            st.session_state.file_content[file] = content
    st.rerun()
# %% ───────────── VIDEO & AUDIO UI FUNCTIONS ─────────────
# 🖼️ Validate and preprocess an image for video generation
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
    """Fit an image onto a white RGB canvas of ``target_size``.

    The image is converted to RGB, scaled with LANCZOS to fit inside the
    canvas while keeping its aspect ratio (dimensions rounded down to even
    numbers), and pasted centred.

    Args:
        file_data: Raw ``bytes``, a file-like object with ``read`` (rewound
            first when it has ``seek``), or a PIL image.
        target_size: ``(width, height)`` of the output canvas.

    Returns:
        The composed PIL image, or ``None`` after reporting the problem with
        ``st.error`` when anything goes wrong (including unsupported input).
    """
    try:
        st.write("Preprocessing image...")
        if isinstance(file_data, bytes):
            img = Image.open(io.BytesIO(file_data))
        elif hasattr(file_data, 'read'):
            if hasattr(file_data, 'seek'):
                file_data.seek(0)
            img = Image.open(file_data)
        elif isinstance(file_data, Image.Image):
            img = file_data
        else:
            raise ValueError(f"Unsupported input: {type(file_data)}")

        if img.mode != 'RGB':
            img = img.convert('RGB')

        canvas_w, canvas_h = target_size[0], target_size[1]
        aspect_ratio = img.size[0] / img.size[1]
        if aspect_ratio > canvas_w / canvas_h:
            # Relatively wider than the canvas: width is the limiting side.
            fit_w = canvas_w
            fit_h = int(fit_w / aspect_ratio)
        else:
            fit_h = canvas_h
            fit_w = int(fit_h * aspect_ratio)
        # Round both dimensions down to the nearest even number.
        fit_w -= fit_w % 2
        fit_h -= fit_h % 2

        scaled = img.resize((fit_w, fit_h), Image.Resampling.LANCZOS)
        canvas = Image.new('RGB', target_size, (255, 255, 255))
        offset = ((canvas_w - fit_w) // 2, (canvas_h - fit_h) // 2)
        canvas.paste(scaled, offset)
        return canvas
    except Exception as err:
        st.error(f"Image error: {str(err)}")
        return None
# ▶️ Add video generation UI with Gradio client
def add_video_generation_ui(container):
    """Render the image-to-video UI and run generation on demand.

    The user uploads an image and picks motion, FPS and an optional seed. The
    original and preprocessed images are previewed side by side, and
    "Generate" sends the preprocessed image to the
    ``awacke1/stable-video-diffusion`` Gradio app.

    Args:
        container: Cosmos container client used to record generated videos;
            a falsy value skips the database write.
    """
    st.markdown("### 🎥 Video Gen")
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_file = st.file_uploader("Upload Image 🖼️", type=['png', 'jpg', 'jpeg'])
    with col2:
        st.markdown("#### Params")
        motion = st.slider("🌊 Motion", 1, 255, 127)
        fps = st.slider("🎬 FPS", 1, 30, 6)
        with st.expander("Advanced"):
            use_custom = st.checkbox("Custom Seed")
            seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
    if uploaded_file is None:
        return
    try:
        file_data = uploaded_file.read()
        preview1, preview2 = st.columns(2)
        with preview1:
            st.write("Original")
            st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
        with preview2:
            proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
            if proc_img:
                st.write("Processed")
                st.image(proc_img, use_column_width=True)
            else:
                st.error("Preprocess failed")
                return
        if st.button("🎥 Generate"):
            with st.spinner("Generating video..."):
                _run_video_generation(container, proc_img, seed, motion, fps)
    except Exception as e:
        st.error(f"Upload error: {str(e)}")


def _run_video_generation(container, proc_img, seed, motion, fps):
    """Send ``proc_img`` to the video model, show the result, record it in Cosmos."""
    # Reserve a path, then close the handle BEFORE writing to it by name:
    # saving into a still-open NamedTemporaryFile fails on some platforms.
    with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
        temp_path = temp_file.name
    try:
        # Saving happens inside the try so the temp file is removed on failure.
        proc_img.save(temp_path, format='PNG')
        client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
        result = client.predict(
            # handle_file marks the path as a file to upload, not a string.
            image=handle_file(temp_path),
            seed=seed if seed is not None else int(time.time() * 1000),
            randomize_seed=seed is None,
            motion_bucket_id=motion,
            fps_id=fps,
            api_name="/video"
        )
        if result and isinstance(result, tuple) and len(result) >= 1:
            video_path = result[0].get('video') if isinstance(result[0], dict) else None
            if video_path and os.path.exists(video_path):
                video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                shutil.copy(video_path, video_filename)
                st.success("Video generated! 🎉")
                st.video(video_filename)
                if container:
                    video_record = {
                        "id": generate_unique_id(),
                        "type": "generated_video",
                        "filename": video_filename,
                        "seed": seed if seed is not None else "random",
                        "motion": motion,
                        "fps": fps,
                        "timestamp": datetime.now().isoformat()
                    }
                    success, message = insert_record(container, video_record)
                    if success:
                        st.success("DB record saved!")
                    else:
                        st.error(f"DB error: {message}")
            else:
                st.error("Invalid result format")
        else:
            st.error("No result returned")
    except Exception as e:
        st.error(f"Video gen error: {str(e)}")
    finally:
        try:
            os.unlink(temp_path)
            st.write("Temp file removed")
        except Exception as e:
            st.warning(f"Cleanup error: {str(e)}")
# %% ───────────── MAIN FUNCTION ─────────────
# 🚀 Main app entry point
def main():
    """Run the GitCosmos page: Cosmos browser, GitHub tools and file manager.

    Flow: require the ``Key`` environment value, create (and cache in session
    state) the Cosmos client, let the user pick a database and container in
    the sidebar, render the document UI for that container, then the GitHub
    section and the local file manager. Cosmos and generic exceptions are
    shown with ``st.error`` rather than raised.
    """
    st.markdown("### 🐙 GitCosmos - Cosmos & Git Hub")
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    # Auth & Cosmos client initialization
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    else:
        st.error("Missing Cosmos Key 🔑❌")
        return
    if st.session_state.logged_in:
        try:
            if st.session_state.get("client") is None:
                st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
            st.sidebar.title("🐙 Navigator")
            databases = get_databases(st.session_state.client)
            selected_db = st.sidebar.selectbox("🗃️ DB", databases)
            st.markdown(CosmosDBUrl)
            if selected_db != st.session_state.get("selected_database"):
                # A new database invalidates the container/document selection.
                st.session_state.selected_database = selected_db
                st.session_state.selected_container = None
                st.session_state.selected_document_id = None
                st.session_state.current_index = 0
                st.rerun()
            # .get(): the key is absent on the first run when no database exists.
            if st.session_state.get("selected_database"):
                database = st.session_state.client.get_database_client(st.session_state.selected_database)
                containers = get_containers(database)
                selected_container = st.sidebar.selectbox("📁 Container", containers)
                if selected_container != st.session_state.get("selected_container"):
                    st.session_state.selected_container = selected_container
                    st.session_state.selected_document_id = None
                    st.session_state.current_index = 0
                    st.rerun()
                if st.session_state.selected_container:
                    container = database.get_container_client(st.session_state.selected_container)
                    _render_container_ui(container)
            _render_github_ops()
            # --- File Management Section ---
            update_file_management_section()
        except exceptions.CosmosHttpResponseError as e:
            st.error(f"Cosmos error: {str(e)} 🚨")
        except Exception as e:
            st.error(f"Error: {str(e)} 😱")
    if st.session_state.logged_in and st.sidebar.button("🚪 Logout"):
        st.markdown("#### 🚪 Logout")
        st.session_state.logged_in = False
        st.session_state.selected_records = []
        st.session_state.client = None
        st.session_state.selected_database = None
        st.session_state.selected_container = None
        st.session_state.selected_document_id = None
        st.session_state.current_index = 0
        st.rerun()


def _render_container_ui(container):
    """Render export, the selected document view and the table for ``container``."""
    if st.sidebar.button("📦 Export"):
        download_link = archive_current_container(
            st.session_state.selected_database,
            st.session_state.selected_container,
            st.session_state.client,
        )
        # archive_current_container returns "Archive error: ..." on failure
        # and the download markup otherwise.
        if download_link.startswith("Archive error"):
            st.error(download_link)
        else:
            st.markdown(download_link, unsafe_allow_html=True)
    # NOTE(review): the original statements between the export check and the
    # `documents_to_display` expression were lost to text corruption. The
    # four lines below are reconstructed from how `documents`, `total_docs`
    # and `num_docs` are used afterwards; the slider label and range are
    # assumptions to confirm against version control.
    documents = get_documents(container)
    total_docs = len(documents)
    num_docs = st.slider("Docs", 1, 20, 1)
    documents_to_display = documents[:num_docs] if total_docs > num_docs else documents
    st.sidebar.info(f"Showing {len(documents_to_display)} docs")
    if documents:
        # Keep the cursor inside the list; it can point past the end after a
        # delete or when the container shrank between reruns.
        current = st.session_state.get("current_index", 0)
        st.session_state.current_index = min(max(current, 0), total_docs - 1)
    # --- Document Viewer / Editor ---
    view_options = ['Markdown', 'Code', 'Run AI', 'Clone', 'New']
    selected_view = st.sidebar.selectbox("View", view_options, index=1)
    if selected_view == 'Markdown':
        _render_markdown_view(documents, total_docs)
    elif selected_view == 'Code':
        _render_code_view(container, documents, total_docs)
    elif selected_view == 'Run AI':
        st.markdown("#### 🤖 Run AI (stub)")
        st.info("AI functionality not implemented.")
    elif selected_view == 'Clone':
        _render_clone_view(container, documents, total_docs)
    elif selected_view == 'New':
        _render_new_view(container)
    st.subheader(f"📊 {st.session_state.selected_container}")
    if documents_to_display:
        df = pd.DataFrame(documents_to_display)
        st.dataframe(df)
    else:
        st.info("No docs.")
    # --- End of Document UI ---


def _render_prev_next(total_docs):
    """Render previous/next buttons that move ``current_index`` within bounds."""
    col_prev, col_next = st.columns(2)
    with col_prev:
        if st.button("⬅️") and st.session_state.current_index > 0:
            st.session_state.current_index -= 1
            st.rerun()
    with col_next:
        if st.button("➡️") and st.session_state.current_index < total_docs - 1:
            st.session_state.current_index += 1
            st.rerun()


def _render_markdown_view(documents, total_docs):
    """Show the current document as a fenced JSON block with navigation."""
    st.markdown("#### 📄 Markdown")
    if documents:
        doc = documents[st.session_state.current_index]
        content = json.dumps(doc, indent=2)
        st.markdown(f"```json\n{content}\n```")
        _render_prev_next(total_docs)


def _render_code_view(container, documents, total_docs):
    """Show the current document in an editable JSON box with save/delete."""
    st.markdown("#### 💻 Code Editor")
    if documents:
        doc = documents[st.session_state.current_index]
        doc_str = st.text_area("Edit JSON", value=json.dumps(doc, indent=2), height=300, key=f'code_{st.session_state.current_index}')
        _render_prev_next(total_docs)
        col_save, col_delete = st.columns(2)
        with col_save:
            if st.button("💾 Save", key=f'save_{st.session_state.current_index}'):
                try:
                    updated_doc = json.loads(doc_str)
                    container.upsert_item(body=updated_doc)
                    st.success(f"Saved {updated_doc['id']}")
                    st.rerun()
                except Exception as e:
                    st.error(f"Save err: {str(e)}")
        with col_delete:
            if st.button("🗑️ Delete", key=f'delete_{st.session_state.current_index}'):
                try:
                    current_doc = json.loads(doc_str)
                    success, message = delete_record(container, current_doc)
                    if success:
                        st.success(message)
                        st.rerun()
                    else:
                        st.error(message)
                except Exception as e:
                    st.error(f"Delete err: {str(e)}")
    # Display delete log persistently
    if "delete_log" in st.session_state and st.session_state.delete_log:
        st.subheader("Delete Log")
        for log_entry in st.session_state.delete_log[-5:]:  # Show last 5 entries
            st.write(log_entry)


def _render_clone_view(container, documents, total_docs):
    """Offer a copy of the current document under a new ID and name."""
    st.markdown("#### 📄 Clone")
    if documents:
        doc = documents[st.session_state.current_index]
        st.markdown(f"Original ID: {doc.get('id', '')}")
        new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id')
        new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name')
        # Leave out Cosmos system fields so the copy is created as a new item.
        new_doc = {'id': new_id, 'name': new_name, **{k: v for k, v in doc.items() if k not in ['id', 'name', '_rid', '_self', '_etag', '_attachments', '_ts']}}
        doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2), height=300, key='clone_preview')
        col1, col2 = st.columns(2)
        with col1:
            if st.button("🔄 Regenerate"):
                new_id = generate_unique_id()
                st.session_state.new_clone_id = new_id
                st.rerun()
        with col2:
            if st.button("💾 Save Clone"):
                try:
                    final_doc = json.loads(doc_str)
                    for field in ['_rid', '_self', '_etag', '_attachments', '_ts']:
                        final_doc.pop(field, None)
                    container.create_item(body=final_doc)
                    st.success(f"Cloned {final_doc['id']}")
                    st.rerun()
                except Exception as e:
                    st.error(f"Clone err: {str(e)}")
        _render_prev_next(total_docs)


def _render_new_view(container):
    """Create a document, either auto-generated or from user-edited JSON."""
    st.markdown("#### ➕ New Doc")
    if st.button("🤖 Auto-Gen"):
        auto_doc = {
            "id": generate_unique_id(),
            "name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "content": "Auto-generated record.",
            "timestamp": datetime.now().isoformat()
        }
        success, message = insert_record(container, auto_doc)
        if success:
            st.success(message)
            st.rerun()
        else:
            st.error(message)
    else:
        new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
        default_doc = {
            "id": new_id,
            "name": "New Doc",
            "content": "",
            "timestamp": datetime.now().isoformat()
        }
        new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300)
        if st.button("➕ Create"):
            try:
                # Parse the text as typed. Running the whole document through
                # preprocess_text first escaped every quote and newline, so
                # json.loads could never succeed.
                new_doc = json.loads(new_doc_str)
                new_doc['id'] = new_id
                success, message = insert_record(container, new_doc)
                if success:
                    st.success(f"Created {new_doc['id']}")
                    st.rerun()
                else:
                    st.error(message)
            except Exception as e:
                st.error(f"Create err: {str(e)}")


def _render_github_ops():
    """Render the clone-to-zip and clone-and-push GitHub controls."""
    st.subheader("🐙 GitHub Ops")
    github_token = os.environ.get("GITHUB")
    source_repo = st.text_input("Source Repo URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
    new_repo_name = st.text_input("New Repo Name", value=f"Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")
    col_g1, col_g2 = st.columns(2)
    with col_g1:
        if st.button("📥 Clone Repo"):
            if github_token and source_repo:
                # Bound before the try so `finally` never hits an unbound name.
                local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                zip_filename = f"{new_repo_name}.zip"
                try:
                    download_github_repo(source_repo, local_path)
                    create_zip_file(local_path, zip_filename[:-4])
                    st.markdown(get_download_link(zip_filename), unsafe_allow_html=True)
                    st.success("Cloned! 🎉")
                except Exception as e:
                    st.error(f"Clone err: {str(e)}")
                finally:
                    if os.path.exists(local_path):
                        shutil.rmtree(local_path)
                    if os.path.exists(zip_filename):
                        os.remove(zip_filename)
            else:
                st.error("Missing token or URL 🔑❓")
    with col_g2:
        if st.button("📤 Push Repo"):
            if github_token and source_repo:
                # Bound before the try: creating the repo can fail first.
                local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                try:
                    g = Github(github_token)
                    new_repo = create_repo(g, new_repo_name)
                    download_github_repo(source_repo, local_path)
                    push_to_github(local_path, new_repo, github_token)
                    st.success(f"Pushed to {new_repo.html_url} 🚀")
                except Exception as e:
                    st.error(f"Push err: {str(e)}")
                finally:
                    if os.path.exists(local_path):
                        shutil.rmtree(local_path)
            else:
                st.error("Missing token or URL 🔑❓")


if __name__ == "__main__":
    main()