From a79bd88e7cb58dfccb0c9255ecc93d5ccf873e99 Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Fri, 13 Feb 2026 15:26:53 +0400
Subject: [PATCH] feat(chat/sdk): move transcript storage from DB column to
 bucket

Replace the sdkTranscript TEXT column with WorkspaceStorageBackend
(GCS/local) for persisting Claude Code JSONL transcripts. This removes
the implicit 512KB cap that caused --resume to degrade after a few
tool-heavy turns (JSONL transcripts are append-only and never shrink).

Key changes:
- Strip progress/metadata entries before storing (~30% size reduction),
  reparenting orphaned children via parentUuid
- Upload in the background (asyncio.create_task) to avoid blocking SSE
- Size-based conflict guard: never overwrite a larger (newer) transcript
  with a smaller (older) one
- Validate stripped content before upload
- Log a warning when falling back to the compression approach
- Enable claude_agent_use_resume by default
- Remove the sdkTranscript column from schema, model, and DB layer
- Storage path: chat-transcripts/{user_id}/{session_id}/{session_id}.jsonl
---
 .../backend/api/features/chat/config.py       |   2 +-
 .../backend/backend/api/features/chat/db.py   |   3 -
 .../backend/api/features/chat/model.py        |   3 -
 .../backend/api/features/chat/sdk/service.py  |  85 ++++---
 .../api/features/chat/sdk/transcript.py       | 228 ++++++++++++++++--
 .../backend/api/features/chat/service_test.py |  22 +-
 .../migration.sql                             |   2 -
 autogpt_platform/backend/schema.prisma        |   3 -
 .../backend/test/chat/test_transcript.py      | 119 ++++++++-
 9 files changed, 391 insertions(+), 76 deletions(-)
 delete mode 100644 autogpt_platform/backend/migrations/20260213082253_add_sdk_chat_transcript/migration.sql
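
For reviewers, the new round-trip as a minimal sketch. It uses only the
functions this patch adds to sdk/transcript.py (signatures match the diff
below); save_turn and resume_source are illustrative wrappers, not functions
in this patch, and error handling plus the background-task plumbing are
elided:

    from backend.api.features.chat.sdk.transcript import (
        download_transcript,
        upload_transcript,
        validate_transcript,
        write_transcript_to_tempfile,
    )

    # Turn N ends:  Stop hook -> read CLI JSONL -> strip -> upload to bucket.
    # Turn N+1:     download -> validate -> temp file -> claude --resume.

    async def save_turn(user_id: str, session_id: str, raw_jsonl: str) -> None:
        # upload_transcript() strips progress entries and applies the
        # size-based conflict guard before writing to the bucket.
        await upload_transcript(user_id, session_id, raw_jsonl)

    async def resume_source(user_id: str, session_id: str, cwd: str) -> str | None:
        # Returns a temp-file path to pass via --resume, or None so the
        # caller falls back to history compression.
        content = await download_transcript(user_id, session_id)
        if content and validate_transcript(content):
            return write_transcript_to_tempfile(content, session_id, cwd)
        return None
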
diff --git a/autogpt_platform/backend/backend/api/features/chat/config.py b/autogpt_platform/backend/backend/api/features/chat/config.py
index 7daecc7053..04bbe8e60d 100644
--- a/autogpt_platform/backend/backend/api/features/chat/config.py
+++ b/autogpt_platform/backend/backend/api/features/chat/config.py
@@ -112,7 +112,7 @@ class ChatConfig(BaseSettings):
         description="Max number of sub-agent Tasks the SDK can spawn per session.",
     )
     claude_agent_use_resume: bool = Field(
-        default=False,
+        default=True,
         description="Use --resume for multi-turn conversations instead of "
         "history compression. Falls back to compression when unavailable.",
     )
diff --git a/autogpt_platform/backend/backend/api/features/chat/db.py b/autogpt_platform/backend/backend/api/features/chat/db.py
index 2529e873d1..303ea0a698 100644
--- a/autogpt_platform/backend/backend/api/features/chat/db.py
+++ b/autogpt_platform/backend/backend/api/features/chat/db.py
@@ -56,7 +56,6 @@ async def update_chat_session(
     total_prompt_tokens: int | None = None,
     total_completion_tokens: int | None = None,
     title: str | None = None,
-    sdk_transcript: str | None = None,
 ) -> PrismaChatSession | None:
     """Update a chat session's metadata."""
     data: ChatSessionUpdateInput = {"updatedAt": datetime.now(UTC)}
@@ -73,8 +72,6 @@ async def update_chat_session(
         data["totalCompletionTokens"] = total_completion_tokens
     if title is not None:
         data["title"] = title
-    if sdk_transcript is not None:
-        data["sdkTranscript"] = sdk_transcript
 
     session = await PrismaChatSession.prisma().update(
         where={"id": session_id},
diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/api/features/chat/model.py
index 4225416a2d..30ac27aece 100644
--- a/autogpt_platform/backend/backend/api/features/chat/model.py
+++ b/autogpt_platform/backend/backend/api/features/chat/model.py
@@ -103,7 +103,6 @@ class ChatSession(BaseModel):
     updated_at: datetime
     successful_agent_runs: dict[str, int] = {}
     successful_agent_schedules: dict[str, int] = {}
-    sdk_transcript: str | None = None  # Claude Code JSONL transcript for --resume
 
     def add_tool_call_to_current_turn(self, tool_call: dict) -> None:
         """Attach a tool_call to the current turn's assistant message.
@@ -191,7 +190,6 @@ class ChatSession(BaseModel):
             updated_at=prisma_session.updatedAt,
             successful_agent_runs=successful_agent_runs,
             successful_agent_schedules=successful_agent_schedules,
-            sdk_transcript=getattr(prisma_session, "sdkTranscript", None),
         )
 
     @staticmethod
@@ -414,7 +412,6 @@ async def _save_session_to_db(
         successful_agent_schedules=session.successful_agent_schedules,
         total_prompt_tokens=total_prompt,
         total_completion_tokens=total_completion,
-        sdk_transcript=session.sdk_transcript,
     )
 
     # Add new messages (only those after existing count)
diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py
index f380610c6c..f975abf195 100644
--- a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py
+++ b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py
@@ -44,6 +44,14 @@ from .tool_adapter import (
     create_copilot_mcp_server,
     set_execution_context,
 )
+from .transcript import (
+    delete_transcript,
+    download_transcript,
+    read_transcript_file,
+    upload_transcript,
+    validate_transcript,
+    write_transcript_to_tempfile,
+)
 
 logger = logging.getLogger(__name__)
 config = ChatConfig()
@@ -480,12 +488,6 @@ async def stream_chat_completion_sdk(
     try:
         from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
 
-        from .transcript import (
-            read_transcript_file,
-            validate_transcript,
-            write_transcript_to_tempfile,
-        )
-
         # Fail fast when no API credentials are available at all
         sdk_env = _build_sdk_env()
         if not sdk_env and not os.environ.get("ANTHROPIC_API_KEY"):
@@ -513,25 +515,22 @@ async def stream_chat_completion_sdk(
             on_stop=_on_stop if config.claude_agent_use_resume else None,
         )
 
-        # --- Resume strategy: try JSONL resume, fall back to compression ---
+        # --- Resume strategy: download transcript from bucket ---
        resume_file: str | None = None
         use_resume = False
-        if (
-            config.claude_agent_use_resume
-            and session.sdk_transcript
-            and len(session.messages) > 1
-            and validate_transcript(session.sdk_transcript)
-        ):
-            resume_file = write_transcript_to_tempfile(
-                session.sdk_transcript, session_id, sdk_cwd
-            )
-            if resume_file:
-                use_resume = True
-                logger.info(
-                    f"[SDK] Using --resume with transcript "
-                    f"({len(session.sdk_transcript)} bytes)"
-                )
+        if config.claude_agent_use_resume and user_id and len(session.messages) > 1:
+            transcript_content = await download_transcript(user_id, session_id)
+            if transcript_content and validate_transcript(transcript_content):
+                resume_file = write_transcript_to_tempfile(
+                    transcript_content, session_id, sdk_cwd
+                )
+                if resume_file:
+                    use_resume = True
+                    logger.info(
+                        f"[SDK] Using --resume with transcript "
+                        f"({len(transcript_content)} bytes)"
+                    )
 
         sdk_options_kwargs: dict[str, Any] = {
             "system_prompt": system_prompt,
@@ -573,6 +572,11 @@ async def stream_chat_completion_sdk(
         # history into a context prefix as before.
         query_message = current_message
         if not use_resume and len(session.messages) > 1:
+            logger.warning(
+                f"[SDK] Using compression fallback for session "
+                f"{session_id} ({len(session.messages)} messages) — "
+                f"no transcript available for --resume"
+            )
             compressed = await _compress_conversation_history(session)
             history_context = _format_conversation_context(compressed)
             if history_context:
@@ -668,18 +672,21 @@ async def stream_chat_completion_sdk(
                 # --- Capture transcript while CLI is still alive ---
                 # Must happen INSIDE async with: close() sends SIGTERM
                 # which kills the CLI before it can flush the JSONL.
-                if config.claude_agent_use_resume and captured_transcript.get("path"):
+                if (
+                    config.claude_agent_use_resume
+                    and user_id
+                    and captured_transcript.get("path")
+                ):
                     # Give CLI time to flush JSONL writes before we read
                     await asyncio.sleep(0.5)
-                    transcript_content = read_transcript_file(
-                        captured_transcript["path"]
-                    )
-                    if transcript_content:
-                        session.sdk_transcript = transcript_content
-                        logger.info(
-                            f"[SDK] Captured transcript: "
-                            f"{len(transcript_content)} bytes"
-                        )
+                    raw_transcript = read_transcript_file(captured_transcript["path"])
+                    if raw_transcript:
+                        # Upload in background — strip + store to bucket
+                        task = asyncio.create_task(
+                            _upload_transcript_bg(user_id, session_id, raw_transcript)
+                        )
+                        _background_tasks.add(task)
+                        task.add_done_callback(_background_tasks.discard)
                 else:
                     logger.debug("[SDK] Stop hook fired but transcript not usable")
@@ -699,9 +706,11 @@ async def stream_chat_completion_sdk(
 
     except Exception as e:
         logger.error(f"[SDK] Error: {e}", exc_info=True)
-        if use_resume:
-            session.sdk_transcript = None
-            logger.warning("[SDK] Clearing transcript after resume failure")
+        if use_resume and user_id:
+            logger.warning("[SDK] Deleting transcript after resume failure")
+            task = asyncio.create_task(delete_transcript(user_id, session_id))
+            _background_tasks.add(task)
+            task.add_done_callback(_background_tasks.discard)
         try:
             await upsert_chat_session(session)
         except Exception as save_err:
@@ -716,6 +725,16 @@ async def stream_chat_completion_sdk(
         _cleanup_sdk_tool_results(sdk_cwd)
 
 
+async def _upload_transcript_bg(
+    user_id: str, session_id: str, raw_content: str
+) -> None:
+    """Background task to strip progress entries and upload the transcript."""
+    try:
+        await upload_transcript(user_id, session_id, raw_content)
+    except Exception as e:
+        logger.error(f"[SDK] Failed to upload transcript for {session_id}: {e}")
+
+
 async def _update_title_async(
     session_id: str,
     message: str,
     user_id: str | None = None
 ) -> None:
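
A note on _background_tasks: the hunks above add tasks to it, but its
definition is outside this diff. A module-level set is the usual shape
(sketch below; the set's definition and the _spawn helper are assumptions,
only the usage pattern comes from the diff). asyncio holds only weak
references to tasks, so a fire-and-forget task with no other strong
reference can be garbage-collected before it finishes:

    import asyncio
    from collections.abc import Coroutine
    from typing import Any

    _background_tasks: set[asyncio.Task] = set()

    def _spawn(coro: Coroutine[Any, Any, None]) -> None:
        # Keep a strong reference until the task completes, then drop it.
        task = asyncio.create_task(coro)
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
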
diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/transcript.py b/autogpt_platform/backend/backend/api/features/chat/sdk/transcript.py
index 107f6f3b2d..a119d556f6 100644
--- a/autogpt_platform/backend/backend/api/features/chat/sdk/transcript.py
+++ b/autogpt_platform/backend/backend/api/features/chat/sdk/transcript.py
@@ -1,9 +1,13 @@
 """JSONL transcript management for stateless multi-turn resume.
 
 The Claude Code CLI persists conversations as JSONL files (one JSON object per
-line). When the SDK's ``Stop`` hook fires we read this file, store it in the DB,
-and on the next turn write it back to a temp file + pass ``--resume`` so the CLI
-can reconstruct the full conversation without lossy history compression.
+line). When the SDK's ``Stop`` hook fires we read this file, strip bloat
+(progress entries, metadata), and upload the result to bucket storage. On the
+next turn we download the transcript, write it to a temp file, and pass
+``--resume`` so the CLI can reconstruct the full conversation.
+
+Storage is handled via ``WorkspaceStorageBackend`` (GCS in prod, local
+filesystem for self-hosted) — no DB column needed.
 """
 
 import json
@@ -12,8 +16,88 @@ import os
 
 logger = logging.getLogger(__name__)
 
-# Safety limit — large transcripts are truncated to keep DB writes reasonable.
-MAX_TRANSCRIPT_SIZE = 512 * 1024  # 512 KB
+# Entry types that can be safely removed from the transcript without breaking
+# the parentUuid conversation tree that ``--resume`` relies on.
+# - progress: UI progress ticks, no message content (avg 97KB for agent_progress)
+# - file-history-snapshot: undo tracking metadata
+# - queue-operation: internal queue bookkeeping
+# - summary: session summaries
+# - pr-link: PR link metadata
+STRIPPABLE_TYPES = frozenset(
+    {"progress", "file-history-snapshot", "queue-operation", "summary", "pr-link"}
+)
+
+# Workspace storage constants — deterministic path from session_id.
+TRANSCRIPT_STORAGE_PREFIX = "chat-transcripts"
+
+
+# ---------------------------------------------------------------------------
+# Progress stripping
+# ---------------------------------------------------------------------------
+
+
+def strip_progress_entries(content: str) -> str:
+    """Remove progress/metadata entries from a JSONL transcript.
+
+    Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
+    any remaining child entries so the ``parentUuid`` chain stays intact.
+    Typically reduces transcript size by ~30%.
+    """
+    lines = content.strip().split("\n")
+
+    entries: list[dict] = []
+    for line in lines:
+        try:
+            entries.append(json.loads(line))
+        except json.JSONDecodeError:
+            # Keep unparseable lines as-is (safety)
+            entries.append({"_raw": line})
+
+    stripped_uuids: set[str] = set()
+    uuid_to_parent: dict[str, str] = {}
+    kept: list[dict] = []
+
+    for entry in entries:
+        if "_raw" in entry:
+            kept.append(entry)
+            continue
+        uid = entry.get("uuid", "")
+        parent = entry.get("parentUuid", "")
+        entry_type = entry.get("type", "")
+
+        if uid:
+            uuid_to_parent[uid] = parent
+
+        if entry_type in STRIPPABLE_TYPES:
+            if uid:
+                stripped_uuids.add(uid)
+        else:
+            kept.append(entry)
+
+    # Reparent: walk up the chain through stripped entries to the nearest
+    # surviving ancestor
+    for entry in kept:
+        if "_raw" in entry:
+            continue
+        parent = entry.get("parentUuid", "")
+        original_parent = parent
+        while parent in stripped_uuids:
+            parent = uuid_to_parent.get(parent, "")
+        if parent != original_parent:
+            entry["parentUuid"] = parent
+
+    result_lines: list[str] = []
+    for entry in kept:
+        if "_raw" in entry:
+            result_lines.append(entry["_raw"])
+        else:
+            result_lines.append(json.dumps(entry, separators=(",", ":")))
+
+    return "\n".join(result_lines) + "\n"
+
+
+# ---------------------------------------------------------------------------
+# Local file I/O (read from CLI's JSONL, write temp file for --resume)
+# ---------------------------------------------------------------------------
 
 
 def read_transcript_file(transcript_path: str) -> str | None:
@@ -46,18 +130,8 @@ def read_transcript_file(transcript_path: str) -> str | None:
         json.loads(lines[0])
         json.loads(lines[-1])
 
-        if len(content) > MAX_TRANSCRIPT_SIZE:
-            # Truncating a JSONL transcript would break the parentUuid tree
-            # structure that --resume relies on. Instead, return None so the
-            # caller falls back to the compression approach.
-            logger.warning(
-                f"[Transcript] Transcript too large ({len(content)} bytes), "
-                "skipping — will fall back to history compression"
-            )
-            return None
-
         logger.info(
-            f"[Transcript] Captured {len(lines)} lines, "
+            f"[Transcript] Read {len(lines)} lines, "
             f"{len(content)} bytes from {transcript_path}"
         )
         return content
+ """ + from backend.util.workspace_storage import GCSWorkspaceStorage + + wid, fid, fname = _storage_path_parts(user_id, session_id) + + if isinstance(backend, GCSWorkspaceStorage): + blob = f"workspaces/{wid}/{fid}/{fname}" + return f"gcs://{backend.bucket_name}/{blob}" + else: + # LocalWorkspaceStorage returns local://{relative_path} + return f"local://{wid}/{fid}/{fname}" + + +async def upload_transcript(user_id: str, session_id: str, content: str) -> None: + """Strip progress entries and upload transcript to bucket storage. + + Safety: only overwrites when the new (stripped) transcript is larger than + what is already stored. Since JSONL is append-only, the latest transcript + is always the longest. This prevents a slow/stale background task from + clobbering a newer upload from a concurrent turn. + """ + from backend.util.workspace_storage import get_workspace_storage + + stripped = strip_progress_entries(content) + if not validate_transcript(stripped): + logger.warning( + f"[Transcript] Skipping upload — stripped content is not a valid " + f"transcript for session {session_id}" + ) + return + + storage = await get_workspace_storage() + wid, fid, fname = _storage_path_parts(user_id, session_id) + new_size = len(stripped) + + # Check existing transcript size to avoid overwriting newer with older + path = _build_storage_path(user_id, session_id, storage) + try: + existing = await storage.retrieve(path) + existing_size = len(existing) + if existing_size >= new_size: + logger.info( + f"[Transcript] Skipping upload — existing transcript " + f"({existing_size}B) >= new ({new_size}B) for session " + f"{session_id}" + ) + return + except (FileNotFoundError, Exception): + pass # No existing transcript or retrieval error — proceed with upload + + await storage.store( + workspace_id=wid, + file_id=fid, + filename=fname, + content=stripped.encode("utf-8"), + ) + logger.info( + f"[Transcript] Uploaded {new_size} bytes " + f"(stripped from {len(content)}) for session {session_id}" + ) + + +async def download_transcript(user_id: str, session_id: str) -> str | None: + """Download transcript from bucket storage. + + Returns the JSONL content string, or ``None`` if not found. + """ + from backend.util.workspace_storage import get_workspace_storage + + storage = await get_workspace_storage() + path = _build_storage_path(user_id, session_id, storage) + + try: + data = await storage.retrieve(path) + content = data.decode("utf-8") + logger.info( + f"[Transcript] Downloaded {len(content)} bytes for session {session_id}" + ) + return content + except FileNotFoundError: + logger.debug(f"[Transcript] No transcript in storage for {session_id}") + return None + except Exception as e: + logger.warning(f"[Transcript] Failed to download transcript: {e}") + return None + + +async def delete_transcript(user_id: str, session_id: str) -> None: + """Delete transcript from bucket storage (e.g. 
diff --git a/autogpt_platform/backend/backend/api/features/chat/service_test.py b/autogpt_platform/backend/backend/api/features/chat/service_test.py
index 8781ee8341..b2fc82b790 100644
--- a/autogpt_platform/backend/backend/api/features/chat/service_test.py
+++ b/autogpt_platform/backend/backend/api/features/chat/service_test.py
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 from os import getenv
 
@@ -12,6 +13,7 @@ from .response_model import (
     StreamToolOutputAvailable,
 )
 from .sdk import service as sdk_service
+from .sdk.transcript import download_transcript
 
 logger = logging.getLogger(__name__)
 
@@ -130,14 +132,22 @@ async def test_sdk_resume_multi_turn(setup_test_user, test_user_id):
     assert not turn1_errors, f"Turn 1 errors: {turn1_errors}"
     assert turn1_text, "Turn 1 produced no text"
 
-    # Reload session from DB and verify transcript was captured
-    session = await get_chat_session(session.session_id, test_user_id)
-    assert session, "Session not found after turn 1"
-    assert session.sdk_transcript, (
-        "sdk_transcript was not captured after turn 1 — "
+    # Wait for the background upload task to complete (retry up to 5s)
+    transcript = None
+    for _ in range(10):
+        await asyncio.sleep(0.5)
+        transcript = await download_transcript(test_user_id, session.session_id)
+        if transcript:
+            break
+    assert transcript, (
+        "Transcript was not uploaded to bucket after turn 1 — "
         "Stop hook may not have fired or transcript was too small"
     )
-    logger.info(f"Turn 1 transcript captured: {len(session.sdk_transcript)} bytes")
+    logger.info(f"Turn 1 transcript uploaded: {len(transcript)} bytes")
+
+    # Reload session for turn 2
+    session = await get_chat_session(session.session_id, test_user_id)
+    assert session, "Session not found after turn 1"
 
     # --- Turn 2: ask model to recall the keyword ---
     turn2_msg = "What was the special keyword I asked you to remember?"
diff --git a/autogpt_platform/backend/migrations/20260213082253_add_sdk_chat_transcript/migration.sql b/autogpt_platform/backend/migrations/20260213082253_add_sdk_chat_transcript/migration.sql
deleted file mode 100644
index 9af33d5842..0000000000
--- a/autogpt_platform/backend/migrations/20260213082253_add_sdk_chat_transcript/migration.sql
+++ /dev/null
@@ -1,2 +0,0 @@
--- AlterTable
-ALTER TABLE "ChatSession" ADD COLUMN "sdkTranscript" TEXT;
diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma
index 20adf97053..2da898a7ce 100644
--- a/autogpt_platform/backend/schema.prisma
+++ b/autogpt_platform/backend/schema.prisma
@@ -223,9 +223,6 @@ model ChatSession {
   totalPromptTokens     Int @default(0)
   totalCompletionTokens Int @default(0)
 
-  // Claude Code JSONL transcript for stateless --resume across turns
-  sdkTranscript String? @db.Text
-
   Messages ChatMessage[]
 
   @@index([userId, updatedAt])
diff --git a/autogpt_platform/backend/test/chat/test_transcript.py b/autogpt_platform/backend/test/chat/test_transcript.py
index f8e5c4fb22..21c9e46def 100644
--- a/autogpt_platform/backend/test/chat/test_transcript.py
+++ b/autogpt_platform/backend/test/chat/test_transcript.py
@@ -4,8 +4,9 @@ import json
 import os
 
 from backend.api.features.chat.sdk.transcript import (
-    MAX_TRANSCRIPT_SIZE,
+    STRIPPABLE_TYPES,
     read_transcript_file,
+    strip_progress_entries,
     validate_transcript,
     write_transcript_to_tempfile,
 )
@@ -27,6 +28,12 @@ ASST_MSG = {
     "parentUuid": "u1",
     "message": {"role": "assistant", "content": "hello"},
 }
+PROGRESS_ENTRY = {
+    "type": "progress",
+    "uuid": "p1",
+    "parentUuid": "u1",
+    "data": {"type": "bash_progress", "stdout": "running..."},
+}
 
 VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
 
@@ -59,19 +66,20 @@ class TestReadTranscriptFile:
         path.write_text(content)
         assert read_transcript_file(str(path)) is None
 
-    def test_returns_none_for_oversized_file(self, tmp_path):
-        # Build a valid transcript that exceeds MAX_TRANSCRIPT_SIZE
-        big_content = {"type": "user", "data": "x" * (MAX_TRANSCRIPT_SIZE + 100)}
-        content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
-        path = tmp_path / "big.jsonl"
-        path.write_text(content)
-        assert read_transcript_file(str(path)) is None
-
     def test_returns_none_for_invalid_json(self, tmp_path):
         path = tmp_path / "bad.jsonl"
         path.write_text("not json\n{}\n{}\n")
         assert read_transcript_file(str(path)) is None
 
+    def test_no_size_limit(self, tmp_path):
+        """Large files are accepted — bucket storage has no size limit."""
+        big_content = {"type": "user", "uuid": "u9", "data": "x" * 1_000_000}
+        content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
+        path = tmp_path / "big.jsonl"
+        path.write_text(content)
+        result = read_transcript_file(str(path))
+        assert result is not None
+
 
 # --- write_transcript_to_tempfile ---
 
@@ -126,3 +134,96 @@ class TestValidateTranscript:
 
     def test_invalid_json_returns_false(self):
         assert validate_transcript("not json\n{}\n{}\n") is False
+
+
+# --- strip_progress_entries ---
+
+
+class TestStripProgressEntries:
+    def test_strips_all_strippable_types(self):
+        """All STRIPPABLE_TYPES are removed from the output."""
+        entries = [
+            USER_MSG,
+            {"type": "progress", "uuid": "p1", "parentUuid": "u1"},
+            {"type": "file-history-snapshot", "files": []},
+            {"type": "queue-operation", "subtype": "create"},
+            {"type": "summary", "text": "..."},
+            {"type": "pr-link", "url": "..."},
+            ASST_MSG,
+        ]
+        result = strip_progress_entries(_make_jsonl(*entries))
+        result_types = {json.loads(line)["type"] for line in result.strip().split("\n")}
+        assert result_types == {"user", "assistant"}
+        for stype in STRIPPABLE_TYPES:
+            assert stype not in result_types
+
+    def test_reparents_children_of_stripped_entries(self):
+        """An assistant message whose parent is a progress entry gets reparented."""
+        progress = {
+            "type": "progress",
+            "uuid": "p1",
+            "parentUuid": "u1",
+            "data": {"type": "bash_progress"},
+        }
+        asst = {
+            "type": "assistant",
+            "uuid": "a1",
+            "parentUuid": "p1",  # Points to the progress entry
+            "message": {"role": "assistant", "content": "done"},
+        }
+        content = _make_jsonl(USER_MSG, progress, asst)
+        result = strip_progress_entries(content)
+        lines = [json.loads(line) for line in result.strip().split("\n")]
+
+        asst_entry = next(e for e in lines if e["type"] == "assistant")
+        # Should be reparented to u1 (the user message)
+        assert asst_entry["parentUuid"] == "u1"
+
+    def test_reparents_through_chain(self):
+        """Reparenting walks through multiple stripped entries."""
+        p1 = {"type": "progress", "uuid": "p1", "parentUuid": "u1"}
+        p2 = {"type": "progress", "uuid": "p2", "parentUuid": "p1"}
+        p3 = {"type": "progress", "uuid": "p3", "parentUuid": "p2"}
+        asst = {
+            "type": "assistant",
+            "uuid": "a1",
+            "parentUuid": "p3",  # 3 levels deep
+            "message": {"role": "assistant", "content": "done"},
+        }
+        content = _make_jsonl(USER_MSG, p1, p2, p3, asst)
+        result = strip_progress_entries(content)
+        lines = [json.loads(line) for line in result.strip().split("\n")]
+
+        asst_entry = next(e for e in lines if e["type"] == "assistant")
+        assert asst_entry["parentUuid"] == "u1"
+
+    def test_preserves_non_strippable_entries(self):
+        """User, assistant, and system entries are preserved."""
+        system = {"type": "system", "uuid": "s1", "message": "prompt"}
+        content = _make_jsonl(system, USER_MSG, ASST_MSG)
+        result = strip_progress_entries(content)
+        result_types = [json.loads(line)["type"] for line in result.strip().split("\n")]
+        assert result_types == ["system", "user", "assistant"]
+
+    def test_empty_input(self):
+        result = strip_progress_entries("")
+        # Should return just a newline (empty content once stripped)
+        assert result.strip() == ""
+
+    def test_no_strippable_entries(self):
+        """When there is nothing to strip, output matches input structure."""
+        content = _make_jsonl(USER_MSG, ASST_MSG)
+        result = strip_progress_entries(content)
+        result_lines = result.strip().split("\n")
+        assert len(result_lines) == 2
+
+    def test_handles_entries_without_uuid(self):
+        """Entries without a uuid field are handled gracefully."""
+        no_uuid = {"type": "queue-operation", "subtype": "create"}
+        content = _make_jsonl(no_uuid, USER_MSG, ASST_MSG)
+        result = strip_progress_entries(content)
+        result_types = [json.loads(line)["type"] for line in result.strip().split("\n")]
+        # queue-operation is strippable
+        assert "queue-operation" not in result_types
+        assert "user" in result_types
+        assert "assistant" in result_types