Compare commits

...

25 Commits

Author SHA1 Message Date
Zamil Majdy
9123178556 fix(backend/copilot): prevent double upload in timeout handling
Previous fix created NEW task after timeout, causing double upload:
- Original shielded task still running
- New task also uploading same transcript

Correct fix: Create task FIRST, then shield it. If timeout occurs,
track the SAME task (no double upload).

Fixes double-upload bug in b8c65e3d2
2026-03-06 19:25:45 +07:00
Zamil Majdy
b8c65e3d2b fix(backend/copilot): prevent transcript upload task garbage collection
HIGH severity fix: When upload_transcript times out after 30s, the shielded
coroutine continues running but becomes orphaned (no reference). Python's GC
can reclaim the task before completion, causing silent data loss.

Fix: If TimeoutError occurs, explicitly create task and track in _background_tasks
to maintain strong reference. Upload completes in background without blocking
session lock release.

Addresses PR discussion r2895491552
2026-03-06 19:23:54 +07:00
Zamil Majdy
0eddb6f1bb fix(backend/copilot): address PR review - downgrade PII logging and add upload timeout
1. Downgrade content preview logging from WARNING to DEBUG (transcript.py:325-326)
   - Prevents logging up to 500 chars of user conversation content (PII risk)
   - Keep validation failure at WARNING, only preview at DEBUG

2. Add 30s timeout to upload_transcript in finally block (service.py:1557)
   - Prevents session lock from hanging indefinitely if upload stalls
   - Uses asyncio.timeout wrapper around asyncio.shield

Addresses PR review #3903048969 item #2 and discussion r2895449830
2026-03-06 19:16:23 +07:00
Zamil Majdy
042ed42c0b refactor(backend/copilot): move SDK imports to top-level
Move TextBlock, ThinkingBlock, ToolResultBlock imports from inside
_format_sdk_content_blocks to top-level, following code style guidelines
(prefer top-level imports over function-local imports).
2026-03-06 19:06:26 +07:00
Zamil Majdy
eadd67c70c test(backend/copilot): add test for blank lines in validate_transcript
Covers the fix in ad7044995 that skips empty lines instead of
treating them as parse errors.
2026-03-06 19:05:22 +07:00
Zamil Majdy
ad7044995e fix(backend/copilot): handle blank lines in validate_transcript
Skip empty lines instead of treating them as parse errors,
preventing silent data loss from transcripts with blank lines.

Addresses PR comment #2894841670
2026-03-06 19:02:12 +07:00
Zamil Majdy
203bf2ca32 test(backend/copilot): remove tests for deleted transcript functions
- Remove TestReadTranscriptFile class (read_transcript_file deleted)
- Remove TestMergeWithPreviousTranscript class (merge_with_previous_transcript deleted)
- Remove TestTryUploadTranscript class (_try_upload_transcript deleted)
- All remaining tests pass (23 tests in transcript_test.py)
2026-03-06 18:58:42 +07:00
Zamil Majdy
a1e0caa983 refactor(backend/copilot): build transcript from SDK messages (atomic full-context)
Replace CLI file reading (race condition) with direct SDK message capture.
Transcript now represents COMPLETE active context, not incremental changes.

Changes:
- NEW: transcript_builder.py - TranscriptBuilder class
- REMOVED: Stop hook + file reading logic (~200 lines)
- REMOVED: merge_with_previous_transcript, read_transcript_file
- SIMPLIFIED: upload_transcript (no merge, atomic replace)
- CLEANED: Removed gap-based compression fallback

Benefits:
- Eliminates race conditions (Stop hook unreliable)
- Atomic transcript (full context always)
- -372 total lines removed
- Cleaner, more maintainable code

Transcript flow (atomic):
  Turn N: Download full context → Add new messages → Upload complete (REPLACE)
2026-03-06 18:44:35 +07:00
Zamil Majdy
440a06ad9e fix(backend/copilot): manually append CLI transcript to previous when CLI fails to append 2026-03-06 16:55:47 +07:00
Zamil Majdy
8ec706c125 debug(backend/copilot): add detailed UUID logging for merge diagnosis 2026-03-06 16:46:17 +07:00
Zamil Majdy
6c83a91ae3 debug(backend/copilot): add logging to diagnose merge issue 2026-03-06 16:42:19 +07:00
Zamil Majdy
f19a423cae refactor(backend/copilot): simplify transcript merge to use UUID matching only
Remove fragile synthetic entry detection ("<synthetic>" string check) in favor
of simple UUID-based matching: previous transcript always wins for matching UUIDs,
new UUIDs are added.

This approach is more robust and doesn't depend on CLI implementation details
that could change. The test `test_preserves_real_entries` was updated to reflect
the new behavior since the scenario it tested (same UUID with different real content)
is not a known real-world case.
2026-03-06 16:33:52 +07:00
Zamil Majdy
87258441f2 refactor(backend/copilot): eliminate synthetic detection, use pure UUID matching
MAJOR SIMPLIFICATION: Instead of detecting '<synthetic>' marker (fragile),
just replace ANY assistant entry if its UUID exists in previous transcript.

This works because:
- Previous transcript has real content with UUIDs: a1, a2, ...
- New transcript (--resume) has placeholders with SAME UUIDs + new content with NEW UUID
- Matching UUIDs = old turns that need real content restored 
- Non-matching UUID = current turn's new real content, keep as-is 

Benefits:
- No fragile '<synthetic>' constant to maintain
- No SDK/CLI version compatibility concerns
- Simpler logic: just UUID matching
- Works even if CLI changes synthetic format

Credit: @majdyz for the insight!
2026-03-06 16:21:09 +07:00
Zamil Majdy
494978319e refactor(backend/copilot): simplify synthetic entry detection and flatten merge logic
- Use simple model=='<synthetic>' check (not content - avoids false positives)
- Rename _is_synthetic_assistant_entry() -> _is_synthetic() (concise)
- Flatten merge_with_previous_transcript() with early returns/continues
- Use walrus operator where appropriate
- Reduce nesting and improve readability
- Keep debug logging to diagnose UUID mismatch issues

