from langchain.chains.summarize import load_summarize_chain from langchain import PromptTemplate, LLMChain from langchain.chat_models import ChatOpenAI from langchain.prompts import PromptTemplate from langchain.text_splitter import TokenTextSplitter from langchain.embeddings import OpenAIEmbeddings from langchain.vectorstores import FAISS from langchain.chains import RetrievalQA from langchain.agents import load_tools from langchain.agents import initialize_agent from langchain.agents import AgentType from langchain.docstore.document import Document from langchain.tools import BaseTool, StructuredTool, Tool, tool from langchain.callbacks.stdout import StdOutCallbackHandler from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler from langchain.callbacks.manager import BaseCallbackManager from duckduckgo_search import DDGS from itertools import islice from typing import Any, Dict, List, Optional, Union from langchain.callbacks.base import BaseCallbackHandler from langchain.input import print_text from langchain.schema import AgentAction, AgentFinish, LLMResult from pydantic import BaseModel, Field import requests from bs4 import BeautifulSoup from threading import Thread, Condition from collections import deque from .base_model import BaseLLMModel, CallbackToIterator, ChuanhuCallbackHandler from ..config import default_chuanhu_assistant_model from ..presets import SUMMARIZE_PROMPT, i18n from ..index_func import construct_index from langchain.callbacks import get_openai_callback import os import gradio as gr import logging class GoogleSearchInput(BaseModel): keywords: str = Field(description="keywords to search") class WebBrowsingInput(BaseModel): url: str = Field(description="URL of a webpage") class WebAskingInput(BaseModel): url: str = Field(description="URL of a webpage") question: str = Field(description="Question that you want to know the answer to, based on the webpage's content.") class ChuanhuAgent_Client(BaseLLMModel): def __init__(self, model_name, openai_api_key, user_name="") -> None: super().__init__(model_name=model_name, user=user_name) self.text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=30) self.api_key = openai_api_key self.llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0, model_name=default_chuanhu_assistant_model, openai_api_base=os.environ.get("OPENAI_API_BASE", None)) self.cheap_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0, model_name="gpt-3.5-turbo", openai_api_base=os.environ.get("OPENAI_API_BASE", None)) PROMPT = PromptTemplate(template=SUMMARIZE_PROMPT, input_variables=["text"]) self.summarize_chain = load_summarize_chain(self.cheap_llm, chain_type="map_reduce", return_intermediate_steps=True, map_prompt=PROMPT, combine_prompt=PROMPT) self.index_summary = None self.index = None if "Pro" in self.model_name: self.tools = load_tools(["serpapi", "google-search-results-json", "llm-math", "arxiv", "wikipedia", "wolfram-alpha"], llm=self.llm) else: self.tools = load_tools(["ddg-search", "llm-math", "arxiv", "wikipedia"], llm=self.llm) self.tools.append( Tool.from_function( func=self.google_search_simple, name="Google Search JSON", description="useful when you need to search the web.", args_schema=GoogleSearchInput ) ) self.tools.append( Tool.from_function( func=self.summary_url, name="Summary Webpage", description="useful when you need to know the overall content of a webpage.", args_schema=WebBrowsingInput ) ) self.tools.append( StructuredTool.from_function( func=self.ask_url, name="Ask Webpage", description="useful when you need to ask detailed questions about a webpage.", args_schema=WebAskingInput ) ) def google_search_simple(self, query): results = [] with DDGS() as ddgs: ddgs_gen = ddgs.text("notes from a dead house", backend="lite") for r in islice(ddgs_gen, 10): results.append({ "title": r["title"], "link": r["href"], "snippet": r["body"] }) return str(results) def handle_file_upload(self, files, chatbot, language): """if the model accepts multi modal input, implement this function""" status = gr.Markdown.update() if files: index = construct_index(self.api_key, file_src=files) assert index is not None, "获取索引失败" self.index = index status = i18n("索引构建完成") # Summarize the document logging.info(i18n("生成内容总结中……")) with get_openai_callback() as cb: os.environ["OPENAI_API_KEY"] = self.api_key from langchain.chains.summarize import load_summarize_chain from langchain.prompts import PromptTemplate from langchain.chat_models import ChatOpenAI prompt_template = "Write a concise summary of the following:\n\n{text}\n\nCONCISE SUMMARY IN " + language + ":" PROMPT = PromptTemplate(template=prompt_template, input_variables=["text"]) llm = ChatOpenAI() chain = load_summarize_chain(llm, chain_type="map_reduce", return_intermediate_steps=True, map_prompt=PROMPT, combine_prompt=PROMPT) summary = chain({"input_documents": list(index.docstore.__dict__["_dict"].values())}, return_only_outputs=True)["output_text"] logging.info(f"Summary: {summary}") self.index_summary = summary chatbot.append((f"Uploaded {len(files)} files", summary)) logging.info(cb) return gr.Files.update(), chatbot, status def query_index(self, query): if self.index is not None: retriever = self.index.as_retriever() qa = RetrievalQA.from_chain_type(llm=self.llm, chain_type="stuff", retriever=retriever) return qa.run(query) else: "Error during query." def summary(self, text): texts = Document(page_content=text) texts = self.text_splitter.split_documents([texts]) return self.summarize_chain({"input_documents": texts}, return_only_outputs=True)["output_text"] def fetch_url_content(self, url): response = requests.get(url) soup = BeautifulSoup(response.text, 'html.parser') # 提取所有的文本 text = ''.join(s.getText() for s in soup.find_all('p')) logging.info(f"Extracted text from {url}") return text def summary_url(self, url): text = self.fetch_url_content(url) if text == "": return "URL unavailable." text_summary = self.summary(text) url_content = "webpage content summary:\n" + text_summary return url_content def ask_url(self, url, question): text = self.fetch_url_content(url) if text == "": return "URL unavailable." texts = Document(page_content=text) texts = self.text_splitter.split_documents([texts]) # use embedding embeddings = OpenAIEmbeddings(openai_api_key=self.api_key, openai_api_base=os.environ.get("OPENAI_API_BASE", None)) # create vectorstore db = FAISS.from_documents(texts, embeddings) retriever = db.as_retriever() qa = RetrievalQA.from_chain_type(llm=self.cheap_llm, chain_type="stuff", retriever=retriever) return qa.run(f"{question} Reply in 中文") def get_answer_at_once(self): question = self.history[-1]["content"] # llm=ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo") agent = initialize_agent(self.tools, self.llm, agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose=True) reply = agent.run(input=f"{question} Reply in 简体中文") return reply, -1 def get_answer_stream_iter(self): question = self.history[-1]["content"] it = CallbackToIterator() manager = BaseCallbackManager(handlers=[ChuanhuCallbackHandler(it.callback)]) def thread_func(): tools = self.tools if self.index is not None: tools.append( Tool.from_function( func=self.query_index, name="Query Knowledge Base", description=f"useful when you need to know about: {self.index_summary}", args_schema=WebBrowsingInput ) ) agent = initialize_agent(self.tools, self.llm, agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION, verbose=True, callback_manager=manager) try: reply = agent.run(input=f"{question} Reply in 简体中文") except Exception as e: import traceback traceback.print_exc() reply = str(e) it.callback(reply) it.finish() t = Thread(target=thread_func) t.start() partial_text = "" for value in it: partial_text += value yield partial_text