From 920a4c5f15b5a641b95ae76fab050d2c08303917 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Wed, 11 Feb 2026 20:39:00 +0400 Subject: [PATCH] feat(backend/chat): Allow Read/Write/Edit/Glob/Grep in SDK within workspace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move these tools from fully-blocked to workspace-scoped: they are now allowed when the file path stays within the SDK working directory (/tmp/copilot-/) or the tool-results directory (~/.claude/projects/…/tool-results/). This enables the SDK's built-in oversized tool result handling and workspace file operations. - Add _validate_workspace_path() with normpath-based path validation - Pass sdk_cwd from service.py into create_security_hooks() - Add 20 unit tests covering allowed/denied paths, traversal attacks --- .../api/features/chat/sdk/security_hooks.py | 94 ++++++++--- .../features/chat/sdk/security_hooks_test.py | 156 ++++++++++++++++++ .../backend/api/features/chat/sdk/service.py | 2 +- 3 files changed, 226 insertions(+), 26 deletions(-) create mode 100644 autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks_test.py diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py b/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py index 1bf74cf94e..a4a9267ec0 100644 --- a/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py @@ -5,6 +5,7 @@ ensuring multi-user isolation and preventing unauthorized operations. """ import logging +import os import re from typing import Any, cast @@ -20,13 +21,13 @@ BLOCKED_TOOLS = { "exec", "terminal", "command", - "Read", # Block raw file read - use workspace tools instead - "Write", # Block raw file write - use workspace tools instead - "Edit", # Block raw file edit - use workspace tools instead - "Glob", # Block raw file glob - use workspace tools instead - "Grep", # Block raw file grep - use workspace tools instead } +# Tools allowed only when their path argument stays within the SDK workspace. +# The SDK uses these to handle oversized tool results (writes to tool-results/ +# files, then reads them back) and for workspace file operations. +WORKSPACE_SCOPED_TOOLS = {"Read", "Write", "Edit", "Glob", "Grep"} + # Dangerous patterns in tool inputs DANGEROUS_PATTERNS = [ r"sudo", @@ -45,7 +46,55 @@ DANGEROUS_PATTERNS = [ ] -def _validate_tool_access(tool_name: str, tool_input: dict[str, Any]) -> dict[str, Any]: +def _deny(reason: str) -> dict[str, Any]: + """Return a hook denial response.""" + return { + "hookSpecificOutput": { + "hookEventName": "PreToolUse", + "permissionDecision": "deny", + "permissionDecisionReason": reason, + } + } + + +def _validate_workspace_path( + tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None +) -> dict[str, Any]: + """Validate that a workspace-scoped tool only accesses allowed paths. + + Allowed directories: + - The SDK working directory (``/tmp/copilot-/``) + - The SDK tool-results directory (``~/.claude/projects/…/tool-results/``) + """ + path = tool_input.get("file_path") or tool_input.get("path") or "" + if not path: + # Glob/Grep without a path default to cwd which is already sandboxed + return {} + + resolved = os.path.normpath(os.path.expanduser(path)) + + # Allow access within the SDK working directory + if sdk_cwd: + norm_cwd = os.path.normpath(sdk_cwd) + if resolved.startswith(norm_cwd + os.sep) or resolved == norm_cwd: + return {} + + # Allow access to ~/.claude/projects/*/tool-results/ (big tool results) + claude_dir = os.path.normpath(os.path.expanduser("~/.claude/projects")) + if resolved.startswith(claude_dir + os.sep) and "tool-results" in resolved: + return {} + + logger.warning( + f"Blocked {tool_name} outside workspace: {path} (resolved={resolved})" + ) + return _deny( + f"Tool '{tool_name}' can only access files within the workspace directory." + ) + + +def _validate_tool_access( + tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None = None +) -> dict[str, Any]: """Validate that a tool call is allowed. Returns: @@ -54,16 +103,14 @@ def _validate_tool_access(tool_name: str, tool_input: dict[str, Any]) -> dict[st # Block forbidden tools if tool_name in BLOCKED_TOOLS: logger.warning(f"Blocked tool access attempt: {tool_name}") - return { - "hookSpecificOutput": { - "hookEventName": "PreToolUse", - "permissionDecision": "deny", - "permissionDecisionReason": ( - f"Tool '{tool_name}' is not available. " - "Use the CoPilot-specific tools instead." - ), - } - } + return _deny( + f"Tool '{tool_name}' is not available. " + "Use the CoPilot-specific tools instead." + ) + + # Workspace-scoped tools: allowed only within the SDK workspace directory + if tool_name in WORKSPACE_SCOPED_TOOLS: + return _validate_workspace_path(tool_name, tool_input, sdk_cwd) # Check for dangerous patterns in tool input input_str = str(tool_input) @@ -73,13 +120,7 @@ def _validate_tool_access(tool_name: str, tool_input: dict[str, Any]) -> dict[st logger.warning( f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}" ) - return { - "hookSpecificOutput": { - "hookEventName": "PreToolUse", - "permissionDecision": "deny", - "permissionDecisionReason": "Input contains blocked pattern", - } - } + return _deny("Input contains blocked pattern") return {} @@ -108,7 +149,9 @@ def _validate_user_isolation( return {} -def create_security_hooks(user_id: str | None) -> dict[str, Any]: +def create_security_hooks( + user_id: str | None, sdk_cwd: str | None = None +) -> dict[str, Any]: """Create the security hooks configuration for Claude Agent SDK. Includes security validation and observability hooks: @@ -119,6 +162,7 @@ def create_security_hooks(user_id: str | None) -> dict[str, Any]: Args: user_id: Current user ID for isolation validation + sdk_cwd: SDK working directory for workspace-scoped tool validation Returns: Hooks configuration dict for ClaudeAgentOptions @@ -144,7 +188,7 @@ def create_security_hooks(user_id: str | None) -> dict[str, Any]: # Only block non-CoPilot tools; our MCP-registered tools # (including Read for oversized results) are already sandboxed. if not is_copilot_tool: - result = _validate_tool_access(clean_name, tool_input) + result = _validate_tool_access(clean_name, tool_input, sdk_cwd) if result: return cast(SyncHookJSONOutput, result) diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks_test.py b/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks_test.py new file mode 100644 index 0000000000..3e694dc425 --- /dev/null +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks_test.py @@ -0,0 +1,156 @@ +"""Unit tests for SDK security hooks.""" + +import os + +from .security_hooks import _validate_tool_access, _validate_user_isolation + +SDK_CWD = "/tmp/copilot-abc123" + + +def _is_denied(result: dict) -> bool: + hook = result.get("hookSpecificOutput", {}) + return hook.get("permissionDecision") == "deny" + + +# -- Blocked tools ----------------------------------------------------------- + + +def test_blocked_tools_denied(): + for tool in ("Bash", "bash", "shell", "exec", "terminal", "command"): + result = _validate_tool_access(tool, {}) + assert _is_denied(result), f"{tool} should be blocked" + + +def test_unknown_tool_allowed(): + result = _validate_tool_access("SomeCustomTool", {}) + assert result == {} + + +# -- Workspace-scoped tools -------------------------------------------------- + + +def test_read_within_workspace_allowed(): + result = _validate_tool_access( + "Read", {"file_path": f"{SDK_CWD}/file.txt"}, sdk_cwd=SDK_CWD + ) + assert result == {} + + +def test_write_within_workspace_allowed(): + result = _validate_tool_access( + "Write", {"file_path": f"{SDK_CWD}/output.json"}, sdk_cwd=SDK_CWD + ) + assert result == {} + + +def test_edit_within_workspace_allowed(): + result = _validate_tool_access( + "Edit", {"file_path": f"{SDK_CWD}/src/main.py"}, sdk_cwd=SDK_CWD + ) + assert result == {} + + +def test_glob_within_workspace_allowed(): + result = _validate_tool_access("Glob", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD) + assert result == {} + + +def test_grep_within_workspace_allowed(): + result = _validate_tool_access("Grep", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD) + assert result == {} + + +def test_read_outside_workspace_denied(): + result = _validate_tool_access( + "Read", {"file_path": "/etc/passwd"}, sdk_cwd=SDK_CWD + ) + assert _is_denied(result) + + +def test_write_outside_workspace_denied(): + result = _validate_tool_access( + "Write", {"file_path": "/home/user/secrets.txt"}, sdk_cwd=SDK_CWD + ) + assert _is_denied(result) + + +def test_traversal_attack_denied(): + result = _validate_tool_access( + "Read", + {"file_path": f"{SDK_CWD}/../../etc/passwd"}, + sdk_cwd=SDK_CWD, + ) + assert _is_denied(result) + + +def test_no_path_allowed(): + """Glob/Grep without a path argument defaults to cwd — should pass.""" + result = _validate_tool_access("Glob", {}, sdk_cwd=SDK_CWD) + assert result == {} + + +def test_read_no_cwd_denies_absolute(): + """If no sdk_cwd is set, absolute paths are denied.""" + result = _validate_tool_access("Read", {"file_path": "/tmp/anything"}) + assert _is_denied(result) + + +# -- Tool-results directory -------------------------------------------------- + + +def test_read_tool_results_allowed(): + home = os.path.expanduser("~") + path = f"{home}/.claude/projects/-tmp-copilot-abc123/tool-results/12345.txt" + result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD) + assert result == {} + + +def test_read_claude_projects_without_tool_results_denied(): + home = os.path.expanduser("~") + path = f"{home}/.claude/projects/-tmp-copilot-abc123/settings.json" + result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD) + assert _is_denied(result) + + +# -- Dangerous patterns ------------------------------------------------------ + + +def test_dangerous_pattern_blocked(): + result = _validate_tool_access("SomeTool", {"cmd": "sudo rm -rf /"}) + assert _is_denied(result) + + +def test_subprocess_pattern_blocked(): + result = _validate_tool_access("SomeTool", {"code": "subprocess.run(...)"}) + assert _is_denied(result) + + +# -- User isolation ---------------------------------------------------------- + + +def test_workspace_path_traversal_blocked(): + result = _validate_user_isolation( + "workspace_read", {"path": "../../../etc/shadow"}, user_id="user-1" + ) + assert _is_denied(result) + + +def test_workspace_absolute_path_blocked(): + result = _validate_user_isolation( + "workspace_read", {"path": "/etc/passwd"}, user_id="user-1" + ) + assert _is_denied(result) + + +def test_workspace_normal_path_allowed(): + result = _validate_user_isolation( + "workspace_read", {"path": "src/main.py"}, user_id="user-1" + ) + assert result == {} + + +def test_non_workspace_tool_passes_isolation(): + result = _validate_user_isolation( + "find_agent", {"query": "email"}, user_id="user-1" + ) + assert result == {} diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py index c347156ead..ad91fcd3ee 100644 --- a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py @@ -273,7 +273,7 @@ async def stream_chat_completion_sdk( system_prompt=system_prompt, mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type] allowed_tools=COPILOT_TOOL_NAMES, - hooks=create_security_hooks(user_id), # type: ignore[arg-type] + hooks=create_security_hooks(user_id, sdk_cwd=sdk_cwd), # type: ignore[arg-type] cwd=sdk_cwd, )