AutoGPTFlowModule / AutoGPTFlow.py
martinjosifoski's picture
First commit.
b52b47e
raw
history blame
3.61 kB
import sys
from typing import Dict, Any
from flows.base_flows import CircularFlow
from flows.utils import logging
# Import-time side effect: asks the flows logging utility for debug verbosity
# (presumably process-wide -- TODO confirm against flows.utils.logging).
logging.set_verbosity_debug()
# Module-level logger named after this module; nothing in the visible part of
# this file references it.
log = logging.get_logger(__name__)
from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerAtomicFlow
from flow_modules.aiflows.VectorStoreFlowModule import ChromaDBFlow
class AutoGPTFlow(CircularFlow):
    """CircularFlow subclass providing the payload hooks of an AutoGPT-style loop.

    The methods below build input payloads for a memory flow ("read"/"write"
    operations), post-process the outputs of sub-flows, and decide when the
    loop should stop early.  How and when each hook is invoked is determined
    by the CircularFlow configuration, which is not visible in this file.
    """

    def _on_reach_max_round(self):
        """Write an "unfinished" answer/status pair into the flow state.

        Presumably invoked by CircularFlow once the round limit is hit
        (inferred from the method name -- confirm against the base class).
        """
        timeout_update = {
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished"
        }
        self._state_update_dict(timeout_update)

    @staticmethod
    def _get_memory_key(flow_state):
        """Render the latest step stored in ``flow_state`` as one text block.

        Returns an empty string when no "command" is present in the state.
        Otherwise "goal", "command_args" and "observation" must be set (this
        is enforced with asserts); "human_feedback" may be missing, in which
        case it is rendered as the text ``None``.
        """
        last_command = flow_state.get("command")
        if last_command is None:
            # No command recorded yet, so there is nothing to describe.
            return ""

        goal = flow_state.get("goal")
        last_command_args = flow_state.get("command_args")
        last_observation = flow_state.get("observation")
        last_human_feedback = flow_state.get("human_feedback")

        assert goal is not None, goal
        assert last_command_args is not None, last_command_args
        assert last_observation is not None, last_observation

        # The template lines are kept flush-left on purpose: any indentation
        # inside the triple-quoted literal would become part of the key.
        return f"""
== Goal ==
{goal}
== Command ==
{last_command}
== Args
{last_command_args}
== Result
{last_observation}
== Human Feedback ==
{last_human_feedback}
"""

    @CircularFlow.input_msg_payload_builder
    def prepare_memory_read_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]:
        """Build a "read" request whose content is the current memory key.

        A deliberately basic example of how memory retrieval can be queried.
        """
        return {
            "operation": "read",
            "content": self._get_memory_key(flow_state)
        }

    @CircularFlow.output_msg_payload_processor
    def prepare_memory_read_output(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow):
        """Collapse the retrieved memories into one newline-separated string.

        Only the first entry of ``output_payload["retrieved"]`` is used, and
        its first element is skipped before joining.
        """
        # NOTE(review): src_flow is annotated as ControllerAtomicFlow although
        # the payload carries a "retrieved" key -- confirm the intended type.
        first_result_set = output_payload["retrieved"][0]
        memories_after_first = first_result_set[1:]
        joined_memories = "\n".join(memories_after_first)
        return {"memory": joined_memories}

    @CircularFlow.input_msg_payload_builder
    def prepare_memory_write_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]:
        """Build a "write" request that stores the current memory key.

        A very simple, unoptimized example of how memory can be populated.
        """
        memory_entry = str(self._get_memory_key(flow_state))
        return {
            "operation": "write",
            "content": memory_entry
        }

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(
            self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
        """Turn a "finish" command into an early-exit payload.

        Any other command leaves ``output_payload`` untouched and returns it
        as is.  For "finish", the answer is taken from
        ``output_payload["command_args"]["answer"]``.
        """
        if output_payload["command"] != "finish":
            return output_payload

        # "EARLY_EXIT" is presumably the flag CircularFlow checks to stop the
        # loop -- confirm against the base class.
        return {
            "EARLY_EXIT": True,
            "answer": output_payload["command_args"]["answer"],
            "status": "finished"
        }

    @CircularFlow.output_msg_payload_processor
    def detect_finish_in_human_input(
            self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
        """Stop the loop when the human types "q"; otherwise forward the feedback.

        The comparison ignores surrounding whitespace and letter case.
        """
        # NOTE(review): src_flow is annotated as ControllerAtomicFlow although
        # the payload carries "human_input" -- confirm the intended type.
        human_feedback = output_payload["human_input"]
        wants_to_quit = human_feedback.strip().lower() == "q"
        if not wants_to_quit:
            return {"human_feedback": human_feedback}

        return {
            "EARLY_EXIT": True,
            "answer": "The user has chosen to exit before a final answer was generated.",
            "status": "unfinished"
        }