oceansweep commited on
Commit
03f9c5c
1 Parent(s): f61f0a3

Update App_Function_Libraries/DB/Character_Chat_DB.py

Browse files
App_Function_Libraries/DB/Character_Chat_DB.py CHANGED
@@ -1,684 +1,684 @@
1
- # character_chat_db.py
2
- # Database functions for managing character cards and chat histories.
3
- # #
4
- # Imports
5
- import configparser
6
- import sqlite3
7
- import json
8
- import os
9
- import sys
10
- from typing import List, Dict, Optional, Tuple, Any, Union
11
-
12
- from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
13
- from Tests.Chat_APIs.Chat_APIs_Integration_test import logging
14
-
15
- #
16
- #######################################################################################################################
17
- #
18
- #
19
-
20
- def ensure_database_directory():
21
- os.makedirs(get_database_dir(), exist_ok=True)
22
-
23
- ensure_database_directory()
24
-
25
-
26
- # Construct the path to the config file
27
- config_path = get_project_relative_path('Config_Files/config.txt')
28
-
29
- # Read the config file
30
- config = configparser.ConfigParser()
31
- config.read(config_path)
32
-
33
- # Get the chat db path from the config, or use the default if not specified
34
- chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
35
- print(f"Chat Database path: {chat_DB_PATH}")
36
-
37
- ########################################################################################################
38
- #
39
- # Functions
40
-
41
- # FIXME - Setup properly and test/add documentation for its existence...
42
- def initialize_database():
43
- """Initialize the SQLite database with required tables and FTS5 virtual tables."""
44
- conn = None
45
- try:
46
- conn = sqlite3.connect(chat_DB_PATH)
47
- cursor = conn.cursor()
48
-
49
- # Enable foreign key constraints
50
- cursor.execute("PRAGMA foreign_keys = ON;")
51
-
52
- # Create CharacterCards table with V2 fields
53
- cursor.execute("""
54
- CREATE TABLE IF NOT EXISTS CharacterCards (
55
- id INTEGER PRIMARY KEY AUTOINCREMENT,
56
- name TEXT UNIQUE NOT NULL,
57
- description TEXT,
58
- personality TEXT,
59
- scenario TEXT,
60
- image BLOB,
61
- post_history_instructions TEXT,
62
- first_mes TEXT,
63
- mes_example TEXT,
64
- creator_notes TEXT,
65
- system_prompt TEXT,
66
- alternate_greetings TEXT,
67
- tags TEXT,
68
- creator TEXT,
69
- character_version TEXT,
70
- extensions TEXT,
71
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP
72
- );
73
- """)
74
-
75
- # Create CharacterChats table
76
- cursor.execute("""
77
- CREATE TABLE IF NOT EXISTS CharacterChats (
78
- id INTEGER PRIMARY KEY AUTOINCREMENT,
79
- character_id INTEGER NOT NULL,
80
- conversation_name TEXT,
81
- chat_history TEXT,
82
- is_snapshot BOOLEAN DEFAULT FALSE,
83
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
84
- FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
85
- );
86
- """)
87
-
88
- # Create FTS5 virtual table for CharacterChats
89
- cursor.execute("""
90
- CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
91
- conversation_name,
92
- chat_history,
93
- content='CharacterChats',
94
- content_rowid='id'
95
- );
96
- """)
97
-
98
- # Create triggers to keep FTS5 table in sync with CharacterChats
99
- cursor.executescript("""
100
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
101
- INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
102
- VALUES (new.id, new.conversation_name, new.chat_history);
103
- END;
104
-
105
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
106
- DELETE FROM CharacterChats_fts WHERE rowid = old.id;
107
- END;
108
-
109
- CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
110
- UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
111
- WHERE rowid = new.id;
112
- END;
113
- """)
114
-
115
- # Create ChatKeywords table
116
- cursor.execute("""
117
- CREATE TABLE IF NOT EXISTS ChatKeywords (
118
- chat_id INTEGER NOT NULL,
119
- keyword TEXT NOT NULL,
120
- FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
121
- );
122
- """)
123
-
124
- # Create indexes for faster searches
125
- cursor.execute("""
126
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
127
- """)
128
- cursor.execute("""
129
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
130
- """)
131
-
132
- conn.commit()
133
- logging.info("Database initialized successfully.")
134
- except sqlite3.Error as e:
135
- logging.error(f"SQLite error occurred during database initialization: {e}")
136
- if conn:
137
- conn.rollback()
138
- raise
139
- except Exception as e:
140
- logging.error(f"Unexpected error occurred during database initialization: {e}")
141
- if conn:
142
- conn.rollback()
143
- raise
144
- finally:
145
- if conn:
146
- conn.close()
147
-
148
- # Call initialize_database() at the start of your application
149
- def setup_chat_database():
150
- try:
151
- initialize_database()
152
- except Exception as e:
153
- logging.critical(f"Failed to initialize database: {e}")
154
- sys.exit(1)
155
-
156
- setup_chat_database()
157
-
158
- ########################################################################################################
159
- #
160
- # Character Card handling
161
-
162
- def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
163
- """Parse and validate a character card according to V2 specification."""
164
- v2_data = {
165
- 'name': card_data.get('name', ''),
166
- 'description': card_data.get('description', ''),
167
- 'personality': card_data.get('personality', ''),
168
- 'scenario': card_data.get('scenario', ''),
169
- 'first_mes': card_data.get('first_mes', ''),
170
- 'mes_example': card_data.get('mes_example', ''),
171
- 'creator_notes': card_data.get('creator_notes', ''),
172
- 'system_prompt': card_data.get('system_prompt', ''),
173
- 'post_history_instructions': card_data.get('post_history_instructions', ''),
174
- 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
175
- 'tags': json.dumps(card_data.get('tags', [])),
176
- 'creator': card_data.get('creator', ''),
177
- 'character_version': card_data.get('character_version', ''),
178
- 'extensions': json.dumps(card_data.get('extensions', {}))
179
- }
180
-
181
- # Handle 'image' separately as it might be binary data
182
- if 'image' in card_data:
183
- v2_data['image'] = card_data['image']
184
-
185
- return v2_data
186
-
187
-
188
- def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
189
- """Add or update a character card in the database."""
190
- conn = sqlite3.connect(chat_DB_PATH)
191
- cursor = conn.cursor()
192
- try:
193
- parsed_card = parse_character_card(card_data)
194
-
195
- # Check if character already exists
196
- cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
197
- row = cursor.fetchone()
198
-
199
- if row:
200
- # Update existing character
201
- character_id = row[0]
202
- update_query = """
203
- UPDATE CharacterCards
204
- SET description = ?, personality = ?, scenario = ?, image = ?,
205
- post_history_instructions = ?, first_mes = ?, mes_example = ?,
206
- creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
207
- tags = ?, creator = ?, character_version = ?, extensions = ?
208
- WHERE id = ?
209
- """
210
- cursor.execute(update_query, (
211
- parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
212
- parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
213
- parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
214
- parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
215
- parsed_card['character_version'], parsed_card['extensions'], character_id
216
- ))
217
- else:
218
- # Insert new character
219
- insert_query = """
220
- INSERT INTO CharacterCards (name, description, personality, scenario, image,
221
- post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
222
- alternate_greetings, tags, creator, character_version, extensions)
223
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
224
- """
225
- cursor.execute(insert_query, (
226
- parsed_card['name'], parsed_card['description'], parsed_card['personality'],
227
- parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
228
- parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
229
- parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
230
- parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
231
- ))
232
- character_id = cursor.lastrowid
233
-
234
- conn.commit()
235
- return character_id
236
- except sqlite3.IntegrityError as e:
237
- logging.error(f"Error adding character card: {e}")
238
- return None
239
- except Exception as e:
240
- logging.error(f"Unexpected error adding character card: {e}")
241
- return None
242
- finally:
243
- conn.close()
244
-
245
- # def add_character_card(card_data: Dict) -> Optional[int]:
246
- # """Add or update a character card in the database.
247
- #
248
- # Returns the ID of the inserted character or None if failed.
249
- # """
250
- # conn = sqlite3.connect(chat_DB_PATH)
251
- # cursor = conn.cursor()
252
- # try:
253
- # # Ensure all required fields are present
254
- # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
255
- # for field in required_fields:
256
- # if field not in card_data:
257
- # card_data[field] = '' # Assign empty string if field is missing
258
- #
259
- # # Check if character already exists
260
- # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
261
- # row = cursor.fetchone()
262
- #
263
- # if row:
264
- # # Update existing character
265
- # character_id = row[0]
266
- # cursor.execute("""
267
- # UPDATE CharacterCards
268
- # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
269
- # WHERE id = ?
270
- # """, (
271
- # card_data['description'],
272
- # card_data['personality'],
273
- # card_data['scenario'],
274
- # card_data['image'],
275
- # card_data['post_history_instructions'],
276
- # card_data['first_message'],
277
- # character_id
278
- # ))
279
- # else:
280
- # # Insert new character
281
- # cursor.execute("""
282
- # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
283
- # VALUES (?, ?, ?, ?, ?, ?, ?)
284
- # """, (
285
- # card_data['name'],
286
- # card_data['description'],
287
- # card_data['personality'],
288
- # card_data['scenario'],
289
- # card_data['image'],
290
- # card_data['post_history_instructions'],
291
- # card_data['first_message']
292
- # ))
293
- # character_id = cursor.lastrowid
294
- #
295
- # conn.commit()
296
- # return cursor.lastrowid
297
- # except sqlite3.IntegrityError as e:
298
- # logging.error(f"Error adding character card: {e}")
299
- # return None
300
- # except Exception as e:
301
- # logging.error(f"Unexpected error adding character card: {e}")
302
- # return None
303
- # finally:
304
- # conn.close()
305
-
306
-
307
- def get_character_cards() -> List[Dict]:
308
- """Retrieve all character cards from the database."""
309
- logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
310
- conn = sqlite3.connect(chat_DB_PATH)
311
- cursor = conn.cursor()
312
- cursor.execute("SELECT * FROM CharacterCards")
313
- rows = cursor.fetchall()
314
- columns = [description[0] for description in cursor.description]
315
- conn.close()
316
- characters = [dict(zip(columns, row)) for row in rows]
317
- #logging.debug(f"Characters fetched from DB: {characters}")
318
- return characters
319
-
320
-
321
- def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
322
- """
323
- Retrieve a single character card by its ID.
324
-
325
- Args:
326
- character_id: Can be either an integer ID or a dictionary containing character data.
327
-
328
- Returns:
329
- A dictionary containing the character card data, or None if not found.
330
- """
331
- conn = sqlite3.connect(chat_DB_PATH)
332
- cursor = conn.cursor()
333
- try:
334
- if isinstance(character_id, dict):
335
- # If a dictionary is passed, assume it's already a character card
336
- return character_id
337
- elif isinstance(character_id, int):
338
- # If an integer is passed, fetch the character from the database
339
- cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
340
- row = cursor.fetchone()
341
- if row:
342
- columns = [description[0] for description in cursor.description]
343
- return dict(zip(columns, row))
344
- else:
345
- logging.warning(f"Invalid type for character_id: {type(character_id)}")
346
- return None
347
- except Exception as e:
348
- logging.error(f"Error in get_character_card_by_id: {e}")
349
- return None
350
- finally:
351
- conn.close()
352
-
353
-
354
- def update_character_card(character_id: int, card_data: Dict) -> bool:
355
- """Update an existing character card."""
356
- conn = sqlite3.connect(chat_DB_PATH)
357
- cursor = conn.cursor()
358
- try:
359
- cursor.execute("""
360
- UPDATE CharacterCards
361
- SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
362
- WHERE id = ?
363
- """, (
364
- card_data.get('name'),
365
- card_data.get('description'),
366
- card_data.get('personality'),
367
- card_data.get('scenario'),
368
- card_data.get('image'),
369
- card_data.get('post_history_instructions', ''),
370
- card_data.get('first_message', "Hello! I'm ready to chat."),
371
- character_id
372
- ))
373
- conn.commit()
374
- return cursor.rowcount > 0
375
- except sqlite3.IntegrityError as e:
376
- logging.error(f"Error updating character card: {e}")
377
- return False
378
- finally:
379
- conn.close()
380
-
381
-
382
- def delete_character_card(character_id: int) -> bool:
383
- """Delete a character card and its associated chats."""
384
- conn = sqlite3.connect(chat_DB_PATH)
385
- cursor = conn.cursor()
386
- try:
387
- # Delete associated chats first due to foreign key constraint
388
- cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
389
- cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
390
- conn.commit()
391
- return cursor.rowcount > 0
392
- except sqlite3.Error as e:
393
- logging.error(f"Error deleting character card: {e}")
394
- return False
395
- finally:
396
- conn.close()
397
-
398
-
399
- def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
400
- """
401
- Add a new chat history for a character, optionally associating keywords.
402
-
403
- Args:
404
- character_id (int): The ID of the character.
405
- conversation_name (str): Name of the conversation.
406
- chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
407
- keywords (Optional[List[str]]): List of keywords to associate with this chat.
408
- is_snapshot (bool, optional): Whether this chat is a snapshot.
409
-
410
- Returns:
411
- Optional[int]: The ID of the inserted chat or None if failed.
412
- """
413
- conn = sqlite3.connect(chat_DB_PATH)
414
- cursor = conn.cursor()
415
- try:
416
- chat_history_json = json.dumps(chat_history)
417
- cursor.execute("""
418
- INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
419
- VALUES (?, ?, ?, ?)
420
- """, (
421
- character_id,
422
- conversation_name,
423
- chat_history_json,
424
- is_snapshot
425
- ))
426
- chat_id = cursor.lastrowid
427
-
428
- if keywords:
429
- # Insert keywords into ChatKeywords table
430
- keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
431
- cursor.executemany("""
432
- INSERT INTO ChatKeywords (chat_id, keyword)
433
- VALUES (?, ?)
434
- """, keyword_records)
435
-
436
- conn.commit()
437
- return chat_id
438
- except sqlite3.Error as e:
439
- logging.error(f"Error adding character chat: {e}")
440
- return None
441
- finally:
442
- conn.close()
443
-
444
-
445
- def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
446
- """Retrieve all chats, or chats for a specific character if character_id is provided."""
447
- conn = sqlite3.connect(chat_DB_PATH)
448
- cursor = conn.cursor()
449
- if character_id is not None:
450
- cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
451
- else:
452
- cursor.execute("SELECT * FROM CharacterChats")
453
- rows = cursor.fetchall()
454
- columns = [description[0] for description in cursor.description]
455
- conn.close()
456
- return [dict(zip(columns, row)) for row in rows]
457
-
458
-
459
- def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
460
- """Retrieve a single chat by its ID."""
461
- conn = sqlite3.connect(chat_DB_PATH)
462
- cursor = conn.cursor()
463
- cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
464
- row = cursor.fetchone()
465
- conn.close()
466
- if row:
467
- columns = [description[0] for description in cursor.description]
468
- chat = dict(zip(columns, row))
469
- chat['chat_history'] = json.loads(chat['chat_history'])
470
- return chat
471
- return None
472
-
473
-
474
- def search_character_chats(query: str) -> Tuple[List[Dict], str]:
475
- """
476
- Search for character chats using FTS5.
477
-
478
- Args:
479
- query (str): The search query.
480
-
481
- Returns:
482
- Tuple[List[Dict], str]: A list of matching chats and a status message.
483
- """
484
- if not query.strip():
485
- return [], "Please enter a search query."
486
-
487
- conn = sqlite3.connect(chat_DB_PATH)
488
- cursor = conn.cursor()
489
- try:
490
- # Use parameterized queries to prevent SQL injection
491
- cursor.execute("""
492
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
493
- FROM CharacterChats_fts
494
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
495
- WHERE CharacterChats_fts MATCH ?
496
- ORDER BY rank
497
- """, (query,))
498
- rows = cursor.fetchall()
499
- columns = [description[0] for description in cursor.description]
500
- results = [dict(zip(columns, row)) for row in rows]
501
- status_message = f"Found {len(results)} chat(s) matching '{query}'."
502
- return results, status_message
503
- except Exception as e:
504
- logging.error(f"Error searching chats with FTS5: {e}")
505
- return [], f"Error occurred during search: {e}"
506
- finally:
507
- conn.close()
508
-
509
- def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
510
- """Update an existing chat history."""
511
- conn = sqlite3.connect(chat_DB_PATH)
512
- cursor = conn.cursor()
513
- try:
514
- chat_history_json = json.dumps(chat_history)
515
- cursor.execute("""
516
- UPDATE CharacterChats
517
- SET chat_history = ?
518
- WHERE id = ?
519
- """, (
520
- chat_history_json,
521
- chat_id
522
- ))
523
- conn.commit()
524
- return cursor.rowcount > 0
525
- except sqlite3.Error as e:
526
- logging.error(f"Error updating character chat: {e}")
527
- return False
528
- finally:
529
- conn.close()
530
-
531
-
532
- def delete_character_chat(chat_id: int) -> bool:
533
- """Delete a specific chat."""
534
- conn = sqlite3.connect(chat_DB_PATH)
535
- cursor = conn.cursor()
536
- try:
537
- cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
538
- conn.commit()
539
- return cursor.rowcount > 0
540
- except sqlite3.Error as e:
541
- logging.error(f"Error deleting character chat: {e}")
542
- return False
543
- finally:
544
- conn.close()
545
-
546
- def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
547
- """
548
- Fetch chat IDs associated with any of the specified keywords.
549
-
550
- Args:
551
- keywords (List[str]): List of keywords to search for.
552
-
553
- Returns:
554
- List[int]: List of chat IDs associated with the keywords.
555
- """
556
- if not keywords:
557
- return []
558
-
559
- conn = sqlite3.connect(chat_DB_PATH)
560
- cursor = conn.cursor()
561
- try:
562
- # Construct the WHERE clause to search for each keyword
563
- keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
564
- sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
565
- cursor.execute(sql_query, keywords)
566
- rows = cursor.fetchall()
567
- chat_ids = [row[0] for row in rows]
568
- return chat_ids
569
- except Exception as e:
570
- logging.error(f"Error in fetch_keywords_for_chats: {e}")
571
- return []
572
- finally:
573
- conn.close()
574
-
575
- def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
576
- """Save chat history to the CharacterChats table.
577
-
578
- Returns the ID of the inserted chat or None if failed.
579
- """
580
- return add_character_chat(character_id, conversation_name, chat_history)
581
-
582
- def migrate_chat_to_media_db():
583
- pass
584
-
585
-
586
- def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
587
- """
588
- Perform a full-text search on specified fields with optional filtering and pagination.
589
-
590
- Args:
591
- query (str): The search query.
592
- fields (List[str]): List of fields to search in.
593
- where_clause (str, optional): Additional SQL WHERE clause to filter results.
594
- page (int, optional): Page number for pagination.
595
- results_per_page (int, optional): Number of results per page.
596
-
597
- Returns:
598
- List[Dict[str, Any]]: List of matching chat records with content and metadata.
599
- """
600
- if not query.strip():
601
- return []
602
-
603
- conn = sqlite3.connect(chat_DB_PATH)
604
- cursor = conn.cursor()
605
- try:
606
- # Construct the MATCH query for FTS5
607
- match_query = " AND ".join(fields) + f" MATCH ?"
608
- # Adjust the query with the fields
609
- fts_query = f"""
610
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
611
- FROM CharacterChats_fts
612
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
613
- WHERE {match_query}
614
- """
615
- if where_clause:
616
- fts_query += f" AND ({where_clause})"
617
- fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
618
- offset = (page - 1) * results_per_page
619
- cursor.execute(fts_query, (query, results_per_page, offset))
620
- rows = cursor.fetchall()
621
- columns = [description[0] for description in cursor.description]
622
- results = [dict(zip(columns, row)) for row in rows]
623
- return results
624
- except Exception as e:
625
- logging.error(f"Error in search_db: {e}")
626
- return []
627
- finally:
628
- conn.close()
629
-
630
-
631
- def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
632
- List[Dict[str, Any]]:
633
- """
634
- Perform a full-text search within the specified chat IDs using FTS5.
635
-
636
- Args:
637
- query (str): The user's query.
638
- relevant_chat_ids (List[int]): List of chat IDs to search within.
639
- page (int): Pagination page number.
640
- results_per_page (int): Number of results per page.
641
-
642
- Returns:
643
- List[Dict[str, Any]]: List of search results with content and metadata.
644
- """
645
- try:
646
- # Construct a WHERE clause to limit the search to relevant chat IDs
647
- where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
648
- if not where_clause:
649
- where_clause = "1" # No restriction if no chat IDs
650
-
651
- # Perform full-text search using FTS5
652
- fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
653
-
654
- filtered_fts_results = [
655
- {
656
- "content": result['content'],
657
- "metadata": {"media_id": result['id']}
658
- }
659
- for result in fts_results
660
- if result['id'] in relevant_chat_ids
661
- ]
662
- return filtered_fts_results
663
- except Exception as e:
664
- logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
665
- return []
666
-
667
-
668
- def fetch_all_chats() -> List[Dict[str, Any]]:
669
- """
670
- Fetch all chat messages from the database.
671
-
672
- Returns:
673
- List[Dict[str, Any]]: List of chat messages with relevant metadata.
674
- """
675
- try:
676
- chats = get_character_chats() # Modify this function to retrieve all chats
677
- return chats
678
- except Exception as e:
679
- logging.error(f"Error fetching all chats: {str(e)}")
680
- return []
681
-
682
- #
683
- # End of Character_Chat_DB.py
684
- #######################################################################################################################
 