Note: SDK doesn't expose SYNTHETIC_MODEL constant - it's a CLI detail.
2026-03-06 16:18:57 +07:00
Zamil Majdy
9135969c34 fix(backend/copilot): convert conversation_turn to string for metadata type 2026-03-06 16:08:18 +07:00
Zamil Majdy
8625a82495 fix(backend/copilot): add conversation_turn to Langfuse trace metadata
Add conversation_turn field to trace metadata with the correct turn number
based on user message count. This fixes the Langfuse observability issue where
SDK's num_turns always showed '1' when using --resume (which creates a new CLI
session each turn).

Now Langfuse traces will include:
- langsmith.metadata.num_turns: '1' (from SDK, represents current CLI session)
- metadata.conversation_turn: <actual turn> (from our code, represents conversation history)

This gives us accurate turn tracking for multi-turn conversation analysis.
2026-03-06 16:07:30 +07:00
Zamil Majdy
c17f19317b test(backend/copilot): add unit tests for _try_upload_transcript
- Add TestTryUploadTranscript class with 3 tests
- test_upload_succeeds_with_valid_transcript: verifies successful upload returns True
- test_upload_returns_false_on_timeout: verifies timeout handling
- test_upload_returns_false_on_exception: verifies exception handling

These tests prevent regression of the double-upload bug where transcripts were
uploaded twice per turn (once in success path, once in finally block), causing
new data to be overwritten with stale data.

Note: Full integration test of stream_chat_completion_sdk upload behavior requires
extensive mocking of SDK, locks, sessions, and sandbox infrastructure. This is
deferred to follow-up work. The code structure ensures single upload by having
only one call site in the finally block at service.py:1563-1570.
2026-03-06 15:55:02 +07:00
Zamil Majdy
fc48944b56 fix(backend/copilot): address CodeRabbit review comments
- Fix turn number off-by-one: compute log_prefix after appending user message
- Fix stream error logging: check ended_with_stream_error before logging success
- Initialize ended_with_stream_error at function start for pyright
2026-03-06 15:32:59 +07:00
Zamil Majdy
2f57c1499c Merge remote-tracking branch 'origin/dev' into hotfix/transcript-error 2026-03-06 15:04:08 +07:00
Zamil Majdy
7042fcecdf fix(backend/copilot): merge synthetic transcript entries with real assistant content
When using --resume, the CLI creates a new session and writes synthetic
placeholders (model: "<synthetic>", "No response requested.") for all
previous assistant turns. This caused the copilot to "forget" its own
answers across turns.

Changes:
- Wire up merge_with_previous_transcript in the upload pipeline: the
  downloaded transcript from the start of the turn is passed through
  to upload_transcript, which restores real assistant content before
  stripping and uploading.
- Refactor strip_progress_entries to preserve original JSON line
  formatting for entries that don't need reparenting, avoiding
  unnecessary re-serialization.
- Add structured log_prefix ([SDK][session][turn]) across all SDK
  and transcript log lines for better debugging.
- Add tests for merge logic and line-preservation behavior.
2026-03-06 14:33:43 +07:00
Zamil Majdy
3e45a28307 fix(backend/copilot): don't short-circuit JSONL validation in validate_transcript
Remove `break` after finding first assistant entry so all remaining
lines are still validated for JSON correctness. Without this, corrupted
JSONL after the first assistant entry would slip through and get
uploaded as a broken --resume file.
2026-03-06 13:07:40 +07:00
Zamil Majdy
81aea5dc52 test(backend/copilot): add regression tests for --resume transcript validation
- Fix test_assistant_only_no_user to assert True (was False — the old buggy behavior)
- Add test_resume_transcript_without_user_entry: simulates a real --resume
  stop hook transcript with summary + assistant entries but no user entry
- Add test_returns_content_for_resume_transcript: verifies read_transcript_file
  accepts transcripts without user entries
2026-03-06 13:06:20 +07:00
Zamil Majdy
60f950c719 fix(backend/copilot): fix transcript validation rejecting --resume transcripts and remove double upload
Two root causes for copilot "forgetting" conversation history:

1. validate_transcript() required both `type: "user"` AND `type: "assistant"`
   entries. With --resume, the user's message is passed as a CLI query
   parameter and does NOT appear in the transcript file. This caused
   read_transcript_file() to return None in the stop hook, so the transcript
   was never captured or uploaded. Confirmed via Langfuse: num_turns drops
   to 1 on subsequent turns across all 3 affected sessions.

   Fix: Only require `has_assistant` — assistant entries are the meaningful
   conversation content and are always present.

2. The success path (before the finally block) uploaded the OLD resume file
   (downloaded transcript from previous turn), then the finally block
   overwrote it with the stop hook content. This double-upload was wasteful
   and could overwrite newer data with stale data.

   Fix: Remove success path upload entirely — the finally block is the
   single source of truth for transcript uploads.
2026-03-06 12:55:05 +07:00
Zamil Majdy
0b9e0665dd Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT 2026-03-06 02:32:36 +07:00
Zamil Majdy
f6f268a1f0 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into HEAD 2026-03-06 02:29:56 +07:00
5 changed files with 478 additions and 366 deletions

View File

