import traceback import logging from typing import Tuple, List import copy from pathlib import Path import json from collections import OrderedDict import os import sys sys.path.append(os.getcwd()) from cllm.agents import container from cllm.agents.builtin import BUILTIN_PLANS, load_builtin_plans from cllm.agents.container import auto_type from cllm.agents.base import DataType, NON_FILE_TYPES from .interpretor import Interpretor from .planner import Planner from .responser import generate_response logger = logging.getLogger(__name__) class Controller: def __init__(self, stream=True, interpretor_kwargs={}): self.stream = stream self.planner = Planner(self.stream) self.interpretor = Interpretor(**interpretor_kwargs) self.SHORTCUT = "**Using builtin shortcut solution.**" BUILTIN_PLANS.update(load_builtin_plans("builtin_plan.json")) logger.info(BUILTIN_PLANS) def plan(self, request: str, state: dict): logger.info(request) resource_memory = state.get("resources", {}) raw_solution = None # shortcut for builtin plan for trigger_prompt, _ in BUILTIN_PLANS.items(): if request == trigger_prompt: return self.SHORTCUT # dynamic execution if raw_solution is None: raw_solution = self.planner.plan(request, resource_memory) return raw_solution def parse_solution_from_stream(self, raw_solution): return self.planner.parse(raw_solution) def execute(self, raw_solution: str, state: dict): resource_memory = state.get("resources") request = state["request"] solution = None if raw_solution == self.SHORTCUT: for trigger_prompt, builtin_plan in BUILTIN_PLANS.items(): if request == trigger_prompt: solution = builtin_plan solution = self._fill_args(solution, resource_memory) else: solution = self.planner.parse(raw_solution) if not solution: return None try: history_msgs = state.get("history_msgs") return self.interpretor.interpret(solution, history_msgs) except Exception as e: traceback.print_exc() return None def reply(self, executed_plan: dict, outputs: list, state: dict): error_response = [ auto_type( "response", DataType.TEXT, "Sorry, I cannot understand your request due to an internal error.", ) ] state = copy.deepcopy(state) if ( executed_plan is None or len(executed_plan) == 0 or outputs is None or len(outputs) == 0 ): return error_response, state resources = state.get("resources", OrderedDict()) for o in outputs: if isinstance(o, container.File): resources[str(o.filename)] = str(o.rtype) state["resources"] = resources response = generate_response(state["request"], executed_plan, outputs) if len(response) == 0: return error_response, state logger.info(response) return response, state def run(self, task: str, state: dict) -> Tuple[List, str]: try: return self._run(task, state) except: traceback.print_exc() logger.info(traceback.format_exc()) return [ auto_type( "response", DataType.TEXT, "Sorry, I cannot understand your request due to an internal error.", ) ], "Internal Error" def _run(self, task: str, state: dict) -> Tuple[List, str]: logger.info(task) BUILTIN_PLANS.update(load_builtin_plans("builtin_plan.json")) logger.info(BUILTIN_PLANS) resource_memory = state.get("resources", OrderedDict()) history_msgs = state.get("history_msgs", []) plan = None # shortcut for builtin plan for trigger_prompt, builtin_plan in BUILTIN_PLANS.items(): if task == trigger_prompt: plan = builtin_plan plan = self._fill_args(plan, resource_memory) # dynamic executation if plan is None: plan = self.planner.planning(task, resource_memory) logger.info(plan) executed_plan, output_files = self.interpretor.interpret( plan, resource_memory, history_msgs ) logger.info(output_files) for o in output_files: if isinstance(o, container.File): resource_memory[o.filename] = str(o.rtype) outputs = generate_response(task, executed_plan, output_files) logger.info(outputs) return outputs, executed_plan def _fill_args(self, plan, memory): plan = copy.deepcopy(plan) latest_resource = OrderedDict() for key, val in memory.items(): latest_resource[val] = key for actions in plan: for action in actions: for key, val in action.inputs.items(): if "" not in val: action.inputs[key] = latest_resource.get(val, val) return plan