ChatWithDemonstrationsFlowModule / ChatWithDemonstrationsFlow.py
nbaldwin's picture
merge with coflows
210a49b
from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.interfaces import KeyInterface
logging.set_verbosity_debug()
from aiflows.messages import FlowMessage
log = logging.get_logger(__name__)
class ChatWithDemonstrationsFlow(CompositeFlow):
""" A Chat with Demonstrations Flow. It is a flow that consists of multiple sub-flows that are executed sequentially.
It's parent class is SequentialFlow.
It Contains the following subflows:
- A Demonstration Flow: It is a flow that passes demonstrations to the ChatFlow
- A Chat Flow: It is a flow that uses the demonstrations to answer queries asked by the user/human.
An illustration of the flow is as follows:
-------> Demonstration Flow -------> Chat Flow ------->
*Configuration Parameters*:
- `name` (str): The name of the flow. Default: "ChatAtomic_Flow_with_Demonstrations"
- `description` (str): A description of the flow. This description is used to generate the help message of the flow.
Default: "A sequential flow that answers questions with demonstrations"
- `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations of the sequential Flow. Default:
- `Demonstration Flow`: The configuration of the Demonstration Flow. By default, it a DemonstrationsAtomicFlow.
Its default parmaters are defined in DemonstrationsAtomicFlow).
- `Chat Flow`: The configuration of the Chat Flow. By default, its a ChatAtomicFlow.
Its default parmaters are defined in ChatAtomicFlowModule (see Flowcard, i.e. README.md, of ChatAtomicFlowModule).
- `topology` (str): The topology of the flow which is "sequential". By default, the topology is the one shown in the
illustration above (the topology is also described in ChatWithDemonstrationsFlow.yaml).
*Input Interface*:
- `query` (str): A query asked to the flow (e.g. "What is the capital of France?")
Output Interface:
- `answer` (str): The answer of the flow to the query
:param \**kwargs: Arguments to be passed to the parent class SequentialFlow constructor.
"""
def __init__(self,**kwargs):
super().__init__(**kwargs)
self.output_interface = KeyInterface(
keys_to_rename={"api_output": "answer"}
)
def set_up_flow_state(self):
""" Set up the flow state. It sets the last state of the flow to None.
"""
super().set_up_flow_state()
self.flow_state["last_state"] = None
def run(self,input_message):
""" Runs the flow.
:param input_message: The input message of the flow.
:type input_message: FlowMessage
"""
# #~~~~~~~~~~~Solution 1 - Blocking ~~~~~~~
# future = self.subflows["demonstration_flow"].get_reply_future(input_message)
# answer = self.subflows["chat_flow"].get_reply_future(future.get_message())
#reply = self.package_output_message(input_message, self.output_interface(answer.get_data()))
# self.send_message(reply)
# #~~~~~~~~~~~Solution 2 - Non-Blocking ~~~~~~~
if self.flow_state["last_state"] is None:
self.flow_state["initial_message"] = input_message
msg = self.package_input_message(
input_message.data
)
self.subflows["demonstration_flow"].get_reply(
msg,
)
self.flow_state["last_state"] = "demonstration_flow"
elif self.flow_state["last_state"] == "demonstration_flow":
msg = self.package_input_message(
input_message.data
)
self.subflows["chat_flow"].get_reply(
msg,
)
self.flow_state["last_state"] = "chat_flow"
else:
self.flow_state["last_state"] = None
reply = self.package_output_message(
self.flow_state["initial_message"],
response = self.output_interface(input_message).data
)
self.send_message(
reply
)