fix(copilot): non-cancelling heartbeat, incremental saves, frontend reconnection

- Replace asyncio.timeout() with asyncio.wait() for SDK message iteration
  to avoid corrupting the internal anyio stream on timeout (root cause of
  tool outputs getting stuck)
- Add CancelledError handling + pending task cleanup in finally block
- Fix _end_text_if_open([]) discarding StreamTextEnd events (Sentry bug)
- Save session to DB after each tool input/output for cross-device recovery
- Optimize incremental saves by passing existing_message_count to skip
  redundant DB count queries
- Frontend: invalidate session cache + reset resume ref on stream end
  so SSE reconnection works after drops
This commit is contained in:
Zamil Majdy
2026-02-20 15:06:37 +07:00
parent e1e3b6094e
commit 37355f7581
3 changed files with 300 additions and 141 deletions

View File

@@ -432,13 +432,23 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
return session
async def upsert_chat_session(session: ChatSession) -> ChatSession:
async def upsert_chat_session(
session: ChatSession,
*,
existing_message_count: int | None = None,
) -> ChatSession:
"""Update a chat session in both cache and database.
Uses session-level locking to prevent race conditions when concurrent
operations (e.g., background title update and main stream handler)
attempt to upsert the same session simultaneously.
Args:
existing_message_count: If provided, skip the DB query to count
existing messages. The caller is responsible for tracking this
accurately. Useful for incremental saves in a streaming loop
where the caller already knows how many messages are persisted.
Raises:
DatabaseError: If the database write fails. The cache is still updated
as a best-effort optimization, but the error is propagated to ensure
@@ -450,15 +460,20 @@ async def upsert_chat_session(session: ChatSession) -> ChatSession:
async with lock:
# Get existing message count from DB for incremental saves
existing_message_count = await chat_db().get_chat_session_message_count(
session.session_id
)
if existing_message_count is None:
existing_message_count = await chat_db().get_chat_session_message_count(
session.session_id
)
db_error: Exception | None = None
# Save to database (primary storage)
try:
await _save_session_to_db(session, existing_message_count)
await _save_session_to_db(
session,
existing_message_count,
skip_existence_check=existing_message_count > 0,
)
except Exception as e:
logger.error(
f"Failed to save session {session.session_id} to database: {e}"
@@ -489,21 +504,31 @@ async def upsert_chat_session(session: ChatSession) -> ChatSession:
async def _save_session_to_db(
session: ChatSession, existing_message_count: int
session: ChatSession,
existing_message_count: int,
*,
skip_existence_check: bool = False,
) -> None:
"""Save or update a chat session in the database."""
"""Save or update a chat session in the database.
Args:
skip_existence_check: When True, skip the ``get_chat_session`` query
and assume the session row already exists. Saves one DB round trip
for incremental saves during streaming.
"""
db = chat_db()
# Check if session exists in DB
existing = await db.get_chat_session(session.session_id)
if not skip_existence_check:
# Check if session exists in DB
existing = await db.get_chat_session(session.session_id)
if not existing:
# Create new session
await db.create_chat_session(
session_id=session.session_id,
user_id=session.user_id,
)
existing_message_count = 0
if not existing:
# Create new session
await db.create_chat_session(
session_id=session.session_id,
user_id=session.user_id,
)
existing_message_count = 0
# Calculate total tokens from usage
total_prompt = sum(u.prompt_tokens for u in session.usage)

View File

@@ -715,149 +715,249 @@ async def stream_chat_completion_sdk(
accumulated_tool_calls: list[dict[str, Any]] = []
has_appended_assistant = False
has_tool_results = False
# Track persisted message count to skip DB count queries
# on incremental saves. Initial save happened at line 545.
saved_msg_count = len(session.messages)
# Use an explicit async iterator with timeout to send
# heartbeats when the CLI is idle (e.g. executing tools).
# This prevents proxies/LBs from closing the SSE connection.
# asyncio.timeout() is preferred over asyncio.wait_for()
# because wait_for wraps in a separate Task whose cancellation
# can leave the async generator in a broken state.
# Use an explicit async iterator with non-cancelling heartbeats.
# CRITICAL: we must NOT cancel __anext__() mid-flight — doing so
# (via asyncio.timeout or wait_for) corrupts the SDK's internal
# anyio memory stream, causing StopAsyncIteration on the next
# call and silently dropping all in-flight tool results.
# Instead, wrap __anext__() in a Task and use asyncio.wait()
# with a timeout. On timeout we emit a heartbeat but keep the
# Task alive so it can deliver the next message.
msg_iter = client.receive_messages().__aiter__()
while not stream_completed:
try:
async with asyncio.timeout(_HEARTBEAT_INTERVAL):
sdk_msg = await msg_iter.__anext__()
except TimeoutError:
yield StreamHeartbeat()
continue
except StopAsyncIteration:
break
pending_task: asyncio.Task[Any] | None = None
try:
while not stream_completed:
if pending_task is None:
logger.info(
"[SDK] [%s] Received: %s %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
type(sdk_msg).__name__,
getattr(sdk_msg, "subtype", ""),
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
len(adapter.current_tool_calls),
len(adapter.resolved_tool_calls),
)
async def _next_msg() -> Any:
return await msg_iter.__anext__()
# Race-condition fix: SDK hooks (PostToolUse) are executed
# asynchronously via start_soon() — the next message can
# arrive before the hook stashes output. wait_for_stash()
# awaits an asyncio.Event signaled by stash_pending_tool_output(),
# completing as soon as the hook finishes (typically <1ms).
# The sleep(0) after lets any remaining concurrent hooks complete.
#
# Skip for parallel tool continuations: when the SDK sends
# parallel tool calls as separate AssistantMessages (each
# containing only ToolUseBlocks), we must NOT wait/flush
# — the prior tools are still executing concurrently.
from claude_agent_sdk import (
AssistantMessage,
ResultMessage,
ToolUseBlock,
)
pending_task = asyncio.create_task(_next_msg())
is_parallel_continuation = isinstance(
sdk_msg, AssistantMessage
) and all(isinstance(b, ToolUseBlock) for b in sdk_msg.content)
done, _ = await asyncio.wait(
{pending_task}, timeout=_HEARTBEAT_INTERVAL
)
if (
adapter.has_unresolved_tool_calls
and isinstance(sdk_msg, (AssistantMessage, ResultMessage))
and not is_parallel_continuation
):
if await wait_for_stash(timeout=0.5):
await asyncio.sleep(0)
else:
logger.warning(
"[SDK] [%s] Timed out waiting for PostToolUse "
"hook stash (%d unresolved tool calls)",
session_id[:12],
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
)
for response in adapter.convert_message(sdk_msg):
if isinstance(response, StreamStart):
if not done:
# Timeout — emit heartbeat but keep the task alive
yield StreamHeartbeat()
continue
# Log tool events for debugging visibility issues
if isinstance(
response,
(StreamToolInputAvailable, StreamToolOutputAvailable),
):
extra = ""
if isinstance(response, StreamToolOutputAvailable):
out_len = len(str(response.output))
extra = f", output_len={out_len}"
# Task completed — get result
pending_task = None
try:
sdk_msg = done.pop().result()
except StopAsyncIteration:
logger.info(
"[SDK] [%s] Tool event: %s, tool=%s%s",
"[SDK] [%s] Stream ended normally "
"(StopAsyncIteration)",
session_id[:12],
type(response).__name__,
getattr(response, "toolName", "N/A"),
extra,
)
break
except Exception as stream_err:
# SDK sends {"type": "error"} which raises
# Exception in receive_messages() — capture it
# so the session can still be saved and the
# frontend gets a clean finish.
logger.error(
"[SDK] [%s] Stream error from SDK: %s",
session_id[:12],
stream_err,
exc_info=True,
)
yield StreamError(
errorText=f"SDK stream error: {stream_err}",
code="sdk_stream_error",
)
break
yield response
logger.info(
"[SDK] [%s] Received: %s %s "
"(unresolved=%d, current=%d, resolved=%d)",
session_id[:12],
type(sdk_msg).__name__,
getattr(sdk_msg, "subtype", ""),
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
len(adapter.current_tool_calls),
len(adapter.resolved_tool_calls),
)
if isinstance(response, StreamTextDelta):
delta = response.delta or ""
# After tool results, start a new assistant
# message for the post-tool text.
if has_tool_results and has_appended_assistant:
assistant_response = ChatMessage(
role="assistant", content=delta
)
accumulated_tool_calls = []
has_appended_assistant = False
has_tool_results = False
session.messages.append(assistant_response)
has_appended_assistant = True
# Race-condition fix: SDK hooks (PostToolUse) are
# executed asynchronously via start_soon() — the next
# message can arrive before the hook stashes output.
# wait_for_stash() awaits an asyncio.Event signaled by
# stash_pending_tool_output(), completing as soon as
# the hook finishes (typically <1ms). The sleep(0)
# after lets any remaining concurrent hooks complete.
#
# Skip for parallel tool continuations: when the SDK
# sends parallel tool calls as separate
# AssistantMessages (each containing only
# ToolUseBlocks), we must NOT wait/flush — the prior
# tools are still executing concurrently.
from claude_agent_sdk import (
AssistantMessage,
ResultMessage,
ToolUseBlock,
)
is_parallel_continuation = isinstance(
sdk_msg, AssistantMessage
) and all(isinstance(b, ToolUseBlock) for b in sdk_msg.content)
if (
adapter.has_unresolved_tool_calls
and isinstance(sdk_msg, (AssistantMessage, ResultMessage))
and not is_parallel_continuation
):
if await wait_for_stash(timeout=0.5):
await asyncio.sleep(0)
else:
assistant_response.content = (
assistant_response.content or ""
) + delta
logger.warning(
"[SDK] [%s] Timed out waiting for "
"PostToolUse hook stash "
"(%d unresolved tool calls)",
session_id[:12],
len(adapter.current_tool_calls)
- len(adapter.resolved_tool_calls),
)
for response in adapter.convert_message(sdk_msg):
if isinstance(response, StreamStart):
continue
# Log tool events for debugging
if isinstance(
response,
(
StreamToolInputAvailable,
StreamToolOutputAvailable,
),
):
extra = ""
if isinstance(response, StreamToolOutputAvailable):
out_len = len(str(response.output))
extra = f", output_len={out_len}"
logger.info(
"[SDK] [%s] Tool event: %s, tool=%s%s",
session_id[:12],
type(response).__name__,
getattr(response, "toolName", "N/A"),
extra,
)
yield response
if isinstance(response, StreamTextDelta):
delta = response.delta or ""
# After tool results, start a new assistant
# message for the post-tool text.
if has_tool_results and has_appended_assistant:
assistant_response = ChatMessage(
role="assistant", content=delta
)
accumulated_tool_calls = []
has_appended_assistant = False
has_tool_results = False
session.messages.append(assistant_response)
has_appended_assistant = True
else:
assistant_response.content = (
assistant_response.content or ""
) + delta
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolInputAvailable):
accumulated_tool_calls.append(
{
"id": response.toolCallId,
"type": "function",
"function": {
"name": response.toolName,
"arguments": json.dumps(
response.input or {}
),
},
}
)
assistant_response.tool_calls = accumulated_tool_calls
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
# Save before tool execution starts so the
# pending tool call is visible on refresh /
# other devices.
try:
await upsert_chat_session(
session,
existing_message_count=saved_msg_count,
)
saved_msg_count = len(session.messages)
except Exception as save_err:
logger.warning(
"[SDK] [%s] Incremental save " "failed: %s",
session_id[:12],
save_err,
)
elif isinstance(response, StreamToolInputAvailable):
accumulated_tool_calls.append(
{
"id": response.toolCallId,
"type": "function",
"function": {
"name": response.toolName,
"arguments": json.dumps(response.input or {}),
},
}
)
assistant_response.tool_calls = accumulated_tool_calls
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
tool_call_id=response.toolCallId,
elif isinstance(response, StreamToolOutputAvailable):
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
tool_call_id=response.toolCallId,
)
)
)
has_tool_results = True
has_tool_results = True
# Save after tool completes so the result is
# visible on refresh / other devices.
try:
await upsert_chat_session(
session,
existing_message_count=saved_msg_count,
)
saved_msg_count = len(session.messages)
except Exception as save_err:
logger.warning(
"[SDK] [%s] Incremental save " "failed: %s",
session_id[:12],
save_err,
)
elif isinstance(response, StreamFinish):
stream_completed = True
elif isinstance(response, StreamFinish):
stream_completed = True
except asyncio.CancelledError:
# Task/generator was cancelled (e.g. client disconnect,
# server shutdown). Log and let the safety-net / finally
# blocks handle cleanup.
logger.warning(
"[SDK] [%s] Streaming loop cancelled "
"(asyncio.CancelledError)",
session_id[:12],
)
raise
finally:
# Cancel the pending __anext__ task to avoid a leaked
# coroutine. This is safe even if the task already
# completed.
if pending_task is not None and not pending_task.done():
pending_task.cancel()
try:
await pending_task
except (asyncio.CancelledError, StopAsyncIteration):
pass
# Safety net: if tools are still unresolved after the
# streaming loop (e.g. StopAsyncIteration before ResultMessage,
@@ -899,7 +999,10 @@ async def stream_chat_completion_sdk(
if adapter.step_open:
yield StreamFinishStep()
adapter.step_open = False
adapter._end_text_if_open([])
closing_responses: list[StreamBaseResponse] = []
adapter._end_text_if_open(closing_responses)
for r in closing_responses:
yield r
yield StreamFinish()
stream_completed = True
@@ -965,6 +1068,12 @@ async def stream_chat_completion_sdk(
if not stream_completed:
yield StreamFinish()
except asyncio.CancelledError:
# Client disconnect / server shutdown — log but re-raise so
# the framework can clean up. The finally block still runs
# for transcript upload.
logger.warning("[SDK] [%s] Session cancelled (CancelledError)", session_id[:12])
raise
except Exception as e:
logger.error(f"[SDK] Error: {e}", exc_info=True)
try:

View File

@@ -1,4 +1,5 @@
import {
getGetV2GetSessionQueryKey,
getGetV2ListSessionsQueryKey,
postV2CancelSessionTask,
useDeleteV2DeleteSession,
@@ -187,11 +188,35 @@ export function useCopilotPage() {
});
}, [hydratedMessages, setMessages, status]);
// Ref: tracks whether we've already resumed for a given session.
// Reset when the stream ends so re-resume is possible if the backend
// task is still running (SSE dropped but executor didn't finish).
const hasResumedRef = useRef<string | null>(null);
// When the stream ends (or drops), invalidate the session cache so the
// next hydration fetches fresh messages from the backend. Without this,
// staleTime: Infinity means the cache keeps the pre-stream data forever,
// and any messages added during streaming are lost on remount/navigation.
const prevStatusRef = useRef(status);
useEffect(() => {
const prev = prevStatusRef.current;
prevStatusRef.current = status;
const wasActive = prev === "streaming" || prev === "submitted";
const isIdle = status === "ready" || status === "error";
if (wasActive && isIdle && sessionId) {
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(sessionId),
});
// Allow re-resume if the backend task is still running.
hasResumedRef.current = null;
}
}, [status, sessionId, queryClient]);
// Resume an active stream AFTER hydration completes.
// The backend returns active_stream info when a task is still running.
// We wait for hydration so the AI SDK has the conversation history
// before the resumed stream appends the in-progress assistant message.
const hasResumedRef = useRef<string | null>(null);
useEffect(() => {
if (!hasActiveStream || !sessionId) return;
if (!hydratedMessages || hydratedMessages.length === 0) return;