# Page header captured together with this file: "Spaces: Sleeping" (not part of the program).
import os

from dotenv import load_dotenv
from langchain_community.utilities import SQLDatabase

# Load variables from a .env file before anything below reads the environment.
load_dotenv()

# Connect to the SQL database described by DB_CONNECTION_STRING.
db_uri = os.getenv("DB_CONNECTION_STRING")
if not db_uri:
    # Fail fast with an actionable message instead of handing None (or an
    # empty string) to SQLDatabase.from_uri and getting an obscure error.
    raise RuntimeError(
        "DB_CONNECTION_STRING is not set; cannot connect to the database."
    )
db_final = SQLDatabase.from_uri(db_uri)

# NOTE(review): these imports intentionally stay after load_dotenv(), as in
# the original file, in case the project modules read environment variables
# at import time -- TODO confirm and move to the top of the file if they don't.
import gradio as gr  # noqa: E402
from file_upload import FileHandler  # noqa: E402
from chat import ChatHandler  # noqa: E402

# Configuration read from the environment (each is None when unset).
VECTOR_DB_PATH = os.getenv("VECTOR_DB_PATH_DB")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
GROK_API_KEY = os.getenv("GROK_API_KEY")

# Initialize FileHandler and ChatHandler
file_handler = FileHandler(VECTOR_DB_PATH, OPENAI_API_KEY, GROK_API_KEY)
chat_handler = ChatHandler(VECTOR_DB_PATH, OPENAI_API_KEY, GROK_API_KEY, db_final)

# Chat history: list of (user_html, ai_html) tuples rendered by the Chatbot.
# NOTE(review): this is a single module-level list, so it is shared by every
# caller of handle_chat in this process.
chat_history = []
# File Upload and Processing Function
def handle_file_uploads(files):
    """Process one uploaded file and return a status message.

    Args:
        files: The uploaded file object. Only its ``name`` attribute is read
            here; the object itself is forwarded to
            ``file_handler.handle_file_upload``.

    Returns:
        str: A single status string on every path (the handler's
        ``"message"`` on success, a prompt when nothing was uploaded, or an
        error description). Previously the empty and error paths returned a
        ``(message, chat_history)`` tuple while the success path returned a
        bare string, which does not fit a single output component.
    """
    if not files:
        return "Please upload a file."
    try:
        file_name = files.name  # Retrieve the name of the uploaded file
        response = file_handler.handle_file_upload(file_name, files)
        return response["message"]
    except Exception as e:
        # UI boundary: report the failure to the user instead of raising.
        return f"Error processing file: {e}"
def _format_user_message(text):
    """Return the HTML shown on the user's side of a chat turn."""
    import html

    # Escape the raw input so user-typed markup is displayed, not interpreted.
    return f"<b style='color:#6366f1;'>USER:</b> {html.escape(text.strip())}"


def _image_to_html(image_source):
    """Return an inline ``<img>`` tag embedding *image_source* as base64 PNG."""
    import base64
    from io import BytesIO

    from PIL import Image

    buffered = BytesIO()
    # The context manager releases the underlying file once it is re-encoded.
    with Image.open(image_source) as img:
        img.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
    return f'<img src="data:image/png;base64,{img_str}" style="width:600px; height:600px;">'


# Chat Function
def handle_chat(user_message):
    """Answer one user message and append the exchange to ``chat_history``.

    A message containing the word "chart" is treated as a visualisation
    request: the full message is passed to the chat handler as the visual
    query, and the text after the first standalone word "for" is used as the
    data question. If there is no such text, the whole message is used as the
    question.

    Args:
        user_message: Raw text typed by the user (``None`` is treated as empty).

    Returns:
        list: The module-level ``chat_history`` list, after appending one
        ``(user_html, ai_html)`` tuple for this turn. Failures while answering
        are logged and replaced by an apology message; they are not raised.
    """
    import logging
    import re

    original_message = user_message or ""
    if not original_message.strip():
        chat_history.append(("AI: ", "Please enter a question."))
        # Nothing to ask: stop here instead of querying the backend with "".
        return chat_history

    # Built up front so the error path below can always show what was asked.
    user_html = _format_user_message(original_message)

    try:
        question = original_message.strip()
        visual_query = None
        if "chart" in question:  # or "graph" in question or "plot" in question
            visual_query = original_message
            # Split at the first standalone word "for"; the word boundary keeps
            # words such as "forecast" from being cut in half.
            parts = re.split(r"\bfor\b", original_message, maxsplit=1)
            if len(parts) > 1 and parts[1].strip():
                question = parts[1].strip()
            # Otherwise keep the whole message as the question rather than
            # failing on a missing second part.

        # Get AI response
        ai_response, visual_response = chat_handler.answer_question(question, visual_query)

        if visual_response:
            # Show the generated image above the textual answer.
            ai_response = _image_to_html(visual_response) + ai_response

        # Add the exchange to chat history
        chat_history.append((user_html, f"<b style='color:#6366f1;'>AI:</b> {ai_response}"))
    except Exception:
        # UI boundary: keep the app responsive, but record what went wrong.
        logging.getLogger(__name__).exception("Failed to answer chat message")
        chat_history.append(
            (
                user_html,
                "As you know I am still learning at this moment I am not able to respond to your question.\nThank you for your patience!",
            )
        )
    return chat_history
