feat(chat/sandbox): bubblewrap sandbox for bash_exec, remove python_exec

- Replace `--ro-bind / /` with whitelist-only filesystem: only /usr, /etc,
  /bin, /lib, /sbin mounted read-only. /app, /root, /home, /opt, /var are
  completely invisible inside the sandbox.
- Add `--clearenv` to wipe all inherited env vars (API keys, DB passwords).
  Only safe vars (PATH, HOME=workspace, LANG) are explicitly set.
- Remove python_exec tool — bash_exec can run `python3 -c` or heredocs with
  identical bubblewrap protection, reducing attack surface.
- Remove all fallback security code (import hooks, blocked modules, network
  command lists). Tools now hard-require bubblewrap — disabled on platforms
  without bwrap.
- Clean up security_hooks.py: remove ~200 lines of dead bash validation code,
  add Bash to BLOCKED_TOOLS as defence-in-depth.
- Wire up long-running tool callback in SDK service for create_agent/edit_agent
  delegation to Redis Streams background infrastructure.
This commit is contained in:
Zamil Majdy
2026-02-12 21:44:40 +04:00
parent 06c16ee2fe
commit 1dd53db21c
11 changed files with 481 additions and 550 deletions

View File

@@ -62,8 +62,9 @@ ENV POETRY_HOME=/opt/poetry \
DEBIAN_FRONTEND=noninteractive
ENV PATH=/opt/poetry/bin:$PATH
# Install Python, FFmpeg, ImageMagick, and CLI tools for agent use
# CLI tools match ALLOWED_BASH_COMMANDS in security_hooks.py
# Install Python, FFmpeg, ImageMagick, and CLI tools for agent use.
# bubblewrap provides OS-level sandbox (whitelist-only FS + no network)
# for the bash_exec MCP tool.
RUN apt-get update && apt-get install -y \
python3.13 \
python3-pip \
@@ -72,6 +73,7 @@ RUN apt-get update && apt-get install -y \
jq \
ripgrep \
tree \
bubblewrap \
&& rm -rf /var/lib/apt/lists/*
# Copy only necessary files from builder

View File

@@ -8,15 +8,18 @@ import json
import logging
import os
import re
import shlex
from typing import Any, cast
from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX
logger = logging.getLogger(__name__)
# Tools that are blocked entirely (CLI/system access)
# Tools that are blocked entirely (CLI/system access).
# "Bash" (capital) is the SDK built-in — it's NOT in allowed_tools but blocked
# here as defence-in-depth. The agent uses mcp__copilot__bash_exec instead,
# which has kernel-level network isolation (bubblewrap --unshare-net).
BLOCKED_TOOLS = {
"Bash",
"bash",
"shell",
"exec",
@@ -24,66 +27,11 @@ BLOCKED_TOOLS = {
"command",
}
# Safe read-only commands allowed in the sandboxed Bash tool.
# These are data-processing / inspection utilities — no writes, no network.
# NOTE(review): some entries can still write inside the workspace ("tee",
# "sed -i", "find -delete") — the "no writes" property holds only in
# combination with the workspace path checks in _validate_bash_command.
ALLOWED_BASH_COMMANDS: set[str] = {
    # JSON / structured data
    "jq",
    # Text processing
    "grep",
    "egrep",
    "fgrep",
    "rg",
    "head",
    "tail",
    "cat",
    "wc",
    "sort",
    "uniq",
    "cut",
    "tr",
    "sed",
    "awk",
    "column",
    "fold",
    "fmt",
    "nl",
    "paste",
    "rev",
    # File inspection (read-only)
    "find",
    "ls",
    "file",
    "stat",
    "du",
    "tree",
    "basename",
    "dirname",
    "realpath",
    # Utilities
    "echo",
    "printf",
    "date",
    "true",
    "false",
    "xargs",
    "tee",
    # Comparison / encoding
    "diff",
    "comm",
    "base64",
    "md5sum",
    "sha256sum",
}

# Tools allowed only when their path argument stays within the SDK workspace.
# The SDK uses these to handle oversized tool results (writes to tool-results/
# files, then reads them back) and for workspace file operations.
WORKSPACE_SCOPED_TOOLS: set[str] = {"Read", "Write", "Edit", "Glob", "Grep"}

# Tools that get sandboxed Bash validation (command allowlist + workspace paths).
SANDBOXED_BASH_TOOLS: set[str] = {"Bash"}
# Dangerous patterns in tool inputs
DANGEROUS_PATTERNS = [
r"sudo",
@@ -144,88 +92,11 @@ def _validate_workspace_path(
f"Blocked {tool_name} outside workspace: {path} (resolved={resolved})"
)
return _deny(
f"Tool '{tool_name}' can only access files within the workspace directory."
f"[SECURITY] Tool '{tool_name}' can only access files within the workspace "
"directory. This is enforced by the platform and cannot be bypassed."
)
def _validate_bash_command(
    tool_input: dict[str, Any], sdk_cwd: str | None
) -> dict[str, Any]:
    """Validate a Bash command against the allowlist of safe commands.

    Only read-only data-processing commands are allowed (jq, grep, head, etc.).
    Blocks command substitution, output redirection, and disallowed executables.

    Uses ``shlex.split`` to properly handle quoted strings (e.g. jq filters
    containing ``|`` won't be mistaken for shell pipes).

    Args:
        tool_input: Raw tool input dict; the shell command is read from its
            ``command`` key.
        sdk_cwd: SDK workspace directory. When set, absolute paths in the
            command must resolve inside it (or inside the Claude
            ``tool-results`` area).

    Returns:
        ``{}`` when the command passes all checks, otherwise a deny
        response built by ``_deny``.
    """
    command = tool_input.get("command", "")
    if not command or not isinstance(command, str):
        return _deny("Bash command is empty.")
    # Block command substitution — can smuggle arbitrary commands
    if "$(" in command or "`" in command:
        return _deny("Command substitution ($() or ``) is not allowed in Bash.")
    # Block output redirection — Bash should be read-only.
    # Strip quoted strings first so `jq '.x > 5'` isn't a false positive,
    # then check for unquoted > or >> (with or without surrounding spaces).
    unquoted = re.sub(r"'[^']*'|\"[^\"]*\"", "", command)
    if re.search(r"(?<![0-9&])>{1,2}", unquoted):
        return _deny("Output redirection (> or >>) is not allowed in Bash.")
    # Block /dev/ access (e.g., /dev/tcp for network)
    if "/dev/" in command:
        return _deny("Access to /dev/ is not allowed in Bash.")
    # Tokenize with shlex (respects quotes), then extract command names.
    # shlex preserves shell operators like | ; && || as separate tokens.
    try:
        tokens = shlex.split(command)
    except ValueError:
        return _deny("Malformed command (unmatched quotes).")
    # Walk tokens: the first non-assignment token after a pipe/separator is a command.
    expect_command = True
    for token in tokens:
        if token in ("|", "||", "&&", ";"):
            expect_command = True
            continue
        if expect_command:
            # Skip env var assignments (VAR=value)
            if "=" in token and not token.startswith("-"):
                continue
            # basename() so "/usr/bin/grep" is matched as "grep"
            cmd_name = os.path.basename(token)
            if cmd_name not in ALLOWED_BASH_COMMANDS:
                allowed = ", ".join(sorted(ALLOWED_BASH_COMMANDS))
                logger.warning(f"Blocked Bash command: {cmd_name}")
                return _deny(
                    f"Command '{cmd_name}' is not allowed. "
                    f"Allowed commands: {allowed}"
                )
            expect_command = False
    # Validate absolute file paths stay within workspace
    if sdk_cwd:
        norm_cwd = os.path.normpath(sdk_cwd)
        claude_dir = os.path.normpath(os.path.expanduser("~/.claude/projects"))
        for token in tokens:
            if not token.startswith("/"):
                continue
            resolved = os.path.normpath(token)
            if resolved.startswith(norm_cwd + os.sep) or resolved == norm_cwd:
                continue
            # Allow the SDK's oversized-tool-result files under ~/.claude
            if resolved.startswith(claude_dir + os.sep) and "tool-results" in resolved:
                continue
            logger.warning(f"Blocked Bash path outside workspace: {token}")
            return _deny(
                f"Bash can only access files within the workspace directory. "
                f"Path '{token}' is outside the workspace."
            )
    return {}
def _validate_tool_access(
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None = None
) -> dict[str, Any]:
@@ -238,14 +109,11 @@ def _validate_tool_access(
if tool_name in BLOCKED_TOOLS:
logger.warning(f"Blocked tool access attempt: {tool_name}")
return _deny(
f"Tool '{tool_name}' is not available. "
"Use the CoPilot-specific tools instead."
f"[SECURITY] Tool '{tool_name}' is blocked for security. "
"This is enforced by the platform and cannot be bypassed. "
"Use the CoPilot-specific MCP tools instead."
)
# Sandboxed Bash: only allowlisted commands, workspace-scoped paths
if tool_name in SANDBOXED_BASH_TOOLS:
return _validate_bash_command(tool_input, sdk_cwd)
# Workspace-scoped tools: allowed only within the SDK workspace directory
if tool_name in WORKSPACE_SCOPED_TOOLS:
return _validate_workspace_path(tool_name, tool_input, sdk_cwd)
@@ -259,7 +127,10 @@ def _validate_tool_access(
logger.warning(
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
)
return _deny("Input contains blocked pattern")
return _deny(
"[SECURITY] Input contains a blocked pattern. "
"This is enforced by the platform and cannot be bypassed."
)
return {}

View File

@@ -10,6 +10,7 @@ from typing import Any
from backend.util.exceptions import NotFoundError
from .. import stream_registry
from ..config import ChatConfig
from ..model import (
ChatMessage,
@@ -27,13 +28,19 @@ from ..response_model import (
StreamToolInputAvailable,
StreamToolOutputAvailable,
)
from ..service import _build_system_prompt, _generate_session_title
from ..service import (
_build_system_prompt,
_execute_long_running_tool_with_streaming,
_generate_session_title,
)
from ..tools.models import OperationPendingResponse, OperationStartedResponse
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
from ..tracking import track_user_message
from .response_adapter import SDKResponseAdapter
from .security_hooks import create_security_hooks
from .tool_adapter import (
COPILOT_TOOL_NAMES,
LongRunningCallback,
create_copilot_mcp_server,
set_execution_context,
)
@@ -47,21 +54,136 @@ _background_tasks: set[asyncio.Task[Any]] = set()
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
# Appended to the system prompt to inform the agent about Bash restrictions.
# The SDK already describes each tool (Read, Write, Edit, Glob, Grep, Bash),
# but it doesn't know about our security hooks' command allowlist for Bash.
# Appended to the system prompt to inform the agent about available tools.
# The SDK built-in Bash is NOT available — use mcp__copilot__bash_exec instead,
# which has kernel-level network isolation (bubblewrap --unshare-net).
_SDK_TOOL_SUPPLEMENT = """
## Bash restrictions
## Tool notes
The Bash tool is restricted to safe, read-only data-processing commands:
jq, grep, head, tail, cat, wc, sort, uniq, cut, tr, sed, awk, find, ls,
echo, diff, base64, and similar utilities.
Network commands (curl, wget), destructive commands (rm, chmod), and
interpreters (python, node) are NOT available.
- The SDK built-in Bash tool is NOT available. Use the `bash_exec` MCP tool
for shell commands — it runs in a network-isolated sandbox.
- Long-running tools (create_agent, edit_agent, etc.) are handled
asynchronously. You will receive an immediate response; the actual result
is delivered to the user via a background stream.
"""
def _build_long_running_callback(user_id: str | None) -> LongRunningCallback:
    """Build a callback that delegates long-running tools to the non-SDK infrastructure.

    Long-running tools (create_agent, edit_agent, etc.) are delegated to the
    existing background infrastructure: stream_registry (Redis Streams),
    database persistence, and SSE reconnection. This means results survive
    page refreshes / pod restarts, and the frontend shows the proper loading
    widget with progress updates.

    The returned callback matches the ``LongRunningCallback`` signature:
    ``(tool_name, args, session) -> MCP response dict``.

    Args:
        user_id: ID of the user the delegated operations run on behalf of;
            may be None (captured by closure into each callback invocation).

    Returns:
        An async callback suitable for passing to ``set_execution_context``.
    """

    async def _callback(
        tool_name: str, args: dict[str, Any], session: ChatSession
    ) -> dict[str, Any]:
        # Fresh identifiers per invocation; tool_call_id carries an "sdk-"
        # prefix so SDK-originated calls are distinguishable in history.
        operation_id = str(uuid.uuid4())
        task_id = str(uuid.uuid4())
        tool_call_id = f"sdk-{uuid.uuid4().hex[:12]}"
        session_id = session.session_id

        # --- Build user-friendly messages (matches non-SDK service) ---
        if tool_name == "create_agent":
            desc = args.get("description", "")
            desc_preview = (desc[:100] + "...") if len(desc) > 100 else desc
            pending_msg = (
                f"Creating your agent: {desc_preview}"
                if desc_preview
                else "Creating agent... This may take a few minutes."
            )
            started_msg = (
                "Agent creation started. You can close this tab - "
                "check your library in a few minutes."
            )
        elif tool_name == "edit_agent":
            changes = args.get("changes", "")
            changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes
            pending_msg = (
                f"Editing agent: {changes_preview}"
                if changes_preview
                else "Editing agent... This may take a few minutes."
            )
            started_msg = (
                "Agent edit started. You can close this tab - "
                "check your library in a few minutes."
            )
        else:
            # Generic wording for any other long-running tool.
            pending_msg = f"Running {tool_name}... This may take a few minutes."
            started_msg = (
                f"{tool_name} started. You can close this tab - "
                "check back in a few minutes."
            )

        # --- Register task in Redis for SSE reconnection ---
        await stream_registry.create_task(
            task_id=task_id,
            session_id=session_id,
            user_id=user_id,
            tool_call_id=tool_call_id,
            tool_name=tool_name,
            operation_id=operation_id,
        )

        # --- Save OperationPendingResponse to chat history ---
        # Persisted before spawning the task so a refresh mid-operation
        # still shows the pending state.
        pending_message = ChatMessage(
            role="tool",
            content=OperationPendingResponse(
                message=pending_msg,
                operation_id=operation_id,
                tool_name=tool_name,
            ).model_dump_json(),
            tool_call_id=tool_call_id,
        )
        session.messages.append(pending_message)
        await upsert_chat_session(session)

        # --- Spawn background task (reuses non-SDK infrastructure) ---
        bg_task = asyncio.create_task(
            _execute_long_running_tool_with_streaming(
                tool_name=tool_name,
                parameters=args,
                tool_call_id=tool_call_id,
                operation_id=operation_id,
                task_id=task_id,
                session_id=session_id,
                user_id=user_id,
            )
        )
        # Strong reference keeps the task alive; discard on completion.
        _background_tasks.add(bg_task)
        bg_task.add_done_callback(_background_tasks.discard)
        await stream_registry.set_task_asyncio_task(task_id, bg_task)

        logger.info(
            f"[SDK] Long-running tool {tool_name} delegated to background "
            f"(operation_id={operation_id}, task_id={task_id})"
        )

        # --- Return OperationStartedResponse as MCP tool result ---
        # This flows through SDK → response adapter → frontend, triggering
        # the loading widget with SSE reconnection support.
        started_json = OperationStartedResponse(
            message=started_msg,
            operation_id=operation_id,
            tool_name=tool_name,
            task_id=task_id,
        ).model_dump_json()
        return {
            "content": [{"type": "text", "text": started_json}],
            "isError": False,
        }

    return _callback
def _resolve_sdk_model() -> str | None:
"""Resolve the model name for the Claude Agent SDK CLI.
@@ -339,7 +461,11 @@ async def stream_chat_completion_sdk(
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
set_execution_context(user_id, session)
set_execution_context(
user_id,
session,
long_running_callback=_build_long_running_callback(user_id),
)
try:
try:

View File

@@ -2,15 +2,19 @@
This module provides the adapter layer that converts existing BaseTool implementations
into in-process MCP tools that can be used with the Claude Agent SDK.
Long-running tools (``is_long_running=True``) are delegated to the non-SDK
background infrastructure (stream_registry, Redis persistence, SSE reconnection)
via a callback provided by the service layer. This avoids wasteful SDK polling
and makes results survive page refreshes.
"""
import asyncio
import json
import logging
import os
import uuid
from collections.abc import Awaitable, Callable
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Any
from backend.api.features.chat.model import ChatSession
@@ -40,37 +44,38 @@ _pending_tool_outputs: ContextVar[dict[str, str]] = ContextVar(
"pending_tool_outputs", default=None # type: ignore[arg-type]
)
# Callback type for delegating long-running tools to the non-SDK infrastructure.
# Args: (tool_name, arguments, session) → MCP-formatted response dict.
LongRunningCallback = Callable[
[str, dict[str, Any], ChatSession], Awaitable[dict[str, Any]]
]
@dataclass
class _BackgroundOp:
"""Tracks a background tool operation."""
tool_name: str
task: asyncio.Task[Any]
result: dict[str, Any] | None = None
done: bool = False
# Module-level registry for background long-running operations.
# Keyed by operation_id. Cleaned up after result is consumed.
_background_ops: dict[str, _BackgroundOp] = {}
_background_ops_lock = asyncio.Lock()
_CHECK_OP_TOOL_NAME = "check_operation"
# ContextVar so the service layer can inject the callback per-request.
_long_running_callback: ContextVar[LongRunningCallback | None] = ContextVar(
"long_running_callback", default=None
)
def set_execution_context(
    user_id: str | None,
    session: ChatSession,
    long_running_callback: LongRunningCallback | None = None,
) -> None:
    """Install the per-request context consumed by SDK tool handlers.

    Must be invoked before streaming starts so that every tool call can
    resolve the current user, the active chat session, and (optionally)
    the delegation hook for long-running tools.

    Args:
        user_id: ID of the requesting user, if any.
        session: Chat session the tools will operate on.
        long_running_callback: Optional hook that hands long-running tools
            off to the non-SDK background infrastructure
            (stream_registry + Redis) instead of running them inline.
    """
    # Independent ContextVars — assignment order is not significant.
    _pending_tool_outputs.set({})
    _long_running_callback.set(long_running_callback)
    _current_user_id.set(user_id)
    _current_session.set(session)
def get_execution_context() -> tuple[str | None, ChatSession | None]:
@@ -142,9 +147,10 @@ def create_tool_handler(base_tool: BaseTool):
This wraps the existing BaseTool._execute method to be compatible
with the Claude Agent SDK MCP tool format.
Long-running tools (``is_long_running=True``) are spawned as background
tasks and return immediately with an ``operation_id``. The SDK should
then poll ``check_operation`` to retrieve the result.
Long-running tools (``is_long_running=True``) are delegated to the
non-SDK background infrastructure via a callback set in the execution
context. The callback persists the operation in Redis (stream_registry)
so results survive page refreshes and pod restarts.
"""
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
@@ -154,52 +160,23 @@ def create_tool_handler(base_tool: BaseTool):
if session is None:
return _mcp_error("No session context available")
# --- Long-running: fire-and-forget, return operation_id ---
# --- Long-running: delegate to non-SDK background infrastructure ---
if base_tool.is_long_running:
op_id = f"op-{uuid.uuid4().hex[:12]}"
async def _bg_run() -> None:
callback = _long_running_callback.get(None)
if callback:
try:
result = await _execute_tool_sync(base_tool, user_id, session, args)
op = _background_ops.get(op_id)
if op:
op.result = result
op.done = True
except Exception as exc:
op = _background_ops.get(op_id)
if op:
op.result = _mcp_error(str(exc))
op.done = True
return await callback(base_tool.name, args, session)
except Exception as e:
logger.error(
f"Background tool {base_tool.name} failed: {exc}",
f"Long-running callback failed for {base_tool.name}: {e}",
exc_info=True,
)
task = asyncio.create_task(_bg_run())
_background_ops[op_id] = _BackgroundOp(tool_name=base_tool.name, task=task)
logger.info(
f"[SDK] Long-running tool {base_tool.name} started "
f"(operation_id={op_id})"
return _mcp_error(f"Failed to start {base_tool.name}: {e}")
# No callback — fall through to synchronous execution
logger.warning(
f"[SDK] No long-running callback for {base_tool.name}, "
f"executing synchronously (may block)"
)
return {
"content": [
{
"type": "text",
"text": json.dumps(
{
"status": "started",
"operation_id": op_id,
"message": (
f"{base_tool.name} is running in the background. "
f"Call check_operation with "
f"operation_id='{op_id}' to get the result."
),
}
),
}
],
"isError": False,
}
# --- Normal (fast) tool: execute synchronously ---
try:
@@ -255,58 +232,6 @@ async def _read_file_handler(args: dict[str, Any]) -> dict[str, Any]:
}
async def _check_operation_handler(args: dict[str, Any]) -> dict[str, Any]:
    """Check the status of a background long-running operation."""
    op_id = args.get("operation_id", "")
    operation = _background_ops.get(op_id) if op_id else None
    if operation is None:
        return _mcp_error(f"Operation '{op_id}' not found.")
    if operation.done:
        # Finished — hand back the result and drop the registry entry.
        outcome = operation.result or _mcp_error(
            "Operation completed but no result available."
        )
        _background_ops.pop(op_id, None)
        logger.info(f"[SDK] Background operation {op_id} ({operation.tool_name}) collected")
        return outcome
    # Still running — tell the caller to poll again shortly.
    status_payload = json.dumps(
        {
            "status": "in_progress",
            "operation_id": op_id,
            "tool_name": operation.tool_name,
            "message": (
                f"{operation.tool_name} is still running. "
                "Check again in a few seconds."
            ),
        }
    )
    return {
        "content": [{"type": "text", "text": status_payload}],
        "isError": False,
    }
_CHECK_OP_DESCRIPTION = (
"Check the status of a background operation started by a long-running tool "
"(like create_agent). Returns the result when done, or 'in_progress' if still "
"running. Call this periodically (every few seconds) after starting an operation."
)
_CHECK_OP_SCHEMA = {
"type": "object",
"properties": {
"operation_id": {
"type": "string",
"description": "The operation_id returned by the long-running tool.",
},
},
"required": ["operation_id"],
}
_READ_TOOL_NAME = "Read"
_READ_TOOL_DESCRIPTION = (
"Read a file from the local filesystem. "
@@ -365,14 +290,6 @@ def create_copilot_mcp_server():
)(_read_file_handler)
sdk_tools.append(read_tool)
# Add the check_operation tool for polling background operations
check_op_tool = tool(
_CHECK_OP_TOOL_NAME,
_CHECK_OP_DESCRIPTION,
_CHECK_OP_SCHEMA,
)(_check_operation_handler)
sdk_tools.append(check_op_tool)
server = create_sdk_mcp_server(
name=MCP_SERVER_NAME,
version="1.0.0",
@@ -399,6 +316,5 @@ _SDK_BUILTIN_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Task"]
COPILOT_TOOL_NAMES = [
*[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()],
f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}",
f"{MCP_TOOL_PREFIX}{_CHECK_OP_TOOL_NAME}",
*_SDK_BUILTIN_TOOLS,
]

View File

@@ -17,7 +17,6 @@ from .find_agent import FindAgentTool
from .find_block import FindBlockTool
from .find_library_agent import FindLibraryAgentTool
from .get_doc_page import GetDocPageTool
from .python_exec import PythonExecTool
from .run_agent import RunAgentTool
from .run_block import RunBlockTool
from .search_docs import SearchDocsTool
@@ -50,8 +49,7 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
"get_doc_page": GetDocPageTool(),
# Web fetch for safe URL retrieval
"web_fetch": WebFetchTool(),
# Sandboxed code execution (network-isolated)
"python_exec": PythonExecTool(),
# Sandboxed code execution (bubblewrap)
"bash_exec": BashExecTool(),
# Workspace tools for CoPilot file operations
"list_workspace_files": ListWorkspaceFilesTool(),

View File

@@ -1,12 +1,14 @@
"""Bash execution tool — run shell commands in a network-isolated sandbox.
"""Bash execution tool — run shell commands in a bubblewrap sandbox.
Full Bash scripting is allowed (loops, conditionals, pipes, functions, etc.).
Safety comes from kernel-level network isolation and workspace confinement,
not from restricting language features.
Safety comes from OS-level isolation (bubblewrap): only system dirs visible
read-only, writable workspace only, clean env, no network.
Requires bubblewrap (``bwrap``) — the tool is disabled when bwrap is not
available (e.g. macOS development).
"""
import logging
import re
from typing import Any
from backend.api.features.chat.model import ChatSession
@@ -18,46 +20,15 @@ from backend.api.features.chat.tools.models import (
)
from backend.api.features.chat.tools.sandbox import (
get_workspace_dir,
has_network_sandbox,
has_full_sandbox,
run_sandboxed,
)
logger = logging.getLogger(__name__)
# Destructive patterns blocked regardless of network sandbox
_BLOCKED_PATTERNS: list[tuple[str, str]] = [
(r"rm\s+-[a-zA-Z]*r[a-zA-Z]*\s+/(?!\w)", "Recursive removal of root paths"),
(r"dd\s+.*of=/dev/", "Direct disk writes"),
(r"mkfs\b", "Filesystem formatting"),
(r":\(\)\s*\{", "Fork bomb"),
(r"\bshutdown\b|\breboot\b|\bhalt\b|\bpoweroff\b", "System power commands"),
(r"/dev/sd[a-z]|/dev/nvme|/dev/hd[a-z]", "Raw disk device access"),
]
# Commands blocked when kernel network sandbox is NOT available (fallback)
_NETWORK_COMMANDS = {
"curl",
"wget",
"ssh",
"scp",
"sftp",
"rsync",
"nc",
"ncat",
"netcat",
"telnet",
"ftp",
"ping",
"traceroute",
"nslookup",
"dig",
"host",
"nmap",
}
class BashExecTool(BaseTool):
"""Execute Bash commands in a sandboxed environment."""
"""Execute Bash commands in a bubblewrap sandbox."""
@property
def name(self) -> str:
@@ -65,14 +36,21 @@ class BashExecTool(BaseTool):
@property
def description(self) -> str:
if not has_full_sandbox():
return (
"Bash execution is DISABLED — bubblewrap sandbox is not "
"available on this platform. Do not call this tool."
)
return (
"Execute a Bash command or script in a sandboxed environment. "
"Full Bash scripting is supported (loops, conditionals, pipes, functions, etc.). "
"SECURITY: All internet/network access is blocked at the kernel level "
"(no curl, wget, nc, or any outbound connections). "
"Execute a Bash command or script in a bubblewrap sandbox. "
"Full Bash scripting is supported (loops, conditionals, pipes, "
"functions, etc.). "
"SECURITY: Only system directories (/usr, /bin, /lib, /etc) are "
"visible read-only, the per-session workspace is the only writable "
"path, environment variables are wiped (no secrets), and all "
"network access is blocked at the kernel level. Application code, "
"configs, and other directories are NOT accessible. "
"To fetch web content, use the web_fetch tool instead. "
"Commands run in an isolated per-session workspace directory — "
"they cannot access files outside that directory. "
"Execution is killed after the timeout (default 30s, max 120s). "
"Returns stdout and stderr. "
"Useful for file manipulation, data processing with Unix tools "
@@ -109,9 +87,17 @@ class BashExecTool(BaseTool):
session: ChatSession,
**kwargs: Any,
) -> ToolResponseBase:
session_id = session.session_id if session else None
if not has_full_sandbox():
return ErrorResponse(
message="bash_exec requires bubblewrap sandbox (Linux only).",
error="sandbox_unavailable",
session_id=session_id,
)
command: str = (kwargs.get("command") or "").strip()
timeout: int = kwargs.get("timeout", 30)
session_id = session.session_id if session else None
if not command:
return ErrorResponse(
@@ -120,29 +106,6 @@ class BashExecTool(BaseTool):
session_id=session_id,
)
# Block destructive patterns
for pattern, reason in _BLOCKED_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return ErrorResponse(
message=f"Command blocked: {reason}",
error="blocked_command",
session_id=session_id,
)
# When kernel network sandbox unavailable, block network commands
if not has_network_sandbox():
words = set(re.findall(r"\b\w+\b", command))
blocked = words & _NETWORK_COMMANDS
if blocked:
return ErrorResponse(
message=(
f"Network commands not available: {', '.join(sorted(blocked))}. "
"Use web_fetch instead."
),
error="network_blocked",
session_id=session_id,
)
workspace = get_workspace_dir(session_id or "default")
stdout, stderr, exit_code, timed_out = await run_sandboxed(

View File

@@ -43,7 +43,6 @@ class ResponseType(str, Enum):
# Web fetch
WEB_FETCH = "web_fetch"
# Code execution
PYTHON_EXEC = "python_exec"
BASH_EXEC = "bash_exec"
@@ -445,16 +444,6 @@ class WebFetchResponse(ToolResponseBase):
truncated: bool = False
class PythonExecResponse(ToolResponseBase):
"""Response for python_exec tool."""
type: ResponseType = ResponseType.PYTHON_EXEC
stdout: str
stderr: str
exit_code: int
timed_out: bool = False
class BashExecResponse(ToolResponseBase):
"""Response for bash_exec tool."""

View File

@@ -1,162 +0,0 @@
"""Python execution tool — run Python code in a network-isolated sandbox."""
import logging
import os
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
ErrorResponse,
PythonExecResponse,
ToolResponseBase,
)
from backend.api.features.chat.tools.sandbox import (
get_workspace_dir,
has_network_sandbox,
run_sandboxed,
)
logger = logging.getLogger(__name__)
# Modules blocked via import hook when kernel network sandbox is unavailable
_BLOCKED_MODULES = {
"socket",
"ssl",
"http",
"urllib",
"requests",
"httpx",
"aiohttp",
"ftplib",
"smtplib",
"poplib",
"imaplib",
"telnetlib",
"xmlrpc",
"subprocess",
"ctypes",
"multiprocessing",
}
# Security prelude injected before user code (only when unshare unavailable)
_SECURITY_PRELUDE = """\
import builtins as _b
_BLOCKED = {blocked}
_orig = _b.__import__
def _si(name, *a, **k):
if name.split(".")[0] in _BLOCKED:
raise ImportError(f"Module '{{name}}' is not available in the sandbox")
return _orig(name, *a, **k)
_b.__import__ = _si
import os as _os
_os.system = lambda *a, **k: (_ for _ in ()).throw(
PermissionError("os.system is blocked")
)
_os.popen = lambda *a, **k: (_ for _ in ()).throw(
PermissionError("os.popen is blocked")
)
del _b, _BLOCKED, _orig, _si, _os
"""
class PythonExecTool(BaseTool):
    """Execute Python code in a sandboxed environment.

    Writes the (optionally prelude-wrapped) code to a temp script in the
    per-session workspace and runs it via :func:`run_sandboxed`.
    """

    @property
    def name(self) -> str:
        # Registry key and MCP tool name.
        return "python_exec"

    @property
    def description(self) -> str:
        return (
            "Execute Python code in a sandboxed environment. "
            "SECURITY: All internet/network access is blocked at the kernel level "
            "(no HTTP, sockets, DNS, or any outbound connections). "
            "To fetch web content, use the web_fetch tool instead. "
            "Code runs in an isolated per-session workspace directory — "
            "it cannot read or write files outside that directory. "
            "Execution is killed after the timeout (default 30s, max 120s). "
            "Returns stdout and stderr. "
            "Useful for data processing, calculations, text manipulation, "
            "JSON/CSV parsing, and generating files in the workspace."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        # JSON Schema for the tool's arguments.
        return {
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": "Python code to execute.",
                },
                "timeout": {
                    "type": "integer",
                    "description": (
                        "Max execution time in seconds (default 30, max 120)."
                    ),
                    "default": 30,
                },
            },
            "required": ["code"],
        }

    @property
    def requires_auth(self) -> bool:
        # Pure computation in a sandbox — no user credentials needed.
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Run user code in the session workspace and return its output.

        Returns a ``PythonExecResponse`` with stdout/stderr/exit_code, or an
        ``ErrorResponse`` when no code was provided.
        """
        code: str = (kwargs.get("code") or "").strip()
        timeout: int = kwargs.get("timeout", 30)
        session_id = session.session_id if session else None
        if not code:
            return ErrorResponse(
                message="No code provided.",
                error="empty_code",
                session_id=session_id,
            )
        workspace = get_workspace_dir(session_id or "default")
        # Add security prelude when kernel network isolation is unavailable
        if not has_network_sandbox():
            prelude = _SECURITY_PRELUDE.format(blocked=repr(_BLOCKED_MODULES))
            full_code = prelude + "\n" + code
        else:
            full_code = code
        script_path = os.path.join(workspace, "_exec.py")
        try:
            with open(script_path, "w") as f:
                f.write(full_code)
            # -I: isolated mode (ignores PYTHON* env vars and user site dirs);
            # -u: unbuffered, so partial output survives a timeout kill.
            stdout, stderr, exit_code, timed_out = await run_sandboxed(
                command=["python3", "-I", "-u", script_path],
                cwd=workspace,
                timeout=timeout,
            )
            return PythonExecResponse(
                message=(
                    "Execution timed out"
                    if timed_out
                    else f"Code executed (exit {exit_code})"
                ),
                stdout=stdout,
                stderr=stderr,
                exit_code=exit_code,
                timed_out=timed_out,
                session_id=session_id,
            )
        finally:
            # Best-effort cleanup of the temp script, even on error/timeout.
            try:
                os.unlink(script_path)
            except OSError:
                pass

View File

@@ -1,7 +1,11 @@
"""Sandbox execution utilities for code execution tools.
Provides network-isolated command execution using Linux ``unshare --net``
(kernel-level, no bypass possible) with a fallback for development on macOS.
Provides filesystem + network isolated command execution using **bubblewrap**
(``bwrap``): whitelist-only filesystem (only system dirs visible read-only),
writable workspace only, clean environment, network blocked.
Tools that call :func:`run_sandboxed` must first check :func:`has_full_sandbox`
and refuse to run if bubblewrap is not available.
"""
import asyncio
@@ -18,23 +22,24 @@ _DEFAULT_TIMEOUT = 30
_MAX_TIMEOUT = 120
def _check_unshare() -> bool:
"""Check if ``unshare --net`` is available for kernel-level network isolation."""
if platform.system() != "Linux":
return False
return shutil.which("unshare") is not None
# ---------------------------------------------------------------------------
# Sandbox capability detection (cached at first call)
# ---------------------------------------------------------------------------
_BWRAP_AVAILABLE: bool | None = None
# Cached at import time so we don't shell out on every call
_UNSHARE_AVAILABLE: bool | None = None
def has_full_sandbox() -> bool:
"""Return True if bubblewrap is available (filesystem + network isolation).
def has_network_sandbox() -> bool:
"""Return True if kernel-level network isolation is available."""
global _UNSHARE_AVAILABLE
if _UNSHARE_AVAILABLE is None:
_UNSHARE_AVAILABLE = _check_unshare()
return _UNSHARE_AVAILABLE
On non-Linux platforms (macOS), always returns False.
"""
global _BWRAP_AVAILABLE
if _BWRAP_AVAILABLE is None:
_BWRAP_AVAILABLE = (
platform.system() == "Linux" and shutil.which("bwrap") is not None
)
return _BWRAP_AVAILABLE
WORKSPACE_PREFIX = "/tmp/copilot-"
@@ -70,30 +75,122 @@ def get_workspace_dir(session_id: str) -> str:
"""Get or create the workspace directory for a session.
Uses :func:`make_session_path` — the same path the SDK uses — so that
bash_exec shares the workspace with the SDK file tools.
"""
workspace = make_session_path(session_id)
os.makedirs(workspace, exist_ok=True)
return workspace
# ---------------------------------------------------------------------------
# Bubblewrap command builder
# ---------------------------------------------------------------------------
# System directories mounted read-only inside the sandbox.
# ONLY these are visible — /app, /root, /home, /opt, /var etc. are NOT accessible.
_SYSTEM_RO_BINDS = [
"/usr", # binaries, libraries, Python interpreter
"/etc", # system config: ld.so, locale, passwd, alternatives
]
# Symlinks to /usr/* on modern Debian, may be real dirs on older systems.
_COMPAT_RO_BINDS = [
"/bin", # -> /usr/bin on Debian 13
"/sbin", # -> /usr/sbin on Debian 13
"/lib", # -> /usr/lib on Debian 13
"/lib64", # 64-bit libraries (may not exist)
]
def _build_bwrap_command(
command: list[str], cwd: str, env: dict[str, str]
) -> list[str]:
"""Build a bubblewrap command with strict filesystem + network isolation.
Security model:
- **Whitelist-only filesystem**: only system directories (``/usr``, ``/etc``,
``/bin``, ``/lib``) are mounted read-only. Application code (``/app``),
home directories, ``/var``, ``/opt``, etc. are NOT accessible at all.
- **Writable workspace only**: the per-session workspace is the sole
writable path.
- **Clean environment**: ``--clearenv`` wipes all inherited env vars.
Only the explicitly-passed safe env vars are set inside the sandbox.
- **Network isolation**: ``--unshare-net`` blocks all network access.
- **New session**: prevents terminal control escape.
- **Die with parent**: prevents orphaned sandbox processes.
"""
cmd = [
"bwrap",
# Wipe all inherited environment variables (API keys, secrets, etc.)
"--clearenv",
]
# Set only the safe env vars inside the sandbox
for key, value in env.items():
cmd.extend(["--setenv", key, value])
# System directories: read-only
for path in _SYSTEM_RO_BINDS:
cmd.extend(["--ro-bind", path, path])
# Compat paths: bind only if they exist on the host
for path in _COMPAT_RO_BINDS:
if os.path.exists(path):
cmd.extend(["--ro-bind", path, path])
cmd.extend(
[
# Writable workspace only
"--bind",
cwd,
cwd,
# Fresh virtual filesystems
"--dev",
"/dev",
"--proc",
"/proc",
"--tmpdir",
"/tmp",
# Isolation
"--unshare-net",
"--die-with-parent",
"--new-session",
"--chdir",
cwd,
"--",
*command,
]
)
return cmd
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def run_sandboxed(
command: list[str],
cwd: str,
timeout: int = _DEFAULT_TIMEOUT,
env: dict[str, str] | None = None,
) -> tuple[str, str, int, bool]:
"""Run a command in a sandboxed environment.
"""Run a command inside a bubblewrap sandbox.
Callers **must** check :func:`has_full_sandbox` before calling this
function. If bubblewrap is not available, this function raises
:class:`RuntimeError` rather than running unsandboxed.
Returns:
(stdout, stderr, exit_code, timed_out)
Security layers:
- Network isolation via ``unshare --net`` (Linux)
- Restricted working directory
- Minimal environment variables
- Hard timeout
"""
if not has_full_sandbox():
raise RuntimeError(
"run_sandboxed() requires bubblewrap but bwrap is not available. "
"Callers must check has_full_sandbox() before calling this function."
)
timeout = min(max(timeout, 1), _MAX_TIMEOUT)
safe_env = {
@@ -107,11 +204,7 @@ async def run_sandboxed(
if env:
safe_env.update(env)
# Wrap with unshare --net on Linux for kernel-level network isolation
if has_network_sandbox():
full_command = ["unshare", "--net", *command]
else:
full_command = command
full_command = _build_bwrap_command(command, cwd, safe_env)
try:
proc = await asyncio.create_subprocess_exec(
@@ -134,5 +227,7 @@ async def run_sandboxed(
await proc.communicate()
return "", f"Execution timed out after {timeout}s", -1, True
except RuntimeError:
raise
except Exception as e:
return "", f"Sandbox error: {e}", -1, False

View File

@@ -0,0 +1,133 @@
"""Tests for SDK security hooks — workspace paths, tool access, and deny messages.
These are pure unit tests with no external dependencies (no SDK, no DB, no server).
They validate that the security hooks correctly block unauthorized paths,
tool access, and dangerous input patterns.
Note: Bash command validation was removed — the SDK built-in Bash tool is not in
allowed_tools, and the bash_exec MCP tool runs inside a bubblewrap sandbox
(whitelist-only filesystem + network isolation) making command-level parsing
unnecessary.
"""
from backend.api.features.chat.sdk.security_hooks import (
_validate_tool_access,
_validate_workspace_path,
)
SDK_CWD = "/tmp/copilot-test-session"
def _is_denied(result: dict) -> bool:
hook = result.get("hookSpecificOutput", {})
return hook.get("permissionDecision") == "deny"
def _reason(result: dict) -> str:
return result.get("hookSpecificOutput", {}).get("permissionDecisionReason", "")
# ============================================================
# Workspace path validation (Read, Write, Edit, etc.)
# ============================================================
class TestWorkspacePathValidation:
    """File-tool paths (Read, Write, Edit, Glob, Grep) must stay inside the
    session workspace; the ~/.claude tree is blocked except tool-results."""

    @staticmethod
    def _verdict(tool: str, tool_input: dict) -> dict:
        # All checks run against the same session workspace.
        return _validate_workspace_path(tool, tool_input, SDK_CWD)

    def test_path_in_workspace(self):
        verdict = self._verdict("Read", {"file_path": f"{SDK_CWD}/file.txt"})
        assert not _is_denied(verdict)

    def test_path_outside_workspace(self):
        assert _is_denied(self._verdict("Read", {"file_path": "/etc/passwd"}))

    def test_tool_results_allowed(self):
        verdict = self._verdict(
            "Read", {"file_path": "~/.claude/projects/abc/tool-results/out.txt"}
        )
        assert not _is_denied(verdict)

    def test_claude_settings_blocked(self):
        assert _is_denied(
            self._verdict("Read", {"file_path": "~/.claude/settings.json"})
        )

    def test_claude_projects_without_tool_results(self):
        assert _is_denied(
            self._verdict(
                "Read", {"file_path": "~/.claude/projects/abc/credentials.json"}
            )
        )

    def test_no_path_allowed(self):
        """Glob/Grep without path defaults to cwd — should be allowed."""
        assert not _is_denied(self._verdict("Grep", {"pattern": "foo"}))

    def test_path_traversal_with_dotdot(self):
        assert _is_denied(
            self._verdict("Read", {"file_path": f"{SDK_CWD}/../../../etc/passwd"})
        )
# ============================================================
# Tool access validation
# ============================================================
class TestToolAccessValidation:
    """Shell-like tool names and dangerous inputs must be denied outright;
    workspace tools and benign unknown tools pass through."""

    def test_blocked_tools(self):
        # Every lower-case shell alias is denied unconditionally.
        for name in ("bash", "shell", "exec", "terminal", "command"):
            verdict = _validate_tool_access(name, {})
            assert _is_denied(verdict), f"Tool '{name}' should be blocked"

    def test_bash_builtin_blocked(self):
        """SDK built-in Bash (capital) is blocked as defence-in-depth."""
        verdict = _validate_tool_access("Bash", {"command": "echo hello"}, SDK_CWD)
        assert _is_denied(verdict)
        assert "Bash" in _reason(verdict)

    def test_workspace_tools_delegate(self):
        verdict = _validate_tool_access(
            "Read", {"file_path": f"{SDK_CWD}/file.txt"}, SDK_CWD
        )
        assert not _is_denied(verdict)

    def test_dangerous_pattern_blocked(self):
        verdict = _validate_tool_access("SomeUnknownTool", {"data": "sudo rm -rf /"})
        assert _is_denied(verdict)

    def test_safe_unknown_tool_allowed(self):
        verdict = _validate_tool_access("SomeSafeTool", {"data": "hello world"})
        assert not _is_denied(verdict)
# ============================================================
# Deny message quality (ntindle feedback)
# ============================================================
class TestDenyMessageClarity:
    """Deny messages must include [SECURITY] and 'cannot be bypassed'
    so the model knows the restriction is enforced, not a suggestion."""

    # Substrings every deny reason is required to contain.
    _MARKERS = ("[SECURITY]", "cannot be bypassed")

    def _assert_markers(self, reason: str) -> None:
        # One shared check keeps the required wording in a single place.
        for marker in self._MARKERS:
            assert marker in reason

    def test_blocked_tool_message(self):
        self._assert_markers(_reason(_validate_tool_access("bash", {})))

    def test_bash_builtin_blocked_message(self):
        self._assert_markers(
            _reason(_validate_tool_access("Bash", {"command": "echo hello"}))
        )

    def test_workspace_path_message(self):
        self._assert_markers(
            _reason(
                _validate_workspace_path("Read", {"file_path": "/etc/passwd"}, SDK_CWD)
            )
        )