"""Labor-law consultation chatbot: a Gradio UI over a LangChain
ConversationalRetrievalChain, backed by a Chroma vector store built from
user-uploaded PDF files."""

import os

import gradio as gr
import openai
from langchain.chains import ConversationalRetrievalChain
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.chat_models import ChatOpenAI
from langchain_community.document_loaders import PyMuPDFLoader, PyPDFLoader
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
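
# Use a fixed API key from the environment when available; otherwise each user
# supplies a key through the UI (handled by save_api_key below).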
api_key_env = os.getenv("OPENAI_API_KEY")
if api_key_env:
    openai.api_key = api_key_env
else:
    print("未設置固定的 OpenAI API 密鑰。將使用使用者提供的密鑰。")
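
# Directory where uploaded PDFs and the persisted Chroma index are stored.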
VECTORDB_DIR = os.path.abspath("./data")
os.makedirs(VECTORDB_DIR, exist_ok=True)
os.chmod(VECTORDB_DIR, 0o755)
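
# Load the given PDF files, split them into overlapping chunks, embed the
# chunks with OpenAI embeddings, and persist them in a Chroma vector store.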
def load_and_process_documents(file_paths, loader_type='PyMuPDFLoader', api_key=None):
    if not api_key:
        raise ValueError("未提供 OpenAI API 密鑰。")
    documents = []

    # Load every PDF that exists, skipping files the chosen loader cannot parse.
    for file_path in file_paths:
        if not os.path.exists(file_path):
            continue
        try:
            if loader_type == 'PyMuPDFLoader':
                loader = PyMuPDFLoader(file_path)
            elif loader_type == 'PyPDFLoader':
                loader = PyPDFLoader(file_path)
            else:
                continue
            loaded_docs = loader.load()
            if loaded_docs:
                documents.extend(loaded_docs)
        except Exception:
            continue

    if not documents:
        raise ValueError("沒有找到任何 PDF 文件或 PDF 文件無法載入。")

    # Split the documents into 1000-character chunks with a 50-character overlap.
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=50)
    documents = text_splitter.split_documents(documents)

    if not documents:
        raise ValueError("分割後的文檔列表為空。請檢查 PDF 文件內容。")

    try:
        embeddings = OpenAIEmbeddings(openai_api_key=api_key)
    except Exception as e:
        raise ValueError(f"初始化 OpenAIEmbeddings 時出現錯誤: {e}")

    # Embed the chunks and persist them to the Chroma store on disk.
    try:
        vectordb = Chroma.from_documents(
            documents,
            embedding=embeddings,
            persist_directory=VECTORDB_DIR
        )
    except Exception as e:
        raise ValueError(f"初始化 Chroma 向量資料庫時出現錯誤: {e}")

    return vectordb
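
# Answer a single user question against the vector store, keeping the Gradio
# chat history in the (user, assistant) tuple format the chain also accepts.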
def handle_query(user_message, chat_history, vectordb, api_key):
    try:
        if not user_message:
            return chat_history

        # System-style preface prepended to every query to constrain the answers.
        preface = """
        指令: 以繁體中文回答問題,200字以內。你是一位勞動法專家,針對員工權益與合同條款等法律問題進行回應。
        非相關問題,請回應:「目前僅支援勞動法相關問題。」。
        """
        query = f"{preface} 查詢內容:{user_message}"

        # Build a retrieval QA chain over the six most similar chunks.
        pdf_qa = ConversationalRetrievalChain.from_llm(
            ChatOpenAI(temperature=0.7, model="gpt-4", openai_api_key=api_key),
            retriever=vectordb.as_retriever(search_kwargs={'k': 6}),
            return_source_documents=True
        )

        result = pdf_qa.invoke({"question": query, "chat_history": chat_history})

        if "answer" in result:
            chat_history = chat_history + [(user_message, result["answer"])]
        else:
            chat_history = chat_history + [(user_message, "抱歉,未能獲得有效回應。")]
        return chat_history

    except Exception as e:
        return chat_history + [("系統", f"出現錯誤: {str(e)}")]
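
# Gradio callback: validate and remember the user-supplied OpenAI API key.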
def save_api_key(api_key, state):
    if not api_key or not api_key.startswith("sk-"):
        return "請輸入有效的 OpenAI API 密鑰。", state
    state['api_key'] = api_key
    return "API 密鑰已成功保存。您現在可以上傳 PDF 文件並開始提問。", state
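
# Gradio callback: save the uploaded PDF bytes to disk, build the vector store,
# and stash it in the per-session state.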
def process_files(files, state):
    if files:
        try:
            api_key = state.get('api_key', None)
            if not api_key:
                return "請先輸入並保存您的 OpenAI API 密鑰。", state

            # gr.File(type="binary") delivers each upload as raw bytes.
            saved_file_paths = []
            for idx, file_data in enumerate(files):
                filename = f"uploaded_{idx}.pdf"
                save_path = os.path.join(VECTORDB_DIR, filename)
                with open(save_path, "wb") as f:
                    f.write(file_data)
                saved_file_paths.append(save_path)
            vectordb = load_and_process_documents(saved_file_paths, loader_type='PyMuPDFLoader', api_key=api_key)
            state['vectordb'] = vectordb
            return "PDF 文件已成功上傳並處理。您現在可以開始提問。", state
        except Exception as e:
            return f"處理文件時出現錯誤: {e}", state
    else:
        return "請上傳至少一個 PDF 文件。", state
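
# Gradio callback: route a chat message to handle_query once both the API key
# and the vector store are in place; the empty string clears the input box.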
def chat_interface(user_message, chat_history, state):
    vectordb = state.get('vectordb', None)
    api_key = state.get('api_key', None)
    if not vectordb:
        return chat_history, state, "請先上傳 PDF 文件以進行處理。"
    if not api_key:
        return chat_history, state, "請先輸入並保存您的 OpenAI API 密鑰。"

    updated_history = handle_query(user_message, chat_history, vectordb, api_key)
    return updated_history, state, ""
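
# Build the Gradio UI: API-key entry, PDF upload, and the chat panel.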
with gr.Blocks(css="body { background-color: #EBD6D6; }") as demo:
    gr.Markdown("<h1 style='text-align: center;'>勞動法智能諮詢系統</h1>")

    # Per-session state holding the vector store and the user's API key.
    state = gr.State({"vectordb": None, "api_key": None})

    api_key_input = gr.Textbox(
        label="輸入您的 OpenAI API 密鑰",
        placeholder="sk-...",
        type="password",
        interactive=True
    )
    save_api_key_btn = gr.Button("保存 API 密鑰")
    api_key_status = gr.Textbox(label="狀態", interactive=False)

    gr.Markdown("<span style='font-size: 1.5em; font-weight: bold;'>請上傳勞動法相關文檔,讓我協助解決您的職場問題!🤖</span>")
    upload = gr.File(
        file_count="multiple",
        file_types=[".pdf"],
        label="上傳勞動法 PDF 文件",
        interactive=True,
        type="binary"
    )
    upload_btn = gr.Button("上傳並處理")
    upload_status = gr.Textbox(label="上傳狀態", interactive=False)

    gr.Markdown("### 勞動法小幫手")
    chatbot = gr.Chatbot()

    txt = gr.Textbox(show_label=False, placeholder="請輸入您的法律問題...")
    submit_btn = gr.Button("提問")
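
    # Wire the buttons and the textbox to their callbacks.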
    save_api_key_btn.click(
        save_api_key,
        inputs=[api_key_input, state],
        outputs=[api_key_status, state]
    )

    upload_btn.click(
        process_files,
        inputs=[upload, state],
        outputs=[upload_status, state]
    )

    submit_btn.click(
        chat_interface,
        inputs=[txt, chatbot, state],
        outputs=[chatbot, state, txt]
    )

    txt.submit(
        chat_interface,
        inputs=[txt, chatbot, state],
        outputs=[chatbot, state, txt]
    )


if __name__ == "__main__":
    demo.launch()