Checkpoint

This commit is contained in:
Siddharth Ganesan
2026-02-12 11:51:34 -08:00
parent f733b8dd88
commit 433552019e
7 changed files with 355 additions and 455 deletions

View File

@@ -2,17 +2,19 @@ import { createLogger } from '@sim/logger'
import { STREAM_STORAGE_KEY } from '@/lib/copilot/constants'
import { asRecord } from '@/lib/copilot/orchestrator/sse-utils'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
import {
humanizedFallback,
isBackgroundState,
isRejectedState,
isReviewState,
resolveToolDisplay,
} from '@/lib/copilot/store-utils'
import { isBackgroundState, isRejectedState, isReviewState } from '@/lib/copilot/store-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore, CopilotStreamInfo, CopilotToolCall } from '@/stores/panel/copilot/types'
import { appendTextBlock, beginThinkingBlock, finalizeThinkingBlock } from './content-blocks'
import { CLIENT_EXECUTABLE_RUN_TOOLS, executeRunToolOnClient } from './run-tool-execution'
import {
extractOperationListFromResultPayload,
extractToolExecutionMetadata,
extractToolUiMetadata,
isWorkflowChangeApplyCall,
mapServerStateToClientState,
resolveDisplayFromServerUi,
} from './tool-call-helpers'
import { applyToolEffects } from './tool-effects'
import type { ClientContentBlock, ClientStreamingContext } from './types'
@@ -30,131 +32,6 @@ function isClientRunCapability(toolCall: CopilotToolCall): boolean {
return CLIENT_EXECUTABLE_RUN_TOOLS.has(toolCall.name)
}
function mapServerStateToClientState(state: unknown): ClientToolCallState {
switch (String(state || '')) {
case 'generating':
return ClientToolCallState.generating
case 'pending':
case 'awaiting_approval':
return ClientToolCallState.pending
case 'executing':
return ClientToolCallState.executing
case 'success':
return ClientToolCallState.success
case 'rejected':
case 'skipped':
return ClientToolCallState.rejected
case 'aborted':
return ClientToolCallState.aborted
case 'error':
case 'failed':
return ClientToolCallState.error
default:
return ClientToolCallState.pending
}
}
function extractToolUiMetadata(data: Record<string, unknown>): CopilotToolCall['ui'] | undefined {
const ui = asRecord(data.ui)
if (!ui || Object.keys(ui).length === 0) return undefined
const autoAllowedFromUi = ui.autoAllowed === true
const autoAllowedFromData = data.autoAllowed === true
return {
title: typeof ui.title === 'string' ? ui.title : undefined,
phaseLabel: typeof ui.phaseLabel === 'string' ? ui.phaseLabel : undefined,
icon: typeof ui.icon === 'string' ? ui.icon : undefined,
showInterrupt: ui.showInterrupt === true,
showRemember: ui.showRemember === true,
autoAllowed: autoAllowedFromUi || autoAllowedFromData,
actions: Array.isArray(ui.actions)
? ui.actions
.map((action) => {
const a = asRecord(action)
const id = typeof a.id === 'string' ? a.id : undefined
const label = typeof a.label === 'string' ? a.label : undefined
const kind: 'accept' | 'reject' = a.kind === 'reject' ? 'reject' : 'accept'
if (!id || !label) return null
return {
id,
label,
kind,
remember: a.remember === true,
}
})
.filter((a): a is NonNullable<typeof a> => !!a)
: undefined,
}
}
function extractToolExecutionMetadata(
data: Record<string, unknown>
): CopilotToolCall['execution'] | undefined {
const execution = asRecord(data.execution)
if (!execution || Object.keys(execution).length === 0) return undefined
return {
target: typeof execution.target === 'string' ? execution.target : undefined,
capabilityId: typeof execution.capabilityId === 'string' ? execution.capabilityId : undefined,
}
}
function displayVerb(state: ClientToolCallState): string {
switch (state) {
case ClientToolCallState.success:
return 'Completed'
case ClientToolCallState.error:
return 'Failed'
case ClientToolCallState.rejected:
return 'Skipped'
case ClientToolCallState.aborted:
return 'Aborted'
case ClientToolCallState.generating:
return 'Preparing'
case ClientToolCallState.pending:
return 'Waiting'
default:
return 'Running'
}
}
function resolveDisplayFromServerUi(
toolName: string,
state: ClientToolCallState,
toolCallId: string,
params: Record<string, unknown> | undefined,
ui?: CopilotToolCall['ui']
) {
const fallback =
resolveToolDisplay(toolName, state, toolCallId, params) ||
humanizedFallback(toolName, state)
if (!fallback) return undefined
if (ui?.phaseLabel) {
return { text: ui.phaseLabel, icon: fallback.icon }
}
if (ui?.title) {
return { text: `${displayVerb(state)} ${ui.title}`, icon: fallback.icon }
}
return fallback
}
function isWorkflowChangeApplyCall(toolName?: string, params?: Record<string, unknown>): boolean {
if (toolName !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
function extractOperationListFromResultPayload(
resultPayload: Record<string, unknown>
): Array<Record<string, unknown>> | undefined {
const operations = resultPayload.operations
if (Array.isArray(operations)) return operations as Array<Record<string, unknown>>
const compiled = resultPayload.compiledOperations
if (Array.isArray(compiled)) return compiled as Array<Record<string, unknown>>
return undefined
}
function writeActiveStreamToStorage(info: CopilotStreamInfo | null): void {
if (typeof window === 'undefined') return
try {

View File

@@ -6,7 +6,6 @@ import {
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
import { humanizedFallback, resolveToolDisplay } from '@/lib/copilot/store-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
import {
@@ -15,6 +14,14 @@ import {
updateStreamingMessage,
} from './handlers'
import { CLIENT_EXECUTABLE_RUN_TOOLS, executeRunToolOnClient } from './run-tool-execution'
import {
extractOperationListFromResultPayload,
extractToolExecutionMetadata,
extractToolUiMetadata,
isWorkflowChangeApplyCall,
mapServerStateToClientState,
resolveDisplayFromServerUi,
} from './tool-call-helpers'
import { applyToolEffects } from './tool-effects'
import type { ClientStreamingContext } from './types'
@@ -24,112 +31,6 @@ type StoreSet = (
partial: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)
) => void
function mapServerStateToClientState(state: unknown): ClientToolCallState {
switch (String(state || '')) {
case 'generating':
return ClientToolCallState.generating
case 'pending':
case 'awaiting_approval':
return ClientToolCallState.pending
case 'executing':
return ClientToolCallState.executing
case 'success':
return ClientToolCallState.success
case 'rejected':
case 'skipped':
return ClientToolCallState.rejected
case 'aborted':
return ClientToolCallState.aborted
case 'error':
case 'failed':
return ClientToolCallState.error
default:
return ClientToolCallState.pending
}
}
function extractToolUiMetadata(data: Record<string, unknown>): CopilotToolCall['ui'] | undefined {
const ui = asRecord(data.ui)
if (!ui || Object.keys(ui).length === 0) return undefined
const autoAllowedFromUi = ui.autoAllowed === true
const autoAllowedFromData = data.autoAllowed === true
return {
title: typeof ui.title === 'string' ? ui.title : undefined,
phaseLabel: typeof ui.phaseLabel === 'string' ? ui.phaseLabel : undefined,
icon: typeof ui.icon === 'string' ? ui.icon : undefined,
showInterrupt: ui.showInterrupt === true,
showRemember: ui.showRemember === true,
autoAllowed: autoAllowedFromUi || autoAllowedFromData,
actions: Array.isArray(ui.actions)
? ui.actions
.map((action) => {
const a = asRecord(action)
const id = typeof a.id === 'string' ? a.id : undefined
const label = typeof a.label === 'string' ? a.label : undefined
const kind: 'accept' | 'reject' = a.kind === 'reject' ? 'reject' : 'accept'
if (!id || !label) return null
return {
id,
label,
kind,
remember: a.remember === true,
}
})
.filter((a): a is NonNullable<typeof a> => !!a)
: undefined,
}
}
function extractToolExecutionMetadata(
data: Record<string, unknown>
): CopilotToolCall['execution'] | undefined {
const execution = asRecord(data.execution)
if (!execution || Object.keys(execution).length === 0) return undefined
return {
target: typeof execution.target === 'string' ? execution.target : undefined,
capabilityId: typeof execution.capabilityId === 'string' ? execution.capabilityId : undefined,
}
}
function displayVerb(state: ClientToolCallState): string {
switch (state) {
case ClientToolCallState.success:
return 'Completed'
case ClientToolCallState.error:
return 'Failed'
case ClientToolCallState.rejected:
return 'Skipped'
case ClientToolCallState.aborted:
return 'Aborted'
case ClientToolCallState.generating:
return 'Preparing'
case ClientToolCallState.pending:
return 'Waiting'
default:
return 'Running'
}
}
function resolveDisplayFromServerUi(
toolName: string,
state: ClientToolCallState,
toolCallId: string,
params: Record<string, unknown> | undefined,
ui?: CopilotToolCall['ui']
) {
const fallback =
resolveToolDisplay(toolName, state, toolCallId, params) ||
humanizedFallback(toolName, state)
if (!fallback) return undefined
if (ui?.phaseLabel) {
return { text: ui.phaseLabel, icon: fallback.icon }
}
if (ui?.title) {
return { text: `${displayVerb(state)} ${ui.title}`, icon: fallback.icon }
}
return fallback
}
function isClientRunCapability(toolCall: CopilotToolCall): boolean {
if (toolCall.execution?.target === 'sim_client_capability') {
return toolCall.execution.capabilityId === 'workflow.run' || !toolCall.execution.capabilityId
@@ -137,26 +38,6 @@ function isClientRunCapability(toolCall: CopilotToolCall): boolean {
return CLIENT_EXECUTABLE_RUN_TOOLS.has(toolCall.name)
}
function isWorkflowChangeApplyCall(toolCall: CopilotToolCall): boolean {
if (toolCall.name !== 'workflow_change') return false
const params = (toolCall.params || {}) as Record<string, unknown>
const mode = typeof params.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params.proposalId === 'string' && params.proposalId.length > 0
}
function extractOperationListFromResultPayload(
resultPayload: Record<string, unknown>
): Array<Record<string, unknown>> | undefined {
const operations = resultPayload.operations
if (Array.isArray(operations)) return operations as Array<Record<string, unknown>>
const compiled = resultPayload.compiledOperations
if (Array.isArray(compiled)) return compiled as Array<Record<string, unknown>>
return undefined
}
export function appendSubAgentContent(
context: ClientStreamingContext,
parentToolCallId: string,
@@ -428,7 +309,7 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
)
if (
targetState === ClientToolCallState.success &&
isWorkflowChangeApplyCall(existing) &&
isWorkflowChangeApplyCall(existing.name, existing.params as Record<string, unknown>) &&
resultPayload
) {
const operations = extractOperationListFromResultPayload(resultPayload)

View File

@@ -0,0 +1,134 @@
import { asRecord } from '@/lib/copilot/orchestrator/sse-utils'
import { humanizedFallback, resolveToolDisplay } from '@/lib/copilot/store-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotToolCall } from '@/stores/panel/copilot/types'
export function mapServerStateToClientState(state: unknown): ClientToolCallState {
switch (String(state || '')) {
case 'generating':
return ClientToolCallState.generating
case 'pending':
case 'awaiting_approval':
return ClientToolCallState.pending
case 'executing':
return ClientToolCallState.executing
case 'success':
return ClientToolCallState.success
case 'rejected':
case 'skipped':
return ClientToolCallState.rejected
case 'aborted':
return ClientToolCallState.aborted
case 'error':
case 'failed':
return ClientToolCallState.error
default:
return ClientToolCallState.pending
}
}
export function extractToolUiMetadata(
data: Record<string, unknown>
): CopilotToolCall['ui'] | undefined {
const ui = asRecord(data.ui)
if (!ui || Object.keys(ui).length === 0) return undefined
const autoAllowedFromUi = ui.autoAllowed === true
const autoAllowedFromData = data.autoAllowed === true
return {
title: typeof ui.title === 'string' ? ui.title : undefined,
phaseLabel: typeof ui.phaseLabel === 'string' ? ui.phaseLabel : undefined,
icon: typeof ui.icon === 'string' ? ui.icon : undefined,
showInterrupt: ui.showInterrupt === true,
showRemember: ui.showRemember === true,
autoAllowed: autoAllowedFromUi || autoAllowedFromData,
actions: Array.isArray(ui.actions)
? ui.actions
.map((action) => {
const a = asRecord(action)
const id = typeof a.id === 'string' ? a.id : undefined
const label = typeof a.label === 'string' ? a.label : undefined
const kind: 'accept' | 'reject' = a.kind === 'reject' ? 'reject' : 'accept'
if (!id || !label) return null
return {
id,
label,
kind,
remember: a.remember === true,
}
})
.filter((a): a is NonNullable<typeof a> => !!a)
: undefined,
}
}
export function extractToolExecutionMetadata(
data: Record<string, unknown>
): CopilotToolCall['execution'] | undefined {
const execution = asRecord(data.execution)
if (!execution || Object.keys(execution).length === 0) return undefined
return {
target: typeof execution.target === 'string' ? execution.target : undefined,
capabilityId: typeof execution.capabilityId === 'string' ? execution.capabilityId : undefined,
}
}
function displayVerb(state: ClientToolCallState): string {
switch (state) {
case ClientToolCallState.success:
return 'Completed'
case ClientToolCallState.error:
return 'Failed'
case ClientToolCallState.rejected:
return 'Skipped'
case ClientToolCallState.aborted:
return 'Aborted'
case ClientToolCallState.generating:
return 'Preparing'
case ClientToolCallState.pending:
return 'Waiting'
default:
return 'Running'
}
}
export function resolveDisplayFromServerUi(
toolName: string,
state: ClientToolCallState,
toolCallId: string,
params: Record<string, unknown> | undefined,
ui?: CopilotToolCall['ui']
) {
const fallback =
resolveToolDisplay(toolName, state, toolCallId, params) ||
humanizedFallback(toolName, state)
if (!fallback) return undefined
if (ui?.phaseLabel) {
return { text: ui.phaseLabel, icon: fallback.icon }
}
if (ui?.title) {
return { text: `${displayVerb(state)} ${ui.title}`, icon: fallback.icon }
}
return fallback
}
export function isWorkflowChangeApplyCall(
toolName?: string,
params?: Record<string, unknown>
): boolean {
if (toolName !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
export function extractOperationListFromResultPayload(
resultPayload: Record<string, unknown>
): Array<Record<string, unknown>> | undefined {
const operations = resultPayload.operations
if (Array.isArray(operations)) return operations as Array<Record<string, unknown>>
const compiled = resultPayload.compiledOperations
if (Array.isArray(compiled)) return compiled as Array<Record<string, unknown>>
return undefined
}

View File

@@ -139,6 +139,98 @@ async function waitForClientCapabilityAndReport(
markToolResultSeen(toolCall.id)
}
function markToolCallAndNotify(
toolCall: ToolCallState,
statusCode: number,
message: string,
data: Record<string, unknown> | undefined,
logScope: string
): void {
markToolComplete(toolCall.id, toolCall.name, statusCode, message, data).catch((err) => {
logger.error(`markToolComplete fire-and-forget failed (${logScope})`, {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
})
markToolResultSeen(toolCall.id)
}
async function executeToolCallWithPolicy(
toolCall: ToolCallState,
toolName: string,
toolData: Record<string, unknown>,
context: StreamingContext,
execContext: ExecutionContext,
options: OrchestratorOptions,
logScope: string
): Promise<void> {
const execution = getExecutionTarget(toolData, toolName)
const isInteractive = options.interactive === true
const requiresApproval = isInteractive && needsApproval(toolData)
if (toolData.state) {
toolCall.status = mapServerStateToToolStatus(toolData.state)
}
if (requiresApproval) {
const decision = await waitForToolDecision(
toolCall.id,
options.timeout || STREAM_TIMEOUT_MS,
options.abortSignal
)
if (decision?.status === 'accepted' || decision?.status === 'success') {
// Continue below into normal execution path.
} else if (decision?.status === 'rejected' || decision?.status === 'error') {
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
markToolCallAndNotify(
toolCall,
400,
decision.message || 'Tool execution rejected',
{ skipped: true, reason: 'user_rejected' },
`${logScope} rejected`
)
return
} else if (decision?.status === 'background') {
toolCall.status = 'skipped'
toolCall.endTime = Date.now()
markToolCallAndNotify(
toolCall,
202,
decision.message || 'Tool execution moved to background',
{ background: true },
`${logScope} background`
)
return
} else {
// Decision was null (timeout/abort).
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
markToolCallAndNotify(
toolCall,
408,
'Tool approval timed out',
{ skipped: true, reason: 'timeout' },
`${logScope} timeout`
)
return
}
}
if (execution.target === 'sim_client_capability' && isInteractive) {
await waitForClientCapabilityAndReport(toolCall, options, logScope)
return
}
if (
(execution.target === 'sim_server' || execution.target === 'sim_client_capability') &&
options.autoExecuteTools !== false
) {
await executeToolAndReport(toolCall.id, context, execContext, options)
}
}
// Normalization + dedupe helpers live in sse-utils to keep server/client in sync.
function inferToolSuccess(data: Record<string, unknown> | undefined): {
@@ -272,101 +364,15 @@ export const sseHandlers: Record<string, SSEHandler> = {
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
const execution = getExecutionTarget(toolData, toolName)
const isInteractive = options.interactive === true
const requiresApproval = isInteractive && needsApproval(toolData)
if (toolData.state) {
toolCall.status = mapServerStateToToolStatus(toolData.state)
}
if (requiresApproval) {
const decision = await waitForToolDecision(
toolCallId,
options.timeout || STREAM_TIMEOUT_MS,
options.abortSignal
)
if (decision?.status === 'accepted' || decision?.status === 'success') {
if (execution.target === 'sim_client_capability' && isInteractive) {
await waitForClientCapabilityAndReport(toolCall, options, 'run tool')
return
}
if (execution.target === 'sim_server' || execution.target === 'sim_client_capability') {
if (options.autoExecuteTools !== false) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
}
return
}
if (decision?.status === 'rejected' || decision?.status === 'error') {
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
// Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport
markToolComplete(
toolCall.id,
toolCall.name,
400,
decision.message || 'Tool execution rejected',
{ skipped: true, reason: 'user_rejected' }
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (rejected)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
})
markToolResultSeen(toolCall.id)
return
}
if (decision?.status === 'background') {
toolCall.status = 'skipped'
toolCall.endTime = Date.now()
// Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport
markToolComplete(
toolCall.id,
toolCall.name,
202,
decision.message || 'Tool execution moved to background',
{ background: true }
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (background)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
})
markToolResultSeen(toolCall.id)
return
}
// Decision was null — timed out or aborted.
// Do NOT fall through to auto-execute. Mark the tool as timed out
// and notify Go so it can unblock waitForExternalTool.
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
markToolComplete(toolCall.id, toolCall.name, 408, 'Tool approval timed out', {
skipped: true,
reason: 'timeout',
}).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (timeout)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
})
markToolResultSeen(toolCall.id)
return
}
if (execution.target === 'sim_client_capability' && isInteractive) {
await waitForClientCapabilityAndReport(toolCall, options, 'run tool')
return
}
if (
(execution.target === 'sim_server' || execution.target === 'sim_client_capability') &&
options.autoExecuteTools !== false
) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
await executeToolCallWithPolicy(
toolCall,
toolName,
toolData,
context,
execContext,
options,
'run tool'
)
},
reasoning: (event, context) => {
const d = asRecord(event.data)
@@ -484,95 +490,15 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
if (isPartial) return
const execution = getExecutionTarget(toolData, toolName)
const isInteractive = options.interactive === true
const requiresApproval = isInteractive && needsApproval(toolData)
if (requiresApproval) {
const decision = await waitForToolDecision(
toolCallId,
options.timeout || STREAM_TIMEOUT_MS,
options.abortSignal
)
if (decision?.status === 'accepted' || decision?.status === 'success') {
if (execution.target === 'sim_client_capability' && isInteractive) {
await waitForClientCapabilityAndReport(toolCall, options, 'subagent run tool')
return
}
if (execution.target === 'sim_server' || execution.target === 'sim_client_capability') {
if (options.autoExecuteTools !== false) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
}
return
}
if (decision?.status === 'rejected' || decision?.status === 'error') {
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
// Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport
markToolComplete(
toolCall.id,
toolCall.name,
400,
decision.message || 'Tool execution rejected',
{ skipped: true, reason: 'user_rejected' }
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (subagent rejected)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
})
markToolResultSeen(toolCall.id)
return
}
if (decision?.status === 'background') {
toolCall.status = 'skipped'
toolCall.endTime = Date.now()
// Fire-and-forget: must NOT await — see deadlock note in executeToolAndReport
markToolComplete(
toolCall.id,
toolCall.name,
202,
decision.message || 'Tool execution moved to background',
{ background: true }
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (subagent background)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
})
markToolResultSeen(toolCall.id)
return
}
// Decision was null — timed out or aborted.
// Do NOT fall through to auto-execute.
toolCall.status = 'rejected'
toolCall.endTime = Date.now()
markToolComplete(toolCall.id, toolCall.name, 408, 'Tool approval timed out', {
skipped: true,
reason: 'timeout',
}).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (subagent timeout)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
})
markToolResultSeen(toolCall.id)
return
}
if (execution.target === 'sim_client_capability' && isInteractive) {
await waitForClientCapabilityAndReport(toolCall, options, 'subagent run tool')
return
}
if (
(execution.target === 'sim_server' || execution.target === 'sim_client_capability') &&
options.autoExecuteTools !== false
) {
await executeToolAndReport(toolCallId, context, execContext, options)
}
await executeToolCallWithPolicy(
toolCall,
toolName,
toolData,
context,
execContext,
options,
'subagent run tool'
)
},
tool_result: (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId

View File

@@ -1,4 +1,6 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
type StoreEntry<T> = {
value: T
@@ -7,6 +9,11 @@ type StoreEntry<T> = {
const DEFAULT_TTL_MS = 30 * 60 * 1000
const MAX_ENTRIES = 500
const DEFAULT_TTL_SECONDS = Math.floor(DEFAULT_TTL_MS / 1000)
const CONTEXT_PREFIX = 'copilot:workflow_change:context'
const PROPOSAL_PREFIX = 'copilot:workflow_change:proposal'
const logger = createLogger('WorkflowChangeStore')
class TTLStore<T> {
private readonly data = new Map<string, StoreEntry<T>>()
@@ -89,18 +96,90 @@ export type WorkflowChangeProposal = {
const contextPackStore = new TTLStore<WorkflowContextPack>()
const proposalStore = new TTLStore<WorkflowChangeProposal>()
export function saveContextPack(pack: WorkflowContextPack): string {
return contextPackStore.set(pack)
function getContextRedisKey(id: string): string {
return `${CONTEXT_PREFIX}:${id}`
}
export function getContextPack(id: string): WorkflowContextPack | null {
function getProposalRedisKey(id: string): string {
return `${PROPOSAL_PREFIX}:${id}`
}
async function writeRedisJson(key: string, value: unknown): Promise<void> {
const redis = getRedisClient()!
await redis.set(key, JSON.stringify(value), 'EX', DEFAULT_TTL_SECONDS)
}
async function readRedisJson<T>(key: string): Promise<T | null> {
const redis = getRedisClient()!
const raw = await redis.get(key)
if (!raw) {
return null
}
try {
return JSON.parse(raw) as T
} catch (error) {
logger.warn('Failed parsing workflow change store JSON payload', { key, error })
await redis.del(key).catch(() => {})
return null
}
}
export async function saveContextPack(pack: WorkflowContextPack): Promise<string> {
if (!getRedisClient()) {
return contextPackStore.set(pack)
}
const id = crypto.randomUUID()
try {
await writeRedisJson(getContextRedisKey(id), pack)
return id
} catch (error) {
logger.warn('Redis write failed for workflow context pack, using memory fallback', { error })
return contextPackStore.set(pack)
}
}
export async function getContextPack(id: string): Promise<WorkflowContextPack | null> {
if (!getRedisClient()) {
return contextPackStore.get(id)
}
try {
const redisPayload = await readRedisJson<WorkflowContextPack>(getContextRedisKey(id))
if (redisPayload) {
return redisPayload
}
} catch (error) {
logger.warn('Redis read failed for workflow context pack, using memory fallback', { error })
}
return contextPackStore.get(id)
}
export function saveProposal(proposal: WorkflowChangeProposal): string {
return proposalStore.set(proposal)
export async function saveProposal(proposal: WorkflowChangeProposal): Promise<string> {
if (!getRedisClient()) {
return proposalStore.set(proposal)
}
const id = crypto.randomUUID()
try {
await writeRedisJson(getProposalRedisKey(id), proposal)
return id
} catch (error) {
logger.warn('Redis write failed for workflow proposal, using memory fallback', { error })
return proposalStore.set(proposal)
}
}
export function getProposal(id: string): WorkflowChangeProposal | null {
export async function getProposal(id: string): Promise<WorkflowChangeProposal | null> {
if (!getRedisClient()) {
return proposalStore.get(id)
}
try {
const redisPayload = await readRedisJson<WorkflowChangeProposal>(getProposalRedisKey(id))
if (redisPayload) {
return redisPayload
}
} catch (error) {
logger.warn('Redis read failed for workflow proposal, using memory fallback', { error })
}
return proposalStore.get(id)
}

View File

@@ -1162,7 +1162,10 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
}
if (params.mode === 'dry_run') {
const workflowId = params.workflowId || getContextPack(params.contextPackId || '')?.workflowId
const contextPack = params.contextPackId
? await getContextPack(params.contextPackId)
: null
const workflowId = params.workflowId || contextPack?.workflowId
if (!workflowId) {
throw new Error('workflowId is required for dry_run')
}
@@ -1231,7 +1234,7 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
unresolvedRisks: params.changeSpec.unresolvedRisks,
},
}
const proposalId = saveProposal(proposal)
const proposalId = await saveProposal(proposal)
logger.info('Compiled workflow_change dry run', {
workflowId,
@@ -1265,7 +1268,7 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
throw new Error('proposalId is required for apply')
}
const proposal = getProposal(proposalId)
const proposal = await getProposal(proposalId)
if (!proposal) {
throw new Error(`Proposal not found or expired: ${proposalId}`)
}

View File

@@ -84,7 +84,7 @@ export const workflowContextGetServerTool: BaseServerTool<WorkflowContextGetPara
const suggestedSchemaTypes = [...new Set(blockTypesInWorkflow.filter(Boolean))]
const summary = summarizeWorkflowState(workflowState)
const packId = saveContextPack({
const packId = await saveContextPack({
workflowId: params.workflowId,
snapshotHash,
workflowState,
@@ -128,7 +128,7 @@ export const workflowContextExpandServerTool: BaseServerTool<WorkflowContextExpa
throw new Error('Unauthorized workflow access')
}
const contextPack = getContextPack(params.contextPackId)
const contextPack = await getContextPack(params.contextPackId)
if (!contextPack) {
throw new Error(`Context pack not found or expired: ${params.contextPackId}`)
}