Spaces:
Sleeping
Sleeping
import streamlit as st | |
import random | |
import os | |
import time | |
# from google.cloud import storage | |
from Astronomy_BH_hybrid_RAG import get_query | |
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "tidy-resolver-411707-0f032726c297.json" | |
# def download_blob(bucket_name, source_blob_name, destination_file_name): | |
# """Downloads a blob from the bucket.""" | |
# storage_client = storage.Client() | |
# bucket = storage_client.bucket(bucket_name) | |
# blob = bucket.blob(source_blob_name) | |
# blob.download_to_filename(destination_file_name) | |
# print(f"Downloaded storage object {source_blob_name} from bucket {bucket_name} to local file {destination_file_name}.") | |
# if not (os.path.exists("storage_file/bm25") and os.path.exists("storage_file/kg")): | |
# # List of file names to download | |
# file_names = [ | |
# "default__vector_store.json", | |
# "docstore.json", | |
# "graph_store.json", | |
# "image__vector_store.json", | |
# "index_store.json" | |
# ] | |
# # Bucket name | |
# bucket_name = "title_tailors_bucket" | |
# # Create the destination directory if it doesn't exist | |
# os.makedirs("storage_file/bm25", exist_ok=True) | |
# # Loop through the file names and download each one | |
# for file_name in file_names: | |
# source_blob_name = f"storage/bm25/{file_name}" | |
# destination_file_name = f"storage_file/bm25/{file_name}" | |
# download_blob(bucket_name, source_blob_name, destination_file_name) | |
# # List of file names to download | |
# file_names = [ | |
# "default__vector_store.json", | |
# "docstore.json", | |
# "graph_store.json", | |
# "image__vector_store.json", | |
# "index_store.json" | |
# ] | |
# # Bucket name | |
# bucket_name = "title_tailors_bucket" | |
# # Create the destination directory if it doesn't exist | |
# os.makedirs("storage_file/kg", exist_ok=True) | |
# # Loop through the file names and download each one | |
# for file_name in file_names: | |
# source_blob_name = f"storage/kg/{file_name}" | |
# destination_file_name = f"storage_file/kg/{file_name}" | |
# download_blob(bucket_name, source_blob_name, destination_file_name) | |
# else: | |
# print("Files already exist in the storage_file directory.") | |
# Streamed response emulator | |
def response_generator(text): | |
output = get_query(text) | |
responses = { | |
"Knowledge Graph Response" : output[1], | |
"Dense + BM25 without KG Response" : output[2], | |
"Dense + BM25 with KG Response" : output[2] | |
} | |
return responses | |
st.title("Context-aware Astronomy ChatBot") | |
# Initialize chat history | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
# Display chat messages from history on app rerun | |
for message in st.session_state.messages: | |
with st.chat_message(message["role"]): | |
st.markdown(message["content"]) | |
# Accept user input | |
if prompt := st.chat_input("What is up?"): | |
# Add user message to chat history | |
st.session_state.messages.append({"role": "user", "content": prompt}) | |
# Display user message in chat message container | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
# Generate response | |
response_dict = response_generator(prompt) | |
# Display each part of the response in separate markdown blocks with headings | |
with st.chat_message("assistant"): | |
for key, value in response_dict.items(): | |
st.markdown(f"### {key}") | |
st.markdown(value) | |
# Add assistant response to chat history | |
for key, value in response_dict.items(): | |
st.session_state.messages.append({"role": "assistant", "content": f"### {key}\n{value}"}) | |