fix(mothership): queue supersede crash (#4297)

* fix(mothership): queue supersede crash

* add test

* abort observed marker
This commit is contained in:
Vikhyath Mondreti
2026-04-24 18:31:46 -07:00
committed by GitHub
parent f16d17ba49
commit df581c3efb
9 changed files with 287 additions and 19 deletions

View File

@@ -10,13 +10,24 @@ import {
MothershipStreamV1ToolOutcome,
MothershipStreamV1ToolPhase,
} from '@/lib/copilot/generated/mothership-stream-v1'
vi.mock('@/lib/copilot/request/session', async () => {
const actual = await vi.importActual<typeof import('@/lib/copilot/request/session')>(
'@/lib/copilot/request/session'
)
return {
...actual,
hasAbortMarker: vi.fn().mockResolvedValue(false),
}
})
import {
buildPreviewContentUpdate,
decodeJsonStringPrefix,
extractEditContent,
runStreamLoop,
} from '@/lib/copilot/request/go/stream'
import { createEvent } from '@/lib/copilot/request/session'
import { AbortReason, createEvent, hasAbortMarker } from '@/lib/copilot/request/session'
import { RequestTraceV1Outcome, TraceCollector } from '@/lib/copilot/request/trace'
import type { ExecutionContext, StreamingContext } from '@/lib/copilot/request/types'
@@ -285,6 +296,137 @@ describe('copilot go stream helpers', () => {
).toBe(true)
})
it('reclassifies as aborted when the body closes without terminal but the abort marker is set', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)
const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}
await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
})
expect(hasAbortMarker).toHaveBeenCalledWith(context.messageId)
expect(context.wasAborted).toBe(true)
expect(
context.errors.some((message) =>
message.includes('Copilot backend stream ended before a terminal event')
)
).toBe(false)
})
it('invokes onAbortObserved with MarkerObservedAtBodyClose when reclassifying via the abort marker', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockResolvedValueOnce(true)
const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}
const onAbortObserved = vi.fn()
await runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
onAbortObserved,
})
expect(onAbortObserved).toHaveBeenCalledTimes(1)
expect(onAbortObserved).toHaveBeenCalledWith(AbortReason.MarkerObservedAtBodyClose)
expect(context.wasAborted).toBe(true)
})
it('does not invoke onAbortObserved when no abort marker is present at body close', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockResolvedValueOnce(false)
const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}
const onAbortObserved = vi.fn()
await expect(
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
onAbortObserved,
})
).rejects.toThrow('Copilot backend stream ended before a terminal event')
expect(onAbortObserved).not.toHaveBeenCalled()
})
it('still fails closed when the body closes without terminal and the abort marker check throws', async () => {
const textEvent = createEvent({
streamId: 'stream-1',
cursor: '1',
seq: 1,
requestId: 'req-1',
type: MothershipStreamV1EventType.text,
payload: {
channel: 'assistant',
text: 'partial response',
},
})
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
vi.mocked(hasAbortMarker).mockRejectedValueOnce(new Error('redis unavailable'))
const context = createStreamingContext()
const execContext: ExecutionContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}
await expect(
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
timeout: 1000,
})
).rejects.toThrow('Copilot backend stream ended before a terminal event')
expect(context.wasAborted).toBe(false)
})
it('fails closed when the shared stream receives an invalid event', async () => {
vi.mocked(fetch).mockResolvedValueOnce(
createSseResponse([

View File

@@ -30,7 +30,9 @@ import {
} from '@/lib/copilot/request/handlers/types'
import { getCopilotTracer } from '@/lib/copilot/request/otel'
import {
AbortReason,
eventToStreamEvent,
hasAbortMarker,
isSubagentSpanStreamEvent,
parsePersistedStreamEventEnvelope,
} from '@/lib/copilot/request/session'
@@ -436,16 +438,32 @@ export async function runStreamLoop(
})
if (!context.streamComplete && !abortSignal?.aborted && !context.wasAborted) {
const streamPath = new URL(fetchUrl).pathname
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
context.errors.push(message)
logger.error('Copilot backend stream ended before a terminal event', {
path: streamPath,
requestId: context.requestId,
messageId: context.messageId,
})
endedOn = CopilotSseCloseReason.ClosedNoTerminal
throw new CopilotBackendError(message, { status: 503 })
let abortRequested = false
try {
abortRequested = await hasAbortMarker(context.messageId)
} catch (error) {
logger.warn('Failed to read abort marker at body close', {
streamId: context.messageId,
error: error instanceof Error ? error.message : String(error),
})
}
if (abortRequested) {
options.onAbortObserved?.(AbortReason.MarkerObservedAtBodyClose)
context.wasAborted = true
endedOn = CopilotSseCloseReason.Aborted
} else {
const streamPath = new URL(fetchUrl).pathname
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
context.errors.push(message)
logger.error('Copilot backend stream ended before a terminal event', {
path: streamPath,
requestId: context.requestId,
messageId: context.messageId,
})
endedOn = CopilotSseCloseReason.ClosedNoTerminal
throw new CopilotBackendError(message, { status: 503 })
}
}
} catch (error) {
if (error instanceof FatalSseEventError && !context.errors.includes(error.message)) {

View File

@@ -53,10 +53,10 @@ export async function runHeadlessCopilotLifecycle(
simRequestId,
otelContext,
})
outcome = options.abortSignal?.aborted
? RequestTraceV1Outcome.cancelled
: result.success
? RequestTraceV1Outcome.success
outcome = result.success
? RequestTraceV1Outcome.success
: options.abortSignal?.aborted || result.cancelled
? RequestTraceV1Outcome.cancelled
: RequestTraceV1Outcome.error
return result
} catch (error) {

View File

@@ -6,7 +6,10 @@ import { propagation, trace } from '@opentelemetry/api'
import { W3CTraceContextPropagator } from '@opentelemetry/core'
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { MothershipStreamV1EventType } from '@/lib/copilot/generated/mothership-stream-v1'
import {
MothershipStreamV1CompletionStatus,
MothershipStreamV1EventType,
} from '@/lib/copilot/generated/mothership-stream-v1'
const {
runCopilotLifecycle,
@@ -60,6 +63,7 @@ vi.mock('@/lib/copilot/request/session', () => ({
registerActiveStream: vi.fn(),
unregisterActiveStream: vi.fn(),
startAbortPoller: vi.fn().mockReturnValue(setInterval(() => {}, 999999)),
isExplicitStopReason: vi.fn().mockReturnValue(false),
SSE_RESPONSE_HEADERS: {},
StreamWriter: vi.fn().mockImplementation(() => ({
attach: vi.fn().mockImplementation((ctrl: ReadableStreamDefaultController) => {
@@ -211,6 +215,46 @@ describe('createSSEStream terminal error handling', () => {
expect(scheduleBufferCleanup).toHaveBeenCalledWith('stream-1')
})
it('publishes a cancelled completion (not an error) when the orchestrator reports cancelled without abortSignal aborted', async () => {
runCopilotLifecycle.mockResolvedValue({
success: false,
cancelled: true,
content: '',
contentBlocks: [],
toolCalls: [],
})
const stream = createSSEStream({
requestPayload: { message: 'hello' },
userId: 'user-1',
streamId: 'stream-1',
executionId: 'exec-1',
runId: 'run-1',
currentChat: null,
isNewChat: false,
message: 'hello',
titleModel: 'gpt-5.4',
requestId: 'req-cancelled',
orchestrateOptions: {},
})
await drainStream(stream)
expect(appendEvent).not.toHaveBeenCalledWith(
expect.objectContaining({
type: MothershipStreamV1EventType.error,
})
)
expect(appendEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: MothershipStreamV1EventType.complete,
payload: expect.objectContaining({
status: MothershipStreamV1CompletionStatus.cancelled,
}),
})
)
})
it('passes an OTel context into the streaming lifecycle', async () => {
let lifecycleTraceparent = ''
runCopilotLifecycle.mockImplementation(async (_payload, options) => {

View File

@@ -249,6 +249,11 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
onEvent: async (event) => {
await publisher.publish(event)
},
onAbortObserved: (reason) => {
if (!abortController.signal.aborted) {
abortController.abort(reason)
}
},
})
lifecycleResult = result
@@ -266,7 +271,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
// 3. Otherwise → error.
outcome = result.success
? RequestTraceV1Outcome.success
: abortController.signal.aborted || publisher.clientDisconnected
: result.cancelled || abortController.signal.aborted || publisher.clientDisconnected
? RequestTraceV1Outcome.cancelled
: RequestTraceV1Outcome.error
if (outcome === RequestTraceV1Outcome.cancelled) {

View File

@@ -22,6 +22,12 @@ export const AbortReason = {
* that the node that DID receive it wrote, and aborts on the poll.
*/
RedisPoller: 'redis_abort_marker:poller',
/**
* Cross-process stop: same root cause as `RedisPoller`, but observed
* by `runStreamLoop` at body close (the Go body ended before the
* 250ms poller's next tick) rather than by the polling timer.
*/
MarkerObservedAtBodyClose: 'redis_abort_marker:body_close',
/** Internal timeout on the outbound explicit-abort fetch to Go. */
ExplicitAbortFetchTimeout: 'timeout:go_explicit_abort_fetch',
} as const
@@ -38,5 +44,9 @@ export type AbortReasonValue = (typeof AbortReason)[keyof typeof AbortReason]
* stops, mirroring `requestctx.IsExplicitUserStop` on the Go side.
*/
export function isExplicitStopReason(reason: unknown): boolean {
return reason === AbortReason.UserStop || reason === AbortReason.RedisPoller
return (
reason === AbortReason.UserStop ||
reason === AbortReason.RedisPoller ||
reason === AbortReason.MarkerObservedAtBodyClose
)
}

View File

@@ -98,6 +98,47 @@ describe('startAbortPoller heartbeat', () => {
}
})
it('aborts the controller before clearing the marker so the marker is never observable as cleared while the signal is still unaborted', async () => {
const controller = new AbortController()
const streamId = 'stream-order-1'
let signalAbortedWhenMarkerCleared: boolean | null = null
mockClearAbortMarker.mockImplementationOnce(async () => {
signalAbortedWhenMarkerCleared = controller.signal.aborted
})
mockHasAbortMarker.mockResolvedValueOnce(true)
const interval = startAbortPoller(streamId, controller, {})
try {
await vi.advanceTimersByTimeAsync(300)
expect(mockClearAbortMarker).toHaveBeenCalledWith(streamId)
expect(signalAbortedWhenMarkerCleared).toBe(true)
expect(controller.signal.aborted).toBe(true)
} finally {
clearInterval(interval)
}
})
it('does not clear the marker when the signal is already aborted (no double abort)', async () => {
const controller = new AbortController()
controller.abort('preexisting')
const streamId = 'stream-order-2'
mockHasAbortMarker.mockResolvedValueOnce(true)
const interval = startAbortPoller(streamId, controller, {})
try {
await vi.advanceTimersByTimeAsync(300)
expect(mockClearAbortMarker).not.toHaveBeenCalled()
} finally {
clearInterval(interval)
}
})
it('stops heartbeating after ownership is lost', async () => {
const controller = new AbortController()
const streamId = 'stream-lost'

View File

@@ -17,7 +17,7 @@ const pendingChatStreams = new Map<
{ promise: Promise<void>; resolve: () => void; streamId: string }
>()
const DEFAULT_ABORT_POLL_MS = 1000
const DEFAULT_ABORT_POLL_MS = 250
/**
* TTL for the per-chat stream lock. Kept short so that if the Sim pod

View File

@@ -136,6 +136,14 @@ export interface OrchestratorOptions {
onComplete?: (result: OrchestratorResult) => void | Promise<void>
onError?: (error: Error) => void | Promise<void>
abortSignal?: AbortSignal
/**
* Invoked when the orchestrator infers that the run was aborted via
* an out-of-band signal (currently: a Redis abort marker observed
* at SSE body close). Callers wire this to fire their local
* `AbortController` so `signal.reason` is set and `recordCancelled`
* classifies as `explicit_stop` rather than `unknown`.
*/
onAbortObserved?: (reason: string) => void
interactive?: boolean
}