from typing import List, Union, Any, Optional, Dict
import uuid
import re
from datetime import date
import asyncio
from collections import defaultdict
import os

from langchain.agents import Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser
from langchain.prompts import StringPromptTemplate
from langchain import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.schema import AgentAction, AgentFinish
from langchain.callbacks import get_openai_callback
from langchain.callbacks.base import AsyncCallbackHandler
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.base_language import BaseLanguageModel

from autoagents.tools.tools import search_tool, note_tool, rewrite_search_query
from autoagents.utils.logger import InteractionsLogger


# Base prompt template. Placeholders filled in by CustomPromptTemplate.format:
# {today}, {input}, {tools}, {tool_names} and {agent_scratchpad}.
# NOTE(review): the wording below (including the duplicated "should be
# descriptive" and the trailing "..") is kept as-is because the text is sent
# verbatim to the model; change it only as a deliberate prompt revision.
template = """
We are working together to satisfy the user's original goal step-by-step. Play to your strengths as an LLM.
Make sure the plan is achievable using the
available tools. You SHOULD directly produce a `Final Answer:` when you
think you have good-enough information to achieve the Goal. The final answer should be descriptive should be descriptive, encompassing all relevant details..
Today is {today}.

## Goal:
{input}

If you require assistance or additional information, you should use *only* one of the following tools:
{tools}.

## Output format
You MUST produce Output in the following format:

Thought: you should always think about what to do when you think you have not achieved the Goal.
Reasoning: reasoning
Plan:
- short bulleted
- list that conveys
- next-step plan
Action: the action to take, should be ONE OF {tool_names}
Action Input: the input to the Action
Observation: the result of the Action
... (this Thought/Reasoning/Plan/Action/Action Input/Observation can repeat N times until there is a Final Answer)
Final Answer: the final answer to achieve the original Goal which can be the only output or when you have no Action to do next.

## History
{agent_scratchpad}

Do not repeat any past actions in History, because you will not get additional information.
If the last action is search, then you should use notepad to keep critical information.
If you have gathered all information in your plannings to satisfy the user's original goal, then respond immediately as the Final Answer.
"""


# Patterns used to pull individual sections out of the model's raw output.
# They are compiled once here instead of on every call.
_THOUGHT_RE = re.compile(r"Thought\s*\d*\s*:(.*?)\n(.*)", re.DOTALL)
_REASONING_RE = re.compile(r"Reasoning\s*\d*\s*:(.*?)\n(.*)", re.DOTALL)
_PLAN_RE = re.compile(r"Plan\s*\d*\s*:(.*?)\nAction(.*)", re.DOTALL)
_ACTION_RE = re.compile(
    r"Action\s*\d*\s*:(.*?)\nAction\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)", re.DOTALL
)


def _first_group(pattern: "re.Pattern[str]", text: str) -> str:
    """Return the stripped first capture group of *pattern* in *text*, or ""."""
    found = pattern.search(text)
    return found.group(1).strip() if found else ""


