Compare commits


18 Commits

Author SHA1 Message Date
Zamil Majdy
da2d3418bd Merge remote-tracking branch 'origin/dev' into feat/copilot-token-optimization
# Conflicts:
#	autogpt_platform/backend/backend/copilot/prompting.py
2026-03-17 06:18:51 +07:00
Zamil Majdy
0faee668ab fix(copilot): address review comments — should-fix + nice-to-have items
Should Fix:
- run_block.py: remove block_name from properties (never consumed via kwargs)
- tool_schema_test.py: cache _ALL_SCHEMAS at module level (avoid double invocation)
- workspace_files.py: restore routing-hint description for list_workspace_files
- feature_requests.py: restore "phone numbers" in PII prohibition list

Nice to Have:
- prompting.py: restore attention signal on Storage heading (### Storage — important)
- tool_schema_test.py: clarify token baseline comment (tool JSON only, ~6470 tokens)
- bash_exec.py: restore use-case hints in description
- get_agent_building_guide.py: restore AgentExecutorBlock/MCPToolBlock signal in description
2026-03-16 06:14:12 +07:00
Zamil Majdy
64790e769a fix(copilot): include all run_time values in agent_output schema description 2026-03-15 22:55:48 +07:00
Zamil Majdy
3bc8db491f fix(copilot): address autogpt-reviewer should-fix items from PR #12398 review
- run_block.py: mark block_name as optional in description
- web_fetch.py: restore SSRF hint "Public URLs only — internal addresses blocked"
- agent_output.py: restore "Returns current state on timeout" to wait_if_running;
  trim run_time description from ~140 chars to ~75 chars
- workspace_files.py: add trailing period to filename description
- tool_schema_test.py: add token budget regression test (assert < 8000 tokens,
  current baseline ~5200 tokens) to lock in the 34% token reduction
2026-03-15 22:18:43 +07:00
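A minimal sketch of what that token budget regression test could look like, assuming a tiktoken-based count over the serialized tool schemas; the helper name, encoding choice, and dummy data are illustrative, while the 8000-token ceiling and ~5200 baseline come from the commit message above:

```python
import json

import tiktoken


def _collect_tool_schemas() -> list[dict]:
    # In the real suite this would come from the cached module-level
    # _ALL_SCHEMAS; dummy data here so the sketch runs standalone.
    return [{"name": "example_tool", "parameters": {"type": "object"}}]


def test_tool_schema_token_budget():
    enc = tiktoken.get_encoding("cl100k_base")
    schema_json = json.dumps(_collect_tool_schemas(), separators=(",", ":"))
    token_count = len(enc.encode(schema_json))
    # Lock in the 34% reduction: fail if schemas creep back toward the
    # old ~10k-token cost. Baseline at the time of this commit: ~5200.
    assert token_count < 8000, f"tool schemas grew to {token_count} tokens"
```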
Zamil Majdy
f0c3eb87d1 fix(copilot): address reviewer should-fix items for tool schemas
- Add wait_if_running hint to view_agent_output tool description
- Make block_name optional in run_block schema (was required but unused)
2026-03-15 14:05:02 +07:00
Zamil Majdy
1db18f2f0a fix(backend/copilot): add tool schema regression test 2026-03-14 23:00:22 +07:00
Zamil Majdy
e542880660 fix: clarify list_workspace_files description re cross-session persistence
The description "Current session only by default" was confusing for the LLM,
implying the workspace itself is session-scoped. Reworded to clarify the
workspace is persistent and only the default view is filtered to the current
session.
2026-03-14 22:52:18 +07:00
Zamil Majdy
f89a8e5de0 fix(copilot): improve tool schema descriptions for clarity and completeness
- Expand run_time description to list all supported parser inputs (today, last 7 days, last month, last 30 days, YYYY-MM-DD, ISO datetime)
- Add cross-tool reference hints (find_block before run_block, get_mcp_guide for URLs)
- Add mutual exclusivity notes for write_workspace_file content params
- Add auth guidance to browser_navigate, return note to browser_act
- Fix duplicate line range in agptfile example
2026-03-14 22:46:38 +07:00
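The run_time values listed above imply a small natural-language date parser. A hedged sketch, assuming relative phrases resolve against the current UTC time and that "last month" approximates 30 days (the actual parser is not shown in this diff):

```python
from datetime import datetime, timedelta, timezone


def parse_run_time(value: str) -> datetime:
    """Illustrative parser for the documented run_time inputs."""
    now = datetime.now(timezone.utc)
    normalized = value.strip().lower()
    if normalized == "today":
        return now.replace(hour=0, minute=0, second=0, microsecond=0)
    if normalized == "last 7 days":
        return now - timedelta(days=7)
    if normalized in ("last month", "last 30 days"):
        # Approximation: the real parser may treat "last month" as a
        # calendar month rather than a fixed 30-day window.
        return now - timedelta(days=30)
    # Remaining documented forms: YYYY-MM-DD and full ISO datetimes.
    return datetime.fromisoformat(value)
```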
Zamil Majdy
84c58b6624 fix: restore critical cross-tool references in trimmed tool descriptions
- Add web_fetch preference hint to browser_navigate
- Restore continue_run_block reference in run_block
- Restore fix_agent_graph reference in validate_agent
- Add back ephemeral temp directory example in @@agptfile docs
- Restore search quality hint in find_agent query description
- Restore get_doc_page cross-reference in search_docs
2026-03-14 22:27:44 +07:00
Zamil Majdy
05182b8368 Merge remote-tracking branch 'origin/dev' into feat/copilot-token-optimization 2026-03-14 22:11:13 +07:00
Zamil Majdy
f710bde7a9 Merge remote-tracking branch 'origin/dev' into feat/copilot-token-optimization 2026-03-14 10:21:04 +07:00
Zamil Majdy
1256e8a938 fix(copilot): fix indentation of persistent workspace sub-items in prompt template
Align bullet indentation under item 2 (Persistent workspace) to use
3-space indent, matching item 1's sub-items formatted via the
characteristics/persistence variables.
2026-03-13 22:39:37 +07:00
Zamil Majdy
777b71d409 fix(copilot): remove measure_copilot_tokens.py script from PR
Remove development-only token measurement script that should not be
included in the pull request.
2026-03-13 22:04:31 +07:00
Zamil Majdy
a6ecfe6e5b fix(copilot): address remaining PR review comments
- Add workspace:/// path and E2B sandbox path examples to @@agptfile docs
- Add multi-ref and expansion-rules note to file references section
- Add path guidance ("Use relative or absolute paths under this dir")
- Fix indentation for persistent workspace sub-items in storage template
- Add SPA handling guidance to browser_navigate description
- Strengthen "IMPORTANT" emphasis on browser_screenshot read_workspace_file step
2026-03-13 19:46:21 +07:00
Zamil Majdy
9cc93d84d4 Merge remote-tracking branch 'origin/dev' into feat/copilot-token-optimization 2026-03-13 19:27:00 +07:00
Zamil Majdy
32f1e51869 fix(copilot): address PR review — restore default values, strengthen guidance
- Restore JSON Schema "default" fields on: bash_exec timeout, find_block
  include_schemas, web_fetch extract_text, create_agent/edit_agent save
- Strengthen run_block description with IMPORTANT prefix for block ID warning
- Add find_library_agent search guidance to create_agent and edit_agent descriptions
2026-03-13 18:18:39 +07:00
Zamil Majdy
67a121bbe7 fix(copilot): restore functional schema constraints and critical behavioral guidance
Address review comments:
- Restore minimum/maximum JSON Schema constraints on limit parameter
- Restore default fields in JSON Schema for wait_for, annotate, filename, direction
- Restore read_workspace_file instruction after browser_screenshot
- Restore @@agptfile: syntax examples in system prompt
- Restore stronger 'do NOT guess' warning for block IDs
- Restore browser actions list in browser_act description
2026-03-13 18:14:23 +07:00
Zamil Majdy
3c754825aa perf(copilot): reduce tool schema token cost by 34%
Trim verbose tool descriptions and parameter schemas across all 35
CoPilot tools. Remove redundant cross-tool workflow references.
Condense system prompt shared notes and storage supplement.
No functional changes — only description text trimmed.

Before: ~10,609 tokens (865 system + 9,744 tools)
After:  ~6,967 tokens (497 system + 6,470 tools)
Saving: ~3,642 tokens per conversation turn (34% reduction)
2026-03-13 15:28:24 +07:00
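The before/after figures imply a measurement harness along these lines. The actual measure_copilot_tokens.py was removed from the PR in 777b71d409, so this is only a guess at its shape; the encoding and serialization choices are assumptions:

```python
import json

import tiktoken


def measure_tokens(system_prompt: str, tool_schemas: list[dict]) -> tuple[int, int]:
    """Count system-prompt and tool-schema tokens separately.

    Counts comparable to the 497 system + 6,470 tool tokens reported above.
    """
    enc = tiktoken.get_encoding("cl100k_base")
    system_tokens = len(enc.encode(system_prompt))
    tool_tokens = len(enc.encode(json.dumps(tool_schemas, separators=(",", ":"))))
    return system_tokens, tool_tokens
```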
42 changed files with 762 additions and 3976 deletions

View File

@@ -115,7 +115,7 @@ class ChatConfig(BaseSettings):
description="E2B sandbox template to use for copilot sessions.",
)
e2b_sandbox_timeout: int = Field(
default=420, # 7 min safety net — allows headroom for compaction retries
default=300, # 5 min safety net — explicit per-turn pause is the primary mechanism
description="E2B sandbox running-time timeout (seconds). "
"E2B timeout is wall-clock (not idle). Explicit per-turn pause is the primary "
"mechanism; this is the safety net.",

View File

@@ -11,34 +11,18 @@ from backend.copilot.tools import TOOL_REGISTRY
# Shared technical notes that apply to both SDK and baseline modes
_SHARED_TOOL_NOTES = """\
### Sharing files with the user
After saving a file to the persistent workspace with `write_workspace_file`,
share it with the user by embedding the `download_url` from the response in
your message as a Markdown link or image:
### Sharing files
After `write_workspace_file`, embed the `download_url` in Markdown:
- File: `[report.csv](workspace://file_id#text/csv)`
- Image: `![chart](workspace://file_id#image/png)`
- Video: `![recording](workspace://file_id#video/mp4)`
- **Any file** — shows as a clickable download link:
`[report.csv](workspace://file_id#text/csv)`
- **Image** — renders inline in chat:
`![chart](workspace://file_id#image/png)`
- **Video** — renders inline in chat with player controls:
`![recording](workspace://file_id#video/mp4)`
The `download_url` field in the `write_workspace_file` response is already
in the correct format — paste it directly after the `(` in the Markdown.
### Passing file content to tools — @@agptfile: references
Instead of copying large file contents into a tool argument, pass a file
reference and the platform will load the content for you.
Syntax: `@@agptfile:<uri>[<start>-<end>]`
- `<uri>` **must** start with `workspace://` or `/` (absolute path):
- `workspace://<file_id>` — workspace file by ID
- `workspace:///<path>` — workspace file by virtual path
- `/absolute/local/path` — ephemeral or sdk_cwd file
- E2B sandbox absolute path (e.g. `/home/user/script.py`)
- `[<start>-<end>]` is an optional 1-indexed inclusive line range.
- URIs that do not start with `workspace://` or `/` are **not** expanded.
### File references — @@agptfile:
Pass large file content to tools by reference: `@@agptfile:<uri>[<start>-<end>]`
- `workspace://<file_id>` or `workspace:///<path>` — workspace files
- `/absolute/path` — local/sandbox files
- `[start-end]` — optional 1-indexed line range
- Multiple refs per argument supported. Only `workspace://` and absolute paths are expanded.
Examples:
```
@@ -49,50 +33,16 @@ Examples:
@@agptfile:/home/user/script.py
```
You can embed a reference inside any string argument, or use it as the entire
value. Multiple references in one argument are all expanded.
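A minimal matcher for the reference grammar above; the syntax rules come from the prompt text, but the regex and helper are illustrative rather than the platform's actual expansion code:

```python
import re

# Matches @@agptfile:<uri>[<start>-<end>] per the grammar above: the URI must
# start with workspace:// or /, and the 1-indexed line range is optional.
_AGPTFILE_RE = re.compile(
    r"@@agptfile:"
    r"(?P<uri>workspace://[^\s\[]+|/[^\s\[]+)"
    r"(?:\[(?P<start>\d+)-(?P<end>\d+)\])?"
)


def find_file_refs(argument: str) -> list[tuple[str, int | None, int | None]]:
    """Return (uri, start, end) for each reference embedded in a string argument."""
    return [
        (
            m["uri"],
            int(m["start"]) if m["start"] else None,
            int(m["end"]) if m["end"] else None,
        )
        for m in _AGPTFILE_RE.finditer(argument)
    ]
```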
**Structured data**: When the entire argument is a single file reference, the platform auto-parses by extension/MIME. Supported: JSON, JSONL, CSV, TSV, YAML, TOML, Parquet, Excel (.xlsx only). Unrecognised formats return plain string.
**Structured data**: When the **entire** argument value is a single file
reference (no surrounding text), the platform automatically parses the file
content based on its extension or MIME type. Supported formats: JSON, JSONL,
CSV, TSV, YAML, TOML, Parquet, and Excel (.xlsx — first sheet only).
For example, pass `@@agptfile:workspace://<id>` where the file is a `.csv` and
the rows will be parsed into `list[list[str]]` automatically. If the format is
unrecognised or parsing fails, the content is returned as a plain string.
Legacy `.xls` files are **not** supported — only the modern `.xlsx` format.
**Type coercion**: The platform also coerces expanded values to match the
block's expected input types. For example, if a block expects `list[list[str]]`
and the expanded value is a JSON string, it will be parsed into the correct type.
**Type coercion**: The platform auto-coerces expanded string values to match block input types (e.g. JSON string → `list[list[str]]`).
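The auto-parse and coercion notes above amount to an extension-dispatch table. A stdlib-only sketch with assumed parser choices (the platform's real loaders for YAML, TOML, Parquet, and .xlsx are not shown):

```python
import csv
import io
import json


def parse_by_extension(filename: str, raw: str):
    """Dispatch on extension, falling back to the plain string as documented."""
    name = filename.lower()
    if name.endswith(".jsonl"):
        return [json.loads(line) for line in raw.splitlines() if line.strip()]
    if name.endswith(".json"):
        return json.loads(raw)
    if name.endswith((".csv", ".tsv")):
        delimiter = "\t" if name.endswith(".tsv") else ","
        # Matches the documented behavior: CSV rows become list[list[str]].
        return list(csv.reader(io.StringIO(raw), delimiter=delimiter))
    # YAML, TOML, Parquet, and .xlsx need third-party loaders (omitted here);
    # unrecognised formats return the plain string, per the note above.
    return raw
```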
### Media file inputs (format: "file")
Some block inputs accept media files — their schema shows `"format": "file"`.
These fields accept:
- **`workspace://<file_id>`** or **`workspace://<file_id>#<mime>`** — preferred
for large files (images, videos, PDFs). The platform passes the reference
directly to the block without reading the content into memory.
- **`data:<mime>;base64,<payload>`** — inline base64 data URI, suitable for
small files only.
When a block input has `format: "file"`, **pass the `workspace://` URI
directly as the value** (do NOT wrap it in `@@agptfile:`). This avoids large
payloads in tool arguments and preserves binary content (images, videos)
that would be corrupted by text encoding.
Example — committing an image file to GitHub:
```json
{
"files": [{
"path": "docs/hero.png",
"content": "workspace://abc123#image/png",
"operation": "upsert"
}]
}
```
Inputs with `"format": "file"` accept `workspace://<file_id>` or `data:<mime>;base64,<payload>`.
Pass the `workspace://` URI directly (do NOT wrap in `@@agptfile:`). This avoids large payloads and preserves binary content.
### Sub-agent tasks
- When using the Task tool, NEVER set `run_in_background` to true.
All tasks must run in the foreground.
- Task tool: NEVER set `run_in_background` to true.
"""
@@ -128,30 +78,18 @@ def _build_storage_supplement(
## Tool notes
### Shell commands
- The SDK built-in Bash tool is NOT available. Use the `bash_exec` MCP tool
for shell commands — it runs {sandbox_type}.
### Working directory
- Your working directory is: `{working_dir}`
- All SDK file tools AND `bash_exec` operate on the same filesystem
- Use relative paths or absolute paths under `{working_dir}` for all file operations
### Two storage systems — CRITICAL to understand
### Shell & filesystem
- Use `bash_exec` for shell commands ({sandbox_type}). Working dir: `{working_dir}`
- All file tools share the same filesystem. Use relative or absolute paths under this dir.
### Storage — important
1. **{storage_system_1_name}** (`{working_dir}`):
{characteristics}
{persistence}
2. **Persistent workspace** (cloud storage):
- Files here **survive across sessions indefinitely**
### Moving files between storages
- **{file_move_name_1_to_2}**: Copy to persistent workspace
- **{file_move_name_2_to_1}**: Download for processing
### File persistence
Important files (code, configs, outputs) should be saved to workspace to ensure they persist.
2. **Persistent workspace** (cloud) — survives across sessions.
- {file_move_name_1_to_2}: use `write_workspace_file`
- {file_move_name_2_to_1}: use `read_workspace_file` with save_to_path
- Save important files to workspace for persistence.
{_SHARED_TOOL_NOTES}"""
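The two move directions above map onto tool calls roughly like this hypothetical pair; filename, content, file_id, and save_to_path all appear in this PR, but the {tool, args} envelope is purely illustrative:

```json
[
  {"tool": "write_workspace_file",
   "args": {"filename": "report.csv",
            "content": "@@agptfile:/home/user/report.csv"}},
  {"tool": "read_workspace_file",
   "args": {"file_id": "abc123",
            "save_to_path": "/home/user/report.csv"}}
]
```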

View File

@@ -43,7 +43,6 @@ class ResponseType(str, Enum):
ERROR = "error"
USAGE = "usage"
HEARTBEAT = "heartbeat"
STATUS = "status"
class StreamBaseResponse(BaseModel):
@@ -233,26 +232,3 @@ class StreamHeartbeat(StreamBaseResponse):
def to_sse(self) -> str:
"""Convert to SSE comment format to keep connection alive."""
return ": heartbeat\n\n"
class StreamStatus(StreamBaseResponse):
"""Transient status notification shown to the user during long operations.
Used to provide feedback when the backend performs behind-the-scenes work
(e.g., compacting conversation context on a retry) that would otherwise
leave the user staring at an unexplained pause.
"""
type: ResponseType = ResponseType.STATUS
message: str = Field(..., description="Human-readable status message")
def to_sse(self) -> str:
"""Encode as an SSE comment so the AI SDK stream parser ignores it.
The frontend AI SDK validates every ``data:`` line against a strict
Zod union of known chunk types. ``"status"`` is not in that union,
so sending it as ``data:`` would cause a schema-validation error that
breaks the entire stream. Using an SSE comment (``:``) keeps the
connection alive and is silently discarded by ``EventSource`` parsers.
"""
return f": status {self.message}\n\n"

View File

@@ -12,7 +12,6 @@ import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from typing import Any
from ..constants import COMPACTION_DONE_MSG, COMPACTION_TOOL_NAME
from ..model import ChatMessage, ChatSession
@@ -120,12 +119,14 @@ def filter_compaction_messages(
filtered: list[ChatMessage] = []
for msg in messages:
if msg.role == "assistant" and msg.tool_calls:
real_calls: list[dict[str, Any]] = []
for tc in msg.tool_calls:
if tc.get("function", {}).get("name") == COMPACTION_TOOL_NAME:
compaction_ids.add(tc.get("id", ""))
else:
real_calls.append(tc)
real_calls = [
tc
for tc in msg.tool_calls
if tc.get("function", {}).get("name") != COMPACTION_TOOL_NAME
]
if not real_calls and not msg.content:
continue
if msg.role == "tool" and msg.tool_call_id in compaction_ids:
@@ -221,7 +222,6 @@ class CompactionTracker:
def reset_for_query(self) -> None:
"""Reset per-query state before a new SDK query."""
self._compact_start.clear()
self._done = False
self._start_emitted = False
self._tool_call_id = ""

View File

@@ -1,41 +0,0 @@
"""Shared test fixtures for copilot SDK tests."""
from __future__ import annotations
from uuid import uuid4
from backend.util import json
def build_test_transcript(pairs: list[tuple[str, str]]) -> str:
"""Build a minimal valid JSONL transcript from (role, content) pairs.
Use this helper in any copilot SDK test that needs a well-formed
transcript without hitting the real storage layer.
"""
lines: list[str] = []
last_uuid: str | None = None
for role, content in pairs:
uid = str(uuid4())
entry_type = "assistant" if role == "assistant" else "user"
msg: dict = {"role": role, "content": content}
if role == "assistant":
msg.update(
{
"model": "",
"id": f"msg_{uid[:8]}",
"type": "message",
"content": [{"type": "text", "text": content}],
"stop_reason": "end_turn",
"stop_sequence": None,
}
)
entry = {
"type": entry_type,
"uuid": uid,
"parentUuid": last_uuid,
"message": msg,
}
lines.append(json.dumps(entry, separators=(",", ":")))
last_uuid = uid
return "\n".join(lines) + "\n"

View File

@@ -1,552 +0,0 @@
"""Tests for retry logic and transcript compaction helpers."""
from __future__ import annotations
from unittest.mock import AsyncMock, patch
from uuid import uuid4
import pytest
from backend.util import json
from .conftest import build_test_transcript as _build_transcript
from .service import _is_prompt_too_long
from .transcript import (
_flatten_assistant_content,
_flatten_tool_result_content,
_messages_to_transcript,
_transcript_to_messages,
compact_transcript,
validate_transcript,
)
# ---------------------------------------------------------------------------
# _flatten_assistant_content
# ---------------------------------------------------------------------------
class TestFlattenAssistantContent:
def test_text_blocks(self):
blocks = [
{"type": "text", "text": "Hello"},
{"type": "text", "text": "World"},
]
assert _flatten_assistant_content(blocks) == "Hello\nWorld"
def test_tool_use_blocks(self):
blocks = [{"type": "tool_use", "name": "read_file", "input": {}}]
assert _flatten_assistant_content(blocks) == "[tool_use: read_file]"
def test_mixed_blocks(self):
blocks = [
{"type": "text", "text": "Let me read that."},
{"type": "tool_use", "name": "Read", "input": {"path": "/foo"}},
]
result = _flatten_assistant_content(blocks)
assert "Let me read that." in result
assert "[tool_use: Read]" in result
def test_raw_strings(self):
assert _flatten_assistant_content(["hello", "world"]) == "hello\nworld"
def test_unknown_block_type_preserved_as_placeholder(self):
blocks = [
{"type": "text", "text": "See this image:"},
{"type": "image", "source": {"type": "base64", "data": "..."}},
]
result = _flatten_assistant_content(blocks)
assert "See this image:" in result
assert "[__image__]" in result
def test_empty(self):
assert _flatten_assistant_content([]) == ""
# ---------------------------------------------------------------------------
# _flatten_tool_result_content
# ---------------------------------------------------------------------------
class TestFlattenToolResultContent:
def test_tool_result_with_text(self):
blocks = [
{
"type": "tool_result",
"tool_use_id": "123",
"content": [{"type": "text", "text": "file contents here"}],
}
]
assert _flatten_tool_result_content(blocks) == "file contents here"
def test_tool_result_with_string_content(self):
blocks = [{"type": "tool_result", "tool_use_id": "123", "content": "ok"}]
assert _flatten_tool_result_content(blocks) == "ok"
def test_text_block(self):
blocks = [{"type": "text", "text": "plain text"}]
assert _flatten_tool_result_content(blocks) == "plain text"
def test_raw_string(self):
assert _flatten_tool_result_content(["raw"]) == "raw"
def test_tool_result_with_none_content(self):
"""tool_result with content=None should produce empty string."""
blocks = [{"type": "tool_result", "tool_use_id": "x", "content": None}]
assert _flatten_tool_result_content(blocks) == ""
def test_tool_result_with_empty_list_content(self):
"""tool_result with content=[] should produce empty string."""
blocks = [{"type": "tool_result", "tool_use_id": "x", "content": []}]
assert _flatten_tool_result_content(blocks) == ""
def test_empty(self):
assert _flatten_tool_result_content([]) == ""
def test_nested_dict_without_text(self):
"""Dict blocks without text key use json.dumps fallback."""
blocks = [
{
"type": "tool_result",
"tool_use_id": "x",
"content": [{"type": "image", "source": "data:..."}],
}
]
result = _flatten_tool_result_content(blocks)
assert "image" in result # json.dumps fallback
def test_unknown_block_type_preserved_as_placeholder(self):
blocks = [{"type": "image", "source": {"type": "base64", "data": "..."}}]
result = _flatten_tool_result_content(blocks)
assert "[__image__]" in result
# ---------------------------------------------------------------------------
# _transcript_to_messages
# ---------------------------------------------------------------------------
def _make_entry(entry_type: str, role: str, content: str | list, **kwargs) -> str:
"""Build a JSONL line for testing."""
uid = str(uuid4())
msg: dict = {"role": role, "content": content}
msg.update(kwargs)
entry = {
"type": entry_type,
"uuid": uid,
"parentUuid": None,
"message": msg,
}
return json.dumps(entry, separators=(",", ":"))
class TestTranscriptToMessages:
def test_basic_roundtrip(self):
lines = [
_make_entry("user", "user", "Hello"),
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
assert messages[0] == {"role": "user", "content": "Hello"}
assert messages[1] == {"role": "assistant", "content": "Hi"}
def test_skips_strippable_types(self):
"""Progress and metadata entries are excluded."""
lines = [
_make_entry("user", "user", "Hello"),
json.dumps(
{
"type": "progress",
"uuid": str(uuid4()),
"parentUuid": None,
"message": {"role": "assistant", "content": "..."},
}
),
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
def test_empty_content(self):
assert _transcript_to_messages("") == []
def test_tool_result_content(self):
"""User entries with tool_result content blocks are flattened."""
lines = [
_make_entry(
"user",
"user",
[
{
"type": "tool_result",
"tool_use_id": "123",
"content": "tool output",
}
],
),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 1
assert messages[0]["content"] == "tool output"
def test_malformed_json_lines_skipped(self):
"""Malformed JSON lines in transcript are silently skipped."""
lines = [
_make_entry("user", "user", "Hello"),
"this is not valid json",
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
def test_empty_lines_skipped(self):
"""Empty lines and whitespace-only lines are skipped."""
lines = [
_make_entry("user", "user", "Hello"),
"",
" ",
_make_entry("assistant", "assistant", [{"type": "text", "text": "Hi"}]),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
def test_unicode_content_preserved(self):
"""Unicode characters survive transcript roundtrip."""
lines = [
_make_entry("user", "user", "Hello 你好 🌍"),
_make_entry(
"assistant",
"assistant",
[{"type": "text", "text": "Bonjour 日本語 émojis 🎉"}],
),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert messages[0]["content"] == "Hello 你好 🌍"
assert messages[1]["content"] == "Bonjour 日本語 émojis 🎉"
def test_entry_without_role_skipped(self):
"""Entries with missing role in message are skipped."""
entry_no_role = json.dumps(
{
"type": "user",
"uuid": str(uuid4()),
"parentUuid": None,
"message": {"content": "no role here"},
}
)
lines = [
entry_no_role,
_make_entry("user", "user", "Hello"),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 1
assert messages[0]["content"] == "Hello"
def test_tool_use_and_result_pairs(self):
"""Tool use + tool result pairs are properly flattened."""
lines = [
_make_entry(
"assistant",
"assistant",
[
{"type": "text", "text": "Let me check."},
{"type": "tool_use", "name": "read_file", "input": {"path": "/x"}},
],
),
_make_entry(
"user",
"user",
[
{
"type": "tool_result",
"tool_use_id": "abc",
"content": [{"type": "text", "text": "file contents"}],
}
],
),
]
content = "\n".join(lines) + "\n"
messages = _transcript_to_messages(content)
assert len(messages) == 2
assert "Let me check." in messages[0]["content"]
assert "[tool_use: read_file]" in messages[0]["content"]
assert messages[1]["content"] == "file contents"
# ---------------------------------------------------------------------------
# _messages_to_transcript
# ---------------------------------------------------------------------------
class TestMessagesToTranscript:
def test_produces_valid_jsonl(self):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
]
result = _messages_to_transcript(messages)
lines = result.strip().split("\n")
assert len(lines) == 2
for line in lines:
parsed = json.loads(line)
assert "type" in parsed
assert "uuid" in parsed
assert "message" in parsed
def test_assistant_has_proper_structure(self):
messages = [{"role": "assistant", "content": "Hello"}]
result = _messages_to_transcript(messages)
entry = json.loads(result.strip())
assert entry["type"] == "assistant"
msg = entry["message"]
assert msg["role"] == "assistant"
assert msg["type"] == "message"
assert msg["stop_reason"] == "end_turn"
assert isinstance(msg["content"], list)
assert msg["content"][0]["type"] == "text"
def test_user_has_plain_content(self):
messages = [{"role": "user", "content": "Hi"}]
result = _messages_to_transcript(messages)
entry = json.loads(result.strip())
assert entry["type"] == "user"
assert entry["message"]["content"] == "Hi"
def test_parent_uuid_chain(self):
messages = [
{"role": "user", "content": "A"},
{"role": "assistant", "content": "B"},
{"role": "user", "content": "C"},
]
result = _messages_to_transcript(messages)
lines = result.strip().split("\n")
entries = [json.loads(line) for line in lines]
assert entries[0]["parentUuid"] == ""
assert entries[1]["parentUuid"] == entries[0]["uuid"]
assert entries[2]["parentUuid"] == entries[1]["uuid"]
def test_empty_messages(self):
assert _messages_to_transcript([]) == ""
def test_output_is_valid_transcript(self):
"""Output should pass validate_transcript if it has assistant entries."""
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
]
result = _messages_to_transcript(messages)
assert validate_transcript(result)
def test_roundtrip_to_messages(self):
"""Messages → transcript → messages preserves structure."""
original = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "How are you?"},
]
transcript = _messages_to_transcript(original)
restored = _transcript_to_messages(transcript)
assert len(restored) == len(original)
for orig, rest in zip(original, restored):
assert orig["role"] == rest["role"]
assert orig["content"] == rest["content"]
# ---------------------------------------------------------------------------
# compact_transcript
# ---------------------------------------------------------------------------
class TestCompactTranscript:
@pytest.mark.asyncio
async def test_too_few_messages_returns_none(self):
"""compact_transcript returns None when transcript has < 2 messages."""
transcript = _build_transcript([("user", "Hello")])
with patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
):
result = await compact_transcript(transcript, model="test-model")
assert result is None
@pytest.mark.asyncio
async def test_returns_none_when_not_compacted(self):
"""When compress_context says no compaction needed, returns None.
The compressor couldn't reduce it, so retrying with the same
content would fail identically."""
transcript = _build_transcript(
[
("user", "Hello"),
("assistant", "Hi there"),
]
)
mock_result = type(
"CompressResult",
(),
{
"was_compacted": False,
"messages": [],
"original_token_count": 100,
"token_count": 100,
"messages_summarized": 0,
"messages_dropped": 0,
},
)()
with (
patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
),
patch(
"backend.copilot.sdk.transcript._run_compression",
new_callable=AsyncMock,
return_value=mock_result,
),
):
result = await compact_transcript(transcript, model="test-model")
assert result is None
@pytest.mark.asyncio
async def test_returns_compacted_transcript(self):
"""When compaction succeeds, returns a valid compacted transcript."""
transcript = _build_transcript(
[
("user", "Hello"),
("assistant", "Hi"),
("user", "More"),
("assistant", "Details"),
]
)
compacted_msgs = [
{"role": "user", "content": "[summary]"},
{"role": "assistant", "content": "Summarized response"},
]
mock_result = type(
"CompressResult",
(),
{
"was_compacted": True,
"messages": compacted_msgs,
"original_token_count": 500,
"token_count": 100,
"messages_summarized": 2,
"messages_dropped": 0,
},
)()
with (
patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
),
patch(
"backend.copilot.sdk.transcript._run_compression",
new_callable=AsyncMock,
return_value=mock_result,
),
):
result = await compact_transcript(transcript, model="test-model")
assert result is not None
assert validate_transcript(result)
msgs = _transcript_to_messages(result)
assert len(msgs) == 2
assert msgs[1]["content"] == "Summarized response"
@pytest.mark.asyncio
async def test_returns_none_on_compression_failure(self):
"""When _run_compression raises, returns None."""
transcript = _build_transcript(
[
("user", "Hello"),
("assistant", "Hi"),
]
)
with (
patch(
"backend.copilot.config.ChatConfig",
return_value=type(
"Cfg", (), {"model": "m", "api_key": "k", "base_url": "u"}
)(),
),
patch(
"backend.copilot.sdk.transcript._run_compression",
new_callable=AsyncMock,
side_effect=RuntimeError("LLM unavailable"),
),
):
result = await compact_transcript(transcript, model="test-model")
assert result is None
# ---------------------------------------------------------------------------
# _is_prompt_too_long
# ---------------------------------------------------------------------------
class TestIsPromptTooLong:
"""Unit tests for _is_prompt_too_long pattern matching."""
def test_prompt_is_too_long(self):
err = RuntimeError("prompt is too long for model context")
assert _is_prompt_too_long(err) is True
def test_request_too_large(self):
err = Exception("request too large: 250000 tokens")
assert _is_prompt_too_long(err) is True
def test_maximum_context_length(self):
err = ValueError("maximum context length exceeded")
assert _is_prompt_too_long(err) is True
def test_context_length_exceeded(self):
err = Exception("context_length_exceeded")
assert _is_prompt_too_long(err) is True
def test_input_tokens_exceed(self):
err = Exception("input tokens exceed the max_tokens limit")
assert _is_prompt_too_long(err) is True
def test_input_is_too_long(self):
err = Exception("input is too long for the model")
assert _is_prompt_too_long(err) is True
def test_content_length_exceeds(self):
err = Exception("content length exceeds maximum")
assert _is_prompt_too_long(err) is True
def test_unrelated_error_returns_false(self):
err = RuntimeError("network timeout")
assert _is_prompt_too_long(err) is False
def test_auth_error_returns_false(self):
err = Exception("authentication failed: invalid API key")
assert _is_prompt_too_long(err) is False
def test_chained_exception_detected(self):
"""Prompt-too-long error wrapped in another exception is detected."""
inner = RuntimeError("prompt is too long")
outer = Exception("SDK error")
outer.__cause__ = inner
assert _is_prompt_too_long(outer) is True
def test_case_insensitive(self):
err = Exception("PROMPT IS TOO LONG")
assert _is_prompt_too_long(err) is True
def test_old_max_tokens_exceeded_not_matched(self):
"""The old broad 'max_tokens_exceeded' pattern was removed.
Only 'input tokens exceed' should match now."""
err = Exception("max_tokens_exceeded")
assert _is_prompt_too_long(err) is False

View File

@@ -226,7 +226,7 @@ class SDKResponseAdapter:
responses.append(StreamFinish())
else:
logger.debug("Unhandled SDK message type: %s", type(sdk_message).__name__)
logger.debug(f"Unhandled SDK message type: {type(sdk_message).__name__}")
return responses

View File

@@ -52,7 +52,7 @@ def _validate_workspace_path(
if is_allowed_local_path(path, sdk_cwd):
return {}
logger.warning("Blocked %s outside workspace: %s", tool_name, path)
logger.warning(f"Blocked {tool_name} outside workspace: {path}")
workspace_hint = f" Allowed workspace: {sdk_cwd}" if sdk_cwd else ""
return _deny(
f"[SECURITY] Tool '{tool_name}' can only access files within the workspace "
@@ -71,7 +71,7 @@ def _validate_tool_access(
"""
# Block forbidden tools
if tool_name in BLOCKED_TOOLS:
logger.warning("Blocked tool access attempt: %s", tool_name)
logger.warning(f"Blocked tool access attempt: {tool_name}")
return _deny(
f"[SECURITY] Tool '{tool_name}' is blocked for security. "
"This is enforced by the platform and cannot be bypassed. "
@@ -89,9 +89,7 @@ def _validate_tool_access(
for pattern in DANGEROUS_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
logger.warning(
"Blocked dangerous pattern in tool input: %s in %s",
pattern,
tool_name,
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
)
return _deny(
"[SECURITY] Input contains a blocked pattern. "
@@ -113,9 +111,7 @@ def _validate_user_isolation(
# the tool itself via _validate_ephemeral_path.
path = tool_input.get("path", "") or tool_input.get("file_path", "")
if path and ".." in path:
logger.warning(
"Blocked path traversal attempt: %s by user %s", path, user_id
)
logger.warning(f"Blocked path traversal attempt: {path} by user {user_id}")
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
@@ -174,7 +170,7 @@ def create_security_hooks(
# Block background task execution first — denied calls
# should not consume a subtask slot.
if tool_input.get("run_in_background"):
logger.info("[SDK] Blocked background Task, user=%s", user_id)
logger.info(f"[SDK] Blocked background Task, user={user_id}")
return cast(
SyncHookJSONOutput,
_deny(
@@ -185,9 +181,7 @@ def create_security_hooks(
)
if len(task_tool_use_ids) >= max_subtasks:
logger.warning(
"[SDK] Task limit reached (%d), user=%s",
max_subtasks,
user_id,
f"[SDK] Task limit reached ({max_subtasks}), user={user_id}"
)
return cast(
SyncHookJSONOutput,
@@ -218,7 +212,7 @@ def create_security_hooks(
if tool_name == "Task" and tool_use_id is not None:
task_tool_use_ids.add(tool_use_id)
logger.debug("[SDK] Tool start: %s, user=%s", tool_name, user_id)
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
return cast(SyncHookJSONOutput, {})
def _release_task_slot(tool_name: str, tool_use_id: str | None) -> None:
@@ -288,11 +282,8 @@ def create_security_hooks(
tool_name = cast(str, input_data.get("tool_name", ""))
error = input_data.get("error", "Unknown error")
logger.warning(
"[SDK] Tool failed: %s, error=%s, user=%s, tool_use_id=%s",
tool_name,
str(error).replace("\n", "").replace("\r", ""),
user_id,
tool_use_id,
f"[SDK] Tool failed: {tool_name}, error={error}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
_release_task_slot(tool_name, tool_use_id)
@@ -310,19 +301,16 @@ def create_security_hooks(
This hook provides visibility into when compaction happens.
"""
_ = context, tool_use_id
trigger = input_data.get("trigger", "auto")
# Sanitize untrusted input before logging to prevent log injection
trigger = (
str(input_data.get("trigger", "auto"))
.replace("\n", "")
.replace("\r", "")
)
transcript_path = (
str(input_data.get("transcript_path", ""))
.replace("\n", "")
.replace("\r", "")
)
logger.info(
"[SDK] Context compaction triggered: %s, user=%s, transcript_path=%s",
"[SDK] Context compaction triggered: %s, user=%s, "
"transcript_path=%s",
trigger,
user_id,
transcript_path,

File diff suppressed because it is too large.

View File

@@ -1,283 +0,0 @@
"""Unit tests for extracted service helpers.
Covers ``_is_prompt_too_long``, ``_reduce_context``, ``_iter_sdk_messages``,
and the ``ReducedContext`` named tuple.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, patch
import pytest
from .conftest import build_test_transcript as _build_transcript
from .service import (
ReducedContext,
_is_prompt_too_long,
_iter_sdk_messages,
_reduce_context,
)
# ---------------------------------------------------------------------------
# _is_prompt_too_long
# ---------------------------------------------------------------------------
class TestIsPromptTooLong:
def test_direct_match(self) -> None:
assert _is_prompt_too_long(Exception("prompt is too long")) is True
def test_case_insensitive(self) -> None:
assert _is_prompt_too_long(Exception("PROMPT IS TOO LONG")) is True
def test_no_match(self) -> None:
assert _is_prompt_too_long(Exception("network timeout")) is False
def test_request_too_large(self) -> None:
assert _is_prompt_too_long(Exception("request too large for model")) is True
def test_context_length_exceeded(self) -> None:
assert _is_prompt_too_long(Exception("context_length_exceeded")) is True
def test_max_tokens_exceeded_not_matched(self) -> None:
"""'max_tokens_exceeded' is intentionally excluded (too broad)."""
assert _is_prompt_too_long(Exception("max_tokens_exceeded")) is False
def test_max_tokens_config_error_no_match(self) -> None:
"""'max_tokens must be at least 1' should NOT match."""
assert _is_prompt_too_long(Exception("max_tokens must be at least 1")) is False
def test_chained_cause(self) -> None:
inner = Exception("prompt is too long")
outer = RuntimeError("SDK error")
outer.__cause__ = inner
assert _is_prompt_too_long(outer) is True
def test_chained_context(self) -> None:
inner = Exception("request too large")
outer = RuntimeError("wrapped")
outer.__context__ = inner
assert _is_prompt_too_long(outer) is True
def test_deep_chain(self) -> None:
bottom = Exception("maximum context length")
middle = RuntimeError("middle")
middle.__cause__ = bottom
top = ValueError("top")
top.__cause__ = middle
assert _is_prompt_too_long(top) is True
def test_chain_no_match(self) -> None:
inner = Exception("rate limit exceeded")
outer = RuntimeError("wrapped")
outer.__cause__ = inner
assert _is_prompt_too_long(outer) is False
def test_cycle_detection(self) -> None:
"""Exception chain with a cycle should not infinite-loop."""
a = Exception("error a")
b = Exception("error b")
a.__cause__ = b
b.__cause__ = a # cycle
assert _is_prompt_too_long(a) is False
def test_all_patterns(self) -> None:
patterns = [
"prompt is too long",
"request too large",
"maximum context length",
"context_length_exceeded",
"input tokens exceed",
"input is too long",
"content length exceeds",
]
for pattern in patterns:
assert _is_prompt_too_long(Exception(pattern)) is True, pattern
# ---------------------------------------------------------------------------
# _reduce_context
# ---------------------------------------------------------------------------
class TestReduceContext:
@pytest.mark.asyncio
async def test_first_retry_compaction_success(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
compacted = _build_transcript([("user", "hi"), ("assistant", "[summary]")])
with (
patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=compacted,
),
patch(
"backend.copilot.sdk.service.validate_transcript",
return_value=True,
),
patch(
"backend.copilot.sdk.service.write_transcript_to_tempfile",
return_value="/tmp/resume.jsonl",
),
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert isinstance(ctx, ReducedContext)
assert ctx.use_resume is True
assert ctx.resume_file == "/tmp/resume.jsonl"
assert ctx.transcript_lost is False
assert ctx.tried_compaction is True
@pytest.mark.asyncio
async def test_compaction_fails_drops_transcript(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
with patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=None,
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert ctx.use_resume is False
assert ctx.resume_file is None
assert ctx.transcript_lost is True
assert ctx.tried_compaction is True
@pytest.mark.asyncio
async def test_already_tried_compaction_skips(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
ctx = await _reduce_context(transcript, True, "sess-123", "/tmp/cwd", "[test]")
assert ctx.use_resume is False
assert ctx.transcript_lost is True
assert ctx.tried_compaction is True
@pytest.mark.asyncio
async def test_empty_transcript_drops(self) -> None:
ctx = await _reduce_context("", False, "sess-123", "/tmp/cwd", "[test]")
assert ctx.use_resume is False
assert ctx.transcript_lost is True
@pytest.mark.asyncio
async def test_compaction_returns_same_content_drops(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
with patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=transcript, # same content
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert ctx.transcript_lost is True
@pytest.mark.asyncio
async def test_write_tempfile_fails_drops(self) -> None:
transcript = _build_transcript([("user", "hi"), ("assistant", "hello")])
compacted = _build_transcript([("user", "hi"), ("assistant", "[summary]")])
with (
patch(
"backend.copilot.sdk.service.compact_transcript",
new_callable=AsyncMock,
return_value=compacted,
),
patch(
"backend.copilot.sdk.service.validate_transcript",
return_value=True,
),
patch(
"backend.copilot.sdk.service.write_transcript_to_tempfile",
return_value=None,
),
):
ctx = await _reduce_context(
transcript, False, "sess-123", "/tmp/cwd", "[test]"
)
assert ctx.transcript_lost is True
# ---------------------------------------------------------------------------
# _iter_sdk_messages
# ---------------------------------------------------------------------------
class TestIterSdkMessages:
@pytest.mark.asyncio
async def test_yields_messages(self) -> None:
messages = ["msg1", "msg2", "msg3"]
client = AsyncMock()
async def _fake_receive() -> AsyncGenerator[str]:
for m in messages:
yield m
client.receive_response = _fake_receive
result = [msg async for msg in _iter_sdk_messages(client)]
assert result == messages
@pytest.mark.asyncio
async def test_heartbeat_on_timeout(self) -> None:
"""Yields None when asyncio.wait times out."""
client = AsyncMock()
received: list = []
async def _slow_receive() -> AsyncGenerator[str]:
await asyncio.sleep(100) # never completes
yield "never" # pragma: no cover — unreachable, yield makes this an async generator
client.receive_response = _slow_receive
with patch("backend.copilot.sdk.service._HEARTBEAT_INTERVAL", 0.01):
count = 0
async for msg in _iter_sdk_messages(client):
received.append(msg)
count += 1
if count >= 3:
break
assert all(m is None for m in received)
@pytest.mark.asyncio
async def test_exception_propagates(self) -> None:
client = AsyncMock()
async def _error_receive() -> AsyncGenerator[str]:
raise RuntimeError("SDK crash")
yield # pragma: no cover — unreachable, yield makes this an async generator
client.receive_response = _error_receive
with pytest.raises(RuntimeError, match="SDK crash"):
async for _ in _iter_sdk_messages(client):
pass
@pytest.mark.asyncio
async def test_task_cleanup_on_break(self) -> None:
"""Pending task is cancelled when generator is closed."""
client = AsyncMock()
async def _slow_receive() -> AsyncGenerator[str]:
yield "first"
await asyncio.sleep(100)
yield "second"
client.receive_response = _slow_receive
gen = _iter_sdk_messages(client)
first = await gen.__anext__()
assert first == "first"
await gen.aclose() # should cancel pending task cleanly
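A matcher consistent with the TestIsPromptTooLong cases in both test files above: the pattern list is copied from test_all_patterns, but the body is an assumed reconstruction, not the real backend.copilot.sdk.service._is_prompt_too_long:

```python
_PROMPT_TOO_LONG_PATTERNS = (
    "prompt is too long",
    "request too large",
    "maximum context length",
    "context_length_exceeded",
    "input tokens exceed",
    "input is too long",
    "content length exceeds",
)


def _is_prompt_too_long(err: BaseException) -> bool:
    """Match known prompt-too-long messages anywhere in the exception chain."""
    seen: set[int] = set()
    current: BaseException | None = err
    while current is not None and id(current) not in seen:
        seen.add(id(current))  # cycle guard, per test_cycle_detection
        text = str(current).lower()  # case-insensitive, per test_case_insensitive
        if any(p in text for p in _PROMPT_TOO_LONG_PATTERNS):
            return True
        # Walk explicit (__cause__) then implicit (__context__) chains.
        current = current.__cause__ or current.__context__
    return False
```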

View File

@@ -234,9 +234,7 @@ def create_tool_handler(base_tool: BaseTool):
try:
return await _execute_tool_sync(base_tool, user_id, session, args)
except Exception as e:
logger.error(
"Error executing tool %s: %s", base_tool.name, e, exc_info=True
)
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
return _mcp_error(f"Failed to execute {base_tool.name}: {e}")
return tool_handler

View File

@@ -10,9 +10,6 @@ Storage is handled via ``WorkspaceStorageBackend`` (GCS in prod, local
filesystem for self-hosted) — no DB column needed.
"""
from __future__ import annotations
import asyncio
import logging
import os
import re
@@ -20,12 +17,8 @@ import shutil
import time
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4
from backend.util import json
from backend.util.clients import get_openai_client
from backend.util.prompt import CompressResult, compress_context
from backend.util.workspace_storage import GCSWorkspaceStorage, get_workspace_storage
logger = logging.getLogger(__name__)
@@ -106,14 +99,7 @@ def strip_progress_entries(content: str) -> str:
continue
parent = entry.get("parentUuid", "")
original_parent = parent
# seen_parents is local per-entry (not shared across iterations) so
# it can only detect cycles within a single ancestry walk, not across
# entries. This is intentional: each entry's parent chain is
# independent, and reusing a global set would incorrectly short-circuit
# valid re-use of the same UUID as a parent in different subtrees.
seen_parents: set[str] = set()
while parent in stripped_uuids and parent not in seen_parents:
seen_parents.add(parent)
while parent in stripped_uuids:
parent = uuid_to_parent.get(parent, "")
if parent != original_parent:
entry["parentUuid"] = parent
@@ -341,7 +327,7 @@ def write_transcript_to_tempfile(
# Validate cwd is under the expected sandbox prefix (CodeQL sanitizer).
real_cwd = os.path.realpath(cwd)
if not real_cwd.startswith(_SAFE_CWD_PREFIX):
logger.warning("[Transcript] cwd outside sandbox: %s", cwd)
logger.warning(f"[Transcript] cwd outside sandbox: {cwd}")
return None
try:
@@ -351,17 +337,17 @@ def write_transcript_to_tempfile(
os.path.join(real_cwd, f"transcript-{safe_id}.jsonl")
)
if not jsonl_path.startswith(real_cwd):
logger.warning("[Transcript] Path escaped cwd: %s", jsonl_path)
logger.warning(f"[Transcript] Path escaped cwd: {jsonl_path}")
return None
with open(jsonl_path, "w") as f:
f.write(transcript_content)
logger.info("[Transcript] Wrote resume file: %s", jsonl_path)
logger.info(f"[Transcript] Wrote resume file: {jsonl_path}")
return jsonl_path
except OSError as e:
logger.warning("[Transcript] Failed to write resume file: %s", e)
logger.warning(f"[Transcript] Failed to write resume file: {e}")
return None
@@ -422,6 +408,8 @@ def _meta_storage_path_parts(user_id: str, session_id: str) -> tuple[str, str, s
def _build_path_from_parts(parts: tuple[str, str, str], backend: object) -> str:
"""Build a full storage path from (workspace_id, file_id, filename) parts."""
from backend.util.workspace_storage import GCSWorkspaceStorage
wid, fid, fname = parts
if isinstance(backend, GCSWorkspaceStorage):
blob = f"workspaces/{wid}/{fid}/{fname}"
@@ -460,15 +448,17 @@ async def upload_transcript(
content: Complete JSONL transcript (from TranscriptBuilder).
message_count: ``len(session.messages)`` at upload time.
"""
from backend.util.workspace_storage import get_workspace_storage
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types = [
json.loads(line, fallback={"type": "INVALID_JSON"}).get("type", "?")
for line in stripped.strip().split("\n")
]
entry_types: list[str] = []
for line in stripped.strip().split("\n"):
entry = json.loads(line, fallback={"type": "INVALID_JSON"})
entry_types.append(entry.get("type", "?"))
logger.warning(
"%s Skipping upload — stripped content not valid "
"(types=%s, stripped_len=%d, raw_len=%d)",
@@ -504,14 +494,11 @@ async def upload_transcript(
content=json.dumps(meta).encode("utf-8"),
)
except Exception as e:
logger.warning("%s Failed to write metadata: %s", log_prefix, e)
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
logger.info(
"%s Uploaded %dB (stripped from %dB, msg_count=%d)",
log_prefix,
len(encoded),
len(content),
message_count,
f"{log_prefix} Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count})"
)
@@ -525,6 +512,8 @@ async def download_transcript(
Returns a ``TranscriptDownload`` with the JSONL content and the
``message_count`` watermark from the upload, or ``None`` if not found.
"""
from backend.util.workspace_storage import get_workspace_storage
storage = await get_workspace_storage()
path = _build_storage_path(user_id, session_id, storage)
@@ -532,10 +521,10 @@ async def download_transcript(
data = await storage.retrieve(path)
content = data.decode("utf-8")
except FileNotFoundError:
logger.debug("%s No transcript in storage", log_prefix)
logger.debug(f"{log_prefix} No transcript in storage")
return None
except Exception as e:
logger.warning("%s Failed to download transcript: %s", log_prefix, e)
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
return None
# Try to load metadata (best-effort — old transcripts won't have it)
@@ -547,14 +536,10 @@ async def download_transcript(
meta = json.loads(meta_data.decode("utf-8"), fallback={})
message_count = meta.get("message_count", 0)
uploaded_at = meta.get("uploaded_at", 0.0)
except FileNotFoundError:
except (FileNotFoundError, Exception):
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
except Exception as e:
logger.debug("%s Failed to load transcript metadata: %s", log_prefix, e)
logger.info(
"%s Downloaded %dB (msg_count=%d)", log_prefix, len(content), message_count
)
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
return TranscriptDownload(
content=content,
message_count=message_count,
@@ -568,6 +553,8 @@ async def delete_transcript(user_id: str, session_id: str) -> None:
Removes both the ``.jsonl`` transcript and the companion ``.meta.json``
so stale ``message_count`` watermarks cannot corrupt gap-fill logic.
"""
from backend.util.workspace_storage import get_workspace_storage
storage = await get_workspace_storage()
path = _build_storage_path(user_id, session_id, storage)
@@ -584,280 +571,3 @@ async def delete_transcript(user_id: str, session_id: str) -> None:
logger.info("[Transcript] Deleted metadata for session %s", session_id)
except Exception as e:
logger.warning("[Transcript] Failed to delete metadata: %s", e)
# ---------------------------------------------------------------------------
# Transcript compaction — LLM summarization for prompt-too-long recovery
# ---------------------------------------------------------------------------
# JSONL protocol values used in transcript serialization.
STOP_REASON_END_TURN = "end_turn"
COMPACT_MSG_ID_PREFIX = "msg_compact_"
ENTRY_TYPE_MESSAGE = "message"
def _flatten_assistant_content(blocks: list) -> str:
"""Flatten assistant content blocks into a single plain-text string.
Structured ``tool_use`` blocks are converted to ``[tool_use: name]``
placeholders. This is intentional: ``compress_context`` requires plain
text for token counting and LLM summarization. The structural loss is
acceptable because compaction only runs when the original transcript was
already too large for the model — a summarized plain-text version is
better than no context at all.
"""
parts: list[str] = []
for block in blocks:
if isinstance(block, dict):
btype = block.get("type", "")
if btype == "text":
parts.append(block.get("text", ""))
elif btype == "tool_use":
parts.append(f"[tool_use: {block.get('name', '?')}]")
else:
# Preserve non-text blocks (e.g. image) as placeholders.
# Use __prefix__ to distinguish from literal user text.
parts.append(f"[__{btype}__]")
elif isinstance(block, str):
parts.append(block)
return "\n".join(parts) if parts else ""
def _flatten_tool_result_content(blocks: list) -> str:
"""Flatten tool_result and other content blocks into plain text.
Handles nested tool_result structures, text blocks, and raw strings.
Uses ``json.dumps`` as fallback for dict blocks without a ``text`` key
or where ``text`` is ``None``.
Like ``_flatten_assistant_content``, structured blocks (images, nested
tool results) are reduced to text representations for compression.
"""
str_parts: list[str] = []
for block in blocks:
if isinstance(block, dict) and block.get("type") == "tool_result":
inner = block.get("content") or ""
if isinstance(inner, list):
for sub in inner:
if isinstance(sub, dict):
sub_type = sub.get("type")
if sub_type in ("image", "document"):
# Avoid serializing base64 binary data into
# the compaction input — use a placeholder.
str_parts.append(f"[__{sub_type}__]")
elif sub_type == "text" or sub.get("text") is not None:
str_parts.append(str(sub.get("text", "")))
else:
str_parts.append(json.dumps(sub))
else:
str_parts.append(str(sub))
else:
str_parts.append(str(inner))
elif isinstance(block, dict) and block.get("type") == "text":
str_parts.append(str(block.get("text", "")))
elif isinstance(block, dict):
# Preserve non-text/non-tool_result blocks (e.g. image) as placeholders.
# Use __prefix__ to distinguish from literal user text.
btype = block.get("type", "unknown")
str_parts.append(f"[__{btype}__]")
elif isinstance(block, str):
str_parts.append(block)
return "\n".join(str_parts) if str_parts else ""
def _transcript_to_messages(content: str) -> list[dict]:
"""Convert JSONL transcript entries to plain message dicts for compression.
Parses each line of the JSONL *content*, skips strippable metadata entries
(progress, file-history-snapshot, etc.), and extracts the ``role`` and
flattened ``content`` from the ``message`` field of each remaining entry.
Structured content blocks (``tool_use``, ``tool_result``, images) are
flattened to plain text via ``_flatten_assistant_content`` and
``_flatten_tool_result_content`` so that ``compress_context`` can
perform token counting and LLM summarization on uniform strings.
Returns:
A list of ``{"role": str, "content": str}`` dicts suitable for
``compress_context``.
"""
messages: list[dict] = []
for line in content.strip().split("\n"):
if not line.strip():
continue
entry = json.loads(line, fallback=None)
if not isinstance(entry, dict):
continue
if entry.get("type", "") in STRIPPABLE_TYPES and not entry.get(
"isCompactSummary"
):
continue
msg = entry.get("message", {})
role = msg.get("role", "")
if not role:
continue
msg_dict: dict = {"role": role}
raw_content = msg.get("content")
if role == "assistant" and isinstance(raw_content, list):
msg_dict["content"] = _flatten_assistant_content(raw_content)
elif isinstance(raw_content, list):
msg_dict["content"] = _flatten_tool_result_content(raw_content)
else:
msg_dict["content"] = raw_content or ""
messages.append(msg_dict)
return messages
def _messages_to_transcript(messages: list[dict]) -> str:
"""Convert compressed message dicts back to JSONL transcript format.
Rebuilds a minimal JSONL transcript from the ``{"role", "content"}``
dicts returned by ``compress_context``. Each message becomes one JSONL
line with a fresh ``uuid`` / ``parentUuid`` chain so the CLI's
``--resume`` flag can reconstruct a valid conversation tree.
Assistant messages are wrapped in the full ``message`` envelope
(``id``, ``model``, ``stop_reason``, structured ``content`` blocks)
that the CLI expects. User messages use the simpler ``{role, content}``
form.
Returns:
A newline-terminated JSONL string, or an empty string if *messages*
is empty.
"""
lines: list[str] = []
last_uuid: str = "" # root entry uses empty string, not null
for msg in messages:
role = msg.get("role", "user")
entry_type = "assistant" if role == "assistant" else "user"
uid = str(uuid4())
content = msg.get("content", "")
if role == "assistant":
message: dict = {
"role": "assistant",
"model": "",
"id": f"{COMPACT_MSG_ID_PREFIX}{uuid4().hex[:24]}",
"type": ENTRY_TYPE_MESSAGE,
"content": [{"type": "text", "text": content}] if content else [],
"stop_reason": STOP_REASON_END_TURN,
"stop_sequence": None,
}
else:
message = {"role": role, "content": content}
entry = {
"type": entry_type,
"uuid": uid,
"parentUuid": last_uuid,
"message": message,
}
lines.append(json.dumps(entry, separators=(",", ":")))
last_uuid = uid
return "\n".join(lines) + "\n" if lines else ""
_COMPACTION_TIMEOUT_SECONDS = 60
_TRUNCATION_TIMEOUT_SECONDS = 30
async def _run_compression(
messages: list[dict],
model: str,
log_prefix: str,
) -> CompressResult:
"""Run LLM-based compression with truncation fallback.
Uses the shared OpenAI client from ``get_openai_client()``.
If no client is configured or the LLM call fails, falls back to
truncation-based compression which drops older messages without
summarization.
A 60-second timeout prevents a hung LLM call from blocking the
retry path indefinitely. The truncation fallback also has a
30-second timeout to guard against slow tokenization on very large
transcripts.
"""
client = get_openai_client()
if client is None:
logger.warning("%s No OpenAI client configured, using truncation", log_prefix)
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=None),
timeout=_TRUNCATION_TIMEOUT_SECONDS,
)
try:
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=client),
timeout=_COMPACTION_TIMEOUT_SECONDS,
)
except Exception as e:
logger.warning("%s LLM compaction failed, using truncation: %s", log_prefix, e)
return await asyncio.wait_for(
compress_context(messages=messages, model=model, client=None),
timeout=_TRUNCATION_TIMEOUT_SECONDS,
)
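# Example (illustrative): when get_openai_client() returns None, e.g. no API
# key is configured, the call degrades to truncation-only compression:
#   result = await _run_compression(msgs, model=model, log_prefix="[Transcript]")
# and must resolve within the 30-second truncation timeout.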
async def compact_transcript(
content: str,
*,
model: str,
log_prefix: str = "[Transcript]",
) -> str | None:
"""Compact an oversized JSONL transcript using LLM summarization.
Converts transcript entries to plain messages, runs ``compress_context``
(the same compressor used for pre-query history), and rebuilds JSONL.
Structured content (``tool_use`` blocks, ``tool_result`` nesting, images)
is flattened to plain text for compression. This matches the fidelity of
the Plan C (DB compression) fallback path, where
``_format_conversation_context`` similarly renders tool calls as
``You called tool: name(args)`` and results as ``Tool result: ...``.
Neither path preserves structured API content blocks — the compacted
context serves as text history for the LLM, which creates proper
structured tool calls going forward.
Images are per-turn attachments loaded from workspace storage by file ID
(via ``_prepare_file_attachments``), not part of the conversation history.
They are re-attached each turn and are unaffected by compaction.
Returns the compacted JSONL string, or ``None`` on failure.
See also:
``_compress_messages`` in ``service.py`` — compresses ``ChatMessage``
lists for pre-query DB history. Both share ``compress_context()``
but operate on different input formats (JSONL transcript entries
here vs. ChatMessage dicts there).
"""
messages = _transcript_to_messages(content)
if len(messages) < 2:
logger.warning("%s Too few messages to compact (%d)", log_prefix, len(messages))
return None
try:
result = await _run_compression(messages, model, log_prefix)
if not result.was_compacted:
# Compressor says it's within budget, but the SDK rejected it.
# Return None so the caller falls through to DB fallback.
logger.warning(
"%s Compressor reports within budget but SDK rejected — "
"signalling failure",
log_prefix,
)
return None
logger.info(
"%s Compacted transcript: %d->%d tokens (%d summarized, %d dropped)",
log_prefix,
result.original_token_count,
result.token_count,
result.messages_summarized,
result.messages_dropped,
)
compacted = _messages_to_transcript(result.messages)
if not validate_transcript(compacted):
logger.warning("%s Compacted transcript failed validation", log_prefix)
return None
return compacted
except Exception as e:
logger.error(
"%s Transcript compaction failed: %s", log_prefix, e, exc_info=True
)
return None
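# Usage sketch (hypothetical caller; helper names are illustrative, not from
# this module): try transcript compaction first, fall back to DB history on None.
#
#   compacted = await compact_transcript(raw_jsonl, model=model)
#   if compacted is not None:
#       await save_transcript(session_id, compacted)  # then retry CLI --resume
#   else:
#       ...  # Plan C: rebuild context from DB via _compress_messages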

View File

@@ -68,7 +68,7 @@ class TranscriptBuilder:
type=entry_type,
uuid=data.get("uuid") or str(uuid4()),
parentUuid=data.get("parentUuid"),
isCompactSummary=data.get("isCompactSummary"),
isCompactSummary=data.get("isCompactSummary") or None,
message=data.get("message", {}),
)

View File

@@ -1,7 +1,7 @@
"""Unit tests for JSONL transcript management utilities."""
import os
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
import pytest
@@ -382,7 +382,7 @@ class TestDeleteTranscript:
mock_storage.delete = AsyncMock()
with patch(
"backend.copilot.sdk.transcript.get_workspace_storage",
"backend.util.workspace_storage.get_workspace_storage",
new_callable=AsyncMock,
return_value=mock_storage,
):
@@ -402,7 +402,7 @@ class TestDeleteTranscript:
)
with patch(
"backend.copilot.sdk.transcript.get_workspace_storage",
"backend.util.workspace_storage.get_workspace_storage",
new_callable=AsyncMock,
return_value=mock_storage,
):
@@ -420,7 +420,7 @@ class TestDeleteTranscript:
)
with patch(
"backend.copilot.sdk.transcript.get_workspace_storage",
"backend.util.workspace_storage.get_workspace_storage",
new_callable=AsyncMock,
return_value=mock_storage,
):
@@ -897,134 +897,3 @@ class TestCompactionFlowIntegration:
output2 = builder2.to_jsonl()
lines2 = [json.loads(line) for line in output2.strip().split("\n")]
assert lines2[-1]["parentUuid"] == "a2"
# ---------------------------------------------------------------------------
# _run_compression (direct tests for the 3 code paths)
# ---------------------------------------------------------------------------
class TestRunCompression:
"""Direct tests for ``_run_compression`` covering all 3 code paths.
Paths:
(a) No OpenAI client configured → truncation fallback immediately.
(b) LLM success → returns LLM-compressed result.
(c) LLM call raises → truncation fallback.
"""
def _make_compress_result(self, was_compacted: bool, msgs=None):
"""Build a minimal CompressResult-like object."""
from types import SimpleNamespace
return SimpleNamespace(
was_compacted=was_compacted,
messages=msgs or [{"role": "user", "content": "summary"}],
original_token_count=500,
token_count=100 if was_compacted else 500,
messages_summarized=2 if was_compacted else 0,
messages_dropped=0,
)
@pytest.mark.asyncio
async def test_no_client_uses_truncation(self):
"""Path (a): ``get_openai_client()`` returns None → truncation only."""
from .transcript import _run_compression
truncation_result = self._make_compress_result(
True, [{"role": "user", "content": "truncated"}]
)
with (
patch(
"backend.copilot.sdk.transcript.get_openai_client",
return_value=None,
),
patch(
"backend.copilot.sdk.transcript.compress_context",
new_callable=AsyncMock,
return_value=truncation_result,
) as mock_compress,
):
result = await _run_compression(
[{"role": "user", "content": "hello"}],
model="test-model",
log_prefix="[test]",
)
        # compress_context called with client=None (truncation mode)
        assert mock_compress.call_args.kwargs.get("client") is None
assert result is truncation_result
@pytest.mark.asyncio
async def test_llm_success_returns_llm_result(self):
"""Path (b): ``get_openai_client()`` returns a client → LLM compresses."""
from .transcript import _run_compression
llm_result = self._make_compress_result(
True, [{"role": "user", "content": "LLM summary"}]
)
mock_client = MagicMock()
with (
patch(
"backend.copilot.sdk.transcript.get_openai_client",
return_value=mock_client,
),
patch(
"backend.copilot.sdk.transcript.compress_context",
new_callable=AsyncMock,
return_value=llm_result,
) as mock_compress,
):
result = await _run_compression(
[{"role": "user", "content": "long conversation"}],
model="test-model",
log_prefix="[test]",
)
# compress_context called with the real client
assert mock_compress.called
assert result is llm_result
@pytest.mark.asyncio
async def test_llm_failure_falls_back_to_truncation(self):
"""Path (c): LLM call raises → truncation fallback used instead."""
from .transcript import _run_compression
truncation_result = self._make_compress_result(
True, [{"role": "user", "content": "truncated fallback"}]
)
mock_client = MagicMock()
call_count = [0]
async def _compress_side_effect(**kwargs):
call_count[0] += 1
if kwargs.get("client") is not None:
raise RuntimeError("LLM timeout")
return truncation_result
with (
patch(
"backend.copilot.sdk.transcript.get_openai_client",
return_value=mock_client,
),
patch(
"backend.copilot.sdk.transcript.compress_context",
side_effect=_compress_side_effect,
),
):
result = await _run_compression(
[{"role": "user", "content": "long conversation"}],
model="test-model",
log_prefix="[test]",
)
# compress_context called twice: once for LLM (raises), once for truncation
assert call_count[0] == 2
assert result is truncation_result

View File

@@ -22,13 +22,11 @@ class AddUnderstandingTool(BaseTool):
@property
def description(self) -> str:
return """Capture and store information about the user's business context,
workflows, pain points, and automation goals. Call this tool whenever the user
shares information about their business. Each call incrementally adds to the
existing understanding - you don't need to provide all fields at once.
Use this to build a comprehensive profile that helps recommend better agents
and automations for the user's specific needs."""
return (
"Store user's business context, workflows, pain points, and automation goals. "
"Call whenever the user shares business info. Each call incrementally merges "
"with existing data — provide only the fields you have."
)
@property
def parameters(self) -> dict[str, Any]:

View File

@@ -408,18 +408,11 @@ class BrowserNavigateTool(BaseTool):
@property
def description(self) -> str:
return (
"Navigate to a URL using a real browser. Returns an accessibility "
"tree snapshot listing the page's interactive elements with @ref IDs "
"(e.g. @e3) that can be used with browser_act. "
"Session persists — cookies and login state carry over between calls. "
"Use this (with browser_act) for multi-step interaction: login flows, "
"form filling, button clicks, or anything requiring page interaction. "
"For plain static pages, prefer web_fetch — no browser overhead. "
"For authenticated pages: navigate to the login page first, use browser_act "
"to fill credentials and submit, then navigate to the target page. "
"Note: for slow SPAs, the returned snapshot may reflect a partially-loaded "
"state. If elements seem missing, use browser_act with action='wait' and a "
"CSS selector or millisecond delay, then take a browser_screenshot to verify."
"Navigate to a URL in a real browser. Returns accessibility tree with @ref IDs "
"for browser_act. Session persists (cookies/auth carry over). "
"For static pages, prefer web_fetch. "
"For SPAs, elements may load late — use browser_act with wait + browser_screenshot to verify. "
"For auth: navigate to login, fill creds with browser_act, then navigate to target."
)
@property
@@ -429,13 +422,13 @@ class BrowserNavigateTool(BaseTool):
"properties": {
"url": {
"type": "string",
"description": "The HTTP/HTTPS URL to navigate to.",
"description": "HTTP/HTTPS URL to navigate to.",
},
"wait_for": {
"type": "string",
"enum": ["networkidle", "load", "domcontentloaded"],
"default": "networkidle",
"description": "When to consider navigation complete. Use 'networkidle' for SPAs (default).",
"description": "Navigation completion strategy (default: networkidle).",
},
},
"required": ["url"],
@@ -554,14 +547,12 @@ class BrowserActTool(BaseTool):
@property
def description(self) -> str:
return (
"Interact with the current browser page. Use @ref IDs from the "
"snapshot (e.g. '@e3') to target elements. Returns an updated snapshot. "
"Supported actions: click, dblclick, fill, type, scroll, hover, press, "
"Interact with the current browser page using @ref IDs from the snapshot. "
"Actions: click, dblclick, fill, type, scroll, hover, press, "
"check, uncheck, select, wait, back, forward, reload. "
"fill clears the field before typing; type appends without clearing. "
"wait accepts a CSS selector (waits for element) or milliseconds string (e.g. '1000'). "
"Example login flow: fill @e1 with email → fill @e2 with password → "
"click @e3 (submit) → browser_navigate to the target page."
"fill clears field first; type appends. "
"wait accepts CSS selector or milliseconds (e.g. '1000'). "
"Returns updated snapshot."
)
@property
@@ -587,30 +578,21 @@ class BrowserActTool(BaseTool):
"forward",
"reload",
],
"description": "The action to perform.",
"description": "Action to perform.",
},
"target": {
"type": "string",
"description": (
"Element to target. Use @ref from snapshot (e.g. '@e3'), "
"a CSS selector, or a text description. "
"Required for: click, dblclick, fill, type, hover, check, uncheck, select. "
"For wait: a CSS selector to wait for, or milliseconds as a string (e.g. '1000')."
),
"description": "@ref ID (e.g. '@e3'), CSS selector, or text description.",
},
"value": {
"type": "string",
"description": (
"For fill/type: the text to enter. "
"For press: key name (e.g. 'Enter', 'Tab', 'Control+a'). "
"For select: the option value to select."
),
"description": "Text for fill/type, key for press (e.g. 'Enter'), option for select.",
},
"direction": {
"type": "string",
"enum": ["up", "down", "left", "right"],
"default": "down",
"description": "For scroll: direction to scroll.",
"description": "Scroll direction (default: down).",
},
},
"required": ["action"],
@@ -757,12 +739,10 @@ class BrowserScreenshotTool(BaseTool):
@property
def description(self) -> str:
return (
"Take a screenshot of the current browser page and save it to the workspace. "
"IMPORTANT: After calling this tool, immediately call read_workspace_file "
"with the returned file_id to display the image inline to the user — "
"the screenshot is not visible until you do this. "
"With annotate=true (default), @ref labels are overlaid on interactive "
"elements, making it easy to see which @ref ID maps to which element on screen."
"Screenshot the current browser page and save to workspace. "
"annotate=true overlays @ref labels on elements. "
"IMPORTANT: After calling, you MUST immediately call read_workspace_file with the "
"returned file_id to display the image inline."
)
@property
@@ -773,12 +753,12 @@ class BrowserScreenshotTool(BaseTool):
"annotate": {
"type": "boolean",
"default": True,
"description": "Overlay @ref labels on interactive elements (default: true).",
"description": "Overlay @ref labels (default: true).",
},
"filename": {
"type": "string",
"default": "screenshot.png",
"description": "Filename to save in the workspace.",
"description": "Workspace filename (default: screenshot.png).",
},
},
}

View File

@@ -108,22 +108,12 @@ class AgentOutputTool(BaseTool):
@property
def description(self) -> str:
return """Retrieve execution outputs from agents in the user's library.
Identify the agent using one of:
- agent_name: Fuzzy search in user's library
- library_agent_id: Exact library agent ID
- store_slug: Marketplace format 'username/agent-name'
Select which run to retrieve using:
- execution_id: Specific execution ID
- run_time: 'latest' (default), 'yesterday', 'last week', or ISO date 'YYYY-MM-DD'
Wait for completion (optional):
- wait_if_running: Max seconds to wait if execution is still running (0-300).
If the execution is running/queued, waits up to this many seconds for completion.
Returns current status on timeout. If already finished, returns immediately.
"""
return (
"Retrieve execution outputs from a library agent. "
"Identify by agent_name, library_agent_id, or store_slug. "
"Filter by execution_id or run_time. "
"Optionally wait for running executions."
)
@property
def parameters(self) -> dict[str, Any]:
@@ -132,32 +122,27 @@ class AgentOutputTool(BaseTool):
"properties": {
"agent_name": {
"type": "string",
"description": "Agent name to search for in user's library (fuzzy match)",
"description": "Agent name (fuzzy match).",
},
"library_agent_id": {
"type": "string",
"description": "Exact library agent ID",
"description": "Library agent ID.",
},
"store_slug": {
"type": "string",
"description": "Marketplace identifier: 'username/agent-slug'",
"description": "Marketplace 'username/agent-slug'.",
},
"execution_id": {
"type": "string",
"description": "Specific execution ID to retrieve",
"description": "Specific execution ID.",
},
"run_time": {
"type": "string",
"description": (
"Time filter: 'latest', 'yesterday', 'last week', or 'YYYY-MM-DD'"
),
"description": "Time filter: 'latest', today/yesterday/last week/last 7 days/last month/last 30 days, 'YYYY-MM-DD', or ISO datetime.",
},
"wait_if_running": {
"type": "integer",
"description": (
"Max seconds to wait if execution is still running (0-300). "
"If running, waits for completion. Returns current state on timeout."
),
"description": "Max seconds to wait if still running (0-300). Returns current state on timeout.",
},
},
"required": [],

View File

@@ -41,15 +41,9 @@ class BashExecTool(BaseTool):
@property
def description(self) -> str:
return (
"Execute a Bash command or script. "
"Full Bash scripting is supported (loops, conditionals, pipes, "
"functions, etc.). "
"The working directory is shared with the SDK Read/Write/Edit/Glob/Grep "
"tools — files created by either are immediately visible to both. "
"Execution is killed after the timeout (default 30s, max 120s). "
"Returns stdout and stderr. "
"Useful for file manipulation, data processing, running scripts, "
"and installing packages."
"Execute a Bash command or script. Shares filesystem with SDK file tools. "
"Useful for scripts, data processing, and package installation. "
"Killed after timeout (default 30s, max 120s)."
)
@property
@@ -59,13 +53,11 @@ class BashExecTool(BaseTool):
"properties": {
"command": {
"type": "string",
"description": "Bash command or script to execute.",
"description": "Bash command or script.",
},
"timeout": {
"type": "integer",
"description": (
"Max execution time in seconds (default 30, max 120)."
),
"description": "Max seconds (default 30, max 120).",
"default": 30,
},
},

View File

@@ -30,12 +30,7 @@ class ContinueRunBlockTool(BaseTool):
@property
def description(self) -> str:
return (
"Continue executing a block after human review approval. "
"Use this after a run_block call returned review_required. "
"Pass the review_id from the review_required response. "
"The block will execute with the original pre-approved input data."
)
return "Resume block execution after human review approval. Pass the review_id."
@property
def parameters(self) -> dict[str, Any]:
@@ -44,10 +39,7 @@ class ContinueRunBlockTool(BaseTool):
"properties": {
"review_id": {
"type": "string",
"description": (
"The review_id from a previous review_required response. "
"This resumes execution with the pre-approved input data."
),
"description": "review_id from the review_required response.",
},
},
"required": ["review_id"],

View File

@@ -23,12 +23,8 @@ class CreateAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Create a new agent workflow. Pass `agent_json` with the complete "
"agent graph JSON you generated using block schemas from find_block. "
"The tool validates, auto-fixes, and saves.\n\n"
"IMPORTANT: Before calling this tool, search for relevant existing agents "
"using find_library_agent that could be used as building blocks. "
"Pass their IDs in the library_agent_ids parameter."
"Create a new agent from JSON (nodes + links). Validates, auto-fixes, and saves. "
"Before calling, search for existing agents with find_library_agent."
)
@property
@@ -42,34 +38,21 @@ class CreateAgentTool(BaseTool):
"properties": {
"agent_json": {
"type": "object",
"description": (
"The agent JSON to validate and save. "
"Must contain 'nodes' and 'links' arrays, and optionally "
"'name' and 'description'."
),
"description": "Agent graph with 'nodes' and 'links' arrays.",
},
"library_agent_ids": {
"type": "array",
"items": {"type": "string"},
"description": (
"List of library agent IDs to use as building blocks."
),
"description": "Library agent IDs as building blocks.",
},
"save": {
"type": "boolean",
"description": (
"Whether to save the agent. Default is true. "
"Set to false for preview only."
),
"description": "Save the agent (default: true). False for preview.",
"default": True,
},
"folder_id": {
"type": "string",
"description": (
"Optional folder ID to save the agent into. "
"If not provided, the agent is saved at root level. "
"Use list_folders to find available folders."
),
"description": "Folder ID to save into (default: root).",
},
},
"required": ["agent_json"],

View File

@@ -23,9 +23,7 @@ class CustomizeAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Customize a marketplace or template agent. Pass `agent_json` "
"with the complete customized agent JSON. The tool validates, "
"auto-fixes, and saves."
"Customize a marketplace/template agent. Validates, auto-fixes, and saves."
)
@property
@@ -39,32 +37,21 @@ class CustomizeAgentTool(BaseTool):
"properties": {
"agent_json": {
"type": "object",
"description": (
"Complete customized agent JSON to validate and save. "
"Optionally include 'name' and 'description'."
),
"description": "Customized agent JSON with nodes and links.",
},
"library_agent_ids": {
"type": "array",
"items": {"type": "string"},
"description": (
"List of library agent IDs to use as building blocks."
),
"description": "Library agent IDs as building blocks.",
},
"save": {
"type": "boolean",
"description": (
"Whether to save the customized agent. Default is true."
),
"description": "Save the agent (default: true). False for preview.",
"default": True,
},
"folder_id": {
"type": "string",
"description": (
"Optional folder ID to save the agent into. "
"If not provided, the agent is saved at root level. "
"Use list_folders to find available folders."
),
"description": "Folder ID to save into (default: root).",
},
},
"required": ["agent_json"],

View File

@@ -41,7 +41,8 @@ import contextlib
import logging
from typing import Any, Awaitable, Callable, Literal
from e2b import AsyncSandbox, SandboxLifecycle
from e2b import AsyncSandbox
from e2b.sandbox.sandbox_api import SandboxLifecycle
from backend.data.redis_client import get_redis_async

View File

@@ -23,12 +23,8 @@ class EditAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Edit an existing agent. Pass `agent_json` with the complete "
"updated agent JSON you generated. The tool validates, auto-fixes, "
"and saves.\n\n"
"IMPORTANT: Before calling this tool, if the changes involve adding new "
"functionality, search for relevant existing agents using find_library_agent "
"that could be used as building blocks."
"Edit an existing agent. Validates, auto-fixes, and saves. "
"Before calling, search for existing agents with find_library_agent."
)
@property
@@ -42,33 +38,20 @@ class EditAgentTool(BaseTool):
"properties": {
"agent_id": {
"type": "string",
"description": (
"The ID of the agent to edit. "
"Can be a graph ID or library agent ID."
),
"description": "Graph ID or library agent ID to edit.",
},
"agent_json": {
"type": "object",
"description": (
"Complete updated agent JSON to validate and save. "
"Must contain 'nodes' and 'links'. "
"Include 'name' and/or 'description' if they need "
"to be updated."
),
"description": "Updated agent JSON with nodes and links.",
},
"library_agent_ids": {
"type": "array",
"items": {"type": "string"},
"description": (
"List of library agent IDs to use as building blocks for the changes."
),
"description": "Library agent IDs as building blocks.",
},
"save": {
"type": "boolean",
"description": (
"Whether to save the changes. "
"Default is true. Set to false for preview only."
),
"description": "Save changes (default: true). False for preview.",
"default": True,
},
},

View File

@@ -134,11 +134,7 @@ class SearchFeatureRequestsTool(BaseTool):
@property
def description(self) -> str:
return (
"Search existing feature requests to check if a similar request "
"already exists before creating a new one. Returns matching feature "
"requests with their ID, title, and description."
)
return "Search existing feature requests. Check before creating a new one."
@property
def parameters(self) -> dict[str, Any]:
@@ -234,14 +230,9 @@ class CreateFeatureRequestTool(BaseTool):
@property
def description(self) -> str:
return (
"Create a new feature request or add a customer need to an existing one. "
"Always search first with search_feature_requests to avoid duplicates. "
"If a matching request exists, pass its ID as existing_issue_id to add "
"the user's need to it instead of creating a duplicate. "
"IMPORTANT: Never include personally identifiable information (PII) in "
"the title or description — no names, emails, phone numbers, company "
"names, or other identifying details. Write titles and descriptions in "
"generic, feature-focused language."
"Create a feature request or add need to existing one. "
"Search first to avoid duplicates. Pass existing_issue_id to add to existing. "
"Never include PII (names, emails, phone numbers, company names) in title/description."
)
@property
@@ -251,28 +242,15 @@ class CreateFeatureRequestTool(BaseTool):
"properties": {
"title": {
"type": "string",
"description": (
"Title for the feature request. Must be generic and "
"feature-focused — do not include any user names, emails, "
"company names, or other PII."
),
"description": "Feature request title. No PII.",
},
"description": {
"type": "string",
"description": (
"Detailed description of what the user wants and why. "
"Must not contain any personally identifiable information "
"(PII) — describe the feature need generically without "
"referencing specific users, companies, or contact details."
),
"description": "What the user wants and why. No PII.",
},
"existing_issue_id": {
"type": "string",
"description": (
"If adding a need to an existing feature request, "
"provide its Linear issue ID (from search results). "
"Omit to create a new feature request."
),
"description": "Linear issue ID to add need to (from search results).",
},
},
"required": ["title", "description"],

View File

@@ -18,9 +18,7 @@ class FindAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Discover agents from the marketplace based on capabilities and user needs."
)
return "Search marketplace agents by capability."
@property
def parameters(self) -> dict[str, Any]:
@@ -29,7 +27,7 @@ class FindAgentTool(BaseTool):
"properties": {
"query": {
"type": "string",
"description": "Search query describing what the user wants to accomplish. Use single keywords for best results.",
"description": "Search keywords (single keywords work best).",
},
},
"required": ["query"],

View File

@@ -51,14 +51,7 @@ class FindBlockTool(BaseTool):
@property
def description(self) -> str:
return (
"Search for available blocks by name or description. "
"Blocks are reusable components that perform specific tasks like "
"sending emails, making API calls, processing text, etc. "
"IMPORTANT: Use this tool FIRST to get the block's 'id' before calling run_block. "
"The response includes each block's id, name, and description. "
"Call run_block with the block's id **with no inputs** to see detailed inputs/outputs and execute it."
)
return "Search blocks by name or description. Returns block IDs for run_block. Always call this FIRST to get block IDs before using run_block."
@property
def parameters(self) -> dict[str, Any]:
@@ -67,18 +60,11 @@ class FindBlockTool(BaseTool):
"properties": {
"query": {
"type": "string",
"description": (
"Search query to find blocks by name or description. "
"Use keywords like 'email', 'http', 'text', 'ai', etc."
),
"description": "Search keywords (e.g. 'email', 'http', 'ai').",
},
"include_schemas": {
"type": "boolean",
"description": (
"If true, include full input_schema and output_schema "
"for each block. Use when generating agent JSON that "
"needs block schemas. Default is false."
),
"description": "Include full input/output schemas (for agent JSON generation).",
"default": False,
},
},

View File

@@ -19,13 +19,8 @@ class FindLibraryAgentTool(BaseTool):
@property
def description(self) -> str:
return (
"Search for or list agents in the user's library. Use this to find "
"agents the user has already added to their library, including agents "
"they created or added from the marketplace. "
"When creating agents with sub-agent composition, use this to get "
"the agent's graph_id, graph_version, input_schema, and output_schema "
"needed for AgentExecutorBlock nodes. "
"Omit the query to list all agents."
"Search user's library agents. Returns graph_id, schemas for sub-agent composition. "
"Omit query to list all."
)
@property
@@ -35,10 +30,7 @@ class FindLibraryAgentTool(BaseTool):
"properties": {
"query": {
"type": "string",
"description": (
"Search query to find agents by name or description. "
"Omit to list all agents in the library."
),
"description": "Search by name/description. Omit to list all.",
},
},
"required": [],

View File

@@ -22,20 +22,8 @@ class FixAgentGraphTool(BaseTool):
@property
def description(self) -> str:
return (
"Auto-fix common issues in an agent JSON graph. Applies fixes for:\n"
"- Missing or invalid UUIDs on nodes and links\n"
"- StoreValueBlock prerequisites for ConditionBlock\n"
"- Double curly brace escaping in prompt templates\n"
"- AddToList/AddToDictionary prerequisite blocks\n"
"- CodeExecutionBlock output field naming\n"
"- Missing credentials configuration\n"
"- Node X coordinate spacing (800+ units apart)\n"
"- AI model default parameters\n"
"- Link static properties based on input schema\n"
"- Type mismatches (inserts conversion blocks)\n\n"
"Returns the fixed agent JSON plus a list of fixes applied. "
"After fixing, the agent is re-validated. If still invalid, "
"the remaining errors are included in the response."
"Auto-fix common agent JSON issues (UUIDs, types, credentials, spacing, etc.). "
"Returns fixed JSON and list of fixes applied."
)
@property

View File

@@ -42,12 +42,7 @@ class GetAgentBuildingGuideTool(BaseTool):
@property
def description(self) -> str:
return (
"Returns the complete guide for building agent JSON graphs, including "
"block IDs, link structure, AgentInputBlock, AgentOutputBlock, "
"AgentExecutorBlock (for sub-agent composition), and MCPToolBlock usage. "
"Call this before generating agent JSON to ensure correct structure."
)
return "Get the agent JSON building guide (nodes, links, AgentExecutorBlock, MCPToolBlock usage). Call before generating agent JSON."
@property
def parameters(self) -> dict[str, Any]:

View File

@@ -25,8 +25,7 @@ class GetDocPageTool(BaseTool):
@property
def description(self) -> str:
return (
"Get the full content of a documentation page by its path. "
"Use this after search_docs to read the complete content of a relevant page."
"Read full documentation page content by path (from search_docs results)."
)
@property
@@ -36,10 +35,7 @@ class GetDocPageTool(BaseTool):
"properties": {
"path": {
"type": "string",
"description": (
"The path to the documentation file, as returned by search_docs. "
"Example: 'platform/block-sdk-guide.md'"
),
"description": "Doc file path (e.g. 'platform/block-sdk-guide.md').",
},
},
"required": ["path"],

View File

@@ -38,11 +38,7 @@ class GetMCPGuideTool(BaseTool):
@property
def description(self) -> str:
return (
"Returns the MCP tool guide: known hosted server URLs (Notion, Linear, "
"Stripe, Intercom, Cloudflare, Atlassian) and authentication workflow. "
"Call before using run_mcp_tool if you need a server URL or auth info."
)
return "Get MCP server URLs and auth guide."
@property
def parameters(self) -> dict[str, Any]:

View File

@@ -88,10 +88,7 @@ class CreateFolderTool(BaseTool):
@property
def description(self) -> str:
return (
"Create a new folder in the user's library to organize agents. "
"Optionally nest it inside an existing folder using parent_id."
)
return "Create a library folder. Use parent_id to nest inside another folder."
@property
def requires_auth(self) -> bool:
@@ -104,22 +101,19 @@ class CreateFolderTool(BaseTool):
"properties": {
"name": {
"type": "string",
"description": "Name for the new folder (max 100 chars).",
"description": "Folder name (max 100 chars).",
},
"parent_id": {
"type": "string",
"description": (
"ID of the parent folder to nest inside. "
"Omit to create at root level."
),
"description": "Parent folder ID (omit for root).",
},
"icon": {
"type": "string",
"description": "Optional icon identifier for the folder.",
"description": "Icon identifier.",
},
"color": {
"type": "string",
"description": "Optional hex color code (#RRGGBB).",
"description": "Hex color (#RRGGBB).",
},
},
"required": ["name"],
@@ -175,13 +169,8 @@ class ListFoldersTool(BaseTool):
@property
def description(self) -> str:
return (
"List the user's library folders. "
"Omit parent_id to get the full folder tree. "
"Provide parent_id to list only direct children of that folder. "
"Set include_agents=true to also return the agents inside each folder "
"and root-level agents not in any folder. Always set include_agents=true "
"when the user asks about agents, wants to see what's in their folders, "
"or mentions agents alongside folders."
"List library folders. Omit parent_id for full tree. "
"Set include_agents=true when user asks about agents in folders."
)
@property
@@ -195,17 +184,11 @@ class ListFoldersTool(BaseTool):
"properties": {
"parent_id": {
"type": "string",
"description": (
"List children of this folder. "
"Omit to get the full folder tree."
),
"description": "List children of this folder (omit for full tree).",
},
"include_agents": {
"type": "boolean",
"description": (
"Whether to include the list of agents inside each folder. "
"Defaults to false."
),
"description": "Include agents in each folder (default: false).",
},
},
"required": [],
@@ -357,10 +340,7 @@ class MoveFolderTool(BaseTool):
@property
def description(self) -> str:
return (
"Move a folder to a different parent folder. "
"Set target_parent_id to null to move to root level."
)
return "Move a folder. Set target_parent_id to null for root."
@property
def requires_auth(self) -> bool:
@@ -373,14 +353,11 @@ class MoveFolderTool(BaseTool):
"properties": {
"folder_id": {
"type": "string",
"description": "ID of the folder to move.",
"description": "Folder ID.",
},
"target_parent_id": {
"type": ["string", "null"],
"description": (
"ID of the new parent folder. "
"Use null to move to root level."
),
"description": "New parent folder ID (null for root).",
},
},
"required": ["folder_id"],
@@ -433,10 +410,7 @@ class DeleteFolderTool(BaseTool):
@property
def description(self) -> str:
return (
"Delete a folder from the user's library. "
"Agents inside the folder are moved to root level (not deleted)."
)
return "Delete a folder. Agents inside move to root (not deleted)."
@property
def requires_auth(self) -> bool:
@@ -499,10 +473,7 @@ class MoveAgentsToFolderTool(BaseTool):
@property
def description(self) -> str:
return (
"Move one or more agents to a folder. "
"Set folder_id to null to move agents to root level."
)
return "Move agents to a folder. Set folder_id to null for root."
@property
def requires_auth(self) -> bool:
@@ -516,13 +487,11 @@ class MoveAgentsToFolderTool(BaseTool):
"agent_ids": {
"type": "array",
"items": {"type": "string"},
"description": "List of library agent IDs to move.",
"description": "Library agent IDs to move.",
},
"folder_id": {
"type": ["string", "null"],
"description": (
"Target folder ID. Use null to move to root level."
),
"description": "Target folder ID (null for root).",
},
},
"required": ["agent_ids"],

View File

@@ -104,19 +104,11 @@ class RunAgentTool(BaseTool):
@property
def description(self) -> str:
return """Run or schedule an agent from the marketplace or user's library.
The tool automatically handles the setup flow:
- Returns missing inputs if required fields are not provided
- Returns missing credentials if user needs to configure them
- Executes immediately if all requirements are met
- Schedules execution if cron expression is provided
Identify the agent using either:
- username_agent_slug: Marketplace format 'username/agent-name'
- library_agent_id: ID of an agent in the user's library
For scheduled execution, provide: schedule_name, cron, and optionally timezone."""
return (
"Run or schedule an agent. Automatically checks inputs and credentials. "
"Identify by username_agent_slug ('user/agent') or library_agent_id. "
"For scheduling, provide schedule_name + cron."
)
@property
def parameters(self) -> dict[str, Any]:
@@ -125,40 +117,36 @@ class RunAgentTool(BaseTool):
"properties": {
"username_agent_slug": {
"type": "string",
"description": "Agent identifier in format 'username/agent-name'",
"description": "Marketplace format 'username/agent-name'.",
},
"library_agent_id": {
"type": "string",
"description": "Library agent ID from user's library",
"description": "Library agent ID.",
},
"inputs": {
"type": "object",
"description": "Input values for the agent",
"description": "Input values for the agent.",
"additionalProperties": True,
},
"use_defaults": {
"type": "boolean",
"description": "Set to true to run with default values (user must confirm)",
"description": "Run with default values (confirm with user first).",
},
"schedule_name": {
"type": "string",
"description": "Name for scheduled execution (triggers scheduling mode)",
"description": "Name for scheduled execution.",
},
"cron": {
"type": "string",
"description": "Cron expression (5 fields: min hour day month weekday)",
"description": "Cron expression (min hour day month weekday).",
},
"timezone": {
"type": "string",
"description": "IANA timezone for schedule (default: UTC)",
"description": "IANA timezone (default: UTC).",
},
"wait_for_result": {
"type": "integer",
"description": (
"Max seconds to wait for execution to complete (0-300). "
"If >0, blocks until the execution finishes or times out. "
"Returns execution outputs when complete."
),
"description": "Max seconds to wait for completion (0-300).",
},
},
"required": [],

View File

@@ -45,13 +45,10 @@ class RunBlockTool(BaseTool):
@property
def description(self) -> str:
return (
"Execute a specific block with the provided input data. "
"IMPORTANT: You MUST call find_block first to get the block's 'id' - "
"do NOT guess or make up block IDs. "
"On first attempt (without input_data), returns detailed schema showing "
"required inputs and outputs. Then call again with proper input_data to execute. "
"If a block requires human review, use continue_run_block with the "
"review_id after the user approves."
"Execute a block. IMPORTANT: Always get block_id from find_block first "
"— do NOT guess or fabricate IDs. "
"Call with empty input_data to see schema, then with data to execute. "
"If review_required, use continue_run_block."
)
@property
@@ -61,28 +58,14 @@ class RunBlockTool(BaseTool):
"properties": {
"block_id": {
"type": "string",
"description": (
"The block's 'id' field from find_block results. "
"NEVER guess this - always get it from find_block first."
),
},
"block_name": {
"type": "string",
"description": (
"The block's human-readable name from find_block results. "
"Used for display purposes in the UI."
),
"description": "Block ID from find_block results.",
},
"input_data": {
"type": "object",
"description": (
"Input values for the block. "
"First call with empty {} to see the block's schema, "
"then call again with proper values to execute."
),
"description": "Input values. Use {} first to see schema.",
},
},
"required": ["block_id", "block_name", "input_data"],
"required": ["block_id", "input_data"],
}
@property

View File

@@ -57,10 +57,9 @@ class RunMCPToolTool(BaseTool):
@property
def description(self) -> str:
return (
"Connect to an MCP (Model Context Protocol) server to discover and execute its tools. "
"Two-step: (1) call with server_url to list available tools, "
"(2) call again with server_url + tool_name + tool_arguments to execute. "
"Call get_mcp_guide for known server URLs and auth details."
"Discover and execute MCP server tools. "
"Call with server_url only to list tools, then with tool_name + tool_arguments to execute. "
"Call get_mcp_guide first for server URLs and auth."
)
@property
@@ -70,24 +69,15 @@ class RunMCPToolTool(BaseTool):
"properties": {
"server_url": {
"type": "string",
"description": (
"URL of the MCP server (Streamable HTTP endpoint), "
"e.g. https://mcp.example.com/mcp"
),
"description": "MCP server URL (Streamable HTTP endpoint).",
},
"tool_name": {
"type": "string",
"description": (
"Name of the MCP tool to execute. "
"Omit on first call to discover available tools."
),
"description": "Tool to execute. Omit to discover available tools.",
},
"tool_arguments": {
"type": "object",
"description": (
"Arguments to pass to the selected tool. "
"Must match the tool's input schema returned during discovery."
),
"description": "Arguments matching the tool's input schema.",
},
},
"required": ["server_url"],

View File

@@ -38,11 +38,7 @@ class SearchDocsTool(BaseTool):
@property
def description(self) -> str:
return (
"Search the AutoGPT platform documentation for information about "
"how to use the platform, build agents, configure blocks, and more. "
"Returns relevant documentation sections. Use get_doc_page to read full content."
)
return "Search platform documentation by keyword. Use get_doc_page to read full results."
@property
def parameters(self) -> dict[str, Any]:
@@ -51,10 +47,7 @@ class SearchDocsTool(BaseTool):
"properties": {
"query": {
"type": "string",
"description": (
"Search query to find relevant documentation. "
"Use natural language to describe what you're looking for."
),
"description": "Documentation search query.",
},
},
"required": ["query"],

View File

@@ -0,0 +1,81 @@
"""Schema regression tests for all registered CoPilot tools.
Validates that every tool in TOOL_REGISTRY produces a well-formed schema:
- description is non-empty
- all `required` fields exist in `properties`
- every property has a `type` and `description`
- total token budget does not regress past 8000 tokens
"""
import json
import pytest
import tiktoken
from backend.copilot.tools import TOOL_REGISTRY
_TOKEN_BUDGET = 8_000
def _get_all_tool_schemas() -> list[tuple[str, object]]:
"""Return (tool_name, openai_schema) pairs for every registered tool."""
return [(name, tool.as_openai_tool()) for name, tool in TOOL_REGISTRY.items()]
_ALL_SCHEMAS = _get_all_tool_schemas()
@pytest.mark.parametrize(
"tool_name,schema",
_ALL_SCHEMAS,
ids=[name for name, _ in _ALL_SCHEMAS],
)
class TestToolSchema:
"""Validate schema invariants for every registered tool."""
def test_description_non_empty(self, tool_name: str, schema: dict) -> None:
desc = schema["function"].get("description", "")
assert desc, f"Tool '{tool_name}' has an empty description"
def test_required_fields_exist_in_properties(
self, tool_name: str, schema: dict
) -> None:
params = schema["function"].get("parameters", {})
properties = params.get("properties", {})
required = params.get("required", [])
for field in required:
assert field in properties, (
f"Tool '{tool_name}': required field '{field}' "
f"not found in properties {list(properties.keys())}"
)
def test_every_property_has_type_and_description(
self, tool_name: str, schema: dict
) -> None:
params = schema["function"].get("parameters", {})
properties = params.get("properties", {})
for prop_name, prop_def in properties.items():
assert (
"type" in prop_def
), f"Tool '{tool_name}', property '{prop_name}' is missing 'type'"
assert (
"description" in prop_def
), f"Tool '{tool_name}', property '{prop_name}' is missing 'description'"
def test_total_schema_token_budget() -> None:
"""Assert total tool schema size stays under the token budget.
This locks in the 34% token reduction from #12398 and prevents future
description bloat from eroding the gains. Budget is set to 8000 tokens.
Note: this measures tool JSON only (not the full system prompt); the actual
baseline for tool schemas alone is ~6470 tokens, giving ~19% headroom.
"""
schemas = [tool.as_openai_tool() for tool in TOOL_REGISTRY.values()]
serialized = json.dumps(schemas)
enc = tiktoken.get_encoding("cl100k_base")
total_tokens = len(enc.encode(serialized))
assert total_tokens < _TOKEN_BUDGET, (
f"Tool schemas use {total_tokens} tokens, exceeding budget of {_TOKEN_BUDGET}. "
f"Description bloat detected — trim descriptions or raise the budget intentionally."
)
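# Optional local check (illustrative, not part of the test suite): print the
# current baseline so a budget bump is a deliberate decision.
if __name__ == "__main__":
    _enc = tiktoken.get_encoding("cl100k_base")
    _schemas = [tool.as_openai_tool() for tool in TOOL_REGISTRY.values()]
    print(f"{len(_enc.encode(json.dumps(_schemas)))} / {_TOKEN_BUDGET} tokens")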

View File

@@ -21,19 +21,7 @@ class ValidateAgentGraphTool(BaseTool):
@property
def description(self) -> str:
return (
"Validate an agent JSON graph for correctness. Checks:\n"
"- All block_ids reference real blocks\n"
"- All links reference valid source/sink nodes and fields\n"
"- Required input fields are wired or have defaults\n"
"- Data types are compatible across links\n"
"- Nested sink links use correct notation\n"
"- Prompt templates use proper curly brace escaping\n"
"- AgentExecutorBlock configurations are valid\n\n"
"Call this after generating agent JSON to verify correctness. "
"If validation fails, either fix issues manually based on the error "
"descriptions, or call fix_agent_graph to auto-fix common problems."
)
return "Validate agent JSON for correctness (block_ids, links, types, schemas). On failure, use fix_agent_graph to auto-fix."
@property
def requires_auth(self) -> bool:
@@ -46,11 +34,7 @@ class ValidateAgentGraphTool(BaseTool):
"properties": {
"agent_json": {
"type": "object",
"description": (
"The agent JSON to validate. Must contain 'nodes' and 'links' arrays. "
"Each node needs: id (UUID), block_id, input_default, metadata. "
"Each link needs: id (UUID), source_id, source_name, sink_id, sink_name."
),
"description": "Agent JSON with 'nodes' and 'links' arrays.",
},
},
"required": ["agent_json"],

View File

@@ -59,13 +59,7 @@ class WebFetchTool(BaseTool):
@property
def description(self) -> str:
return (
"Fetch the content of a public web page by URL. "
"Returns readable text extracted from HTML by default. "
"Useful for reading documentation, articles, and API responses. "
"Only supports HTTP/HTTPS GET requests to public URLs "
"(private/internal network addresses are blocked)."
)
return "Fetch a public web page. Public URLs only — internal addresses blocked. Returns readable text from HTML by default."
@property
def parameters(self) -> dict[str, Any]:
@@ -74,14 +68,11 @@ class WebFetchTool(BaseTool):
"properties": {
"url": {
"type": "string",
"description": "The public HTTP/HTTPS URL to fetch.",
"description": "Public HTTP/HTTPS URL.",
},
"extract_text": {
"type": "boolean",
"description": (
"If true (default), extract readable text from HTML. "
"If false, return raw content."
),
"description": "Extract text from HTML (default: true).",
"default": True,
},
},

View File

@@ -321,13 +321,7 @@ class ListWorkspaceFilesTool(BaseTool):
@property
def description(self) -> str:
return (
"List files in the user's persistent workspace (cloud storage). "
"These files survive across sessions. "
"For ephemeral session files, use the SDK Read/Glob tools instead. "
"Returns file names, paths, sizes, and metadata. "
"Optionally filter by path prefix."
)
return "List persistent workspace files. For ephemeral session files, use SDK Glob/Read instead. Optionally filter by path prefix."
@property
def parameters(self) -> dict[str, Any]:
@@ -336,24 +330,17 @@ class ListWorkspaceFilesTool(BaseTool):
"properties": {
"path_prefix": {
"type": "string",
"description": (
"Optional path prefix to filter files "
"(e.g., '/documents/' to list only files in documents folder). "
"By default, only files from the current session are listed."
),
"description": "Filter by path prefix (e.g. '/documents/').",
},
"limit": {
"type": "integer",
"description": "Maximum number of files to return (default 50, max 100)",
"description": "Max files to return (default 50, max 100).",
"minimum": 1,
"maximum": 100,
},
"include_all_sessions": {
"type": "boolean",
"description": (
"If true, list files from all sessions. "
"Default is false (only current session's files)."
),
"description": "Include files from all sessions (default: false).",
},
},
"required": [],
@@ -436,18 +423,10 @@ class ReadWorkspaceFileTool(BaseTool):
@property
def description(self) -> str:
return (
"Read a file from the user's persistent workspace (cloud storage). "
"These files survive across sessions. "
"For ephemeral session files, use the SDK Read tool instead. "
"Specify either file_id or path to identify the file. "
"For small text files, returns content directly. "
"For large or binary files, returns metadata and a download URL. "
"Use 'save_to_path' to copy the file to the working directory "
"(sandbox or ephemeral) for processing with bash_exec or file tools. "
"Use 'offset' and 'length' for paginated reads of large files "
"(e.g., persisted tool outputs). "
"Paths are scoped to the current session by default. "
"Use /sessions/<session_id>/... for cross-session access."
"Read a file from persistent workspace. Specify file_id or path. "
"Small text/image files return inline; large/binary return metadata+URL. "
"Use save_to_path to copy to working dir for processing. "
"Use offset/length for paginated reads."
)
@property
@@ -457,48 +436,30 @@ class ReadWorkspaceFileTool(BaseTool):
"properties": {
"file_id": {
"type": "string",
"description": "The file's unique ID (from list_workspace_files)",
"description": "File ID from list_workspace_files.",
},
"path": {
"type": "string",
"description": (
"The virtual file path (e.g., '/documents/report.pdf'). "
"Scoped to current session by default."
),
"description": "Virtual file path (e.g. '/documents/report.pdf').",
},
"save_to_path": {
"type": "string",
"description": (
"If provided, save the file to this path in the working "
"directory (cloud sandbox when E2B is active, or "
"ephemeral dir otherwise) so it can be processed with "
"bash_exec or file tools. "
"The file content is still returned in the response."
),
"description": "Copy file to this working directory path for processing.",
},
"force_download_url": {
"type": "boolean",
"description": (
"If true, always return metadata+URL instead of inline content. "
"Default is false (auto-selects based on file size/type)."
),
"description": "Always return metadata+URL instead of inline content.",
},
"offset": {
"type": "integer",
"description": (
"Character offset to start reading from (0-based). "
"Use with 'length' for paginated reads of large files."
),
"description": "Character offset for paginated reads (0-based).",
},
"length": {
"type": "integer",
"description": (
"Maximum number of characters to return. "
"Defaults to full file. Use with 'offset' for paginated reads."
),
"description": "Max characters to return for paginated reads.",
},
},
"required": [], # At least one must be provided
"required": [], # At least one of file_id or path must be provided
}
@property
@@ -653,15 +614,9 @@ class WriteWorkspaceFileTool(BaseTool):
@property
def description(self) -> str:
return (
"Write or create a file in the user's persistent workspace (cloud storage). "
"These files survive across sessions. "
"For ephemeral session files, use the SDK Write tool instead. "
"Provide content as plain text via 'content', OR base64-encoded via "
"'content_base64', OR copy a file from the ephemeral working directory "
"via 'source_path'. Exactly one of these three is required. "
f"Maximum file size is {Config().max_file_size_mb}MB. "
"Files are saved to the current session's folder by default. "
"Use /sessions/<session_id>/... for cross-session access."
"Write a file to persistent workspace (survives across sessions). "
"Provide exactly one of: content (text), content_base64 (binary), "
f"or source_path (copy from working dir). Max {Config().max_file_size_mb}MB."
)
@property
@@ -671,51 +626,31 @@ class WriteWorkspaceFileTool(BaseTool):
"properties": {
"filename": {
"type": "string",
"description": "Name for the file (e.g., 'report.pdf')",
"description": "Filename (e.g. 'report.pdf').",
},
"content": {
"type": "string",
"description": (
"Plain text content to write. Use this for text files "
"(code, configs, documents, etc.). "
"Mutually exclusive with content_base64 and source_path."
),
"description": "Plain text content. Mutually exclusive with content_base64/source_path.",
},
"content_base64": {
"type": "string",
"description": (
"Base64-encoded file content. Use this for binary files "
"(images, PDFs, etc.). "
"Mutually exclusive with content and source_path."
),
"description": "Base64-encoded binary content. Mutually exclusive with content/source_path.",
},
"source_path": {
"type": "string",
"description": (
"Path to a file in the ephemeral working directory to "
"copy to workspace (e.g., '/tmp/copilot-.../output.csv'). "
"Use this to persist files created by bash_exec or SDK Write. "
"Mutually exclusive with content and content_base64."
),
"description": "Working directory path to copy to workspace. Mutually exclusive with content/content_base64.",
},
"path": {
"type": "string",
"description": (
"Optional virtual path where to save the file "
"(e.g., '/documents/report.pdf'). "
"Defaults to '/{filename}'. Scoped to current session."
),
"description": "Virtual path (e.g. '/documents/report.pdf'). Defaults to '/{filename}'.",
},
"mime_type": {
"type": "string",
"description": (
"Optional MIME type of the file. "
"Auto-detected from filename if not provided."
),
"description": "MIME type. Auto-detected from filename if omitted.",
},
"overwrite": {
"type": "boolean",
"description": "Whether to overwrite if file exists at path (default: false)",
"description": "Overwrite if file exists (default: false).",
},
},
"required": ["filename"],
@@ -842,12 +777,7 @@ class DeleteWorkspaceFileTool(BaseTool):
@property
def description(self) -> str:
return (
"Delete a file from the user's persistent workspace (cloud storage). "
"Specify either file_id or path to identify the file. "
"Paths are scoped to the current session by default. "
"Use /sessions/<session_id>/... for cross-session access."
)
return "Delete a file from persistent workspace. Specify file_id or path."
@property
def parameters(self) -> dict[str, Any]:
@@ -856,17 +786,14 @@ class DeleteWorkspaceFileTool(BaseTool):
"properties": {
"file_id": {
"type": "string",
"description": "The file's unique ID (from list_workspace_files)",
"description": "File ID from list_workspace_files.",
},
"path": {
"type": "string",
"description": (
"The virtual file path (e.g., '/documents/report.pdf'). "
"Scoped to current session by default."
),
"description": "Virtual file path.",
},
},
"required": [], # At least one must be provided
"required": [], # At least one of file_id or path must be provided
}
@property

View File

@@ -70,10 +70,6 @@ def _msg_tokens(msg: dict, enc) -> int:
# Count tool result tokens
tool_call_tokens += _tok_len(item.get("tool_use_id", ""), enc)
tool_call_tokens += _tok_len(item.get("content", ""), enc)
elif isinstance(item, dict) and item.get("type") == "text":
# Count text block tokens (standard: "text" key, fallback: "content")
text_val = item.get("text") or item.get("content", "")
tool_call_tokens += _tok_len(text_val, enc)
elif isinstance(item, dict) and "content" in item:
# Other content types with content field
tool_call_tokens += _tok_len(item.get("content", ""), enc)
@@ -149,16 +145,10 @@ def _truncate_middle_tokens(text: str, enc, max_tok: int) -> str:
if len(ids) <= max_tok:
return text # nothing to do
# Need at least 3 tokens (head + ellipsis + tail) for meaningful truncation
if max_tok < 1:
return ""
    mid = enc.encode("…")
if max_tok < 3:
return enc.decode(ids[:max_tok])
# Split the allowance between the two ends:
head = max_tok // 2 - 1 # -1 for the ellipsis
tail = max_tok - head - 1
    mid = enc.encode("…")
return enc.decode(ids[:head] + mid + ids[-tail:])
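# Worked example (illustrative): with max_tok=10, head = 10 // 2 - 1 = 4 and
# tail = 10 - 4 - 1 = 5, so the output keeps the first 4 tokens, one ellipsis
# token, and the last 5 tokens.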
@@ -555,14 +545,6 @@ async def _summarize_messages_llm(
"- Actions taken and key decisions made\n"
"- Technical specifics (file names, tool outputs, function signatures)\n"
"- Errors encountered and resolutions applied\n\n"
"IMPORTANT: Preserve all concrete references verbatim — these are small but "
"critical for continuing the conversation:\n"
"- File paths and directory paths (e.g. /src/app/page.tsx, ./output/result.csv)\n"
"- Image/media file paths from tool outputs\n"
"- URLs, API endpoints, and webhook addresses\n"
"- Resource IDs, session IDs, and identifiers\n"
"- Tool names that were called and their key parameters\n"
"- Environment variables, config keys, and credentials names (not values)\n\n"
"Include ONLY the sections below that have relevant content "
"(skip sections with nothing to report):\n\n"
"## 1. Primary Request and Intent\n"
@@ -570,8 +552,7 @@ async def _summarize_messages_llm(
"## 2. Key Technical Concepts\n"
"Technologies, frameworks, tools, and patterns being used or discussed.\n\n"
"## 3. Files and Resources Involved\n"
"Specific files examined or modified, with relevant snippets and identifiers. "
"Include exact file paths, image paths from tool outputs, and resource URLs.\n\n"
"Specific files examined or modified, with relevant snippets and identifiers.\n\n"
"## 4. Errors and Fixes\n"
"Problems encountered, error messages, and their resolutions.\n\n"
"## 5. All User Messages\n"
@@ -585,7 +566,7 @@ async def _summarize_messages_llm(
},
{"role": "user", "content": f"Summarize:\n\n{conversation_text}"},
],
max_tokens=2000,
max_tokens=1500,
temperature=0.3,
)
@@ -705,15 +686,11 @@ async def compress_context(
msgs = [summary_msg] + recent_msgs
logger.info(
"Context summarized: %d -> %d tokens, summarized %d messages",
original_count,
total_tokens(),
messages_summarized,
f"Context summarized: {original_count} -> {total_tokens()} tokens, "
f"summarized {messages_summarized} messages"
)
except Exception as e:
logger.warning(
"Summarization failed, continuing with truncation: %s", e
)
logger.warning(f"Summarization failed, continuing with truncation: {e}")
# Fall through to content truncation
# ---- STEP 2: Normalize content ----------------------------------------