# JarvisFlowModule / Controller_JarvisFlow.py
# File-viewer header: "Tachi67's picture", "Upload 15 files", commit d4a9b53, 4.31 kB.
import json
from copy import deepcopy
from typing import Any, Dict, List
from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow
from dataclasses import dataclass
@dataclass
class Command:
    """One command the controller can offer to the model.

    Instances are rendered into the numbered command manual by
    ``Controller_JarvisFlow._build_commands_manual``.
    """

    # Label of the command; printed before the description in the manual.
    name: str
    # Free-text explanation of the command; printed after the name.
    description: str
    # Argument names; each becomes a key of the JSON schema shown in the manual.
    input_args: List[str]
# TODO: controller should be generalized
class Controller_JarvisFlow(ChatAtomicFlow):
    """Chat flow that asks the model which command to issue next.

    The system prompt template is filled with a manual of the available
    commands plus the current plan, plan file location and logs. The model's
    reply is expected to be a JSON document and is returned parsed.
    """

    def __init__(
            self,
            commands: List[Command],
            **kwargs):
        """Build the flow and seed the system prompt with placeholder values.

        Args:
            commands: Commands to list in the manual given to the model.
            **kwargs: Forwarded unchanged to ``ChatAtomicFlow.__init__``.
        """
        super().__init__(**kwargs)
        # Pre-fill the template so it is usable before the first `run` call;
        # plan / plan_file_location / logs are overwritten on every run by
        # `_update_prompts_and_input`.
        self.system_message_prompt_template = self.system_message_prompt_template.partial(
            commands=self._build_commands_manual(commands),
            plan="no plans yet",
            plan_file_location="no plan file location yet",
            logs="no logs yet",
        )
        # Appended to the 'goal' / 'result' inputs so the model is reminded of
        # the expected JSON shape. The literal is kept flush-left on purpose:
        # its content is sent to the model verbatim.
        self.hint_for_model = """
Make sure your response is in the following format:
Response Format:
{
"command": "call one of the subordinates",
"command_args": {
"arg name": "value"
}
}
"""

    def _get_content_file_location(self, input_data, content_name):
        """Return the file location registered for ``content_name``.

        Args:
            input_data: Flow input; must contain a ``"memory_files"`` mapping.
            content_name: Key to look up in ``input_data["memory_files"]``
                (e.g. plan, logs, code_library).

        Raises:
            AssertionError: If ``"memory_files"`` is missing from
                ``input_data`` or ``content_name`` is missing from it.
        """
        # get the location of the file that contains the content: plan, logs, code_library
        assert "memory_files" in input_data, "memory_files not passed to Jarvis/Controller"
        assert content_name in input_data["memory_files"], f"{content_name} not in memory files"
        return input_data["memory_files"][content_name]

    def _get_content(self, input_data, content_name):
        """Return ``input_data[content_name]``, or a placeholder when it is empty.

        Args:
            input_data: Flow input; must contain ``content_name``.
            content_name: Key of the content to fetch (e.g. plan, logs,
                code_library).

        Returns:
            The stored content, or ``"No <content_name> yet"`` when the stored
            value is empty or ``None``.

        Raises:
            AssertionError: If ``content_name`` is missing from ``input_data``.
        """
        # get the content of the file that contains the content: plan, logs, code_library
        assert content_name in input_data, f"{content_name} not passed to Jarvis/Controller"
        content = input_data[content_name]
        # Truthiness instead of len(): a None value gets the placeholder too
        # rather than raising TypeError.
        if not content:
            content = f'No {content_name} yet'
        return content

    @staticmethod
    def _build_commands_manual(commands: List[Command]) -> str:
        """Render ``commands`` as a numbered, newline-terminated manual.

        Each entry has the form
        ``"<i>. <name>: <description> Input arguments (given in the JSON schema): <schema>"``
        where ``<schema>`` is a JSON object mapping every input argument to the
        placeholder ``YOUR_<ARG IN UPPER CASE>``.

        Returns:
            The concatenated entries; an empty string when ``commands`` is empty.
        """
        entries = []
        for position, command in enumerate(commands, start=1):
            command_input_json_schema = json.dumps(
                {input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args})
            entries.append(
                f"{position}. {command.name}: {command.description} "
                f"Input arguments (given in the JSON schema): {command_input_json_schema}\n"
            )
        return "".join(entries)

    @classmethod
    def instantiate_from_config(cls, config):
        """Create a flow instance from a configuration mapping.

        Args:
            config: Flow configuration. It is deep-copied before use. Its
                ``"commands"`` entry must map each command name to a mapping
                with ``"description"`` and ``"input_args"`` keys.

        Returns:
            A new instance of ``cls``.

        Raises:
            KeyError: If ``"commands"``, or a command's ``"description"`` /
                ``"input_args"``, is missing.
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Set up prompts ~~~
        kwargs.update(cls._set_up_prompts(flow_config))

        # ~~~Set up backend ~~~
        kwargs.update(cls._set_up_backend(flow_config))

        # ~~~ Set up commands ~~~
        commands = flow_config["commands"]
        commands = [
            Command(name, command_conf["description"], command_conf["input_args"]) for name, command_conf in
            commands.items()
        ]
        kwargs.update({"commands": commands})

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def _update_prompts_and_input(self, input_data: Dict[str, Any]):
        """Append the format hint to the input and refresh the system prompt.

        NOTE: ``input_data`` is modified in place: the hint is appended to its
        ``'goal'`` and ``'result'`` entries when present.

        Raises:
            AssertionError: If ``"memory_files"`` (with a ``"plan"`` entry),
                ``"plan"`` or ``"logs"`` is missing from ``input_data``.
        """
        if 'goal' in input_data:
            input_data['goal'] += self.hint_for_model
        if 'result' in input_data:
            input_data['result'] += self.hint_for_model
        plan_file_location = self._get_content_file_location(input_data, "plan")
        plan_content = self._get_content(input_data, "plan")
        logs_content = self._get_content(input_data, "logs")
        self.system_message_prompt_template = self.system_message_prompt_template.partial(
            plan_file_location=plan_file_location,
            plan=plan_content,
            logs=logs_content
        )

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Query the model and return its reply parsed as JSON.

        If the first reply is not valid JSON, the model is queried once more
        with a ``'result'`` message asking for JSON output.

        Args:
            input_data: Flow input; see ``_update_prompts_and_input`` for the
                keys it must contain. It is modified in place by that method.

        Returns:
            The value produced by ``json.loads`` on the model's reply.

        Raises:
            json.JSONDecodeError: If the second reply is not valid JSON either.
        """
        self._update_prompts_and_input(input_data)
        api_output = super().run(input_data)["api_output"].strip()
        try:
            response = json.loads(api_output)
            return response
        except json.decoder.JSONDecodeError:
            # Single retry on a shallow copy, so the caller's 'result' entry
            # is not replaced by the retry message.
            new_input_data = input_data.copy()
            new_input_data['result'] = "The previous respond cannot be parsed with json.loads. Make sure your next response is in JSON format."
            new_api_output = super().run(new_input_data)["api_output"].strip()
            return json.loads(new_api_output)