import sys
from typing import Dict, Any

from flows.base_flows import CircularFlow
from flows.utils import logging

# NOTE: debug verbosity is switched on before the flow_modules imports below;
# keep this statement order.
logging.set_verbosity_debug()
log = logging.get_logger(__name__)

from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerAtomicFlow
from flow_modules.aiflows.VectorStoreFlowModule import ChromaDBFlow


class AutoGPTFlow(CircularFlow):
    """Circular flow with hooks for memory access and early termination.

    The class contributes payload builders/processors (registered through the
    ``CircularFlow`` decorators) that:

    * build "read" and "write" requests for a memory flow from the flow state,
    * turn the memory flow's answer into a ``memory`` string,
    * stop the loop when the controller issues the ``finish`` command, and
    * stop the loop when the human feedback is ``q``.
    """

    def _on_reach_max_round(self) -> None:
        """Store a fallback answer and mark the run as unfinished.

        Presumably invoked by ``CircularFlow`` once the round limit is hit --
        confirm against the base class.
        """
        self._state_update_dict({
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished",
        })

    @staticmethod
    def _get_memory_key(flow_state: Dict[str, Any]) -> str:
        """Render the latest step of the flow state as a text block.

        The result is used as the ``content`` of both memory reads and memory
        writes.

        Args:
            flow_state: Mapping read with ``.get`` for the keys ``goal``,
                ``command``, ``command_args``, ``observation`` and
                ``human_feedback``.

        Returns:
            An empty string when no command has been recorded yet, otherwise
            the formatted description of the last step.

        Raises:
            AssertionError: If a command is present but ``goal``,
                ``command_args`` or ``observation`` is missing.
        """
        last_command = flow_state.get("command")
        if last_command is None:
            # Nothing has been executed yet, so there is nothing to describe.
            return ""

        goal = flow_state.get("goal")
        last_command_args = flow_state.get("command_args")
        last_observation = flow_state.get("observation")
        # Human feedback is optional; a missing value is rendered as "None".
        last_human_feedback = flow_state.get("human_feedback")

        # Raised explicitly (rather than via ``assert``) so the checks survive
        # ``python -O`` and the message names the offending key. The exception
        # type is the same one ``assert`` would have produced.
        required = {
            "goal": goal,
            "command_args": last_command_args,
            "observation": last_observation,
        }
        for key, value in required.items():
            if value is None:
                raise AssertionError(
                    f"flow_state[{key!r}] must be set once a command has been recorded"
                )

        # The whitespace in this template is part of the text that is stored
        # in and queried from memory; keep it stable so reads and writes agree.
        current_context = \
            f"""
            == Goal ==
            {goal}

            == Command ==
            {last_command}
            == Args
            {last_command_args}
            == Result
            {last_observation}

            == Human Feedback ==
            {last_human_feedback}
            """

        return current_context

    @CircularFlow.input_msg_payload_builder
    def prepare_memory_read_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]:
        """A (very) basic example implementation of how the memory retrieval could be constructed.

        Args:
            flow_state: Current flow state, summarised via ``_get_memory_key``.
            dst_flow: Flow that will receive the request (unused here).

        Returns:
            A payload with ``operation`` set to ``"read"`` and the memory key
            as ``content``.
        """
        query = self._get_memory_key(flow_state)

        return {
            "operation": "read",
            "content": query,
        }

    @CircularFlow.output_msg_payload_processor
    def prepare_memory_read_output(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
        """Join the retrieved memories into a single newline-separated string.

        Args:
            output_payload: Payload holding a ``retrieved`` sequence whose
                first element is a sequence of strings.
            src_flow: Flow that produced the payload (unused here).
                NOTE(review): annotated as ``ControllerAtomicFlow`` although
                the payload looks like a memory-read result -- confirm.

        Returns:
            ``{"memory": ...}`` built from every entry of the first result
            list except its first one; an empty string when nothing was
            retrieved.

        Raises:
            KeyError: If ``output_payload`` has no ``retrieved`` entry.
        """
        retrieved = output_payload["retrieved"]
        # Guard against an empty result set instead of failing with IndexError.
        first_result = retrieved[0] if retrieved else []
        # The first hit is skipped; presumably it is the entry for the current
        # step itself -- TODO confirm against the flow topology.
        retrieved_memories = first_result[1:]
        return {"memory": "\n".join(retrieved_memories)}

    @CircularFlow.input_msg_payload_builder
    def prepare_memory_write_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]:
        """A (very) basic, not optimized, example implementation of how the memory population could be
        implemented.

        Args:
            flow_state: Current flow state, summarised via ``_get_memory_key``.
            dst_flow: Flow that will receive the request (unused here).

        Returns:
            A payload with ``operation`` set to ``"write"`` and the memory key
            (as a string) as ``content``.
        """
        query = self._get_memory_key(flow_state)

        return {
            "operation": "write",
            "content": str(query),
        }

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[
        str, Any]:
        """Request an early exit when the issued command is ``finish``.

        Args:
            output_payload: Payload with a ``command`` entry and, for the
                ``finish`` command, ``command_args["answer"]``.
            src_flow: Flow that produced the payload (unused here).

        Returns:
            For ``finish``: a dict with ``EARLY_EXIT`` set, the final
            ``answer`` and ``status`` ``"finished"``. Otherwise the payload
            unchanged.
        """
        command = output_payload["command"]

        if command != "finish":
            return output_payload

        return {
            "EARLY_EXIT": True,
            "answer": output_payload["command_args"]["answer"],
            "status": "finished",
        }

    @CircularFlow.output_msg_payload_processor
    def detect_finish_in_human_input(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[
        str, Any]:
        """Request an early exit when the human input is ``q``.

        Args:
            output_payload: Payload with a ``human_input`` string.
            src_flow: Flow that produced the payload (unused here).

        Returns:
            When the input, stripped and lower-cased, equals ``"q"``: a dict
            with ``EARLY_EXIT`` set, an explanatory ``answer`` and ``status``
            ``"unfinished"``. Otherwise ``{"human_feedback": <input>}`` with
            the input passed through unmodified.
        """
        human_feedback = output_payload["human_input"]

        if human_feedback.strip().lower() == "q":
            return {
                "EARLY_EXIT": True,
                "answer": "The user has chosen to exit before a final answer was generated.",
                "status": "unfinished",
            }

        return {"human_feedback": human_feedback}