Spaces:
Running
Running
import time | |
from langchain.agents import AgentType | |
from langchain.agents import initialize_agent | |
from langchain.callbacks import get_openai_callback | |
from langchain.chat_models import ChatOpenAI | |
from nodes.Worker import * | |
from utils.CustomDocstoreExplorer import CustomDocstoreExplorer | |
from utils.util import * | |
class ReactBase: | |
def __init__(self, fewshot, model_name="text-davinci-002", max_iter=8, verbose=True): | |
self.model_name = model_name | |
self.max_iter = max_iter | |
self.verbose = verbose | |
self.fewshot = fewshot | |
self.tools = self._load_tools() | |
if model_name in OPENAI_COMPLETION_MODELS: | |
self.agent = initialize_agent(self.tools, | |
OpenAI(temperature=0, model_name=self.model_name), | |
agent=AgentType.REACT_DOCSTORE, | |
verbose=self.verbose, | |
return_intermediate_steps=True, | |
max_iterations=max_iter) | |
elif model_name in OPENAI_CHAT_MODELS: | |
self.agent = initialize_agent(self.tools, | |
ChatOpenAI(temperature=0, model_name=self.model_name), | |
agent=AgentType.REACT_DOCSTORE, | |
verbose=self.verbose, | |
return_intermediate_steps=True, | |
max_iterations=max_iter) | |
self.agent.agent.llm_chain.prompt.template = fewshot | |
def run(self, prompt): | |
self.reset() | |
result = {} | |
with get_openai_callback() as cb: | |
st = time.time() | |
response = self.agent(prompt) | |
result["wall_time"] = time.time() - st | |
result["input"] = response["input"] | |
result["output"] = response["output"] | |
result["intermediate_steps"] = response["intermediate_steps"] | |
result["tool_usage"] = self._parse_tool(response["intermediate_steps"]) | |
result["total_tokens"] = cb.total_tokens | |
result["prompt_tokens"] = cb.prompt_tokens | |
result["completion_tokens"] = cb.completion_tokens | |
result["total_cost"] = cb.total_cost | |
result["steps"] = len(response["intermediate_steps"]) + 1 | |
result["token_cost"] = result["total_cost"] | |
result["tool_cost"] = 0 | |
return result | |
def _load_tools(self): | |
docstore = CustomDocstoreExplorer(Wikipedia()) | |
return [ | |
Tool( | |
name="Search", | |
func=docstore.search, | |
description="useful for when you need to ask with search" | |
), | |
Tool( | |
name="Lookup", | |
func=docstore.lookup, | |
description="useful for when you need to ask with lookup" | |
) | |
] | |
def reset(self): | |
self.tools = self._load_tools() | |
if self.model_name in OPENAI_COMPLETION_MODELS: | |
self.agent = initialize_agent(self.tools, | |
OpenAI(temperature=0, model_name=self.model_name), | |
agent=AgentType.REACT_DOCSTORE, | |
verbose=self.verbose, | |
return_intermediate_steps=True, | |
max_iterations=self.max_iter) | |
elif self.model_name in OPENAI_CHAT_MODELS: | |
self.agent = initialize_agent(self.tools, | |
ChatOpenAI(temperature=0, model_name=self.model_name), | |
agent=AgentType.REACT_DOCSTORE, | |
verbose=self.verbose, | |
return_intermediate_steps=True, | |
max_iterations=self.max_iter) | |
self.agent.agent.llm_chain.prompt.template = self.fewshot | |
def _parse_tool(self, intermediate_steps): | |
tool_usage = {"search": 0, "lookup": 0} | |
for step in intermediate_steps: | |
if step[0].tool == "Search": | |
tool_usage["search"] += 1 | |
if step[0].tool == "Lookup": | |
tool_usage["lookup"] += 1 | |
return tool_usage | |
class ReactExtraTool(ReactBase): | |
def __init__(self, model_name="text-davinci-003", available_tools=["Google", "Calculator"], fewshot="\n", | |
verbose=True): | |
self.model_name = model_name | |
self.verbose = verbose | |
self.fewshot = fewshot | |
self.available_tools = available_tools | |
self.tools = self._load_tools() | |
self.agent = initialize_agent(self.tools, | |
OpenAI(temperature=0, model_name=self.model_name), | |
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, | |
verbose=self.verbose, | |
return_intermediate_steps=True) | |
def run(self, prompt): | |
self.reset() | |
result = {} | |
with get_openai_callback() as cb: | |
st = time.time() | |
response = self.agent(prompt) | |
result["wall_time"] = time.time() - st | |
result["input"] = response["input"] | |
result["output"] = response["output"] | |
result["intermediate_steps"] = response["intermediate_steps"] | |
result["tool_usage"] = self._parse_tool(response["intermediate_steps"]) | |
result["total_tokens"] = cb.total_tokens + result["tool_usage"]["llm-math_token"] | |
result["prompt_tokens"] = cb.prompt_tokens | |
result["completion_tokens"] = cb.completion_tokens | |
result["total_cost"] = cb.total_cost + result["tool_usage"]["llm-math_token"] * 0.000002 + \ | |
result["tool_usage"]["serpapi"] * 0.01 # Developer Plan | |
result["steps"] = len(response["intermediate_steps"]) + 1 | |
result["token_cost"] = result["total_cost"] | |
result["tool_cost"] = 0 | |
return result | |
def _load_tools(self): | |
tools = [] | |
for tool_name in self.available_tools: | |
tool_cls = WORKER_REGISTRY[tool_name] | |
tools += [Tool(name=tool_name, | |
func=tool_cls.run, | |
description=tool_cls.description)] | |
return tools | |
def reset(self): | |
self.tools = self._load_tools() | |
self.agent = initialize_agent(self.tools, | |
OpenAI(temperature=0, model_name=self.model_name), | |
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, | |
verbose=self.verbose, | |
return_intermediate_steps=True) | |
self.agent.agent.llm_chain.prompt.template = PREFIX + self._generate_tool_prompt() + "\n" + self.fewshot | |
def _parse_tool(self, intermediate_steps): | |
tool_usage = {"serpapi": 0, "llm-math_token": 0} | |
for step in intermediate_steps: | |
if step[0].tool == "Search": | |
tool_usage["serpapi"] += 1 | |
if step[0].tool == "Calculator": | |
tool_usage["llm-math_token"] += len(step[0].tool_input + step[1]) // 4 # 4 chars per token | |
return tool_usage | |
def _get_worker(self, name): | |
if name in WORKER_REGISTRY: | |
return WORKER_REGISTRY[name] | |
else: | |
raise ValueError("Worker not found") | |
def _generate_tool_prompt(self): | |
prompt = "Tools can be one of the following:\n" | |
for name in self.available_tools: | |
worker = self._get_worker(name) | |
prompt += f"{worker.name}[input]: {worker.description}\n" | |
return prompt + "\n" | |
PREFIX = """Answer the following questions as best you can. You have access to the following tools: | |
""" | |