fix(platform): address PR #12401 review - one-shot compaction flag, path validation, tests

- Make compaction_just_ended a one-shot flag that resets after being read,
  fixing multiple mid-stream compactions not triggering replace_entries
- Add path validation to read_compacted_entries() to reject paths outside
  the CLI projects directory
- Reset _transcript_path in reset_for_query() for consistency
- Add tests for read_compacted_entries and TranscriptBuilder.replace_entries
- Preserve isCompactSummary entries during STRIPPABLE_TYPES filtering
This commit is contained in:
Zamil Majdy
2026-03-13 23:46:46 +07:00
parent 9f60fda37f
commit 8f0f6ced10
6 changed files with 303 additions and 12 deletions

View File

@@ -185,8 +185,15 @@ class CompactionTracker:
@property
def compaction_just_ended(self) -> bool:
"""True after emit_end_if_ready() has completed a compaction cycle."""
return self._done
"""One-shot: returns True once after a compaction cycle completes.
The flag resets after being read so subsequent compactions within the
same query are detected correctly.
"""
if self._done:
self._done = False
return True
return False
def on_compact(self, transcript_path: str = "") -> None:
"""Callback for the PreCompact hook. Stores transcript_path."""
@@ -211,6 +218,7 @@ class CompactionTracker:
self._done = False
self._start_emitted = False
self._tool_call_id = ""
self._transcript_path = ""
def emit_start_if_ready(self) -> list[StreamBaseResponse]:
"""If the PreCompact hook fired, emit start events (spinning tool)."""

View File

@@ -216,13 +216,15 @@ class TestCompactionTracker:
assert len(session.messages) == 2
@pytest.mark.asyncio
async def test_emit_end_no_op_when_done(self):
async def test_emit_end_no_op_when_no_new_compaction(self):
tracker = CompactionTracker()
session = _make_session()
tracker.on_compact()
tracker.emit_start_if_ready()
await tracker.emit_end_if_ready(session)
# Second call should be no-op
# Consume the one-shot flag
_ = tracker.compaction_just_ended
# Second call should be no-op (no new on_compact)
evts = await tracker.emit_end_if_ready(session)
assert evts == []
@@ -246,20 +248,28 @@ class TestCompactionTracker:
tracker._done = True
tracker._start_emitted = True
tracker._tool_call_id = "old"
tracker._transcript_path = "/some/path"
tracker.reset_for_query()
assert tracker._done is False
assert tracker._start_emitted is False
assert tracker._tool_call_id == ""
assert tracker._transcript_path == ""
@pytest.mark.asyncio
async def test_pre_query_blocks_sdk_compaction(self):
"""After pre-query compaction, SDK compaction events are suppressed."""
async def test_pre_query_blocks_sdk_compaction_until_consumed(self):
"""After pre-query compaction, SDK compaction is blocked until the
one-shot flag is consumed via compaction_just_ended."""
tracker = CompactionTracker()
session = _make_session()
tracker.emit_pre_query(session)
tracker.on_compact()
# _done is True so emit_start_if_ready is blocked
evts = tracker.emit_start_if_ready()
assert evts == [] # _done blocks it
assert evts == []
# Consuming the one-shot flag allows subsequent compaction
assert tracker.compaction_just_ended is True
evts = tracker.emit_start_if_ready()
assert len(evts) == 3
@pytest.mark.asyncio
async def test_reset_allows_new_compaction(self):
@@ -289,3 +299,36 @@ class TestCompactionTracker:
tool_calls = session.messages[0].tool_calls
assert tool_calls is not None
assert tool_calls[0]["id"] == start_evt.toolCallId
def test_compaction_just_ended_is_one_shot(self):
"""compaction_just_ended returns True once, then False."""
tracker = CompactionTracker()
tracker._done = True
assert tracker.compaction_just_ended is True
assert tracker.compaction_just_ended is False
@pytest.mark.asyncio
async def test_multiple_compactions_within_query(self):
"""Two mid-stream compactions within a single query both trigger."""
tracker = CompactionTracker()
session = _make_session()
# First compaction cycle
tracker.on_compact("/path/1")
tracker.emit_start_if_ready()
evts1 = await tracker.emit_end_if_ready(session)
assert len(evts1) == 2
assert tracker.compaction_just_ended is True
# Second compaction cycle (should NOT be blocked)
tracker.on_compact("/path/2")
start_evts = tracker.emit_start_if_ready()
assert len(start_evts) == 3 # Not blocked by first compaction
evts2 = await tracker.emit_end_if_ready(session)
assert len(evts2) == 2
assert tracker.compaction_just_ended is True
def test_on_compact_stores_transcript_path(self):
tracker = CompactionTracker()
tracker.on_compact("/some/path.jsonl")
assert tracker.transcript_path == "/some/path.jsonl"

View File

