fix(backend/copilot): address third-bump review comments

- Add defence-in-depth is_allowed_local_path check in _read_local_tool_result
- Scope _sweep_stale_project_dirs to current session's encoded_dir only
- Remove dead cleanup_cli_project_dir from transcript.py
- Check readlink exit_code in e2b_file_tools symlink validation
- Remove redundant try/except around shutil.rmtree(ignore_errors=True)
- Add test for parts[1] != "tool-results" rejection path
- Rename _SDK_PROJECTS_DIR to SDK_PROJECTS_DIR (public API)
- Remove sleep(0) band-aid from wait_for_stash, add timeout justification
- Extract _UUID_RE compiled constant for conversation UUID validation
This commit is contained in:
Zamil Majdy
2026-03-13 19:54:00 +07:00
parent 3334a4b4b5
commit 85101bfc5b
8 changed files with 94 additions and 91 deletions

View File

@@ -15,8 +15,14 @@ from backend.copilot.model import ChatSession
if TYPE_CHECKING:
from e2b import AsyncSandbox
# Allowed base directory for the Read tool.
_SDK_PROJECTS_DIR = os.path.realpath(os.path.expanduser("~/.claude/projects"))
# Allowed base directory for the Read tool. Public so service.py can use it
# for sweep operations without depending on a private implementation detail.
SDK_PROJECTS_DIR = os.path.realpath(os.path.expanduser("~/.claude/projects"))
# Compiled UUID-v4 pattern for validating conversation directory names.
# Kept as a module-level constant so the security-relevant pattern is easy
# to audit in one place and avoids recompilation on every call.
_UUID_RE = re.compile(r"^[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}$")
# Encoded project-directory name for the current session (e.g.
# "-private-tmp-copilot-<uuid>"). Set by set_execution_context() so path
@@ -87,8 +93,9 @@ def is_allowed_local_path(path: str, sdk_cwd: str | None = None) -> bool:
Allowed:
- Files under *sdk_cwd* (``/tmp/copilot-<session>/``)
- Files under ``~/.claude/projects/<encoded-cwd>/<uuid>/tool-results/``.
The SDK always nests tool-results under a conversation UUID directory.
- Files under ``~/.claude/projects/<encoded-cwd>/<uuid>/tool-results/...``.
The SDK nests tool-results under a conversation UUID directory;
the UUID segment is validated with ``_UUID_RE``.
"""
if not path:
return False
@@ -107,7 +114,7 @@ def is_allowed_local_path(path: str, sdk_cwd: str | None = None) -> bool:
encoded = _current_project_dir.get("")
if encoded:
project_dir = os.path.join(_SDK_PROJECTS_DIR, encoded)
project_dir = os.path.join(SDK_PROJECTS_DIR, encoded)
# Only allow: <encoded-cwd>/<uuid>/tool-results/<file>
# The SDK always creates a conversation UUID directory between
# the project dir and tool-results/.
@@ -117,7 +124,7 @@ def is_allowed_local_path(path: str, sdk_cwd: str | None = None) -> bool:
# Require exactly: [<uuid>, "tool-results", <file>, ...]
if (
len(parts) >= 3
and re.match(r"^[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}$", parts[0])
and _UUID_RE.match(parts[0])
and parts[1] == "tool-results"
):
return True

View File

@@ -9,7 +9,7 @@ from unittest.mock import MagicMock
import pytest
from backend.copilot.context import (
_SDK_PROJECTS_DIR,
SDK_PROJECTS_DIR,
_current_project_dir,
get_current_sandbox,
get_execution_context,
@@ -109,7 +109,7 @@ def test_is_allowed_local_path_tool_results_with_uuid():
encoded = "test-encoded-dir"
conv_uuid = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
path = os.path.join(
_SDK_PROJECTS_DIR, encoded, conv_uuid, "tool-results", "output.txt"
SDK_PROJECTS_DIR, encoded, conv_uuid, "tool-results", "output.txt"
)
_current_project_dir.set(encoded)
@@ -122,7 +122,7 @@ def test_is_allowed_local_path_tool_results_with_uuid():
def test_is_allowed_local_path_tool_results_without_uuid_rejected():
"""Direct <encoded-cwd>/tool-results/ (no UUID) is rejected."""
encoded = "test-encoded-dir"
path = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results", "output.txt")
path = os.path.join(SDK_PROJECTS_DIR, encoded, "tool-results", "output.txt")
_current_project_dir.set(encoded)
try:
@@ -134,7 +134,7 @@ def test_is_allowed_local_path_tool_results_without_uuid_rejected():
def test_is_allowed_local_path_sibling_of_tool_results_is_rejected():
"""A path adjacent to tool-results/ but not inside it is rejected."""
encoded = "test-encoded-dir"
sibling_path = os.path.join(_SDK_PROJECTS_DIR, encoded, "other-dir", "file.txt")
sibling_path = os.path.join(SDK_PROJECTS_DIR, encoded, "other-dir", "file.txt")
_current_project_dir.set(encoded)
try:
@@ -143,6 +143,21 @@ def test_is_allowed_local_path_sibling_of_tool_results_is_rejected():
_current_project_dir.set("")
def test_is_allowed_local_path_valid_uuid_wrong_segment_name_rejected():
"""A valid UUID dir but non-'tool-results' second segment is rejected."""
encoded = "test-encoded-dir"
uuid_str = "12345678-1234-5678-9abc-def012345678"
path = os.path.join(
SDK_PROJECTS_DIR, encoded, uuid_str, "not-tool-results", "output.txt"
)
_current_project_dir.set(encoded)
try:
assert not is_allowed_local_path(path, sdk_cwd=None)
finally:
_current_project_dir.set("")
# ---------------------------------------------------------------------------
# resolve_sandbox_path
# ---------------------------------------------------------------------------

View File

@@ -115,9 +115,13 @@ async def _handle_write_file(args: dict[str, Any]) -> dict[str, Any]:
timeout=5,
)
canonical_parent = (canonical_res.stdout or "").strip()
if not canonical_parent or (
canonical_parent != E2B_WORKDIR
and not canonical_parent.startswith(E2B_WORKDIR + "/")
if (
canonical_res.exit_code != 0
or not canonical_parent
or (
canonical_parent != E2B_WORKDIR
and not canonical_parent.startswith(E2B_WORKDIR + "/")
)
):
return _mcp(f"Path must be within {E2B_WORKDIR}: {parent}", error=True)
remote = os.path.join(canonical_parent, os.path.basename(remote))

View File

@@ -8,13 +8,10 @@ import shutil
import pytest
from backend.copilot.context import _current_project_dir
from backend.copilot.context import SDK_PROJECTS_DIR, _current_project_dir
from .e2b_file_tools import _read_local, resolve_sandbox_path
_SDK_PROJECTS_DIR = os.path.realpath(os.path.expanduser("~/.claude/projects"))
# ---------------------------------------------------------------------------
# resolve_sandbox_path — sandbox path normalisation & boundary enforcement
# ---------------------------------------------------------------------------
@@ -79,7 +76,7 @@ class TestReadLocal:
def _make_tool_results_file(self, encoded: str, filename: str, content: str) -> str:
"""Create a tool-results file under <encoded>/<uuid>/tool-results/."""
tool_results_dir = os.path.join(
_SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
)
os.makedirs(tool_results_dir, exist_ok=True)
filepath = os.path.join(tool_results_dir, filename)
@@ -113,7 +110,7 @@ class TestReadLocal:
"""A tool-results path that doesn't exist returns FileNotFoundError."""
encoded = "-tmp-copilot-e2b-test-nofile"
tool_results_dir = os.path.join(
_SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
)
os.makedirs(tool_results_dir, exist_ok=True)
filepath = os.path.join(tool_results_dir, "nonexistent.txt")
@@ -124,7 +121,7 @@ class TestReadLocal:
assert "not found" in result["content"][0]["text"].lower()
finally:
_current_project_dir.reset(token)
shutil.rmtree(os.path.join(_SDK_PROJECTS_DIR, encoded), ignore_errors=True)
shutil.rmtree(os.path.join(SDK_PROJECTS_DIR, encoded), ignore_errors=True)
def test_read_traversal_path_blocked(self):
"""A traversal attempt that escapes allowed directories is blocked."""

