fix(copilot): address review items — top-level import, path sanitization, E2B_WORKDIR constant, st_mtime comment, no-fallback test

- Move `cleanup_stale_project_dirs` from deferred import inside `_cleanup_sdk_tool_results` to the top-level `from .transcript import (...)` block
- Sanitize `FileNotFoundError` message in `_read_local_tool_result` to use `os.path.basename(path)` instead of leaking the full path
- Replace hardcoded `/home/user` strings in `e2b_file_tools_test.py` with the `E2B_WORKDIR` constant
- Add `st_mtime` write-once invariant comment to `cleanup_stale_project_dirs` explaining why mtime reliably signals session activity
- Add test asserting the local-disk fallback is NOT invoked when `_resolve_file` succeeds
This commit is contained in:
Zamil Majdy
2026-03-16 06:15:59 +07:00
parent 49e031b54d
commit 24cbe738ff
5 changed files with 93 additions and 37 deletions

View File

@@ -10,7 +10,7 @@ from unittest.mock import AsyncMock
import pytest
from backend.copilot.context import SDK_PROJECTS_DIR, _current_project_dir
from backend.copilot.context import E2B_WORKDIR, SDK_PROJECTS_DIR, _current_project_dir
from .e2b_file_tools import (
_check_sandbox_symlink_escape,
@@ -25,46 +25,48 @@ from .e2b_file_tools import (
class TestResolveSandboxPath:
def test_relative_path_resolved(self):
assert resolve_sandbox_path("src/main.py") == "/home/user/src/main.py"
assert resolve_sandbox_path("src/main.py") == f"{E2B_WORKDIR}/src/main.py"
def test_absolute_within_sandbox(self):
assert resolve_sandbox_path("/home/user/file.txt") == "/home/user/file.txt"
assert (
resolve_sandbox_path(f"{E2B_WORKDIR}/file.txt") == f"{E2B_WORKDIR}/file.txt"
)
def test_workdir_itself(self):
assert resolve_sandbox_path("/home/user") == "/home/user"
assert resolve_sandbox_path(E2B_WORKDIR) == E2B_WORKDIR
def test_relative_dotslash(self):
assert resolve_sandbox_path("./README.md") == "/home/user/README.md"
assert resolve_sandbox_path("./README.md") == f"{E2B_WORKDIR}/README.md"
def test_traversal_blocked(self):
with pytest.raises(ValueError, match="must be within /home/user"):
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
resolve_sandbox_path("../../etc/passwd")
def test_absolute_traversal_blocked(self):
with pytest.raises(ValueError, match="must be within /home/user"):
resolve_sandbox_path("/home/user/../../etc/passwd")
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
resolve_sandbox_path(f"{E2B_WORKDIR}/../../etc/passwd")
def test_absolute_outside_sandbox_blocked(self):
with pytest.raises(ValueError, match="must be within /home/user"):
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
resolve_sandbox_path("/etc/passwd")
def test_root_blocked(self):
with pytest.raises(ValueError, match="must be within /home/user"):
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
resolve_sandbox_path("/")
def test_home_other_user_blocked(self):
with pytest.raises(ValueError, match="must be within /home/user"):
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
resolve_sandbox_path("/home/other/file.txt")
def test_deep_nested_allowed(self):
assert resolve_sandbox_path("a/b/c/d/e.txt") == "/home/user/a/b/c/d/e.txt"
assert resolve_sandbox_path("a/b/c/d/e.txt") == f"{E2B_WORKDIR}/a/b/c/d/e.txt"
def test_trailing_slash_normalised(self):
assert resolve_sandbox_path("src/") == "/home/user/src"
assert resolve_sandbox_path("src/") == f"{E2B_WORKDIR}/src"
def test_double_dots_within_sandbox_ok(self):
"""Path that resolves back within /home/user is allowed."""
assert resolve_sandbox_path("a/b/../c.txt") == "/home/user/a/c.txt"
"""Path that resolves back within E2B_WORKDIR is allowed."""
assert resolve_sandbox_path("a/b/../c.txt") == f"{E2B_WORKDIR}/a/c.txt"
# ---------------------------------------------------------------------------
@@ -179,49 +181,49 @@ def _make_sandbox(stdout: str, exit_code: int = 0) -> SimpleNamespace:
class TestCheckSandboxSymlinkEscape:
@pytest.mark.asyncio
async def test_canonical_path_within_workdir_returns_path(self):
"""When readlink -f resolves to a path inside /home/user, returns it."""
sandbox = _make_sandbox(stdout="/home/user/src\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, "/home/user/src")
assert result == "/home/user/src"
"""When readlink -f resolves to a path inside E2B_WORKDIR, returns it."""
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}/src\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/src")
assert result == f"{E2B_WORKDIR}/src"
@pytest.mark.asyncio
async def test_workdir_itself_returns_workdir(self):
"""When readlink -f resolves to /home/user exactly, returns /home/user."""
sandbox = _make_sandbox(stdout="/home/user\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, "/home/user")
assert result == "/home/user"
"""When readlink -f resolves to E2B_WORKDIR exactly, returns E2B_WORKDIR."""
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, E2B_WORKDIR)
assert result == E2B_WORKDIR
@pytest.mark.asyncio
async def test_symlink_escape_returns_none(self):
"""When readlink -f resolves outside /home/user (symlink escape), returns None."""
"""When readlink -f resolves outside E2B_WORKDIR (symlink escape), returns None."""
sandbox = _make_sandbox(stdout="/etc\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, "/home/user/evil")
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/evil")
assert result is None
@pytest.mark.asyncio
async def test_nonzero_exit_code_returns_none(self):
"""A non-zero exit code from readlink -f returns None."""
sandbox = _make_sandbox(stdout="", exit_code=1)
result = await _check_sandbox_symlink_escape(sandbox, "/home/user/src")
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/src")
assert result is None
@pytest.mark.asyncio
async def test_empty_stdout_returns_none(self):
"""Empty stdout from readlink (e.g. path doesn't exist yet) returns None."""
sandbox = _make_sandbox(stdout="", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, "/home/user/src")
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/src")
assert result is None
@pytest.mark.asyncio
async def test_prefix_collision_returns_none(self):
"""/home/user-evil does not satisfy the startswith check."""
sandbox = _make_sandbox(stdout="/home/user-evil\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, "/home/user-evil")
"""A path prefixed with E2B_WORKDIR but not within it is rejected."""
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}-evil\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}-evil")
assert result is None
@pytest.mark.asyncio
async def test_deeply_nested_path_within_workdir(self):
"""Deep nested paths inside /home/user are allowed."""
sandbox = _make_sandbox(stdout="/home/user/a/b/c/d\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, "/home/user/a/b/c/d")
assert result == "/home/user/a/b/c/d"
"""Deep nested paths inside E2B_WORKDIR are allowed."""
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}/a/b/c/d\n", exit_code=0)
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/a/b/c/d")
assert result == f"{E2B_WORKDIR}/a/b/c/d"

View File

@@ -77,6 +77,7 @@ from .tool_adapter import (
wait_for_stash,
)
from .transcript import (
cleanup_stale_project_dirs,
download_transcript,
read_compacted_entries,
upload_transcript,
@@ -297,8 +298,6 @@ async def _cleanup_sdk_tool_results(cwd: str) -> None:
Security: *cwd* MUST be created by ``_make_sdk_cwd()`` which sanitizes
the session_id.
"""
from .transcript import cleanup_stale_project_dirs
normalized = os.path.normpath(cwd)
if not normalized.startswith(_SDK_CWD_PREFIX):
logger.warning("[SDK] Rejecting cleanup for path outside workspace: %s", cwd)

View File

@@ -191,6 +191,11 @@ def cleanup_stale_project_dirs(encoded_cwd: str | None = None) -> int:
)
return 0
try:
# st_mtime is used as a proxy for session activity. Claude CLI writes
# its JSONL transcript into this directory during each turn, so mtime
# advances on every turn. A directory whose mtime is older than
# _STALE_PROJECT_DIR_SECONDS has not had an active turn in that window
# and is safe to remove (the session cannot --resume after cleanup).
age = now - target.stat().st_mtime
except OSError:
return 0
@@ -229,6 +234,8 @@ def cleanup_stale_project_dirs(encoded_cwd: str | None = None) -> int:
if not entry.is_dir():
continue
try:
# See the scoped-mode comment above: st_mtime advances on every turn,
# so a stale mtime reliably indicates an inactive session.
age = now - entry.stat().st_mtime
except OSError:
continue

View File

@@ -352,7 +352,9 @@ def _read_local_tool_result(
)
slice_text = text_content[char_offset:end]
except FileNotFoundError:
return ErrorResponse(message=f"File not found: {path}", session_id=session_id)
return ErrorResponse(
message=f"File not found: {os.path.basename(path)}", session_id=session_id
)
except Exception as exc:
return ErrorResponse(
message=f"Error reading file: {type(exc).__name__}", session_id=session_id

View File

@@ -577,3 +577,49 @@ async def test_read_workspace_file_falls_back_to_local_tool_result(setup_test_da
finally:
_current_project_dir.reset(token)
shutil.rmtree(os.path.join(SDK_PROJECTS_DIR, encoded), ignore_errors=True)
@pytest.mark.asyncio(loop_scope="session")
async def test_read_workspace_file_no_fallback_when_resolve_succeeds(setup_test_data):
"""When _resolve_file succeeds, the local-disk fallback must NOT be invoked."""
user = setup_test_data["user"]
session = make_session(user.id)
fake_file_id = "fake-file-id-001"
fake_content = b"workspace content"
# Build a minimal file_info stub that the tool's happy-path needs.
class _FakeFileInfo:
id = fake_file_id
name = "result.json"
path = "/result.json"
mime_type = "text/plain"
size_bytes = len(fake_content)
mock_resolve = AsyncMock(return_value=(fake_file_id, _FakeFileInfo()))
mock_manager = AsyncMock()
mock_manager.read_file_by_id = AsyncMock(return_value=fake_content)
with (
patch("backend.copilot.tools.workspace_files._resolve_file", mock_resolve),
patch(
"backend.copilot.tools.workspace_files.get_manager",
AsyncMock(return_value=mock_manager),
),
patch(
"backend.copilot.tools.workspace_files._read_local_tool_result"
) as patched_local,
):
read_tool = ReadWorkspaceFileTool()
result = await read_tool._execute(
user_id=user.id,
session=session,
file_id=fake_file_id,
)
# Fallback must not have been called.
patched_local.assert_not_called()
# Normal workspace path must have produced a content response.
assert isinstance(result, WorkspaceFileContentResponse)
assert base64.b64decode(result.content_base64) == fake_content