fix(backend/copilot): prompt-too-long retry, compaction churn, model-aware compression, and truncated tool call recovery (#12625)

## Why

CoPilot has several context management issues that degrade long
sessions:
1. "Prompt is too long" errors crash the session instead of triggering
retry/compaction
2. Stale thinking blocks bloat transcripts, causing unnecessary
compaction every turn
3. Compression target is hardcoded regardless of model context window
size
4. Truncated tool calls (empty `{}` args from max_tokens) kill the
session instead of guiding the model to self-correct

## What

**Fix 1: Prompt-too-long retry bypass (SENTRY-1207)**
The SDK surfaces "prompt too long" via `AssistantMessage.error` and
`ResultMessage.result` — neither triggered the retry/compaction loop
(only Python exceptions did). Now both paths are intercepted and
re-raised.

**Fix 2: Strip stale thinking blocks before upload**
Thinking/redacted_thinking blocks in non-last assistant entries are
10-50K tokens each but only needed for API signature verification in the
*last* message. Stripping before upload reduces transcript size and
prevents per-turn compaction.

**Fix 3: Model-aware compression target**
`compress_context()` now computes `target_tokens` from the model's
context window (e.g. 140K for Opus 200K) instead of a hardcoded 120K
default. Larger models retain more history; smaller models compress more
aggressively.

**Fix 4: Self-correcting truncated tool calls**
When the model's response exceeds max_tokens, tool call inputs get
silently truncated to `{}`. Previously this tripped a circuit breaker
after 3 attempts. Now the MCP wrapper detects empty args and returns
guidance: "write in chunks with `cat >>`, pass via
`@@agptfile:filename`". The model can self-correct instead of the
session dying.

## How

- **service.py**: `_is_prompt_too_long` checks in both
`AssistantMessage.error` and `ResultMessage` error handlers. Circuit
breaker limit raised from 3→5.
- **transcript.py**: `strip_stale_thinking_blocks()` reverse-scans for
last assistant `message.id`, strips thinking blocks from all others.
Called in `upload_transcript()`.
- **prompt.py**: `get_compression_target(model)` computes
`context_window - 60K overhead`. `compress_context()` uses it when
`target_tokens` is None.
- **tool_adapter.py**: `_truncating` wrapper intercepts empty args on
tools with required params, returns actionable guidance instead of
failing.

## Related

- Fixes SENTRY-1207
- Sessions: `d2f7cba3` (repeated compaction), `08b807d4` (prompt too
long), `130d527c` (truncated tool calls)
- Extends #12413, consolidates #12626

## Test plan

- [x] 6 unit tests for `strip_stale_thinking_blocks`
- [x] 1 integration test for ResultMessage prompt-too-long → compaction
retry
- [x] Pyright clean (0 errors), all pre-commit hooks pass
- [ ] E2E: Load transcripts from affected sessions and verify behavior
This commit is contained in:
Zamil Majdy
2026-04-01 17:10:57 +02:00
committed by GitHub
parent 8aae7751dc
commit 24d0c35ed3
11 changed files with 815 additions and 35 deletions

View File

@@ -38,7 +38,7 @@ class TestFlattenAssistantContent:
def test_tool_use_blocks(self):
blocks = [{"type": "tool_use", "name": "read_file", "input": {}}]
assert _flatten_assistant_content(blocks) == "[tool_use: read_file]"
assert _flatten_assistant_content(blocks) == ""
def test_mixed_blocks(self):
blocks = [
@@ -47,19 +47,22 @@ class TestFlattenAssistantContent:
]
result = _flatten_assistant_content(blocks)
assert "Let me read that." in result
assert "[tool_use: Read]" in result
# tool_use blocks are dropped entirely to prevent model mimicry
assert "Read" not in result
def test_raw_strings(self):
assert _flatten_assistant_content(["hello", "world"]) == "hello\nworld"
def test_unknown_block_type_preserved_as_placeholder(self):
def test_unknown_block_type_dropped(self):
blocks = [
{"type": "text", "text": "See this image:"},
{"type": "image", "source": {"type": "base64", "data": "..."}},
]
result = _flatten_assistant_content(blocks)
assert "See this image:" in result
assert "[__image__]" in result
# Unknown block types are dropped to prevent model mimicry
assert "[__image__]" not in result
assert "base64" not in result
def test_empty(self):
assert _flatten_assistant_content([]) == ""
@@ -279,7 +282,8 @@ class TestTranscriptToMessages:
messages = _transcript_to_messages(content)
assert len(messages) == 2
assert "Let me check." in messages[0]["content"]
assert "[tool_use: read_file]" in messages[0]["content"]
# tool_use blocks are dropped entirely to prevent model mimicry
assert "read_file" not in messages[0]["content"]
assert messages[1]["content"] == "file contents"

View File

@@ -49,22 +49,22 @@ def test_format_assistant_tool_calls():
)
]
result = _format_conversation_context(msgs)
assert result is not None
assert 'You called tool: search({"q": "test"})' in result
# Assistant with no content and tool_calls omitted produces no lines
assert result is None
def test_format_tool_result():
msgs = [ChatMessage(role="tool", content='{"result": "ok"}')]
result = _format_conversation_context(msgs)
assert result is not None
assert 'Tool result: {"result": "ok"}' in result
assert 'Tool output: {"result": "ok"}' in result
def test_format_tool_result_none_content():
msgs = [ChatMessage(role="tool", content=None)]
result = _format_conversation_context(msgs)
assert result is not None
assert "Tool result: " in result
assert "Tool output: " in result
def test_format_full_conversation():
@@ -84,8 +84,8 @@ def test_format_full_conversation():
assert result is not None
assert "User: find agents" in result
assert "You responded: I'll search for agents." in result
assert "You called tool: find_agents" in result
assert "Tool result:" in result
# tool_calls are omitted to prevent model mimicry
assert "Tool output:" in result
assert "You responded: Found Agent1." in result

