#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
High-Performance Chat Interface for LM Studio
This script creates a robust and efficient chat interface using Gradio,
facilitating seamless interactions with the LM Studio API. It leverages
GPU capabilities for accelerated processing and adheres to best practices
in modern Python programming. Comprehensive logging and error handling
ensure reliability and ease of maintenance.
Author: Your Name
Date: YYYY-MM-DD
"""
import gradio as gr
import httpx # Replacing 'requests' with 'httpx' for asynchronous HTTP calls
import logging
import json
import os
import numpy as np
import torch
# ===========================
# Configuration and Constants
# ===========================
# Set up logging for detailed diagnostics
logging.basicConfig(
level=logging.DEBUG, # Set to DEBUG for more verbose output
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# LM Studio REST API Base URL
BASE_URL = os.getenv("LMSTUDIO_API_BASE_URL", "http://localhost:1234/v1")
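# The endpoint can be overridden without editing this file, e.g. (illustrative value):
#   export LMSTUDIO_API_BASE_URL="http://192.168.1.50:1234/v1"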
# GPU Availability and Device Configuration
USE_GPU = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_GPU else "cpu")
logger.info(f"Using device: {DEVICE}")
# Constants for Dynamic max_tokens Calculation
MODEL_MAX_TOKENS = 32768 # Model's maximum context length
AVERAGE_CHARS_PER_TOKEN = 4 # Approximate average characters per token
BUFFER_TOKENS = 2000 # Reserved tokens for system prompts and overhead
MIN_OUTPUT_TOKENS = 1000 # Minimum tokens to ensure meaningful responses
# Maximum number of embeddings to store to optimize memory usage
MAX_EMBEDDINGS = 100
# HTTPX Timeout Configuration
HTTPX_TIMEOUT = 300 # seconds, adjust as needed for longer processing times
# ===========================
# Utility Functions
# ===========================
def calculate_max_tokens(message, model_max_tokens=MODEL_MAX_TOKENS,
buffer=BUFFER_TOKENS, avg_chars_per_token=AVERAGE_CHARS_PER_TOKEN,
min_tokens=MIN_OUTPUT_TOKENS):
"""
Calculate the maximum number of tokens for the output based on the input message length.
Args:
message (str): The input message from the user.
model_max_tokens (int): The total token capacity of the model.
buffer (int): Reserved tokens for system prompts and overhead.
avg_chars_per_token (int): Approximate number of characters per token.
min_tokens (int): Minimum number of tokens to ensure a meaningful response.
Returns:
int: The calculated maximum tokens for the output.
"""
input_length = len(message)
input_tokens = input_length / avg_chars_per_token
max_tokens = model_max_tokens - int(input_tokens) - buffer
calculated_max = max(max_tokens, min_tokens)
logger.debug(f"Input length (chars): {input_length}, "
f"Estimated input tokens: {input_tokens}, "
f"Max tokens for output: {calculated_max}")
return calculated_max
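# Worked example (using the default constants above): a 400-character prompt is
# estimated at 400 / 4 = 100 input tokens, so the output budget becomes
# 32768 - 100 - 2000 = 30668 tokens; an extremely long prompt falls back to the
# MIN_OUTPUT_TOKENS floor instead.
#
#   calculate_max_tokens("x" * 400)       # -> 30668
#   calculate_max_tokens("x" * 200_000)   # -> 1000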
async def get_embeddings(text):
"""
Retrieve embeddings for the given text from the LM Studio API.
Args:
text (str): The input text to generate embeddings for.
Returns:
list or None: The embedding vector as a list if successful, else None.
"""
url = f"{BASE_URL}/embeddings"
payload = {"model": "nomad_embed_text_v1_5_Q8_0", "input": text}
logger.info(f"Requesting embeddings for input: {text[:100]}...")
async with httpx.AsyncClient(timeout=HTTPX_TIMEOUT) as client:
try:
response = await client.post(
url,
json=payload, # Proper JSON serialization
headers={
"Content-Type": "application/json" # Ensuring correct Content-Type
}
)
logger.info(f"Embeddings response status code: {response.status_code}")
response.raise_for_status()
data = response.json()
logger.debug(f"Embeddings response data: {data}")
if "data" in data and len(data["data"]) > 0:
embedding = np.array(data["data"][0]["embedding"])
if USE_GPU:
embedding = torch.tensor(embedding, device=DEVICE).tolist() # Convert to list for serialization
return embedding
else:
logger.error("Invalid response structure for embeddings.")
return None
except httpx.RequestError as e:
logger.error(f"Failed to retrieve embeddings: {e}")
return None
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error while retrieving embeddings: {e}")
return None
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
return None
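# LM Studio's /v1/embeddings endpoint mirrors the OpenAI response shape, roughly:
#   {"data": [{"object": "embedding", "embedding": [0.0123, -0.0456, ...], "index": 0}], ...}
# Illustrative call from other async code:
#   vector = await get_embeddings("What does this repository contain?")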
def calculate_similarity(vec1, vec2):
"""
Calculate the cosine similarity between two vectors using GPU acceleration.
Args:
vec1 (list or torch.Tensor): The first embedding vector.
vec2 (list or torch.Tensor): The second embedding vector.
Returns:
float: The cosine similarity score.
"""
if vec1 is None or vec2 is None:
logger.warning("One or both vectors for similarity calculation are None.")
return 0.0
logger.debug("Calculating similarity between vectors.")
vec1_tensor = torch.tensor(vec1, device=DEVICE) if not isinstance(vec1, torch.Tensor) else vec1.to(DEVICE)
vec2_tensor = torch.tensor(vec2, device=DEVICE) if not isinstance(vec2, torch.Tensor) else vec2.to(DEVICE)
similarity = torch.nn.functional.cosine_similarity(vec1_tensor.unsqueeze(0), vec2_tensor.unsqueeze(0)).item()
logger.debug(f"Calculated similarity: {similarity}")
return similarity
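# Quick sanity check (illustrative): identical directions score 1.0, orthogonal ones 0.0.
#   calculate_similarity([1.0, 0.0], [1.0, 0.0])   # -> 1.0
#   calculate_similarity([1.0, 0.0], [0.0, 1.0])   # -> 0.0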
# ===========================
# API Interaction Handling
# ===========================
async def chat_with_lmstudio(messages, max_tokens):
"""
Handle chat completions with the LM Studio API using streaming.
Args:
messages (list): A list of message dictionaries following OpenAI's format.
max_tokens (int): The maximum number of tokens to generate in the response.
Yields:
str: Chunks of the generated response.
"""
url = f"{BASE_URL}/chat/completions"
payload = {
"model": "Qwen2.5-Coder-32B-Instruct", # Adjusted model name if necessary
"messages": messages,
"temperature": 0.7,
"max_tokens": max_tokens,
"stream": True,
}
logger.info(f"Sending request to chat/completions with max_tokens: {max_tokens}")
async with httpx.AsyncClient(timeout=HTTPX_TIMEOUT) as client:
try:
async with client.stream("POST", url, json=payload, headers={"Content-Type": "application/json"}) as response:
logger.info(f"chat/completions response status code: {response.status_code}")
response.raise_for_status()
async for line in response.aiter_lines():
if line:
try:
                            decoded_line = line.strip()
                            if decoded_line.startswith("data: "):
                                chunk_str = decoded_line[len("data: "):].strip()
                                # The stream terminates with a literal "[DONE]" sentinel.
                                if chunk_str == "[DONE]":
                                    break
                                data = json.loads(chunk_str)
                                logger.debug(f"Received chunk: {data}")
                                content = data.get("choices", [{}])[0].get("delta", {}).get("content", "")
                                if content:
                                    yield content
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
except httpx.RequestError as e:
logger.error(f"LM Studio chat/completions request failed: {e}")
yield "An error occurred while generating a response."
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error during chat/completions: {e}")
yield "An HTTP error occurred while generating a response."
# ===========================
# User Interface Implementation
# ===========================
def gradio_chat_interface():
"""
Create and launch the Gradio Blocks interface for the chat application.
"""
with gr.Blocks() as interface:
gr.Markdown("# 🚀 High-Performance Chat Interface for LM Studio")
# Chatbot component to display the conversation
chatbot = gr.Chatbot(label="Conversation", type="messages")
# User input textbox
user_input = gr.Textbox(
label="Your Message",
placeholder="Type your message here...",
lines=2,
interactive=True
)
# File upload component for context files
file_input = gr.File(
label="Upload Context File (.txt)",
type="binary", # Correct value as per Gradio's expectations
interactive=True
)
# Display relevant context based on similarity
context_display = gr.Textbox(
label="Relevant Context",
interactive=False
)
# State to store embeddings and message history
embeddings_state = gr.State({"embeddings": [], "messages_history": []})
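        # Shape of the state value (as managed by chat_handler below):
        #   {"embeddings": [{"embedding": [...], "content": "..."}, ...],
        #    "messages_history": [{"role": "user" | "assistant", "content": "..."}, ...]}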
        async def chat_handler(message, file, chat_history, state):
            """
            Handle user input, process embeddings, retrieve context, and generate responses.
            Args:
                message (str): The user's input message.
                file (bytes or None): Raw contents of the uploaded context file, if any.
                chat_history (list): The chatbot's current messages in OpenAI-style format.
                state (dict): The current state containing embeddings and message history.
            Yields:
                list: Updated chatbot messages, new state, and context display text.
            """
embeddings = state.get("embeddings", [])
messages_history = state.get("messages_history", [])
# ===========================
# File Processing
# ===========================
            if file:
                try:
                    # With type="binary", Gradio passes the upload as raw bytes.
                    if isinstance(file, (bytes, bytearray)):
                        file_content = file.decode("utf-8")
                    else:
                        file_content = file.read().decode("utf-8")
                    message += f"\n[File Content]:\n{file_content}"
                    logger.info("Successfully processed uploaded file.")
                except Exception as e:
                    error_msg = f"Error reading file: {e}"
                    logger.error(error_msg)
                    yield [list(chat_history or []) + [{"role": "assistant", "content": error_msg}], state, ""]
                    return  # Terminate the generator after yielding the error
# ===========================
# Embeddings Generation
# ===========================
            user_embedding = await get_embeddings(message)
            if user_embedding is not None:
                # Store the embedding together with the text it was computed from,
                # so the similarity lookup below maps back to the correct message.
                embeddings.append({"embedding": user_embedding, "content": message})
                messages_history.append({"role": "user", "content": message})
                logger.info("Embeddings generated and appended to state.")
            else:
                error_msg = "Failed to generate embeddings."
                logger.error(error_msg)
                yield [list(chat_history or []) + [{"role": "assistant", "content": error_msg}], state, ""]
                return  # Terminate the generator after yielding the error
            # Limit the amount of stored data to optimize memory usage
            if len(embeddings) > MAX_EMBEDDINGS:
                embeddings = embeddings[-MAX_EMBEDDINGS:]
            if len(messages_history) > MAX_EMBEDDINGS:
                messages_history = messages_history[-MAX_EMBEDDINGS:]
# ===========================
# Similarity Calculation and Context Retrieval
# ===========================
            history = [{"role": "user", "content": message}]
            context_text = ""
            if len(embeddings) > 1:
                # Compare the new message against all previously embedded messages.
                similarities = [
                    (calculate_similarity(user_embedding, item["embedding"]), idx)
                    for idx, item in enumerate(embeddings[:-1])
                ]
                similarities.sort(reverse=True, key=lambda x: x[0])
                top_context = similarities[:3]
                for _similarity, idx in top_context:
                    context_content = embeddings[idx]["content"]
                    history.insert(0, {"role": "system", "content": context_content})
                    context_text += f"Context: {context_content[:100]}...\n"
                logger.info("Relevant context retrieved based on similarity.")
# ===========================
# Dynamic max_tokens Calculation
# ===========================
max_tokens = calculate_max_tokens(message)
logger.info(f"Calculated max_tokens for output: {max_tokens}")
# ===========================
# Chat with LM Studio API
# ===========================
response = ""
try:
async for chunk in chat_with_lmstudio(history, max_tokens):
response += chunk
# Ensure response is a string
if not isinstance(response, str):
response = str(response)
# Handle empty response
if not response.strip():
response = "Sorry, I couldn't process your request."
# Update chatbot in real-time with partial responses
updated_chat = chatbot.value.copy()
updated_chat.append({"role": "user", "content": message})
updated_chat.append({"role": "assistant", "content": response})
logger.debug(f"Updated Chat: {updated_chat}")
yield [
updated_chat,
{"embeddings": embeddings, "messages_history": messages_history},
context_text
]
logger.info("Response generation completed.")
except Exception as e:
error_msg = f"An error occurred while generating a response: {e}"
logger.error(error_msg)
yield [error_msg, state, ""]
return # Terminate the generator after yielding the error
# ===========================
# Final State Update
# ===========================
            messages_history.append({"role": "assistant", "content": response})
            new_state = {"embeddings": embeddings, "messages_history": messages_history}
            updated_chat = list(chat_history or [])
            updated_chat.append({"role": "user", "content": message})
            updated_chat.append({"role": "assistant", "content": response})
            # Final yield
            try:
                logger.debug(f"Final Updated Chat: {updated_chat}")
                yield [
                    updated_chat,
                    new_state,
                    context_text
                ]
            except Exception as e:
                error_msg = f"Error updating chatbot: {e}"
                logger.error(error_msg)
                yield [list(chat_history or []) + [{"role": "assistant", "content": error_msg}], state, ""]
# ===========================
# Send Button Configuration
# ===========================
        send_button = gr.Button("Send")
        send_button.click(
            chat_handler,
            inputs=[user_input, file_input, chatbot, embeddings_state],
            outputs=[chatbot, embeddings_state, context_display],
            show_progress="full"
        )
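        # Optional (illustrative): submitting with Enter could reuse the same handler, e.g.
        #   user_input.submit(chat_handler,
        #                     inputs=[user_input, file_input, chatbot, embeddings_state],
        #                     outputs=[chatbot, embeddings_state, context_display])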
# ===========================
# Launch the Interface
# ===========================
interface.launch(share=True, server_name="0.0.0.0", server_port=7860)
# ===========================
# Main Execution
# ===========================
if __name__ == "__main__":
    # interface.launch() blocks until the server shuts down, so no asyncio
    # event-loop wrapper is needed; Gradio runs the async handler internally.
    gradio_chat_interface()