feat(classic): add workspace permissions system for agent commands

Add a layered permission system that controls agent command execution:

- Create autogpt.yaml in .autogpt/ folder with default allow/deny rules
- File operations in workspace allowed by default
- Sensitive files (.env, .key, .pem) blocked by default
- Dangerous shell commands (sudo, rm -rf) blocked by default
- Interactive prompts for unknown commands (y=agent, Y=workspace, n=deny)
- Agent-specific permissions stored in .autogpt/agents/{id}/permissions.yaml

Files added:
- forge/forge/config/workspace_settings.py - Pydantic models for settings
- forge/forge/permissions.py - CommandPermissionManager with pattern matching

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-01-18 17:39:33 -06:00
parent 7a20de880d
commit 8f3291bc92
6 changed files with 472 additions and 20 deletions

View File

@@ -34,6 +34,7 @@ from forge.llm.providers import CHAT_MODELS, ModelName, OpenAIModelName
from forge.llm.providers.schema import ChatModelInfo
from forge.models.action import ActionResult, AnyProposal
from forge.models.config import SystemConfiguration, SystemSettings, UserConfigurable
from forge.permissions import CommandPermissionManager
logger = logging.getLogger(__name__)
@@ -130,10 +131,12 @@ class BaseAgent(Generic[AnyProposal], metaclass=AgentMeta):
def __init__(
self,
settings: BaseAgentSettings,
permission_manager: Optional[CommandPermissionManager] = None,
):
self.state = settings
self.components: list[AgentComponent] = []
self.config = settings.config
self.permission_manager = permission_manager
# Execution data for debugging
self._trace: list[str] = []
@@ -156,24 +159,21 @@ class BaseAgent(Generic[AnyProposal], metaclass=AgentMeta):
return self.config.send_token_limit or self.llm.max_tokens * 3 // 4
@abstractmethod
async def propose_action(self) -> AnyProposal:
...
async def propose_action(self) -> AnyProposal: ...
@abstractmethod
async def execute(
self,
proposal: AnyProposal,
user_feedback: str = "",
) -> ActionResult:
...
) -> ActionResult: ...
@abstractmethod
async def do_not_execute(
self,
denied_proposal: AnyProposal,
user_feedback: str,
) -> ActionResult:
...
) -> ActionResult: ...
def reset_trace(self):
self._trace = []
@@ -181,8 +181,7 @@ class BaseAgent(Generic[AnyProposal], metaclass=AgentMeta):
@overload
async def run_pipeline(
self, protocol_method: Callable[P, Iterator[T]], *args, retry_limit: int = 3
) -> list[T]:
...
) -> list[T]: ...
@overload
async def run_pipeline(
@@ -190,8 +189,7 @@ class BaseAgent(Generic[AnyProposal], metaclass=AgentMeta):
protocol_method: Callable[P, None | Awaitable[None]],
*args,
retry_limit: int = 3,
) -> list[None]:
...
) -> list[None]: ...
async def run_pipeline(
self,

View File

@@ -0,0 +1,134 @@
"""Workspace and agent permission settings for AutoGPT."""
from __future__ import annotations
from pathlib import Path
import yaml
from pydantic import BaseModel, Field
class PermissionsConfig(BaseModel):
"""Configuration for allow/deny permission patterns."""
allow: list[str] = Field(default_factory=list)
deny: list[str] = Field(default_factory=list)
class WorkspaceSettings(BaseModel):
"""Workspace-level permissions that apply to all agents."""
permissions: PermissionsConfig = Field(
default_factory=lambda: PermissionsConfig(
allow=[
"read_file({workspace}/**)",
"write_to_file({workspace}/**)",
"list_folder({workspace}/**)",
],
deny=[
"read_file(**.env)",
"read_file(**.env.*)",
"read_file(**.key)",
"read_file(**.pem)",
"execute_shell(rm -rf:*)",
"execute_shell(sudo:*)",
],
)
)
@classmethod
def load_or_create(cls, workspace: Path) -> "WorkspaceSettings":
"""Load settings from workspace or create default settings file.
Args:
workspace: Path to the workspace directory.
Returns:
WorkspaceSettings instance.
"""
autogpt_dir = workspace / ".autogpt"
settings_path = autogpt_dir / "autogpt.yaml"
if settings_path.exists():
with open(settings_path) as f:
data = yaml.safe_load(f)
return cls.model_validate(data or {})
settings = cls()
settings.save(workspace)
return settings
def save(self, workspace: Path) -> None:
"""Save settings to the workspace .autogpt/autogpt.yaml file.
Args:
workspace: Path to the workspace directory.
"""
autogpt_dir = workspace / ".autogpt"
autogpt_dir.mkdir(parents=True, exist_ok=True)
settings_path = autogpt_dir / "autogpt.yaml"
with open(settings_path, "w") as f:
f.write("# autogpt.yaml - Workspace Permissions (all agents)\n")
f.write("# Auto-generated and updated as you grant permissions\n\n")
yaml.safe_dump(
self.model_dump(), f, default_flow_style=False, sort_keys=False
)
def add_permission(self, pattern: str, workspace: Path) -> None:
"""Add a permission pattern to the allow list.
Args:
pattern: The permission pattern to add.
workspace: Path to the workspace directory for saving.
"""
if pattern not in self.permissions.allow:
self.permissions.allow.append(pattern)
self.save(workspace)
class AgentPermissions(BaseModel):
"""Agent-specific permissions that override workspace settings."""
permissions: PermissionsConfig = Field(default_factory=PermissionsConfig)
@classmethod
def load_or_create(cls, agent_dir: Path) -> "AgentPermissions":
"""Load agent permissions or create empty permissions.
Args:
agent_dir: Path to the agent's data directory.
Returns:
AgentPermissions instance.
"""
settings_path = agent_dir / "permissions.yaml"
if settings_path.exists():
with open(settings_path) as f:
data = yaml.safe_load(f)
return cls.model_validate(data or {})
return cls()
def save(self, agent_dir: Path) -> None:
"""Save agent permissions to permissions.yaml.
Args:
agent_dir: Path to the agent's data directory.
"""
settings_path = agent_dir / "permissions.yaml"
# Ensure directory exists
agent_dir.mkdir(parents=True, exist_ok=True)
with open(settings_path, "w") as f:
f.write("# Agent-specific permissions\n")
f.write("# These override workspace-level permissions\n\n")
yaml.safe_dump(
self.model_dump(), f, default_flow_style=False, sort_keys=False
)
def add_permission(self, pattern: str, agent_dir: Path) -> None:
"""Add a permission pattern to the agent's allow list.
Args:
pattern: The permission pattern to add.
agent_dir: Path to the agent's data directory for saving.
"""
if pattern not in self.permissions.allow:
self.permissions.allow.append(pattern)
self.save(agent_dir)

View File

@@ -0,0 +1,248 @@
"""Permission management for agent command execution."""
from __future__ import annotations
import re
from enum import Enum
from pathlib import Path
from typing import Any, Callable
from forge.config.workspace_settings import AgentPermissions, WorkspaceSettings
class ApprovalScope(str, Enum):
"""Scope of permission approval."""
AGENT = "agent" # y - this agent only
WORKSPACE = "workspace" # Y - all agents
DENY = "deny" # n - deny for session
class CommandPermissionManager:
"""Manages layered permissions for agent command execution.
Check order (first match wins):
1. Agent deny list → block
2. Workspace deny list → block
3. Agent allow list → allow
4. Workspace allow list → allow
5. No match → prompt user
"""
def __init__(
self,
workspace: Path,
agent_dir: Path,
workspace_settings: WorkspaceSettings,
agent_permissions: AgentPermissions,
prompt_fn: Callable[[str, str, dict], ApprovalScope] | None = None,
):
"""Initialize the permission manager.
Args:
workspace: Path to the workspace directory.
agent_dir: Path to the agent's data directory.
workspace_settings: Workspace-level permission settings.
agent_permissions: Agent-specific permission settings.
prompt_fn: Callback to prompt user for permission.
Takes (command_name, args_str, arguments) and returns ApprovalScope.
"""
self.workspace = workspace.resolve()
self.agent_dir = agent_dir
self.workspace_settings = workspace_settings
self.agent_permissions = agent_permissions
self.prompt_fn = prompt_fn
self._session_denied: set[str] = set()
def check_command(self, command_name: str, arguments: dict[str, Any]) -> bool:
"""Check if command execution is allowed. Prompts if needed.
Args:
command_name: Name of the command to check.
arguments: Command arguments.
Returns:
True if command is allowed, False if denied.
"""
args_str = self._format_args(command_name, arguments)
perm_string = f"{command_name}({args_str})"
# 1. Check agent deny list
if self._matches_patterns(
command_name, args_str, self.agent_permissions.permissions.deny
):
return False
# 2. Check workspace deny list
if self._matches_patterns(
command_name, args_str, self.workspace_settings.permissions.deny
):
return False
# 3. Check agent allow list
if self._matches_patterns(
command_name, args_str, self.agent_permissions.permissions.allow
):
return True
# 4. Check workspace allow list
if self._matches_patterns(
command_name, args_str, self.workspace_settings.permissions.allow
):
return True
# 5. Check session denials
if perm_string in self._session_denied:
return False
# 6. Prompt user
if self.prompt_fn is None:
return False
scope = self.prompt_fn(command_name, args_str, arguments)
pattern = self._generalize_pattern(command_name, args_str)
if scope == ApprovalScope.WORKSPACE:
self.workspace_settings.add_permission(pattern, self.workspace)
return True
elif scope == ApprovalScope.AGENT:
self.agent_permissions.add_permission(pattern, self.agent_dir)
return True
else:
self._session_denied.add(perm_string)
return False
def _format_args(self, command_name: str, arguments: dict[str, Any]) -> str:
"""Format command arguments for pattern matching.
Args:
command_name: Name of the command.
arguments: Command arguments dict.
Returns:
Formatted arguments string.
"""
# For file operations, use the resolved file path for symlink handling
if command_name in ("read_file", "write_to_file", "list_folder"):
path = arguments.get("filename") or arguments.get("path") or ""
if path:
return str(Path(path).resolve())
return ""
# For shell commands, format as "command:args"
if command_name in ("execute_shell", "execute_python"):
cmd = arguments.get("command_line") or arguments.get("code") or ""
return str(cmd)
# For web operations
if command_name == "web_search":
query = arguments.get("query", "")
return str(query)
if command_name == "read_webpage":
url = arguments.get("url", "")
return str(url)
# Generic: join all argument values
if arguments:
return ":".join(str(v) for v in arguments.values())
return "*"
def _matches_patterns(self, cmd: str, args: str, patterns: list[str]) -> bool:
"""Check if command matches any pattern in the list.
Args:
cmd: Command name.
args: Formatted arguments string.
patterns: List of permission patterns.
Returns:
True if any pattern matches.
"""
for pattern in patterns:
if self._pattern_matches(pattern, cmd, args):
return True
return False
def _pattern_matches(self, pattern: str, cmd: str, args: str) -> bool:
"""Check if a single pattern matches the command.
Args:
pattern: Permission pattern like "command_name(glob_pattern)".
cmd: Command name.
args: Formatted arguments string.
Returns:
True if pattern matches.
"""
# Parse pattern: command_name(args_pattern)
match = re.match(r"^(\w+)\((.+)\)$", pattern)
if not match:
return False
pattern_cmd, args_pattern = match.groups()
# Command name must match
if pattern_cmd != cmd:
return False
# Expand {workspace} placeholder
args_pattern = args_pattern.replace("{workspace}", str(self.workspace))
# Convert glob pattern to regex
# ** matches any path (including /)
# * matches any characters except /
regex_pattern = args_pattern
regex_pattern = re.escape(regex_pattern)
# Restore glob patterns
regex_pattern = regex_pattern.replace(r"\*\*", ".*")
regex_pattern = regex_pattern.replace(r"\*", "[^/]*")
regex_pattern = f"^{regex_pattern}$"
try:
return bool(re.match(regex_pattern, args))
except re.error:
return False
def _generalize_pattern(self, command_name: str, args_str: str) -> str:
"""Create a generalized pattern from specific command args.
Args:
command_name: Name of the command.
args_str: Formatted arguments string.
Returns:
Generalized permission pattern.
"""
# For file paths, generalize to parent directory
if command_name in ("read_file", "write_to_file", "list_folder"):
path = Path(args_str)
# If within workspace, use {workspace} placeholder
try:
rel = path.resolve().relative_to(self.workspace)
return f"{command_name}({{workspace}}/{rel.parent}/*)"
except ValueError:
# Outside workspace, use exact path
return f"{command_name}({path})"
# For shell commands, use command:* pattern
if command_name in ("execute_shell", "execute_python"):
# Extract command name (first word)
parts = args_str.split()
if parts:
base_cmd = parts[0]
return f"{command_name}({base_cmd}:*)"
return f"{command_name}(*)"
# For web operations
if command_name == "web_search":
return "web_search(*)"
if command_name == "read_webpage":
# Extract domain
match = re.match(r"https?://([^/]+)", args_str)
if match:
domain = match.group(1)
return f"read_webpage(*{domain}*)"
return "read_webpage(*)"
# Generic: use wildcard
return f"{command_name}(*)"

View File

@@ -4,6 +4,7 @@ from forge.config.ai_directives import AIDirectives
from forge.config.ai_profile import AIProfile
from forge.file_storage.base import FileStorage
from forge.llm.providers import MultiProvider
from forge.permissions import CommandPermissionManager
from autogpt.agents.agent import Agent, AgentConfiguration, AgentSettings
from autogpt.app.config import AppConfig
@@ -17,6 +18,7 @@ def create_agent(
llm_provider: MultiProvider,
ai_profile: Optional[AIProfile] = None,
directives: Optional[AIDirectives] = None,
permission_manager: Optional[CommandPermissionManager] = None,
) -> Agent:
if not task:
raise ValueError("No task specified for new agent")
@@ -31,6 +33,7 @@ def create_agent(
app_config=app_config,
file_storage=file_storage,
llm_provider=llm_provider,
permission_manager=permission_manager,
)
return agent
@@ -41,12 +44,14 @@ def configure_agent_with_state(
app_config: AppConfig,
file_storage: FileStorage,
llm_provider: MultiProvider,
permission_manager: Optional[CommandPermissionManager] = None,
) -> Agent:
return _configure_agent(
state=state,
app_config=app_config,
file_storage=file_storage,
llm_provider=llm_provider,
permission_manager=permission_manager,
)
@@ -59,6 +64,7 @@ def _configure_agent(
ai_profile: Optional[AIProfile] = None,
directives: Optional[AIDirectives] = None,
state: Optional[AgentSettings] = None,
permission_manager: Optional[CommandPermissionManager] = None,
) -> Agent:
if state:
agent_state = state
@@ -81,6 +87,7 @@ def _configure_agent(
llm_provider=llm_provider,
file_storage=file_storage,
app_config=app_config,
permission_manager=permission_manager,
)

View File

@@ -30,6 +30,7 @@ from forge.components.image_gen import ImageGeneratorComponent
from forge.components.system import SystemComponent
from forge.components.user_interaction import UserInteractionComponent
from forge.components.watchdog import WatchdogComponent
from forge.components.todo import TodoComponent
from forge.components.web import WebSearchComponent, WebSeleniumComponent
from forge.file_storage.base import FileStorage
from forge.llm.prompting.schema import ChatPrompt
@@ -48,6 +49,7 @@ from forge.models.action import (
ActionSuccessResult,
)
from forge.models.config import Configurable
from forge.permissions import CommandPermissionManager
from forge.utils.exceptions import (
AgentException,
AgentTerminated,
@@ -96,8 +98,9 @@ class Agent(BaseAgent[OneShotAgentActionProposal], Configurable[AgentSettings]):
llm_provider: MultiProvider,
file_storage: FileStorage,
app_config: AppConfig,
permission_manager: Optional[CommandPermissionManager] = None,
):
super().__init__(settings)
super().__init__(settings, permission_manager=permission_manager)
self.llm_provider = llm_provider
prompt_config = OneShotAgentPromptStrategy.default_configuration.model_copy(
@@ -142,6 +145,7 @@ class Agent(BaseAgent[OneShotAgentActionProposal], Configurable[AgentSettings]):
app_config.app_data_dir,
)
self.context = ContextComponent(self.file_manager.workspace, settings.context)
self.todo = TodoComponent()
self.watchdog = WatchdogComponent(settings.config, settings.history).run_after(
ContextComponent
)
@@ -201,14 +205,14 @@ class Agent(BaseAgent[OneShotAgentActionProposal], Configurable[AgentSettings]):
if exception:
prompt.messages.append(ChatMessage.system(f"Error: {exception}"))
response: ChatModelResponse[
OneShotAgentActionProposal
] = await self.llm_provider.create_chat_completion(
prompt.messages,
model_name=self.llm.name,
completion_parser=self.prompt_strategy.parse_response_content,
functions=prompt.functions,
prefill_response=prompt.prefill_response,
response: ChatModelResponse[OneShotAgentActionProposal] = (
await self.llm_provider.create_chat_completion(
prompt.messages,
model_name=self.llm.name,
completion_parser=self.prompt_strategy.parse_response_content,
functions=prompt.functions,
prefill_response=prompt.prefill_response,
)
)
result = response.parsed_result
@@ -227,6 +231,13 @@ class Agent(BaseAgent[OneShotAgentActionProposal], Configurable[AgentSettings]):
self.commands = await self.run_pipeline(CommandProvider.get_commands)
self._remove_disabled_commands()
# Check permissions before execution
if self.permission_manager:
if not self.permission_manager.check_command(tool.name, tool.arguments):
return ActionErrorResult(
reason=f"Permission denied for command '{tool.name}'",
)
try:
return_value = await self._execute_tool(tool)

View File

@@ -21,12 +21,14 @@ from forge.components.code_executor.code_executor import (
)
from forge.config.ai_directives import AIDirectives
from forge.config.ai_profile import AIProfile
from forge.config.workspace_settings import AgentPermissions, WorkspaceSettings
from forge.file_storage import FileStorageBackendName, get_storage
from forge.llm.providers import MultiProvider
from forge.logging.config import configure_logging
from forge.logging.utils import print_attribute, speak
from forge.models.action import ActionInterruptedByHuman, ActionProposal
from forge.models.utils import ModelWithSummary
from forge.permissions import ApprovalScope, CommandPermissionManager
from forge.utils.const import FINISH_COMMAND
from forge.utils.exceptions import AgentTerminated, InvalidAgentResponseError
@@ -87,6 +89,9 @@ async def run_auto_gpt(
# Agent data is stored in .autogpt/ subdirectory of the workspace
data_dir = workspace / ".autogpt"
# Load workspace settings (creates autogpt.yaml if missing)
workspace_settings = WorkspaceSettings.load_or_create(workspace)
# Storage
local = config.file_storage_backend == FileStorageBackendName.LOCAL
restrict_to_root = not local or config.restrict_to_workspace
@@ -97,6 +102,30 @@ async def run_auto_gpt(
)
file_storage.initialize()
# Create prompt callback for permission requests
def prompt_permission(cmd: str, args_str: str, args: dict) -> ApprovalScope:
"""Prompt user for command permission.
Args:
cmd: Command name.
args_str: Formatted arguments string.
args: Full arguments dictionary.
Returns:
ApprovalScope indicating user's choice.
"""
response = clean_input(
f"\nAgent wants to execute:\n"
f" {cmd}({args_str})\n"
f"Allow? [y=this agent / Y=all agents / n=deny] "
)
if response in ("Y", "YES", "all"):
return ApprovalScope.WORKSPACE
elif response.lower() in ("y", "yes"):
return ApprovalScope.AGENT
else:
return ApprovalScope.DENY
# Set up logging module
if speak:
config.tts_config.speak_mode = True
@@ -199,11 +228,23 @@ async def run_auto_gpt(
break
if agent_state:
# Create permission manager for this agent
agent_dir = data_dir / "agents" / agent_state.agent_id
agent_permissions = AgentPermissions.load_or_create(agent_dir)
perm_manager = CommandPermissionManager(
workspace=workspace,
agent_dir=agent_dir,
workspace_settings=workspace_settings,
agent_permissions=agent_permissions,
prompt_fn=prompt_permission if not config.noninteractive_mode else None,
)
agent = configure_agent_with_state(
state=agent_state,
app_config=config,
file_storage=file_storage,
llm_provider=llm_provider,
permission_manager=perm_manager,
)
apply_overrides_to_ai_settings(
ai_profile=agent.state.ai_profile,
@@ -296,14 +337,27 @@ async def run_auto_gpt(
else:
logger.info("AI config overrides specified through CLI; skipping revision")
# Generate agent ID and create permission manager
new_agent_id = agent_manager.generate_id(ai_profile.ai_name)
agent_dir = data_dir / "agents" / new_agent_id
agent_permissions = AgentPermissions.load_or_create(agent_dir)
perm_manager = CommandPermissionManager(
workspace=workspace,
agent_dir=agent_dir,
workspace_settings=workspace_settings,
agent_permissions=agent_permissions,
prompt_fn=prompt_permission if not config.noninteractive_mode else None,
)
agent = create_agent(
agent_id=agent_manager.generate_id(ai_profile.ai_name),
agent_id=new_agent_id,
task=task,
ai_profile=ai_profile,
directives=additional_ai_directives,
app_config=config,
file_storage=file_storage,
llm_provider=llm_provider,
permission_manager=perm_manager,
)
file_manager = agent.file_manager