Spaces:
Sleeping
Sleeping
File size: 5,830 Bytes
14dc68f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
import openai
import pinecone
import time
from collections import deque
from typing import Dict, List
# --- Credentials: fill these in before running the script ---
OPENAI_API_KEY = ""
PINECONE_API_KEY = ""
PINECONE_ENVIRONMENT = "us-east1-gcp"  # Pinecone environment (e.g. "us-east1-gcp")

# --- Run settings ---
YOUR_TABLE_NAME = "test-table"  # name of the Pinecone index used by this script
OBJECTIVE = "Solve world hunger."  # overall goal interpolated into the agent prompts
YOUR_FIRST_TASK = "Develop a task list."  # name of the task that seeds the queue

# Announce the objective: an ANSI-styled banner, then the objective text.
print("\033[96m\033[1m" "\n*****OBJECTIVE*****\n" "\033[0m\033[0m")
print(OBJECTIVE)

# Hand the credentials to both client libraries.
openai.api_key = OPENAI_API_KEY
pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)

# Parameters used if the index has to be created.
# NOTE(review): dimension is presumably the length of the vectors returned by
# get_ada_embedding - confirm against the embedding model in use.
table_name = YOUR_TABLE_NAME
dimension, metric, pod_type = 1536, "cosine", "p1"

# Create the index only when no index with that name is listed yet.
if table_name not in pinecone.list_indexes():
    pinecone.create_index(
        table_name,
        dimension=dimension,
        metric=metric,
        pod_type=pod_type,
    )

# Handle used by the main loop to store task results.
index = pinecone.Index(table_name)

# Queue of pending task dicts; consumed from the left, appended on the right.
task_list = deque()
def add_task(task: Dict) -> None:
    """Append ``task`` to the right end of the module-level ``task_list``.

    ``task_list`` is looked up when the function is called, so the task lands
    in whichever queue object the module-level name is bound to at that time.
    """
    task_list.append(task)
def get_ada_embedding(text):
    """Return the "text-embedding-ada-002" embedding of ``text``.

    Every newline in ``text`` is replaced by a single space before the text
    is sent to the embedding endpoint. The value returned is the
    ``"embedding"`` entry of the first item in the response's ``"data"`` list.
    """
    # Splitting on "\n" and re-joining with " " swaps each newline for a space.
    flattened = " ".join(text.split("\n"))
    response = openai.Embedding.create(
        input=[flattened],
        model="text-embedding-ada-002",
    )
    first_item = response["data"][0]
    return first_item["embedding"]
def task_creation_agent(objective: str, result: Dict, task_description: str, task_list: List[str]):
    """Ask the completion model for follow-up tasks after a task has finished.

    Args:
        objective: Overall objective, interpolated into the prompt.
        result: Result of the task that was just executed; interpolated into
            the prompt via its string form.
        task_description: Description of the task that produced ``result``.
        task_list: Names of the tasks that are still incomplete. This
            parameter shadows the module-level queue of the same name inside
            this function.

    Returns:
        A list of ``{"task_name": <str>}`` dicts, one per non-blank line of
        the completion, in the order the model returned them. The list is
        empty when the completion contains no usable line.
    """
    prompt = (
        "You are an task creation AI that uses the result of an execution agent "
        f"to create new tasks with the following objective: {objective}, "
        f"The last completed task has the result: {result}. "
        f"This result was based on this task description: {task_description}. "
        f"These are incomplete tasks: {', '.join(task_list)}. "
        "Based on the result, create new tasks to be completed by the AI system "
        "that do not overlap with incomplete tasks. "
        "Return the tasks as an array."
    )
    response = openai.Completion.create(
        engine="text-davinci-003",
        prompt=prompt,
        temperature=0.5,
        max_tokens=100,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )
    completion_text = response.choices[0].text
    # Strip every line and drop the blank ones: an empty completion or blank
    # lines between tasks would otherwise become tasks with an empty name.
    candidate_names = (line.strip() for line in completion_text.split("\n"))
    return [{"task_name": name} for name in candidate_names if name]
def prioritization_agent(this_task_id: int):
    """Ask the completion model to clean up and re-order the pending tasks.

    The names of all tasks in the module-level ``task_list`` are sent to the
    model together with ``OBJECTIVE``, and the model is asked for a numbered
    list starting at ``this_task_id + 1``. Each reply line of the form
    ``<number>. <name>`` becomes ``{"task_id": <int>, "task_name": <name>}``.

    Lines without a ".", with a non-numeric prefix (for example a literal
    "#" or an introductory sentence) or with an empty name are ignored, so
    every stored ``task_id`` can safely be passed through ``int()`` and
    ``str()``.

    Args:
        this_task_id: ID of the task that was just completed.

    Returns:
        None. ``task_list`` is rebound to a new deque holding the parsed
        tasks. If no line of the reply could be parsed, ``task_list`` is left
        untouched so that pending tasks are not lost.
    """
    global task_list
    task_names = [t["task_name"] for t in task_list]
    next_task_id = int(this_task_id) + 1
    prompt = (
        "You are an task prioritization AI tasked with cleaning the formatting of "
        f"and reprioritizing the following tasks: {task_names}. "
        f"Consider the ultimate objective of your team:{OBJECTIVE}. "
        "Do not remove any tasks. Return the result as a numbered list, like:\n"
        "#. First task\n"
        "#. Second task\n"
        f"Start the task list with number {next_task_id}."
    )
    response = openai.Completion.create(
        engine="text-davinci-003",
        prompt=prompt,
        temperature=0.5,
        max_tokens=1000,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )

    reprioritized = deque()
    for line in response.choices[0].text.strip().split("\n"):
        # Split on the first "." only, so dots inside the task name survive.
        id_text, separator, name_text = line.strip().partition(".")
        id_text = id_text.strip()
        name_text = name_text.strip()
        if not separator or not id_text.isdecimal() or not name_text:
            continue
        reprioritized.append({"task_id": int(id_text), "task_name": name_text})

    # Only swap the queue when the reply was usable; replacing it with an
    # empty deque would silently discard every pending task.
    if reprioritized:
        task_list = reprioritized