@@ -1134,10 +1134,9 @@ async def stream_chat_completion_sdk(
# Emit compaction end if SDK finished compacting.
# When compaction ends, sync TranscriptBuilder with the
# CLI's active context so they stay identical.
compaction_was_done = compaction.compaction_just_ended
for ev in await compaction.emit_end_if_ready(session):
yield ev
if not compaction_was_done and compaction.compaction_just_ended:
if compaction.compaction_just_ended:
compacted = read_compacted_entries(compaction.transcript_path)
if compacted is not None:
transcript_builder.replace_entries(

View File

@@ -195,8 +195,17 @@ def read_compacted_entries(transcript_path: str) -> list[dict] | None:
if not transcript_path:
return None
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
real_path = os.path.realpath(transcript_path)
if not real_path.startswith(projects_base + os.sep):
logger.warning(
"[Transcript] transcript_path outside projects base: %s", transcript_path
)
return None
try:
content = Path(transcript_path).read_text()
content = Path(real_path).read_text()
except OSError as e:
logger.warning(
"[Transcript] Failed to read session file %s: %s", transcript_path, e

View File

@@ -80,8 +80,9 @@ class TranscriptBuilder:
# Load all non-strippable entries (user/assistant/system/etc.)
# Skip only STRIPPABLE_TYPES to match strip_progress_entries() behavior
# but preserve isCompactSummary entries (compacted context from CLI)
entry_type = data.get("type", "")
if entry_type in STRIPPABLE_TYPES:
if entry_type in STRIPPABLE_TYPES and not data.get("isCompactSummary"):
continue
entry = TranscriptEntry(
@@ -176,7 +177,7 @@ class TranscriptBuilder:
for data in compacted_entries:
entry_type = data.get("type", "")
if entry_type in STRIPPABLE_TYPES:
if entry_type in STRIPPABLE_TYPES and not data.get("isCompactSummary"):
continue
entry = TranscriptEntry(

View File

@@ -12,10 +12,12 @@ from .transcript import (
_cli_project_dir,
delete_transcript,
read_cli_session_file,
read_compacted_entries,
strip_progress_entries,
validate_transcript,
write_transcript_to_tempfile,
)
from .transcript_builder import TranscriptBuilder
def _make_jsonl(*entries: dict) -> str:
@@ -424,3 +426,232 @@ class TestDeleteTranscript:
):
# Should not raise
await delete_transcript("user-123", "session-456")
# --- read_compacted_entries ---
COMPACT_SUMMARY = {
"type": "summary",
"uuid": "cs1",
"isCompactSummary": True,
"message": {"role": "assistant", "content": "compacted context"},
}
POST_COMPACT_ASST = {
"type": "assistant",
"uuid": "a2",
"parentUuid": "cs1",
"message": {"role": "assistant", "content": "response after compaction"},
}
class TestReadCompactedEntries:
def test_returns_summary_and_entries_after(self, tmp_path, monkeypatch):
"""File with isCompactSummary entry returns summary + entries after."""
config_dir = tmp_path / "config"
projects_dir = config_dir / "projects"
session_dir = projects_dir / "proj"
session_dir.mkdir(parents=True)
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
pre_compact = {"type": "user", "uuid": "u1", "message": {"role": "user"}}
path = session_dir / "session.jsonl"
path.write_text(_make_jsonl(pre_compact, COMPACT_SUMMARY, POST_COMPACT_ASST))
result = read_compacted_entries(str(path))
assert result is not None
assert len(result) == 2
assert result[0]["isCompactSummary"] is True
assert result[1]["uuid"] == "a2"
def test_no_compact_summary_returns_none(self, tmp_path, monkeypatch):
"""File without isCompactSummary returns None."""
config_dir = tmp_path / "config"
projects_dir = config_dir / "projects"
session_dir = projects_dir / "proj"
session_dir.mkdir(parents=True)
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
path = session_dir / "session.jsonl"
path.write_text(_make_jsonl(USER_MSG, ASST_MSG))
result = read_compacted_entries(str(path))
assert result is None
def test_file_not_found_returns_none(self, tmp_path, monkeypatch):
"""Non-existent file returns None."""
config_dir = tmp_path / "config"
projects_dir = config_dir / "projects"
projects_dir.mkdir(parents=True)
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
result = read_compacted_entries(str(projects_dir / "missing.jsonl"))
assert result is None
def test_empty_path_returns_none(self):
"""Empty string path returns None."""
result = read_compacted_entries("")
assert result is None
def test_malformed_json_lines_skipped(self, tmp_path, monkeypatch):
"""Malformed JSON lines are skipped gracefully."""
config_dir = tmp_path / "config"
projects_dir = config_dir / "projects"
session_dir = projects_dir / "proj"
session_dir.mkdir(parents=True)
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
path = session_dir / "session.jsonl"
content = "not valid json\n" + json.dumps(COMPACT_SUMMARY) + "\n"
content += "also bad\n" + json.dumps(POST_COMPACT_ASST) + "\n"
path.write_text(content)
result = read_compacted_entries(str(path))
assert result is not None
assert len(result) == 2 # summary + post-compact assistant
def test_multiple_compact_summaries_uses_first(self, tmp_path, monkeypatch):
"""When multiple isCompactSummary entries exist, uses the first one."""
config_dir = tmp_path / "config"
projects_dir = config_dir / "projects"
session_dir = projects_dir / "proj"
session_dir.mkdir(parents=True)
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
second_summary = {
"type": "summary",
"uuid": "cs2",
"isCompactSummary": True,
"message": {"role": "assistant", "content": "second summary"},
}
path = session_dir / "session.jsonl"
path.write_text(_make_jsonl(COMPACT_SUMMARY, POST_COMPACT_ASST, second_summary))
result = read_compacted_entries(str(path))
assert result is not None
# First summary found, so all 3 entries returned
assert len(result) == 3
assert result[0]["uuid"] == "cs1"
def test_path_outside_projects_base_returns_none(self, tmp_path, monkeypatch):
"""Transcript path outside the projects directory is rejected."""
config_dir = tmp_path / "config"
(config_dir / "projects").mkdir(parents=True)
monkeypatch.setenv("CLAUDE_CONFIG_DIR", str(config_dir))
evil_file = tmp_path / "evil.jsonl"
evil_file.write_text(_make_jsonl(COMPACT_SUMMARY))
result = read_compacted_entries(str(evil_file))
assert result is None
# --- TranscriptBuilder.replace_entries ---
class TestTranscriptBuilderReplaceEntries:
def test_replaces_existing_entries(self):
"""replace_entries replaces all entries with compacted ones."""
builder = TranscriptBuilder()
builder.append_user("hello")
builder.append_assistant([{"type": "text", "text": "world"}])
assert builder.entry_count == 2
compacted = [
{
"type": "user",
"uuid": "cs1",
"isCompactSummary": True,
"message": {"role": "user", "content": "compacted summary"},
},
{
"type": "assistant",
"uuid": "a1",
"parentUuid": "cs1",
"message": {"role": "assistant", "content": "response"},
},
]
builder.replace_entries(compacted)
assert builder.entry_count == 2
output = builder.to_jsonl()
entries = [json.loads(line) for line in output.strip().split("\n")]
assert entries[0]["uuid"] == "cs1"
assert entries[1]["uuid"] == "a1"
def test_filters_strippable_types(self):
"""Strippable types are filtered out during replace."""
builder = TranscriptBuilder()
compacted = [
{
"type": "user",
"uuid": "cs1",
"message": {"role": "user", "content": "compacted summary"},
},
{"type": "progress", "uuid": "p1", "message": {}},
{"type": "summary", "uuid": "s1", "message": {}},
{
"type": "assistant",
"uuid": "a1",
"parentUuid": "cs1",
"message": {"role": "assistant", "content": "hi"},
},
]
builder.replace_entries(compacted)
assert builder.entry_count == 2 # progress and summary were filtered
def test_maintains_last_uuid_chain(self):
"""After replace, _last_uuid is the last entry's uuid."""
builder = TranscriptBuilder()
compacted = [
{
"type": "user",
"uuid": "cs1",
"message": {"role": "user", "content": "compacted summary"},
},
{
"type": "assistant",
"uuid": "a1",
"parentUuid": "cs1",
"message": {"role": "assistant", "content": "hi"},
},
]
builder.replace_entries(compacted)
# Appending a new user message should chain to a1
builder.append_user("next question")
output = builder.to_jsonl()
entries = [json.loads(line) for line in output.strip().split("\n")]
assert entries[-1]["parentUuid"] == "a1"
def test_empty_entries_list(self):
"""Replacing with empty list clears all entries."""
builder = TranscriptBuilder()
builder.append_user("hello")
builder.replace_entries([])
assert builder.entry_count == 0
assert builder.is_empty
# --- TranscriptBuilder.load_previous with compacted content ---
class TestTranscriptBuilderLoadPreviousCompacted:
def test_preserves_compact_summary_entry(self):
"""load_previous preserves isCompactSummary entries even though
their type is 'summary' (which is in STRIPPABLE_TYPES)."""
compacted_content = _make_jsonl(COMPACT_SUMMARY, POST_COMPACT_ASST)
builder = TranscriptBuilder()
builder.load_previous(compacted_content)
assert builder.entry_count == 2
output = builder.to_jsonl()
entries = [json.loads(line) for line in output.strip().split("\n")]
assert entries[0]["type"] == "summary"
assert entries[0]["uuid"] == "cs1"
assert entries[1]["uuid"] == "a2"
def test_strips_regular_summary_entries(self):
"""Regular summary entries (without isCompactSummary) are still stripped."""
regular_summary = {"type": "summary", "uuid": "s1", "message": {"content": "x"}}
content = _make_jsonl(regular_summary, POST_COMPACT_ASST)
builder = TranscriptBuilder()
builder.load_previous(content)
assert builder.entry_count == 1 # Only the assistant entry