File size: 10,994 Bytes
82b9374 fd29476 82b9374 fd29476 82b9374 210a49b 82b9374 798fa73 210a49b 798fa73 82b9374 798fa73 82b9374 798fa73 82b9374 798fa73 82b9374 798fa73 82b9374 798fa73 82b9374 798fa73 82b9374 798fa73 82b9374 210a49b 798fa73 210a49b 798fa73 210a49b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
import jinja2
from aiflows.base_flows import AtomicFlow
from aiflows.utils import logging
from aiflows.utils import general_helpers
from typing import Dict,Any,Optional,List
from aiflows.prompt_template import JinjaPrompt
from copy import deepcopy
from aiflows.messages import FlowMessage
import os
import hydra
log = logging.get_logger(__name__)
class DemonstrationsAtomicFlow(AtomicFlow):
""" This class implements a Demonstrations Atomic Flow. It is a flow which is usually used to pass demonstrations (of user assistant interactions)
to the ChatAtomicFlow.
*Configuration Parameters*:
- `name` (str): The name of the flow. Default: "DemonstrationsAtomicFlow"
- `description` (str): A description of the flow. This description is used to generate the help message of the flow.
Default: "A flow that passes demonstrations to the ChatFlow"
- `data` (List[Dict[str, Any]]): The data of the demonstrations.
If data is None, the data is loaded from the file specified in the params["data_dir"].
Default: No default value this field must be set.
- `params` (Dict[str, Any]): The parameters specific to the dataset of the demonstrations. Its default parameters are:
- `data_dir` (str): The directory where the demonstrations are stored. If the data is not directly passed to the flow through `data` then
the data is loaded from this directory. Default: No default value this field must be set.
- `demonstrations_id` (str): The id of the demonstrations (name of the data file). If the data is not directly passed to the flow through `data` then
the data is loaded from this file. Default: No default value this field must be set.
- `demonstrations_k` (int): The number of demonstrations to pass to the ChatFlow.
If None, all the demonstrations are passed to the ChatFlow. Default: None
- `query_prompt_template` (Dict[str, Any]): The prompt template used to generate the query of the demonstrations.
By default its of type flows.prompt_template.JinjaPrompt. None of the parameters of the prompt are defined by default and therefore need to be defined if one
wants to use the query_prompt_template. Default parameters are defined in flows.prompt_template.jinja2_prompts.JinjaPrompt.
- `response_prompt_template` (Dict[str, Any]): The prompt template used to generate the response of the demonstrations. By default its of type flows.prompt_template.JinjaPrompt.
None of the parameters of the prompt are defined by default and therefore need to be defined if one
wants to use the response_prompt_template. Default parameters are defined in flows.prompt_template.jinja2_prompts.JinjaPrompt.
*Input Interface*:
- The input interface expected by its successor flow (e.g. typically ChatAtomicFlow so the input interface is the one expected by ChatAtomicFlow)
*Output Interface*:
- Whichever data that was passed in the input_message (e.g. typically ChatAtomicFlow so the input interface expected by ChatAtomicFlow))
- `demonstrations` (List[Dict[str, Any]]): A list of demonstrations. Each demonstration is a dictionary with the following keys:
- idx (int): The index of the demonstration
- query (str): The query of the demonstration
- response (str): The response of the demonstration
:param params: The parameters specific to the dataset of the demonstrations. It must sould contain the following keys:
- 'data_dir' (str): The directory where the demonstrations are stored. This field is used if the data is not directly passed to the flow through the 'data' field.
- 'demonstrations_id' (str): The id of the demonstrations (name of the data file). This field is used if the data is not directly passed to the flow through the 'data' field.
- 'demonstrations_k' (int): The number of demonstrations to pass to the ChatFlow. If None, all the demonstrations are passed to the ChatFlow.
- 'ids_to_keep' (Optional[Union[str, List[str]]]): The ids of the demonstrations to keep. If None, all the demonstrations are kept.
:type params: Dict[str, Any]
:param query_prompt_template: The prompt template used to generate the query of the demonstrations.
:type query_prompt_template: JinjaPrompt
:param response_prompt_template: The prompt template used to generate the response of the demonstrations.
:type response_prompt_template: JinjaPrompt
:param data: The data of the demonstrations. If None, the data is loaded from the file specified in the params.
:type data: Optional[List[Dict[str, Any]]]
"""
demonstrations_k: Optional[int] = None
query_prompt_template: JinjaPrompt
response_prompt_template: JinjaPrompt
params: Dict
def __init__(self,params,query_prompt_template,response_prompt_template, data=None,**kwargs):
super().__init__(**kwargs)
self.params = params
self.data = data
self.demonstrations_k = self.params.get("demonstrations_k", None)
#typically the query would be what the user (human) asks the assistant (LLM)
self.query_prompt_template = query_prompt_template
#typically the response would be what the assistant (LLM) should answer to the user (human)
self.response_prompt_template = response_prompt_template
if self.data is None:
self._load_data()
@classmethod
def _set_up_prompts(cls, config):
""" This method instantiates the prompt templates of the flow (used when instantiating the flow from a config file)
:param config: The configuration of the flow.
:type config: Dict[str, Any]
:return: A dictionary of keyword arguments to pass to the constructor of the flow.
:rtype: Dict[str, Any]
"""
kwargs = {}
kwargs["query_prompt_template"] = \
hydra.utils.instantiate(config['query_prompt_template'], _convert_="partial")
kwargs["response_prompt_template"] = \
hydra.utils.instantiate(config['response_prompt_template'], _convert_="partial")
return kwargs
@classmethod
def instantiate_from_config(cls, config):
""" This method instantiates the flow from a config file.
:param config: The configuration of the flow.
:type config: Dict[str, Any]
:return: The instantiated flow.
:rtype: Flow
"""
flow_config = deepcopy(config)
kwargs = {"flow_config": flow_config}
# ~~~ Set up prompts ~~~
kwargs.update(cls._set_up_prompts(flow_config))
kwargs.update({"params": flow_config["params"]})
kwargs.update({"data": flow_config["data"]})
# ~~~ Instantiate flow ~~~
return cls(**kwargs)
def _get_query_message_content(self, sample_data: Dict):
""" This method returns the query message content of a demonstration given the sample data (by rendering the query prompt template).
:param sample_data: The sample data of the demonstration.
:type sample_data: Dict[str, Any]
:return: The query message content of the demonstration.
:rtype: str
"""
input_variables = self.query_prompt_template.input_variables
return self.query_prompt_template.format(**{k: sample_data[k] for k in input_variables})
def _get_response_message_content(self, sample_data: Dict):
""" This method returns the response message content of a demonstration given the sample data (by rendering the response prompt template).
:param sample_data: The sample data of the demonstration.
:type sample_data: Dict[str, Any]
:return: The response message content of the demonstration.
:rtype: str
"""
input_variables = self.response_prompt_template.input_variables
return self.response_prompt_template.format(**{k: sample_data[k] for k in input_variables})
def _get_io_pair(self, idx):
""" This method, given the index of a demonstration, returns an query-response pair from the demonstrations data.
:param idx: The index of the demonstration.
:type idx: int
:return: The query-response pair at idx from the demonstrations data.
:rtype: Dict[str, Any]
"""
dp = self.data[idx]
query_data = dp["query_data"]
response_data = dp["response_data"]
query = self._get_query_message_content(query_data)
response = self._get_response_message_content(response_data)
return {"idx": idx, "query": query,"response": response}
def _get_io_pairs(self,input_data: Dict[str, Any]) -> List[Any]:
""" This method returns the demonstrations that are passed to the destination flow (typically ChatAtomicFlow).
:param input_data: The input data of the flow.
:type input_data: Dict[str, Any]
:return: The demonstrations that are passed to the destination flow.
:rtype: List[Any]
"""
demonstrations_k = self.demonstrations_k if self.demonstrations_k is not None else len(self.data)
io_pairs = [self._get_io_pair(idx) for idx in range(demonstrations_k)]
return io_pairs
def _load_data(self):
""" This method loads the demonstrations from the file specified in the params. It also filters the demonstrations if the ids_to_keep parameter is specified."""
demonstrations_file = os.path.join(self.params["data_dir"], f"{self.params['demonstrations_id']}.jsonl")
self.data = general_helpers.read_jsonlines(demonstrations_file)
if self.params.get("ids_to_keep", False):
if isinstance(self.params["ids_to_keep"], str):
ids_to_keep = set(self.params["ids_to_keep"].split(","))
else:
ids_to_keep = set(self.params["ids_to_keep"])
self.data = [d for d in self.data if d["id"] in ids_to_keep]
log.info("Loaded the demonstrations for %d datapoints from %s", len(self.data), self.params["data_dir"])
def run(self,
input_message: FlowMessage):
""" This method runs the flow. It returns the data of the input_message with the demonstrations added to it.
:param input_message: The input message of the flow.
:type input_message: FlowMessage
"""
input_data = input_message.data
reply = self.package_output_message(
input_message=input_message,
response = {**{"demonstrations": self._get_io_pairs(input_data=input_data)},**input_data}
)
self.send_message(reply) |