feat(chat/sdk): move transcript storage from DB column to bucket

Replace the sdkTranscript TEXT column with WorkspaceStorageBackend
(GCS/local) for persisting Claude Code JSONL transcripts. This removes
the 512KB transcript cap that forced --resume to fall back to lossy
history compression after a few tool-heavy turns (JSONL is append-only
and never shrinks, so transcripts quickly outgrow any fixed cap).

Key changes:
- Strip progress/metadata entries before storing (~30% size reduction)
  with parentUuid reparenting for orphaned children
- Upload in background (asyncio.create_task) to avoid blocking SSE
- Size-based conflict guard: never overwrite a larger (newer) transcript
- Validate stripped content before upload
- Log warning when falling back to compression approach
- Enable claude_agent_use_resume by default
- Remove sdkTranscript column from schema, model, and DB layer
- Storage path: chat-transcripts/{user_id}/{session_id}/{session_id}.jsonl
Author: Zamil Majdy
Date:   2026-02-13 15:26:53 +04:00
Parent: 28c1121a8f
Commit: a79bd88e7c
9 changed files with 391 additions and 76 deletions
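
For orientation, below is a minimal sketch of the round trip described in the change list above, built only from the helper signatures that appear in the transcript-module diff further down. The wrapper names prepare_resume and persist_after_turn are illustrative and not part of this commit.

import asyncio

from backend.api.features.chat.sdk.transcript import (
    download_transcript,
    read_transcript_file,
    upload_transcript,
    validate_transcript,
    write_transcript_to_tempfile,
)


async def prepare_resume(user_id: str, session_id: str, sdk_cwd: str) -> str | None:
    """Return a temp-file path to pass to the CLI via --resume, or None so the
    caller falls back to history compression."""
    content = await download_transcript(user_id, session_id)
    if content and validate_transcript(content):
        return write_transcript_to_tempfile(content, session_id, sdk_cwd)
    return None


async def persist_after_turn(user_id: str, session_id: str, jsonl_path: str) -> None:
    """After the Stop hook captures the CLI's JSONL path: read it and upload
    (stripped) in the background so the SSE stream is never blocked."""
    raw = read_transcript_file(jsonl_path)
    if raw:
        # The real service also keeps a strong reference to this task (see the
        # service.py hunk below) so it is not garbage-collected mid-upload.
        asyncio.create_task(upload_transcript(user_id, session_id, raw))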

View File

@@ -112,7 +112,7 @@ class ChatConfig(BaseSettings):
description="Max number of sub-agent Tasks the SDK can spawn per session.",
)
claude_agent_use_resume: bool = Field(
default=False,
default=True,
description="Use --resume for multi-turn conversations instead of "
"history compression. Falls back to compression when unavailable.",
)

View File

@@ -56,7 +56,6 @@ async def update_chat_session(
total_prompt_tokens: int | None = None,
total_completion_tokens: int | None = None,
title: str | None = None,
sdk_transcript: str | None = None,
) -> PrismaChatSession | None:
"""Update a chat session's metadata."""
data: ChatSessionUpdateInput = {"updatedAt": datetime.now(UTC)}
@@ -73,8 +72,6 @@ async def update_chat_session(
data["totalCompletionTokens"] = total_completion_tokens
if title is not None:
data["title"] = title
if sdk_transcript is not None:
data["sdkTranscript"] = sdk_transcript
session = await PrismaChatSession.prisma().update(
where={"id": session_id},

View File

@@ -103,7 +103,6 @@ class ChatSession(BaseModel):
updated_at: datetime
successful_agent_runs: dict[str, int] = {}
successful_agent_schedules: dict[str, int] = {}
sdk_transcript: str | None = None # Claude Code JSONL transcript for --resume
def add_tool_call_to_current_turn(self, tool_call: dict) -> None:
"""Attach a tool_call to the current turn's assistant message.
@@ -191,7 +190,6 @@ class ChatSession(BaseModel):
updated_at=prisma_session.updatedAt,
successful_agent_runs=successful_agent_runs,
successful_agent_schedules=successful_agent_schedules,
sdk_transcript=getattr(prisma_session, "sdkTranscript", None),
)
@staticmethod
@@ -414,7 +412,6 @@ async def _save_session_to_db(
successful_agent_schedules=session.successful_agent_schedules,
total_prompt_tokens=total_prompt,
total_completion_tokens=total_completion,
sdk_transcript=session.sdk_transcript,
)
# Add new messages (only those after existing count)

View File

@@ -44,6 +44,14 @@ from .tool_adapter import (
create_copilot_mcp_server,
set_execution_context,
)
from .transcript import (
delete_transcript,
download_transcript,
read_transcript_file,
upload_transcript,
validate_transcript,
write_transcript_to_tempfile,
)
logger = logging.getLogger(__name__)
config = ChatConfig()
@@ -480,12 +488,6 @@ async def stream_chat_completion_sdk(
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
from .transcript import (
read_transcript_file,
validate_transcript,
write_transcript_to_tempfile,
)
# Fail fast when no API credentials are available at all
sdk_env = _build_sdk_env()
if not sdk_env and not os.environ.get("ANTHROPIC_API_KEY"):
@@ -513,25 +515,22 @@ async def stream_chat_completion_sdk(
on_stop=_on_stop if config.claude_agent_use_resume else None,
)
# --- Resume strategy: try JSONL resume, fall back to compression ---
# --- Resume strategy: download transcript from bucket ---
resume_file: str | None = None
use_resume = False
if (
config.claude_agent_use_resume
and session.sdk_transcript
and len(session.messages) > 1
and validate_transcript(session.sdk_transcript)
):
resume_file = write_transcript_to_tempfile(
session.sdk_transcript, session_id, sdk_cwd
)
if resume_file:
use_resume = True
logger.info(
f"[SDK] Using --resume with transcript "
f"({len(session.sdk_transcript)} bytes)"
if config.claude_agent_use_resume and user_id and len(session.messages) > 1:
transcript_content = await download_transcript(user_id, session_id)
if transcript_content and validate_transcript(transcript_content):
resume_file = write_transcript_to_tempfile(
transcript_content, session_id, sdk_cwd
)
if resume_file:
use_resume = True
logger.info(
f"[SDK] Using --resume with transcript "
f"({len(transcript_content)} bytes)"
)
sdk_options_kwargs: dict[str, Any] = {
"system_prompt": system_prompt,
@@ -573,6 +572,11 @@ async def stream_chat_completion_sdk(
# history into a context prefix as before.
query_message = current_message
if not use_resume and len(session.messages) > 1:
logger.warning(
f"[SDK] Using compression fallback for session "
f"{session_id} ({len(session.messages)} messages) — "
f"no transcript available for --resume"
)
compressed = await _compress_conversation_history(session)
history_context = _format_conversation_context(compressed)
if history_context:
@@ -668,18 +672,21 @@ async def stream_chat_completion_sdk(
# --- Capture transcript while CLI is still alive ---
# Must happen INSIDE async with: close() sends SIGTERM
# which kills the CLI before it can flush the JSONL.
if config.claude_agent_use_resume and captured_transcript.get("path"):
if (
config.claude_agent_use_resume
and user_id
and captured_transcript.get("path")
):
# Give CLI time to flush JSONL writes before we read
await asyncio.sleep(0.5)
transcript_content = read_transcript_file(
captured_transcript["path"]
)
if transcript_content:
session.sdk_transcript = transcript_content
logger.info(
f"[SDK] Captured transcript: "
f"{len(transcript_content)} bytes"
raw_transcript = read_transcript_file(captured_transcript["path"])
if raw_transcript:
# Upload in background — strip + store to bucket
task = asyncio.create_task(
_upload_transcript_bg(user_id, session_id, raw_transcript)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
else:
logger.debug("[SDK] Stop hook fired but transcript not usable")
@@ -699,9 +706,11 @@ async def stream_chat_completion_sdk(
except Exception as e:
logger.error(f"[SDK] Error: {e}", exc_info=True)
if use_resume:
session.sdk_transcript = None
logger.warning("[SDK] Clearing transcript after resume failure")
if use_resume and user_id:
logger.warning("[SDK] Deleting transcript after resume failure")
task = asyncio.create_task(delete_transcript(user_id, session_id))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
try:
await upsert_chat_session(session)
except Exception as save_err:
@@ -716,6 +725,16 @@ async def stream_chat_completion_sdk(
_cleanup_sdk_tool_results(sdk_cwd)
async def _upload_transcript_bg(
user_id: str, session_id: str, raw_content: str
) -> None:
"""Background task to strip progress entries and upload transcript."""
try:
await upload_transcript(user_id, session_id, raw_content)
except Exception as e:
logger.error(f"[SDK] Failed to upload transcript for {session_id}: {e}")
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:

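One thing the hunk above uses but never defines is _background_tasks: asyncio.create_task() holds only a weak reference to its task, so a module-level set plus a done-callback is what keeps the fire-and-forget upload alive until it finishes. A minimal sketch of that pattern, assuming the set's definition (which sits outside this diff) follows the usual idiom; spawn_background is an illustrative name, not part of the commit.

import asyncio

# Strong references keep fire-and-forget tasks alive until they complete;
# the done-callback drops each task from the set once it has finished.
_background_tasks: set[asyncio.Task] = set()


def spawn_background(coro) -> asyncio.Task:
    """Schedule a coroutine without awaiting it (e.g. the transcript upload)."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task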
View File

@@ -1,9 +1,13 @@
"""JSONL transcript management for stateless multi-turn resume.
The Claude Code CLI persists conversations as JSONL files (one JSON object per
line). When the SDK's ``Stop`` hook fires we read this file, store it in the DB,
and on the next turn write it back to a temp file + pass ``--resume`` so the CLI
can reconstruct the full conversation without lossy history compression.
line). When the SDK's ``Stop`` hook fires we read this file, strip bloat
(progress entries, metadata), and upload the result to bucket storage. On the
next turn we download the transcript, write it to a temp file, and pass
``--resume`` so the CLI can reconstruct the full conversation.
Storage is handled via ``WorkspaceStorageBackend`` (GCS in prod, local
filesystem for self-hosted) — no DB column needed.
"""
import json
@@ -12,8 +16,88 @@ import os
logger = logging.getLogger(__name__)
# Safety limit — large transcripts are truncated to keep DB writes reasonable.
MAX_TRANSCRIPT_SIZE = 512 * 1024 # 512 KB
# Entry types that can be safely removed from the transcript without breaking
# the parentUuid conversation tree that ``--resume`` relies on.
# - progress: UI progress ticks, no message content (avg 97KB for agent_progress)
# - file-history-snapshot: undo tracking metadata
# - queue-operation: internal queue bookkeeping
# - summary: session summaries
# - pr-link: PR link metadata
STRIPPABLE_TYPES = frozenset(
{"progress", "file-history-snapshot", "queue-operation", "summary", "pr-link"}
)
# Workspace storage constants — deterministic path from user_id + session_id.
TRANSCRIPT_STORAGE_PREFIX = "chat-transcripts"
# ---------------------------------------------------------------------------
# Progress stripping
# ---------------------------------------------------------------------------
def strip_progress_entries(content: str) -> str:
"""Remove progress/metadata entries from a JSONL transcript.
Removes entries whose ``type`` is in ``STRIPPABLE_TYPES`` and reparents
any remaining child entries so the ``parentUuid`` chain stays intact.
Typically reduces transcript size by ~30%.
"""
lines = content.strip().split("\n")
entries: list[dict] = []
for line in lines:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
# Keep unparseable lines as-is (safety)
entries.append({"_raw": line})
stripped_uuids: set[str] = set()
uuid_to_parent: dict[str, str] = {}
kept: list[dict] = []
for entry in entries:
if "_raw" in entry:
kept.append(entry)
continue
uid = entry.get("uuid", "")
parent = entry.get("parentUuid", "")
entry_type = entry.get("type", "")
if uid:
uuid_to_parent[uid] = parent
if entry_type in STRIPPABLE_TYPES:
if uid:
stripped_uuids.add(uid)
else:
kept.append(entry)
# Reparent: walk up chain through stripped entries to find surviving ancestor
for entry in kept:
if "_raw" in entry:
continue
parent = entry.get("parentUuid", "")
original_parent = parent
while parent in stripped_uuids:
parent = uuid_to_parent.get(parent, "")
if parent != original_parent:
entry["parentUuid"] = parent
result_lines: list[str] = []
for entry in kept:
if "_raw" in entry:
result_lines.append(entry["_raw"])
else:
result_lines.append(json.dumps(entry, separators=(",", ":")))
return "\n".join(result_lines) + "\n"
# ---------------------------------------------------------------------------
# Local file I/O (read from CLI's JSONL, write temp file for --resume)
# ---------------------------------------------------------------------------
def read_transcript_file(transcript_path: str) -> str | None:
@@ -46,18 +130,8 @@ def read_transcript_file(transcript_path: str) -> str | None:
json.loads(lines[0])
json.loads(lines[-1])
if len(content) > MAX_TRANSCRIPT_SIZE:
# Truncating a JSONL transcript would break the parentUuid tree
# structure that --resume relies on. Instead, return None so the
# caller falls back to the compression approach.
logger.warning(
f"[Transcript] Transcript too large ({len(content)} bytes), "
"skipping — will fall back to history compression"
)
return None
logger.info(
f"[Transcript] Captured {len(lines)} lines, "
f"[Transcript] Read {len(lines)} lines, "
f"{len(content)} bytes from {transcript_path}"
)
return content
@@ -123,3 +197,125 @@ def validate_transcript(content: str | None) -> bool:
return False
return has_user and has_assistant
# ---------------------------------------------------------------------------
# Bucket storage (GCS / local via WorkspaceStorageBackend)
# ---------------------------------------------------------------------------
def _storage_path_parts(user_id: str, session_id: str) -> tuple[str, str, str]:
"""Return (workspace_id, file_id, filename) for a session's transcript.
Path structure: ``chat-transcripts/{user_id}/{session_id}/{session_id}.jsonl``
"""
workspace_id = f"{TRANSCRIPT_STORAGE_PREFIX}/{user_id}"
return (workspace_id, session_id, f"{session_id}.jsonl")
def _build_storage_path(user_id: str, session_id: str, backend: object) -> str:
"""Build the full storage path string that ``retrieve()`` expects.
``store()`` returns a path like ``gcs://bucket/workspaces/...`` or
``local://workspace_id/file_id/filename``. Since we use deterministic
arguments we can reconstruct the same path for download/delete without
having stored the return value.
"""
from backend.util.workspace_storage import GCSWorkspaceStorage
wid, fid, fname = _storage_path_parts(user_id, session_id)
if isinstance(backend, GCSWorkspaceStorage):
blob = f"workspaces/{wid}/{fid}/{fname}"
return f"gcs://{backend.bucket_name}/{blob}"
else:
# LocalWorkspaceStorage returns local://{relative_path}
return f"local://{wid}/{fid}/{fname}"
async def upload_transcript(user_id: str, session_id: str, content: str) -> None:
"""Strip progress entries and upload transcript to bucket storage.
Safety: only overwrites when the new (stripped) transcript is larger than
what is already stored. Since JSONL is append-only, the latest transcript
is always the longest. This prevents a slow/stale background task from
clobbering a newer upload from a concurrent turn.
"""
from backend.util.workspace_storage import get_workspace_storage
stripped = strip_progress_entries(content)
if not validate_transcript(stripped):
logger.warning(
f"[Transcript] Skipping upload — stripped content is not a valid "
f"transcript for session {session_id}"
)
return
storage = await get_workspace_storage()
wid, fid, fname = _storage_path_parts(user_id, session_id)
new_size = len(stripped)
# Check existing transcript size to avoid overwriting newer with older
path = _build_storage_path(user_id, session_id, storage)
try:
existing = await storage.retrieve(path)
existing_size = len(existing)
if existing_size >= new_size:
logger.info(
f"[Transcript] Skipping upload — existing transcript "
f"({existing_size}B) >= new ({new_size}B) for session "
f"{session_id}"
)
return
except Exception:
pass  # No existing transcript or retrieval error — proceed with upload
await storage.store(
workspace_id=wid,
file_id=fid,
filename=fname,
content=stripped.encode("utf-8"),
)
logger.info(
f"[Transcript] Uploaded {new_size} bytes "
f"(stripped from {len(content)}) for session {session_id}"
)
async def download_transcript(user_id: str, session_id: str) -> str | None:
"""Download transcript from bucket storage.
Returns the JSONL content string, or ``None`` if not found.
"""
from backend.util.workspace_storage import get_workspace_storage
storage = await get_workspace_storage()
path = _build_storage_path(user_id, session_id, storage)
try:
data = await storage.retrieve(path)
content = data.decode("utf-8")
logger.info(
f"[Transcript] Downloaded {len(content)} bytes for session {session_id}"
)
return content
except FileNotFoundError:
logger.debug(f"[Transcript] No transcript in storage for {session_id}")
return None
except Exception as e:
logger.warning(f"[Transcript] Failed to download transcript: {e}")
return None
async def delete_transcript(user_id: str, session_id: str) -> None:
"""Delete transcript from bucket storage (e.g. after resume failure)."""
from backend.util.workspace_storage import get_workspace_storage
storage = await get_workspace_storage()
path = _build_storage_path(user_id, session_id, storage)
try:
await storage.delete(path)
logger.info(f"[Transcript] Deleted transcript for session {session_id}")
except Exception as e:
logger.warning(f"[Transcript] Failed to delete transcript: {e}")

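As a concrete illustration of the deterministic layout above, _storage_path_parts is a pure function, so the final object path can be worked out directly; the bucket name and the exact local:// layout shown in the comments are assumptions that depend on the configured backend.

from backend.api.features.chat.sdk.transcript import _storage_path_parts

wid, fid, fname = _storage_path_parts("user-123", "sess-456")
# wid   == "chat-transcripts/user-123"
# fid   == "sess-456"
# fname == "sess-456.jsonl"
# GCS:   gcs://<bucket>/workspaces/chat-transcripts/user-123/sess-456/sess-456.jsonl
# Local: local://chat-transcripts/user-123/sess-456/sess-456.jsonl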
View File

@@ -1,3 +1,4 @@
import asyncio
import logging
from os import getenv
@@ -12,6 +13,7 @@ from .response_model import (
StreamToolOutputAvailable,
)
from .sdk import service as sdk_service
from .sdk.transcript import download_transcript
logger = logging.getLogger(__name__)
@@ -130,14 +132,22 @@ async def test_sdk_resume_multi_turn(setup_test_user, test_user_id):
assert not turn1_errors, f"Turn 1 errors: {turn1_errors}"
assert turn1_text, "Turn 1 produced no text"
# Reload session from DB and verify transcript was captured
session = await get_chat_session(session.session_id, test_user_id)
assert session, "Session not found after turn 1"
assert session.sdk_transcript, (
"sdk_transcript was not captured after turn 1 — "
# Wait for background upload task to complete (retry up to 5s)
transcript = None
for _ in range(10):
await asyncio.sleep(0.5)
transcript = await download_transcript(test_user_id, session.session_id)
if transcript:
break
assert transcript, (
"Transcript was not uploaded to bucket after turn 1 — "
"Stop hook may not have fired or transcript was too small"
)
logger.info(f"Turn 1 transcript captured: {len(session.sdk_transcript)} bytes")
logger.info(f"Turn 1 transcript uploaded: {len(transcript)} bytes")
# Reload session for turn 2
session = await get_chat_session(session.session_id, test_user_id)
assert session, "Session not found after turn 1"
# --- Turn 2: ask model to recall the keyword ---
turn2_msg = "What was the special keyword I asked you to remember?"

View File

@@ -1,2 +0,0 @@
-- AlterTable
ALTER TABLE "ChatSession" ADD COLUMN "sdkTranscript" TEXT;

View File

@@ -223,9 +223,6 @@ model ChatSession {
totalPromptTokens Int @default(0)
totalCompletionTokens Int @default(0)
// Claude Code JSONL transcript for stateless --resume across turns
sdkTranscript String? @db.Text
Messages ChatMessage[]
@@index([userId, updatedAt])

View File

@@ -4,8 +4,9 @@ import json
import os
from backend.api.features.chat.sdk.transcript import (
MAX_TRANSCRIPT_SIZE,
STRIPPABLE_TYPES,
read_transcript_file,
strip_progress_entries,
validate_transcript,
write_transcript_to_tempfile,
)
@@ -27,6 +28,12 @@ ASST_MSG = {
"parentUuid": "u1",
"message": {"role": "assistant", "content": "hello"},
}
PROGRESS_ENTRY = {
"type": "progress",
"uuid": "p1",
"parentUuid": "u1",
"data": {"type": "bash_progress", "stdout": "running..."},
}
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
@@ -59,19 +66,20 @@ class TestReadTranscriptFile:
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_oversized_file(self, tmp_path):
# Build a valid transcript that exceeds MAX_TRANSCRIPT_SIZE
big_content = {"type": "user", "data": "x" * (MAX_TRANSCRIPT_SIZE + 100)}
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
path = tmp_path / "big.jsonl"
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_invalid_json(self, tmp_path):
path = tmp_path / "bad.jsonl"
path.write_text("not json\n{}\n{}\n")
assert read_transcript_file(str(path)) is None
def test_no_size_limit(self, tmp_path):
"""Large files are accepted — bucket storage has no size limit."""
big_content = {"type": "user", "uuid": "u9", "data": "x" * 1_000_000}
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
path = tmp_path / "big.jsonl"
path.write_text(content)
result = read_transcript_file(str(path))
assert result is not None
# --- write_transcript_to_tempfile ---
@@ -126,3 +134,96 @@ class TestValidateTranscript:
def test_invalid_json_returns_false(self):
assert validate_transcript("not json\n{}\n{}\n") is False
# --- strip_progress_entries ---
class TestStripProgressEntries:
def test_strips_all_strippable_types(self):
"""All STRIPPABLE_TYPES are removed from the output."""
entries = [
USER_MSG,
{"type": "progress", "uuid": "p1", "parentUuid": "u1"},
{"type": "file-history-snapshot", "files": []},
{"type": "queue-operation", "subtype": "create"},
{"type": "summary", "text": "..."},
{"type": "pr-link", "url": "..."},
ASST_MSG,
]
result = strip_progress_entries(_make_jsonl(*entries))
result_types = {json.loads(line)["type"] for line in result.strip().split("\n")}
assert result_types == {"user", "assistant"}
for stype in STRIPPABLE_TYPES:
assert stype not in result_types
def test_reparents_children_of_stripped_entries(self):
"""An assistant message whose parent is a progress entry gets reparented."""
progress = {
"type": "progress",
"uuid": "p1",
"parentUuid": "u1",
"data": {"type": "bash_progress"},
}
asst = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "p1", # Points to progress
"message": {"role": "assistant", "content": "done"},
}
content = _make_jsonl(USER_MSG, progress, asst)
result = strip_progress_entries(content)
lines = [json.loads(line) for line in result.strip().split("\n")]
asst_entry = next(e for e in lines if e["type"] == "assistant")
# Should be reparented to u1 (the user message)
assert asst_entry["parentUuid"] == "u1"
def test_reparents_through_chain(self):
"""Reparenting walks through multiple stripped entries."""
p1 = {"type": "progress", "uuid": "p1", "parentUuid": "u1"}
p2 = {"type": "progress", "uuid": "p2", "parentUuid": "p1"}
p3 = {"type": "progress", "uuid": "p3", "parentUuid": "p2"}
asst = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "p3", # 3 levels deep
"message": {"role": "assistant", "content": "done"},
}
content = _make_jsonl(USER_MSG, p1, p2, p3, asst)
result = strip_progress_entries(content)
lines = [json.loads(line) for line in result.strip().split("\n")]
asst_entry = next(e for e in lines if e["type"] == "assistant")
assert asst_entry["parentUuid"] == "u1"
def test_preserves_non_strippable_entries(self):
"""User, assistant, and system entries are preserved."""
system = {"type": "system", "uuid": "s1", "message": "prompt"}
content = _make_jsonl(system, USER_MSG, ASST_MSG)
result = strip_progress_entries(content)
result_types = [json.loads(line)["type"] for line in result.strip().split("\n")]
assert result_types == ["system", "user", "assistant"]
def test_empty_input(self):
result = strip_progress_entries("")
# Should return just a newline (empty content stripped)
assert result.strip() == ""
def test_no_strippable_entries(self):
"""When there's nothing to strip, output matches input structure."""
content = _make_jsonl(USER_MSG, ASST_MSG)
result = strip_progress_entries(content)
result_lines = result.strip().split("\n")
assert len(result_lines) == 2
def test_handles_entries_without_uuid(self):
"""Entries without uuid field are handled gracefully."""
no_uuid = {"type": "queue-operation", "subtype": "create"}
content = _make_jsonl(no_uuid, USER_MSG, ASST_MSG)
result = strip_progress_entries(content)
result_types = [json.loads(line)["type"] for line in result.strip().split("\n")]
# queue-operation is strippable
assert "queue-operation" not in result_types
assert "user" in result_types
assert "assistant" in result_types