feat(chat/tools): add sandboxed python_exec, bash_exec, web_fetch tools and enable Task

- Add sandbox.py with network-isolated execution via unshare --net (Linux)
  and import/command blocklist fallback (macOS dev)
- Add python_exec tool: runs Python in subprocess with no network, workspace-scoped
- Add bash_exec tool: full Bash scripting with no network, workspace-scoped
- Add web_fetch tool: SSRF-protected URL fetching via backend Requests utility
- Remove SDK built-in Bash from allowlist (replaced by sandboxed bash_exec)
- Enable SDK built-in Task (sub-agents) with per-session rate limit (default 3)
- Add claude_agent_max_subtasks config field
This commit is contained in:
Zamil Majdy
2026-02-12 19:06:37 +04:00
parent fd28c386f4
commit f31cb49557
10 changed files with 501 additions and 6 deletions

View File

@@ -107,6 +107,10 @@ class ChatConfig(BaseSettings):
description="Max buffer size in bytes for Claude Agent SDK JSON message parsing. "
"Increase if tool outputs exceed the limit.",
)
claude_agent_max_subtasks: int = Field(
default=3,
description="Max number of sub-agent Tasks the SDK can spawn per session.",
)
# Extended thinking configuration for Claude models
thinking_enabled: bool = Field(

View File

@@ -286,7 +286,9 @@ def _validate_user_isolation(
def create_security_hooks(
user_id: str | None, sdk_cwd: str | None = None
user_id: str | None,
sdk_cwd: str | None = None,
max_subtasks: int = 3,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
@@ -299,6 +301,7 @@ def create_security_hooks(
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum Task (sub-agent) spawns allowed per session
Returns:
Hooks configuration dict for ClaudeAgentOptions
@@ -307,16 +310,35 @@ def create_security_hooks(
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
# Per-session counter for Task sub-agent spawns
task_spawn_count = 0
async def pre_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Combined pre-tool-use validation hook."""
nonlocal task_spawn_count
_ = context # unused but required by signature
tool_name = cast(str, input_data.get("tool_name", ""))
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
# Rate-limit Task (sub-agent) spawns per session
if tool_name == "Task":
task_spawn_count += 1
if task_spawn_count > max_subtasks:
logger.warning(
f"[SDK] Task limit reached ({max_subtasks}), user={user_id}"
)
return cast(
SyncHookJSONOutput,
_deny(
f"Maximum {max_subtasks} sub-tasks per session. "
"Please continue in the main conversation."
),
)
# Strip MCP prefix for consistent validation
is_copilot_tool = tool_name.startswith(MCP_TOOL_PREFIX)
clean_name = tool_name.removeprefix(MCP_TOOL_PREFIX)

View File

@@ -374,7 +374,11 @@ async def stream_chat_completion_sdk(
tracer = TracedSession(session_id, user_id, system_prompt)
# Merge security hooks with optional tracing hooks
security_hooks = create_security_hooks(user_id, sdk_cwd=sdk_cwd)
security_hooks = create_security_hooks(
user_id,
sdk_cwd=sdk_cwd,
max_subtasks=config.claude_agent_max_subtasks,
)
tracing_hooks = create_tracing_hooks(tracer)
combined_hooks = merge_hooks(security_hooks, tracing_hooks)

View File

@@ -307,9 +307,11 @@ def create_copilot_mcp_server():
# SDK built-in tools allowed within the workspace directory.
# Security hooks validate that file paths stay within sdk_cwd
# and that Bash commands are restricted to a safe allowlist.
_SDK_BUILTIN_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Bash"]
# Security hooks validate that file paths stay within sdk_cwd.
# Bash is NOT included — use the sandboxed MCP bash_exec tool instead,
# which provides kernel-level network isolation via unshare --net.
# Task allows spawning sub-agents (rate-limited by security hooks).
_SDK_BUILTIN_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Task"]
# List of tool names for allowed_tools configuration
# Include MCP tools, the MCP Read tool for oversized results,

View File

@@ -9,6 +9,7 @@ from backend.api.features.chat.tracking import track_tool_called
from .add_understanding import AddUnderstandingTool
from .agent_output import AgentOutputTool
from .base import BaseTool
from .bash_exec import BashExecTool
from .create_agent import CreateAgentTool
from .customize_agent import CustomizeAgentTool
from .edit_agent import EditAgentTool
@@ -16,6 +17,7 @@ from .find_agent import FindAgentTool
from .find_block import FindBlockTool
from .find_library_agent import FindLibraryAgentTool
from .get_doc_page import GetDocPageTool
from .python_exec import PythonExecTool
from .run_agent import RunAgentTool
from .run_block import RunBlockTool
from .search_docs import SearchDocsTool
@@ -48,6 +50,9 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
"get_doc_page": GetDocPageTool(),
# Web fetch for safe URL retrieval
"web_fetch": WebFetchTool(),
# Sandboxed code execution (network-isolated)
"python_exec": PythonExecTool(),
"bash_exec": BashExecTool(),
# Workspace tools for CoPilot file operations
"list_workspace_files": ListWorkspaceFilesTool(),
"read_workspace_file": ReadWorkspaceFileTool(),

View File

@@ -0,0 +1,165 @@
"""Bash execution tool — run shell commands in a network-isolated sandbox.
Full Bash scripting is allowed (loops, conditionals, pipes, functions, etc.).
Safety comes from kernel-level network isolation and workspace confinement,
not from restricting language features.
"""
import logging
import re
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
BashExecResponse,
ErrorResponse,
ToolResponseBase,
)
from backend.api.features.chat.tools.sandbox import (
get_workspace_dir,
has_network_sandbox,
run_sandboxed,
)
logger = logging.getLogger(__name__)
# Destructive patterns blocked regardless of network sandbox.
# Each entry is (regex, human-readable reason). BashExecTool._execute runs
# re.search(pattern, command, re.IGNORECASE) over the raw command string and
# echoes the reason back in the "Command blocked: ..." error message.
# NOTE(review): this is a textual heuristic over the command string, not a
# shell parser — treat it as a safety net rather than a security boundary.
_BLOCKED_PATTERNS: list[tuple[str, str]] = [
    # "rm" with an option cluster containing "r", aimed at "/" that is not
    # immediately followed by a word character.
    (r"rm\s+-[a-zA-Z]*r[a-zA-Z]*\s+/(?!\w)", "Recursive removal of root paths"),
    # "dd" whose output file is somewhere under /dev/.
    (r"dd\s+.*of=/dev/", "Direct disk writes"),
    (r"mkfs\b", "Filesystem formatting"),
    # Classic ":(){" fork-bomb function header.
    (r":\(\)\s*\{", "Fork bomb"),
    # Matches these words anywhere in the command, including inside arguments.
    (r"\bshutdown\b|\breboot\b|\bhalt\b|\bpoweroff\b", "System power commands"),
    (r"/dev/sd[a-z]|/dev/nvme|/dev/hd[a-z]", "Raw disk device access"),
]
# Commands blocked when kernel network sandbox is NOT available (fallback).
# BashExecTool._execute intersects this set with every \w+ word found in the
# command, so a match anywhere in the text (even as an argument) is rejected.
_NETWORK_COMMANDS = {
    "curl",
    "wget",
    "ssh",
    "scp",
    "sftp",
    "rsync",
    "nc",
    "ncat",
    "netcat",
    "telnet",
    "ftp",
    "ping",
    "traceroute",
    "nslookup",
    "dig",
    "host",
    "nmap",
}
class BashExecTool(BaseTool):
    """Execute Bash commands in a sandboxed environment.

    The command is screened against ``_BLOCKED_PATTERNS`` and, when no kernel
    network sandbox is available, against ``_NETWORK_COMMANDS``. It is then
    run via ``bash -c`` inside the per-session workspace directory.
    """

    @property
    def name(self) -> str:
        """Tool identifier exposed to the model."""
        return "bash_exec"

    @property
    def description(self) -> str:
        """Human/LLM-facing description of the tool's capabilities and limits."""
        return (
            "Execute a Bash command or script in a sandboxed environment. "
            "Full Bash scripting is supported (loops, conditionals, pipes, functions, etc.). "
            "SECURITY: All internet/network access is blocked at the kernel level "
            "(no curl, wget, nc, or any outbound connections). "
            "To fetch web content, use the web_fetch tool instead. "
            "Commands run in an isolated per-session workspace directory — "
            "they cannot access files outside that directory. "
            "Execution is killed after the timeout (default 30s, max 120s). "
            "Returns stdout and stderr. "
            "Useful for file manipulation, data processing with Unix tools "
            "(grep, awk, sed, jq, etc.), and running shell scripts."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "Bash command or script to execute.",
                },
                "timeout": {
                    "type": "integer",
                    "description": (
                        "Max execution time in seconds (default 30, max 120)."
                    ),
                    "default": 30,
                },
            },
            "required": ["command"],
        }

    @property
    def requires_auth(self) -> bool:
        """This tool does not require an authenticated user."""
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Validate and run a Bash command, returning its captured output.

        Args:
            user_id: Current user ID (unused here).
            session: Chat session; its ``session_id`` selects the workspace.
            **kwargs: ``command`` (str, required) and ``timeout`` (seconds,
                optional, default 30).

        Returns:
            ``ErrorResponse`` when the command is empty or blocked, otherwise
            a ``BashExecResponse`` with stdout, stderr, exit code and the
            timed-out flag.
        """
        # Arguments are produced by the model, so they may be missing, null
        # or of the wrong type; normalise instead of crashing.
        raw_command = kwargs.get("command")
        command: str = raw_command.strip() if isinstance(raw_command, str) else ""

        raw_timeout = kwargs.get("timeout")
        try:
            timeout: int = 30 if raw_timeout is None else int(raw_timeout)
        except (TypeError, ValueError):
            timeout = 30

        session_id = session.session_id if session else None

        if not command:
            return ErrorResponse(
                message="No command provided.",
                error="empty_command",
                session_id=session_id,
            )

        # Block destructive patterns
        for pattern, reason in _BLOCKED_PATTERNS:
            if re.search(pattern, command, re.IGNORECASE):
                return ErrorResponse(
                    message=f"Command blocked: {reason}",
                    error="blocked_command",
                    session_id=session_id,
                )

        # When kernel network sandbox unavailable, block network commands
        if not has_network_sandbox():
            words = set(re.findall(r"\b\w+\b", command))
            blocked = words & _NETWORK_COMMANDS
            if blocked:
                return ErrorResponse(
                    message=(
                        f"Network commands not available: {', '.join(sorted(blocked))}. "
                        "Use web_fetch instead."
                    ),
                    error="network_blocked",
                    session_id=session_id,
                )

        workspace = get_workspace_dir(session_id or "default")

        stdout, stderr, exit_code, timed_out = await run_sandboxed(
            command=["bash", "-c", command],
            cwd=workspace,
            timeout=timeout,
        )

        return BashExecResponse(
            message=(
                "Execution timed out"
                if timed_out
                else f"Command executed (exit {exit_code})"
            ),
            stdout=stdout,
            stderr=stderr,
            exit_code=exit_code,
            timed_out=timed_out,
            session_id=session_id,
        )

View File

@@ -42,6 +42,9 @@ class ResponseType(str, Enum):
INPUT_VALIDATION_ERROR = "input_validation_error"
# Web fetch
WEB_FETCH = "web_fetch"
# Code execution
PYTHON_EXEC = "python_exec"
BASH_EXEC = "bash_exec"
# Base response model
@@ -440,3 +443,23 @@ class WebFetchResponse(ToolResponseBase):
content_type: str
content: str
truncated: bool = False
class PythonExecResponse(ToolResponseBase):
    """Response for python_exec tool.

    Populated by PythonExecTool._execute from the
    (stdout, stderr, exit_code, timed_out) tuple returned by run_sandboxed.
    """

    # Discriminator identifying this payload as a python_exec result.
    type: ResponseType = ResponseType.PYTHON_EXEC
    # Captured standard output of the script.
    stdout: str
    # Captured standard error; run_sandboxed also puts its own timeout and
    # sandbox-error messages here.
    stderr: str
    # Process exit status; run_sandboxed reports -1 on timeout or sandbox error.
    exit_code: int
    # True when execution was killed for exceeding the timeout.
    timed_out: bool = False
class BashExecResponse(ToolResponseBase):
    """Response for bash_exec tool.

    Populated by BashExecTool._execute from the
    (stdout, stderr, exit_code, timed_out) tuple returned by run_sandboxed.
    """

    # Discriminator identifying this payload as a bash_exec result.
    type: ResponseType = ResponseType.BASH_EXEC
    # Captured standard output of the command.
    stdout: str
    # Captured standard error; run_sandboxed also puts its own timeout and
    # sandbox-error messages here.
    stderr: str
    # Process exit status; run_sandboxed reports -1 on timeout or sandbox error.
    exit_code: int
    # True when execution was killed for exceeding the timeout.
    timed_out: bool = False

View File

@@ -0,0 +1,162 @@
"""Python execution tool — run Python code in a network-isolated sandbox."""
import logging
import os
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
ErrorResponse,
PythonExecResponse,
ToolResponseBase,
)
from backend.api.features.chat.tools.sandbox import (
get_workspace_dir,
has_network_sandbox,
run_sandboxed,
)
logger = logging.getLogger(__name__)
# Modules blocked via import hook when kernel network sandbox is unavailable.
# The set's repr() is substituted into _SECURITY_PRELUDE, whose hook compares
# only the top-level package (the part of the name before the first ".").
_BLOCKED_MODULES = {
    "socket",
    "ssl",
    "http",
    "urllib",
    "requests",
    "httpx",
    "aiohttp",
    "ftplib",
    "smtplib",
    "poplib",
    "imaplib",
    "telnetlib",
    "xmlrpc",
    "subprocess",
    "ctypes",
    "multiprocessing",
}
# Security prelude injected before user code (only when unshare unavailable).
#
# This is a str.format() template: ``{blocked}`` receives the repr() of the
# blocked-module set, and every other brace is doubled so it survives
# formatting.
#
# The hook state (blocked set, original __import__) lives in a closure rather
# than in module globals. The prelude removes its own names from the user's
# namespace, and a hook that looked those names up as globals would raise
# NameError on every later import once they are deleted.
#
# NOTE(review): this is a best-effort development fallback, not a security
# boundary; Python-level import hooks can be circumvented by determined code.
_SECURITY_PRELUDE = """\
def _install_sandbox_guards():
    import builtins
    import os

    blocked = frozenset({blocked})
    original_import = builtins.__import__

    def guarded_import(name, *a, **k):
        if name.split(".")[0] in blocked:
            raise ImportError(f"Module '{{name}}' is not available in the sandbox")
        return original_import(name, *a, **k)

    def blocked_system(*a, **k):
        raise PermissionError("os.system is blocked")

    def blocked_popen(*a, **k):
        raise PermissionError("os.popen is blocked")

    builtins.__import__ = guarded_import
    os.system = blocked_system
    os.popen = blocked_popen


_install_sandbox_guards()
del _install_sandbox_guards
"""
class PythonExecTool(BaseTool):
    """Execute Python code in a sandboxed environment.

    The code is written to a uniquely named temporary script inside the
    per-session workspace and run with ``python3 -I -u`` through
    ``run_sandboxed``. When no kernel network sandbox is available, the
    import-blocking security prelude is prepended to the user code.
    """

    @property
    def name(self) -> str:
        """Tool identifier exposed to the model."""
        return "python_exec"

    @property
    def description(self) -> str:
        """Human/LLM-facing description of the tool's capabilities and limits."""
        return (
            "Execute Python code in a sandboxed environment. "
            "SECURITY: All internet/network access is blocked at the kernel level "
            "(no HTTP, sockets, DNS, or any outbound connections). "
            "To fetch web content, use the web_fetch tool instead. "
            "Code runs in an isolated per-session workspace directory — "
            "it cannot read or write files outside that directory. "
            "Execution is killed after the timeout (default 30s, max 120s). "
            "Returns stdout and stderr. "
            "Useful for data processing, calculations, text manipulation, "
            "JSON/CSV parsing, and generating files in the workspace."
        )

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-schema description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": "Python code to execute.",
                },
                "timeout": {
                    "type": "integer",
                    "description": (
                        "Max execution time in seconds (default 30, max 120)."
                    ),
                    "default": 30,
                },
            },
            "required": ["code"],
        }

    @property
    def requires_auth(self) -> bool:
        """This tool does not require an authenticated user."""
        return False

    async def _execute(
        self,
        user_id: str | None,
        session: ChatSession,
        **kwargs: Any,
    ) -> ToolResponseBase:
        """Run the supplied Python code and return its captured output.

        Args:
            user_id: Current user ID (unused here).
            session: Chat session; its ``session_id`` selects the workspace.
            **kwargs: ``code`` (str, required) and ``timeout`` (seconds,
                optional, default 30).

        Returns:
            ``ErrorResponse`` when no code is given, otherwise a
            ``PythonExecResponse`` with stdout, stderr, exit code and the
            timed-out flag.
        """
        import tempfile  # local import: only this method needs it

        # Arguments are produced by the model, so they may be missing, null
        # or of the wrong type; normalise instead of crashing.
        raw_code = kwargs.get("code")
        code: str = raw_code.strip() if isinstance(raw_code, str) else ""

        raw_timeout = kwargs.get("timeout")
        try:
            timeout: int = 30 if raw_timeout is None else int(raw_timeout)
        except (TypeError, ValueError):
            timeout = 30

        session_id = session.session_id if session else None

        if not code:
            return ErrorResponse(
                message="No code provided.",
                error="empty_code",
                session_id=session_id,
            )

        workspace = get_workspace_dir(session_id or "default")

        # Add security prelude when kernel network isolation is unavailable
        if not has_network_sandbox():
            prelude = _SECURITY_PRELUDE.format(blocked=repr(_BLOCKED_MODULES))
            full_code = prelude + "\n" + code
        else:
            full_code = code

        # A unique file name per invocation: concurrent runs in the same
        # session must not overwrite or delete each other's script, and a
        # user file that happens to be called "_exec.py" must stay untouched.
        fd, script_path = tempfile.mkstemp(
            prefix="_exec_", suffix=".py", dir=workspace
        )
        try:
            # Explicit UTF-8 so non-ASCII source does not depend on the locale.
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                f.write(full_code)

            stdout, stderr, exit_code, timed_out = await run_sandboxed(
                command=["python3", "-I", "-u", script_path],
                cwd=workspace,
                timeout=timeout,
            )

            return PythonExecResponse(
                message=(
                    "Execution timed out"
                    if timed_out
                    else f"Code executed (exit {exit_code})"
                ),
                stdout=stdout,
                stderr=stderr,
                exit_code=exit_code,
                timed_out=timed_out,
                session_id=session_id,
            )
        finally:
            # Best-effort cleanup; the script may already be gone.
            try:
                os.unlink(script_path)
            except OSError:
                pass

