data-village / crewai /agent.py
eaglelandsonce's picture
Upload 30 files
dc13c57 verified
raw
history blame
8.45 kB
import uuid
from typing import Any, List, Optional
from langchain.agents.format_scratchpad import format_log_to_str
from langchain.agents.agent import RunnableAgent
from langchain.memory import ConversationSummaryMemory
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
from langchain_openai import ChatOpenAI
from langchain_core.language_models import BaseLanguageModel
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agents import (
CacheHandler,
CrewAgentExecutor,
CrewAgentOutputParser,
ToolsHandler,
)
from crewai.utilities import I18N, Logger, Prompts, RPMController
class Agent(BaseModel):
"""Represents an agent in a system.
Each agent has a role, a goal, a backstory, and an optional language model (llm).
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
Attributes:
agent_executor: An instance of the CrewAgentExecutor class.
role: The role of the agent.
goal: The objective of the agent.
backstory: The backstory of the agent.
llm: The language model that will run the agent.
max_iter: Maximum number of iterations for an agent to execute a task.
memory: Whether the agent should have memory or not.
max_rpm: Maximum number of requests per minute for the agent execution to be respected.
verbose: Whether the agent execution should be in verbose mode.
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
tools: Tools at agents disposal
"""
__hash__ = object.__hash__ # type: ignore
_logger: Logger = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the agent execution to be respected.",
)
memory: bool = Field(
default=True, description="Whether the agent should have memory or not"
)
verbose: bool = Field(
default=False, description="Verbose mode for the Agent Execution"
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
)
tools: List[Any] = Field(
default_factory=list, description="Tools at agents disposal"
)
max_iter: Optional[int] = Field(
default=15, description="Maximum iterations for an agent to execute a task"
)
agent_executor: InstanceOf[CrewAgentExecutor] = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
cache_handler: InstanceOf[CacheHandler] = Field(
default=CacheHandler(), description="An instance of the CacheHandler class."
)
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
llm: Any = Field(
default_factory=lambda: ChatOpenAI(
model="gpt-4",
),
description="Language model that will run the agent.",
)
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def set_private_attrs(self):
"""Set private attributes."""
self._logger = Logger(self.verbose)
if self.max_rpm and not self._rpm_controller:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
return self
@model_validator(mode="after")
def check_agent_executor(self) -> "Agent":
"""Check if the agent executor is set."""
if not self.agent_executor:
self.set_cache_handler(self.cache_handler)
return self
def execute_task(
self,
task: str,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
"""Execute a task with the agent.
Args:
task: Task to execute.
context: Context to execute the task in.
tools: Tools to use for the task.
Returns:
Output of the agent
"""
if context:
task = self.i18n.slice("task_with_context").format(
task=task, context=context
)
tools = tools or self.tools
self.agent_executor.tools = tools
result = self.agent_executor.invoke(
{
"input": task,
"tool_names": self.__tools_names(tools),
"tools": render_text_description(tools),
},
RunnableConfig(callbacks=[self.tools_handler]),
)["output"]
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
return result
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.
Args:
cache_handler: An instance of the CacheHandler class.
"""
self.cache_handler = cache_handler
self.tools_handler = ToolsHandler(cache=self.cache_handler)
self.__create_agent_executor()
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.__create_agent_executor()
def __create_agent_executor(self) -> None:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
"""
agent_args = {
"input": lambda x: x["input"],
"tools": lambda x: x["tools"],
"tool_names": lambda x: x["tool_names"],
"agent_scratchpad": lambda x: format_log_to_str(x["intermediate_steps"]),
}
executor_args = {
"i18n": self.i18n,
"tools": self.tools,
"verbose": self.verbose,
"handle_parsing_errors": True,
"max_iterations": self.max_iter,
}
if self._rpm_controller:
executor_args["request_within_rpm_limit"] = (
self._rpm_controller.check_or_wait
)
if self.memory:
summary_memory = ConversationSummaryMemory(
llm=self.llm, input_key="input", memory_key="chat_history"
)
executor_args["memory"] = summary_memory
agent_args["chat_history"] = lambda x: x["chat_history"]
prompt = Prompts(i18n=self.i18n).task_execution_with_memory()
else:
prompt = Prompts(i18n=self.i18n).task_execution()
execution_prompt = prompt.partial(
goal=self.goal,
role=self.role,
backstory=self.backstory,
)
bind = self.llm.bind(stop=[self.i18n.slice("observation")])
inner_agent = (
agent_args
| execution_prompt
| bind
| CrewAgentOutputParser(
tools_handler=self.tools_handler,
cache=self.cache_handler,
i18n=self.i18n,
)
)
self.agent_executor = CrewAgentExecutor(
agent=RunnableAgent(runnable=inner_agent), **executor_args
)
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])