import os
from dotenv import load_dotenv
import importlib.util
import json
import openai
import concurrent.futures
import time
from datetime import datetime
from skills.skill import Skill
from skills.skill_registry import SkillRegistry
from tasks.task_registry import TaskRegistry
load_dotenv() # Load environment variables from .env file
# Retrieve all API keys
api_keys = {
    'openai': os.environ['OPENAI_API_KEY'],
    'serpapi': os.environ['SERPAPI_API_KEY']
    # Add more keys here as needed
}
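# NOTE: os.environ[...] raises KeyError when a variable is unset, so both keys above must be
# defined in .env (or the shell environment) even if a particular run does not use every skill.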
# Set OBJECTIVE
OBJECTIVE = "Create an example objective and tasklist for 'write a poem', which only uses text_completion in the tasks. Do this by using code_reader to read example1.json, then writing the JSON objective tasklist pair using text_completion, and saving it using objective_saver."
LOAD_SKILLS = ['text_completion','code_reader','objective_saver']
REFLECTION = False
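# LOAD_SKILLS limits which skill modules the SkillRegistry loads for this run.
# REFLECTION, when True, feeds each completed task's output back through
# task_registry.reflect_on_output() so new tasks can be inserted or existing ones updated mid-run.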
##### START MAIN LOOP #####
# Print OBJECTIVE
print("\033[96m\033[1m"+"\n*****OBJECTIVE*****\n"+"\033[0m\033[0m")
print(OBJECTIVE)
if __name__ == "__main__":
    session_summary = ""

    # Initialize the SkillRegistry and TaskRegistry
    skill_registry = SkillRegistry(api_keys=api_keys, skill_names=LOAD_SKILLS)
    skill_descriptions = ",".join(f"[{skill.name}: {skill.description}]" for skill in skill_registry.skills.values())
    task_registry = TaskRegistry()

    # Create the initial task list based on an objective
    task_registry.create_tasklist(OBJECTIVE, skill_descriptions)

    # Initialize task outputs
    task_outputs = {i: {"completed": False, "output": None} for i, _ in enumerate(task_registry.get_tasks())}
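    # task_outputs is keyed by task id; the loop below runs until every entry is marked
    # completed, and new ids may appear mid-run when reflection inserts additional tasks.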
    # Create a thread pool for parallel execution
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Loop until all tasks are completed
        while not all(task["completed"] for task in task_outputs.values()):

            # Get the tasks that are ready to be executed (i.e., all their dependencies have been completed)
            tasks = task_registry.get_tasks()
            # Print the updated task list
            task_registry.print_tasklist(tasks)

            # Update task_outputs to include new tasks
            for task in tasks:
                if task["id"] not in task_outputs:
                    task_outputs[task["id"]] = {"completed": False, "output": None}
            ready_tasks = [(task["id"], task) for task in tasks
                           if all((dep in task_outputs and task_outputs[dep]["completed"])
                                  for dep in task.get('dependent_task_ids', []))
                           and not task_outputs[task["id"]]["completed"]]

            # NOTE: `task` here still refers to the last item of the for-loop above
            # (the list comprehension does not leak its variable), so only that task is logged.
            session_summary += str(task) + "\n"

            futures = [executor.submit(task_registry.execute_task, task_id, task, skill_registry, task_outputs, OBJECTIVE)
                       for task_id, task in ready_tasks if not task_outputs[task_id]["completed"]]
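            # Each future resolves to the (task_id, output) pair returned by task_registry.execute_task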
            # Wait for the tasks to complete
            for future in futures:
                i, output = future.result()
                task_outputs[i]["output"] = output
                task_outputs[i]["completed"] = True

                # Update the task in the TaskRegistry
                task_registry.update_tasks({"id": i, "status": "completed", "result": output})

                completed_task = task_registry.get_task(i)
                print(f"\033[92mTask #{i}: {completed_task.get('task')} \033[0m\033[92m[COMPLETED]\033[0m\033[92m[{completed_task.get('skill')}]\033[0m")
                # Reflect on the output
                if output:
                    session_summary += str(output) + "\n"

                    if REFLECTION:
                        new_tasks, insert_after_ids, tasks_to_update = task_registry.reflect_on_output(output, skill_descriptions)
                        # Insert new tasks
                        for new_task, after_id in zip(new_tasks, insert_after_ids):
                            task_registry.add_task(new_task, after_id)
                        # Update existing tasks
                        for task_to_update in tasks_to_update:
                            task_registry.update_tasks(task_to_update)
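            # The loop ends once the registry itself reports every task completed; reflection may
            # have added tasks, so task_registry.tasks (not task_outputs) is the source of truth here.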
            #print(task_outputs.values())
            if all(task["status"] == "completed" for task in task_registry.tasks):
                print("All tasks completed!")
                break

            # Short delay to prevent busy looping
            time.sleep(0.1)
    # Print session summary
    print("\033[96m\033[1m"+"\n*****SAVING FILE...*****\n"+"\033[0m\033[0m")
    with open(f'output/output_{datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}.txt', 'w') as file:
        file.write(session_summary)
    print("...file saved.")
    print("END")
    executor.shutdown()  # no-op here: the ThreadPoolExecutor with-block has already shut the pool down