fix(backend/chat): Use --resume with session files for multi-turn conversations

Replace broken AsyncIterable approach (CLI rejects assistant-type stdin
messages) with JSONL session files written to the CLI's storage directory.
This enables --resume to load full user+assistant context with turn-level
compaction support for long conversations.
This commit is contained in:
Zamil Majdy
2026-02-10 18:46:33 +04:00
parent c40a98ba3c
commit 6403ffe353
3 changed files with 310 additions and 68 deletions

View File

@@ -6,7 +6,7 @@ import json
import logging
import os
import uuid
from collections.abc import AsyncGenerator, AsyncIterator
from collections.abc import AsyncGenerator
from typing import Any
import openai
@@ -38,6 +38,7 @@ from ..tracking import track_user_message
from .anthropic_fallback import stream_with_anthropic
from .response_adapter import SDKResponseAdapter
from .security_hooks import create_security_hooks
from .session_file import cleanup_session_file, write_session_file
from .tool_adapter import (
COPILOT_TOOL_NAMES,
create_copilot_mcp_server,
@@ -63,58 +64,6 @@ def _cleanup_sdk_tool_results() -> None:
pass
def _build_conversation_messages(
session: ChatSession,
) -> AsyncIterator[dict[str, Any]]:
"""Build an async iterator of SDK-compatible message dicts from session history.
Yields structured user/assistant turns that the SDK writes directly to the
CLI's stdin. This gives the model native conversation context (enabling
turn-level compaction for long conversations) without any file I/O.
Only prior messages are yielded; the current (last) user message is
appended at the end so the SDK processes it as the new query.
"""
async def _iter() -> AsyncIterator[dict[str, Any]]:
# Yield all messages except the last (current user message)
for msg in session.messages[:-1]:
if msg.role == "user":
yield {
"type": "user",
"message": {
"role": "user",
"content": msg.content or "",
},
"session_id": session.session_id,
}
elif msg.role == "assistant" and msg.content:
yield {
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": msg.content}],
},
"session_id": session.session_id,
}
# Skip tool messages — the assistant's text already captures the
# key information and tool IDs won't match across sessions.
# Yield the current user message last
current = session.messages[-1] if session.messages else None
if current and current.role == "user":
yield {
"type": "user",
"message": {
"role": "user",
"content": current.content or "",
},
"session_id": session.session_id,
}
return _iter()
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
Here is everything you know about the current user from previous interactions:
@@ -321,12 +270,24 @@ async def stream_chat_completion_sdk(
# Create MCP server with CoPilot tools
mcp_server = create_copilot_mcp_server()
# For multi-turn conversations, write a session file so the CLI
# loads full user+assistant context via --resume. This enables
# turn-level compaction for long conversations.
resume_id: str | None = None
if len(session.messages) > 1:
resume_id = write_session_file(session)
if resume_id:
logger.info(
f"[SDK] Wrote session file for --resume: "
f"{len(session.messages) - 1} prior messages"
)
options = ClaudeAgentOptions(
system_prompt=system_prompt,
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
allowed_tools=COPILOT_TOOL_NAMES,
hooks=create_security_hooks(user_id), # type: ignore[arg-type]
continue_conversation=True,
resume=resume_id,
)
adapter = SDKResponseAdapter(message_id=message_id)
@@ -350,20 +311,11 @@ async def stream_chat_completion_sdk(
yield StreamFinish()
return
# For multi-turn conversations, pass structured history
# as an AsyncIterable so the CLI sees native turns and
# can do turn-level compaction. For first messages, just
# send the string directly.
if len(session.messages) > 1:
history_iter = _build_conversation_messages(session)
await client.query(history_iter, session_id=session_id)
logger.info(
f"[SDK] Structured history: "
f"{len(session.messages) - 1} prior messages"
)
else:
await client.query(current_message, session_id=session_id)
logger.info("[SDK] New conversation")
await client.query(current_message, session_id=session_id)
logger.info(
"[SDK] Query sent"
+ (" (with --resume)" if resume_id else " (new)")
)
# Track assistant response to save to session
# We may need multiple assistant messages if text comes after tool results
@@ -450,6 +402,9 @@ async def stream_chat_completion_sdk(
finally:
# Always clean up SDK tool-result files, even on error
_cleanup_sdk_tool_results()
# Clean up session file written for --resume
if resume_id:
cleanup_session_file(resume_id)
except ImportError:
logger.warning(

View File

@@ -0,0 +1,113 @@
"""Session file management for Claude Code CLI --resume support.
Writes conversation history as JSONL files to the CLI's session storage
directory, enabling --resume to load full user+assistant context with
turn-level compaction support.
"""
import json
import logging
import uuid
from datetime import UTC, datetime
from pathlib import Path
from ..model import ChatSession
logger = logging.getLogger(__name__)
# The CLI stores sessions under ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl
# The cwd path is encoded by replacing / with - and prefixing with -
_CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects"
def _encode_cwd(cwd: str) -> str:
"""Encode a working directory path for the CLI projects dir name."""
return "-" + cwd.lstrip("/").replace("/", "-")
def _get_project_dir(cwd: str) -> Path:
"""Get the CLI project directory for a given working directory."""
return _CLAUDE_PROJECTS_DIR / _encode_cwd(cwd)
def write_session_file(
session: ChatSession,
cwd: str = "/tmp",
) -> str | None:
"""Write a session's conversation history as a JSONL file for --resume.
Returns the session ID to pass to --resume, or None if there's not enough
history to warrant a file (< 2 messages).
"""
# Only write if there's prior conversation (at least user + assistant)
prior = [m for m in session.messages[:-1] if m.role in ("user", "assistant")]
if len(prior) < 2:
return None
session_id = session.session_id
project_dir = _get_project_dir(cwd)
project_dir.mkdir(parents=True, exist_ok=True)
file_path = project_dir / f"{session_id}.jsonl"
now = datetime.now(UTC).isoformat()
lines: list[str] = []
prev_uuid: str | None = None
for msg in session.messages[:-1]:
msg_uuid = str(uuid.uuid4())
if msg.role == "user" and msg.content:
line = {
"parentUuid": prev_uuid,
"isSidechain": False,
"userType": "external",
"cwd": cwd,
"sessionId": session_id,
"type": "user",
"message": {"role": "user", "content": msg.content},
"uuid": msg_uuid,
"timestamp": now,
}
lines.append(json.dumps(line))
prev_uuid = msg_uuid
elif msg.role == "assistant" and msg.content:
line = {
"parentUuid": prev_uuid,
"isSidechain": False,
"userType": "external",
"cwd": cwd,
"sessionId": session_id,
"type": "assistant",
"message": {
"role": "assistant",
"content": [{"type": "text", "text": msg.content}],
"model": "unknown",
},
"uuid": msg_uuid,
"timestamp": now,
}
lines.append(json.dumps(line))
prev_uuid = msg_uuid
if not lines:
return None
try:
file_path.write_text("\n".join(lines) + "\n")
logger.debug(f"[SESSION] Wrote {len(lines)} messages to {file_path}")
return session_id
except OSError as e:
logger.warning(f"[SESSION] Failed to write session file: {e}")
return None
def cleanup_session_file(session_id: str, cwd: str = "/tmp") -> None:
"""Remove a session file after use."""
project_dir = _get_project_dir(cwd)
file_path = project_dir / f"{session_id}.jsonl"
try:
file_path.unlink(missing_ok=True)
except OSError:
pass

View File

@@ -0,0 +1,174 @@
"""Unit tests for session file management."""
import json
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import patch
from ..model import ChatMessage, ChatSession
from .session_file import cleanup_session_file, write_session_file
_NOW = datetime.now(UTC)
def _make_session(
messages: list[ChatMessage], session_id: str = "test-session"
) -> ChatSession:
return ChatSession(
session_id=session_id,
user_id="test-user",
messages=messages,
usage=[],
started_at=_NOW,
updated_at=_NOW,
)
# -- write_session_file ------------------------------------------------------
def test_write_returns_none_for_short_history():
"""Sessions with < 2 prior messages shouldn't generate a file."""
session = _make_session(
[
ChatMessage(role="user", content="hello"),
]
)
assert write_session_file(session) is None
def test_write_returns_none_for_single_pair():
"""A single user message (the current one) with no prior history."""
session = _make_session(
[
ChatMessage(role="user", content="current message"),
]
)
assert write_session_file(session) is None
def test_write_creates_valid_jsonl(tmp_path: Path):
"""Multi-turn session should produce valid JSONL with correct structure."""
session = _make_session(
[
ChatMessage(role="user", content="hello"),
ChatMessage(role="assistant", content="Hi there!"),
ChatMessage(role="user", content="how are you"), # current message
],
session_id="sess-123",
)
with patch(
"backend.api.features.chat.sdk.session_file._get_project_dir",
return_value=tmp_path,
):
result = write_session_file(session)
assert result == "sess-123"
# Verify the file exists and is valid JSONL
file_path = tmp_path / "sess-123.jsonl"
assert file_path.exists()
lines = file_path.read_text().strip().split("\n")
# Should have 2 lines (prior messages only, not the current/last one)
assert len(lines) == 2
# Verify first line (user message)
line1 = json.loads(lines[0])
assert line1["type"] == "user"
assert line1["message"]["role"] == "user"
assert line1["message"]["content"] == "hello"
assert line1["sessionId"] == "sess-123"
assert line1["parentUuid"] is None # First message has no parent
assert "uuid" in line1
assert "timestamp" in line1
# Verify second line (assistant message)
line2 = json.loads(lines[1])
assert line2["type"] == "assistant"
assert line2["message"]["role"] == "assistant"
assert line2["message"]["content"] == [{"type": "text", "text": "Hi there!"}]
assert line2["parentUuid"] == line1["uuid"] # Chained to previous
def test_write_skips_tool_messages(tmp_path: Path):
"""Tool messages should be skipped in the session file."""
session = _make_session(
[
ChatMessage(role="user", content="find agents"),
ChatMessage(role="assistant", content="Let me search."),
ChatMessage(role="tool", content="found 3", tool_call_id="tc1"),
ChatMessage(role="assistant", content="I found 3 agents."),
ChatMessage(role="user", content="run the first one"),
],
session_id="sess-tools",
)
with patch(
"backend.api.features.chat.sdk.session_file._get_project_dir",
return_value=tmp_path,
):
result = write_session_file(session)
assert result == "sess-tools"
file_path = tmp_path / "sess-tools.jsonl"
lines = file_path.read_text().strip().split("\n")
# Should have 3 lines: user, assistant, assistant (tool message skipped,
# last user message excluded as current)
assert len(lines) == 3
types = [json.loads(line)["type"] for line in lines]
assert types == ["user", "assistant", "assistant"]
def test_write_skips_empty_content(tmp_path: Path):
"""Messages with empty content should be skipped."""
session = _make_session(
[
ChatMessage(role="user", content="hello"),
ChatMessage(role="assistant", content=""),
ChatMessage(role="assistant", content="real response"),
ChatMessage(role="user", content="next"),
],
session_id="sess-empty",
)
with patch(
"backend.api.features.chat.sdk.session_file._get_project_dir",
return_value=tmp_path,
):
result = write_session_file(session)
assert result == "sess-empty"
file_path = tmp_path / "sess-empty.jsonl"
lines = file_path.read_text().strip().split("\n")
# user + assistant (non-empty) = 2 lines
assert len(lines) == 2
# -- cleanup_session_file ----------------------------------------------------
def test_cleanup_removes_file(tmp_path: Path):
"""cleanup_session_file should remove the session file."""
file_path = tmp_path / "sess-cleanup.jsonl"
file_path.write_text("{}\n")
assert file_path.exists()
with patch(
"backend.api.features.chat.sdk.session_file._get_project_dir",
return_value=tmp_path,
):
cleanup_session_file("sess-cleanup")
assert not file_path.exists()
def test_cleanup_no_error_if_missing(tmp_path: Path):
"""cleanup_session_file should not raise if file doesn't exist."""
with patch(
"backend.api.features.chat.sdk.session_file._get_project_dir",
return_value=tmp_path,
):
cleanup_session_file("nonexistent") # Should not raise