import os |
import hydra |
from flows.backends.api_info import ApiInfo |
from flows.messages import InputMessage |
from flows.utils.general_helpers import read_yaml_file |
from flows import logging |
from flows.flow_cache import CACHING_PARAMETERS, clear_cache |
CACHING_PARAMETERS.do_caching = False |
logging.set_verbosity_debug() |
logging.auto_set_dir() |
dependencies = [ |
{"url": "Tachi67/ReplanningFlowModule", "revision": "main"}, |
{"url": "Tachi67/PlanGeneratorFlowModule", "revision": "main"}, |
{"url": "Tachi67/InteractivePlanGenFlowModule", "revision": "main"}, |
{"url": "Tachi67/PlanWriterFlowModule", "revision": "main"}, |
{"url": "Tachi67/PlanFileEditFlowModule", "revision": "main"}, |
{"url": "Tachi67/ParseFeedbackFlowModule", "revision": "main"}, |
{"url": "aiflows/HumanStandardInputFlowModule", "revision": "5683a922372c5fa90be9f6447d6662d8d80341fc"} |
] |
from flows import flow_verse |
flow_verse.sync_dependencies(dependencies) |
if __name__ == "__main__": |
key = os.getenv("OPENAI_API_KEY") |
api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))] |
path_to_output_file = None |
current_dir = os.getcwd() |
cfg_path = os.path.join(current_dir, "ReplanningFlow.yaml") |
cfg = read_yaml_file(cfg_path) |
cfg["subflows_config"]["Controller"]["backend"]["api_infos"] = api_information |
cfg["subflows_config"]["Executor"]["subflows_config"]["write_plan"]["subflows_config"]["PlanGenerator"]["backend"]["api_infos"] = api_information |
ReplanningFlow = hydra.utils.instantiate(cfg, _recursive_=False, _convert_="partial") |
plan_file_location = os.path.join(current_dir, "ExtLib_plan.txt") |
with open(plan_file_location, 'w') as file: |
pass |
input_data = { |
"goal": "data source OpenWeatherMap is deprecated", |
"plan": "1. Write a function that fetches today's weather at Lausanne from OpenWeatherMap. \n 2. Write a funtion that prints the fetched data in a formatted way.", |
"plan_file_location": plan_file_location |
} |
input_message = InputMessage.build( |
data_dict=input_data, |
src_flow="Launcher", |
dst_flow=ReplanningFlow.name |
) |
output_message = ReplanningFlow(input_message) |