Task Class Documentation

The Task class manages individual tasks in a sequential workflow. A task is executed by an agent, which can be any callable object or an instance of the Agent class. The class also supports scheduling tasks, tracking their dependencies, and attaching conditions and actions that govern their execution.

Key features of the Task class include:

  • Executing tasks with specified agents and handling their results.
  • Scheduling tasks to run at specified times.
  • Setting triggers, actions, and conditions for tasks.
  • Managing task dependencies and priorities.
  • Providing a history of task executions for tracking purposes.

Class Definition

The Task class is defined as follows:

Attributes

Attribute | Type | Description
agent | Union[Callable, Agent] | The agent or callable object to run the task.
description | str | Description of the task.
result | Any | Result of the task.
history | List[Any] | History of the task.
schedule_time | datetime | Time to schedule the task.
scheduler | sched.scheduler | Scheduler to schedule the task.
trigger | Callable | Trigger to run the task.
action | Callable | Action to run the task.
condition | Callable | Condition to run the task.
priority | int | Priority of the task.
dependencies | List[Task] | List of tasks that need to be completed before this task can be executed.
args | List[Any] | Arguments to pass to the agent or callable object.
kwargs | Dict[str, Any] | Keyword arguments to pass to the agent or callable object.

Methods

execute(self, *args, **kwargs)

Executes the task by calling the agent or model with the specified arguments and keyword arguments. If a condition is set, the task will only execute if the condition returns True.

Parameters

  • args: Arguments to pass to the agent or callable object.
  • kwargs: Keyword arguments to pass to the agent or callable object.

Examples

>>> from swarms.structs import Task, Agent
>>> from swarm_models import OpenAIChat
>>> agent = Agent(llm=OpenAIChat(openai_api_key=""), max_loops=1, dashboard=False)
>>> task = Task(description="What's the weather in Miami?", agent=agent)
>>> task.run()
>>> task.result
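
The condition gate described above can be sketched without the library. This is a minimal stand-in, not the swarms implementation: `MiniTask`, `shout`, and the variable names are hypothetical, and the sketch assumes that a task whose condition returns False is skipped and leaves its result unset.

```python
from typing import Any, Callable, List, Optional


class MiniTask:
    """Hypothetical stand-in for Task; illustrates condition gating only."""

    def __init__(
        self,
        agent: Callable[..., Any],
        condition: Optional[Callable[[], bool]] = None,
    ):
        self.agent = agent
        self.condition = condition
        self.result: Any = None
        self.history: List[Any] = []

    def run(self, *args: Any, **kwargs: Any) -> Any:
        # Assumption: an unmet condition skips the call and leaves result as None.
        if self.condition is not None and not self.condition():
            return None
        # Any callable can act as the agent; its return value becomes the result.
        self.result = self.agent(*args, **kwargs)
        self.history.append(self.result)
        return self.result


def shout(text: str) -> str:
    """Toy agent: a plain callable instead of an LLM-backed Agent."""
    return text.upper()


gated = MiniTask(agent=shout, condition=lambda: False)
open_task = MiniTask(agent=shout, condition=lambda: True)
skipped = gated.run("hello")
ran = open_task.run("hello")
```

Here `gated` never calls its agent, while `open_task` stores the agent's return value in both `result` and `history`.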

handle_scheduled_task(self)

Handles the execution of a scheduled task. If the schedule time is not set or has already passed, the task is executed immediately. Otherwise, the task is scheduled to be executed at the specified schedule time.

Examples

>>> from datetime import datetime, timedelta
>>> task.schedule_time = datetime.now() + timedelta(seconds=10)
>>> task.handle_scheduled_task()
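
The two branches described above (run now, or wait until the schedule time) can be illustrated with the standard-library `sched` module, which is the scheduler type listed in the attributes. The helper `run_at` is a hypothetical name introduced for this sketch and may differ from how the Task class uses its scheduler internally.

```python
import sched
import time
from datetime import datetime, timedelta
from typing import Callable, List, Optional


def run_at(schedule_time: Optional[datetime], job: Callable[[], None]) -> str:
    """Hypothetical helper: run job now, or block until schedule_time and run it."""
    now = datetime.now()
    # No schedule time, or a time in the past: execute immediately.
    if schedule_time is None or schedule_time <= now:
        job()
        return "immediate"
    # Otherwise queue the job with a relative delay in seconds.
    delay = (schedule_time - now).total_seconds()
    scheduler = sched.scheduler(time.time, time.sleep)
    scheduler.enter(delay, 1, job)  # arguments: delay, priority, action
    scheduler.run()  # blocks until the queued job has run
    return "scheduled"


calls: List[str] = []
first = run_at(None, lambda: calls.append("now"))
second = run_at(
    datetime.now() + timedelta(seconds=0.2),
    lambda: calls.append("later"),
)
```

Note that `scheduler.run()` blocks the calling thread until the job fires, so a long delay in this sketch would stall the caller for that long.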

set_trigger(self, trigger: Callable)

Sets the trigger for the task.

Parameters

  • trigger (Callable): The trigger to set.

Examples

>>> def my_trigger():
...     print("Trigger executed")
...
>>> task.set_trigger(my_trigger)

set_action(self, action: Callable)

Sets the action for the task.

Parameters

  • action (Callable): The action to set.

Examples

>>> def my_action():
...     print("Action executed")
...
>>> task.set_action(my_action)

set_condition(self, condition: Callable)

Sets the condition for the task.

Parameters

  • condition (Callable): The condition to set.

Examples

>>> def my_condition():
...     print("Condition checked")
...     return True
...
>>> task.set_condition(my_condition)

is_completed(self)

Checks whether the task has been completed.

Returns

  • bool: True if the task has been completed, False otherwise.

Examples

>>> task.is_completed()

add_dependency(self, task)

Adds a task to the list of dependencies.

Parameters

  • task (Task): The task to add as a dependency.

Examples

>>> dependent_task = Task(description="Dependent Task")
>>> task.add_dependency(dependent_task)

set_priority(self, priority: int)

Sets the priority of the task.

Parameters

  • priority (int): The priority to set.

Examples

>>> task.set_priority(5)

check_dependency_completion(self)

Checks whether all the dependencies have been completed.

Returns

  • bool: True if all the dependencies have been completed, False otherwise.

Examples

>>> task.check_dependency_completion()
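
A dependency check of this kind reduces to asking every dependency whether it is complete. The sketch below assumes that reading; `Node` and its `done` flag are hypothetical stand-ins for illustration, not the library's Task class.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical stand-in for a task with a completion flag."""

    name: str
    done: bool = False
    dependencies: List["Node"] = field(default_factory=list)

    def is_completed(self) -> bool:
        return self.done

    def check_dependency_completion(self) -> bool:
        # all() over an empty list is True, so a task with no
        # dependencies is always considered ready.
        return all(dep.is_completed() for dep in self.dependencies)


fetch = Node("fetch")
report = Node("report", dependencies=[fetch])

before = report.check_dependency_completion()  # fetch has not finished yet
fetch.done = True
after = report.check_dependency_completion()   # fetch is now complete
solo_ready = Node("solo").check_dependency_completion()
```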

context(self, task: "Task" = None, context: List["Task"] = None, *args, **kwargs)

Sets the context for the task. For a sequential workflow, it sequentially adds the context of the previous task in the list.

Parameters

  • task (Task, optional): The task whose context is to be set.
  • context (List[Task], optional): The list of tasks to set the context.

Examples

>>> task1 = Task(description="Task 1")
>>> task2 = Task(description="Task 2")
>>> task2.context(context=[task1])
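
One way to picture "sequentially adds the context of the previous task" is a loop that copies each earlier task's result into the current task's history, in list order. This is a sketch under that assumption; `Step` is a hypothetical class, and the format in which the library stores context may differ.

```python
from typing import Any, List, Optional


class Step:
    """Hypothetical stand-in for a task that carries a result and a history."""

    def __init__(self, description: str, result: Any = None):
        self.description = description
        self.result = result
        self.history: List[Any] = []

    def context(self, context: Optional[List["Step"]] = None) -> None:
        # Assumption: context is carried over by appending each earlier
        # result to this step's history, preserving the list order.
        for previous in context or []:
            if previous.result is not None:
                self.history.append(previous.result)


step1 = Step("Task 1", result="draft outline")
step2 = Step("Task 2", result="first section")
step3 = Step("Task 3")
step3.context(context=[step1, step2])
```

After the call, `step3.history` holds the two earlier results in the order they were passed.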

Usage Examples

Basic Usage

import os
from dotenv import load_dotenv
from swarms import Agent, OpenAIChat, Task

# Load the environment variables
load_dotenv()

# Define a function to be used as the action
def my_action():
    print("Action executed")

# Define a function to be used as the condition
def my_condition():
    print("Condition checked")
    return True

# Create an agent
agent = Agent(
    llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
    max_loops=1,
    dashboard=False,
)

# Create a task
task = Task(
    description="Generate a report on the top 3 biggest expenses for small businesses and how businesses can save 20%",
    agent=agent,
)

# Set the action and condition
task.set_action(my_action)
task.set_condition(my_condition)

# Execute the task
print("Executing task...")
task.run()

# Check if the task is completed
if task.is_completed():
    print("Task completed")
else:
    print("Task not completed")

# Output the result of the task
print(f"Task result: {task.result}")

Scheduled Task Execution

from datetime import datetime, timedelta
import os
from dotenv import load_dotenv
from swarms import Agent, OpenAIChat, Task

# Load the environment variables
load_dotenv()

# Create an agent
agent = Agent(
    llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
    max_loops=1,
    dashboard=False,
)

# Create a task
task = Task(
    description="Scheduled task example",
    agent=agent,
    schedule_time=datetime.now() + timedelta(seconds=10)
)

# Handle scheduled task
task.handle_scheduled_task()

Task with Dependencies

import os
from dotenv import load_dotenv
from swarms import Agent, OpenAIChat, Task

# Load the environment variables
load_dotenv()

# Create agents
agent1 = Agent(
    llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
    max_loops=1,
    dashboard=False,
)
agent2 = Agent(
    llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
    max_loops=1,
    dashboard=False,
)

# Create tasks
task1 = Task(description="First task", agent=agent1)
task2 = Task(description="Second task", agent=agent2)

# Add dependency
task2.add_dependency(task1)

# Execute tasks
print("Executing first task...")
task1.run()

print("Executing second task...")
task2.run()

# Check if tasks are completed
print(f"Task 1 completed: {task1.is_completed()}")
print(f"Task 2 completed: {task2.is_completed()}")

Task Context

import os
from dotenv import load_dotenv
from swarms import Agent, OpenAIChat, Task

# Load the environment variables
load_dotenv()

# Create an agent
agent = Agent(
    llm=OpenAIChat(openai_api_key=os.environ["OPENAI_API_KEY"]),
    max_loops=1,
    dashboard=False,
)

# Create tasks
task1 = Task(description="First task", agent=agent)
task2 = Task(description="Second task", agent=agent)

# Set context for the second task
task2.context(context=[task1])

# Execute tasks
print("Executing first task...")
task1.run()

print("Executing second task...")
task2.run()

# Output the context of the second task
print(f"Task 2 context: {task2.history}")