import streamlit as st
from azure.cosmos import CosmosClient, exceptions
import os
import pandas as pd
import traceback
import shutil
from github import Github
from git import Repo
from datetime import datetime
import base64
import json
import uuid # ๐ฒ For generating unique IDs
from urllib.parse import quote # ๐ For encoding URLs
from gradio_client import Client # ๐ For connecting to Gradio apps
# Streamlit app combining an Azure Cosmos DB browser with GitHub helper utilities.
# Use the wide page layout; this is the first Streamlit call in this file.
st.set_page_config(layout="wide")
# Cosmos DB configuration.
# The account endpoint is hard-coded; the other settings are read from the
# environment and are None when the corresponding variable is unset.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key") # Cosmos DB credential; main() reports an error and returns when it is missing
# Your local app URL (change this to your app's URL)
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
# GitHub helper functions
def download_github_repo(url, local_path):
    """Clone the Git repository at ``url`` into ``local_path``.

    Any directory tree already present at ``local_path`` is removed first so
    the clone always lands in a fresh location.

    Args:
        url: Clone URL handed to ``Repo.clone_from``.
        local_path: Destination directory for the working copy.
    """
    stale_checkout = os.path.exists(local_path)
    if stale_checkout:
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Pack the contents of ``source_dir`` into ``<output_filename>.zip``.

    Args:
        source_dir: Directory whose contents become the archive root.
        output_filename: Archive path without the ``.zip`` extension
            (``shutil.make_archive`` appends it).
    """
    shutil.make_archive(
        base_name=output_filename,
        format='zip',
        root_dir=source_dir,
    )
def create_repo(g, repo_name):
    """Create a repository named ``repo_name`` for the user behind ``g``.

    Args:
        g: Client object exposing ``get_user()`` (a ``github.Github``
            instance in this app).
        repo_name: Name of the repository to create.

    Returns:
        Whatever ``create_repo`` on the user object returns.
    """
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Commit everything in ``local_path`` and push it to ``repo`` on GitHub.

    Args:
        local_path: Path of an existing local Git working copy.
        repo: Object exposing ``full_name`` (``owner/name``) of the target
            GitHub repository.
        github_token: Token embedded in the HTTPS remote URL for
            authentication.
    """
    # NOTE(review): the token-bearing URL is stored on the 'origin' remote;
    # presumably this persists it in the repository's git config -- confirm.
    authed_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    working_copy = Repo(local_path)

    # Point 'origin' at the authenticated URL, creating the remote if needed.
    if any(remote.name == 'origin' for remote in working_copy.remotes):
        origin = working_copy.remote('origin')
        origin.set_url(authed_url)
    else:
        origin = working_copy.create_remote('origin', authed_url)

    # A repository without any branch head gets a new 'main' branch;
    # otherwise the currently checked-out branch is pushed.
    if working_copy.heads:
        current_branch = working_copy.active_branch.name
    else:
        working_copy.git.checkout('-b', 'main')
        current_branch = 'main'

    # Stage everything and commit only when something actually changed.
    working_copy.git.add(A=True)
    if working_copy.is_dirty():
        working_copy.git.commit('-m', 'Initial commit')

    origin.push(refspec=f'{current_branch}:{current_branch}')
def get_base64_download_link(file_path, file_name):
    """Return an HTML anchor that downloads ``file_path`` as ``file_name``.

    The file is read in binary mode and embedded in the link as a base64
    ``data:`` URI, so the link works without a separate file server.

    Args:
        file_path: Path of the file whose bytes are embedded.
        file_name: Name offered to the browser for the download; also shown
            in the link text.

    Returns:
        A string starting with ``<a`` that holds the complete anchor element.

    Raises:
        OSError: If ``file_path`` cannot be opened or read.
    """
    import html  # local import: only needed to escape the file name

    with open(file_path, "rb") as file:
        contents = file.read()
    base64_encoded = base64.b64encode(contents).decode()
    # Escape the name because it is interpolated into an attribute and into
    # the element text.
    safe_name = html.escape(str(file_name), quote=True)
    return (
        f'<a href="data:application/octet-stream;base64,{base64_encoded}" '
        f'download="{safe_name}">โฌ๏ธ Download {safe_name}</a>'
    )
