"""Chainlit chat app that indexes uploaded files and answers questions on them.

Uploaded files are staged in a per-session temporary directory, turned into
documents by ``document_processor.load_multimodal_data``, indexed into a
Milvus-backed ``VectorStoreIndex``, and queried through a LlamaIndex query
engine whenever the user sends a text message.
"""

from dotenv import load_dotenv

# Kept ahead of the remaining imports, as in the original file, in case any of
# them read environment variables at import time.
load_dotenv()

import io
import logging
import os
import shutil
import tempfile
from typing import List, Optional

import chainlit as cl
from llama_index.core import Settings
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.embeddings.nvidia import NVIDIAEmbedding
from llama_index.llms.nvidia import NVIDIA
from PIL import Image

from document_processor import load_multimodal_data, load_data_from_directory
from utils import set_environment_variables

logger = logging.getLogger(__name__)

# File extensions that are echoed back to the user as inline images.
IMAGE_EXTENSIONS = frozenset({".png", ".jpg", ".jpeg", ".gif", ".bmp"})

# Phrases that trigger the extra "Image Analysis" section in a response.
IMAGE_QUERY_KEYWORDS = (
    "what do you see",
    "describe the image",
    "what's in the image",
    "analyze the image",
    "text in image",
    "extract text",
)

# SECURITY: these connection values (including an API token) were committed in
# source. They remain only as fallbacks so existing deployments keep working.
# Set MILVUS_TOKEN / MILVUS_HOST / MILVUS_PORT in the environment (or .env),
# rotate the exposed token, and then delete these literals.
_DEFAULT_MILVUS_TOKEN = "db_341eca982e73331:Dr8+SXGsfb3Kp4/8"
_DEFAULT_MILVUS_HOST = "https://in03-341eca982e73331.serverless.gcp-us-west1.cloud.zilliz.com"
_DEFAULT_MILVUS_PORT = 19530
_EMBEDDING_DIM = 1024


def initialize_setting():
    """Configure the global LlamaIndex ``Settings`` (embedder, LLM, splitter)."""
    Settings.embed_model = NVIDIAEmbedding(model="nvidia/nv-embedqa-e5-v5", truncate="END")
    Settings.llm = NVIDIA(model="meta/llama-3.1-70b-instruct")
    Settings.text_splitter = SentenceSplitter(chunk_size=600)


