mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(backend/copilot): merge synthetic transcript entries with real assistant content
When using `--resume`, the CLI creates a new session and writes synthetic placeholders (`model: "<synthetic>"`, text `"No response requested."`) for all previous assistant turns. This caused the copilot to "forget" its own answers across turns.

Changes:

- Wire up `merge_with_previous_transcript` in the upload pipeline: the transcript downloaded at the start of the turn is passed through to `upload_transcript`, which restores the real assistant content before stripping and uploading.
- Refactor `strip_progress_entries` to preserve the original JSON line formatting for entries that do not need reparenting, avoiding unnecessary re-serialization.
- Add a structured `log_prefix` (`[SDK][session][turn]`) to all SDK and transcript log lines for easier debugging.
- Add tests for the merge logic and the line-preservation behavior.
This commit is contained in:
@@ -777,6 +777,11 @@ async def stream_chat_completion_sdk(
|
||||
# Type narrowing: session is guaranteed ChatSession after the check above
|
||||
session = cast(ChatSession, session)
|
||||
|
||||
# Structured log prefix: [SDK][<session>][T<turn>]
|
||||
# Turn = number of user messages so far (1-based).
|
||||
turn = sum(1 for m in session.messages if m.role == "user")
|
||||
log_prefix = f"[SDK][{session_id[:12]}][T{turn}]"
|
||||
|
||||
# Clean up stale error markers from previous turn before starting new turn
|
||||
# If the last message contains an error marker, remove it (user is retrying)
|
||||
if (
|
||||
@@ -786,8 +791,8 @@ async def stream_chat_completion_sdk(
|
||||
and COPILOT_ERROR_PREFIX in session.messages[-1].content
|
||||
):
|
||||
logger.info(
|
||||
"[SDK] [%s] Removing stale error marker from previous turn",
|
||||
session_id[:12],
|
||||
"%s Removing stale error marker from previous turn",
|
||||
log_prefix,
|
||||
)
|
||||
session.messages.pop()
|
||||
|
||||
@@ -827,6 +832,7 @@ async def stream_chat_completion_sdk(
|
||||
use_resume = False
|
||||
resume_file: str | None = None
|
||||
captured_transcript = CapturedTranscript()
|
||||
previous_transcript_content: str | None = None
|
||||
sdk_cwd = ""
|
||||
|
||||
# Acquire stream lock to prevent concurrent streams to the same session
|
||||
@@ -841,7 +847,7 @@ async def stream_chat_completion_sdk(
|
||||
if lock_owner != stream_id:
|
||||
# Another stream is active
|
||||
logger.warning(
|
||||
f"[SDK] Session {session_id} already has an active stream: {lock_owner}"
|
||||
f"{log_prefix} Session already has an active stream: {lock_owner}"
|
||||
)
|
||||
yield StreamError(
|
||||
errorText="Another stream is already active for this session. "
|
||||
@@ -865,7 +871,7 @@ async def stream_chat_completion_sdk(
|
||||
sdk_cwd = _make_sdk_cwd(session_id)
|
||||
os.makedirs(sdk_cwd, exist_ok=True)
|
||||
except (ValueError, OSError) as e:
|
||||
logger.error("[SDK] [%s] Invalid SDK cwd: %s", session_id[:12], e)
|
||||
logger.error("%s Invalid SDK cwd: %s", log_prefix, e)
|
||||
yield StreamError(
|
||||
errorText="Unable to initialize working directory.",
|
||||
code="sdk_cwd_error",
|
||||
@@ -909,12 +915,13 @@ async def stream_chat_completion_sdk(
|
||||
):
|
||||
return None
|
||||
try:
|
||||
return await download_transcript(user_id, session_id)
|
||||
return await download_transcript(
|
||||
user_id, session_id, log_prefix=log_prefix
|
||||
)
|
||||
except Exception as transcript_err:
|
||||
logger.warning(
|
||||
"[SDK] [%s] Transcript download failed, continuing without "
|
||||
"--resume: %s",
|
||||
session_id[:12],
|
||||
"%s Transcript download failed, continuing without " "--resume: %s",
|
||||
log_prefix,
|
||||
transcript_err,
|
||||
)
|
||||
return None
|
||||
@@ -936,11 +943,17 @@ async def stream_chat_completion_sdk(
|
||||
transcript_msg_count = 0
|
||||
if dl:
|
||||
is_valid = validate_transcript(dl.content)
|
||||
dl_lines = dl.content.strip().split("\n") if dl.content else []
|
||||
logger.info(
|
||||
"%s Downloaded transcript: %dB, %d lines, " "msg_count=%d, valid=%s",
|
||||
log_prefix,
|
||||
len(dl.content),
|
||||
len(dl_lines),
|
||||
dl.message_count,
|
||||
is_valid,
|
||||
)
|
||||
if is_valid:
|
||||
logger.info(
|
||||
f"[SDK] Transcript available for session {session_id}: "
|
||||
f"{len(dl.content)}B, msg_count={dl.message_count}"
|
||||
)
|
||||
previous_transcript_content = dl.content
|
||||
resume_file = write_transcript_to_tempfile(
|
||||
dl.content, session_id, sdk_cwd
|
||||
)
|
||||
@@ -948,16 +961,14 @@ async def stream_chat_completion_sdk(
|
||||
use_resume = True
|
||||
transcript_msg_count = dl.message_count
|
||||
logger.debug(
|
||||
f"[SDK] Using --resume ({len(dl.content)}B, "
|
||||
f"{log_prefix} Using --resume ({len(dl.content)}B, "
|
||||
f"msg_count={transcript_msg_count})"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[SDK] Transcript downloaded but invalid for {session_id}"
|
||||
)
|
||||
logger.warning(f"{log_prefix} Transcript downloaded but invalid")
|
||||
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
|
||||
logger.warning(
|
||||
f"[SDK] No transcript available for {session_id} "
|
||||
f"{log_prefix} No transcript available "
|
||||
f"({len(session.messages)} messages in session)"
|
||||
)
|
||||
|
||||
@@ -985,18 +996,41 @@ async def stream_chat_completion_sdk(
|
||||
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
|
||||
captured_transcript.path = transcript_path
|
||||
captured_transcript.sdk_session_id = sdk_session_id
|
||||
content = read_transcript_file(transcript_path)
|
||||
if content:
|
||||
captured_transcript.raw_content = content
|
||||
logger.info(
|
||||
f"[SDK] Stop hook: captured {len(content)}B from "
|
||||
f"{transcript_path}"
|
||||
)
|
||||
else:
|
||||
|
||||
# Read the raw file directly — do NOT call read_transcript_file()
|
||||
# here. That function runs validate_transcript() which rejects
|
||||
# transcripts with < 2 lines or no assistant entries. For very
|
||||
# short responses (single-turn, no tool use) the CLI transcript
|
||||
# can have just 1 JSONL line, causing the stop hook to "miss" it.
|
||||
# Validation is deferred to upload_transcript() instead.
|
||||
if not transcript_path or not os.path.isfile(transcript_path):
|
||||
logger.warning(
|
||||
f"[SDK] Stop hook: transcript file empty/missing at "
|
||||
f"{transcript_path}"
|
||||
"%s Stop hook: file not found: %s", log_prefix, transcript_path
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
with open(transcript_path) as f:
|
||||
content = f.read()
|
||||
except OSError as e:
|
||||
logger.warning("%s Stop hook: read error: %s", log_prefix, e)
|
||||
return
|
||||
|
||||
if not content or not content.strip():
|
||||
logger.warning(
|
||||
"%s Stop hook: file empty at %s", log_prefix, transcript_path
|
||||
)
|
||||
return
|
||||
|
||||
captured_transcript.raw_content = content
|
||||
lines = content.strip().split("\n")
|
||||
logger.info(
|
||||
"%s Stop hook: captured %dB, %d lines from %s",
|
||||
log_prefix,
|
||||
len(content),
|
||||
len(lines),
|
||||
transcript_path,
|
||||
)
|
||||
|
||||
# Track SDK-internal compaction (PreCompact hook → start, next msg → end)
|
||||
compaction = CompactionTracker()
|
||||
@@ -1074,9 +1108,9 @@ async def stream_chat_completion_sdk(
|
||||
query_message = f"{query_message}\n\n{attachments.hint}"
|
||||
|
||||
logger.info(
|
||||
"[SDK] [%s] Sending query — resume=%s, total_msgs=%d, "
|
||||
"%s Sending query — resume=%s, total_msgs=%d, "
|
||||
"query_len=%d, attached_files=%d, image_blocks=%d",
|
||||
session_id[:12],
|
||||
log_prefix,
|
||||
use_resume,
|
||||
len(session.messages),
|
||||
len(query_message),
|
||||
@@ -1150,8 +1184,8 @@ async def stream_chat_completion_sdk(
|
||||
sdk_msg = done.pop().result()
|
||||
except StopAsyncIteration:
|
||||
logger.info(
|
||||
"[SDK] [%s] Stream ended normally (StopAsyncIteration)",
|
||||
session_id[:12],
|
||||
"%s Stream ended normally (StopAsyncIteration)",
|
||||
log_prefix,
|
||||
)
|
||||
break
|
||||
except Exception as stream_err:
|
||||
@@ -1160,8 +1194,8 @@ async def stream_chat_completion_sdk(
|
||||
# so the session can still be saved and the
|
||||
# frontend gets a clean finish.
|
||||
logger.error(
|
||||
"[SDK] [%s] Stream error from SDK: %s",
|
||||
session_id[:12],
|
||||
"%s Stream error from SDK: %s",
|
||||
log_prefix,
|
||||
stream_err,
|
||||
exc_info=True,
|
||||
)
|
||||
@@ -1173,9 +1207,9 @@ async def stream_chat_completion_sdk(
|
||||
break
|
||||
|
||||
logger.info(
|
||||
"[SDK] [%s] Received: %s %s "
|
||||
"%s Received: %s %s "
|
||||
"(unresolved=%d, current=%d, resolved=%d)",
|
||||
session_id[:12],
|
||||
log_prefix,
|
||||
type(sdk_msg).__name__,
|
||||
getattr(sdk_msg, "subtype", ""),
|
||||
len(adapter.current_tool_calls)
|
||||
@@ -1210,10 +1244,10 @@ async def stream_chat_completion_sdk(
|
||||
await asyncio.sleep(0)
|
||||
else:
|
||||
logger.warning(
|
||||
"[SDK] [%s] Timed out waiting for "
|
||||
"%s Timed out waiting for "
|
||||
"PostToolUse hook stash "
|
||||
"(%d unresolved tool calls)",
|
||||
session_id[:12],
|
||||
log_prefix,
|
||||
len(adapter.current_tool_calls)
|
||||
- len(adapter.resolved_tool_calls),
|
||||
)
|
||||
@@ -1221,9 +1255,9 @@ async def stream_chat_completion_sdk(
|
||||
# Log ResultMessage details for debugging
|
||||
if isinstance(sdk_msg, ResultMessage):
|
||||
logger.info(
|
||||
"[SDK] [%s] Received: ResultMessage %s "
|
||||
"%s Received: ResultMessage %s "
|
||||
"(unresolved=%d, current=%d, resolved=%d)",
|
||||
session_id[:12],
|
||||
log_prefix,
|
||||
sdk_msg.subtype,
|
||||
len(adapter.current_tool_calls)
|
||||
- len(adapter.resolved_tool_calls),
|
||||
@@ -1232,8 +1266,8 @@ async def stream_chat_completion_sdk(
|
||||
)
|
||||
if sdk_msg.subtype in ("error", "error_during_execution"):
|
||||
logger.error(
|
||||
"[SDK] [%s] SDK execution failed with error: %s",
|
||||
session_id[:12],
|
||||
"%s SDK execution failed with error: %s",
|
||||
log_prefix,
|
||||
sdk_msg.result or "(no error message provided)",
|
||||
)
|
||||
|
||||
@@ -1258,8 +1292,8 @@ async def stream_chat_completion_sdk(
|
||||
out_len = len(str(response.output))
|
||||
extra = f", output_len={out_len}"
|
||||
logger.info(
|
||||
"[SDK] [%s] Tool event: %s, tool=%s%s",
|
||||
session_id[:12],
|
||||
"%s Tool event: %s, tool=%s%s",
|
||||
log_prefix,
|
||||
type(response).__name__,
|
||||
getattr(response, "toolName", "N/A"),
|
||||
extra,
|
||||
@@ -1268,8 +1302,8 @@ async def stream_chat_completion_sdk(
|
||||
# Log errors being sent to frontend
|
||||
if isinstance(response, StreamError):
|
||||
logger.error(
|
||||
"[SDK] [%s] Sending error to frontend: %s (code=%s)",
|
||||
session_id[:12],
|
||||
"%s Sending error to frontend: %s (code=%s)",
|
||||
log_prefix,
|
||||
response.errorText,
|
||||
response.code,
|
||||
)
|
||||
@@ -1335,8 +1369,8 @@ async def stream_chat_completion_sdk(
|
||||
# server shutdown). Log and let the safety-net / finally
|
||||
# blocks handle cleanup.
|
||||
logger.warning(
|
||||
"[SDK] [%s] Streaming loop cancelled (asyncio.CancelledError)",
|
||||
session_id[:12],
|
||||
"%s Streaming loop cancelled (asyncio.CancelledError)",
|
||||
log_prefix,
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
@@ -1350,7 +1384,8 @@ async def stream_chat_completion_sdk(
|
||||
except (asyncio.CancelledError, StopAsyncIteration):
|
||||
# Expected: task was cancelled or exhausted during cleanup
|
||||
logger.info(
|
||||
"[SDK] Pending __anext__ task completed during cleanup"
|
||||
"%s Pending __anext__ task completed during cleanup",
|
||||
log_prefix,
|
||||
)
|
||||
|
||||
# Safety net: if tools are still unresolved after the
|
||||
@@ -1359,9 +1394,9 @@ async def stream_chat_completion_sdk(
|
||||
# them now so the frontend stops showing spinners.
|
||||
if adapter.has_unresolved_tool_calls:
|
||||
logger.warning(
|
||||
"[SDK] [%s] %d unresolved tool(s) after stream loop — "
|
||||
"%s %d unresolved tool(s) after stream loop — "
|
||||
"flushing as safety net",
|
||||
session_id[:12],
|
||||
log_prefix,
|
||||
len(adapter.current_tool_calls) - len(adapter.resolved_tool_calls),
|
||||
)
|
||||
safety_responses: list[StreamBaseResponse] = []
|
||||
@@ -1372,8 +1407,8 @@ async def stream_chat_completion_sdk(
|
||||
(StreamToolInputAvailable, StreamToolOutputAvailable),
|
||||
):
|
||||
logger.info(
|
||||
"[SDK] [%s] Safety flush: %s, tool=%s",
|
||||
session_id[:12],
|
||||
"%s Safety flush: %s, tool=%s",
|
||||
log_prefix,
|
||||
type(response).__name__,
|
||||
getattr(response, "toolName", "N/A"),
|
||||
)
|
||||
@@ -1386,8 +1421,8 @@ async def stream_chat_completion_sdk(
|
||||
# StreamFinish is published by mark_session_completed in the processor.
|
||||
if not stream_completed and not ended_with_stream_error:
|
||||
logger.info(
|
||||
"[SDK] [%s] Stream ended without ResultMessage (stopped by user)",
|
||||
session_id[:12],
|
||||
"%s Stream ended without ResultMessage (stopped by user)",
|
||||
log_prefix,
|
||||
)
|
||||
closing_responses: list[StreamBaseResponse] = []
|
||||
adapter._end_text_if_open(closing_responses)
|
||||
@@ -1414,27 +1449,23 @@ async def stream_chat_completion_sdk(
|
||||
# stop hook content — which could be smaller after compaction).
|
||||
|
||||
logger.info(
|
||||
"[SDK] [%s] Stream completed successfully with %d messages",
|
||||
session_id[:12],
|
||||
"%s Stream completed successfully with %d messages",
|
||||
log_prefix,
|
||||
len(session.messages),
|
||||
)
|
||||
except BaseException as e:
|
||||
# Catch BaseException to handle both Exception and CancelledError
|
||||
# (CancelledError inherits from BaseException in Python 3.8+)
|
||||
if isinstance(e, asyncio.CancelledError):
|
||||
logger.warning("[SDK] [%s] Session cancelled", session_id[:12])
|
||||
logger.warning("%s Session cancelled", log_prefix)
|
||||
error_msg = "Operation cancelled"
|
||||
else:
|
||||
error_msg = str(e) or type(e).__name__
|
||||
# SDK cleanup RuntimeError is expected during cancellation, log as warning
|
||||
if isinstance(e, RuntimeError) and "cancel scope" in str(e):
|
||||
logger.warning(
|
||||
"[SDK] [%s] SDK cleanup error: %s", session_id[:12], error_msg
|
||||
)
|
||||
logger.warning("%s SDK cleanup error: %s", log_prefix, error_msg)
|
||||
else:
|
||||
logger.error(
|
||||
f"[SDK] [%s] Error: {error_msg}", session_id[:12], exc_info=True
|
||||
)
|
||||
logger.error("%s Error: %s", log_prefix, error_msg, exc_info=True)
|
||||
|
||||
# Append error marker to session (non-invasive text parsing approach)
|
||||
# The finally block will persist the session with this error marker
|
||||
@@ -1445,8 +1476,8 @@ async def stream_chat_completion_sdk(
|
||||
)
|
||||
)
|
||||
logger.debug(
|
||||
"[SDK] [%s] Appended error marker, will be persisted in finally",
|
||||
session_id[:12],
|
||||
"%s Appended error marker, will be persisted in finally",
|
||||
log_prefix,
|
||||
)
|
||||
|
||||
# Yield StreamError for immediate feedback (only for non-cancellation errors)
|
||||
@@ -1478,14 +1509,14 @@ async def stream_chat_completion_sdk(
|
||||
try:
|
||||
await asyncio.shield(upsert_chat_session(session))
|
||||
logger.info(
|
||||
"[SDK] [%s] Session persisted in finally with %d messages",
|
||||
session_id[:12],
|
||||
"%s Session persisted in finally with %d messages",
|
||||
log_prefix,
|
||||
len(session.messages),
|
||||
)
|
||||
except Exception as persist_err:
|
||||
logger.error(
|
||||
"[SDK] [%s] Failed to persist session in finally: %s",
|
||||
session_id[:12],
|
||||
"%s Failed to persist session in finally: %s",
|
||||
log_prefix,
|
||||
persist_err,
|
||||
exc_info=True,
|
||||
)
|
||||
@@ -1502,8 +1533,22 @@ async def stream_chat_completion_sdk(
|
||||
# file when the stop hook didn't fire (e.g. error before
|
||||
# completion) so we don't lose the prior transcript.
|
||||
raw_transcript = captured_transcript.raw_content or None
|
||||
source = "stop_hook"
|
||||
if not raw_transcript and use_resume and resume_file:
|
||||
raw_transcript = read_transcript_file(resume_file)
|
||||
source = "resume_file_fallback"
|
||||
|
||||
logger.info(
|
||||
"%s Transcript upload: source=%s, "
|
||||
"stop_hook_fired=%s, captured_len=%d, "
|
||||
"raw_len=%d, use_resume=%s",
|
||||
log_prefix,
|
||||
source,
|
||||
bool(captured_transcript.path),
|
||||
len(captured_transcript.raw_content),
|
||||
len(raw_transcript) if raw_transcript else 0,
|
||||
use_resume,
|
||||
)
|
||||
|
||||
if raw_transcript and session is not None:
|
||||
await asyncio.shield(
|
||||
@@ -1512,13 +1557,15 @@ async def stream_chat_completion_sdk(
|
||||
session_id,
|
||||
raw_transcript,
|
||||
message_count=len(session.messages),
|
||||
log_prefix=log_prefix,
|
||||
previous_content=previous_transcript_content,
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.warning(f"[SDK] No transcript to upload for {session_id}")
|
||||
logger.warning(f"{log_prefix} No transcript to upload")
|
||||
except Exception as upload_err:
|
||||
logger.error(
|
||||
f"[SDK] Transcript upload failed in finally: {upload_err}",
|
||||
f"{log_prefix} Transcript upload failed in finally: {upload_err}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
@@ -1534,23 +1581,30 @@ async def _try_upload_transcript(
|
||||
session_id: str,
|
||||
raw_content: str,
|
||||
message_count: int = 0,
|
||||
log_prefix: str = "[SDK]",
|
||||
previous_content: str | None = None,
|
||||
) -> bool:
|
||||
"""Strip progress entries and upload transcript (with timeout).
|
||||
"""Merge synthetic entries, strip progress, and upload (with timeout).
|
||||
|
||||
Returns True if the upload completed without error.
|
||||
"""
|
||||
try:
|
||||
async with asyncio.timeout(30):
|
||||
await upload_transcript(
|
||||
user_id, session_id, raw_content, message_count=message_count
|
||||
user_id,
|
||||
session_id,
|
||||
raw_content,
|
||||
message_count=message_count,
|
||||
log_prefix=log_prefix,
|
||||
previous_content=previous_content,
|
||||
)
|
||||
return True
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(f"[SDK] Transcript upload timed out for {session_id}")
|
||||
logger.warning(f"{log_prefix} Transcript upload timed out")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[SDK] Failed to upload transcript for {session_id}: {e}",
|
||||
f"{log_prefix} Failed to upload transcript: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
return False
|
||||
|
||||
@@ -47,6 +47,83 @@ class TranscriptDownload:
|
||||
TRANSCRIPT_STORAGE_PREFIX = "chat-transcripts"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Synthetic entry merging
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_SYNTHETIC_MODEL = "<synthetic>"
|
||||
|
||||
|
||||
def merge_with_previous_transcript(
|
||||
new_content: str,
|
||||
previous_content: str | None,
|
||||
log_prefix: str = "[Transcript]",
|
||||
) -> str:
|
||||
"""Replace synthetic assistant entries with real ones from a previous transcript.
|
||||
|
||||
When the CLI resumes from a transcript via ``--resume``, it writes synthetic
|
||||
placeholder entries (``model: "<synthetic>"``, ``text: "No response requested."``)
|
||||
for all previous assistant turns. Only the CURRENT turn gets a real response.
|
||||
|
||||
This function restores real assistant content by matching entries between
|
||||
the new and previous transcripts using their ``uuid`` field.
|
||||
|
||||
Returns the merged JSONL content.
|
||||
"""
|
||||
if not previous_content or not previous_content.strip():
|
||||
return new_content
|
||||
|
||||
# Index real assistant entries from previous transcript by uuid
|
||||
real_by_uuid: dict[str, str] = {}
|
||||
for line in previous_content.strip().split("\n"):
|
||||
try:
|
||||
entry = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if entry.get("type") != "assistant":
|
||||
continue
|
||||
msg = entry.get("message", {})
|
||||
if msg.get("model") == _SYNTHETIC_MODEL:
|
||||
continue
|
||||
uid = entry.get("uuid", "")
|
||||
if uid:
|
||||
real_by_uuid[uid] = line
|
||||
|
||||
if not real_by_uuid:
|
||||
return new_content
|
||||
|
||||
# Walk the new transcript and replace synthetic entries with real ones
|
||||
merged_lines: list[str] = []
|
||||
replaced = 0
|
||||
for line in new_content.strip().split("\n"):
|
||||
try:
|
||||
entry = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
merged_lines.append(line)
|
||||
continue
|
||||
|
||||
uid = entry.get("uuid", "")
|
||||
msg = entry.get("message", {})
|
||||
if (
|
||||
entry.get("type") == "assistant"
|
||||
and msg.get("model") == _SYNTHETIC_MODEL
|
||||
and uid in real_by_uuid
|
||||
):
|
||||
merged_lines.append(real_by_uuid[uid])
|
||||
replaced += 1
|
||||
else:
|
||||
merged_lines.append(line)
|
||||
|
||||
if replaced:
|
||||
logger.info(
|
||||
"%s Merged %d synthetic entries with real assistant content",
|
||||
log_prefix,
|
||||
replaced,
|
||||
)
|
||||
|
||||
return "\n".join(merged_lines) + "\n"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Progress stripping
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -58,41 +135,40 @@ def strip_progress_entries(content: str) -> str:
|
||||
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
|
||||
any remaining child entries so the ``parentUuid`` chain stays intact.
|
||||
Typically reduces transcript size by ~30%.
|
||||
|
||||
Entries that are not stripped or reparented are kept as their original
|
||||
raw JSON line to avoid unnecessary re-serialization that changes
|
||||
whitespace or key ordering.
|
||||
"""
|
||||
lines = content.strip().split("\n")
|
||||
|
||||
entries: list[dict] = []
|
||||
# Parse entries, keeping the original line alongside the parsed dict.
|
||||
parsed: list[tuple[str, dict | None]] = []
|
||||
for line in lines:
|
||||
try:
|
||||
entries.append(json.loads(line))
|
||||
parsed.append((line, json.loads(line)))
|
||||
except json.JSONDecodeError:
|
||||
# Keep unparseable lines as-is (safety)
|
||||
entries.append({"_raw": line})
|
||||
parsed.append((line, None))
|
||||
|
||||
# First pass: identify stripped UUIDs and build parent map.
|
||||
stripped_uuids: set[str] = set()
|
||||
uuid_to_parent: dict[str, str] = {}
|
||||
kept: list[dict] = []
|
||||
|
||||
for entry in entries:
|
||||
if "_raw" in entry:
|
||||
kept.append(entry)
|
||||
for _line, entry in parsed:
|
||||
if entry is None:
|
||||
continue
|
||||
uid = entry.get("uuid", "")
|
||||
parent = entry.get("parentUuid", "")
|
||||
entry_type = entry.get("type", "")
|
||||
|
||||
if uid:
|
||||
uuid_to_parent[uid] = parent
|
||||
if entry.get("type", "") in STRIPPABLE_TYPES and uid:
|
||||
stripped_uuids.add(uid)
|
||||
|
||||
if entry_type in STRIPPABLE_TYPES:
|
||||
if uid:
|
||||
stripped_uuids.add(uid)
|
||||
else:
|
||||
kept.append(entry)
|
||||
|
||||
# Reparent: walk up chain through stripped entries to find surviving ancestor
|
||||
for entry in kept:
|
||||
if "_raw" in entry:
|
||||
# Second pass: keep non-stripped entries, reparenting where needed.
|
||||
# Preserve original line when no reparenting is required.
|
||||
reparented: set[str] = set()
|
||||
for _line, entry in parsed:
|
||||
if entry is None:
|
||||
continue
|
||||
parent = entry.get("parentUuid", "")
|
||||
original_parent = parent
|
||||
@@ -100,13 +176,23 @@ def strip_progress_entries(content: str) -> str:
|
||||
parent = uuid_to_parent.get(parent, "")
|
||||
if parent != original_parent:
|
||||
entry["parentUuid"] = parent
|
||||
uid = entry.get("uuid", "")
|
||||
if uid:
|
||||
reparented.add(uid)
|
||||
|
||||
result_lines: list[str] = []
|
||||
for entry in kept:
|
||||
if "_raw" in entry:
|
||||
result_lines.append(entry["_raw"])
|
||||
else:
|
||||
for line, entry in parsed:
|
||||
if entry is None:
|
||||
result_lines.append(line)
|
||||
continue
|
||||
if entry.get("type", "") in STRIPPABLE_TYPES:
|
||||
continue
|
||||
uid = entry.get("uuid", "")
|
||||
if uid in reparented:
|
||||
# Re-serialize only entries whose parentUuid was changed.
|
||||
result_lines.append(json.dumps(entry, separators=(",", ":")))
|
||||
else:
|
||||
result_lines.append(line)
|
||||
|
||||
return "\n".join(result_lines) + "\n"
|
||||
|
||||
@@ -258,8 +344,6 @@ def validate_transcript(content: str | None) -> bool:
|
||||
return False
|
||||
|
||||
lines = content.strip().split("\n")
|
||||
if len(lines) < 2:
|
||||
return False
|
||||
|
||||
has_assistant = False
|
||||
|
||||
@@ -326,25 +410,47 @@ async def upload_transcript(
|
||||
session_id: str,
|
||||
content: str,
|
||||
message_count: int = 0,
|
||||
log_prefix: str = "[Transcript]",
|
||||
previous_content: str | None = None,
|
||||
) -> None:
|
||||
"""Strip progress entries and upload transcript to bucket storage.
|
||||
"""Merge synthetic entries, strip progress, and upload transcript.
|
||||
|
||||
The executor holds a cluster lock per session, so concurrent uploads for
|
||||
the same session cannot happen. We always overwrite — with ``--resume``
|
||||
the CLI may compact old tool results, so neither byte size nor line count
|
||||
is a reliable proxy for "newer".
|
||||
|
||||
When the CLI resumes, it writes synthetic placeholders for previous
|
||||
assistant turns. If *previous_content* is provided, real assistant
|
||||
entries are restored before stripping and upload.
|
||||
|
||||
Args:
|
||||
message_count: ``len(session.messages)`` at upload time — used by
|
||||
the next turn to detect staleness and compress only the gap.
|
||||
previous_content: The previously-stored transcript (from the download
|
||||
at the start of this turn). Used to restore real assistant content
|
||||
that the CLI replaced with synthetic ``"No response requested."``
|
||||
placeholders.
|
||||
"""
|
||||
from backend.util.workspace_storage import get_workspace_storage
|
||||
|
||||
stripped = strip_progress_entries(content)
|
||||
merged = merge_with_previous_transcript(content, previous_content, log_prefix)
|
||||
stripped = strip_progress_entries(merged)
|
||||
if not validate_transcript(stripped):
|
||||
# Log entry types for debugging — helps identify why validation failed
|
||||
entry_types: list[str] = []
|
||||
for line in stripped.strip().split("\n"):
|
||||
try:
|
||||
entry_types.append(json.loads(line).get("type", "?"))
|
||||
except json.JSONDecodeError:
|
||||
entry_types.append("INVALID_JSON")
|
||||
logger.warning(
|
||||
f"[Transcript] Skipping upload — stripped content not valid "
|
||||
f"for session {session_id}"
|
||||
"%s Skipping upload — stripped content not valid "
|
||||
"(types=%s, stripped_len=%d, raw_len=%d)",
|
||||
log_prefix,
|
||||
entry_types,
|
||||
len(stripped),
|
||||
len(content),
|
||||
)
|
||||
return
|
||||
|
||||
@@ -371,17 +477,18 @@ async def upload_transcript(
|
||||
content=json.dumps(meta).encode("utf-8"),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"[Transcript] Failed to write metadata for {session_id}: {e}")
|
||||
logger.warning(f"{log_prefix} Failed to write metadata: {e}")
|
||||
|
||||
logger.info(
|
||||
f"[Transcript] Uploaded {len(encoded)}B "
|
||||
f"(stripped from {len(content)}B, msg_count={message_count}) "
|
||||
f"for session {session_id}"
|
||||
f"{log_prefix} Uploaded {len(encoded)}B "
|
||||
f"(stripped from {len(content)}B, msg_count={message_count})"
|
||||
)
|
||||
|
||||
|
||||
async def download_transcript(
|
||||
user_id: str, session_id: str
|
||||
user_id: str,
|
||||
session_id: str,
|
||||
log_prefix: str = "[Transcript]",
|
||||
) -> TranscriptDownload | None:
|
||||
"""Download transcript and metadata from bucket storage.
|
||||
|
||||
@@ -397,10 +504,10 @@ async def download_transcript(
|
||||
data = await storage.retrieve(path)
|
||||
content = data.decode("utf-8")
|
||||
except FileNotFoundError:
|
||||
logger.debug(f"[Transcript] No transcript in storage for {session_id}")
|
||||
logger.debug(f"{log_prefix} No transcript in storage")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning(f"[Transcript] Failed to download transcript: {e}")
|
||||
logger.warning(f"{log_prefix} Failed to download transcript: {e}")
|
||||
return None
|
||||
|
||||
# Try to load metadata (best-effort — old transcripts won't have it)
|
||||
@@ -423,10 +530,7 @@ async def download_transcript(
|
||||
except (FileNotFoundError, json.JSONDecodeError, Exception):
|
||||
pass # No metadata — treat as unknown (msg_count=0 → always fill gap)
|
||||
|
||||
logger.info(
|
||||
f"[Transcript] Downloaded {len(content)}B "
|
||||
f"(msg_count={message_count}) for session {session_id}"
|
||||
)
|
||||
logger.info(f"{log_prefix} Downloaded {len(content)}B (msg_count={message_count})")
|
||||
return TranscriptDownload(
|
||||
content=content,
|
||||
message_count=message_count,
|
||||
|
||||
@@ -5,6 +5,7 @@ import os
|
||||
|
||||
from .transcript import (
|
||||
STRIPPABLE_TYPES,
|
||||
merge_with_previous_transcript,
|
||||
read_transcript_file,
|
||||
strip_progress_entries,
|
||||
validate_transcript,
|
||||
@@ -187,6 +188,12 @@ class TestValidateTranscript:
|
||||
content = _make_jsonl(summary, asst1, asst2)
|
||||
assert validate_transcript(content) is True
|
||||
|
||||
def test_single_assistant_entry(self):
    """A transcript consisting of one assistant line validates as True.

    Short transcripts like this can come from simple responses with no
    tool use.
    """
    lone_line = f"{json.dumps(ASST_MSG)}\n"
    verdict = validate_transcript(lone_line)
    assert verdict is True
|
||||
|
||||
def test_invalid_json_returns_false(self):
|
||||
assert validate_transcript("not json\n{}\n{}\n") is False
|
||||
|
||||
@@ -282,3 +289,145 @@ class TestStripProgressEntries:
|
||||
assert "queue-operation" not in result_types
|
||||
assert "user" in result_types
|
||||
assert "assistant" in result_types
|
||||
|
||||
def test_preserves_original_line_formatting(self):
    """Entries that are not reparented come back as their original raw line."""
    # Default json.dumps formatting puts spaces after separators, so it is
    # distinguishable from the compact form used when re-serializing.
    spaced = json.dumps(USER_MSG)
    compact = json.dumps(USER_MSG, separators=(",", ":"))
    assert original_differs(spaced, compact) if False else spaced != compact  # precondition

    transcript = "".join(f"{row}\n" for row in (spaced, json.dumps(ASST_MSG)))
    first_line = strip_progress_entries(transcript).strip().split("\n")[0]

    # Byte-identical to the input line, i.e. not re-serialized.
    assert first_line == spaced
|
||||
|
||||
def test_reparented_entries_are_reserialized(self):
    """A child of a stripped progress entry is re-pointed at that entry's parent."""
    stripped_parent = dict(type="progress", uuid="p1", parentUuid="u1")
    child = dict(
        type="assistant",
        uuid="a1",
        parentUuid="p1",
        message=dict(role="assistant", content="done"),
    )

    output = strip_progress_entries(_make_jsonl(USER_MSG, stripped_parent, child))

    *_, final_line = output.strip().split("\n")
    # "p1" is stripped, so the assistant entry must now hang off "u1".
    assert json.loads(final_line)["parentUuid"] == "u1"  # reparented
|
||||
|
||||
|
||||
class TestMergeWithPreviousTranscript:
    """Tests for restoring real assistant entries over synthetic placeholders."""

    REAL_MODEL = "claude-opus-4-6"
    SYNTHETIC_MODEL = "<synthetic>"

    @staticmethod
    def _assistant(uuid, model, content, *, with_role=True):
        """Build an assistant transcript entry (key order: role, model, content)."""
        message = {"role": "assistant"} if with_role else {}
        message["model"] = model
        message["content"] = content
        return {"type": "assistant", "uuid": uuid, "message": message}

    @staticmethod
    def _entry_by_uuid(jsonl_text, uuid):
        """Parse JSONL text and return the first entry with the given uuid."""
        decoded = [json.loads(row) for row in jsonl_text.strip().split("\n")]
        return next(item for item in decoded if item.get("uuid") == uuid)

    def test_no_previous_content(self):
        """With ``None`` as previous content, the new content is returned as-is."""
        fresh = _make_jsonl(USER_MSG, ASST_MSG)
        assert merge_with_previous_transcript(fresh, None) == fresh

    def test_empty_previous_content(self):
        """An empty previous transcript leaves the new content's entry intact."""
        merged = merge_with_previous_transcript(_make_jsonl(USER_MSG), "")
        assert merged.strip() == json.dumps(USER_MSG)

    def test_replaces_synthetic_with_real(self):
        """A synthetic entry is swapped for the same-uuid real entry from previous."""
        earlier_real = self._assistant("a1", self.REAL_MODEL, "real answer")
        placeholder = self._assistant(
            "a1",
            self.SYNTHETIC_MODEL,
            [{"type": "text", "text": "No response requested."}],
        )
        follow_up = {
            "type": "user",
            "uuid": "u2",
            "message": {"role": "user", "content": "follow up"},
        }
        current_real = self._assistant("a2", self.REAL_MODEL, "real answer 2")

        before = _make_jsonl(USER_MSG, earlier_real)
        after = _make_jsonl(USER_MSG, placeholder, follow_up, current_real)

        merged = merge_with_previous_transcript(after, before)

        # The placeholder for "a1" now carries the real model and content.
        restored = self._entry_by_uuid(merged, "a1")
        assert restored["message"]["model"] == "claude-opus-4-6"
        assert restored["message"]["content"] == "real answer"

        # The current turn's real entry is left alone.
        untouched = self._entry_by_uuid(merged, "a2")
        assert untouched["message"]["model"] == "claude-opus-4-6"

    def test_preserves_real_entries(self):
        """A non-synthetic entry in the new transcript is never overwritten."""
        current = self._assistant("a1", self.REAL_MODEL, "new real")
        older = self._assistant("a1", self.REAL_MODEL, "old real")

        merged = merge_with_previous_transcript(
            _make_jsonl(USER_MSG, current),
            _make_jsonl(USER_MSG, older),
        )

        assert self._entry_by_uuid(merged, "a1")["message"]["content"] == "new real"

    def test_no_matching_uuids(self):
        """A synthetic entry stays synthetic when previous has no matching uuid."""
        placeholder = self._assistant(
            "a-new", self.SYNTHETIC_MODEL, "No response requested.", with_role=False
        )
        unrelated = self._assistant(
            "a-old", self.REAL_MODEL, "old", with_role=False
        )

        merged = merge_with_previous_transcript(
            _make_jsonl(USER_MSG, placeholder),
            _make_jsonl(USER_MSG, unrelated),
        )

        # Not replaced because the uuids differ.
        kept = self._entry_by_uuid(merged, "a-new")
        assert kept["message"]["model"] == "<synthetic>"
|
||||
|
||||
Reference in New Issue
Block a user