mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-10 23:05:17 -05:00
refactor(backend/chat): Replace --resume with conversation context, add compaction and dedup
- Remove the broken --resume/session-file approach (CLI v2.1.38 can't load session files with more than 2 messages) and delete session_file.py and its tests
- Embed prior conversation turns as <conversation_history> context in the user message for multi-turn memory
- Add context compaction using the shared compress_context() from prompt.py, with LLM summarization and a truncation fallback for long conversations
- Reuse _build_system_prompt and _generate_session_title from the parent service.py instead of duplicating them (gains Langfuse prompt support)
- Add a has_conversation_history param to _build_system_prompt to avoid greeting on multi-turn conversations
- Fix _SDK_TOOL_RESULTS_GLOB to use expanduser("~/") instead of a hardcoded /root/
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
"""Claude Agent SDK service layer for CoPilot chat completions."""
|
||||
|
||||
import asyncio
|
||||
import glob
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -9,12 +8,6 @@ import uuid
|
||||
from collections.abc import AsyncGenerator
|
||||
from typing import Any
|
||||
|
||||
import openai
|
||||
|
||||
from backend.data.understanding import (
|
||||
format_understanding_for_prompt,
|
||||
get_business_understanding,
|
||||
)
|
||||
from backend.util.exceptions import NotFoundError
|
||||
|
||||
from ..config import ChatConfig
|
||||
@@ -34,11 +27,11 @@ from ..response_model import (
|
||||
StreamToolInputAvailable,
|
||||
StreamToolOutputAvailable,
|
||||
)
|
||||
from ..service import _build_system_prompt, _generate_session_title
|
||||
from ..tracking import track_user_message
|
||||
from .anthropic_fallback import stream_with_anthropic
|
||||
from .response_adapter import SDKResponseAdapter
|
||||
from .security_hooks import create_security_hooks
|
||||
from .session_file import cleanup_session_file, write_session_file
|
||||
from .tool_adapter import (
|
||||
COPILOT_TOOL_NAMES,
|
||||
create_copilot_mcp_server,
|
||||
@@ -51,152 +44,121 @@ config = ChatConfig()
|
||||
# Set to hold background tasks to prevent garbage collection
|
||||
_background_tasks: set[asyncio.Task[Any]] = set()
|
||||
|
||||
# SDK tool-results directory pattern
|
||||
_SDK_TOOL_RESULTS_GLOB = "/root/.claude/projects/*/tool-results/*"
|
||||
# SDK tool-results glob pattern — clean these up after each query
|
||||
_SDK_TOOL_RESULTS_GLOB = os.path.expanduser("~/.claude/projects/*/tool-results/*")
|
||||
|
||||
|
||||
def _cleanup_sdk_tool_results() -> None:
|
||||
"""Remove SDK tool-result files to prevent disk accumulation."""
|
||||
for path in glob.glob(_SDK_TOOL_RESULTS_GLOB):
|
||||
import glob as _glob
|
||||
|
||||
for path in _glob.glob(_SDK_TOOL_RESULTS_GLOB):
|
||||
try:
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
|
||||
async def _compress_conversation_history(
|
||||
session: ChatSession,
|
||||
) -> list[ChatMessage]:
|
||||
"""Compress prior conversation messages if they exceed the token threshold.
|
||||
|
||||
Here is everything you know about the current user from previous interactions:
|
||||
Uses the shared compress_context() from prompt.py which supports:
|
||||
- LLM summarization of old messages (keeps recent ones intact)
|
||||
- Progressive content truncation as fallback
|
||||
- Middle-out deletion as last resort
|
||||
|
||||
<users_information>
|
||||
{users_information}
|
||||
</users_information>
|
||||
|
||||
## YOUR CORE MANDATE
|
||||
|
||||
You are action-oriented. Your success is measured by:
|
||||
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
|
||||
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
|
||||
- **Time Saved**: Focus on tangible efficiency gains
|
||||
- **Quality Output**: Deliver results that meet or exceed expectations
|
||||
|
||||
## YOUR WORKFLOW
|
||||
|
||||
Adapt flexibly to the conversation context. Not every interaction requires all stages:
|
||||
|
||||
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
|
||||
|
||||
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
|
||||
|
||||
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
|
||||
|
||||
4. **Discover or Create Agents**:
|
||||
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
|
||||
- Search the marketplace with `find_agent` for pre-built automations
|
||||
- Find reusable components with `find_block`
|
||||
- Create custom solutions with `create_agent` if nothing suitable exists
|
||||
- Modify existing library agents with `edit_agent`
|
||||
|
||||
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
|
||||
|
||||
6. **Show Results**: Display outputs using `agent_output`.
|
||||
|
||||
## BEHAVIORAL GUIDELINES
|
||||
|
||||
**Be Concise:**
|
||||
- Target 2-5 short lines maximum
|
||||
- Make every word count—no repetition or filler
|
||||
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
|
||||
- Avoid jargon (blocks, slugs, cron) unless the user asks
|
||||
|
||||
**Be Proactive:**
|
||||
- Suggest next steps before being asked
|
||||
- Anticipate needs based on conversation context and user information
|
||||
- Look for opportunities to expand scope when relevant
|
||||
- Reveal capabilities through action, not explanation
|
||||
|
||||
**Use Tools Effectively:**
|
||||
- Select the right tool for each task
|
||||
- **Always check `find_library_agent` before searching the marketplace**
|
||||
- Use `add_understanding` to capture valuable business context
|
||||
- When tool calls fail, try alternative approaches
|
||||
|
||||
## CRITICAL REMINDER
|
||||
|
||||
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
|
||||
|
||||
|
||||
async def _build_system_prompt(
|
||||
user_id: str | None, has_conversation_history: bool = False
|
||||
) -> tuple[str, Any]:
|
||||
"""Build the system prompt with user's business understanding context.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to fetch understanding for.
|
||||
has_conversation_history: Whether there's existing conversation history.
|
||||
If True, we don't tell the model to greet/introduce (since they're
|
||||
already in a conversation).
|
||||
Returns the compressed prior messages (everything except the current message).
|
||||
"""
|
||||
understanding = None
|
||||
if user_id:
|
||||
try:
|
||||
understanding = await get_business_understanding(user_id)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to fetch business understanding: {e}")
|
||||
prior = session.messages[:-1]
|
||||
if len(prior) < 2:
|
||||
return prior
|
||||
|
||||
if understanding:
|
||||
context = format_understanding_for_prompt(understanding)
|
||||
elif has_conversation_history:
|
||||
# Don't tell model to greet if there's conversation history
|
||||
context = "No prior understanding saved yet. Continue the existing conversation naturally."
|
||||
else:
|
||||
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
|
||||
from backend.util.prompt import compress_context
|
||||
|
||||
return DEFAULT_SYSTEM_PROMPT.replace("{users_information}", context), understanding
|
||||
# Convert ChatMessages to dicts for compress_context
|
||||
messages_dict = []
|
||||
for msg in prior:
|
||||
msg_dict: dict[str, Any] = {"role": msg.role}
|
||||
if msg.content:
|
||||
msg_dict["content"] = msg.content
|
||||
if msg.tool_calls:
|
||||
msg_dict["tool_calls"] = msg.tool_calls
|
||||
if msg.tool_call_id:
|
||||
msg_dict["tool_call_id"] = msg.tool_call_id
|
||||
messages_dict.append(msg_dict)
|
||||
|
||||
|
||||
async def _generate_session_title(
|
||||
message: str,
|
||||
user_id: str | None = None,
|
||||
session_id: str | None = None,
|
||||
) -> str | None:
|
||||
"""Generate a concise title for a chat session."""
|
||||
from backend.util.settings import Settings
|
||||
|
||||
settings = Settings()
|
||||
try:
|
||||
# Build extra_body for OpenRouter tracing
|
||||
extra_body: dict[str, Any] = {
|
||||
"posthogProperties": {"environment": settings.config.app_env.value},
|
||||
}
|
||||
if user_id:
|
||||
extra_body["user"] = user_id[:128]
|
||||
extra_body["posthogDistinctId"] = user_id
|
||||
if session_id:
|
||||
extra_body["session_id"] = session_id[:128]
|
||||
import openai
|
||||
|
||||
client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
|
||||
response = await client.chat.completions.create(
|
||||
model=config.title_model,
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": "Generate a very short title (3-6 words) for a chat conversation based on the user's first message. Return ONLY the title, no quotes or punctuation.",
|
||||
},
|
||||
{"role": "user", "content": message[:500]},
|
||||
],
|
||||
max_tokens=20,
|
||||
extra_body=extra_body,
|
||||
)
|
||||
title = response.choices[0].message.content
|
||||
if title:
|
||||
title = title.strip().strip("\"'")
|
||||
return title[:47] + "..." if len(title) > 50 else title
|
||||
return None
|
||||
async with openai.AsyncOpenAI(
|
||||
api_key=config.api_key, base_url=config.base_url, timeout=30.0
|
||||
) as client:
|
||||
result = await compress_context(
|
||||
messages=messages_dict,
|
||||
model=config.model,
|
||||
client=client,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to generate session title: {e}")
|
||||
logger.warning(f"[SDK] Context compression with LLM failed: {e}")
|
||||
# Fall back to truncation-only (no LLM summarization)
|
||||
result = await compress_context(
|
||||
messages=messages_dict,
|
||||
model=config.model,
|
||||
client=None,
|
||||
)
|
||||
|
||||
if result.was_compacted:
|
||||
logger.info(
|
||||
f"[SDK] Context compacted: {result.original_token_count} -> "
|
||||
f"{result.token_count} tokens "
|
||||
f"({result.messages_summarized} summarized, "
|
||||
f"{result.messages_dropped} dropped)"
|
||||
)
|
||||
# Convert compressed dicts back to ChatMessages
|
||||
return [
|
||||
ChatMessage(
|
||||
role=m["role"],
|
||||
content=m.get("content"),
|
||||
tool_calls=m.get("tool_calls"),
|
||||
tool_call_id=m.get("tool_call_id"),
|
||||
)
|
||||
for m in result.messages
|
||||
]
|
||||
|
||||
return prior
|
||||
|
||||
|
||||
def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
|
||||
"""Format conversation messages into a context prefix for the user message.
|
||||
|
||||
Returns a string like:
|
||||
<conversation_history>
|
||||
User: hello
|
||||
You responded: Hi! How can I help?
|
||||
</conversation_history>
|
||||
|
||||
Returns None if there are no messages to format.
|
||||
"""
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
lines: list[str] = []
|
||||
for msg in messages:
|
||||
if not msg.content:
|
||||
continue
|
||||
if msg.role == "user":
|
||||
lines.append(f"User: {msg.content}")
|
||||
elif msg.role == "assistant":
|
||||
lines.append(f"You responded: {msg.content}")
|
||||
# Skip tool messages — they're internal details
|
||||
|
||||
if not lines:
|
||||
return None
|
||||
|
||||
return "<conversation_history>\n" + "\n".join(lines) + "\n</conversation_history>"
|
||||
|
||||
|
||||
async def stream_chat_completion_sdk(
|
||||
session_id: str,
|
||||
@@ -243,11 +205,10 @@ async def stream_chat_completion_sdk(
|
||||
task = asyncio.create_task(
|
||||
_update_title_async(session_id, first_message, user_id)
|
||||
)
|
||||
# Store reference to prevent garbage collection
|
||||
_background_tasks.add(task)
|
||||
task.add_done_callback(_background_tasks.discard)
|
||||
|
||||
# Check if there's conversation history (more than just the current message)
|
||||
# Build system prompt (reuses non-SDK path with Langfuse support)
|
||||
has_history = len(session.messages) > 1
|
||||
system_prompt, _ = await _build_system_prompt(
|
||||
user_id, has_conversation_history=has_history
|
||||
@@ -260,35 +221,20 @@ async def stream_chat_completion_sdk(
|
||||
|
||||
yield StreamStart(messageId=message_id, taskId=task_id)
|
||||
|
||||
# Track whether the stream completed normally via ResultMessage
|
||||
stream_completed = False
|
||||
|
||||
try:
|
||||
try:
|
||||
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
|
||||
|
||||
# Create MCP server with CoPilot tools
|
||||
mcp_server = create_copilot_mcp_server()
|
||||
|
||||
# For multi-turn conversations, write a session file so the CLI
|
||||
# loads full user+assistant context via --resume. This enables
|
||||
# turn-level compaction for long conversations.
|
||||
resume_id: str | None = None
|
||||
if len(session.messages) > 1:
|
||||
resume_id = write_session_file(session)
|
||||
if resume_id:
|
||||
logger.info(
|
||||
f"[SDK] Wrote session file for --resume: "
|
||||
f"{len(session.messages) - 1} prior messages"
|
||||
)
|
||||
|
||||
options = ClaudeAgentOptions(
|
||||
system_prompt=system_prompt,
|
||||
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
|
||||
allowed_tools=COPILOT_TOOL_NAMES,
|
||||
hooks=create_security_hooks(user_id), # type: ignore[arg-type]
|
||||
cwd="/tmp",
|
||||
resume=resume_id,
|
||||
)
|
||||
|
||||
adapter = SDKResponseAdapter(message_id=message_id)
|
||||
@@ -296,14 +242,12 @@ async def stream_chat_completion_sdk(
|
||||
|
||||
try:
|
||||
async with ClaudeSDKClient(options=options) as client:
|
||||
# Determine the current user message
|
||||
current_message = message or ""
|
||||
if not current_message and session.messages:
|
||||
last_user = [m for m in session.messages if m.role == "user"]
|
||||
if last_user:
|
||||
current_message = last_user[-1].content or ""
|
||||
|
||||
# Guard against empty messages
|
||||
if not current_message.strip():
|
||||
yield StreamError(
|
||||
errorText="Message cannot be empty.",
|
||||
@@ -312,38 +256,46 @@ async def stream_chat_completion_sdk(
|
||||
yield StreamFinish()
|
||||
return
|
||||
|
||||
await client.query(current_message, session_id=session_id)
|
||||
logger.info(
|
||||
"[SDK] Query sent"
|
||||
+ (" (with --resume)" if resume_id else " (new)")
|
||||
)
|
||||
# Build query with conversation history context.
|
||||
# Compress history first to handle long conversations.
|
||||
query_message = current_message
|
||||
if len(session.messages) > 1:
|
||||
compressed = await _compress_conversation_history(session)
|
||||
history_context = _format_conversation_context(compressed)
|
||||
if history_context:
|
||||
query_message = (
|
||||
f"{history_context}\n\n"
|
||||
f"Now, the user says:\n{current_message}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[SDK] Sending query: {current_message[:80]!r}"
|
||||
f" ({len(session.messages)} msgs in session)"
|
||||
)
|
||||
await client.query(query_message, session_id=session_id)
|
||||
|
||||
# Track assistant response to save to session
|
||||
# We may need multiple assistant messages if text comes after tool results
|
||||
assistant_response = ChatMessage(role="assistant", content="")
|
||||
accumulated_tool_calls: list[dict[str, Any]] = []
|
||||
has_appended_assistant = False
|
||||
has_tool_results = False # Track if we've received tool results
|
||||
has_tool_results = False
|
||||
|
||||
# Receive messages from the SDK
|
||||
async for sdk_msg in client.receive_messages():
|
||||
logger.debug(
|
||||
f"[SDK] Received: {type(sdk_msg).__name__} {getattr(sdk_msg, 'subtype', '')}"
|
||||
f"[SDK] Received: {type(sdk_msg).__name__} "
|
||||
f"{getattr(sdk_msg, 'subtype', '')}"
|
||||
)
|
||||
for response in adapter.convert_message(sdk_msg):
|
||||
if isinstance(response, StreamStart):
|
||||
continue
|
||||
yield response
|
||||
|
||||
# Accumulate text deltas into assistant response
|
||||
if isinstance(response, StreamTextDelta):
|
||||
delta = response.delta or ""
|
||||
# After tool results, create new assistant message for post-tool text
|
||||
if has_tool_results and has_appended_assistant:
|
||||
assistant_response = ChatMessage(
|
||||
role="assistant", content=delta
|
||||
)
|
||||
accumulated_tool_calls = [] # Reset for new message
|
||||
accumulated_tool_calls = []
|
||||
session.messages.append(assistant_response)
|
||||
has_tool_results = False
|
||||
else:
|
||||
@@ -354,7 +306,6 @@ async def stream_chat_completion_sdk(
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
|
||||
# Track tool calls on the assistant message
|
||||
elif isinstance(response, StreamToolInputAvailable):
|
||||
accumulated_tool_calls.append(
|
||||
{
|
||||
@@ -368,9 +319,7 @@ async def stream_chat_completion_sdk(
|
||||
},
|
||||
}
|
||||
)
|
||||
# Update assistant message with tool calls
|
||||
assistant_response.tool_calls = accumulated_tool_calls
|
||||
# Append assistant message if not already (tool-only response)
|
||||
if not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
has_appended_assistant = True
|
||||
@@ -392,23 +341,16 @@ async def stream_chat_completion_sdk(
|
||||
elif isinstance(response, StreamFinish):
|
||||
stream_completed = True
|
||||
|
||||
# Break out of the message loop if we received finish signal
|
||||
if stream_completed:
|
||||
break
|
||||
|
||||
# Ensure assistant response is saved even if no text deltas
|
||||
# (e.g., only tool calls were made)
|
||||
if (
|
||||
assistant_response.content or assistant_response.tool_calls
|
||||
) and not has_appended_assistant:
|
||||
session.messages.append(assistant_response)
|
||||
|
||||
finally:
|
||||
# Always clean up SDK tool-result files, even on error
|
||||
_cleanup_sdk_tool_results()
|
||||
# Clean up session file written for --resume
|
||||
if resume_id:
|
||||
cleanup_session_file(resume_id)
|
||||
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
@@ -421,24 +363,19 @@ async def stream_chat_completion_sdk(
|
||||
stream_completed = True
|
||||
yield response
|
||||
|
||||
# Save the session with accumulated messages
|
||||
await upsert_chat_session(session)
|
||||
logger.debug(
|
||||
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
|
||||
)
|
||||
# Yield StreamFinish to signal completion to the caller (routes.py)
|
||||
# Only if one hasn't already been yielded by the stream
|
||||
if not stream_completed:
|
||||
yield StreamFinish()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[SDK] Error: {e}", exc_info=True)
|
||||
# Save session even on error to preserve any partial response
|
||||
try:
|
||||
await upsert_chat_session(session)
|
||||
except Exception as save_err:
|
||||
logger.error(f"[SDK] Failed to save session on error: {save_err}")
|
||||
# Sanitize error message to avoid exposing internal details
|
||||
yield StreamError(
|
||||
errorText="An error occurred. Please try again.",
|
||||
code="sdk_error",
|
||||
|
||||
@@ -1,119 +0,0 @@
|
||||
"""Session file management for Claude Code CLI --resume support.
|
||||
|
||||
Writes conversation history as JSONL files to the CLI's session storage
|
||||
directory, enabling --resume to load full user+assistant context with
|
||||
turn-level compaction support.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from ..model import ChatSession
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# The CLI stores sessions under ~/.claude/projects/<encoded-cwd>/<session-id>.jsonl
|
||||
# The cwd path is encoded by replacing / with - and prefixing with -
|
||||
_CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects"
|
||||
|
||||
|
||||
def _encode_cwd(cwd: str) -> str:
|
||||
"""Encode a working directory path for the CLI projects dir name."""
|
||||
return "-" + cwd.lstrip("/").replace("/", "-")
|
||||
|
||||
|
||||
def _get_project_dir(cwd: str) -> Path:
    """Return the CLI project directory that corresponds to ``cwd``.

    The path is resolved first (following symlinks, e.g. /tmp ->
    /private/tmp on macOS) so the encoded name is built from the real
    location, which is intended to match what the CLI itself uses.
    """
    real_cwd = Path(cwd).resolve()
    encoded_name = _encode_cwd(str(real_cwd))
    return _CLAUDE_PROJECTS_DIR / encoded_name
|
||||
|
||||
|
||||
def write_session_file(
    session: ChatSession,
    cwd: str = "/tmp",
) -> str | None:
    """Write a session's prior conversation as a JSONL file for --resume.

    Every message except the last one (the current message) is considered.
    Only user and assistant messages with non-empty content are written;
    each record is chained to the previously written one via ``parentUuid``.

    Args:
        session: The chat session whose history should be written.
        cwd: Working directory the CLI runs in; it selects the project
            directory and is stored (resolved) in every record.

    Returns:
        The session ID to pass to --resume, or None when there are fewer
        than 2 prior user/assistant messages, nothing writable remains, or
        the directory/file could not be written.
    """
    history = session.messages[:-1]

    # Only write if there's prior conversation (at least user + assistant)
    if sum(1 for m in history if m.role in ("user", "assistant")) < 2:
        return None

    session_id = session.session_id
    resolved_cwd = str(Path(cwd).resolve())
    now = datetime.now(UTC).isoformat()

    lines: list[str] = []
    prev_uuid: str | None = None

    for msg in history:
        # Tool messages and empty turns are not part of the resume file.
        if msg.role not in ("user", "assistant") or not msg.content:
            continue

        if msg.role == "user":
            payload: dict = {"role": "user", "content": msg.content}
        else:
            payload = {
                "role": "assistant",
                "content": [{"type": "text", "text": msg.content}],
                "model": "unknown",
            }

        msg_uuid = str(uuid.uuid4())
        lines.append(
            json.dumps(
                {
                    "parentUuid": prev_uuid,
                    "isSidechain": False,
                    "userType": "external",
                    "cwd": resolved_cwd,
                    "sessionId": session_id,
                    "type": msg.role,
                    "message": payload,
                    "uuid": msg_uuid,
                    "timestamp": now,
                }
            )
        )
        prev_uuid = msg_uuid

    if not lines:
        return None

    file_path = _get_project_dir(cwd) / f"{session_id}.jsonl"
    try:
        # Directory creation sits inside the try so a failure here is
        # reported the same way as a failed write instead of escaping.
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    except OSError as e:
        logger.warning(f"[SESSION] Failed to write session file: {e}")
        return None

    logger.debug(f"[SESSION] Wrote {len(lines)} messages to {file_path}")
    return session_id
|
||||
|
||||
|
||||
def cleanup_session_file(session_id: str, cwd: str = "/tmp") -> None:
    """Remove a session file after use.

    Cleanup is best-effort: a missing file is not an error, and any other
    OSError is logged at debug level rather than raised.

    Args:
        session_id: ID of the session whose ``<session_id>.jsonl`` is removed.
        cwd: Working directory used to locate the project directory.
    """
    file_path = _get_project_dir(cwd) / f"{session_id}.jsonl"
    try:
        file_path.unlink(missing_ok=True)
    except OSError as e:
        # Never fail the caller over cleanup, but leave a trace so files
        # that could not be removed can be diagnosed.
        logger.debug(f"[SESSION] Failed to remove session file {file_path}: {e}")
|
||||
@@ -1,222 +0,0 @@
|
||||
"""Unit tests for session file management."""
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from ..model import ChatMessage, ChatSession
|
||||
from .session_file import _get_project_dir, cleanup_session_file, write_session_file
|
||||
|
||||
_NOW = datetime.now(UTC)
|
||||
|
||||
|
||||
def _make_session(
    messages: list[ChatMessage], session_id: str = "test-session"
) -> ChatSession:
    """Build a ChatSession for tests around the given messages.

    The user is fixed to "test-user", usage is empty, and both timestamps
    are set to the module-level ``_NOW``.
    """
    session_fields = {
        "session_id": session_id,
        "user_id": "test-user",
        "messages": messages,
        "usage": [],
        "started_at": _NOW,
        "updated_at": _NOW,
    }
    return ChatSession(**session_fields)
|
||||
|
||||
|
||||
# -- write_session_file ------------------------------------------------------
|
||||
|
||||
|
||||
def test_write_returns_none_for_short_history():
    """No session file is produced when fewer than 2 prior messages exist."""
    only_current = [ChatMessage(role="user", content="hello")]
    session = _make_session(only_current)

    outcome = write_session_file(session)

    assert outcome is None
|
||||
|
||||
|
||||
def test_write_returns_none_for_single_pair():
    """A lone user/assistant pair leaves only one prior message, so no file."""
    session = _make_session(
        [
            ChatMessage(role="user", content="hello"),
            ChatMessage(role="assistant", content="Hi there!"),
        ]
    )
    # messages[:-1] holds just the user turn, which is below the 2-message
    # minimum, so write_session_file returns before touching the filesystem.
    assert write_session_file(session) is None
|
||||
|
||||
|
||||
def test_write_creates_valid_jsonl(tmp_path: Path):
    """A multi-turn session is written as well-formed, chained JSONL."""
    conversation = [
        ChatMessage(role="user", content="hello"),
        ChatMessage(role="assistant", content="Hi there!"),
        ChatMessage(role="user", content="how are you"),  # current message
    ]
    session = _make_session(conversation, session_id="sess-123")

    with patch(
        "backend.api.features.chat.sdk.session_file._get_project_dir",
        return_value=tmp_path,
    ):
        returned_id = write_session_file(session)

    assert returned_id == "sess-123"

    # The file must exist under the patched project directory
    written = tmp_path / "sess-123.jsonl"
    assert written.exists()

    raw_lines = written.read_text().strip().split("\n")
    # Only the prior messages are written, never the current/last one
    assert len(raw_lines) == 2
    user_record = json.loads(raw_lines[0])

    # First record: the user turn, with no parent
    assert user_record["type"] == "user"
    assert user_record["message"]["role"] == "user"
    assert user_record["message"]["content"] == "hello"
    assert user_record["sessionId"] == "sess-123"
    assert user_record["parentUuid"] is None
    assert "uuid" in user_record
    assert "timestamp" in user_record

    # Second record: the assistant turn, chained to the user turn
    assistant_record = json.loads(raw_lines[1])
    assert assistant_record["type"] == "assistant"
    assert assistant_record["message"]["role"] == "assistant"
    assert assistant_record["message"]["content"] == [
        {"type": "text", "text": "Hi there!"}
    ]
    assert assistant_record["parentUuid"] == user_record["uuid"]
|
||||
|
||||
|
||||
def test_write_skips_tool_messages(tmp_path: Path):
    """Tool-role messages never make it into the session file."""
    conversation = [
        ChatMessage(role="user", content="find agents"),
        ChatMessage(role="assistant", content="Let me search."),
        ChatMessage(role="tool", content="found 3", tool_call_id="tc1"),
        ChatMessage(role="assistant", content="I found 3 agents."),
        ChatMessage(role="user", content="run the first one"),
    ]
    session = _make_session(conversation, session_id="sess-tools")

    with patch(
        "backend.api.features.chat.sdk.session_file._get_project_dir",
        return_value=tmp_path,
    ):
        returned_id = write_session_file(session)

    assert returned_id == "sess-tools"
    raw_lines = (tmp_path / "sess-tools.jsonl").read_text().strip().split("\n")

    # Expect user, assistant, assistant: the tool message is skipped and the
    # trailing user message is excluded because it is the current one.
    assert len(raw_lines) == 3
    record_types = [json.loads(raw)["type"] for raw in raw_lines]
    assert record_types == ["user", "assistant", "assistant"]
|
||||
|
||||
|
||||
def test_write_skips_empty_content(tmp_path: Path):
    """Messages whose content is empty are left out of the session file."""
    conversation = [
        ChatMessage(role="user", content="hello"),
        ChatMessage(role="assistant", content=""),
        ChatMessage(role="assistant", content="real response"),
        ChatMessage(role="user", content="next"),
    ]
    session = _make_session(conversation, session_id="sess-empty")

    with patch(
        "backend.api.features.chat.sdk.session_file._get_project_dir",
        return_value=tmp_path,
    ):
        returned_id = write_session_file(session)

    assert returned_id == "sess-empty"
    raw_lines = (tmp_path / "sess-empty.jsonl").read_text().strip().split("\n")
    # Only the user turn and the non-empty assistant turn remain
    assert len(raw_lines) == 2
|
||||
|
||||
|
||||
# -- cleanup_session_file ----------------------------------------------------
|
||||
|
||||
|
||||
def test_cleanup_removes_file(tmp_path: Path):
    """An existing session file is deleted by cleanup_session_file."""
    target = tmp_path / "sess-cleanup.jsonl"
    target.write_text("{}\n")
    assert target.exists()

    project_dir_patch = patch(
        "backend.api.features.chat.sdk.session_file._get_project_dir",
        return_value=tmp_path,
    )
    with project_dir_patch:
        cleanup_session_file("sess-cleanup")

    assert not target.exists()
|
||||
|
||||
|
||||
def test_cleanup_no_error_if_missing(tmp_path: Path):
    """Cleaning up a session file that does not exist must not raise."""
    project_dir_patch = patch(
        "backend.api.features.chat.sdk.session_file._get_project_dir",
        return_value=tmp_path,
    )
    with project_dir_patch:
        # Passing means this call simply returns without an exception
        cleanup_session_file("nonexistent")
|
||||
|
||||
|
||||
# -- _get_project_dir --------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_project_dir_resolves_symlinks(tmp_path: Path):
    """The project dir name is derived from the symlink's target path."""
    # Layout: tmp_path/link -> tmp_path/real
    target_dir = tmp_path / "real"
    target_dir.mkdir()
    symlink = tmp_path / "link"
    symlink.symlink_to(target_dir)

    projects_root_patch = patch(
        "backend.api.features.chat.sdk.session_file._CLAUDE_PROJECTS_DIR",
        tmp_path / "projects",
    )
    with projects_root_patch:
        project_dir = _get_project_dir(str(symlink))

    # The encoded name must come from the real directory, not the link
    encoded_target = "-" + str(target_dir).lstrip("/").replace("/", "-")
    assert project_dir.name == encoded_target
|
||||
|
||||
|
||||
def test_write_uses_resolved_cwd_in_messages(tmp_path: Path):
    """Every JSONL record stores the resolved form of the cwd."""
    conversation = [
        ChatMessage(role="user", content="hello"),
        ChatMessage(role="assistant", content="Hi!"),
        ChatMessage(role="user", content="current"),
    ]
    session = _make_session(conversation, session_id="sess-cwd")

    with patch(
        "backend.api.features.chat.sdk.session_file._get_project_dir",
        return_value=tmp_path,
    ):
        write_session_file(session, cwd="/tmp")

    # On macOS /tmp resolves to /private/tmp; on Linux it stays /tmp
    expected_cwd = str(Path("/tmp").resolve())

    raw_lines = (tmp_path / "sess-cwd.jsonl").read_text().strip().split("\n")
    for raw in raw_lines:
        record = json.loads(raw)
        assert record["cwd"] == expected_cwd
|
||||
@@ -245,12 +245,16 @@ async def _get_system_prompt_template(context: str) -> str:
|
||||
return DEFAULT_SYSTEM_PROMPT.format(users_information=context)
|
||||
|
||||
|
||||
async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
|
||||
async def _build_system_prompt(
|
||||
user_id: str | None, has_conversation_history: bool = False
|
||||
) -> tuple[str, Any]:
|
||||
"""Build the full system prompt including business understanding if available.
|
||||
|
||||
Args:
|
||||
user_id: The user ID for fetching business understanding
|
||||
If "default" and this is the user's first session, will use "onboarding" instead.
|
||||
user_id: The user ID for fetching business understanding.
|
||||
has_conversation_history: Whether there's existing conversation history.
|
||||
If True, we don't tell the model to greet/introduce (since they're
|
||||
already in a conversation).
|
||||
|
||||
Returns:
|
||||
Tuple of (compiled prompt string, business understanding object)
|
||||
@@ -266,6 +270,8 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
|
||||
|
||||
if understanding:
|
||||
context = format_understanding_for_prompt(understanding)
|
||||
elif has_conversation_history:
|
||||
context = "No prior understanding saved yet. Continue the existing conversation naturally."
|
||||
else:
|
||||
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
|
||||
|
||||
@@ -1229,7 +1235,7 @@ async def _stream_chat_chunks(
|
||||
|
||||
total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; "
|
||||
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time / 1000:.1f}s; "
|
||||
f"session={session.session_id}, user={session.user_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user