# Cosmos DB functions
def get_databases(client):
    """Return the ``id`` of every database reported by ``client``.

    Args:
        client: Object exposing ``list_databases()``, which yields mappings
            with an ``'id'`` key.

    Returns:
        A list of database ids in the order they were yielded.
    """
    database_ids = []
    for db_properties in client.list_databases():
        database_ids.append(db_properties['id'])
    return database_ids
def get_containers(database):
    """Return the ``id`` of every container in ``database``.

    Args:
        database: Object exposing ``list_containers()``, which yields
            mappings with an ``'id'`` key.

    Returns:
        A list of container ids in the order they were yielded.
    """
    container_ids = []
    for container_properties in database.list_containers():
        container_ids.append(container_properties['id'])
    return container_ids
def get_documents(container, limit=None):
    """Fetch documents from ``container``, newest first.

    Args:
        container: Object exposing ``query_items(...)`` (a Cosmos DB
            container client in this app).
        limit: Maximum number of documents to return. ``None`` returns every
            document; zero or a negative value returns an empty list.

    Returns:
        A list of at most ``limit`` documents ordered by ``_ts`` descending.
    """
    from itertools import islice  # local import: only needed to cap results

    if limit is not None and limit <= 0:
        return []

    query = "SELECT * FROM c ORDER BY c._ts DESC"
    results = container.query_items(
        query=query,
        enable_cross_partition_query=True,
        # Still forwarded as before. It does not cap the total on its own,
        # because consuming the whole iterable yields every matching item.
        max_item_count=limit,
    )
    if limit is None:
        return list(results)
    # Stop consuming once ``limit`` documents have been collected.
    return list(islice(results, limit))
def insert_record(container, record):
    """Insert ``record`` into ``container``.

    Args:
        container: Object exposing ``create_item(body=...)``.
        record: The document to create.

    Returns:
        A ``(success, message)`` tuple; failures are reported through the
        message instead of being raised.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} ๐จ"
    except Exception as unexpected:
        return False, f"An unexpected error occurred: {str(unexpected)} ๐ฑ"
    else:
        return True, "Record inserted successfully! ๐"
def update_record(container, updated_record):
    """Upsert ``updated_record`` into ``container``.

    Args:
        container: Object exposing ``upsert_item(body=...)``.
        updated_record: The document to write; its ``'id'`` is echoed in the
            success message.

    Returns:
        A ``(success, message)`` tuple; failures are reported through the
        message instead of being raised.
    """
    try:
        container.upsert_item(body=updated_record)
        # Read the id inside the try block so a record without 'id' is
        # reported through the generic handler below.
        record_id = updated_record['id']
        return True, f"Record with id {record_id} successfully updated. ๐ ๏ธ"
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} ๐จ"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} ๐ฑ"
def delete_record(container, name, id):
    """Delete the document identified by ``id`` from ``container``.

    Args:
        container: Object exposing ``delete_item(item=..., partition_key=...)``.
        name: Display name of the record; used only in the success message.
        id: Document id. It is also passed as the partition key value.

    Returns:
        A ``(success, message)`` tuple; failures are reported through the
        message instead of being raised.
    """
    # NOTE(review): passing ``id`` as the partition key presumably relies on
    # the container being partitioned on the id field -- confirm.
    try:
        container.delete_item(item=id, partition_key=id)
        confirmation = f"Successfully deleted record with name: {name} and id: {id} ๐๏ธ"
        return True, confirmation
    except exceptions.CosmosResourceNotFoundError:
        return False, f"Record with id {id} not found. It may have been already deleted. ๐ต๏ธโโ๏ธ"
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)} ๐จ"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} ๐ฑ"
def archive_current_container(database_name, container_name, client):
    """Export every item of a container to a zip and return a download link.

    Each item is written as ``<id>.json`` under
    ``./cosmos_archive_current_container/<container_name>/``. That staging
    directory is recreated on every call, then zipped into the current
    working directory.

    Args:
        database_name: Name of the database holding the container.
        container_name: Name of the container to export.
        client: Object exposing ``get_database_client(...)``.

    Returns:
        The value of ``get_base64_download_link`` for the created zip, or an
        error message string if anything fails.
    """
    try:
        staging_root = "./cosmos_archive_current_container"
        # Start from an empty staging directory on every export.
        if os.path.exists(staging_root):
            shutil.rmtree(staging_root)
        os.makedirs(staging_root)

        container_client = (
            client.get_database_client(database_name)
            .get_container_client(container_name)
        )
        documents = list(container_client.read_all_items())

        target_dir = os.path.join(staging_root, container_name)
        os.makedirs(target_dir)
        for document in documents:
            # Items without an 'id' get a timestamp-based file name.
            fallback_id = f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}"
            item_id = document.get('id', fallback_id)
            json_path = os.path.join(target_dir, f"{item_id}.json")
            with open(json_path, 'w') as out_file:
                json.dump(document, out_file, indent=2)

        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        archive_name = f"{container_name}_archive_{timestamp}"
        shutil.make_archive(archive_name, 'zip', staging_root)
        return get_base64_download_link(f"{archive_name}.zip", f"{archive_name}.zip")
    except Exception as e:
        return f"An error occurred while archiving data: {str(e)} ๐ข"
# Function to generate a unique UUID
def generate_unique_id():
    """Return a newly generated random (version 4) UUID as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
