diff --git a/autogpt_platform/backend/backend/copilot/sdk/file_tools.py b/autogpt_platform/backend/backend/copilot/sdk/file_tools.py index 095926e139..74244df274 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/file_tools.py +++ b/autogpt_platform/backend/backend/copilot/sdk/file_tools.py @@ -1,21 +1,21 @@ -"""Unified MCP Write tool that works in both E2B and non-E2B modes. +"""Unified MCP file tools (Read/Write/Edit) for both E2B and non-E2B modes. -Replaces the CLI's built-in Write tool, which has no defence against output-token -truncation. When the LLM generates a very large ``content`` argument the API +Replaces the CLI's built-in Write and Edit tools, which have no defence against +output-token truncation. When the LLM generates a very large argument the API truncates the response mid-JSON and Ajv rejects it with the opaque "'file_path' is a required property" error, losing the user's work. -This MCP tool: -- Detects partial truncation (content present but file_path missing) +Each MCP tool: +- Detects partial truncation (arguments present but file_path missing) - Detects complete truncation (empty args) -- Warns on large content that succeeded (>50K chars) -- In non-E2B mode: writes to the SDK working directory -- In E2B mode: delegates to the E2B sandbox write handler +- In non-E2B mode: operates on the SDK working directory +- In E2B mode: delegates to the E2B sandbox handler -The JSON schema places ``file_path`` FIRST so that truncation is more likely +The JSON schemas place ``file_path`` FIRST so that truncation is more likely to preserve the path (the API serialises properties in schema order). """ +import itertools import json import logging import os @@ -65,6 +65,27 @@ def _check_truncation(file_path: str, content: str) -> dict[str, Any] | None: return None +def _resolve_and_validate( + file_path: str, sdk_cwd: str +) -> tuple[str, None] | tuple[None, dict[str, Any]]: + """Resolve *file_path* against *sdk_cwd* and validate it stays within bounds. + + Returns ``(resolved_path, None)`` on success, or ``(None, error_response)`` + on failure. + """ + if not os.path.isabs(file_path): + resolved = os.path.normpath(os.path.join(sdk_cwd, file_path)) + else: + resolved = os.path.normpath(file_path) + + if not is_allowed_local_path(resolved, sdk_cwd): + return None, _mcp( + f"Path must be within the working directory: {os.path.basename(file_path)}", + error=True, + ) + return resolved, None + + async def _handle_write_non_e2b(args: dict[str, Any]) -> dict[str, Any]: """Write content to a file in the SDK working directory (non-E2B mode).""" file_path: str = args.get("file_path", "") @@ -78,18 +99,10 @@ async def _handle_write_non_e2b(args: dict[str, Any]) -> dict[str, Any]: if not sdk_cwd: return _mcp("No SDK working directory available", error=True) - # Resolve relative paths against SDK working directory - if not os.path.isabs(file_path): - resolved = os.path.normpath(os.path.join(sdk_cwd, file_path)) - else: - resolved = os.path.normpath(file_path) - - # Validate path stays within allowed directories - if not is_allowed_local_path(resolved, sdk_cwd): - return _mcp( - f"Path must be within the working directory: {os.path.basename(file_path)}", - error=True, - ) + resolved, err = _resolve_and_validate(file_path, sdk_cwd) + if err is not None: + return err + assert resolved is not None # for type narrowing try: parent = os.path.dirname(resolved) @@ -160,3 +173,280 @@ WRITE_TOOL_SCHEMA: dict[str, Any] = { }, "required": ["file_path", "content"], } + + +# --------------------------------------------------------------------------- +# Unified Read tool +# --------------------------------------------------------------------------- + +_READ_BINARY_EXTENSIONS = frozenset( + { + ".png", + ".jpg", + ".jpeg", + ".gif", + ".bmp", + ".ico", + ".webp", + ".pdf", + ".zip", + ".gz", + ".tar", + ".bz2", + ".xz", + ".7z", + ".exe", + ".dll", + ".so", + ".dylib", + ".bin", + ".o", + ".a", + ".pyc", + ".pyo", + ".class", + ".wasm", + ".mp3", + ".mp4", + ".avi", + ".mov", + ".mkv", + ".wav", + ".flac", + ".sqlite", + ".db", + } +) + + +def _is_likely_binary(path: str) -> bool: + """Heuristic check for binary files by extension.""" + _, ext = os.path.splitext(path) + return ext.lower() in _READ_BINARY_EXTENSIONS + + +async def _handle_read_non_e2b(args: dict[str, Any]) -> dict[str, Any]: + """Read a file from the SDK working directory (non-E2B mode).""" + file_path: str = args.get("file_path", "") + offset: int = max(0, int(args.get("offset", 0))) + limit: int = max(1, int(args.get("limit", 2000))) + + if not file_path: + return _mcp("file_path is required", error=True) + + sdk_cwd = get_sdk_cwd() + if not sdk_cwd: + return _mcp("No SDK working directory available", error=True) + + resolved, err = _resolve_and_validate(file_path, sdk_cwd) + if err is not None: + return err + assert resolved is not None + + if _is_likely_binary(resolved): + return _mcp( + f"Cannot read binary file: {os.path.basename(resolved)}. " + "Use bash_exec with 'xxd' or 'file' to inspect binary files.", + error=True, + ) + + try: + with open(resolved, encoding="utf-8", errors="replace") as f: + selected = list(itertools.islice(f, offset, offset + limit)) + except FileNotFoundError: + return _mcp(f"File not found: {file_path}", error=True) + except PermissionError: + return _mcp(f"Permission denied: {file_path}", error=True) + except Exception as exc: + return _mcp(f"Failed to read {file_path}: {exc}", error=True) + + numbered = "".join( + f"{i + offset + 1:>6}\t{line}" for i, line in enumerate(selected) + ) + return _mcp(numbered) + + +async def _handle_read_e2b(args: dict[str, Any]) -> dict[str, Any]: + """Read a file, delegating to the E2B sandbox.""" + from .e2b_file_tools import _handle_read_file + + return await _handle_read_file(args) + + +def get_read_tool_handler(*, use_e2b: bool) -> Callable[..., Any]: + """Return the appropriate Read handler for the current execution mode.""" + if use_e2b: + return _handle_read_e2b + return _handle_read_non_e2b + + +READ_TOOL_NAME = "read_file" +READ_TOOL_DESCRIPTION = ( + "Read a file from the working directory. Returns content with line numbers " + "(cat -n format). Use offset and limit to read specific ranges for large files." +) +READ_TOOL_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "file_path": { + "type": "string", + "description": ( + "The path to the file to read. " + "Relative paths are resolved against the working directory." + ), + }, + "offset": { + "type": "integer", + "description": ( + "Line number to start reading from (0-indexed). Default: 0." + ), + }, + "limit": { + "type": "integer", + "description": "Number of lines to read. Default: 2000.", + }, + }, + "required": ["file_path"], +} + + +# --------------------------------------------------------------------------- +# Unified Edit tool +# --------------------------------------------------------------------------- + +_EDIT_PARTIAL_TRUNCATION_MSG = ( + "Your Edit call was truncated (file_path missing but old_string/new_string " + "were present). The arguments were too large for a single tool call. " + "Break your edit into smaller replacements, or use bash_exec with " + "'sed' for large-scale find-and-replace." +) + + +async def _handle_edit_non_e2b(args: dict[str, Any]) -> dict[str, Any]: + """Edit a file in the SDK working directory (non-E2B mode).""" + file_path: str = args.get("file_path", "") + old_string: str = args.get("old_string", "") + new_string: str = args.get("new_string", "") + replace_all: bool = args.get("replace_all", False) + + # Partial truncation: file_path missing but edit strings present + if not file_path: + if old_string or new_string: + return _mcp(_EDIT_PARTIAL_TRUNCATION_MSG, error=True) + return _mcp( + "Your Edit call had empty arguments — this means your previous " + "response was too long and the tool call was truncated by the API. " + "Break your work into smaller steps.", + error=True, + ) + + if not old_string: + return _mcp("old_string is required", error=True) + + sdk_cwd = get_sdk_cwd() + if not sdk_cwd: + return _mcp("No SDK working directory available", error=True) + + resolved, err = _resolve_and_validate(file_path, sdk_cwd) + if err is not None: + return err + assert resolved is not None + + try: + with open(resolved, encoding="utf-8") as f: + content = f.read() + except FileNotFoundError: + return _mcp(f"File not found: {file_path}", error=True) + except PermissionError: + return _mcp(f"Permission denied: {file_path}", error=True) + except Exception as exc: + return _mcp(f"Failed to read {file_path}: {exc}", error=True) + + count = content.count(old_string) + if count == 0: + return _mcp(f"old_string not found in {file_path}", error=True) + if count > 1 and not replace_all: + return _mcp( + f"old_string appears {count} times in {file_path}. " + "Use replace_all=true or provide a more unique string.", + error=True, + ) + + updated = ( + content.replace(old_string, new_string) + if replace_all + else content.replace(old_string, new_string, 1) + ) + + try: + with open(resolved, "w", encoding="utf-8") as f: + f.write(updated) + except Exception as exc: + return _mcp(f"Failed to write {file_path}: {exc}", error=True) + + return _mcp(f"Edited {resolved} ({count} replacement{'s' if count > 1 else ''})") + + +async def _handle_edit_e2b(args: dict[str, Any]) -> dict[str, Any]: + """Edit a file, delegating to the E2B sandbox.""" + from .e2b_file_tools import _handle_edit_file + + file_path: str = args.get("file_path", "") + old_string: str = args.get("old_string", "") + new_string: str = args.get("new_string", "") + + # Check truncation before delegating + if not file_path: + if old_string or new_string: + return _mcp(_EDIT_PARTIAL_TRUNCATION_MSG, error=True) + return _mcp( + "Your Edit call had empty arguments — this means your previous " + "response was too long and the tool call was truncated by the API. " + "Break your work into smaller steps.", + error=True, + ) + + return await _handle_edit_file(args) + + +def get_edit_tool_handler(*, use_e2b: bool) -> Callable[..., Any]: + """Return the appropriate Edit handler for the current execution mode.""" + if use_e2b: + return _handle_edit_e2b + return _handle_edit_non_e2b + + +EDIT_TOOL_NAME = "Edit" +EDIT_TOOL_DESCRIPTION = ( + "Make targeted text replacements in a file. Finds old_string in the file " + "and replaces it with new_string. For replacing all occurrences, set " + "replace_all=true." +) +EDIT_TOOL_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "file_path": { + "type": "string", + "description": ( + "The path to the file to edit. " + "Relative paths are resolved against the working directory." + ), + }, + "old_string": { + "type": "string", + "description": "The text to find in the file.", + }, + "new_string": { + "type": "string", + "description": "The replacement text.", + }, + "replace_all": { + "type": "boolean", + "description": ( + "Replace all occurrences of old_string (default: false). " + "When false, old_string must appear exactly once." + ), + }, + }, + "required": ["file_path", "old_string", "new_string"], +} diff --git a/autogpt_platform/backend/backend/copilot/sdk/file_tools_test.py b/autogpt_platform/backend/backend/copilot/sdk/file_tools_test.py index 1734916f43..6cf088d0f9 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/file_tools_test.py +++ b/autogpt_platform/backend/backend/copilot/sdk/file_tools_test.py @@ -1,8 +1,8 @@ -"""Tests for the unified MCP Write tool (file_tools.py). +"""Tests for the unified MCP file tools (Read/Write/Edit) in file_tools.py. -Covers: normal write, large content warning, partial truncation, +Covers: normal read/write/edit, large content warning, partial truncation, complete truncation, path validation (no escape from working dir), -E2B delegation, and CLI built-in Write disallowance. +E2B delegation, binary file handling, and CLI built-in disallowance. """ import os @@ -13,8 +13,14 @@ from backend.copilot.sdk.tool_adapter import SDK_DISALLOWED_TOOLS from .file_tools import ( _LARGE_CONTENT_WARN_CHARS, + EDIT_TOOL_NAME, + EDIT_TOOL_SCHEMA, + READ_TOOL_NAME, + READ_TOOL_SCHEMA, WRITE_TOOL_NAME, WRITE_TOOL_SCHEMA, + _handle_edit_non_e2b, + _handle_read_non_e2b, _handle_write_non_e2b, ) @@ -194,3 +200,348 @@ class TestCliBuiltinWriteDisallowed: def test_tool_name_is_write(self): assert WRITE_TOOL_NAME == "Write" + + +# =========================================================================== +# Read tool tests +# =========================================================================== + + +class TestReadToolSchema: + def test_file_path_is_first_property(self): + props = list(READ_TOOL_SCHEMA["properties"].keys()) + assert props[0] == "file_path" + + def test_file_path_required(self): + assert "file_path" in READ_TOOL_SCHEMA["required"] + + def test_offset_and_limit_optional(self): + assert "offset" not in READ_TOOL_SCHEMA.get("required", []) + assert "limit" not in READ_TOOL_SCHEMA.get("required", []) + + def test_tool_name_is_read_file(self): + assert READ_TOOL_NAME == "read_file" + + +class TestNormalRead: + @pytest.mark.asyncio + async def test_read_file(self, sdk_cwd): + path = os.path.join(sdk_cwd, "hello.txt") + with open(path, "w") as f: + f.write("line1\nline2\nline3\n") + result = await _handle_read_non_e2b({"file_path": "hello.txt"}) + assert not result["isError"] + text = result["content"][0]["text"] + assert "line1" in text + assert "line2" in text + assert "line3" in text + + @pytest.mark.asyncio + async def test_read_with_line_numbers(self, sdk_cwd): + path = os.path.join(sdk_cwd, "numbered.txt") + with open(path, "w") as f: + f.write("alpha\nbeta\ngamma\n") + result = await _handle_read_non_e2b({"file_path": "numbered.txt"}) + text = result["content"][0]["text"] + # cat -n format: line numbers with tab separator + assert "1\t" in text + assert "2\t" in text + assert "3\t" in text + + @pytest.mark.asyncio + async def test_read_absolute_path_within_cwd(self, sdk_cwd): + path = os.path.join(sdk_cwd, "abs.txt") + with open(path, "w") as f: + f.write("absolute content") + result = await _handle_read_non_e2b({"file_path": path}) + assert not result["isError"] + assert "absolute content" in result["content"][0]["text"] + + +class TestReadOffsetLimit: + @pytest.mark.asyncio + async def test_read_with_offset(self, sdk_cwd): + path = os.path.join(sdk_cwd, "lines.txt") + with open(path, "w") as f: + for i in range(10): + f.write(f"line{i}\n") + result = await _handle_read_non_e2b( + {"file_path": "lines.txt", "offset": 5, "limit": 3} + ) + text = result["content"][0]["text"] + assert "line5" in text + assert "line6" in text + assert "line7" in text + assert "line4" not in text + assert "line8" not in text + + @pytest.mark.asyncio + async def test_read_with_limit(self, sdk_cwd): + path = os.path.join(sdk_cwd, "many.txt") + with open(path, "w") as f: + for i in range(100): + f.write(f"line{i}\n") + result = await _handle_read_non_e2b({"file_path": "many.txt", "limit": 2}) + text = result["content"][0]["text"] + assert "line0" in text + assert "line1" in text + assert "line2" not in text + + @pytest.mark.asyncio + async def test_offset_line_numbers_are_correct(self, sdk_cwd): + path = os.path.join(sdk_cwd, "offset_nums.txt") + with open(path, "w") as f: + for i in range(10): + f.write(f"line{i}\n") + result = await _handle_read_non_e2b( + {"file_path": "offset_nums.txt", "offset": 3, "limit": 2} + ) + text = result["content"][0]["text"] + # Lines 4 and 5 (1-indexed) should appear + assert "4\t" in text + assert "5\t" in text + + +class TestReadFileNotFound: + @pytest.mark.asyncio + async def test_file_not_found(self, sdk_cwd): + result = await _handle_read_non_e2b({"file_path": "nonexistent.txt"}) + assert result["isError"] + text = result["content"][0]["text"] + assert "not found" in text.lower() + + +class TestReadPathTraversal: + @pytest.mark.asyncio + async def test_path_traversal_blocked(self, sdk_cwd): + result = await _handle_read_non_e2b({"file_path": "../../etc/passwd"}) + assert result["isError"] + text = result["content"][0]["text"] + assert "must be within" in text.lower() + + @pytest.mark.asyncio + async def test_absolute_outside_cwd_blocked(self, sdk_cwd): + result = await _handle_read_non_e2b({"file_path": "/etc/passwd"}) + assert result["isError"] + + +class TestReadBinaryFile: + @pytest.mark.asyncio + async def test_binary_file_rejected(self, sdk_cwd): + path = os.path.join(sdk_cwd, "image.png") + with open(path, "wb") as f: + f.write(b"\x89PNG\r\n\x1a\n") + result = await _handle_read_non_e2b({"file_path": "image.png"}) + assert result["isError"] + text = result["content"][0]["text"] + assert "binary" in text.lower() + + @pytest.mark.asyncio + async def test_text_file_not_rejected_as_binary(self, sdk_cwd): + path = os.path.join(sdk_cwd, "code.py") + with open(path, "w") as f: + f.write("print('hello')\n") + result = await _handle_read_non_e2b({"file_path": "code.py"}) + assert not result["isError"] + + +class TestReadEmptyFilePath: + @pytest.mark.asyncio + async def test_empty_file_path(self, sdk_cwd): + result = await _handle_read_non_e2b({"file_path": ""}) + assert result["isError"] + + @pytest.mark.asyncio + async def test_no_sdk_cwd(self, monkeypatch): + monkeypatch.setattr("backend.copilot.sdk.file_tools.get_sdk_cwd", lambda: "") + result = await _handle_read_non_e2b({"file_path": "test.txt"}) + assert result["isError"] + assert "working directory" in result["content"][0]["text"].lower() + + +# =========================================================================== +# Edit tool tests +# =========================================================================== + + +class TestEditToolSchema: + def test_file_path_is_first_property(self): + props = list(EDIT_TOOL_SCHEMA["properties"].keys()) + assert props[0] == "file_path" + + def test_required_fields(self): + assert "file_path" in EDIT_TOOL_SCHEMA["required"] + assert "old_string" in EDIT_TOOL_SCHEMA["required"] + assert "new_string" in EDIT_TOOL_SCHEMA["required"] + + def test_replace_all_optional(self): + assert "replace_all" not in EDIT_TOOL_SCHEMA.get("required", []) + + def test_tool_name_is_edit(self): + assert EDIT_TOOL_NAME == "Edit" + + def test_edit_in_disallowed_tools(self): + assert "Edit" in SDK_DISALLOWED_TOOLS + + +class TestNormalEdit: + @pytest.mark.asyncio + async def test_simple_replacement(self, sdk_cwd): + path = os.path.join(sdk_cwd, "edit_me.txt") + with open(path, "w") as f: + f.write("Hello World\n") + result = await _handle_edit_non_e2b( + {"file_path": "edit_me.txt", "old_string": "World", "new_string": "Earth"} + ) + assert not result["isError"] + content = open(path).read() + assert content == "Hello Earth\n" + + @pytest.mark.asyncio + async def test_edit_reports_replacement_count(self, sdk_cwd): + path = os.path.join(sdk_cwd, "count.txt") + with open(path, "w") as f: + f.write("one two three\n") + result = await _handle_edit_non_e2b( + {"file_path": "count.txt", "old_string": "two", "new_string": "2"} + ) + text = result["content"][0]["text"] + assert "1 replacement" in text + + @pytest.mark.asyncio + async def test_edit_absolute_path(self, sdk_cwd): + path = os.path.join(sdk_cwd, "abs_edit.txt") + with open(path, "w") as f: + f.write("before\n") + result = await _handle_edit_non_e2b( + {"file_path": path, "old_string": "before", "new_string": "after"} + ) + assert not result["isError"] + assert open(path).read() == "after\n" + + +class TestEditOldStringNotFound: + @pytest.mark.asyncio + async def test_old_string_not_found(self, sdk_cwd): + path = os.path.join(sdk_cwd, "nope.txt") + with open(path, "w") as f: + f.write("Hello World\n") + result = await _handle_edit_non_e2b( + {"file_path": "nope.txt", "old_string": "MISSING", "new_string": "x"} + ) + assert result["isError"] + text = result["content"][0]["text"] + assert "not found" in text.lower() + + +class TestEditOldStringNotUnique: + @pytest.mark.asyncio + async def test_not_unique_without_replace_all(self, sdk_cwd): + path = os.path.join(sdk_cwd, "dup.txt") + with open(path, "w") as f: + f.write("foo bar foo baz\n") + result = await _handle_edit_non_e2b( + {"file_path": "dup.txt", "old_string": "foo", "new_string": "qux"} + ) + assert result["isError"] + text = result["content"][0]["text"] + assert "2 times" in text + # File should be unchanged + assert open(path).read() == "foo bar foo baz\n" + + +class TestEditReplaceAll: + @pytest.mark.asyncio + async def test_replace_all(self, sdk_cwd): + path = os.path.join(sdk_cwd, "all.txt") + with open(path, "w") as f: + f.write("foo bar foo baz foo\n") + result = await _handle_edit_non_e2b( + { + "file_path": "all.txt", + "old_string": "foo", + "new_string": "qux", + "replace_all": True, + } + ) + assert not result["isError"] + content = open(path).read() + assert content == "qux bar qux baz qux\n" + text = result["content"][0]["text"] + assert "3 replacement" in text + + +class TestEditPartialTruncation: + @pytest.mark.asyncio + async def test_partial_truncation(self, sdk_cwd): + """file_path missing but old_string/new_string present.""" + result = await _handle_edit_non_e2b( + {"old_string": "something", "new_string": "else"} + ) + assert result["isError"] + text = result["content"][0]["text"] + assert "truncated" in text.lower() + + @pytest.mark.asyncio + async def test_complete_truncation(self, sdk_cwd): + result = await _handle_edit_non_e2b({}) + assert result["isError"] + text = result["content"][0]["text"] + assert "truncated" in text.lower() + + @pytest.mark.asyncio + async def test_empty_file_path_with_content(self, sdk_cwd): + result = await _handle_edit_non_e2b( + {"file_path": "", "old_string": "x", "new_string": "y"} + ) + assert result["isError"] + + +class TestEditPathTraversal: + @pytest.mark.asyncio + async def test_path_traversal_blocked(self, sdk_cwd): + result = await _handle_edit_non_e2b( + { + "file_path": "../../etc/passwd", + "old_string": "root", + "new_string": "evil", + } + ) + assert result["isError"] + text = result["content"][0]["text"] + assert "must be within" in text.lower() + + @pytest.mark.asyncio + async def test_absolute_outside_cwd_blocked(self, sdk_cwd): + result = await _handle_edit_non_e2b( + { + "file_path": "/etc/passwd", + "old_string": "root", + "new_string": "evil", + } + ) + assert result["isError"] + + +class TestEditFileNotFound: + @pytest.mark.asyncio + async def test_file_not_found(self, sdk_cwd): + result = await _handle_edit_non_e2b( + { + "file_path": "nonexistent.txt", + "old_string": "x", + "new_string": "y", + } + ) + assert result["isError"] + text = result["content"][0]["text"] + assert "not found" in text.lower() + + @pytest.mark.asyncio + async def test_no_sdk_cwd(self, monkeypatch): + monkeypatch.setattr("backend.copilot.sdk.file_tools.get_sdk_cwd", lambda: "") + result = await _handle_edit_non_e2b( + {"file_path": "test.txt", "old_string": "x", "new_string": "y"} + ) + assert result["isError"] + assert "working directory" in result["content"][0]["text"].lower() diff --git a/autogpt_platform/backend/backend/copilot/sdk/tool_adapter.py b/autogpt_platform/backend/backend/copilot/sdk/tool_adapter.py index 9159fd1e30..6e0a7de3a8 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/tool_adapter.py +++ b/autogpt_platform/backend/backend/copilot/sdk/tool_adapter.py @@ -40,9 +40,17 @@ from backend.util.truncate import truncate from .e2b_file_tools import E2B_FILE_TOOL_NAMES, E2B_FILE_TOOLS, bridge_and_annotate from .file_tools import ( + EDIT_TOOL_DESCRIPTION, + EDIT_TOOL_NAME, + EDIT_TOOL_SCHEMA, + READ_TOOL_DESCRIPTION, + READ_TOOL_NAME, + READ_TOOL_SCHEMA, WRITE_TOOL_DESCRIPTION, WRITE_TOOL_NAME, WRITE_TOOL_SCHEMA, + get_edit_tool_handler, + get_read_tool_handler, get_write_tool_handler, ) @@ -641,6 +649,38 @@ def create_copilot_mcp_server(*, use_e2b: bool = False): ) sdk_tools.append(write_tool) + # Unified Read tool — in non-E2B reads from SDK working dir, in E2B + # delegates to the sandbox. Named "read_file" to match E2B naming. + # The CLI's built-in Read is NOT disabled (it's used internally for + # oversized tool results); our MCP version is an additional tool. + read_file_handler = get_read_tool_handler(use_e2b=use_e2b) + read_file_tool = tool( + READ_TOOL_NAME, + READ_TOOL_DESCRIPTION, + READ_TOOL_SCHEMA, + annotations=_PARALLEL_ANNOTATION, + )( + _make_truncating_wrapper( + read_file_handler, READ_TOOL_NAME, input_schema=READ_TOOL_SCHEMA + ) + ) + sdk_tools.append(read_file_tool) + + # Unified Edit tool — replaces the CLI's built-in Edit which has no + # defence against output-token truncation. + edit_handler = get_edit_tool_handler(use_e2b=use_e2b) + edit_tool = tool( + EDIT_TOOL_NAME, + EDIT_TOOL_DESCRIPTION, + EDIT_TOOL_SCHEMA, + annotations=_PARALLEL_ANNOTATION, + )( + _make_truncating_wrapper( + edit_handler, EDIT_TOOL_NAME, input_schema=EDIT_TOOL_SCHEMA + ) + ) + sdk_tools.append(edit_tool) + # Read tool for SDK-truncated tool results (always needed, read-only). read_tool = tool( _READ_TOOL_NAME, @@ -683,11 +723,17 @@ _SDK_BUILTIN_TOOLS = [*_SDK_BUILTIN_FILE_TOOLS, *_SDK_BUILTIN_ALWAYS] # "'file_path' is a required property" error, losing the user's work. # All writes go through our MCP Write tool (file_tools.py) where we # control validation and return actionable guidance. +# Edit: same truncation risk as Write — the CLI's built-in Edit has no +# defence against output-token truncation. All edits go through our +# MCP Edit tool (file_tools.py). +# Note: Read is NOT disallowed — the CLI uses Read internally for +# oversized tool results. Our MCP read_file is an additional tool. SDK_DISALLOWED_TOOLS = [ "Bash", "WebFetch", "AskUserQuestion", "Write", + "Edit", ] # Tools that are blocked entirely in security hooks (defence-in-depth). @@ -727,6 +773,8 @@ DANGEROUS_PATTERNS = [ COPILOT_TOOL_NAMES = [ *[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()], f"{MCP_TOOL_PREFIX}{WRITE_TOOL_NAME}", + f"{MCP_TOOL_PREFIX}{READ_TOOL_NAME}", + f"{MCP_TOOL_PREFIX}{EDIT_TOOL_NAME}", f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}", *_SDK_BUILTIN_TOOLS, ] @@ -744,6 +792,8 @@ def get_copilot_tool_names(*, use_e2b: bool = False) -> list[str]: return [ *[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()], f"{MCP_TOOL_PREFIX}{WRITE_TOOL_NAME}", + f"{MCP_TOOL_PREFIX}{READ_TOOL_NAME}", + f"{MCP_TOOL_PREFIX}{EDIT_TOOL_NAME}", f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}", *[f"{MCP_TOOL_PREFIX}{name}" for name in E2B_FILE_TOOL_NAMES], *_SDK_BUILTIN_ALWAYS,