fix(platform): sync TranscriptBuilder with CLI on mid-stream compaction

TranscriptBuilder accumulated ALL messages (pre + post compaction),
making it larger than the CLI's active context after mid-stream
compaction. This caused "Prompt is too long" errors on resume.

Now when the PreCompact hook fires, CompactionTracker captures the
transcript_path. When compaction ends (next message arrives), we read
the CLI session file, find the isCompactSummary entry, and replace
TranscriptBuilder's entries with the compacted entries. This ensures
TranscriptBuilder always mirrors the CLI's active context.

Removes the racy read_cli_session_file fallback from the finally block
- TranscriptBuilder is now the single source of truth for uploads.
This commit is contained in:
Zamil Majdy
2026-03-13 23:26:06 +07:00
parent b04f806760
commit 9f60fda37f
5 changed files with 130 additions and 56 deletions

View File

@@ -11,7 +11,6 @@ persistence, and the ``CompactionTracker`` state machine.
import asyncio
import logging
import uuid
from collections.abc import Callable
from ..constants import COMPACTION_DONE_MSG, COMPACTION_TOOL_NAME
from ..model import ChatMessage, ChatSession
@@ -177,11 +176,22 @@ class CompactionTracker:
self._start_emitted = False
self._done = False
self._tool_call_id = ""
self._transcript_path: str = ""
@property
def on_compact(self) -> Callable[[], None]:
"""Callback for the PreCompact hook."""
return self._compact_start.set
def transcript_path(self) -> str:
"""Path to the CLI session file, captured from PreCompact hook."""
return self._transcript_path
@property
def compaction_just_ended(self) -> bool:
"""True after emit_end_if_ready() has completed a compaction cycle."""
return self._done
def on_compact(self, transcript_path: str = "") -> None:
"""Callback for the PreCompact hook. Stores transcript_path."""
self._transcript_path = transcript_path
self._compact_start.set()
# ------------------------------------------------------------------
# Pre-query compaction

View File

