Spaces:
Running
Running
from typing import List, Union, Any, Optional, Dict | |
import uuid | |
import re | |
from datetime import date | |
import asyncio | |
from collections import defaultdict | |
import os | |
from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser | |
from langchain.prompts import StringPromptTemplate | |
from langchain import LLMChain | |
from langchain.chat_models import ChatOpenAI | |
from langchain.schema import AgentAction, AgentFinish | |
from langchain.callbacks import get_openai_callback | |
from langchain.callbacks.base import AsyncCallbackHandler | |
from langchain.callbacks.manager import AsyncCallbackManager | |
from langchain.base_language import BaseLanguageModel | |
from autoagents.tools.tools import search_tool, note_tool, rewrite_search_query | |
from autoagents.utils.logger import InteractionsLogger | |
# Set up the base template | |
template = """ | |
We are working together to satisfy the user's original goal step-by-step. Play to your strengths as an LLM. | |
Make sure the plan is achievable using the | |
available tools. You SHOULD directly produce a `Final Answer:` when you | |
think you have good-enough information to achieve the Goal. The final answer should be descriptive should be descriptive, encompassing all relevant details.. | |
Today is {today}. | |
## Goal: | |
{input} | |
If you require assistance or additional information, you should use *only* one of the following tools: | |
{tools}. | |
## Output format | |
You MUST produce Output in the following format: | |
Thought: you should always think about what to do when you think you have not achieved the Goal. | |
Reasoning: reasoning | |
Plan: | |
- short bulleted | |
- list that conveys | |
- next-step plan | |
Action: the action to take, should be ONE OF {tool_names} | |
Action Input: the input to the Action | |
Observation: the result of the Action | |
(this Thought/Reasoning/Plan/Action/Action Input/Observation can repeat N | |
times until there is a Final Answer) | |
Final Answer: the final answer to achieve the original Goal which can be the | |
only output or when you have no Action to do next. | |
## History | |
{agent_scratchpad} | |
Do not repeat any past actions in History, because you will not get additional information. | |
If the last action is search, then you should use notepad to keep critical information. | |
If you have gathered all information in your plannings to satisfy the user's original goal, then respond immediately as the Final Answer. | |
""" | |
# Set up a prompt template | |
class CustomPromptTemplate(StringPromptTemplate): | |
# The template to use | |
template: str | |
# The list of tools available | |
tools: List[Tool] | |
ialogger: InteractionsLogger | |
def format(self, **kwargs) -> str: | |
# Get the intermediate steps (AgentAction, Observation tuples) | |
# Format them in a particular way | |
intermediate_steps = kwargs.pop("intermediate_steps") | |
outputs = "" | |
# Set the agent_scratchpad variable to that value | |
for action, observation in intermediate_steps[:-1]: | |
outputs += f"{action.log}\n" | |
if len(intermediate_steps) > 0: | |
action, observation = intermediate_steps[-1] | |
# self.ialogger.add_system({"action": action, "observation": observation}) | |
if action.tool not in ("Search", "Notepad"): | |
raise Exception("Invalid tool requested by the model.") | |
if action.tool == "Notepad": | |
outputs += f"{action.log}\n" | |
outputs += f"Observation: {observation}\n" | |
elif action.tool == "Search": | |
current = "".join([f"{d}" for d in observation]) | |
outputs += f"{action.log}\n" | |
outputs += f"Observation: {current}\n" | |
# Parse the output ofr the last step for the reasoning and plan | |
regex = r"Thought\s*\d*\s*:(.*?)\n(.*)" | |
match = re.search(regex, action.log, re.DOTALL) | |
thoughts = match.group(1).strip() if match else "" | |
regex = r"Reasoning\s*\d*\s*:(.*?)\n(.*)" | |
match = re.search(regex, action.log, re.DOTALL) | |
reasoning = match.group(1).strip() if match else "" | |
regex = r"Plan\s*\d*\s*:(.*?)\nAction(.*)" | |
match = re.search(regex, action.log, re.DOTALL) | |
plans = match.group(1).strip() if match else "" | |
self.ialogger.add_structured_data({"output":{"thoughts": thoughts, | |
"reasoning": reasoning, | |
"plans": plans, | |
"action": action.tool, | |
"action_input": action.tool_input, | |
"raw_output":action.log}, | |
"observation": observation}) | |
kwargs["agent_scratchpad"] = outputs | |
# Create a tools variable from the list of tools provided | |
kwargs["tools"] = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools]) | |
# Create a list of tool names for the tools provided | |
kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools]) | |
kwargs["today"] = date.today() | |
final_prompt = self.template.format(**kwargs) | |
self.ialogger.add_system({"value": final_prompt}) | |
return final_prompt | |
class CustomOutputParser(AgentOutputParser): | |
class Config: | |
arbitrary_types_allowed = True | |
ialogger: InteractionsLogger | |
llm: BaseLanguageModel | |
new_action_input: Optional[str] | |
action_history = defaultdict(set) | |
def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]: | |
self.ialogger.add_ai(llm_output) | |
# Check if agent should finish | |
if "Final Answer:" in llm_output: | |
final_answer = llm_output.split("Final Answer:")[-1].strip() | |
self.ialogger.add_structured_data({"output": {"action": "Final Answer", | |
"action_input": final_answer, | |
"raw_output": llm_output}}) | |
return AgentFinish( | |
# Return values is generally always a dictionary with a single `output` key | |
# It is not recommended to try anything else at the moment :) | |
return_values={"output": final_answer}, | |
log=llm_output, | |
) | |
# Parse out the action and action input | |
regex = r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)" | |
match = re.search(regex, llm_output, re.DOTALL) | |
if not match: | |
raise ValueError(f"Could not parse LLM output: `{llm_output}`") | |
action = match.group(1).strip() | |
action_input = match.group(2).strip().strip('"') | |
if action_input in self.action_history[action]: | |
new_action_input = rewrite_search_query(action_input, | |
self.action_history[action], | |
self.llm) | |
self.ialogger.add_message({"query_rewrite": True}) | |
self.new_action_input = new_action_input | |
self.action_history[action].add(new_action_input) | |
return AgentAction(tool=action, tool_input=new_action_input, log=llm_output) | |
else: | |
# Return the action and action input | |
self.action_history[action].add(action_input) | |
return AgentAction(tool=action, tool_input=action_input, log=llm_output) | |
class ActionRunner: | |
def __init__(self, | |
outputq, | |
llm: BaseLanguageModel, | |
persist_logs: bool = False): | |
self.ialogger = InteractionsLogger(name=f"{uuid.uuid4().hex[:6]}", persist=persist_logs) | |
tools = [search_tool, note_tool] | |
prompt = CustomPromptTemplate( | |
template=template, | |
tools=tools, | |
input_variables=["input", "intermediate_steps"], | |
ialogger=self.ialogger) | |
output_parser = CustomOutputParser(ialogger=self.ialogger, llm=llm) | |
class MyCustomHandler(AsyncCallbackHandler): | |
def __init__(self): | |
pass | |
async def on_chain_end(self, outputs, **kwargs) -> None: | |
if "text" in outputs: | |
await outputq.put(outputs["text"]) | |
async def on_agent_action( | |
self, | |
action: AgentAction, | |
*, | |
run_id: uuid.UUID, | |
parent_run_id: Optional[uuid.UUID] = None, | |
**kwargs: Any, | |
) -> None: | |
if (new_action_input := output_parser.new_action_input): | |
# Notify users | |
await outputq.put(RuntimeWarning(f"Action Input Rewritten: {new_action_input}")) | |
output_parser.new_action_input = None | |
async def on_tool_start( | |
self, | |
serialized: Dict[str, Any], | |
input_str: str, | |
*, | |
run_id: uuid.UUID, | |
parent_run_id: Optional[uuid.UUID] = None, | |
**kwargs: Any, | |
) -> None: | |
pass | |
async def on_tool_end( | |
self, | |
output: str, | |
*, | |
run_id: uuid.UUID, | |
parent_run_id: Optional[uuid.UUID] = None, | |
**kwargs: Any, | |
) -> None: | |
await outputq.put(output) | |
handler = MyCustomHandler() | |
llm_chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler]) | |
tool_names = [tool.name for tool in tools] | |
for tool in tools: | |
tool.callbacks = [handler] | |
agent = LLMSingleActionAgent( | |
llm_chain=llm_chain, | |
output_parser=output_parser, | |
stop=["\nObservation:"], | |
allowed_tools=tool_names | |
) | |
callback_manager = AsyncCallbackManager([handler]) | |
# Finally create the Executor | |
self.agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, | |
tools=tools, | |
verbose=False, | |
callback_manager=callback_manager) | |
async def run(self, goal: str, outputq): | |
self.ialogger.set_goal(goal) | |
try: | |
with get_openai_callback() as cb: | |
output = await self.agent_executor.arun(goal) | |
self.ialogger.add_cost({"total_tokens": cb.total_tokens, | |
"prompt_tokens": cb.prompt_tokens, | |
"completion_tokens": cb.completion_tokens, | |
"total_cost": cb.total_cost, | |
"successful_requests": cb.successful_requests}) | |
self.ialogger.save() | |
except Exception as e: | |
self.ialogger.add_message({"error": str(e)}) | |
self.ialogger.save() | |
await outputq.put(e) | |
return | |
return output | |