from __future__ import annotations from typing import AsyncIterable, Dict, Deque from collections import deque import requests from fastapi_poe import ( PoeBot, QueryRequest, PartialResponse, SettingsRequest, SettingsResponse, make_app, ) from fastapi import Header from modal import Image, App, asgi_app, Volume, Mount import os import json import re import tempfile import shutil import logging # LM Studio endpoint configurations NGROK_URL = "https://fca7-2601-2c1-280-1320-b881-aac7-9186-9365.ngrok-free.app" LM_STUDIO_CHAT_URL = f"{NGROK_URL}/v1/chat/completions" # Hardcoded model name for LM Studio MODEL_NAME = "bartowski/Qwen2.5-Coder-32B-Instruct-GGUF/Qwen2.5-Coder-32B-Instruct-IQ2_M.gguf" # Poe bot access key for the new bot NEW_BOT_ACCESS_KEY = "YOUR_ACCESS_KEY_HERE" # Replace with your actual access key # Path to store conversation history in volume VOLUME_PATH = "/data/user_histories.json" # Configure logging logging.basicConfig(level=logging.INFO, format='[%(asctime)s] [%(levelname)s] %(message)s') # Dictionary to track user-specific conversation history (initially empty) user_histories: Dict[str, Deque[dict]] = {} # Set a maximum number of messages to keep in the history MAX_HISTORY_MESSAGES = 50 # Adjustable based on expected conversation length # Load existing conversation history from volume if available if os.path.exists(VOLUME_PATH): try: with open(VOLUME_PATH, "r") as f: user_histories = { user_id: deque(history, maxlen=MAX_HISTORY_MESSAGES) for user_id, history in json.load(f).items() } logging.info("Loaded existing conversation histories.") except Exception as e: logging.error(f"Failed to load user histories: {e}") user_histories = {} else: logging.info("No existing conversation history found. Initializing a new history store.") class AnotherSecureLMStudioBot(PoeBot): async def get_response( self, request: QueryRequest, authorization: str = Header(...) ) -> AsyncIterable[PartialResponse]: """ Handle user queries dynamically while validating the Poe access key. """ # Validate the Poe access key if authorization != NEW_BOT_ACCESS_KEY: logging.warning("Unauthorized access key used.") # Proceed without raising an error for testing purposes # Extract user identifier user_id = self.get_user_id(request) if not user_id: yield PartialResponse(text="Error: User identifier not provided.") return # Get or create user-specific conversation history if user_id not in user_histories: user_histories[user_id] = deque(maxlen=MAX_HISTORY_MESSAGES) logging.info(f"Initializing new conversation history for user {user_id}.") conversation_history = user_histories[user_id] # Extract the user's message user_message = request.query[-1].content # Sanitize user input to prevent injection attacks user_message = re.sub(r"[<>]", "", user_message) # Append the user's message to the conversation history conversation_history.append({"role": "user", "content": user_message}) # Log the conversation history before generating a response logging.info(f"Conversation history for user {user_id}: {list(conversation_history)}") try: # Generate response based on conversation history response_text = self.get_chat_completion_with_context(conversation_history) # Add bot response to conversation history conversation_history.append({"role": "assistant", "content": response_text}) # Log the response generated logging.info(f"Generated response for user {user_id}: {response_text}") # Save updated conversation history to volume self.save_conversation_history() except Exception as e: # Graceful error handling logging.error(f"An error occurred while processing the request for user {user_id}: {e}") response_text = f"An error occurred: {e}" # Yield the response yield PartialResponse(text=response_text.strip()) def get_user_id(self, request: QueryRequest) -> str: """ Extract or generate a unique user identifier. """ # Use request.user_id if available if hasattr(request, 'user_id') and request.user_id: return request.user_id # Fallback: use a fixed identifier return "default_user_id" def get_chat_completion_with_context(self, conversation_history: Deque[dict]) -> str: """ Send a chat completion request to LM Studio's /v1/chat/completions endpoint, including the full conversation history. """ # Prepare the payload payload = { "model": MODEL_NAME, "messages": list(conversation_history), "temperature": 0.7, # Adjust as needed "max_tokens": 1024, "stream": False } logging.info(f"Sending request to LM Studio with payload:\n{json.dumps(payload, indent=2)}") response = requests.post(LM_STUDIO_CHAT_URL, json=payload, timeout=120) response.raise_for_status() response_data = response.json() # Use the assistant response content from the response if "choices" in response_data and len(response_data["choices"]) > 0: generated_text = response_data["choices"][0].get("message", {}).get("content", "") else: generated_text = "" # Fallback in case of empty content if not generated_text: generated_text = "I'm sorry, I couldn't generate a response. Could you please try again?" return generated_text def save_conversation_history(self): """ Save the current conversation history to the volume in an atomic way. """ try: with tempfile.NamedTemporaryFile('w', delete=False) as tmp_file: json.dump( {user_id: list(history) for user_id, history in user_histories.items()}, tmp_file, indent=4 ) temp_file_path = tmp_file.name shutil.move(temp_file_path, VOLUME_PATH) logging.info("Successfully saved user conversation histories.") except Exception as e: logging.error(f"Failed to save user histories: {e}") async def get_settings(self, setting: SettingsRequest) -> SettingsResponse: """ Configure the bot's capabilities for Poe, such as enabling attachments. """ return SettingsResponse( allow_attachments=True, allow_images=True, allow_audio=True, allow_video=True, allow_links=True, ) # Modal configuration for the new bot REQUIREMENTS = ["fastapi-poe==0.0.24", "requests==2.31.0"] image = Image.debian_slim().pip_install(REQUIREMENTS) # Mount the volume to persist user histories app = App( "another-secure-lmstudio-poe-bot", mounts=[Mount.from_local_dir("/data", remote_path="/data", recursive=True)] ) @app.function(image=image) @asgi_app() def fastapi_app(): bot = AnotherSecureLMStudioBot() return make_app(bot, allow_without_key=True)