mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(backend): allow /tmp as valid path in E2B sandbox file tools (#12501)
## Summary
- Allow `/tmp` as a valid writable directory in E2B sandbox file tools
(`write_file`, `read_file`, `edit_file`, `glob`, `grep`)
- The E2B sandbox is already fully isolated, so restricting writes to
only `/home/user` was unnecessarily limiting — scripts and tools
commonly use `/tmp` for temporary files
- Extract `is_within_allowed_dirs()` helper in `context.py` to
centralize the allowed-directory check for both path resolution and
symlink escape detection
## Changes
- `context.py`: Add `E2B_ALLOWED_DIRS` tuple and `E2B_ALLOWED_DIRS_STR`,
introduce `is_within_allowed_dirs()`, update `resolve_sandbox_path()` to
use it
- `e2b_file_tools.py`: Update `_check_sandbox_symlink_escape()` to use
`is_within_allowed_dirs()`, update tool descriptions
- Tests: Add coverage for `/tmp` paths in both `context_test.py` and
`e2b_file_tools_test.py`
### Checklist 📋
#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [x] All 59 existing + new tests pass (`poetry run pytest
backend/copilot/context_test.py
backend/copilot/sdk/e2b_file_tools_test.py`)
- [x] `poetry run format` and `poetry run lint` pass clean
- [x] Verify `/tmp` write works in live E2B sandbox
- [x] E2E: Write file to /tmp/test.py in E2B sandbox via copilot
- [x] E2E: Execute script from /tmp — output "Hello, World!"
- [x] E2E: E2B sandbox lifecycle (create, use, pause) works correctly
This commit is contained in:
@@ -104,17 +104,32 @@ def get_sdk_cwd() -> str:
|
||||
|
||||
|
||||
E2B_WORKDIR = "/home/user"
|
||||
E2B_ALLOWED_DIRS: tuple[str, ...] = (E2B_WORKDIR, "/tmp")
|
||||
E2B_ALLOWED_DIRS_STR: str = " or ".join(E2B_ALLOWED_DIRS)
|
||||
|
||||
|
||||
def is_within_allowed_dirs(path: str) -> bool:
|
||||
"""Return True if *path* is within one of the allowed sandbox directories."""
|
||||
for allowed in E2B_ALLOWED_DIRS:
|
||||
if path == allowed or path.startswith(allowed + "/"):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def resolve_sandbox_path(path: str) -> str:
|
||||
"""Normalise *path* to an absolute sandbox path under ``/home/user``.
|
||||
"""Normalise *path* to an absolute sandbox path under an allowed directory.
|
||||
|
||||
Allowed directories: ``/home/user`` and ``/tmp``.
|
||||
Relative paths are resolved against ``/home/user``.
|
||||
|
||||
Raises :class:`ValueError` if the resolved path escapes the sandbox.
|
||||
"""
|
||||
candidate = path if os.path.isabs(path) else os.path.join(E2B_WORKDIR, path)
|
||||
normalized = os.path.normpath(candidate)
|
||||
if normalized != E2B_WORKDIR and not normalized.startswith(E2B_WORKDIR + "/"):
|
||||
raise ValueError(f"Path must be within {E2B_WORKDIR}: {path}")
|
||||
if not is_within_allowed_dirs(normalized):
|
||||
raise ValueError(
|
||||
f"Path must be within {E2B_ALLOWED_DIRS_STR}: {os.path.basename(path)}"
|
||||
)
|
||||
return normalized
|
||||
|
||||
|
||||
|
||||
@@ -198,10 +198,32 @@ def test_resolve_sandbox_path_normalizes_dots():
|
||||
|
||||
|
||||
def test_resolve_sandbox_path_escape_raises():
|
||||
with pytest.raises(ValueError, match="/home/user"):
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path("/home/user/../../etc/passwd")
|
||||
|
||||
|
||||
def test_resolve_sandbox_path_absolute_outside_raises():
|
||||
with pytest.raises(ValueError, match="/home/user"):
|
||||
with pytest.raises(ValueError):
|
||||
resolve_sandbox_path("/etc/passwd")
|
||||
|
||||
|
||||
def test_resolve_sandbox_path_tmp_allowed():
|
||||
assert resolve_sandbox_path("/tmp/data.txt") == "/tmp/data.txt"
|
||||
|
||||
|
||||
def test_resolve_sandbox_path_tmp_nested():
|
||||
assert resolve_sandbox_path("/tmp/a/b/c.txt") == "/tmp/a/b/c.txt"
|
||||
|
||||
|
||||
def test_resolve_sandbox_path_tmp_itself():
|
||||
assert resolve_sandbox_path("/tmp") == "/tmp"
|
||||
|
||||
|
||||
def test_resolve_sandbox_path_tmp_escape_raises():
|
||||
with pytest.raises(ValueError):
|
||||
resolve_sandbox_path("/tmp/../etc/passwd")
|
||||
|
||||
|
||||
def test_resolve_sandbox_path_tmp_prefix_collision_raises():
|
||||
with pytest.raises(ValueError):
|
||||
resolve_sandbox_path("/tmp_evil/malicious.txt")
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
When E2B is active, these tools replace the SDK built-in Read/Write/Edit/
|
||||
Glob/Grep so that all file operations share the same ``/home/user``
|
||||
filesystem as ``bash_exec``.
|
||||
and ``/tmp`` filesystems as ``bash_exec``.
|
||||
|
||||
SDK-internal paths (``~/.claude/projects/…/tool-results/``) are handled
|
||||
by the separate ``Read`` MCP tool registered in ``tool_adapter.py``.
|
||||
@@ -16,10 +16,13 @@ import shlex
|
||||
from typing import Any, Callable
|
||||
|
||||
from backend.copilot.context import (
|
||||
E2B_ALLOWED_DIRS,
|
||||
E2B_ALLOWED_DIRS_STR,
|
||||
E2B_WORKDIR,
|
||||
get_current_sandbox,
|
||||
get_sdk_cwd,
|
||||
is_allowed_local_path,
|
||||
is_within_allowed_dirs,
|
||||
resolve_sandbox_path,
|
||||
)
|
||||
|
||||
@@ -36,7 +39,7 @@ async def _check_sandbox_symlink_escape(
|
||||
``readlink -f`` follows actual symlinks on the sandbox filesystem.
|
||||
|
||||
Returns the canonical parent path, or ``None`` if the path escapes
|
||||
``E2B_WORKDIR``.
|
||||
the allowed sandbox directories.
|
||||
|
||||
Note: There is an inherent TOCTOU window between this check and the
|
||||
subsequent ``sandbox.files.write()``. A symlink could theoretically be
|
||||
@@ -52,10 +55,7 @@ async def _check_sandbox_symlink_escape(
|
||||
if (
|
||||
canonical_res.exit_code != 0
|
||||
or not canonical_parent
|
||||
or (
|
||||
canonical_parent != E2B_WORKDIR
|
||||
and not canonical_parent.startswith(E2B_WORKDIR + "/")
|
||||
)
|
||||
or not is_within_allowed_dirs(canonical_parent)
|
||||
):
|
||||
return None
|
||||
return canonical_parent
|
||||
@@ -89,6 +89,38 @@ def _get_sandbox_and_path(
|
||||
return sandbox, remote
|
||||
|
||||
|
||||
async def _sandbox_write(sandbox: Any, path: str, content: str) -> None:
|
||||
"""Write *content* to *path* inside the sandbox.
|
||||
|
||||
The E2B filesystem API (``sandbox.files.write``) and the command API
|
||||
(``sandbox.commands.run``) run as **different users**. On ``/tmp``
|
||||
(which has the sticky bit set) this means ``sandbox.files.write`` can
|
||||
create new files but cannot overwrite files previously created by
|
||||
``sandbox.commands.run`` (or itself), because the sticky bit restricts
|
||||
deletion/rename to the file owner.
|
||||
|
||||
To work around this, writes targeting ``/tmp`` are performed via
|
||||
``tee`` through the command API, which runs as the sandbox ``user``
|
||||
and can therefore always overwrite user-owned files.
|
||||
"""
|
||||
if path == "/tmp" or path.startswith("/tmp/"):
|
||||
import base64 as _b64
|
||||
|
||||
encoded = _b64.b64encode(content.encode()).decode()
|
||||
result = await sandbox.commands.run(
|
||||
f"echo {shlex.quote(encoded)} | base64 -d > {shlex.quote(path)}",
|
||||
cwd=E2B_WORKDIR,
|
||||
timeout=10,
|
||||
)
|
||||
if result.exit_code != 0:
|
||||
raise RuntimeError(
|
||||
f"shell write failed (exit {result.exit_code}): "
|
||||
+ (result.stderr or "").strip()
|
||||
)
|
||||
else:
|
||||
await sandbox.files.write(path, content)
|
||||
|
||||
|
||||
# Tool handlers
|
||||
|
||||
|
||||
@@ -139,13 +171,16 @@ async def _handle_write_file(args: dict[str, Any]) -> dict[str, Any]:
|
||||
|
||||
try:
|
||||
parent = os.path.dirname(remote)
|
||||
if parent and parent != E2B_WORKDIR:
|
||||
if parent and parent not in E2B_ALLOWED_DIRS:
|
||||
await sandbox.files.make_dir(parent)
|
||||
canonical_parent = await _check_sandbox_symlink_escape(sandbox, parent)
|
||||
if canonical_parent is None:
|
||||
return _mcp(f"Path must be within {E2B_WORKDIR}: {parent}", error=True)
|
||||
return _mcp(
|
||||
f"Path must be within {E2B_ALLOWED_DIRS_STR}: {os.path.basename(parent)}",
|
||||
error=True,
|
||||
)
|
||||
remote = os.path.join(canonical_parent, os.path.basename(remote))
|
||||
await sandbox.files.write(remote, content)
|
||||
await _sandbox_write(sandbox, remote, content)
|
||||
except Exception as exc:
|
||||
return _mcp(f"Failed to write {remote}: {exc}", error=True)
|
||||
|
||||
@@ -172,7 +207,10 @@ async def _handle_edit_file(args: dict[str, Any]) -> dict[str, Any]:
|
||||
parent = os.path.dirname(remote)
|
||||
canonical_parent = await _check_sandbox_symlink_escape(sandbox, parent)
|
||||
if canonical_parent is None:
|
||||
return _mcp(f"Path must be within {E2B_WORKDIR}: {parent}", error=True)
|
||||
return _mcp(
|
||||
f"Path must be within {E2B_ALLOWED_DIRS_STR}: {os.path.basename(parent)}",
|
||||
error=True,
|
||||
)
|
||||
remote = os.path.join(canonical_parent, os.path.basename(remote))
|
||||
|
||||
try:
|
||||
@@ -197,7 +235,7 @@ async def _handle_edit_file(args: dict[str, Any]) -> dict[str, Any]:
|
||||
else content.replace(old_string, new_string, 1)
|
||||
)
|
||||
try:
|
||||
await sandbox.files.write(remote, updated)
|
||||
await _sandbox_write(sandbox, remote, updated)
|
||||
except Exception as exc:
|
||||
return _mcp(f"Failed to write {remote}: {exc}", error=True)
|
||||
|
||||
@@ -290,14 +328,14 @@ def _read_local(file_path: str, offset: int, limit: int) -> dict[str, Any]:
|
||||
E2B_FILE_TOOLS: list[tuple[str, str, dict[str, Any], Callable[..., Any]]] = [
|
||||
(
|
||||
"read_file",
|
||||
"Read a file from the cloud sandbox (/home/user). "
|
||||
"Read a file from the cloud sandbox (/home/user or /tmp). "
|
||||
"Use offset and limit for large files.",
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"file_path": {
|
||||
"type": "string",
|
||||
"description": "Path (relative to /home/user, or absolute).",
|
||||
"description": "Path (relative to /home/user, or absolute under /home/user or /tmp).",
|
||||
},
|
||||
"offset": {
|
||||
"type": "integer",
|
||||
@@ -314,7 +352,7 @@ E2B_FILE_TOOLS: list[tuple[str, str, dict[str, Any], Callable[..., Any]]] = [
|
||||
),
|
||||
(
|
||||
"write_file",
|
||||
"Write or create a file in the cloud sandbox (/home/user). "
|
||||
"Write or create a file in the cloud sandbox (/home/user or /tmp). "
|
||||
"Parent directories are created automatically. "
|
||||
"To copy a workspace file into the sandbox, use "
|
||||
"read_workspace_file with save_to_path instead.",
|
||||
@@ -323,7 +361,7 @@ E2B_FILE_TOOLS: list[tuple[str, str, dict[str, Any], Callable[..., Any]]] = [
|
||||
"properties": {
|
||||
"file_path": {
|
||||
"type": "string",
|
||||
"description": "Path (relative to /home/user, or absolute).",
|
||||
"description": "Path (relative to /home/user, or absolute under /home/user or /tmp).",
|
||||
},
|
||||
"content": {"type": "string", "description": "Content to write."},
|
||||
},
|
||||
@@ -340,7 +378,7 @@ E2B_FILE_TOOLS: list[tuple[str, str, dict[str, Any], Callable[..., Any]]] = [
|
||||
"properties": {
|
||||
"file_path": {
|
||||
"type": "string",
|
||||
"description": "Path (relative to /home/user, or absolute).",
|
||||
"description": "Path (relative to /home/user, or absolute under /home/user or /tmp).",
|
||||
},
|
||||
"old_string": {"type": "string", "description": "Text to find."},
|
||||
"new_string": {"type": "string", "description": "Replacement text."},
|
||||
|
||||
@@ -15,6 +15,7 @@ from backend.copilot.context import E2B_WORKDIR, SDK_PROJECTS_DIR, _current_proj
|
||||
from .e2b_file_tools import (
|
||||
_check_sandbox_symlink_escape,
|
||||
_read_local,
|
||||
_sandbox_write,
|
||||
resolve_sandbox_path,
|
||||
)
|
||||
|
||||
@@ -39,23 +40,23 @@ class TestResolveSandboxPath:
|
||||
assert resolve_sandbox_path("./README.md") == f"{E2B_WORKDIR}/README.md"
|
||||
|
||||
def test_traversal_blocked(self):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path("../../etc/passwd")
|
||||
|
||||
def test_absolute_traversal_blocked(self):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path(f"{E2B_WORKDIR}/../../etc/passwd")
|
||||
|
||||
def test_absolute_outside_sandbox_blocked(self):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path("/etc/passwd")
|
||||
|
||||
def test_root_blocked(self):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path("/")
|
||||
|
||||
def test_home_other_user_blocked(self):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path("/home/other/file.txt")
|
||||
|
||||
def test_deep_nested_allowed(self):
|
||||
@@ -68,6 +69,24 @@ class TestResolveSandboxPath:
|
||||
"""Path that resolves back within E2B_WORKDIR is allowed."""
|
||||
assert resolve_sandbox_path("a/b/../c.txt") == f"{E2B_WORKDIR}/a/c.txt"
|
||||
|
||||
def test_tmp_absolute_allowed(self):
|
||||
assert resolve_sandbox_path("/tmp/data.txt") == "/tmp/data.txt"
|
||||
|
||||
def test_tmp_nested_allowed(self):
|
||||
assert resolve_sandbox_path("/tmp/a/b/c.txt") == "/tmp/a/b/c.txt"
|
||||
|
||||
def test_tmp_itself_allowed(self):
|
||||
assert resolve_sandbox_path("/tmp") == "/tmp"
|
||||
|
||||
def test_tmp_escape_blocked(self):
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path("/tmp/../etc/passwd")
|
||||
|
||||
def test_tmp_prefix_collision_blocked(self):
|
||||
"""A path like /tmp_evil should be blocked (not a prefix match)."""
|
||||
with pytest.raises(ValueError, match="must be within"):
|
||||
resolve_sandbox_path("/tmp_evil/malicious.txt")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _read_local — host filesystem reads with allowlist enforcement
|
||||
@@ -227,3 +246,92 @@ class TestCheckSandboxSymlinkEscape:
|
||||
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}/a/b/c/d\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/a/b/c/d")
|
||||
assert result == f"{E2B_WORKDIR}/a/b/c/d"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tmp_path_allowed(self):
|
||||
"""Paths resolving to /tmp are allowed."""
|
||||
sandbox = _make_sandbox(stdout="/tmp/workdir\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, "/tmp/workdir")
|
||||
assert result == "/tmp/workdir"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tmp_itself_allowed(self):
|
||||
"""The /tmp directory itself is allowed."""
|
||||
sandbox = _make_sandbox(stdout="/tmp\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, "/tmp")
|
||||
assert result == "/tmp"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _sandbox_write — routing writes through shell for /tmp paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSandboxWrite:
|
||||
@pytest.mark.asyncio
|
||||
async def test_tmp_path_uses_shell_command(self):
|
||||
"""Writes to /tmp should use commands.run (shell) instead of files.write."""
|
||||
run_result = SimpleNamespace(stdout="", stderr="", exit_code=0)
|
||||
commands = SimpleNamespace(run=AsyncMock(return_value=run_result))
|
||||
files = SimpleNamespace(write=AsyncMock())
|
||||
sandbox = SimpleNamespace(commands=commands, files=files)
|
||||
|
||||
await _sandbox_write(sandbox, "/tmp/test.py", "print('hello')")
|
||||
|
||||
commands.run.assert_called_once()
|
||||
files.write.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_home_user_path_uses_files_api(self):
|
||||
"""Writes to /home/user should use sandbox.files.write."""
|
||||
run_result = SimpleNamespace(stdout="", stderr="", exit_code=0)
|
||||
commands = SimpleNamespace(run=AsyncMock(return_value=run_result))
|
||||
files = SimpleNamespace(write=AsyncMock())
|
||||
sandbox = SimpleNamespace(commands=commands, files=files)
|
||||
|
||||
await _sandbox_write(sandbox, "/home/user/test.py", "print('hello')")
|
||||
|
||||
files.write.assert_called_once_with("/home/user/test.py", "print('hello')")
|
||||
commands.run.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tmp_nested_path_uses_shell_command(self):
|
||||
"""Writes to nested /tmp paths should use commands.run."""
|
||||
run_result = SimpleNamespace(stdout="", stderr="", exit_code=0)
|
||||
commands = SimpleNamespace(run=AsyncMock(return_value=run_result))
|
||||
files = SimpleNamespace(write=AsyncMock())
|
||||
sandbox = SimpleNamespace(commands=commands, files=files)
|
||||
|
||||
await _sandbox_write(sandbox, "/tmp/subdir/file.txt", "content")
|
||||
|
||||
commands.run.assert_called_once()
|
||||
files.write.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tmp_write_shell_failure_raises(self):
|
||||
"""Shell write failure should raise RuntimeError."""
|
||||
run_result = SimpleNamespace(stdout="", stderr="No space left", exit_code=1)
|
||||
commands = SimpleNamespace(run=AsyncMock(return_value=run_result))
|
||||
sandbox = SimpleNamespace(commands=commands)
|
||||
|
||||
with pytest.raises(RuntimeError, match="shell write failed"):
|
||||
await _sandbox_write(sandbox, "/tmp/test.txt", "content")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tmp_write_preserves_content_with_special_chars(self):
|
||||
"""Content with special shell characters should be preserved via base64."""
|
||||
import base64
|
||||
|
||||
run_result = SimpleNamespace(stdout="", stderr="", exit_code=0)
|
||||
commands = SimpleNamespace(run=AsyncMock(return_value=run_result))
|
||||
sandbox = SimpleNamespace(commands=commands)
|
||||
|
||||
content = "print(\"Hello $USER\")\n# a `backtick` and 'quotes'\n"
|
||||
await _sandbox_write(sandbox, "/tmp/special.py", content)
|
||||
|
||||
# Verify the command contains base64-encoded content
|
||||
call_args = commands.run.call_args[0][0]
|
||||
# Extract the base64 string from the command
|
||||
encoded_in_cmd = call_args.split("echo ")[1].split(" |")[0].strip("'")
|
||||
decoded = base64.b64decode(encoded_in_cmd).decode()
|
||||
assert decoded == content
|
||||
|
||||
Reference in New Issue
Block a user