feat(classic): add TodoComponent with LLM-powered decomposition

Add a task management component modeled after Claude Code's TodoWrite:
- TodoItem with recursive sub_items for hierarchical task structure
- todo_write: atomic list replacement with sub-items support
- todo_read: retrieve current todos with nested structure
- todo_clear: clear all todos
- todo_decompose: use smart LLM to break down tasks into sub-steps

Features:
- Hierarchical task tracking with independent status per sub-item
- MessageProvider shows todos in LLM context with proper indentation
- DirectiveProvider adds best practices for task management
- Graceful fallback when LLM provider not configured

Integrates with:
- original_autogpt Agent (full LLM decomposition support)
- ForgeAgent (basic task tracking, no decomposition)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-01-18 18:49:48 -06:00
parent 0adbc0bd05
commit 4c264b7ae9
9 changed files with 1220 additions and 80 deletions

View File

@@ -19,7 +19,14 @@ from forge.agent_protocol.models.task import (
TaskRequestBody,
)
from forge.command.command import Command
from forge.components.archive_handler import ArchiveHandlerComponent
from forge.components.clipboard import ClipboardComponent
from forge.components.data_processor import DataProcessorComponent
from forge.components.http_client import HTTPClientComponent
from forge.components.math_utils import MathUtilsComponent
from forge.components.system.system import SystemComponent
from forge.components.text_utils import TextUtilsComponent
from forge.components.todo import TodoComponent
from forge.config.ai_profile import AIProfile
from forge.file_storage.base import FileStorage
from forge.llm.prompting.schema import ChatPrompt
@@ -82,6 +89,19 @@ class ForgeAgent(ProtocolAgent, BaseAgent):
# System component provides "finish" command and adds some prompt information
self.system = SystemComponent()
# Todo component provides task management for tracking multi-step work
# Note: llm_provider not available in ForgeAgent, so todo_decompose won't work
# For full functionality, use original_autogpt's Agent which has LLM access
self.todo = TodoComponent()
# Utility components
self.archive_handler = ArchiveHandlerComponent(workspace)
self.clipboard = ClipboardComponent()
self.data_processor = DataProcessorComponent()
self.http_client = HTTPClientComponent()
self.math_utils = MathUtilsComponent()
self.text_utils = TextUtilsComponent()
async def create_task(self, task_request: TaskRequestBody) -> Task:
"""
The agent protocol, which is the core of the Forge,

View File

@@ -0,0 +1,11 @@
"""Todo component for task management."""
from .todo import TodoComponent, TodoConfiguration, TodoItem, TodoList, TodoStatus
__all__ = [
"TodoComponent",
"TodoConfiguration",
"TodoItem",
"TodoList",
"TodoStatus",
]

View File

@@ -0,0 +1,556 @@
"""
Todo Component - Task management for autonomous agents.
A simple, effective task management system modeled after Claude Code's TodoWrite tool.
Agents use this to track multi-step tasks naturally and frequently.
Features:
- Hierarchical task structure with sub-items
- Smart LLM-based task decomposition
- Status tracking at all levels
"""
import json
import logging
from typing import TYPE_CHECKING, Any, Iterator, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field
from forge.agent.components import ConfigurableComponent
from forge.agent.protocols import CommandProvider, DirectiveProvider, MessageProvider
from forge.command import Command, command
from forge.llm.providers import ChatMessage
from forge.models.json_schema import JSONSchema
if TYPE_CHECKING:
from forge.llm.providers import MultiProvider
logger = logging.getLogger(__name__)
# Status type: the three states a todo item can be in.
TodoStatus = Literal["pending", "in_progress", "completed"]
# System prompt for task decomposition.
# This is a str.format() template with three named fields: current_todos,
# task_content and context. The doubled braces in the JSON example at the end
# are format escapes that render as single literal braces.
DECOMPOSE_SYSTEM_PROMPT = """You are a task decomposition specialist. Your job is to break down a task into actionable sub-steps.
Current Plan Context:
{current_todos}
Task to Decompose:
{task_content}
Additional Context:
{context}
Instructions:
1. Analyze the task and break it into 3-7 concrete sub-steps
2. Each sub-step should be actionable and specific
3. Sub-steps should be in logical order
4. Keep sub-steps concise (1 line each)
5. Generate both imperative (content) and present continuous (active_form) versions
Respond with ONLY a JSON object (no markdown, no explanation):
{{"sub_items": [{{"content": "Do X", "active_form": "Doing X"}}, {{"content": "Do Y", "active_form": "Doing Y"}}], "summary": "Brief explanation of the breakdown"}}"""
class TodoItem(BaseModel):
    """A single todo item with optional nested sub-items.

    Items nest recursively through ``sub_items``; each nested item is itself a
    ``TodoItem`` and therefore carries its own ``status``.
    """

    # Imperative description of the task; required (no default).
    content: str = Field(..., description="Imperative form: 'Fix the bug'")
    # One of the TodoStatus literals; defaults to "pending".
    status: TodoStatus = Field(default="pending", description="Task status")
    # Present-continuous description of the task; required (no default).
    active_form: str = Field(
        ..., description="Present continuous form: 'Fixing the bug'"
    )
    # Child items. The quoted annotation is a forward reference to this class;
    # default_factory gives every instance its own fresh list.
    sub_items: list["TodoItem"] = Field(
        default_factory=list, description="Nested sub-tasks"
    )
    # Instances are explicitly left mutable.
    model_config = ConfigDict(frozen=False)


# Rebuild model to resolve forward reference
TodoItem.model_rebuild()
class TodoList(BaseModel):
    """The complete todo list: an ordered collection of top-level TodoItems."""

    # Top-level items; default_factory gives every list its own empty container.
    items: list[TodoItem] = Field(default_factory=list)
    # Instances are explicitly left mutable.
    model_config = ConfigDict(frozen=False)
class TodoConfiguration(BaseModel):
    """Configuration for the Todo component."""

    # Upper bound on the number of todos; defaults to 50.
    max_items: int = Field(default=50, description="Maximum number of todos")
    # When False, the todo list is not surfaced in the LLM context.
    show_in_prompt: bool = Field(
        default=True, description="Whether to include todos in LLM context"
    )
    # Optional model name for decomposition; None means "use the smart LLM".
    decompose_model: Optional[str] = Field(
        default=None, description="Model for decomposition (defaults to smart_llm)"
    )
    # Instances are explicitly left mutable.
    model_config = ConfigDict(frozen=False)
class TodoComponent(
DirectiveProvider,
CommandProvider,
MessageProvider,
ConfigurableComponent[TodoConfiguration],
):
"""
Task management component for tracking multi-step tasks.
Features:
- Hierarchical todo list with sub-items
- Atomic updates (replace entire list)
- Three statuses: pending, in_progress, completed
- Dual descriptions (imperative + active form)
- Smart LLM-based task decomposition
- Visible in LLM context for awareness
"""
config_class = TodoConfiguration
def __init__(
    self,
    llm_provider: Optional["MultiProvider"] = None,
    smart_llm: Optional[str] = None,
    config: Optional[TodoConfiguration] = None,
):
    """Start with an empty todo list and remember the optional LLM settings.

    Args:
        llm_provider: Provider used for task decomposition; may be None.
        smart_llm: Model name used for decomposition; may be None.
        config: Component configuration, handed to ConfigurableComponent.
    """
    ConfigurableComponent.__init__(self, config)
    self._smart_llm = smart_llm
    self._llm_provider = llm_provider
    self._todos = TodoList()
# -------------------------------------------------------------------------
# DirectiveProvider Implementation
# -------------------------------------------------------------------------
def get_resources(self) -> Iterator[str]:
    """Yield the single resource line that advertises the todo list."""
    resources = (
        "A todo list to track and manage multi-step tasks. Use frequently!",
    )
    yield from resources
def get_best_practices(self) -> Iterator[str]:
    """Yield the task-management best practices, in a fixed order."""
    practices = (
        "Use todo_write when working on multi-step tasks to track progress",
        "Mark todos as in_progress before starting work on them",
        "Mark todos as completed immediately after finishing, not in batches",
        "Only have ONE todo as in_progress at a time",
    )
    yield from practices
# -------------------------------------------------------------------------
# MessageProvider Implementation
# -------------------------------------------------------------------------
def _format_todo_item(self, item: TodoItem, indent: int = 0) -> list[str]:
    """Render *item* and all of its descendants as checkbox lines.

    Completed items show ``[x]`` with their content, in-progress items show
    ``[~]`` with their active form, and anything else shows ``[ ]`` with its
    content. Each nesting level is prefixed with one more pad unit.
    """
    pad = " " * indent
    if item.status == "in_progress":
        marker, label = "[~]", item.active_form
    elif item.status == "completed":
        marker, label = "[x]", item.content
    else:
        marker, label = "[ ]", item.content

    rendered = [f"{pad}- {marker} {label}"]
    # Depth-first: children follow their parent, one level deeper.
    for child in item.sub_items:
        rendered += self._format_todo_item(child, indent + 1)
    return rendered
def _get_current_todos_text(self) -> str:
    """Return the current todos as newline-joined text for the decomposition prompt.

    Returns:
        ``"No current todos."`` when the list is empty; otherwise every
        top-level item (with its sub-items) rendered by ``_format_todo_item``
        and joined with newlines.
    """
    if not self._todos.items:
        return "No current todos."
    lines: list[str] = []
    # The position of an item is not needed here, so iterate the items directly.
    for item in self._todos.items:
        lines.extend(self._format_todo_item(item))
    return "\n".join(lines)
def get_messages(self) -> Iterator[ChatMessage]:
    """Yield one system message summarising the todo list, if there is one.

    Nothing is yielded when ``show_in_prompt`` is off or the list is empty.
    In-progress items come first, then pending items (both with their
    sub-items); completed items are reduced to a count.
    """
    if not (self.config.show_in_prompt and self._todos.items):
        return

    todos = self._todos.items

    def with_status(wanted: str) -> list:
        return [todo for todo in todos if todo.status == wanted]

    active = with_status("in_progress")
    waiting = with_status("pending")
    done = with_status("completed")

    out = ["## Your Todo List\n"]
    # Show in-progress first (most important) with sub-items
    if active:
        out.append("**Currently working on:**")
        for todo in active:
            out += self._format_todo_item(todo)
    # Show pending with sub-items
    if waiting:
        out.append("\n**Pending:**")
        for todo in waiting:
            out += self._format_todo_item(todo)
    # Show completed (brief summary)
    if done:
        out.append(f"\n**Completed:** {len(done)} task(s)")

    yield ChatMessage.system("\n".join(out))
# -------------------------------------------------------------------------
# Helper Methods
# -------------------------------------------------------------------------
def _parse_todo_item(
    self, item: dict, path: str = "Item"
) -> tuple[Optional[TodoItem], Optional[str]]:
    """
    Recursively parse a dict into a TodoItem with sub_items.

    Malformed input never raises: a non-dict item, a non-list ``sub_items``
    or a non-string text field is reported through the error slot of the
    returned tuple, just like a missing field.

    Args:
        item: Raw todo data with 'content', 'status', 'active_form' and an
            optional 'sub_items' list of the same shape.
        path: Human-readable location of ``item``, used as the error prefix.

    Returns:
        (TodoItem, None) on success or (None, error_message) on failure.
    """
    # The data originates from command arguments, so the shape is not guaranteed.
    if not isinstance(item, dict):
        return (
            None,
            f"{path}: must be an object with 'content', 'status' "
            "and 'active_form'",
        )

    # Check required text fields
    for key in ("content", "active_form"):
        value = item.get(key)
        if not value:
            return None, f"{path}: '{key}' is required and must be non-empty"
        if not isinstance(value, str):
            return None, f"{path}: '{key}' must be a string"

    if item.get("status") not in ("pending", "in_progress", "completed"):
        return (
            None,
            f"{path}: 'status' must be one of: pending, in_progress, completed",
        )

    # Parse sub_items recursively; a missing or empty value means "no children".
    sub_items = []
    raw_sub_items = item.get("sub_items", [])
    if raw_sub_items:
        if not isinstance(raw_sub_items, (list, tuple)):
            return None, f"{path}: 'sub_items' must be an array of todo items"
        for j, sub_item in enumerate(raw_sub_items):
            parsed, error = self._parse_todo_item(
                sub_item, f"{path}.sub_items[{j}]"
            )
            if error:
                # The first failure anywhere in the tree aborts the whole parse.
                return None, error
            if parsed is not None:
                sub_items.append(parsed)

    return (
        TodoItem(
            content=item["content"],
            status=item["status"],
            active_form=item["active_form"],
            sub_items=sub_items,
        ),
        None,
    )
def _serialize_todo_item(self, item: TodoItem) -> dict:
    """
    Turn a TodoItem into a plain dict, descending into its sub_items.

    The 'sub_items' key is only present when the item actually has children.
    """
    serialized = dict(
        content=item.content,
        status=item.status,
        active_form=item.active_form,
    )
    children = item.sub_items
    if children:
        serialized["sub_items"] = list(map(self._serialize_todo_item, children))
    return serialized
# -------------------------------------------------------------------------
# CommandProvider Implementation
# -------------------------------------------------------------------------
def get_commands(self) -> Iterator[Command]:
    """Yield the four todo commands: write, read, clear and decompose."""
    for attribute in ("todo_write", "todo_read", "todo_clear", "todo_decompose"):
        yield getattr(self, attribute)
@command(
    names=["todo_write"],
    parameters={
        "todos": JSONSchema(
            type=JSONSchema.Type.ARRAY,
            description=(
                "The complete todo list. Each item must have: "
                "'content' (imperative form like 'Fix bug'), "
                "'status' (pending|in_progress|completed), "
                "'active_form' (present continuous like 'Fixing bug'). "
                "Optional: 'sub_items' (array of nested todo items)"
            ),
            items=JSONSchema(
                type=JSONSchema.Type.OBJECT,
                properties={
                    "content": JSONSchema(
                        type=JSONSchema.Type.STRING,
                        description="Imperative form of the task",
                        required=True,
                    ),
                    "status": JSONSchema(
                        type=JSONSchema.Type.STRING,
                        description="Task status: pending, in_progress, or completed",
                        enum=["pending", "in_progress", "completed"],
                        required=True,
                    ),
                    "active_form": JSONSchema(
                        type=JSONSchema.Type.STRING,
                        description="Present continuous form (shown when in_progress)",
                        required=True,
                    ),
                    "sub_items": JSONSchema(
                        type=JSONSchema.Type.ARRAY,
                        description="Optional nested sub-tasks (recursive structure)",
                        required=False,
                    ),
                },
            ),
            required=True,
        ),
    },
)
def todo_write(self, todos: list[dict]) -> dict:
    """
    Replace the entire todo list with a new list.
    This is the primary command for managing todos. Use it to:
    - Create initial todos when starting a multi-step task
    - Mark tasks as in_progress when you start working on them
    - Mark tasks as completed when done
    - Add new tasks discovered during work
    - Remove tasks that are no longer relevant
    - Update sub-items created by todo_decompose
    The entire list is replaced atomically, ensuring consistency.
    Supports nested sub_items for hierarchical task tracking.
    """
    # NOTE(review): the docstring is left word-for-word because the command
    # decorator presumably exposes it as the command description - confirm.

    # Reject oversized lists before doing any parsing.
    limit = self.config.max_items
    if len(todos) > limit:
        return {
            "status": "error",
            "message": f"Too many items. Maximum is {limit}.",
        }

    # Parse every entry; the first invalid one aborts without touching state.
    accepted = []
    for position, raw in enumerate(todos):
        todo, problem = self._parse_todo_item(raw, f"Item {position}")
        if problem:
            return {"status": "error", "message": problem}
        if todo:
            accepted.append(todo)

    # Tally top-level statuses in a single pass.
    tally = {"pending": 0, "in_progress": 0, "completed": 0}
    for todo in accepted:
        if todo.status in tally:
            tally[todo.status] += 1

    # More than one active task is allowed but flagged.
    active = tally["in_progress"]
    notice = None
    if active > 1:
        notice = (
            f"Warning: {active} tasks are in_progress. "
            "Best practice is to have only ONE task in_progress at a time."
        )
        logger.warning(notice)

    # Swap in the new list only after everything validated.
    self._todos = TodoList(items=accepted)

    result = {
        "status": "success",
        "item_count": len(accepted),
        "pending": tally["pending"],
        "in_progress": active,
        "completed": tally["completed"],
    }
    if notice:
        result["warning"] = notice
    return result
@command(names=["todo_read"])
def todo_read(self) -> dict:
    """
    Get the current todo list.
    Returns all todos with their current statuses and sub-items.
    Useful for reviewing progress or understanding current state.
    """
    # NOTE(review): the docstring is left word-for-word because the command
    # decorator presumably exposes it as the command description - confirm.
    todos = self._todos.items

    def tally(state: str) -> int:
        # Counts top-level items only; sub-items are not included.
        return sum(1 for todo in todos if todo.status == state)

    return {
        "status": "success",
        "items": [self._serialize_todo_item(todo) for todo in todos],
        "summary": {
            "pending": tally("pending"),
            "in_progress": tally("in_progress"),
            "completed": tally("completed"),
        },
    }
@command(names=["todo_clear"])
def todo_clear(self) -> dict:
    """
    Clear all todos.
    Removes all items from the todo list.
    Use when starting fresh or when the current task list is no longer relevant.
    """
    # NOTE(review): the docstring is left word-for-word because the command
    # decorator presumably exposes it as the command description - confirm.
    removed = len(self._todos.items)
    # Replace the list wholesale instead of emptying it in place.
    self._todos = TodoList()
    return dict(status="success", message=f"Cleared {removed} todo(s)")
@command(
    names=["todo_decompose"],
    parameters={
        "item_index": JSONSchema(
            type=JSONSchema.Type.INTEGER,
            description="Index of the todo item to decompose (0-based)",
            required=True,
        ),
        "context": JSONSchema(
            type=JSONSchema.Type.STRING,
            description="Additional context to help guide the decomposition",
            required=False,
        ),
    },
)
async def todo_decompose(self, item_index: int, context: str = "") -> dict:
    """
    Use the smart LLM to break down a todo item into actionable sub-steps.
    This spawns a focused decomposition call with the current plan context.
    The LLM analyzes the task and generates 3-7 concrete sub-steps.
    Requires an LLM provider to be configured for this component.
    """
    # NOTE(review): the docstring is left word-for-word because the command
    # decorator presumably exposes it as the command description - confirm.
    #
    # Args:
    #   item_index: 0-based position of the top-level todo to decompose.
    #   context: optional free text passed to the model as extra guidance.
    # Returns:
    #   A dict with "status" == "success" plus the generated sub-items, or
    #   "status" == "error" plus a "message". Failures are never raised.
    items = self._todos.items

    # Validate the arguments first so the caller gets precise feedback about
    # the request itself, whether or not an LLM is configured.
    if not isinstance(item_index, int) or not 0 <= item_index < len(items):
        if items:
            detail = f"Valid range: 0-{len(items) - 1}"
        else:
            # Avoid printing the nonsensical range "0--1" for an empty list.
            detail = "The todo list is empty."
        return {
            "status": "error",
            "message": f"Invalid item_index {item_index}. {detail}",
        }

    target_item = items[item_index]

    # Check if already has sub-items
    if target_item.sub_items:
        return {
            "status": "error",
            "message": f"Item '{target_item.content}' already has {len(target_item.sub_items)} sub-items. Clear them first if you want to re-decompose.",
        }

    # Validate LLM availability
    if not self._llm_provider or not self._smart_llm:
        return {
            "status": "error",
            "message": "LLM provider not configured. Cannot decompose tasks.",
        }

    # Build the decomposition prompt
    prompt_content = DECOMPOSE_SYSTEM_PROMPT.format(
        current_todos=self._get_current_todos_text(),
        task_content=target_item.content,
        context=context or "No additional context provided.",
    )

    try:
        # Call the LLM for decomposition; a configured decompose_model wins
        # over the component-wide smart model.
        response = await self._llm_provider.create_chat_completion(
            model_prompt=[ChatMessage.user(prompt_content)],
            model_name=self.config.decompose_model or self._smart_llm,
        )

        response_text = response.response.content
        if not response_text:
            return {
                "status": "error",
                "message": "LLM returned empty response",
            }

        # The model is told to answer with bare JSON, but tolerate a markdown
        # code fence by dropping every fence line.
        json_text = response_text.strip()
        if json_text.startswith("```"):
            json_text = "\n".join(
                line
                for line in json_text.split("\n")
                if not line.startswith("```")
            )

        decomposition = json.loads(json_text)

        # Validate response structure (the reply must be a JSON object).
        if not isinstance(decomposition, dict) or "sub_items" not in decomposition:
            return {
                "status": "error",
                "message": "LLM response missing 'sub_items' field",
            }

        # Keep only well-formed entries; malformed ones are skipped silently.
        raw_sub_items = decomposition["sub_items"]
        if not isinstance(raw_sub_items, list):
            raw_sub_items = []
        new_sub_items = []
        for sub in raw_sub_items:
            if not isinstance(sub, dict):
                continue
            sub_content = sub.get("content")
            sub_active_form = sub.get("active_form")
            if not (isinstance(sub_content, str) and sub_content):
                continue
            if not (isinstance(sub_active_form, str) and sub_active_form):
                continue
            new_sub_items.append(
                TodoItem(
                    content=sub_content,
                    active_form=sub_active_form,
                    status="pending",
                )
            )

        if not new_sub_items:
            return {
                "status": "error",
                "message": "LLM generated no valid sub-items",
            }

        # Update the target item with sub-items
        target_item.sub_items = new_sub_items

        return {
            "status": "success",
            "item": target_item.content,
            "sub_items_count": len(new_sub_items),
            "sub_items": [
                {"content": s.content, "active_form": s.active_form}
                for s in new_sub_items
            ],
            "summary": decomposition.get("summary", "Task decomposed successfully"),
        }
    except json.JSONDecodeError as e:
        logger.error("Failed to parse LLM decomposition response: %s", e)
        return {
            "status": "error",
            "message": f"Failed to parse LLM response as JSON: {e}",
        }
    except Exception as e:
        # Command boundary: report the failure to the agent instead of
        # raising, but keep the traceback in the log.
        logger.exception("Decomposition failed: %s", e)
        return {
            "status": "error",
            "message": f"Decomposition failed: {e}",
        }

View File

@@ -13,9 +13,22 @@ from forge.config.workspace_settings import AgentPermissions, WorkspaceSettings
class ApprovalScope(str, Enum):
    """Scope of permission approval.

    Each member name is declared exactly once; repeating a member name in an
    Enum body raises ``TypeError`` when the class is created.
    """

    ONCE = "once"  # Allow this one time only (not saved)
    AGENT = "agent"  # Always allow for this agent
    WORKSPACE = "workspace"  # Always allow for all agents
    DENY = "deny"  # Deny this command
class UserFeedbackProvided(Exception):
    """Signal that the user answered a permission prompt with feedback.

    Raised instead of an approve/deny decision. The main loop is expected to
    catch it and hand the feedback to the agent via do_not_execute() rather
    than executing the command.
    """

    def __init__(self, feedback: str):
        message = f"User provided feedback: {feedback}"
        Exception.__init__(self, message)
        # Keep the raw text available to whoever catches the exception.
        self.feedback = feedback
class CommandPermissionManager:
@@ -102,7 +115,10 @@ class CommandPermissionManager:
scope = self.prompt_fn(command_name, args_str, arguments)
pattern = self._generalize_pattern(command_name, args_str)
if scope == ApprovalScope.WORKSPACE:
if scope == ApprovalScope.ONCE:
# Allow this one time only, don't save anywhere
return True
elif scope == ApprovalScope.WORKSPACE:
self.workspace_settings.add_permission(pattern, self.workspace)
return True
elif scope == ApprovalScope.AGENT:

View File

@@ -0,0 +1 @@
# Tests package

View File

@@ -0,0 +1 @@
# Component tests package

View File

@@ -0,0 +1,547 @@
"""Tests for TodoComponent."""
import pytest
from forge.components.todo import TodoComponent, TodoConfiguration
@pytest.fixture
def todo_component():
    """Hand each test its own TodoComponent built with no arguments."""
    component = TodoComponent()
    return component
class TestTodoWrite:
    """Tests for the todo_write command."""

    @staticmethod
    def _todo(content, status="pending", active_form=None):
        """Build one todo dict; active_form falls back to content when None."""
        return {
            "content": content,
            "status": status,
            "active_form": content if active_form is None else active_form,
        }

    def test_write_empty_list(self, todo_component):
        """Writing an empty list should succeed."""
        result = todo_component.todo_write([])
        assert result["status"] == "success"
        assert result["item_count"] == 0
        assert result["pending"] == 0
        assert result["in_progress"] == 0
        assert result["completed"] == 0

    def test_write_single_pending_todo(self, todo_component):
        """Writing a single pending todo should succeed."""
        result = todo_component.todo_write(
            [self._todo("Fix the bug", "pending", "Fixing the bug")]
        )
        assert result["status"] == "success"
        assert result["item_count"] == 1
        assert result["pending"] == 1
        assert result["in_progress"] == 0

    def test_write_multiple_todos(self, todo_component):
        """Writing multiple todos with different statuses should succeed."""
        result = todo_component.todo_write(
            [
                self._todo("Research patterns", "completed", "Researching patterns"),
                self._todo("Implement feature", "in_progress", "Implementing feature"),
                self._todo("Write tests", "pending", "Writing tests"),
            ]
        )
        assert result["status"] == "success"
        assert result["item_count"] == 3
        assert result["pending"] == 1
        assert result["in_progress"] == 1
        assert result["completed"] == 1

    def test_write_replaces_entire_list(self, todo_component):
        """Writing should replace the entire list, not append."""
        # First write
        todo_component.todo_write([self._todo("Task 1", "pending", "Doing task 1")])
        # Second write should replace
        result = todo_component.todo_write(
            [self._todo("Task 2", "pending", "Doing task 2")]
        )
        assert result["item_count"] == 1
        # Verify only Task 2 exists
        read_result = todo_component.todo_read()
        assert len(read_result["items"]) == 1
        assert read_result["items"][0]["content"] == "Task 2"

    def test_write_warns_on_multiple_in_progress(self, todo_component):
        """Writing multiple in_progress items should include a warning."""
        result = todo_component.todo_write(
            [
                self._todo("Task 1", "in_progress", "Doing task 1"),
                self._todo("Task 2", "in_progress", "Doing task 2"),
            ]
        )
        assert result["status"] == "success"
        assert "warning" in result
        assert "2 tasks are in_progress" in result["warning"]

    def test_write_validates_required_content(self, todo_component):
        """Writing without content should fail."""
        result = todo_component.todo_write(
            [self._todo("", "pending", "Doing something")]
        )
        assert result["status"] == "error"
        assert "content" in result["message"]

    def test_write_validates_required_active_form(self, todo_component):
        """Writing without active_form should fail."""
        result = todo_component.todo_write([self._todo("Fix bug", "pending", "")])
        assert result["status"] == "error"
        assert "active_form" in result["message"]

    def test_write_validates_status(self, todo_component):
        """Writing with invalid status should fail."""
        result = todo_component.todo_write(
            [self._todo("Fix bug", "invalid_status", "Fixing bug")]
        )
        assert result["status"] == "error"
        assert "status" in result["message"]

    def test_write_enforces_max_items(self):
        """Writing more items than max_items should fail."""
        # The configuration must be passed by keyword: the first positional
        # parameter of TodoComponent is llm_provider, not config.
        component = TodoComponent(config=TodoConfiguration(max_items=2))
        result = component.todo_write(
            [self._todo("Task 1"), self._todo("Task 2"), self._todo("Task 3")]
        )
        assert result["status"] == "error"
        assert "Too many items" in result["message"]
class TestTodoRead:
    """Tests for the todo_read command."""

    def test_read_empty_list(self, todo_component):
        """A fresh component reads back as an empty, successful result."""
        outcome = todo_component.todo_read()
        assert outcome["status"] == "success"
        assert outcome["items"] == []
        assert outcome["summary"]["pending"] == 0

    def test_read_after_write(self, todo_component):
        """Whatever was written comes back unchanged from a read."""
        written = {
            "content": "Fix bug",
            "status": "pending",
            "active_form": "Fixing bug",
        }
        todo_component.todo_write([written])

        outcome = todo_component.todo_read()
        assert outcome["status"] == "success"
        assert len(outcome["items"]) == 1
        first = outcome["items"][0]
        assert first["content"] == "Fix bug"
        assert first["status"] == "pending"
        assert first["active_form"] == "Fixing bug"
class TestTodoClear:
    """Tests for the todo_clear command."""

    def test_clear_empty_list(self, todo_component):
        """Clearing when nothing is stored still reports success."""
        outcome = todo_component.todo_clear()
        assert outcome["status"] == "success"
        assert "Cleared 0 todo(s)" in outcome["message"]

    def test_clear_populated_list(self, todo_component):
        """Clearing drops every stored item and reports how many went."""
        seeded = [
            {"content": name, "status": "pending", "active_form": name}
            for name in ("Task 1", "Task 2")
        ]
        todo_component.todo_write(seeded)

        outcome = todo_component.todo_clear()
        assert outcome["status"] == "success"
        assert "Cleared 2 todo(s)" in outcome["message"]

        # Verify list is empty
        remaining = todo_component.todo_read()["items"]
        assert len(remaining) == 0
class TestProtocols:
    """Tests for protocol implementations."""

    def test_get_resources(self, todo_component):
        """DirectiveProvider.get_resources should yield a resource."""
        resources = list(todo_component.get_resources())
        assert len(resources) == 1
        assert "todo list" in resources[0].lower()

    def test_get_best_practices(self, todo_component):
        """DirectiveProvider.get_best_practices should yield practices."""
        practices = list(todo_component.get_best_practices())
        assert len(practices) == 4
        assert any("todo_write" in p for p in practices)

    def test_get_commands(self, todo_component):
        """CommandProvider.get_commands should yield commands."""
        commands = list(todo_component.get_commands())
        command_names = [c.names[0] for c in commands]
        assert "todo_write" in command_names
        assert "todo_read" in command_names
        assert "todo_clear" in command_names

    def test_get_messages_empty_list(self, todo_component):
        """MessageProvider should not yield messages for empty list."""
        messages = list(todo_component.get_messages())
        assert len(messages) == 0

    def test_get_messages_with_todos(self, todo_component):
        """MessageProvider should include todos in LLM context."""
        todo_component.todo_write(
            [
                {
                    "content": "Implement feature",
                    "status": "in_progress",
                    "active_form": "Implementing feature",
                },
                {
                    "content": "Write tests",
                    "status": "pending",
                    "active_form": "Writing tests",
                },
            ]
        )
        messages = list(todo_component.get_messages())
        assert len(messages) == 1
        content = messages[0].content
        assert "Your Todo List" in content
        assert "Currently working on" in content
        assert "Implementing feature" in content
        assert "Pending" in content
        assert "Write tests" in content

    def test_get_messages_respects_show_in_prompt_config(self):
        """MessageProvider should respect show_in_prompt config."""
        # The configuration must be passed by keyword: the first positional
        # parameter of TodoComponent is llm_provider, not config.
        component = TodoComponent(config=TodoConfiguration(show_in_prompt=False))
        component.todo_write(
            [{"content": "Task", "status": "pending", "active_form": "Task"}]
        )
        messages = list(component.get_messages())
        assert len(messages) == 0
class TestConfiguration:
    """Tests for TodoConfiguration."""

    def test_default_configuration(self):
        """Default configuration should have expected values."""
        config = TodoConfiguration()
        assert config.max_items == 50
        assert config.show_in_prompt is True

    def test_custom_configuration(self):
        """Custom configuration should be respected."""
        # The configuration must be passed by keyword: the first positional
        # parameter of TodoComponent is llm_provider, not config.
        component = TodoComponent(
            config=TodoConfiguration(max_items=10, show_in_prompt=False)
        )
        assert component.config.max_items == 10
        assert component.config.show_in_prompt is False
class TestSubItems:
    """Tests for hierarchical sub-items support."""

    @staticmethod
    def _todo(content, status, active_form, sub_items=None):
        """Build one todo dict; 'sub_items' is only included when given."""
        entry = {"content": content, "status": status, "active_form": active_form}
        if sub_items is not None:
            entry["sub_items"] = sub_items
        return entry

    def test_write_with_sub_items(self, todo_component):
        """A parent carrying three children is accepted as one item."""
        children = [
            self._todo("Design API", "completed", "Designing API"),
            self._todo("Write code", "in_progress", "Writing code"),
            self._todo("Add tests", "pending", "Adding tests"),
        ]
        parent = self._todo(
            "Implement feature", "in_progress", "Implementing feature", children
        )
        outcome = todo_component.todo_write([parent])
        assert outcome["status"] == "success"
        assert outcome["item_count"] == 1

    def test_read_returns_sub_items(self, todo_component):
        """Children written with a parent are returned by a read."""
        children = [
            self._todo("Sub task 1", "completed", "Doing sub task 1"),
            self._todo("Sub task 2", "pending", "Doing sub task 2"),
        ]
        todo_component.todo_write(
            [self._todo("Main task", "in_progress", "Working on main task", children)]
        )

        outcome = todo_component.todo_read()
        assert outcome["status"] == "success"
        assert len(outcome["items"]) == 1
        parent = outcome["items"][0]
        assert "sub_items" in parent
        assert len(parent["sub_items"]) == 2
        assert parent["sub_items"][0]["content"] == "Sub task 1"
        assert parent["sub_items"][0]["status"] == "completed"

    def test_nested_sub_items(self, todo_component):
        """Three levels of nesting survive a write/read round trip."""
        level3 = self._todo("Level 3", "pending", "Level 3")
        level2 = self._todo("Level 2", "pending", "Level 2", [level3])
        level1 = self._todo("Level 1", "in_progress", "Level 1", [level2])

        outcome = todo_component.todo_write([level1])
        assert outcome["status"] == "success"

        # Verify nested structure
        stored = todo_component.todo_read()["items"][0]
        deepest = stored["sub_items"][0]["sub_items"][0]
        assert deepest["content"] == "Level 3"

    def test_sub_items_validation_error(self, todo_component):
        """An invalid child makes the whole write fail."""
        bad_child = self._todo("", "pending", "Sub task")  # Invalid: empty content
        outcome = todo_component.todo_write(
            [self._todo("Main task", "pending", "Main task", [bad_child])]
        )
        assert outcome["status"] == "error"
        assert "sub_items" in outcome["message"]

    def test_messages_include_sub_items(self, todo_component):
        """The context message lists children with their status markers."""
        children = [
            self._todo("Sub completed", "completed", "Sub completed"),
            self._todo("Sub pending", "pending", "Sub pending"),
        ]
        todo_component.todo_write(
            [self._todo("Main task", "in_progress", "Working on main task", children)]
        )

        messages = list(todo_component.get_messages())
        assert len(messages) == 1
        text = messages[0].content
        # Check parent is shown
        assert "Working on main task" in text
        # Check sub-items are shown (with their status indicators)
        assert "[x] Sub completed" in text
        assert "[ ] Sub pending" in text
class TestTodoDecompose:
    """Tests for the todo_decompose command."""

    @staticmethod
    def _run(coro):
        """Run a coroutine to completion on a fresh event loop.

        asyncio.run() replaces asyncio.get_event_loop().run_until_complete(),
        which is deprecated when no event loop is already running.
        """
        import asyncio

        return asyncio.run(coro)

    @staticmethod
    def _component_with_llm():
        """Build a component whose LLM settings are non-empty placeholders.

        The placeholder provider is never called by the tests that use it; it
        only ensures the argument checks under test are reached no matter
        where the LLM-availability guard sits in todo_decompose.
        """
        return TodoComponent(
            llm_provider=object(),  # type: ignore[arg-type]
            smart_llm="test-model",
        )

    def test_decompose_without_llm_provider(self, todo_component):
        """Decompose should fail gracefully without LLM provider."""
        todo_component.todo_write(
            [
                {
                    "content": "Complex task",
                    "status": "pending",
                    "active_form": "Complex task",
                }
            ]
        )
        result = self._run(todo_component.todo_decompose(item_index=0))
        assert result["status"] == "error"
        assert "LLM provider not configured" in result["message"]

    def test_decompose_invalid_index(self):
        """Decompose with invalid index should fail."""
        component = self._component_with_llm()
        component.todo_write(
            [{"content": "Task", "status": "pending", "active_form": "Task"}]
        )
        result = self._run(component.todo_decompose(item_index=5))
        assert result["status"] == "error"
        assert "Invalid item_index" in result["message"]

    def test_decompose_empty_list(self, todo_component):
        """Decompose on empty list should fail."""
        result = self._run(todo_component.todo_decompose(item_index=0))
        assert result["status"] == "error"

    def test_decompose_already_has_sub_items(self):
        """Decompose should fail if item already has sub-items."""
        component = self._component_with_llm()
        component.todo_write(
            [
                {
                    "content": "Task with subs",
                    "status": "pending",
                    "active_form": "Task with subs",
                    "sub_items": [
                        {
                            "content": "Existing sub",
                            "status": "pending",
                            "active_form": "Existing sub",
                        }
                    ],
                }
            ]
        )
        result = self._run(component.todo_decompose(item_index=0))
        assert result["status"] == "error"
        assert "already has" in result["message"]

    def test_get_commands_includes_decompose(self, todo_component):
        """CommandProvider should include todo_decompose command."""
        commands = list(todo_component.get_commands())
        command_names = [c.names[0] for c in commands]
        assert "todo_decompose" in command_names

View File

@@ -5,6 +5,8 @@ import logging
from typing import TYPE_CHECKING, Any, ClassVar, Optional
import sentry_sdk
from pydantic import Field
from forge.agent.base import BaseAgent, BaseAgentConfiguration, BaseAgentSettings
from forge.agent.protocols import (
AfterExecute,
@@ -19,18 +21,24 @@ from forge.components.action_history import (
EpisodicActionHistory,
)
from forge.components.action_history.action_history import ActionHistoryConfiguration
from forge.components.archive_handler import ArchiveHandlerComponent
from forge.components.clipboard import ClipboardComponent
from forge.components.code_executor.code_executor import (
CodeExecutorComponent,
CodeExecutorConfiguration,
)
from forge.components.context.context import AgentContext, ContextComponent
from forge.components.data_processor import DataProcessorComponent
from forge.components.file_manager import FileManagerComponent
from forge.components.git_operations import GitOperationsComponent
from forge.components.http_client import HTTPClientComponent
from forge.components.image_gen import ImageGeneratorComponent
from forge.components.math_utils import MathUtilsComponent
from forge.components.system import SystemComponent
from forge.components.text_utils import TextUtilsComponent
from forge.components.todo import TodoComponent
from forge.components.user_interaction import UserInteractionComponent
from forge.components.watchdog import WatchdogComponent
from forge.components.todo import TodoComponent
from forge.components.web import WebSearchComponent, WebSeleniumComponent
from forge.file_storage.base import FileStorage
from forge.llm.prompting.schema import ChatPrompt
@@ -56,7 +64,6 @@ from forge.utils.exceptions import (
CommandExecutionError,
UnknownCommandError,
)
from pydantic import Field
from .prompt_strategies.one_shot import (
OneShotAgentActionProposal,
@@ -145,7 +152,16 @@ class Agent(BaseAgent[OneShotAgentActionProposal], Configurable[AgentSettings]):
app_config.app_data_dir,
)
self.context = ContextComponent(self.file_manager.workspace, settings.context)
self.todo = TodoComponent()
self.todo = TodoComponent(
llm_provider=llm_provider,
smart_llm=str(app_config.smart_llm),
)
self.archive_handler = ArchiveHandlerComponent(self.file_manager.workspace)
self.clipboard = ClipboardComponent()
self.data_processor = DataProcessorComponent()
self.http_client = HTTPClientComponent()
self.math_utils = MathUtilsComponent()
self.text_utils = TextUtilsComponent()
self.watchdog = WatchdogComponent(settings.config, settings.history).run_after(
ContextComponent
)

View File

@@ -28,7 +28,11 @@ from forge.logging.config import configure_logging
from forge.logging.utils import print_attribute, speak
from forge.models.action import ActionInterruptedByHuman, ActionProposal
from forge.models.utils import ModelWithSummary
from forge.permissions import ApprovalScope, CommandPermissionManager
from forge.permissions import (
ApprovalScope,
CommandPermissionManager,
UserFeedbackProvided,
)
from forge.utils.const import FINISH_COMMAND
from forge.utils.exceptions import AgentTerminated, InvalidAgentResponseError
@@ -113,17 +117,32 @@ async def run_auto_gpt(
Returns:
ApprovalScope indicating user's choice.
Raises:
UserFeedbackProvided: If user types feedback instead of choosing an option.
"""
response = clean_input(
f"\nAgent wants to execute:\n"
f" {cmd}({args_str})\n"
f"Allow? [y=this agent / Y=all agents / n=deny] "
print(f"\n{Fore.CYAN}{cmd}({args_str}){Style.RESET_ALL}")
print(
f" {Fore.GREEN}[1]{Style.RESET_ALL} Once "
f"{Fore.GREEN}[2]{Style.RESET_ALL} Always (agent) "
f"{Fore.GREEN}[3]{Style.RESET_ALL} Always (all) "
f"{Fore.RED}[4]{Style.RESET_ALL} Deny"
)
if response in ("Y", "YES", "all"):
return ApprovalScope.WORKSPACE
elif response.lower() in ("y", "yes"):
response = clean_input(" Choice or feedback: ")
if response == "1":
return ApprovalScope.ONCE
elif response == "2":
return ApprovalScope.AGENT
elif response == "3":
return ApprovalScope.WORKSPACE
elif response == "4":
return ApprovalScope.DENY
elif response.strip():
# Any other non-empty input is feedback for the agent
raise UserFeedbackProvided(response)
else:
# Empty input defaults to deny
return ApprovalScope.DENY
# Set up logging module
@@ -492,15 +511,12 @@ def _configure_llm_provider(config: AppConfig) -> MultiProvider:
def _get_cycle_budget(continuous_mode: bool, continuous_limit: int) -> int | float:
# Translate from the continuous_mode/continuous_limit config
# to a cycle_budget (maximum number of cycles to run without checking in with the
# user) and a count of cycles_remaining before we check in..
if continuous_mode:
cycle_budget = continuous_limit if continuous_limit else math.inf
else:
cycle_budget = 1
return cycle_budget
# Always run continuously - the permission manager handles per-command approval.
# The cycle budget is now only used for Ctrl+C handling graceful shutdown.
# If a limit is set, use it; otherwise run indefinitely.
if continuous_limit:
return continuous_limit
return math.inf
class UserFeedback(str, enum.Enum):
@@ -612,73 +628,29 @@ async def run_interaction_loop(
speak_mode=app_config.tts_config.speak_mode,
)
##################
# Get user input #
##################
# Permission manager handles per-command approval during execute()
handle_stop_signal()
if cycles_remaining == 1: # Last cycle
feedback_type, feedback, new_cycles_remaining = await get_user_feedback(
app_config,
ai_profile,
)
if feedback_type == UserFeedback.AUTHORIZE:
if new_cycles_remaining is not None:
# Case 1: User is altering the cycle budget.
if cycle_budget > 1:
cycle_budget = new_cycles_remaining + 1
# Case 2: User is running iteratively and
# has initiated a one-time continuous cycle
cycles_remaining = new_cycles_remaining + 1
else:
# Case 1: Continuous iteration was interrupted -> resume
if cycle_budget > 1:
logger.info(
f"The cycle budget is {cycle_budget}.",
extra={
"title": "RESUMING CONTINUOUS EXECUTION",
"title_color": Fore.MAGENTA,
},
)
# Case 2: The agent used up its cycle budget -> reset
cycles_remaining = cycle_budget + 1
logger.info(
"-=-=-=-=-=-=-= COMMAND AUTHORISED BY USER -=-=-=-=-=-=-=",
extra={"color": Fore.MAGENTA},
)
elif feedback_type == UserFeedback.EXIT:
logger.warning("Exiting...")
exit()
else: # user_feedback == UserFeedback.TEXT
pass
else:
feedback = ""
# First log new-line so user can differentiate sections better in console
print()
if cycles_remaining != math.inf:
# Print authorized commands left value
print_attribute(
"AUTHORIZED_COMMANDS_LEFT", cycles_remaining, title_color=Fore.CYAN
)
###################
# Execute Command #
###################
# Decrement the cycle counter first to reduce the likelihood of a SIGINT
# happening during command execution, setting the cycles remaining to 1,
# and then having the decrement set it to 0, exiting the application.
if not feedback:
cycles_remaining -= 1
if not action_proposal.use_tool:
continue
handle_stop_signal()
if not feedback:
# Execute the command. Permission manager will prompt user if needed.
# If user provides feedback instead of approving, catch the exception
# and pass the feedback to the agent.
try:
result = await agent.execute(action_proposal)
else:
result = await agent.do_not_execute(action_proposal, feedback)
cycles_remaining -= 1
except UserFeedbackProvided as e:
result = await agent.do_not_execute(action_proposal, e.feedback)
logger.info(
f"Feedback provided: {e.feedback}",
extra={"title": "USER:", "title_color": Fore.MAGENTA},
)
if result.status == "success":
logger.info(result, extra={"title": "SYSTEM:", "title_color": Fore.YELLOW})