Spaces:
Running
Running
File size: 12,892 Bytes
fa9a583 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# Chat.py
# Chat functions for interacting with the LLMs as chatbots
# Imports
import json
import logging
import os
import re
import tempfile
from datetime import datetime
from App_Function_Libraries.DB_Manager import get_conversation_name, save_chat_history_to_database
from App_Function_Libraries.LLM_API_Calls import chat_with_openai, chat_with_anthropic, chat_with_cohere, \
chat_with_groq, chat_with_openrouter, chat_with_deepseek, chat_with_mistral, chat_with_huggingface, chat_with_vllm
from App_Function_Libraries.LLM_API_Calls_Local import chat_with_aphrodite, chat_with_local_llm, chat_with_ollama, \
chat_with_kobold, chat_with_llama, chat_with_oobabooga, chat_with_tabbyapi
from App_Function_Libraries.SQLite_DB import load_media_content
from App_Function_Libraries.Utils import generate_unique_filename
#
# External Imports
#
# Local Imports
#
####################################################################################################
def chat(message, history, media_content, selected_parts, api_endpoint, api_key, prompt, temperature,
system_message=None):
try:
logging.info(f"Debug - Chat Function - Message: {message}")
logging.info(f"Debug - Chat Function - Media Content: {media_content}")
logging.info(f"Debug - Chat Function - Selected Parts: {selected_parts}")
logging.info(f"Debug - Chat Function - API Endpoint: {api_endpoint}")
# logging.info(f"Debug - Chat Function - Prompt: {prompt}")
# Ensure selected_parts is a list
if not isinstance(selected_parts, (list, tuple)):
selected_parts = [selected_parts] if selected_parts else []
# logging.debug(f"Debug - Chat Function - Selected Parts (after check): {selected_parts}")
# Combine the selected parts of the media content
combined_content = "\n\n".join(
[f"{part.capitalize()}: {media_content.get(part, '')}" for part in selected_parts if part in media_content])
# Print first 500 chars
# logging.debug(f"Debug - Chat Function - Combined Content: {combined_content[:500]}...")
# Prepare the input for the API
if not history:
input_data = f"{combined_content}\n\nUser: {message}\n"
else:
input_data = f"User: {message}\n"
# Print first 500 chars
# logging.info(f"Debug - Chat Function - Input Data: {input_data[:500]}...")
if system_message:
print(f"System message: {system_message}")
logging.debug(f"Debug - Chat Function - System Message: {system_message}")
temperature = float(temperature) if temperature else 0.7
temp = temperature
logging.debug("Debug - Chat Function - Temperature: {temperature}")
logging.debug(f"Debug - Chat Function - API Key: {api_key[:10]}")
logging.debug(f"Debug - Chat Function - Prompt: {prompt}")
# Use the existing API request code based on the selected endpoint
logging.info(f"Debug - Chat Function - API Endpoint: {api_endpoint}")
if api_endpoint.lower() == 'openai':
response = chat_with_openai(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "anthropic":
response = chat_with_anthropic(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "cohere":
response = chat_with_cohere(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "groq":
response = chat_with_groq(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "openrouter":
response = chat_with_openrouter(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "deepseek":
response = chat_with_deepseek(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "mistral":
response = chat_with_mistral(api_key, input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "llama.cpp":
response = chat_with_llama(input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "kobold":
response = chat_with_kobold(input_data, api_key, prompt, temp, system_message)
elif api_endpoint.lower() == "ooba":
response = chat_with_oobabooga(input_data, api_key, prompt, temp, system_message)
elif api_endpoint.lower() == "tabbyapi":
response = chat_with_tabbyapi(input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "vllm":
response = chat_with_vllm(input_data, prompt, system_message)
elif api_endpoint.lower() == "local-llm":
response = chat_with_local_llm(input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "huggingface":
response = chat_with_huggingface(api_key, input_data, prompt, temp) # , system_message)
elif api_endpoint.lower() == "ollama":
response = chat_with_ollama(input_data, prompt, temp, system_message)
elif api_endpoint.lower() == "aphrodite":
response = chat_with_aphrodite(input_data, prompt, temp, system_message)
else:
raise ValueError(f"Unsupported API endpoint: {api_endpoint}")
return response
except Exception as e:
logging.error(f"Error in chat function: {str(e)}")
return f"An error occurred: {str(e)}"
def save_chat_history_to_db_wrapper(chatbot, conversation_id, media_content):
logging.info(f"Attempting to save chat history. Media content type: {type(media_content)}")
try:
# Extract the media_id and media_name from the media_content
media_id = None
media_name = None
if isinstance(media_content, dict):
logging.debug(f"Media content keys: {media_content.keys()}")
if 'content' in media_content:
try:
content = media_content['content']
if isinstance(content, str):
content_json = json.loads(content)
elif isinstance(content, dict):
content_json = content
else:
raise ValueError(f"Unexpected content type: {type(content)}")
# Use the webpage_url as the media_id
media_id = content_json.get('webpage_url')
# Use the title as the media_name
media_name = content_json.get('title')
logging.info(f"Extracted media_id: {media_id}, media_name: {media_name}")
except json.JSONDecodeError:
logging.error("Failed to decode JSON from media_content['content']")
except Exception as e:
logging.error(f"Error processing media_content: {str(e)}")
else:
logging.warning("'content' key not found in media_content")
else:
logging.warning(f"media_content is not a dictionary. Type: {type(media_content)}")
if media_id is None:
# If we couldn't find a media_id, we'll use a placeholder
media_id = "unknown_media"
logging.warning(f"Unable to extract media_id from media_content. Using placeholder: {media_id}")
if media_name is None:
media_name = "Unnamed Media"
logging.warning(f"Unable to extract media_name from media_content. Using placeholder: {media_name}")
# Generate a unique conversation name using media_id and current timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
conversation_name = f"Chat_{media_id}_{timestamp}"
new_conversation_id = save_chat_history_to_database(chatbot, conversation_id, media_id, media_name,
conversation_name)
return new_conversation_id, f"Chat history saved successfully as {conversation_name}!"
except Exception as e:
error_message = f"Failed to save chat history: {str(e)}"
logging.error(error_message, exc_info=True)
return conversation_id, error_message
def save_chat_history(history, conversation_id, media_content):
try:
content, conversation_name = generate_chat_history_content(history, conversation_id, media_content)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_conversation_name = re.sub(r'[^a-zA-Z0-9_-]', '_', conversation_name)
base_filename = f"{safe_conversation_name}_{timestamp}.json"
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
temp_file.write(content)
temp_file_path = temp_file.name
# Generate a unique filename
unique_filename = generate_unique_filename(os.path.dirname(temp_file_path), base_filename)
final_path = os.path.join(os.path.dirname(temp_file_path), unique_filename)
# Rename the temporary file to the unique filename
os.rename(temp_file_path, final_path)
return final_path
except Exception as e:
logging.error(f"Error saving chat history: {str(e)}")
return None
def generate_chat_history_content(history, conversation_id, media_content):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
conversation_name = get_conversation_name(conversation_id)
if not conversation_name:
media_name = extract_media_name(media_content)
if media_name:
conversation_name = f"{media_name}-chat"
else:
conversation_name = f"chat-{timestamp}" # Fallback name
chat_data = {
"conversation_id": conversation_id,
"conversation_name": conversation_name,
"timestamp": timestamp,
"history": [
{
"role": "user" if i % 2 == 0 else "bot",
"content": msg[0] if isinstance(msg, tuple) else msg
}
for i, msg in enumerate(history)
]
}
return json.dumps(chat_data, indent=2), conversation_name
def extract_media_name(media_content):
if isinstance(media_content, dict):
content = media_content.get('content', {})
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
logging.warning("Failed to parse media_content JSON string")
return None
# Try to extract title from the content
if isinstance(content, dict):
return content.get('title') or content.get('name')
logging.warning(f"Unexpected media_content format: {type(media_content)}")
return None
def update_chat_content(selected_item, use_content, use_summary, use_prompt, item_mapping):
logging.debug(f"Debug - Update Chat Content - Selected Item: {selected_item}\n")
logging.debug(f"Debug - Update Chat Content - Use Content: {use_content}\n\n\n\n")
logging.debug(f"Debug - Update Chat Content - Use Summary: {use_summary}\n\n")
logging.debug(f"Debug - Update Chat Content - Use Prompt: {use_prompt}\n\n")
logging.debug(f"Debug - Update Chat Content - Item Mapping: {item_mapping}\n\n")
if selected_item and selected_item in item_mapping:
media_id = item_mapping[selected_item]
content = load_media_content(media_id)
selected_parts = []
if use_content and "content" in content:
selected_parts.append("content")
if use_summary and "summary" in content:
selected_parts.append("summary")
if use_prompt and "prompt" in content:
selected_parts.append("prompt")
# Modified debug print
if isinstance(content, dict):
print(f"Debug - Update Chat Content - Content keys: {list(content.keys())}")
for key, value in content.items():
print(f"Debug - Update Chat Content - {key} (first 500 char): {str(value)[:500]}\n\n\n\n")
else:
print(f"Debug - Update Chat Content - Content(first 500 char): {str(content)[:500]}\n\n\n\n")
print(f"Debug - Update Chat Content - Selected Parts: {selected_parts}")
return content, selected_parts
else:
print(f"Debug - Update Chat Content - No item selected or item not in mapping")
return {}, []
#
# End of Chat.py
########################################################################################################################## |