# Helper function for hyperlink extraction
def extract_hyperlinks(responses):
    """Collect string values that start with ``"http"`` from JSON responses.

    Args:
        responses: Iterable of JSON documents (as text), each decoding to an
            object/mapping.

    Returns:
        A flat list of the matching top-level values, in encounter order.
    """
    hyperlinks = []
    for raw_response in responses:
        # Only top-level values are inspected; nested structures are ignored.
        for value in json.loads(raw_response).values():
            if isinstance(value, str) and value.startswith("http"):
                hyperlinks.append(value)
    return hyperlinks
# Helper function to format text with line numbers
def format_with_line_numbers(text):
    """Prefix every line of ``text`` with its 1-based number and ``": "``.

    Args:
        text: The text to number; it is split with ``str.splitlines``.

    Returns:
        The numbered lines joined with ``"\\n"`` (empty string for empty input).
    """
    numbered_lines = []
    for number, line in enumerate(text.splitlines(), start=1):
        numbered_lines.append(f"{number}: {line}")
    return '\n'.join(numbered_lines)
# Function to process text input
def process_text(text_input):
    """Record ``text_input`` as a user chat message and run a search for it.

    Nothing happens when ``text_input`` is empty or otherwise falsy.

    Args:
        text_input: The text entered by the user.
    """
    if not text_input:
        return

    # Create the chat history on first use.
    if 'messages' not in st.session_state:
        st.session_state.messages = []
    st.session_state.messages.append({"role": "user", "content": text_input})

    with st.chat_message("user"):
        st.markdown(text_input)
    with st.chat_message("assistant"):
        # NOTE(review): search_glossary is not defined in the visible part of
        # this file; presumably it renders the assistant's answer -- confirm.
        search_glossary(text_input)
# Main function to drive the application
def main():
# ๐ Let's kickstart the app with a fun introduction!
st.title("๐Git๐Cosmos๐ซ - Azure Cosmos DB and Github Agent")
# ๐ฆ Initialize session state for managing components
if 'logged_in' not in st.session_state:
st.session_state.logged_in = False
if 'client' not in st.session_state:
st.session_state.client = None
if 'selected_database' not in st.session_state:
st.session_state.selected_database = None
if 'selected_container' not in st.session_state:
st.session_state.selected_container = None
# ๐ Login management
if Key:
st.session_state.primary_key = Key
st.session_state.logged_in = True
else:
st.error("Cosmos DB Key is not set in environment variables. ๐โ")
return
if st.session_state.logged_in:
try:
# ๐ Initialize Cosmos DB client
if st.session_state.client is None:
st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
# ๐๏ธ Sidebar for database and container selection
st.sidebar.title("๐๏ธ Cosmos DB Navigator")
databases = get_databases(st.session_state.client)
selected_db = st.sidebar.selectbox("๐๏ธ Select Database", databases)
if selected_db != st.session_state.selected_database:
st.session_state.selected_database = selected_db
st.session_state.selected_container = None
st.rerun()
if st.session_state.selected_database:
database = st.session_state.client.get_database_client(st.session_state.selected_database)
containers = get_containers(database)
selected_container = st.sidebar.selectbox("๐ Select Container", containers)
if selected_container != st.session_state.selected_container:
st.session_state.selected_container = selected_container
st.rerun()
if st.session_state.selected_container:
container = database.get_container_client(st.session_state.selected_container)
# ๐ฆ Export container data
if st.button("๐ฆ Export Container Data"):
download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
if download_link.startswith('