Spaces:
Running
Running
import json | |
import os | |
from multiprocessing import Pool, cpu_count | |
# import requests | |
from tenacity import RetryError | |
import concurrent.futures # Add this import at the top of your file | |
import re | |
import logging | |
import chainlit as cl | |
from termcolor import colored | |
from typing import Any, Dict, Union, List | |
from typing import TypedDict, Annotated | |
from langgraph.graph.message import add_messages | |
from agents.base_agent import BaseAgent | |
from utils.read_markdown import read_markdown_file | |
from tools.google_serper import serper_search, serper_shopping_search | |
from utils.logging import log_function, setup_logging | |
from tools.offline_graph_rag_tool import run_rag | |
from prompt_engineering.guided_json_lib import ( | |
guided_json_search_query, | |
guided_json_best_url_two, | |
guided_json_router_decision, | |
guided_json_parse_expert, | |
guided_json_search_query_two | |
) | |
setup_logging(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class MessageDict(TypedDict): | |
role: str | |
content: str | |
class State(TypedDict): | |
meta_prompt: Annotated[List[dict], add_messages] | |
conversation_history: Annotated[List[dict], add_messages] | |
requirements_gathering: Annotated[List[str], add_messages] | |
expert_plan: str | |
expert_research: Annotated[List[str], add_messages] | |
expert_research_shopping: Annotated[List[str], add_messages] | |
expert_writing: str | |
user_input: Annotated[List[str], add_messages] | |
previous_search_queries: Annotated[List[dict], add_messages] | |
router_decision: str | |
chat_limit: int | |
chat_finished: bool | |
recursion_limit: int | |
final_answer: str | |
previous_type2_work: Annotated[List[str], add_messages] | |
progress_tracking: str | |
state: State = { | |
"meta_prompt": [], | |
"conversation_history": [], | |
"requirements_gathering": [], | |
"expert_plan": [], | |
"expert_research": [], | |
"expert_research_shopping": [], | |
"expert_writing": [], | |
"user_input": [], | |
"previous_search_queries": [], | |
"router_decision": None, | |
"chat_limit": None, | |
"chat_finished": False, | |
"recursion_limit": None, | |
"final_answer": None, | |
"previous_type2_work": [], | |
"progress_tracking": None | |
} | |
def chat_counter(state: State) -> State: | |
chat_limit = state.get("chat_limit") | |
if chat_limit is None: | |
chat_limit = 0 | |
chat_limit += 1 | |
state["chat_limit"] = chat_limit | |
return chat_limit | |
def routing_function(state: State) -> str: | |
decision = state["router_decision"] | |
print(colored(f"\n\n Routing function called. Decision: {decision}\n\n", 'green')) | |
return decision | |
def format_final_response(final_response: str) -> str: | |
print(colored(f"\n\n DEBUG FINAL RESPONSE: {final_response}\n\n", 'green')) | |
# Split the response at ">> FINAL ANSWER:" | |
parts = final_response.split(">> FINAL ANSWER:") | |
if len(parts) > 1: | |
answer_part = parts[1].strip() | |
# Remove any triple quotes | |
final_response_formatted = answer_part.strip('"""') | |
# Remove leading and trailing whitespace | |
final_response_formatted = final_response_formatted.strip() | |
# Remove the CoR dictionary at the end | |
cor_pattern = r'\nCoR\s*=\s*\{[\s\S]*\}\s*$' | |
final_response_formatted = re.sub(cor_pattern, '', final_response_formatted) | |
# Remove any trailing whitespace | |
final_response_formatted = final_response_formatted.rstrip() | |
return final_response_formatted | |
else: | |
error_message = "Error: Could not find '>> FINAL ANSWER:' in the response" | |
print(colored(error_message, 'red')) | |
return "Error: No final answer found" | |
def set_chat_finished(state: State) -> State: | |
state["chat_finished"] = True | |
final_response = state["meta_prompt"][-1].content | |
# Use the formatting function | |
final_response_formatted = format_final_response(final_response) | |
agent_memory_dir = '/app/agent_memory' # No change needed | |
file_path = os.path.join(agent_memory_dir, 'jar3d_final_response_previous_run.txt') | |
# Save the formatted final response to a text file | |
with open(file_path, 'w') as file: | |
file.write(final_response_formatted) | |
# Print confirmation message | |
print(colored(f"\n\nFinal response saved to jar3d_final_response_previous_run.txt", 'green')) | |
# Print the formatted final response | |
print(colored(f"\n\n Jar3d👩💻: {final_response_formatted}", 'cyan')) | |
# Update the state with the final answer | |
# state["final_answer"] = final_response_formatted | |
return state | |
class Jar3d(BaseAgent[State]): | |
def __init__(self, model: str = None, server: str = None, temperature: float = 0, | |
model_endpoint: str = None, stop: str = None): | |
super().__init__(model, server, temperature, model_endpoint, stop) | |
self.llm = self.get_llm(json_model=False) | |
def get_prompt(self, state: State = None) -> str: | |
system_prompt_md = read_markdown_file('prompt_engineering/jar3d_requirements_prompt.md') | |
final_answer = None | |
agent_memory_dir = '/app/agent_memory' | |
file_path = os.path.join(agent_memory_dir, 'jar3d_final_response_previous_run.txt') | |
if os.path.exists(file_path): | |
with open(file_path, 'r') as file: | |
final_answer = file.read().strip() | |
# Add the final_answer to the system prompt if it exists | |
if final_answer: | |
system_prompt = f"{system_prompt_md}\n # The AI Agent's Previous Work \n <Type2> {final_answer} </Type2>" | |
print(colored(f"\n\n DEBUG JAR3D SYSTEM PROMPT FINAL ANSWER: {final_answer}\n\n", 'green')) | |
else: | |
system_prompt = system_prompt_md | |
return system_prompt | |
def process_response(self, response: Any, user_input: str, state: State = None) -> Dict[str, List[Dict[str, str]]]: | |
updates_conversation_history = { | |
"requirements_gathering": [ | |
{"role": "user", "content": f"{user_input}"}, | |
{"role": "assistant", "content": str(response)} | |
] | |
} | |
return updates_conversation_history | |
def get_conv_history(self, state: State) -> str: | |
conversation_history = state.get('requirements_gathering', []) | |
return "\n".join([f"{msg['role']}: {msg['content']}" for msg in conversation_history]) | |
def get_user_input(self) -> str: | |
pass | |
def get_guided_json(self, state: State) -> Dict[str, Any]: | |
pass | |
def use_tool(self) -> Any: | |
pass | |
def run_chainlit(self, state: State, message: cl.Message) -> State: | |
user_message = message.content | |
# system_prompt = self.get_prompt() | |
# user_input = f"{system_prompt}\n cogor {user_message}" | |
user_input = f"cogor: {user_message}" | |
state = self.invoke(state=state, user_input=user_input) | |
response = state['requirements_gathering'][-1]["content"] | |
response = re.sub(r'^```python[\s\S]*?```\s*', '', response, flags=re.MULTILINE) | |
response = response.lstrip() | |
return state, response | |
class MetaExpert(BaseAgent[State]): | |
def __init__(self, model: str = None, server: str = None, temperature: float = 0, | |
model_endpoint: str = None, stop: str = None): | |
super().__init__(model, server, temperature, model_endpoint, stop) | |
self.llm = self.get_llm(json_model=False) | |
def get_prompt(self, state:None) -> str: | |
system_prompt = read_markdown_file('prompt_engineering/jar3d_meta_prompt.md') | |
return system_prompt | |
def process_response(self, response: Any, user_input: str, state: State = None) -> Dict[str, List[MessageDict]]: | |
user_input = None | |
# Identify the type of work and expert (if applicable) from the response | |
response_str = str(response) | |
formatted_response = None | |
if ">> FINAL ANSWER:" in response_str: | |
# It's a Type 2 work - Jar3d is delivering a final answer | |
next_steps = "Jar3d has delivered a final answer" | |
formatted_response = format_final_response(response_str) | |
else: | |
# Try to extract the expert's name for Type 1 work | |
expert_match = re.search(r"Expert\s+([\w\s]+):", response_str) | |
if expert_match: | |
# It's a Type 1 work - Jar3d is allocating an expert | |
associated_expert = expert_match.group(1).strip() | |
next_steps = f"Jar3d has allocated {associated_expert} to work on your request." | |
else: | |
# Neither Type 1 nor Type 2 work detected | |
next_steps = "Jar3d is processing the request." | |
updates_conversation_history = { | |
"meta_prompt": [ | |
{"role": "user", "content": f"{user_input}"}, | |
{"role": "assistant", "content": str(response)} | |
], | |
"conversation_history": [ | |
{"role": "user", "content": f"{user_input}"}, | |
{"role": "assistant", "content": str(response)} | |
], | |
"progress_tracking": f"{next_steps}", | |
"final_answer": formatted_response | |
} | |
return updates_conversation_history | |
# @log_function(logger) | |
def get_conv_history(self, state: State) -> str: | |
all_expert_research = [] | |
if state["expert_research"]: | |
expert_research = state["expert_research"] | |
all_expert_research.extend(expert_research) | |
else: | |
all_expert_research = [] | |
max_length = 350000 | |
truncated_expert_research = all_expert_research[:max_length] | |
expert_message_history = f""" | |
<expert_plan> | |
## Your Expert Plan:\n{state.get("expert_plan", [])}\n | |
</expert_plan> | |
<expert_writing> | |
## Your Expert Writing:\n{state.get("expert_writing", [])}\n | |
</expert_writing> | |
<internet_research_shopping_list> | |
## Your Expert Shopping List:\n{state.get("expert_research_shopping", [])}\n | |
</internet_research_shopping_list> | |
<internet_research> | |
## Your Expert Research:{truncated_expert_research}\n | |
</internet_research> | |
""" | |
return expert_message_history | |
def get_user_input(self) -> str: | |
user_input = input("Enter your query: ") | |
return user_input | |
def get_guided_json(self, state: State) -> Dict[str, Any]: | |
pass | |
def use_tool(self) -> Any: | |
pass | |
def run(self, state: State) -> State: | |
counter = chat_counter(state) # Counts every time we invoke the Meta Agent | |
recursion_limit = state.get("recursion_limit") | |
recursions = 3*counter - 2 | |
print(colored(f"\n\n * We have envoked the Meta-Agent {counter} times.\n * we have run {recursions} max total iterations: {recursion_limit}\n\n", "green")) | |
upper_limit_recursions = recursion_limit | |
lower_limit_recursions = recursion_limit - 2 | |
if recursions >= lower_limit_recursions and recursions <= upper_limit_recursions: | |
final_answer = "**You are being explicitly told to produce your [Type 2] work now!**" | |
elif recursions > upper_limit_recursions: | |
extra_recursions = recursions - upper_limit_recursions | |
base_message = "**You are being explicitly told to produce your [Type 2] work now!**" | |
final_answer = (base_message + "\n") * (extra_recursions + 1) | |
else: | |
final_answer = None | |
try: | |
requirements = state['requirements_gathering'][-1]["content"] | |
except: | |
requirements = state['requirements_gathering'][-1].content | |
formatted_requirements = '\n\n'.join(re.findall(r'```python\s*([\s\S]*?)\s*```', requirements, re.MULTILINE)) | |
print(colored(f"\n\n User Requirements: {formatted_requirements}\n\n", 'green')) | |
if state.get("meta_prompt"): | |
try: | |
meta_prompt = state['meta_prompt'][-1]["content"] | |
except: | |
meta_prompt = state['meta_prompt'][-1].content | |
# print(colored(f"\n\n DEBUG Meta-Prompt: {meta_prompt}\n\n", 'yellow')) | |
cor_match = '\n\n'.join(re.findall(r'```python\s*([\s\S]*?)\s*```', meta_prompt, re.MULTILINE)) | |
# print(colored(f"\n\n DEBUG CoR Match: {cor_match}\n\n", 'yellow')) | |
user_input = f"<requirements>{formatted_requirements}</requirements> \n\n Here is your last CoR {cor_match} update your next CoR from here." | |
else: | |
user_input = formatted_requirements | |
state = self.invoke(state=state, user_input=user_input, final_answer=final_answer) | |
meta_prompt_cor = state['meta_prompt'][-1]["content"] | |
print(colored(f"\n\n Meta-Prompt Chain of Reasoning: {meta_prompt_cor}\n\n", 'green')) | |
return state | |
class NoToolExpert(BaseAgent[State]): | |
print(colored(f"\n\n DEBUG: We are running the NoToolExpert tool\n\n", 'red')) | |
def __init__(self, model: str = None, server: str = None, temperature: float = 0, | |
model_endpoint: str = None, stop: str = None): | |
super().__init__(model, server, temperature, model_endpoint, stop) | |
self.llm = self.get_llm(json_model=False) | |
def get_prompt(self, state) -> str: | |
# print(f"\nn{state}\n") | |
system_prompt = state["meta_prompt"][-1].content | |
return system_prompt | |
def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]: | |
# meta_prompts = state.get("meta_prompt", []) | |
associated_meta_prompt = state["meta_prompt"][-1].content | |
parse_expert = self.get_llm(json_model=True) | |
parse_expert_prompt = """ | |
You must parse the expert from the text. The expert will be one of the following. | |
1. Expert Planner | |
2. Expert Writer | |
Return your response as the following JSON | |
{{"expert": "Expert Planner" or "Expert Writer"}} | |
""" | |
input = [ | |
{"role": "user", "content": associated_meta_prompt}, | |
{"role": "assistant", "content": f"system_prompt:{parse_expert_prompt}"} | |
] | |
retries = 0 | |
associated_expert = None | |
while retries < 4 and associated_expert is None: | |
retries += 1 | |
if self.server == 'vllm': | |
guided_json = guided_json_parse_expert | |
parse_expert_response = parse_expert.invoke(input, guided_json) | |
else: | |
parse_expert_response = parse_expert.invoke(input) | |
associated_expert_json = json.loads(parse_expert_response) | |
associated_expert = associated_expert_json.get("expert") | |
# associated_expert = parse_expert_text(associated_meta_prompt) | |
print(colored(f"\n\n Expert: {associated_expert}\n\n", 'green')) | |
if associated_expert == "Expert Planner": | |
expert_update_key = "expert_plan" | |
if associated_expert == "Expert Writer": | |
expert_update_key = "expert_writing" | |
updates_conversation_history = { | |
"conversation_history": [ | |
{"role": "user", "content": user_input}, | |
{"role": "assistant", "content": f"{str(response)}"} | |
], | |
expert_update_key: {"role": "assistant", "content": f"{str(response)}"}, | |
"progress_tracking": f"Jar3d has completed its {associated_expert} work" | |
} | |
return updates_conversation_history | |
def get_conv_history(self, state: State) -> str: | |
pass | |
def get_user_input(self) -> str: | |
pass | |
def get_guided_json(self, state: State) -> Dict[str, Any]: | |
pass | |
def use_tool(self) -> Any: | |
pass | |
# @log_function(logger) | |
def run(self, state: State) -> State: | |
# chat_counter(state) | |
all_expert_research = [] | |
meta_prompt = state["meta_prompt"][1].content | |
if state.get("expert_research"): | |
expert_research = state["expert_research"] | |
all_expert_research.extend(expert_research) | |
research_prompt = f"\n Your response must be delivered considering following research.\n ## Research\n {all_expert_research} " | |
user_input = f"{meta_prompt}\n{research_prompt}" | |
else: | |
user_input = meta_prompt | |
print(colored(f"\n\n DEBUG: We are running the NoToolExpert tool\n\n", 'red')) | |
state = self.invoke(state=state, user_input=user_input) | |
return state | |
class ToolExpert(BaseAgent[State]): | |
print(colored(f"\n\n DEBUG: We are running the ToolExpert tool\n\n", 'red')) | |
def __init__(self, model: str = None, server: str = None, temperature: float = 0, | |
model_endpoint: str = None, stop: str = None, location: str = None, hybrid: bool = False): | |
super().__init__(model, server, temperature, model_endpoint, stop, location, hybrid) | |
# print(f"\n\n DEBUG LOCATION: {self.location}") | |
self.llm = self.get_llm(json_model=False) | |
def get_prompt(self, state) -> str: | |
system_prompt = state["meta_prompt"][-1].content | |
return system_prompt | |
def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]: | |
if self.hybrid: | |
message = f"""Jar3d has completed its internet research. | |
Jar3d has generated a knowledge graph, you can view it here: https://neo4j.com/product/auradb/ | |
""" | |
else: | |
message = f"""Jar3d has completed its internet research. | |
""" | |
updates_conversation_history = { | |
"conversation_history": [ | |
{"role": "user", "content": user_input}, | |
{"role": "assistant", "content": f"{str(response)}"} | |
], | |
"expert_research": {"role": "assistant", "content": f"{str(response)}"}, | |
"progress_tracking": message | |
} | |
return updates_conversation_history | |
def get_conv_history(self, state: State) -> str: | |
pass | |
def get_user_input(self) -> str: | |
pass | |
def get_guided_json(self, state: State) -> Dict[str, Any]: | |
pass | |
def use_tool(self, mode: str, engine: str, tool_input: str, meta_prompt: str = None, query: list[str] = None, hybrid: bool = False) -> Any: | |
if mode == "serper": | |
if engine == "search": | |
results = serper_search(tool_input, self.location) | |
return {"results": results, "is_shopping": False} | |
elif engine == "shopping": | |
results = serper_shopping_search(tool_input, self.location) | |
return {"results": results, "is_shopping": True} | |
# elif engine == "scholar": | |
# results = serper_scholar_search(tool_input, self.location) | |
# return {"results": results, "is_shopping": False} | |
elif mode == "rag": | |
print(colored(f"\n\n DEBUG: We are running the Graph RAG TOOL!!\n\n", 'red')) | |
nodes = None | |
relationships = None | |
print(colored(f"\n\n DEBUG Retreival Mode: {hybrid}\n\n", 'green')) | |
results = run_rag(urls=tool_input, allowed_nodes=nodes, allowed_relationships=relationships, query=query, hybrid=self.hybrid) | |
return {"results": results, "is_shopping": False} | |
def generate_search_queries(self, meta_prompt: str, num_queries: int = 5) -> List[Dict[str, str]]: | |
print(colored(f"\n\n DEBUG: We are running the generate_search_queries tool\n\n", 'red')) | |
refine_query_template = """ | |
# Objective | |
Your mission is to systematically address your manager's instructions by determining | |
the most appropriate search queries to use **AND** to determine the best engine to use for each query. | |
Your engine choice is either "search" or "shopping". You must return either "search" or "shopping" for each query. | |
You will generate {num_queries} different search queries. | |
# Manager's Instructions | |
{manager_instructions} | |
# Flexible Search Algorithm for Simple and Complex Questions | |
1. Initial search: | |
- For a simple question: "[Question keywords]" | |
- For a complex topic: "[Main topic] overview" | |
2. For each subsequent search: | |
- Choose one of these strategies: | |
a. Specify: | |
Add a more specific term or aspect related to the topic. | |
b. Broaden: | |
Remove a specific term or add "general" or "overview" to the query. | |
c. Pivot: | |
Choose a different but related term from the topic. | |
d. Compare: | |
Add "vs" or "compared to" along with a related term. | |
e. Question: | |
Rephrase the query as a question by adding "what", "how", "why", etc. | |
# Response Format | |
**Return the following JSON:** | |
{{ | |
"search_queries": [ | |
{{"engine": "search", "query": "Query 1"}}, | |
{{"engine": "shopping", "query": "Query 2"}}, | |
... | |
{{"engine": "search", "query": "Query {num_queries}"}} | |
] | |
}} | |
Remember: | |
- Generate {num_queries} unique and diverse search queries. | |
- Each query should explore a different aspect or approach to the topic. | |
- Ensure the queries cover various aspects of the manager's instructions. | |
- The "engine" field should be "search" or "shopping" for each query. | |
- "search" best for general websearch. | |
- "shopping" best when you need to find products and prices. | |
""" | |
refine_query = self.get_llm(json_model=True) | |
refine_prompt = refine_query_template.format(manager_instructions=meta_prompt, num_queries=num_queries) | |
input_data = [ | |
{"role": "user", "content": "Generate search queries"}, | |
{"role": "assistant", "content": f"system_prompt:{refine_prompt}"} | |
] | |
guided_json = guided_json_search_query_two | |
if self.server == 'vllm': | |
refined_queries = refine_query.invoke(input_data, guided_json) | |
else: | |
print(colored(f"\n\n DEBUG: We are running the refine_query tool without vllm\n\n", 'red')) | |
refined_queries = refine_query.invoke(input_data) | |
refined_queries_json = json.loads(refined_queries) | |
return refined_queries_json.get("search_queries", []) | |
def process_serper_result(self, query: Dict[str, str], serper_response: Dict[str, Any]) -> Dict[str, Any]: | |
print(colored(f"\n\n DEBUG: We are running the process_serper_result tool\n\n", 'red')) | |
best_url_template = """ | |
Given the Serper results and the search query, select the best URL. | |
# Search Query | |
{search_query} | |
# Serper Results | |
{serper_results} | |
**Return the following JSON:** | |
{{"best_url": The URL from the Serper results that aligns most with the search query.}} | |
""" | |
best_url = self.get_llm(json_model=True) | |
best_url_prompt = best_url_template.format(search_query=query["query"], serper_results=serper_response) | |
input_data = [ | |
{"role": "user", "content": serper_response}, | |
{"role": "assistant", "content": f"system_prompt:{best_url_prompt}"} | |
] | |
guided_json = guided_json_best_url_two | |
if self.server == 'vllm': | |
best_url = best_url.invoke(input_data, guided_json) | |
else: | |
print(colored(f"\n\n DEBUG: We are running the best_url tool without vllm\n\n", 'red')) | |
best_url = best_url.invoke(input_data) | |
best_url_json = json.loads(best_url) | |
return {"query": query, "url": best_url_json.get("best_url")} | |
def analyze_and_refine_queries( | |
self, | |
serper_results: List[Dict[str, Any]], | |
meta_prompt: str, | |
num_queries: int = 1 # Default to 1 query | |
) -> List[Dict[str, str]]: | |
""" | |
Analyzes the search results and generates refined search queries. | |
""" | |
print(colored(f"\n\n DEBUG: We are running the analyze_and_refine_queries tool\n\n", 'red')) | |
observations = [] | |
for result in serper_results: | |
results_content = result.get("results", {}) | |
if result.get("is_shopping"): | |
# Handle shopping results if necessary | |
shopping_results = results_content.get("shopping_results", []) | |
snippets = [f"{item.get('title', '')} - {item.get('price', '')}" for item in shopping_results] | |
else: | |
# Handle organic search results | |
organic_results = results_content.get("organic_results", []) | |
snippets = [item.get("snippet", "") for item in organic_results] | |
observations.extend(snippets) | |
# Include num_queries in the prompt to control the number of queries generated | |
analysis_prompt_template = """ | |
Based on the following search results, generate {num_queries} new search queries to further investigate the topic. | |
# Search Results | |
{observations} | |
# Manager's Instructions | |
{meta_prompt} | |
# Guidelines | |
- Identify gaps or missing information in the current search results. | |
- Generate queries that could fill these gaps or provide deeper insight. | |
- Provide diverse and relevant queries. | |
Provide the new search queries in a JSON format: | |
{{ | |
"search_queries": [ | |
{{"engine": "search", "query": "New Query 1"}}, | |
{{"engine": "shopping", "query": "New Query 2"}}, | |
... | |
{{"engine": "search", "query": "New Query {num_queries}"}} | |
] | |
}} | |
""" | |
analysis_prompt = analysis_prompt_template.format( | |
observations="\n".join(observations), | |
meta_prompt=meta_prompt, | |
num_queries=num_queries # Pass the num_queries to the prompt | |
) | |
analysis_llm = self.get_llm(json_model=True) | |
input_data = [ | |
{"role": "user", "content": "Analyze and refine search queries"}, | |
{"role": "assistant", "content": f"system_prompt:{analysis_prompt}"} | |
] | |
guided_json = guided_json_search_query_two | |
if self.server == 'vllm': | |
refined_queries = analysis_llm.invoke(input_data, guided_json) | |
else: | |
print(colored("\n\n DEBUG: We are running the analysis without vllm\n\n", 'red')) | |
refined_queries = analysis_llm.invoke(input_data) | |
# Parse the LLM's response | |
refined_queries_json = json.loads(refined_queries) | |
refined_queries_list = refined_queries_json.get("search_queries", []) | |
# Limit the number of queries returned to num_queries | |
return refined_queries_list[:num_queries] | |
def run(self, state: State) -> State: | |
meta_prompt = state["meta_prompt"][-1].content | |
print(colored(f"\n\n Meta-Prompt: {meta_prompt}\n\n", 'green')) | |
# Set up iterative search parameters | |
max_iterations = 5 # Define a maximum number of iterations to prevent infinite loops | |
iteration = 0 | |
# Initial search queries | |
search_queries = self.generate_search_queries(meta_prompt, num_queries=5) | |
all_serper_results = [] | |
all_best_urls = [] | |
while iteration < max_iterations: | |
print(colored(f"\n\n Iteration {iteration + 1}\n\n", 'yellow')) | |
iteration += 1 | |
# Use ThreadPoolExecutor to call Serper tool for each query in parallel | |
try: | |
with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(search_queries), 5)) as executor: | |
future_to_query = { | |
executor.submit( | |
self.use_tool, | |
"serper", | |
query["engine"], | |
query["query"], | |
None | |
): query for query in search_queries | |
} | |
serper_results = [] | |
for future in concurrent.futures.as_completed(future_to_query): | |
query = future_to_query[future] | |
try: | |
result = future.result() | |
serper_results.append(result) | |
except Exception as exc: | |
print(colored(f"Error processing query {query}: {exc}", 'red')) | |
serper_results.append(None) | |
except Exception as e: | |
print(colored(f"Error in threading: {str(e)}. Falling back to non-parallel processing.", 'red')) | |
serper_results = [self.use_tool("serper", query["engine"], query["query"], None) for query in search_queries] | |
# Collect and store all results | |
all_serper_results.extend(zip(search_queries, serper_results)) | |
# Process Serper results to get best URLs | |
try: | |
with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(serper_results), 5)) as executor: | |
future_to_query = { | |
executor.submit( | |
self.process_serper_result, | |
query, | |
result["results"] if result else {} | |
): query for query, result in zip(search_queries, serper_results) | |
} | |
best_url_results = [] | |
for future in concurrent.futures.as_completed(future_to_query): | |
query = future_to_query[future] | |
try: | |
result = future.result() | |
best_url_results.append(result) | |
except Exception as exc: | |
print(colored(f"Error processing result for query {query}: {exc}", 'red')) | |
best_url_results.append(None) | |
except Exception as e: | |
print(colored(f"Error in threading: {str(e)}. Falling back to non-parallel processing for best URLs.", 'red')) | |
best_url_results = [ | |
self.process_serper_result(query, result["results"] if result else {}) | |
for query, result in zip(search_queries, serper_results) | |
] | |
# Collect all best URLs | |
all_best_urls.extend(best_url_results) | |
# Remove duplicates while preserving query alignment | |
url_query_pairs = [] | |
seen_urls = set() | |
for item in all_best_urls: | |
url = item["url"] | |
query = item["query"]["query"] | |
engine = item["query"]["engine"] | |
if url and engine == "search" and url not in seen_urls: | |
url_query_pairs.append({"url": url, "query": query}) | |
seen_urls.add(url) | |
# Extract unique URLs and queries while preserving alignment | |
unique_urls = [item["url"] for item in url_query_pairs] | |
unique_queries = [item["query"] for item in url_query_pairs] | |
print(colored("\n\n Sourced data from {} sources:".format(len(unique_urls)), 'yellow')) | |
print(colored(f"\n\n Search Queries {unique_queries}", 'yellow')) | |
for i, url in enumerate(unique_urls, 1): | |
print(colored(" {}. {}".format(i, url), 'green')) | |
# Analyze search results and refine the queries | |
refined_search_queries = self.analyze_and_refine_queries( | |
[result for _, result in all_serper_results], | |
meta_prompt, | |
num_queries=1 # Limit to 1 query per iteration | |
) | |
# Check if refinement is needed | |
if not refined_search_queries or refined_search_queries == search_queries: | |
# No further refinement possible | |
break | |
# Update search queries for the next iteration | |
search_queries = refined_search_queries | |
# After iterations, process the collected results | |
try: | |
scraper_response = self.use_tool( | |
mode="rag", | |
engine=None, | |
tool_input=unique_urls, | |
meta_prompt=meta_prompt, | |
query=unique_queries # Pass aligned queries | |
) | |
except Exception as e: | |
scraper_response = {"results": f"Error {e}: Failed to scrape results", "is_shopping": False} | |
updates = self.process_response(scraper_response, user_input="Research") | |
for key, value in updates.items(): | |
state = self.update_state(key, value, state) | |
return state | |
class Router(BaseAgent[State]): | |
def __init__(self, model: str = None, server: str = None, temperature: float = 0, | |
model_endpoint: str = None, stop: str = None): | |
super().__init__(model, server, temperature, model_endpoint, stop) | |
self.llm = self.get_llm(json_model=True) | |
def get_prompt(self, state) -> str: | |
system_prompt = state["meta_prompt"][-1].content | |
return system_prompt | |
def process_response(self, response: Any, user_input: str = None, state: State = None) -> Dict[str, Union[str, dict]]: | |
updates_conversation_history = { | |
"router_decision": [ | |
{"role": "user", "content": user_input}, | |
{"role": "assistant", "content": f"{str(response)}"} | |
], | |
"progress_tracking": f"Jar3d has routed to an expert 🤓" | |
}, | |
return updates_conversation_history | |
def get_conv_history(self, state: State) -> str: | |
pass | |
def get_user_input(self) -> str: | |
pass | |
def get_guided_json(self, state: State) -> Dict[str, Any]: | |
pass | |
def use_tool(self, tool_input: str, mode: str) -> Any: | |
pass | |
# @log_function(logger) | |
def run(self, state: State) -> State: | |
router_template = """ | |
Given these instructions from your manager. | |
# Response from Manager | |
{manager_response} | |
**Return the following JSON:** | |
{{""router_decision: Return the next agent to pass control to.}} | |
**strictly** adhere to these **guidelines** for routing. | |
If your maneger's response contains "Expert Internet Researcher", return "tool_expert". | |
If your manager's response contains "Expert Planner" or "Expert Writer", return "no_tool_expert". | |
If your manager's response contains '>> FINAL ANSWER:', return "end_chat". | |
""" | |
system_prompt = router_template.format(manager_response=state["meta_prompt"][-1].content) | |
input = [ | |
{"role": "user", "content": ""}, | |
{"role": "assistant", "content": f"system_prompt:{system_prompt}"} | |
] | |
router = self.get_llm(json_model=True) | |
if self.server == 'vllm': | |
guided_json = guided_json_router_decision | |
router_response = router.invoke(input, guided_json) | |
else: | |
router_response = router.invoke(input) | |
router_response = json.loads(router_response) | |
router_response = router_response.get("router_decision") | |
state = self.update_state("router_decision", router_response, state) | |
return state | |