# Set up a prompt template
class CustomPromptTemplate(StringPromptTemplate):
    """Prompt template that renders the agent history into ``template``."""

    # The template to use
    template: str
    # The list of tools available
    tools: List[Tool]
    # Logger that receives the structured last step and the final prompt
    ialogger: InteractionsLogger

    def format(self, **kwargs) -> str:
        """Build the full prompt for the next model call.

        ``kwargs`` must contain ``intermediate_steps``, a sequence of
        ``(AgentAction, observation)`` tuples; it is removed from ``kwargs``
        and replaced by ``agent_scratchpad``, ``tools``, ``tool_names`` and
        ``today`` before the template is formatted.

        Raises:
            ValueError: if the most recent step used a tool other than
                "Search" or "Notepad".
        """
        # Get the intermediate steps (AgentAction, Observation tuples)
        intermediate_steps = kwargs.pop("intermediate_steps")

        # Earlier steps contribute only the model's log; their observations
        # are left out of the scratchpad.
        scratchpad = [f"{action.log}\n" for action, _ in intermediate_steps[:-1]]

        if intermediate_steps:
            action, observation = intermediate_steps[-1]
            if action.tool not in ("Search", "Notepad"):
                raise ValueError("Invalid tool requested by the model.")

            if action.tool == "Search":
                # Search observations are iterated and concatenated item by item.
                observation_text = "".join(f"{d}" for d in observation)
            else:
                observation_text = f"{observation}"
            scratchpad.append(f"{action.log}\n")
            scratchpad.append(f"Observation: {observation_text}\n")

            # Parse the output of the last step for the thought, reasoning and plan
            self.ialogger.add_structured_data({
                "output": {
                    "thoughts": _first_group(_THOUGHT_RE, action.log),
                    "reasoning": _first_group(_REASONING_RE, action.log),
                    "plans": _first_group(_PLAN_RE, action.log),
                    "action": action.tool,
                    "action_input": action.tool_input,
                    "raw_output": action.log,
                },
                "observation": observation,
            })

        # Set the agent_scratchpad variable to the rendered history
        kwargs["agent_scratchpad"] = "".join(scratchpad)
        # Create a tools variable from the list of tools provided
        kwargs["tools"] = "\n".join(
            [f"{tool.name}: {tool.description}" for tool in self.tools]
        )
        # Create a list of tool names for the tools provided
        kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools])
        kwargs["today"] = date.today()

        final_prompt = self.template.format(**kwargs)
        self.ialogger.add_system({"value": final_prompt})
        return final_prompt


class CustomOutputParser(AgentOutputParser):
    """Turn raw model output into an ``AgentAction`` or ``AgentFinish``."""

    class Config:
        arbitrary_types_allowed = True

    ialogger: InteractionsLogger
    llm: BaseLanguageModel
    # Set when the last parsed action input was rewritten; read and reset by
    # the callback handler defined in ActionRunner.__init__.
    new_action_input: Optional[str]
    # Inputs already issued, keyed by action name.
    # NOTE(review): this is an un-annotated class-level default; whether it is
    # shared between parser instances depends on the base model class --
    # confirm before relying on per-instance history.
    action_history = defaultdict(set)

    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        """Parse one model response.

        Returns an ``AgentFinish`` when the text contains "Final Answer:",
        otherwise an ``AgentAction`` built from the "Action" / "Action Input"
        lines. An action input that was already used for the same action is
        replaced by the result of ``rewrite_search_query``.

        Raises:
            ValueError: if neither a final answer nor an action can be found.
        """
        self.ialogger.add_ai(llm_output)

        # Check if agent should finish
        if "Final Answer:" in llm_output:
            final_answer = llm_output.split("Final Answer:")[-1].strip()
            self.ialogger.add_structured_data({
                "output": {
                    "action": "Final Answer",
                    "action_input": final_answer,
                    "raw_output": llm_output,
                }
            })
            return AgentFinish(
                # Return values is generally always a dictionary with a single `output` key
                # It is not recommended to try anything else at the moment :)
                return_values={"output": final_answer},
                log=llm_output,
            )

        # Parse out the action and action input
        match = _ACTION_RE.search(llm_output)
        if not match:
            raise ValueError(f"Could not parse LLM output: `{llm_output}`")
        action = match.group(1).strip()
        action_input = match.group(2).strip().strip('"')

        seen_inputs = self.action_history[action]
        if action_input in seen_inputs:
            # Same input was already used for this action: ask for a rewritten
            # query and remember it so the handler can notify the user.
            new_action_input = rewrite_search_query(action_input, seen_inputs, self.llm)
            self.ialogger.add_message({"query_rewrite": True})
            self.new_action_input = new_action_input
            seen_inputs.add(new_action_input)
            return AgentAction(tool=action, tool_input=new_action_input, log=llm_output)

        # Return the action and action input
        seen_inputs.add(action_input)
        return AgentAction(tool=action, tool_input=action_input, log=llm_output)


