|
|
|
from typing import Dict, Any |
|
from flows.base_flows.atomic import AtomicFlow |
|
class UpdatePlanAtomicFlow(AtomicFlow):
    """Atomic flow that overwrites the plan file with an updated plan.

    Reads the plan file path from ``input_data["memory_files"]["plan"]`` and
    the new plan text from ``input_data["updated_plan"]``, then replaces the
    file's content with that text followed by a newline.
    """

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Check that the plan file location was supplied.

        Args:
            input_data: The flow input; must contain ``memory_files`` with a
                ``plan`` entry.

        Raises:
            AssertionError: If ``memory_files`` is missing from
                ``input_data`` or has no ``plan`` entry.
        """
        # NOTE: asserts are stripped under ``python -O``; in that mode a
        # missing key surfaces as an error dict from ``_call`` instead.
        assert "memory_files" in input_data, "memory_files not passed to UpdatePlanAtomicFlow.yaml"
        assert "plan" in input_data["memory_files"], "plan not in memory_files"

    def _call(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Write the updated plan to the plan file.

        Args:
            input_data: The flow input; ``memory_files["plan"]`` is the
                target path and ``updated_plan`` is the text to write.

        Returns:
            A dict with ``result`` and ``summary`` strings describing either
            the successful save or the error that occurred. Errors are
            reported through this dict rather than raised.
        """
        try:
            plan_file_location = input_data["memory_files"]["plan"]
            # Build the full content BEFORE opening the file: mode "w"
            # truncates on open, so a missing or non-string ``updated_plan``
            # must fail here, while the previous plan is still intact.
            plan_content = input_data["updated_plan"] + "\n"
            with open(plan_file_location, "w") as file:
                file.write(plan_content)
            return {
                "result": "updated plan saved to the plan file and has overriden the previous plan",
                "summary": f"Jarvis/UpdatePlanFlow: updated plan saved to {plan_file_location}",
            }
        except Exception as e:
            # Deliberately broad: any failure is handed back to the caller as
            # a result dict instead of propagating out of the flow.
            return {
                "result": f"Error occurred: {str(e)}",
                "summary": f"Jarvis/UpdatePlanFlow: error occurred while writing updated plan: {str(e)}",
            }

    def run(
        self,
        input_data: Dict[str, Any]
    ):
        """Validate ``input_data`` and write the updated plan.

        Args:
            input_data: The flow input, see ``_check_input`` and ``_call``.

        Returns:
            The ``result``/``summary`` dict produced by ``_call``.

        Raises:
            AssertionError: If the input check in ``_check_input`` fails.
        """
        self._check_input(input_data)
        return self._call(input_data)