|
import sys |
|
from typing import Dict, Any |
|
|
|
from flows.base_flows import CircularFlow |
|
from flows.utils import logging |
|
|
|
# NOTE: `logging` is `flows.utils.logging` (imported above), not the stdlib module.
# This call runs at import time; presumably it raises the library's log
# verbosity to debug level -- confirm against the flows.utils.logging API.
logging.set_verbosity_debug()

# Module-level logger, named after this module.
log = logging.get_logger(__name__)
|
|
|
from flow_modules.aiflows.ControllerExecutorFlowModule import ControllerAtomicFlow |
|
from flow_modules.aiflows.VectorStoreFlowModule import ChromaDBFlow |
|
|
|
class AutoGPTFlow(CircularFlow):
    """CircularFlow subclass providing the payload hooks for an AutoGPT-style loop.

    The methods below build the inputs sent to the memory flow (read/write),
    post-process the memory and controller outputs, and detect the two early
    exit conditions visible here: the controller issuing the ``"finish"``
    command, or the human typing ``q``.
    """

    def _on_reach_max_round(self):
        """Store a fallback answer with status ``"unfinished"`` in the flow state.

        Presumably invoked by ``CircularFlow`` when the round limit is hit
        (the caller is not visible in this file).
        """
        self._state_update_dict({
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished"
        })

    @staticmethod
    def _get_memory_key(flow_state: Dict[str, Any]) -> str:
        """Render the latest goal/command/observation as text used as memory content.

        Args:
            flow_state: Flow state dict. Reads the keys ``goal``, ``command``,
                ``command_args``, ``observation`` and ``human_feedback``.

        Returns:
            ``""`` when no command has been recorded yet; otherwise a
            multi-section text block describing the last step.

        Raises:
            AssertionError: If a command is present but ``goal``,
                ``command_args`` or ``observation`` is missing.
        """
        goal = flow_state.get("goal")
        last_command = flow_state.get("command")
        last_command_args = flow_state.get("command_args")
        last_observation = flow_state.get("observation")
        last_human_feedback = flow_state.get("human_feedback")

        # No command yet: there is nothing to summarise.
        if last_command is None:
            return ""

        # The previous messages only echoed the (always-None) value; name the
        # missing key instead so the failure is actionable.
        assert goal is not None, "flow_state has a command but no 'goal'"
        assert last_command_args is not None, "flow_state has a command but no 'command_args'"
        assert last_observation is not None, "flow_state has a command but no 'observation'"

        # `human_feedback` is intentionally not validated: when absent it is
        # rendered as the text "None".
        #
        # The template is kept flush-left because any leading spaces would
        # become part of the returned text. Section headers are reproduced
        # verbatim (including the unclosed "== Args" / "== Result"), since this
        # exact text is what gets written to and queried from memory.
        current_context = f"""
== Goal ==
{goal}

== Command ==
{last_command}
== Args
{last_command_args}
== Result
{last_observation}

== Human Feedback ==
{last_human_feedback}
"""

        return current_context

    @CircularFlow.input_msg_payload_builder
    def prepare_memory_read_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]:
        """A (very) basic example implementation of how the memory retrieval could be constructed.

        Returns a ``"read"`` operation whose content is the current memory key.
        """
        query = self._get_memory_key(flow_state)

        return {
            "operation": "read",
            "content": query
        }

    @CircularFlow.output_msg_payload_processor
    def prepare_memory_read_output(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
        """Join the retrieved memories into a single newline-separated ``memory`` string.

        NOTE(review): `src_flow` is annotated as ControllerAtomicFlow although
        the matching input builder targets ChromaDBFlow -- confirm which is meant.
        """
        # Takes the first entry of "retrieved" and drops its first element.
        # Presumably the top hit is skipped on purpose (e.g. it duplicates the
        # query) -- TODO confirm against the memory flow's output format.
        retrieved_memories = output_payload["retrieved"][0][1:]
        return {"memory": "\n".join(retrieved_memories)}

    @CircularFlow.input_msg_payload_builder
    def prepare_memory_write_input(self, flow_state: Dict[str, Any], dst_flow: ChromaDBFlow) -> Dict[str, Any]:
        """A (very) basic, unoptimized example of how the memory population could be implemented.

        Returns a ``"write"`` operation whose content is the current memory key.
        """
        query = self._get_memory_key(flow_state)

        return {
            "operation": "write",
            # str() is kept in case an overriding _get_memory_key returns a non-str.
            "content": str(query)
        }

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[
        str, Any]:
        """Turn a ``"finish"`` command into an early-exit payload; pass anything else through.

        Raises:
            KeyError: If ``command`` is missing, or if the command is
                ``"finish"`` and ``command_args`` has no ``answer``.
        """
        command = output_payload["command"]
        if command == "finish":
            return {
                "EARLY_EXIT": True,
                "answer": output_payload["command_args"]["answer"],
                "status": "finished"
            }

        return output_payload

    @CircularFlow.output_msg_payload_processor
    def detect_finish_in_human_input(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[
        str, Any]:
        """Exit early when the human input is ``q``; otherwise forward it as feedback.

        The comparison ignores surrounding whitespace and letter case.
        """
        human_feedback = output_payload["human_input"]
        if human_feedback.strip().lower() == "q":
            return {
                "EARLY_EXIT": True,
                "answer": "The user has chosen to exit before a final answer was generated.",
                "status": "unfinished"
            }

        return {"human_feedback": human_feedback}
|
|