fix(copilot): improve tool flush logging, transcript capture, and stale spinner safety nets

- Elevate flush logging from debug to info/warning with structured messages
  showing tool names and IDs for production diagnostics
- Capture raw SDK output for transcript instead of relying on Stop hook
  file path (CLI doesn't write JSONL in SDK mode)
- Add _build_transcript() to reconstruct JSONL from captured entries
- Add isComplete option to hydration conversion — marks dangling tool calls
  as completed when session has no active stream (fixes stale spinners on
  page refresh)
- Add resolveInProgressTools safety net on streaming→ready transition
  (catches tool parts the backend didn't emit output for)
- Add 3 new tests for flush mechanism (ResultMessage, AssistantMessage,
  stashed output)
This commit is contained in:
Zamil Majdy
2026-02-20 08:56:56 +07:00
parent 7ee870ed70
commit b1c5000937
6 changed files with 344 additions and 54 deletions

View File

@@ -228,11 +228,22 @@ class SDKResponseAdapter:
output, which we pop and emit here before the next ``AssistantMessage``
starts.
"""
unresolved = [
(tid, info.get("name", "unknown"))
for tid, info in self.current_tool_calls.items()
if tid not in self.resolved_tool_calls
]
if not unresolved:
return
logger.info(
"[SDK] Flushing %d unresolved tool call(s): %s",
len(unresolved),
", ".join(f"{name}({tid[:12]})" for tid, name in unresolved),
)
flushed = False
for tool_id, tool_info in self.current_tool_calls.items():
if tool_id in self.resolved_tool_calls:
continue
tool_name = tool_info.get("name", "unknown")
for tool_id, tool_name in unresolved:
output = pop_pending_tool_output(tool_name)
if output is not None:
responses.append(
@@ -245,9 +256,11 @@ class SDKResponseAdapter:
)
self.resolved_tool_calls.add(tool_id)
flushed = True
logger.debug(
f"Flushed pending output for built-in tool {tool_name} "
f"(call {tool_id})"
logger.info(
"[SDK] Flushed stashed output for %s (call %s, %d chars)",
tool_name,
tool_id[:12],
len(output),
)
else:
# No output available — emit an empty output so the frontend
@@ -263,9 +276,11 @@ class SDKResponseAdapter:
)
self.resolved_tool_calls.add(tool_id)
flushed = True
logger.debug(
f"Flushed empty output for unresolved tool {tool_name} "
f"(call {tool_id})"
logger.warning(
"[SDK] Flushed EMPTY output for unresolved tool %s "
"(call %s) — stash was empty",
tool_name,
tool_id[:12],
)
if flushed and self.step_open:

View File

@@ -364,3 +364,151 @@ def test_full_conversation_flow():
"StreamFinishStep", # step 2 closed
"StreamFinish",
]
# -- Flush unresolved tool calls --------------------------------------------
def test_flush_unresolved_at_result_message():
    """Built-in tools (WebSearch) without UserMessage results get flushed at ResultMessage."""
    adapter = _adapter()
    events: list[StreamBaseResponse] = []

    # Init, then a built-in tool use (no MCP prefix) for which no
    # UserMessage result will ever arrive.
    events += adapter.convert_message(SystemMessage(subtype="init", data={}))
    events += adapter.convert_message(
        AssistantMessage(
            content=[
                ToolUseBlock(id="ws-1", name="WebSearch", input={"query": "test"})
            ],
            model="test",
        )
    )

    # Going straight to ResultMessage must flush the dangling tool call.
    events += adapter.convert_message(
        ResultMessage(
            subtype="success",
            duration_ms=100,
            duration_api_ms=50,
            is_error=False,
            num_turns=1,
            session_id="s1",
        )
    )

    assert [type(e).__name__ for e in events] == [
        "StreamStart",
        "StreamStartStep",
        "StreamToolInputStart",
        "StreamToolInputAvailable",
        "StreamToolOutputAvailable",  # flushed with empty output
        "StreamFinishStep",  # step closed by flush
        "StreamFinish",
    ]

    # The flushed output should be empty (no stash available).
    flushed = [e for e in events if isinstance(e, StreamToolOutputAvailable)][0]
    assert flushed.toolCallId == "ws-1"
    assert flushed.toolName == "WebSearch"
    assert flushed.output == ""
def test_flush_unresolved_at_next_assistant_message():
    """Built-in tools get flushed when the next AssistantMessage arrives."""
    adapter = _adapter()
    events: list[StreamBaseResponse] = []

    # Init, then a built-in tool use for which no UserMessage result
    # will arrive.
    events += adapter.convert_message(SystemMessage(subtype="init", data={}))
    events += adapter.convert_message(
        AssistantMessage(
            content=[
                ToolUseBlock(id="ws-1", name="WebSearch", input={"query": "test"})
            ],
            model="test",
        )
    )

    # The next AssistantMessage must flush the dangling call before its
    # own blocks are processed.
    events += adapter.convert_message(
        AssistantMessage(
            content=[TextBlock(text="Here are the results")], model="test"
        )
    )

    assert [type(e).__name__ for e in events] == [
        "StreamStart",
        "StreamStartStep",
        "StreamToolInputStart",
        "StreamToolInputAvailable",
        # Flush at next AssistantMessage:
        "StreamToolOutputAvailable",
        "StreamFinishStep",  # step closed by flush
        # New step for continuation text:
        "StreamStartStep",
        "StreamTextStart",
        "StreamTextDelta",
    ]
def test_flush_with_stashed_output():
    """Stashed output from PostToolUse hook is used when flushing.

    Fix: the ContextVar reset originally ran after the assertions with no
    try/finally, so a failing assertion left stashed state behind and could
    contaminate other tests. The cleanup now always runs.
    """
    from .tool_adapter import _pending_tool_outputs, stash_pending_tool_output

    adapter = _adapter()
    # Simulate PostToolUse hook stashing output for the upcoming call.
    _pending_tool_outputs.set({})
    stash_pending_tool_output("WebSearch", "Search result: 5 items found")
    try:
        all_responses: list[StreamBaseResponse] = []
        # Tool use
        all_responses.extend(
            adapter.convert_message(
                AssistantMessage(
                    content=[
                        ToolUseBlock(
                            id="ws-1", name="WebSearch", input={"query": "test"}
                        )
                    ],
                    model="test",
                )
            )
        )
        # ResultMessage triggers flush
        all_responses.extend(
            adapter.convert_message(
                ResultMessage(
                    subtype="success",
                    duration_ms=100,
                    duration_api_ms=50,
                    is_error=False,
                    num_turns=1,
                    session_id="s1",
                )
            )
        )
        output_events = [
            r for r in all_responses if isinstance(r, StreamToolOutputAvailable)
        ]
        assert len(output_events) == 1
        assert output_events[0].output == "Search result: 5 items found"
    finally:
        # Always reset the ContextVar so a failing assertion doesn't leak
        # stash state into other tests.
        _pending_tool_outputs.set({})  # type: ignore[arg-type]

View File

@@ -7,6 +7,7 @@ import os
import uuid
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from typing import Any
from backend.util.exceptions import NotFoundError
@@ -50,7 +51,6 @@ from .tool_adapter import (
from .transcript import (
cleanup_cli_project_dir,
download_transcript,
read_transcript_file,
upload_transcript,
validate_transcript,
write_transcript_to_tempfile,
@@ -65,14 +65,20 @@ _background_tasks: set[asyncio.Task[Any]] = set()
@dataclass
class CapturedTranscript:
"""Info captured by the SDK Stop hook for stateless --resume."""
"""Transcript built from raw SDK output for stateless --resume.
The CLI does not write JSONL files in SDK mode, so we capture the raw
JSON messages from the CLI's stdout and build the transcript ourselves.
"""
raw_entries: list[str] = dataclass_field(default_factory=list)
"""Raw JSON lines captured from the SDK output (non-ephemeral only)."""
path: str = ""
sdk_session_id: str = ""
@property
def available(self) -> bool:
return bool(self.path)
return bool(self.raw_entries)
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
@@ -505,6 +511,7 @@ async def stream_chat_completion_sdk(
# even if _make_sdk_cwd raises (in that case it stays as "").
sdk_cwd = ""
use_resume = False
current_message = message or ""
try:
# Use a session-specific temp dir to avoid cleanup race conditions
@@ -533,13 +540,14 @@ async def stream_chat_completion_sdk(
sdk_model = _resolve_sdk_model()
# --- Transcript capture via Stop hook ---
# --- Transcript capture from SDK output ---
# The CLI does not write JSONL files in SDK mode. Instead
# we capture the raw JSON from the CLI stdout and build
# the transcript for --resume ourselves.
captured_transcript = CapturedTranscript()
def _on_stop(transcript_path: str, sdk_session_id: str) -> None:
captured_transcript.path = transcript_path
def _on_stop(_transcript_path: str, sdk_session_id: str) -> None:
captured_transcript.sdk_session_id = sdk_session_id
logger.debug(f"[SDK] Stop hook: path={transcript_path!r}")
security_hooks = create_security_hooks(
user_id,
@@ -552,6 +560,7 @@ async def stream_chat_completion_sdk(
resume_file: str | None = None
use_resume = False
transcript_msg_count = 0 # watermark: session.messages length at upload
downloaded_transcript_content: str | None = None
if config.claude_agent_use_resume and user_id and len(session.messages) > 1:
dl = await download_transcript(user_id, session_id)
@@ -562,6 +571,7 @@ async def stream_chat_completion_sdk(
if resume_file:
use_resume = True
transcript_msg_count = dl.message_count
downloaded_transcript_content = dl.content
logger.debug(
f"[SDK] Using --resume ({len(dl.content)}B, "
f"msg_count={transcript_msg_count})"
@@ -664,17 +674,37 @@ async def stream_chat_completion_sdk(
# asyncio.timeout() is preferred over asyncio.wait_for()
# because wait_for wraps in a separate Task whose cancellation
# can leave the async generator in a broken state.
msg_iter = client.receive_messages().__aiter__()
#
# We iterate over the internal query's raw dicts instead
# of the parsed Messages so we can capture them for the
# transcript (the CLI does not write JSONL files in SDK
# mode).
from claude_agent_sdk._internal.message_parser import (
parse_message as _parse_sdk_msg,
)
assert client._query is not None # set by connect()
msg_iter = client._query.receive_messages().__aiter__()
while not stream_completed:
try:
async with asyncio.timeout(_HEARTBEAT_INTERVAL):
sdk_msg = await msg_iter.__anext__()
raw_data = await msg_iter.__anext__()
except TimeoutError:
yield StreamHeartbeat()
continue
except StopAsyncIteration:
break
# Capture non-ephemeral entries for transcript.
# stream_event = streaming deltas (redundant with
# final assistant message).
msg_type = raw_data.get("type", "")
if msg_type != "stream_event":
captured_transcript.raw_entries.append(
json.dumps(raw_data, separators=(",", ":"))
)
sdk_msg = _parse_sdk_msg(raw_data)
logger.debug(
f"[SDK] Received: {type(sdk_msg).__name__} "
f"{getattr(sdk_msg, 'subtype', '')}"
@@ -756,33 +786,23 @@ async def stream_chat_completion_sdk(
session.messages.append(assistant_response)
# --- Upload transcript for next-turn --resume ---
# After async with the SDK task group has exited, so the Stop
# hook has already fired and the CLI has been SIGTERMed. The
# CLI uses appendFileSync, so all writes are safely on disk.
# The CLI does not write JSONL files in SDK mode. Instead
# we build the transcript from the raw JSON we captured
# during the streaming loop above.
if config.claude_agent_use_resume and user_id:
# With --resume the CLI appends to the resume file (most
# complete). Otherwise use the Stop hook path.
if use_resume and resume_file:
raw_transcript = read_transcript_file(resume_file)
logger.debug("[SDK] Transcript source: resume file")
elif captured_transcript.path:
raw_transcript = read_transcript_file(captured_transcript.path)
logger.debug(
"[SDK] Transcript source: stop hook (%s), " "read result: %s",
captured_transcript.path,
f"{len(raw_transcript)}B" if raw_transcript else "None",
)
else:
raw_transcript = None
if not raw_transcript:
logger.debug(
"[SDK] No usable transcript — CLI file had no "
"conversation entries (expected for first turn "
"without --resume)"
)
raw_transcript = _build_transcript(
captured_entries=captured_transcript.raw_entries,
user_message=current_message,
session_id=session_id,
previous_transcript=downloaded_transcript_content,
)
if raw_transcript:
logger.info(
"[SDK] Uploading transcript (%dB, %d new entries)",
len(raw_transcript),
len(captured_transcript.raw_entries),
)
# Shield the upload from generator cancellation so a
# client disconnect / page refresh doesn't lose the
# transcript. The upload must finish even if the SSE
@@ -795,6 +815,12 @@ async def stream_chat_completion_sdk(
message_count=len(session.messages),
)
)
else:
logger.warning(
"[SDK] No transcript to upload for %s " "(%d captured entries)",
session_id,
len(captured_transcript.raw_entries),
)
except ImportError:
raise RuntimeError(
@@ -826,6 +852,67 @@ async def stream_chat_completion_sdk(
_cleanup_sdk_tool_results(sdk_cwd)
def _build_transcript(
    captured_entries: list[str],
    user_message: str,
    session_id: str,
    previous_transcript: str | None = None,
) -> str | None:
    """Build a JSONL transcript from captured SDK output for ``--resume``.

    The Claude CLI does not write JSONL transcript files in SDK mode
    (``--output-format stream-json``), so the transcript is reconstructed
    here from the raw JSON messages captured from the CLI's stdout during
    the streaming loop.

    Args:
        captured_entries: Raw JSON lines from the CLI output (non-ephemeral).
        user_message: The user's original message for this turn.
        session_id: Chat session identifier.
        previous_transcript: JSONL content of the previous transcript
            (downloaded from bucket when using ``--resume``).

    Returns:
        Complete JSONL string ready for upload, or ``None`` if the entries
        don't constitute a valid transcript.
    """
    if not captured_entries:
        return None

    # Synthetic user entry for this turn: the CLI does not echo user
    # messages sent via stdin, so one is constructed here. The
    # uuid/parentUuid fields are optional for --resume.
    turn_entry = json.dumps(
        {
            "type": "user",
            "uuid": str(uuid.uuid4()),
            "parentUuid": "",
            "session_id": session_id,
            "message": {"role": "user", "content": user_message},
        },
        separators=(",", ":"),
    )

    lines: list[str] = []
    if previous_transcript:
        # Old turns come first; strip the trailing newline so the join
        # below doesn't produce a blank line.
        lines.append(previous_transcript.rstrip("\n"))
    lines.append(turn_entry)
    # Raw CLI output entries (system init, assistant, result, …).
    lines.extend(captured_entries)

    transcript = "\n".join(lines) + "\n"
    if validate_transcript(transcript):
        return transcript

    logger.warning(
        "[SDK] Built transcript not valid (%d entries, %dB)",
        len(captured_entries),
        len(transcript),
    )
    return None
async def _try_upload_transcript(
user_id: str,
session_id: str,

View File

@@ -58,6 +58,7 @@ function toToolInput(rawArguments: unknown): unknown {
export function convertChatSessionMessagesToUiMessages(
sessionId: string,
rawMessages: unknown[],
options?: { isComplete?: boolean },
): UIMessage<unknown, UIDataTypes, UITools>[] {
const messages = coerceSessionChatMessages(rawMessages);
const toolOutputsByCallId = new Map<string, unknown>();
@@ -104,6 +105,16 @@ export function convertChatSessionMessagesToUiMessages(
input,
output: typeof output === "string" ? safeJsonParse(output) : output,
});
} else if (options?.isComplete) {
// Session is complete (no active stream) but this tool call has
// no output in the DB — mark as completed to stop stale spinners.
parts.push({
type: `tool-${toolName}`,
toolCallId,
state: "output-available",
input,
output: "",
});
} else {
parts.push({
type: `tool-${toolName}`,

View File

@@ -40,16 +40,6 @@ export function useChatSession() {
}
}, [sessionId, queryClient]);
// Memoize so the effect in useCopilotPage doesn't infinite-loop on a new
// array reference every render. Re-derives only when query data changes.
const hydratedMessages = useMemo(() => {
if (sessionQuery.data?.status !== 200 || !sessionId) return undefined;
return convertChatSessionMessagesToUiMessages(
sessionId,
sessionQuery.data.data.messages ?? [],
);
}, [sessionQuery.data, sessionId]);
// Expose active_stream info so the caller can trigger manual resume
// after hydration completes (rather than relying on AI SDK's built-in
// resume which fires before hydration).
@@ -58,6 +48,19 @@ export function useChatSession() {
return !!sessionQuery.data.data.active_stream;
}, [sessionQuery.data]);
// Memoize so the effect in useCopilotPage doesn't infinite-loop on a new
// array reference every render. Re-derives only when query data changes.
// When the session is complete (no active stream), mark dangling tool
// calls as completed so stale spinners don't persist after refresh.
const hydratedMessages = useMemo(() => {
if (sessionQuery.data?.status !== 200 || !sessionId) return undefined;
return convertChatSessionMessagesToUiMessages(
sessionId,
sessionQuery.data.data.messages ?? [],
{ isComplete: !hasActiveStream },
);
}, [sessionQuery.data, sessionId, hasActiveStream]);
const { mutateAsync: createSessionMutation, isPending: isCreatingSession } =
usePostV2CreateSession({
mutation: {

View File

@@ -9,12 +9,26 @@ import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useChat } from "@ai-sdk/react";
import { useQueryClient } from "@tanstack/react-query";
import { DefaultChatTransport } from "ai";
import type { UIMessage } from "ai";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useChatSession } from "./useChatSession";
import { useLongRunningToolPolling } from "./hooks/useLongRunningToolPolling";
const STREAM_START_TIMEOUT_MS = 12_000;
/** Mark any in-progress tool parts as completed so spinners stop. */
function resolveInProgressTools(messages: UIMessage[]): UIMessage[] {
  const isPending = (part: UIMessage["parts"][number]): boolean =>
    "state" in part &&
    (part.state === "input-streaming" || part.state === "input-available");

  return messages.map((msg) => {
    const parts = msg.parts.map((part) =>
      isPending(part)
        ? { ...part, state: "output-available" as const, output: "" }
        : part,
    );
    return { ...msg, parts };
  });
}
export function useCopilotPage() {
const { isUserLoading, isLoggedIn } = useSupabase();
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
@@ -152,6 +166,18 @@ export function useCopilotPage() {
resumeStream();
}, [hasActiveStream, sessionId, hydratedMessages, status, resumeStream]);
// When the stream finishes, resolve any tool parts still showing spinners.
// This can happen if the backend didn't emit StreamToolOutputAvailable for
// a tool call before sending StreamFinish (e.g. SDK built-in tools).
const prevStatusRef = useRef(status);
useEffect(() => {
const prev = prevStatusRef.current;
prevStatusRef.current = status;
if (prev === "streaming" && status === "ready") {
setMessages((msgs) => resolveInProgressTools(msgs));
}
}, [status, setMessages]);
// Poll session endpoint when a long-running tool (create_agent, edit_agent)
// is in progress. When the backend completes, the session data will contain
// the final tool output — this hook detects the change and updates messages.