mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-03-17 03:00:27 -04:00
Compare commits
25 Commits
feat/workf
...
fix/copilo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
885459c6e1 | ||
|
|
38ff768a65 | ||
|
|
76dbf3bbec | ||
|
|
c0ade4be68 | ||
|
|
24cbe738ff | ||
|
|
49e031b54d | ||
|
|
4b6c2a1323 | ||
|
|
c854c1a485 | ||
|
|
53a2c84796 | ||
|
|
3063ce22ac | ||
|
|
69db0815c3 | ||
|
|
775ed85bba | ||
|
|
f07bb52ac3 | ||
|
|
84482071a8 | ||
|
|
9ac01a0cf6 | ||
|
|
71337c0514 | ||
|
|
b2808f223a | ||
|
|
85101bfc5b | ||
|
|
3334a4b4b5 | ||
|
|
796e737d77 | ||
|
|
8d16f8052b | ||
|
|
1f8ab0687c | ||
|
|
035aba9cf1 | ||
|
|
e0128470a9 | ||
|
|
a4deae0f69 |
@@ -17,8 +17,17 @@ from backend.util.workspace import WorkspaceManager
|
||||
if TYPE_CHECKING:
|
||||
from e2b import AsyncSandbox
|
||||
|
||||
# Allowed base directory for the Read tool.
|
||||
_SDK_PROJECTS_DIR = os.path.realpath(os.path.expanduser("~/.claude/projects"))
|
||||
# Allowed base directory for the Read tool. Public so service.py can use it
|
||||
# for sweep operations without depending on a private implementation detail.
|
||||
# Respects CLAUDE_CONFIG_DIR env var, consistent with transcript.py's
|
||||
# _projects_base() function.
|
||||
_config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
|
||||
SDK_PROJECTS_DIR = os.path.realpath(os.path.join(_config_dir, "projects"))
|
||||
|
||||
# Compiled UUID pattern for validating conversation directory names.
|
||||
# Kept as a module-level constant so the security-relevant pattern is easy
|
||||
# to audit in one place and avoids recompilation on every call.
|
||||
_UUID_RE = re.compile(r"^[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}$", re.IGNORECASE)
|
||||
|
||||
# Encoded project-directory name for the current session (e.g.
|
||||
# "-private-tmp-copilot-<uuid>"). Set by set_execution_context() so path
|
||||
@@ -35,11 +44,20 @@ _current_sandbox: ContextVar["AsyncSandbox | None"] = ContextVar(
|
||||
_current_sdk_cwd: ContextVar[str] = ContextVar("_current_sdk_cwd", default="")
|
||||
|
||||
|
||||
def _encode_cwd_for_cli(cwd: str) -> str:
|
||||
"""Encode a working directory path the same way the Claude CLI does."""
|
||||
def encode_cwd_for_cli(cwd: str) -> str:
|
||||
"""Encode a working directory path the same way the Claude CLI does.
|
||||
|
||||
The Claude CLI encodes the absolute cwd as a directory name by replacing
|
||||
every non-alphanumeric character with ``-``. For example
|
||||
``/tmp/copilot-abc`` becomes ``-tmp-copilot-abc``.
|
||||
"""
|
||||
return re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(cwd))
|
||||
|
||||
|
||||
# Keep the private alias for internal callers (backwards compat).
|
||||
_encode_cwd_for_cli = encode_cwd_for_cli
|
||||
|
||||
|
||||
def set_execution_context(
|
||||
user_id: str | None,
|
||||
session: ChatSession,
|
||||
@@ -100,7 +118,9 @@ def is_allowed_local_path(path: str, sdk_cwd: str | None = None) -> bool:
|
||||
|
||||
Allowed:
|
||||
- Files under *sdk_cwd* (``/tmp/copilot-<session>/``)
|
||||
- Files under ``~/.claude/projects/<encoded-cwd>/tool-results/`` (SDK tool-results)
|
||||
- Files under ``~/.claude/projects/<encoded-cwd>/<uuid>/tool-results/...``.
|
||||
The SDK nests tool-results under a conversation UUID directory;
|
||||
the UUID segment is validated with ``_UUID_RE``.
|
||||
"""
|
||||
if not path:
|
||||
return False
|
||||
@@ -119,10 +139,22 @@ def is_allowed_local_path(path: str, sdk_cwd: str | None = None) -> bool:
|
||||
|
||||
encoded = _current_project_dir.get("")
|
||||
if encoded:
|
||||
tool_results_dir = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results")
|
||||
if resolved == tool_results_dir or resolved.startswith(
|
||||
tool_results_dir + os.sep
|
||||
):
|
||||
return True
|
||||
project_dir = os.path.realpath(os.path.join(SDK_PROJECTS_DIR, encoded))
|
||||
# Defence-in-depth: ensure project_dir didn't escape the base.
|
||||
if not project_dir.startswith(SDK_PROJECTS_DIR + os.sep):
|
||||
return False
|
||||
# Only allow: <encoded-cwd>/<uuid>/tool-results/<file>
|
||||
# The SDK always creates a conversation UUID directory between
|
||||
# the project dir and tool-results/.
|
||||
if resolved.startswith(project_dir + os.sep):
|
||||
relative = resolved[len(project_dir) + 1 :]
|
||||
parts = relative.split(os.sep)
|
||||
# Require exactly: [<uuid>, "tool-results", <file>, ...]
|
||||
if (
|
||||
len(parts) >= 3
|
||||
and _UUID_RE.match(parts[0])
|
||||
and parts[1] == "tool-results"
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@@ -9,7 +9,7 @@ from unittest.mock import MagicMock
|
||||
import pytest
|
||||
|
||||
from backend.copilot.context import (
|
||||
_SDK_PROJECTS_DIR,
|
||||
SDK_PROJECTS_DIR,
|
||||
_current_project_dir,
|
||||
get_current_sandbox,
|
||||
get_execution_context,
|
||||
@@ -104,11 +104,13 @@ def test_is_allowed_local_path_no_sdk_cwd_no_project_dir():
|
||||
assert not is_allowed_local_path("/tmp/some-file.txt", sdk_cwd=None)
|
||||
|
||||
|
||||
def test_is_allowed_local_path_tool_results_dir():
|
||||
"""Files under the tool-results directory for the current project are allowed."""
|
||||
def test_is_allowed_local_path_tool_results_with_uuid():
|
||||
"""Files under <encoded-cwd>/<uuid>/tool-results/ are allowed."""
|
||||
encoded = "test-encoded-dir"
|
||||
tool_results_dir = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results")
|
||||
path = os.path.join(tool_results_dir, "output.txt")
|
||||
conv_uuid = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
|
||||
path = os.path.join(
|
||||
SDK_PROJECTS_DIR, encoded, conv_uuid, "tool-results", "output.txt"
|
||||
)
|
||||
|
||||
_current_project_dir.set(encoded)
|
||||
try:
|
||||
@@ -117,10 +119,22 @@ def test_is_allowed_local_path_tool_results_dir():
|
||||
_current_project_dir.set("")
|
||||
|
||||
|
||||
def test_is_allowed_local_path_tool_results_without_uuid_rejected():
|
||||
"""Direct <encoded-cwd>/tool-results/ (no UUID) is rejected."""
|
||||
encoded = "test-encoded-dir"
|
||||
path = os.path.join(SDK_PROJECTS_DIR, encoded, "tool-results", "output.txt")
|
||||
|
||||
_current_project_dir.set(encoded)
|
||||
try:
|
||||
assert not is_allowed_local_path(path, sdk_cwd=None)
|
||||
finally:
|
||||
_current_project_dir.set("")
|
||||
|
||||
|
||||
def test_is_allowed_local_path_sibling_of_tool_results_is_rejected():
|
||||
"""A path adjacent to tool-results/ but not inside it is rejected."""
|
||||
encoded = "test-encoded-dir"
|
||||
sibling_path = os.path.join(_SDK_PROJECTS_DIR, encoded, "other-dir", "file.txt")
|
||||
sibling_path = os.path.join(SDK_PROJECTS_DIR, encoded, "other-dir", "file.txt")
|
||||
|
||||
_current_project_dir.set(encoded)
|
||||
try:
|
||||
@@ -129,6 +143,21 @@ def test_is_allowed_local_path_sibling_of_tool_results_is_rejected():
|
||||
_current_project_dir.set("")
|
||||
|
||||
|
||||
def test_is_allowed_local_path_valid_uuid_wrong_segment_name_rejected():
|
||||
"""A valid UUID dir but non-'tool-results' second segment is rejected."""
|
||||
encoded = "test-encoded-dir"
|
||||
uuid_str = "12345678-1234-5678-9abc-def012345678"
|
||||
path = os.path.join(
|
||||
SDK_PROJECTS_DIR, encoded, uuid_str, "not-tool-results", "output.txt"
|
||||
)
|
||||
|
||||
_current_project_dir.set(encoded)
|
||||
try:
|
||||
assert not is_allowed_local_path(path, sdk_cwd=None)
|
||||
finally:
|
||||
_current_project_dir.set("")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# resolve_sandbox_path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -152,6 +152,13 @@ def _build_storage_supplement(
|
||||
|
||||
### File persistence
|
||||
Important files (code, configs, outputs) should be saved to workspace to ensure they persist.
|
||||
|
||||
### SDK tool-result files
|
||||
When tool outputs are large, the SDK truncates them and saves the full output to
|
||||
a local file under `~/.claude/projects/.../tool-results/`. To read these files,
|
||||
always use `read_file` or `Read` (NOT `read_workspace_file`).
|
||||
`read_workspace_file` reads from cloud workspace storage, where SDK
|
||||
tool-results are NOT stored.
|
||||
{_SHARED_TOOL_NOTES}"""
|
||||
|
||||
|
||||
|
||||
@@ -26,6 +26,41 @@ from backend.copilot.context import (
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def _check_sandbox_symlink_escape(
|
||||
sandbox: Any,
|
||||
parent: str,
|
||||
) -> str | None:
|
||||
"""Resolve the canonical parent path inside the sandbox to detect symlink escapes.
|
||||
|
||||
``normpath`` (used by ``resolve_sandbox_path``) only normalises the string;
|
||||
``readlink -f`` follows actual symlinks on the sandbox filesystem.
|
||||
|
||||
Returns the canonical parent path, or ``None`` if the path escapes
|
||||
``E2B_WORKDIR``.
|
||||
|
||||
Note: There is an inherent TOCTOU window between this check and the
|
||||
subsequent ``sandbox.files.write()``. A symlink could theoretically be
|
||||
replaced between the two operations. This is acceptable in the E2B
|
||||
sandbox model since the sandbox is single-user and ephemeral.
|
||||
"""
|
||||
canonical_res = await sandbox.commands.run(
|
||||
f"readlink -f {shlex.quote(parent or E2B_WORKDIR)}",
|
||||
cwd=E2B_WORKDIR,
|
||||
timeout=5,
|
||||
)
|
||||
canonical_parent = (canonical_res.stdout or "").strip()
|
||||
if (
|
||||
canonical_res.exit_code != 0
|
||||
or not canonical_parent
|
||||
or (
|
||||
canonical_parent != E2B_WORKDIR
|
||||
and not canonical_parent.startswith(E2B_WORKDIR + "/")
|
||||
)
|
||||
):
|
||||
return None
|
||||
return canonical_parent
|
||||
|
||||
|
||||
def _get_sandbox():
|
||||
return get_current_sandbox()
|
||||
|
||||
@@ -106,6 +141,10 @@ async def _handle_write_file(args: dict[str, Any]) -> dict[str, Any]:
|
||||
parent = os.path.dirname(remote)
|
||||
if parent and parent != E2B_WORKDIR:
|
||||
await sandbox.files.make_dir(parent)
|
||||
canonical_parent = await _check_sandbox_symlink_escape(sandbox, parent)
|
||||
if canonical_parent is None:
|
||||
return _mcp(f"Path must be within {E2B_WORKDIR}: {parent}", error=True)
|
||||
remote = os.path.join(canonical_parent, os.path.basename(remote))
|
||||
await sandbox.files.write(remote, content)
|
||||
except Exception as exc:
|
||||
return _mcp(f"Failed to write {remote}: {exc}", error=True)
|
||||
@@ -130,6 +169,12 @@ async def _handle_edit_file(args: dict[str, Any]) -> dict[str, Any]:
|
||||
return result
|
||||
sandbox, remote = result
|
||||
|
||||
parent = os.path.dirname(remote)
|
||||
canonical_parent = await _check_sandbox_symlink_escape(sandbox, parent)
|
||||
if canonical_parent is None:
|
||||
return _mcp(f"Path must be within {E2B_WORKDIR}: {parent}", error=True)
|
||||
remote = os.path.join(canonical_parent, os.path.basename(remote))
|
||||
|
||||
try:
|
||||
raw: bytes = await sandbox.files.read(remote, format="bytes")
|
||||
content = raw.decode("utf-8", errors="replace")
|
||||
|
||||
@@ -4,15 +4,19 @@ Pure unit tests with no external dependencies (no E2B, no sandbox).
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.copilot.context import _current_project_dir
|
||||
|
||||
from .e2b_file_tools import _read_local, resolve_sandbox_path
|
||||
|
||||
_SDK_PROJECTS_DIR = os.path.realpath(os.path.expanduser("~/.claude/projects"))
|
||||
from backend.copilot.context import E2B_WORKDIR, SDK_PROJECTS_DIR, _current_project_dir
|
||||
|
||||
from .e2b_file_tools import (
|
||||
_check_sandbox_symlink_escape,
|
||||
_read_local,
|
||||
resolve_sandbox_path,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# resolve_sandbox_path — sandbox path normalisation & boundary enforcement
|
||||
@@ -21,46 +25,48 @@ _SDK_PROJECTS_DIR = os.path.realpath(os.path.expanduser("~/.claude/projects"))
|
||||
|
||||
class TestResolveSandboxPath:
|
||||
def test_relative_path_resolved(self):
|
||||
assert resolve_sandbox_path("src/main.py") == "/home/user/src/main.py"
|
||||
assert resolve_sandbox_path("src/main.py") == f"{E2B_WORKDIR}/src/main.py"
|
||||
|
||||
def test_absolute_within_sandbox(self):
|
||||
assert resolve_sandbox_path("/home/user/file.txt") == "/home/user/file.txt"
|
||||
assert (
|
||||
resolve_sandbox_path(f"{E2B_WORKDIR}/file.txt") == f"{E2B_WORKDIR}/file.txt"
|
||||
)
|
||||
|
||||
def test_workdir_itself(self):
|
||||
assert resolve_sandbox_path("/home/user") == "/home/user"
|
||||
assert resolve_sandbox_path(E2B_WORKDIR) == E2B_WORKDIR
|
||||
|
||||
def test_relative_dotslash(self):
|
||||
assert resolve_sandbox_path("./README.md") == "/home/user/README.md"
|
||||
assert resolve_sandbox_path("./README.md") == f"{E2B_WORKDIR}/README.md"
|
||||
|
||||
def test_traversal_blocked(self):
|
||||
with pytest.raises(ValueError, match="must be within /home/user"):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
resolve_sandbox_path("../../etc/passwd")
|
||||
|
||||
def test_absolute_traversal_blocked(self):
|
||||
with pytest.raises(ValueError, match="must be within /home/user"):
|
||||
resolve_sandbox_path("/home/user/../../etc/passwd")
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
resolve_sandbox_path(f"{E2B_WORKDIR}/../../etc/passwd")
|
||||
|
||||
def test_absolute_outside_sandbox_blocked(self):
|
||||
with pytest.raises(ValueError, match="must be within /home/user"):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
resolve_sandbox_path("/etc/passwd")
|
||||
|
||||
def test_root_blocked(self):
|
||||
with pytest.raises(ValueError, match="must be within /home/user"):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
resolve_sandbox_path("/")
|
||||
|
||||
def test_home_other_user_blocked(self):
|
||||
with pytest.raises(ValueError, match="must be within /home/user"):
|
||||
with pytest.raises(ValueError, match=f"must be within {E2B_WORKDIR}"):
|
||||
resolve_sandbox_path("/home/other/file.txt")
|
||||
|
||||
def test_deep_nested_allowed(self):
|
||||
assert resolve_sandbox_path("a/b/c/d/e.txt") == "/home/user/a/b/c/d/e.txt"
|
||||
assert resolve_sandbox_path("a/b/c/d/e.txt") == f"{E2B_WORKDIR}/a/b/c/d/e.txt"
|
||||
|
||||
def test_trailing_slash_normalised(self):
|
||||
assert resolve_sandbox_path("src/") == "/home/user/src"
|
||||
assert resolve_sandbox_path("src/") == f"{E2B_WORKDIR}/src"
|
||||
|
||||
def test_double_dots_within_sandbox_ok(self):
|
||||
"""Path that resolves back within /home/user is allowed."""
|
||||
assert resolve_sandbox_path("a/b/../c.txt") == "/home/user/a/c.txt"
|
||||
"""Path that resolves back within E2B_WORKDIR is allowed."""
|
||||
assert resolve_sandbox_path("a/b/../c.txt") == f"{E2B_WORKDIR}/a/c.txt"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -73,9 +79,13 @@ class TestResolveSandboxPath:
|
||||
|
||||
|
||||
class TestReadLocal:
|
||||
_CONV_UUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
|
||||
|
||||
def _make_tool_results_file(self, encoded: str, filename: str, content: str) -> str:
|
||||
"""Create a tool-results file and return its path."""
|
||||
tool_results_dir = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results")
|
||||
"""Create a tool-results file under <encoded>/<uuid>/tool-results/."""
|
||||
tool_results_dir = os.path.join(
|
||||
SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
|
||||
)
|
||||
os.makedirs(tool_results_dir, exist_ok=True)
|
||||
filepath = os.path.join(tool_results_dir, filename)
|
||||
with open(filepath, "w") as f:
|
||||
@@ -107,7 +117,9 @@ class TestReadLocal:
|
||||
def test_read_nonexistent_tool_results(self):
|
||||
"""A tool-results path that doesn't exist returns FileNotFoundError."""
|
||||
encoded = "-tmp-copilot-e2b-test-nofile"
|
||||
tool_results_dir = os.path.join(_SDK_PROJECTS_DIR, encoded, "tool-results")
|
||||
tool_results_dir = os.path.join(
|
||||
SDK_PROJECTS_DIR, encoded, self._CONV_UUID, "tool-results"
|
||||
)
|
||||
os.makedirs(tool_results_dir, exist_ok=True)
|
||||
filepath = os.path.join(tool_results_dir, "nonexistent.txt")
|
||||
token = _current_project_dir.set(encoded)
|
||||
@@ -117,7 +129,7 @@ class TestReadLocal:
|
||||
assert "not found" in result["content"][0]["text"].lower()
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
os.rmdir(tool_results_dir)
|
||||
shutil.rmtree(os.path.join(SDK_PROJECTS_DIR, encoded), ignore_errors=True)
|
||||
|
||||
def test_read_traversal_path_blocked(self):
|
||||
"""A traversal attempt that escapes allowed directories is blocked."""
|
||||
@@ -152,3 +164,66 @@ class TestReadLocal:
|
||||
"""Without _current_project_dir set, all paths are blocked."""
|
||||
result = _read_local("/tmp/anything.txt", offset=0, limit=10)
|
||||
assert result["isError"] is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _check_sandbox_symlink_escape — symlink escape detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_sandbox(stdout: str, exit_code: int = 0) -> SimpleNamespace:
|
||||
"""Build a minimal sandbox mock whose commands.run returns a fixed result."""
|
||||
run_result = SimpleNamespace(stdout=stdout, exit_code=exit_code)
|
||||
commands = SimpleNamespace(run=AsyncMock(return_value=run_result))
|
||||
return SimpleNamespace(commands=commands)
|
||||
|
||||
|
||||
class TestCheckSandboxSymlinkEscape:
|
||||
@pytest.mark.asyncio
|
||||
async def test_canonical_path_within_workdir_returns_path(self):
|
||||
"""When readlink -f resolves to a path inside E2B_WORKDIR, returns it."""
|
||||
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}/src\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/src")
|
||||
assert result == f"{E2B_WORKDIR}/src"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_workdir_itself_returns_workdir(self):
|
||||
"""When readlink -f resolves to E2B_WORKDIR exactly, returns E2B_WORKDIR."""
|
||||
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, E2B_WORKDIR)
|
||||
assert result == E2B_WORKDIR
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_symlink_escape_returns_none(self):
|
||||
"""When readlink -f resolves outside E2B_WORKDIR (symlink escape), returns None."""
|
||||
sandbox = _make_sandbox(stdout="/etc\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/evil")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nonzero_exit_code_returns_none(self):
|
||||
"""A non-zero exit code from readlink -f returns None."""
|
||||
sandbox = _make_sandbox(stdout="", exit_code=1)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/src")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_stdout_returns_none(self):
|
||||
"""Empty stdout from readlink (e.g. path doesn't exist yet) returns None."""
|
||||
sandbox = _make_sandbox(stdout="", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/src")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prefix_collision_returns_none(self):
|
||||
"""A path prefixed with E2B_WORKDIR but not within it is rejected."""
|
||||
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}-evil\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}-evil")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deeply_nested_path_within_workdir(self):
|
||||
"""Deep nested paths inside E2B_WORKDIR are allowed."""
|
||||
sandbox = _make_sandbox(stdout=f"{E2B_WORKDIR}/a/b/c/d\n", exit_code=0)
|
||||
result = await _check_sandbox_symlink_escape(sandbox, f"{E2B_WORKDIR}/a/b/c/d")
|
||||
assert result == f"{E2B_WORKDIR}/a/b/c/d"
|
||||
|
||||
@@ -42,7 +42,7 @@ def _validate_workspace_path(
|
||||
Delegates to :func:`is_allowed_local_path` which permits:
|
||||
- The SDK working directory (``/tmp/copilot-<session>/``)
|
||||
- The current session's tool-results directory
|
||||
(``~/.claude/projects/<encoded-cwd>/tool-results/``)
|
||||
(``~/.claude/projects/<encoded-cwd>/<uuid>/tool-results/``)
|
||||
"""
|
||||
path = tool_input.get("file_path") or tool_input.get("path") or ""
|
||||
if not path:
|
||||
@@ -302,7 +302,11 @@ def create_security_hooks(
|
||||
"""
|
||||
_ = context, tool_use_id
|
||||
trigger = input_data.get("trigger", "auto")
|
||||
# Sanitize untrusted input before logging to prevent log injection
|
||||
# Sanitize untrusted input: strip control chars for logging AND
|
||||
# for the value passed downstream. read_compacted_entries()
|
||||
# validates against _projects_base() as defence-in-depth, but
|
||||
# sanitizing here prevents log injection and rejects obviously
|
||||
# malformed paths early.
|
||||
transcript_path = (
|
||||
str(input_data.get("transcript_path", ""))
|
||||
.replace("\n", "")
|
||||
|
||||
@@ -122,7 +122,7 @@ def test_read_no_cwd_denies_absolute():
|
||||
|
||||
def test_read_tool_results_allowed():
|
||||
home = os.path.expanduser("~")
|
||||
path = f"{home}/.claude/projects/-tmp-copilot-abc123/tool-results/12345.txt"
|
||||
path = f"{home}/.claude/projects/-tmp-copilot-abc123/a1b2c3d4-e5f6-7890-abcd-ef1234567890/tool-results/12345.txt"
|
||||
# is_allowed_local_path requires the session's encoded cwd to be set
|
||||
token = _current_project_dir.set("-tmp-copilot-abc123")
|
||||
try:
|
||||
|
||||
@@ -10,6 +10,7 @@ import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any, cast
|
||||
@@ -38,6 +39,7 @@ from backend.util.settings import Settings
|
||||
|
||||
from ..config import ChatConfig
|
||||
from ..constants import COPILOT_ERROR_PREFIX, COPILOT_SYSTEM_PREFIX
|
||||
from ..context import encode_cwd_for_cli
|
||||
from ..model import (
|
||||
ChatMessage,
|
||||
ChatSession,
|
||||
@@ -75,7 +77,7 @@ from .tool_adapter import (
|
||||
wait_for_stash,
|
||||
)
|
||||
from .transcript import (
|
||||
cleanup_cli_project_dir,
|
||||
cleanup_stale_project_dirs,
|
||||
download_transcript,
|
||||
read_compacted_entries,
|
||||
upload_transcript,
|
||||
@@ -143,6 +145,9 @@ _background_tasks: set[asyncio.Task[Any]] = set()
|
||||
|
||||
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
|
||||
|
||||
_last_sweep_time: float = 0.0
|
||||
_SWEEP_INTERVAL_SECONDS = 300 # 5 minutes
|
||||
|
||||
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
|
||||
# IMPORTANT: Must be less than frontend timeout (12s in useCopilotPage.ts)
|
||||
_HEARTBEAT_INTERVAL = 10.0 # seconds
|
||||
@@ -281,31 +286,34 @@ def _make_sdk_cwd(session_id: str) -> str:
|
||||
return cwd
|
||||
|
||||
|
||||
def _cleanup_sdk_tool_results(cwd: str) -> None:
|
||||
async def _cleanup_sdk_tool_results(cwd: str) -> None:
|
||||
"""Remove SDK session artifacts for a specific working directory.
|
||||
|
||||
Cleans up:
|
||||
- ``~/.claude/projects/<encoded-cwd>/`` — CLI session transcripts and
|
||||
tool-result files. Each SDK turn uses a unique cwd, so this directory
|
||||
is safe to remove entirely.
|
||||
- ``/tmp/copilot-<session>/`` — the ephemeral working directory.
|
||||
Cleans up the ephemeral working directory ``/tmp/copilot-<session>/``.
|
||||
|
||||
Also sweeps stale CLI project directories (older than 12 h) to prevent
|
||||
unbounded disk growth. The sweep is best-effort, rate-limited to once
|
||||
every 5 minutes, and capped at 50 directories per sweep.
|
||||
|
||||
Security: *cwd* MUST be created by ``_make_sdk_cwd()`` which sanitizes
|
||||
the session_id.
|
||||
"""
|
||||
normalized = os.path.normpath(cwd)
|
||||
if not normalized.startswith(_SDK_CWD_PREFIX):
|
||||
logger.warning(f"[SDK] Rejecting cleanup for path outside workspace: {cwd}")
|
||||
logger.warning("[SDK] Rejecting cleanup for path outside workspace: %s", cwd)
|
||||
return
|
||||
|
||||
# Clean the CLI's project directory (transcripts + tool-results).
|
||||
cleanup_cli_project_dir(cwd)
|
||||
await asyncio.to_thread(shutil.rmtree, normalized, True)
|
||||
|
||||
# Clean up the temp cwd directory itself.
|
||||
try:
|
||||
shutil.rmtree(normalized, ignore_errors=True)
|
||||
except OSError:
|
||||
pass
|
||||
# Best-effort sweep of old project dirs to prevent disk leak.
|
||||
# Pass the encoded cwd so only this session's project directory is swept,
|
||||
# which is safe in multi-tenant environments.
|
||||
global _last_sweep_time
|
||||
now = time.time()
|
||||
if now - _last_sweep_time >= _SWEEP_INTERVAL_SECONDS:
|
||||
_last_sweep_time = now
|
||||
encoded = encode_cwd_for_cli(normalized)
|
||||
await asyncio.to_thread(cleanup_stale_project_dirs, encoded)
|
||||
|
||||
|
||||
def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
|
||||
@@ -797,7 +805,7 @@ async def stream_chat_completion_sdk(
|
||||
)
|
||||
except Exception as transcript_err:
|
||||
logger.warning(
|
||||
"%s Transcript download failed, continuing without " "--resume: %s",
|
||||
"%s Transcript download failed, continuing without --resume: %s",
|
||||
log_prefix,
|
||||
transcript_err,
|
||||
)
|
||||
@@ -820,7 +828,7 @@ async def stream_chat_completion_sdk(
|
||||
is_valid = validate_transcript(dl.content)
|
||||
dl_lines = dl.content.strip().split("\n") if dl.content else []
|
||||
logger.info(
|
||||
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
|
||||
"%s Downloaded transcript: %dB, %d lines, msg_count=%d, valid=%s",
|
||||
log_prefix,
|
||||
len(dl.content),
|
||||
len(dl_lines),
|
||||
@@ -1054,8 +1062,7 @@ async def stream_chat_completion_sdk(
|
||||
break
|
||||
|
||||
logger.info(
|
||||
"%s Received: %s %s "
|
||||
"(unresolved=%d, current=%d, resolved=%d)",
|
||||
"%s Received: %s %s (unresolved=%d, current=%d, resolved=%d)",
|
||||
log_prefix,
|
||||
type(sdk_msg).__name__,
|
||||
getattr(sdk_msg, "subtype", ""),
|
||||
@@ -1100,7 +1107,14 @@ async def stream_chat_completion_sdk(
|
||||
and isinstance(sdk_msg, (AssistantMessage, ResultMessage))
|
||||
and not is_parallel_continuation
|
||||
):
|
||||
if await wait_for_stash(timeout=0.5):
|
||||
# 2.0 s timeout: the original 0.5 s caused frequent
|
||||
# timeouts under load (parallel tool calls, large
|
||||
# outputs). 2.0 s gives margin while still failing
|
||||
# fast when the hook genuinely will not fire.
|
||||
if await wait_for_stash(timeout=2.0):
|
||||
# Yield once so any callbacks scheduled by the
|
||||
# stash signal can propagate before we process
|
||||
# the next SDK message.
|
||||
await asyncio.sleep(0)
|
||||
else:
|
||||
logger.warning(
|
||||
@@ -1486,11 +1500,14 @@ async def stream_chat_completion_sdk(
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
if sdk_cwd:
|
||||
_cleanup_sdk_tool_results(sdk_cwd)
|
||||
|
||||
# Release stream lock to allow new streams for this session
|
||||
await lock.release()
|
||||
try:
|
||||
if sdk_cwd:
|
||||
await _cleanup_sdk_tool_results(sdk_cwd)
|
||||
except Exception:
|
||||
logger.warning("%s SDK cleanup failed", log_prefix, exc_info=True)
|
||||
finally:
|
||||
# Release stream lock to allow new streams for this session
|
||||
await lock.release()
|
||||
|
||||
|
||||
async def _update_title_async(
|
||||
|
||||
@@ -288,3 +288,90 @@ class TestPromptSupplement:
|
||||
# Count how many times this tool appears as a bullet point
|
||||
count = docs.count(f"- **`{tool_name}`**")
|
||||
assert count == 1, f"Tool '{tool_name}' appears {count} times (should be 1)"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _cleanup_sdk_tool_results — orchestration + rate-limiting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCleanupSdkToolResults:
|
||||
"""Tests for _cleanup_sdk_tool_results orchestration and sweep rate-limiting."""
|
||||
|
||||
# All valid cwds must start with /tmp/copilot- (the _SDK_CWD_PREFIX).
|
||||
_CWD_PREFIX = "/tmp/copilot-"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_removes_cwd_directory(self):
|
||||
"""Cleanup removes the session working directory."""
|
||||
|
||||
from .service import _cleanup_sdk_tool_results
|
||||
|
||||
cwd = "/tmp/copilot-test-cleanup-remove"
|
||||
os.makedirs(cwd, exist_ok=True)
|
||||
|
||||
with patch("backend.copilot.sdk.service.cleanup_stale_project_dirs"):
|
||||
import backend.copilot.sdk.service as svc_mod
|
||||
|
||||
svc_mod._last_sweep_time = 0.0
|
||||
await _cleanup_sdk_tool_results(cwd)
|
||||
|
||||
assert not os.path.exists(cwd)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sweep_runs_when_interval_elapsed(self):
|
||||
"""cleanup_stale_project_dirs is called when 5-minute interval has elapsed."""
|
||||
|
||||
import backend.copilot.sdk.service as svc_mod
|
||||
|
||||
from .service import _cleanup_sdk_tool_results
|
||||
|
||||
cwd = "/tmp/copilot-test-sweep-elapsed"
|
||||
os.makedirs(cwd, exist_ok=True)
|
||||
|
||||
with patch(
|
||||
"backend.copilot.sdk.service.cleanup_stale_project_dirs"
|
||||
) as mock_sweep:
|
||||
# Set last sweep to a time far in the past
|
||||
svc_mod._last_sweep_time = 0.0
|
||||
await _cleanup_sdk_tool_results(cwd)
|
||||
|
||||
mock_sweep.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sweep_skipped_within_interval(self):
|
||||
"""cleanup_stale_project_dirs is NOT called when within 5-minute interval."""
|
||||
import time
|
||||
|
||||
import backend.copilot.sdk.service as svc_mod
|
||||
|
||||
from .service import _cleanup_sdk_tool_results
|
||||
|
||||
cwd = "/tmp/copilot-test-sweep-ratelimit"
|
||||
os.makedirs(cwd, exist_ok=True)
|
||||
|
||||
with patch(
|
||||
"backend.copilot.sdk.service.cleanup_stale_project_dirs"
|
||||
) as mock_sweep:
|
||||
# Set last sweep to now — interval not elapsed
|
||||
svc_mod._last_sweep_time = time.time()
|
||||
await _cleanup_sdk_tool_results(cwd)
|
||||
|
||||
mock_sweep.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rejects_path_outside_prefix(self, tmp_path):
|
||||
"""Cleanup rejects a cwd that does not start with the expected prefix."""
|
||||
from .service import _cleanup_sdk_tool_results
|
||||
|
||||
evil_cwd = str(tmp_path / "evil-path")
|
||||
os.makedirs(evil_cwd, exist_ok=True)
|
||||
|
||||
with patch(
|
||||
"backend.copilot.sdk.service.cleanup_stale_project_dirs"
|
||||
) as mock_sweep:
|
||||
await _cleanup_sdk_tool_results(evil_cwd)
|
||||
|
||||
# Directory should NOT have been removed (rejected early)
|
||||
assert os.path.exists(evil_cwd)
|
||||
mock_sweep.assert_not_called()
|
||||
|
||||
@@ -146,7 +146,7 @@ def stash_pending_tool_output(tool_name: str, output: Any) -> None:
|
||||
event.set()
|
||||
|
||||
|
||||
async def wait_for_stash(timeout: float = 0.5) -> bool:
|
||||
async def wait_for_stash(timeout: float = 2.0) -> bool:
|
||||
"""Wait for a PostToolUse hook to stash tool output.
|
||||
|
||||
The SDK fires PostToolUse hooks asynchronously via ``start_soon()`` —
|
||||
@@ -155,12 +155,12 @@ async def wait_for_stash(timeout: float = 0.5) -> bool:
|
||||
by waiting on the ``_stash_event``, which is signaled by
|
||||
:func:`stash_pending_tool_output`.
|
||||
|
||||
After the event fires, callers should ``await asyncio.sleep(0)`` to
|
||||
give any remaining concurrent hooks a chance to complete.
|
||||
|
||||
Returns ``True`` if a stash signal was received, ``False`` on timeout.
|
||||
The timeout is a safety net — normally the stash happens within
|
||||
microseconds of yielding to the event loop.
|
||||
|
||||
The 2.0 s default was chosen based on production metrics: the original
|
||||
0.5 s caused frequent timeouts under load (parallel tool calls, large
|
||||
outputs). 2.0 s gives a comfortable margin while still failing fast
|
||||
when the hook genuinely will not fire.
|
||||
"""
|
||||
event = _stash_event.get(None)
|
||||
if event is None:
|
||||
@@ -285,7 +285,7 @@ async def _read_file_handler(args: dict[str, Any]) -> dict[str, Any]:
|
||||
|
||||
resolved = os.path.realpath(os.path.expanduser(file_path))
|
||||
try:
|
||||
with open(resolved) as f:
|
||||
with open(resolved, encoding="utf-8", errors="replace") as f:
|
||||
selected = list(itertools.islice(f, offset, offset + limit))
|
||||
# Cleanup happens in _cleanup_sdk_tool_results after session ends;
|
||||
# don't delete here — the SDK may read in multiple chunks.
|
||||
|
||||
@@ -151,44 +151,110 @@ def _projects_base() -> str:
|
||||
return os.path.realpath(os.path.join(config_dir, "projects"))
|
||||
|
||||
|
||||
def _cli_project_dir(sdk_cwd: str) -> str | None:
|
||||
"""Return the CLI's project directory for a given working directory.
|
||||
_STALE_PROJECT_DIR_SECONDS = 12 * 3600 # 12 hours — matches max session lifetime
|
||||
_MAX_PROJECT_DIRS_TO_SWEEP = 50 # limit per sweep to avoid long pauses
|
||||
|
||||
Returns ``None`` if the path would escape the projects base.
|
||||
|
||||
def cleanup_stale_project_dirs(encoded_cwd: str | None = None) -> int:
|
||||
"""Remove CLI project directories older than ``_STALE_PROJECT_DIR_SECONDS``.
|
||||
|
||||
Each CoPilot SDK turn creates a unique ``~/.claude/projects/<encoded-cwd>/``
|
||||
directory. These are intentionally kept across turns so the model can read
|
||||
tool-result files via ``--resume``. However, after a session ends they
|
||||
become stale. This function sweeps old ones to prevent unbounded disk
|
||||
growth.
|
||||
|
||||
When *encoded_cwd* is provided the sweep is scoped to that single
|
||||
directory, making the operation safe in multi-tenant environments where
|
||||
multiple copilot sessions share the same host. Without it the function
|
||||
falls back to sweeping all directories matching the copilot naming pattern
|
||||
(``-tmp-copilot-``), which is only safe for single-tenant deployments.
|
||||
|
||||
Returns the number of directories removed.
|
||||
"""
|
||||
cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
|
||||
projects_base = _projects_base()
|
||||
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
|
||||
if not os.path.isdir(projects_base):
|
||||
return 0
|
||||
|
||||
if not project_dir.startswith(projects_base + os.sep):
|
||||
logger.warning(
|
||||
"[Transcript] Project dir escaped projects base: %s", project_dir
|
||||
)
|
||||
return None
|
||||
return project_dir
|
||||
now = time.time()
|
||||
removed = 0
|
||||
|
||||
|
||||
def _safe_glob_jsonl(project_dir: str) -> list[Path]:
|
||||
"""Glob ``*.jsonl`` files, filtering out symlinks that escape the directory."""
|
||||
try:
|
||||
resolved_base = Path(project_dir).resolve()
|
||||
except OSError as e:
|
||||
logger.warning("[Transcript] Failed to resolve project dir: %s", e)
|
||||
return []
|
||||
|
||||
result: list[Path] = []
|
||||
for candidate in Path(project_dir).glob("*.jsonl"):
|
||||
try:
|
||||
resolved = candidate.resolve()
|
||||
if resolved.is_relative_to(resolved_base):
|
||||
result.append(resolved)
|
||||
except (OSError, RuntimeError) as e:
|
||||
logger.debug(
|
||||
"[Transcript] Skipping invalid CLI session candidate %s: %s",
|
||||
candidate,
|
||||
e,
|
||||
# Scoped mode: only clean up the one directory for the current session.
|
||||
if encoded_cwd:
|
||||
target = Path(projects_base) / encoded_cwd
|
||||
if not target.is_dir():
|
||||
return 0
|
||||
# Guard: only sweep copilot-generated dirs.
|
||||
if "-tmp-copilot-" not in target.name:
|
||||
logger.warning(
|
||||
"[Transcript] Refusing to sweep non-copilot dir: %s", target.name
|
||||
)
|
||||
return result
|
||||
return 0
|
||||
try:
|
||||
# st_mtime is used as a proxy for session activity. Claude CLI writes
|
||||
# its JSONL transcript into this directory during each turn, so mtime
|
||||
# advances on every turn. A directory whose mtime is older than
|
||||
# _STALE_PROJECT_DIR_SECONDS has not had an active turn in that window
|
||||
# and is safe to remove (the session cannot --resume after cleanup).
|
||||
age = now - target.stat().st_mtime
|
||||
except OSError:
|
||||
return 0
|
||||
if age < _STALE_PROJECT_DIR_SECONDS:
|
||||
return 0
|
||||
try:
|
||||
shutil.rmtree(target, ignore_errors=True)
|
||||
removed = 1
|
||||
except OSError:
|
||||
pass
|
||||
if removed:
|
||||
logger.info(
|
||||
"[Transcript] Swept stale CLI project dir %s (age %ds > %ds)",
|
||||
target.name,
|
||||
int(age),
|
||||
_STALE_PROJECT_DIR_SECONDS,
|
||||
)
|
||||
return removed
|
||||
|
||||
# Unscoped fallback: sweep all copilot dirs across the projects base.
|
||||
# Only safe for single-tenant deployments; callers should prefer the
|
||||
# scoped variant by passing encoded_cwd.
|
||||
try:
|
||||
entries = Path(projects_base).iterdir()
|
||||
except OSError as e:
|
||||
logger.warning("[Transcript] Failed to list projects dir: %s", e)
|
||||
return 0
|
||||
|
||||
for entry in entries:
|
||||
if removed >= _MAX_PROJECT_DIRS_TO_SWEEP:
|
||||
break
|
||||
# Only sweep copilot-generated dirs (pattern: -tmp-copilot- or
|
||||
# -private-tmp-copilot-).
|
||||
if "-tmp-copilot-" not in entry.name:
|
||||
continue
|
||||
if not entry.is_dir():
|
||||
continue
|
||||
try:
|
||||
# See the scoped-mode comment above: st_mtime advances on every turn,
|
||||
# so a stale mtime reliably indicates an inactive session.
|
||||
age = now - entry.stat().st_mtime
|
||||
except OSError:
|
||||
continue
|
||||
if age < _STALE_PROJECT_DIR_SECONDS:
|
||||
continue
|
||||
|
||||
try:
|
||||
shutil.rmtree(entry, ignore_errors=True)
|
||||
removed += 1
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
if removed:
|
||||
logger.info(
|
||||
"[Transcript] Swept %d stale CLI project dirs (older than %ds)",
|
||||
removed,
|
||||
_STALE_PROJECT_DIR_SECONDS,
|
||||
)
|
||||
return removed
|
||||
|
||||
|
||||
def read_compacted_entries(transcript_path: str) -> list[dict] | None:
|
||||
@@ -255,63 +321,6 @@ def read_compacted_entries(transcript_path: str) -> list[dict] | None:
|
||||
return entries
|
||||
|
||||
|
||||
def read_cli_session_file(sdk_cwd: str) -> str | None:
|
||||
"""Read the CLI's own session file, which reflects any compaction.
|
||||
|
||||
The CLI writes its session transcript to
|
||||
``~/.claude/projects/<encoded_cwd>/<session_id>.jsonl``.
|
||||
Since each SDK turn uses a unique ``sdk_cwd``, there should be
|
||||
exactly one ``.jsonl`` file in that directory.
|
||||
|
||||
Returns the file content, or ``None`` if not found.
|
||||
"""
|
||||
project_dir = _cli_project_dir(sdk_cwd)
|
||||
if not project_dir or not os.path.isdir(project_dir):
|
||||
return None
|
||||
|
||||
jsonl_files = _safe_glob_jsonl(project_dir)
|
||||
if not jsonl_files:
|
||||
logger.debug("[Transcript] No CLI session file found in %s", project_dir)
|
||||
return None
|
||||
|
||||
# Pick the most recently modified file (should be only one per turn).
|
||||
try:
|
||||
session_file = max(jsonl_files, key=lambda p: p.stat().st_mtime)
|
||||
except OSError as e:
|
||||
logger.warning("[Transcript] Failed to inspect CLI session files: %s", e)
|
||||
return None
|
||||
|
||||
try:
|
||||
content = session_file.read_text()
|
||||
logger.info(
|
||||
"[Transcript] Read CLI session file: %s (%d bytes)",
|
||||
session_file,
|
||||
len(content),
|
||||
)
|
||||
return content
|
||||
except OSError as e:
|
||||
logger.warning("[Transcript] Failed to read CLI session file: %s", e)
|
||||
return None
|
||||
|
||||
|
||||
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
|
||||
"""Remove the CLI's project directory for a specific working directory.
|
||||
|
||||
The CLI stores session data under ``~/.claude/projects/<encoded_cwd>/``.
|
||||
Each SDK turn uses a unique ``sdk_cwd``, so the project directory is
|
||||
safe to remove entirely after the transcript has been uploaded.
|
||||
"""
|
||||
project_dir = _cli_project_dir(sdk_cwd)
|
||||
if not project_dir:
|
||||
return
|
||||
|
||||
if os.path.isdir(project_dir):
|
||||
shutil.rmtree(project_dir, ignore_errors=True)
|
||||
logger.debug("[Transcript] Cleaned up CLI project dir: %s", project_dir)
|
||||
else:
|
||||
logger.debug("[Transcript] Project dir not found: %s", project_dir)
|
||||
|
||||
|
||||
def write_transcript_to_tempfile(
|
||||
transcript_content: str,
|
||||
session_id: str,
|
||||
|
||||
@@ -9,9 +9,7 @@ from backend.util import json
|
||||
|
||||
from .transcript import (
|
||||
STRIPPABLE_TYPES,
|
||||
_cli_project_dir,
|
||||
delete_transcript,
|
||||
read_cli_session_file,
|
||||
read_compacted_entries,
|
||||
strip_progress_entries,
|
||||
validate_transcript,
|
||||
@@ -292,85 +290,6 @@ class TestStripProgressEntries:
|
||||
assert asst_entry["parentUuid"] == "u1" # reparented
|
||||
|
||||
|
||||
# --- read_cli_session_file ---
|
||||
|
||||
|
||||
class TestReadCliSessionFile:
|
||||
def test_no_matching_files_returns_none(self, tmp_path, monkeypatch):
|
||||
"""read_cli_session_file returns None when no .jsonl files exist."""
|
||||
# Create a project dir with no jsonl files
|
||||
project_dir = tmp_path / "projects" / "encoded-cwd"
|
||||
project_dir.mkdir(parents=True)
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._cli_project_dir",
|
||||
lambda sdk_cwd: str(project_dir),
|
||||
)
|
||||
assert read_cli_session_file("/fake/cwd") is None
|
||||
|
||||
def test_one_jsonl_file_returns_content(self, tmp_path, monkeypatch):
|
||||
"""read_cli_session_file returns the content of a single .jsonl file."""
|
||||
project_dir = tmp_path / "projects" / "encoded-cwd"
|
||||
project_dir.mkdir(parents=True)
|
||||
jsonl_file = project_dir / "session.jsonl"
|
||||
jsonl_file.write_text("line1\nline2\n")
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._cli_project_dir",
|
||||
lambda sdk_cwd: str(project_dir),
|
||||
)
|
||||
result = read_cli_session_file("/fake/cwd")
|
||||
assert result == "line1\nline2\n"
|
||||
|
||||
def test_symlink_escaping_project_dir_is_skipped(self, tmp_path, monkeypatch):
|
||||
"""read_cli_session_file skips symlinks that escape the project dir."""
|
||||
project_dir = tmp_path / "projects" / "encoded-cwd"
|
||||
project_dir.mkdir(parents=True)
|
||||
|
||||
# Create a file outside the project dir
|
||||
outside = tmp_path / "outside"
|
||||
outside.mkdir()
|
||||
outside_file = outside / "evil.jsonl"
|
||||
outside_file.write_text("should not be read\n")
|
||||
|
||||
# Symlink from inside project_dir to outside file
|
||||
symlink = project_dir / "evil.jsonl"
|
||||
symlink.symlink_to(outside_file)
|
||||
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._cli_project_dir",
|
||||
lambda sdk_cwd: str(project_dir),
|
||||
)
|
||||
# The symlink target resolves outside project_dir, so it should be skipped
|
||||
result = read_cli_session_file("/fake/cwd")
|
||||
assert result is None
|
||||
|
||||
|
||||
# --- _cli_project_dir ---
|
||||
|
||||
|
||||
class TestCliProjectDir:
|
||||
def test_returns_none_for_path_traversal(self, tmp_path, monkeypatch):
|
||||
"""_cli_project_dir returns None when the project dir symlink escapes projects base."""
|
||||
config_dir = tmp_path / "config"
|
||||
config_dir.mkdir()
|
||||
projects_dir = config_dir / "projects"
|
||||
projects_dir.mkdir()
|
||||
|
||||
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
|
||||
|
||||
# Create a symlink inside projects/ that points outside of it.
|
||||
# _cli_project_dir encodes the cwd as all-alnum-hyphens, so use a
|
||||
# cwd whose encoded form matches the symlink name we create.
|
||||
evil_target = tmp_path / "escaped"
|
||||
evil_target.mkdir()
|
||||
|
||||
# The encoded form of "/evil/cwd" is "-evil-cwd"
|
||||
symlink_path = projects_dir / "-evil-cwd"
|
||||
symlink_path.symlink_to(evil_target)
|
||||
|
||||
result = _cli_project_dir("/evil/cwd")
|
||||
assert result is None
|
||||
|
||||
|
||||
# --- delete_transcript ---
|
||||
|
||||
|
||||
@@ -897,3 +816,209 @@ class TestCompactionFlowIntegration:
|
||||
output2 = builder2.to_jsonl()
|
||||
lines2 = [json.loads(line) for line in output2.strip().split("\n")]
|
||||
assert lines2[-1]["parentUuid"] == "a2"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# cleanup_stale_project_dirs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCleanupStaleProjectDirs:
|
||||
"""Tests for cleanup_stale_project_dirs (disk leak prevention)."""
|
||||
|
||||
def test_removes_old_copilot_dirs(self, tmp_path, monkeypatch):
|
||||
"""Directories matching copilot pattern older than threshold are removed."""
|
||||
from backend.copilot.sdk.transcript import (
|
||||
_STALE_PROJECT_DIR_SECONDS,
|
||||
cleanup_stale_project_dirs,
|
||||
)
|
||||
|
||||
projects_dir = tmp_path / "projects"
|
||||
projects_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: str(projects_dir),
|
||||
)
|
||||
|
||||
# Create a stale dir
|
||||
stale = projects_dir / "-tmp-copilot-old-session"
|
||||
stale.mkdir()
|
||||
# Set mtime to past the threshold
|
||||
import time
|
||||
|
||||
old_time = time.time() - _STALE_PROJECT_DIR_SECONDS - 100
|
||||
os.utime(stale, (old_time, old_time))
|
||||
|
||||
# Create a fresh dir
|
||||
fresh = projects_dir / "-tmp-copilot-new-session"
|
||||
fresh.mkdir()
|
||||
|
||||
removed = cleanup_stale_project_dirs()
|
||||
assert removed == 1
|
||||
assert not stale.exists()
|
||||
assert fresh.exists()
|
||||
|
||||
def test_ignores_non_copilot_dirs(self, tmp_path, monkeypatch):
|
||||
"""Directories not matching copilot pattern are left alone."""
|
||||
from backend.copilot.sdk.transcript import cleanup_stale_project_dirs
|
||||
|
||||
projects_dir = tmp_path / "projects"
|
||||
projects_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: str(projects_dir),
|
||||
)
|
||||
|
||||
# Non-copilot dir that's old
|
||||
import time
|
||||
|
||||
other = projects_dir / "some-other-project"
|
||||
other.mkdir()
|
||||
old_time = time.time() - 999999
|
||||
os.utime(other, (old_time, old_time))
|
||||
|
||||
removed = cleanup_stale_project_dirs()
|
||||
assert removed == 0
|
||||
assert other.exists()
|
||||
|
||||
def test_ttl_boundary_not_removed(self, tmp_path, monkeypatch):
|
||||
"""A directory exactly at the TTL boundary should NOT be removed."""
|
||||
from backend.copilot.sdk.transcript import (
|
||||
_STALE_PROJECT_DIR_SECONDS,
|
||||
cleanup_stale_project_dirs,
|
||||
)
|
||||
|
||||
projects_dir = tmp_path / "projects"
|
||||
projects_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: str(projects_dir),
|
||||
)
|
||||
|
||||
import time
|
||||
|
||||
# Dir that's exactly at the TTL (age == threshold, not >) — should survive
|
||||
boundary = projects_dir / "-tmp-copilot-boundary"
|
||||
boundary.mkdir()
|
||||
boundary_time = time.time() - _STALE_PROJECT_DIR_SECONDS + 1
|
||||
os.utime(boundary, (boundary_time, boundary_time))
|
||||
|
||||
removed = cleanup_stale_project_dirs()
|
||||
assert removed == 0
|
||||
assert boundary.exists()
|
||||
|
||||
def test_skips_non_directory_entries(self, tmp_path, monkeypatch):
|
||||
"""Regular files matching the copilot pattern are not removed."""
|
||||
from backend.copilot.sdk.transcript import (
|
||||
_STALE_PROJECT_DIR_SECONDS,
|
||||
cleanup_stale_project_dirs,
|
||||
)
|
||||
|
||||
projects_dir = tmp_path / "projects"
|
||||
projects_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: str(projects_dir),
|
||||
)
|
||||
|
||||
import time
|
||||
|
||||
# Create a regular FILE (not a dir) with the copilot pattern name
|
||||
stale_file = projects_dir / "-tmp-copilot-stale-file"
|
||||
stale_file.write_text("not a dir")
|
||||
old_time = time.time() - _STALE_PROJECT_DIR_SECONDS - 100
|
||||
os.utime(stale_file, (old_time, old_time))
|
||||
|
||||
removed = cleanup_stale_project_dirs()
|
||||
assert removed == 0
|
||||
assert stale_file.exists()
|
||||
|
||||
def test_missing_base_dir_returns_zero(self, tmp_path, monkeypatch):
|
||||
"""If the projects base directory doesn't exist, return 0 gracefully."""
|
||||
from backend.copilot.sdk.transcript import cleanup_stale_project_dirs
|
||||
|
||||
nonexistent = str(tmp_path / "does-not-exist" / "projects")
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: nonexistent,
|
||||
)
|
||||
|
||||
removed = cleanup_stale_project_dirs()
|
||||
assert removed == 0
|
||||
|
||||
def test_scoped_removes_only_target_dir(self, tmp_path, monkeypatch):
|
||||
"""When encoded_cwd is supplied only that directory is swept."""
|
||||
import time
|
||||
|
||||
from backend.copilot.sdk.transcript import (
|
||||
_STALE_PROJECT_DIR_SECONDS,
|
||||
cleanup_stale_project_dirs,
|
||||
)
|
||||
|
||||
projects_dir = tmp_path / "projects"
|
||||
projects_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: str(projects_dir),
|
||||
)
|
||||
|
||||
old_time = time.time() - _STALE_PROJECT_DIR_SECONDS - 100
|
||||
|
||||
# Two stale copilot dirs
|
||||
target = projects_dir / "-tmp-copilot-session-abc"
|
||||
target.mkdir()
|
||||
os.utime(target, (old_time, old_time))
|
||||
|
||||
other = projects_dir / "-tmp-copilot-session-xyz"
|
||||
other.mkdir()
|
||||
os.utime(other, (old_time, old_time))
|
||||
|
||||
# Only the target dir should be removed
|
||||
removed = cleanup_stale_project_dirs(encoded_cwd="-tmp-copilot-session-abc")
|
||||
assert removed == 1
|
||||
assert not target.exists()
|
||||
assert other.exists() # untouched — not the current session
|
||||
|
||||
def test_scoped_fresh_dir_not_removed(self, tmp_path, monkeypatch):
|
||||
"""Scoped sweep leaves a fresh directory alone."""
|
||||
from backend.copilot.sdk.transcript import cleanup_stale_project_dirs
|
||||
|
||||
projects_dir = tmp_path / "projects"
|
||||
projects_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: str(projects_dir),
|
||||
)
|
||||
|
||||
fresh = projects_dir / "-tmp-copilot-session-new"
|
||||
fresh.mkdir()
|
||||
# mtime is now — well within TTL
|
||||
|
||||
removed = cleanup_stale_project_dirs(encoded_cwd="-tmp-copilot-session-new")
|
||||
assert removed == 0
|
||||
assert fresh.exists()
|
||||
|
||||
def test_scoped_non_copilot_dir_not_removed(self, tmp_path, monkeypatch):
|
||||
"""Scoped sweep refuses to remove a non-copilot directory."""
|
||||
import time
|
||||
|
||||
from backend.copilot.sdk.transcript import (
|
||||
_STALE_PROJECT_DIR_SECONDS,
|
||||
cleanup_stale_project_dirs,
|
||||
)
|
||||
|
||||
projects_dir = tmp_path / "projects"
|
||||
projects_dir.mkdir()
|
||||
monkeypatch.setattr(
|
||||
"backend.copilot.sdk.transcript._projects_base",
|
||||
lambda: str(projects_dir),
|
||||
)
|
||||
|
||||
old_time = time.time() - _STALE_PROJECT_DIR_SECONDS - 100
|
||||
non_copilot = projects_dir / "some-other-project"
|
||||
non_copilot.mkdir()
|
||||
os.utime(non_copilot, (old_time, old_time))
|
||||
|
||||
removed = cleanup_stale_project_dirs(encoded_cwd="some-other-project")
|
||||
assert removed == 0
|
||||
assert non_copilot.exists()
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import base64
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
@@ -10,7 +11,9 @@ from pydantic import BaseModel
|
||||
from backend.copilot.context import (
|
||||
E2B_WORKDIR,
|
||||
get_current_sandbox,
|
||||
get_sdk_cwd,
|
||||
get_workspace_manager,
|
||||
is_allowed_local_path,
|
||||
resolve_sandbox_path,
|
||||
)
|
||||
from backend.copilot.model import ChatSession
|
||||
@@ -24,6 +27,10 @@ from .models import ErrorResponse, ResponseType, ToolResponseBase
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sentinel file_id used when a tool-result file is read directly from the local
|
||||
# host filesystem (rather than from workspace storage).
|
||||
_LOCAL_TOOL_RESULT_FILE_ID = "local"
|
||||
|
||||
|
||||
async def _resolve_write_content(
|
||||
content_text: str | None,
|
||||
@@ -275,6 +282,93 @@ class WorkspaceFileContentResponse(ToolResponseBase):
|
||||
content_base64: str
|
||||
|
||||
|
||||
_MAX_LOCAL_TOOL_RESULT_BYTES = 10 * 1024 * 1024 # 10 MB
|
||||
|
||||
|
||||
def _read_local_tool_result(
|
||||
path: str,
|
||||
char_offset: int,
|
||||
char_length: Optional[int],
|
||||
session_id: str,
|
||||
sdk_cwd: str | None = None,
|
||||
) -> ToolResponseBase:
|
||||
"""Read an SDK tool-result file from local disk.
|
||||
|
||||
This is a fallback for when the model mistakenly calls
|
||||
``read_workspace_file`` with an SDK tool-result path that only exists on
|
||||
the host filesystem, not in cloud workspace storage.
|
||||
|
||||
Defence-in-depth: validates *path* via :func:`is_allowed_local_path`
|
||||
regardless of what the caller has already checked.
|
||||
"""
|
||||
# TOCTOU: path validated then opened separately. Acceptable because
|
||||
# the tool-results directory is server-controlled, not user-writable.
|
||||
expanded = os.path.realpath(os.path.expanduser(path))
|
||||
# Defence-in-depth: re-check with resolved path (caller checked raw path).
|
||||
if not is_allowed_local_path(expanded, sdk_cwd or get_sdk_cwd()):
|
||||
return ErrorResponse(
|
||||
message=f"Path not allowed: {os.path.basename(path)}", session_id=session_id
|
||||
)
|
||||
try:
|
||||
# The 10 MB cap (_MAX_LOCAL_TOOL_RESULT_BYTES) bounds memory usage.
|
||||
# Pre-read size check prevents loading files far above the cap;
|
||||
# the remaining TOCTOU gap is acceptable for server-controlled paths.
|
||||
file_size = os.path.getsize(expanded)
|
||||
if file_size > _MAX_LOCAL_TOOL_RESULT_BYTES:
|
||||
return ErrorResponse(
|
||||
message=(f"File too large: {os.path.basename(path)}"),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Detect binary files: try strict UTF-8 first, fall back to
|
||||
# base64-encoding the raw bytes for binary content.
|
||||
with open(expanded, "rb") as fh:
|
||||
raw = fh.read()
|
||||
try:
|
||||
text_content = raw.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
# Binary file — return raw base64, ignore char_offset/char_length
|
||||
return WorkspaceFileContentResponse(
|
||||
file_id=_LOCAL_TOOL_RESULT_FILE_ID,
|
||||
name=os.path.basename(path),
|
||||
path=path,
|
||||
mime_type=mimetypes.guess_type(path)[0] or "application/octet-stream",
|
||||
content_base64=base64.b64encode(raw).decode("ascii"),
|
||||
message=(
|
||||
f"Read {file_size:,} bytes (binary) from local tool-result "
|
||||
f"{os.path.basename(path)}"
|
||||
),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
end = (
|
||||
char_offset + char_length if char_length is not None else len(text_content)
|
||||
)
|
||||
slice_text = text_content[char_offset:end]
|
||||
except FileNotFoundError:
|
||||
return ErrorResponse(
|
||||
message=f"File not found: {os.path.basename(path)}", session_id=session_id
|
||||
)
|
||||
except Exception as exc:
|
||||
return ErrorResponse(
|
||||
message=f"Error reading file: {type(exc).__name__}", session_id=session_id
|
||||
)
|
||||
|
||||
return WorkspaceFileContentResponse(
|
||||
file_id=_LOCAL_TOOL_RESULT_FILE_ID,
|
||||
name=os.path.basename(path),
|
||||
path=path,
|
||||
mime_type=mimetypes.guess_type(path)[0] or "text/plain",
|
||||
content_base64=base64.b64encode(slice_text.encode("utf-8")).decode("ascii"),
|
||||
message=(
|
||||
f"Read chars {char_offset}\u2013{char_offset + len(slice_text)} "
|
||||
f"of {len(text_content):,} chars from local tool-result "
|
||||
f"{os.path.basename(path)}"
|
||||
),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
|
||||
class WorkspaceFileMetadataResponse(ToolResponseBase):
|
||||
"""Response containing workspace file metadata and download URL (prevents context bloat)."""
|
||||
|
||||
@@ -533,6 +627,14 @@ class ReadWorkspaceFileTool(BaseTool):
|
||||
manager = await get_workspace_manager(user_id, session_id)
|
||||
resolved = await _resolve_file(manager, file_id, path, session_id)
|
||||
if isinstance(resolved, ErrorResponse):
|
||||
# Fallback: if the path is an SDK tool-result on local disk,
|
||||
# read it directly instead of failing. The model sometimes
|
||||
# calls read_workspace_file for these paths by mistake.
|
||||
sdk_cwd = get_sdk_cwd()
|
||||
if path and is_allowed_local_path(path, sdk_cwd):
|
||||
return _read_local_tool_result(
|
||||
path, char_offset, char_length, session_id, sdk_cwd=sdk_cwd
|
||||
)
|
||||
return resolved
|
||||
target_file_id, file_info = resolved
|
||||
|
||||
|
||||
@@ -2,18 +2,25 @@
|
||||
|
||||
import base64
|
||||
import os
|
||||
import shutil
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.copilot.context import SDK_PROJECTS_DIR, _current_project_dir
|
||||
from backend.copilot.tools._test_data import make_session, setup_test_data
|
||||
from backend.copilot.tools.models import ErrorResponse
|
||||
from backend.copilot.tools.workspace_files import (
|
||||
_MAX_LOCAL_TOOL_RESULT_BYTES,
|
||||
DeleteWorkspaceFileTool,
|
||||
ListWorkspaceFilesTool,
|
||||
ReadWorkspaceFileTool,
|
||||
WorkspaceDeleteResponse,
|
||||
WorkspaceFileContentResponse,
|
||||
WorkspaceFileListResponse,
|
||||
WorkspaceWriteResponse,
|
||||
WriteWorkspaceFileTool,
|
||||
_read_local_tool_result,
|
||||
_resolve_write_content,
|
||||
_validate_ephemeral_path,
|
||||
)
|
||||
@@ -325,3 +332,294 @@ async def test_write_workspace_file_source_path(setup_test_data):
|
||||
await delete_tool._execute(
|
||||
user_id=user.id, session=session, file_id=write_resp.file_id
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _read_local_tool_result — local disk fallback for SDK tool-result files
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_CONV_UUID = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
|
||||
|
||||
|
||||
class TestReadLocalToolResult:
|
||||
"""Tests for _read_local_tool_result (local disk fallback)."""
|
||||
|
||||
def _make_tool_result(self, encoded: str, filename: str, content: bytes) -> str:
|
||||
"""Create a tool-results file and return its path."""
|
||||
tool_dir = os.path.join(SDK_PROJECTS_DIR, encoded, _CONV_UUID, "tool-results")
|
||||
os.makedirs(tool_dir, exist_ok=True)
|
||||
filepath = os.path.join(tool_dir, filename)
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(content)
|
||||
return filepath
|
||||
|
||||
def _cleanup(self, encoded: str) -> None:
|
||||
shutil.rmtree(os.path.join(SDK_PROJECTS_DIR, encoded), ignore_errors=True)
|
||||
|
||||
def test_read_text_file(self):
|
||||
"""Read a UTF-8 text tool-result file."""
|
||||
encoded = "-tmp-copilot-local-read-text"
|
||||
path = self._make_tool_result(encoded, "output.txt", b"hello world")
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 0, None, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
decoded = base64.b64decode(result.content_base64).decode("utf-8")
|
||||
assert decoded == "hello world"
|
||||
assert "text/plain" in result.mime_type
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_read_text_with_offset(self):
|
||||
"""Read a slice of a text file using char_offset and char_length."""
|
||||
encoded = "-tmp-copilot-local-read-offset"
|
||||
path = self._make_tool_result(encoded, "data.txt", b"ABCDEFGHIJ")
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 3, 4, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
decoded = base64.b64decode(result.content_base64).decode("utf-8")
|
||||
assert decoded == "DEFG"
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_read_binary_file(self):
|
||||
"""Binary files are returned as raw base64."""
|
||||
encoded = "-tmp-copilot-local-read-binary"
|
||||
binary_data = bytes(range(256))
|
||||
path = self._make_tool_result(encoded, "image.png", binary_data)
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 0, None, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
decoded = base64.b64decode(result.content_base64)
|
||||
assert decoded == binary_data
|
||||
assert "binary" in result.message
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_disallowed_path_rejected(self):
|
||||
"""Paths not under allowed directories are rejected."""
|
||||
result = _read_local_tool_result("/etc/passwd", 0, None, "s1")
|
||||
assert isinstance(result, ErrorResponse)
|
||||
assert "not allowed" in result.message.lower()
|
||||
|
||||
def test_file_not_found(self):
|
||||
"""Missing files return an error."""
|
||||
encoded = "-tmp-copilot-local-read-missing"
|
||||
tool_dir = os.path.join(SDK_PROJECTS_DIR, encoded, _CONV_UUID, "tool-results")
|
||||
os.makedirs(tool_dir, exist_ok=True)
|
||||
path = os.path.join(tool_dir, "nope.txt")
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 0, None, "s1")
|
||||
assert isinstance(result, ErrorResponse)
|
||||
assert "not found" in result.message.lower()
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_file_too_large(self, monkeypatch):
|
||||
"""Files exceeding the size limit are rejected."""
|
||||
encoded = "-tmp-copilot-local-read-large"
|
||||
# Create a small file but fake os.path.getsize to return a huge value
|
||||
path = self._make_tool_result(encoded, "big.txt", b"small")
|
||||
token = _current_project_dir.set(encoded)
|
||||
monkeypatch.setattr(
|
||||
"os.path.getsize", lambda _: _MAX_LOCAL_TOOL_RESULT_BYTES + 1
|
||||
)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 0, None, "s1")
|
||||
assert isinstance(result, ErrorResponse)
|
||||
assert "too large" in result.message.lower()
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_offset_beyond_file_length(self):
|
||||
"""Offset past end-of-file returns empty content."""
|
||||
encoded = "-tmp-copilot-local-read-past-eof"
|
||||
path = self._make_tool_result(encoded, "short.txt", b"abc")
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 999, 10, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
decoded = base64.b64decode(result.content_base64).decode("utf-8")
|
||||
assert decoded == ""
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_zero_length_read(self):
|
||||
"""Requesting zero characters returns empty content."""
|
||||
encoded = "-tmp-copilot-local-read-zero-len"
|
||||
path = self._make_tool_result(encoded, "data.txt", b"ABCDEF")
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 2, 0, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
decoded = base64.b64decode(result.content_base64).decode("utf-8")
|
||||
assert decoded == ""
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_mime_type_from_json_extension(self):
|
||||
"""JSON files get application/json MIME type, not hardcoded text/plain."""
|
||||
encoded = "-tmp-copilot-local-read-json"
|
||||
path = self._make_tool_result(encoded, "result.json", b'{"key": "value"}')
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 0, None, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
assert result.mime_type == "application/json"
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_mime_type_from_png_extension(self):
|
||||
"""Binary .png files get image/png MIME type via mimetypes."""
|
||||
encoded = "-tmp-copilot-local-read-png-mime"
|
||||
binary_data = bytes(range(256))
|
||||
path = self._make_tool_result(encoded, "chart.png", binary_data)
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 0, None, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
assert result.mime_type == "image/png"
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_explicit_sdk_cwd_parameter(self):
|
||||
"""The sdk_cwd parameter overrides get_sdk_cwd() for path validation."""
|
||||
encoded = "-tmp-copilot-local-read-sdkcwd"
|
||||
path = self._make_tool_result(encoded, "out.txt", b"content")
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
# Pass sdk_cwd explicitly — should still succeed because the path
|
||||
# is under SDK_PROJECTS_DIR which is always allowed.
|
||||
result = _read_local_tool_result(
|
||||
path, 0, None, "s1", sdk_cwd="/tmp/copilot-test"
|
||||
)
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
decoded = base64.b64decode(result.content_base64).decode("utf-8")
|
||||
assert decoded == "content"
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
def test_offset_with_no_length_reads_to_end(self):
|
||||
"""When char_length is None, read from offset to end of file."""
|
||||
encoded = "-tmp-copilot-local-read-offset-noLen"
|
||||
path = self._make_tool_result(encoded, "data.txt", b"0123456789")
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
result = _read_local_tool_result(path, 5, None, "s1")
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
decoded = base64.b64decode(result.content_base64).decode("utf-8")
|
||||
assert decoded == "56789"
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
self._cleanup(encoded)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ReadWorkspaceFileTool fallback to _read_local_tool_result
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_read_workspace_file_falls_back_to_local_tool_result(setup_test_data):
|
||||
"""When _resolve_file returns ErrorResponse for an allowed local path,
|
||||
ReadWorkspaceFileTool should fall back to _read_local_tool_result."""
|
||||
user = setup_test_data["user"]
|
||||
session = make_session(user.id)
|
||||
|
||||
# Create a real tool-result file on disk so the fallback can read it.
|
||||
encoded = "-tmp-copilot-fallback-test"
|
||||
conv_uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
|
||||
tool_dir = os.path.join(SDK_PROJECTS_DIR, encoded, conv_uuid, "tool-results")
|
||||
os.makedirs(tool_dir, exist_ok=True)
|
||||
filepath = os.path.join(tool_dir, "result.txt")
|
||||
with open(filepath, "w") as f:
|
||||
f.write("fallback content")
|
||||
|
||||
token = _current_project_dir.set(encoded)
|
||||
try:
|
||||
# Mock _resolve_file to return an ErrorResponse (simulating "file not
|
||||
# found in workspace") so the fallback branch is exercised.
|
||||
mock_resolve = AsyncMock(
|
||||
return_value=ErrorResponse(
|
||||
message="File not found at path: result.txt",
|
||||
session_id=session.session_id,
|
||||
)
|
||||
)
|
||||
with patch("backend.copilot.tools.workspace_files._resolve_file", mock_resolve):
|
||||
read_tool = ReadWorkspaceFileTool()
|
||||
result = await read_tool._execute(
|
||||
user_id=user.id,
|
||||
session=session,
|
||||
path=filepath,
|
||||
)
|
||||
|
||||
# Should have fallen back to _read_local_tool_result and succeeded.
|
||||
assert isinstance(result, WorkspaceFileContentResponse), (
|
||||
f"Expected fallback to local read, got {type(result).__name__}: "
|
||||
f"{getattr(result, 'message', '')}"
|
||||
)
|
||||
decoded = base64.b64decode(result.content_base64).decode("utf-8")
|
||||
assert decoded == "fallback content"
|
||||
mock_resolve.assert_awaited_once()
|
||||
finally:
|
||||
_current_project_dir.reset(token)
|
||||
shutil.rmtree(os.path.join(SDK_PROJECTS_DIR, encoded), ignore_errors=True)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_read_workspace_file_no_fallback_when_resolve_succeeds(setup_test_data):
|
||||
"""When _resolve_file succeeds, the local-disk fallback must NOT be invoked."""
|
||||
user = setup_test_data["user"]
|
||||
session = make_session(user.id)
|
||||
|
||||
fake_file_id = "fake-file-id-001"
|
||||
fake_content = b"workspace content"
|
||||
|
||||
# Build a minimal file_info stub that the tool's happy-path needs.
|
||||
class _FakeFileInfo:
|
||||
id = fake_file_id
|
||||
name = "result.json"
|
||||
path = "/result.json"
|
||||
mime_type = "text/plain"
|
||||
size_bytes = len(fake_content)
|
||||
|
||||
mock_resolve = AsyncMock(return_value=(fake_file_id, _FakeFileInfo()))
|
||||
|
||||
mock_manager = AsyncMock()
|
||||
mock_manager.read_file_by_id = AsyncMock(return_value=fake_content)
|
||||
|
||||
with (
|
||||
patch("backend.copilot.tools.workspace_files._resolve_file", mock_resolve),
|
||||
patch(
|
||||
"backend.copilot.tools.workspace_files.get_workspace_manager",
|
||||
AsyncMock(return_value=mock_manager),
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.tools.workspace_files._read_local_tool_result"
|
||||
) as patched_local,
|
||||
):
|
||||
read_tool = ReadWorkspaceFileTool()
|
||||
result = await read_tool._execute(
|
||||
user_id=user.id,
|
||||
session=session,
|
||||
file_id=fake_file_id,
|
||||
)
|
||||
|
||||
# Fallback must not have been called.
|
||||
patched_local.assert_not_called()
|
||||
# Normal workspace path must have produced a content response.
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
assert base64.b64decode(result.content_base64) == fake_content
|
||||
|
||||
Reference in New Issue
Block a user