fix(subagent, streaming) fix deploy subagent and task streamnig (#3610)

* Fix deploy subagent

* fix(stream) handle task switching   (#3609)

* Fix task switching causing stream to abort

* Process all task streams all the time

* Process task streams that are in the background

---------

Co-authored-by: Theodore Li <theo@sim.ai>

* Always return isDeployed for undeploy chat

* Fix lint

---------

Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com>
Co-authored-by: Theodore Li <theo@sim.ai>
This commit is contained in:
Theodore Li
2026-03-16 15:13:18 -07:00
committed by GitHub
parent 395a61d1b6
commit b0870f4afa
6 changed files with 191 additions and 33 deletions

View File

@@ -19,6 +19,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { validateOAuthAccessToken } from '@/lib/auth/oauth-token'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { ORCHESTRATION_TIMEOUT_MS, SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
import {
executeToolServerSide,
@@ -28,6 +29,10 @@ import { DIRECT_TOOL_DEFS, SUBAGENT_TOOL_DEFS } from '@/lib/copilot/tools/mcp/de
import { env } from '@/lib/core/config/env'
import { RateLimiter } from '@/lib/core/rate-limiter'
import { getBaseUrl } from '@/lib/core/utils/urls'
import {
authorizeWorkflowByWorkspacePermission,
resolveWorkflowIdForUser,
} from '@/lib/workflows/utils'
const logger = createLogger('CopilotMcpAPI')
const mcpRateLimiter = new RateLimiter()
@@ -660,12 +665,110 @@ async function handleDirectToolCall(
}
}
/**
* Build mode uses the main chat orchestrator with the 'fast' command instead of
* the subagent endpoint. In Go, 'build' is not a registered subagent — it's a mode
* (ModeFast) on the main chat processor that bypasses subagent orchestration and
* executes all tools directly.
*/
async function handleBuildToolCall(
args: Record<string, unknown>,
userId: string,
abortSignal?: AbortSignal
): Promise<CallToolResult> {
try {
const requestText = (args.request as string) || JSON.stringify(args)
const workflowId = args.workflowId as string | undefined
const resolved = workflowId
? await (async () => {
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId,
action: 'read',
})
return authorization.allowed ? { workflowId } : null
})()
: await resolveWorkflowIdForUser(userId)
if (!resolved?.workflowId) {
return {
content: [
{
type: 'text',
text: JSON.stringify(
{
success: false,
error: 'workflowId is required for build. Call create_workflow first.',
},
null,
2
),
},
],
isError: true,
}
}
const chatId = randomUUID()
const requestPayload = {
message: requestText,
workflowId: resolved.workflowId,
userId,
model: DEFAULT_COPILOT_MODEL,
mode: 'agent',
commands: ['fast'],
messageId: randomUUID(),
chatId,
}
const result = await orchestrateCopilotStream(requestPayload, {
userId,
workflowId: resolved.workflowId,
chatId,
goRoute: '/api/mcp',
autoExecuteTools: true,
timeout: 300000,
interactive: false,
abortSignal,
})
const responseData = {
success: result.success,
content: result.content,
toolCalls: result.toolCalls,
error: result.error,
}
return {
content: [{ type: 'text', text: JSON.stringify(responseData, null, 2) }],
isError: !result.success,
}
} catch (error) {
logger.error('Build tool call failed', { error })
return {
content: [
{
type: 'text',
text: `Build failed: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
}
}
}
async function handleSubagentToolCall(
toolDef: (typeof SUBAGENT_TOOL_DEFS)[number],
args: Record<string, unknown>,
userId: string,
abortSignal?: AbortSignal
): Promise<CallToolResult> {
if (toolDef.agentId === 'build') {
return handleBuildToolCall(args, userId, abortSignal)
}
try {
const requestText =
(args.request as string) ||

View File

@@ -12,6 +12,7 @@ import { VFS_DIR_TO_RESOURCE } from '@/lib/copilot/resource-types'
import { isWorkflowToolName } from '@/lib/copilot/workflow-tools'
import { getNextWorkflowColor } from '@/lib/workflows/colors'
import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry'
import { deploymentKeys } from '@/hooks/queries/deployments'
import {
type TaskChatHistory,
type TaskStoredContentBlock,
@@ -22,6 +23,7 @@ import {
useChatHistory,
} from '@/hooks/queries/tasks'
import { getTopInsertionSortOrder } from '@/hooks/queries/utils/top-insertion-sort-order'
import { workflowKeys } from '@/hooks/queries/workflows'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { useExecutionStore } from '@/stores/execution/store'
import { useFolderStore } from '@/stores/folders/store'
@@ -74,6 +76,8 @@ const STATE_TO_STATUS: Record<string, ToolCallStatus> = {
skipped: 'success',
} as const
const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'])
function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
const mapped: ContentBlock = {
type: block.type as ContentBlockType,
@@ -361,6 +365,15 @@ export function useChat(
useEffect(() => {
if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return
const activeStreamId = chatHistory.activeStreamId
const snapshot = chatHistory.streamSnapshot
if (activeStreamId && !snapshot && !sendingRef.current) {
queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) })
return
}
appliedChatIdRef.current = chatHistory.id
setMessages(chatHistory.messages.map(mapStoredMessage))
@@ -374,11 +387,6 @@ export function useChat(
}
}
// Kick off stream reconnection immediately if there's an active stream.
// The stream snapshot was fetched in parallel with the chat history (same
// API call), so there's no extra round-trip.
const activeStreamId = chatHistory.activeStreamId
const snapshot = chatHistory.streamSnapshot
if (activeStreamId && !sendingRef.current) {
const gen = ++streamGenRef.current
const abortController = new AbortController()
@@ -396,8 +404,7 @@ export function useChat(
const batchEvents = snapshot?.events ?? []
const streamStatus = snapshot?.status ?? ''
if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
// No snapshot available — stream buffer expired. Clean up.
if (batchEvents.length === 0 && streamStatus === 'unknown') {
const cid = chatIdRef.current
if (cid) {
fetch('/api/mothership/chat/stop', {
@@ -462,7 +469,7 @@ export function useChat(
}
reconnect()
}
}, [chatHistory, workspaceId])
}, [chatHistory, workspaceId, queryClient])
useEffect(() => {
if (resources.length === 0) {
@@ -686,6 +693,33 @@ export function useChat(
onResourceEventRef.current?.()
}
}
if (DEPLOY_TOOL_NAMES.has(tc.name) && tc.status === 'success') {
const output = tc.result?.output as Record<string, unknown> | undefined
const deployedWorkflowId = (output?.workflowId as string) ?? undefined
if (deployedWorkflowId && typeof output?.isDeployed === 'boolean') {
const isDeployed = output.isDeployed as boolean
const serverDeployedAt = output.deployedAt
? new Date(output.deployedAt as string)
: undefined
useWorkflowRegistry
.getState()
.setDeploymentStatus(
deployedWorkflowId,
isDeployed,
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
)
queryClient.invalidateQueries({
queryKey: deploymentKeys.info(deployedWorkflowId),
})
queryClient.invalidateQueries({
queryKey: deploymentKeys.versions(deployedWorkflowId),
})
queryClient.invalidateQueries({
queryKey: workflowKeys.list(workspaceId),
})
}
}
}
break
@@ -1116,11 +1150,6 @@ export function useChat(
useEffect(() => {
return () => {
streamGenRef.current++
// Only drop the browser→Sim read; the Sim→Go stream stays open
// so the backend can finish persisting. Explicit abort is only
// triggered by the stop button via /api/copilot/chat/abort.
abortControllerRef.current?.abort()
abortControllerRef.current = null
sendingRef.current = false
}
}, [])

View File

@@ -567,7 +567,6 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
}
// Deploy tools: update deployment status in workflow registry
if (
targetState === ClientToolCallState.success &&
(current.name === 'deploy_api' ||
@@ -579,21 +578,30 @@ export const sseHandlers: Record<string, SSEHandler> = {
const resultPayload = asRecord(
data?.result || eventData.result || eventData.data || data?.data
)
const input = asRecord(current.params)
const workflowId =
(resultPayload?.workflowId as string) ||
(input?.workflowId as string) ||
useWorkflowRegistry.getState().activeWorkflowId
const isDeployed = resultPayload?.isDeployed !== false
if (workflowId) {
useWorkflowRegistry
.getState()
.setDeploymentStatus(workflowId, isDeployed, isDeployed ? new Date() : undefined)
logger.info('[SSE] Updated deployment status from tool result', {
toolName: current.name,
workflowId,
isDeployed,
})
if (typeof resultPayload?.isDeployed === 'boolean') {
const input = asRecord(current.params)
const workflowId =
(resultPayload?.workflowId as string) ||
(input?.workflowId as string) ||
useWorkflowRegistry.getState().activeWorkflowId
const isDeployed = resultPayload.isDeployed as boolean
const serverDeployedAt = resultPayload.deployedAt
? new Date(resultPayload.deployedAt as string)
: undefined
if (workflowId) {
useWorkflowRegistry
.getState()
.setDeploymentStatus(
workflowId,
isDeployed,
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
)
logger.info('[SSE] Updated deployment status from tool result', {
toolName: current.name,
workflowId,
isDeployed,
})
}
}
} catch (err) {
logger.warn('[SSE] Failed to hydrate deployment status', {

View File

@@ -455,6 +455,7 @@ export async function executeToolAndReport(
logger.info('Tool execution succeeded', {
toolCallId: toolCall.id,
toolName: toolCall.name,
output: result.output,
})
} else {
logger.warn('Tool execution failed', {

View File

@@ -87,7 +87,16 @@ export async function executeDeployChat(
return { success: false, error: 'Unauthorized chat access' }
}
await db.delete(chat).where(eq(chat.id, existing[0].id))
return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } }
return {
success: true,
output: {
workflowId,
success: true,
action: 'undeploy',
isDeployed: true,
isChatDeployed: false,
},
}
}
const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId)
@@ -199,9 +208,11 @@ export async function executeDeployChat(
return {
success: true,
output: {
workflowId,
success: true,
action: 'deploy',
isDeployed: true,
isChatDeployed: true,
identifier,
chatUrl: `${baseUrl}/chat/${identifier}`,
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,
@@ -252,6 +263,8 @@ export async function executeDeployMcp(
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
// Intentionally omits `isDeployed` — removing from an MCP server does not
// affect the workflow's API deployment.
return {
success: true,
output: { workflowId, serverId, action: 'undeploy', removed: true },
@@ -335,9 +348,12 @@ export async function executeDeployMcp(
}
}
export async function executeRedeploy(context: ExecutionContext): Promise<ToolCallResult> {
export async function executeRedeploy(
params: { workflowId?: string },
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = context.workflowId
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
@@ -352,6 +368,7 @@ export async function executeRedeploy(context: ExecutionContext): Promise<ToolCa
success: true,
output: {
workflowId,
isDeployed: true,
deployedAt: result.deployedAt || null,
version: result.version,
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,

View File

@@ -864,7 +864,7 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record<
deploy_api: (p, c) => executeDeployApi(p as DeployApiParams, c),
deploy_chat: (p, c) => executeDeployChat(p as DeployChatParams, c),
deploy_mcp: (p, c) => executeDeployMcp(p as DeployMcpParams, c),
redeploy: (_p, c) => executeRedeploy(c),
redeploy: (p, c) => executeRedeploy(p as { workflowId?: string }, c),
check_deployment_status: (p, c) =>
executeCheckDeploymentStatus(p as CheckDeploymentStatusParams, c),
list_workspace_mcp_servers: (p, c) =>