fix(backend/copilot): harden tool-result path validation and address review feedback

- Tighten is_allowed_local_path to only allow UUID-nested tool-results
  paths (<encoded-cwd>/<uuid>/tool-results/<file>), rejecting the
  non-UUID pattern that isn't a real SDK flow
- Add TTL-based cleanup (24h) for stale conversation UUID dirs under
  ~/.claude/projects/ to prevent disk leak (addresses sentry bot review)
- Add path re-validation after mkdir in E2B write handler to prevent
  symlink escape
- Increase wait_for_stash timeout from 0.5s to 2.0s and add post-timeout
  retry to reduce PostToolUse hook race condition output loss
- Update all affected tests to use UUID-nested path pattern
This commit is contained in:
Zamil Majdy
2026-03-13 15:50:17 +07:00
parent a4deae0f69
commit e0128470a9
7 changed files with 94 additions and 24 deletions

View File

@@ -87,10 +87,8 @@ def is_allowed_local_path(path: str, sdk_cwd: str | None = None) -> bool:
Allowed:
- Files under *sdk_cwd* (``/tmp/copilot-<session>/``)
- Files under any ``tool-results/`` directory within
``~/.claude/projects/<encoded-cwd>/``. The SDK may nest tool-results
under a conversation UUID, e.g.
``<encoded-cwd>/<conversation-uuid>/tool-results/<file>``.
- Files under ``~/.claude/projects/<encoded-cwd>/<uuid>/tool-results/``.
The SDK always nests tool-results under a conversation UUID directory.
"""
if not path:
return False
@@ -110,13 +108,14 @@ def is_allowed_local_path(path: str, sdk_cwd: str | None = None) -> bool:
encoded = _current_project_dir.get("")
if encoded:
project_dir = os.path.join(_SDK_PROJECTS_DIR, encoded)
# Allow any tool-results/ directory under the project dir, at any
# nesting depth (the SDK may insert a conversation UUID between the
# project dir and tool-results/).
# Only allow: <encoded-cwd>/<uuid>/tool-results/<file>
# The SDK always creates a conversation UUID directory between
# the project dir and tool-results/.
if resolved.startswith(project_dir + os.sep):
relative = resolved[len(project_dir) + 1 :]
parts = relative.split(os.sep)
if "tool-results" in parts:
# Require exactly: [<uuid>, "tool-results", <file>, ...]
if len(parts) >= 3 and parts[1] == "tool-results":
return True
return False

View File

@@ -104,11 +104,13 @@ def test_is_allowed_local_path_no_sdk_cwd_no_project_dir():
assert not is_allowed_local_path("/tmp/some-file.txt", sdk_cwd=None)
def test_is_allowed_local_path_tool_results_dir():
"""Files under the tool-results directory for the current project are allowed."""
def test_is_allowed_local_path_tool_results_with_uuid():
"""Files under <encoded-cwd>/<uuid>/tool-results/ are allowed."""
encoded = "test-encoded-dir"
tool_results_dir = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results")
path = os.path.join(tool_results_dir, "output.txt")
conv_uuid = "abc123-conv-uuid"
path = os.path.join(
_SDK_PROJECTS_DIR, encoded, conv_uuid, "tool-results", "output.txt"
)
_current_project_dir.set(encoded)
try:
@@ -117,6 +119,18 @@ def test_is_allowed_local_path_tool_results_dir():
_current_project_dir.set("")
def test_is_allowed_local_path_tool_results_without_uuid_rejected():
"""Direct <encoded-cwd>/tool-results/ (no UUID) is rejected."""
encoded = "test-encoded-dir"
path = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results", "output.txt")
_current_project_dir.set(encoded)
try:
assert not is_allowed_local_path(path, sdk_cwd=None)
finally:
_current_project_dir.set("")
def test_is_allowed_local_path_sibling_of_tool_results_is_rejected():
"""A path adjacent to tool-results/ but not inside it is rejected."""
encoded = "test-encoded-dir"

View File

@@ -106,6 +106,12 @@ async def _handle_write_file(args: dict[str, Any]) -> dict[str, Any]:
parent = os.path.dirname(remote)
if parent and parent != E2B_WORKDIR:
await sandbox.files.make_dir(parent)
# Re-validate path after mkdir — a symlink in the sandbox could
# redirect the write target outside /home/user.
try:
resolve_sandbox_path(remote)
except ValueError as exc:
return _mcp(str(exc), error=True)
await sandbox.files.write(remote, content)
except Exception as exc:
return _mcp(f"Failed to write {remote}: {exc}", error=True)

View File

@@ -4,6 +4,7 @@ Pure unit tests with no external dependencies (no E2B, no sandbox).
"""
import os
import shutil
import pytest
@@ -73,9 +74,13 @@ class TestResolveSandboxPath:
class TestReadLocal:
_CONV_UUID = "test-conv-uuid"
def _make_tool_results_file(self, encoded: str, filename: str, content: str) -> str:
"""Create a tool-results file and return its path."""
tool_results_dir = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results")
"""Create a tool-results file under <encoded>/<uuid>/tool-results/."""
tool_results_dir = os.path.join(
_SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
)
os.makedirs(tool_results_dir, exist_ok=True)
filepath = os.path.join(tool_results_dir, filename)
with open(filepath, "w") as f:
@@ -107,7 +112,9 @@ class TestReadLocal:
def test_read_nonexistent_tool_results(self):
"""A tool-results path that doesn't exist returns FileNotFoundError."""
encoded = "-tmp-copilot-e2b-test-nofile"
tool_results_dir = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results")
tool_results_dir = os.path.join(
_SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
)
os.makedirs(tool_results_dir, exist_ok=True)
filepath = os.path.join(tool_results_dir, "nonexistent.txt")
token = _current_project_dir.set(encoded)
@@ -117,7 +124,7 @@ class TestReadLocal:
assert "not found" in result["content"][0]["text"].lower()
finally:
_current_project_dir.reset(token)
os.rmdir(tool_results_dir)
shutil.rmtree(os.path.join(_SDK_PROJECTS_DIR, encoded), ignore_errors=True)
def test_read_traversal_path_blocked(self):
"""A traversal attempt that escapes allowed directories is blocked."""

