from aiflows.base_flows import CompositeFlow
from aiflows.interfaces import KeyInterface
from aiflows.messages import FlowMessage
from aiflows.utils import logging

logging.set_verbosity_debug()
log = logging.get_logger(__name__)


class ChatWithDemonstrationsFlow(CompositeFlow):
    """A Chat with Demonstrations Flow. It is a flow that consists of multiple sub-flows executed sequentially.
    Its parent class is CompositeFlow.

    It contains the following subflows:

    - A Demonstration Flow: a flow that passes demonstrations to the Chat Flow.
    - A Chat Flow: a flow that uses the demonstrations to answer queries asked by the user/human.

    An illustration of the flow is as follows:

        -------> Demonstration Flow -------> Chat Flow ------->

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatAtomic_Flow_with_Demonstrations"
    - `description` (str): A description of the flow, used to generate the help message of the flow.
      Default: "A sequential flow that answers questions with demonstrations"
    - `subflows_config` (Dict[str, Any]): A dictionary of subflow configurations of the sequential flow
      (a commented configuration sketch follows this docstring). Default:
        - `demonstration_flow`: The configuration of the Demonstration Flow. By default, it is a DemonstrationsAtomicFlow.
          Its default parameters are defined in DemonstrationsAtomicFlow.
        - `chat_flow`: The configuration of the Chat Flow. By default, it is a ChatAtomicFlow.
          Its default parameters are defined in ChatAtomicFlowModule (see the Flow card, i.e. README.md, of ChatAtomicFlowModule).
    - `topology` (str): The topology of the flow, which is "sequential". By default, the topology is the one shown in the
      illustration above (the topology is also described in ChatWithDemonstrationsFlow.yaml).

    *Input Interface*:

    - `query` (str): A query asked to the flow (e.g. "What is the capital of France?")

    *Output Interface*:

    - `answer` (str): The answer of the flow to the query.

    :param \**kwargs: Arguments to be passed to the parent class CompositeFlow constructor.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Rename the chat flow's "api_output" key to "answer" when packaging the final reply.
        self.output_interface = KeyInterface(
            keys_to_rename={"api_output": "answer"}
        )
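        # Illustrative sketch (assumed behavior of keys_to_rename): given message data like
        #   {"api_output": "The capital of France is Paris."}
        # the interface produces
        #   {"answer": "The capital of France is Paris."}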

    def set_up_flow_state(self):
        """Set up the flow state. It sets the last state of the flow to None."""
        super().set_up_flow_state()
        self.flow_state["last_state"] = None

    def run(self, input_message):
        """Runs the flow.

        :param input_message: The input message of the flow.
        :type input_message: FlowMessage
        """
        # ~~~~~~~~~~~ Solution 1 - Blocking ~~~~~~~~~~~
        # future = self.subflows["demonstration_flow"].get_reply_future(input_message)
        # answer = self.subflows["chat_flow"].get_reply_future(future.get_message())
        # reply = self.package_output_message(input_message, self.output_interface(answer.get_data()))
        # self.send_message(reply)

        # ~~~~~~~~~~~ Solution 2 - Non-Blocking ~~~~~~~~~~~
        # The flow is re-invoked each time a subflow replies; `last_state` tracks
        # which subflow the current input_message comes from.
        if self.flow_state["last_state"] is None:
            # First call: forward the user's query to the demonstration flow.
            self.flow_state["initial_message"] = input_message
            msg = self.package_input_message(input_message.data)
            self.subflows["demonstration_flow"].get_reply(msg)
            self.flow_state["last_state"] = "demonstration_flow"

        elif self.flow_state["last_state"] == "demonstration_flow":
            # Second call: pass the demonstration flow's output on to the chat flow.
            msg = self.package_input_message(input_message.data)
            self.subflows["chat_flow"].get_reply(msg)
            self.flow_state["last_state"] = "chat_flow"

        else:
            # Third call: the chat flow has answered; reset the state and send the final
            # reply (with "api_output" renamed to "answer") back to the original caller.
            self.flow_state["last_state"] = None
            reply = self.package_output_message(
                self.flow_state["initial_message"],
                response=self.output_interface(input_message).data,
            )
            self.send_message(reply)
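
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; kept as a comment because it assumes
# the flow has already been served and a proxy instance `flow` has been obtained
# through aiflows' serving utilities, which are not shown here). The data keys
# follow the input/output interfaces documented in the class docstring.
#
#   input_message = flow.package_input_message({"query": "What is the capital of France?"})
#   future = flow.get_reply_future(input_message)
#   reply_data = future.get_data()
#   log.info(reply_data["answer"])
# ---------------------------------------------------------------------------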