1
+ # character_chat_db.py
2
+ # Database functions for managing character cards and chat histories.
3
+ # #
4
+ # Imports
5
+ import configparser
6
+ import sqlite3
7
+ import json
8
+ import os
9
+ import sys
10
+ from typing import List, Dict, Optional, Tuple, Any, Union
11
+
12
+ from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
13
+ import logging
14
+
15
+ #
16
+ #######################################################################################################################
17
+ #
18
+ #
19
+
20
+ def ensure_database_directory():
21
+ os.makedirs(get_database_dir(), exist_ok=True)
22
+
23
+ ensure_database_directory()
24
+
25
+
26
+ # Construct the path to the config file
27
+ config_path = get_project_relative_path('Config_Files/config.txt')
28
+
29
+ # Read the config file
30
+ config = configparser.ConfigParser()
31
+ config.read(config_path)
32
+
33
+ # Get the chat db path from the config, or use the default if not specified
34
+ chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
35
+ print(f"Chat Database path: {chat_DB_PATH}")
36
+
37
+ ########################################################################################################
38
+ #
39
+ # Functions
40
+
41
+ # FIXME - Setup properly and test/add documentation for its existence...
42
+ def initialize_database():
43
+ """Initialize the SQLite database with required tables and FTS5 virtual tables."""
44
+ conn = None
45
+ try:
46
+ conn = sqlite3.connect(chat_DB_PATH)
47
+ cursor = conn.cursor()
48
+
49
+ # Enable foreign key constraints
50
+ cursor.execute("PRAGMA foreign_keys = ON;")
51
+
52
+ # Create CharacterCards table with V2 fields
53
+ cursor.execute("""
54
+ CREATE TABLE IF NOT EXISTS CharacterCards (
55
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
56
+ name TEXT UNIQUE NOT NULL,
57
+ description TEXT,
58
+ personality TEXT,
59
+ scenario TEXT,
60
+ image BLOB,
61
+ post_history_instructions TEXT,
62
+ first_mes TEXT,
63
+ mes_example TEXT,
64
+ creator_notes TEXT,
65
+ system_prompt TEXT,
66
+ alternate_greetings TEXT,
67
+ tags TEXT,
68
+ creator TEXT,
69
+ character_version TEXT,
70
+ extensions TEXT,
71
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
72
+ );
73
+ """)
74
+
75
+ # Create CharacterChats table
76
+ cursor.execute("""
77
+ CREATE TABLE IF NOT EXISTS CharacterChats (
78
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
79
+ character_id INTEGER NOT NULL,
80
+ conversation_name TEXT,
81
+ chat_history TEXT,
82
+ is_snapshot BOOLEAN DEFAULT FALSE,
83
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
84
+ FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
85
+ );
86
+ """)
87
+
88
+ # Create FTS5 virtual table for CharacterChats
89
+ cursor.execute("""
90
+ CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
91
+ conversation_name,
92
+ chat_history,
93
+ content='CharacterChats',
94
+ content_rowid='id'
95
+ );
96
+ """)
97
+
98
+ # Create triggers to keep FTS5 table in sync with CharacterChats
99
+ cursor.executescript("""
100
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
101
+ INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
102
+ VALUES (new.id, new.conversation_name, new.chat_history);
103
+ END;
104
+
105
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
106
+ DELETE FROM CharacterChats_fts WHERE rowid = old.id;
107
+ END;
108
+
109
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
110
+ UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
111
+ WHERE rowid = new.id;
112
+ END;
113
+ """)
114
+
115
+ # Create ChatKeywords table
116
+ cursor.execute("""
117
+ CREATE TABLE IF NOT EXISTS ChatKeywords (
118
+ chat_id INTEGER NOT NULL,
119
+ keyword TEXT NOT NULL,
120
+ FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
121
+ );
122
+ """)
123
+
124
+ # Create indexes for faster searches
125
+ cursor.execute("""
126
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
127
+ """)
128
+ cursor.execute("""
129
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
130
+ """)
131
+
132
+ conn.commit()
133
+ logging.info("Database initialized successfully.")
134
+ except sqlite3.Error as e:
135
+ logging.error(f"SQLite error occurred during database initialization: {e}")
136
+ if conn:
137
+ conn.rollback()
138
+ raise
139
+ except Exception as e:
140
+ logging.error(f"Unexpected error occurred during database initialization: {e}")
141
+ if conn:
142
+ conn.rollback()
143
+ raise
144
+ finally:
145
+ if conn:
146
+ conn.close()
147
+
148
+ # Call initialize_database() at the start of your application
149
+ def setup_chat_database():
150
+ try:
151
+ initialize_database()
152
+ except Exception as e:
153
+ logging.critical(f"Failed to initialize database: {e}")
154
+ sys.exit(1)
155
+
156
+ setup_chat_database()
157
+
158
+ ########################################################################################################
159
+ #
160
+ # Character Card handling
161
+
162
+ def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
163
+ """Parse and validate a character card according to V2 specification."""
164
+ v2_data = {
165
+ 'name': card_data.get('name', ''),
166
+ 'description': card_data.get('description', ''),
167
+ 'personality': card_data.get('personality', ''),
168
+ 'scenario': card_data.get('scenario', ''),
169
+ 'first_mes': card_data.get('first_mes', ''),
170
+ 'mes_example': card_data.get('mes_example', ''),
171
+ 'creator_notes': card_data.get('creator_notes', ''),
172
+ 'system_prompt': card_data.get('system_prompt', ''),
173
+ 'post_history_instructions': card_data.get('post_history_instructions', ''),
174
+ 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
175
+ 'tags': json.dumps(card_data.get('tags', [])),
176
+ 'creator': card_data.get('creator', ''),
177
+ 'character_version': card_data.get('character_version', ''),
178
+ 'extensions': json.dumps(card_data.get('extensions', {}))
179
+ }
180
+
181
+ # Handle 'image' separately as it might be binary data
182
+ if 'image' in card_data:
183
+ v2_data['image'] = card_data['image']
184
+
185
+ return v2_data
186
+
187
+
188
+ def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
189
+ """Add or update a character card in the database."""
190
+ conn = sqlite3.connect(chat_DB_PATH)
191
+ cursor = conn.cursor()
192
+ try:
193
+ parsed_card = parse_character_card(card_data)
194
+
195
+ # Check if character already exists
196
+ cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
197
+ row = cursor.fetchone()
198
+
199
+ if row:
200
+ # Update existing character
201
+ character_id = row[0]
202
+ update_query = """
203
+ UPDATE CharacterCards
204
+ SET description = ?, personality = ?, scenario = ?, image = ?,
205
+ post_history_instructions = ?, first_mes = ?, mes_example = ?,
206
+ creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
207
+ tags = ?, creator = ?, character_version = ?, extensions = ?
208
+ WHERE id = ?
209
+ """
210
+ cursor.execute(update_query, (
211
+ parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
212
+ parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
213
+ parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
214
+ parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
215
+ parsed_card['character_version'], parsed_card['extensions'], character_id
216
+ ))
217
+ else:
218
+ # Insert new character
219
+ insert_query = """
220
+ INSERT INTO CharacterCards (name, description, personality, scenario, image,
221
+ post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
222
+ alternate_greetings, tags, creator, character_version, extensions)
223
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
224
+ """
225
+ cursor.execute(insert_query, (
226
+ parsed_card['name'], parsed_card['description'], parsed_card['personality'],
227
+ parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
228
+ parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
229
+ parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
230
+ parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
231
+ ))
232
+ character_id = cursor.lastrowid
233
+
234
+ conn.commit()
235
+ return character_id
236
+ except sqlite3.IntegrityError as e:
237
+ logging.error(f"Error adding character card: {e}")
238
+ return None
239
+ except Exception as e:
240
+ logging.error(f"Unexpected error adding character card: {e}")
241
+ return None
242
+ finally:
243
+ conn.close()
244
+
245
+ # def add_character_card(card_data: Dict) -> Optional[int]:
246
+ # """Add or update a character card in the database.
247
+ #
248
+ # Returns the ID of the inserted character or None if failed.
249
+ # """
250
+ # conn = sqlite3.connect(chat_DB_PATH)
251
+ # cursor = conn.cursor()
252
+ # try:
253
+ # # Ensure all required fields are present
254
+ # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
255
+ # for field in required_fields:
256
+ # if field not in card_data:
257
+ # card_data[field] = '' # Assign empty string if field is missing
258
+ #
259
+ # # Check if character already exists
260
+ # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
261
+ # row = cursor.fetchone()
262
+ #
263
+ # if row:
264
+ # # Update existing character
265
+ # character_id = row[0]
266
+ # cursor.execute("""
267
+ # UPDATE CharacterCards
268
+ # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
269
+ # WHERE id = ?
270
+ # """, (
271
+ # card_data['description'],
272
+ # card_data['personality'],
273
+ # card_data['scenario'],
274
+ # card_data['image'],
275
+ # card_data['post_history_instructions'],
276
+ # card_data['first_message'],
277
+ # character_id
278
+ # ))
279
+ # else:
280
+ # # Insert new character
281
+ # cursor.execute("""
282
+ # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
283
+ # VALUES (?, ?, ?, ?, ?, ?, ?)
284
+ # """, (
285
+ # card_data['name'],
286
+ # card_data['description'],
287
+ # card_data['personality'],
288
+ # card_data['scenario'],
289
+ # card_data['image'],
290
+ # card_data['post_history_instructions'],
291
+ # card_data['first_message']
292
+ # ))
293
+ # character_id = cursor.lastrowid
294
+ #
295
+ # conn.commit()
296
+ # return cursor.lastrowid
297
+ # except sqlite3.IntegrityError as e:
298
+ # logging.error(f"Error adding character card: {e}")
299
+ # return None
300
+ # except Exception as e:
301
+ # logging.error(f"Unexpected error adding character card: {e}")
302
+ # return None
303
+ # finally:
304
+ # conn.close()
305
+
306
+
307
+ def get_character_cards() -> List[Dict]:
308
+ """Retrieve all character cards from the database."""
309
+ logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
310
+ conn = sqlite3.connect(chat_DB_PATH)
311
+ cursor = conn.cursor()
312
+ cursor.execute("SELECT * FROM CharacterCards")
313
+ rows = cursor.fetchall()
314
+ columns = [description[0] for description in cursor.description]
315
+ conn.close()
316
+ characters = [dict(zip(columns, row)) for row in rows]
317
+ #logging.debug(f"Characters fetched from DB: {characters}")
318
+ return characters
319
+
320
+
321
+ def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
322
+ """
323
+ Retrieve a single character card by its ID.
324
+
325
+ Args:
326
+ character_id: Can be either an integer ID or a dictionary containing character data.
327
+
328
+ Returns:
329
+ A dictionary containing the character card data, or None if not found.
330
+ """
331
+ conn = sqlite3.connect(chat_DB_PATH)
332
+ cursor = conn.cursor()
333
+ try:
334
+ if isinstance(character_id, dict):
335
+ # If a dictionary is passed, assume it's already a character card
336
+ return character_id
337
+ elif isinstance(character_id, int):
338
+ # If an integer is passed, fetch the character from the database
339
+ cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
340
+ row = cursor.fetchone()
341
+ if row:
342
+ columns = [description[0] for description in cursor.description]
343
+ return dict(zip(columns, row))
344
+ else:
345
+ logging.warning(f"Invalid type for character_id: {type(character_id)}")
346
+ return None
347
+ except Exception as e:
348
+ logging.error(f"Error in get_character_card_by_id: {e}")
349
+ return None
350
+ finally:
351
+ conn.close()
352
+
353
+
354
+ def update_character_card(character_id: int, card_data: Dict) -> bool:
355
+ """Update an existing character card."""
356
+ conn = sqlite3.connect(chat_DB_PATH)
357
+ cursor = conn.cursor()
358
+ try:
359
+ cursor.execute("""
360
+ UPDATE CharacterCards
361
+ SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
362
+ WHERE id = ?
363
+ """, (
364
+ card_data.get('name'),
365
+ card_data.get('description'),
366
+ card_data.get('personality'),
367
+ card_data.get('scenario'),
368
+ card_data.get('image'),
369
+ card_data.get('post_history_instructions', ''),
370
+ card_data.get('first_message', "Hello! I'm ready to chat."),
371
+ character_id
372
+ ))
373
+ conn.commit()
374
+ return cursor.rowcount > 0
375
+ except sqlite3.IntegrityError as e:
376
+ logging.error(f"Error updating character card: {e}")
377
+ return False
378
+ finally:
379
+ conn.close()
380
+
381
+
382
+ def delete_character_card(character_id: int) -> bool:
383
+ """Delete a character card and its associated chats."""
384
+ conn = sqlite3.connect(chat_DB_PATH)
385
+ cursor = conn.cursor()
386
+ try:
387
+ # Delete associated chats first due to foreign key constraint
388
+ cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
389
+ cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
390
+ conn.commit()
391
+ return cursor.rowcount > 0
392
+ except sqlite3.Error as e:
393
+ logging.error(f"Error deleting character card: {e}")
394
+ return False
395
+ finally:
396
+ conn.close()
397
+
398
+
399
+ def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
400
+ """
401
+ Add a new chat history for a character, optionally associating keywords.
402
+
403
+ Args:
404
+ character_id (int): The ID of the character.
405
+ conversation_name (str): Name of the conversation.
406
+ chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
407
+ keywords (Optional[List[str]]): List of keywords to associate with this chat.
408
+ is_snapshot (bool, optional): Whether this chat is a snapshot.
409
+
410
+ Returns:
411
+ Optional[int]: The ID of the inserted chat or None if failed.
412
+ """
413
+ conn = sqlite3.connect(chat_DB_PATH)
414
+ cursor = conn.cursor()
415
+ try:
416
+ chat_history_json = json.dumps(chat_history)
417
+ cursor.execute("""
418
+ INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
419
+ VALUES (?, ?, ?, ?)
420
+ """, (
421
+ character_id,
422
+ conversation_name,
423
+ chat_history_json,
424
+ is_snapshot
425
+ ))
426
+ chat_id = cursor.lastrowid
427
+
428
+ if keywords:
429
+ # Insert keywords into ChatKeywords table
430
+ keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
431
+ cursor.executemany("""
432
+ INSERT INTO ChatKeywords (chat_id, keyword)
433
+ VALUES (?, ?)
434
+ """, keyword_records)
435
+
436
+ conn.commit()
437
+ return chat_id
438
+ except sqlite3.Error as e:
439
+ logging.error(f"Error adding character chat: {e}")
440
+ return None
441
+ finally:
442
+ conn.close()
443
+
444
+
445
+ def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
446
+ """Retrieve all chats, or chats for a specific character if character_id is provided."""
447
+ conn = sqlite3.connect(chat_DB_PATH)
448
+ cursor = conn.cursor()
449
+ if character_id is not None:
450
+ cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
451
+ else:
452
+ cursor.execute("SELECT * FROM CharacterChats")
453
+ rows = cursor.fetchall()
454
+ columns = [description[0] for description in cursor.description]
455
+ conn.close()
456
+ return [dict(zip(columns, row)) for row in rows]
457
+
458
+
459
+ def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
460
+ """Retrieve a single chat by its ID."""
461
+ conn = sqlite3.connect(chat_DB_PATH)
462
+ cursor = conn.cursor()
463
+ cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
464
+ row = cursor.fetchone()
465
+ conn.close()
466
+ if row:
467
+ columns = [description[0] for description in cursor.description]
468
+ chat = dict(zip(columns, row))
469
+ chat['chat_history'] = json.loads(chat['chat_history'])
470
+ return chat
471
+ return None
472
+
473
+
474
+ def search_character_chats(query: str) -> Tuple[List[Dict], str]:
475
+ """
476
+ Search for character chats using FTS5.
477
+
478
+ Args:
479
+ query (str): The search query.
480
+
481
+ Returns:
482
+ Tuple[List[Dict], str]: A list of matching chats and a status message.
483
+ """
484
+ if not query.strip():
485
+ return [], "Please enter a search query."
486
+
487
+ conn = sqlite3.connect(chat_DB_PATH)
488
+ cursor = conn.cursor()
489
+ try:
490
+ # Use parameterized queries to prevent SQL injection
491
+ cursor.execute("""
492
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
493
+ FROM CharacterChats_fts
494
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
495
+ WHERE CharacterChats_fts MATCH ?
496
+ ORDER BY rank
497
+ """, (query,))
498
+ rows = cursor.fetchall()
499
+ columns = [description[0] for description in cursor.description]
500
+ results = [dict(zip(columns, row)) for row in rows]
501
+ status_message = f"Found {len(results)} chat(s) matching '{query}'."
502
+ return results, status_message
503
+ except Exception as e:
504
+ logging.error(f"Error searching chats with FTS5: {e}")
505
+ return [], f"Error occurred during search: {e}"
506
+ finally:
507
+ conn.close()
508
+
509
+ def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
510
+ """Update an existing chat history."""
511
+ conn = sqlite3.connect(chat_DB_PATH)
512
+ cursor = conn.cursor()
513
+ try:
514
+ chat_history_json = json.dumps(chat_history)
515
+ cursor.execute("""
516
+ UPDATE CharacterChats
517
+ SET chat_history = ?
518
+ WHERE id = ?
519
+ """, (
520
+ chat_history_json,
521
+ chat_id
522
+ ))
523
+ conn.commit()
524
+ return cursor.rowcount > 0
525
+ except sqlite3.Error as e:
526
+ logging.error(f"Error updating character chat: {e}")
527
+ return False
528
+ finally:
529
+ conn.close()
530
+
531
+
532
+ def delete_character_chat(chat_id: int) -> bool:
533
+ """Delete a specific chat."""
534
+ conn = sqlite3.connect(chat_DB_PATH)
535
+ cursor = conn.cursor()
536
+ try:
537
+ cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
538
+ conn.commit()
539
+ return cursor.rowcount > 0
540
+ except sqlite3.Error as e:
541
+ logging.error(f"Error deleting character chat: {e}")
542
+ return False
543
+ finally:
544
+ conn.close()
545
+
546
+ def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
547
+ """
548
+ Fetch chat IDs associated with any of the specified keywords.
549
+
550
+ Args:
551
+ keywords (List[str]): List of keywords to search for.
552
+
553
+ Returns:
554
+ List[int]: List of chat IDs associated with the keywords.
555
+ """
556
+ if not keywords:
557
+ return []
558
+
559
+ conn = sqlite3.connect(chat_DB_PATH)
560
+ cursor = conn.cursor()
561
+ try:
562
+ # Construct the WHERE clause to search for each keyword
563
+ keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
564
+ sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
565
+ cursor.execute(sql_query, keywords)
566
+ rows = cursor.fetchall()
567
+ chat_ids = [row[0] for row in rows]
568
+ return chat_ids
569
+ except Exception as e:
570
+ logging.error(f"Error in fetch_keywords_for_chats: {e}")
571
+ return []
572
+ finally:
573
+ conn.close()
574
+
575
+ def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
576
+ """Save chat history to the CharacterChats table.
577
+
578
+ Returns the ID of the inserted chat or None if failed.
579
+ """
580
+ return add_character_chat(character_id, conversation_name, chat_history)
581
+
582
+ def migrate_chat_to_media_db():
583
+ pass
584
+
585
+
586
+ def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
587
+ """
588
+ Perform a full-text search on specified fields with optional filtering and pagination.
589
+
590
+ Args:
591
+ query (str): The search query.
592
+ fields (List[str]): List of fields to search in.
593
+ where_clause (str, optional): Additional SQL WHERE clause to filter results.
594
+ page (int, optional): Page number for pagination.
595
+ results_per_page (int, optional): Number of results per page.
596
+
597
+ Returns:
598
+ List[Dict[str, Any]]: List of matching chat records with content and metadata.
599
+ """
600
+ if not query.strip():
601
+ return []
602
+
603
+ conn = sqlite3.connect(chat_DB_PATH)
604
+ cursor = conn.cursor()
605
+ try:
606
+ # Construct the MATCH query for FTS5
607
+ match_query = " AND ".join(fields) + f" MATCH ?"
608
+ # Adjust the query with the fields
609
+ fts_query = f"""
610
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
611
+ FROM CharacterChats_fts
612
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
613
+ WHERE {match_query}
614
+ """
615
+ if where_clause:
616
+ fts_query += f" AND ({where_clause})"
617
+ fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
618
+ offset = (page - 1) * results_per_page
619
+ cursor.execute(fts_query, (query, results_per_page, offset))
620
+ rows = cursor.fetchall()
621
+ columns = [description[0] for description in cursor.description]
622
+ results = [dict(zip(columns, row)) for row in rows]
623
+ return results
624
+ except Exception as e:
625
+ logging.error(f"Error in search_db: {e}")
626
+ return []
627
+ finally:
628
+ conn.close()
629
+
630
+
631
+ def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
632
+ List[Dict[str, Any]]:
633
+ """
634
+ Perform a full-text search within the specified chat IDs using FTS5.
635
+
636
+ Args:
637
+ query (str): The user's query.
638
+ relevant_chat_ids (List[int]): List of chat IDs to search within.
639
+ page (int): Pagination page number.
640
+ results_per_page (int): Number of results per page.
641
+
642
+ Returns:
643
+ List[Dict[str, Any]]: List of search results with content and metadata.
644
+ """
645
+ try:
646
+ # Construct a WHERE clause to limit the search to relevant chat IDs
647
+ where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
648
+ if not where_clause:
649
+ where_clause = "1" # No restriction if no chat IDs
650
+
651
+ # Perform full-text search using FTS5
652
+ fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
653
+
654
+ filtered_fts_results = [
655
+ {
656
+ "content": result['content'],
657
+ "metadata": {"media_id": result['id']}
658
+ }
659
+ for result in fts_results
660
+ if result['id'] in relevant_chat_ids
661
+ ]
662
+ return filtered_fts_results
663
+ except Exception as e:
664
+ logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
665
+ return []
666
+
667
+
668
+ def fetch_all_chats() -> List[Dict[str, Any]]:
669
+ """
670
+ Fetch all chat messages from the database.
671
+
672
+ Returns:
673
+ List[Dict[str, Any]]: List of chat messages with relevant metadata.
674
+ """
675
+ try:
676
+ chats = get_character_chats() # Modify this function to retrieve all chats
677
+ return chats
678
+ except Exception as e:
679
+ logging.error(f"Error fetching all chats: {str(e)}")
680
+ return []
681
+
682
+ #
683
+ # End of Character_Chat_DB.py
684
+ #######################################################################################################################