oceansweep commited on
Commit
c8770f3
1 Parent(s): a589866

Update App_Function_Libraries/RAG/RAG_QA_Chat.py

Browse files
App_Function_Libraries/RAG/RAG_QA_Chat.py CHANGED
@@ -1,84 +1,84 @@
1
- # Podcast_tab.py
2
- # Description: Gradio UI for ingesting podcasts into the database
3
- #
4
- # Imports
5
- #
6
- #
7
- # External Imports
8
- import json
9
- import logging
10
- import tempfile
11
- from typing import List, Tuple, IO, Union
12
- #
13
- # Local Imports
14
- from App_Function_Libraries.DB.DB_Manager import db, search_db, DatabaseError, get_media_content
15
- from App_Function_Libraries.RAG.RAG_Libary_2 import generate_answer
16
- #
17
- ########################################################################################################################
18
- #
19
- # Functions:
20
-
21
- def rag_qa_chat(message: str, history: List[Tuple[str, str]], context: Union[str, IO[str]], api_choice: str) -> Tuple[List[Tuple[str, str]], str]:
22
- try:
23
- # Prepare the context based on the selected source
24
- if hasattr(context, 'read'):
25
- # Handle uploaded file
26
- context_text = context.read()
27
- if isinstance(context_text, bytes):
28
- context_text = context_text.decode('utf-8')
29
- elif isinstance(context, str) and context.startswith("media_id:"):
30
- # Handle existing file or search result
31
- media_id = int(context.split(":")[1])
32
- context_text = get_media_content(media_id) # Implement this function to fetch content from the database
33
- else:
34
- context_text = str(context)
35
-
36
- # Prepare the full context including chat history
37
- full_context = "\n".join([f"Human: {h[0]}\nAI: {h[1]}" for h in history])
38
- full_context += f"\n\nContext: {context_text}\n\nHuman: {message}\nAI:"
39
-
40
- # Generate response using the selected API
41
- response = generate_answer(api_choice, full_context, message)
42
-
43
- # Update history
44
- history.append((message, response))
45
-
46
- return history, ""
47
- except DatabaseError as e:
48
- logging.error(f"Database error in rag_qa_chat: {str(e)}")
49
- return history, f"An error occurred while accessing the database: {str(e)}"
50
- except Exception as e:
51
- logging.error(f"Unexpected error in rag_qa_chat: {str(e)}")
52
- return history, f"An unexpected error occurred: {str(e)}"
53
-
54
-
55
-
56
- def save_chat_history(history: List[Tuple[str, str]]) -> str:
57
- # Save chat history to a file
58
- with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
59
- json.dump(history, temp_file)
60
- return temp_file.name
61
-
62
-
63
- def load_chat_history(file: IO[str]) -> List[Tuple[str, str]]:
64
- # Load chat history from a file
65
- return json.load(file)
66
-
67
-
68
- def search_database(query: str) -> List[Tuple[int, str]]:
69
- # Implement database search functionality
70
- results = search_db(query, ["title", "content"], "", page=1, results_per_page=10)
71
- return [(result['id'], result['title']) for result in results]
72
-
73
-
74
- def get_existing_files() -> List[Tuple[int, str]]:
75
- # Fetch list of existing files from the database
76
- with db.get_connection() as conn:
77
- cursor = conn.cursor()
78
- cursor.execute("SELECT id, title FROM Media ORDER BY title")
79
- return cursor.fetchall()
80
-
81
-
82
- #
83
- # End of RAG_QA_Chat.py
84
- ########################################################################################################################
 
1
+ # Podcast_tab.py
2
+ # Description: Gradio UI for ingesting podcasts into the database
3
+ #
4
+ # Imports
5
+ #
6
+ #
7
+ # External Imports
8
+ import json
9
+ import logging
10
+ import tempfile
11
+ from typing import List, Tuple, IO, Union
12
+ #
13
+ # Local Imports
14
+ from App_Function_Libraries.DB.DB_Manager import db, search_db, DatabaseError, get_media_content
15
+ from App_Function_Libraries.RAG.RAG_Library_2 import generate_answer
16
+ #
17
+ ########################################################################################################################
18
+ #
19
+ # Functions:
20
+
21
+ def rag_qa_chat(message: str, history: List[Tuple[str, str]], context: Union[str, IO[str]], api_choice: str) -> Tuple[List[Tuple[str, str]], str]:
22
+ try:
23
+ # Prepare the context based on the selected source
24
+ if hasattr(context, 'read'):
25
+ # Handle uploaded file
26
+ context_text = context.read()
27
+ if isinstance(context_text, bytes):
28
+ context_text = context_text.decode('utf-8')
29
+ elif isinstance(context, str) and context.startswith("media_id:"):
30
+ # Handle existing file or search result
31
+ media_id = int(context.split(":")[1])
32
+ context_text = get_media_content(media_id) # Implement this function to fetch content from the database
33
+ else:
34
+ context_text = str(context)
35
+
36
+ # Prepare the full context including chat history
37
+ full_context = "\n".join([f"Human: {h[0]}\nAI: {h[1]}" for h in history])
38
+ full_context += f"\n\nContext: {context_text}\n\nHuman: {message}\nAI:"
39
+
40
+ # Generate response using the selected API
41
+ response = generate_answer(api_choice, full_context, message)
42
+
43
+ # Update history
44
+ history.append((message, response))
45
+
46
+ return history, ""
47
+ except DatabaseError as e:
48
+ logging.error(f"Database error in rag_qa_chat: {str(e)}")
49
+ return history, f"An error occurred while accessing the database: {str(e)}"
50
+ except Exception as e:
51
+ logging.error(f"Unexpected error in rag_qa_chat: {str(e)}")
52
+ return history, f"An unexpected error occurred: {str(e)}"
53
+
54
+
55
+
56
+ def save_chat_history(history: List[Tuple[str, str]]) -> str:
57
+ # Save chat history to a file
58
+ with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
59
+ json.dump(history, temp_file)
60
+ return temp_file.name
61
+
62
+
63
+ def load_chat_history(file: IO[str]) -> List[Tuple[str, str]]:
64
+ # Load chat history from a file
65
+ return json.load(file)
66
+
67
+
68
+ def search_database(query: str) -> List[Tuple[int, str]]:
69
+ # Implement database search functionality
70
+ results = search_db(query, ["title", "content"], "", page=1, results_per_page=10)
71
+ return [(result['id'], result['title']) for result in results]
72
+
73
+
74
+ def get_existing_files() -> List[Tuple[int, str]]:
75
+ # Fetch list of existing files from the database
76
+ with db.get_connection() as conn:
77
+ cursor = conn.cursor()
78
+ cursor.execute("SELECT id, title FROM Media ORDER BY title")
79
+ return cursor.fetchall()
80
+
81
+
82
+ #
83
+ # End of RAG_QA_Chat.py
84
+ ########################################################################################################################