Mirror of https://github.com/Significant-Gravitas/AutoGPT.git, synced 2026-04-08 03:00:28 -04:00.
Compare commits
1 Commits
feat/agent
...
fix/copilot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d227aa5071 |
@@ -1414,3 +1414,76 @@ class TestStreamChatCompletionRetryIntegration:
|
||||
# Verify user-friendly message (not raw SDK text)
|
||||
assert "Authentication" in errors[0].errorText
|
||||
assert any(isinstance(e, StreamStart) for e in events)
|
||||
|
||||
@pytest.mark.asyncio
async def test_result_message_prompt_too_long_triggers_compaction(self):
    """A ResultMessage(subtype="error") saying "Prompt is too long" must compact.

    The Claude CLI can reject a prompt before any API call is made
    (model=<synthetic>, duration_api_ms=0). In that case no Python
    exception is raised; the CLI instead emits a ResultMessage with
    is_error=True. The retry loop still has to recognize this as a
    context-length failure and kick off transcript compaction.
    """
    import contextlib

    from claude_agent_sdk import ResultMessage

    from backend.copilot.response_model import StreamError, StreamStart
    from backend.copilot.sdk.service import stream_chat_completion_sdk

    session = self._make_session()
    ok_result = self._make_result_message()
    attempts = [0]

    too_long_result = ResultMessage(
        subtype="error",
        result="Prompt is too long",
        duration_ms=100,
        duration_api_ms=0,
        is_error=True,
        num_turns=0,
        session_id="test-session-id",
    )

    def _next_client(*args, **kwargs):
        # Attempt 1: the CLI-side error result.
        # Attempt 2 (after compaction): the success result.
        attempts[0] += 1
        chosen = too_long_result if attempts[0] == 1 else ok_result
        return self._make_client_mock(result_message=chosen)

    original_transcript = _build_transcript(
        [("user", "prior question"), ("assistant", "prior answer")]
    )
    compacted_transcript = _build_transcript(
        [("user", "[summary]"), ("assistant", "summary reply")]
    )

    patches = _make_sdk_patches(
        session,
        original_transcript=original_transcript,
        compacted_transcript=compacted_transcript,
        client_side_effect=_next_client,
    )

    with contextlib.ExitStack() as stack:
        for target, kwargs in patches:
            stack.enter_context(patch(target, **kwargs))
        # Drain the stream while the patches are still active.
        events = [
            event
            async for event in stream_chat_completion_sdk(
                session_id="test-session-id",
                message="hello",
                is_user_message=True,
                user_id="test-user",
                session=session,
            )
        ]

    assert attempts[0] == 2, (
        f"Expected 2 SDK attempts (CLI error ResultMessage "
        f"should trigger compaction retry), got {attempts[0]}"
    )
    stream_errors = [e for e in events if isinstance(e, StreamError)]
    assert not stream_errors, f"Unexpected StreamError: {stream_errors}"
    assert any(isinstance(e, StreamStart) for e in events)