View File

@@ -904,14 +904,14 @@ class TestTranscriptEdgeCases:
assert restored[1]["content"] == "Second"
def test_flatten_assistant_with_only_tool_use(self):
"""Assistant message with only tool_use blocks (no text)."""
"""Assistant message with only tool_use blocks (no text) flattens to empty."""
blocks = [
{"type": "tool_use", "name": "bash", "input": {"cmd": "ls"}},
{"type": "tool_use", "name": "read", "input": {"path": "/f"}},
]
result = _flatten_assistant_content(blocks)
assert "[tool_use: bash]" in result
assert "[tool_use: read]" in result
# tool_use blocks are dropped entirely to prevent model mimicry
assert result == ""
def test_flatten_tool_result_nested_image(self):
"""Tool result containing image blocks uses placeholder."""
@@ -1414,3 +1414,76 @@ class TestStreamChatCompletionRetryIntegration:
# Verify user-friendly message (not raw SDK text)
assert "Authentication" in errors[0].errorText
assert any(isinstance(e, StreamStart) for e in events)
    @pytest.mark.asyncio
    async def test_result_message_prompt_too_long_triggers_compaction(self):
        """CLI returns ResultMessage(subtype="error") with "Prompt is too long".

        When the Claude CLI rejects the prompt pre-API (model=<synthetic>,
        duration_api_ms=0), it sends a ResultMessage with is_error=True
        instead of raising a Python exception. The retry loop must still
        detect this as a context-length error and trigger compaction.
        """
        import contextlib
        from claude_agent_sdk import ResultMessage
        from backend.copilot.response_model import StreamError, StreamStart
        from backend.copilot.sdk.service import stream_chat_completion_sdk

        session = self._make_session()
        success_result = self._make_result_message()
        # Mutable cell so the factory closure can count invocations.
        attempt_count = [0]
        # Shape of the CLI's pre-API rejection: is_error=True and
        # duration_api_ms=0 (the request never reached the Anthropic API).
        error_result = ResultMessage(
            subtype="error",
            result="Prompt is too long",
            duration_ms=100,
            duration_api_ms=0,
            is_error=True,
            num_turns=0,
            session_id="test-session-id",
        )

        def _client_factory(*args, **kwargs):
            attempt_count[0] += 1
            if attempt_count[0] == 1:
                # First attempt: CLI returns error ResultMessage
                return self._make_client_mock(result_message=error_result)
            # Second attempt (after compaction): succeeds
            return self._make_client_mock(result_message=success_result)

        original_transcript = _build_transcript(
            [("user", "prior question"), ("assistant", "prior answer")]
        )
        compacted_transcript = _build_transcript(
            [("user", "[summary]"), ("assistant", "summary reply")]
        )
        patches = _make_sdk_patches(
            session,
            original_transcript=original_transcript,
            compacted_transcript=compacted_transcript,
            client_side_effect=_client_factory,
        )
        events = []
        # ExitStack lets us apply the variable-length patch list from
        # _make_sdk_patches as a single context.
        with contextlib.ExitStack() as stack:
            for target, kwargs in patches:
                stack.enter_context(patch(target, **kwargs))
            async for event in stream_chat_completion_sdk(
                session_id="test-session-id",
                message="hello",
                is_user_message=True,
                user_id="test-user",
                session=session,
            ):
                events.append(event)
        # Exactly two attempts: the error ResultMessage must have been
        # re-raised internally and retried after compaction.
        assert attempt_count[0] == 2, (
            f"Expected 2 SDK attempts (CLI error ResultMessage "
            f"should trigger compaction retry), got {attempt_count[0]}"
        )
        # The error must be swallowed by the retry, not surfaced to the user.
        errors = [e for e in events if isinstance(e, StreamError)]
        assert not errors, f"Unexpected StreamError: {errors}"
        assert any(isinstance(e, StreamStart) for e in events)

View File