def execution_agent(objective: str, task: str) -> str:
    """Have the completion model carry out ``task`` and return its answer.

    The names of up to five previously stored tasks related to ``objective``
    are fetched with ``context_agent`` from the index named
    ``YOUR_TABLE_NAME``. When any are found they are added to the prompt so
    the model can take earlier work into account; when none are found the
    prompt contains only the objective and the task.

    Args:
        objective: Overall objective the task contributes to.
        task: Name or description of the task to perform.

    Returns:
        The completion text with surrounding whitespace removed.
    """
    context = context_agent(index=YOUR_TABLE_NAME, query=objective, n=5)

    # The retrieved context used to be fetched and then ignored; include it
    # in the prompt, but only when there is something to report.
    context_note = ""
    if context:
        context_note = (
            " Take into account these previously completed tasks: "
            f"{', '.join(context)}."
        )

    response = openai.Completion.create(
        engine="text-davinci-003",
        prompt=(
            "You are an AI who performs one task based on the following objective: "
            f"{objective}.{context_note} Your task: {task}\nResponse:"
        ),
        temperature=0.7,
        max_tokens=2000,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )
    return response.choices[0].text.strip()
def context_agent(query: str, index: str, n: int):
    """Return the task names stored with the top ``n`` matches for ``query``.

    ``query`` is embedded with ``get_ada_embedding`` and the Pinecone index
    named ``index`` is queried with that vector, asking for metadata.

    Args:
        query: Text to search for.
        index: Name of the Pinecone index to query.
        n: Value passed as ``top_k`` to the index query.

    Returns:
        A list with ``str(metadata['task'])`` of every match, ordered by
        match score from highest to lowest.
    """
    embedded_query = get_ada_embedding(query)
    pinecone_index = pinecone.Index(index_name=index)
    response = pinecone_index.query(embedded_query, top_k=n, include_metadata=True)

    # Copy the matches, then order the copy by score, highest first.
    ranked_matches = list(response.matches)
    ranked_matches.sort(key=lambda match: match.score, reverse=True)
    return [str(match.metadata["task"]) for match in ranked_matches]
# Seed the queue with the initial task.
first_task = {"task_id": 1, "task_name": YOUR_FIRST_TASK}
add_task(first_task)

# Main loop: runs forever, handling one task per pass while any are queued.
task_id_counter = 1  # last ID handed out to a newly created task
while True:
    if task_list:
        # Show everything that is still pending.
        print("\033[95m\033[1m" "\n*****TASK LIST*****\n" "\033[0m\033[0m")
        for t in task_list:
            print(f"{t['task_id']}: {t['task_name']}")

        # Step 1: take the task at the front of the queue and execute it.
        task = task_list.popleft()
        print("\033[92m\033[1m" "\n*****NEXT TASK*****\n" "\033[0m\033[0m")
        print(f"{task['task_id']}: {task['task_name']}")

        result = execution_agent(OBJECTIVE, task["task_name"])
        this_task_id = int(task["task_id"])
        print("\033[93m\033[1m" "\n*****TASK RESULT*****\n" "\033[0m\033[0m")
        print(result)

        # Step 2: wrap the result (hook for enrichment) and store it in
        # Pinecone, keyed by the task ID, with the task name and result as
        # metadata.
        enriched_result = {"data": result}
        result_id = f"result_{task['task_id']}"
        vector = enriched_result["data"]  # the result text that gets embedded
        result_metadata = {"task": task["task_name"], "result": result}
        index.upsert([(result_id, get_ada_embedding(vector), result_metadata)])

        # Step 3: derive follow-up tasks, queue them under fresh IDs, then
        # let the prioritization agent rebuild the queue.
        pending_names = [t["task_name"] for t in task_list]
        new_tasks = task_creation_agent(
            OBJECTIVE, enriched_result, task["task_name"], pending_names
        )
        for new_task in new_tasks:
            task_id_counter += 1
            new_task["task_id"] = task_id_counter
            add_task(new_task)
        prioritization_agent(this_task_id)

    time.sleep(1)  # Sleep before checking the task list again
|