View File

@@ -30,7 +30,7 @@ from langfuse import propagate_attributes
from langsmith.integrations.claude_agent_sdk import configure_claude_agent_sdk
from pydantic import BaseModel
from backend.copilot.context import _SDK_PROJECTS_DIR
from backend.copilot.context import SDK_PROJECTS_DIR
from backend.data.redis_client import get_redis_async
from backend.executor.cluster_lock import AsyncClusterLock
from backend.util.exceptions import NotFoundError
@@ -305,15 +305,14 @@ async def _cleanup_sdk_tool_results(cwd: str) -> None:
logger.warning(f"[SDK] Rejecting cleanup for path outside workspace: {cwd}")
return
encoded_dir = re.sub(r"[^a-zA-Z0-9]", "-", normalized)
def _sync_cleanup() -> None:
# Clean up the temp cwd directory itself.
try:
shutil.rmtree(normalized, ignore_errors=True)
except OSError:
pass
shutil.rmtree(normalized, ignore_errors=True)
# Sweep stale conversation dirs under the CLI project directory.
_sweep_stale_project_dirs()
_sweep_stale_project_dirs(encoded_dir)
await asyncio.to_thread(_sync_cleanup)
@@ -321,9 +320,13 @@ async def _cleanup_sdk_tool_results(cwd: str) -> None:
def _latest_mtime(dir_path: str) -> float:
"""Return the most recent mtime of *dir_path* and its immediate children.
A directory's mtime only updates when its direct entries change, so we
check one level deeper to catch active sessions that write new files
inside sub-directories like ``tool-results/``.
One level of children suffices because adding a file inside
``tool-results/`` updates ``tool-results/``'s own mtime, and
``tool-results/`` is an immediate child of the UUID dir.
``follow_symlinks=False`` prevents symlink-based mtime spoofing --
an attacker could create a symlink to a recently-modified file to
prevent cleanup of a stale directory.
"""
latest = os.stat(dir_path, follow_symlinks=False).st_mtime
for child in os.scandir(dir_path):
@@ -334,31 +337,32 @@ def _latest_mtime(dir_path: str) -> float:
return latest
def _sweep_stale_project_dirs() -> None:
def _sweep_stale_project_dirs(encoded_dir: str) -> None:
"""Remove conversation UUID dirs older than the TTL threshold.
Only sweeps within the given *encoded_dir* (the CLI-encoded cwd for the
current session) to avoid cross-session interference in multi-tenant
deployments.
The SDK creates ``~/.claude/projects/<encoded-cwd>/<uuid>/`` directories
for each conversation. We keep recent ones (needed by ``--resume``) and
remove those older than ``_STALE_PROJECT_DIR_TTL_SECONDS``.
"""
try:
if not os.path.isdir(_SDK_PROJECTS_DIR):
project_dir = os.path.join(SDK_PROJECTS_DIR, encoded_dir)
if not os.path.isdir(project_dir):
return
cutoff = time.time() - _STALE_PROJECT_DIR_TTL_SECONDS
for encoded_dir in os.scandir(_SDK_PROJECTS_DIR):
if not encoded_dir.is_dir(follow_symlinks=False):
for conv_dir in os.scandir(project_dir):
if not conv_dir.is_dir(follow_symlinks=False):
continue
for conv_dir in os.scandir(encoded_dir.path):
if not conv_dir.is_dir(follow_symlinks=False):
continue
try:
if _latest_mtime(conv_dir.path) < cutoff:
shutil.rmtree(conv_dir.path, ignore_errors=True)
logger.debug(
"[SDK] Removed stale project dir: %s", conv_dir.path
)
except OSError:
continue
try:
mtime = _latest_mtime(conv_dir.path)
except OSError:
continue
if mtime < cutoff:
shutil.rmtree(conv_dir.path, ignore_errors=True)
logger.debug("[SDK] Removed stale project dir: %s", conv_dir.path)
except OSError:
pass # Best-effort cleanup

