tldw / App_Function_Libraries /Chat /Chat_Functions.py
oceansweep's picture
Upload 169 files
c5b0bb7 verified
# Chat_Functions.py
# Chat functions for interacting with the LLMs as chatbots
import base64
# Imports
import json
import logging
import os
import re
import sqlite3
import tempfile
import time
from datetime import datetime
from pathlib import Path
#
# External Imports
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import start_new_conversation, delete_messages_in_conversation, save_message
from App_Function_Libraries.DB.RAG_QA_Chat_DB import get_db_connection, get_conversation_name
from App_Function_Libraries.LLM_API_Calls import chat_with_openai, chat_with_anthropic, chat_with_cohere, \
chat_with_groq, chat_with_openrouter, chat_with_deepseek, chat_with_mistral, chat_with_huggingface
from App_Function_Libraries.LLM_API_Calls_Local import chat_with_aphrodite, chat_with_local_llm, chat_with_ollama, \
chat_with_kobold, chat_with_llama, chat_with_oobabooga, chat_with_tabbyapi, chat_with_vllm, chat_with_custom_openai
from App_Function_Libraries.DB.SQLite_DB import load_media_content
from App_Function_Libraries.Utils.Utils import generate_unique_filename, load_and_log_configs
from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram
#
####################################################################################################
#
# Functions:
def approximate_token_count(history):
total_text = ''
for user_msg, bot_msg in history:
if user_msg:
total_text += user_msg + ' '
if bot_msg:
total_text += bot_msg + ' '
total_tokens = len(total_text.split())
return total_tokens
def chat_api_call(api_endpoint, api_key, input_data, prompt, temp, system_message=None):
log_counter("chat_api_call_attempt", labels={"api_endpoint": api_endpoint})
start_time = time.time()
if not api_key:
api_key = None
model = None
try:
logging.info(f"Debug - Chat API Call - API Endpoint: {api_endpoint}")
logging.info(f"Debug - Chat API Call - API Key: {api_key}")
logging.info(f"Debug - Chat chat_api_call - API Endpoint: {api_endpoint}")
if api_endpoint.lower() == 'openai':
response = chat_with_openai(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == 'anthropic':
# Retrieve the model from config
loaded_config_data = load_and_log_configs()
model = loaded_config_data['models']['anthropic'] if loaded_config_data else None
response = chat_with_anthropic(
api_key=api_key,
input_data=input_data,
model=model,
custom_prompt_arg=prompt,
system_prompt=system_message
)
elif api_endpoint.lower() == "cohere":
response = chat_with_cohere(
api_key,
input_data,
model=model,
custom_prompt_arg=prompt,
system_prompt=system_message,
temp=temp
)
elif api_endpoint.lower() == "groq":
response = chat_with_groq(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "openrouter":
response = chat_with_openrouter(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "deepseek":
response = chat_with_deepseek(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "mistral":
response = chat_with_mistral(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "llama.cpp":
response = chat_with_llama(input_data, prompt, temp, None, api_key, system_message)
elif api_endpoint.lower() == "kobold":
response = chat_with_kobold(input_data, api_key, prompt, temp, system_message)
elif api_endpoint.lower() == "ooba":
response = chat_with_oobabooga(input_data, api_key, prompt, temp, system_message)
elif api_endpoint.lower() == "tabbyapi":
response = chat_with_tabbyapi(input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "vllm":
response = chat_with_vllm(input_data, prompt, system_message)
elif api_endpoint.lower() == "local-llm":
response = chat_with_local_llm(input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "huggingface":
response = chat_with_huggingface(api_key, input_data, prompt, temp) # , system_message)
elif api_endpoint.lower() == "ollama":
response = chat_with_ollama(input_data, prompt, None, api_key, temp, system_message)
elif api_endpoint.lower() == "aphrodite":
response = chat_with_aphrodite(input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "custom-openai-api":
response = chat_with_custom_openai(api_key, input_data, prompt, temp, system_message)
else:
raise ValueError(f"Unsupported API endpoint: {api_endpoint}")
call_duration = time.time() - start_time
log_histogram("chat_api_call_duration", call_duration, labels={"api_endpoint": api_endpoint})
log_counter("chat_api_call_success", labels={"api_endpoint": api_endpoint})
return response
except Exception as e:
log_counter("chat_api_call_error", labels={"api_endpoint": api_endpoint, "error": str(e)})
logging.error(f"Error in chat function: {str(e)}")
return f"An error occurred: {str(e)}"
def chat(message, history, media_content, selected_parts, api_endpoint, api_key, prompt, temperature,
system_message=None):
log_counter("chat_attempt", labels={"api_endpoint": api_endpoint})
start_time = time.time()
try:
logging.info(f"Debug - Chat Function - Message: {message}")
logging.info(f"Debug - Chat Function - Media Content: {media_content}")
logging.info(f"Debug - Chat Function - Selected Parts: {selected_parts}")
logging.info(f"Debug - Chat Function - API Endpoint: {api_endpoint}")
# logging.info(f"Debug - Chat Function - Prompt: {prompt}")
# Ensure selected_parts is a list
if not isinstance(selected_parts, (list, tuple)):
selected_parts = [selected_parts] if selected_parts else []
# logging.debug(f"Debug - Chat Function - Selected Parts (after check): {selected_parts}")
# Combine the selected parts of the media content
combined_content = "\n\n".join(
[f"{part.capitalize()}: {media_content.get(part, '')}" for part in selected_parts if part in media_content])
# Print first 500 chars
# logging.debug(f"Debug - Chat Function - Combined Content: {combined_content[:500]}...")
# Prepare the input for the API
input_data = f"{combined_content}\n\n" if combined_content else ""
for old_message, old_response in history:
input_data += f"{old_message}\nAssistant: {old_response}\n\n"
input_data += f"{message}\n"
if system_message:
print(f"System message: {system_message}")
logging.debug(f"Debug - Chat Function - System Message: {system_message}")
temperature = float(temperature) if temperature else 0.7
temp = temperature
logging.debug(f"Debug - Chat Function - Temperature: {temperature}")
logging.debug(f"Debug - Chat Function - API Key: {api_key[:10]}")
logging.debug(f"Debug - Chat Function - Prompt: {prompt}")
# Use the existing API request code based on the selected endpoint
response = chat_api_call(api_endpoint, api_key, input_data, prompt, temp, system_message)
chat_duration = time.time() - start_time
log_histogram("chat_duration", chat_duration, labels={"api_endpoint": api_endpoint})
log_counter("chat_success", labels={"api_endpoint": api_endpoint})
return response
except Exception as e:
log_counter("chat_error", labels={"api_endpoint": api_endpoint, "error": str(e)})
logging.error(f"Error in chat function: {str(e)}")
return f"An error occurred: {str(e)}"
def save_chat_history_to_db_wrapper(chatbot, conversation_id, media_content, media_name=None):
log_counter("save_chat_history_to_db_attempt")
start_time = time.time()
logging.info(f"Attempting to save chat history. Media content type: {type(media_content)}")
try:
# First check if we can access the database
try:
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT 1")
except sqlite3.DatabaseError as db_error:
logging.error(f"Database is corrupted or inaccessible: {str(db_error)}")
return conversation_id, "Database error: The database file appears to be corrupted. Please contact support."
# Now attempt the save
if not conversation_id:
# Only for new conversations, not updates
media_id = None
if isinstance(media_content, dict) and 'content' in media_content:
try:
content = media_content['content']
content_json = content if isinstance(content, dict) else json.loads(content)
media_id = content_json.get('webpage_url')
media_name = media_name or content_json.get('title', 'Unnamed Media')
except (json.JSONDecodeError, AttributeError) as e:
logging.error(f"Error processing media content: {str(e)}")
media_id = "unknown_media"
media_name = media_name or "Unnamed Media"
else:
media_id = "unknown_media"
media_name = media_name or "Unnamed Media"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
conversation_title = f"{media_name}_{timestamp}"
conversation_id = start_new_conversation(title=conversation_title, media_id=media_id)
logging.info(f"Created new conversation with ID: {conversation_id}")
# For both new and existing conversations
try:
delete_messages_in_conversation(conversation_id)
for user_msg, assistant_msg in chatbot:
if user_msg:
save_message(conversation_id, "user", user_msg)
if assistant_msg:
save_message(conversation_id, "assistant", assistant_msg)
except sqlite3.DatabaseError as db_error:
logging.error(f"Database error during message save: {str(db_error)}")
return conversation_id, "Database error: Unable to save messages. Please try again or contact support."
save_duration = time.time() - start_time
log_histogram("save_chat_history_to_db_duration", save_duration)
log_counter("save_chat_history_to_db_success")
return conversation_id, "Chat history saved successfully!"
except Exception as e:
log_counter("save_chat_history_to_db_error", labels={"error": str(e)})
error_message = f"Failed to save chat history: {str(e)}"
logging.error(error_message, exc_info=True)
return conversation_id, error_message
def save_chat_history(history, conversation_id, media_content):
log_counter("save_chat_history_attempt")
start_time = time.time()
try:
content, conversation_name = generate_chat_history_content(history, conversation_id, media_content)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_conversation_name = re.sub(r'[^a-zA-Z0-9_-]', '_', conversation_name)
base_filename = f"{safe_conversation_name}_{timestamp}.json"
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
temp_file.write(content)
temp_file_path = temp_file.name
# Generate a unique filename
unique_filename = generate_unique_filename(os.path.dirname(temp_file_path), base_filename)
final_path = os.path.join(os.path.dirname(temp_file_path), unique_filename)
# Rename the temporary file to the unique filename
os.rename(temp_file_path, final_path)
save_duration = time.time() - start_time
log_histogram("save_chat_history_duration", save_duration)
log_counter("save_chat_history_success")
return final_path
except Exception as e:
log_counter("save_chat_history_error", labels={"error": str(e)})
logging.error(f"Error saving chat history: {str(e)}")
return None
def generate_chat_history_content(history, conversation_id, media_content):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
conversation_name = get_conversation_name(conversation_id)
if not conversation_name:
media_name = extract_media_name(media_content)
if media_name:
conversation_name = f"{media_name}-chat"
else:
conversation_name = f"chat-{timestamp}" # Fallback name
chat_data = {
"conversation_id": conversation_id,
"conversation_name": conversation_name,
"timestamp": timestamp,
"history": [
{
"role": "user" if i % 2 == 0 else "bot",
"content": msg[0] if isinstance(msg, tuple) else msg
}
for i, msg in enumerate(history)
]
}
return json.dumps(chat_data, indent=2), conversation_name
def extract_media_name(media_content):
if isinstance(media_content, dict):
content = media_content.get('content', {})
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
logging.warning("Failed to parse media_content JSON string")
return None
# Try to extract title from the content
if isinstance(content, dict):
return content.get('title') or content.get('name')
logging.warning(f"Unexpected media_content format: {type(media_content)}")
return None
def update_chat_content(selected_item, use_content, use_summary, use_prompt, item_mapping):
log_counter("update_chat_content_attempt")
start_time = time.time()
logging.debug(f"Debug - Update Chat Content - Selected Item: {selected_item}\n")
logging.debug(f"Debug - Update Chat Content - Use Content: {use_content}\n\n\n\n")
logging.debug(f"Debug - Update Chat Content - Use Summary: {use_summary}\n\n")
logging.debug(f"Debug - Update Chat Content - Use Prompt: {use_prompt}\n\n")
logging.debug(f"Debug - Update Chat Content - Item Mapping: {item_mapping}\n\n")
if selected_item and selected_item in item_mapping:
media_id = item_mapping[selected_item]
content = load_media_content(media_id)
selected_parts = []
if use_content and "content" in content:
selected_parts.append("content")
if use_summary and "summary" in content:
selected_parts.append("summary")
if use_prompt and "prompt" in content:
selected_parts.append("prompt")
# Modified debug print
if isinstance(content, dict):
print(f"Debug - Update Chat Content - Content keys: {list(content.keys())}")
for key, value in content.items():
print(f"Debug - Update Chat Content - {key} (first 500 char): {str(value)[:500]}\n\n\n\n")
else:
print(f"Debug - Update Chat Content - Content(first 500 char): {str(content)[:500]}\n\n\n\n")
print(f"Debug - Update Chat Content - Selected Parts: {selected_parts}")
update_duration = time.time() - start_time
log_histogram("update_chat_content_duration", update_duration)
log_counter("update_chat_content_success")
return content, selected_parts
else:
log_counter("update_chat_content_error", labels={"error": str("No item selected or item not in mapping")})
print(f"Debug - Update Chat Content - No item selected or item not in mapping")
return {}, []
#
# End of Chat functions
#######################################################################################################################
#######################################################################################################################
#
# Character Card Functions
CHARACTERS_FILE = Path('.', 'Helper_Scripts', 'Character_Cards', 'Characters.json')
def save_character(character_data):
log_counter("save_character_attempt")
start_time = time.time()
characters_file = os.path.join(os.path.dirname(__file__), '..', 'Helper_Scripts', 'Character_Cards', 'Characters.json')
characters_dir = os.path.dirname(characters_file)
try:
if os.path.exists(characters_file):
with open(characters_file, 'r') as f:
characters = json.load(f)
else:
characters = {}
char_name = character_data['name']
# Save the image separately if it exists
if 'image' in character_data:
img_data = base64.b64decode(character_data['image'])
img_filename = f"{char_name.replace(' ', '_')}.png"
img_path = os.path.join(characters_dir, img_filename)
with open(img_path, 'wb') as f:
f.write(img_data)
character_data['image_path'] = os.path.abspath(img_path)
del character_data['image'] # Remove the base64 image data from the JSON
characters[char_name] = character_data
with open(characters_file, 'w') as f:
json.dump(characters, f, indent=2)
save_duration = time.time() - start_time
log_histogram("save_character_duration", save_duration)
log_counter("save_character_success")
logging.info(f"Character '{char_name}' saved successfully.")
except Exception as e:
log_counter("save_character_error", labels={"error": str(e)})
logging.error(f"Error saving character: {str(e)}")
def load_characters():
log_counter("load_characters_attempt")
start_time = time.time()
try:
characters_file = os.path.join(os.path.dirname(__file__), '..', 'Helper_Scripts', 'Character_Cards', 'Characters.json')
if os.path.exists(characters_file):
with open(characters_file, 'r') as f:
characters = json.load(f)
logging.debug(f"Loaded {len(characters)} characters from {characters_file}")
load_duration = time.time() - start_time
log_histogram("load_characters_duration", load_duration)
log_counter("load_characters_success", labels={"character_count": len(characters)})
return characters
else:
logging.warning(f"Characters file not found: {characters_file}")
return {}
except Exception as e:
log_counter("load_characters_error", labels={"error": str(e)})
return {}
def get_character_names():
log_counter("get_character_names_attempt")
start_time = time.time()
try:
characters = load_characters()
names = list(characters.keys())
get_names_duration = time.time() - start_time
log_histogram("get_character_names_duration", get_names_duration)
log_counter("get_character_names_success", labels={"name_count": len(names)})
return names
except Exception as e:
log_counter("get_character_names_error", labels={"error": str(e)})
logging.error(f"Error getting character names: {str(e)}")
return []
#
# End of Chat.py
##########################################################################################################################