mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
fix(execution): scope X-Sim-Via header to internal routes and enforce depth limit (#3313)
* feat(execution): workflow cycle detection via X-Sim-Via header * fix(execution): scope X-Sim-Via header to internal routes and add child workflow depth validation - Move call chain header injection from HTTP tool layer (request.ts/utils.ts) to tool execution layer (tools/index.ts) gated on isInternalRoute, preventing internal workflow IDs from leaking to external third-party APIs - Remove cycle detection from validateCallChain — depth limit alone prevents infinite loops while allowing legitimate self-recursion (pagination, tree processing, batch splitting) - Add validateCallChain check in workflow-handler.ts before spawning child executor, closing the gap where in-process child workflows skipped validation - Remove unsafe `(params as any)._context` type bypass in request.ts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(execution): validate child call chain instead of parent chain Validate childCallChain (after appending current workflow ID) rather than ctx.callChain (parent). Prevents an off-by-one where a chain at depth 10 could still spawn an 11th workflow. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -23,6 +23,7 @@ import { type AuthResult, checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { generateInternalToken } from '@/lib/auth/internal'
|
||||
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
|
||||
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { SIM_VIA_HEADER } from '@/lib/execution/call-chain'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
const logger = createLogger('WorkflowMcpServeAPI')
|
||||
@@ -181,7 +182,8 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
|
||||
serverId,
|
||||
rpcParams as { name: string; arguments?: Record<string, unknown> },
|
||||
executeAuthContext,
|
||||
server.isPublic ? server.createdBy : undefined
|
||||
server.isPublic ? server.createdBy : undefined,
|
||||
request.headers.get(SIM_VIA_HEADER)
|
||||
)
|
||||
|
||||
default:
|
||||
@@ -244,7 +246,8 @@ async function handleToolsCall(
|
||||
serverId: string,
|
||||
params: { name: string; arguments?: Record<string, unknown> } | undefined,
|
||||
executeAuthContext?: ExecuteAuthContext | null,
|
||||
publicServerOwnerId?: string
|
||||
publicServerOwnerId?: string,
|
||||
simViaHeader?: string | null
|
||||
): Promise<NextResponse> {
|
||||
try {
|
||||
if (!params?.name) {
|
||||
@@ -300,6 +303,10 @@ async function handleToolsCall(
|
||||
}
|
||||
}
|
||||
|
||||
if (simViaHeader) {
|
||||
headers[SIM_VIA_HEADER] = simViaHeader
|
||||
}
|
||||
|
||||
logger.info(`Executing workflow ${tool.workflowId} via MCP tool ${params.name}`)
|
||||
|
||||
const response = await fetch(executeUrl, {
|
||||
|
||||
@@ -12,6 +12,12 @@ import {
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import {
|
||||
buildNextCallChain,
|
||||
parseCallChain,
|
||||
SIM_VIA_HEADER,
|
||||
validateCallChain,
|
||||
} from '@/lib/execution/call-chain'
|
||||
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
|
||||
import { processInputFileFields } from '@/lib/execution/files'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
@@ -155,10 +161,11 @@ type AsyncExecutionParams = {
|
||||
input: any
|
||||
triggerType: CoreTriggerType
|
||||
executionId: string
|
||||
callChain?: string[]
|
||||
}
|
||||
|
||||
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
|
||||
const { requestId, workflowId, userId, input, triggerType, executionId } = params
|
||||
const { requestId, workflowId, userId, input, triggerType, executionId, callChain } = params
|
||||
|
||||
const payload: WorkflowExecutionPayload = {
|
||||
workflowId,
|
||||
@@ -166,6 +173,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
|
||||
input,
|
||||
triggerType,
|
||||
executionId,
|
||||
callChain,
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -236,6 +244,14 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
const requestId = generateRequestId()
|
||||
const { id: workflowId } = await params
|
||||
|
||||
const incomingCallChain = parseCallChain(req.headers.get(SIM_VIA_HEADER))
|
||||
const callChainError = validateCallChain(incomingCallChain)
|
||||
if (callChainError) {
|
||||
logger.warn(`[${requestId}] Call chain rejected for workflow ${workflowId}: ${callChainError}`)
|
||||
return NextResponse.json({ error: callChainError }, { status: 409 })
|
||||
}
|
||||
const callChain = buildNextCallChain(incomingCallChain, workflowId)
|
||||
|
||||
try {
|
||||
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
|
||||
if (!auth.success || !auth.userId) {
|
||||
@@ -433,6 +449,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
input,
|
||||
triggerType: loggingTriggerType,
|
||||
executionId,
|
||||
callChain,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -539,6 +556,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
isClientSession,
|
||||
enforceCredentialAccess: useAuthenticatedUserAsActor,
|
||||
workflowStateOverride: effectiveWorkflowStateOverride,
|
||||
callChain,
|
||||
}
|
||||
|
||||
const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
|
||||
@@ -909,6 +927,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
isClientSession,
|
||||
enforceCredentialAccess: useAuthenticatedUserAsActor,
|
||||
workflowStateOverride: effectiveWorkflowStateOverride,
|
||||
callChain,
|
||||
}
|
||||
|
||||
const sseExecutionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
|
||||
|
||||
@@ -22,6 +22,7 @@ export type WorkflowExecutionPayload = {
|
||||
triggerType?: CoreTriggerType
|
||||
executionId?: string
|
||||
metadata?: Record<string, any>
|
||||
callChain?: string[]
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -95,6 +96,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
useDraftState: false,
|
||||
startTime: new Date().toISOString(),
|
||||
isClientSession: false,
|
||||
callChain: payload.callChain,
|
||||
}
|
||||
|
||||
const snapshot = new ExecutionSnapshot(
|
||||
|
||||
@@ -330,6 +330,7 @@ export class DAGExecutor {
|
||||
base64MaxBytes: this.contextExtensions.base64MaxBytes,
|
||||
runFromBlockContext: overrides?.runFromBlockContext,
|
||||
stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
|
||||
callChain: this.contextExtensions.callChain,
|
||||
}
|
||||
|
||||
if (this.contextExtensions.resumeFromSnapshot) {
|
||||
|
||||
@@ -27,6 +27,7 @@ export interface ExecutionMetadata {
|
||||
parallels?: Record<string, any>
|
||||
deploymentVersionId?: string
|
||||
}
|
||||
callChain?: string[]
|
||||
}
|
||||
|
||||
export interface SerializableExecutionState {
|
||||
@@ -167,6 +168,12 @@ export interface ContextExtensions {
|
||||
* Stop execution after this block completes. Used for "run until block" feature.
|
||||
*/
|
||||
stopAfterBlockId?: string
|
||||
|
||||
/**
|
||||
* Ordered list of workflow IDs in the current call chain, used for cycle detection.
|
||||
* Each hop appends the current workflow ID before making outgoing requests.
|
||||
*/
|
||||
callChain?: string[]
|
||||
}
|
||||
|
||||
export interface WorkflowInput {
|
||||
|
||||
@@ -75,6 +75,7 @@ export class ApiBlockHandler implements BlockHandler {
|
||||
userId: ctx.userId,
|
||||
isDeployedContext: ctx.isDeployedContext,
|
||||
enforceCredentialAccess: ctx.enforceCredentialAccess,
|
||||
callChain: ctx.callChain,
|
||||
},
|
||||
},
|
||||
false,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { buildNextCallChain, validateCallChain } from '@/lib/execution/call-chain'
|
||||
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import type { TraceSpan } from '@/lib/logs/types'
|
||||
@@ -167,6 +168,15 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
ctx.onChildWorkflowInstanceReady?.(effectiveBlockId, instanceId, iterationContext)
|
||||
}
|
||||
|
||||
const childCallChain = buildNextCallChain(ctx.callChain || [], workflowId)
|
||||
const depthError = validateCallChain(childCallChain)
|
||||
if (depthError) {
|
||||
throw new ChildWorkflowError({
|
||||
message: depthError,
|
||||
childWorkflowName,
|
||||
})
|
||||
}
|
||||
|
||||
const subExecutor = new Executor({
|
||||
workflow: childWorkflow.serializedState,
|
||||
workflowInput: childWorkflowInput,
|
||||
@@ -180,6 +190,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
userId: ctx.userId,
|
||||
executionId: ctx.executionId,
|
||||
abortSignal: ctx.abortSignal,
|
||||
callChain: childCallChain,
|
||||
...(shouldPropagateCallbacks && {
|
||||
onBlockStart: ctx.onBlockStart,
|
||||
onBlockComplete: ctx.onBlockComplete,
|
||||
|
||||
@@ -301,6 +301,12 @@ export interface ExecutionContext {
|
||||
*/
|
||||
stopAfterBlockId?: string
|
||||
|
||||
/**
|
||||
* Ordered list of workflow IDs in the current call chain, used for cycle detection.
|
||||
* Passed to outgoing HTTP requests via the X-Sim-Via header.
|
||||
*/
|
||||
callChain?: string[]
|
||||
|
||||
/**
|
||||
* Counter for generating monotonically increasing execution order values.
|
||||
* Starts at 0 and increments for each block. Use getNextExecutionOrder() to access.
|
||||
|
||||
130
apps/sim/lib/execution/__tests__/call-chain.test.ts
Normal file
130
apps/sim/lib/execution/__tests__/call-chain.test.ts
Normal file
@@ -0,0 +1,130 @@
|
||||
/**
|
||||
* @vitest-environment node
|
||||
*/
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import {
|
||||
buildNextCallChain,
|
||||
MAX_CALL_CHAIN_DEPTH,
|
||||
parseCallChain,
|
||||
SIM_VIA_HEADER,
|
||||
serializeCallChain,
|
||||
validateCallChain,
|
||||
} from '@/lib/execution/call-chain'
|
||||
|
||||
describe('call-chain', () => {
|
||||
describe('SIM_VIA_HEADER', () => {
|
||||
it('has the expected header name', () => {
|
||||
expect(SIM_VIA_HEADER).toBe('X-Sim-Via')
|
||||
})
|
||||
})
|
||||
|
||||
describe('MAX_CALL_CHAIN_DEPTH', () => {
|
||||
it('equals 10', () => {
|
||||
expect(MAX_CALL_CHAIN_DEPTH).toBe(10)
|
||||
})
|
||||
})
|
||||
|
||||
describe('parseCallChain', () => {
|
||||
it('returns empty array for null', () => {
|
||||
expect(parseCallChain(null)).toEqual([])
|
||||
})
|
||||
|
||||
it('returns empty array for undefined', () => {
|
||||
expect(parseCallChain(undefined)).toEqual([])
|
||||
})
|
||||
|
||||
it('returns empty array for empty string', () => {
|
||||
expect(parseCallChain('')).toEqual([])
|
||||
})
|
||||
|
||||
it('returns empty array for whitespace-only string', () => {
|
||||
expect(parseCallChain(' ')).toEqual([])
|
||||
})
|
||||
|
||||
it('parses a single workflow ID', () => {
|
||||
expect(parseCallChain('wf-abc')).toEqual(['wf-abc'])
|
||||
})
|
||||
|
||||
it('parses multiple comma-separated workflow IDs', () => {
|
||||
expect(parseCallChain('wf-a,wf-b,wf-c')).toEqual(['wf-a', 'wf-b', 'wf-c'])
|
||||
})
|
||||
|
||||
it('trims whitespace around workflow IDs', () => {
|
||||
expect(parseCallChain(' wf-a , wf-b , wf-c ')).toEqual(['wf-a', 'wf-b', 'wf-c'])
|
||||
})
|
||||
|
||||
it('filters out empty segments', () => {
|
||||
expect(parseCallChain('wf-a,,wf-b')).toEqual(['wf-a', 'wf-b'])
|
||||
})
|
||||
})
|
||||
|
||||
describe('serializeCallChain', () => {
|
||||
it('serializes an empty array', () => {
|
||||
expect(serializeCallChain([])).toBe('')
|
||||
})
|
||||
|
||||
it('serializes a single ID', () => {
|
||||
expect(serializeCallChain(['wf-a'])).toBe('wf-a')
|
||||
})
|
||||
|
||||
it('serializes multiple IDs with commas', () => {
|
||||
expect(serializeCallChain(['wf-a', 'wf-b', 'wf-c'])).toBe('wf-a,wf-b,wf-c')
|
||||
})
|
||||
})
|
||||
|
||||
describe('validateCallChain', () => {
|
||||
it('returns null for an empty chain', () => {
|
||||
expect(validateCallChain([])).toBeNull()
|
||||
})
|
||||
|
||||
it('returns null when chain is under max depth', () => {
|
||||
expect(validateCallChain(['wf-a', 'wf-b'])).toBeNull()
|
||||
})
|
||||
|
||||
it('allows legitimate self-recursion', () => {
|
||||
expect(validateCallChain(['wf-a', 'wf-a', 'wf-a'])).toBeNull()
|
||||
})
|
||||
|
||||
it('returns depth error when chain is at max depth', () => {
|
||||
const chain = Array.from({ length: MAX_CALL_CHAIN_DEPTH }, (_, i) => `wf-${i}`)
|
||||
const error = validateCallChain(chain)
|
||||
expect(error).toContain(
|
||||
`Maximum workflow call chain depth (${MAX_CALL_CHAIN_DEPTH}) exceeded`
|
||||
)
|
||||
})
|
||||
|
||||
it('allows chain just under max depth', () => {
|
||||
const chain = Array.from({ length: MAX_CALL_CHAIN_DEPTH - 1 }, (_, i) => `wf-${i}`)
|
||||
expect(validateCallChain(chain)).toBeNull()
|
||||
})
|
||||
})
|
||||
|
||||
describe('buildNextCallChain', () => {
|
||||
it('appends workflow ID to empty chain', () => {
|
||||
expect(buildNextCallChain([], 'wf-a')).toEqual(['wf-a'])
|
||||
})
|
||||
|
||||
it('appends workflow ID to existing chain', () => {
|
||||
expect(buildNextCallChain(['wf-a', 'wf-b'], 'wf-c')).toEqual(['wf-a', 'wf-b', 'wf-c'])
|
||||
})
|
||||
|
||||
it('does not mutate the original chain', () => {
|
||||
const original = ['wf-a']
|
||||
const result = buildNextCallChain(original, 'wf-b')
|
||||
expect(original).toEqual(['wf-a'])
|
||||
expect(result).toEqual(['wf-a', 'wf-b'])
|
||||
})
|
||||
})
|
||||
|
||||
describe('round-trip', () => {
|
||||
it('parse → serialize is identity', () => {
|
||||
const header = 'wf-a,wf-b,wf-c'
|
||||
expect(serializeCallChain(parseCallChain(header))).toBe(header)
|
||||
})
|
||||
|
||||
it('serialize → parse is identity', () => {
|
||||
const chain = ['wf-a', 'wf-b', 'wf-c']
|
||||
expect(parseCallChain(serializeCallChain(chain))).toEqual(chain)
|
||||
})
|
||||
})
|
||||
})
|
||||
51
apps/sim/lib/execution/call-chain.ts
Normal file
51
apps/sim/lib/execution/call-chain.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
/**
|
||||
* Workflow call chain detection using the Via-style pattern.
|
||||
*
|
||||
* Prevents infinite execution loops when workflows call each other via API or
|
||||
* MCP endpoints. Each hop appends the current workflow ID to the `X-Sim-Via`
|
||||
* header; on ingress the chain is checked for depth.
|
||||
*/
|
||||
|
||||
export const SIM_VIA_HEADER = 'X-Sim-Via'
|
||||
export const MAX_CALL_CHAIN_DEPTH = 10
|
||||
|
||||
/**
|
||||
* Parses the `X-Sim-Via` header value into an ordered list of workflow IDs.
|
||||
* Returns an empty array when the header is absent or empty.
|
||||
*/
|
||||
export function parseCallChain(headerValue: string | null | undefined): string[] {
|
||||
if (!headerValue || !headerValue.trim()) {
|
||||
return []
|
||||
}
|
||||
return headerValue
|
||||
.split(',')
|
||||
.map((id) => id.trim())
|
||||
.filter(Boolean)
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes a call chain array back into the header value format.
|
||||
*/
|
||||
export function serializeCallChain(chain: string[]): string {
|
||||
return chain.join(',')
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that the call chain has not exceeded the maximum depth.
|
||||
* Returns an error message string if invalid, or `null` if the chain is
|
||||
* safe to extend.
|
||||
*/
|
||||
export function validateCallChain(chain: string[]): string | null {
|
||||
if (chain.length >= MAX_CALL_CHAIN_DEPTH) {
|
||||
return `Maximum workflow call chain depth (${MAX_CALL_CHAIN_DEPTH}) exceeded.`
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the next call chain by appending the current workflow ID.
|
||||
*/
|
||||
export function buildNextCallChain(chain: string[], workflowId: string): string[] {
|
||||
return [...chain, workflowId]
|
||||
}
|
||||
@@ -331,6 +331,7 @@ export async function executeWorkflowCore(
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId: resolvedStopAfterBlockId,
|
||||
onChildWorkflowInstanceReady,
|
||||
callChain: metadata.callChain,
|
||||
}
|
||||
|
||||
const executorInstance = new Executor({
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
} from '@/lib/core/security/input-validation.server'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { getBaseUrl, getInternalApiBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { SIM_VIA_HEADER, serializeCallChain } from '@/lib/execution/call-chain'
|
||||
import { parseMcpToolId } from '@/lib/mcp/utils'
|
||||
import { isCustomTool, isMcpTool } from '@/executor/constants'
|
||||
import { resolveSkillContent } from '@/executor/handlers/agent/skills-resolver'
|
||||
@@ -674,6 +675,13 @@ async function executeToolRequest(
|
||||
const headers = new Headers(requestParams.headers)
|
||||
await addInternalAuthIfNeeded(headers, isInternalRoute, requestId, toolId)
|
||||
|
||||
if (isInternalRoute) {
|
||||
const callChain = params._context?.callChain as string[] | undefined
|
||||
if (callChain && callChain.length > 0) {
|
||||
headers.set(SIM_VIA_HEADER, serializeCallChain(callChain))
|
||||
}
|
||||
}
|
||||
|
||||
// Check request body size before sending to detect potential size limit issues
|
||||
validateRequestBodySize(requestParams.body, requestId, toolId)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user