# Page-scrape header captured with this source (not part of the program):
# Spaces: Running
import base64
import glob
import json
import os
import re
import shutil
import time
import traceback
import uuid
import zipfile
from datetime import datetime
from itertools import islice
from urllib.parse import quote

import anthropic
import pandas as pd
import pytz
import streamlit as st
from azure.cosmos import CosmosClient, exceptions
from git import Repo
from github import Github
from gradio_client import Client
from PIL import Image
# ---------------------------------------------------------------------------
# App configuration
# ---------------------------------------------------------------------------
Site_Name = 'πGitπCosmosπ« - Azure Cosmos DB and Github Agent'
title = "πGitπCosmosπ« - Azure Cosmos DB and Github Agent"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1'
icons = 'πππ«'

# Page chrome: browser tab title/icon, wide layout, and the entries shown in
# Streamlit's built-in menu.
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title,
    },
)

# ---------------------------------------------------------------------------
# Cosmos DB configuration
# ---------------------------------------------------------------------------
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.getenv("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.getenv("COSMOS_CONTAINER_NAME")
Key = os.getenv("Key")  # Cosmos DB account key; main() refuses to continue without it

# Your local app URL (change this to your app's URL)
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"

# ---------------------------------------------------------------------------
# Anthropic configuration
# ---------------------------------------------------------------------------
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

# Chat transcript kept across Streamlit reruns.
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []
# Helper Functions
def get_download_link(file_path):
    """Return an HTML anchor that offers *file_path* as a download.

    The file's bytes are embedded in the link itself as a base64 data URI,
    so the link keeps working even if the file is removed afterwards.
    """
    file_name = os.path.basename(file_path)
    with open(file_path, "rb") as handle:
        encoded = base64.b64encode(handle.read()).decode()
    return f'<a href="data:file/txt;base64,{encoded}" download="{file_name}">Download {file_name}π</a>'
def generate_unique_id():
    """Return a 17-digit id: local time as DDMMYYYYHHMMSS plus milliseconds.

    A later ``generate_unique_id`` definition in this module rebinds the
    name, so this version is only in effect until that point at import time.
    """
    stamp = datetime.now()
    millis = stamp.microsecond // 1000
    return f"{stamp:%d%m%Y%H%M%S}{millis:03d}"
def generate_filename(prompt, file_type):
    """Build ``<MMDD_HHMM><slug>.<file_type>`` using US/Central time.

    The slug is *prompt* with every non-word character removed, cut to 90
    characters. A later ``generate_filename`` definition in this module
    rebinds the name.
    """
    stamp = datetime.now(pytz.timezone('US/Central')).strftime("%m%d_%H%M")
    slug = re.sub(r'\W+', '', prompt)[:90]
    return f"{stamp}{slug}.{file_type}"
def create_file(filename, prompt, response, should_save=True):
    """Write *prompt*, a blank line, then *response* to *filename* (UTF-8).

    Does nothing when *should_save* is false. Always returns ``None``.
    """
    if should_save:
        with open(filename, 'w', encoding='utf-8') as out:
            out.write("\n\n".join((prompt, response)))
def load_file(file_name):
    """Return the full text of *file_name*, decoded as UTF-8."""
    with open(file_name, "r", encoding='utf-8') as source:
        return source.read()
def display_glossary_entity(k):
    """Render *k* followed by a row of search links (one per search site).

    Each link opens in a new tab and searches for *k*, URL-quoted, on the
    corresponding site.
    """
    # (label, URL builder) pairs. A sequence is used instead of a dict so that
    # two entries whose labels compare equal are both rendered rather than
    # the later one silently replacing the earlier one.
    search_links = (
        ("ππArXiv", lambda term: f"/?q={quote(term)}"),
        ("π", lambda term: f"https://en.wikipedia.org/wiki/{quote(term)}"),
        ("π", lambda term: f"https://www.google.com/search?q={quote(term)}"),
        ("π₯", lambda term: f"https://www.youtube.com/results?search_query={quote(term)}"),
    )
    links_md = ' '.join(
        f"<a href='{build_url(k)}' target='_blank'>{label}</a>"
        for label, build_url in search_links
    )
    st.markdown(f"{k} {links_md}", unsafe_allow_html=True)
def create_zip_of_files(files):
    """Pack *files* into ``all_files.zip`` in the working directory.

    Each file is stored under the same path it was given as. Returns the
    archive's name.
    """
    archive_name = "all_files.zip"
    with zipfile.ZipFile(archive_name, 'w') as archive:
        for path in files:
            archive.write(path)
    return archive_name
def get_video_html(video_path, width="100%"):
    """Return a ``<video>`` element with *video_path* embedded as a data URI.

    Args:
        video_path: Path of the file to embed; its bytes are labelled MP4.
        width: Value for the element's ``width`` attribute.
    """
    # Read through a context manager so the handle is closed promptly.
    with open(video_path, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read()).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return f'''
<video width="{width}" controls autoplay loop>
<source src="{video_url}" type="video/mp4">
Your browser does not support the video tag.
</video>
'''
def get_audio_html(audio_path, width="100%"):
    """Return an ``<audio>`` element with *audio_path* embedded as a data URI.

    Args:
        audio_path: Path of the file to embed; its bytes are labelled MPEG audio.
        width: CSS width applied to the element.
    """
    # Read through a context manager so the handle is closed promptly.
    with open(audio_path, 'rb') as audio_file:
        encoded = base64.b64encode(audio_file.read()).decode()
    audio_url = f"data:audio/mpeg;base64,{encoded}"
    return f'''
<audio controls style="width:{width}">
<source src="{audio_url}" type="audio/mpeg">
Your browser does not support the audio element.
</audio>
'''
# Cosmos DB functions
def get_databases(client):
    """Return the ``id`` of every database that *client* lists."""
    names = []
    for database in client.list_databases():
        names.append(database['id'])
    return names
def get_containers(database):
    """Return the ``id`` of every container that *database* lists."""
    names = []
    for entry in database.list_containers():
        names.append(entry['id'])
    return names
def get_documents(container, limit=None):
    """Return documents from *container*, newest first (ordered by ``_ts``).

    Args:
        container: Object exposing ``query_items`` (a Cosmos container client).
        limit: Maximum number of documents to return; ``None`` returns all.

    Returns:
        A list of document dicts.
    """
    query = "SELECT * FROM c ORDER BY c._ts DESC"
    results = container.query_items(
        query=query,
        enable_cross_partition_query=True,
        # Still forwarded as a page-size hint; on its own it does not cap the
        # total because iterating the result walks every page.
        max_item_count=limit,
    )
    if limit is None:
        return list(results)
    # Stop pulling from the iterator once `limit` documents have been read.
    return list(islice(results, limit))
def insert_record(container, record):
    """Create *record* in *container*.

    Returns:
        ``(True, message)`` on success, ``(False, message)`` when the create
        call raises; the exception is reported in the message, not re-raised.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} π¨"
    except Exception as err:
        return False, f"An unexpected error occurred: {str(err)} π±"
    return True, "Record inserted successfully! π"
def update_record(container, updated_record):
    """Upsert *updated_record* into *container*.

    Returns:
        ``(True, message)`` on success. ``(False, message)`` on failure; for
        non-HTTP errors the message carries the full traceback.
    """
    try:
        container.upsert_item(body=updated_record)
        # Built inside the try: a record without an 'id' is reported as a
        # failure through the generic handler below.
        outcome = f"Record with id {updated_record['id']} successfully updated. π οΈ"
        return True, outcome
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} π¨"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} π±"
def delete_record(container, name, id):
    """Delete the item *id* from *container*, using *id* as the partition key.

    Args:
        container: Object exposing ``delete_item``.
        name: Display name, used only in the success message.
        id: Item id; also passed as the partition key value.

    Returns:
        ``(True, message)`` on success, ``(False, message)`` otherwise.
    """
    try:
        container.delete_item(item=id, partition_key=id)
        outcome = f"Successfully deleted record with name: {name} and id: {id} ποΈ"
        return True, outcome
    except exceptions.CosmosResourceNotFoundError:
        return False, f"Record with id {id} not found. It may have been already deleted. π΅οΈββοΈ"
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} π¨"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} π±"
import uuid


def generate_unique_id():
    """Return a freshly generated random UUID (version 4) as a string."""
    return f"{uuid.uuid4()}"
def save_to_cosmos_db(container, query, response1, response2):
    """Store one query/response record in *container*, reporting via Streamlit.

    The record id is a UTC timestamp joined to a random UUID. The record is
    written with ``create_item`` so that an id collision surfaces as an HTTP
    409, which is retried with a new id and exponential back-off (up to five
    attempts). Any other error is shown with ``st.error`` and ends the call.

    Args:
        container: Cosmos container client; a falsy value is reported as
            "not initialized" and nothing is written.
        query: The prompt text to store.
        response1: First response to store.
        response2: Second response to store.

    Returns:
        None. Outcomes are reported through ``st.success`` / ``st.warning`` /
        ``st.error``.
    """
    # Checked once up front; the result cannot change between retries.
    if not container:
        st.error("Cosmos DB container is not initialized.")
        return

    max_retries = 5
    base_wait_time = 0.1  # 100 ms

    for attempt in range(max_retries):
        now = datetime.utcnow()
        # A fresh id per attempt, so a retry after a conflict cannot collide
        # with the same document again.
        record = {
            "id": f"{now.strftime('%Y%m%d%H%M%S%f')}-{uuid.uuid4()}",
            "query": query,
            "response1": response1,
            "response2": response2,
            "timestamp": now.isoformat(),
        }
        try:
            # create_item (rather than upsert_item) rejects an existing id,
            # which makes a separate existence check unnecessary.
            container.create_item(body=record)
            st.success(f"Record saved successfully with ID: {record['id']}")
            # NOTE(review): this stores the query result object as returned,
            # not a list — confirm how consumers of `documents` iterate it.
            st.session_state.documents = container.query_items(
                query="SELECT * FROM c ORDER BY c._ts DESC",
                enable_cross_partition_query=True
            )
            return
        except exceptions.CosmosHttpResponseError as e:
            if e.status_code != 409:
                st.error(f"Error saving record to Cosmos DB: {e}")
                return
            # 409 = id conflict: back off, then retry with a new id.
            wait_time = (2 ** attempt) * base_wait_time
            st.warning(f"ID conflict occurred. Retrying in {wait_time:.2f} seconds... (Attempt {attempt + 1})")
            time.sleep(wait_time)
        except Exception as e:
            st.error(f"An unexpected error occurred: {str(e)}")
            return

    st.error("Failed to save record after maximum retries.")
# ArXiv RAG search with model and database dropdowns
def _extract_hyperlinks(texts):
    """Return the distinct http(s) URLs found in *texts*, in first-seen order."""
    found = {}
    for text in texts:
        for url in re.findall(r"https?://[^\s<>\"')\]]+", str(text)):
            # Drop sentence punctuation that commonly trails a pasted URL.
            found.setdefault(url.rstrip(".,;:"), None)
    return list(found)


def _format_with_line_numbers(text):
    """Prefix every line of *text* with its 1-based line number."""
    return "\n".join(
        f"{number}: {line}"
        for number, line in enumerate(str(text).splitlines(), start=1)
    )


def search_glossary(query):
    """Run *query* against the ArXiv RAG Space and display/save the answers.

    Asks three LLMs through the Space's ``/ask_llm`` endpoint, runs one
    ``/update_with_rag_md`` retrieval using the model and database selected
    in the dropdowns (their defaults match the previously hard-coded
    values), shows every output, saves each one to the Cosmos container
    found in ``st.session_state.cosmos_container`` (if any), and finally
    lists the hyperlinks found in the outputs.

    Returns:
        ``None`` on success, or ``(False, message)`` when saving or the link
        summary fails.
    """
    st.markdown(f"### π Search Glossary for: `{query}`")

    # Dropdown for model selection
    model_options = ['mistralai/Mixtral-8x7B-Instruct-v0.1', 'mistralai/Mistral-7B-Instruct-v0.2', 'google/gemma-7b-it', 'None']
    model_choice = st.selectbox('π§ Select LLM Model', options=model_options, index=1)

    # Dropdown for database selection
    database_options = ['Semantic Search', 'Arxiv Search - Latest - (EXPERIMENTAL)']
    database_choice = st.selectbox('π Select Database', options=database_options, index=0)

    st.markdown(f"- {query}")

    # Named distinctly from the module-level Anthropic `client`.
    rag_client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")

    # /ask_llm: the same prompt is put to each of the three models in turn.
    llm_answers = []
    for model_name in (
        "mistralai/Mixtral-8x7B-Instruct-v0.1",
        "mistralai/Mistral-7B-Instruct-v0.2",
        "google/gemma-7b-it",
    ):
        answer = rag_client.predict(
            prompt=query,
            llm_model_picked=model_name,
            stream_outputs=True,
            api_name="/ask_llm"
        )
        st.markdown(answer)
        st.code(answer, language="python", line_numbers=True)
        llm_answers.append(answer)

    # /update_with_rag_md returns a tuple of two strings.
    response2 = rag_client.predict(
        message=query,
        llm_results_use=10,
        database_choice=database_choice,
        llm_model_picked=model_choice,
        api_name="/update_with_rag_md"
    )
    for rag_part in (response2[0], response2[1]):
        st.markdown(rag_part)
        st.code(rag_part, language="python", line_numbers=True, wrap_lines=True)

    outputs = [*llm_answers, response2[0], response2[1]]

    # Save every output. The container is looked up defensively because it is
    # only present once some other part of the app has stored it.
    try:
        container = st.session_state.get("cosmos_container")
        if container is None:
            st.warning("Cosmos DB container is not initialized; results were not saved.")
        else:
            for text in outputs:
                save_to_cosmos_db(container, query, text, text)
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error occurred: {str(e)} π¨"
    except Exception as e:
        return False, f"An unexpected error occurred: {str(e)} π±"

    try:
        # Aggregate hyperlinks and show with emojis
        hyperlinks = _extract_hyperlinks(outputs)
        st.markdown("### π Aggregated Hyperlinks")
        for link in hyperlinks:
            st.markdown(f"π [{link}]({link})")

        # NOTE(review): the previous code referenced an undefined `response1`
        # here; the first /ask_llm answer and the first RAG string are used —
        # confirm these are the intended two outputs.
        response1 = llm_answers[0]
        st.markdown("### π Response Outputs with Line Numbers")
        st.code(f"Response 1: \n{_format_with_line_numbers(response1)}\n\nResponse 2: \n{_format_with_line_numbers(response2[0])}", language="json")
    except Exception as e:
        return False, f"An unexpected error occurred: {str(e)} π±"
# Function to process text input
def process_text(text_input):
    """Record *text_input* as a user chat message and answer it.

    The text is appended to ``st.session_state.messages``, echoed in a user
    chat bubble, and passed to ``search_glossary`` inside an assistant
    bubble. Empty input is ignored.
    """
    if not text_input:
        return

    if 'messages' not in st.session_state:
        st.session_state.messages = []
    user_message = {"role": "user", "content": text_input}
    st.session_state.messages.append(user_message)

    with st.chat_message("user"):
        st.markdown(text_input)
    with st.chat_message("assistant"):
        search_glossary(text_input)
# Function to generate a filename
def generate_filename(text, file_type, max_length=200):
    """Derive a filesystem-safe ``<stem>.<file_type>`` name from *text*.

    Characters other than alphanumerics, space, ``.`` and ``_`` become
    ``_``; runs of whitespace collapse to a single ``_``. The stem is then
    cut to at most *max_length* UTF-8 bytes so that long input (for example
    a whole chat message) cannot produce a name longer than common
    filesystem limits.

    Args:
        text: Source text for the stem.
        file_type: Extension, without the dot.
        max_length: Upper bound on the stem's size in UTF-8 bytes.
    """
    cleaned = "".join(c if c.isalnum() or c in (' ', '.', '_') else '_' for c in text)
    stem = "_".join(cleaned.split())
    # Truncate on the encoded form; errors="ignore" drops a multi-byte
    # character that the cut would otherwise split in half.
    stem = stem.encode("utf-8")[:max_length].decode("utf-8", errors="ignore")
    return f"{stem}.{file_type}"
# Function to extract markdown title
def extract_markdown_title(content):
    """Return the text of the first line starting with ``#``, or ``None``.

    Leading ``#`` characters and surrounding whitespace are stripped from
    the returned title.
    """
    heading_lines = (ln for ln in content.splitlines() if ln.startswith('#'))
    first_heading = next(heading_lines, None)
    if first_heading is None:
        return None
    return first_heading.lstrip('#').strip()
# Function to create and save a file
def create_and_save_file(content, file_type="md", prompt=None, is_image=False, should_save=True):
    """Write *content* to a file named after the prompt, content, or title.

    The name comes from *prompt* when given, otherwise from *content*. For
    Markdown files, a ``#`` title found in *content* takes precedence.

    Args:
        content: Text to write.
        file_type: Extension, without the dot.
        prompt: Optional prompt; written above the content when present.
        is_image: When true, only *content* is written (no prompt header).
        should_save: When false, nothing is written.

    Returns:
        The filename written, or ``None`` when *should_save* is false.
    """
    if not should_save:
        return None

    # Step 1: Generate filename based on the prompt or content
    filename = generate_filename(prompt if prompt else content, file_type)

    # Step 2: If it's a markdown file, prefer its title for the name
    if file_type == "md":
        title_from_content = extract_markdown_title(content)
        if title_from_content:
            filename = generate_filename(title_from_content, file_type)

    # Step 3: Save the file. `prompt` defaults to None, in which case there
    # is no header to prepend and only the content is written.
    if is_image or prompt is None:
        body = content
    else:
        body = prompt + "\n\n" + content
    # NOTE(review): the file is opened in text mode even when is_image is
    # set — confirm callers pass text rather than raw bytes.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(body)
    return filename
# Function to insert an auto-generated record
def insert_auto_generated_record(container):
    """Create a sample document with a generated id in *container*.

    Returns:
        ``(True, message)`` on success, ``(False, message)`` when anything
        raises; the exception is reported in the message, not re-raised.
    """
    try:
        record_id = generate_unique_id()
        sample_document = {
            'id': record_id,
            'name': f'Sample Name {record_id[:8]}',
            'description': 'This is a sample auto-generated description.',
            'timestamp': datetime.utcnow().isoformat(),
        }
        container.create_item(body=sample_document)
        return True, f"Record inserted successfully with id: {record_id} π"
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} π¨"
    except Exception as err:
        return False, f"An unexpected error occurred: {str(err)} π±"
# GitHub functions
def download_github_repo(url, local_path):
    """Clone *url* into *local_path*, replacing anything already there."""
    already_present = os.path.exists(local_path)
    if already_present:
        # Remove the previous checkout so the clone target is free.
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Zip the contents of *source_dir* into ``<output_filename>.zip``.

    *output_filename* is the archive path without the ``.zip`` suffix.
    """
    shutil.make_archive(base_name=output_filename, format='zip', root_dir=source_dir)
def create_repo(g, repo_name):
    """Create a repository called *repo_name* for the user behind *g*.

    Returns whatever the user's ``create_repo`` call returns.
    """
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Commit everything in *local_path* and push it to *repo* on GitHub.

    Args:
        local_path: Path of an existing local git repository.
        repo: Target repository; its ``full_name`` is used in the remote URL.
        github_token: Token embedded in the HTTPS remote URL.
    """
    repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)

    # Point 'origin' at the target, creating the remote if it is missing.
    remote_names = [remote.name for remote in local_repo.remotes]
    if 'origin' not in remote_names:
        origin = local_repo.create_remote('origin', repo_url)
    else:
        origin = local_repo.remote('origin')
        origin.set_url(repo_url)

    # A repository with no branches yet gets a fresh 'main'.
    if local_repo.heads:
        current_branch = local_repo.active_branch.name
    else:
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'

    local_repo.git.add(A=True)
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')
    origin.push(refspec=f'{current_branch}:{current_branch}')
# Main function
def main():
    """Render the whole Streamlit page.

    Sections, in order: optional ``?q=`` ArXiv search, automatic login from
    the ``Key`` environment value, the Cosmos DB navigator and document
    viewers/editors, GitHub clone/push tools, the Claude chat, the local
    Markdown file editor and file manager, and the logout button.
    """
    st.title("πGitπCosmosπ« - Azure Cosmos DB and Github Agent")

    # Initialize session state
    for state_key, default in (
        ('logged_in', False),
        ('selected_records', []),
        ('client', None),
        ('selected_database', None),
        ('selected_container', None),
        ('selected_document_id', None),
        ('current_index', 0),
        ('cloned_doc', None),
    ):
        if state_key not in st.session_state:
            st.session_state[state_key] = default

    # q= / query= : run an ArXiv search straight from the URL and stop.
    try:
        query_params = st.query_params
        query = query_params.get('q') or query_params.get('query') or ''
        if query:
            process_text(query)
            st.stop()  # Stop further execution
    except Exception:
        # Best effort: a failed URL-driven search must not block the page.
        st.markdown(' ')

    # Automatic Login
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    else:
        st.error("Cosmos DB Key is not set in environment variables. πβ")
        return

    if st.session_state.logged_in:
        # Bound up front: the sections further down run even when no
        # database/container has been selected yet.
        container = None
        documents_to_display = []

        try:
            # Initialize Cosmos DB client
            if st.session_state.client is None:
                st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)

            # Sidebar for database, container, and document selection
            st.sidebar.title("πGitπCosmosπ«ποΈNavigator")
            databases = get_databases(st.session_state.client)
            selected_db = st.sidebar.selectbox("ποΈ Select Database", databases)
            if selected_db != st.session_state.selected_database:
                st.session_state.selected_database = selected_db
                st.session_state.selected_container = None
                st.session_state.selected_document_id = None
                st.session_state.current_index = 0
                st.rerun()

            if st.session_state.selected_database:
                database = st.session_state.client.get_database_client(st.session_state.selected_database)
                containers = get_containers(database)
                selected_container = st.sidebar.selectbox("π Select Container", containers)
                if selected_container != st.session_state.selected_container:
                    st.session_state.selected_container = selected_container
                    st.session_state.selected_document_id = None
                    st.session_state.current_index = 0
                    st.rerun()

                if st.session_state.selected_container:
                    container = database.get_container_client(st.session_state.selected_container)
                    # search_glossary() reads the container from session state
                    # when saving its results.
                    st.session_state.cosmos_container = container

                    # Add Export button
                    if st.button("π¦ Export Container Data"):
                        # NOTE(review): archive_current_container is not
                        # defined in the part of the file visible here —
                        # confirm it exists before relying on this button.
                        download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
                        if download_link.startswith('<a'):
                            st.markdown(download_link, unsafe_allow_html=True)
                        else:
                            st.error(download_link)

                    # Fetch documents
                    documents = get_documents(container)
                    total_docs = len(documents)
                    # Keep the navigation index valid if the container shrank
                    # since the index was last set.
                    if st.session_state.current_index >= total_docs:
                        st.session_state.current_index = max(total_docs - 1, 0)

                    if total_docs > 5:
                        documents_to_display = documents[:5]
                        st.info("Showing top 5 most recent documents.")
                    else:
                        documents_to_display = documents
                        st.info(f"Showing all {len(documents_to_display)} documents.")

                    if documents_to_display:
                        # Add Viewer/Editor selection
                        view_options = ['Show as Markdown', 'Show as Code Editor', 'Show as Edit and Save', 'Clone Document', 'New Record']
                        selected_view = st.selectbox("Select Viewer/Editor", view_options, index=2)

                        if selected_view == 'Show as Markdown':
                            # Show each record as Markdown with navigation
                            doc = documents[st.session_state.current_index]
                            st.markdown(f"#### Document ID: {doc.get('id', '')}")

                            # Collect every string value (at any depth) that
                            # contains at least one space.
                            values_with_space = []

                            def extract_values(obj):
                                if isinstance(obj, dict):
                                    for v in obj.values():
                                        extract_values(v)
                                elif isinstance(obj, list):
                                    for item in obj:
                                        extract_values(item)
                                elif isinstance(obj, str):
                                    if ' ' in obj:
                                        values_with_space.append(obj)

                            extract_values(doc)

                            # Create a list of links for these values
                            st.markdown("#### π Links for Extracted Texts")
                            for term in values_with_space:
                                display_glossary_entity(term)

                            # Show the document content as markdown
                            content = json.dumps(doc, indent=2)
                            st.markdown(f"```json\n{content}\n```")

                            # Navigation buttons
                            col_prev, col_next = st.columns([1, 1])
                            with col_prev:
                                if st.button("β¬ οΈ Previous", key='prev_markdown'):
                                    if st.session_state.current_index > 0:
                                        st.session_state.current_index -= 1
                                        st.rerun()
                            with col_next:
                                if st.button("β‘οΈ Next", key='next_markdown'):
                                    if st.session_state.current_index < total_docs - 1:
                                        st.session_state.current_index += 1
                                        st.rerun()

                        elif selected_view == 'Show as Code Editor':
                            # Show each record in a code editor with navigation
                            doc = documents[st.session_state.current_index]
                            st.markdown(f"#### Document ID: {doc.get('id', '')}")
                            doc_str = st.text_area("Edit Document", value=json.dumps(doc, indent=2), height=300, key=f'code_editor_{st.session_state.current_index}')

                            col_prev, col_next = st.columns([1, 1])
                            with col_prev:
                                if st.button("β¬ οΈ Previous", key='prev_code'):
                                    if st.session_state.current_index > 0:
                                        st.session_state.current_index -= 1
                                        st.rerun()
                            with col_next:
                                if st.button("β‘οΈ Next", key='next_code'):
                                    if st.session_state.current_index < total_docs - 1:
                                        st.session_state.current_index += 1
                                        st.rerun()

                            if st.button("πΎ Save Changes", key=f'save_button_{st.session_state.current_index}'):
                                try:
                                    updated_doc = json.loads(doc_str)
                                    success, message = update_record(container, updated_doc)
                                    if success:
                                        st.success(f"Document {updated_doc['id']} saved successfully.")
                                        st.session_state.selected_document_id = updated_doc['id']
                                        st.rerun()
                                    else:
                                        st.error(message)
                                except json.JSONDecodeError as e:
                                    st.error(f"Invalid JSON: {str(e)} π«")

                        elif selected_view == 'Show as Edit and Save':
                            # Show as Edit and Save in columns
                            st.markdown("#### Edit the document fields below:")

                            # One column per displayed document
                            num_cols = len(documents_to_display)
                            cols = st.columns(num_cols)

                            for idx, (col, doc) in enumerate(zip(cols, documents_to_display)):
                                with col:
                                    st.markdown(f"##### Document ID: {doc.get('id', '')}")
                                    editable_id = st.text_input("ID", value=doc.get('id', ''), key=f'edit_id_{idx}')
                                    # Remove 'id' from the document for editing other fields
                                    editable_doc = doc.copy()
                                    editable_doc.pop('id', None)
                                    doc_str = st.text_area("Document Content (in JSON format)", value=json.dumps(editable_doc, indent=2), height=300, key=f'doc_str_{idx}')

                                    # "Run With AI" sits next to "Save Changes"
                                    col_save, col_ai = st.columns(2)
                                    with col_save:
                                        if st.button("πΎ Save Changes", key=f'save_button_{idx}'):
                                            try:
                                                updated_doc = json.loads(doc_str)
                                                updated_doc['id'] = editable_id  # Include the possibly edited ID
                                                success, message = update_record(container, updated_doc)
                                                if success:
                                                    st.success(f"Document {updated_doc['id']} saved successfully.")
                                                    st.session_state.selected_document_id = updated_doc['id']
                                                    st.rerun()
                                                else:
                                                    st.error(message)
                                            except json.JSONDecodeError as e:
                                                st.error(f"Invalid JSON: {str(e)} π«")
                                    with col_ai:
                                        if st.button("π€ Run With AI", key=f'run_with_ai_button_{idx}'):
                                            # Use the entire document as input
                                            search_glossary(json.dumps(editable_doc, indent=2))

                        elif selected_view == 'Clone Document':
                            # Clone Document per record
                            st.markdown("#### Clone a document:")
                            for idx, doc in enumerate(documents_to_display):
                                st.markdown(f"##### Document ID: {doc.get('id', '')}")
                                if st.button("π Clone Document", key=f'clone_button_{idx}'):
                                    cloned_doc = doc.copy()
                                    # Generate a unique ID
                                    cloned_doc['id'] = generate_unique_id()
                                    st.session_state.cloned_doc = cloned_doc
                                    st.session_state.cloned_doc_str = json.dumps(cloned_doc, indent=2)
                                    st.session_state.clone_mode = True
                                    st.rerun()

                            if st.session_state.get('clone_mode', False):
                                st.markdown("#### Edit Cloned Document:")
                                cloned_doc_str = st.text_area("Cloned Document Content (in JSON format)", value=st.session_state.cloned_doc_str, height=300)
                                if st.button("πΎ Save Cloned Document"):
                                    try:
                                        new_doc = json.loads(cloned_doc_str)
                                        success, message = insert_record(container, new_doc)
                                        if success:
                                            st.success(f"Cloned document saved with id: {new_doc['id']} π")
                                            st.session_state.selected_document_id = new_doc['id']
                                            st.session_state.clone_mode = False
                                            st.session_state.cloned_doc = None
                                            st.session_state.cloned_doc_str = ''
                                            st.rerun()
                                        else:
                                            st.error(message)
                                    except json.JSONDecodeError as e:
                                        st.error(f"Invalid JSON: {str(e)} π«")

                        elif selected_view == 'New Record':
                            # New Record
                            st.markdown("#### Create a new document:")
                            if st.button("π€ Insert Auto-Generated Record"):
                                success, message = insert_auto_generated_record(container)
                                if success:
                                    st.success(message)
                                    st.rerun()
                                else:
                                    st.error(message)
                            else:
                                new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
                                new_doc_str = st.text_area("Document Content (in JSON format)", value='{}', height=300)
                                if st.button("β Create New Document"):
                                    try:
                                        new_doc = json.loads(new_doc_str)
                                        new_doc['id'] = new_id  # Use the provided ID
                                        success, message = insert_record(container, new_doc)
                                        if success:
                                            st.success(f"New document created with id: {new_doc['id']} π")
                                            st.session_state.selected_document_id = new_doc['id']
                                            st.rerun()
                                        else:
                                            st.error(message)
                                    except json.JSONDecodeError as e:
                                        st.error(f"Invalid JSON: {str(e)} π«")
                    else:
                        st.sidebar.info("No documents found in this container. π")

            # Main content area
            st.subheader(f"π Container: {st.session_state.selected_container}")
            if st.session_state.selected_container:
                if documents_to_display:
                    df = pd.DataFrame(documents_to_display)
                    st.dataframe(df)
                else:
                    st.info("No documents to display. π§")

            # GitHub section
            st.subheader("π GitHub Operations")
            github_token = os.environ.get("GITHUB")  # Read GitHub token from environment variable
            source_repo = st.text_input("Source GitHub Repository URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
            new_repo_name = st.text_input("New Repository Name (for cloning)", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")

            col1, col2 = st.columns(2)
            with col1:
                if st.button("π₯ Clone Repository"):
                    if github_token and source_repo:
                        # Bound before the try so the cleanup in `finally`
                        # can always refer to both paths.
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                        zip_filename = f"{new_repo_name}.zip"
                        try:
                            download_github_repo(source_repo, local_path)
                            create_zip_file(local_path, zip_filename[:-4])
                            st.markdown(get_download_link(zip_filename), unsafe_allow_html=True)
                            st.success("Repository cloned successfully! π")
                        except Exception as e:
                            st.error(f"An error occurred: {str(e)} π’")
                        finally:
                            # The download link embeds the archive, so the
                            # temporary files can go immediately.
                            if os.path.exists(local_path):
                                shutil.rmtree(local_path)
                            if os.path.exists(zip_filename):
                                os.remove(zip_filename)
                    else:
                        st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πβ")

            with col2:
                if st.button("π€ Push to New Repository"):
                    if github_token and source_repo:
                        # Bound before the try: creating the repository can
                        # fail before any clone happens.
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                        try:
                            g = Github(github_token)
                            new_repo = create_repo(g, new_repo_name)
                            download_github_repo(source_repo, local_path)
                            push_to_github(local_path, new_repo, github_token)
                            st.success(f"Repository pushed successfully to {new_repo.html_url} π")
                        except Exception as e:
                            st.error(f"An error occurred: {str(e)} π’")
                        finally:
                            if os.path.exists(local_path):
                                shutil.rmtree(local_path)
                    else:
                        st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. πβ")

            # Chat with Claude
            st.subheader("π¬ Chat with Claude")
            user_input = st.text_area("Message π¨:", height=100)
            if st.button("Send π¨"):
                if user_input:
                    response = client.messages.create(
                        model="claude-3-sonnet-20240229",
                        max_tokens=1000,
                        messages=[
                            {"role": "user", "content": user_input}
                        ]
                    )
                    st.write("Claude's reply π§ :")
                    st.write(response.content[0].text)

                    filename = generate_filename(user_input, "md")
                    create_file(filename, user_input, response.content[0].text)
                    st.session_state.chat_history.append({"user": user_input, "claude": response.content[0].text})

                    # Save to Cosmos DB (reports an error itself when no
                    # container has been selected).
                    save_to_cosmos_db(container, user_input, response.content[0].text, "")

            # Display Chat History
            st.subheader("Past Conversations π")
            for turn, chat in enumerate(st.session_state.chat_history):
                # Explicit keys: two turns with identical text would otherwise
                # produce identical widgets.
                st.text_area("You said π¬:", chat["user"], height=100, disabled=True, key=f"chat_user_{turn}")
                st.text_area("Claude replied π€:", chat["claude"], height=200, disabled=True, key=f"chat_claude_{turn}")
                st.markdown("---")

            # File Editor
            if hasattr(st.session_state, 'current_file'):
                st.subheader(f"Editing: {st.session_state.current_file} π ")
                new_content = st.text_area("File Content βοΈ:", st.session_state.file_content, height=300)
                if st.button("Save Changes πΎ"):
                    with open(st.session_state.current_file, 'w', encoding='utf-8') as file:
                        file.write(new_content)
                    st.success("File updated successfully! π")

            # File Management
            st.sidebar.title("π File Management")
            all_files = glob.glob("*.md")
            all_files.sort(reverse=True)

            if st.sidebar.button("π Delete All Files"):
                for file in all_files:
                    os.remove(file)
                st.rerun()

            if st.sidebar.button("β¬οΈ Download All Files"):
                zip_file = create_zip_of_files(all_files)
                st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)

            for file in all_files:
                col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
                with col1:
                    if st.button("π", key="view_" + file):
                        st.session_state.current_file = file
                        st.session_state.file_content = load_file(file)
                with col2:
                    st.markdown(get_download_link(file), unsafe_allow_html=True)
                with col3:
                    if st.button("π", key="edit_" + file):
                        st.session_state.current_file = file
                        st.session_state.file_content = load_file(file)
                with col4:
                    if st.button("π", key="delete_" + file):
                        os.remove(file)
                        st.rerun()

        except exceptions.CosmosHttpResponseError as e:
            st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)} π¨")
        except Exception as e:
            st.error(f"An unexpected error occurred: {str(e)} π±")

    # Logout button
    if st.session_state.logged_in and st.sidebar.button("πͺ Logout"):
        st.session_state.logged_in = False
        st.session_state.selected_records.clear()
        st.session_state.client = None
        st.session_state.selected_database = None
        st.session_state.selected_container = None
        st.session_state.selected_document_id = None
        st.session_state.current_index = 0
        st.rerun()
# Script entry point: build the page only when this file is run directly.
if __name__ == "__main__":
    main()