Spaces:
Running
Running
import streamlit as st | |
from azure.cosmos import CosmosClient, exceptions | |
import os | |
import pandas as pd | |
import traceback | |
import shutil | |
from github import Github | |
from git import Repo | |
from datetime import datetime | |
import base64 | |
import json | |
import uuid # ๐ฒ For generating unique IDs | |
from urllib.parse import quote # ๐ For encoding URLs | |
from gradio_client import Client # ๐ For connecting to Gradio apps | |
# ๐ Welcome to our fun-filled Cosmos DB and GitHub Integration app! | |
# Use the full browser width for the Streamlit page layout.
st.set_page_config(layout="wide")
# Cosmos DB configuration: the account endpoint is hard-coded, while the
# database name, container name and account key come from the environment.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")  # None when the variable is unset
# Public URL of this app (change this to your own app's URL).
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
# ๐ GitHub configuration | |
def download_github_repo(url, local_path):
    """Clone the repository at ``url`` into ``local_path``.

    Anything already present at ``local_path`` is removed first, so the clone
    always lands in a fresh directory.

    Args:
        url: Repository URL handed to ``Repo.clone_from``.
        local_path: Destination directory for the clone.
    """
    target_exists = os.path.exists(local_path)
    if target_exists:
        # A stale tree would make the clone fail, so wipe it.
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Zip the contents of ``source_dir`` into ``<output_filename>.zip``.

    Args:
        source_dir: Directory whose contents become the archive root.
        output_filename: Archive path WITHOUT the ``.zip`` suffix;
            ``shutil.make_archive`` appends the extension itself.
    """
    shutil.make_archive(
        base_name=output_filename,
        format='zip',
        root_dir=source_dir,
    )
def create_repo(g, repo_name):
    """Create a repository named ``repo_name`` for the authenticated user.

    Args:
        g: Client object exposing ``get_user()``; the returned user must
            expose ``create_repo(name)``.
        repo_name: Name of the repository to create.

    Returns:
        Whatever ``user.create_repo`` returns for the new repository.
    """
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Commit everything in ``local_path`` and push it to ``repo``.

    Args:
        local_path: Path of an existing local git working copy.
        repo: Target repository object; only ``repo.full_name`` is read.
        github_token: Token embedded in the HTTPS remote URL for auth.
    """
    # The token is placed in the URL so the push authenticates over HTTPS.
    authed_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    working_copy = Repo(local_path)

    # Point 'origin' at the target, creating the remote when it is missing.
    remote_names = [remote.name for remote in working_copy.remotes]
    if 'origin' not in remote_names:
        origin = working_copy.create_remote('origin', authed_url)
    else:
        origin = working_copy.remote('origin')
        origin.set_url(authed_url)

    # A repository without any branch gets a fresh 'main' branch.
    if working_copy.heads:
        current_branch = working_copy.active_branch.name
    else:
        working_copy.git.checkout('-b', 'main')
        current_branch = 'main'

    # Stage everything; commit only when there is something to record.
    working_copy.git.add(A=True)
    if working_copy.is_dirty():
        working_copy.git.commit('-m', 'Initial commit')

    origin.push(refspec=f'{current_branch}:{current_branch}')
def get_base64_download_link(file_path, file_name):
    """Build an HTML ``<a>`` tag that downloads ``file_path`` as a data URI.

    The file is read fully into memory and base64-encoded into the link.

    Args:
        file_path: Path of the file whose bytes are embedded in the link.
        file_name: Name used for the ``download`` attribute and link text.

    Returns:
        The HTML anchor markup as a string (always starts with ``<a``).

    Raises:
        OSError: If ``file_path`` cannot be opened or read.
    """
    import html  # local import keeps this block self-contained

    with open(file_path, "rb") as file:
        base64_encoded = base64.b64encode(file.read()).decode()
    # The markup is rendered as raw HTML by callers, so the name must be
    # escaped; otherwise a quote or angle bracket in it could break out of
    # the attribute and inject markup.
    safe_name = html.escape(file_name, quote=True)
    return f'<a href="data:application/zip;base64,{base64_encoded}" download="{safe_name}">โฌ๏ธ Download {safe_name}</a>'