def create_index(documents):
    """Build a ``VectorStoreIndex`` over *documents* backed by Milvus.

    Connection settings are read from ``MILVUS_TOKEN``, ``MILVUS_HOST`` and
    ``MILVUS_PORT`` when set, falling back to the values previously hard-coded
    in this function.

    Args:
        documents: Documents to embed and store.

    Returns:
        The index created by ``VectorStoreIndex.from_documents``.
    """
    token = os.environ.get("MILVUS_TOKEN")
    if token is None:
        logger.warning(
            "MILVUS_TOKEN is not set; falling back to the token embedded in source."
        )
        token = _DEFAULT_MILVUS_TOKEN

    # NOTE(review): MilvusVectorStore documents `uri` as its connection
    # parameter; `host`/`port` are passed exactly as the original code did.
    # Confirm the store really connects to the intended server.
    vector_store = MilvusVectorStore(
        token=token,
        host=os.environ.get("MILVUS_HOST", _DEFAULT_MILVUS_HOST),
        port=int(os.environ.get("MILVUS_PORT", _DEFAULT_MILVUS_PORT)),
        dim=_EMBEDDING_DIM,
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    return VectorStoreIndex.from_documents(documents, storage_context=storage_context)


def _is_image(file_name: str) -> bool:
    """Return True when *file_name* has one of the known image extensions."""
    return os.path.splitext(file_name)[1].lower() in IMAGE_EXTENSIONS


async def _announce_file(file_name: str, image_path: Optional[str] = None) -> None:
    """Tell the user a file was received, inlining it when it is an image."""
    if image_path is not None and _is_image(file_name):
        await cl.Message(
            content=f"📸 Received image: {file_name}",
            elements=[cl.Image(path=image_path, name=file_name, display="inline")],
        ).send()
    else:
        await cl.Message(content=f"📄 Received file: {file_name}").send()


async def _stage_file(file, temp_dir: str) -> Optional[str]:
    """Copy one upload into *temp_dir*.

    *file* is either a filesystem path (``str``) or an object exposing
    ``name`` plus ``path`` and/or ``content``.

    Returns:
        The staged path, or ``None`` when the upload was skipped.

    Raises:
        ValueError: If the upload has no usable file name.
    """
    if isinstance(file, str):
        if not os.path.exists(file):
            logger.warning("File path does not exist: %s", file)
            return None
        raw_name, source_path, content = file, file, None
    else:
        raw_name = file.name
        candidate = getattr(file, "path", None)
        # `path` may be present but None; only use it when it points at a file.
        source_path = candidate if candidate and os.path.exists(candidate) else None
        content = None if source_path else getattr(file, "content", None)

    # The name comes from the client: keep only its final component so the
    # staged file cannot be written outside temp_dir.
    file_name = os.path.basename(raw_name)
    if file_name in ("", ".", ".."):
        raise ValueError(f"Uploaded file has no usable name: {raw_name!r}")
    temp_path = os.path.join(temp_dir, file_name)
    logger.debug("Staging %s at %s", file_name, temp_path)

    if source_path is not None:
        shutil.copy2(source_path, temp_path)
        await _announce_file(file_name, image_path=source_path)
        return temp_path

    if not content:
        logger.warning("No file content available for %s", file_name)
        await cl.Message(
            content=f"⚠️ Warning: Could not access content for {file_name}"
        ).send()
        return None

    if isinstance(content, str):
        # The file is opened in binary mode below, so text must be encoded.
        content = content.encode("utf-8")
    with open(temp_path, "wb") as f:
        f.write(content)
    # As in the original, content-only uploads are announced without a preview.
    await _announce_file(file_name)
    return temp_path


async def process_uploaded_files(
    files: List[cl.File], temp_dir: Optional[str] = None
) -> List[str]:
    """Stage uploaded files on disk and return the staged paths.

    Args:
        files: Uploaded elements (or plain path strings) to stage.
        temp_dir: Directory to stage into. When omitted, a fresh temporary
            directory is created and the caller is responsible for removing it.

    Returns:
        Paths of the files that were staged successfully. A failure on one
        file is reported to the user and does not stop the remaining files.
    """
    if temp_dir is None:
        temp_dir = tempfile.mkdtemp()
    files = files or []
    logger.info("Processing %d uploaded file(s) into %s", len(files), temp_dir)

    processed_paths = []
    for file in files:
        try:
            staged_path = await _stage_file(file, temp_dir)
        except Exception as e:
            # Per-file boundary: report and move on to the next upload.
            logger.exception("Error processing file")
            await cl.Message(content=f"❌ Error processing file: {str(e)}").send()
            continue
        if staged_path is not None:
            processed_paths.append(staged_path)

    logger.info("Total files processed: %d", len(processed_paths))
    return processed_paths


@cl.on_chat_start
async def start():
    """Initialize the chat session."""
    set_environment_variables()
    initialize_setting()

    # Initialize session variables
    cl.user_session.set('index', None)
    cl.user_session.set('temp_dir', tempfile.mkdtemp())

    # Send welcome message
    await cl.Message(
        content="👋 Welcome! You can:\n"
        "1. Upload images or documents using the paperclip icon\n"
        "2. Ask questions about the uploaded content\n"
        "3. Get detailed analysis including text extraction and scene descriptions"
    ).send()


def _session_temp_dir() -> str:
    """Return the session's temp directory, recreating it if it was removed."""
    temp_dir = cl.user_session.get('temp_dir')
    if not temp_dir or not os.path.isdir(temp_dir):
        temp_dir = tempfile.mkdtemp()
        cl.user_session.set('temp_dir', temp_dir)
    return temp_dir


async def _ingest_uploads(elements) -> bool:
    """Stage, load and index uploaded elements.

    Returns:
        True when an index was built and stored in the session; False when
        nothing was indexed (the user has already been told why).
    """
    try:
        # One sub-directory per upload batch keeps batches apart while the
        # session-level cleanup still removes everything.
        batch_dir = tempfile.mkdtemp(dir=_session_temp_dir())
        processed_paths = await process_uploaded_files(elements, batch_dir)
        if not processed_paths:
            logger.info("No files were processed successfully")
            await cl.Message(
                content="⚠️ No files were successfully processed."
            ).send()
            return False

        # The loader and index builder are synchronous; run them off the
        # event loop so the server stays responsive meanwhile.
        documents = await cl.make_async(load_multimodal_data)(processed_paths)
        if not documents:
            logger.info("No documents were created")
            await cl.Message(
                content="⚠️ No documents were created from the uploaded files."
            ).send()
            return False

        index = await cl.make_async(create_index)(documents)
        cl.user_session.set('index', index)
        logger.info("Index created from %d document(s)", len(documents))
        await cl.Message(
            content="✅ Files processed successfully! You can now ask questions about the content."
        ).send()
        return True
    except Exception as e:
        # Handler boundary: surface the failure to the user instead of crashing.
        logger.exception("Error in file processing")
        await cl.Message(content=f"❌ Error processing files: {str(e)}").send()
        return False


async def _answer_query(query: str) -> None:
    """Answer *query* against the session index and send the reply."""
    index = cl.user_session.get('index')
    if index is None:
        logger.info("No index found in session")
        await cl.Message(
            content="⚠️ Please upload some files first before asking questions."
        ).send()
        return

    try:
        # Placeholder message that is filled in once the answer is ready.
        msg = cl.Message(content="")
        await msg.send()

        query_engine = index.as_query_engine(similarity_top_k=20)
        # query() is synchronous; run it off the event loop.
        response = await cl.make_async(query_engine.query)(query)
        response_text = str(response)

        lowered_query = query.lower()
        if any(keyword in lowered_query for keyword in IMAGE_QUERY_KEYWORDS):
            # NOTE(review): the lines below append fixed bracketed placeholder
            # text, not real analysis results. Confirm this is intended.
            response_text += "\n\n**Image Analysis:**\n"
            response_text += "- Visible Text: [Extracted text from the image]\n"
            response_text += "- Scene Description: [Description of the image content]\n"
            response_text += "- Objects Detected: [List of detected objects]\n"

        msg.content = response_text
        await msg.update()
    except Exception as e:
        # Handler boundary: surface the failure to the user instead of crashing.
        logger.exception("Error in query processing")
        await cl.Message(content=f"❌ Error processing query: {str(e)}").send()


@cl.on_message
async def main(message: cl.Message):
    """Handle incoming messages and files.

    Attached elements are ingested first. If ingestion does not produce an
    index the handler stops; otherwise any text content is answered as a query.
    """
    logger.debug("Message received with %d element(s)", len(message.elements or []))

    # Process any uploaded files
    if message.elements and not await _ingest_uploads(message.elements):
        return

    # Handle text queries
    if message.content:
        await _answer_query(message.content)


def _cleanup_temp_dir() -> None:
    """Best-effort removal of the session's temporary directory."""
    temp_dir = cl.user_session.get('temp_dir')
    if temp_dir and os.path.exists(temp_dir):
        try:
            shutil.rmtree(temp_dir)
        except OSError:
            # Cleanup stays best-effort, but the failure is no longer silent.
            logger.warning("Could not remove temp dir %s", temp_dir, exc_info=True)


@cl.on_stop
def on_stop():
    """Clean up the session's temporary directory when the user stops a task."""
    _cleanup_temp_dir()


@cl.on_chat_end
def on_chat_end():
    """Clean up the session's temporary directory when the chat session ends."""
    _cleanup_temp_dir()


if __name__ == "__main__":
    # NOTE(review): confirm the installed chainlit exposes `run`; the
    # documented way to start an app is `chainlit run <this file>`.
    cl.run()