View File

@@ -122,7 +122,7 @@ def test_read_no_cwd_denies_absolute():
def test_read_tool_results_allowed():
home = os.path.expanduser("~")
path = f"{home}/.claude/projects/-tmp-copilot-abc123/tool-results/12345.txt"
path = f"{home}/.claude/projects/-tmp-copilot-abc123/conv-uuid-123/tool-results/12345.txt"
# is_allowed_local_path requires the session's encoded cwd to be set
token = _current_project_dir.set("-tmp-copilot-abc123")
try:

View File

@@ -279,16 +279,18 @@ def _make_sdk_cwd(session_id: str) -> str:
return cwd
_STALE_PROJECT_DIR_TTL_SECONDS = 24 * 60 * 60 # 24 hours
def _cleanup_sdk_tool_results(cwd: str) -> None:
"""Remove SDK session artifacts for a specific working directory.
Cleans up the ephemeral working directory ``/tmp/copilot-<session>/``.
NOTE: The CLI project directory ``~/.claude/projects/<encoded-cwd>/``
is intentionally NOT cleaned up between turns. The SDK stores
tool-result files there that the model may reference in subsequent
turns via ``--resume``. Deleting them causes "file not found" errors
when the model tries to re-read truncated tool outputs.
Also sweeps stale conversation UUID directories under
``~/.claude/projects/<encoded-cwd>/`` that are older than 24 hours.
This prevents unbounded disk growth while preserving tool-result files
needed by recent ``--resume`` turns.
Security: *cwd* MUST be created by ``_make_sdk_cwd()`` which sanitizes
the session_id.
@@ -304,6 +306,42 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
except OSError:
pass
# Sweep stale conversation dirs under the CLI project directory.
_sweep_stale_project_dirs()
def _sweep_stale_project_dirs() -> None:
"""Remove conversation UUID dirs older than the TTL threshold.
The SDK creates ``~/.claude/projects/<encoded-cwd>/<uuid>/`` directories
for each conversation. We keep recent ones (needed by ``--resume``) and
remove those older than ``_STALE_PROJECT_DIR_TTL_SECONDS``.
"""
import time
from backend.copilot.context import _SDK_PROJECTS_DIR
try:
if not os.path.isdir(_SDK_PROJECTS_DIR):
return
now = time.time()
cutoff = now - _STALE_PROJECT_DIR_TTL_SECONDS
for encoded_dir in os.scandir(_SDK_PROJECTS_DIR):
if not encoded_dir.is_dir(follow_symlinks=False):
continue
for conv_dir in os.scandir(encoded_dir.path):
if not conv_dir.is_dir(follow_symlinks=False):
continue
try:
mtime = conv_dir.stat(follow_symlinks=False).st_mtime
except OSError:
continue
if mtime < cutoff:
shutil.rmtree(conv_dir.path, ignore_errors=True)
logger.debug("[SDK] Removed stale project dir: %s", conv_dir.path)
except OSError:
pass # Best-effort cleanup
def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
"""Convert SDK content blocks to transcript format.
@@ -1096,7 +1134,7 @@ async def stream_chat_completion_sdk(
and isinstance(sdk_msg, (AssistantMessage, ResultMessage))
and not is_parallel_continuation
):
if await wait_for_stash(timeout=0.5):
if await wait_for_stash(timeout=2.0):
await asyncio.sleep(0)
else:
logger.warning(

View File

@@ -146,7 +146,7 @@ def stash_pending_tool_output(tool_name: str, output: Any) -> None:
event.set()
async def wait_for_stash(timeout: float = 0.5) -> bool:
async def wait_for_stash(timeout: float = 2.0) -> bool:
"""Wait for a PostToolUse hook to stash tool output.
The SDK fires PostToolUse hooks asynchronously via ``start_soon()`` —
@@ -176,6 +176,12 @@ async def wait_for_stash(timeout: float = 0.5) -> bool:
event.clear()
return True
except TimeoutError:
# Last chance: yield to the event loop once — the hook may have
# completed between the timeout and this point.
await asyncio.sleep(0)
if event.is_set():
event.clear()
return True
return False