Spaces:
Running
Running
# character_chat_db.py | |
# Database functions for managing character cards and chat histories. | |
# # | |
# Imports | |
import configparser | |
import sqlite3 | |
import json | |
import os | |
import sys | |
from typing import List, Dict, Optional, Tuple, Any, Union | |
from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path | |
from Tests.Chat_APIs.Chat_APIs_Integration_test import logging | |
# | |
####################################################################################################################### | |
# | |
# | |
def ensure_database_directory(): | |
os.makedirs(get_database_dir(), exist_ok=True) | |
ensure_database_directory() | |
# Construct the path to the config file | |
config_path = get_project_relative_path('Config_Files/config.txt') | |
# Read the config file | |
config = configparser.ConfigParser() | |
config.read(config_path) | |
# Get the chat db path from the config, or use the default if not specified | |
chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db')) | |
print(f"Chat Database path: {chat_DB_PATH}") | |
######################################################################################################## | |
# | |
# Functions | |
# FIXME - Setup properly and test/add documentation for its existence... | |
def initialize_database(): | |
"""Initialize the SQLite database with required tables and FTS5 virtual tables.""" | |
conn = None | |
try: | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
# Enable foreign key constraints | |
cursor.execute("PRAGMA foreign_keys = ON;") | |
# Create CharacterCards table with V2 fields | |
cursor.execute(""" | |
CREATE TABLE IF NOT EXISTS CharacterCards ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
name TEXT UNIQUE NOT NULL, | |
description TEXT, | |
personality TEXT, | |
scenario TEXT, | |
image BLOB, | |
post_history_instructions TEXT, | |
first_mes TEXT, | |
mes_example TEXT, | |
creator_notes TEXT, | |
system_prompt TEXT, | |
alternate_greetings TEXT, | |
tags TEXT, | |
creator TEXT, | |
character_version TEXT, | |
extensions TEXT, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP | |
); | |
""") | |
# Create CharacterChats table | |
cursor.execute(""" | |
CREATE TABLE IF NOT EXISTS CharacterChats ( | |
id INTEGER PRIMARY KEY AUTOINCREMENT, | |
character_id INTEGER NOT NULL, | |
conversation_name TEXT, | |
chat_history TEXT, | |
is_snapshot BOOLEAN DEFAULT FALSE, | |
created_at DATETIME DEFAULT CURRENT_TIMESTAMP, | |
FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE | |
); | |
""") | |
# Create FTS5 virtual table for CharacterChats | |
cursor.execute(""" | |
CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5( | |
conversation_name, | |
chat_history, | |
content='CharacterChats', | |
content_rowid='id' | |
); | |
""") | |
# Create triggers to keep FTS5 table in sync with CharacterChats | |
cursor.executescript(""" | |
CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN | |
INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history) | |
VALUES (new.id, new.conversation_name, new.chat_history); | |
END; | |
CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN | |
DELETE FROM CharacterChats_fts WHERE rowid = old.id; | |
END; | |
CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN | |
UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history | |
WHERE rowid = new.id; | |
END; | |
""") | |
# Create ChatKeywords table | |
cursor.execute(""" | |
CREATE TABLE IF NOT EXISTS ChatKeywords ( | |
chat_id INTEGER NOT NULL, | |
keyword TEXT NOT NULL, | |
FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE | |
); | |
""") | |
# Create indexes for faster searches | |
cursor.execute(""" | |
CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword); | |
""") | |
cursor.execute(""" | |
CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id); | |
""") | |
conn.commit() | |
logging.info("Database initialized successfully.") | |
except sqlite3.Error as e: | |
logging.error(f"SQLite error occurred during database initialization: {e}") | |
if conn: | |
conn.rollback() | |
raise | |
except Exception as e: | |
logging.error(f"Unexpected error occurred during database initialization: {e}") | |
if conn: | |
conn.rollback() | |
raise | |
finally: | |
if conn: | |
conn.close() | |
# Call initialize_database() at the start of your application | |
def setup_chat_database(): | |
try: | |
initialize_database() | |
except Exception as e: | |
logging.critical(f"Failed to initialize database: {e}") | |
sys.exit(1) | |
setup_chat_database() | |
######################################################################################################## | |
# | |
# Character Card handling | |
def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]: | |
"""Parse and validate a character card according to V2 specification.""" | |
v2_data = { | |
'name': card_data.get('name', ''), | |
'description': card_data.get('description', ''), | |
'personality': card_data.get('personality', ''), | |
'scenario': card_data.get('scenario', ''), | |
'first_mes': card_data.get('first_mes', ''), | |
'mes_example': card_data.get('mes_example', ''), | |
'creator_notes': card_data.get('creator_notes', ''), | |
'system_prompt': card_data.get('system_prompt', ''), | |
'post_history_instructions': card_data.get('post_history_instructions', ''), | |
'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])), | |
'tags': json.dumps(card_data.get('tags', [])), | |
'creator': card_data.get('creator', ''), | |
'character_version': card_data.get('character_version', ''), | |
'extensions': json.dumps(card_data.get('extensions', {})) | |
} | |
# Handle 'image' separately as it might be binary data | |
if 'image' in card_data: | |
v2_data['image'] = card_data['image'] | |
return v2_data | |
def add_character_card(card_data: Dict[str, Any]) -> Optional[int]: | |
"""Add or update a character card in the database.""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
parsed_card = parse_character_card(card_data) | |
# Check if character already exists | |
cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],)) | |
row = cursor.fetchone() | |
if row: | |
# Update existing character | |
character_id = row[0] | |
update_query = """ | |
UPDATE CharacterCards | |
SET description = ?, personality = ?, scenario = ?, image = ?, | |
post_history_instructions = ?, first_mes = ?, mes_example = ?, | |
creator_notes = ?, system_prompt = ?, alternate_greetings = ?, | |
tags = ?, creator = ?, character_version = ?, extensions = ? | |
WHERE id = ? | |
""" | |
cursor.execute(update_query, ( | |
parsed_card['description'], parsed_card['personality'], parsed_card['scenario'], | |
parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'], | |
parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'], | |
parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'], | |
parsed_card['character_version'], parsed_card['extensions'], character_id | |
)) | |
else: | |
# Insert new character | |
insert_query = """ | |
INSERT INTO CharacterCards (name, description, personality, scenario, image, | |
post_history_instructions, first_mes, mes_example, creator_notes, system_prompt, | |
alternate_greetings, tags, creator, character_version, extensions) | |
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
""" | |
cursor.execute(insert_query, ( | |
parsed_card['name'], parsed_card['description'], parsed_card['personality'], | |
parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'], | |
parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'], | |
parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'], | |
parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions'] | |
)) | |
character_id = cursor.lastrowid | |
conn.commit() | |
return character_id | |
except sqlite3.IntegrityError as e: | |
logging.error(f"Error adding character card: {e}") | |
return None | |
except Exception as e: | |
logging.error(f"Unexpected error adding character card: {e}") | |
return None | |
finally: | |
conn.close() | |
# def add_character_card(card_data: Dict) -> Optional[int]: | |
# """Add or update a character card in the database. | |
# | |
# Returns the ID of the inserted character or None if failed. | |
# """ | |
# conn = sqlite3.connect(chat_DB_PATH) | |
# cursor = conn.cursor() | |
# try: | |
# # Ensure all required fields are present | |
# required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message'] | |
# for field in required_fields: | |
# if field not in card_data: | |
# card_data[field] = '' # Assign empty string if field is missing | |
# | |
# # Check if character already exists | |
# cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],)) | |
# row = cursor.fetchone() | |
# | |
# if row: | |
# # Update existing character | |
# character_id = row[0] | |
# cursor.execute(""" | |
# UPDATE CharacterCards | |
# SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? | |
# WHERE id = ? | |
# """, ( | |
# card_data['description'], | |
# card_data['personality'], | |
# card_data['scenario'], | |
# card_data['image'], | |
# card_data['post_history_instructions'], | |
# card_data['first_message'], | |
# character_id | |
# )) | |
# else: | |
# # Insert new character | |
# cursor.execute(""" | |
# INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message) | |
# VALUES (?, ?, ?, ?, ?, ?, ?) | |
# """, ( | |
# card_data['name'], | |
# card_data['description'], | |
# card_data['personality'], | |
# card_data['scenario'], | |
# card_data['image'], | |
# card_data['post_history_instructions'], | |
# card_data['first_message'] | |
# )) | |
# character_id = cursor.lastrowid | |
# | |
# conn.commit() | |
# return cursor.lastrowid | |
# except sqlite3.IntegrityError as e: | |
# logging.error(f"Error adding character card: {e}") | |
# return None | |
# except Exception as e: | |
# logging.error(f"Unexpected error adding character card: {e}") | |
# return None | |
# finally: | |
# conn.close() | |
def get_character_cards() -> List[Dict]: | |
"""Retrieve all character cards from the database.""" | |
logging.debug(f"Fetching characters from DB: {chat_DB_PATH}") | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("SELECT * FROM CharacterCards") | |
rows = cursor.fetchall() | |
columns = [description[0] for description in cursor.description] | |
conn.close() | |
characters = [dict(zip(columns, row)) for row in rows] | |
#logging.debug(f"Characters fetched from DB: {characters}") | |
return characters | |
def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]: | |
""" | |
Retrieve a single character card by its ID. | |
Args: | |
character_id: Can be either an integer ID or a dictionary containing character data. | |
Returns: | |
A dictionary containing the character card data, or None if not found. | |
""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
if isinstance(character_id, dict): | |
# If a dictionary is passed, assume it's already a character card | |
return character_id | |
elif isinstance(character_id, int): | |
# If an integer is passed, fetch the character from the database | |
cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,)) | |
row = cursor.fetchone() | |
if row: | |
columns = [description[0] for description in cursor.description] | |
return dict(zip(columns, row)) | |
else: | |
logging.warning(f"Invalid type for character_id: {type(character_id)}") | |
return None | |
except Exception as e: | |
logging.error(f"Error in get_character_card_by_id: {e}") | |
return None | |
finally: | |
conn.close() | |
def update_character_card(character_id: int, card_data: Dict) -> bool: | |
"""Update an existing character card.""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
cursor.execute(""" | |
UPDATE CharacterCards | |
SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ? | |
WHERE id = ? | |
""", ( | |
card_data.get('name'), | |
card_data.get('description'), | |
card_data.get('personality'), | |
card_data.get('scenario'), | |
card_data.get('image'), | |
card_data.get('post_history_instructions', ''), | |
card_data.get('first_message', "Hello! I'm ready to chat."), | |
character_id | |
)) | |
conn.commit() | |
return cursor.rowcount > 0 | |
except sqlite3.IntegrityError as e: | |
logging.error(f"Error updating character card: {e}") | |
return False | |
finally: | |
conn.close() | |
def delete_character_card(character_id: int) -> bool: | |
"""Delete a character card and its associated chats.""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
# Delete associated chats first due to foreign key constraint | |
cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,)) | |
cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,)) | |
conn.commit() | |
return cursor.rowcount > 0 | |
except sqlite3.Error as e: | |
logging.error(f"Error deleting character card: {e}") | |
return False | |
finally: | |
conn.close() | |
def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]: | |
""" | |
Add a new chat history for a character, optionally associating keywords. | |
Args: | |
character_id (int): The ID of the character. | |
conversation_name (str): Name of the conversation. | |
chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples. | |
keywords (Optional[List[str]]): List of keywords to associate with this chat. | |
is_snapshot (bool, optional): Whether this chat is a snapshot. | |
Returns: | |
Optional[int]: The ID of the inserted chat or None if failed. | |
""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
chat_history_json = json.dumps(chat_history) | |
cursor.execute(""" | |
INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot) | |
VALUES (?, ?, ?, ?) | |
""", ( | |
character_id, | |
conversation_name, | |
chat_history_json, | |
is_snapshot | |
)) | |
chat_id = cursor.lastrowid | |
if keywords: | |
# Insert keywords into ChatKeywords table | |
keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords] | |
cursor.executemany(""" | |
INSERT INTO ChatKeywords (chat_id, keyword) | |
VALUES (?, ?) | |
""", keyword_records) | |
conn.commit() | |
return chat_id | |
except sqlite3.Error as e: | |
logging.error(f"Error adding character chat: {e}") | |
return None | |
finally: | |
conn.close() | |
def get_character_chats(character_id: Optional[int] = None) -> List[Dict]: | |
"""Retrieve all chats, or chats for a specific character if character_id is provided.""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
if character_id is not None: | |
cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,)) | |
else: | |
cursor.execute("SELECT * FROM CharacterChats") | |
rows = cursor.fetchall() | |
columns = [description[0] for description in cursor.description] | |
conn.close() | |
return [dict(zip(columns, row)) for row in rows] | |
def get_character_chat_by_id(chat_id: int) -> Optional[Dict]: | |
"""Retrieve a single chat by its ID.""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,)) | |
row = cursor.fetchone() | |
conn.close() | |
if row: | |
columns = [description[0] for description in cursor.description] | |
chat = dict(zip(columns, row)) | |
chat['chat_history'] = json.loads(chat['chat_history']) | |
return chat | |
return None | |
def search_character_chats(query: str) -> Tuple[List[Dict], str]: | |
""" | |
Search for character chats using FTS5. | |
Args: | |
query (str): The search query. | |
Returns: | |
Tuple[List[Dict], str]: A list of matching chats and a status message. | |
""" | |
if not query.strip(): | |
return [], "Please enter a search query." | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
# Use parameterized queries to prevent SQL injection | |
cursor.execute(""" | |
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history | |
FROM CharacterChats_fts | |
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id | |
WHERE CharacterChats_fts MATCH ? | |
ORDER BY rank | |
""", (query,)) | |
rows = cursor.fetchall() | |
columns = [description[0] for description in cursor.description] | |
results = [dict(zip(columns, row)) for row in rows] | |
status_message = f"Found {len(results)} chat(s) matching '{query}'." | |
return results, status_message | |
except Exception as e: | |
logging.error(f"Error searching chats with FTS5: {e}") | |
return [], f"Error occurred during search: {e}" | |
finally: | |
conn.close() | |
def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool: | |
"""Update an existing chat history.""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
chat_history_json = json.dumps(chat_history) | |
cursor.execute(""" | |
UPDATE CharacterChats | |
SET chat_history = ? | |
WHERE id = ? | |
""", ( | |
chat_history_json, | |
chat_id | |
)) | |
conn.commit() | |
return cursor.rowcount > 0 | |
except sqlite3.Error as e: | |
logging.error(f"Error updating character chat: {e}") | |
return False | |
finally: | |
conn.close() | |
def delete_character_chat(chat_id: int) -> bool: | |
"""Delete a specific chat.""" | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,)) | |
conn.commit() | |
return cursor.rowcount > 0 | |
except sqlite3.Error as e: | |
logging.error(f"Error deleting character chat: {e}") | |
return False | |
finally: | |
conn.close() | |
def fetch_keywords_for_chats(keywords: List[str]) -> List[int]: | |
""" | |
Fetch chat IDs associated with any of the specified keywords. | |
Args: | |
keywords (List[str]): List of keywords to search for. | |
Returns: | |
List[int]: List of chat IDs associated with the keywords. | |
""" | |
if not keywords: | |
return [] | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
# Construct the WHERE clause to search for each keyword | |
keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords)) | |
sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}" | |
cursor.execute(sql_query, keywords) | |
rows = cursor.fetchall() | |
chat_ids = [row[0] for row in rows] | |
return chat_ids | |
except Exception as e: | |
logging.error(f"Error in fetch_keywords_for_chats: {e}") | |
return [] | |
finally: | |
conn.close() | |
def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]: | |
"""Save chat history to the CharacterChats table. | |
Returns the ID of the inserted chat or None if failed. | |
""" | |
return add_character_chat(character_id, conversation_name, chat_history) | |
def migrate_chat_to_media_db(): | |
pass | |
def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]: | |
""" | |
Perform a full-text search on specified fields with optional filtering and pagination. | |
Args: | |
query (str): The search query. | |
fields (List[str]): List of fields to search in. | |
where_clause (str, optional): Additional SQL WHERE clause to filter results. | |
page (int, optional): Page number for pagination. | |
results_per_page (int, optional): Number of results per page. | |
Returns: | |
List[Dict[str, Any]]: List of matching chat records with content and metadata. | |
""" | |
if not query.strip(): | |
return [] | |
conn = sqlite3.connect(chat_DB_PATH) | |
cursor = conn.cursor() | |
try: | |
# Construct the MATCH query for FTS5 | |
match_query = " AND ".join(fields) + f" MATCH ?" | |
# Adjust the query with the fields | |
fts_query = f""" | |
SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history | |
FROM CharacterChats_fts | |
JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id | |
WHERE {match_query} | |
""" | |
if where_clause: | |
fts_query += f" AND ({where_clause})" | |
fts_query += " ORDER BY rank LIMIT ? OFFSET ?" | |
offset = (page - 1) * results_per_page | |
cursor.execute(fts_query, (query, results_per_page, offset)) | |
rows = cursor.fetchall() | |
columns = [description[0] for description in cursor.description] | |
results = [dict(zip(columns, row)) for row in rows] | |
return results | |
except Exception as e: | |
logging.error(f"Error in search_db: {e}") | |
return [] | |
finally: | |
conn.close() | |
def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \ | |
List[Dict[str, Any]]: | |
""" | |
Perform a full-text search within the specified chat IDs using FTS5. | |
Args: | |
query (str): The user's query. | |
relevant_chat_ids (List[int]): List of chat IDs to search within. | |
page (int): Pagination page number. | |
results_per_page (int): Number of results per page. | |
Returns: | |
List[Dict[str, Any]]: List of search results with content and metadata. | |
""" | |
try: | |
# Construct a WHERE clause to limit the search to relevant chat IDs | |
where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids]) | |
if not where_clause: | |
where_clause = "1" # No restriction if no chat IDs | |
# Perform full-text search using FTS5 | |
fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page) | |
filtered_fts_results = [ | |
{ | |
"content": result['content'], | |
"metadata": {"media_id": result['id']} | |
} | |
for result in fts_results | |
if result['id'] in relevant_chat_ids | |
] | |
return filtered_fts_results | |
except Exception as e: | |
logging.error(f"Error in perform_full_text_search_chat: {str(e)}") | |
return [] | |
def fetch_all_chats() -> List[Dict[str, Any]]: | |
""" | |
Fetch all chat messages from the database. | |
Returns: | |
List[Dict[str, Any]]: List of chat messages with relevant metadata. | |
""" | |
try: | |
chats = get_character_chats() # Modify this function to retrieve all chats | |
return chats | |
except Exception as e: | |
logging.error(f"Error fetching all chats: {str(e)}") | |
return [] | |
# | |
# End of Character_Chat_DB.py | |
####################################################################################################################### | |