# ๐งญ Cosmos DB Functions | |
def get_databases(client):
    """Return the ``id`` of every database reported by ``client``.

    Args:
        client: Object exposing ``list_databases()`` that yields mappings
            with an ``'id'`` key.

    Returns:
        A list of database ids, in the order the client yields them.
    """
    database_ids = []
    for properties in client.list_databases():
        database_ids.append(properties['id'])
    return database_ids
def get_containers(database):
    """Return the ``id`` of every container in ``database``.

    Args:
        database: Object exposing ``list_containers()`` that yields mappings
            with an ``'id'`` key.

    Returns:
        A list of container ids, in the order the database yields them.
    """
    container_ids = []
    for properties in database.list_containers():
        container_ids.append(properties['id'])
    return container_ids
def get_documents(container, limit=None):
    """Fetch documents from ``container``, newest first (by ``_ts``).

    Args:
        container: Object exposing ``query_items(query=..., ...)`` that
            returns an iterable of documents.
        limit: Maximum number of documents to return, or ``None`` for all.

    Returns:
        A list of documents, holding at most ``limit`` entries when a limit
        is given.
    """
    from itertools import islice  # local import keeps this block self-contained

    query = "SELECT * FROM c ORDER BY c._ts DESC"
    results = container.query_items(
        query=query,
        enable_cross_partition_query=True,
        # Forwarded as before; on its own this only sizes result pages and
        # does not cap how many items the iterator yields overall.
        max_item_count=limit,
    )
    if limit is None:
        return list(results)
    # Enforce the cap here so ``limit`` actually bounds the result.
    return list(islice(results, limit))
def insert_record(container, record):
    """Insert ``record`` into ``container`` and report the outcome.

    Args:
        container: Object exposing ``create_item(body=...)``.
        record: The document to insert.

    Returns:
        A ``(success, message)`` tuple; errors are reported in the message
        rather than raised.
    """
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_error:
        return False, f"HTTP error occurred: {str(http_error)} ๐จ"
    except Exception as error:
        return False, f"An unexpected error occurred: {str(error)} ๐ฑ"
    else:
        return True, "Record inserted successfully! ๐"
def update_record(container, updated_record):
    """Upsert ``updated_record`` into ``container`` and report the outcome.

    Args:
        container: Object exposing ``upsert_item(body=...)``.
        updated_record: The document to write; its ``'id'`` is echoed in
            the success message.

    Returns:
        A ``(success, message)`` tuple; errors are reported in the message
        rather than raised. Unexpected errors include the full traceback.
    """
    try:
        container.upsert_item(body=updated_record)
        # Built inside the try so a record without 'id' is reported as an
        # unexpected error instead of propagating.
        message = f"Record with id {updated_record['id']} successfully updated. ๐ ๏ธ"
        return True, message
    except exceptions.CosmosHttpResponseError as http_error:
        return False, f"HTTP error occurred: {str(http_error)} ๐จ"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} ๐ฑ"
def delete_record(container, name, id):
    """Delete the item ``id`` from ``container`` and report the outcome.

    Args:
        container: Object exposing ``delete_item(item=..., partition_key=...)``.
        name: Display name of the record; used only in the success message.
        id: Item id. It is also passed as the partition key value.
            NOTE(review): this presumes the container is partitioned on the
            item id; confirm against the container definition.

    Returns:
        A ``(success, message)`` tuple; errors are reported in the message
        rather than raised. Unexpected errors include the full traceback.
    """
    try:
        container.delete_item(item=id, partition_key=id)
        message = f"Successfully deleted record with name: {name} and id: {id} ๐๏ธ"
        return True, message
    except exceptions.CosmosResourceNotFoundError:
        return False, f"Record with id {id} not found. It may have been already deleted. ๐ต๏ธโโ๏ธ"
    except exceptions.CosmosHttpResponseError as http_error:
        return False, f"HTTP error occurred: {str(http_error)} ๐จ"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()} ๐ฑ"
