feat(chat/sdk): add stateless multi-turn resume via JSONL transcripts

Capture Claude Code CLI session transcripts via the Stop hook and persist
them in the DB. On subsequent turns, write the transcript to a temp file
and pass --resume so the CLI restores full conversation context without
lossy history compression.

Key changes:
- transcript.py: read/write/validate JSONL transcript utilities
- security_hooks: register Stop hook to capture transcript_path
- service.py: resume strategy with fallback to compression
- schema.prisma: add sdkTranscript column to ChatSession
- Feature flag: CLAUDE_AGENT_USE_RESUME (default off)
Author: Zamil Majdy
Date:   2026-02-13 13:48:04 +04:00
Commit: b915e67a9b (parent 32e9dda30d)
10 changed files with 466 additions and 14 deletions
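
To make the per-file diffs below easier to follow, here is a condensed editorial sketch of the resume round-trip; it is not code from the commit. The helpers (create_security_hooks, validate_transcript, write_transcript_to_tempfile, read_transcript_file), the ClaudeAgentOptions kwargs, and the claude_agent_use_resume flag come from the diffs below; the security_hooks import path is assumed, the function parameters stand in for the service-layer context, and streaming plus error handling are omitted.

from claude_agent_sdk import ClaudeAgentOptions

from backend.api.features.chat.sdk.security_hooks import (  # assumed module path
    create_security_hooks,
)
from backend.api.features.chat.sdk.transcript import (
    read_transcript_file,
    validate_transcript,
    write_transcript_to_tempfile,
)


def resume_round_trip_sketch(config, session, session_id, user_id, sdk_cwd):
    """Condensed editorial sketch of the flow in service.py (not in the commit)."""
    captured: dict[str, str] = {}

    def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
        # The Stop hook fires while the CLI process is still alive, so the
        # JSONL transcript can be read before close() sends SIGTERM.
        captured["path"] = transcript_path
        captured["session_id"] = sdk_session_id

    hooks = create_security_hooks(
        user_id,
        sdk_cwd=sdk_cwd,
        on_stop=_on_stop if config.claude_agent_use_resume else None,
    )

    # Turn N+1: replay the previous turn's transcript via --resume instead of
    # compressing history into the prompt; skip resume if validation fails.
    resume_file = None
    if config.claude_agent_use_resume and session.sdk_transcript:
        if validate_transcript(session.sdk_transcript):
            resume_file = write_transcript_to_tempfile(
                session.sdk_transcript, session_id, sdk_cwd
            )

    options = ClaudeAgentOptions(
        hooks=hooks,
        cwd=sdk_cwd,
        **({"resume": resume_file} if resume_file else {}),
    )

    # ... run the turn with `options`; still inside the client context manager,
    # persist the transcript the Stop hook pointed at for the next turn.
    if captured.get("path"):
        session.sdk_transcript = read_transcript_file(captured["path"])
    return options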


@@ -111,6 +111,11 @@ class ChatConfig(BaseSettings):
default=10,
description="Max number of sub-agent Tasks the SDK can spawn per session.",
)
claude_agent_use_resume: bool = Field(
default=False,
description="Use --resume for multi-turn conversations instead of "
"history compression. Falls back to compression when unavailable.",
)
# Extended thinking configuration for Claude models
thinking_enabled: bool = Field(


@@ -56,6 +56,7 @@ async def update_chat_session(
total_prompt_tokens: int | None = None,
total_completion_tokens: int | None = None,
title: str | None = None,
sdk_transcript: str | None = None,
) -> PrismaChatSession | None:
"""Update a chat session's metadata."""
data: ChatSessionUpdateInput = {"updatedAt": datetime.now(UTC)}
@@ -72,6 +73,8 @@ async def update_chat_session(
data["totalCompletionTokens"] = total_completion_tokens
if title is not None:
data["title"] = title
if sdk_transcript is not None:
data["sdkTranscript"] = sdk_transcript
session = await PrismaChatSession.prisma().update(
where={"id": session_id},


@@ -103,6 +103,7 @@ class ChatSession(BaseModel):
updated_at: datetime
successful_agent_runs: dict[str, int] = {}
successful_agent_schedules: dict[str, int] = {}
sdk_transcript: str | None = None # Claude Code JSONL transcript for --resume
def add_tool_call_to_current_turn(self, tool_call: dict) -> None:
"""Attach a tool_call to the current turn's assistant message.
@@ -190,6 +191,7 @@ class ChatSession(BaseModel):
updated_at=prisma_session.updatedAt,
successful_agent_runs=successful_agent_runs,
successful_agent_schedules=successful_agent_schedules,
sdk_transcript=getattr(prisma_session, "sdkTranscript", None),
)
@staticmethod
@@ -412,6 +414,7 @@ async def _save_session_to_db(
successful_agent_schedules=session.successful_agent_schedules,
total_prompt_tokens=total_prompt,
total_completion_tokens=total_completion,
sdk_transcript=session.sdk_transcript,
)
# Add new messages (only those after existing count)


@@ -8,6 +8,7 @@ import json
import logging
import os
import re
from collections.abc import Callable
from typing import Any, cast
from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX
@@ -173,6 +174,7 @@ def create_security_hooks(
user_id: str | None,
sdk_cwd: str | None = None,
max_subtasks: int = 3,
on_stop: Callable[[str, str], None] | None = None,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
@@ -181,11 +183,15 @@ def create_security_hooks(
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
- Stop: Capture transcript path for stateless resume (when *on_stop* is provided)
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum Task (sub-agent) spawns allowed per session
on_stop: Callback ``(transcript_path, sdk_session_id)`` invoked when
the SDK finishes processing — used to read the JSONL transcript
before the CLI process exits.
Returns:
Hooks configuration dict for ClaudeAgentOptions
@@ -285,7 +291,31 @@ def create_security_hooks(
)
return cast(SyncHookJSONOutput, {})
return {
# --- Stop hook: capture transcript path for stateless resume ---
async def stop_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Capture transcript path when SDK finishes processing.
The Stop hook fires while the CLI process is still alive, giving us
a reliable window to read the JSONL transcript before SIGTERM.
"""
_ = context, tool_use_id
transcript_path = cast(str, input_data.get("transcript_path", ""))
sdk_session_id = cast(str, input_data.get("session_id", ""))
if transcript_path and on_stop:
logger.info(
f"[SDK] Stop hook: transcript_path={transcript_path}, "
f"sdk_session_id={sdk_session_id[:12]}..."
)
on_stop(transcript_path, sdk_session_id)
return cast(SyncHookJSONOutput, {})
hooks: dict[str, Any] = {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
"PostToolUseFailure": [
@@ -293,6 +323,11 @@ def create_security_hooks(
],
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
if on_stop is not None:
hooks["Stop"] = [HookMatcher(matcher=None, hooks=[stop_hook])]
return hooks
except ImportError:
# Fallback for when SDK isn't available - return empty hooks
logger.warning("claude-agent-sdk not available, security hooks disabled")


@@ -464,6 +464,7 @@ async def stream_chat_completion_sdk(
# Initialise sdk_cwd before the try so the finally can reference it
# even if _make_sdk_cwd raises (in that case it stays as "").
sdk_cwd = ""
use_resume = False
try:
# Use a session-specific temp dir to avoid cleanup race conditions
@@ -479,6 +480,12 @@ async def stream_chat_completion_sdk(
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
from .transcript import (
read_transcript_file,
validate_transcript,
write_transcript_to_tempfile,
)
# Fail fast when no API credentials are available at all
sdk_env = _build_sdk_env()
if not sdk_env and not os.environ.get("ANTHROPIC_API_KEY"):
@@ -492,22 +499,55 @@ async def stream_chat_completion_sdk(
sdk_model = _resolve_sdk_model()
# --- Transcript capture via Stop hook ---
captured_transcript: dict[str, str] = {}
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
captured_transcript["path"] = transcript_path
captured_transcript["session_id"] = sdk_session_id
security_hooks = create_security_hooks(
user_id,
sdk_cwd=sdk_cwd,
max_subtasks=config.claude_agent_max_subtasks,
on_stop=_on_stop if config.claude_agent_use_resume else None,
)
options = ClaudeAgentOptions(
system_prompt=system_prompt,
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
allowed_tools=COPILOT_TOOL_NAMES,
hooks=security_hooks, # type: ignore[arg-type]
cwd=sdk_cwd,
max_buffer_size=config.claude_agent_max_buffer_size,
# Only pass model/env when OpenRouter is configured
**({"model": sdk_model, "env": sdk_env} if sdk_env else {}),
)
# --- Resume strategy: try JSONL resume, fall back to compression ---
resume_file: str | None = None
use_resume = False
if (
config.claude_agent_use_resume
and session.sdk_transcript
and len(session.messages) > 1
and validate_transcript(session.sdk_transcript)
):
resume_file = write_transcript_to_tempfile(
session.sdk_transcript, session_id, sdk_cwd
)
if resume_file:
use_resume = True
logger.info(
f"[SDK] Using --resume with transcript "
f"({len(session.sdk_transcript)} bytes)"
)
sdk_options_kwargs: dict[str, Any] = {
"system_prompt": system_prompt,
"mcp_servers": {"copilot": mcp_server},
"allowed_tools": COPILOT_TOOL_NAMES,
"hooks": security_hooks,
"cwd": sdk_cwd,
"max_buffer_size": config.claude_agent_max_buffer_size,
}
if sdk_env:
sdk_options_kwargs["model"] = sdk_model
sdk_options_kwargs["env"] = sdk_env
if use_resume and resume_file:
sdk_options_kwargs["resume"] = resume_file
options = ClaudeAgentOptions(**sdk_options_kwargs) # type: ignore[arg-type]
adapter = SDKResponseAdapter(message_id=message_id)
adapter.set_task_id(task_id)
@@ -527,10 +567,11 @@ async def stream_chat_completion_sdk(
yield StreamFinish()
return
# Build query with conversation history context.
# Compress history first to handle long conversations.
# Build query: with --resume the CLI already has full context,
# so we only send the new message. Without resume, compress
# history into a context prefix as before.
query_message = current_message
if len(session.messages) > 1:
if not use_resume and len(session.messages) > 1:
compressed = await _compress_conversation_history(session)
history_context = _format_conversation_context(compressed)
if history_context:
@@ -623,6 +664,24 @@ async def stream_chat_completion_sdk(
) and not has_appended_assistant:
session.messages.append(assistant_response)
# --- Capture transcript while CLI is still alive ---
# Must happen INSIDE async with: close() sends SIGTERM
# which kills the CLI before it can flush the JSONL.
if config.claude_agent_use_resume and captured_transcript.get("path"):
# Give CLI time to flush JSONL writes before we read
await asyncio.sleep(0.5)
transcript_content = read_transcript_file(
captured_transcript["path"]
)
if transcript_content:
session.sdk_transcript = transcript_content
logger.info(
f"[SDK] Captured transcript: "
f"{len(transcript_content)} bytes"
)
else:
logger.debug("[SDK] Stop hook fired but transcript not usable")
except ImportError:
raise RuntimeError(
"claude-agent-sdk is not installed. "
@@ -639,6 +698,9 @@ async def stream_chat_completion_sdk(
except Exception as e:
logger.error(f"[SDK] Error: {e}", exc_info=True)
if use_resume:
session.sdk_transcript = None
logger.warning("[SDK] Clearing transcript after resume failure")
try:
await upsert_chat_session(session)
except Exception as save_err:


@@ -0,0 +1,125 @@
"""JSONL transcript management for stateless multi-turn resume.
The Claude Code CLI persists conversations as JSONL files (one JSON object per
line). When the SDK's ``Stop`` hook fires we read this file, store it in the DB,
and on the next turn write it back to a temp file + pass ``--resume`` so the CLI
can reconstruct the full conversation without lossy history compression.
"""
import json
import logging
import os
logger = logging.getLogger(__name__)
# Safety limit: oversized transcripts are skipped (not truncated) so the
# caller falls back to history compression and DB writes stay reasonable.
MAX_TRANSCRIPT_SIZE = 512 * 1024 # 512 KB
def read_transcript_file(transcript_path: str) -> str | None:
"""Read a JSONL transcript file from disk.
Returns the raw JSONL content, or ``None`` if the file is missing, empty,
or only contains metadata (≤2 lines with no conversation messages).
"""
if not transcript_path or not os.path.isfile(transcript_path):
logger.debug(f"[Transcript] File not found: {transcript_path}")
return None
try:
with open(transcript_path) as f:
content = f.read()
if not content.strip():
logger.debug(f"[Transcript] Empty file: {transcript_path}")
return None
lines = content.strip().split("\n")
if len(lines) < 3:
# Metadata-only files have ≤2 lines (queue-operation + file-history-snapshot).
logger.debug(
f"[Transcript] Too few lines ({len(lines)}): {transcript_path}"
)
return None
# Quick structural validation — parse first and last lines.
json.loads(lines[0])
json.loads(lines[-1])
if len(content) > MAX_TRANSCRIPT_SIZE:
# Truncating a JSONL transcript would break the parentUuid tree
# structure that --resume relies on. Instead, return None so the
# caller falls back to the compression approach.
logger.warning(
f"[Transcript] Transcript too large ({len(content)} bytes), "
"skipping — will fall back to history compression"
)
return None
logger.info(
f"[Transcript] Captured {len(lines)} lines, "
f"{len(content)} bytes from {transcript_path}"
)
return content
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"[Transcript] Failed to read {transcript_path}: {e}")
return None
def write_transcript_to_tempfile(
transcript_content: str,
session_id: str,
cwd: str,
) -> str | None:
"""Write JSONL transcript to a temp file inside *cwd* for ``--resume``.
The file lives in the session working directory so it is cleaned up
automatically when the session ends.
Returns the absolute path to the file, or ``None`` on failure.
"""
try:
os.makedirs(cwd, exist_ok=True)
jsonl_path = os.path.join(cwd, f"transcript-{session_id[:8]}.jsonl")
with open(jsonl_path, "w") as f:
f.write(transcript_content)
logger.info(f"[Transcript] Wrote resume file: {jsonl_path}")
return jsonl_path
except OSError as e:
logger.warning(f"[Transcript] Failed to write resume file: {e}")
return None
def validate_transcript(content: str | None) -> bool:
"""Check that a transcript has actual conversation messages.
A valid transcript for resume needs at least one user message and one
assistant message (not just queue-operation / file-history-snapshot
metadata).
"""
if not content or not content.strip():
return False
lines = content.strip().split("\n")
if len(lines) < 3:
return False
has_user = False
has_assistant = False
for line in lines:
try:
entry = json.loads(line)
msg_type = entry.get("type")
if msg_type == "user":
has_user = True
elif msg_type == "assistant":
has_assistant = True
except json.JSONDecodeError:
return False
return has_user and has_assistant
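
For orientation, the smallest transcript that validate_transcript accepts mirrors the fixture used in the unit tests further down: metadata lines followed by at least one user and one assistant entry. The real CLI writes a much richer schema per line; only the fields shown here are taken from the test fixtures.

{"type": "queue-operation", "subtype": "create"}
{"type": "file-history-snapshot", "files": []}
{"type": "user", "uuid": "u1", "message": {"role": "user", "content": "hi"}}
{"type": "assistant", "uuid": "a1", "parentUuid": "u1", "message": {"role": "assistant", "content": "hello"}}

validate_transcript returns True for this content (it has more than two lines plus both a user and an assistant entry), while a file holding only the first two lines is rejected as metadata-only.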


@@ -11,6 +11,7 @@ from .response_model import (
StreamTextDelta,
StreamToolOutputAvailable,
)
from .sdk import service as sdk_service
logger = logging.getLogger(__name__)
@@ -80,3 +81,88 @@ async def test_stream_chat_completion_with_tool_calls(setup_test_user, test_user
session = await get_chat_session(session.session_id)
assert session, "Session not found"
assert session.usage, "Usage is empty"
@pytest.mark.asyncio(loop_scope="session")
async def test_sdk_resume_multi_turn(setup_test_user, test_user_id):
"""Test that the SDK --resume path captures and uses transcripts across turns.
Turn 1: Send a message containing a unique keyword.
Turn 2: Ask the model to recall that keyword — proving the transcript was
persisted and restored via --resume.
"""
api_key: str | None = getenv("OPEN_ROUTER_API_KEY")
if not api_key:
return pytest.skip("OPEN_ROUTER_API_KEY is not set, skipping test")
from .config import ChatConfig
cfg = ChatConfig()
if not cfg.claude_agent_use_resume:
return pytest.skip("CLAUDE_AGENT_USE_RESUME is not enabled, skipping test")
session = await create_chat_session(test_user_id)
session = await upsert_chat_session(session)
# --- Turn 1: send a message with a unique keyword ---
keyword = "ZEPHYR42"
turn1_msg = (
f"Please remember this special keyword: {keyword}. "
"Just confirm you've noted it, keep your response brief."
)
turn1_text = ""
turn1_errors: list[str] = []
turn1_ended = False
async for chunk in sdk_service.stream_chat_completion_sdk(
session.session_id,
turn1_msg,
user_id=test_user_id,
):
if isinstance(chunk, StreamTextDelta):
turn1_text += chunk.delta
elif isinstance(chunk, StreamError):
turn1_errors.append(chunk.errorText)
elif isinstance(chunk, StreamFinish):
turn1_ended = True
assert turn1_ended, "Turn 1 did not finish"
assert not turn1_errors, f"Turn 1 errors: {turn1_errors}"
assert turn1_text, "Turn 1 produced no text"
# Reload session from DB and verify transcript was captured
session = await get_chat_session(session.session_id, test_user_id)
assert session, "Session not found after turn 1"
assert session.sdk_transcript, (
"sdk_transcript was not captured after turn 1 — "
"Stop hook may not have fired or transcript was too small"
)
logger.info(f"Turn 1 transcript captured: {len(session.sdk_transcript)} bytes")
# --- Turn 2: ask model to recall the keyword ---
turn2_msg = "What was the special keyword I asked you to remember?"
turn2_text = ""
turn2_errors: list[str] = []
turn2_ended = False
async for chunk in sdk_service.stream_chat_completion_sdk(
session.session_id,
turn2_msg,
user_id=test_user_id,
session=session,
):
if isinstance(chunk, StreamTextDelta):
turn2_text += chunk.delta
elif isinstance(chunk, StreamError):
turn2_errors.append(chunk.errorText)
elif isinstance(chunk, StreamFinish):
turn2_ended = True
assert turn2_ended, "Turn 2 did not finish"
assert not turn2_errors, f"Turn 2 errors: {turn2_errors}"
assert turn2_text, "Turn 2 produced no text"
assert keyword in turn2_text, (
f"Model did not recall keyword '{keyword}' in turn 2. "
f"Response: {turn2_text[:200]}"
)
logger.info(f"Turn 2 recalled keyword successfully: {turn2_text[:100]}")


@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "ChatSession" ADD COLUMN "sdkTranscript" TEXT;


@@ -223,6 +223,9 @@ model ChatSession {
totalPromptTokens Int @default(0)
totalCompletionTokens Int @default(0)
// Claude Code JSONL transcript for stateless --resume across turns
sdkTranscript String? @db.Text
Messages ChatMessage[]
@@index([userId, updatedAt])


@@ -0,0 +1,128 @@
"""Unit tests for JSONL transcript management utilities."""
import json
import os
from backend.api.features.chat.sdk.transcript import (
MAX_TRANSCRIPT_SIZE,
read_transcript_file,
validate_transcript,
write_transcript_to_tempfile,
)
def _make_jsonl(*entries: dict) -> str:
return "\n".join(json.dumps(e) for e in entries) + "\n"
# --- Fixtures ---
METADATA_LINE = {"type": "queue-operation", "subtype": "create"}
FILE_HISTORY = {"type": "file-history-snapshot", "files": []}
USER_MSG = {"type": "user", "uuid": "u1", "message": {"role": "user", "content": "hi"}}
ASST_MSG = {
"type": "assistant",
"uuid": "a1",
"parentUuid": "u1",
"message": {"role": "assistant", "content": "hello"},
}
VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG)
# --- read_transcript_file ---
class TestReadTranscriptFile:
def test_returns_content_for_valid_file(self, tmp_path):
path = tmp_path / "session.jsonl"
path.write_text(VALID_TRANSCRIPT)
result = read_transcript_file(str(path))
assert result is not None
assert "user" in result
def test_returns_none_for_missing_file(self):
assert read_transcript_file("/nonexistent/path.jsonl") is None
def test_returns_none_for_empty_path(self):
assert read_transcript_file("") is None
def test_returns_none_for_empty_file(self, tmp_path):
path = tmp_path / "empty.jsonl"
path.write_text("")
assert read_transcript_file(str(path)) is None
def test_returns_none_for_metadata_only(self, tmp_path):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY)
path = tmp_path / "meta.jsonl"
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_oversized_file(self, tmp_path):
# Build a valid transcript that exceeds MAX_TRANSCRIPT_SIZE
big_content = {"type": "user", "data": "x" * (MAX_TRANSCRIPT_SIZE + 100)}
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG)
path = tmp_path / "big.jsonl"
path.write_text(content)
assert read_transcript_file(str(path)) is None
def test_returns_none_for_invalid_json(self, tmp_path):
path = tmp_path / "bad.jsonl"
path.write_text("not json\n{}\n{}\n")
assert read_transcript_file(str(path)) is None
# --- write_transcript_to_tempfile ---
class TestWriteTranscriptToTempfile:
def test_writes_file_and_returns_path(self, tmp_path):
cwd = str(tmp_path / "workspace")
result = write_transcript_to_tempfile(VALID_TRANSCRIPT, "sess-1234-abcd", cwd)
assert result is not None
assert os.path.isfile(result)
assert result.endswith(".jsonl")
with open(result) as f:
assert f.read() == VALID_TRANSCRIPT
def test_creates_parent_directory(self, tmp_path):
cwd = str(tmp_path / "new" / "dir")
result = write_transcript_to_tempfile(VALID_TRANSCRIPT, "sess-1234", cwd)
assert result is not None
assert os.path.isdir(cwd)
def test_uses_session_id_prefix(self, tmp_path):
cwd = str(tmp_path)
result = write_transcript_to_tempfile(VALID_TRANSCRIPT, "abcdef12-rest", cwd)
assert result is not None
assert "abcdef12" in os.path.basename(result)
# --- validate_transcript ---
class TestValidateTranscript:
def test_valid_transcript(self):
assert validate_transcript(VALID_TRANSCRIPT) is True
def test_none_content(self):
assert validate_transcript(None) is False
def test_empty_content(self):
assert validate_transcript("") is False
def test_metadata_only(self):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY)
assert validate_transcript(content) is False
def test_user_only_no_assistant(self):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG)
assert validate_transcript(content) is False
def test_assistant_only_no_user(self):
content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG)
assert validate_transcript(content) is False
def test_invalid_json_returns_false(self):
assert validate_transcript("not json\n{}\n{}\n") is False