View File

@@ -0,0 +1,105 @@
"""Sandbox execution utilities for code execution tools.
Provides network-isolated command execution using Linux ``unshare --net``
(kernel-level, no bypass possible) with a fallback for development on macOS.
"""
import asyncio
import logging
import os
import platform
import shutil
logger = logging.getLogger(__name__)
# Output limits — prevent blowing up LLM context.
# Applied by run_sandboxed to stdout and stderr separately, after decoding.
_MAX_OUTPUT_CHARS = 50_000
# Timeout in seconds used by run_sandboxed when the caller passes none.
_DEFAULT_TIMEOUT = 30
# Upper bound in seconds; run_sandboxed clamps timeouts to [1, _MAX_TIMEOUT].
_MAX_TIMEOUT = 120
def _check_unshare() -> bool:
    """Report whether the ``unshare`` binary is usable for network isolation.

    Returns True only on Linux and only when ``unshare`` is found on PATH.

    NOTE(review): this checks that the binary exists, not that
    ``unshare --net`` is permitted in the current environment.
    """
    on_linux = platform.system() == "Linux"
    return on_linux and shutil.which("unshare") is not None
# Lazily filled cache for the unshare probe: stays None until the first
# has_network_sandbox() call, then holds that result for the process lifetime.
_UNSHARE_AVAILABLE: bool | None = None