@@ -127,7 +127,7 @@ def create_security_hooks(
user_id: str | None,
sdk_cwd: str | None = None,
max_subtasks: int = 3,
on_compact: Callable[[], None] | None = None,
on_compact: Callable[[str], None] | None = None,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
@@ -142,6 +142,7 @@ def create_security_hooks(
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum concurrent Task (sub-agent) spawns allowed per session
on_compact: Callback invoked when SDK starts compacting context.
Receives the transcript_path from the hook input.
Returns:
Hooks configuration dict for ClaudeAgentOptions
@@ -301,11 +302,16 @@ def create_security_hooks(
"""
_ = context, tool_use_id
trigger = input_data.get("trigger", "auto")
transcript_path = input_data.get("transcript_path", "")
logger.info(
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
"[SDK] Context compaction triggered: %s, user=%s, "
"transcript_path=%s",
trigger,
user_id,
transcript_path,
)
if on_compact is not None:
on_compact()
on_compact(transcript_path)
return cast(SyncHookJSONOutput, {})
hooks: dict[str, Any] = {

View File

@@ -77,7 +77,7 @@ from .tool_adapter import (
from .transcript import (
cleanup_cli_project_dir,
download_transcript,
read_cli_session_file,
read_compacted_entries,
upload_transcript,
validate_transcript,
write_transcript_to_tempfile,
@@ -1131,9 +1131,18 @@ async def stream_chat_completion_sdk(
sdk_msg.result or "(no error message provided)",
)
# Emit compaction end if SDK finished compacting
# Emit compaction end if SDK finished compacting.
# When compaction ends, sync TranscriptBuilder with the
# CLI's active context so they stay identical.
compaction_was_done = compaction.compaction_just_ended
for ev in await compaction.emit_end_if_ready(session):
yield ev
if not compaction_was_done and compaction.compaction_just_ended:
compacted = read_compacted_entries(compaction.transcript_path)
if compacted is not None:
transcript_builder.replace_entries(
compacted, log_prefix=log_prefix
)
for response in adapter.convert_message(sdk_msg):
if isinstance(response, StreamStart):
@@ -1424,51 +1433,13 @@ async def stream_chat_completion_sdk(
task.add_done_callback(_background_tasks.discard)
# --- Upload transcript for next-turn --resume ---
# This MUST run in finally so the transcript is uploaded even when
# the streaming loop raises an exception.
# The transcript represents the COMPLETE active context (atomic).
# TranscriptBuilder is the single source of truth. It mirrors the
# CLI's active context: on compaction, replace_entries() syncs it
# with the compacted session file. No CLI file read needed here.
if config.claude_agent_use_resume and user_id and session is not None:
try:
# Prefer the CLI's own session file — it reflects any
# mid-stream compaction the CLI performed. Fall back to
# TranscriptBuilder output when the CLI file isn't available
# (e.g. the process was killed before writing it).
cli_transcript = read_cli_session_file(sdk_cwd) if sdk_cwd else None
if cli_transcript and validate_transcript(cli_transcript):
transcript_content = cli_transcript
logger.info(
"%s Using CLI session file for transcript upload (%d bytes)",
log_prefix,
len(cli_transcript),
)
else:
if cli_transcript:
# CLI file exists but is invalid (partially flushed).
logger.warning(
"%s CLI session file found but invalid "
"(%d bytes), falling back to TranscriptBuilder",
log_prefix,
len(cli_transcript),
)
else:
logger.info(
"%s CLI session file not available, using "
"TranscriptBuilder",
log_prefix,
)
cli_transcript = None
transcript_content = transcript_builder.to_jsonl()
logger.info(
"%s TranscriptBuilder output: %d bytes",
log_prefix,
len(transcript_content) if transcript_content else 0,
)
entry_count = (
len(transcript_content.strip().splitlines())
if cli_transcript
else transcript_builder.entry_count
)
transcript_content = transcript_builder.to_jsonl()
entry_count = transcript_builder.entry_count
if not transcript_content:
logger.warning(
@@ -1482,14 +1453,11 @@ async def stream_chat_completion_sdk(
)
else:
logger.info(
"%s Uploading complete transcript (entries=%d, bytes=%d)",
"%s Uploading transcript (entries=%d, bytes=%d)",
log_prefix,
entry_count,
len(transcript_content),
)
# Shield upload from cancellation - let it complete even if
# the finally block is interrupted. No timeout to avoid race
# conditions where backgrounded uploads overwrite newer transcripts.
await asyncio.shield(
upload_transcript(
user_id=user_id,

View File

@@ -180,6 +180,63 @@ def _safe_glob_jsonl(project_dir: str) -> list[Path]:
return result
def read_compacted_entries(transcript_path: str) -> list[dict] | None:
"""Read compacted entries from the CLI session file after compaction.
Parses the JSONL file line-by-line, finds the ``isCompactSummary: true``
entry, and returns it plus all entries after it.
The CLI writes the compaction summary BEFORE sending the next message,
so the file is guaranteed to be flushed by the time we read it.
Returns a list of parsed dicts, or ``None`` if the file cannot be read
or no compaction summary is found.
"""
if not transcript_path:
return None
try:
content = Path(transcript_path).read_text()
except OSError as e:
logger.warning(
"[Transcript] Failed to read session file %s: %s", transcript_path, e
)
return None
lines = content.strip().split("\n")
compact_idx: int | None = None
for idx, line in enumerate(lines):
if not line.strip():
continue
entry = json.loads(line, fallback=None)
if not isinstance(entry, dict):
continue
if entry.get("isCompactSummary"):
compact_idx = idx
break
if compact_idx is None:
logger.debug("[Transcript] No compaction summary found in %s", transcript_path)
return None
entries: list[dict] = []
for line in lines[compact_idx:]:
if not line.strip():
continue
entry = json.loads(line, fallback=None)
if isinstance(entry, dict):
entries.append(entry)
logger.info(
"[Transcript] Read %d compacted entries from %s (summary at line %d)",
len(entries),
transcript_path,
compact_idx + 1,
)
return entries
def read_cli_session_file(sdk_cwd: str) -> str | None:
"""Read the CLI's own session file, which reflects any compaction.

View File

@@ -162,6 +162,39 @@ class TranscriptBuilder:
)
self._last_uuid = msg_uuid
def replace_entries(
self, compacted_entries: list[dict], log_prefix: str = "[Transcript]"
) -> None:
"""Replace all entries with compacted entries from the CLI session file.
Called after mid-stream compaction so TranscriptBuilder mirrors the
CLI's active context (compaction summary + post-compaction entries).
"""
old_count = len(self._entries)
self._entries.clear()
self._last_uuid = None
for data in compacted_entries:
entry_type = data.get("type", "")
if entry_type in STRIPPABLE_TYPES:
continue
entry = TranscriptEntry(
type=entry_type,
uuid=data.get("uuid") or str(uuid4()),
parentUuid=data.get("parentUuid"),
message=data.get("message", {}),
)
self._entries.append(entry)
self._last_uuid = entry.uuid
logger.info(
"%s TranscriptBuilder compacted: %d entries -> %d entries",
log_prefix,
old_count,
len(self._entries),
)
def to_jsonl(self) -> str:
"""Export complete context as JSONL.