import json from copy import deepcopy from typing import Any, Dict, List from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow from dataclasses import dataclass @dataclass class Command: name: str description: str input_args: List[str] # TODO: controller should be generalized class Controller_JarvisFlow(ChatAtomicFlow): def __init__( self, commands: List[Command], **kwargs): super().__init__(**kwargs) self.system_message_prompt_template = self.system_message_prompt_template.partial( commands=self._build_commands_manual(commands), plan="no plans yet", plan_file_location="no plan file location yet", logs="no logs yet", ) self.hint_for_model = """ Make sure your response is in the following format: Response Format: { "command": "call one of the subordinates", "command_args": { "arg name": "value" } } """ def _get_content_file_location(self, input_data, content_name): # get the location of the file that contains the content: plan, logs, code_library assert "memory_files" in input_data, "memory_files not passed to Jarvis/Controller" assert content_name in input_data["memory_files"], f"{content_name} not in memory files" return input_data["memory_files"][content_name] def _get_content(self, input_data, content_name): # get the content of the file that contains the content: plan, logs, code_library assert content_name in input_data, f"{content_name} not passed to Jarvis/Controller" content = input_data[content_name] if len(content) == 0: content = f'No {content_name} yet' return content @staticmethod def _build_commands_manual(commands: List[Command]) -> str: ret = "" for i, command in enumerate(commands): command_input_json_schema = json.dumps( {input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args}) ret += f"{i + 1}. {command.name}: {command.description} Input arguments (given in the JSON schema): {command_input_json_schema}\n" return ret @classmethod def instantiate_from_config(cls, config): flow_config = deepcopy(config) kwargs = {"flow_config": flow_config} # ~~~ Set up prompts ~~~ kwargs.update(cls._set_up_prompts(flow_config)) # ~~~Set up backend ~~~ kwargs.update(cls._set_up_backend(flow_config)) # ~~~ Set up commands ~~~ commands = flow_config["commands"] commands = [ Command(name, command_conf["description"], command_conf["input_args"]) for name, command_conf in commands.items() ] kwargs.update({"commands": commands}) # ~~~ Instantiate flow ~~~ return cls(**kwargs) def _update_prompts_and_input(self, input_data: Dict[str, Any]): if 'goal' in input_data: input_data['goal'] += self.hint_for_model if 'result' in input_data: input_data['result'] += self.hint_for_model plan_file_location = self._get_content_file_location(input_data, "plan") plan_content = self._get_content(input_data, "plan") logs_content = self._get_content(input_data, "logs") self.system_message_prompt_template = self.system_message_prompt_template.partial( plan_file_location=plan_file_location, plan=plan_content, logs=logs_content ) def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: self._update_prompts_and_input(input_data) api_output = super().run(input_data)["api_output"].strip() try: response = json.loads(api_output) return response except json.decoder.JSONDecodeError: new_input_data = input_data.copy() new_input_data['result'] = "The previous respond cannot be parsed with json.loads. Make sure your next response is in JSON format." new_api_output = super().run(new_input_data)["api_output"].strip() return json.loads(new_api_output)