# Gradio callback for the Send button
def chat_interaction(user_message):
    """Run one chat turn and hand the results back to the UI.

    Returns:
        tuple: ``(history, "")`` -- the list returned by ``handle_chat`` for
        the chatbot component, and an empty string for the second output.
    """
    return handle_chat(user_message), ""
# Custom CSS for styling. It is passed to gr.Blocks(css=...) below.
# The "#title" rule is closed before "#chatbot_gpt" so the chatbot height rule
# is a top-level rule instead of being nested inside "#title".
APP_CSS = """
#send-button {
    color: white;
    border-radius: 20px; /* Round corners */
    background-color: #6366f1;
    transition: background-color 0.3s, transform 0.3s;
}
#send-button:hover {
    background-color: #6366f0; /* Change background color on hover */
    transform: scale(1.05); /* Slightly enlarge on hover */
}
#clear-button {
    color: white;
    border-radius: 20px; /* Round corners */
    background-color: #6366f1;
    transition: background-color 0.3s, transform 0.3s;
}
#clear-button:hover {
    background-color: #6366f0; /* Change background color on hover */
    transform: scale(1.05); /* Slightly enlarge on hover */
}
#user-input {
    flex-grow: 1; /* Allow textbox to take remaining space */
}
#title {
    margin-bottom: 10px; /* Space below the title */
    text-align: center;
}
#chatbot_gpt {
    height: 600px !important; /* Adjust height as needed */
}
"""

with gr.Blocks(theme="soft", css=APP_CSS) as app:  # Use the 'soft' theme
    gr.Markdown(
        "<h1 style='color:blue;'>Material Forecast - Generative AI Agent</h1>",
        elem_id="title"
    )

    with gr.Tab("Chat"):
        chat_box = gr.Chatbot(label="Chat History", elem_id="chatbot_gpt")

        with gr.Row():  # Place elements in the same row
            with gr.Column(scale=2):
                user_input = gr.Textbox(
                    placeholder="Type your message here...",
                    label="Your Message",
                    elem_id="user-input",
                    container=False,
                )
            with gr.Column(scale=1):
                with gr.Row():  # Place the buttons in the same row
                    send_button = gr.Button("Send", elem_id="send-button")
                    clear_button = gr.Button("Clear", elem_id="clear-button")

        # Send: update the chatbot and write "" back into the textbox.
        send_button.click(
            chat_interaction,
            inputs=[user_input],
            outputs=[chat_box, user_input]
        )

    # Disabled "File Upload" tab, kept for reference (it was an unused string
    # literal in the original, so it never ran):
    #
    # with gr.Tab("File Upload"):
    #     upload_button = gr.UploadButton(
    #         label="Upload your file (PDF, Excel, Docx, Txt, CSV)",
    #         file_types=[".pdf", ".xlsx", ".docx", ".txt", ".csv"],
    #         file_count="single"
    #     )
    #     file_output = gr.Textbox(label="File Processing Output")
    #     upload_button.upload(fn=handle_file_uploads, inputs=upload_button, outputs=[file_output])

    # Clear input field function
    def clear_input():
        """Return the empty string written into the textbox by Clear."""
        return ""  # Clear the input field

    clear_button.click(
        clear_input,
        inputs=[],
        outputs=[user_input]  # Clear only the user_input field
    )
# Launch the app only when this file is run as a script, so that importing
# the module does not start a server.
if __name__ == "__main__":
    app.launch(debug=True)