class ActionRunner:
    """Wire the prompt, parser, tools and callbacks into an ``AgentExecutor``."""

    def __init__(self, outputq, llm: BaseLanguageModel, persist_logs: bool = False):
        """Create the executor.

        Args:
            outputq: queue-like object with an awaitable ``put``; the callback
                handler pushes chain text, tool outputs and rewrite warnings
                onto it.
            llm: language model used for the agent chain and query rewriting.
            persist_logs: forwarded to ``InteractionsLogger`` as ``persist``.
        """
        self.ialogger = InteractionsLogger(
            name=f"{uuid.uuid4().hex[:6]}", persist=persist_logs
        )
        tools = [search_tool, note_tool]
        prompt = CustomPromptTemplate(
            template=template,
            tools=tools,
            input_variables=["input", "intermediate_steps"],
            ialogger=self.ialogger,
        )
        output_parser = CustomOutputParser(ialogger=self.ialogger, llm=llm)

        class MyCustomHandler(AsyncCallbackHandler):
            """Forward agent events to ``outputq`` (captured from __init__)."""

            def __init__(self):
                pass

            async def on_chain_end(self, outputs, **kwargs) -> None:
                # Only chain results that carry a "text" entry are forwarded.
                if "text" in outputs:
                    await outputq.put(outputs["text"])

            async def on_agent_action(
                self,
                action: AgentAction,
                *,
                run_id: uuid.UUID,
                parent_run_id: Optional[uuid.UUID] = None,
                **kwargs: Any,
            ) -> None:
                if (new_action_input := output_parser.new_action_input):
                    # Notify users
                    await outputq.put(
                        RuntimeWarning(f"Action Input Rewritten: {new_action_input}")
                    )
                    # Reset so the same rewrite is not reported twice.
                    output_parser.new_action_input = None

            async def on_tool_start(
                self,
                serialized: Dict[str, Any],
                input_str: str,
                *,
                run_id: uuid.UUID,
                parent_run_id: Optional[uuid.UUID] = None,
                **kwargs: Any,
            ) -> None:
                pass

            async def on_tool_end(
                self,
                output: str,
                *,
                run_id: uuid.UUID,
                parent_run_id: Optional[uuid.UUID] = None,
                **kwargs: Any,
            ) -> None:
                await outputq.put(output)

        handler = MyCustomHandler()

        llm_chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler])
        tool_names = [tool.name for tool in tools]
        # NOTE(review): search_tool and note_tool are imported module-level
        # objects, so this assignment replaces their callbacks for every
        # ActionRunner that uses them -- confirm this is safe when several
        # runners are alive at once.
        for tool in tools:
            tool.callbacks = [handler]

        agent = LLMSingleActionAgent(
            llm_chain=llm_chain,
            output_parser=output_parser,
            stop=["\nObservation:"],
            allowed_tools=tool_names,
        )
        callback_manager = AsyncCallbackManager([handler])

        # Finally create the Executor
        self.agent_executor = AgentExecutor.from_agent_and_tools(
            agent=agent,
            tools=tools,
            verbose=False,
            callback_manager=callback_manager,
        )

    async def run(self, goal: str, outputq):
        """Run the agent on *goal*.

        Token and cost figures from the OpenAI callback are recorded on the
        logger, and the log is saved on both the success and failure paths.

        Args:
            goal: the user's goal passed to the executor.
            outputq: queue-like object with an awaitable ``put``; receives the
                exception if the run fails. (The callback handler uses the
                queue given to the constructor, not this one.)

        Returns:
            The executor's output, or ``None`` if an exception was raised.
        """
        self.ialogger.set_goal(goal)
        try:
            with get_openai_callback() as cb:
                output = await self.agent_executor.arun(goal)
                self.ialogger.add_cost({
                    "total_tokens": cb.total_tokens,
                    "prompt_tokens": cb.prompt_tokens,
                    "completion_tokens": cb.completion_tokens,
                    "total_cost": cb.total_cost,
                    "successful_requests": cb.successful_requests,
                })
            self.ialogger.save()
        except Exception as e:
            # Top-level boundary: record the failure and hand it to the
            # consumer instead of raising.
            self.ialogger.add_message({"error": str(e)})
            self.ialogger.save()
            await outputq.put(e)
            return
        return output