# app.py
# =============================================================================
# ───────────── IMPORTS ─────────────
# =============================================================================
import base64
import glob
import hashlib
import json
import os
import pandas as pd
import pytz
import random
import re
import shutil
import streamlit as st
import time
import traceback
import uuid
import zipfile
from PIL import Image
from azure.cosmos import CosmosClient, PartitionKey, exceptions
from datetime import datetime
from git import Repo
from github import Github
from gradio_client import Client, handle_file
import tempfile
import io
import requests
import numpy as np
from urllib.parse import quote

# =============================================================================
# ───────────── EXTERNAL HELP LINKS (Always visible in sidebar) ─────────────
# =============================================================================
external_links = [
    {"title": "CosmosDB GenAI Full Text Search", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search", "emoji": "💻"},
    {"title": "CosmosDB SQL API Client Library", "url": "https://learn.microsoft.com/en-us/python/api/overview/azure/cosmos-readme?view=azure-python", "emoji": "💻"},
    {"title": "CosmosDB Index and Query Vectors", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-python-vector-index-query", "emoji": "💻"},
    {"title": "CosmosDB NoSQL Materialized Views", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/materialized-views", "emoji": "💻"},
    {"title": "LangChain Vector Store Guide", "url": "https://python.langchain.com/docs/integrations/vectorstores/azure_cosmos_db_no_sql/", "emoji": "💻"},
    {"title": "Vector Database Prompt Engineering RAG for Python", "url": "https://learn.microsoft.com/en-us/azure/cosmos-db/vector-database?source=recommendations", "emoji": "💻"},
    {"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "💻"},
    {"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "📚"},
    {"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "💻"},
    {"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "📚"},
    {"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "🌐"},
]

# =============================================================================
# ───────────── APP CONFIGURATION ─────────────
# =============================================================================
Site_Name = '🐙 GitCosmos'
title = "🐙 GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = '🐙🌌💫'
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)

# Cosmos DB & App URLs
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'
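# Connection sketch: the settings above are combined later in main() roughly as
#   client = CosmosClient(ENDPOINT, credential=Key)
#   database = client.get_database_client(DATABASE_NAME)
#   container = database.get_container_client(CONTAINER_NAME)
# COSMOS_DATABASE_NAME, COSMOS_CONTAINER_NAME, and Key come from the environment.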
# =============================================================================
# ───────────── HELPER FUNCTIONS ─────────────
# =============================================================================
def get_download_link(file_path):
    # The anchor markup was stripped from the source; reconstructed here as a
    # standard base64 data-URI download link (the otherwise-unused b64 implied it).
    with open(file_path, "rb") as file:
        contents = file.read()
    b64 = base64.b64encode(contents).decode()
    file_name = os.path.basename(file_path)
    return f'<a href="data:file/txt;base64,{b64}" download="{file_name}">Download {file_name} 📂</a>'

def generate_unique_id():
    timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
    unique_uuid = str(uuid.uuid4())
    return_value = f"{timestamp}-{unique_uuid}"
    st.write('New ID: ' + return_value)
    return return_value

def generate_filename(prompt, file_type):
    central = pytz.timezone('US/Central')
    safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
    safe_prompt = re.sub(r'\W+', '', prompt)[:90]
    return f"{safe_date_time}{safe_prompt}.{file_type}"

def create_file(filename, prompt, response, should_save=True):
    if not should_save:
        return
    with open(filename, 'w', encoding='utf-8') as file:
        file.write(prompt + "\n\n" + response)

def load_file(file_name):
    with open(file_name, "r", encoding='utf-8') as file:
        content = file.read()
    return content

def display_glossary_entity(k):
    search_urls = {
        "🚀": lambda k: f"/?q={k}",
        "📖": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}",
        "🔍": lambda k: f"https://www.google.com/search?q={quote(k)}",
        "🎥": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}",
    }
    # Anchor markup reconstructed: each emoji links out to its search URL.
    links_md = ' '.join([f'<a href="{url(k)}" target="_blank">{emoji}</a>' for emoji, url in search_urls.items()])
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)

def create_zip_of_files(files):
    zip_name = "all_files.zip"
    with zipfile.ZipFile(zip_name, 'w') as zipf:
        for file in files:
            zipf.write(file)
    return zip_name

def get_video_html(video_path, width="100%"):
    video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}"
    # <video> markup reconstructed; the original tag was stripped from the source.
    return f'''
    <video width="{width}" controls autoplay muted loop>
        <source src="{video_url}" type="video/mp4">
        Your browser does not support the video tag.
    </video>
    '''

def get_audio_html(audio_path, width="100%"):
    audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}"
    # <audio> markup reconstructed; the original tag was stripped from the source.
    return f'''
    <audio controls style="width:{width};">
        <source src="{audio_url}" type="audio/mpeg">
        Your browser does not support the audio element.
    </audio>
    '''

def preprocess_text(text):
    text = text.replace('\r\n', '\\n').replace('\r', '\\n').replace('\n', '\\n')
    text = text.replace('"', '\\"')
    text = re.sub(r'[\t]', ' ', text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    return text.strip()

# =============================================================================
# ───────────── COSMOS DB FUNCTIONS ─────────────
# =============================================================================
def get_databases(client):
    return [db['id'] for db in client.list_databases()]

def get_containers(database):
    return [container['id'] for container in database.list_containers()]

def get_documents(container, limit=None):
    query = "SELECT * FROM c ORDER BY c._ts DESC"
    items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
    return items

def insert_record(container, record):
    try:
        container.create_item(body=record)
        return True, "Inserted! 🎉"
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} 🚨"
    except Exception as e:
        return False, f"Error: {str(e)} 😱"

def update_record(container, updated_record):
    try:
        container.upsert_item(body=updated_record)
        return True, f"Updated {updated_record['id']} 🛠️"
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} 🚨"
    except Exception as e:
        return False, f"Error: {traceback.format_exc()} 😱"
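# Usage sketch: the CRUD helpers return (ok, message) tuples, e.g.
#   ok, msg = insert_record(container, {"id": "1", "pk": "1", "name": "demo"})
#   (st.success if ok else st.error)(msg)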
def delete_record(container, record):
    try:
        if "id" not in record:
            return False, "Record must contain an 'id' field. 🛑"
        doc_id = record["id"]
        if "delete_log" not in st.session_state:
            st.session_state.delete_log = []
        st.session_state.delete_log.append(f"Attempting to delete document: {json.dumps(record, indent=2)}")
        partition_key_value = record.get("pk", doc_id)
        st.session_state.delete_log.append(f"Using ID and Partition Key: {partition_key_value}")
        container.delete_item(item=doc_id, partition_key=partition_key_value)
        success_msg = f"Record {doc_id} successfully deleted from Cosmos DB. 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg
    except exceptions.CosmosResourceNotFoundError:
        success_msg = f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). 🗑️"
        st.session_state.delete_log.append(success_msg)
        return True, success_msg
    except exceptions.CosmosHttpResponseError as e:
        error_msg = f"HTTP error deleting {doc_id}: {str(e)}. 🚨"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg
    except Exception as e:
        error_msg = f"Unexpected error deleting {doc_id}: {traceback.format_exc()}. 😱"
        st.session_state.delete_log.append(error_msg)
        return False, error_msg

def save_to_cosmos_db(container, query, response1, response2):
    try:
        if container:
            timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
            unique_uuid = str(uuid.uuid4())
            new_id = f"{timestamp}-{unique_uuid}"
            record = {
                "id": new_id,
                "pk": new_id,
                "name": new_id,
                "query": query,
                "response1": response1,
                "response2": response2,
                "timestamp": datetime.utcnow().isoformat(),
                "type": "ai_response",
                "version": "1.0"
            }
            container.create_item(body=record)
            st.success(f"Saved: {record['id']}")
            st.session_state.documents = get_documents(container)
        else:
            st.error("Cosmos container not initialized.")
    except Exception as e:
        st.error(f"Save error: {str(e)}")

def archive_current_container(database_name, container_name, client):
    try:
        base_dir = "./cosmos_archive_current_container"
        if os.path.exists(base_dir):
            shutil.rmtree(base_dir)
        os.makedirs(base_dir)
        db_client = client.get_database_client(database_name)
        container_client = db_client.get_container_client(container_name)
        items = list(container_client.read_all_items())
        container_dir = os.path.join(base_dir, container_name)
        os.makedirs(container_dir)
        for item in items:
            item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
            with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
                json.dump(item, f, indent=2)
        archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        shutil.make_archive(archive_name, 'zip', base_dir)
        return get_download_link(f"{archive_name}.zip")
    except Exception as e:
        return f"Archive error: {str(e)} 😢"
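# Note: delete_record falls back to the document id as the partition key when a
# record has no "pk" field; documents created by this app always set pk == id,
# so deletes resolve cleanly.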
# =============================================================================
# ───────────── ADVANCED COSMOS FUNCTIONS ─────────────
# =============================================================================
def create_new_container(database, container_id, partition_key_path,
                         analytical_storage_ttl=None, indexing_policy=None, vector_embedding_policy=None):
    try:
        if analytical_storage_ttl is not None:
            container = database.create_container(
                id=container_id,
                partition_key=PartitionKey(path=partition_key_path),
                analytical_storage_ttl=analytical_storage_ttl,
                indexing_policy=indexing_policy,
                vector_embedding_policy=vector_embedding_policy
            )
        else:
            container = database.create_container(
                id=container_id,
                partition_key=PartitionKey(path=partition_key_path),
                indexing_policy=indexing_policy,
                vector_embedding_policy=vector_embedding_policy
            )
    except exceptions.CosmosHttpResponseError as e:
        if analytical_storage_ttl is not None and "analyticalStorageTtl" in str(e):
            try:
                container = database.create_container(
                    id=container_id,
                    partition_key=PartitionKey(path=partition_key_path),
                    indexing_policy=indexing_policy,
                    vector_embedding_policy=vector_embedding_policy
                )
            except Exception as e2:
                st.error(f"Error creating container without analytical_storage_ttl: {str(e2)}")
                return None
        elif isinstance(e, exceptions.CosmosResourceExistsError):
            container = database.get_container_client(container_id)
        else:
            st.error(f"Error creating container: {str(e)}")
            return None
    return container

def advanced_insert_item(container, item):
    try:
        container.upsert_item(item)
        return True, f"Item {item.get('id', '')} inserted. ➕"
    except Exception as e:
        return False, str(e)

def advanced_update_item(container, item):
    try:
        container.upsert_item(item)
        return True, f"Item {item.get('id', '')} updated. ✏️"
    except Exception as e:
        return False, str(e)

def advanced_delete_item(container, item_id, partition_key_value):
    try:
        container.delete_item(item=item_id, partition_key=partition_key_value)
        return True, f"Item {item_id} deleted. 🗑️"
    except Exception as e:
        return False, str(e)

def vector_search(container, query_vector, vector_field, top=10, exact_search=False):
    query_vector_str = json.dumps(query_vector)
    query = f"""SELECT TOP {top} c.id,
VectorDistance(c.{vector_field}, {query_vector_str}, {str(exact_search).lower()},
    {{'dataType': 'float32', 'distanceFunction': 'cosine'}}) AS SimilarityScore
FROM c
ORDER BY SimilarityScore"""
    results = list(container.query_items(query=query, enable_cross_partition_query=True))
    return results
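# Policy sketch (assumed shapes, per the Cosmos NoSQL vector docs linked above;
# the path, dimensions, and container names are illustrative):
#   vector_embedding_policy = {"vectorEmbeddings": [{
#       "path": "/embedding", "dataType": "float32",
#       "distanceFunction": "cosine", "dimensions": 1536}]}
#   c = create_new_container(db, "vectors", "/pk",
#                            vector_embedding_policy=vector_embedding_policy)
#   hits = vector_search(c, query_vector=embedding, vector_field="embedding", top=5)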
# =============================================================================
# ───────────── GITHUB FUNCTIONS ─────────────
# =============================================================================
def download_github_repo(url, local_path):
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)

def create_zip_file(source_dir, output_filename):
    shutil.make_archive(output_filename, 'zip', source_dir)

def create_repo(g, repo_name):
    user = g.get_user()
    return user.create_repo(repo_name)

def push_to_github(local_path, repo, github_token):
    repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)
    if 'origin' in [remote.name for remote in local_repo.remotes]:
        origin = local_repo.remote('origin')
        origin.set_url(repo_url)
    else:
        origin = local_repo.create_remote('origin', repo_url)
    if not local_repo.heads:
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'
    else:
        current_branch = local_repo.active_branch.name
    local_repo.git.add(A=True)
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')
    origin.push(refspec=f'{current_branch}:{current_branch}')
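# Usage sketch (assumes a GITHUB_TOKEN env var; repo names are illustrative):
#   g = Github(os.environ["GITHUB_TOKEN"])
#   download_github_repo("https://github.com/user/source.git", "./work")
#   repo = create_repo(g, "mirror-repo")
#   push_to_github("./work", repo, os.environ["GITHUB_TOKEN"])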
# =============================================================================
# ───────────── FILE & MEDIA MANAGEMENT FUNCTIONS ─────────────
# =============================================================================
def display_saved_files_in_sidebar():
    all_files = sorted([f for f in glob.glob("*.md") if not f.lower().startswith('readme')], reverse=True)
    st.sidebar.markdown("## 📁 Files")
    for file in all_files:
        col1, col2, col3 = st.sidebar.columns([6, 2, 1])
        with col1:
            st.markdown(f"📄 {file}")
        with col2:
            st.sidebar.download_button(
                label="⬇️",
                data=open(file, 'rb').read(),
                file_name=file
            )
        with col3:
            if st.sidebar.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                st.rerun()

def display_file_viewer(file_path):
    content = load_file(file_path)
    if content:
        st.markdown("### 📄 File Viewer")
        st.markdown(f"**{file_path}**")
        file_stats = os.stat(file_path)
        st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes")
        st.markdown("---")
        st.markdown(content)
        st.download_button("⬇️", data=content, file_name=os.path.basename(file_path), mime="text/markdown")

def display_file_editor(file_path):
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    if file_path not in st.session_state.file_content:
        content = load_file(file_path)
        if content is not None:
            st.session_state.file_content[file_path] = content
        else:
            return
    st.markdown("### ✏️ Edit File")
    st.markdown(f"**Editing:** {file_path}")
    md_tab, code_tab = st.tabs(["Markdown", "Code"])
    with md_tab:
        st.markdown(st.session_state.file_content[file_path])
    with code_tab:
        # The original wired on_change to auto_save_edit(), which reads the JSON
        # doc-editor session key, not this text area; saving here is via the button.
        new_content = st.text_area("Edit:", value=st.session_state.file_content[file_path],
                                   height=400, key=f"editor_{hash(file_path)}")
        col1, col2 = st.columns([1, 5])
        with col1:
            if st.button("💾 Save"):
                if save_file_content(file_path, new_content):
                    st.session_state.file_content[file_path] = new_content
                    st.success("Saved! 🎉")
                    time.sleep(1)
                    st.rerun()
        with col2:
            st.download_button("⬇️", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown")

def save_file_content(file_path, content):
    try:
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(content)
        return True
    except Exception as e:
        st.error(f"Save error: {str(e)}")
        return False

def update_file_management_section():
    if 'file_view_mode' not in st.session_state:
        st.session_state.file_view_mode = None
    if 'current_file' not in st.session_state:
        st.session_state.current_file = None
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    all_files = sorted(glob.glob("*.md"), reverse=True)
    st.sidebar.title("📁 Files")
    if st.sidebar.button("🗑 Delete All"):
        for file in all_files:
            os.remove(file)
        st.session_state.file_content = {}
        st.session_state.current_file = None
        st.session_state.file_view_mode = None
        st.rerun()
    if st.sidebar.button("⬇️ Download All"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)
    for file in all_files:
        col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
        with col1:
            if st.button("🌐", key=f"view_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'view'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col2:
            st.markdown(get_download_link(file), unsafe_allow_html=True)
        with col3:
            if st.button("📂", key=f"edit_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'edit'
                if file not in st.session_state.file_content:
                    content = load_file(file)
                    if content is not None:
                        st.session_state.file_content[file] = content
                st.rerun()
        with col4:
            if st.button("🗑", key=f"delete_{file}"):
                os.remove(file)
                if file in st.session_state.file_content:
                    del st.session_state.file_content[file]
                if st.session_state.current_file == file:
                    st.session_state.current_file = None
                    st.session_state.file_view_mode = None
                st.rerun()
    st.sidebar.markdown("---")
    st.sidebar.title("External Help Links")
    for link in external_links:
        st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True)
    if st.session_state.current_file:
        if st.session_state.file_view_mode == 'view':
            display_file_viewer(st.session_state.current_file)
        elif st.session_state.file_view_mode == 'edit':
            display_file_editor(st.session_state.current_file)

# =============================================================================
# ───────────── SIDEBAR DATA GRID (Records with formatted timestamps) ─────────────
# =============================================================================
def show_sidebar_data_grid():
    if st.session_state.get("current_container"):
        try:
            records = get_documents(st.session_state.current_container)
            data = []
            for rec in records:
                ts = rec.get("timestamp", "")
                try:
                    dt = datetime.fromisoformat(ts)
                    formatted = dt.strftime("%I:%M %p %m/%d/%Y")
                except Exception:
                    formatted = ts
                data.append({
                    "ID": rec.get("id", ""),
                    "Name": rec.get("name", ""),
                    "Timestamp": formatted
                })
            df = pd.DataFrame(data)
            st.sidebar.markdown("### 📊 Data Grid")
            st.sidebar.dataframe(df)
        except Exception as e:
            st.sidebar.error(f"Data grid error: {str(e)}")
    else:
        st.sidebar.info("No container selected for data grid.")
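# Format sketch: timestamps saved via datetime.now().isoformat() render as
#   datetime.fromisoformat("2024-01-05T13:22:11.123456").strftime("%I:%M %p %m/%d/%Y")
#   -> "01:22 PM 01/05/2024"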
# =============================================================================
# ───────────── VIDEO & AUDIO UI FUNCTIONS ─────────────
# =============================================================================
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
    try:
        st.write("Preprocessing image...")
        if isinstance(file_data, bytes):
            img = Image.open(io.BytesIO(file_data))
        elif hasattr(file_data, 'read'):
            if hasattr(file_data, 'seek'):
                file_data.seek(0)
            img = Image.open(file_data)
        elif isinstance(file_data, Image.Image):
            img = file_data
        else:
            raise ValueError(f"Unsupported input: {type(file_data)}")
        if img.mode != 'RGB':
            img = img.convert('RGB')
        aspect_ratio = img.size[0] / img.size[1]
        if aspect_ratio > target_size[0] / target_size[1]:
            new_width = target_size[0]
            new_height = int(new_width / aspect_ratio)
        else:
            new_height = target_size[1]
            new_width = int(new_height * aspect_ratio)
        new_width = (new_width // 2) * 2
        new_height = (new_height // 2) * 2
        resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
        final_img = Image.new('RGB', target_size, (255, 255, 255))
        paste_x = (target_size[0] - new_width) // 2
        paste_y = (target_size[1] - new_height) // 2
        final_img.paste(resized_img, (paste_x, paste_y))
        return final_img
    except Exception as e:
        st.error(f"Image error: {str(e)}")
        return None

def add_video_generation_ui(container):
    st.markdown("### 🎥 Video Gen")
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_file = st.file_uploader("Upload Image 🖼️", type=['png', 'jpg', 'jpeg'])
    with col2:
        st.markdown("#### Params")
        motion = st.slider("🌊 Motion", 1, 255, 127)
        fps = st.slider("🎬 FPS", 1, 30, 6)
        with st.expander("Advanced"):
            use_custom = st.checkbox("Custom Seed")
            seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
    if uploaded_file is not None:
        try:
            file_data = uploaded_file.read()
            preview1, preview2 = st.columns(2)
            with preview1:
                st.write("Original")
                st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
            with preview2:
                proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
                if proc_img:
                    st.write("Processed")
                    st.image(proc_img, use_column_width=True)
                else:
                    st.error("Preprocess failed")
                    return
            if st.button("🎥 Generate"):
                with st.spinner("Generating video..."):
                    with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
                        proc_img.save(temp_file.name, format='PNG')
                    try:
                        client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
                        result = client.predict(
                            image=temp_file.name,
                            seed=seed if seed is not None else int(time.time() * 1000),
                            randomize_seed=seed is None,
                            motion_bucket_id=motion,
                            fps_id=fps,
                            api_name="/video"
                        )
                        if result and isinstance(result, tuple) and len(result) >= 1:
                            video_path = result[0].get('video') if isinstance(result[0], dict) else None
                            if video_path and os.path.exists(video_path):
                                video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                                shutil.copy(video_path, video_filename)
                                st.success("Video generated! 🎉")
                                st.video(video_filename)
                                if container:
                                    # One id for both id and pk (the original drew two
                                    # different ids, leaving pk mismatched with id).
                                    video_id = generate_unique_id()
                                    video_record = {
                                        "id": video_id,
                                        "pk": video_id,
                                        "type": "generated_video",
                                        "filename": video_filename,
                                        "seed": seed if seed is not None else "random",
                                        "motion": motion,
                                        "fps": fps,
                                        "timestamp": datetime.now().isoformat()
                                    }
                                    success, message = insert_record(container, video_record)
                                    if success:
                                        st.success("DB record saved!")
                                    else:
                                        st.error(f"DB error: {message}")
                            else:
                                st.error("Invalid result format")
                        else:
                            st.error("No result returned")
                    except Exception as e:
                        st.error(f"Video gen error: {str(e)}")
                    finally:
                        try:
                            os.unlink(temp_file.name)
                            st.write("Temp file removed")
                        except Exception as e:
                            st.warning(f"Cleanup error: {str(e)}")
        except Exception as e:
            st.error(f"Upload error: {str(e)}")
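# Sizing sketch: validate_and_preprocess_image letterboxes into 576x1024; a
# 1920x1080 upload (ratio ~1.78) scales to 576x324 and is centered vertically
# on a white canvas before being sent to the video model.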
# =============================================================================
# ───────────── AI SAMPLES SIDEBAR (Processed as a Python List) ─────────────
# =============================================================================
def display_ai_samples():
    ai_samples = [
        {
            "name": "FullTextContains",
            "description": "Query using FullTextContains",
            "query": 'SELECT TOP 10 * FROM c WHERE FullTextContains(c.text, "bicycle")'
        },
        {
            "name": "FullTextContainsAll",
            "description": "Query using FullTextContainsAll",
            "query": 'SELECT TOP 10 * FROM c WHERE FullTextContainsAll(c.text, "red", "bicycle")'
        },
        {
            "name": "FullTextContainsAny",
            "description": "Query using FullTextContainsAny",
            "query": 'SELECT TOP 10 * FROM c WHERE FullTextContains(c.text, "red") AND FullTextContainsAny(c.text, "bicycle", "skateboard")'
        },
        {
            "name": "FullTextScore",
            "description": "Query using FullTextScore (order by relevance)",
            "query": 'SELECT TOP 10 * FROM c ORDER BY RANK FullTextScore(c.text, ["bicycle", "mountain"])'
        },
        {
            "name": "Vector Search with Score",
            "description": "Example vector search snippet",
            "query": 'results = vector_search.similarity_search_with_score(query="Your query", k=5)\nfor result, score in results:\n    print(result.json(), score)'
        },
        {
            "name": "Vector Search with Filtering",
            "description": "Example vector search with a filter",
            "query": 'pre_filter = {"conditions": [{"property": "metadata.page", "operator": "$eq", "value": 0}]}\nresults = vector_search.similarity_search_with_score(query="Your query", k=5, pre_filter=pre_filter)'
        },
        {
            "name": "Hybrid Search",
            "description": "Example hybrid search snippet",
            "query": 'results = vector_search.similarity_search_with_score(query="Your query", k=5, query_type=CosmosDBQueryType.HYBRID)'
        }
    ]
    st.sidebar.markdown("### 🤖 AI Samples")
    st.sidebar.info("🚀 Get started with our AI samples! Limited-time free access to get started today.")
    sample_names = [sample["name"] for sample in ai_samples]
    selected_sample_name = st.sidebar.selectbox("Select an AI Sample", sample_names)
    selected_sample = next((s for s in ai_samples if s["name"] == selected_sample_name), None)
    if selected_sample:
        st.sidebar.markdown(f"**{selected_sample['name']}**: {selected_sample['description']}")
        lang = "sql" if "FullText" in selected_sample["name"] else "python"
        st.sidebar.code(selected_sample["query"], language=lang)

# =============================================================================
# ───────────── NEW ITEM & FIELD FUNCTIONS
# =============================================================================
def new_item_default(container):
    new_id = generate_unique_id()
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "New Sample Document",
        "content": "Start editing your document here...",
        "timestamp": datetime.now().isoformat(),
        "type": "sample"
    }
    success, message = insert_record(container, default_doc)
    if success:
        st.success("New sample document created! ✨")
        return default_doc
    else:
        st.error("Error creating new item: " + message)
        return None

def auto_save_edit():
    try:
        edited_str = st.session_state.doc_editor
        edited_doc = json.loads(edited_str)
        container = st.session_state.current_container
        container.upsert_item(edited_doc)
        st.success("Auto-saved! 💾")
    except Exception as e:
        st.error(f"Auto-save error: {str(e)}")

def add_field_to_doc():
    key = st.session_state.new_field_key
    value = st.session_state.new_field_value
    try:
        doc = json.loads(st.session_state.doc_editor)
        doc[key] = value
        st.session_state.doc_editor = json.dumps(doc, indent=2)
        auto_save_edit()
        st.success(f"Added field {key} 👍")
    except Exception as e:
        st.error(f"Error adding field: {str(e)}")

# =============================================================================
# ───────────── VECTOR SEARCH INTERFACE (Simple keyword search)
# =============================================================================
def vector_keyword_search(keyword, container):
    try:
        # Parameterized to keep user input out of the query string (the original
        # interpolated the keyword directly into the SQL text).
        query = "SELECT * FROM c WHERE CONTAINS(c.content, @keyword)"
        results = list(container.query_items(
            query=query,
            parameters=[{"name": "@keyword", "value": keyword}],
            enable_cross_partition_query=True))
        return results
    except Exception as e:
        st.error(f"Vector search error: {str(e)}")
        return []
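# Equivalence sketch: the parameterized query above runs the same search as
#   SELECT * FROM c WHERE CONTAINS(c.content, "bicycle")
# for keyword="bicycle", without string interpolation.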
# =============================================================================
# ───────────── NEW AI MODALITY RECORD TEMPLATES
# =============================================================================
def new_ai_record(container):
    new_id = generate_unique_id()
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "AI Modality Record",
        "function_url": "https://example.com/function",
        "input_text": "### Input (markdown)\n\nType your input here.",
        "output_text": "### Output (markdown)\n\nResult will appear here.",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality"
    }
    success, message = insert_record(container, default_doc)
    if success:
        st.success("New AI modality record created! 💡")
        return default_doc
    else:
        st.error("Error creating AI record: " + message)
        return None

def new_links_record(container):
    new_id = generate_unique_id()
    links_md = "\n".join([f"- {link['emoji']} [{link['title']}]({link['url']})" for link in external_links])
    default_doc = {
        "id": new_id,
        "pk": new_id,
        "name": "Portal Links Record",
        "function_url": "",
        "input_text": links_md,
        "output_text": "",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality"
    }
    success, message = insert_record(container, default_doc)
    if success:
        st.success("New Portal Links record created! 🔗")
        return default_doc
    else:
        st.error("Error creating links record: " + message)
        return None

# =============================================================================
# ───────────── LANGCHAIN FUNCTIONS (Witty emoji comments)
# =============================================================================
def display_langchain_functions():
    functions = [
        {"name": "OpenAIEmbeddings", "comment": "🔮 Creates embeddings using OpenAI – pure magic!"},
        {"name": "AzureCosmosDBNoSqlVectorSearch", "comment": "🚀 Performs vector search on Cosmos DB – superfast and smart!"},
        {"name": "RecursiveCharacterTextSplitter", "comment": "✂️ Slices text into manageable chunks – like a pro chef!"}
    ]
    st.sidebar.markdown("### 🤖 Langchain Functions")
    for func in functions:
        st.sidebar.write(f"{func['name']}: {func['comment']}")

# =============================================================================
# ───────────── OPTIONAL: SIDEBAR DATA GRID (Records with formatted timestamps)
# =============================================================================
# (This feature is now integrated above via show_sidebar_data_grid().)

# =============================================================================
# ───────────── ASYNC TTS & ARXIV FUNCTIONS (Optional Features)
# =============================================================================
import asyncio
import edge_tts
from streamlit_marquee import streamlit_marquee
from collections import Counter

class PerformanceTimer:
    def __init__(self, operation_name: str):
        self.operation_name = operation_name
        self.start_time = None

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

async def async_edge_tts_generate(text: str, voice: str, rate: int = 0, pitch: int = 0, file_format: str = "mp3"):
    with PerformanceTimer("tts_generation") as timer:
        text = text.replace("\n", " ").strip()
        if not text:
            return None, 0
        cache_key = f"{text[:100]}_{voice}_{rate}_{pitch}_{file_format}"
        if cache_key in st.session_state.get('audio_cache', {}):
            return st.session_state['audio_cache'][cache_key], 0
        try:
            rate_str = f"{rate:+d}%"
            pitch_str = f"{pitch:+d}Hz"
            communicate = edge_tts.Communicate(text, voice, rate=rate_str, pitch=pitch_str)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"audio_{timestamp}_{random.randint(1000, 9999)}.{file_format}"
            await communicate.save(filename)
            st.session_state.setdefault('audio_cache', {})[cache_key] = filename
            return filename, time.time() - timer.start_time
        except Exception as e:
            st.error(f"Error generating audio: {str(e)}")
            return None, 0

def speak_with_edge_tts(text, voice="en-US-AriaNeural", rate=0, pitch=0, file_format="mp3"):
    result = asyncio.run(async_edge_tts_generate(text, voice, rate, pitch, file_format))
    if isinstance(result, tuple):
        return result[0]
    return result
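# Usage sketch: generate a clip synchronously and play it inline:
#   audio = speak_with_edge_tts("Hello from GitCosmos", voice="en-US-AriaNeural")
#   if audio:
#       st.audio(audio)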
async def async_save_qa_with_audio(question: str, answer: str):
    with PerformanceTimer("qa_save") as timer:
        # create_file() takes (filename, prompt, response); the original passed the
        # QA pair where the filename belongs, so generate the filename first.
        md_file = generate_filename(question, "md")
        create_file(md_file, question, answer)
        audio_file = None
        if st.session_state.get('enable_audio', True):
            audio_text = f"{question}\n\nAnswer: {answer}"
            audio_file, _ = await async_edge_tts_generate(
                audio_text,
                voice=st.session_state.get('tts_voice', "en-US-AriaNeural"),
                file_format=st.session_state.get('audio_format', "mp3"))
        return md_file, audio_file, time.time() - timer.start_time, 0

def save_qa_with_audio(question, answer, voice=None):
    if not voice:
        voice = st.session_state.get('tts_voice', "en-US-AriaNeural")
    md_file = generate_filename(question, "md")
    create_file(md_file, question, answer)
    audio_text = f"{question}\n\nAnswer: {answer}"
    audio_file = speak_with_edge_tts(audio_text, voice=voice,
                                     file_format=st.session_state.get('audio_format', "mp3"))
    return md_file, audio_file

def play_and_download_audio(file_path, file_type="mp3"):
    if file_path and os.path.exists(file_path):
        st.audio(file_path)
        # get_download_link() takes no file_type argument; the cached variant does.
        dl_link = create_download_link_with_cache(file_path, file_type=file_type)
        st.markdown(dl_link, unsafe_allow_html=True)

def create_download_link_with_cache(file_path: str, file_type: str = "mp3") -> str:
    cache_key = f"dl_{file_path}"
    if cache_key in st.session_state.get('download_link_cache', {}):
        return st.session_state['download_link_cache'][cache_key]
    try:
        with open(file_path, "rb") as f:
            b64 = base64.b64encode(f.read()).decode()
        filename = os.path.basename(file_path)
        # Anchor markup reconstructed; the original tags were stripped from the source.
        if file_type == "mp3":
            link = f'<a href="data:audio/mpeg;base64,{b64}" download="{filename}">🎵 Download {filename}</a>'
        elif file_type == "wav":
            link = f'<a href="data:audio/wav;base64,{b64}" download="{filename}">🔊 Download {filename}</a>'
        elif file_type == "md":
            link = f'<a href="data:text/markdown;base64,{b64}" download="{filename}">📝 Download {filename}</a>'
        else:
            link = f'<a href="data:application/octet-stream;base64,{b64}" download="{filename}">Download {filename}</a>'
        st.session_state.setdefault('download_link_cache', {})[cache_key] = link
        return link
    except Exception as e:
        st.error(f"Error creating download link: {str(e)}")
        return ""
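# Usage sketch: repeated downloads of the same file reuse the cached anchor:
#   link = create_download_link_with_cache("audio_demo.mp3", "mp3")
#   st.markdown(link, unsafe_allow_html=True)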
# =============================================================================
# ───────────── RESEARCH / ARXIV FUNCTIONS (Optional Features)
# =============================================================================
# Format note: parse_arxiv_refs expects header lines containing exactly two
# pipes, e.g. "* 2024-01-05 | Paper Title | https://arxiv.org/abs/2401.01234",
# followed by one author line and then summary lines.
def parse_arxiv_refs(ref_text: str):
    if not ref_text:
        return []
    results = []
    current_paper = {}
    lines = ref_text.split('\n')
    for i, line in enumerate(lines):
        if line.count('|') == 2:
            if current_paper:
                results.append(current_paper)
                if len(results) >= 20:
                    break
            try:
                header_parts = line.strip('* ').split('|')
                date = header_parts[0].strip()
                title = header_parts[1].strip()
                url_match = re.search(r'(https://arxiv.org/\S+)', line)
                url = url_match.group(1) if url_match else f"paper_{len(results)}"
                current_paper = {
                    'date': date,
                    'title': title,
                    'url': url,
                    'authors': '',
                    'summary': '',
                    'full_audio': None,
                    'download_base64': '',
                }
            except Exception as e:
                st.warning(f"Error parsing paper header: {str(e)}")
                current_paper = {}
                continue
        elif current_paper:
            if not current_paper['authors']:
                current_paper['authors'] = line.strip('* ')
            else:
                if current_paper['summary']:
                    current_paper['summary'] += ' ' + line.strip()
                else:
                    current_paper['summary'] = line.strip()
    if current_paper:
        results.append(current_paper)
    return results[:20]

def create_paper_links_md(papers):
    lines = ["# Paper Links\n"]
    for i, p in enumerate(papers, start=1):
        lines.append(f"{i}. **{p['title']}** — [Arxiv Link]({p['url']})")
    return "\n".join(lines)

def generate_pdf_link(url: str) -> str:
    if "abs" in url:
        pdf_url = url.replace("abs", "pdf")
        if not pdf_url.endswith(".pdf"):
            pdf_url += ".pdf"
        return pdf_url
    return url

def generate_5min_feature_markdown(paper: dict) -> str:
    title = paper.get('title', '')
    summary = paper.get('summary', '')
    authors = paper.get('authors', '')
    date = paper.get('date', '')
    url = paper.get('url', '')
    pdf_link = generate_pdf_link(url)
    title_wc = len(title.split())
    summary_wc = len(summary.split())
    high_info_terms = [term for term in summary.split()[:5]]
    terms_str = ", ".join(high_info_terms)
    rouge_score = round((len(high_info_terms) / max(len(summary.split()), 1)) * 100, 2)
    mermaid_code = "```mermaid\nflowchart TD\n"
    for i in range(len(high_info_terms) - 1):
        mermaid_code += f'  T{i+1}["{high_info_terms[i]}"] --> T{i+2}["{high_info_terms[i+1]}"]\n'
    mermaid_code += "```"
    md = f"""
## {title}

**Authors:** {authors}
**Date:** {date}
**Word Count (Title):** {title_wc} | **Word Count (Summary):** {summary_wc}

**Links:** [Abstract]({url}) | [PDF]({pdf_link})

**High Info Terms:** {terms_str}
**ROUGE Score:** {rouge_score}%

### Mermaid Graph of Key Concepts
{mermaid_code}

---
"""
    return md

def create_detailed_paper_md(papers: list) -> str:
    md_parts = ["# Detailed Research Paper Summary\n"]
    for idx, paper in enumerate(papers, start=1):
        md_parts.append(generate_5min_feature_markdown(paper))
    return "\n".join(md_parts)

# =============================================================================
# ─────────────────────────────────────────────────────────
# MAIN AI LOOKUP FUNCTION (Optional Features)
# =============================================================================
def perform_ai_lookup(q, vocal_summary=True, extended_refs=False, titles_summary=True,
                      full_audio=False, useArxiv=True, useArxivAudio=False):
    start = time.time()
    ai_constitution = """
    You are a medical and machine learning review board expert...
    """
    # 1) Claude API call
    import anthropic
    claude_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY_3"))
    user_input = q
    response = claude_client.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=1000,
        messages=[{"role": "user", "content": user_input}]
    )
    st.write("Claude's reply 🧠:")
    st.markdown(response.content[0].text)
    result = response.content[0].text
    # save_qa_with_audio() writes the markdown file; the original extra
    # create_file(q, result, "md") call passed its arguments in the wrong order.
    md_file, audio_file = save_qa_with_audio(q, result)
    st.subheader("📝 Main Response Audio")
    play_and_download_audio(audio_file, st.session_state.get('audio_format', "mp3"))
    if useArxiv:
        q = q + result
        st.write('Running Arxiv RAG with Claude inputs.')
        # Distinct name so the Anthropic client is not shadowed (the original
        # rebound `client` here, breaking the follow-up Claude call below).
        arxiv_client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
        refs = arxiv_client.predict(q, 10, "Semantic Search",
                                    "mistralai/Mixtral-8x7B-Instruct-v0.1",
                                    api_name="/update_with_rag_md")[0]
        result = f"🔎 {q}\n\n{refs}"
        md_file, audio_file = save_qa_with_audio(q, result)
        st.subheader("📝 Main Response Audio")
        play_and_download_audio(audio_file, st.session_state.get('audio_format', "mp3"))
        papers = parse_arxiv_refs(refs)
        if papers:
            paper_links = create_paper_links_md(papers)
            # create_file() takes (filename, prompt, response); generate names first.
            links_file = generate_filename(q, "md")
            create_file(links_file, q, paper_links)
            st.markdown(paper_links)
            detailed_md = create_detailed_paper_md(papers)
            detailed_file = generate_filename(q, "md")
            create_file(detailed_file, q, detailed_md)
            st.markdown(detailed_md)
            if useArxivAudio:
                asyncio.run(async_edge_tts_generate("Sample text", st.session_state.get('tts_voice', "en-US-AriaNeural")))
            st.write("Displaying Papers:")
            # (Optional: call functions to display papers)
        else:
            st.warning("No papers found.")
        response2 = claude_client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=1000,
            messages=[{"role": "user", "content": q + '\n\nUse the reference papers below to answer the question by creating a python streamlit app.py and requirements.txt with working code.'}]
        )
        r2 = response2.content[0].text
        st.write("Claude's reply 🧠:")
        st.markdown(r2)
    elapsed = time.time() - start
    st.write(f"**Total Elapsed:** {elapsed:.2f} s")
    return result
# =============================================================================
# ───────────── MAIN FUNCTION ─────────────
# =============================================================================
def main():
    st.markdown("### 🐙 GitCosmos - Cosmos & Git Hub")
    st.markdown(f"[🔗 Portal]({CosmosDBUrl})")
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    st.session_state.setdefault("current_container", None)
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    else:
        st.error("Missing Cosmos Key 🔑❌")
        return
    st.sidebar.markdown("## 🛠️ Item Management")
    if st.sidebar.button("New Item"):
        if st.session_state.get("current_container"):
            new_doc = new_item_default(st.session_state.current_container)
            if new_doc:
                st.session_state.doc_editor = json.dumps(new_doc, indent=2)
        else:
            st.warning("No container selected!")
    st.sidebar.text_input("New Field Key", key="new_field_key")
    st.sidebar.text_input("New Field Value", key="new_field_value")
    if st.sidebar.button("Add Field"):
        if "doc_editor" in st.session_state:
            add_field_to_doc()
        else:
            st.warning("No document loaded to add a field.")
    if st.sidebar.button("New AI Record"):
        if st.session_state.get("current_container"):
            new_ai_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    if st.sidebar.button("New Links Record"):
        if st.session_state.get("current_container"):
            new_links_record(st.session_state.current_container)
        else:
            st.warning("No container selected!")
    st.sidebar.markdown("## 🔍 Vector Search")
    search_keyword = st.sidebar.text_input("Search Keyword", key="vector_search_keyword")
    if st.sidebar.button("Search"):
        if st.session_state.get("current_container"):
            results = vector_keyword_search(search_keyword, st.session_state.current_container)
            st.sidebar.write(f"Found {len(results)} results:")
            for res in results:
                st.sidebar.code(json.dumps(res, indent=2), language="json")
        else:
            st.warning("No container selected for search!")
    show_sidebar_data_grid()
    display_langchain_functions()
    try:
        if st.session_state.get("client") is None:
            st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        st.sidebar.title("🐙 Navigator")
        databases = get_databases(st.session_state.client)
        selected_db = st.sidebar.selectbox("🗃️ DB", databases)
        st.markdown(CosmosDBUrl)
        if selected_db != st.session_state.get("selected_database"):
            st.session_state.selected_database = selected_db
            st.session_state.selected_container = None
            st.session_state.selected_document_id = None
            st.session_state.current_index = 0
            st.rerun()
        if st.session_state.selected_database:
            database = st.session_state.client.get_database_client(st.session_state.selected_database)
            if "show_new_container_form" not in st.session_state:
                st.session_state.show_new_container_form = False
            if st.sidebar.button("🆕 New Container"):
                st.session_state.show_new_container_form = True
            if st.session_state.show_new_container_form:
                with st.sidebar.form("new_container_form"):
                    new_container_id = st.text_input("Container ID", value="aiml-container")
                    new_partition_key = st.text_input("Partition Key", value="/pk")
                    new_analytical = st.checkbox("Enable Analytical Store", value=True)
                    submitted = st.form_submit_button("Create Container")
                if submitted:
                    analytical_ttl = -1 if new_analytical else None
                    new_container = create_new_container(database, new_container_id, new_partition_key,
                                                         analytical_storage_ttl=analytical_ttl)
                    if new_container:
                        st.success(f"Container '{new_container_id}' created.")
                        default_id = generate_unique_id()
                        default_item = {
                            "id": default_id,
                            "pk": default_id,
                            "name": "Default Image Prompt",
                            "prompt": "Enter your image prompt here",
                            "timestamp": datetime.now().isoformat(),
                            "type": "image_prompt"
                        }
                        insert_success, insert_message = insert_record(new_container, default_item)
                        if insert_success:
                            st.info("Default templated item created in new container.")
                        else:
                            st.error(f"Default item insertion error: {insert_message}")
                    st.session_state.show_new_container_form = False
                    st.session_state.new_container_created = new_container_id
                    st.rerun()
            containers = get_containers(database)
            if "new_container_created" in st.session_state and st.session_state.new_container_created not in containers:
                containers.append(st.session_state.new_container_created)
            selected_container = st.sidebar.selectbox("📁 Container", containers)
            if selected_container != st.session_state.get("selected_container"):
                st.session_state.selected_container = selected_container
                st.session_state.selected_document_id = None
                st.session_state.current_index = 0
                st.rerun()
            if st.session_state.selected_container:
                container = database.get_container_client(st.session_state.selected_container)
                st.session_state.current_container = container
                if st.sidebar.button("📦 Export"):
                    download_link = archive_current_container(st.session_state.selected_database,
                                                              st.session_state.selected_container,
                                                              st.session_state.client)
                    if download_link.startswith('<a'):
                        st.sidebar.markdown(download_link, unsafe_allow_html=True)
                    else:
                        st.sidebar.error(download_link)
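                # Note: archive_current_container returns an "<a ...>" download
                # link on success or an "Archive error: ..." string, which is why
                # the branch above checks startswith('<a').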
                documents = get_documents(container)
                total_docs = len(documents)
                # The doc-count control was garbled in the source; a sidebar
                # slider is assumed here, matching the downstream variables.
                num_docs = st.sidebar.slider("Docs to show", 1, 20, 10)
                documents_to_display = documents[:num_docs] if total_docs > num_docs else documents
                st.sidebar.info(f"Showing {len(documents_to_display)} docs")
                view_options = ['Markdown', 'Code', 'Run AI', 'Clone', 'New']
                selected_view = st.sidebar.selectbox("View", view_options, index=1)
                if selected_view == 'Markdown':
                    st.markdown("#### 📄 Markdown")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        content = json.dumps(doc, indent=2)
                        st.markdown(f"```json\n{content}\n```")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                elif selected_view == 'Code':
                    st.markdown("#### 💻 Code Editor")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        if "doc_editor" not in st.session_state:
                            st.session_state.doc_editor = json.dumps(doc, indent=2)
                        edited = st.text_area("Edit JSON", value=st.session_state.doc_editor,
                                              height=300, key="doc_editor",
                                              on_change=lambda: auto_save_edit())
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
                        col_save, col_delete = st.columns(2)
                        with col_save:
                            if st.button("💾 Save", key=f'save_{st.session_state.current_index}'):
                                try:
                                    updated_doc = json.loads(edited)
                                    container.upsert_item(body=updated_doc)
                                    st.success(f"Saved {updated_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Save err: {str(e)}")
                        with col_delete:
                            if st.button("🗑️ Delete", key=f'delete_{st.session_state.current_index}'):
                                try:
                                    current_doc = json.loads(edited)
                                    success, message = delete_record(container, current_doc)
                                    if success:
                                        st.success(message)
                                        st.rerun()
                                    else:
                                        st.error(message)
                                except Exception as e:
                                    st.error(f"Delete err: {str(e)}")
                        if "delete_log" in st.session_state and st.session_state.delete_log:
                            st.subheader("Delete Log")
                            for log_entry in st.session_state.delete_log[-5:]:
                                st.write(log_entry)
                elif selected_view == 'Run AI':
                    st.markdown("#### 🤖 Run AI")
                    ai_query = st.text_area("Enter your query for ArXiv search:", key="arxiv_query", height=100)
                    if st.button("Send"):
                        st.session_state.last_query = ai_query
                        perform_ai_lookup(ai_query, vocal_summary=True, extended_refs=False,
                                          titles_summary=True, full_audio=True,
                                          useArxiv=True, useArxivAudio=False)
                elif selected_view == 'Clone':
                    st.markdown("#### 📄 Clone")
                    if documents:
                        doc = documents[st.session_state.current_index]
                        st.markdown(f"Original ID: {doc.get('id', '')}")
                        new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id')
                        new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name')
                        new_doc = {'id': new_id, 'pk': new_id, 'name': new_name,
                                   **{k: v for k, v in doc.items()
                                      if k not in ['id', 'name', 'pk', '_rid', '_self', '_etag', '_attachments', '_ts']}}
                        doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2),
                                               height=300, key='clone_preview')
                        col1, col2 = st.columns(2)
                        with col1:
                            if st.button("🔄 Regenerate"):
                                new_id = generate_unique_id()
                                st.session_state.new_clone_id = new_id
                                st.rerun()
                        with col2:
                            if st.button("💾 Save Clone"):
                                try:
                                    final_doc = json.loads(doc_str)
                                    for field in ['_rid', '_self', '_etag', '_attachments', '_ts']:
                                        final_doc.pop(field, None)
                                    container.create_item(body=final_doc)
                                    st.success(f"Cloned {final_doc['id']}")
                                    st.rerun()
                                except Exception as e:
                                    st.error(f"Clone err: {str(e)}")
                        col_prev, col_next = st.columns(2)
                        with col_prev:
                            if st.button("⬅️") and st.session_state.current_index > 0:
                                st.session_state.current_index -= 1
                                st.rerun()
                        with col_next:
                            if st.button("➡️") and st.session_state.current_index < total_docs - 1:
                                st.session_state.current_index += 1
                                st.rerun()
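                # Note: Cosmos system properties (_rid, _self, _etag, _attachments,
                # _ts) are server-managed; the Clone view above strips them before
                # create_item so the copy is accepted as a new document.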
                elif selected_view == 'New':
                    st.markdown("#### ➕ New Doc")
                    if st.button("🤖 Auto-Gen"):
                        # One generated id for both id and pk (the original drew two
                        # different ids, leaving pk mismatched with id).
                        auto_id = generate_unique_id()
                        auto_doc = {
                            "id": auto_id,
                            "pk": auto_id,
                            "name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                            "content": "Auto-generated record.",
                            "timestamp": datetime.now().isoformat()
                        }
                        success, message = insert_record(container, auto_doc)
                        if success:
                            st.success(message)
                            st.rerun()
                        else:
                            st.error(message)
                    else:
                        new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
                        default_doc = {
                            "id": new_id,
                            "pk": new_id,
                            "name": "New Doc",
                            "content": "",
                            "timestamp": datetime.now().isoformat()
                        }
                        new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300)
                        if st.button("➕ Create"):
                            try:
                                cleaned = preprocess_text(new_doc_str)
                                new_doc = json.loads(cleaned)
                                new_doc['id'] = new_id
                                new_doc['pk'] = new_id
                                success, message = insert_record(container, new_doc)
                                if success:
                                    st.success(f"Created {new_doc['id']}")
                                    st.rerun()
                                else:
                                    st.error(message)
                            except Exception as e:
                                st.error(f"Create err: {str(e)}")
                st.subheader(f"📊 {st.session_state.selected_container}")
                if documents_to_display:
                    df = pd.DataFrame(documents_to_display)
                    st.dataframe(df)
                else:
                    st.info("No docs.")
                update_file_management_section()
    except exceptions.CosmosHttpResponseError as e:
        st.error(f"Cosmos error: {str(e)} 🚨")
    except Exception as e:
        st.error(f"Error: {str(e)} 😱")
    if st.session_state.logged_in and st.sidebar.button("🚪 Logout"):
        st.markdown("#### 🚪 Logout")
        st.session_state.logged_in = False
        st.session_state.selected_records = []
        st.session_state.client = None
        st.session_state.selected_database = None
        st.session_state.selected_container = None
        st.session_state.selected_document_id = None
        st.session_state.current_index = 0
        st.rerun()
    show_sidebar_data_grid()

if __name__ == "__main__":
    main()
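# Requirements sketch (inferred from the imports above; versions unpinned):
#   streamlit, azure-cosmos, pandas, pytz, Pillow, GitPython, PyGithub,
#   gradio_client, edge-tts, streamlit-marquee, anthropic, requests, numpy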