ContentWriterFlowModule / ContentWritrerFlow.py
Tachi67's picture
Upload 5 files
b9e485c
from typing import Dict, Any
from aiflows.base_flows import CircularFlow
from aiflows.utils import logging
from abc import ABC, abstractmethod
logging.set_verbosity_debug()
log = logging.get_logger(__name__)
class ContentWriterFlow(CircularFlow, ABC):
"""This is an abstract class for writing content (plan, code)
The ContentWriterFlow is made of a controller and a branching executor.
Each time the controller is called, the controller decides whether to write content
or to finish. If the content writer executor is called, the executor will write content
in an interactive way, finally, the user is able to give feedback to the content, so that
the controller can decide whether to write content again or to finish.
*Configuration Parameters*:
- `name`
- `description`
- `max_round`
- `subflows_config`:
- `Controller` (dict): The controller that decides whether to write content or to finish.
- `Executor` (dict): A branching flow, we configure the specific executor in the subflows of the executor.
- `early_exit_key`: The key of the early exit variable in the output payload of the executor.
- `topology`: The topology of the subflows, this describes the I/O interface instances.
*Input Interface*:
- `goal`
*Output Interface*:
- `answer`
- `status`
"""
@abstractmethod
def _on_reach_max_round(self):
"""
should update flow state dictionary about the output variables and status.
"""
pass
@abstractmethod
@CircularFlow.output_msg_payload_processor
def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
"""
1. Writing content to file;
2. Finish and early exit.
"""
pass