# butler / app.py
# tarunkumark2's picture
# deployment
# 2d8b8bf
from dotenv import load_dotenv
load_dotenv()
import os
import chainlit as cl
from llama_index.core import Settings
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.embeddings.nvidia import NVIDIAEmbedding
from llama_index.llms.nvidia import NVIDIA
from document_processor import load_multimodal_data, load_data_from_directory
from utils import set_environment_variables
import tempfile
from typing import List
from PIL import Image
import io
# Initialize settings
def initialize_setting():
    """Configure the global LlamaIndex ``Settings`` object for this app.

    Installs, in order, the NVIDIA embedding model, the NVIDIA-hosted LLM and
    a sentence splitter with a chunk size of 600 on ``Settings``.
    """
    embedder = NVIDIAEmbedding(model="nvidia/nv-embedqa-e5-v5", truncate="END")
    Settings.embed_model = embedder

    chat_model = NVIDIA(model="meta/llama-3.1-70b-instruct")
    Settings.llm = chat_model

    splitter = SentenceSplitter(chunk_size=600)
    Settings.text_splitter = splitter
# Create index from documents
def create_index(documents):
    """Build a ``VectorStoreIndex`` over *documents* backed by Milvus.

    Connection settings are read from the environment so that no credential
    lives in source control:

    * ``MILVUS_TOKEN`` (required) -- API token for the Milvus/Zilliz cluster.
    * ``MILVUS_HOST`` (optional) -- endpoint; defaults to the cluster URL this
      app was originally deployed against.
    * ``MILVUS_PORT`` (optional) -- port, default ``19530``.

    Args:
        documents: Documents to embed and store (passed straight to
            ``VectorStoreIndex.from_documents``).

    Returns:
        The index created by ``VectorStoreIndex.from_documents``.

    Raises:
        RuntimeError: If ``MILVUS_TOKEN`` is not set.
    """
    # SECURITY: a token used to be hard-coded here. Anything that was ever
    # committed must be treated as leaked and rotated.
    token = os.environ.get("MILVUS_TOKEN")
    if not token:
        raise RuntimeError(
            "MILVUS_TOKEN is not set; export it (or add it to .env) before "
            "creating the vector index."
        )

    vector_store = MilvusVectorStore(
        token=token,
        host=os.environ.get(
            "MILVUS_HOST",
            "https://in03-341eca982e73331.serverless.gcp-us-west1.cloud.zilliz.com",
        ),
        port=int(os.environ.get("MILVUS_PORT", "19530")),
        # 1024 presumably matches the embedding size of the configured
        # embedding model -- TODO confirm if the model changes.
        dim=1024,
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    return VectorStoreIndex.from_documents(documents, storage_context=storage_context)
# Extensions that get an inline image preview in the chat.
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')


def _safe_file_name(raw_name) -> str:
    """Return *raw_name* without directory parts so it cannot escape the temp dir."""
    candidate = os.path.basename(str(raw_name or "").replace("\\", "/"))
    if candidate in ("", ".", ".."):
        return "upload"
    return candidate


async def _announce_upload(display_name, source_path=None) -> None:
    """Tell the user a file arrived; preview it inline when it is an image on disk."""
    extension = os.path.splitext(display_name)[1].lower()
    if source_path and extension in _IMAGE_EXTENSIONS:
        await cl.Message(
            content=f"📸 Received image: {display_name}",
            elements=[cl.Image(path=source_path, name=display_name, display="inline")],
        ).send()
    else:
        await cl.Message(content=f"📄 Received file: {display_name}").send()


async def process_uploaded_files(files: List[cl.File]) -> List[str]:
    """Copy uploaded files into a fresh temporary directory.

    Each item may be either a plain string path or an object exposing
    ``name`` plus ``path`` and/or ``content`` attributes (presumably Chainlit
    file/image elements -- verify against the Chainlit version in use).
    A chat message is sent for every file that is received, skipped or fails.

    Args:
        files: Uploaded items to process.

    Returns:
        Paths of the copies inside the temporary directory, in input order.
        Items that are missing, empty or raise while being processed are
        skipped rather than aborting the whole batch.
    """
    import shutil
    import traceback

    temp_dir = tempfile.mkdtemp()
    processed_paths = []
    print("\n=== Starting File Processing ===")
    print(f"Number of files received: {len(files)}")
    print(f"Temporary directory: {temp_dir}")

    for file in files:
        try:
            print("\n--- Processing file ---")
            print(f"File object type: {type(file)}")

            # Plain string paths: copy the file as-is.
            if isinstance(file, str):
                print("Processing as string path")
                if not os.path.exists(file):
                    print(f"File path does not exist: {file}")
                    continue
                file_name = _safe_file_name(file)
                temp_path = os.path.join(temp_dir, file_name)
                print(f"File exists at path: {file}")
                print(f"File name: {file_name}")
                print(f"Temp path: {temp_path}")
                shutil.copy2(file, temp_path)
                await _announce_upload(file_name, file)
                processed_paths.append(temp_path)
                print("File processed successfully as string path")
                continue

            # Element objects: the name comes from the client, so only its
            # basename is used when building the destination path.
            print("Processing as Chainlit File object")
            display_name = file.name
            temp_path = os.path.join(temp_dir, _safe_file_name(display_name))
            print(f"File name: {display_name}")
            print(f"Temp path: {temp_path}")

            # `path` may be absent or None; both mean "fall back to content".
            source_path = getattr(file, 'path', None)
            if source_path and os.path.exists(source_path):
                print(f"File has path attribute: {source_path}")
                shutil.copy2(source_path, temp_path)
                print("File copied successfully")
                await _announce_upload(display_name, source_path)
            else:
                print("Attempting to process file content")
                file_content = getattr(file, 'content', None)
                if not file_content:
                    print("No file content available")
                    await cl.Message(
                        content=f"⚠️ Warning: Could not access content for {display_name}"
                    ).send()
                    continue
                # The destination is opened in binary mode; text must be encoded.
                if isinstance(file_content, str):
                    file_content = file_content.encode("utf-8")
                print("Writing file content to temp path")
                with open(temp_path, 'wb') as f:
                    f.write(file_content)
                await _announce_upload(display_name)

            processed_paths.append(temp_path)
            print("File processed successfully")
        except Exception as e:
            # Best effort per file: report the failure and move on to the next.
            print(f"Error processing file: {str(e)}")
            print(f"Error type: {type(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            await cl.Message(
                content=f"❌ Error processing file: {str(e)}"
            ).send()
            continue

    print("\n=== File Processing Summary ===")
    print(f"Total files processed: {len(processed_paths)}")
    print(f"Processed paths: {processed_paths}")
    return processed_paths
@cl.on_chat_start
async def start():
    """Initialize a new chat session and greet the user.

    Sets environment variables and the global LlamaIndex settings, resets the
    session's ``index`` to ``None``, stores a fresh temporary directory under
    the ``temp_dir`` session key, and sends the welcome message.
    """
    set_environment_variables()
    initialize_setting()

    # Per-session state: no index until the user uploads something.
    cl.user_session.set('index', None)
    cl.user_session.set('temp_dir', tempfile.mkdtemp())

    # The leading emoji was previously stored as mojibake; restored here.
    await cl.Message(
        content="👋 Welcome! You can:\n"
        "1. Upload images or documents using the paperclip icon\n"
        "2. Ask questions about the uploaded content\n"
        "3. Get detailed analysis including text extraction and scene descriptions"
    ).send()
# Phrases that mark a query as asking about image content.
_IMAGE_QUERY_KEYWORDS = (
    "what do you see", "describe the image", "what's in the image",
    "analyze the image", "text in image", "extract text",
)


def _log_exception(stage, error) -> None:
    """Print the error and current traceback; call from inside an ``except`` block."""
    import traceback

    print(f"\n!!! Error in {stage} !!!")
    print(f"Error type: {type(error)}")
    print(f"Error message: {str(error)}")
    print(f"Traceback: {traceback.format_exc()}")


async def _index_uploads(elements) -> bool:
    """Process uploaded elements into a session index; return True on success."""
    print("\n--- Processing File Upload ---")
    print(f"Number of elements: {len(elements)}")
    print(f"Elements types: {[type(elem) for elem in elements]}")
    try:
        print("Starting file processing...")
        processed_paths = await process_uploaded_files(elements)
        print(f"Processed paths: {processed_paths}")
        if not processed_paths:
            print("No files were processed successfully")
            await cl.Message(
                content="⚠️ No files were successfully processed."
            ).send()
            return False

        print("\n--- Creating Documents ---")
        # Document loading and indexing are synchronous; run them in a worker
        # thread so the event loop stays responsive.
        documents = await cl.make_async(load_multimodal_data)(processed_paths)
        print(f"Number of documents created: {len(documents) if documents else 0}")
        if not documents:
            print("No documents were created")
            await cl.Message(
                content="⚠️ No documents were created from the uploaded files."
            ).send()
            return False

        print("\n--- Creating Index ---")
        # A new index is built on every upload and replaces the session's
        # previous one.
        index = await cl.make_async(create_index)(documents)
        cl.user_session.set('index', index)
        print("Index created and stored in session")
        await cl.Message(
            content="✅ Files processed successfully! You can now ask questions about the content."
        ).send()
        return True
    except Exception as e:
        _log_exception("file processing", e)
        await cl.Message(
            content=f"❌ Error processing files: {str(e)}"
        ).send()
        return False


async def _answer_query(query) -> None:
    """Answer *query* against the session index and send the reply to the chat."""
    print("\n--- Processing Text Query ---")
    print(f"Query content: {query}")
    index = cl.user_session.get('index')
    print(f"Index exists: {index is not None}")
    if index is None:
        print("No index found in session")
        await cl.Message(
            content="⚠️ Please upload some files first before asking questions."
        ).send()
        return

    placeholder = None
    try:
        print("Creating query engine...")
        # Send an empty message first and fill it in once the answer is ready.
        placeholder = cl.Message(content="")
        await placeholder.send()

        query_engine = index.as_query_engine(similarity_top_k=20)
        print("Executing query...")
        # The query call is synchronous; keep it off the event loop.
        response = await cl.make_async(query_engine.query)(query)
        print("Query executed successfully")
        response_text = str(response)

        if any(keyword in query.lower() for keyword in _IMAGE_QUERY_KEYWORDS):
            print("Processing special image analysis query")
            # NOTE(review): the lines below are static placeholder text, not
            # real analysis output -- confirm whether this is intended.
            response_text += "\n\n**Image Analysis:**\n"
            response_text += "- Visible Text: [Extracted text from the image]\n"
            response_text += "- Scene Description: [Description of the image content]\n"
            response_text += "- Objects Detected: [List of detected objects]\n"

        print("Updating response message...")
        placeholder.content = response_text
        await placeholder.update()
        print("Response sent successfully")
    except Exception as e:
        _log_exception("query processing", e)
        error_text = f"❌ Error processing query: {str(e)}"
        if placeholder is not None:
            # Reuse the placeholder so no empty message is left in the chat.
            placeholder.content = error_text
            await placeholder.update()
        else:
            await cl.Message(content=error_text).send()


@cl.on_message
async def main(message: cl.Message):
    """Handle an incoming chat message.

    If the message carries elements, they are processed and indexed first;
    any failure there is reported to the user and ends handling. If the
    message has text content, it is then answered against the session index.
    """
    print("\n=== Starting Message Processing ===")
    print(f"Message type: {type(message)}")
    print(f"Message content: {message.content}")
    print(f"Message elements: {message.elements}")

    if message.elements:
        if not await _index_uploads(message.elements):
            return

    if message.content:
        await _answer_query(message.content)

    print("\n=== Message Processing Complete ===\n")
# Registered on chat end: `on_stop` fires when the user presses "Stop" on a
# running task, which is not the end of the session.
@cl.on_chat_end
def on_stop():
    """Remove the session's temporary directory when the chat session ends.

    Cleanup is best effort: a failure to delete is reported on stdout and
    otherwise ignored so that session teardown is never interrupted.
    """
    import shutil

    temp_dir = cl.user_session.get('temp_dir')
    if not temp_dir or not os.path.exists(temp_dir):
        return
    try:
        shutil.rmtree(temp_dir)
    except OSError as error:
        print(f"Warning: could not remove temporary directory {temp_dir}: {error}")
# Script entry point: runs only when this file is executed directly.
# NOTE(review): Chainlit apps are normally started with `chainlit run app.py`;
# confirm that `cl.run` exists in the installed Chainlit version.
if __name__ == "__main__":
    cl.run()