|
from typing import Dict, Any |
|
import os |
|
from flow_modules.Tachi67.PlanWriterFlowModule import PlanWriterFlow |
|
from flows.base_flows import CircularFlow |
|
|
|
class Planner_JarvisFlow(PlanWriterFlow): |
|
|
|
@CircularFlow.output_msg_payload_processor |
|
def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]: |
|
command = output_payload["command"] |
|
if command == "finish": |
|
|
|
keys_to_fetch_from_state = ["temp_plan_file_location", "plan", "memory_files"] |
|
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
|
temp_plan_file_location = fetched_state["temp_plan_file_location"] |
|
plan_content = fetched_state["plan"] |
|
plan_file_location = fetched_state["memory_files"]["plan"] |
|
|
|
|
|
if os.path.exists(temp_plan_file_location): |
|
os.remove(temp_plan_file_location) |
|
|
|
|
|
with open(plan_file_location, 'w') as file: |
|
file.write(plan_content) |
|
|
|
|
|
return { |
|
"EARLY_EXIT": True, |
|
"plan": plan_content, |
|
"summary": "Jarvis/PlanWriter: " + output_payload["command_args"]["summary"], |
|
"status": "finished" |
|
} |
|
elif command == "manual_finish": |
|
|
|
keys_to_fetch_from_state = ["temp_plan_file_location"] |
|
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
|
temp_plan_file_location = fetched_state["temp_plan_file_location"] |
|
if os.path.exists(temp_plan_file_location): |
|
os.remove(temp_plan_file_location) |
|
|
|
return { |
|
"EARLY_EXIT": True, |
|
"plan": "no plan was generated", |
|
"summary": "Jarvis/PlanWriter: PlanWriter was terminated explicitly by the user, process is unfinished", |
|
"status": "unfinished" |
|
} |
|
elif command == "write_plan": |
|
keys_to_fetch_from_state = ["memory_files"] |
|
fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state) |
|
output_payload["command_args"]["plan_file_location"] = fetched_state["memory_files"]["plan"] |
|
return output_payload |
|
else: |
|
return output_payload |