View File

@@ -155,12 +155,12 @@ async def wait_for_stash(timeout: float = 2.0) -> bool:
by waiting on the ``_stash_event``, which is signaled by
:func:`stash_pending_tool_output`.
After the event fires, callers should ``await asyncio.sleep(0)`` to
give any remaining concurrent hooks a chance to complete.
Returns ``True`` if a stash signal was received, ``False`` on timeout.
The timeout is a safety net — normally the stash happens within
microseconds of yielding to the event loop.
The 2.0 s default was chosen based on production metrics: the original
0.5 s caused frequent timeouts under load (parallel tool calls, large
outputs). 2.0 s gives a comfortable margin while still failing fast
when the hook genuinely will not fire.
"""
event = _stash_event.get(None)
if event is None:
@@ -176,12 +176,6 @@ async def wait_for_stash(timeout: float = 2.0) -> bool:
event.clear()
return True
except TimeoutError:
# Last chance: yield to the event loop once — the hook may have
# completed between the timeout and this point.
await asyncio.sleep(0)
if event.is_set():
event.clear()
return True
return False

View File

@@ -137,34 +137,6 @@ def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""Remove the CLI's project directory for a specific working directory.
The CLI stores session data under ``~/.claude/projects/<encoded_cwd>/``.
Each SDK turn uses a unique ``sdk_cwd``, so the project directory is
safe to remove entirely after the transcript has been uploaded.
"""
import shutil
# Encode cwd the same way CLI does (replaces non-alphanumeric with -)
cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
if not project_dir.startswith(projects_base + os.sep):
logger.warning(
f"[Transcript] Cleanup path escaped projects base: {project_dir}"
)
return
if os.path.isdir(project_dir):
shutil.rmtree(project_dir, ignore_errors=True)
logger.debug(f"[Transcript] Cleaned up CLI project dir: {project_dir}")
else:
logger.debug(f"[Transcript] Project dir not found: {project_dir}")
def write_transcript_to_tempfile(
transcript_content: str,
session_id: str,

View File

@@ -289,17 +289,28 @@ def _read_local_tool_result(
char_offset: int,
char_length: Optional[int],
session_id: str,
sdk_cwd: str | None = None,
) -> ToolResponseBase:
"""Read an SDK tool-result file from local disk.
This is a fallback for when the model mistakenly calls
``read_workspace_file`` with an SDK tool-result path that only exists on
the host filesystem, not in cloud workspace storage.
Defence-in-depth: validates *path* via :func:`is_allowed_local_path`
regardless of what the caller has already checked.
"""
expanded = os.path.realpath(os.path.expanduser(path))
if not is_allowed_local_path(expanded, sdk_cwd or get_sdk_cwd()):
return ErrorResponse(message=f"Path not allowed: {path}", session_id=session_id)
try:
with open(expanded, encoding="utf-8", errors="replace") as fh:
text = fh.read()
# Use seek + bounded read to avoid loading the entire file when
# only a slice is requested. os.path.getsize gives an approximate
# total (bytes, not chars) for the status message.
if char_offset > 0:
fh.seek(char_offset)
slice_text = fh.read(char_length) if char_length is not None else fh.read()
except FileNotFoundError:
return ErrorResponse(message=f"File not found: {path}", session_id=session_id)
except Exception as exc:
@@ -307,10 +318,7 @@ def _read_local_tool_result(
message=f"Error reading {path}: {exc}", session_id=session_id
)
total_chars = len(text)
end = char_offset + char_length if char_length is not None else total_chars
slice_text = text[char_offset:end]
total_bytes = os.path.getsize(expanded)
return WorkspaceFileContentResponse(
file_id="local",
name=os.path.basename(path),
@@ -319,7 +327,8 @@ def _read_local_tool_result(
content_base64=base64.b64encode(slice_text.encode("utf-8")).decode("utf-8"),
message=(
f"Read chars {char_offset}\u2013{char_offset + len(slice_text)} "
f"of {total_chars:,} total from local tool-result {os.path.basename(path)}"
f"of ~{total_bytes:,} bytes from local tool-result "
f"{os.path.basename(path)}"
),
session_id=session_id,
)
@@ -586,9 +595,10 @@ class ReadWorkspaceFileTool(BaseTool):
# Fallback: if the path is an SDK tool-result on local disk,
# read it directly instead of failing. The model sometimes
# calls read_workspace_file for these paths by mistake.
if path and is_allowed_local_path(path, get_sdk_cwd()):
sdk_cwd = get_sdk_cwd()
if path and is_allowed_local_path(path, sdk_cwd):
return _read_local_tool_result(
path, char_offset, char_length, session_id
path, char_offset, char_length, session_id, sdk_cwd=sdk_cwd
)
return resolved
target_file_id, file_info = resolved