def archive_current_container(database_name, container_name, client):
    """Dump every item of a container to JSON files and zip them.

    Each item is written to ``<id>.json`` under
    ``./cosmos_archive_current_container/<container_name>/``. That tree is
    then zipped into a timestamped archive in the current working directory.

    Args:
        database_name: Name of the database holding the container.
        container_name: Name of the container to archive.
        client: Object exposing ``get_database_client(name)``.

    Returns:
        The HTML download link for the archive on success, otherwise an
        error message string. Exceptions are not propagated.
    """
    stamp_format = '%Y%m%d%H%M%S'
    try:
        # Always start from an empty staging directory.
        base_dir = "./cosmos_archive_current_container"
        if os.path.exists(base_dir):
            shutil.rmtree(base_dir)
        os.makedirs(base_dir)

        container_client = (
            client.get_database_client(database_name)
            .get_container_client(container_name)
        )
        items = list(container_client.read_all_items())

        container_dir = os.path.join(base_dir, container_name)
        os.makedirs(container_dir)
        for item in items:
            # Items without an id fall back to a timestamp-based file name.
            item_id = item.get('id', f"unknown_{datetime.now().strftime(stamp_format)}")
            item_path = os.path.join(container_dir, f"{item_id}.json")
            with open(item_path, 'w') as handle:
                json.dump(item, handle, indent=2)

        archive_name = f"{container_name}_archive_{datetime.now().strftime(stamp_format)}"
        shutil.make_archive(archive_name, 'zip', base_dir)
        zip_name = f"{archive_name}.zip"
        return get_base64_download_link(zip_name, zip_name)
    except Exception as e:
        return f"An error occurred while archiving data: {str(e)} ๐ข"
# ๐ฒ Function to generate a unique UUID | |
def generate_unique_id():
    """Return a freshly generated random UUID (version 4) as a string."""
    new_id = uuid.uuid4()
    return str(new_id)
# ๐ Helper function for hyperlink extraction | |
def extract_hyperlinks(responses):
    """Collect top-level string values that look like URLs.

    Args:
        responses: Iterable of JSON strings, each decoding to an object.

    Returns:
        A flat list of every top-level string value starting with
        ``"http"``, in input order.

    Raises:
        json.JSONDecodeError: If a response is not valid JSON.
    """
    return [
        value
        for raw_response in responses
        for value in json.loads(raw_response).values()
        if isinstance(value, str) and value.startswith("http")
    ]
# ๐ Helper function to format text with line numbers | |
def format_with_line_numbers(text):
    """Prefix every line of ``text`` with its 1-based line number.

    Args:
        text: The text to number; split with ``str.splitlines``.

    Returns:
        The numbered lines joined with ``"\\n"``; an empty string for
        empty input.
    """
    numbered_lines = []
    for number, line in enumerate(text.splitlines(), start=1):
        numbered_lines.append(f"{number}: {line}")
    return '\n'.join(numbered_lines)
# ๐ค Function to process text input | |
def process_text(text_input):
    """Record a user message in the chat and run a glossary search for it.

    Does nothing when ``text_input`` is empty or ``None``.

    Args:
        text_input: The text entered by the user.
    """
    if not text_input:
        return

    # Lazily create the chat history on first use.
    if 'messages' not in st.session_state:
        st.session_state.messages = []
    user_message = {"role": "user", "content": text_input}
    st.session_state.messages.append(user_message)

    with st.chat_message("user"):
        st.markdown(text_input)
    with st.chat_message("assistant"):
        search_glossary(text_input)
