# %% ───────────── IMPORTS ───────────── import base64 import glob import hashlib import json import os import pandas as pd import pytz import random import re import shutil import streamlit as st import time import traceback import uuid import zipfile from PIL import Image from azure.cosmos import CosmosClient, PartitionKey, exceptions from datetime import datetime from git import Repo from github import Github from gradio_client import Client, handle_file import tempfile import io import requests import numpy as np from urllib.parse import quote # External help links for AI tools external_links = [ {"title": "MergeKit Official GitHub", "url": "https://github.com/arcee-ai/MergeKit", "emoji": "💻"}, {"title": "MergeKit arXiv Paper", "url": "https://arxiv.org/abs/xxxx.xxxxx", "emoji": "📘"}, {"title": "MergeKit Tutorial", "url": "https://huggingface.co/blog/mergekit-tutorial", "emoji": "✍️"}, {"title": "MergeKit Sample Usage", "url": "https://github.com/arcee-ai/MergeKit#examples", "emoji": "📚"}, {"title": "DistillKit Official GitHub", "url": "https://github.com/arcee-ai/DistillKit", "emoji": "💻"}, {"title": "DistillKit Announcing Blog Post", "url": "https://arcee.ai/blog/distillkit-announcement", "emoji": "✍️"}, {"title": "DistillKit Sample Usage", "url": "https://github.com/arcee-ai/DistillKit#usage", "emoji": "📚"}, {"title": "Spectrum Hugging Face Blog Post", "url": "https://huggingface.co/blog/spectrum", "emoji": "✍️"}, {"title": "Hugging Face Model Merging Docs", "url": "https://huggingface.co/docs/peft/model_merging", "emoji": "📚"}, {"title": "arcee.ai Official Website", "url": "https://arcee.ai", "emoji": "🌐"}, ] # %% ───────────── APP CONFIGURATION ───────────── Site_Name = '🐙 GitCosmos' title = "🐙 GitCosmos" helpURL = 'https://huggingface.co/awacke1' bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/' icons = '🐙🌌💫' st.set_page_config( page_title=title, page_icon=icons, layout="wide", initial_sidebar_state="auto", menu_items={ 'Get Help': helpURL, 'Report a bug': bugURL, 'About': title } ) # Cosmos DB & App URLs ENDPOINT = "https://acae-afd.documents.azure.com:443/" DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME") CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME") Key = os.environ.get("Key") LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI" CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer' # %% ───────────── HELPER FUNCTIONS ───────────── # 📎 Get download link from file def get_download_link(file_path): with open(file_path, "rb") as file: contents = file.read() b64 = base64.b64encode(contents).decode() file_name = os.path.basename(file_path) return f'Download {file_name} 📂' # 🆔 Generate a unique ID def generate_unique_id(): timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') unique_uuid = str(uuid.uuid4()) return_value = f"{timestamp}-{unique_uuid}" st.write('New ID: ' + return_value) return return_value # 📄 Generate a filename from a prompt def generate_filename(prompt, file_type): central = pytz.timezone('US/Central') safe_date_time = datetime.now(central).strftime("%m%d_%H%M") safe_prompt = re.sub(r'\W+', '', prompt)[:90] return f"{safe_date_time}{safe_prompt}.{file_type}" # 💾 Create and save a file with prompt & response def create_file(filename, prompt, response, should_save=True): if not should_save: return with open(filename, 'w', encoding='utf-8') as file: file.write(prompt + "\n\n" + response) # 📖 Load file content from disk def load_file(file_name): with open(file_name, "r", encoding='utf-8') as file: content = file.read() return content # 🔍 Display glossary entity with quick links def display_glossary_entity(k): search_urls = { "🚀": lambda k: f"/?q={k}", "📖": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}", "🔍": lambda k: f"https://www.google.com/search?q={quote(k)}", "🎥": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}", } links_md = ' '.join([f"{emoji}" for emoji, url in search_urls.items()]) st.markdown(f"{k} {links_md}", unsafe_allow_html=True) # 🗜️ Create zip of multiple files def create_zip_of_files(files): zip_name = "all_files.zip" with zipfile.ZipFile(zip_name, 'w') as zipf: for file in files: zipf.write(file) return zip_name # 🎬 Get HTML for video playback def get_video_html(video_path, width="100%"): video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}" return f''' ''' # 🎵 Get HTML for audio playback def get_audio_html(audio_path, width="100%"): audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}" return f''' ''' # 📝 Preprocess text for JSON safety def preprocess_text(text): text = text.replace('\r\n', '\\n').replace('\r', '\\n').replace('\n', '\\n') text = text.replace('"', '\\"') text = re.sub(r'[\t]', ' ', text) text = re.sub(r'[^\x00-\x7F]+', '', text) return text.strip() # %% ───────────── COSMOS DB FUNCTIONS ───────────── # 📚 List all databases in Cosmos def get_databases(client): return [db['id'] for db in client.list_databases()] # 📦 List all containers in a database def get_containers(database): return [container['id'] for container in database.list_containers()] # 📄 Query documents from a container (most recent first) def get_documents(container, limit=None): query = "SELECT * FROM c ORDER BY c._ts DESC" items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit)) return items # 📥 Insert a new record into Cosmos def insert_record(container, record): try: container.create_item(body=record) return True, "Inserted! 🎉" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error: {str(e)} 🚨" except Exception as e: return False, f"Error: {str(e)} 😱" # 🔄 Update an existing Cosmos record def update_record(container, updated_record): try: container.upsert_item(body=updated_record) return True, f"Updated {updated_record['id']} 🛠️" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error: {str(e)} 🚨" except Exception as e: return False, f"Error: {traceback.format_exc()} 😱" # 🗑️ Delete a record from Cosmos DB using /id as partition key def delete_record(container, record): try: if "id" not in record: return False, "Record must contain an 'id' field. 🛑" doc_id = record["id"] if "delete_log" not in st.session_state: st.session_state.delete_log = [] st.session_state.delete_log.append(f"Attempting to delete document: {json.dumps(record, indent=2)}") partition_key_value = doc_id st.session_state.delete_log.append(f"Using ID and Partition Key: {partition_key_value}") container.delete_item(item=doc_id, partition_key=partition_key_value) success_msg = f"Record {doc_id} successfully deleted from Cosmos DB. 🗑️" st.session_state.delete_log.append(success_msg) return True, success_msg except exceptions.CosmosResourceNotFoundError: success_msg = f"Record {doc_id} not found in Cosmos DB (already deleted or never existed). 🗑️" st.session_state.delete_log.append(success_msg) return True, success_msg except exceptions.CosmosHttpResponseError as e: error_msg = f"HTTP error deleting {doc_id}: {str(e)}. 🚨" st.session_state.delete_log.append(error_msg) return False, error_msg except Exception as e: error_msg = f"Unexpected error deleting {doc_id}: {str(traceback.format_exc())}. 😱" st.session_state.delete_log.append(error_msg) return False, error_msg # 💾 Save a new document to Cosmos DB with extra fields def save_to_cosmos_db(container, query, response1, response2): try: if container: timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f') unique_uuid = str(uuid.uuid4()) new_id = f"{timestamp}-{unique_uuid}" record = { "id": new_id, "name": new_id, "query": query, "response1": response1, "response2": response2, "timestamp": datetime.utcnow().isoformat(), "type": "ai_response", "version": "1.0" } container.create_item(body=record) st.success(f"Saved: {record['id']}") st.session_state.documents = get_documents(container) else: st.error("Cosmos container not initialized.") except Exception as e: st.error(f"Save error: {str(e)}") # 🗄️ Archive all documents in a container and provide a download link def archive_current_container(database_name, container_name, client): try: base_dir = "./cosmos_archive_current_container" if os.path.exists(base_dir): shutil.rmtree(base_dir) os.makedirs(base_dir) db_client = client.get_database_client(database_name) container_client = db_client.get_container_client(container_name) items = list(container_client.read_all_items()) container_dir = os.path.join(base_dir, container_name) os.makedirs(container_dir) for item in items: item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}") with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f: json.dump(item, f, indent=2) archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}" shutil.make_archive(archive_name, 'zip', base_dir) return get_download_link(f"{archive_name}.zip") except Exception as e: return f"Archive error: {str(e)} 😢" # %% ───────────── ADVANCED COSMOS FUNCTIONS ───────────── def create_new_container(database, container_id, partition_key_path, analytical_storage_ttl=None, indexing_policy=None, vector_embedding_policy=None): try: if analytical_storage_ttl is not None: container = database.create_container( id=container_id, partition_key=PartitionKey(path=partition_key_path), analytical_storage_ttl=analytical_storage_ttl, indexing_policy=indexing_policy, vector_embedding_policy=vector_embedding_policy ) else: container = database.create_container( id=container_id, partition_key=PartitionKey(path=partition_key_path), indexing_policy=indexing_policy, vector_embedding_policy=vector_embedding_policy ) except exceptions.CosmosResourceExistsError: container = database.get_container_client(container_id) except exceptions.CosmosHttpResponseError as e: st.error(f"Error creating container: {str(e)}") return None return container def advanced_insert_item(container, item): try: container.upsert_item(item) return True, f"Item {item.get('id', '')} inserted. ➕" except Exception as e: return False, str(e) def advanced_update_item(container, item): try: container.upsert_item(item) return True, f"Item {item.get('id', '')} updated. ✏️" except Exception as e: return False, str(e) def advanced_delete_item(container, item_id, partition_key_value): try: container.delete_item(item=item_id, partition_key=partition_key_value) return True, f"Item {item_id} deleted. 🗑️" except Exception as e: return False, str(e) def vector_search(container, query_vector, vector_field, top=10, exact_search=False): # Convert the vector list to a JSON string query_vector_str = json.dumps(query_vector) query = f"""SELECT TOP {top} c.id, VectorDistance(c.{vector_field}, {query_vector_str}, {str(exact_search).lower()}, {{'dataType':'float32','distanceFunction':'cosine'}}) AS SimilarityScore FROM c ORDER BY SimilarityScore""" results = list(container.query_items(query=query, enable_cross_partition_query=True)) return results # %% ───────────── GITHUB FUNCTIONS ───────────── def download_github_repo(url, local_path): if os.path.exists(local_path): shutil.rmtree(local_path) Repo.clone_from(url, local_path) def create_zip_file(source_dir, output_filename): shutil.make_archive(output_filename, 'zip', source_dir) def create_repo(g, repo_name): user = g.get_user() return user.create_repo(repo_name) def push_to_github(local_path, repo, github_token): repo_url = f"https://{github_token}@github.com/{repo.full_name}.git" local_repo = Repo(local_path) if 'origin' in [remote.name for remote in local_repo.remotes]: origin = local_repo.remote('origin') origin.set_url(repo_url) else: origin = local_repo.create_remote('origin', repo_url) if not local_repo.heads: local_repo.git.checkout('-b', 'main') current_branch = 'main' else: current_branch = local_repo.active_branch.name local_repo.git.add(A=True) if local_repo.is_dirty(): local_repo.git.commit('-m', 'Initial commit') origin.push(refspec=f'{current_branch}:{current_branch}') # %% ───────────── FILE & MEDIA MANAGEMENT FUNCTIONS ───────────── def display_saved_files_in_sidebar(): all_files = sorted([f for f in glob.glob("*.md") if not f.lower().startswith('readme')], reverse=True) st.sidebar.markdown("## 📁 Files") for file in all_files: col1, col2, col3 = st.sidebar.columns([6, 2, 1]) with col1: st.markdown(f"📄 {file}") with col2: st.sidebar.download_button( label="⬇️", data=open(file, 'rb').read(), file_name=file ) with col3: if st.sidebar.button("🗑", key=f"delete_{file}"): os.remove(file) st.rerun() def display_file_viewer(file_path): content = load_file(file_path) if content: st.markdown("### 📄 File Viewer") st.markdown(f"**{file_path}**") file_stats = os.stat(file_path) st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes") st.markdown("---") st.markdown(content) st.download_button("⬇️", data=content, file_name=os.path.basename(file_path), mime="text/markdown") def display_file_editor(file_path): if 'file_content' not in st.session_state: st.session_state.file_content = {} if file_path not in st.session_state.file_content: content = load_file(file_path) if content is not None: st.session_state.file_content[file_path] = content else: return st.markdown("### ✏️ Edit File") st.markdown(f"**Editing:** {file_path}") md_tab, code_tab = st.tabs(["Markdown", "Code"]) with md_tab: st.markdown(st.session_state.file_content[file_path]) with code_tab: new_content = st.text_area("Edit:", value=st.session_state.file_content[file_path], height=400, key=f"editor_{hash(file_path)}") col1, col2 = st.columns([1, 5]) with col1: if st.button("💾 Save"): if save_file_content(file_path, new_content): st.session_state.file_content[file_path] = new_content st.success("Saved! 🎉") time.sleep(1) st.rerun() with col2: st.download_button("⬇️", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown") def save_file_content(file_path, content): try: with open(file_path, 'w', encoding='utf-8') as file: file.write(content) return True except Exception as e: st.error(f"Save error: {str(e)}") return False def update_file_management_section(): if 'file_view_mode' not in st.session_state: st.session_state.file_view_mode = None if 'current_file' not in st.session_state: st.session_state.current_file = None if 'file_content' not in st.session_state: st.session_state.file_content = {} all_files = sorted(glob.glob("*.md"), reverse=True) st.sidebar.title("📁 Files") if st.sidebar.button("🗑 Delete All"): for file in all_files: os.remove(file) st.session_state.file_content = {} st.session_state.current_file = None st.session_state.file_view_mode = None st.rerun() if st.sidebar.button("⬇️ Download All"): zip_file = create_zip_of_files(all_files) st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True) for file in all_files: col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1]) with col1: if st.button("🌐", key=f"view_{file}"): st.session_state.current_file = file st.session_state.file_view_mode = 'view' if file not in st.session_state.file_content: content = load_file(file) if content is not None: st.session_state.file_content[file] = content st.rerun() with col2: st.markdown(get_download_link(file), unsafe_allow_html=True) with col3: if st.button("📂", key=f"edit_{file}"): st.session_state.current_file = file st.session_state.file_view_mode = 'edit' if file not in st.session_state.file_content: content = load_file(file) if content is not None: st.session_state.file_content[file] = content st.rerun() with col4: if st.button("🗑", key=f"delete_{file}"): os.remove(file) if file in st.session_state.file_content: del st.session_state.file_content[file] if st.session_state.current_file == file: st.session_state.current_file = None st.session_state.file_view_mode = None st.rerun() # ───── External Help Links Section ───── st.sidebar.markdown("---") st.sidebar.title("External Help Links") for link in external_links: st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True) if st.session_state.current_file: if st.session_state.file_view_mode == 'view': display_file_viewer(st.session_state.current_file) elif st.session_state.file_view_mode == 'edit': display_file_editor(st.session_state.current_file) # %% ───────────── VIDEO & AUDIO UI FUNCTIONS ───────────── def validate_and_preprocess_image(file_data, target_size=(576, 1024)): try: st.write("Preprocessing image...") if isinstance(file_data, bytes): img = Image.open(io.BytesIO(file_data)) elif hasattr(file_data, 'read'): if hasattr(file_data, 'seek'): file_data.seek(0) img = Image.open(file_data) elif isinstance(file_data, Image.Image): img = file_data else: raise ValueError(f"Unsupported input: {type(file_data)}") if img.mode != 'RGB': img = img.convert('RGB') aspect_ratio = img.size[0] / img.size[1] if aspect_ratio > target_size[0] / target_size[1]: new_width = target_size[0] new_height = int(new_width / aspect_ratio) else: new_height = target_size[1] new_width = int(new_height * aspect_ratio) new_width = (new_width // 2) * 2 new_height = (new_height // 2) * 2 resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) final_img = Image.new('RGB', target_size, (255, 255, 255)) paste_x = (target_size[0] - new_width) // 2 paste_y = (target_size[1] - new_height) // 2 final_img.paste(resized_img, (paste_x, paste_y)) return final_img except Exception as e: st.error(f"Image error: {str(e)}") return None def add_video_generation_ui(container): st.markdown("### 🎥 Video Gen") col1, col2 = st.columns([2, 1]) with col1: uploaded_file = st.file_uploader("Upload Image 🖼️", type=['png', 'jpg', 'jpeg']) with col2: st.markdown("#### Params") motion = st.slider("🌊 Motion", 1, 255, 127) fps = st.slider("🎬 FPS", 1, 30, 6) with st.expander("Advanced"): use_custom = st.checkbox("Custom Seed") seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None if uploaded_file is not None: try: file_data = uploaded_file.read() preview1, preview2 = st.columns(2) with preview1: st.write("Original") st.image(Image.open(io.BytesIO(file_data)), use_column_width=True) with preview2: proc_img = validate_and_preprocess_image(io.BytesIO(file_data)) if proc_img: st.write("Processed") st.image(proc_img, use_column_width=True) else: st.error("Preprocess failed") return if st.button("🎥 Generate"): with st.spinner("Generating video..."): with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file: proc_img.save(temp_file.name, format='PNG') try: client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN")) result = client.predict( image=temp_file.name, seed=seed if seed is not None else int(time.time() * 1000), randomize_seed=seed is None, motion_bucket_id=motion, fps_id=fps, api_name="/video" ) if result and isinstance(result, tuple) and len(result) >= 1: video_path = result[0].get('video') if isinstance(result[0], dict) else None if video_path and os.path.exists(video_path): video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4" shutil.copy(video_path, video_filename) st.success(f"Video generated! 🎉") st.video(video_filename) if container: video_record = { "id": generate_unique_id(), "type": "generated_video", "filename": video_filename, "seed": seed if seed is not None else "random", "motion": motion, "fps": fps, "timestamp": datetime.now().isoformat() } success, message = insert_record(container, video_record) if success: st.success("DB record saved!") else: st.error(f"DB error: {message}") else: st.error("Invalid result format") else: st.error("No result returned") except Exception as e: st.error(f"Video gen error: {str(e)}") finally: try: os.unlink(temp_file.name) st.write("Temp file removed") except Exception as e: st.warning(f"Cleanup error: {str(e)}") except Exception as e: st.error(f"Upload error: {str(e)}") def main(): st.markdown("### 🐙 GitCosmos - Cosmos & Git Hub") if "chat_history" not in st.session_state: st.session_state.chat_history = [] # Auth & Cosmos client initialization if Key: st.session_state.primary_key = Key st.session_state.logged_in = True else: st.error("Missing Cosmos Key 🔑❌") return if st.session_state.logged_in: try: if st.session_state.get("client") is None: st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) st.sidebar.title("🐙 Navigator") databases = get_databases(st.session_state.client) selected_db = st.sidebar.selectbox("🗃️ DB", databases) st.markdown(CosmosDBUrl) if selected_db != st.session_state.get("selected_database"): st.session_state.selected_database = selected_db st.session_state.selected_container = None st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if st.session_state.selected_database: database = st.session_state.client.get_database_client(st.session_state.selected_database) # ── New Container Button & Form Under DB Menu ── if "show_new_container_form" not in st.session_state: st.session_state.show_new_container_form = False if st.sidebar.button("🆕 New Container"): st.session_state.show_new_container_form = True if st.session_state.show_new_container_form: with st.sidebar.form("new_container_form"): new_container_id = st.text_input("Container ID", value="newContainer") new_partition_key = st.text_input("Partition Key", value="/id") new_analytical = st.checkbox("Enable Analytical Store", value=False) submitted = st.form_submit_button("Create Container") if submitted: analytical_ttl = -1 if new_analytical else None new_container = create_new_container( database, new_container_id, new_partition_key, analytical_storage_ttl=analytical_ttl ) if new_container: st.success(f"Container '{new_container_id}' created.") st.session_state.show_new_container_form = False st.session_state.new_container_created = new_container_id st.rerun() # ── Get and Update Container List ── containers = get_containers(database) if "new_container_created" in st.session_state and st.session_state.new_container_created not in containers: containers.append(st.session_state.new_container_created) selected_container = st.sidebar.selectbox("📁 Container", containers) if selected_container != st.session_state.get("selected_container"): st.session_state.selected_container = selected_container st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if st.session_state.selected_container: container = database.get_container_client(st.session_state.selected_container) # ... rest of your container operations code follows here ... except exceptions.CosmosHttpResponseError as e: st.error(f"Cosmos error: {str(e)} 🚨") except Exception as e: st.error(f"Error: {str(e)} 😱") if st.session_state.logged_in and st.sidebar.button("🚪 Logout"): st.markdown("#### 🚪 Logout") st.session_state.logged_in = False st.session_state.selected_records = [] st.session_state.client = None st.session_state.selected_database = None st.session_state.selected_container = None st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if __name__ == "__main__": main()