@@ -59,11 +59,14 @@ from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamFinishStep,
StreamHeartbeat,
StreamStart,
StreamStartStep,
StreamStatus,
StreamTextDelta,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
@@ -113,9 +116,10 @@ _MAX_STREAM_ATTEMPTS = 3
# Hard circuit breaker: abort the stream if the model sends this many
# consecutive tool calls with empty parameters (a sign of context
# saturation or serialization failure). Empty input ({}) is never
# legitimate — even one is suspicious, three is conclusive.
_EMPTY_TOOL_CALL_LIMIT = 3
# saturation or serialization failure). The MCP wrapper now returns
# guidance on the first empty call, giving the model a chance to
# self-correct. The limit is generous to allow recovery attempts.
_EMPTY_TOOL_CALL_LIMIT = 5
# User-facing error shown when the empty-tool-call circuit breaker trips.
_CIRCUIT_BREAKER_ERROR_MSG = (
@@ -744,15 +748,11 @@ def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
elif msg.role == "assistant":
if msg.content:
lines.append(f"You responded: {msg.content}")
if msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
tool_name = func.get("name", "unknown")
tool_args = func.get("arguments", "")
lines.append(f"You called tool: {tool_name}({tool_args})")
# Omit tool_calls — any text representation gets mimicked
# by the model. Tool results below provide the context.
elif msg.role == "tool":
content = msg.content or ""
lines.append(f"Tool result: {content}")
lines.append(f"Tool output: {content[:500]}")
if not lines:
return None
@@ -1306,6 +1306,21 @@ async def _run_stream_attempt(
error_preview,
)
# Intercept prompt-too-long errors surfaced as
# AssistantMessage.error (not as a Python exception).
# Re-raise so the outer retry loop can compact the
# transcript and retry with reduced context.
# Only check error_text (the error field), not the
# content preview — content may contain arbitrary text
# that false-positives the pattern match.
if _is_prompt_too_long(Exception(error_text)):
logger.warning(
"%s Prompt-too-long detected via AssistantMessage "
"error — raising for retry",
ctx.log_prefix,
)
raise RuntimeError("Prompt is too long")
# Intercept transient API errors (socket closed,
# ECONNRESET) — replace the raw message with a
# user-friendly error text and use the retryable
@@ -1399,6 +1414,13 @@ async def _run_stream_attempt(
ctx.log_prefix,
sdk_msg.result or "(no error message provided)",
)
# If the CLI itself rejected the prompt as too long
# (pre-API check, duration_api_ms=0), re-raise as an
# exception so the retry loop can trigger compaction.
# Without this, the ResultMessage is silently consumed
# and the retry/compaction mechanism is never invoked.
if _is_prompt_too_long(RuntimeError(sdk_msg.result or "")):
raise RuntimeError("Prompt is too long")
# Capture token usage from ResultMessage.
# Anthropic reports cached tokens separately:
@@ -2031,7 +2053,20 @@ async def stream_chat_completion_sdk(
try:
async for event in _run_stream_attempt(stream_ctx, state):
if not isinstance(event, StreamHeartbeat):
if not isinstance(
event,
(
StreamHeartbeat,
# Compaction UI events are cosmetic and must not
# block retry — they're emitted before the SDK
# query on compacted attempts.
StreamStartStep,
StreamFinishStep,
StreamToolInputStart,
StreamToolInputAvailable,
StreamToolOutputAvailable,
),
):
events_yielded += 1
yield event
break # Stream completed — exit retry loop

View File

@@ -392,7 +392,7 @@ class TestFlattenThinkingBlocks:
assert result == ""
def test_mixed_thinking_text_tool(self):
"""Mixed blocks: only text and tool_use survive flattening."""
"""Mixed blocks: only text survives flattening; thinking and tool_use dropped."""
blocks = [
{"type": "thinking", "thinking": "hmm", "signature": "sig"},
{"type": "redacted_thinking", "data": "xyz"},
@@ -403,7 +403,8 @@ class TestFlattenThinkingBlocks:
assert "hmm" not in result
assert "xyz" not in result
assert "I'll read the file." in result
assert "[tool_use: Read]" in result
# tool_use blocks are dropped entirely to prevent model mimicry
assert "Read" not in result
# ---------------------------------------------------------------------------

View File

@@ -466,6 +466,28 @@ def create_copilot_mcp_server(*, use_e2b: bool = False):
Applied once to every registered tool."""
async def wrapper(args: dict[str, Any]) -> dict[str, Any]:
# Empty tool args = model's output was truncated by the API's
# max_tokens limit. Instead of letting the tool fail with a
# confusing error (and eventually tripping the circuit breaker),
# return clear guidance so the model can self-correct.
if not args and input_schema and input_schema.get("required"):
logger.warning(
"[MCP] %s called with empty args (likely output "
"token truncation) — returning guidance",
tool_name,
)
return _mcp_error(
f"Your call to {tool_name} had empty arguments — "
f"this means your previous response was too long and "
f"the tool call input was truncated by the API. "
f"To fix this: break your work into smaller steps. "
f"For large content, first write it to a file using "
f"bash_exec with cat >> (append section by section), "
f"then pass it via @@agptfile:filename reference. "
f"Do NOT retry with the same approach — it will "
f"be truncated again."
)
# Circuit breaker: stop infinite retry loops with identical args.
# Use the original (pre-expansion) args for fingerprinting so
# check and record always use the same key — @@agptfile:

View File

@@ -43,6 +43,10 @@ STRIPPABLE_TYPES = frozenset(
{"progress", "file-history-snapshot", "queue-operation", "summary", "pr-link"}
)
# Thinking block types that can be stripped from non-last assistant entries.
# The Anthropic API only requires these in the *last* assistant message.
_THINKING_BLOCK_TYPES = frozenset({"thinking", "redacted_thinking"})
@dataclass
class TranscriptDownload:
@@ -450,6 +454,83 @@ def _build_meta_storage_path(user_id: str, session_id: str, backend: object) ->
)
def strip_stale_thinking_blocks(content: str) -> str:
    """Remove thinking/redacted_thinking blocks from non-last assistant entries.

    The Anthropic API only requires thinking blocks in the **last** assistant
    message to be value-identical to the original response. Older assistant
    entries carry stale thinking blocks that consume significant tokens
    (often 10-50K each) without providing useful context for ``--resume``.

    Stripping them before upload prevents the CLI from triggering compaction
    every turn just to compress away the stale thinking bloat.

    Args:
        content: JSONL transcript — one JSON entry per line.

    Returns:
        The transcript with thinking blocks removed from every assistant
        entry except those belonging to the last assistant turn. Lines that
        are untouched (or unparseable) are preserved verbatim; if no
        assistant entry exists the input is returned unchanged.
    """
    lines = content.strip().split("\n")
    if not lines:
        return content
    parsed: list[tuple[str, dict | None]] = []
    for line in lines:
        # NOTE(review): this is the project json wrapper — `fallback=None`
        # returns None for malformed lines instead of raising (stdlib
        # json.loads has no such parameter); confirm against backend.util.json.
        parsed.append((line, json.loads(line, fallback=None)))
    # Reverse scan to find the last assistant message ID and index.
    last_asst_msg_id: str | None = None
    last_asst_idx: int | None = None
    for i in range(len(parsed) - 1, -1, -1):
        _line, entry = parsed[i]
        if not isinstance(entry, dict):
            continue
        msg = entry.get("message", {})
        if msg.get("role") == "assistant":
            last_asst_msg_id = msg.get("id")
            last_asst_idx = i
            break
    if last_asst_idx is None:
        # No assistant entries at all — nothing to strip.
        return content
    result_lines: list[str] = []
    stripped_count = 0
    for i, (line, entry) in enumerate(parsed):
        if not isinstance(entry, dict):
            # Unparseable line — pass through untouched.
            result_lines.append(line)
            continue
        msg = entry.get("message", {})
        # Only strip from assistant entries that are NOT the last turn.
        # Use msg_id matching when available; fall back to index for entries
        # without an id field. A turn may span multiple entries sharing one
        # message.id, so id-matching protects all entries of the last turn.
        is_last_turn = (
            last_asst_msg_id is not None and msg.get("id") == last_asst_msg_id
        ) or (last_asst_msg_id is None and i == last_asst_idx)
        if (
            msg.get("role") == "assistant"
            and not is_last_turn
            and isinstance(msg.get("content"), list)
        ):
            content_blocks = msg["content"]
            filtered = [
                b
                for b in content_blocks
                if not (isinstance(b, dict) and b.get("type") in _THINKING_BLOCK_TYPES)
            ]
            if len(filtered) < len(content_blocks):
                stripped_count += len(content_blocks) - len(filtered)
                # Rebuild the entry without mutating the parsed original.
                entry = {**entry, "message": {**msg, "content": filtered}}
                # Compact separators keep the re-serialized line small.
                result_lines.append(json.dumps(entry, separators=(",", ":")))
                continue
        result_lines.append(line)
    if stripped_count:
        logger.info(
            "[Transcript] Stripped %d stale thinking block(s) from non-last entries",
            stripped_count,
        )
    return "\n".join(result_lines) + "\n"
async def upload_transcript(
user_id: str,
session_id: str,
@@ -472,6 +553,9 @@ async def upload_transcript(
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
# Strip stale thinking blocks from older assistant entries — these consume
# significant tokens and trigger unnecessary CLI compaction every turn.
stripped = strip_stale_thinking_blocks(stripped)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types = [
@@ -605,9 +689,6 @@ COMPACT_MSG_ID_PREFIX = "msg_compact_"
ENTRY_TYPE_MESSAGE = "message"
_THINKING_BLOCK_TYPES = frozenset({"thinking", "redacted_thinking"})
def _flatten_assistant_content(blocks: list) -> str:
"""Flatten assistant content blocks into a single plain-text string.
@@ -633,11 +714,14 @@ def _flatten_assistant_content(blocks: list) -> str:
if btype == "text":
parts.append(block.get("text", ""))
elif btype == "tool_use":
parts.append(f"[tool_use: {block.get('name', '?')}]")
# Drop tool_use entirely — any text representation gets
# mimicked by the model as plain text instead of actual
# structured tool calls. The tool results (in the
# following user/tool_result entry) provide sufficient
# context about what happened.
continue
else:
# Preserve non-text blocks (e.g. image) as placeholders.
# Use __prefix__ to distinguish from literal user text.
parts.append(f"[__{btype}__]")
continue
elif isinstance(block, str):
parts.append(block)
return "\n".join(parts) if parts else ""

View File

@@ -13,6 +13,7 @@ from .transcript import (
delete_transcript,
read_compacted_entries,
strip_progress_entries,
strip_stale_thinking_blocks,
validate_transcript,
write_transcript_to_tempfile,
)
@@ -1200,3 +1201,170 @@ class TestCleanupStaleProjectDirs:
removed = cleanup_stale_project_dirs(encoded_cwd="some-other-project")
assert removed == 0
assert non_copilot.exists()
# ---------------------------------------------------------------------------
# strip_stale_thinking_blocks
# ---------------------------------------------------------------------------
class TestStripStaleThinkingBlocks:
    """Tests for strip_stale_thinking_blocks — removes thinking/redacted_thinking
    blocks from non-last assistant entries to reduce transcript bloat."""

    def _asst_entry(
        self, msg_id: str, content: list, uuid: str = "u1", parent: str = ""
    ) -> dict:
        # Minimal assistant entry in the CLI transcript JSONL shape.
        return {
            "type": "assistant",
            "uuid": uuid,
            "parentUuid": parent,
            "message": {
                "role": "assistant",
                "id": msg_id,
                "type": "message",
                "content": content,
            },
        }

    def _user_entry(self, text: str, uuid: str = "u0", parent: str = "") -> dict:
        # Minimal user entry; content is a plain string (no block list).
        return {
            "type": "user",
            "uuid": uuid,
            "parentUuid": parent,
            "message": {"role": "user", "content": text},
        }

    def test_strips_thinking_from_older_assistant(self) -> None:
        """Thinking blocks in non-last assistant entries should be removed."""
        old_asst = self._asst_entry(
            "msg_old",
            [
                {"type": "thinking", "thinking": "deep thoughts..."},
                {"type": "text", "text": "hello"},
                {"type": "redacted_thinking", "data": "secret"},
            ],
            uuid="a1",
        )
        new_asst = self._asst_entry(
            "msg_new",
            [
                {"type": "thinking", "thinking": "latest thoughts"},
                {"type": "text", "text": "world"},
            ],
            uuid="a2",
            parent="a1",
        )
        content = _make_jsonl(old_asst, new_asst)
        result = strip_stale_thinking_blocks(content)
        lines = [json.loads(ln) for ln in result.strip().split("\n")]
        # Old assistant should have thinking blocks stripped
        old_content = lines[0]["message"]["content"]
        assert len(old_content) == 1
        assert old_content[0]["type"] == "text"
        # New (last) assistant should be untouched
        new_content = lines[1]["message"]["content"]
        assert len(new_content) == 2
        assert new_content[0]["type"] == "thinking"
        assert new_content[1]["type"] == "text"

    def test_preserves_last_assistant_thinking(self) -> None:
        """The last assistant entry's thinking blocks must be preserved."""
        entry = self._asst_entry(
            "msg_only",
            [
                {"type": "thinking", "thinking": "must keep"},
                {"type": "text", "text": "response"},
            ],
        )
        content = _make_jsonl(entry)
        result = strip_stale_thinking_blocks(content)
        lines = [json.loads(ln) for ln in result.strip().split("\n")]
        assert len(lines[0]["message"]["content"]) == 2

    def test_no_assistant_entries_returns_unchanged(self) -> None:
        """Transcripts with only user entries should pass through unchanged."""
        user = self._user_entry("hello")
        content = _make_jsonl(user)
        assert strip_stale_thinking_blocks(content) == content

    def test_empty_content_returns_unchanged(self) -> None:
        # Empty input has no assistant entry, so it is returned verbatim.
        assert strip_stale_thinking_blocks("") == ""

    def test_multiple_turns_strips_all_but_last(self) -> None:
        """With 3 assistant turns, only the last keeps thinking blocks."""
        entries = [
            self._asst_entry(
                "msg_1",
                [
                    {"type": "thinking", "thinking": "t1"},
                    {"type": "text", "text": "a1"},
                ],
                uuid="a1",
            ),
            self._user_entry("q2", uuid="u2", parent="a1"),
            self._asst_entry(
                "msg_2",
                [
                    {"type": "thinking", "thinking": "t2"},
                    {"type": "text", "text": "a2"},
                ],
                uuid="a2",
                parent="u2",
            ),
            self._user_entry("q3", uuid="u3", parent="a2"),
            self._asst_entry(
                "msg_3",
                [
                    {"type": "thinking", "thinking": "t3"},
                    {"type": "text", "text": "a3"},
                ],
                uuid="a3",
                parent="u3",
            ),
        ]
        content = _make_jsonl(*entries)
        result = strip_stale_thinking_blocks(content)
        lines = [json.loads(ln) for ln in result.strip().split("\n")]
        # msg_1: thinking stripped
        assert len(lines[0]["message"]["content"]) == 1
        assert lines[0]["message"]["content"][0]["type"] == "text"
        # msg_2: thinking stripped
        assert len(lines[2]["message"]["content"]) == 1
        # msg_3 (last): thinking preserved
        assert len(lines[4]["message"]["content"]) == 2
        assert lines[4]["message"]["content"][0]["type"] == "thinking"

    def test_same_msg_id_multi_entry_turn(self) -> None:
        """Multiple entries sharing the same message.id (same turn) are preserved."""
        # A single API turn can be streamed as several transcript entries
        # that share one message.id — all of them count as "the last turn".
        entries = [
            self._asst_entry(
                "msg_old",
                [{"type": "thinking", "thinking": "old"}],
                uuid="a1",
            ),
            self._asst_entry(
                "msg_last",
                [{"type": "thinking", "thinking": "t_part1"}],
                uuid="a2",
                parent="a1",
            ),
            self._asst_entry(
                "msg_last",
                [{"type": "text", "text": "response"}],
                uuid="a3",
                parent="a2",
            ),
        ]
        content = _make_jsonl(*entries)
        result = strip_stale_thinking_blocks(content)
        lines = [json.loads(ln) for ln in result.strip().split("\n")]
        # Old entry stripped
        assert lines[0]["message"]["content"] == []
        # Both entries of last turn (msg_last) preserved
        assert lines[1]["message"]["content"][0]["type"] == "thinking"
        assert lines[2]["message"]["content"][0]["type"] == "text"

View File

@@ -251,6 +251,50 @@ def estimate_token_count_str(
DEFAULT_TOKEN_THRESHOLD = 120_000
DEFAULT_KEEP_RECENT = 15
# Reserve tokens for system prompt, tool definitions, and per-turn overhead.
# The actual model context limit minus this reserve = compression target.
_CONTEXT_OVERHEAD_RESERVE = 60_000
def get_context_window(model: str) -> int | None:
    """Return the context window size for a model, or None if unknown.

    Looks up the model in the :class:`LlmModel` enum (which already
    carries ``context_window`` via ``MODEL_METADATA``). Handles
    provider-prefixed names (``anthropic/claude-opus-4-6``) and
    case-insensitive input automatically.
    """
    from backend.blocks.llm import LlmModel  # lazy to avoid circular import

    # Try the exact name first, then a lowercase variant so mixed-case
    # input still resolves. KeyError is included because enum lookup of
    # aliased/derived values may surface it alongside ValueError.
    for candidate in (model, model.lower()):
        try:
            return LlmModel(candidate).context_window
        except (ValueError, KeyError):
            continue
    return None
def get_compression_target(model: str) -> int:
    """Compute a model-aware compression target for conversation history.

    Returns ``context_window - overhead_reserve``, floored at 10K.
    Falls back to ``DEFAULT_TOKEN_THRESHOLD`` for unknown models or
    models whose context window is too small for the overhead reserve.
    """
    window = get_context_window(model)
    if window is not None:
        # Reserve room for system prompt, tool definitions, and per-turn
        # overhead; only use the model-derived target if meaningful headroom
        # (>= 10K tokens) remains after the reserve.
        headroom = window - _CONTEXT_OVERHEAD_RESERVE
        if headroom >= 10_000:
            return headroom
    return DEFAULT_TOKEN_THRESHOLD
@dataclass
class CompressResult:
@@ -660,7 +704,7 @@ async def _summarize_messages_llm(
async def compress_context(
messages: list[dict],
target_tokens: int = DEFAULT_TOKEN_THRESHOLD,
target_tokens: int | None = None,
*,
model: str = "gpt-4o",
client: AsyncOpenAI | None = None,
@@ -672,6 +716,11 @@ async def compress_context(
"""
Unified context compression that combines summarization and truncation strategies.
When ``target_tokens`` is None (the default), it is computed from the
model's context window via ``get_compression_target(model)``. This
ensures large-context models (e.g. Opus 200K) retain more history
while smaller models compress more aggressively.
Strategy (in order):
1. **LLM summarization** If client provided, summarize old messages into a
single context message while keeping recent messages intact. This is the
@@ -699,6 +748,10 @@ async def compress_context(
-------
CompressResult with compressed messages and metadata.
"""
# Resolve model-aware target when caller doesn't specify an explicit limit.
if target_tokens is None:
target_tokens = get_compression_target(model)
# Guard clause for empty messages
if not messages:
return CompressResult(

View File

@@ -7,6 +7,7 @@ from tiktoken import encoding_for_model
from backend.util import json
from backend.util.prompt import (
DEFAULT_TOKEN_THRESHOLD,
CompressResult,
_ensure_tool_pairs_intact,
_msg_tokens,
@@ -15,6 +16,8 @@ from backend.util.prompt import (
_truncate_tool_message_content,
compress_context,
estimate_token_count,
get_compression_target,
get_context_window,
)
@@ -974,3 +977,43 @@ class TestCompressResultDataclass:
assert result.original_token_count == 500
assert result.messages_summarized == 10
assert result.messages_dropped == 5
class TestGetContextWindow:
    """get_context_window resolves model names to context window sizes."""

    def test_claude_opus(self) -> None:
        assert get_context_window("claude-opus-4-20250514") == 200_000

    def test_claude_sonnet(self) -> None:
        assert get_context_window("claude-sonnet-4-20250514") == 200_000

    def test_openrouter_prefix(self) -> None:
        # Provider-prefixed names must resolve too.
        assert get_context_window("anthropic/claude-opus-4-6") == 200_000

    def test_version_suffix(self) -> None:
        assert get_context_window("claude-opus-4-6") == 200_000

    def test_gpt4o(self) -> None:
        assert get_context_window("gpt-4o") == 128_000

    def test_unknown_model(self) -> None:
        # Unknown models yield None so callers can choose a fallback.
        assert get_context_window("some-unknown-model") is None

    def test_case_insensitive(self) -> None:
        assert get_context_window("GPT-4o") == 128_000
class TestGetCompressionTarget:
    """get_compression_target = context_window - 60K reserve, with fallbacks."""

    def test_claude_opus_200k(self) -> None:
        target = get_compression_target("anthropic/claude-opus-4-6")
        assert target == 140_000  # 200K - 60K overhead

    def test_gpt4o_128k(self) -> None:
        target = get_compression_target("gpt-4o")
        assert target == 68_000  # 128K - 60K overhead

    def test_unknown_model_returns_default(self) -> None:
        assert get_compression_target("unknown-model") == DEFAULT_TOKEN_THRESHOLD

    def test_small_model_returns_default(self) -> None:
        # NOTE(review): "some-tiny-model" is not in the LlmModel enum, so this
        # exercises the unknown-model fallback rather than the small-window
        # (<10K headroom) floor — a known model with a small context window
        # would be needed to cover that branch distinctly.
        assert get_compression_target("some-tiny-model") == DEFAULT_TOKEN_THRESHOLD

View File

@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""Download CoPilot transcripts from prod GCS and load into local dev environment.
Usage:
# Step 1: Download from prod GCS (needs MEDIA_GCS_BUCKET_NAME + gcloud auth)
MEDIA_GCS_BUCKET_NAME=<prod-bucket> USER_ID=<user-uuid> \
poetry run python scripts/download_transcripts.py download <session_id> ...
# Step 2: Load downloaded transcripts into local storage + DB
poetry run python scripts/download_transcripts.py load <session_id> ...
# Or do both in one step (if you have GCS access):
MEDIA_GCS_BUCKET_NAME=<prod-bucket> USER_ID=<user-uuid> \
poetry run python scripts/download_transcripts.py both <session_id> ...
The "download" step saves transcripts to transcripts/<session_id>.jsonl.
The "load" step reads those files and:
1. Creates a ChatSession in local DB (or reuses existing)
2. Populates messages from the transcript
3. Stores transcript in local workspace storage
4. Creates metadata so --resume works on the next turn
After "load", you can send a message to the session via the CoPilot UI
and it will use --resume with the loaded transcript.
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import sys
import time
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
_SAFE_RE = re.compile(r"[^0-9a-fA-F-]")
TRANSCRIPTS_DIR = os.path.join(os.path.dirname(__file__), "..", "transcripts")
def _sanitize(raw: str) -> str:
cleaned = _SAFE_RE.sub("", raw or "")[:36]
if not cleaned:
raise ValueError(f"Invalid ID: {raw!r}")
return cleaned
def _transcript_path(session_id: str) -> str:
return os.path.join(TRANSCRIPTS_DIR, f"{_sanitize(session_id)}.jsonl")
def _meta_path(session_id: str) -> str:
return os.path.join(TRANSCRIPTS_DIR, f"{_sanitize(session_id)}.meta.json")
# ── Download from GCS ─────────────────────────────────────────────────────
async def cmd_download(session_ids: list[str]) -> None:
    """Download transcripts from prod GCS to transcripts/ directory.

    Requires the ``USER_ID`` and ``MEDIA_GCS_BUCKET_NAME`` environment
    variables; exits with status 1 if either is missing. For each session id
    it writes ``transcripts/<id>.jsonl`` plus a ``.meta.json`` sidecar, and
    reports per-session progress on stdout. Failures for individual sessions
    are printed and skipped rather than aborting the whole run.
    """
    from backend.copilot.sdk.transcript import download_transcript

    user_id = os.environ.get("USER_ID", "")
    if not user_id:
        print("ERROR: Set USER_ID env var to the session owner's user ID.")
        print(" You can find it in Sentry breadcrumbs or the DB.")
        sys.exit(1)
    bucket = os.environ.get("MEDIA_GCS_BUCKET_NAME", "")
    if not bucket:
        print("ERROR: Set MEDIA_GCS_BUCKET_NAME to the prod GCS bucket.")
        sys.exit(1)
    os.makedirs(TRANSCRIPTS_DIR, exist_ok=True)
    print(f"Downloading from GCS bucket: {bucket}")
    print(f"User ID: {user_id}\n")
    for sid in session_ids:
        print(f"[{sid[:12]}] Downloading...")
        try:
            dl = await download_transcript(user_id, sid)
        except Exception as e:
            # Best-effort per-session: report and continue with the rest.
            print(f"[{sid[:12]}] Failed: {e}")
            continue
        if not dl or not dl.content:
            print(f"[{sid[:12]}] Not found in GCS")
            continue
        out = _transcript_path(sid)
        with open(out, "w") as f:
            f.write(dl.content)
        lines = len(dl.content.strip().split("\n"))
        # Sidecar metadata lets the "load" step recover user_id and
        # message_count without re-querying GCS.
        meta = {
            "session_id": sid,
            "user_id": user_id,
            "message_count": dl.message_count,
            "uploaded_at": dl.uploaded_at,
            "transcript_bytes": len(dl.content),
            "transcript_lines": lines,
        }
        with open(_meta_path(sid), "w") as f:
            json.dump(meta, f, indent=2)
        print(
            f"[{sid[:12]}] Saved: {lines} entries, "
            f"{len(dl.content)} bytes, msg_count={dl.message_count}"
        )
    print("\nDone. Run 'load' command to import into local dev environment.")
# ── Load into local dev ───────────────────────────────────────────────────
def _parse_messages_from_transcript(content: str) -> list[dict]:
"""Extract user/assistant messages from JSONL transcript for DB seeding."""
messages: list[dict] = []
for line in content.strip().split("\n"):
if not line.strip():
continue
try:
entry = json.loads(line)
except json.JSONDecodeError:
continue
if not isinstance(entry, dict):
continue
msg = entry.get("message", {})
role = msg.get("role")
if role not in ("user", "assistant"):
continue
content_blocks = msg.get("content", "")
if isinstance(content_blocks, list):
# Flatten content blocks to text
text_parts = []
for block in content_blocks:
if isinstance(block, dict):
if block.get("type") == "text":
text_parts.append(block.get("text", ""))
elif isinstance(block, str):
text_parts.append(block)
text = "\n".join(text_parts)
elif isinstance(content_blocks, str):
text = content_blocks
else:
text = ""
if text:
messages.append({"role": role, "content": text})
return messages
async def cmd_load(session_ids: list[str]) -> None:
    """Load downloaded transcripts into local workspace storage + DB.

    For each session ID this reads the locally cached transcript (and optional
    meta file), then runs four best-effort stages: create the ChatSession row,
    batch-insert parsed user/assistant messages, store the transcript via
    upload_transcript(), and finally write the transcript directly to the
    workspace filesystem as a fallback. Failures in any stage are printed and
    the remaining stages/sessions still run.
    """
    from backend.copilot.sdk.transcript import upload_transcript

    # Use the user_id from meta file or env var
    default_user_id = os.environ.get("USER_ID", "")
    for sid in session_ids:
        transcript_file = _transcript_path(sid)
        meta_file = _meta_path(sid)
        if not os.path.exists(transcript_file):
            print(f"[{sid[:12]}] No transcript file at {transcript_file}")
            print("       Run 'download' first, or place the file manually.")
            continue
        with open(transcript_file) as f:
            content = f.read()
        # Load meta if available; meta values override the env-var default.
        user_id = default_user_id
        msg_count = 0
        if os.path.exists(meta_file):
            with open(meta_file) as f:
                meta = json.load(f)
            user_id = meta.get("user_id", user_id)
            msg_count = meta.get("message_count", 0)
        if not user_id:
            print(f"[{sid[:12]}] No user_id — set USER_ID env var or download first")
            continue
        lines = len(content.strip().split("\n"))
        print(f"[{sid[:12]}] Loading transcript: {lines} entries, {len(content)} bytes")
        # Parse messages from transcript for DB
        messages = _parse_messages_from_transcript(content)
        if not msg_count:
            # No meta count available — fall back to the parsed message count.
            msg_count = len(messages)
        print(f"[{sid[:12]}] Parsed {len(messages)} messages for DB")
        # Create chat session in DB (skipped when the session already exists).
        try:
            from backend.copilot.db import create_chat_session, get_chat_session

            existing = await get_chat_session(sid)
            if existing:
                print(f"[{sid[:12]}] Session already exists in DB, skipping creation")
            else:
                await create_chat_session(sid, user_id)
                print(f"[{sid[:12]}] Created ChatSession in DB")
        except Exception as e:
            print(f"[{sid[:12]}] DB session creation failed: {e}")
            print("       You may need to create it manually or run with DB access.")
        # Add messages to DB
        if messages:
            try:
                from backend.copilot.db import add_chat_messages_batch

                msg_dicts = [
                    {"role": m["role"], "content": m["content"]} for m in messages
                ]
                await add_chat_messages_batch(sid, msg_dicts, start_sequence=0)
                print(f"[{sid[:12]}] Added {len(messages)} messages to DB")
            except Exception as e:
                print(f"[{sid[:12]}] Message insertion failed: {e}")
                print("       (Session may already have messages)")
        # Store transcript in local workspace storage
        try:
            await upload_transcript(
                user_id=user_id,
                session_id=sid,
                content=content,
                message_count=msg_count,
            )
            print(f"[{sid[:12]}] Stored transcript in local workspace storage")
        except Exception as e:
            print(f"[{sid[:12]}] Transcript storage failed: {e}")
        # Also store directly to filesystem as fallback, mirroring the layout
        # the backend reads from (<storage_dir>/chat-transcripts/<user_id>/).
        try:
            from backend.util.settings import Settings

            settings = Settings()
            storage_dir = settings.config.workspace_storage_dir or os.path.join(
                os.path.expanduser("~"), ".autogpt", "workspaces"
            )
            ts_dir = os.path.join(storage_dir, "chat-transcripts", _sanitize(user_id))
            os.makedirs(ts_dir, exist_ok=True)
            ts_path = os.path.join(ts_dir, f"{_sanitize(sid)}.jsonl")
            with open(ts_path, "w") as f:
                f.write(content)
            meta_storage = {
                "message_count": msg_count,
                "uploaded_at": time.time(),
            }
            meta_storage_path = os.path.join(ts_dir, f"{_sanitize(sid)}.meta.json")
            with open(meta_storage_path, "w") as f:
                json.dump(meta_storage, f)
            print(f"[{sid[:12]}] Also wrote to: {ts_path}")
        except Exception as e:
            print(f"[{sid[:12]}] Direct file write failed: {e}")
        print(f"[{sid[:12]}] Ready — send a message to this session to test")
        print()
    print("Done. Start the backend and send a message to the session(s).")
    print("The CoPilot will use --resume with the loaded transcript.")
# ── Main ──────────────────────────────────────────────────────────────────
async def main() -> None:
    """CLI entry point: dispatch download / load / both over session IDs."""
    if len(sys.argv) < 3:
        # Need at least a command and one session ID; show module usage text.
        print(__doc__)
        sys.exit(1)

    command, session_ids = sys.argv[1], sys.argv[2:]
    if command == "download":
        await cmd_download(session_ids)
    elif command == "load":
        await cmd_load(session_ids)
    elif command == "both":
        await cmd_download(session_ids)
        print("\n" + "=" * 60 + "\n")
        await cmd_load(session_ids)
    else:
        print(f"Unknown command: {command}")
        print("Usage: download | load | both")
        sys.exit(1)
if __name__ == "__main__":
    # Script entry point: run the async CLI dispatcher.
    asyncio.run(main())