from aiflows.base_flows import CompositeFlow from aiflows.utils import logging from aiflows.interfaces import KeyInterface logging.set_verbosity_debug() from aiflows.messages import FlowMessage log = logging.get_logger(__name__) class ChatWithDemonstrationsFlow(CompositeFlow): """ A Chat with Demonstrations Flow. It is a flow that consists of multiple sub-flows that are executed sequentially. It's parent class is SequentialFlow. It Contains the following subflows: - A Demonstration Flow: It is a flow that passes demonstrations to the ChatFlow - A Chat Flow: It is a flow that uses the demonstrations to answer queries asked by the user/human. An illustration of the flow is as follows: -------> Demonstration Flow -------> Chat Flow -------> *Configuration Parameters*: - `name` (str): The name of the flow. Default: "ChatAtomic_Flow_with_Demonstrations" - `description` (str): A description of the flow. This description is used to generate the help message of the flow. Default: "A sequential flow that answers questions with demonstrations" - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations of the sequential Flow. Default: - `Demonstration Flow`: The configuration of the Demonstration Flow. By default, it a DemonstrationsAtomicFlow. Its default parmaters are defined in DemonstrationsAtomicFlow). - `Chat Flow`: The configuration of the Chat Flow. By default, its a ChatAtomicFlow. Its default parmaters are defined in ChatAtomicFlowModule (see Flowcard, i.e. README.md, of ChatAtomicFlowModule). - `topology` (str): The topology of the flow which is "sequential". By default, the topology is the one shown in the illustration above (the topology is also described in ChatWithDemonstrationsFlow.yaml). *Input Interface*: - `query` (str): A query asked to the flow (e.g. "What is the capital of France?") Output Interface: - `answer` (str): The answer of the flow to the query :param \**kwargs: Arguments to be passed to the parent class SequentialFlow constructor. """ def __init__(self,**kwargs): super().__init__(**kwargs) self.output_interface = KeyInterface( keys_to_rename={"api_output": "answer"} ) def set_up_flow_state(self): """ Set up the flow state. It sets the last state of the flow to None. """ super().set_up_flow_state() self.flow_state["last_state"] = None def run(self,input_message): """ Runs the flow. :param input_message: The input message of the flow. :type input_message: FlowMessage """ # #~~~~~~~~~~~Solution 1 - Blocking ~~~~~~~ # future = self.subflows["demonstration_flow"].get_reply_future(input_message) # answer = self.subflows["chat_flow"].get_reply_future(future.get_message()) #reply = self.package_output_message(input_message, self.output_interface(answer.get_data())) # self.send_message(reply) # #~~~~~~~~~~~Solution 2 - Non-Blocking ~~~~~~~ if self.flow_state["last_state"] is None: self.flow_state["initial_message"] = input_message msg = self.package_input_message( input_message.data ) self.subflows["demonstration_flow"].get_reply( msg, ) self.flow_state["last_state"] = "demonstration_flow" elif self.flow_state["last_state"] == "demonstration_flow": msg = self.package_input_message( input_message.data ) self.subflows["chat_flow"].get_reply( msg, ) self.flow_state["last_state"] = "chat_flow" else: self.flow_state["last_state"] = None reply = self.package_output_message( self.flow_state["initial_message"], response = self.output_interface(input_message).data ) self.send_message( reply )