import streamlit as st import anthropic import os import base64 import glob import json import pytz from datetime import datetime from streamlit.components.v1 import html from PIL import Image import re #from urllib.parse import quote from azure.cosmos import CosmosClient, exceptions import pandas as pd import traceback import shutil from github import Github from git import Repo import uuid # 🎲 For generating unique IDs from urllib.parse import quote # πŸ”— For encoding URLs from gradio_client import Client # 🌐 For connecting to Gradio apps # πŸŽ‰ Welcome to our fun-filled Cosmos DB and GitHub Integration app! # 1. App Configuration Site_Name = 'πŸ€–πŸ§ Claude35πŸ“πŸ”¬' title="πŸ€–πŸ§ Claude35πŸ“πŸ”¬" helpURL='https://huggingface.co/awacke1' bugURL='https://huggingface.co/spaces/awacke1' icons='πŸ€–πŸ§ πŸ”¬πŸ“' st.set_page_config( page_title=title, page_icon=icons, layout="wide", initial_sidebar_state="auto", menu_items={ 'Get Help': helpURL, 'Report a bug': bugURL, 'About': title } ) # 🌌 Cosmos DB configuration ENDPOINT = "https://acae-afd.documents.azure.com:443/" DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME") CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME") Key = os.environ.get("Key") # πŸ”‘ Don't forget your key! # 🏠 Your local app URL (Change this to your app's URL) LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI" # πŸ€– OpenAI configuration # openai.api_key = os.environ.get("OPENAI_API_KEY") # MODEL = "gpt-3.5-turbo" # Replace with your desired model # πŸ™ GitHub configuration def download_github_repo(url, local_path): # 🚚 Let's download that GitHub repo! if os.path.exists(local_path): shutil.rmtree(local_path) Repo.clone_from(url, local_path) def create_zip_file(source_dir, output_filename): # πŸ“¦ Zipping up files like a pro! shutil.make_archive(output_filename, 'zip', source_dir) def create_repo(g, repo_name): # πŸ› οΈ Creating a new GitHub repo. Magic! user = g.get_user() return user.create_repo(repo_name) def push_to_github(local_path, repo, github_token): # πŸš€ Pushing code to GitHub. Hold on tight! repo_url = f"https://{github_token}@github.com/{repo.full_name}.git" local_repo = Repo(local_path) if 'origin' in [remote.name for remote in local_repo.remotes]: origin = local_repo.remote('origin') origin.set_url(repo_url) else: origin = local_repo.create_remote('origin', repo_url) if not local_repo.heads: local_repo.git.checkout('-b', 'main') current_branch = 'main' else: current_branch = local_repo.active_branch.name local_repo.git.add(A=True) if local_repo.is_dirty(): local_repo.git.commit('-m', 'Initial commit') origin.push(refspec=f'{current_branch}:{current_branch}') def get_base64_download_link(file_path, file_name): # πŸ§™β€β™‚οΈ Generating a magical download link! with open(file_path, "rb") as file: contents = file.read() base64_encoded = base64.b64encode(contents).decode() return f'⬇️ Download {file_name}' # 🧭 New functions for dynamic sidebar navigation def get_databases(client): # πŸ“š Fetching list of databases. So many options! return [db['id'] for db in client.list_databases()] def get_containers(database): # πŸ“‚ Getting containers. Containers within containers! return [container['id'] for container in database.list_containers()] def get_documents(container, limit=None): # πŸ“ Retrieving documents. Shhh, don't tell anyone! query = "SELECT * FROM c ORDER BY c._ts DESC" items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit)) return items # 🌟 Cosmos DB functions def insert_record(container, record): # πŸ“₯ Inserting a record into the Cosmosβ€”hope we don't disturb any aliens! πŸ‘½ try: container.create_item(body=record) return True, "Record inserted successfully! πŸŽ‰" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)} 🚨" except Exception as e: return False, f"An unexpected error occurred: {str(e)} 😱" def update_record(container, updated_record): # πŸ”„ Updating a recordβ€”giving it a cosmic makeover! ✨ try: container.upsert_item(body=updated_record) return True, f"Record with id {updated_record['id']} successfully updated. πŸ› οΈ" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)} 🚨" except Exception as e: return False, f"An unexpected error occurred: {traceback.format_exc()} 😱" def delete_record(container, name, id): # πŸ—‘οΈ Deleting a recordβ€”sending it into the cosmic void! 🌌 try: container.delete_item(item=id, partition_key=id) return True, f"Successfully deleted record with name: {name} and id: {id} πŸ—‘οΈ" except exceptions.CosmosResourceNotFoundError: return False, f"Record with id {id} not found. It may have been already deleted. πŸ•΅οΈβ€β™‚οΈ" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)} 🚨" except Exception as e: return False, f"An unexpected error occurred: {traceback.format_exc()} 😱" # 🎲 Function to generate a unique UUID def generate_unique_id(): # πŸ§™β€β™‚οΈ Generating a unique UUID! return str(uuid.uuid4()) # πŸ“¦ Function to archive current container def archive_current_container(database_name, container_name, client): # πŸ“¦ Archiving the entire containerβ€”time to pack up the stars! 🌠 try: base_dir = "./cosmos_archive_current_container" if os.path.exists(base_dir): shutil.rmtree(base_dir) os.makedirs(base_dir) db_client = client.get_database_client(database_name) container_client = db_client.get_container_client(container_name) items = list(container_client.read_all_items()) container_dir = os.path.join(base_dir, container_name) os.makedirs(container_dir) for item in items: item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}") with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f: json.dump(item, f, indent=2) archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}" shutil.make_archive(archive_name, 'zip', base_dir) return get_base64_download_link(f"{archive_name}.zip", f"{archive_name}.zip") except Exception as e: return f"An error occurred while archiving data: {str(e)} 😒" # πŸ”— Helper to extract hyperlinks def extract_hyperlinks(responses): # πŸ”— Extracting hyperlinksβ€”connecting the dots across the universe! πŸ•ΈοΈ hyperlinks = [] for response in responses: parsed_response = json.loads(response) links = [value for key, value in parsed_response.items() if isinstance(value, str) and value.startswith("http")] hyperlinks.extend(links) return hyperlinks # πŸ“‹ Helper to format text with line numbers def format_with_line_numbers(text): # πŸ“‹ Formatting text with line numbersβ€”organizing the cosmos one line at a time! πŸ“ lines = text.splitlines() formatted_text = '\n'.join(f"{i+1}: {line}" for i, line in enumerate(lines)) return formatted_text def generate_unique_id(): return str(uuid.uuid4()) def get_databases(client): return [db['id'] for db in client.list_databases()] def get_containers(database): return [container['id'] for container in database.list_containers()] def get_documents(container, limit=None): query = "SELECT * FROM c ORDER BY c._ts DESC" items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit)) return items def save_to_cosmos_db(container, query, response1, response2): try: if container: record = { "id": generate_unique_id(), "query": query, "response1": response1, "response2": response2 } try: container.create_item(body=record) st.success(f"Record saved successfully with ID: {record['id']}") # Refresh the documents display st.session_state.documents = get_documents(container) except exceptions.CosmosHttpResponseError as e: st.error(f"Error saving record to Cosmos DB: {e}") else: st.error("Cosmos DB container is not initialized.") except Exception as e: st.error(f"An unexpected error occurred: {str(e)}") # Add dropdowns for model and database choices def search_glossary(query): st.markdown(f"### πŸ” Search Glossary for: `{query}`") # Dropdown for model selection model_options = ['mistralai/Mixtral-8x7B-Instruct-v0.1', 'mistralai/Mistral-7B-Instruct-v0.2', 'google/gemma-7b-it', 'None'] model_choice = st.selectbox('🧠 Select LLM Model', options=model_options, index=1) # Dropdown for database selection database_options = ['Semantic Search', 'Arxiv Search - Latest - (EXPERIMENTAL)'] database_choice = st.selectbox('πŸ“š Select Database', options=database_options, index=0) # Run Button with Emoji #if st.button("πŸš€ Run"): # πŸ•΅οΈβ€β™‚οΈ Searching the glossary for: query all_results = "" st.markdown(f"- {query}") # πŸ” ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM #database_choice Literal['Semantic Search', 'Arxiv Search - Latest - (EXPERIMENTAL)'] Default: "Semantic Search" #llm_model_picked Literal['mistralai/Mixtral-8x7B-Instruct-v0.1', 'mistralai/Mistral-7B-Instruct-v0.2', 'google/gemma-7b-it', 'None'] Default: "mistralai/Mistral-7B-Instruct-v0.2" client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern") # πŸ” ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM - api_name: /ask_llm result = client.predict( prompt=query, llm_model_picked="mistralai/Mixtral-8x7B-Instruct-v0.1", stream_outputs=True, api_name="/ask_llm" ) st.markdown(result) st.code(result, language="python", line_numbers=True) # πŸ” ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM - api_name: /ask_llm result2 = client.predict( prompt=query, llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2", stream_outputs=True, api_name="/ask_llm" ) st.markdown(result2) st.code(result2, language="python", line_numbers=True) # πŸ” ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM - api_name: /ask_llm result3 = client.predict( prompt=query, llm_model_picked="google/gemma-7b-it", stream_outputs=True, api_name="/ask_llm" ) st.markdown(result3) st.code(result3, language="python", line_numbers=True) # πŸ” ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM - api_name: /update_with_rag_md response2 = client.predict( message=query, # str in 'parameter_13' Textbox component llm_results_use=10, database_choice="Semantic Search", llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2", api_name="/update_with_rag_md" ) # update_with_rag_md Returns tuple of 2 elements [0] str The output value that appears in the "value_14" Markdown component. [1] str st.markdown(response2[0]) st.code(response2[0], language="python", line_numbers=True, wrap_lines=True) st.markdown(response2[1]) st.code(response2[1], language="python", line_numbers=True, wrap_lines=True) # When saving results, pass the container try: save_to_cosmos_db(st.session_state.cosmos_container, query, result, result) save_to_cosmos_db(st.session_state.cosmos_container, query, result2, result2) save_to_cosmos_db(st.session_state.cosmos_container, query, result3, result3) save_to_cosmos_db(st.session_state.cosmos_container, query, response2[0], response2[0]) save_to_cosmos_db(st.session_state.cosmos_container, query, response2[1], response2[1]) except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)} 🚨" except Exception as e: return False, f"An unexpected error occurred: {str(e)} 😱" try: # Aggregate hyperlinks and show with emojis hyperlinks = extract_hyperlinks([response1, response2]) st.markdown("### πŸ”— Aggregated Hyperlinks") for link in hyperlinks: st.markdown(f"πŸ”— [{link}]({link})") # Show responses in a code format with line numbers st.markdown("### πŸ“œ Response Outputs with Line Numbers") st.code(f"Response 1: \n{format_with_line_numbers(response1)}\n\nResponse 2: \n{format_with_line_numbers(response2)}", language="json") except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)} 🚨" except Exception as e: return False, f"An unexpected error occurred: {str(e)} 😱" # 🎀 Function to process text input def process_text(text_input): # 🎀 Processing text inputβ€”translating human words into cosmic signals! πŸ“‘ if text_input: if 'messages' not in st.session_state: st.session_state.messages = [] st.session_state.messages.append({"role": "user", "content": text_input}) with st.chat_message("user"): st.markdown(text_input) with st.chat_message("assistant"): search_glossary(text_input) # πŸ“ Function to generate a filename def generate_filename(text, file_type): # πŸ“ Generate a filename based on the text input safe_text = "".join(c if c.isalnum() or c in (' ', '.', '_') else '_' for c in text) safe_text = "_".join(safe_text.strip().split()) filename = f"{safe_text}.{file_type}" return filename # πŸ•΅οΈβ€β™€οΈ Function to extract markdown title def extract_markdown_title(content): # πŸ•΅οΈβ€β™€οΈ Extracting markdown titleβ€”finding the headline in the cosmic news! πŸ“° lines = content.splitlines() for line in lines: if line.startswith('#'): return line.lstrip('#').strip() return None # πŸ’Ύ Function to create and save a file def create_and_save_file(content, file_type="md", prompt=None, is_image=False, should_save=True): # πŸ’Ύ Creating and saving a fileβ€”capturing cosmic wisdom! πŸ“ if not should_save: return None # Step 1: Generate filename based on the prompt or content filename = generate_filename(prompt if prompt else content, file_type) # Step 2: If it's a markdown file, check if it has a title if file_type == "md": title_from_content = extract_markdown_title(content) if title_from_content: filename = generate_filename(title_from_content, file_type) # Step 3: Save the file with open(filename, "w", encoding="utf-8") as f: if is_image: f.write(content) else: f.write(prompt + "\n\n" + content) return filename # πŸ€– Function to insert an auto-generated record def insert_auto_generated_record(container): # πŸ€– Automatically generating a record and inserting it into Cosmos DB! try: # Generate a unique id new_id = generate_unique_id() # Create a sample JSON document new_doc = { 'id': new_id, 'name': f'Sample Name {new_id[:8]}', 'description': 'This is a sample auto-generated description.', 'timestamp': datetime.utcnow().isoformat() } # Insert the document container.create_item(body=new_doc) return True, f"Record inserted successfully with id: {new_id} πŸŽ‰" except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)} 🚨" except Exception as e: return False, f"An unexpected error occurred: {str(e)} 😱" # 🎈 Main function def main(): # 🎈 Let's modify the main app to be more fun! st.title("πŸ™Git🌌CosmosπŸ’« - Azure Cosmos DB and Github Agent") # 🚦 Initialize session state if 'logged_in' not in st.session_state: st.session_state.logged_in = False if 'selected_records' not in st.session_state: st.session_state.selected_records = [] if 'client' not in st.session_state: st.session_state.client = None if 'selected_database' not in st.session_state: st.session_state.selected_database = None if 'selected_container' not in st.session_state: st.session_state.selected_container = None if 'selected_document_id' not in st.session_state: st.session_state.selected_document_id = None if 'current_index' not in st.session_state: st.session_state.current_index = 0 if 'cloned_doc' not in st.session_state: st.session_state.cloned_doc = None # βš™οΈ q= Run ArXiv search from query parameters try: query_params = st.query_params query = query_params.get('q') or query_params.get('query') or '' if query: # πŸ•΅οΈβ€β™‚οΈ We have a query! Let's process it! process_text(query) st.stop() # Stop further execution except Exception as e: st.markdown(' ') # πŸ” Automatic Login if Key: st.session_state.primary_key = Key st.session_state.logged_in = True else: st.error("Cosmos DB Key is not set in environment variables. πŸ”‘βŒ") return # Can't proceed without a key if st.session_state.logged_in: # 🌌 Initialize Cosmos DB client try: if st.session_state.client is None: st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) # πŸ—„οΈ Sidebar for database, container, and document selection st.sidebar.title("πŸ™Git🌌CosmosπŸ’«πŸ—„οΈNavigator") databases = get_databases(st.session_state.client) selected_db = st.sidebar.selectbox("πŸ—ƒοΈ Select Database", databases) if selected_db != st.session_state.selected_database: st.session_state.selected_database = selected_db st.session_state.selected_container = None st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if st.session_state.selected_database: database = st.session_state.client.get_database_client(st.session_state.selected_database) containers = get_containers(database) selected_container = st.sidebar.selectbox("πŸ“ Select Container", containers) if selected_container != st.session_state.selected_container: st.session_state.selected_container = selected_container st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() if st.session_state.selected_container: container = database.get_container_client(st.session_state.selected_container) # πŸ“¦ Add Export button if st.button("πŸ“¦ Export Container Data"): download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client) if download_link.startswith(' 5: documents_to_display = documents[:5] st.info("Showing top 5 most recent documents.") else: documents_to_display = documents st.info(f"Showing all {len(documents_to_display)} documents.") if documents_to_display: # 🎨 Add Viewer/Editor selection view_options = ['Show as Markdown', 'Show as Code Editor', 'Show as Edit and Save', 'Clone Document', 'New Record'] selected_view = st.selectbox("Select Viewer/Editor", view_options, index=2) if selected_view == 'Show as Markdown': # πŸ–ŒοΈ Show each record as Markdown with navigation total_docs = len(documents) doc = documents[st.session_state.current_index] st.markdown(f"#### Document ID: {doc.get('id', '')}") # πŸ•΅οΈβ€β™‚οΈ Let's extract values from the JSON that have at least one space values_with_space = [] def extract_values(obj): if isinstance(obj, dict): for k, v in obj.items(): extract_values(v) elif isinstance(obj, list): for item in obj: extract_values(item) elif isinstance(obj, str): if ' ' in obj: values_with_space.append(obj) extract_values(doc) # πŸ”— Let's create a list of links for these values search_urls = { "πŸš€πŸŒŒArXiv": lambda k: f"{LOCAL_APP_URL}/?q={quote(k)}", "πŸƒAnalyst": lambda k: f"{LOCAL_APP_URL}/?q={quote(k)}-{quote('PromptPrefix')}", "πŸ“šPyCoder": lambda k: f"{LOCAL_APP_URL}/?q={quote(k)}-{quote('PromptPrefix2')}", "πŸ”¬JSCoder": lambda k: f"{LOCAL_APP_URL}/?q={quote(k)}-{quote('PromptPrefix3')}", "🏠": lambda k: f"{LOCAL_APP_URL}/?q={quote(k)}", "πŸ“–": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}", "πŸ”": lambda k: f"https://www.google.com/search?q={quote(k)}", "▢️": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}", "πŸ”Ž": lambda k: f"https://www.bing.com/search?q={quote(k)}", "πŸŽ₯": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}", "🐦": lambda k: f"https://twitter.com/search?q={quote(k)}", } st.markdown("#### πŸ”— Links for Extracted Texts") for term in values_with_space: links_md = ' '.join([f"[{emoji}]({url(term)})" for emoji, url in search_urls.items()]) st.markdown(f"**{term}** {links_md}", unsafe_allow_html=True) # Show the document content as markdown content = json.dumps(doc, indent=2) st.markdown(f"```json\n{content}\n```") # Navigation buttons col_prev, col_next = st.columns([1, 1]) with col_prev: if st.button("⬅️ Previous", key='prev_markdown'): if st.session_state.current_index > 0: st.session_state.current_index -= 1 st.rerun() with col_next: if st.button("➑️ Next", key='next_markdown'): if st.session_state.current_index < total_docs - 1: st.session_state.current_index += 1 st.rerun() elif selected_view == 'Show as Code Editor': # πŸ’» Show each record in a code editor with navigation total_docs = len(documents) doc = documents[st.session_state.current_index] st.markdown(f"#### Document ID: {doc.get('id', '')}") doc_str = st.text_area("Edit Document", value=json.dumps(doc, indent=2), height=300, key=f'code_editor_{st.session_state.current_index}') col_prev, col_next = st.columns([1, 1]) with col_prev: if st.button("⬅️ Previous", key='prev_code'): if st.session_state.current_index > 0: st.session_state.current_index -= 1 st.rerun() with col_next: if st.button("➑️ Next", key='next_code'): if st.session_state.current_index < total_docs - 1: st.session_state.current_index += 1 st.rerun() if st.button("πŸ’Ύ Save Changes", key=f'save_button_{st.session_state.current_index}'): try: updated_doc = json.loads(doc_str) success, message = update_record(container, updated_doc) if success: st.success(f"Document {updated_doc['id']} saved successfully.") st.session_state.selected_document_id = updated_doc['id'] st.rerun() else: st.error(message) except json.JSONDecodeError as e: st.error(f"Invalid JSON: {str(e)} 🚫") elif selected_view == 'Show as Edit and Save': # ✏️ Show as Edit and Save in columns st.markdown("#### Edit the document fields below:") # Create columns for each document num_cols = len(documents_to_display) cols = st.columns(num_cols) for idx, (col, doc) in enumerate(zip(cols, documents_to_display)): with col: st.markdown(f"##### Document ID: {doc.get('id', '')}") editable_id = st.text_input("ID", value=doc.get('id', ''), key=f'edit_id_{idx}') # Remove 'id' from the document for editing other fields editable_doc = doc.copy() editable_doc.pop('id', None) doc_str = st.text_area("Document Content (in JSON format)", value=json.dumps(editable_doc, indent=2), height=300, key=f'doc_str_{idx}') # Add the "Run With AI" button next to "Save Changes" col_save, col_ai = st.columns(2) with col_save: if st.button("πŸ’Ύ Save Changes", key=f'save_button_{idx}'): try: updated_doc = json.loads(doc_str) updated_doc['id'] = editable_id # Include the possibly edited ID success, message = update_record(container, updated_doc) if success: st.success(f"Document {updated_doc['id']} saved successfully.") st.session_state.selected_document_id = updated_doc['id'] st.rerun() else: st.error(message) except json.JSONDecodeError as e: st.error(f"Invalid JSON: {str(e)} 🚫") with col_ai: if st.button("πŸ€– Run With AI", key=f'run_with_ai_button_{idx}'): # Use the entire document as input search_glossary(json.dumps(editable_doc, indent=2)) elif selected_view == 'Clone Document': # 🧬 Clone Document per record st.markdown("#### Clone a document:") for idx, doc in enumerate(documents_to_display): st.markdown(f"##### Document ID: {doc.get('id', '')}") if st.button("πŸ“„ Clone Document", key=f'clone_button_{idx}'): cloned_doc = doc.copy() # Generate a unique ID cloned_doc['id'] = generate_unique_id() st.session_state.cloned_doc = cloned_doc st.session_state.cloned_doc_str = json.dumps(cloned_doc, indent=2) st.session_state.clone_mode = True st.rerun() if st.session_state.get('clone_mode', False): st.markdown("#### Edit Cloned Document:") cloned_doc_str = st.text_area("Cloned Document Content (in JSON format)", value=st.session_state.cloned_doc_str, height=300) if st.button("πŸ’Ύ Save Cloned Document"): try: new_doc = json.loads(cloned_doc_str) success, message = insert_record(container, new_doc) if success: st.success(f"Cloned document saved with id: {new_doc['id']} πŸŽ‰") st.session_state.selected_document_id = new_doc['id'] st.session_state.clone_mode = False st.session_state.cloned_doc = None st.session_state.cloned_doc_str = '' st.rerun() else: st.error(message) except json.JSONDecodeError as e: st.error(f"Invalid JSON: {str(e)} 🚫") elif selected_view == 'New Record': # πŸ†• New Record st.markdown("#### Create a new document:") if st.button("πŸ€– Insert Auto-Generated Record"): success, message = insert_auto_generated_record(container) if success: st.success(message) st.rerun() else: st.error(message) else: new_id = st.text_input("ID", value=generate_unique_id(), key='new_id') new_doc_str = st.text_area("Document Content (in JSON format)", value='{}', height=300) if st.button("βž• Create New Document"): try: new_doc = json.loads(new_doc_str) new_doc['id'] = new_id # Use the provided ID success, message = insert_record(container, new_doc) if success: st.success(f"New document created with id: {new_doc['id']} πŸŽ‰") st.session_state.selected_document_id = new_doc['id'] # Switch to 'Show as Edit and Save' mode st.rerun() else: st.error(message) except json.JSONDecodeError as e: st.error(f"Invalid JSON: {str(e)} 🚫") else: st.sidebar.info("No documents found in this container. πŸ“­") # πŸŽ‰ Main content area st.subheader(f"πŸ“Š Container: {st.session_state.selected_container}") if st.session_state.selected_container: if documents_to_display: df = pd.DataFrame(documents_to_display) st.dataframe(df) else: st.info("No documents to display. 🧐") # πŸ™ GitHub section st.subheader("πŸ™ GitHub Operations") github_token = os.environ.get("GITHUB") # Read GitHub token from environment variable source_repo = st.text_input("Source GitHub Repository URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit") new_repo_name = st.text_input("New Repository Name (for cloning)", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}") col1, col2 = st.columns(2) with col1: if st.button("πŸ“₯ Clone Repository"): if github_token and source_repo: try: local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}" download_github_repo(source_repo, local_path) zip_filename = f"{new_repo_name}.zip" create_zip_file(local_path, zip_filename[:-4]) st.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True) st.success("Repository cloned successfully! πŸŽ‰") except Exception as e: st.error(f"An error occurred: {str(e)} 😒") finally: if os.path.exists(local_path): shutil.rmtree(local_path) if os.path.exists(zip_filename): os.remove(zip_filename) else: st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πŸ”‘β“") with col2: if st.button("πŸ“€ Push to New Repository"): if github_token and source_repo: try: g = Github(github_token) new_repo = create_repo(g, new_repo_name) local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}" download_github_repo(source_repo, local_path) push_to_github(local_path, new_repo, github_token) st.success(f"Repository pushed successfully to {new_repo.html_url} πŸš€") except Exception as e: st.error(f"An error occurred: {str(e)} 😒") finally: if os.path.exists(local_path): shutil.rmtree(local_path) else: st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πŸ”‘β“") except exceptions.CosmosHttpResponseError as e: st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)} 🚨") except Exception as e: st.error(f"An unexpected error occurred: {str(e)} 😱") # πŸšͺ Logout button if st.session_state.logged_in and st.sidebar.button("πŸšͺ Logout"): st.session_state.logged_in = False st.session_state.selected_records.clear() st.session_state.client = None st.session_state.selected_database = None st.session_state.selected_container = None st.session_state.selected_document_id = None st.session_state.current_index = 0 st.rerun() # Set up the Anthropic client client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) # Initialize session state if "chat_history" not in st.session_state: st.session_state.chat_history = [] # Helper Functions: All Your Essentials πŸš€ # Function to get a file download link (because you deserve easy downloads 😎) def get_download_link(file_path): with open(file_path, "rb") as file: contents = file.read() b64 = base64.b64encode(contents).decode() file_name = os.path.basename(file_path) return f'Download {file_name}πŸ“‚' # Function to generate a filename based on prompt and time (because names matter πŸ•’) def generate_filename(prompt, file_type): central = pytz.timezone('US/Central') safe_date_time = datetime.now(central).strftime("%m%d_%H%M") safe_prompt = re.sub(r'\W+', '_', prompt)[:90] return f"{safe_date_time}_{safe_prompt}.{file_type}" # Function to create and save a file (and avoid the black hole of lost data πŸ•³) def create_file(filename, prompt, response, should_save=True): if not should_save: return with open(filename, 'w', encoding='utf-8') as file: file.write(prompt + "\n\n" + response) # Function to load file content (for revisiting the past πŸ“œ) def load_file(file_name): with open(file_name, "r", encoding='utf-8') as file: content = file.read() return content # Function to display handy glossary entity links (search like a pro πŸ”) def display_glossary_entity(k): search_urls = { "πŸš€πŸŒŒArXiv": lambda k: f"/?q={quote(k)}", "πŸ“–": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}", "πŸ”": lambda k: f"https://www.google.com/search?q={quote(k)}", "πŸŽ₯": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}", } links_md = ' '.join([f"[{emoji}]({url(k)})" for emoji, url in search_urls.items()]) st.markdown(f"**{k}** {links_md}", unsafe_allow_html=True) # Function to create zip of files (because more is more 🧳) def create_zip_of_files(files): import zipfile zip_name = "all_files.zip" with zipfile.ZipFile(zip_name, 'w') as zipf: for file in files: zipf.write(file) return zip_name # Function to create HTML for autoplaying and looping video (for the full cinematic effect πŸŽ₯) def get_video_html(video_path, width="100%"): video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}" return f''' ''' # Function to create HTML for audio player (when life needs a soundtrack 🎢) def get_audio_html(audio_path, width="100%"): audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}" return f''' ''' # Streamlit App Layout (like a house with better flow 🏑) def main(): # Sidebar with Useful Controls (All the VIP actions πŸŽ›) st.sidebar.title("🧠ClaudeπŸ“") all_files = glob.glob("*.md") all_files.sort(reverse=True) if st.sidebar.button("πŸ—‘ Delete All"): for file in all_files: os.remove(file) st.rerun() if st.sidebar.button("⬇️ Download All"): zip_file = create_zip_of_files(all_files) st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True) for file in all_files: col1, col2, col3, col4 = st.sidebar.columns([1,3,1,1]) with col1: if st.button("🌐", key="view_"+file): st.session_state.current_file = file st.session_state.file_content = load_file(file) with col2: st.markdown(get_download_link(file), unsafe_allow_html=True) with col3: if st.button("πŸ“‚", key="edit_"+file): st.session_state.current_file = file st.session_state.file_content = load_file(file) with col4: if st.button("πŸ—‘", key="delete_"+file): os.remove(file) st.rerun() # Main Area: Chat with Claude (He’s a good listener πŸ’¬) user_input = st.text_area("Message πŸ“¨:", height=100) if st.button("Send πŸ“¨"): if user_input: response = client.messages.create( model="claude-3-sonnet-20240229", max_tokens=1000, messages=[ {"role": "user", "content": user_input} ] ) st.write("Claude's reply 🧠:") st.write(response.content[0].text) filename = generate_filename(user_input, "md") create_file(filename, user_input, response.content[0].text) st.session_state.chat_history.append({"user": user_input, "claude": response.content[0].text}) # Display Chat History (Never forget a good chat πŸ’­) st.subheader("Past Conversations πŸ“œ") for chat in st.session_state.chat_history: st.text_area("You said πŸ’¬:", chat["user"], height=100, disabled=True) st.text_area("Claude replied πŸ€–:", chat["claude"], height=200, disabled=True) st.markdown("---") # File Editor (When you need to tweak things ✏️) if hasattr(st.session_state, 'current_file'): st.subheader(f"Editing: {st.session_state.current_file} πŸ› ") new_content = st.text_area("File Content ✏️:", st.session_state.file_content, height=300) if st.button("Save Changes πŸ’Ύ"): with open(st.session_state.current_file, 'w', encoding='utf-8') as file: file.write(new_content) st.success("File updated successfully! πŸŽ‰") # Image Gallery (For your viewing pleasure πŸ“Έ) st.subheader("Image Gallery πŸ–Ό") image_files = glob.glob("*.png") + glob.glob("*.jpg") + glob.glob("*.jpeg") image_cols = st.slider("Gallery Columns πŸ–Ό", min_value=1, max_value=15, value=5) cols = st.columns(image_cols) for idx, image_file in enumerate(image_files): with cols[idx % image_cols]: img = Image.open(image_file) #st.image(img, caption=image_file, use_column_width=True) st.image(img, use_column_width=True) display_glossary_entity(os.path.splitext(image_file)[0]) # Video Gallery (Let’s roll the tapes 🎬) st.subheader("Video Gallery πŸŽ₯") video_files = glob.glob("*.mp4") video_cols = st.slider("Gallery Columns 🎬", min_value=1, max_value=5, value=3) cols = st.columns(video_cols) for idx, video_file in enumerate(video_files): with cols[idx % video_cols]: st.markdown(get_video_html(video_file, width="100%"), unsafe_allow_html=True) display_glossary_entity(os.path.splitext(video_file)[0]) # Audio Gallery (Tunes for the mood 🎢) st.subheader("Audio Gallery 🎧") audio_files = glob.glob("*.mp3") + glob.glob("*.wav") audio_cols = st.slider("Gallery Columns 🎢", min_value=1, max_value=15, value=5) cols = st.columns(audio_cols) for idx, audio_file in enumerate(audio_files): with cols[idx % audio_cols]: st.markdown(get_audio_html(audio_file, width="100%"), unsafe_allow_html=True) display_glossary_entity(os.path.splitext(audio_file)[0]) if __name__ == "__main__": main()