@@ -127,7 +127,6 @@ def create_security_hooks(
sdk_cwd: str | None = None,
max_subtasks: int = 3,
on_compact: Callable[[], None] | None = None,
on_stop: Callable[[str, str], None] | None = None,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
@@ -136,15 +135,12 @@ def create_security_hooks(
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
- Stop: Capture transcript path for stateless resume (when *on_stop* is provided)
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum concurrent Task (sub-agent) spawns allowed per session
on_stop: Callback ``(transcript_path, sdk_session_id)`` invoked when
the SDK finishes processing — used to read the JSONL transcript
before the CLI process exits.
on_compact: Callback invoked when SDK starts compacting context.
Returns:
Hooks configuration dict for ClaudeAgentOptions
@@ -311,30 +307,6 @@ def create_security_hooks(
on_compact()
return cast(SyncHookJSONOutput, {})
# --- Stop hook: capture transcript path for stateless resume ---
async def stop_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Capture transcript path when SDK finishes processing.
The Stop hook fires while the CLI process is still alive, giving us
a reliable window to read the JSONL transcript before SIGTERM.
"""
_ = context, tool_use_id
transcript_path = cast(str, input_data.get("transcript_path", ""))
sdk_session_id = cast(str, input_data.get("session_id", ""))
if transcript_path and on_stop:
logger.info(
f"[SDK] Stop hook: transcript_path={transcript_path}, "
f"sdk_session_id={sdk_session_id[:12]}..."
)
on_stop(transcript_path, sdk_session_id)
return cast(SyncHookJSONOutput, {})
hooks: dict[str, Any] = {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
@@ -344,9 +316,6 @@ def create_security_hooks(
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
if on_stop is not None:
hooks["Stop"] = [HookMatcher(matcher=None, hooks=[stop_hook])]
return hooks
except ImportError:
# Fallback for when SDK isn't available - return empty hooks

View File

@@ -12,7 +12,6 @@ import subprocess
import sys
import uuid
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Any, cast
import openai
@@ -21,6 +20,9 @@ from claude_agent_sdk import (
ClaudeAgentOptions,
ClaudeSDKClient,
ResultMessage,
TextBlock,
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
)
from langfuse import propagate_attributes
@@ -74,11 +76,11 @@ from .tool_adapter import (
from .transcript import (
cleanup_cli_project_dir,
download_transcript,
read_transcript_file,
upload_transcript,
validate_transcript,
write_transcript_to_tempfile,
)
from .transcript_builder import TranscriptBuilder
logger = logging.getLogger(__name__)
config = ChatConfig()
@@ -137,19 +139,6 @@ _setup_langfuse_otel()
_background_tasks: set[asyncio.Task[Any]] = set()
@dataclass
class CapturedTranscript:
"""Info captured by the SDK Stop hook for stateless --resume."""
path: str = ""
sdk_session_id: str = ""
raw_content: str = ""
@property
def available(self) -> bool:
return bool(self.path)
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
@@ -451,6 +440,43 @@ def _cleanup_sdk_tool_results(cwd: str) -> None:
pass
def _format_sdk_content_blocks(blocks: list) -> list[dict[str, Any]]:
"""Convert SDK content blocks to transcript format.
Handles TextBlock, ToolUseBlock, ToolResultBlock, and ThinkingBlock.
"""
result: list[dict[str, Any]] = []
for block in blocks or []:
if isinstance(block, TextBlock):
result.append({"type": "text", "text": block.text})
elif isinstance(block, ToolUseBlock):
result.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
}
)
elif isinstance(block, ToolResultBlock):
result.append(
{
"type": "tool_result",
"tool_use_id": block.tool_use_id,
"content": block.content,
}
)
elif isinstance(block, ThinkingBlock):
result.append(
{
"type": "thinking",
"thinking": block.thinking,
"signature": block.signature,
}
)
return result
async def _compress_messages(
messages: list[ChatMessage],
) -> tuple[list[ChatMessage], bool]:
@@ -806,6 +832,11 @@ async def stream_chat_completion_sdk(
user_id=user_id, session_id=session_id, message_length=len(message)
)
# Structured log prefix: [SDK][<session>][T<turn>]
# Turn = number of user messages (1-based), computed AFTER appending the new message.
turn = sum(1 for m in session.messages if m.role == "user")
log_prefix = f"[SDK][{session_id[:12]}][T{turn}]"
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
@@ -823,10 +854,11 @@ async def stream_chat_completion_sdk(
message_id = str(uuid.uuid4())
stream_id = str(uuid.uuid4())
stream_completed = False
ended_with_stream_error = False
e2b_sandbox = None
use_resume = False
resume_file: str | None = None
captured_transcript = CapturedTranscript()
transcript_builder = TranscriptBuilder()
sdk_cwd = ""
# Acquire stream lock to prevent concurrent streams to the same session
@@ -841,7 +873,7 @@ async def stream_chat_completion_sdk(
if lock_owner != stream_id:
# Another stream is active
logger.warning(
f"[SDK] Session {session_id} already has an active stream: {lock_owner}"
f"{log_prefix} Session already has an active stream: {lock_owner}"
)
yield StreamError(
errorText="Another stream is already active for this session. "
@@ -865,7 +897,7 @@ async def stream_chat_completion_sdk(
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
except (ValueError, OSError) as e:
logger.error("[SDK] [%s] Invalid SDK cwd: %s", session_id[:12], e)
logger.error("%s Invalid SDK cwd: %s", log_prefix, e)
yield StreamError(
errorText="Unable to initialize working directory.",
code="sdk_cwd_error",
@@ -909,12 +941,13 @@ async def stream_chat_completion_sdk(
):
return None
try:
return await download_transcript(user_id, session_id)
return await download_transcript(
user_id, session_id, log_prefix=log_prefix
)
except Exception as transcript_err:
logger.warning(
"[SDK] [%s] Transcript download failed, continuing without "
"--resume: %s",
session_id[:12],
"%s Transcript download failed, continuing without " "--resume: %s",
log_prefix,
transcript_err,
)
return None
@@ -936,11 +969,18 @@ async def stream_chat_completion_sdk(
transcript_msg_count = 0
if dl:
is_valid = validate_transcript(dl.content)
dl_lines = dl.content.strip().split("\n") if dl.content else []
logger.info(
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
log_prefix,
len(dl.content),
len(dl_lines),
dl.message_count,
is_valid,
)
if is_valid:
logger.info(
f"[SDK] Transcript available for session {session_id}: "
f"{len(dl.content)}B, msg_count={dl.message_count}"
)
# Load previous FULL context into builder
transcript_builder.load_previous(dl.content)
resume_file = write_transcript_to_tempfile(
dl.content, session_id, sdk_cwd
)
@@ -948,16 +988,14 @@ async def stream_chat_completion_sdk(
use_resume = True
transcript_msg_count = dl.message_count
logger.debug(
f"[SDK] Using --resume ({len(dl.content)}B, "
f"{log_prefix} Using --resume ({len(dl.content)}B, "
f"msg_count={transcript_msg_count})"
)
else:
logger.warning(
f"[SDK] Transcript downloaded but invalid for {session_id}"
)
logger.warning(f"{log_prefix} Transcript downloaded but invalid")
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
logger.warning(
f"[SDK] No transcript available for {session_id} "
f"{log_prefix} No transcript available "
f"({len(session.messages)} messages in session)"
)
@@ -979,25 +1017,6 @@ async def stream_chat_completion_sdk(
sdk_model = _resolve_sdk_model()
# --- Transcript capture via Stop hook ---
# Read the file content immediately — the SDK may clean up
# the file before our finally block runs.
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
captured_transcript.path = transcript_path
captured_transcript.sdk_session_id = sdk_session_id
content = read_transcript_file(transcript_path)
if content:
captured_transcript.raw_content = content
logger.info(
f"[SDK] Stop hook: captured {len(content)}B from "
f"{transcript_path}"
)
else:
logger.warning(
f"[SDK] Stop hook: transcript file empty/missing at "
f"{transcript_path}"
)
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
compaction = CompactionTracker()
@@ -1005,7 +1024,6 @@ async def stream_chat_completion_sdk(
user_id,
sdk_cwd=sdk_cwd,
max_subtasks=config.claude_agent_max_subtasks,
on_stop=_on_stop if config.claude_agent_use_resume else None,
on_compact=compaction.on_compact,
)
@@ -1040,7 +1058,10 @@ async def stream_chat_completion_sdk(
session_id=session_id,
trace_name="copilot-sdk",
tags=["sdk"],
metadata={"resume": str(use_resume)},
metadata={
"resume": str(use_resume),
"conversation_turn": str(turn),
},
)
_otel_ctx.__enter__()
@@ -1074,9 +1095,9 @@ async def stream_chat_completion_sdk(
query_message = f"{query_message}\n\n{attachments.hint}"
logger.info(
"[SDK] [%s] Sending query — resume=%s, total_msgs=%d, "
"%s Sending query — resume=%s, total_msgs=%d, "
"query_len=%d, attached_files=%d, image_blocks=%d",
session_id[:12],
log_prefix,
use_resume,
len(session.messages),
len(query_message),
@@ -1105,8 +1126,12 @@ async def stream_chat_completion_sdk(
await client._transport.write( # noqa: SLF001
json.dumps(user_msg) + "\n"
)
# Capture user message in transcript (multimodal)
transcript_builder.add_user_message(content=content_blocks)
else:
await client.query(query_message, session_id=session_id)
# Capture user message in transcript (text only)
transcript_builder.add_user_message(content=query_message)
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
@@ -1150,8 +1175,8 @@ async def stream_chat_completion_sdk(
sdk_msg = done.pop().result()
except StopAsyncIteration:
logger.info(
"[SDK] [%s] Stream ended normally (StopAsyncIteration)",
session_id[:12],
"%s Stream ended normally (StopAsyncIteration)",
log_prefix,
)
break
except Exception as stream_err:
@@ -1160,8 +1185,8 @@ async def stream_chat_completion_sdk(
# so the session can still be saved and the
# frontend gets a clean finish.
logger.error(
"[SDK] [%s] Stream error from SDK: %s",
session_id[:12],
"%s Stream error from SDK: %s",
log_prefix,
stream_err,
exc_info=True,
)
@@ -1173,9 +1198,9 @@ async def stream_chat_completion_sdk(
break
logger.info(
"[SDK] [%s] Received: %s %s "
"%s Received: %s %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
log_prefix,
type(sdk_msg).__name__,
getattr(sdk_msg, "subtype", ""),
len(adapter.current_tool_calls)
@@ -1184,6 +1209,15 @@ async def stream_chat_completion_sdk(
len(adapter.resolved_tool_calls),
)
# Capture AssistantMessage in transcript
if isinstance(sdk_msg, AssistantMessage):
content_blocks = _format_sdk_content_blocks(sdk_msg.content)
model_name = getattr(sdk_msg, "model", "")
transcript_builder.add_assistant_message(
content_blocks=content_blocks,
model=model_name,
)
# Race-condition fix: SDK hooks (PostToolUse) are
# executed asynchronously via start_soon() — the next
# message can arrive before the hook stashes output.
@@ -1210,10 +1244,10 @@ async def stream_chat_completion_sdk(
await asyncio.sleep(0)
else:
logger.warning(
"[SDK] [%s] Timed out waiting for "
"%s Timed out waiting for "
"PostToolUse hook stash "
"(%d unresolved tool calls)",
session_id[:12],
log_prefix,
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
)
@@ -1221,9 +1255,9 @@ async def stream_chat_completion_sdk(
# Log ResultMessage details for debugging
if isinstance(sdk_msg, ResultMessage):
logger.info(
"[SDK] [%s] Received: ResultMessage %s "
"%s Received: ResultMessage %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
log_prefix,
sdk_msg.subtype,
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
@@ -1232,8 +1266,8 @@ async def stream_chat_completion_sdk(
)
if sdk_msg.subtype in ("error", "error_during_execution"):
logger.error(
"[SDK] [%s] SDK execution failed with error: %s",
session_id[:12],
"%s SDK execution failed with error: %s",
log_prefix,
sdk_msg.result or "(no error message provided)",
)
@@ -1258,8 +1292,8 @@ async def stream_chat_completion_sdk(
out_len = len(str(response.output))
extra = f", output_len={out_len}"
logger.info(
"[SDK] [%s] Tool event: %s, tool=%s%s",
session_id[:12],
"%s Tool event: %s, tool=%s%s",
log_prefix,
type(response).__name__,
getattr(response, "toolName", "N/A"),
extra,
@@ -1268,8 +1302,8 @@ async def stream_chat_completion_sdk(
# Log errors being sent to frontend
if isinstance(response, StreamError):
logger.error(
"[SDK] [%s] Sending error to frontend: %s (code=%s)",
session_id[:12],
"%s Sending error to frontend: %s (code=%s)",
log_prefix,
response.errorText,
response.code,
)
@@ -1335,8 +1369,8 @@ async def stream_chat_completion_sdk(
# server shutdown). Log and let the safety-net / finally
# blocks handle cleanup.
logger.warning(
"[SDK] [%s] Streaming loop cancelled (asyncio.CancelledError)",
session_id[:12],
"%s Streaming loop cancelled (asyncio.CancelledError)",
log_prefix,
)
raise
finally:
@@ -1350,7 +1384,8 @@ async def stream_chat_completion_sdk(
except (asyncio.CancelledError, StopAsyncIteration):
# Expected: task was cancelled or exhausted during cleanup
logger.info(
"[SDK] Pending __anext__ task completed during cleanup"
"%s Pending __anext__ task completed during cleanup",
log_prefix,
)
# Safety net: if tools are still unresolved after the
@@ -1359,9 +1394,9 @@ async def stream_chat_completion_sdk(
# them now so the frontend stops showing spinners.
if adapter.has_unresolved_tool_calls:
logger.warning(
"[SDK] [%s] %d unresolved tool(s) after stream loop — "
"%s %d unresolved tool(s) after stream loop — "
"flushing as safety net",
session_id[:12],
log_prefix,
len(adapter.current_tool_calls) - len(adapter.resolved_tool_calls),
)
safety_responses: list[StreamBaseResponse] = []
@@ -1372,8 +1407,8 @@ async def stream_chat_completion_sdk(
(StreamToolInputAvailable, StreamToolOutputAvailable),
):
logger.info(
"[SDK] [%s] Safety flush: %s, tool=%s",
session_id[:12],
"%s Safety flush: %s, tool=%s",
log_prefix,
type(response).__name__,
getattr(response, "toolName", "N/A"),
)
@@ -1386,8 +1421,8 @@ async def stream_chat_completion_sdk(
# StreamFinish is published by mark_session_completed in the processor.
if not stream_completed and not ended_with_stream_error:
logger.info(
"[SDK] [%s] Stream ended without ResultMessage (stopped by user)",
session_id[:12],
"%s Stream ended without ResultMessage (stopped by user)",
log_prefix,
)
closing_responses: list[StreamBaseResponse] = []
adapter._end_text_if_open(closing_responses)
@@ -1408,69 +1443,36 @@ async def stream_chat_completion_sdk(
) and not has_appended_assistant:
session.messages.append(assistant_response)
# --- Upload transcript for next-turn --resume ---
# After async with the SDK task group has exited, so the Stop
# hook has already fired and the CLI has been SIGTERMed. The
# CLI uses appendFileSync, so all writes are safely on disk.
if config.claude_agent_use_resume and user_id:
# With --resume the CLI appends to the resume file (most
# complete). Otherwise use the Stop hook path.
if use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
logger.debug("[SDK] Transcript source: resume file")
elif captured_transcript.path:
raw_transcript = read_transcript_file(captured_transcript.path)
logger.debug(
"[SDK] Transcript source: stop hook (%s), read result: %s",
captured_transcript.path,
f"{len(raw_transcript)}B" if raw_transcript else "None",
)
else:
raw_transcript = None
# Transcript upload is handled exclusively in the finally block
# to avoid double-uploads (the success path used to upload the
# old resume file, then the finally block overwrote it with the
# stop hook content — which could be smaller after compaction).
if not raw_transcript:
logger.debug(
"[SDK] No usable transcript — CLI file had no "
"conversation entries (expected for first turn "
"without --resume)"
)
if raw_transcript:
# Shield the upload from generator cancellation so a
# client disconnect / page refresh doesn't lose the
# transcript. The upload must finish even if the SSE
# connection is torn down.
await asyncio.shield(
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
)
)
logger.info(
"[SDK] [%s] Stream completed successfully with %d messages",
session_id[:12],
len(session.messages),
)
if ended_with_stream_error:
logger.warning(
"%s Stream ended with SDK error after %d messages",
log_prefix,
len(session.messages),
)
else:
logger.info(
"%s Stream completed successfully with %d messages",
log_prefix,
len(session.messages),
)
except BaseException as e:
# Catch BaseException to handle both Exception and CancelledError
# (CancelledError inherits from BaseException in Python 3.8+)
if isinstance(e, asyncio.CancelledError):
logger.warning("[SDK] [%s] Session cancelled", session_id[:12])
logger.warning("%s Session cancelled", log_prefix)
error_msg = "Operation cancelled"
else:
error_msg = str(e) or type(e).__name__
# SDK cleanup RuntimeError is expected during cancellation, log as warning
if isinstance(e, RuntimeError) and "cancel scope" in str(e):
logger.warning(
"[SDK] [%s] SDK cleanup error: %s", session_id[:12], error_msg
)
logger.warning("%s SDK cleanup error: %s", log_prefix, error_msg)
else:
logger.error(
f"[SDK] [%s] Error: {error_msg}", session_id[:12], exc_info=True
)
logger.error("%s Error: %s", log_prefix, error_msg, exc_info=True)
# Append error marker to session (non-invasive text parsing approach)
# The finally block will persist the session with this error marker
@@ -1481,8 +1483,8 @@ async def stream_chat_completion_sdk(
)
)
logger.debug(
"[SDK] [%s] Appended error marker, will be persisted in finally",
session_id[:12],
"%s Appended error marker, will be persisted in finally",
log_prefix,
)
# Yield StreamError for immediate feedback (only for non-cancellation errors)
@@ -1514,47 +1516,72 @@ async def stream_chat_completion_sdk(
try:
await asyncio.shield(upsert_chat_session(session))
logger.info(
"[SDK] [%s] Session persisted in finally with %d messages",
session_id[:12],
"%s Session persisted in finally with %d messages",
log_prefix,
len(session.messages),
)
except Exception as persist_err:
logger.error(
"[SDK] [%s] Failed to persist session in finally: %s",
session_id[:12],
"%s Failed to persist session in finally: %s",
log_prefix,
persist_err,
exc_info=True,
)
# --- Upload transcript for next-turn --resume ---
# This MUST run in finally so the transcript is uploaded even when
# the streaming loop raises an exception. The CLI uses
# appendFileSync, so whatever was written before the error/SIGTERM
# is safely on disk and still useful for the next turn.
if config.claude_agent_use_resume and user_id:
# the streaming loop raises an exception.
# The transcript represents the COMPLETE active context (atomic).
if config.claude_agent_use_resume and user_id and session is not None:
try:
# Prefer content captured in the Stop hook (read before
# cleanup removes the file). Fall back to the resume
# file when the stop hook didn't fire (e.g. error before
# completion) so we don't lose the prior transcript.
raw_transcript = captured_transcript.raw_content or None
if not raw_transcript and use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
# Build complete transcript from captured SDK messages
transcript_content = transcript_builder.to_jsonl()
if raw_transcript and session is not None:
await asyncio.shield(
_try_upload_transcript(
user_id,
session_id,
raw_transcript,
message_count=len(session.messages),
)
if not transcript_content:
logger.warning(
"%s No transcript to upload (builder empty)", log_prefix
)
elif not validate_transcript(transcript_content):
logger.warning(
"%s Transcript invalid, skipping upload (entries=%d)",
log_prefix,
transcript_builder.entry_count,
)
else:
logger.warning(f"[SDK] No transcript to upload for {session_id}")
logger.info(
"%s Uploading complete transcript (entries=%d, bytes=%d)",
log_prefix,
transcript_builder.entry_count,
len(transcript_content),
)
# Create task first so we have a reference if timeout occurs
upload_task = asyncio.create_task(
upload_transcript(
user_id=user_id,
session_id=session_id,
content=transcript_content,
message_count=len(session.messages),
log_prefix=log_prefix,
)
)
try:
async with asyncio.timeout(30):
await asyncio.shield(upload_task)
except TimeoutError:
# Timeout fired but shield keeps upload running - track the
# SAME task to prevent garbage collection (no double upload)
logger.warning(
"%s Transcript upload exceeded 30s timeout, "
"continuing in background",
log_prefix,
)
_background_tasks.add(upload_task)
upload_task.add_done_callback(_background_tasks.discard)
except Exception as upload_err:
logger.error(
f"[SDK] Transcript upload failed in finally: {upload_err}",
"%s Transcript upload failed in finally: %s",
log_prefix,
upload_err,
exc_info=True,
)
@@ -1565,33 +1592,6 @@ async def stream_chat_completion_sdk(
await lock.release()
async def _try_upload_transcript(
user_id: str,
session_id: str,
raw_content: str,
message_count: int = 0,
) -> bool:
"""Strip progress entries and upload transcript (with timeout).
Returns True if the upload completed without error.
"""
try:
async with asyncio.timeout(30):
await upload_transcript(
user_id, session_id, raw_content, message_count=message_count
)
return True
except asyncio.TimeoutError:
logger.warning(f"[SDK] Transcript upload timed out for {session_id}")
return False
except Exception as e:
logger.error(
f"[SDK] Failed to upload transcript for {session_id}: {e}",
exc_info=True,
)
return False
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:

View File

@@ -58,41 +58,40 @@ def strip_progress_entries(content: str) -> str:
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
any remaining child entries so the ``parentUuid`` chain stays intact.
Typically reduces transcript size by ~30%.
Entries that are not stripped or reparented are kept as their original
raw JSON line to avoid unnecessary re-serialization that changes
whitespace or key ordering.
"""
lines = content.strip().split("\n")
entries: list[dict] = []
# Parse entries, keeping the original line alongside the parsed dict.
parsed: list[tuple[str, dict | None]] = []
for line in lines:
try:
entries.append(json.loads(line))
parsed.append((line, json.loads(line)))
except json.JSONDecodeError:
# Keep unparseable lines as-is (safety)
entries.append({"_raw": line})
parsed.append((line, None))
# First pass: identify stripped UUIDs and build parent map.
stripped_uuids: set[str] = set()
uuid_to_parent: dict[str, str] = {}
kept: list[dict] = []
for entry in entries:
if "_raw" in entry:
kept.append(entry)
for _line, entry in parsed:
if entry is None:
continue
uid = entry.get("uuid", "")
parent = entry.get("parentUuid", "")
entry_type = entry.get("type", "")
if uid:
uuid_to_parent[uid] = parent
if entry.get("type", "") in STRIPPABLE_TYPES and uid:
stripped_uuids.add(uid)
if entry_type in STRIPPABLE_TYPES:
if uid:
stripped_uuids.add(uid)
else:
kept.append(entry)
# Reparent: walk up chain through stripped entries to find surviving ancestor
for entry in kept:
if "_raw" in entry:
# Second pass: keep non-stripped entries, reparenting where needed.
# Preserve original line when no reparenting is required.
reparented: set[str] = set()
for _line, entry in parsed:
if entry is None:
continue
parent = entry.get("parentUuid", "")
original_parent = parent
@@ -100,63 +99,32 @@ def strip_progress_entries(content: str) -> str:
parent = uuid_to_parent.get(parent, "")
if parent != original_parent:
entry["parentUuid"] = parent
uid = entry.get("uuid", "")
if uid:
reparented.add(uid)
result_lines: list[str] = []
for entry in kept:
if "_raw" in entry:
result_lines.append(entry["_raw"])
else:
for line, entry in parsed:
if entry is None:
result_lines.append(line)
continue
if entry.get("type", "") in STRIPPABLE_TYPES:
continue
uid = entry.get("uuid", "")
if uid in reparented:
# Re-serialize only entries whose parentUuid was changed.
result_lines.append(json.dumps(entry, separators=(",", ":")))
else:
result_lines.append(line)
return "\n".join(result_lines) + "\n"
# ---------------------------------------------------------------------------
# Local file I/O (read from CLI's JSONL, write temp file for --resume)
# Local file I/O (write temp file for --resume)
# ---------------------------------------------------------------------------
def read_transcript_file(transcript_path: str) -> str | None:
"""Read a JSONL transcript file from disk.
Returns the raw JSONL content, or ``None`` if the file is missing, empty,
or only contains metadata (≤2 lines with no conversation messages).
"""
if not transcript_path or not os.path.isfile(transcript_path):
logger.debug(f"[Transcript] File not found: {transcript_path}")
return None
try:
with open(transcript_path) as f:
content = f.read()
if not content.strip():
logger.debug("[Transcript] File is empty: %s", transcript_path)
return None
lines = content.strip().split("\n")
# Validate that the transcript has real conversation content
# (not just metadata like queue-operation entries).
if not validate_transcript(content):
logger.debug(
"[Transcript] No conversation content (%d lines) in %s",
len(lines),
transcript_path,
)
return None
logger.info(
f"[Transcript] Read {len(lines)} lines, "
f"{len(content)} bytes from {transcript_path}"
)
return content
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"[Transcript] Failed to read {transcript_path}: {e}")
return None
def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
"""Sanitize an ID for safe use in file paths.
@@ -171,14 +139,6 @@ def _sanitize_id(raw_id: str, max_len: int = 36) -> str:
_SAFE_CWD_PREFIX = os.path.realpath("/tmp/copilot-")
def _encode_cwd_for_cli(cwd: str) -> str:
"""Encode a working directory path the same way the Claude CLI does.
The CLI replaces all non-alphanumeric characters with ``-``.
"""
return re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(cwd))
def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""Remove the CLI's project directory for a specific working directory.
@@ -188,7 +148,8 @@ def cleanup_cli_project_dir(sdk_cwd: str) -> None:
"""
import shutil
cwd_encoded = _encode_cwd_for_cli(sdk_cwd)
# Encode cwd the same way CLI does (replaces non-alphanumeric with -)
cwd_encoded = re.sub(r"[^a-zA-Z0-9]", "-", os.path.realpath(sdk_cwd))
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.expanduser("~/.claude")
projects_base = os.path.realpath(os.path.join(config_dir, "projects"))
project_dir = os.path.realpath(os.path.join(projects_base, cwd_encoded))
@@ -248,32 +209,30 @@ def write_transcript_to_tempfile(
def validate_transcript(content: str | None) -> bool:
"""Check that a transcript has actual conversation messages.
A valid transcript for resume needs at least one user message and one
assistant message (not just queue-operation / file-history-snapshot
metadata).
A valid transcript needs at least one assistant message (not just
queue-operation / file-history-snapshot metadata). We do NOT require
a ``type: "user"`` entry because with ``--resume`` the user's message
is passed as a CLI query parameter and does not appear in the
transcript file.
"""
if not content or not content.strip():
return False
lines = content.strip().split("\n")
if len(lines) < 2:
return False
has_user = False
has_assistant = False
for line in lines:
if not line.strip():
continue
try:
entry = json.loads(line)
msg_type = entry.get("type")
if msg_type == "user":
has_user = True
elif msg_type == "assistant":
if entry.get("type") == "assistant":
has_assistant = True
except json.JSONDecodeError:
return False
return has_user and has_assistant
return has_assistant
# ---------------------------------------------------------------------------
@@ -328,26 +287,43 @@ async def upload_transcript(
session_id: str,
content: str,
message_count: int = 0,
log_prefix: str = "[Transcript]",
) -> None:
"""Strip progress entries and upload transcript to bucket storage.
"""Strip progress entries and upload complete transcript.
The transcript represents the FULL active context (atomic).
Each upload REPLACES the previous transcript entirely.
The executor holds a cluster lock per session, so concurrent uploads for
the same session cannot happen. We always overwrite — with ``--resume``
the CLI may compact old tool results, so neither byte size nor line count
is a reliable proxy for "newer".
the same session cannot happen.
Args:
message_count: ``len(session.messages)`` at upload time — used by
the next turn to detect staleness and compress only the gap.
content: Complete JSONL transcript (from TranscriptBuilder).
message_count: ``len(session.messages)`` at upload time.
"""
from backend.util.workspace_storage import get_workspace_storage
# Strip metadata entries (progress, file-history-snapshot, etc.)
# Note: SDK-built transcripts shouldn't have these, but strip for safety
stripped = strip_progress_entries(content)
if not validate_transcript(stripped):
# Log entry types for debugging — helps identify why validation failed
entry_types: list[str] = []
for line in stripped.strip().split("\n"):
try:
entry_types.append(json.loads(line).get("type", "?"))
except json.JSONDecodeError:
entry_types.append("INVALID_JSON")
logger.warning(
f"[Transcript] Skipping upload — stripped content not valid "
f"for session {session_id}"
"%s Skipping upload — stripped content not valid "
"(types=%s, stripped_len=%d, raw_len=%d)",
log_prefix,
entry_types,
len(stripped),
len(content),
)
logger.debug("%s Raw content preview: %s", log_prefix, content[:500])
logger.debug("%s Stripped content: %s", log_prefix, stripped[:500])
return
storage = await get_workspace_storage()
@@ -373,17 +349,18 @@ async def upload_transcript(
content=json.dumps(meta).encode("utf-8"),
)
except Exception as e:
logger.warning(f"[Transcript] Failed to write metadata for {session_id}: {e}")
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
logger.info(
f"[Transcript] Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count}) "
f"for session {session_id}"
f"{log_prefix} Uploaded {len(encoded)}B "
f"(stripped from {len(content)}B, msg_count={message_count})"
)
async def download_transcript(
user_id: str, session_id: str
user_id: str,
session_id: str,
log_prefix: str = "[Transcript]",
) -> TranscriptDownload | None:
"""Download transcript and metadata from bucket storage.
@@ -399,10 +376,10 @@ async def download_transcript(
data = await storage.retrieve(path)
content = data.decode("utf-8")
except FileNotFoundError:
logger.debug(f"[Transcript] No transcript in storage for {session_id}")
logger.debug(f"{log_prefix} No transcript in storage")
return None
except Exception as e:
logger.warning(f"[Transcript] Failed to download transcript: {e}")
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
return None
# Try to load metadata (best-effort — old transcripts won't have it)
@@ -425,10 +402,7 @@ async def download_transcript(
except (FileNotFoundError, json.JSONDecodeError, Exception):
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
logger.info(
f"[Transcript] Downloaded {len(content)}B "
f"(msg_count={message_count}) for session {session_id}"
)
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
return TranscriptDownload(
content=content,
message_count=message_count,

View File

@@ -0,0 +1,140 @@
"""Build complete JSONL transcript from SDK messages.
The transcript represents the FULL active context at any point in time.
Each upload REPLACES the previous transcript atomically.
Flow:
Turn 1: Upload [msg1, msg2]
Turn 2: Download [msg1, msg2] → Upload [msg1, msg2, msg3, msg4] (REPLACE)
Turn 3: Download [msg1, msg2, msg3, msg4] → Upload [all messages] (REPLACE)
The transcript is never incremental - always the complete atomic state.
"""
import json
import logging
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
logger = logging.getLogger(__name__)
class TranscriptEntry(BaseModel):
"""Single transcript entry (user or assistant turn)."""
type: str
uuid: str
parentUuid: str | None
message: dict[str, Any]
class TranscriptBuilder:
"""Build complete JSONL transcript from SDK messages.
This builder maintains the FULL conversation state, not incremental changes.
The output is always the complete active context.
"""
def __init__(self) -> None:
self._entries: list[TranscriptEntry] = []
self._last_uuid: str | None = None
def load_previous(self, content: str) -> None:
"""Load complete previous transcript.
This loads the FULL previous context. As new messages come in,
we append to this state. The final output is the complete context
(previous + new), not just the delta.
"""
if not content or not content.strip():
return
for line in content.strip().split("\n"):
if not line.strip():
continue
try:
data = json.loads(line)
except json.JSONDecodeError:
logger.warning("Failed to parse transcript line: %s", line[:100])
continue
# Only load conversation messages (user/assistant)
# Skip metadata entries
if data.get("type") not in ("user", "assistant"):
continue
entry = TranscriptEntry(
type=data["type"],
uuid=data.get("uuid") or str(uuid4()),
parentUuid=data.get("parentUuid"),
message=data.get("message", {}),
)
self._entries.append(entry)
self._last_uuid = entry.uuid
logger.info(
"Loaded %d entries from previous transcript (last_uuid=%s)",
len(self._entries),
self._last_uuid[:12] if self._last_uuid else None,
)
def add_user_message(
self, content: str | list[dict], uuid: str | None = None
) -> None:
"""Add user message to the complete context."""
msg_uuid = uuid or str(uuid4())
self._entries.append(
TranscriptEntry(
type="user",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={"role": "user", "content": content},
)
)
self._last_uuid = msg_uuid
def add_assistant_message(
self, content_blocks: list[dict], model: str = ""
) -> None:
"""Add assistant message to the complete context."""
msg_uuid = str(uuid4())
self._entries.append(
TranscriptEntry(
type="assistant",
uuid=msg_uuid,
parentUuid=self._last_uuid,
message={
"role": "assistant",
"model": model,
"content": content_blocks,
},
)
)
self._last_uuid = msg_uuid
def to_jsonl(self) -> str:
"""Export complete context as JSONL.
Returns the FULL conversation state (all entries), not incremental.
This output REPLACES any previous transcript.
"""
if not self._entries:
return ""
lines = [entry.model_dump_json(exclude_none=True) for entry in self._entries]
return "\n".join(lines) + "\n"
@property
def entry_count(self) -> int:
"""Total number of entries in the complete context."""
return len(self._entries)
@property
def is_empty(self) -> bool:
"""Whether this builder has any entries."""
return len(self._entries) == 0

View File

@@ -5,7 +5,6 @@ import os
from .transcript import (
STRIPPABLE_TYPES,
read_transcript_file,
strip_progress_entries,
validate_transcript,
write_transcript_to_tempfile,
@@ -38,49 +37,6 @@ PROGRESS_ENTRY = {
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
# --- read_transcript_file ---
class TestReadTranscriptFile:
def test_returns_content_for_valid_file(self, tmp_path):
path = tmp_path / "session.jsonl"
path.write_text(VALID_TRANSCRIPT)
result = read_transcript_file(str(path))
assert result is not None
assert "user" in result
def test_returns_none_for_missing_file(self):
assert read_transcript_file("/nonexistent/path.jsonl") is None
def test_returns_none_for_empty_path(self):
assert read_transcript_file("") is None
def test_returns_none_for_empty_file(self, tmp_path):
path = tmp_path / "empty.jsonl"
path.write_text("")
assert read_transcript_file(str(path)) is None
def test_returns_none_for_metadata_only(self, tmp_path):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY)
path = tmp_path / "meta.jsonl"
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_invalid_json(self, tmp_path):
path = tmp_path / "bad.jsonl"
path.write_text("not json\n{}\n{}\n")
assert read_transcript_file(str(path)) is None
def test_no_size_limit(self, tmp_path):
"""Large files are accepted — bucket storage has no size limit."""
big_content = {"type": "user", "uuid": "u9", "data": "x" * 1_000_000}
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
path = tmp_path / "big.jsonl"
path.write_text(content)
result = read_transcript_file(str(path))
assert result is not None
# --- write_transcript_to_tempfile ---
@@ -155,12 +111,56 @@ class TestValidateTranscript:
assert validate_transcript(content) is False
def test_assistant_only_no_user(self):
"""With --resume the user message is a CLI query param, not a transcript entry.
A transcript with only assistant entries is valid."""
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG)
assert validate_transcript(content) is False
assert validate_transcript(content) is True
def test_resume_transcript_without_user_entry(self):
"""Simulates a real --resume stop hook transcript: the CLI session file
has summary + assistant entries but no user entry."""
summary = {"type": "summary", "uuid": "s1", "text": "context..."}
asst1 = {
"type": "assistant",
"uuid": "a1",
"message": {"role": "assistant", "content": "Hello!"},
}
asst2 = {
"type": "assistant",
"uuid": "a2",
"parentUuid": "a1",
"message": {"role": "assistant", "content": "Sure, let me help."},
}
content = _make_jsonl(summary, asst1, asst2)
assert validate_transcript(content) is True
def test_single_assistant_entry(self):
"""A transcript with just one assistant line is valid — the CLI may
produce short transcripts for simple responses with no tool use."""
content = json.dumps(ASST_MSG) + "\n"
assert validate_transcript(content) is True
def test_invalid_json_returns_false(self):
assert validate_transcript("not json\n{}\n{}\n") is False
def test_malformed_json_after_valid_assistant_returns_false(self):
"""Validation must scan all lines - malformed JSON anywhere should fail."""
valid_asst = json.dumps(ASST_MSG)
malformed = "not valid json"
content = valid_asst + "\n" + malformed + "\n"
assert validate_transcript(content) is False
def test_blank_lines_are_skipped(self):
"""Transcripts with blank lines should be valid if they contain assistant entries."""
content = (
json.dumps(USER_MSG)
+ "\n\n" # blank line
+ json.dumps(ASST_MSG)
+ "\n"
+ "\n" # another blank line
)
assert validate_transcript(content) is True
# --- strip_progress_entries ---
@@ -253,3 +253,32 @@ class TestStripProgressEntries:
assert "queue-operation" not in result_types
assert "user" in result_types
assert "assistant" in result_types
def test_preserves_original_line_formatting(self):
"""Non-reparented entries keep their original JSON formatting."""
# Use pretty-printed JSON with spaces (as the CLI produces)
original_line = json.dumps(USER_MSG) # default formatting with spaces
compact_line = json.dumps(USER_MSG, separators=(",", ":"))
assert original_line != compact_line # precondition
content = original_line + "\n" + json.dumps(ASST_MSG) + "\n"
result = strip_progress_entries(content)
result_lines = result.strip().split("\n")
# Original line should be byte-identical (not re-serialized)
assert result_lines[0] == original_line
def test_reparented_entries_are_reserialized(self):
"""Entries whose parentUuid changes must be re-serialized."""
progress = {"type": "progress", "uuid": "p1", "parentUuid": "u1"}
asst = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "p1",
"message": {"role": "assistant", "content": "done"},
}
content = _make_jsonl(USER_MSG, progress, asst)
result = strip_progress_entries(content)
lines = result.strip().split("\n")
asst_entry = json.loads(lines[-1])
assert asst_entry["parentUuid"] == "u1" # reparented