# ๐ Main function to drive the application | |
def main():
    """Render the Streamlit page: Cosmos DB browser plus GitHub tools.

    Flow:
      1. Seed session-state defaults.
      2. Require the Cosmos key from the environment; stop with an error
         message when it is missing.
      3. Let the user pick a database and container in the sidebar, offer a
         container export, and show the container's documents.
      4. Offer cloning a GitHub repository to a zip download, or pushing
         the clone to a newly created repository.
      5. Offer a logout button in the sidebar.

    Cosmos HTTP errors and any other exception raised while rendering are
    shown with ``st.error`` rather than propagated.
    """
    st.title("๐Git๐Cosmos๐ซ - Azure Cosmos DB and Github Agent")

    # Seed session-state defaults so later reads never fail.
    if 'logged_in' not in st.session_state:
        st.session_state.logged_in = False
    if 'client' not in st.session_state:
        st.session_state.client = None
    if 'selected_database' not in st.session_state:
        st.session_state.selected_database = None
    if 'selected_container' not in st.session_state:
        st.session_state.selected_container = None

    # Login: the presence of the environment key is the only requirement.
    if Key:
        st.session_state.primary_key = Key
        st.session_state.logged_in = True
    else:
        st.error("Cosmos DB Key is not set in environment variables. ๐โ")
        return

    if st.session_state.logged_in:
        try:
            # Create the Cosmos client once and keep it in session state.
            if st.session_state.client is None:
                st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)

            # Sidebar: database selection.
            st.sidebar.title("๐๏ธ Cosmos DB Navigator")
            databases = get_databases(st.session_state.client)
            selected_db = st.sidebar.selectbox("๐๏ธ Select Database", databases)
            if selected_db != st.session_state.selected_database:
                # A new database invalidates the previous container choice.
                st.session_state.selected_database = selected_db
                st.session_state.selected_container = None
                st.rerun()

            if st.session_state.selected_database:
                database = st.session_state.client.get_database_client(st.session_state.selected_database)
                containers = get_containers(database)
                selected_container = st.sidebar.selectbox("๐ Select Container", containers)
                if selected_container != st.session_state.selected_container:
                    st.session_state.selected_container = selected_container
                    st.rerun()

                if st.session_state.selected_container:
                    container = database.get_container_client(st.session_state.selected_container)

                    # Export container data as a downloadable zip.
                    if st.button("๐ฆ Export Container Data"):
                        download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
                        # The helper returns either anchor markup or an
                        # error message; tell them apart by the prefix.
                        if download_link.startswith('<a'):
                            st.markdown(download_link, unsafe_allow_html=True)
                        else:
                            st.error(download_link)

                    # Display the container's documents.
                    documents = get_documents(container)
                    st.subheader(f"๐ Container: {st.session_state.selected_container}")
                    if documents:
                        df = pd.DataFrame(documents)
                        st.dataframe(df)
                    else:
                        st.info("No documents to display. ๐ง")

            # GitHub operations.
            # NOTE(review): the original indentation was lost; this section
            # is placed at the try level because it uses neither the selected
            # database nor the container. Confirm against the real file.
            st.subheader("๐ GitHub Operations")
            github_token = os.environ.get("GITHUB")  # GitHub token for authentication
            source_repo = st.text_input("Source GitHub Repository URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
            new_repo_name = st.text_input("New Repository Name", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")
            col1, col2 = st.columns(2)

            with col1:
                if st.button("๐ฅ Clone Repository"):
                    if github_token and source_repo:
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                        try:
                            download_github_repo(source_repo, local_path)
                            zip_filename = f"{new_repo_name}.zip"
                            # make_archive appends ".zip" itself, so strip it.
                            create_zip_file(local_path, zip_filename[:-4])
                            st.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True)
                            st.success("Repository cloned successfully! ๐")
                        finally:
                            # Remove the temporary clone even when a step
                            # above failed, so failed runs leave no tree.
                            if os.path.exists(local_path):
                                shutil.rmtree(local_path)
                    else:
                        st.error("Please provide GitHub token and source repo URL.")

            with col2:
                if st.button("๐ค Push to New Repository"):
                    if github_token and source_repo:
                        g = Github(github_token)
                        new_repo = create_repo(g, new_repo_name)
                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
                        try:
                            download_github_repo(source_repo, local_path)
                            push_to_github(local_path, new_repo, github_token)
                            st.success(f"Repository pushed successfully to {new_repo.html_url} ๐")
                        finally:
                            # Same cleanup guarantee as the clone branch.
                            if os.path.exists(local_path):
                                shutil.rmtree(local_path)
                    else:
                        st.error("Please provide GitHub token and source repo URL.")
        except exceptions.CosmosHttpResponseError as e:
            st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)} ๐จ")
        except Exception as e:
            st.error(f"An unexpected error occurred: {str(e)} ๐ฑ")

    # Logout option.
    # NOTE(review): the next run sets logged_in back to True whenever Key is
    # set, so this effectively only resets the cached client.
    if st.session_state.logged_in and st.sidebar.button("๐ช Logout"):
        st.session_state.logged_in = False
        st.session_state.client = None
        st.rerun()
# Run the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()