def has_network_sandbox() -> bool:
    """Return True if kernel-level network isolation is available.

    The probe runs at most once; later calls return the cached answer.
    """
    global _UNSHARE_AVAILABLE
    cached = _UNSHARE_AVAILABLE
    if cached is not None:
        return cached
    _UNSHARE_AVAILABLE = _check_unshare()
    return _UNSHARE_AVAILABLE
def get_workspace_dir(session_id: str) -> str:
    """Get or create the workspace directory for a session.

    The directory is ``/tmp/copilot-<session_id>``. Any character of the ID
    other than ASCII letters, digits, ``-``, ``_`` and ``.`` is replaced with
    ``_``, so the ID always stays a single path component. IDs made only of
    those characters (e.g. UUIDs) map to exactly the same path as before.

    Args:
        session_id: Session identifier used to derive the directory name.

    Returns:
        Path of the (now existing) workspace directory.
    """
    # Never let the ID introduce path separators: "x/../../etc" must not be
    # able to move the workspace outside /tmp/copilot-*.
    safe_id = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "-_." else "_"
        for ch in str(session_id)
    )
    workspace = f"/tmp/copilot-{safe_id}"
    os.makedirs(workspace, exist_ok=True)
    return workspace
async def run_sandboxed(
    command: list[str],
    cwd: str,
    timeout: int = _DEFAULT_TIMEOUT,
    env: dict[str, str] | None = None,
) -> tuple[str, str, int, bool]:
    """Run a command in a sandboxed environment.

    Args:
        command: Program and arguments to execute (no shell is involved).
        cwd: Working directory; also used as HOME and TMPDIR for the child.
        timeout: Seconds before the command is killed. Non-integer values
            fall back to the default; the result is clamped to
            ``[1, _MAX_TIMEOUT]``.
        env: Extra environment variables layered over the minimal default set.

    Returns:
        (stdout, stderr, exit_code, timed_out)

    Security layers:
        - Network isolation via ``unshare --net`` (Linux)
        - Restricted working directory
        - Minimal environment variables
        - Hard timeout (kills the whole process group)
    """
    import signal  # local import: only needed on the timeout path

    # The timeout ultimately comes from model-supplied tool arguments, so it
    # may be None or a string; fall back rather than raise TypeError.
    try:
        timeout = int(timeout)
    except (TypeError, ValueError):
        timeout = _DEFAULT_TIMEOUT
    timeout = min(max(timeout, 1), _MAX_TIMEOUT)

    safe_env = {
        "PATH": "/usr/local/bin:/usr/bin:/bin",
        "HOME": cwd,
        "TMPDIR": cwd,
        "LANG": "en_US.UTF-8",
        "PYTHONDONTWRITEBYTECODE": "1",
        "PYTHONIOENCODING": "utf-8",
    }
    if env:
        safe_env.update(env)

    # Wrap with unshare --net on Linux for kernel-level network isolation
    if has_network_sandbox():
        full_command = ["unshare", "--net", *command]
    else:
        full_command = command

    try:
        proc = await asyncio.create_subprocess_exec(
            *full_command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=cwd,
            env=safe_env,
            # Put the child in its own session/process group so a timeout can
            # kill everything it spawned, not just the direct child.
            start_new_session=True,
        )
        try:
            stdout_bytes, stderr_bytes = await asyncio.wait_for(
                proc.communicate(), timeout=timeout
            )
            stdout = stdout_bytes.decode("utf-8", errors="replace")[:_MAX_OUTPUT_CHARS]
            stderr = stderr_bytes.decode("utf-8", errors="replace")[:_MAX_OUTPUT_CHARS]
            return stdout, stderr, proc.returncode or 0, False
        except asyncio.TimeoutError:
            # Kill the whole group; fall back to the single process where
            # process groups are unavailable or the group is already gone.
            try:
                os.killpg(proc.pid, signal.SIGKILL)
            except (AttributeError, ProcessLookupError, PermissionError):
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
            # Reap the child and drain its pipes, but never wait forever: a
            # process that escaped the group could still hold them open.
            try:
                await asyncio.wait_for(proc.communicate(), timeout=5)
            except asyncio.TimeoutError:
                logger.warning(
                    "Sandboxed process %s did not release its pipes after kill",
                    proc.pid,
                )
            return "", f"Execution timed out after {timeout}s", -1, True
    except Exception as e:
        # Boundary handler: report the failure to the caller, but leave a
        # trace in the logs instead of swallowing it silently.
        logger.warning("Sandbox error while running command: %s", e)
        return "", f"Sandbox error: {e}", -1, False

View File

@@ -10490,7 +10490,10 @@
"operation_started",
"operation_pending",
"operation_in_progress",
"input_validation_error"
"input_validation_error",
"web_fetch",
"python_exec",
"bash_exec"
],
"title": "ResponseType",
"description": "Types of tool responses."