feat(copilot): unified MCP Read and Edit tools to prevent truncation data loss

Add read_file and Edit MCP tools following the same pattern as the
existing unified Write tool.  Both route to the E2B sandbox when active
and fall back to the SDK working directory in non-E2B mode.

Read tool (read_file):
- Reads files with cat -n formatted line numbers
- Supports offset/limit for large files
- Binary file detection by extension
- Path validation via is_allowed_local_path()
- CLI built-in Read is NOT disabled (used internally for oversized
  tool results)

Edit tool:
- Targeted find-and-replace with old_string/new_string
- replace_all flag for multi-occurrence replacements
- Uniqueness check when replace_all=false
- Partial truncation detection with actionable guidance
- CLI built-in Edit IS disabled in SDK_DISALLOWED_TOOLS

Both tools are registered in create_copilot_mcp_server() and included
in get_copilot_tool_names() for both E2B and non-E2B modes.
This commit is contained in:
majdyz
2026-04-11 17:57:45 +00:00
parent 82018770c2
commit 0fa24c8332
3 changed files with 715 additions and 24 deletions

View File

@@ -1,21 +1,21 @@
"""Unified MCP Write tool that works in both E2B and non-E2B modes.
"""Unified MCP file tools (Read/Write/Edit) for both E2B and non-E2B modes.
Replaces the CLI's built-in Write tool, which has no defence against output-token
truncation. When the LLM generates a very large ``content`` argument the API
Replaces the CLI's built-in Write and Edit tools, which have no defence against
output-token truncation. When the LLM generates a very large argument the API
truncates the response mid-JSON and Ajv rejects it with the opaque
"'file_path' is a required property" error, losing the user's work.
This MCP tool:
- Detects partial truncation (content present but file_path missing)
Each MCP tool:
- Detects partial truncation (arguments present but file_path missing)
- Detects complete truncation (empty args)
- Warns on large content that succeeded (>50K chars)
- In non-E2B mode: writes to the SDK working directory
- In E2B mode: delegates to the E2B sandbox write handler
- In non-E2B mode: operates on the SDK working directory
- In E2B mode: delegates to the E2B sandbox handler
The JSON schema places ``file_path`` FIRST so that truncation is more likely
The JSON schemas place ``file_path`` FIRST so that truncation is more likely
to preserve the path (the API serialises properties in schema order).
"""
import itertools
import json
import logging
import os
@@ -65,6 +65,27 @@ def _check_truncation(file_path: str, content: str) -> dict[str, Any] | None:
return None
def _resolve_and_validate(
file_path: str, sdk_cwd: str
) -> tuple[str, None] | tuple[None, dict[str, Any]]:
"""Resolve *file_path* against *sdk_cwd* and validate it stays within bounds.
Returns ``(resolved_path, None)`` on success, or ``(None, error_response)``
on failure.
"""
if not os.path.isabs(file_path):
resolved = os.path.normpath(os.path.join(sdk_cwd, file_path))
else:
resolved = os.path.normpath(file_path)
if not is_allowed_local_path(resolved, sdk_cwd):
return None, _mcp(
f"Path must be within the working directory: {os.path.basename(file_path)}",
error=True,
)
return resolved, None
async def _handle_write_non_e2b(args: dict[str, Any]) -> dict[str, Any]:
"""Write content to a file in the SDK working directory (non-E2B mode)."""
file_path: str = args.get("file_path", "")
@@ -78,18 +99,10 @@ async def _handle_write_non_e2b(args: dict[str, Any]) -> dict[str, Any]:
if not sdk_cwd:
return _mcp("No SDK working directory available", error=True)
# Resolve relative paths against SDK working directory
if not os.path.isabs(file_path):
resolved = os.path.normpath(os.path.join(sdk_cwd, file_path))
else:
resolved = os.path.normpath(file_path)
# Validate path stays within allowed directories
if not is_allowed_local_path(resolved, sdk_cwd):
return _mcp(
f"Path must be within the working directory: {os.path.basename(file_path)}",
error=True,
)
resolved, err = _resolve_and_validate(file_path, sdk_cwd)
if err is not None:
return err
assert resolved is not None # for type narrowing
try:
parent = os.path.dirname(resolved)
@@ -160,3 +173,280 @@ WRITE_TOOL_SCHEMA: dict[str, Any] = {
},
"required": ["file_path", "content"],
}
# ---------------------------------------------------------------------------
# Unified Read tool
# ---------------------------------------------------------------------------
_READ_BINARY_EXTENSIONS = frozenset(
{
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".ico",
".webp",
".pdf",
".zip",
".gz",
".tar",
".bz2",
".xz",
".7z",
".exe",
".dll",
".so",
".dylib",
".bin",
".o",
".a",
".pyc",
".pyo",
".class",
".wasm",
".mp3",
".mp4",
".avi",
".mov",
".mkv",
".wav",
".flac",
".sqlite",
".db",
}
)
def _is_likely_binary(path: str) -> bool:
"""Heuristic check for binary files by extension."""
_, ext = os.path.splitext(path)
return ext.lower() in _READ_BINARY_EXTENSIONS
async def _handle_read_non_e2b(args: dict[str, Any]) -> dict[str, Any]:
"""Read a file from the SDK working directory (non-E2B mode)."""
file_path: str = args.get("file_path", "")
offset: int = max(0, int(args.get("offset", 0)))
limit: int = max(1, int(args.get("limit", 2000)))
if not file_path:
return _mcp("file_path is required", error=True)
sdk_cwd = get_sdk_cwd()
if not sdk_cwd:
return _mcp("No SDK working directory available", error=True)
resolved, err = _resolve_and_validate(file_path, sdk_cwd)
if err is not None:
return err
assert resolved is not None
if _is_likely_binary(resolved):
return _mcp(
f"Cannot read binary file: {os.path.basename(resolved)}. "
"Use bash_exec with 'xxd' or 'file' to inspect binary files.",
error=True,
)
try:
with open(resolved, encoding="utf-8", errors="replace") as f:
selected = list(itertools.islice(f, offset, offset + limit))
except FileNotFoundError:
return _mcp(f"File not found: {file_path}", error=True)
except PermissionError:
return _mcp(f"Permission denied: {file_path}", error=True)
except Exception as exc:
return _mcp(f"Failed to read {file_path}: {exc}", error=True)
numbered = "".join(
f"{i + offset + 1:>6}\t{line}" for i, line in enumerate(selected)
)
return _mcp(numbered)
async def _handle_read_e2b(args: dict[str, Any]) -> dict[str, Any]:
"""Read a file, delegating to the E2B sandbox."""
from .e2b_file_tools import _handle_read_file
return await _handle_read_file(args)
def get_read_tool_handler(*, use_e2b: bool) -> Callable[..., Any]:
"""Return the appropriate Read handler for the current execution mode."""
if use_e2b:
return _handle_read_e2b
return _handle_read_non_e2b
READ_TOOL_NAME = "read_file"
READ_TOOL_DESCRIPTION = (
"Read a file from the working directory. Returns content with line numbers "
"(cat -n format). Use offset and limit to read specific ranges for large files."
)
READ_TOOL_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": (
"The path to the file to read. "
"Relative paths are resolved against the working directory."
),
},
"offset": {
"type": "integer",
"description": (
"Line number to start reading from (0-indexed). Default: 0."
),
},
"limit": {
"type": "integer",
"description": "Number of lines to read. Default: 2000.",
},
},
"required": ["file_path"],
}
# ---------------------------------------------------------------------------
# Unified Edit tool
# ---------------------------------------------------------------------------
_EDIT_PARTIAL_TRUNCATION_MSG = (
"Your Edit call was truncated (file_path missing but old_string/new_string "
"were present). The arguments were too large for a single tool call. "
"Break your edit into smaller replacements, or use bash_exec with "
"'sed' for large-scale find-and-replace."
)
async def _handle_edit_non_e2b(args: dict[str, Any]) -> dict[str, Any]:
"""Edit a file in the SDK working directory (non-E2B mode)."""
file_path: str = args.get("file_path", "")
old_string: str = args.get("old_string", "")
new_string: str = args.get("new_string", "")
replace_all: bool = args.get("replace_all", False)
# Partial truncation: file_path missing but edit strings present
if not file_path:
if old_string or new_string:
return _mcp(_EDIT_PARTIAL_TRUNCATION_MSG, error=True)
return _mcp(
"Your Edit call had empty arguments — this means your previous "
"response was too long and the tool call was truncated by the API. "
"Break your work into smaller steps.",
error=True,
)
if not old_string:
return _mcp("old_string is required", error=True)
sdk_cwd = get_sdk_cwd()
if not sdk_cwd:
return _mcp("No SDK working directory available", error=True)
resolved, err = _resolve_and_validate(file_path, sdk_cwd)
if err is not None:
return err
assert resolved is not None
try:
with open(resolved, encoding="utf-8") as f:
content = f.read()
except FileNotFoundError:
return _mcp(f"File not found: {file_path}", error=True)
except PermissionError:
return _mcp(f"Permission denied: {file_path}", error=True)
except Exception as exc:
return _mcp(f"Failed to read {file_path}: {exc}", error=True)
count = content.count(old_string)
if count == 0:
return _mcp(f"old_string not found in {file_path}", error=True)
if count > 1 and not replace_all:
return _mcp(
f"old_string appears {count} times in {file_path}. "
"Use replace_all=true or provide a more unique string.",
error=True,
)
updated = (
content.replace(old_string, new_string)
if replace_all
else content.replace(old_string, new_string, 1)
)
try:
with open(resolved, "w", encoding="utf-8") as f:
f.write(updated)
except Exception as exc:
return _mcp(f"Failed to write {file_path}: {exc}", error=True)
return _mcp(f"Edited {resolved} ({count} replacement{'s' if count > 1 else ''})")
async def _handle_edit_e2b(args: dict[str, Any]) -> dict[str, Any]:
"""Edit a file, delegating to the E2B sandbox."""
from .e2b_file_tools import _handle_edit_file
file_path: str = args.get("file_path", "")
old_string: str = args.get("old_string", "")
new_string: str = args.get("new_string", "")
# Check truncation before delegating
if not file_path:
if old_string or new_string:
return _mcp(_EDIT_PARTIAL_TRUNCATION_MSG, error=True)
return _mcp(
"Your Edit call had empty arguments — this means your previous "
"response was too long and the tool call was truncated by the API. "
"Break your work into smaller steps.",
error=True,
)
return await _handle_edit_file(args)
def get_edit_tool_handler(*, use_e2b: bool) -> Callable[..., Any]:
"""Return the appropriate Edit handler for the current execution mode."""
if use_e2b:
return _handle_edit_e2b
return _handle_edit_non_e2b
EDIT_TOOL_NAME = "Edit"
EDIT_TOOL_DESCRIPTION = (
"Make targeted text replacements in a file. Finds old_string in the file "
"and replaces it with new_string. For replacing all occurrences, set "
"replace_all=true."
)
EDIT_TOOL_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": (
"The path to the file to edit. "
"Relative paths are resolved against the working directory."
),
},
"old_string": {
"type": "string",
"description": "The text to find in the file.",
},
"new_string": {
"type": "string",
"description": "The replacement text.",
},
"replace_all": {
"type": "boolean",
"description": (
"Replace all occurrences of old_string (default: false). "
"When false, old_string must appear exactly once."
),
},
},
"required": ["file_path", "old_string", "new_string"],
}

View File

@@ -1,8 +1,8 @@
"""Tests for the unified MCP Write tool (file_tools.py).
"""Tests for the unified MCP file tools (Read/Write/Edit) in file_tools.py.
Covers: normal write, large content warning, partial truncation,
Covers: normal read/write/edit, large content warning, partial truncation,
complete truncation, path validation (no escape from working dir),
E2B delegation, and CLI built-in Write disallowance.
E2B delegation, binary file handling, and CLI built-in disallowance.
"""
import os
@@ -13,8 +13,14 @@ from backend.copilot.sdk.tool_adapter import SDK_DISALLOWED_TOOLS
from .file_tools import (
_LARGE_CONTENT_WARN_CHARS,
EDIT_TOOL_NAME,
EDIT_TOOL_SCHEMA,
READ_TOOL_NAME,
READ_TOOL_SCHEMA,
WRITE_TOOL_NAME,
WRITE_TOOL_SCHEMA,
_handle_edit_non_e2b,
_handle_read_non_e2b,
_handle_write_non_e2b,
)
@@ -194,3 +200,348 @@ class TestCliBuiltinWriteDisallowed:
def test_tool_name_is_write(self):
assert WRITE_TOOL_NAME == "Write"
# ===========================================================================
# Read tool tests
# ===========================================================================
class TestReadToolSchema:
def test_file_path_is_first_property(self):
props = list(READ_TOOL_SCHEMA["properties"].keys())
assert props[0] == "file_path"
def test_file_path_required(self):
assert "file_path" in READ_TOOL_SCHEMA["required"]
def test_offset_and_limit_optional(self):
assert "offset" not in READ_TOOL_SCHEMA.get("required", [])
assert "limit" not in READ_TOOL_SCHEMA.get("required", [])
def test_tool_name_is_read_file(self):
assert READ_TOOL_NAME == "read_file"
class TestNormalRead:
@pytest.mark.asyncio
async def test_read_file(self, sdk_cwd):
path = os.path.join(sdk_cwd, "hello.txt")
with open(path, "w") as f:
f.write("line1\nline2\nline3\n")
result = await _handle_read_non_e2b({"file_path": "hello.txt"})
assert not result["isError"]
text = result["content"][0]["text"]
assert "line1" in text
assert "line2" in text
assert "line3" in text
@pytest.mark.asyncio
async def test_read_with_line_numbers(self, sdk_cwd):
path = os.path.join(sdk_cwd, "numbered.txt")
with open(path, "w") as f:
f.write("alpha\nbeta\ngamma\n")
result = await _handle_read_non_e2b({"file_path": "numbered.txt"})
text = result["content"][0]["text"]
# cat -n format: line numbers with tab separator
assert "1\t" in text
assert "2\t" in text
assert "3\t" in text
@pytest.mark.asyncio
async def test_read_absolute_path_within_cwd(self, sdk_cwd):
path = os.path.join(sdk_cwd, "abs.txt")
with open(path, "w") as f:
f.write("absolute content")
result = await _handle_read_non_e2b({"file_path": path})
assert not result["isError"]
assert "absolute content" in result["content"][0]["text"]
class TestReadOffsetLimit:
@pytest.mark.asyncio
async def test_read_with_offset(self, sdk_cwd):
path = os.path.join(sdk_cwd, "lines.txt")
with open(path, "w") as f:
for i in range(10):
f.write(f"line{i}\n")
result = await _handle_read_non_e2b(
{"file_path": "lines.txt", "offset": 5, "limit": 3}
)
text = result["content"][0]["text"]
assert "line5" in text
assert "line6" in text
assert "line7" in text
assert "line4" not in text
assert "line8" not in text
@pytest.mark.asyncio
async def test_read_with_limit(self, sdk_cwd):
path = os.path.join(sdk_cwd, "many.txt")
with open(path, "w") as f:
for i in range(100):
f.write(f"line{i}\n")
result = await _handle_read_non_e2b({"file_path": "many.txt", "limit": 2})
text = result["content"][0]["text"]
assert "line0" in text
assert "line1" in text
assert "line2" not in text
@pytest.mark.asyncio
async def test_offset_line_numbers_are_correct(self, sdk_cwd):
path = os.path.join(sdk_cwd, "offset_nums.txt")
with open(path, "w") as f:
for i in range(10):
f.write(f"line{i}\n")
result = await _handle_read_non_e2b(
{"file_path": "offset_nums.txt", "offset": 3, "limit": 2}
)
text = result["content"][0]["text"]
# Lines 4 and 5 (1-indexed) should appear
assert "4\t" in text
assert "5\t" in text
class TestReadFileNotFound:
@pytest.mark.asyncio
async def test_file_not_found(self, sdk_cwd):
result = await _handle_read_non_e2b({"file_path": "nonexistent.txt"})
assert result["isError"]
text = result["content"][0]["text"]
assert "not found" in text.lower()
class TestReadPathTraversal:
@pytest.mark.asyncio
async def test_path_traversal_blocked(self, sdk_cwd):
result = await _handle_read_non_e2b({"file_path": "../../etc/passwd"})
assert result["isError"]
text = result["content"][0]["text"]
assert "must be within" in text.lower()
@pytest.mark.asyncio
async def test_absolute_outside_cwd_blocked(self, sdk_cwd):
result = await _handle_read_non_e2b({"file_path": "/etc/passwd"})
assert result["isError"]
class TestReadBinaryFile:
@pytest.mark.asyncio
async def test_binary_file_rejected(self, sdk_cwd):
path = os.path.join(sdk_cwd, "image.png")
with open(path, "wb") as f:
f.write(b"\x89PNG\r\n\x1a\n")
result = await _handle_read_non_e2b({"file_path": "image.png"})
assert result["isError"]
text = result["content"][0]["text"]
assert "binary" in text.lower()
@pytest.mark.asyncio
async def test_text_file_not_rejected_as_binary(self, sdk_cwd):
path = os.path.join(sdk_cwd, "code.py")
with open(path, "w") as f:
f.write("print('hello')\n")
result = await _handle_read_non_e2b({"file_path": "code.py"})
assert not result["isError"]
class TestReadEmptyFilePath:
@pytest.mark.asyncio
async def test_empty_file_path(self, sdk_cwd):
result = await _handle_read_non_e2b({"file_path": ""})
assert result["isError"]
@pytest.mark.asyncio
async def test_no_sdk_cwd(self, monkeypatch):
monkeypatch.setattr("backend.copilot.sdk.file_tools.get_sdk_cwd", lambda: "")
result = await _handle_read_non_e2b({"file_path": "test.txt"})
assert result["isError"]
assert "working directory" in result["content"][0]["text"].lower()
# ===========================================================================
# Edit tool tests
# ===========================================================================
class TestEditToolSchema:
def test_file_path_is_first_property(self):
props = list(EDIT_TOOL_SCHEMA["properties"].keys())
assert props[0] == "file_path"
def test_required_fields(self):
assert "file_path" in EDIT_TOOL_SCHEMA["required"]
assert "old_string" in EDIT_TOOL_SCHEMA["required"]
assert "new_string" in EDIT_TOOL_SCHEMA["required"]
def test_replace_all_optional(self):
assert "replace_all" not in EDIT_TOOL_SCHEMA.get("required", [])
def test_tool_name_is_edit(self):
assert EDIT_TOOL_NAME == "Edit"
def test_edit_in_disallowed_tools(self):
assert "Edit" in SDK_DISALLOWED_TOOLS
class TestNormalEdit:
@pytest.mark.asyncio
async def test_simple_replacement(self, sdk_cwd):
path = os.path.join(sdk_cwd, "edit_me.txt")
with open(path, "w") as f:
f.write("Hello World\n")
result = await _handle_edit_non_e2b(
{"file_path": "edit_me.txt", "old_string": "World", "new_string": "Earth"}
)
assert not result["isError"]
content = open(path).read()
assert content == "Hello Earth\n"
@pytest.mark.asyncio
async def test_edit_reports_replacement_count(self, sdk_cwd):
path = os.path.join(sdk_cwd, "count.txt")
with open(path, "w") as f:
f.write("one two three\n")
result = await _handle_edit_non_e2b(
{"file_path": "count.txt", "old_string": "two", "new_string": "2"}
)
text = result["content"][0]["text"]
assert "1 replacement" in text
@pytest.mark.asyncio
async def test_edit_absolute_path(self, sdk_cwd):
path = os.path.join(sdk_cwd, "abs_edit.txt")
with open(path, "w") as f:
f.write("before\n")
result = await _handle_edit_non_e2b(
{"file_path": path, "old_string": "before", "new_string": "after"}
)
assert not result["isError"]
assert open(path).read() == "after\n"
class TestEditOldStringNotFound:
@pytest.mark.asyncio
async def test_old_string_not_found(self, sdk_cwd):
path = os.path.join(sdk_cwd, "nope.txt")
with open(path, "w") as f:
f.write("Hello World\n")
result = await _handle_edit_non_e2b(
{"file_path": "nope.txt", "old_string": "MISSING", "new_string": "x"}
)
assert result["isError"]
text = result["content"][0]["text"]
assert "not found" in text.lower()
class TestEditOldStringNotUnique:
@pytest.mark.asyncio
async def test_not_unique_without_replace_all(self, sdk_cwd):
path = os.path.join(sdk_cwd, "dup.txt")
with open(path, "w") as f:
f.write("foo bar foo baz\n")
result = await _handle_edit_non_e2b(
{"file_path": "dup.txt", "old_string": "foo", "new_string": "qux"}
)
assert result["isError"]
text = result["content"][0]["text"]
assert "2 times" in text
# File should be unchanged
assert open(path).read() == "foo bar foo baz\n"
class TestEditReplaceAll:
@pytest.mark.asyncio
async def test_replace_all(self, sdk_cwd):
path = os.path.join(sdk_cwd, "all.txt")
with open(path, "w") as f:
f.write("foo bar foo baz foo\n")
result = await _handle_edit_non_e2b(
{
"file_path": "all.txt",
"old_string": "foo",
"new_string": "qux",
"replace_all": True,
}
)
assert not result["isError"]
content = open(path).read()
assert content == "qux bar qux baz qux\n"
text = result["content"][0]["text"]
assert "3 replacement" in text
class TestEditPartialTruncation:
@pytest.mark.asyncio
async def test_partial_truncation(self, sdk_cwd):
"""file_path missing but old_string/new_string present."""
result = await _handle_edit_non_e2b(
{"old_string": "something", "new_string": "else"}
)
assert result["isError"]
text = result["content"][0]["text"]
assert "truncated" in text.lower()
@pytest.mark.asyncio
async def test_complete_truncation(self, sdk_cwd):
result = await _handle_edit_non_e2b({})
assert result["isError"]
text = result["content"][0]["text"]
assert "truncated" in text.lower()
@pytest.mark.asyncio
async def test_empty_file_path_with_content(self, sdk_cwd):
result = await _handle_edit_non_e2b(
{"file_path": "", "old_string": "x", "new_string": "y"}
)
assert result["isError"]
class TestEditPathTraversal:
@pytest.mark.asyncio
async def test_path_traversal_blocked(self, sdk_cwd):
result = await _handle_edit_non_e2b(
{
"file_path": "../../etc/passwd",
"old_string": "root",
"new_string": "evil",
}
)
assert result["isError"]
text = result["content"][0]["text"]
assert "must be within" in text.lower()
@pytest.mark.asyncio
async def test_absolute_outside_cwd_blocked(self, sdk_cwd):
result = await _handle_edit_non_e2b(
{
"file_path": "/etc/passwd",
"old_string": "root",
"new_string": "evil",
}
)
assert result["isError"]
class TestEditFileNotFound:
@pytest.mark.asyncio
async def test_file_not_found(self, sdk_cwd):
result = await _handle_edit_non_e2b(
{
"file_path": "nonexistent.txt",
"old_string": "x",
"new_string": "y",
}
)
assert result["isError"]
text = result["content"][0]["text"]
assert "not found" in text.lower()
@pytest.mark.asyncio
async def test_no_sdk_cwd(self, monkeypatch):
monkeypatch.setattr("backend.copilot.sdk.file_tools.get_sdk_cwd", lambda: "")
result = await _handle_edit_non_e2b(
{"file_path": "test.txt", "old_string": "x", "new_string": "y"}
)
assert result["isError"]
assert "working directory" in result["content"][0]["text"].lower()

