|
from flow_modules.aiflows.HumanStandardInputFlowModule import HumanStandardInputFlow |
|
|
|
from typing import Dict, Any |
|
|
|
from aiflows.messages import UpdateMessage_Generic |
|
|
|
from aiflows.utils import logging |
|
|
|
log = logging.get_logger(f"aiflows.{__name__}") |
|
|
|
|
|
class FinalAns_Jarvis(HumanStandardInputFlow):
    """Flow that hands the final answer to the user and collects their reply.

    Subclass of ``HumanStandardInputFlow``. Its ``run`` method renders and logs
    the query prompt, reads the user's input, and returns that input together
    with a short textual summary.
    """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Show the query prompt, read the user's reply and report both.

        Args:
            input_data: Payload passed to the flow. It is forwarded to the
                prompt template and must contain an ``"answer"`` entry, which
                is quoted in the returned summary.

        Returns:
            A dict with two keys: ``"result"`` holds whatever
            ``self._read_input()`` returned, and ``"summary"`` is a sentence
            combining the final answer with the user's feedback.

        Raises:
            KeyError: If ``input_data`` has no ``"answer"`` key. Note that the
                lookup happens only after the user input has been read.
        """
        # Render the prompt from the configured template before anything else.
        prompt_text = self._get_message(self.query_message_prompt_template, input_data)

        # Record the rendered prompt in the flow's message log.
        flow_name = self.flow_config["name"]
        self._log_message(
            UpdateMessage_Generic(
                created_by=flow_name,
                updated_flow=flow_name,
                data={"query_message": prompt_text},
            )
        )

        log.info(prompt_text)
        user_feedback = self._read_input()

        final_answer = input_data["answer"]
        return {
            "result": user_feedback,
            "summary": f"Final answer provided: {final_answer}; feedback of user: {user_feedback}",
        }