# RAG_QA_Chat.py
# Description: Functions supporting the RAG-based QA chat (answer generation, chat history persistence, and database search) used by the Gradio UI
#
# Imports
#
#
# External Imports
import json
import logging
import tempfile
from typing import List, Tuple, IO, Union
#
# Local Imports
from App_Function_Libraries.DB.DB_Manager import db, search_db, DatabaseError, get_media_content
from App_Function_Libraries.RAG.RAG_Libary_2 import generate_answer
#
########################################################################################################################
#
# Functions:

def rag_qa_chat(message: str, history: List[Tuple[str, str]], context: Union[str, IO[str]], api_choice: str) -> Tuple[List[Tuple[str, str]], str]:
    """Answer a user message against the selected context and return the updated history plus a status message."""
    try:
        # Prepare the context based on the selected source
        if hasattr(context, 'read'):
            # Handle uploaded file
            context_text = context.read()
            if isinstance(context_text, bytes):
                context_text = context_text.decode('utf-8')
        elif isinstance(context, str) and context.startswith("media_id:"):
            # Handle existing file or search result
            media_id = int(context.split(":")[1])
            context_text = get_media_content(media_id)  # Fetch the stored content for this media item from the database
        else:
            context_text = str(context)

        # Prepare the full context including chat history
        full_context = "\n".join([f"Human: {h[0]}\nAI: {h[1]}" for h in history])
        full_context += f"\n\nContext: {context_text}\n\nHuman: {message}\nAI:"

        # Generate response using the selected API
        response = generate_answer(api_choice, full_context, message)

        # Update history
        history.append((message, response))

        return history, ""
    except DatabaseError as e:
        logging.error(f"Database error in rag_qa_chat: {str(e)}")
        return history, f"An error occurred while accessing the database: {str(e)}"
    except Exception as e:
        logging.error(f"Unexpected error in rag_qa_chat: {str(e)}")
        return history, f"An unexpected error occurred: {str(e)}"



def save_chat_history(history: List[Tuple[str, str]]) -> str:
    # Save chat history to a file
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
        json.dump(history, temp_file)
        return temp_file.name


def load_chat_history(file: IO[str]) -> List[Tuple[str, str]]:
    # Load chat history from a file, converting each JSON pair back into a (message, response) tuple
    return [tuple(pair) for pair in json.load(file)]
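
# Example (illustrative) round trip using the two helpers above:
#   path = save_chat_history([("Hi", "Hello!")])
#   with open(path) as f:
#       restored = load_chat_history(f)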


def search_database(query: str) -> List[Tuple[int, str]]:
    # Search media titles and content in the database and return (id, title) pairs for display
    results = search_db(query, ["title", "content"], "", page=1, results_per_page=10)
    return [(result['id'], result['title']) for result in results]


def get_existing_files() -> List[Tuple[int, str]]:
    # Fetch list of existing files from the database
    with db.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT id, title FROM Media ORDER BY title")
        return cursor.fetchall()
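

# Illustrative wiring sketch (not part of the original module): one way the helpers above could be hooked up to a
# Gradio Chatbot. Component names, the example API choices in the dropdown, and the __main__ guard are assumptions
# made for the example, not the project's actual UI.
if __name__ == "__main__":
    import gradio as gr  # assumed to be installed alongside the rest of the app

    with gr.Blocks() as demo:
        chatbot = gr.Chatbot(label="RAG QA Chat")
        question = gr.Textbox(label="Your question")
        context_box = gr.Textbox(label="Context (raw text or 'media_id:<id>')")
        api_choice = gr.Dropdown(choices=["openai", "anthropic"], value="openai", label="API")  # example choices only
        status = gr.Textbox(label="Status", interactive=False)

        # rag_qa_chat returns (updated_history, status_message), which maps onto the two outputs below.
        question.submit(
            rag_qa_chat,
            inputs=[question, chatbot, context_box, api_choice],
            outputs=[chatbot, status],
        )

    demo.launch()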


#
# End of RAG_QA_Chat.py
########################################################################################################################