from typing import Union from datetime import timedelta from request import RequestChat as req from response import ResponseChat as res from repository import ChatHistoryRepository, DetailChatRepository, UserRepository import function.chatbot as sf from typing import Dict import json, re from pydantic import BaseModel from function import support_function regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' def check_email(email): if(re.fullmatch(regex, email)): return True else: return False class Document(BaseModel): page_content: str metadata: Dict[str, str] type: str def query2_upgrade_old(request: req.RequestQuery2UpgradeOld): try: user_id = request.user_id question = request.question text_all = request.text_all chat_name = request.chat_name email = support_function.check_email_service(str(user_id)) if isinstance(email, res.ReponseError): return email if question is None: return res.ReponseError( status=400, data =res.Message(message="question is empty") ) if chat_name is None: return res.ReponseError( status=400, data =res.Message(message="chat_name is empty") ) text_all_dicts = json.loads(text_all) text_all1 = [Document(**doc) for doc in text_all_dicts] test, list1, list2 = sf.handle_query_upgrade_keyword_old(question, text_all1, email) text1 = "".join(list1) text2 = "".join(list2) id = 0 if test: chat_id = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(user_id,chat_name) if chat_id: id = DetailChatRepository.addDetailChat(chat_id,question, test, text1, text2) if chat_id is None: ChatHistoryRepository.addChatHistory(user_id,chat_name) chat_id_new = ChatHistoryRepository.getIdChatHistoryByUserIdAndNameChat(user_id,chat_name) id = DetailChatRepository.addDetailChat(chat_id_new, question, test, text1, text2) return res.ResponseQuery2UpgradeOld( status= 200, data = res.DataAnswer1(id = id, answer=test, data_relevant=list1, sources=list2)) if test is None or test == "": return res.ReponseError( status=500, data =res.Message(message="No answer") ) except: return res.ReponseError( status=500, data =res.Message(message="Server Error") ) def extract_file(request: req.RequestExtractFile): try: user_id = request.user_id email = support_function.check_email_service(str(user_id)) if isinstance(email, res.ReponseError): return email text_all1 = sf.extract_data2(email) if text_all1 is False: return res.ResponseExtractFile( status= 200, data = res.DataExtractFile(text_all="No data response")) return res.ResponseExtractFile( status= 200, data = res.DataExtractFile(text_all=text_all1)) except: return res.ReponseError( status=500, data =res.Message(message="Server Error") ) def generate_question(request: req.RequestGenerateQuestion): try: user_id = request.user_id email = support_function.check_email_service(str(user_id)) if isinstance(email, res.ReponseError): return email text_all1 = sf.generate_question(email) if text_all1 is False: return res.ResponseGenerateQuestion( status= 200, data = res.GenerateQuestion(question=False)) return res.ResponseGenerateQuestion( status= 200, data = res.GenerateQuestion(question=text_all1)) except: return res.ReponseError( status=500, data=res.Message(message="Server Error") ) def delete_chat(request: req.RequestDeleteChat): try: user_id = request.user_id email = support_function.check_email_service(str(user_id)) if isinstance(email, res.ReponseError): return email chat_name = request.chat_name if chat_name is None: return res.ReponseError( status=400, data =res.Message(message="chat_name is empty") ) DetailChatRepository.delete_chat_detail(chat_name) check = ChatHistoryRepository.deleteChatHistory(user_id,chat_name) if check is False: return res.ResponseDeleteChat( status = 500, data = res.Message(message="Delete conversation chat failed")) else: return res.ResponseDeleteChat( status = 200, data = res.Message(message="Delete conversation chat success")) except: return res.ReponseError( status= 500, data = res.Message(message="Server Error") )