# app.py
# =============================================================================
# ───────────── IMPORTS ─────────────
# =============================================================================
import base64
import glob
import hashlib
import json
import os
import pandas as pd
import pytz
import random
import re
import shutil
import streamlit as st
import time
import traceback
import uuid
import zipfile
from PIL import Image
from azure.cosmos import CosmosClient, PartitionKey, exceptions
from datetime import datetime
from git import Repo
from github import Github
from gradio_client import Client, handle_file
import tempfile
import io
import requests
import numpy as np
from urllib.parse import quote

# =============================================================================
# ───────────── EXTERNAL HELP LINKS (Always visible in sidebar) ─────────────
# =============================================================================
external_links = [
    {"title": "CosmosDB GenAI Full Text Search", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search", "emoji": "💻"},
    {"title": "CosmosDB SQL API Client Library", "url": "https://learn.microsoft.com/en-us/python/api/overview/azure/cosmos-readme?view=azure-python", "emoji": "💻"},
    {"title": "CosmosDB Index and Query Vectors", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-python-vector-index-query", "emoji": "💻"},
    {"title": "CosmosDB NoSQL Materialized Views", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/materialized-views", "emoji": "💻"},
    {"title": "LangChain Vector Store Guide", "url": "https://python.langchain.com/docs/integrations/vectorstores/azure_cosmos_db_no_sql/", "emoji": "💻"},
    {"title": "Vector Database Prompt Engineering RAG for Python", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/vector-database?source=recommendations", "emoji": "💻"},
    {"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "💻"},
    {"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "📚"},
    {"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "💻"},
    {"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "📚"},
    {"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "🌐"},
]

# =============================================================================
# ───────────── APP CONFIGURATION ─────────────
# =============================================================================
Site_Name = '🐙 GitCosmos'
title = "🐙 GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = '🐙🌌💫'
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)

# Cosmos DB & App URLs
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'

# =============================================================================
# ───────────── HELPER FUNCTIONS ─────────────
# =============================================================================
def get_download_link(file_path, file_type="txt"):
    # Accepts an optional file_type so callers such as play_and_download_audio,
    # which pass file_type explicitly, get an appropriate MIME type.
    mime_map = {"mp3": "audio/mpeg", "wav": "audio/wav", "md": "text/markdown", "txt": "file/txt"}
    mime = mime_map.get(file_type, "application/octet-stream")
    with open(file_path, "rb") as file:
        contents = file.read()
    b64 = base64.b64encode(contents).decode()
    file_name = os.path.basename(file_path)
    return f'<a href="data:{mime};base64,{b64}" download="{file_name}">Download {file_name} 📂</a>'
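
# Example usage (hypothetical file name, for illustration only):
#   link_html = get_download_link("notes.md", file_type="md")
#   st.markdown(link_html, unsafe_allow_html=True)
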
def generate_unique_id():
    timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
    unique_uuid = str(uuid.uuid4())
    return_value = f"{timestamp}-{unique_uuid}"
    st.write('New ID: ' + return_value)
    return return_value

def generate_filename(prompt, file_type):
    central = pytz.timezone('US/Central')
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
    safe_prompt = re.sub(r'\W+', '', prompt)[:90]
    return f"{safe_date_time}{safe_prompt}.{file_type}"

def create_file(filename, prompt, response, should_save=True):
    if not should_save:
        return
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(prompt + "\n\n" + response)

def load_file(file_name):
    with open(file_name, "r", encoding='utf-8') as file:
        content = file.read()
    return content

def display_glossary_entity(k):
    search_urls = {
        "🚀": lambda k: f"/?q={k}",
        "📖": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}",
        "🔍": lambda k: f"https://www.google.com/search?q={quote(k)}",
        "🎥": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}",
    }
    links_md = ' '.join([f"<a href='{url(k)}' target='_blank'>{emoji}</a>" for emoji, url in search_urls.items()])
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)

def create_zip_of_files(files):
    zip_name = "all_files.zip"
    with zipfile.ZipFile(zip_name, 'w') as zipf:
        for file in files:
            zipf.write(file)
    return zip_name

def get_video_html(video_path, width="100%"):
    with open(video_path, 'rb') as f:
        video_url = f"data:video/mp4;base64,{base64.b64encode(f.read()).decode()}"
    return f'''
    <video width="{width}" controls autoplay loop>
        <source src="{video_url}" type="video/mp4">
        Your browser does not support video.
    </video>
    '''

def get_audio_html(audio_path, width="100%"):
    with open(audio_path, 'rb') as f:
        audio_url = f"data:audio/mpeg;base64,{base64.b64encode(f.read()).decode()}"
    return f'''
    <audio controls style="width:{width}">
        <source src="{audio_url}" type="audio/mpeg">
        Your browser does not support audio.
    </audio>
    '''

def preprocess_text(text):
    text = text.replace('\r\n', '\\n').replace('\r', '\\n').replace('\n', '\\n')
    text = text.replace('"', '\\"')
    text = re.sub(r'[\t]', ' ', text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    return text.strip()
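
# Example (illustrative input, not from real data): raw text with newlines and
# quotes is flattened so it can be embedded safely inside a JSON string value:
#   preprocess_text('line one\nline "two"')  ->  'line one\\nline \\"two\\"'
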
# =============================================================================
# ───────────── COSMOS DB FUNCTIONS ─────────────
# =============================================================================
def get_databases(client):
    return [db['id'] for db in client.list_databases()]

def get_containers(database):
    return [container['id'] for container in database.list_containers()]

def get_documents(container, limit=None):
    # max_item_count only controls page size, so the result list is sliced
    # explicitly to actually honor the requested limit.
    query = "SELECT * FROM c ORDER BY c._ts DESC"
    items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
    return items[:limit] if limit else items
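
# Example (assumes `container` is an initialized ContainerProxy):
#   latest_ten = get_documents(container, limit=10)
#   for doc in latest_ten:
#       st.write(doc["id"])
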
def insert_record(container, record):
    try:
        container.create_item(body=record)
        return True, "Inserted! 🎉"
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} 🚨"
    except Exception as e:
        return False, f"Error: {str(e)} 😱"

def update_record(container, updated_record):
    try:
        container.upsert_item(body=updated_record)
        return True, f"Updated {updated_record['id']} 🛠️"
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} 🚨"
    except Exception as e:
        return False, f"Error: {traceback.format_exc()} 😱"

def delete_record(container, record):
    try:
        if "id" not in record:
            return False, "Record must contain an 'id' field. 🛑"
        doc_id = record["id"]
        if "delete_log" not in st.session_state:
            st.session_state.delete_log = []
        st.session_state.delete_log.append(f"Attempting to delete document: {json.dumps(record, indent=2)}")
        partition_key_value = record.get("pk", doc_id)
        st.session_state.delete_log.append(f"Using ID and Partition Key: {partition_key_value}")
        container.delete_item(item=doc_id, partition_key=partition_key_value)
        success_msg = f"Record {doc_id} successfully deleted from Cosmos DB. 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg
    except exceptions.CosmosResourceNotFoundError:
        success_msg = f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg
    except exceptions.CosmosHttpResponseError as e:
        error_msg = f"HTTP error deleting {doc_id}: {str(e)}. 🚨"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg
    except Exception as e:
        error_msg = f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. 😱"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg
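
# delete_item requires the document's partition key value; this app writes
# documents with pk == id, so record.get("pk", doc_id) falls back to the id
# when pk is absent. Example (hypothetical record, for illustration):
#   ok, msg = delete_record(container, {"id": "abc123", "pk": "abc123"})
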
def save_to_cosmos_db(container, query, response1, response2):
    try:
        if container:
            timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
            unique_uuid = str(uuid.uuid4())
            new_id = f"{timestamp}-{unique_uuid}"
            record = {
                "id": new_id,
                "pk": new_id,
                "name": new_id,
                "query": query,
                "response1": response1,
                "response2": response2,
                "timestamp": datetime.utcnow().isoformat(),
                "type": "ai_response",
                "version": "1.0"
            }
            container.create_item(body=record)
            st.success(f"Saved: {record['id']}")
            st.session_state.documents = get_documents(container)
        else:
            st.error("Cosmos container not initialized.")
    except Exception as e:
        st.error(f"Save error: {str(e)}")

def archive_current_container(database_name, container_name, client):
    try:
        base_dir = "./cosmos_archive_current_container"
        if os.path.exists(base_dir):
            shutil.rmtree(base_dir)
        os.makedirs(base_dir)
        db_client = client.get_database_client(database_name)
        container_client = db_client.get_container_client(container_name)
        items = list(container_client.read_all_items())
        container_dir = os.path.join(base_dir, container_name)
        os.makedirs(container_dir)
        for item in items:
            item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
            with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
                json.dump(item, f, indent=2)
        archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        shutil.make_archive(archive_name, 'zip', base_dir)
        return get_download_link(f"{archive_name}.zip")
    except Exception as e:
        return f"Archive error: {str(e)} 😢"
# =============================================================================
# ───────────── ADVANCED COSMOS FUNCTIONS ─────────────
# =============================================================================
def create_new_container(database, container_id, partition_key_path,
                         analytical_storage_ttl=None, indexing_policy=None, vector_embedding_policy=None):
    try:
        if analytical_storage_ttl is not None:
            container = database.create_container(
                id=container_id,
                partition_key=PartitionKey(path=partition_key_path),
                analytical_storage_ttl=analytical_storage_ttl,
                indexing_policy=indexing_policy,
                vector_embedding_policy=vector_embedding_policy
            )
        else:
            container = database.create_container(
                id=container_id,
                partition_key=PartitionKey(path=partition_key_path),
                indexing_policy=indexing_policy,
                vector_embedding_policy=vector_embedding_policy
            )
    except exceptions.CosmosResourceExistsError:
        # The container already exists, so reuse it. Caught before the generic
        # handler since CosmosResourceExistsError subclasses CosmosHttpResponseError.
        container = database.get_container_client(container_id)
    except exceptions.CosmosHttpResponseError as e:
        if analytical_storage_ttl is not None and "analyticalStorageTtl" in str(e):
            # Some accounts reject analytical storage; retry without it.
            try:
                container = database.create_container(
                    id=container_id,
                    partition_key=PartitionKey(path=partition_key_path),
                    indexing_policy=indexing_policy,
                    vector_embedding_policy=vector_embedding_policy
                )
            except Exception as e2:
                st.error(f"Error creating container without analytical_storage_ttl: {str(e2)}")
                return None
        else:
            st.error(f"Error creating container: {str(e)}")
            return None
    return container
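
# Example vector_embedding_policy (a sketch following the shape documented in
# the CosmosDB vector index guide linked in the sidebar; the path and
# dimensions below are illustrative assumptions, not used elsewhere in this app):
#   policy = {
#       "vectorEmbeddings": [
#           {"path": "/embedding", "dataType": "float32",
#            "distanceFunction": "cosine", "dimensions": 1536}
#       ]
#   }
#   create_new_container(database, "vector-container", "/pk",
#                        vector_embedding_policy=policy)
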
def advanced_insert_item(container, item):
    try:
        container.upsert_item(item)
        return True, f"Item {item.get('id', '')} inserted. ➕"
    except Exception as e:
        return False, str(e)

def advanced_update_item(container, item):
    try:
        container.upsert_item(item)
        return True, f"Item {item.get('id', '')} updated. ✏️"
    except Exception as e:
        return False, str(e)

def advanced_delete_item(container, item_id, partition_key_value):
    try:
        container.delete_item(item=item_id, partition_key=partition_key_value)
        return True, f"Item {item_id} deleted. 🗑️"
    except Exception as e:
        return False, str(e)

def vector_search(container, query_vector, vector_field, top=10, exact_search=False):
    query_vector_str = json.dumps(query_vector)
    query = f"""SELECT TOP {top} c.id, VectorDistance(c.{vector_field}, {query_vector_str}, {str(exact_search).lower()},
    {{'dataType':'float32','distanceFunction':'cosine'}}) AS SimilarityScore
    FROM c ORDER BY SimilarityScore"""
    results = list(container.query_items(query=query, enable_cross_partition_query=True))
    return results
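
# Example (assumes documents carry a float32 vector under c.embedding; the
# three-element query vector below is a placeholder, not real data):
#   hits = vector_search(container, [0.1, 0.2, 0.3], "embedding", top=5)
#   for hit in hits:
#       st.write(hit["id"], hit["SimilarityScore"])
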
# =============================================================================
# ───────────── GITHUB FUNCTIONS ─────────────
# =============================================================================
def download_github_repo(url, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)

def create_zip_file(source_dir, output_filename):
    shutil.make_archive(output_filename, 'zip', source_dir)

def create_repo(g, repo_name):
    user = g.get_user()
    return user.create_repo(repo_name)

def push_to_github(local_path, repo, github_token):
    repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)
    if 'origin' in [remote.name for remote in local_repo.remotes]:
        origin = local_repo.remote('origin')
        origin.set_url(repo_url)
    else:
        origin = local_repo.create_remote('origin', repo_url)
    if not local_repo.heads:
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'
    else:
        current_branch = local_repo.active_branch.name
    local_repo.git.add(A=True)
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')
    origin.push(refspec=f'{current_branch}:{current_branch}')
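
# Example end-to-end flow (a sketch: the repo name and GITHUB_TOKEN lookup are
# assumptions for illustration, not configured elsewhere in this app):
#   token = os.environ.get("GITHUB_TOKEN")
#   g = Github(token)
#   repo = create_repo(g, "my-cloned-space")
#   download_github_repo("https://github.com/user/source-repo", "./local_copy")
#   push_to_github("./local_copy", repo, token)
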
# =============================================================================
# ───────────── FILE & MEDIA MANAGEMENT FUNCTIONS ─────────────
# =============================================================================
def display_saved_files_in_sidebar():
    all_files = sorted([f for f in glob.glob("*.md") if not f.lower().startswith('readme')], reverse=True)
    st.sidebar.markdown("## 📁 Files")
    for file in all_files:
        col1, col2, col3 = st.sidebar.columns([6, 2, 1])
        with col1:
            st.markdown(f"📄 {file}")
        with col2:
            st.sidebar.download_button(
                label="⬇️",
                data=open(file, 'rb').read(),
                file_name=file
            )
        with col3:
            if st.sidebar.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                st.rerun()

def display_file_viewer(file_path):
    content = load_file(file_path)
    if content:
        st.markdown("### 📄 File Viewer")
        st.markdown(f"**{file_path}**")
        file_stats = os.stat(file_path)
        st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes")
        st.markdown("---")
        st.markdown(content)
        st.download_button("⬇️", data=content, file_name=os.path.basename(file_path), mime="text/markdown")
def display_file_editor(file_path):
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    if file_path not in st.session_state.file_content:
        content = load_file(file_path)
        if content is not None:
            st.session_state.file_content[file_path] = content
        else:
            return
    st.markdown("### ✏️ Edit File")
    st.markdown(f"**Editing:** {file_path}")
    md_tab, code_tab = st.tabs(["Markdown", "Code"])
    with md_tab:
        st.markdown(st.session_state.file_content[file_path])
    with code_tab:
        # auto_save_edit() reads st.session_state.doc_editor and parses it as
        # JSON, so it is not wired up here: this widget has its own key and
        # holds markdown, not a JSON document.
        new_content = st.text_area("Edit:", value=st.session_state.file_content[file_path], height=400, key=f"editor_{hash(file_path)}")
        col1, col2 = st.columns([1, 5])
        with col1:
            if st.button("💾 Save"):
                if save_file_content(file_path, new_content):
                    st.session_state.file_content[file_path] = new_content
                    st.success("Saved! 🎉")
                    time.sleep(1)
                    st.rerun()
        with col2:
            st.download_button("⬇️", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown")
def save_file_content(file_path, content):
    try:
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(content)
        return True
    except Exception as e:
        st.error(f"Save error: {str(e)}")
        return False
def update_file_management_section():
    if 'file_view_mode' not in st.session_state:
        st.session_state.file_view_mode = None
    if 'current_file' not in st.session_state:
        st.session_state.current_file = None
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    all_files = sorted(glob.glob("*.md"), reverse=True)
    st.sidebar.title("📁 Files")
    if st.sidebar.button("🗑 Delete All"):
        for file in all_files:
            os.remove(file)
        st.session_state.file_content = {}
        st.session_state.current_file = None
        st.session_state.file_view_mode = None
        st.rerun()
    if st.sidebar.button("⬇️ Download All"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)
    for file in all_files:
        col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
        with col1:
            if st.button("🌐", key=f"view_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'view'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col2:
            st.markdown(get_download_link(file), unsafe_allow_html=True)
        with col3:
            if st.button("📂", key=f"edit_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'edit'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col4:
            if st.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                if file in st.session_state.file_content:
                    del st.session_state.file_content[file]
                if st.session_state.current_file == file:
                    st.session_state.current_file = None
                    st.session_state.file_view_mode = None
                st.rerun()
    st.sidebar.markdown("---")
    st.sidebar.title("External Help Links")
    for link in external_links:
        st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True)
    if st.session_state.current_file:
        if st.session_state.file_view_mode == 'view':
            display_file_viewer(st.session_state.current_file)
        elif st.session_state.file_view_mode == 'edit':
            display_file_editor(st.session_state.current_file)
# =============================================================================
# ───────────── SIDEBAR DATA GRID (Records with formatted timestamps) ─────────────
# =============================================================================
def show_sidebar_data_grid():
    if st.session_state.get("current_container"):
        try:
            records = get_documents(st.session_state.current_container)
            data = []
            for rec in records:
                ts = rec.get("timestamp", "")
                try:
                    dt = datetime.fromisoformat(ts)
                    formatted = dt.strftime("%I:%M %p %m/%d/%Y")
                except Exception:
                    formatted = ts
                data.append({
                    "ID": rec.get("id", ""),
                    "Name": rec.get("name", ""),
                    "Timestamp": formatted
                })
            df = pd.DataFrame(data)
            st.sidebar.markdown("### 📊 Data Grid")
            st.sidebar.dataframe(df)
        except Exception as e:
            st.sidebar.error(f"Data grid error: {str(e)}")
    else:
        st.sidebar.info("No container selected for data grid.")
# =============================================================================
# ───────────── VIDEO & AUDIO UI FUNCTIONS ─────────────
# =============================================================================
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
    try:
        st.write("Preprocessing image...")
        if isinstance(file_data, bytes):
            img = Image.open(io.BytesIO(file_data))
        elif hasattr(file_data, 'read'):
            if hasattr(file_data, 'seek'):
                file_data.seek(0)
            img = Image.open(file_data)
        elif isinstance(file_data, Image.Image):
            img = file_data
        else:
            raise ValueError(f"Unsupported input: {type(file_data)}")
        if img.mode != 'RGB':
            img = img.convert('RGB')
        aspect_ratio = img.size[0] / img.size[1]
        if aspect_ratio > target_size[0] / target_size[1]:
            new_width = target_size[0]
            new_height = int(new_width / aspect_ratio)
        else:
            new_height = target_size[1]
            new_width = int(new_height * aspect_ratio)
        new_width = (new_width // 2) * 2
        new_height = (new_height // 2) * 2
        resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
        final_img = Image.new('RGB', target_size, (255, 255, 255))
        paste_x = (target_size[0] - new_width) // 2
        paste_y = (target_size[1] - new_height) // 2
        final_img.paste(resized_img, (paste_x, paste_y))
        return final_img
    except Exception as e:
        st.error(f"Image error: {str(e)}")
        return None
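
# Note: the resize above is a letterbox fit. The image is scaled to preserve
# aspect ratio (dimensions rounded down to even numbers, as video encoders
# commonly require), then centered on a white 576x1024 canvas.
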
def add_video_generation_ui(container):
    st.markdown("### 🎥 Video Gen")
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_file = st.file_uploader("Upload Image 🖼️", type=['png', 'jpg', 'jpeg'])
    with col2:
        st.markdown("#### Params")
        motion = st.slider("🌊 Motion", 1, 255, 127)
        fps = st.slider("🎬 FPS", 1, 30, 6)
        with st.expander("Advanced"):
            use_custom = st.checkbox("Custom Seed")
            seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
    if uploaded_file is not None:
        try:
            file_data = uploaded_file.read()
            preview1, preview2 = st.columns(2)
            with preview1:
                st.write("Original")
                st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
            with preview2:
                proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
                if proc_img:
                    st.write("Processed")
                    st.image(proc_img, use_column_width=True)
                else:
                    st.error("Preprocess failed")
                    return
            if st.button("🎥 Generate"):
                with st.spinner("Generating video..."):
                    with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
                        proc_img.save(temp_file.name, format='PNG')
                    try:
                        client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
                        result = client.predict(
                            image=temp_file.name,
                            seed=seed if seed is not None else int(time.time() * 1000),
                            randomize_seed=seed is None,
                            motion_bucket_id=motion,
                            fps_id=fps,
                            api_name="/video"
                        )
                        if result and isinstance(result, tuple) and len(result) >= 1:
                            video_path = result[0].get('video') if isinstance(result[0], dict) else None
                            if video_path and os.path.exists(video_path):
                                video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                                shutil.copy(video_path, video_filename)
                                st.success("Video generated! 🎉")
                                st.video(video_filename)
                                if container:
                                    video_record = {
                                        "id": generate_unique_id(),
                                        "pk": generate_unique_id(),
                                        "type": "generated_video",
                                        "filename": video_filename,
                                        "seed": seed if seed is not None else "random",
                                        "motion": motion,
                                        "fps": fps,
                                        "timestamp": datetime.now().isoformat()
                                    }
                                    success, message = insert_record(container, video_record)
                                    if success:
                                        st.success("DB record saved!")
                                    else:
                                        st.error(f"DB error: {message}")
                            else:
                                st.error("Invalid result format")
                        else:
                            st.error("No result returned")
                    except Exception as e:
                        st.error(f"Video gen error: {str(e)}")
                    finally:
                        try:
                            os.unlink(temp_file.name)
                            st.write("Temp file removed")
                        except Exception as e:
                            st.warning(f"Cleanup error: {str(e)}")
        except Exception as e:
            st.error(f"Upload error: {str(e)}")
# =============================================================================
# ───────────── AI SAMPLES SIDEBAR (Processed as a Python List) ─────────────
# =============================================================================
def display_ai_samples():
    ai_samples = [
        {
            "name": "FullTextContains",
            "description": "Query using FullTextContains",
            "query": 'SELECT TOP 10 * FROM c WHERE FullTextContains(c.text, "bicycle")'
        },
        {
            "name": "FullTextContainsAll",
            "description": "Query using FullTextContainsAll",
            "query": 'SELECT TOP 10 * FROM c WHERE FullTextContainsAll(c.text, "red", "bicycle")'
        },
        {
            "name": "FullTextContainsAny",
            "description": "Query using FullTextContainsAny",
            "query": 'SELECT TOP 10 * FROM c WHERE FullTextContains(c.text, "red") AND FullTextContainsAny(c.text, "bicycle", "skateboard")'
        },
        {
            "name": "FullTextScore",
            "description": "Query using FullTextScore (order by relevance)",
            "query": 'SELECT TOP 10 * FROM c ORDER BY RANK FullTextScore(c.text, ["bicycle", "mountain"])'
        },
        {
            "name": "Vector Search with Score",
            "description": "Example vector search snippet",
            "query": 'results = vector_search.similarity_search_with_score(query="Your query", k=5)\nfor result, score in results:\n    print(result.json(), score)'
        },
        {
            "name": "Vector Search with Filtering",
            "description": "Example vector search with a filter",
            "query": 'pre_filter = {"conditions": [{"property": "metadata.page", "operator": "$eq", "value": 0}]}\nresults = vector_search.similarity_search_with_score(query="Your query", k=5, pre_filter=pre_filter)'
        },
        {
            "name": "Hybrid Search",
            "description": "Example hybrid search snippet",
            "query": 'results = vector_search.similarity_search_with_score(query="Your query", k=5, query_type=CosmosDBQueryType.HYBRID)'
        }
    ]
st.sidebar.markdown("### 🤖 AI Samples") | |
st.sidebar.info("🚀 Get started with our AI samples! Time free access to get started today.") | |
sample_names = [sample["name"] for sample in ai_samples] | |
selected_sample_name = st.sidebar.selectbox("Select an AI Sample", sample_names) | |
selected_sample = next((s for s in ai_samples if s["name"] == selected_sample_name), None) | |
if selected_sample: | |
st.sidebar.markdown(f"**{selected_sample['name']}**: {selected_sample['description']}") | |
lang = "sql" if "FullText" in selected_sample["name"] else "python" | |
st.sidebar.code(selected_sample["query"], language=lang) | |
# =============================================================================
# ───────────── NEW ITEM & FIELD FUNCTIONS ─────────────
# =============================================================================
def new_item_default(container):
    new_id = generate_unique_id()
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "New Sample Document",
        "content": "Start editing your document here...",
        "timestamp": datetime.now().isoformat(),
        "type": "sample"
    }
    success, message = insert_record(container, default_doc)
    if success:
        st.success("New sample document created! ✨")
        return default_doc
    else:
        st.error("Error creating new item: " + message)
        return None

def auto_save_edit():
    try:
        edited_str = st.session_state.doc_editor
        edited_doc = json.loads(edited_str)
        container = st.session_state.current_container
        container.upsert_item(edited_doc)
        st.success("Auto-saved! 💾")
    except Exception as e:
        st.error(f"Auto-save error: {str(e)}")

def add_field_to_doc():
    key = st.session_state.new_field_key
    value = st.session_state.new_field_value
    try:
        doc = json.loads(st.session_state.doc_editor)
        doc[key] = value
        st.session_state.doc_editor = json.dumps(doc, indent=2)
        auto_save_edit()
        st.success(f"Added field {key} 👍")
    except Exception as e:
        st.error(f"Error adding field: {str(e)}")
# =============================================================================
# ───────────── VECTOR SEARCH INTERFACE (Simple keyword search) ─────────────
# =============================================================================
def vector_keyword_search(keyword, container):
    try:
        # A parameterized query avoids breaking (or injecting into) the SQL
        # string when the keyword contains quotes.
        query = "SELECT * FROM c WHERE CONTAINS(c.content, @keyword)"
        results = list(container.query_items(
            query=query,
            parameters=[{"name": "@keyword", "value": keyword}],
            enable_cross_partition_query=True
        ))
        return results
    except Exception as e:
        st.error(f"Vector search error: {str(e)}")
        return []
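
# Example (assumes the current container's documents carry a c.content field,
# as the sample documents created by this app do):
#   matches = vector_keyword_search("bicycle", st.session_state.current_container)
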
# =============================================================================
# ───────────── NEW AI MODALITY RECORD TEMPLATES ─────────────
# =============================================================================
def new_ai_record(container):
    new_id = generate_unique_id()
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "AI Modality Record",
        "function_url": "https://example.com/function",
        "input_text": "### Input (markdown)\n\nType your input here.",
        "output_text": "### Output (markdown)\n\nResult will appear here.",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality"
    }
    success, message = insert_record(container, default_doc)
    if success:
        st.success("New AI modality record created! 💡")
        return default_doc
    else:
        st.error("Error creating AI record: " + message)
        return None

def new_links_record(container):
    new_id = generate_unique_id()
    links_md = "\n".join([f"- {link['emoji']} [{link['title']}]({link['url']})" for link in external_links])
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "Portal Links Record",
        "function_url": "",
        "input_text": links_md,
        "output_text": "",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality"
    }
    success, message = insert_record(container, default_doc)
    if success:
        st.success("New Portal Links record created! 🔗")
        return default_doc
    else:
        st.error("Error creating links record: " + message)
        return None

# =============================================================================
# ───────────── LANGCHAIN FUNCTIONS (Witty emoji comments) ─────────────
# =============================================================================
def display_langchain_functions():
    functions = [
        {"name": "OpenAIEmbeddings", "comment": "🔮 Creates embeddings using OpenAI – pure magic!"},
        {"name": "AzureCosmosDBNoSqlVectorSearch", "comment": "🚀 Performs vector search on Cosmos DB – superfast and smart!"},
        {"name": "RecursiveCharacterTextSplitter", "comment": "✂️ Slices text into manageable chunks – like a pro chef!"}
    ]
    st.sidebar.markdown("### 🤖 Langchain Functions")
    for func in functions:
        st.sidebar.write(f"{func['name']}: {func['comment']}")

# =============================================================================
# ───────────── OPTIONAL: SIDEBAR DATA GRID (Records with formatted timestamps) ─────────────
# =============================================================================
# (This feature is now integrated above via show_sidebar_data_grid().)
# =============================================================================
# ───────────── ASYNC TTS & ARXIV FUNCTIONS (Optional Features) ─────────────
# =============================================================================
import asyncio
import edge_tts
from streamlit_marquee import streamlit_marquee
from collections import Counter

class PerformanceTimer:
    def __init__(self, operation_name: str):
        self.operation_name = operation_name
        self.start_time = None
    def __enter__(self):
        self.start_time = time.time()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

async def async_edge_tts_generate(text: str, voice: str, rate: int = 0, pitch: int = 0, file_format: str = "mp3"):
    with PerformanceTimer("tts_generation") as timer:
        text = text.replace("\n", " ").strip()
        if not text:
            return None, 0
        cache_key = f"{text[:100]}_{voice}_{rate}_{pitch}_{file_format}"
        if cache_key in st.session_state.get('audio_cache', {}):
            return st.session_state['audio_cache'][cache_key], 0
        try:
            rate_str = f"{rate:+d}%"
            pitch_str = f"{pitch:+d}Hz"
            communicate = edge_tts.Communicate(text, voice, rate=rate_str, pitch=pitch_str)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"audio_{timestamp}_{random.randint(1000, 9999)}.{file_format}"
            await communicate.save(filename)
            st.session_state.setdefault('audio_cache', {})[cache_key] = filename
            return filename, time.time() - timer.start_time
        except Exception as e:
            st.error(f"Error generating audio: {str(e)}")
            return None, 0
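
# Example (synchronous call via asyncio.run; "en-US-AriaNeural" is one of the
# standard edge-tts voice names used as the default throughout this app):
#   filename, elapsed = asyncio.run(async_edge_tts_generate("Hello!", "en-US-AriaNeural"))
#   if filename:
#       st.audio(filename)
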
def speak_with_edge_tts(text, voice="en-US-AriaNeural", rate=0, pitch=0, file_format="mp3"):
    result = asyncio.run(async_edge_tts_generate(text, voice, rate, pitch, file_format))
    if isinstance(result, tuple):
        return result[0]
    return result

async def async_save_qa_with_audio(question: str, answer: str):
    with PerformanceTimer("qa_save") as timer:
        # create_file expects (filename, prompt, response), so build the
        # filename first rather than passing the file type to create_file.
        md_file = generate_filename(question, "md")
        create_file(md_file, question, answer)
        audio_file = None
        if st.session_state.get('enable_audio', True):
            audio_text = f"{question}\n\nAnswer: {answer}"
            audio_file, _ = await async_edge_tts_generate(audio_text, voice=st.session_state.get('tts_voice', "en-US-AriaNeural"), file_format=st.session_state.get('audio_format', "mp3"))
        return md_file, audio_file, time.time() - timer.start_time, 0

def save_qa_with_audio(question, answer, voice=None):
    if not voice:
        voice = st.session_state.get('tts_voice', "en-US-AriaNeural")
    md_file = generate_filename(question, "md")
    create_file(md_file, question, answer)
    audio_text = f"{question}\n\nAnswer: {answer}"
    audio_file = speak_with_edge_tts(audio_text, voice=voice, file_format=st.session_state.get('audio_format', "mp3"))
    return md_file, audio_file
def play_and_download_audio(file_path, file_type="mp3"):
    if file_path and os.path.exists(file_path):
        st.audio(file_path)
        dl_link = get_download_link(file_path, file_type=file_type)
        st.markdown(dl_link, unsafe_allow_html=True)

def create_download_link_with_cache(file_path: str, file_type: str = "mp3") -> str:
    cache_key = f"dl_{file_path}"
    if cache_key in st.session_state.get('download_link_cache', {}):
        return st.session_state['download_link_cache'][cache_key]
    try:
        with open(file_path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode()
        filename = os.path.basename(file_path)
        if file_type == "mp3":
            link = f'<a href="data:audio/mpeg;base64,{b64}" download="{filename}">🎵 Download {filename}</a>'
        elif file_type == "wav":
            link = f'<a href="data:audio/wav;base64,{b64}" download="{filename}">🔊 Download {filename}</a>'
        elif file_type == "md":
            link = f'<a href="data:text/markdown;base64,{b64}" download="{filename}">📝 Download {filename}</a>'
        else:
            link = f'<a href="data:application/octet-stream;base64,{b64}" download="{filename}">Download {filename}</a>'
        st.session_state.setdefault('download_link_cache', {})[cache_key] = link
        return link
    except Exception as e:
        st.error(f"Error creating download link: {str(e)}")
        return ""
# =============================================================================
# ───────────── RESEARCH / ARXIV FUNCTIONS (Optional Features) ─────────────
# =============================================================================
def parse_arxiv_refs(ref_text: str):
    if not ref_text:
        return []
    results = []
    current_paper = {}
    lines = ref_text.split('\n')
    for i, line in enumerate(lines):
        if line.count('|') == 2:
            if current_paper:
                results.append(current_paper)
                if len(results) >= 20:
                    break
            try:
                header_parts = line.strip('* ').split('|')
                date = header_parts[0].strip()
                title = header_parts[1].strip()
                url_match = re.search(r'(https://arxiv.org/\S+)', line)
                url = url_match.group(1) if url_match else f"paper_{len(results)}"
                current_paper = {
                    'date': date,
                    'title': title,
                    'url': url,
                    'authors': '',
                    'summary': '',
                    'full_audio': None,
                    'download_base64': '',
                }
            except Exception as e:
                st.warning(f"Error parsing paper header: {str(e)}")
                current_paper = {}
                continue
        elif current_paper:
            if not current_paper['authors']:
                current_paper['authors'] = line.strip('* ')
            else:
                if current_paper['summary']:
                    current_paper['summary'] += ' ' + line.strip()
                else:
                    current_paper['summary'] = line.strip()
    if current_paper:
        results.append(current_paper)
    return results[:20]
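
# parse_arxiv_refs expects markdown with one header line per paper containing
# exactly two '|' separators (date | title | url), e.g. (illustrative):
#   * 2024-01-15 | Paper Title | https://arxiv.org/abs/2401.00000
# followed by an authors line and then one or more summary lines.
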
def create_paper_links_md(papers):
    lines = ["# Paper Links\n"]
    for i, p in enumerate(papers, start=1):
        lines.append(f"{i}. **{p['title']}** — [Arxiv Link]({p['url']})")
    return "\n".join(lines)

def generate_pdf_link(url: str) -> str:
    # Only rewrite the /abs/ path segment so other occurrences of "abs" in
    # the URL are left untouched.
    if "/abs/" in url:
        pdf_url = url.replace("/abs/", "/pdf/")
        if not pdf_url.endswith(".pdf"):
            pdf_url += ".pdf"
        return pdf_url
    return url
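
# Example (illustrative arXiv id):
#   generate_pdf_link("https://arxiv.org/abs/1706.03762")
#   -> "https://arxiv.org/pdf/1706.03762.pdf"
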
def generate_5min_feature_markdown(paper: dict) -> str:
    title = paper.get('title', '')
    summary = paper.get('summary', '')
    authors = paper.get('authors', '')
    date = paper.get('date', '')
    url = paper.get('url', '')
    pdf_link = generate_pdf_link(url)
    title_wc = len(title.split())
    summary_wc = len(summary.split())
    high_info_terms = [term for term in summary.split()[:5]]
    terms_str = ", ".join(high_info_terms)
    rouge_score = round((len(high_info_terms) / max(len(summary.split()), 1)) * 100, 2)
    mermaid_code = "```mermaid\nflowchart TD\n"
    for i in range(len(high_info_terms) - 1):
        mermaid_code += f'    T{i+1}["{high_info_terms[i]}"] --> T{i+2}["{high_info_terms[i+1]}"]\n'
    mermaid_code += "```"
    md = f"""
## {title}
**Authors:** {authors}
**Date:** {date}
**Word Count (Title):** {title_wc} | **Word Count (Summary):** {summary_wc}
**Links:** [Abstract]({url}) | [PDF]({pdf_link})
**High Info Terms:** {terms_str}
**ROUGE Score:** {rouge_score}%
### Mermaid Graph of Key Concepts
{mermaid_code}
---
"""
    return md

def create_detailed_paper_md(papers: list) -> str:
    md_parts = ["# Detailed Research Paper Summary\n"]
    for idx, paper in enumerate(papers, start=1):
        md_parts.append(generate_5min_feature_markdown(paper))
    return "\n".join(md_parts)
# =============================================================================
# ───────────── MAIN AI LOOKUP FUNCTION (Optional Features) ─────────────
# =============================================================================
def perform_ai_lookup(q, vocal_summary=True, extended_refs=False, titles_summary=True, full_audio=False, useArxiv=True, useArxivAudio=False):
    start = time.time()
    ai_constitution = """
    You are a medical and machine learning review board expert...
    """
    # 1) Claude API call. Keep the Anthropic client under its own name so it
    # is not shadowed by the Gradio client created below.
    import anthropic
    anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY_3"))
    user_input = q
    response = anthropic_client.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=1000,
        messages=[{"role": "user", "content": user_input}]
    )
    st.write("Claude's reply 🧠:")
    st.markdown(response.content[0].text)
    result = response.content[0].text
    create_file(generate_filename(q, "md"), q, result)
    md_file, audio_file = save_qa_with_audio(q, result)
    st.subheader("📝 Main Response Audio")
    play_and_download_audio(audio_file, st.session_state.get('audio_format', "mp3"))
    if useArxiv:
        q = q + result
        st.write('Running Arxiv RAG with Claude inputs.')
        gradio_client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
        refs = gradio_client.predict(q, 10, "Semantic Search", "mistralai/Mixtral-8x7B-Instruct-v0.1", api_name="/update_with_rag_md")[0]
        result = f"🔎 {q}\n\n{refs}"
        md_file, audio_file = save_qa_with_audio(q, result)
        st.subheader("📝 Main Response Audio")
        play_and_download_audio(audio_file, st.session_state.get('audio_format', "mp3"))
        papers = parse_arxiv_refs(refs)
        if papers:
            paper_links = create_paper_links_md(papers)
            links_file = generate_filename(q, "md")
            create_file(links_file, q, paper_links)
            st.markdown(paper_links)
            detailed_md = create_detailed_paper_md(papers)
            detailed_file = generate_filename(q, "md")
            create_file(detailed_file, q, detailed_md)
            st.markdown(detailed_md)
            if useArxivAudio:
                asyncio.run(async_edge_tts_generate("Sample text", st.session_state.get('tts_voice', "en-US-AriaNeural")))
            st.write("Displaying Papers:")
            # (Optional: call functions to display papers)
        else:
            st.warning("No papers found.")
        response2 = anthropic_client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=1000,
            messages=[{"role": "user", "content": q + '\n\nUse the reference papers below to answer the question by creating a python streamlit app.py and requirements.txt with working code.'}]
        )
        r2 = response2.content[0].text
        st.write("Claude's reply 🧠:")
        st.markdown(r2)
    elapsed = time.time() - start
    st.write(f"**Total Elapsed:** {elapsed:.2f} s")
    return result
# =============================================================================
# ───────────── MAIN FUNCTION ─────────────
# =============================================================================
def main():
    st.markdown("### 🐙 GitCosmos - Cosmos & Git Hub")
    st.markdown(f"[🔗 Portal]({CosmosDBUrl})")
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    st.session_state.setdefault("current_container", None)
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    else:
        st.error("Missing Cosmos Key 🔑❌")
        return
    st.sidebar.markdown("## 🛠️ Item Management")
    if st.sidebar.button("New Item"):
        if st.session_state.get("current_container"):
            new_doc = new_item_default(st.session_state.current_container)
            if new_doc:
                st.session_state.doc_editor = json.dumps(new_doc, indent=2)
        else:
            st.warning("No container selected!")
    st.sidebar.text_input("New Field Key", key="new_field_key")
    st.sidebar.text_input("New Field Value", key="new_field_value")
    if st.sidebar.button("Add Field"):
        if "doc_editor" in st.session_state:
            add_field_to_doc()
        else:
            st.warning("No document loaded to add a field.")
    if st.sidebar.button("New AI Record"):
        if st.session_state.get("current_container"):
            new_ai_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    if st.sidebar.button("New Links Record"):
        if st.session_state.get("current_container"):
            new_links_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    st.sidebar.markdown("## 🔍 Vector Search")
    search_keyword = st.sidebar.text_input("Search Keyword", key="vector_search_keyword")
    if st.sidebar.button("Search"):
        if st.session_state.get("current_container"):
            results = vector_keyword_search(search_keyword, st.session_state.current_container)
            st.sidebar.write(f"Found {len(results)} results:")
            for res in results:
                st.sidebar.code(json.dumps(res, indent=2), language="json")
        else:
            st.warning("No container selected for search!")
    show_sidebar_data_grid()
    display_langchain_functions()
    try:
        if st.session_state.get("client") is None:
            st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        st.sidebar.title("🐙 Navigator")
        databases = get_databases(st.session_state.client)
        selected_db = st.sidebar.selectbox("🗃️ DB", databases)
        st.markdown(CosmosDBUrl)
        if selected_db != st.session_state.get("selected_database"):
            st.session_state.selected_database = selected_db
            st.session_state.selected_container = None
            st.session_state.selected_document_id = None
            st.session_state.current_index = 0
            st.rerun()
        if st.session_state.selected_database:
            database = st.session_state.client.get_database_client(st.session_state.selected_database)
            if "show_new_container_form" not in st.session_state:
                st.session_state.show_new_container_form = False
            if st.sidebar.button("🆕 New Container"):
                st.session_state.show_new_container_form = True
            if st.session_state.show_new_container_form:
                with st.sidebar.form("new_container_form"):
                    new_container_id = st.text_input("Container ID", value="aiml-container")
                    new_partition_key = st.text_input("Partition Key", value="/pk")
                    new_analytical = st.checkbox("Enable Analytical Store", value=True)
                    submitted = st.form_submit_button("Create Container")
                    if submitted:
                        analytical_ttl = -1 if new_analytical else None
                        new_container = create_new_container(database, new_container_id, new_partition_key, analytical_storage_ttl=analytical_ttl)
                        if new_container:
                            st.success(f"Container '{new_container_id}' created.")
                            default_id = generate_unique_id()
                            default_item = {
                                "id": default_id,
                                "pk": default_id,
                                "name": "Default Image Prompt",
                                "prompt": "Enter your image prompt here",
                                "timestamp": datetime.now().isoformat(),
                                "type": "image_prompt"
                            }
                            insert_success, insert_message = insert_record(new_container, default_item)
                            if insert_success:
                                st.info("Default templated item created in new container.")
                            else:
                                st.error(f"Default item insertion error: {insert_message}")
                            st.session_state.show_new_container_form = False
                            st.session_state.new_container_created = new_container_id
                            st.rerun()
            containers = get_containers(database)
            if "new_container_created" in st.session_state and st.session_state.new_container_created not in containers:
                containers.append(st.session_state.new_container_created)
            selected_container = st.sidebar.selectbox("📁 Container", containers)
            if selected_container != st.session_state.get("selected_container"):
                st.session_state.selected_container = selected_container
                st.session_state.selected_document_id = None
                st.session_state.current_index = 0
                st.rerun()
            if st.session_state.selected_container:
                container = database.get_container_client(st.session_state.selected_container)
                st.session_state.current_container = container
                if st.sidebar.button("📦 Export"):
                    download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
                    if download_link.startswith('<a'):
                        st.markdown(download_link, unsafe_allow_html=True)
                    else:
                        st.error(download_link)
                documents = get_documents(container)
                total_docs = len(documents)
                num_docs = st.slider("Docs", 1, 20, 1)
                documents_to_display = documents[:num_docs] if total_docs > num_docs else documents
                st.sidebar.info(f"Showing {len(documents_to_display)} docs")
                view_options = ['Markdown', 'Code', 'Run AI', 'Clone', 'New']
                selected_view = st.sidebar.selectbox("View", view_options, index=1)
                if selected_view == 'Markdown':
                    st.markdown("#### 📄 Markdown")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        content = json.dumps(doc, indent=2)
                        st.markdown(f"```json\n{content}\n```")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                elif selected_view == 'Code':
                    st.markdown("#### 💻 Code Editor")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        if "doc_editor" not in st.session_state:
                            st.session_state.doc_editor = json.dumps(doc, indent=2)
                        edited = st.text_area("Edit JSON", value=st.session_state.doc_editor, height=300, key="doc_editor", on_change=auto_save_edit)
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                        col_save, col_delete = st.columns(2)
                        with col_save:
                            if st.button("💾 Save", key=f'save_{st.session_state.current_index}'):
                                try:
                                    updated_doc = json.loads(edited)
                                    container.upsert_item(body=updated_doc)
                                    st.success(f"Saved {updated_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Save err: {str(e)}")
                        with col_delete:
                            if st.button("🗑️ Delete", key=f'delete_{st.session_state.current_index}'):
                                try:
                                    current_doc = json.loads(edited)
                                    success, message = delete_record(container, current_doc)
                                    if success:
                                        st.success(message)
                                        st.rerun()
                                    else:
                                        st.error(message)
                                except Exception as e:
                                    st.error(f"Delete err: {str(e)}")
                        if "delete_log" in st.session_state and st.session_state.delete_log:
                            st.subheader("Delete Log")
                            for log_entry in st.session_state.delete_log[-5:]:
                                st.write(log_entry)
                elif selected_view == 'Run AI':
                    st.markdown("#### 🤖 Run AI")
                    ai_query = st.text_area("Enter your query for ArXiv search:", key="arxiv_query", height=100)
                    if st.button("Send"):
                        st.session_state.last_query = ai_query
                        perform_ai_lookup(ai_query, vocal_summary=True, extended_refs=False, titles_summary=True, full_audio=True, useArxiv=True, useArxivAudio=False)
                elif selected_view == 'Clone':
                    st.markdown("#### 📄 Clone")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        st.markdown(f"Original ID: {doc.get('id', '')}")
                        new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id')
                        new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name')
                        new_doc = {'id': new_id, 'pk': new_id, 'name': new_name, **{k: v for k, v in doc.items() if k not in ['id', 'name', 'pk', '_rid', '_self', '_etag', '_attachments', '_ts']}}
                        doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2), height=300, key='clone_preview')
                        col1, col2 = st.columns(2)
                        with col1:
                            if st.button("🔄 Regenerate"):
                                new_id = generate_unique_id()
                                st.session_state.new_clone_id = new_id
                                st.rerun()
                        with col2:
                            if st.button("💾 Save Clone"):
                                try:
                                    final_doc = json.loads(doc_str)
                                    for field in ['_rid', '_self', '_etag', '_attachments', '_ts']:
                                        final_doc.pop(field, None)
                                    container.create_item(body=final_doc)
                                    st.success(f"Cloned {final_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Clone err: {str(e)}")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                elif selected_view == 'New':
                    st.markdown("#### ➕ New Doc")
                    if st.button("🤖 Auto-Gen"):
                        auto_doc = {
                            "id": generate_unique_id(),
                            "pk": generate_unique_id(),
                            "name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                            "content": "Auto-generated record.",
                            "timestamp": datetime.now().isoformat()
                        }
                        success, message = insert_record(container, auto_doc)
                        if success:
                            st.success(message)
                            st.rerun()
                        else:
                            st.error(message)
                    else:
                        new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
                        default_doc = {
                            "id": new_id,
                            "pk": new_id,
                            "name": "New Doc",
                            "content": "",
                            "timestamp": datetime.now().isoformat()
                        }
                        new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300)
                        if st.button("➕ Create"):
                            try:
                                cleaned = preprocess_text(new_doc_str)
                                new_doc = json.loads(cleaned)
                                new_doc['id'] = new_id
                                new_doc['pk'] = new_id
                                success, message = insert_record(container, new_doc)
                                if success:
                                    st.success(f"Created {new_doc['id']}")
                                    st.rerun()
                                else:
                                    st.error(message)
                            except Exception as e:
                                st.error(f"Create err: {str(e)}")
st.subheader(f"📊 {st.session_state.selected_container}") | |
if documents_to_display: | |
df = pd.DataFrame(documents_to_display) | |
st.dataframe(df) | |
else: | |
st.info("No docs.") | |
update_file_management_section() | |
except exceptions.CosmosHttpResponseError as e: | |
st.error(f"Cosmos error: {str(e)} 🚨") | |
except Exception as e: | |
st.error(f"Error: {str(e)} 😱") | |
if st.session_state.logged_in and st.sidebar.button("🚪 Logout"): | |
st.markdown("#### 🚪 Logout") | |
st.session_state.logged_in = False | |
st.session_state.selected_records = [] | |
st.session_state.client = None | |
st.session_state.selected_database = None | |
st.session_state.selected_container = None | |
st.session_state.selected_document_id = None | |
st.session_state.current_index = 0 | |
st.rerun() | |
show_sidebar_data_grid() | |
if __name__ == "__main__": | |
main() |