diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 332665f8dc..b86f6197cc 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -15,6 +15,7 @@ import { processContextsServer, resolveActiveResourceContext, } from '@/lib/copilot/chat/process-contents' +import { finalizeAssistantTurn } from '@/lib/copilot/chat/terminal-state' import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants' import { createBadRequestResponse, @@ -378,6 +379,7 @@ export async function POST(req: NextRequest) { autoExecuteTools: true, interactive: true, onComplete: buildOnComplete(actualChatId, userMessageIdToUse, tracker.requestId), + onError: buildOnError(actualChatId, userMessageIdToUse, tracker.requestId), }, }) @@ -419,34 +421,16 @@ function buildOnComplete( requestId: string ): (result: OrchestratorResult) => Promise { return async (result) => { - if (!chatId || !result.success) return - - const assistantMessage = buildPersistedAssistantMessage(result, result.requestId) + if (!chatId) return try { - const [row] = await db - .select({ messages: copilotChats.messages }) - .from(copilotChats) - .where(eq(copilotChats.id, chatId)) - .limit(1) - - const msgs: Record[] = Array.isArray(row?.messages) ? row.messages : [] - const userIdx = msgs.findIndex((m: Record) => m.id === userMessageId) - const alreadyHasResponse = - userIdx >= 0 && - userIdx + 1 < msgs.length && - (msgs[userIdx + 1] as Record)?.role === 'assistant' - - if (!alreadyHasResponse) { - await db - .update(copilotChats) - .set({ - messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, - conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, chatId)) - } + await finalizeAssistantTurn({ + chatId, + userMessageId, + ...(result.success + ? { assistantMessage: buildPersistedAssistantMessage(result, result.requestId) } + : {}), + }) } catch (error) { logger.error(`[${requestId}] Failed to persist chat messages`, { chatId, @@ -456,6 +440,25 @@ function buildOnComplete( } } +function buildOnError( + chatId: string | undefined, + userMessageId: string, + requestId: string +): () => Promise { + return async () => { + if (!chatId) return + + try { + await finalizeAssistantTurn({ chatId, userMessageId }) + } catch (error) { + logger.error(`[${requestId}] Failed to finalize errored chat stream`, { + chatId, + error: error instanceof Error ? error.message : 'Unknown error', + }) + } + } +} + // --------------------------------------------------------------------------- // GET handler (read-only queries, extracted to queries.ts) // --------------------------------------------------------------------------- diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index d6275e33e7..c6e3089c17 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -15,6 +15,7 @@ import { processContextsServer, resolveActiveResourceContext, } from '@/lib/copilot/chat/process-contents' +import { finalizeAssistantTurn } from '@/lib/copilot/chat/terminal-state' import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context' import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request/http' import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/request/lifecycle/start' @@ -295,40 +296,20 @@ export async function POST(req: NextRequest) { interactive: true, onComplete: async (result: OrchestratorResult) => { if (!actualChatId) return - if (!result.success) return - - const assistantMessage = buildPersistedAssistantMessage(result, result.requestId) try { - const [row] = await db - .select({ messages: copilotChats.messages }) - .from(copilotChats) - .where(eq(copilotChats.id, actualChatId)) - .limit(1) - - const msgs: any[] = Array.isArray(row?.messages) ? row.messages : [] - const userIdx = msgs.findIndex((m: any) => m.id === userMessageId) - const alreadyHasResponse = - userIdx >= 0 && - userIdx + 1 < msgs.length && - (msgs[userIdx + 1] as any)?.role === 'assistant' - - if (!alreadyHasResponse) { - await db - .update(copilotChats) - .set({ - messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, - conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, actualChatId)) - - taskPubSub?.publishStatusChanged({ - workspaceId, - chatId: actualChatId, - type: 'completed', - }) - } + await finalizeAssistantTurn({ + chatId: actualChatId, + userMessageId, + ...(result.success + ? { assistantMessage: buildPersistedAssistantMessage(result, result.requestId) } + : {}), + }) + taskPubSub?.publishStatusChanged({ + workspaceId, + chatId: actualChatId, + type: 'completed', + }) } catch (error) { logger.error(`[${tracker.requestId}] Failed to persist chat messages`, { chatId: actualChatId, @@ -336,6 +317,25 @@ export async function POST(req: NextRequest) { }) } }, + onError: async () => { + if (!actualChatId) return + try { + await finalizeAssistantTurn({ + chatId: actualChatId, + userMessageId, + }) + taskPubSub?.publishStatusChanged({ + workspaceId, + chatId: actualChatId, + type: 'completed', + }) + } catch (error) { + logger.error(`[${tracker.requestId}] Failed to finalize errored chat stream`, { + chatId: actualChatId, + error: error instanceof Error ? error.message : 'Unknown error', + }) + } + }, }, }) diff --git a/apps/sim/app/api/mothership/chat/stop/route.ts b/apps/sim/app/api/mothership/chat/stop/route.ts index cc5cac32a2..2e65f4a8f1 100644 --- a/apps/sim/app/api/mothership/chat/stop/route.ts +++ b/apps/sim/app/api/mothership/chat/stop/route.ts @@ -70,17 +70,38 @@ export async function POST(req: NextRequest) { const { chatId, streamId, content, contentBlocks } = StopSchema.parse(await req.json()) - await releasePendingChatStream(chatId, streamId) + const [row] = await db + .select({ + workspaceId: copilotChats.workspaceId, + messages: copilotChats.messages, + }) + .from(copilotChats) + .where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id))) + .limit(1) + + if (!row) { + await releasePendingChatStream(chatId, streamId) + return NextResponse.json({ success: true }) + } + + const messages: Record[] = Array.isArray(row.messages) ? row.messages : [] + const userIdx = messages.findIndex((message) => message.id === streamId) + const alreadyHasResponse = + userIdx >= 0 && + userIdx + 1 < messages.length && + (messages[userIdx + 1] as Record)?.role === 'assistant' + const canAppendAssistant = + userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse const setClause: Record = { - conversationId: null, + conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${streamId} THEN NULL ELSE ${copilotChats.conversationId} END`, updatedAt: new Date(), } const hasContent = content.trim().length > 0 const hasBlocks = Array.isArray(contentBlocks) && contentBlocks.length > 0 - if (hasContent || hasBlocks) { + if ((hasContent || hasBlocks) && canAppendAssistant) { const normalized = normalizeMessage({ id: crypto.randomUUID(), role: 'assistant', @@ -95,15 +116,11 @@ export async function POST(req: NextRequest) { const [updated] = await db .update(copilotChats) .set(setClause) - .where( - and( - eq(copilotChats.id, chatId), - eq(copilotChats.userId, session.user.id), - eq(copilotChats.conversationId, streamId) - ) - ) + .where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, session.user.id))) .returning({ workspaceId: copilotChats.workspaceId }) + await releasePendingChatStream(chatId, streamId) + if (updated?.workspaceId) { taskPubSub?.publishStatusChanged({ workspaceId: updated.workspaceId, diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/chat-content/chat-content.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/chat-content/chat-content.tsx index 218b34db84..8ae245f42c 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/chat-content/chat-content.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/message-content/components/chat-content/chat-content.tsx @@ -172,10 +172,16 @@ interface ChatContentProps { content: string isStreaming?: boolean onOptionSelect?: (id: string) => void + smoothStreaming?: boolean } -export function ChatContent({ content, isStreaming = false, onOptionSelect }: ChatContentProps) { - const rendered = useStreamingText(content, isStreaming) +export function ChatContent({ + content, + isStreaming = false, + onOptionSelect, + smoothStreaming = true, +}: ChatContentProps) { + const rendered = useStreamingText(content, isStreaming && smoothStreaming) const parsed = useMemo(() => parseSpecialTags(rendered, isStreaming), [rendered, isStreaming]) const hasSpecialContent = parsed.hasPendingTag || parsed.segments.some((s) => s.type !== 'text') diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/message-content/message-content.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/message-content/message-content.tsx index fe34d53fa6..318a2e89c2 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/message-content/message-content.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/message-content/message-content.tsx @@ -372,6 +372,7 @@ export function MessageContent({ const hasSubagentEnded = blocks.some((b) => b.type === 'subagent_end') const showTrailingThinking = isStreaming && !hasTrailingContent && (hasSubagentEnded || allLastGroupToolsDone) + const hasStructuredSegments = segments.some((segment) => segment.type !== 'text') const lastOpenSubagentGroupId = [...segments] .reverse() .find( @@ -390,6 +391,7 @@ export function MessageContent({ content={segment.content} isStreaming={isStreaming} onOptionSelect={onOptionSelect} + smoothStreaming={!hasStructuredSegments} /> ) case 'agent_group': { diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 6c1031c343..938aea4bed 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1091,14 +1091,14 @@ export function useChat( break } const tc = blocks[idx].toolCall! - const resultObj = asPayloadRecord(payload.result) + const outputObj = asPayloadRecord(payload.output) const success = typeof payload.success === 'boolean' ? payload.success : payload.status === MothershipStreamV1ToolOutcome.success const isCancelled = - resultObj?.reason === 'user_cancelled' || - resultObj?.cancelledByUser === true || + outputObj?.reason === 'user_cancelled' || + outputObj?.cancelledByUser === true || payload.reason === 'user_cancelled' || payload.cancelledByUser === true || payload.status === MothershipStreamV1ToolOutcome.cancelled @@ -1112,12 +1112,7 @@ export function useChat( tc.streamingArgs = undefined tc.result = { success: !!success, - output: - payload.result !== undefined - ? payload.result - : payload.output !== undefined - ? payload.output - : payload.data, + output: payload.output, error: typeof payload.error === 'string' ? payload.error : undefined, } flush() diff --git a/apps/sim/app/workspace/[workspaceId]/home/types.ts b/apps/sim/app/workspace/[workspaceId]/home/types.ts index 53eb80cda6..dd5625ea57 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/types.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/types.ts @@ -135,6 +135,7 @@ export const ContentBlockType = { subagent: 'subagent', subagent_end: 'subagent_end', subagent_text: 'subagent_text', + subagent_thinking: 'subagent_thinking', options: 'options', stopped: 'stopped', } as const diff --git a/apps/sim/lib/copilot/chat/display-message.ts b/apps/sim/lib/copilot/chat/display-message.ts index b3e6ea3a6c..913df9e30a 100644 --- a/apps/sim/lib/copilot/chat/display-message.ts +++ b/apps/sim/lib/copilot/chat/display-message.ts @@ -46,6 +46,9 @@ function toDisplayBlock(block: PersistedContentBlock): ContentBlock { switch (block.type) { case MothershipStreamV1EventType.text: if (block.lane === 'subagent') { + if (block.channel === 'thinking') { + return { type: ContentBlockType.subagent_thinking, content: block.content } + } return { type: ContentBlockType.subagent_text, content: block.content } } return { type: ContentBlockType.text, content: block.content } diff --git a/apps/sim/lib/copilot/chat/persisted-message.ts b/apps/sim/lib/copilot/chat/persisted-message.ts index 96843a4b40..18f91b6d1e 100644 --- a/apps/sim/lib/copilot/chat/persisted-message.ts +++ b/apps/sim/lib/copilot/chat/persisted-message.ts @@ -113,6 +113,13 @@ function mapContentBlock(block: ContentBlock): PersistedContentBlock { channel: MothershipStreamV1TextChannel.assistant, content: block.content, } + case 'subagent_thinking': + return { + type: MothershipStreamV1EventType.text, + lane: 'subagent', + channel: MothershipStreamV1TextChannel.thinking, + content: block.content, + } case 'tool_call': { if (!block.toolCall) { return { @@ -274,7 +281,7 @@ function normalizeCanonicalBlock(block: RawBlock): PersistedContentBlock { const result: PersistedContentBlock = { type: block.type as MothershipStreamV1EventType, } - if (block.lane === 'main' || block.lane === 'subagent') { + if (block.lane === 'subagent') { result.lane = block.lane } const blockContent = block.content ?? block.text @@ -349,6 +356,15 @@ function normalizeLegacyBlock(block: RawBlock): PersistedContentBlock { } } + if (block.type === 'subagent_thinking') { + return { + type: MothershipStreamV1EventType.text, + lane: 'subagent', + channel: MothershipStreamV1TextChannel.thinking, + content: block.content, + } + } + if (block.type === 'subagent_end') { return { type: MothershipStreamV1EventType.span, diff --git a/apps/sim/lib/copilot/chat/terminal-state.ts b/apps/sim/lib/copilot/chat/terminal-state.ts new file mode 100644 index 0000000000..39f60b7422 --- /dev/null +++ b/apps/sim/lib/copilot/chat/terminal-state.ts @@ -0,0 +1,53 @@ +import { db } from '@sim/db' +import { copilotChats } from '@sim/db/schema' +import { eq, sql } from 'drizzle-orm' +import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' + +interface FinalizeAssistantTurnParams { + chatId: string + userMessageId: string + assistantMessage?: PersistedMessage +} + +/** + * Clear the active stream marker for a chat and optionally append the assistant + * message if a response has not already been persisted immediately after the + * triggering user message. + */ +export async function finalizeAssistantTurn({ + chatId, + userMessageId, + assistantMessage, +}: FinalizeAssistantTurnParams): Promise { + const [row] = await db + .select({ messages: copilotChats.messages }) + .from(copilotChats) + .where(eq(copilotChats.id, chatId)) + .limit(1) + + const messages: Record[] = Array.isArray(row?.messages) ? row.messages : [] + const userIdx = messages.findIndex((message) => message.id === userMessageId) + const alreadyHasResponse = + userIdx >= 0 && + userIdx + 1 < messages.length && + (messages[userIdx + 1] as Record)?.role === 'assistant' + const canAppendAssistant = userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse + + const baseUpdate = { + conversationId: sql`CASE WHEN ${copilotChats.conversationId} = ${userMessageId} THEN NULL ELSE ${copilotChats.conversationId} END`, + updatedAt: new Date(), + } + + if (assistantMessage && canAppendAssistant) { + await db + .update(copilotChats) + .set({ + ...baseUpdate, + messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, + }) + .where(eq(copilotChats.id, chatId)) + return + } + + await db.update(copilotChats).set(baseUpdate).where(eq(copilotChats.id, chatId)) +} diff --git a/apps/sim/lib/copilot/generated/mothership-stream-v1.ts b/apps/sim/lib/copilot/generated/mothership-stream-v1.ts index ded9a6b4c3..af27af9950 100644 --- a/apps/sim/lib/copilot/generated/mothership-stream-v1.ts +++ b/apps/sim/lib/copilot/generated/mothership-stream-v1.ts @@ -121,7 +121,7 @@ export interface MothershipStreamV1AdditionalPropertiesMap { */ export interface MothershipStreamV1StreamScope { agentId?: string - lane: 'main' | 'subagent' + lane: 'subagent' parentToolCallId?: string } /** diff --git a/apps/sim/lib/copilot/request/context/request-context.ts b/apps/sim/lib/copilot/request/context/request-context.ts index 0f4fe41b1d..003eef3c85 100644 --- a/apps/sim/lib/copilot/request/context/request-context.ts +++ b/apps/sim/lib/copilot/request/context/request-context.ts @@ -15,6 +15,7 @@ export function createStreamingContext(overrides?: Partial): S toolCalls: new Map(), pendingToolPromises: new Map(), currentThinkingBlock: null, + currentSubagentThinkingBlock: null, isInThinkingBlock: false, subAgentParentToolCallId: undefined, subAgentParentStack: [], diff --git a/apps/sim/lib/copilot/request/go/parser.ts b/apps/sim/lib/copilot/request/go/parser.ts index 1b3b06bc78..78d3e99fc9 100644 --- a/apps/sim/lib/copilot/request/go/parser.ts +++ b/apps/sim/lib/copilot/request/go/parser.ts @@ -2,6 +2,10 @@ import { createLogger } from '@sim/logger' const logger = createLogger('CopilotSseParser') +function normalizeSseLine(line: string): string { + return line.endsWith('\r') ? line.slice(0, -1) : line +} + /** * Processes an SSE stream by calling onEvent for each parsed event. * @@ -32,23 +36,35 @@ export async function processSSEStream( let stopped = false for (const line of lines) { + const normalizedLine = normalizeSseLine(line) if (abortSignal?.aborted) { logger.info('SSE stream aborted mid-chunk (between events)') return } - if (!line.trim()) continue - if (!line.startsWith('data: ')) continue + if (!normalizedLine.trim()) continue + if (!normalizedLine.startsWith('data: ')) continue - const jsonStr = line.slice(6) + const jsonStr = normalizedLine.slice(6) if (jsonStr === '[DONE]') continue + let parsed: unknown try { - if (await onEvent(JSON.parse(jsonStr))) { + parsed = JSON.parse(jsonStr) + } catch (error) { + logger.warn('Failed to parse SSE event', { + preview: jsonStr.slice(0, 200), + error: error instanceof Error ? error.message : String(error), + }) + continue + } + + try { + if (await onEvent(parsed)) { stopped = true break } } catch (error) { - logger.warn('Failed to parse SSE event', { + logger.warn('Failed to handle SSE event', { preview: jsonStr.slice(0, 200), error: error instanceof Error ? error.message : String(error), }) @@ -66,12 +82,29 @@ export async function processSSEStream( throw error } - if (buffer.trim() && buffer.startsWith('data: ')) { + const normalizedBuffer = normalizeSseLine(buffer) + if (normalizedBuffer.trim() && normalizedBuffer.startsWith('data: ')) { + const jsonStr = normalizedBuffer.slice(6) + if (jsonStr === '[DONE]') { + return + } + + let parsed: unknown try { - await onEvent(JSON.parse(buffer.slice(6))) + parsed = JSON.parse(jsonStr) } catch (error) { logger.warn('Failed to parse final SSE buffer', { - preview: buffer.slice(0, 200), + preview: normalizedBuffer.slice(0, 200), + error: error instanceof Error ? error.message : String(error), + }) + return + } + + try { + await onEvent(parsed) + } catch (error) { + logger.warn('Failed to handle final SSE event', { + preview: normalizedBuffer.slice(0, 200), error: error instanceof Error ? error.message : String(error), }) } diff --git a/apps/sim/lib/copilot/request/go/stream.ts b/apps/sim/lib/copilot/request/go/stream.ts index b0eb7284ef..04f15e0055 100644 --- a/apps/sim/lib/copilot/request/go/stream.ts +++ b/apps/sim/lib/copilot/request/go/stream.ts @@ -498,14 +498,14 @@ export async function runStreamLoop( if (handleSubagentRouting(streamEvent, context)) { const handler = subAgentHandlers[streamEvent.type] if (handler) { - handler(streamEvent, context, execContext, options) + await handler(streamEvent, context, execContext, options) } return context.streamComplete || undefined } const handler = sseHandlers[streamEvent.type] if (handler) { - handler(streamEvent, context, execContext, options) + await handler(streamEvent, context, execContext, options) } return context.streamComplete || undefined }) diff --git a/apps/sim/lib/copilot/request/handlers/complete.ts b/apps/sim/lib/copilot/request/handlers/complete.ts index ba7ebda6aa..a6fb3794b8 100644 --- a/apps/sim/lib/copilot/request/handlers/complete.ts +++ b/apps/sim/lib/copilot/request/handlers/complete.ts @@ -1,8 +1,11 @@ import { asRecord, getEventData } from '@/lib/copilot/request/sse-utils' import type { StreamHandler } from './types' +import { flushSubagentThinkingBlock, flushThinkingBlock } from './types' export const handleCompleteEvent: StreamHandler = (event, context) => { const d = getEventData(event) + flushSubagentThinkingBlock(context) + flushThinkingBlock(context) if (!d) { context.streamComplete = true return diff --git a/apps/sim/lib/copilot/request/handlers/error.ts b/apps/sim/lib/copilot/request/handlers/error.ts index 225534d066..24ef780c98 100644 --- a/apps/sim/lib/copilot/request/handlers/error.ts +++ b/apps/sim/lib/copilot/request/handlers/error.ts @@ -1,8 +1,11 @@ import { getEventData } from '@/lib/copilot/request/sse-utils' import type { StreamHandler } from './types' +import { flushSubagentThinkingBlock, flushThinkingBlock } from './types' export const handleErrorEvent: StreamHandler = (event, context) => { const d = getEventData(event) + flushSubagentThinkingBlock(context) + flushThinkingBlock(context) const message = (d?.message || d?.error) as string | undefined if (message) { context.errors.push(message) diff --git a/apps/sim/lib/copilot/request/handlers/handlers.test.ts b/apps/sim/lib/copilot/request/handlers/handlers.test.ts index 674e6a6468..a5676de686 100644 --- a/apps/sim/lib/copilot/request/handlers/handlers.test.ts +++ b/apps/sim/lib/copilot/request/handlers/handlers.test.ts @@ -215,6 +215,29 @@ describe('sse-handlers tool lifecycle', () => { ) }) + it('routes main assistant text with no scope into accumulatedContent', async () => { + await sseHandlers.text( + { + type: MothershipStreamV1EventType.text, + payload: { + channel: MothershipStreamV1TextChannel.assistant, + text: 'hello from main', + }, + } satisfies StreamEvent, + context, + execContext, + { interactive: false, timeout: 1000 } + ) + + expect(context.accumulatedContent).toBe('hello from main') + expect(context.contentBlocks.at(-1)).toEqual( + expect.objectContaining({ + type: 'text', + content: 'hello from main', + }) + ) + }) + it('routes subagent tool calls using the event scope parent tool call id', async () => { executeTool.mockResolvedValueOnce({ success: true, output: { ok: true } }) context.subAgentParentToolCallId = 'wrong-parent' @@ -417,7 +440,7 @@ describe('sse-handlers tool lifecycle', () => { mode: MothershipStreamV1ToolMode.async, phase: MothershipStreamV1ToolPhase.result, success: true, - result: { ok: true }, + output: { ok: true }, }, } satisfies StreamEvent, context, @@ -446,7 +469,7 @@ describe('sse-handlers tool lifecycle', () => { mode: MothershipStreamV1ToolMode.async, phase: MothershipStreamV1ToolPhase.result, success: true, - result: { ok: true }, + output: { ok: true }, }, } satisfies StreamEvent, context, @@ -479,6 +502,31 @@ describe('sse-handlers tool lifecycle', () => { ) }) + it('reads canonical tool result payloads from output only', async () => { + await sseHandlers.tool( + { + type: MothershipStreamV1EventType.tool, + payload: { + toolCallId: 'tool-output-only', + toolName: ReadTool.id, + executor: MothershipStreamV1ToolExecutor.sim, + mode: MothershipStreamV1ToolMode.async, + phase: MothershipStreamV1ToolPhase.result, + success: false, + output: { error: 'output-failure' }, + }, + } satisfies StreamEvent, + context, + execContext, + { onEvent: vi.fn(), interactive: false, timeout: 1000 } + ) + + const updated = context.toolCalls.get('tool-output-only') + expect(updated?.status).toBe(MothershipStreamV1ToolOutcome.error) + expect(updated?.result?.output).toEqual({ error: 'output-failure' }) + expect(updated?.error).toBe('output-failure') + }) + it('executes dynamic sim tools based on payload executor', async () => { isSimExecuted.mockReturnValueOnce(false) executeTool.mockResolvedValueOnce({ success: true, output: { emails: [] } }) diff --git a/apps/sim/lib/copilot/request/handlers/text.ts b/apps/sim/lib/copilot/request/handlers/text.ts index 7c42b68516..a2030c17ce 100644 --- a/apps/sim/lib/copilot/request/handlers/text.ts +++ b/apps/sim/lib/copilot/request/handlers/text.ts @@ -4,7 +4,12 @@ import { } from '@/lib/copilot/generated/mothership-stream-v1' import { getEventData } from '@/lib/copilot/request/sse-utils' import type { StreamHandler, ToolScope } from './types' -import { addContentBlock, getScopedParentToolCallId } from './types' +import { + addContentBlock, + flushSubagentThinkingBlock, + flushThinkingBlock, + getScopedParentToolCallId, +} from './types' export function handleTextEvent(scope: ToolScope): StreamHandler { return (event, context) => { @@ -12,9 +17,26 @@ export function handleTextEvent(scope: ToolScope): StreamHandler { if (scope === 'subagent') { const parentToolCallId = getScopedParentToolCallId(event, context) - if (!parentToolCallId || d?.channel !== MothershipStreamV1TextChannel.assistant) return + if (!parentToolCallId) return const chunk = d?.text as string | undefined if (!chunk) return + if (d?.channel === MothershipStreamV1TextChannel.thinking) { + if (!context.currentSubagentThinkingBlock) { + context.currentSubagentThinkingBlock = { + type: 'subagent_thinking', + content: '', + timestamp: Date.now(), + } + } + context.currentSubagentThinkingBlock.content = `${context.currentSubagentThinkingBlock.content || ''}${chunk}` + return + } + if (context.currentSubagentThinkingBlock) { + flushSubagentThinkingBlock(context) + } + if (context.isInThinkingBlock) { + flushThinkingBlock(context) + } context.subAgentContent[parentToolCallId] = (context.subAgentContent[parentToolCallId] || '') + chunk addContentBlock(context, { type: 'subagent_text', content: chunk }) @@ -24,6 +46,9 @@ export function handleTextEvent(scope: ToolScope): StreamHandler { if (d?.channel === MothershipStreamV1TextChannel.thinking) { const phase = d.phase as string | undefined if (phase === MothershipStreamV1SpanLifecycleEvent.start) { + if (context.isInThinkingBlock) { + flushThinkingBlock(context) + } context.isInThinkingBlock = true context.currentThinkingBlock = { type: 'thinking', @@ -33,21 +58,28 @@ export function handleTextEvent(scope: ToolScope): StreamHandler { return } if (phase === MothershipStreamV1SpanLifecycleEvent.end) { - if (context.currentThinkingBlock) { - context.contentBlocks.push(context.currentThinkingBlock) - } - context.isInThinkingBlock = false - context.currentThinkingBlock = null + flushThinkingBlock(context) return } const chunk = d?.text as string | undefined - if (!chunk || !context.currentThinkingBlock) return + if (!chunk) return + if (!context.currentThinkingBlock) { + context.currentThinkingBlock = { + type: 'thinking', + content: '', + timestamp: Date.now(), + } + context.isInThinkingBlock = true + } context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}` return } const chunk = d?.text as string | undefined if (!chunk) return + if (context.isInThinkingBlock) { + flushThinkingBlock(context) + } context.accumulatedContent += chunk addContentBlock(context, { type: 'text', content: chunk }) } diff --git a/apps/sim/lib/copilot/request/handlers/tool.ts b/apps/sim/lib/copilot/request/handlers/tool.ts index 6c9a1cc4f6..dec3766dc9 100644 --- a/apps/sim/lib/copilot/request/handlers/tool.ts +++ b/apps/sim/lib/copilot/request/handlers/tool.ts @@ -102,6 +102,7 @@ function handleResultPhase( ): void { const mainToolCall = ensureTerminalToolCallState(context, toolCallId, toolName) const { success, hasResultData, hasError } = inferToolSuccess(data) + const outputObj = asRecord(data?.output) const status = data?.status === MothershipStreamV1ToolOutcome.cancelled ? MothershipStreamV1ToolOutcome.cancelled @@ -109,7 +110,7 @@ function handleResultPhase( ? MothershipStreamV1ToolOutcome.success : MothershipStreamV1ToolOutcome.error const endTime = Date.now() - const result = hasResultData ? { success, output: data?.result || data?.data } : undefined + const result = hasResultData ? { success, output: data?.output } : undefined if (isSubagent && parentToolCallId) { const toolCalls = context.subAgentToolCalls[parentToolCallId] || [] @@ -119,8 +120,7 @@ function handleResultPhase( subAgentToolCall.endTime = endTime if (result) subAgentToolCall.result = result if (hasError) { - const resultObj = asRecord(data?.result) - subAgentToolCall.error = (data?.error || resultObj.error) as string | undefined + subAgentToolCall.error = (data?.error || outputObj.error) as string | undefined } } } @@ -129,8 +129,7 @@ function handleResultPhase( mainToolCall.endTime = endTime if (result) mainToolCall.result = result if (hasError) { - const resultObj = asRecord(data?.result) - mainToolCall.error = (data?.error || resultObj.error) as string | undefined + mainToolCall.error = (data?.error || outputObj.error) as string | undefined } markToolResultSeen(toolCallId) } diff --git a/apps/sim/lib/copilot/request/handlers/types.ts b/apps/sim/lib/copilot/request/handlers/types.ts index 6966aa7366..48c3e6fdc4 100644 --- a/apps/sim/lib/copilot/request/handlers/types.ts +++ b/apps/sim/lib/copilot/request/handlers/types.ts @@ -38,6 +38,25 @@ export function addContentBlock( }) } +/** + * Flush any open thinking block into contentBlocks and clear the thinking state. + * Safe to call repeatedly. + */ +export function flushThinkingBlock(context: StreamingContext): void { + if (context.currentThinkingBlock) { + context.contentBlocks.push(context.currentThinkingBlock) + } + context.isInThinkingBlock = false + context.currentThinkingBlock = null +} + +export function flushSubagentThinkingBlock(context: StreamingContext): void { + if (context.currentSubagentThinkingBlock) { + context.contentBlocks.push(context.currentSubagentThinkingBlock) + } + context.currentSubagentThinkingBlock = null +} + export function getScopedParentToolCallId( event: StreamEvent, context: StreamingContext @@ -209,11 +228,11 @@ export function inferToolSuccess(data: Record | undefined): { hasResultData: boolean hasError: boolean } { - const resultObj = asRecord(data?.result) - const hasExplicitSuccess = data?.success !== undefined || resultObj.success !== undefined - const explicitSuccess = data?.success ?? resultObj.success - const hasResultData = data?.result !== undefined || data?.data !== undefined - const hasError = !!data?.error || !!resultObj.error + const outputObj = asRecord(data?.output) + const hasExplicitSuccess = data?.success !== undefined + const explicitSuccess = data?.success + const hasResultData = data?.output !== undefined + const hasError = !!data?.error || !!outputObj.error const success = hasExplicitSuccess ? !!explicitSuccess : !hasError return { success, hasResultData, hasError } } diff --git a/apps/sim/lib/copilot/request/types.ts b/apps/sim/lib/copilot/request/types.ts index 5114fa5e5e..e4aa1cb3ff 100644 --- a/apps/sim/lib/copilot/request/types.ts +++ b/apps/sim/lib/copilot/request/types.ts @@ -36,6 +36,7 @@ export const ContentBlockType = { thinking: 'thinking', tool_call: 'tool_call', subagent_text: 'subagent_text', + subagent_thinking: 'subagent_thinking', subagent: 'subagent', } as const export type ContentBlockType = (typeof ContentBlockType)[keyof typeof ContentBlockType] @@ -73,6 +74,7 @@ export interface StreamingContext { }> } currentThinkingBlock: ContentBlock | null + currentSubagentThinkingBlock: ContentBlock | null isInThinkingBlock: boolean subAgentParentToolCallId?: string subAgentParentStack: string[] diff --git a/bun.lock b/bun.lock index 4a60c0668f..8335f0b768 100644 --- a/bun.lock +++ b/bun.lock @@ -11,6 +11,7 @@ "@octokit/rest": "^21.0.0", "glob": "13.0.0", "husky": "9.1.7", + "json-schema-to-typescript": "15.0.4", "lint-staged": "16.0.0", "turbo": "2.9.3", }, @@ -395,6 +396,8 @@ "@anthropic-ai/sdk": ["@anthropic-ai/sdk@0.71.2", "", { "dependencies": { "json-schema-to-ts": "^3.1.1" }, "peerDependencies": { "zod": "^3.25.0 || ^4.0.0" }, "optionalPeers": ["zod"], "bin": { "anthropic-ai-sdk": "bin/cli" } }, "sha512-TGNDEUuEstk/DKu0/TflXAEt+p+p/WhTlFzEnoosvbaDU2LTjm42igSdlL0VijrKpWejtOKxX0b8A7uc+XiSAQ=="], + "@apidevtools/json-schema-ref-parser": ["@apidevtools/json-schema-ref-parser@11.9.3", "", { "dependencies": { "@jsdevtools/ono": "^7.1.3", "@types/json-schema": "^7.0.15", "js-yaml": "^4.1.0" } }, "sha512-60vepv88RwcJtSHrD6MjIL6Ta3SOYbgfnkHb+ppAVK+o9mXprRtulx7VlRl3lN3bbvysAfCS7WMVfhUYemB0IQ=="], + "@ark/schema": ["@ark/schema@0.56.0", "", { "dependencies": { "@ark/util": "0.56.0" } }, "sha512-ECg3hox/6Z/nLajxXqNhgPtNdHWC9zNsDyskwO28WinoFEnWow4IsERNz9AnXRhTZJnYIlAJ4uGn3nlLk65vZA=="], "@ark/util": ["@ark/util@0.56.0", "", {}, "sha512-BghfRC8b9pNs3vBoDJhcta0/c1J1rsoS1+HgVUreMFPdhz/CRAKReAu57YEllNaSy98rWAdY1gE+gFup7OXpgA=="], @@ -839,6 +842,8 @@ "@js-sdsl/ordered-map": ["@js-sdsl/ordered-map@4.4.2", "", {}, "sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw=="], + "@jsdevtools/ono": ["@jsdevtools/ono@7.1.3", "", {}, "sha512-4JQNk+3mVzK3xh2rqd6RB4J46qUR19azEHBneZyTZM+c456qOrbbM/5xcR8huNCCcbVt7+UmizG6GuUvPvKUYg=="], + "@jsonhero/path": ["@jsonhero/path@1.0.21", "", {}, "sha512-gVUDj/92acpVoJwsVJ/RuWOaHyG4oFzn898WNGQItLCTQ+hOaVlEaImhwE1WqOTf+l3dGOUkbSiVKlb3q1hd1Q=="], "@kurkle/color": ["@kurkle/color@0.3.4", "", {}, "sha512-M5UknZPHRu3DEDWoipU6sE8PdkZ6Z/S+v4dD+Ke8IaNlpdSQah50lz1KtcFBa2vsdOnwbbnxJwVM4wty6udA5w=="], @@ -1635,6 +1640,8 @@ "@types/json-schema": ["@types/json-schema@7.0.15", "", {}, "sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA=="], + "@types/lodash": ["@types/lodash@4.17.24", "", {}, "sha512-gIW7lQLZbue7lRSWEFql49QJJWThrTFFeIMJdp3eH4tKoxm1OvEPg02rm4wCCSHS0cL3/Fizimb35b7k8atwsQ=="], + "@types/mdast": ["@types/mdast@4.0.4", "", { "dependencies": { "@types/unist": "*" } }, "sha512-kGaNbPh1k7AFzgpud/gMdvIm5xuECykRR+JnWKQno9TAXVa6WIVCGTPvYGekIDL4uwCZQSYbUxNBSb1aUo79oA=="], "@types/mdx": ["@types/mdx@2.0.13", "", {}, "sha512-+OWZQfAYyio6YkJb3HLxDrvnx6SWWDbC0zVPfBRzUk0/nqoDyf6dNxQi3eArPe8rJ473nobTMQ/8Zk+LxJ+Yuw=="], @@ -2707,6 +2714,8 @@ "json-schema-to-ts": ["json-schema-to-ts@3.1.1", "", { "dependencies": { "@babel/runtime": "^7.18.3", "ts-algebra": "^2.0.0" } }, "sha512-+DWg8jCJG2TEnpy7kOm/7/AxaYoaRbjVB4LFZLySZlWn8exGs3A4OLJR966cVvU26N7X9TWxl+Jsw7dzAqKT6g=="], + "json-schema-to-typescript": ["json-schema-to-typescript@15.0.4", "", { "dependencies": { "@apidevtools/json-schema-ref-parser": "^11.5.5", "@types/json-schema": "^7.0.15", "@types/lodash": "^4.17.7", "is-glob": "^4.0.3", "js-yaml": "^4.1.0", "lodash": "^4.17.21", "minimist": "^1.2.8", "prettier": "^3.2.5", "tinyglobby": "^0.2.9" }, "bin": { "json2ts": "dist/src/cli.js" } }, "sha512-Su9oK8DR4xCmDsLlyvadkXzX6+GGXJpbhwoLtOGArAG61dvbW4YQmSEno2y66ahpIdmLMg6YUf/QHLgiwvkrHQ=="], + "json-schema-traverse": ["json-schema-traverse@1.0.0", "", {}, "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug=="], "json5": ["json5@2.2.3", "", { "bin": { "json5": "lib/cli.js" } }, "sha512-XmOWe7eyHYH14cLdVPoyg+GOH3rYX++KpzrylJwSW98t3Nk+U8XOl8FWKOgwtzdb8lXGf6zYwDUzeHMWfxasyg=="], @@ -3977,6 +3986,8 @@ "@antfu/install-pkg/tinyexec": ["tinyexec@1.0.4", "", {}, "sha512-u9r3uZC0bdpGOXtlxUIdwf9pkmvhqJdrVCH9fapQtgy/OeTTMZ1nqH7agtvEfmGui6e1XxjcdrlxvxJvc3sMqw=="], + "@apidevtools/json-schema-ref-parser/js-yaml": ["js-yaml@4.1.1", "", { "dependencies": { "argparse": "^2.0.1" }, "bin": { "js-yaml": "bin/js-yaml.js" } }, "sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA=="], + "@asamuzakjp/css-color/lru-cache": ["lru-cache@10.4.3", "", {}, "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ=="], "@authenio/xml-encryption/xpath": ["xpath@0.0.32", "", {}, "sha512-rxMJhSIoiO8vXcWvSifKqhvV96GjiD5wYb8/QHdoRyQvraTpp4IEv944nhGausZZ3u7dhQXteZuZbaqfpB7uYw=="], @@ -4839,6 +4850,8 @@ "jaeger-client/uuid": ["uuid@8.3.2", "", { "bin": { "uuid": "dist/bin/uuid" } }, "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg=="], + "json-schema-to-typescript/js-yaml": ["js-yaml@4.1.1", "", { "dependencies": { "argparse": "^2.0.1" }, "bin": { "js-yaml": "bin/js-yaml.js" } }, "sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA=="], + "katex/commander": ["commander@8.3.0", "", {}, "sha512-OkTL9umf+He2DZkUq8f8J9of7yL6RJKI24dVITBmNfZBmri9zYZQrKkuXiKhyfPSu8tUhnVBB1iKXevvnlR4Ww=="], "langsmith/chalk": ["chalk@4.1.2", "", { "dependencies": { "ansi-styles": "^4.1.0", "supports-color": "^7.1.0" } }, "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA=="], diff --git a/package.json b/package.json index 74ceff2be6..1644b48c5f 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "@octokit/rest": "^21.0.0", "glob": "13.0.0", "husky": "9.1.7", + "json-schema-to-typescript": "15.0.4", "lint-staged": "16.0.0", "turbo": "2.9.3" },