View File

@@ -40,9 +40,17 @@ from backend.util.truncate import truncate
from .e2b_file_tools import E2B_FILE_TOOL_NAMES, E2B_FILE_TOOLS, bridge_and_annotate
from .file_tools import (
EDIT_TOOL_DESCRIPTION,
EDIT_TOOL_NAME,
EDIT_TOOL_SCHEMA,
READ_TOOL_DESCRIPTION,
READ_TOOL_NAME,
READ_TOOL_SCHEMA,
WRITE_TOOL_DESCRIPTION,
WRITE_TOOL_NAME,
WRITE_TOOL_SCHEMA,
get_edit_tool_handler,
get_read_tool_handler,
get_write_tool_handler,
)
@@ -641,6 +649,38 @@ def create_copilot_mcp_server(*, use_e2b: bool = False):
)
sdk_tools.append(write_tool)
# Unified Read tool — in non-E2B reads from SDK working dir, in E2B
# delegates to the sandbox. Named "read_file" to match E2B naming.
# The CLI's built-in Read is NOT disabled (it's used internally for
# oversized tool results); our MCP version is an additional tool.
read_file_handler = get_read_tool_handler(use_e2b=use_e2b)
read_file_tool = tool(
READ_TOOL_NAME,
READ_TOOL_DESCRIPTION,
READ_TOOL_SCHEMA,
annotations=_PARALLEL_ANNOTATION,
)(
_make_truncating_wrapper(
read_file_handler, READ_TOOL_NAME, input_schema=READ_TOOL_SCHEMA
)
)
sdk_tools.append(read_file_tool)
# Unified Edit tool — replaces the CLI's built-in Edit which has no
# defence against output-token truncation.
edit_handler = get_edit_tool_handler(use_e2b=use_e2b)
edit_tool = tool(
EDIT_TOOL_NAME,
EDIT_TOOL_DESCRIPTION,
EDIT_TOOL_SCHEMA,
annotations=_PARALLEL_ANNOTATION,
)(
_make_truncating_wrapper(
edit_handler, EDIT_TOOL_NAME, input_schema=EDIT_TOOL_SCHEMA
)
)
sdk_tools.append(edit_tool)
# Read tool for SDK-truncated tool results (always needed, read-only).
read_tool = tool(
_READ_TOOL_NAME,
@@ -683,11 +723,17 @@ _SDK_BUILTIN_TOOLS = [*_SDK_BUILTIN_FILE_TOOLS, *_SDK_BUILTIN_ALWAYS]
# "'file_path' is a required property" error, losing the user's work.
# All writes go through our MCP Write tool (file_tools.py) where we
# control validation and return actionable guidance.
# Edit: same truncation risk as Write — the CLI's built-in Edit has no
# defence against output-token truncation. All edits go through our
# MCP Edit tool (file_tools.py).
# Note: Read is NOT disallowed — the CLI uses Read internally for
# oversized tool results. Our MCP read_file is an additional tool.
SDK_DISALLOWED_TOOLS = [
"Bash",
"WebFetch",
"AskUserQuestion",
"Write",
"Edit",
]
# Tools that are blocked entirely in security hooks (defence-in-depth).
@@ -727,6 +773,8 @@ DANGEROUS_PATTERNS = [
COPILOT_TOOL_NAMES = [
*[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()],
f"{MCP_TOOL_PREFIX}{WRITE_TOOL_NAME}",
f"{MCP_TOOL_PREFIX}{READ_TOOL_NAME}",
f"{MCP_TOOL_PREFIX}{EDIT_TOOL_NAME}",
f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}",
*_SDK_BUILTIN_TOOLS,
]
@@ -744,6 +792,8 @@ def get_copilot_tool_names(*, use_e2b: bool = False) -> list[str]:
return [
*[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()],
f"{MCP_TOOL_PREFIX}{WRITE_TOOL_NAME}",
f"{MCP_TOOL_PREFIX}{READ_TOOL_NAME}",
f"{MCP_TOOL_PREFIX}{EDIT_TOOL_NAME}",
f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}",
*[f"{MCP_TOOL_PREFIX}{name}" for name in E2B_FILE_TOOL_NAMES],
*_SDK_BUILTIN_ALWAYS,