|
||||
|
||||
@@ -1404,6 +1404,14 @@ async def _run_stream_attempt(
|
||||
ctx.log_prefix,
|
||||
sdk_msg.result or "(no error message provided)",
|
||||
)
|
||||
# If the CLI itself rejected the prompt as too long
|
||||
# (pre-API check, duration_api_ms=0), re-raise as an
|
||||
# exception so the retry loop can trigger compaction.
|
||||
# Without this, the ResultMessage is silently consumed
|
||||
# and the retry/compaction mechanism is never invoked.
|
||||
error_text = (sdk_msg.result or "").lower()
|
||||
if any(p in error_text for p in _PROMPT_TOO_LONG_PATTERNS):
|
||||
raise Exception(sdk_msg.result or "prompt is too long")
|
||||
|
||||
# Capture token usage from ResultMessage.
|
||||
# Anthropic reports cached tokens separately:
|
||||
|
||||
@@ -43,6 +43,10 @@ STRIPPABLE_TYPES = frozenset(
|
||||
{"progress", "file-history-snapshot", "queue-operation", "summary", "pr-link"}
|
||||
)
|
||||
|
||||
# Thinking block types that can be stripped from non-last assistant entries.
|
||||
# The Anthropic API only requires these in the *last* assistant message.
|
||||
_THINKING_BLOCK_TYPES = frozenset({"thinking", "redacted_thinking"})
|
||||
|
||||
|
||||
@dataclass
|
||||
class TranscriptDownload:
|
||||
@@ -450,6 +454,75 @@ def _build_meta_storage_path(user_id: str, session_id: str, backend: object) ->
|
||||
)
|
||||
|
||||
|
||||
def strip_stale_thinking_blocks(content: str) -> str:
    """Remove thinking/redacted_thinking blocks from non-last assistant entries.

    The Anthropic API only requires thinking blocks in the **last** assistant
    message to be value-identical to the original response. Older assistant
    entries carry stale thinking blocks that consume significant tokens
    (often 10-50K each) without providing useful context for ``--resume``.

    Stripping them before upload prevents the CLI from triggering compaction
    every turn just to compress away the stale thinking bloat.

    Args:
        content: JSONL transcript text, one JSON entry per line.

    Returns:
        The transcript with stale thinking blocks removed, newline-terminated.
        Returned unchanged when no assistant entry is found.
    """
    # str.split always returns at least one element, so no empty-list guard
    # is needed here.
    lines = content.strip().split("\n")

    # Parse each line once, keeping the raw line alongside so unmodified
    # entries can be emitted byte-identical. Stdlib json.loads has no
    # `fallback=` keyword; parse failures must be handled explicitly, and
    # unparseable lines are carried through verbatim (entry=None).
    parsed: list[tuple[str, dict | None]] = []
    for line in lines:
        try:
            entry = json.loads(line)
        except (ValueError, TypeError):
            entry = None
        parsed.append((line, entry if isinstance(entry, dict) else None))

    # Reverse scan to find the last assistant message ID. Guard against a
    # non-dict "message" value so a malformed entry cannot crash the scan.
    last_asst_msg_id: str | None = None
    for _line, entry in reversed(parsed):
        if entry is None:
            continue
        msg = entry.get("message")
        if isinstance(msg, dict) and msg.get("role") == "assistant":
            last_asst_msg_id = msg.get("id")
            break

    if last_asst_msg_id is None:
        # No assistant entries at all — nothing can be stale.
        return content

    result_lines: list[str] = []
    stripped_count = 0
    for line, entry in parsed:
        if entry is None:
            result_lines.append(line)
            continue

        msg = entry.get("message")
        # Only strip from assistant entries that are NOT the last turn.
        if (
            isinstance(msg, dict)
            and msg.get("role") == "assistant"
            and msg.get("id") != last_asst_msg_id
            and isinstance(msg.get("content"), list)
        ):
            content_blocks = msg["content"]
            kept = [
                b
                for b in content_blocks
                if not (isinstance(b, dict) and b.get("type") in _THINKING_BLOCK_TYPES)
            ]
            if len(kept) < len(content_blocks):
                stripped_count += len(content_blocks) - len(kept)
                entry = {**entry, "message": {**msg, "content": kept}}
                # Re-serialize only entries that actually changed; untouched
                # lines stay byte-identical to avoid formatting churn.
                result_lines.append(json.dumps(entry, separators=(",", ":")))
                continue

        result_lines.append(line)

    if stripped_count:
        logger.info(
            "[Transcript] Stripped %d stale thinking block(s) from non-last entries",
            stripped_count,
        )

    return "\n".join(result_lines) + "\n"
|
||||
|
||||
|
||||
async def upload_transcript(
|
||||
user_id: str,
|
||||
session_id: str,
|
||||
@@ -472,6 +545,9 @@ async def upload_transcript(
|
||||
# Strip metadata entries (progress, file-history-snapshot, etc.)
|
||||
# Note: SDK-built transcripts shouldn't have these, but strip for safety
|
||||
stripped = strip_progress_entries(content)
|
||||
# Strip stale thinking blocks from older assistant entries — these consume
|
||||
# significant tokens and trigger unnecessary CLI compaction every turn.
|
||||
stripped = strip_stale_thinking_blocks(stripped)
|
||||
if not validate_transcript(stripped):
|
||||
# Log entry types for debugging — helps identify why validation failed
|
||||
entry_types = [
|
||||
|
||||
Reference in New Issue
Block a user