diff --git a/autogpt_platform/backend/backend/api/features/chat/config.py b/autogpt_platform/backend/backend/api/features/chat/config.py index a209436a51..7daecc7053 100644 --- a/autogpt_platform/backend/backend/api/features/chat/config.py +++ b/autogpt_platform/backend/backend/api/features/chat/config.py @@ -111,6 +111,11 @@ class ChatConfig(BaseSettings): default=10, description="Max number of sub-agent Tasks the SDK can spawn per session.", ) + claude_agent_use_resume: bool = Field( + default=False, + description="Use --resume for multi-turn conversations instead of " + "history compression. Falls back to compression when unavailable.", + ) # Extended thinking configuration for Claude models thinking_enabled: bool = Field( diff --git a/autogpt_platform/backend/backend/api/features/chat/db.py b/autogpt_platform/backend/backend/api/features/chat/db.py index 303ea0a698..2529e873d1 100644 --- a/autogpt_platform/backend/backend/api/features/chat/db.py +++ b/autogpt_platform/backend/backend/api/features/chat/db.py @@ -56,6 +56,7 @@ async def update_chat_session( total_prompt_tokens: int | None = None, total_completion_tokens: int | None = None, title: str | None = None, + sdk_transcript: str | None = None, ) -> PrismaChatSession | None: """Update a chat session's metadata.""" data: ChatSessionUpdateInput = {"updatedAt": datetime.now(UTC)} @@ -72,6 +73,8 @@ async def update_chat_session( data["totalCompletionTokens"] = total_completion_tokens if title is not None: data["title"] = title + if sdk_transcript is not None: + data["sdkTranscript"] = sdk_transcript session = await PrismaChatSession.prisma().update( where={"id": session_id}, diff --git a/autogpt_platform/backend/backend/api/features/chat/model.py b/autogpt_platform/backend/backend/api/features/chat/model.py index 30ac27aece..4225416a2d 100644 --- a/autogpt_platform/backend/backend/api/features/chat/model.py +++ b/autogpt_platform/backend/backend/api/features/chat/model.py @@ -103,6 +103,7 @@ class 
ChatSession(BaseModel): updated_at: datetime successful_agent_runs: dict[str, int] = {} successful_agent_schedules: dict[str, int] = {} + sdk_transcript: str | None = None # Claude Code JSONL transcript for --resume def add_tool_call_to_current_turn(self, tool_call: dict) -> None: """Attach a tool_call to the current turn's assistant message. @@ -190,6 +191,7 @@ class ChatSession(BaseModel): updated_at=prisma_session.updatedAt, successful_agent_runs=successful_agent_runs, successful_agent_schedules=successful_agent_schedules, + sdk_transcript=getattr(prisma_session, "sdkTranscript", None), ) @staticmethod @@ -412,6 +414,7 @@ async def _save_session_to_db( successful_agent_schedules=session.successful_agent_schedules, total_prompt_tokens=total_prompt, total_completion_tokens=total_completion, + sdk_transcript=session.sdk_transcript, ) # Add new messages (only those after existing count) diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py b/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py index 22c6cd5670..cf426d1008 100644 --- a/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/security_hooks.py @@ -8,6 +8,7 @@ import json import logging import os import re +from collections.abc import Callable from typing import Any, cast from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX @@ -173,6 +174,7 @@ def create_security_hooks( user_id: str | None, sdk_cwd: str | None = None, max_subtasks: int = 3, + on_stop: Callable[[str, str], None] | None = None, ) -> dict[str, Any]: """Create the security hooks configuration for Claude Agent SDK. 
@@ -181,11 +183,15 @@ def create_security_hooks( - PostToolUse: Log successful tool executions - PostToolUseFailure: Log and handle failed tool executions - PreCompact: Log context compaction events (SDK handles compaction automatically) + - Stop: Capture transcript path for stateless resume (when *on_stop* is provided) Args: user_id: Current user ID for isolation validation sdk_cwd: SDK working directory for workspace-scoped tool validation max_subtasks: Maximum Task (sub-agent) spawns allowed per session + on_stop: Callback ``(transcript_path, sdk_session_id)`` invoked when + the SDK finishes processing — used to read the JSONL transcript + before the CLI process exits. Returns: Hooks configuration dict for ClaudeAgentOptions @@ -285,7 +291,31 @@ def create_security_hooks( ) return cast(SyncHookJSONOutput, {}) - return { + # --- Stop hook: capture transcript path for stateless resume --- + async def stop_hook( + input_data: HookInput, + tool_use_id: str | None, + context: HookContext, + ) -> SyncHookJSONOutput: + """Capture transcript path when SDK finishes processing. + + The Stop hook fires while the CLI process is still alive, giving us + a reliable window to read the JSONL transcript before SIGTERM. + """ + _ = context, tool_use_id + transcript_path = cast(str, input_data.get("transcript_path", "")) + sdk_session_id = cast(str, input_data.get("session_id", "")) + + if transcript_path and on_stop: + logger.info( + f"[SDK] Stop hook: transcript_path={transcript_path}, " + f"sdk_session_id={sdk_session_id[:12]}..." 
+ ) + on_stop(transcript_path, sdk_session_id) + + return cast(SyncHookJSONOutput, {}) + + hooks: dict[str, Any] = { "PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])], "PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])], "PostToolUseFailure": [ @@ -293,6 +323,11 @@ def create_security_hooks( ], "PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])], } + + if on_stop is not None: + hooks["Stop"] = [HookMatcher(matcher=None, hooks=[stop_hook])] + + return hooks except ImportError: # Fallback for when SDK isn't available - return empty hooks logger.warning("claude-agent-sdk not available, security hooks disabled") diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py index d760d8562a..491fe9d710 100644 --- a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py @@ -464,6 +464,7 @@ async def stream_chat_completion_sdk( # Initialise sdk_cwd before the try so the finally can reference it # even if _make_sdk_cwd raises (in that case it stays as ""). 
sdk_cwd = "" + use_resume = False try: # Use a session-specific temp dir to avoid cleanup race conditions @@ -479,6 +480,12 @@ async def stream_chat_completion_sdk( try: from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient + from .transcript import ( + read_transcript_file, + validate_transcript, + write_transcript_to_tempfile, + ) + # Fail fast when no API credentials are available at all sdk_env = _build_sdk_env() if not sdk_env and not os.environ.get("ANTHROPIC_API_KEY"): @@ -492,22 +499,55 @@ async def stream_chat_completion_sdk( sdk_model = _resolve_sdk_model() + # --- Transcript capture via Stop hook --- + captured_transcript: dict[str, str] = {} + + def _on_stop(transcript_path: str, sdk_session_id: str) -> None: + captured_transcript["path"] = transcript_path + captured_transcript["session_id"] = sdk_session_id + security_hooks = create_security_hooks( user_id, sdk_cwd=sdk_cwd, max_subtasks=config.claude_agent_max_subtasks, + on_stop=_on_stop if config.claude_agent_use_resume else None, ) - options = ClaudeAgentOptions( - system_prompt=system_prompt, - mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type] - allowed_tools=COPILOT_TOOL_NAMES, - hooks=security_hooks, # type: ignore[arg-type] - cwd=sdk_cwd, - max_buffer_size=config.claude_agent_max_buffer_size, - # Only pass model/env when OpenRouter is configured - **({"model": sdk_model, "env": sdk_env} if sdk_env else {}), - ) + # --- Resume strategy: try JSONL resume, fall back to compression --- + resume_file: str | None = None + use_resume = False + + if ( + config.claude_agent_use_resume + and session.sdk_transcript + and len(session.messages) > 1 + and validate_transcript(session.sdk_transcript) + ): + resume_file = write_transcript_to_tempfile( + session.sdk_transcript, session_id, sdk_cwd + ) + if resume_file: + use_resume = True + logger.info( + f"[SDK] Using --resume with transcript " + f"({len(session.sdk_transcript)} bytes)" + ) + + sdk_options_kwargs: dict[str, Any] = { + 
"system_prompt": system_prompt, + "mcp_servers": {"copilot": mcp_server}, + "allowed_tools": COPILOT_TOOL_NAMES, + "hooks": security_hooks, + "cwd": sdk_cwd, + "max_buffer_size": config.claude_agent_max_buffer_size, + } + if sdk_env: + sdk_options_kwargs["model"] = sdk_model + sdk_options_kwargs["env"] = sdk_env + if use_resume and resume_file: + sdk_options_kwargs["resume"] = resume_file + + options = ClaudeAgentOptions(**sdk_options_kwargs) # type: ignore[arg-type] adapter = SDKResponseAdapter(message_id=message_id) adapter.set_task_id(task_id) @@ -527,10 +567,11 @@ async def stream_chat_completion_sdk( yield StreamFinish() return - # Build query with conversation history context. - # Compress history first to handle long conversations. + # Build query: with --resume the CLI already has full context, + # so we only send the new message. Without resume, compress + # history into a context prefix as before. query_message = current_message - if len(session.messages) > 1: + if not use_resume and len(session.messages) > 1: compressed = await _compress_conversation_history(session) history_context = _format_conversation_context(compressed) if history_context: @@ -623,6 +664,24 @@ async def stream_chat_completion_sdk( ) and not has_appended_assistant: session.messages.append(assistant_response) + # --- Capture transcript while CLI is still alive --- + # Must happen INSIDE async with: close() sends SIGTERM + # which kills the CLI before it can flush the JSONL. 
+ if config.claude_agent_use_resume and captured_transcript.get("path"): + # Give CLI time to flush JSONL writes before we read + await asyncio.sleep(0.5) + transcript_content = read_transcript_file( + captured_transcript["path"] + ) + if transcript_content: + session.sdk_transcript = transcript_content + logger.info( + f"[SDK] Captured transcript: " + f"{len(transcript_content)} bytes" + ) + else: + logger.debug("[SDK] Stop hook fired but transcript not usable") + except ImportError: raise RuntimeError( "claude-agent-sdk is not installed. " @@ -639,6 +698,9 @@ async def stream_chat_completion_sdk( except Exception as e: logger.error(f"[SDK] Error: {e}", exc_info=True) + if use_resume: + session.sdk_transcript = None + logger.warning("[SDK] Clearing transcript after resume failure") try: await upsert_chat_session(session) except Exception as save_err: diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/transcript.py b/autogpt_platform/backend/backend/api/features/chat/sdk/transcript.py new file mode 100644 index 0000000000..107f6f3b2d --- /dev/null +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/transcript.py @@ -0,0 +1,125 @@ +"""JSONL transcript management for stateless multi-turn resume. + +The Claude Code CLI persists conversations as JSONL files (one JSON object per +line). When the SDK's ``Stop`` hook fires we read this file, store it in the DB, +and on the next turn write it back to a temp file + pass ``--resume`` so the CLI +can reconstruct the full conversation without lossy history compression. +""" + +import json +import logging +import os + +logger = logging.getLogger(__name__) + +# Safety limit — oversized transcripts are not stored (read returns None) so the caller falls back to history compression. +MAX_TRANSCRIPT_SIZE = 512 * 1024 # 512 KB + + +def read_transcript_file(transcript_path: str) -> str | None: + """Read a JSONL transcript file from disk.
+ + Returns the raw JSONL content, or ``None`` if the file is missing, empty, + or only contains metadata (≤2 lines with no conversation messages). + """ + if not transcript_path or not os.path.isfile(transcript_path): + logger.debug(f"[Transcript] File not found: {transcript_path}") + return None + + try: + with open(transcript_path) as f: + content = f.read() + + if not content.strip(): + logger.debug(f"[Transcript] Empty file: {transcript_path}") + return None + + lines = content.strip().split("\n") + if len(lines) < 3: + # Metadata-only files have ≤2 lines (queue-operation + file-history-snapshot). + logger.debug( + f"[Transcript] Too few lines ({len(lines)}): {transcript_path}" + ) + return None + + # Quick structural validation — parse first and last lines. + json.loads(lines[0]) + json.loads(lines[-1]) + + if len(content) > MAX_TRANSCRIPT_SIZE: + # Truncating a JSONL transcript would break the parentUuid tree + # structure that --resume relies on. Instead, return None so the + # caller falls back to the compression approach. + logger.warning( + f"[Transcript] Transcript too large ({len(content)} bytes), " + "skipping — will fall back to history compression" + ) + return None + + logger.info( + f"[Transcript] Captured {len(lines)} lines, " + f"{len(content)} bytes from {transcript_path}" + ) + return content + + except (json.JSONDecodeError, OSError) as e: + logger.warning(f"[Transcript] Failed to read {transcript_path}: {e}") + return None + + +def write_transcript_to_tempfile( + transcript_content: str, + session_id: str, + cwd: str, +) -> str | None: + """Write JSONL transcript to a temp file inside *cwd* for ``--resume``. + + The file lives in the session working directory so it is cleaned up + automatically when the session ends. + + Returns the absolute path to the file, or ``None`` on failure. 
+ """ + try: + os.makedirs(cwd, exist_ok=True) + jsonl_path = os.path.join(cwd, f"transcript-{session_id[:8]}.jsonl") + + with open(jsonl_path, "w") as f: + f.write(transcript_content) + + logger.info(f"[Transcript] Wrote resume file: {jsonl_path}") + return jsonl_path + + except OSError as e: + logger.warning(f"[Transcript] Failed to write resume file: {e}") + return None + + +def validate_transcript(content: str | None) -> bool: + """Check that a transcript has actual conversation messages. + + A valid transcript for resume needs at least one user message and one + assistant message (not just queue-operation / file-history-snapshot + metadata). + """ + if not content or not content.strip(): + return False + + lines = content.strip().split("\n") + if len(lines) < 3: + return False + + has_user = False + has_assistant = False + + for line in lines: + try: + entry = json.loads(line) + msg_type = entry.get("type") + if msg_type == "user": + has_user = True + elif msg_type == "assistant": + has_assistant = True + except json.JSONDecodeError: + return False + + return has_user and has_assistant diff --git a/autogpt_platform/backend/backend/api/features/chat/service_test.py b/autogpt_platform/backend/backend/api/features/chat/service_test.py index 70f27af14f..8781ee8341 100644 --- a/autogpt_platform/backend/backend/api/features/chat/service_test.py +++ b/autogpt_platform/backend/backend/api/features/chat/service_test.py @@ -11,6 +11,7 @@ from .response_model import ( StreamTextDelta, StreamToolOutputAvailable, ) +from .sdk import service as sdk_service logger = logging.getLogger(__name__) @@ -80,3 +81,88 @@ async def test_stream_chat_completion_with_tool_calls(setup_test_user, test_user session = await get_chat_session(session.session_id) assert session, "Session not found" assert session.usage, "Usage is empty" + + +@pytest.mark.asyncio(loop_scope="session") +async def test_sdk_resume_multi_turn(setup_test_user, test_user_id): + """Test that the SDK --resume path 
captures and uses transcripts across turns. + + Turn 1: Send a message containing a unique keyword. + Turn 2: Ask the model to recall that keyword — proving the transcript was + persisted and restored via --resume. + """ + api_key: str | None = getenv("OPEN_ROUTER_API_KEY") + if not api_key: + return pytest.skip("OPEN_ROUTER_API_KEY is not set, skipping test") + + from .config import ChatConfig + + cfg = ChatConfig() + if not cfg.claude_agent_use_resume: + return pytest.skip("CLAUDE_AGENT_USE_RESUME is not enabled, skipping test") + + session = await create_chat_session(test_user_id) + session = await upsert_chat_session(session) + + # --- Turn 1: send a message with a unique keyword --- + keyword = "ZEPHYR42" + turn1_msg = ( + f"Please remember this special keyword: {keyword}. " + "Just confirm you've noted it, keep your response brief." + ) + turn1_text = "" + turn1_errors: list[str] = [] + turn1_ended = False + + async for chunk in sdk_service.stream_chat_completion_sdk( + session.session_id, + turn1_msg, + user_id=test_user_id, + ): + if isinstance(chunk, StreamTextDelta): + turn1_text += chunk.delta + elif isinstance(chunk, StreamError): + turn1_errors.append(chunk.errorText) + elif isinstance(chunk, StreamFinish): + turn1_ended = True + + assert turn1_ended, "Turn 1 did not finish" + assert not turn1_errors, f"Turn 1 errors: {turn1_errors}" + assert turn1_text, "Turn 1 produced no text" + + # Reload session from DB and verify transcript was captured + session = await get_chat_session(session.session_id, test_user_id) + assert session, "Session not found after turn 1" + assert session.sdk_transcript, ( + "sdk_transcript was not captured after turn 1 — " + "Stop hook may not have fired or transcript was too small" + ) + logger.info(f"Turn 1 transcript captured: {len(session.sdk_transcript)} bytes") + + # --- Turn 2: ask model to recall the keyword --- + turn2_msg = "What was the special keyword I asked you to remember?" 
+ turn2_text = "" + turn2_errors: list[str] = [] + turn2_ended = False + + async for chunk in sdk_service.stream_chat_completion_sdk( + session.session_id, + turn2_msg, + user_id=test_user_id, + session=session, + ): + if isinstance(chunk, StreamTextDelta): + turn2_text += chunk.delta + elif isinstance(chunk, StreamError): + turn2_errors.append(chunk.errorText) + elif isinstance(chunk, StreamFinish): + turn2_ended = True + + assert turn2_ended, "Turn 2 did not finish" + assert not turn2_errors, f"Turn 2 errors: {turn2_errors}" + assert turn2_text, "Turn 2 produced no text" + assert keyword in turn2_text, ( + f"Model did not recall keyword '{keyword}' in turn 2. " + f"Response: {turn2_text[:200]}" + ) + logger.info(f"Turn 2 recalled keyword successfully: {turn2_text[:100]}") diff --git a/autogpt_platform/backend/migrations/20260213082253_add_sdk_chat_transcript/migration.sql b/autogpt_platform/backend/migrations/20260213082253_add_sdk_chat_transcript/migration.sql new file mode 100644 index 0000000000..9af33d5842 --- /dev/null +++ b/autogpt_platform/backend/migrations/20260213082253_add_sdk_chat_transcript/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "ChatSession" ADD COLUMN "sdkTranscript" TEXT; diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma index 2da898a7ce..20adf97053 100644 --- a/autogpt_platform/backend/schema.prisma +++ b/autogpt_platform/backend/schema.prisma @@ -223,6 +223,9 @@ model ChatSession { totalPromptTokens Int @default(0) totalCompletionTokens Int @default(0) + // Claude Code JSONL transcript for stateless --resume across turns + sdkTranscript String? 
@db.Text + Messages ChatMessage[] @@index([userId, updatedAt]) diff --git a/autogpt_platform/backend/test/chat/test_transcript.py b/autogpt_platform/backend/test/chat/test_transcript.py new file mode 100644 index 0000000000..f8e5c4fb22 --- /dev/null +++ b/autogpt_platform/backend/test/chat/test_transcript.py @@ -0,0 +1,128 @@ +"""Unit tests for JSONL transcript management utilities.""" + +import json +import os + +from backend.api.features.chat.sdk.transcript import ( + MAX_TRANSCRIPT_SIZE, + read_transcript_file, + validate_transcript, + write_transcript_to_tempfile, +) + + +def _make_jsonl(*entries: dict) -> str: + return "\n".join(json.dumps(e) for e in entries) + "\n" + + +# --- Fixtures --- + + +METADATA_LINE = {"type": "queue-operation", "subtype": "create"} +FILE_HISTORY = {"type": "file-history-snapshot", "files": []} +USER_MSG = {"type": "user", "uuid": "u1", "message": {"role": "user", "content": "hi"}} +ASST_MSG = { + "type": "assistant", + "uuid": "a1", + "parentUuid": "u1", + "message": {"role": "assistant", "content": "hello"}, +} + +VALID_TRANSCRIPT = _make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG, ASST_MSG) + + +# --- read_transcript_file --- + + +class TestReadTranscriptFile: + def test_returns_content_for_valid_file(self, tmp_path): + path = tmp_path / "session.jsonl" + path.write_text(VALID_TRANSCRIPT) + result = read_transcript_file(str(path)) + assert result is not None + assert "user" in result + + def test_returns_none_for_missing_file(self): + assert read_transcript_file("/nonexistent/path.jsonl") is None + + def test_returns_none_for_empty_path(self): + assert read_transcript_file("") is None + + def test_returns_none_for_empty_file(self, tmp_path): + path = tmp_path / "empty.jsonl" + path.write_text("") + assert read_transcript_file(str(path)) is None + + def test_returns_none_for_metadata_only(self, tmp_path): + content = _make_jsonl(METADATA_LINE, FILE_HISTORY) + path = tmp_path / "meta.jsonl" + path.write_text(content) + assert 
read_transcript_file(str(path)) is None + + def test_returns_none_for_oversized_file(self, tmp_path): + # Build a valid transcript that exceeds MAX_TRANSCRIPT_SIZE + big_content = {"type": "user", "data": "x" * (MAX_TRANSCRIPT_SIZE + 100)} + content = _make_jsonl(METADATA_LINE, FILE_HISTORY, big_content, ASST_MSG) + path = tmp_path / "big.jsonl" + path.write_text(content) + assert read_transcript_file(str(path)) is None + + def test_returns_none_for_invalid_json(self, tmp_path): + path = tmp_path / "bad.jsonl" + path.write_text("not json\n{}\n{}\n") + assert read_transcript_file(str(path)) is None + + +# --- write_transcript_to_tempfile --- + + +class TestWriteTranscriptToTempfile: + def test_writes_file_and_returns_path(self, tmp_path): + cwd = str(tmp_path / "workspace") + result = write_transcript_to_tempfile(VALID_TRANSCRIPT, "sess-1234-abcd", cwd) + assert result is not None + assert os.path.isfile(result) + assert result.endswith(".jsonl") + with open(result) as f: + assert f.read() == VALID_TRANSCRIPT + + def test_creates_parent_directory(self, tmp_path): + cwd = str(tmp_path / "new" / "dir") + result = write_transcript_to_tempfile(VALID_TRANSCRIPT, "sess-1234", cwd) + assert result is not None + assert os.path.isdir(cwd) + + def test_uses_session_id_prefix(self, tmp_path): + cwd = str(tmp_path) + result = write_transcript_to_tempfile(VALID_TRANSCRIPT, "abcdef12-rest", cwd) + assert result is not None + assert "abcdef12" in os.path.basename(result) + + +# --- validate_transcript --- + + +class TestValidateTranscript: + def test_valid_transcript(self): + assert validate_transcript(VALID_TRANSCRIPT) is True + + def test_none_content(self): + assert validate_transcript(None) is False + + def test_empty_content(self): + assert validate_transcript("") is False + + def test_metadata_only(self): + content = _make_jsonl(METADATA_LINE, FILE_HISTORY) + assert validate_transcript(content) is False + + def test_user_only_no_assistant(self): + content = 
_make_jsonl(METADATA_LINE, FILE_HISTORY, USER_MSG) + assert validate_transcript(content) is False + + def test_assistant_only_no_user(self): + content = _make_jsonl(METADATA_LINE, FILE_HISTORY, ASST_MSG) + assert validate_transcript(content) is False + + def test_invalid_json_returns_false(self): + assert validate_transcript("not json\n{}\n{}\n") is False