Compare commits

..

6 Commits

Author SHA1 Message Date
Vikhyath Mondreti
501b44e05a fix type issues 2026-02-02 19:53:12 -08:00
Vikhyath Mondreti
7c1e7273de feat(timeouts): execution timeout limits 2026-02-02 19:24:09 -08:00
Waleed
a9b7d75d87 feat(editor): added docs link to editor (#3116) 2026-02-02 12:22:08 -08:00
Vikhyath Mondreti
0449804ffb improvement(billing): duplicate checks for bypasses, logger billing actor consistency, run from block (#3107)
* improvement(billing): improve against direct subscription creation bypasses

* more usage of block/unblock helpers

* address bugbot comments

* fail closed

* only run dup check for orgs
2026-02-02 10:52:08 -08:00
Vikhyath Mondreti
c286f3ed24 fix(mcp): child workflow with response block returns error (#3114) 2026-02-02 09:30:35 -08:00
Vikhyath Mondreti
b738550815 fix(cleanup-cron): stale execution cleanup integer overflow (#3113) 2026-02-02 09:03:56 -08:00
45 changed files with 745 additions and 317 deletions

View File

@@ -8,6 +8,7 @@ import { verifyCronAuth } from '@/lib/auth/internal'
const logger = createLogger('CleanupStaleExecutions') const logger = createLogger('CleanupStaleExecutions')
const STALE_THRESHOLD_MINUTES = 30 const STALE_THRESHOLD_MINUTES = 30
const MAX_INT32 = 2_147_483_647
export async function GET(request: NextRequest) { export async function GET(request: NextRequest) {
try { try {
@@ -45,13 +46,14 @@ export async function GET(request: NextRequest) {
try { try {
const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime() const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime()
const staleDurationMinutes = Math.round(staleDurationMs / 60000) const staleDurationMinutes = Math.round(staleDurationMs / 60000)
const totalDurationMs = Math.min(staleDurationMs, MAX_INT32)
await db await db
.update(workflowExecutionLogs) .update(workflowExecutionLogs)
.set({ .set({
status: 'failed', status: 'failed',
endedAt: new Date(), endedAt: new Date(),
totalDurationMs: staleDurationMs, totalDurationMs,
executionData: sql`jsonb_set( executionData: sql`jsonb_set(
COALESCE(execution_data, '{}'::jsonb), COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'], ARRAY['error'],

View File

@@ -21,6 +21,7 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server' import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid' import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateInternalToken } from '@/lib/auth/internal' import { generateInternalToken } from '@/lib/auth/internal'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
const logger = createLogger('WorkflowMcpServeAPI') const logger = createLogger('WorkflowMcpServeAPI')
@@ -264,7 +265,7 @@ async function handleToolsCall(
method: 'POST', method: 'POST',
headers, headers,
body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }), body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }),
signal: AbortSignal.timeout(600000), // 10 minute timeout signal: AbortSignal.timeout(getMaxExecutionTimeout()),
}) })
const executeResult = await response.json() const executeResult = await response.json()
@@ -284,7 +285,7 @@ async function handleToolsCall(
content: [ content: [
{ type: 'text', text: JSON.stringify(executeResult.output || executeResult, null, 2) }, { type: 'text', text: JSON.stringify(executeResult.output || executeResult, null, 2) },
], ],
isError: !executeResult.success, isError: executeResult.success === false,
} }
return NextResponse.json(createResponse(id, result)) return NextResponse.json(createResponse(id, result))

View File

@@ -1,5 +1,8 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server' import type { NextRequest } from 'next/server'
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getExecutionTimeout } from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware' import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpService } from '@/lib/mcp/service' import { mcpService } from '@/lib/mcp/service'
import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types' import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types'
@@ -7,7 +10,6 @@ import {
categorizeError, categorizeError,
createMcpErrorResponse, createMcpErrorResponse,
createMcpSuccessResponse, createMcpSuccessResponse,
MCP_CONSTANTS,
validateStringParam, validateStringParam,
} from '@/lib/mcp/utils' } from '@/lib/mcp/utils'
@@ -171,13 +173,16 @@ export const POST = withMcpAuth('read')(
arguments: args, arguments: args,
} }
const userSubscription = await getHighestPrioritySubscription(userId)
const executionTimeout = getExecutionTimeout(
userSubscription?.plan as SubscriptionPlan | undefined,
'sync'
)
const result = await Promise.race([ const result = await Promise.race([
mcpService.executeTool(userId, serverId, toolCall, workspaceId), mcpService.executeTool(userId, serverId, toolCall, workspaceId),
new Promise<never>((_, reject) => new Promise<never>((_, reject) =>
setTimeout( setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout)
() => reject(new Error('Tool execution timeout')),
MCP_CONSTANTS.EXECUTION_TIMEOUT
)
), ),
]) ])

View File

@@ -20,6 +20,7 @@ import { z } from 'zod'
import { getEmailSubject, renderInvitationEmail } from '@/components/emails' import { getEmailSubject, renderInvitationEmail } from '@/components/emails'
import { getSession } from '@/lib/auth' import { getSession } from '@/lib/auth'
import { hasAccessControlAccess } from '@/lib/billing' import { hasAccessControlAccess } from '@/lib/billing'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
import { sendEmail } from '@/lib/messaging/email/mailer' import { sendEmail } from '@/lib/messaging/email/mailer'
@@ -501,6 +502,18 @@ export async function PUT(
} }
} }
if (status === 'accepted') {
try {
await syncUsageLimitsFromSubscription(session.user.id)
} catch (syncError) {
logger.error('Failed to sync usage limits after joining org', {
userId: session.user.id,
organizationId,
error: syncError,
})
}
}
logger.info(`Organization invitation ${status}`, { logger.info(`Organization invitation ${status}`, {
organizationId, organizationId,
invitationId, invitationId,

View File

@@ -5,6 +5,7 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server' import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod' import { z } from 'zod'
import { getSession } from '@/lib/auth' import { getSession } from '@/lib/auth'
import { hasActiveSubscription } from '@/lib/billing'
const logger = createLogger('SubscriptionTransferAPI') const logger = createLogger('SubscriptionTransferAPI')
@@ -88,6 +89,14 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
) )
} }
// Check if org already has an active subscription (prevent duplicates)
if (await hasActiveSubscription(organizationId)) {
return NextResponse.json(
{ error: 'Organization already has an active subscription' },
{ status: 409 }
)
}
await db await db
.update(subscription) .update(subscription)
.set({ referenceId: organizationId }) .set({ referenceId: organizationId })

View File

@@ -203,6 +203,10 @@ export const PATCH = withAdminAuthParams<RouteParams>(async (request, context) =
} }
updateData.billingBlocked = body.billingBlocked updateData.billingBlocked = body.billingBlocked
// Clear the reason when unblocking
if (body.billingBlocked === false) {
updateData.billingBlockedReason = null
}
updated.push('billingBlocked') updated.push('billingBlocked')
} }

View File

@@ -1,13 +1,13 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server' import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid' import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod' import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid' import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
import { generateRequestId } from '@/lib/core/utils/request' import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse' import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation' import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session' import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events' import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
@@ -75,12 +75,31 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const { startBlockId, sourceSnapshot, input } = validation.data const { startBlockId, sourceSnapshot, input } = validation.data
const executionId = uuidv4() const executionId = uuidv4()
const [workflowRecord] = await db // Run preprocessing checks (billing, rate limits, usage limits)
.select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId }) const preprocessResult = await preprocessExecution({
.from(workflowTable) workflowId,
.where(eq(workflowTable.id, workflowId)) userId,
.limit(1) triggerType: 'manual',
executionId,
requestId,
checkRateLimit: false, // Manual executions don't rate limit
checkDeployment: false, // Run-from-block doesn't require deployment
})
if (!preprocessResult.success) {
const { error } = preprocessResult
logger.warn(`[${requestId}] Preprocessing failed for run-from-block`, {
workflowId,
error: error?.message,
statusCode: error?.statusCode,
})
return NextResponse.json(
{ error: error?.message || 'Execution blocked' },
{ status: error?.statusCode || 500 }
)
}
const workflowRecord = preprocessResult.workflowRecord
if (!workflowRecord?.workspaceId) { if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 }) return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
} }
@@ -92,11 +111,22 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId, workflowId,
startBlockId, startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length, executedBlocksCount: sourceSnapshot.executedBlocks.length,
billingActorUserId: preprocessResult.actorUserId,
}) })
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController() const abortController = new AbortController()
let isStreamClosed = false let isStreamClosed = false
let isTimedOut = false
const syncTimeout = preprocessResult.executionTimeout?.sync
let timeoutId: NodeJS.Timeout | undefined
if (syncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, syncTimeout)
}
const stream = new ReadableStream<Uint8Array>({ const stream = new ReadableStream<Uint8Array>({
async start(controller) { async start(controller) {
@@ -148,6 +178,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}) })
if (result.status === 'cancelled') { if (result.status === 'cancelled') {
if (isTimedOut && syncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
logger.info(`[${requestId}] Run-from-block execution timed out`, {
timeoutMs: syncTimeout,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
},
})
} else {
sendEvent({ sendEvent({
type: 'execution:cancelled', type: 'execution:cancelled',
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@@ -155,6 +204,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId, workflowId,
data: { duration: result.metadata?.duration || 0 }, data: { duration: result.metadata?.duration || 0 },
}) })
}
} else { } else {
sendEvent({ sendEvent({
type: 'execution:completed', type: 'execution:completed',
@@ -171,11 +221,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}) })
} }
} catch (error: unknown) { } catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error' const isTimeout = isTimeoutError(error) || isTimedOut
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`) const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
: error instanceof Error
? error.message
: 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, {
isTimeout,
})
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
})
sendEvent({ sendEvent({
type: 'execution:error', type: 'execution:error',
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@@ -187,6 +251,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}, },
}) })
} finally { } finally {
if (timeoutId) clearTimeout(timeoutId)
if (!isStreamClosed) { if (!isStreamClosed) {
try { try {
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n')) controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
@@ -197,6 +262,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}, },
cancel() { cancel() {
isStreamClosed = true isStreamClosed = true
if (timeoutId) clearTimeout(timeoutId)
abortController.abort() abortController.abort()
markExecutionCancelled(executionId).catch(() => {}) markExecutionCancelled(executionId).catch(() => {})
}, },

View File

@@ -5,6 +5,7 @@ import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod' import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid' import { checkHybridAuth } from '@/lib/auth/hybrid'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags' import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
import { generateRequestId } from '@/lib/core/utils/request' import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse' import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -120,10 +121,6 @@ type AsyncExecutionParams = {
triggerType: CoreTriggerType triggerType: CoreTriggerType
} }
/**
* Handles async workflow execution by queueing a background job.
* Returns immediately with a 202 Accepted response containing the job ID.
*/
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> { async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
const { requestId, workflowId, userId, input, triggerType } = params const { requestId, workflowId, userId, input, triggerType } = params
@@ -405,6 +402,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
if (!enableSSE) { if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`) logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
const syncTimeout = preprocessResult.executionTimeout?.sync
try { try {
const metadata: ExecutionMetadata = { const metadata: ExecutionMetadata = {
requestId, requestId,
@@ -438,6 +436,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64, includeFileBase64,
base64MaxBytes, base64MaxBytes,
stopAfterBlockId, stopAfterBlockId,
abortSignal: syncTimeout ? AbortSignal.timeout(syncTimeout) : undefined,
}) })
const outputWithBase64 = includeFileBase64 const outputWithBase64 = includeFileBase64
@@ -473,11 +472,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
return NextResponse.json(filteredResult) return NextResponse.json(filteredResult)
} catch (error: unknown) { } catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error' const isTimeout = isTimeoutError(error)
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`) const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
: error instanceof Error
? error.message
: 'Unknown error'
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`, { isTimeout })
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
})
return NextResponse.json( return NextResponse.json(
{ {
success: false, success: false,
@@ -491,7 +502,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} }
: undefined, : undefined,
}, },
{ status: 500 } { status: isTimeout ? 408 : 500 }
) )
} }
} }
@@ -537,6 +548,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const encoder = new TextEncoder() const encoder = new TextEncoder()
const abortController = new AbortController() const abortController = new AbortController()
let isStreamClosed = false let isStreamClosed = false
let isTimedOut = false
const syncTimeout = preprocessResult.executionTimeout?.sync
let timeoutId: NodeJS.Timeout | undefined
if (syncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, syncTimeout)
}
const stream = new ReadableStream<Uint8Array>({ const stream = new ReadableStream<Uint8Array>({
async start(controller) { async start(controller) {
@@ -763,7 +784,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} }
if (result.status === 'cancelled') { if (result.status === 'cancelled') {
if (isTimedOut && syncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
logger.info(`[${requestId}] Workflow execution timed out`, { timeoutMs: syncTimeout })
await loggingSession.markAsFailed(timeoutErrorMessage)
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
},
})
} else {
logger.info(`[${requestId}] Workflow execution was cancelled`) logger.info(`[${requestId}] Workflow execution was cancelled`)
sendEvent({ sendEvent({
type: 'execution:cancelled', type: 'execution:cancelled',
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@@ -773,6 +812,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
duration: result.metadata?.duration || 0, duration: result.metadata?.duration || 0,
}, },
}) })
}
return return
} }
@@ -799,11 +839,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
// Cleanup base64 cache for this execution // Cleanup base64 cache for this execution
await cleanupExecutionBase64Cache(executionId) await cleanupExecutionBase64Cache(executionId)
} catch (error: unknown) { } catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error' const isTimeout = isTimeoutError(error) || isTimedOut
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`) const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
: error instanceof Error
? error.message
: 'Unknown error'
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
})
sendEvent({ sendEvent({
type: 'execution:error', type: 'execution:error',
timestamp: new Date().toISOString(), timestamp: new Date().toISOString(),
@@ -815,18 +867,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}, },
}) })
} finally { } finally {
if (timeoutId) clearTimeout(timeoutId)
if (!isStreamClosed) { if (!isStreamClosed) {
try { try {
controller.enqueue(encoder.encode('data: [DONE]\n\n')) controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close() controller.close()
} catch { } catch {}
// Stream already closed - nothing to do
}
} }
} }
}, },
cancel() { cancel() {
isStreamClosed = true isStreamClosed = true
if (timeoutId) clearTimeout(timeoutId)
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`) logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
abortController.abort() abortController.abort()
markExecutionCancelled(executionId).catch(() => {}) markExecutionCancelled(executionId).catch(() => {})

View File

@@ -50,6 +50,12 @@ import { useSubBlockStore } from '@/stores/workflows/subblock/store'
/** Stable empty object to avoid creating new references */ /** Stable empty object to avoid creating new references */
const EMPTY_SUBBLOCK_VALUES = {} as Record<string, any> const EMPTY_SUBBLOCK_VALUES = {} as Record<string, any>
/** Shared style for dashed divider lines */
const DASHED_DIVIDER_STYLE = {
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
} as const
/** /**
* Icon component for rendering block icons. * Icon component for rendering block icons.
* *
@@ -89,31 +95,23 @@ export function Editor() {
const blockConfig = currentBlock ? getBlock(currentBlock.type) : null const blockConfig = currentBlock ? getBlock(currentBlock.type) : null
const title = currentBlock?.name || 'Editor' const title = currentBlock?.name || 'Editor'
// Check if selected block is a subflow (loop or parallel)
const isSubflow = const isSubflow =
currentBlock && (currentBlock.type === 'loop' || currentBlock.type === 'parallel') currentBlock && (currentBlock.type === 'loop' || currentBlock.type === 'parallel')
// Get subflow display properties from configs
const subflowConfig = isSubflow ? (currentBlock.type === 'loop' ? LoopTool : ParallelTool) : null const subflowConfig = isSubflow ? (currentBlock.type === 'loop' ? LoopTool : ParallelTool) : null
// Check if selected block is a workflow block
const isWorkflowBlock = const isWorkflowBlock =
currentBlock && (currentBlock.type === 'workflow' || currentBlock.type === 'workflow_input') currentBlock && (currentBlock.type === 'workflow' || currentBlock.type === 'workflow_input')
// Get workspace ID from params
const params = useParams() const params = useParams()
const workspaceId = params.workspaceId as string const workspaceId = params.workspaceId as string
// Refs for resize functionality
const subBlocksRef = useRef<HTMLDivElement>(null) const subBlocksRef = useRef<HTMLDivElement>(null)
// Get user permissions
const userPermissions = useUserPermissionsContext() const userPermissions = useUserPermissionsContext()
// Get active workflow ID
const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId) const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId)
// Get block properties (advanced/trigger modes)
const { advancedMode, triggerMode } = useEditorBlockProperties( const { advancedMode, triggerMode } = useEditorBlockProperties(
currentBlockId, currentBlockId,
currentWorkflow.isSnapshotView currentWorkflow.isSnapshotView
@@ -145,10 +143,9 @@ export function Editor() {
[subBlocksForCanonical] [subBlocksForCanonical]
) )
const canonicalModeOverrides = currentBlock?.data?.canonicalModes const canonicalModeOverrides = currentBlock?.data?.canonicalModes
const advancedValuesPresent = hasAdvancedValues( const advancedValuesPresent = useMemo(
subBlocksForCanonical, () => hasAdvancedValues(subBlocksForCanonical, blockSubBlockValues, canonicalIndex),
blockSubBlockValues, [subBlocksForCanonical, blockSubBlockValues, canonicalIndex]
canonicalIndex
) )
const displayAdvancedOptions = userPermissions.canEdit const displayAdvancedOptions = userPermissions.canEdit
? advancedMode ? advancedMode
@@ -156,11 +153,9 @@ export function Editor() {
const hasAdvancedOnlyFields = useMemo(() => { const hasAdvancedOnlyFields = useMemo(() => {
for (const subBlock of subBlocksForCanonical) { for (const subBlock of subBlocksForCanonical) {
// Must be standalone advanced (mode: 'advanced' without canonicalParamId)
if (subBlock.mode !== 'advanced') continue if (subBlock.mode !== 'advanced') continue
if (canonicalIndex.canonicalIdBySubBlockId[subBlock.id]) continue if (canonicalIndex.canonicalIdBySubBlockId[subBlock.id]) continue
// Check condition - skip if condition not met for current values
if ( if (
subBlock.condition && subBlock.condition &&
!evaluateSubBlockCondition(subBlock.condition, blockSubBlockValues) !evaluateSubBlockCondition(subBlock.condition, blockSubBlockValues)
@@ -173,7 +168,6 @@ export function Editor() {
return false return false
}, [subBlocksForCanonical, canonicalIndex.canonicalIdBySubBlockId, blockSubBlockValues]) }, [subBlocksForCanonical, canonicalIndex.canonicalIdBySubBlockId, blockSubBlockValues])
// Get subblock layout using custom hook
const { subBlocks, stateToUse: subBlockState } = useEditorSubblockLayout( const { subBlocks, stateToUse: subBlockState } = useEditorSubblockLayout(
blockConfig || ({} as any), blockConfig || ({} as any),
currentBlockId || '', currentBlockId || '',
@@ -206,31 +200,34 @@ export function Editor() {
return { regularSubBlocks: regular, advancedOnlySubBlocks: advancedOnly } return { regularSubBlocks: regular, advancedOnlySubBlocks: advancedOnly }
}, [subBlocks, canonicalIndex.canonicalIdBySubBlockId]) }, [subBlocks, canonicalIndex.canonicalIdBySubBlockId])
// Get block connections
const { incomingConnections, hasIncomingConnections } = useBlockConnections(currentBlockId || '') const { incomingConnections, hasIncomingConnections } = useBlockConnections(currentBlockId || '')
// Connections resize hook
const { handleMouseDown: handleConnectionsResizeMouseDown, isResizing } = useConnectionsResize({ const { handleMouseDown: handleConnectionsResizeMouseDown, isResizing } = useConnectionsResize({
subBlocksRef, subBlocksRef,
}) })
// Collaborative actions
const { const {
collaborativeSetBlockCanonicalMode, collaborativeSetBlockCanonicalMode,
collaborativeUpdateBlockName, collaborativeUpdateBlockName,
collaborativeToggleBlockAdvancedMode, collaborativeToggleBlockAdvancedMode,
} = useCollaborativeWorkflow() } = useCollaborativeWorkflow()
// Advanced mode toggle handler
const handleToggleAdvancedMode = useCallback(() => { const handleToggleAdvancedMode = useCallback(() => {
if (!currentBlockId || !userPermissions.canEdit) return if (!currentBlockId || !userPermissions.canEdit) return
collaborativeToggleBlockAdvancedMode(currentBlockId) collaborativeToggleBlockAdvancedMode(currentBlockId)
}, [currentBlockId, userPermissions.canEdit, collaborativeToggleBlockAdvancedMode]) }, [currentBlockId, userPermissions.canEdit, collaborativeToggleBlockAdvancedMode])
// Rename state
const [isRenaming, setIsRenaming] = useState(false) const [isRenaming, setIsRenaming] = useState(false)
const [editedName, setEditedName] = useState('') const [editedName, setEditedName] = useState('')
const nameInputRef = useRef<HTMLInputElement>(null)
/**
* Ref callback that auto-selects the input text when mounted.
*/
const nameInputRefCallback = useCallback((element: HTMLInputElement | null) => {
if (element) {
element.select()
}
}, [])
/** /**
* Handles starting the rename process. * Handles starting the rename process.
@@ -251,7 +248,6 @@ export function Editor() {
if (trimmedName && trimmedName !== currentBlock?.name) { if (trimmedName && trimmedName !== currentBlock?.name) {
const result = collaborativeUpdateBlockName(currentBlockId, trimmedName) const result = collaborativeUpdateBlockName(currentBlockId, trimmedName)
if (!result.success) { if (!result.success) {
// Keep rename mode open on error so user can correct the name
return return
} }
} }
@@ -266,14 +262,6 @@ export function Editor() {
setEditedName('') setEditedName('')
}, []) }, [])
// Focus input when entering rename mode
useEffect(() => {
if (isRenaming && nameInputRef.current) {
nameInputRef.current.select()
}
}, [isRenaming])
// Trigger rename mode when signaled from context menu
useEffect(() => { useEffect(() => {
if (shouldFocusRename && currentBlock) { if (shouldFocusRename && currentBlock) {
handleStartRename() handleStartRename()
@@ -284,17 +272,13 @@ export function Editor() {
/** /**
* Handles opening documentation link in a new secure tab. * Handles opening documentation link in a new secure tab.
*/ */
const handleOpenDocs = () => { const handleOpenDocs = useCallback(() => {
const docsLink = isSubflow ? subflowConfig?.docsLink : blockConfig?.docsLink const docsLink = isSubflow ? subflowConfig?.docsLink : blockConfig?.docsLink
if (docsLink) { window.open(docsLink || 'https://docs.sim.ai/quick-reference', '_blank', 'noopener,noreferrer')
window.open(docsLink, '_blank', 'noopener,noreferrer') }, [isSubflow, subflowConfig?.docsLink, blockConfig?.docsLink])
}
}
// Get child workflow ID for workflow blocks
const childWorkflowId = isWorkflowBlock ? blockSubBlockValues?.workflowId : null const childWorkflowId = isWorkflowBlock ? blockSubBlockValues?.workflowId : null
// Fetch child workflow state for preview (only for workflow blocks with a selected workflow)
const { data: childWorkflowState, isLoading: isLoadingChildWorkflow } = const { data: childWorkflowState, isLoading: isLoadingChildWorkflow } =
useWorkflowState(childWorkflowId) useWorkflowState(childWorkflowId)
@@ -307,7 +291,6 @@ export function Editor() {
} }
}, [childWorkflowId, workspaceId]) }, [childWorkflowId, workspaceId])
// Determine if connections are at minimum height (collapsed state)
const isConnectionsAtMinHeight = connectionsHeight <= 35 const isConnectionsAtMinHeight = connectionsHeight <= 35
return ( return (
@@ -328,7 +311,7 @@ export function Editor() {
)} )}
{isRenaming ? ( {isRenaming ? (
<input <input
ref={nameInputRef} ref={nameInputRefCallback}
type='text' type='text'
value={editedName} value={editedName}
onChange={(e) => setEditedName(e.target.value)} onChange={(e) => setEditedName(e.target.value)}
@@ -399,7 +382,6 @@ export function Editor() {
</Tooltip.Content> </Tooltip.Content>
</Tooltip.Root> </Tooltip.Root>
)} */} )} */}
{currentBlock && (isSubflow ? subflowConfig?.docsLink : blockConfig?.docsLink) && (
<Tooltip.Root> <Tooltip.Root>
<Tooltip.Trigger asChild> <Tooltip.Trigger asChild>
<Button <Button
@@ -415,7 +397,6 @@ export function Editor() {
<p>Open docs</p> <p>Open docs</p>
</Tooltip.Content> </Tooltip.Content>
</Tooltip.Root> </Tooltip.Root>
)}
</div> </div>
</div> </div>
@@ -495,13 +476,7 @@ export function Editor() {
</div> </div>
</div> </div>
<div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'> <div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'>
<div <div className='h-[1.25px]' style={DASHED_DIVIDER_STYLE} />
className='h-[1.25px]'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
</> </>
)} )}
@@ -566,13 +541,7 @@ export function Editor() {
/> />
{showDivider && ( {showDivider && (
<div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'> <div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'>
<div <div className='h-[1.25px]' style={DASHED_DIVIDER_STYLE} />
className='h-[1.25px]'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
)} )}
</div> </div>
@@ -581,13 +550,7 @@ export function Editor() {
{hasAdvancedOnlyFields && userPermissions.canEdit && ( {hasAdvancedOnlyFields && userPermissions.canEdit && (
<div className='flex items-center gap-[10px] px-[2px] pt-[14px] pb-[12px]'> <div className='flex items-center gap-[10px] px-[2px] pt-[14px] pb-[12px]'>
<div <div className='h-[1.25px] flex-1' style={DASHED_DIVIDER_STYLE} />
className='h-[1.25px] flex-1'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
<button <button
type='button' type='button'
onClick={handleToggleAdvancedMode} onClick={handleToggleAdvancedMode}
@@ -600,13 +563,7 @@ export function Editor() {
className={`h-[14px] w-[14px] transition-transform duration-200 ${displayAdvancedOptions ? 'rotate-180' : ''}`} className={`h-[14px] w-[14px] transition-transform duration-200 ${displayAdvancedOptions ? 'rotate-180' : ''}`}
/> />
</button> </button>
<div <div className='h-[1.25px] flex-1' style={DASHED_DIVIDER_STYLE} />
className='h-[1.25px] flex-1'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
)} )}
@@ -630,13 +587,7 @@ export function Editor() {
/> />
{index < advancedOnlySubBlocks.length - 1 && ( {index < advancedOnlySubBlocks.length - 1 && (
<div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'> <div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'>
<div <div className='h-[1.25px]' style={DASHED_DIVIDER_STYLE} />
className='h-[1.25px]'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
)} )}
</div> </div>

View File

@@ -27,7 +27,7 @@ import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications' import { useNotificationStore } from '@/stores/notifications'
import { useVariablesStore } from '@/stores/panel' import { useVariablesStore } from '@/stores/panel'
import { useEnvironmentStore } from '@/stores/settings/environment' import { useEnvironmentStore } from '@/stores/settings/environment'
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal' import { useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowDiffStore } from '@/stores/workflow-diff' import { useWorkflowDiffStore } from '@/stores/workflow-diff'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { mergeSubblockState } from '@/stores/workflows/utils' import { mergeSubblockState } from '@/stores/workflows/utils'
@@ -1153,16 +1153,10 @@ export function useWorkflowExecution() {
logs: accumulatedBlockLogs, logs: accumulatedBlockLogs,
} }
// Only add workflow-level error if no blocks have executed yet if (activeWorkflowId) {
// This catches pre-execution errors (validation, serialization, etc.) cancelRunningEntries(activeWorkflowId)
// Block execution errors are already logged via onBlockError callback }
const { entries } = useTerminalConsoleStore.getState()
const existingLogs = entries.filter(
(log: ConsoleEntry) => log.executionId === executionId
)
if (existingLogs.length === 0) {
// No blocks executed yet - this is a pre-execution error
addConsole({ addConsole({
input: {}, input: {},
output: {}, output: {},
@@ -1172,11 +1166,16 @@ export function useWorkflowExecution() {
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(), startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
endedAt: new Date().toISOString(), endedAt: new Date().toISOString(),
workflowId: activeWorkflowId, workflowId: activeWorkflowId,
blockId: 'validation', blockId: 'workflow-error',
executionId, executionId,
blockName: 'Workflow Validation', blockName: 'Workflow Error',
blockType: 'validation', blockType: 'error',
}) })
},
onExecutionCancelled: () => {
if (activeWorkflowId) {
cancelRunningEntries(activeWorkflowId)
} }
}, },
}, },
@@ -1718,13 +1717,28 @@ export function useWorkflowExecution() {
'Workflow was modified. Run the workflow again to enable running from block.', 'Workflow was modified. Run the workflow again to enable running from block.',
workflowId, workflowId,
}) })
} else {
addNotification({
level: 'error',
message: data.error || 'Run from block failed',
workflowId,
})
} }
cancelRunningEntries(workflowId)
addConsole({
input: {},
output: {},
success: false,
error: data.error,
durationMs: data.duration || 0,
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
endedAt: new Date().toISOString(),
workflowId,
blockId: 'workflow-error',
executionId,
blockName: 'Workflow Error',
blockType: 'error',
})
},
onExecutionCancelled: () => {
cancelRunningEntries(workflowId)
}, },
}, },
}) })

View File

@@ -185,10 +185,16 @@ export const HTTP = {
}, },
} as const } as const
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
export const AGENT = { export const AGENT = {
DEFAULT_MODEL: 'claude-sonnet-4-5', DEFAULT_MODEL: 'claude-sonnet-4-5',
DEFAULT_FUNCTION_TIMEOUT: 600000, get DEFAULT_FUNCTION_TIMEOUT() {
REQUEST_TIMEOUT: 600000, return getMaxExecutionTimeout()
},
get REQUEST_TIMEOUT() {
return getMaxExecutionTimeout()
},
CUSTOM_TOOL_PREFIX: 'custom_', CUSTOM_TOOL_PREFIX: 'custom_',
} as const } as const

View File

@@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
const { signal, executionId } = options const { signal, executionId } = options
const useRedis = isRedisCancellationEnabled() && !!executionId const useRedis = isRedisCancellationEnabled() && !!executionId
if (!useRedis && signal?.aborted) { if (signal?.aborted) {
return false return false
} }
@@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
const cleanup = () => { const cleanup = () => {
if (mainTimeoutId) clearTimeout(mainTimeoutId) if (mainTimeoutId) clearTimeout(mainTimeoutId)
if (checkIntervalId) clearInterval(checkIntervalId) if (checkIntervalId) clearInterval(checkIntervalId)
if (!useRedis && signal) signal.removeEventListener('abort', onAbort) if (signal) signal.removeEventListener('abort', onAbort)
} }
const onAbort = () => { const onAbort = () => {
@@ -37,6 +37,10 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
resolve(false) resolve(false)
} }
if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
}
if (useRedis) { if (useRedis) {
checkIntervalId = setInterval(async () => { checkIntervalId = setInterval(async () => {
if (resolved) return if (resolved) return
@@ -49,8 +53,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
} }
} catch {} } catch {}
}, CANCELLATION_CHECK_INTERVAL_MS) }, CANCELLATION_CHECK_INTERVAL_MS)
} else if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
} }
mainTimeoutId = setTimeout(() => { mainTimeoutId = setTimeout(() => {

View File

@@ -1,7 +1,10 @@
export { AGENT_CARD_PATH } from '@a2a-js/sdk' export { AGENT_CARD_PATH } from '@a2a-js/sdk'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
export const A2A_PROTOCOL_VERSION = '0.3.0' export const A2A_PROTOCOL_VERSION = '0.3.0'
export const A2A_DEFAULT_TIMEOUT = 300000 export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS
/** /**
* Maximum number of messages stored per task in the database. * Maximum number of messages stored per task in the database.

View File

@@ -1,20 +1,37 @@
import { db } from '@sim/db' import { db } from '@sim/db'
import * as schema from '@sim/db/schema' import * as schema from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm' import { and, eq } from 'drizzle-orm'
import { hasActiveSubscription } from '@/lib/billing'
const logger = createLogger('BillingAuthorization')
/** /**
* Check if a user is authorized to manage billing for a given reference ID * Check if a user is authorized to manage billing for a given reference ID
* Reference ID can be either a user ID (individual subscription) or organization ID (team subscription) * Reference ID can be either a user ID (individual subscription) or organization ID (team subscription)
*
* This function also performs duplicate subscription validation for organizations:
* - Rejects if an organization already has an active subscription (prevents duplicates)
* - Personal subscriptions (referenceId === userId) skip this check to allow upgrades
*/ */
export async function authorizeSubscriptionReference( export async function authorizeSubscriptionReference(
userId: string, userId: string,
referenceId: string referenceId: string
): Promise<boolean> { ): Promise<boolean> {
// User can always manage their own subscriptions // User can always manage their own subscriptions (Pro upgrades, etc.)
if (referenceId === userId) { if (referenceId === userId) {
return true return true
} }
// For organizations: check for existing active subscriptions to prevent duplicates
if (await hasActiveSubscription(referenceId)) {
logger.warn('Blocking checkout - active subscription already exists for organization', {
userId,
referenceId,
})
return false
}
// Check if referenceId is an organizationId the user has admin rights to // Check if referenceId is an organizationId the user has admin rights to
const members = await db const members = await db
.select() .select()

View File

@@ -25,9 +25,11 @@ export function useSubscriptionUpgrade() {
} }
let currentSubscriptionId: string | undefined let currentSubscriptionId: string | undefined
let allSubscriptions: any[] = []
try { try {
const listResult = await client.subscription.list() const listResult = await client.subscription.list()
const activePersonalSub = listResult.data?.find( allSubscriptions = listResult.data || []
const activePersonalSub = allSubscriptions.find(
(sub: any) => sub.status === 'active' && sub.referenceId === userId (sub: any) => sub.status === 'active' && sub.referenceId === userId
) )
currentSubscriptionId = activePersonalSub?.id currentSubscriptionId = activePersonalSub?.id
@@ -50,6 +52,25 @@ export function useSubscriptionUpgrade() {
) )
if (existingOrg) { if (existingOrg) {
// Check if this org already has an active team subscription
const existingTeamSub = allSubscriptions.find(
(sub: any) =>
sub.status === 'active' &&
sub.referenceId === existingOrg.id &&
(sub.plan === 'team' || sub.plan === 'enterprise')
)
if (existingTeamSub) {
logger.warn('Organization already has an active team subscription', {
userId,
organizationId: existingOrg.id,
existingSubscriptionId: existingTeamSub.id,
})
throw new Error(
'This organization already has an active team subscription. Please manage it from the billing settings.'
)
}
logger.info('Using existing organization for team plan upgrade', { logger.info('Using existing organization for team plan upgrade', {
userId, userId,
organizationId: existingOrg.id, organizationId: existingOrg.id,

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db' import { db } from '@sim/db'
import { member, subscription } from '@sim/db/schema' import { member, organization, subscription } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, inArray } from 'drizzle-orm' import { and, eq, inArray } from 'drizzle-orm'
import { checkEnterprisePlan, checkProPlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils' import { checkEnterprisePlan, checkProPlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
@@ -26,10 +26,22 @@ export async function getHighestPrioritySubscription(userId: string) {
let orgSubs: typeof personalSubs = [] let orgSubs: typeof personalSubs = []
if (orgIds.length > 0) { if (orgIds.length > 0) {
// Verify orgs exist to filter out orphaned subscriptions
const existingOrgs = await db
.select({ id: organization.id })
.from(organization)
.where(inArray(organization.id, orgIds))
const validOrgIds = existingOrgs.map((o) => o.id)
if (validOrgIds.length > 0) {
orgSubs = await db orgSubs = await db
.select() .select()
.from(subscription) .from(subscription)
.where(and(inArray(subscription.referenceId, orgIds), eq(subscription.status, 'active'))) .where(
and(inArray(subscription.referenceId, validOrgIds), eq(subscription.status, 'active'))
)
}
} }
const allSubs = [...personalSubs, ...orgSubs] const allSubs = [...personalSubs, ...orgSubs]

View File

@@ -25,6 +25,28 @@ const logger = createLogger('SubscriptionCore')
export { getHighestPrioritySubscription } export { getHighestPrioritySubscription }
/**
* Check if a referenceId (user ID or org ID) has an active subscription
* Used for duplicate subscription prevention
*
* Fails closed: returns true on error to prevent duplicate creation
*/
export async function hasActiveSubscription(referenceId: string): Promise<boolean> {
try {
const [activeSub] = await db
.select({ id: subscription.id })
.from(subscription)
.where(and(eq(subscription.referenceId, referenceId), eq(subscription.status, 'active')))
.limit(1)
return !!activeSub
} catch (error) {
logger.error('Error checking active subscription', { error, referenceId })
// Fail closed: assume subscription exists to prevent duplicate creation
return true
}
}
/** /**
* Check if user is on Pro plan (direct or via organization) * Check if user is on Pro plan (direct or via organization)
*/ */

View File

@@ -11,6 +11,7 @@ export {
getHighestPrioritySubscription as getActiveSubscription, getHighestPrioritySubscription as getActiveSubscription,
getUserSubscriptionState as getSubscriptionState, getUserSubscriptionState as getSubscriptionState,
hasAccessControlAccess, hasAccessControlAccess,
hasActiveSubscription,
hasCredentialSetsAccess, hasCredentialSetsAccess,
hasSSOAccess, hasSSOAccess,
isEnterpriseOrgAdminOrOwner, isEnterpriseOrgAdminOrOwner,
@@ -32,6 +33,11 @@ export {
} from '@/lib/billing/core/usage' } from '@/lib/billing/core/usage'
export * from '@/lib/billing/credits/balance' export * from '@/lib/billing/credits/balance'
export * from '@/lib/billing/credits/purchase' export * from '@/lib/billing/credits/purchase'
export {
blockOrgMembers,
getOrgMemberIds,
unblockOrgMembers,
} from '@/lib/billing/organizations/membership'
export * from '@/lib/billing/subscriptions/utils' export * from '@/lib/billing/subscriptions/utils'
export { canEditUsageLimit as canEditLimit } from '@/lib/billing/subscriptions/utils' export { canEditUsageLimit as canEditLimit } from '@/lib/billing/subscriptions/utils'
export * from '@/lib/billing/types' export * from '@/lib/billing/types'

View File

@@ -8,6 +8,7 @@ import {
} from '@sim/db/schema' } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm' import { and, eq } from 'drizzle-orm'
import { hasActiveSubscription } from '@/lib/billing'
import { getPlanPricing } from '@/lib/billing/core/billing' import { getPlanPricing } from '@/lib/billing/core/billing'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage' import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
@@ -159,6 +160,16 @@ export async function ensureOrganizationForTeamSubscription(
if (existingMembership.length > 0) { if (existingMembership.length > 0) {
const membership = existingMembership[0] const membership = existingMembership[0]
if (membership.role === 'owner' || membership.role === 'admin') { if (membership.role === 'owner' || membership.role === 'admin') {
// Check if org already has an active subscription (prevent duplicates)
if (await hasActiveSubscription(membership.organizationId)) {
logger.error('Organization already has an active subscription', {
userId,
organizationId: membership.organizationId,
newSubscriptionId: subscription.id,
})
throw new Error('Organization already has an active subscription')
}
logger.info('User already owns/admins an org, using it', { logger.info('User already owns/admins an org, using it', {
userId, userId,
organizationId: membership.organizationId, organizationId: membership.organizationId,

View File

@@ -15,13 +15,86 @@ import {
userStats, userStats,
} from '@sim/db/schema' } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, sql } from 'drizzle-orm' import { and, eq, inArray, isNull, ne, or, sql } from 'drizzle-orm'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage' import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
import { validateSeatAvailability } from '@/lib/billing/validation/seat-management' import { validateSeatAvailability } from '@/lib/billing/validation/seat-management'
const logger = createLogger('OrganizationMembership') const logger = createLogger('OrganizationMembership')
export type BillingBlockReason = 'payment_failed' | 'dispute'
/**
* Get all member user IDs for an organization
*/
export async function getOrgMemberIds(organizationId: string): Promise<string[]> {
const members = await db
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, organizationId))
return members.map((m) => m.userId)
}
/**
* Block all members of an organization for billing reasons
* Returns the number of members actually blocked
*
* Reason priority: dispute > payment_failed
* A payment_failed block won't overwrite an existing dispute block
*/
export async function blockOrgMembers(
organizationId: string,
reason: BillingBlockReason
): Promise<number> {
const memberIds = await getOrgMemberIds(organizationId)
if (memberIds.length === 0) {
return 0
}
// Don't overwrite dispute blocks with payment_failed (dispute is higher priority)
const whereClause =
reason === 'payment_failed'
? and(
inArray(userStats.userId, memberIds),
or(ne(userStats.billingBlockedReason, 'dispute'), isNull(userStats.billingBlockedReason))
)
: inArray(userStats.userId, memberIds)
const result = await db
.update(userStats)
.set({ billingBlocked: true, billingBlockedReason: reason })
.where(whereClause)
.returning({ userId: userStats.userId })
return result.length
}
/**
* Unblock all members of an organization blocked for a specific reason
* Only unblocks members blocked for the specified reason (not other reasons)
* Returns the number of members actually unblocked
*/
export async function unblockOrgMembers(
organizationId: string,
reason: BillingBlockReason
): Promise<number> {
const memberIds = await getOrgMemberIds(organizationId)
if (memberIds.length === 0) {
return 0
}
const result = await db
.update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null })
.where(and(inArray(userStats.userId, memberIds), eq(userStats.billingBlockedReason, reason)))
.returning({ userId: userStats.userId })
return result.length
}
export interface RestoreProResult { export interface RestoreProResult {
restored: boolean restored: boolean
usageRestored: boolean usageRestored: boolean

View File

@@ -1,8 +1,9 @@
import { db } from '@sim/db' import { db } from '@sim/db'
import { member, subscription, user, userStats } from '@sim/db/schema' import { subscription, user, userStats } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm' import { and, eq } from 'drizzle-orm'
import type Stripe from 'stripe' import type Stripe from 'stripe'
import { blockOrgMembers, unblockOrgMembers } from '@/lib/billing'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
const logger = createLogger('DisputeWebhooks') const logger = createLogger('DisputeWebhooks')
@@ -57,36 +58,34 @@ export async function handleChargeDispute(event: Stripe.Event): Promise<void> {
if (subs.length > 0) { if (subs.length > 0) {
const orgId = subs[0].referenceId const orgId = subs[0].referenceId
const memberCount = await blockOrgMembers(orgId, 'dispute')
const owners = await db if (memberCount > 0) {
.select({ userId: member.userId }) logger.warn('Blocked all org members due to dispute', {
.from(member)
.where(and(eq(member.organizationId, orgId), eq(member.role, 'owner')))
.limit(1)
if (owners.length > 0) {
await db
.update(userStats)
.set({ billingBlocked: true, billingBlockedReason: 'dispute' })
.where(eq(userStats.userId, owners[0].userId))
logger.warn('Blocked org owner due to dispute', {
disputeId: dispute.id, disputeId: dispute.id,
ownerId: owners[0].userId,
organizationId: orgId, organizationId: orgId,
memberCount,
}) })
} }
} }
} }
/** /**
* Handles charge.dispute.closed - unblocks user if dispute was won * Handles charge.dispute.closed - unblocks user if dispute was won or warning closed
*
* Status meanings:
* - 'won': Merchant won, customer's chargeback denied → unblock
* - 'lost': Customer won, money refunded → stay blocked (they owe us)
* - 'warning_closed': Pre-dispute inquiry closed without chargeback → unblock (false alarm)
*/ */
export async function handleDisputeClosed(event: Stripe.Event): Promise<void> { export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
const dispute = event.data.object as Stripe.Dispute const dispute = event.data.object as Stripe.Dispute
if (dispute.status !== 'won') { // Only unblock if we won or the warning was closed without a full dispute
logger.info('Dispute not won, user remains blocked', { const shouldUnblock = dispute.status === 'won' || dispute.status === 'warning_closed'
if (!shouldUnblock) {
logger.info('Dispute resolved against us, user remains blocked', {
disputeId: dispute.id, disputeId: dispute.id,
status: dispute.status, status: dispute.status,
}) })
@@ -98,7 +97,7 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
return return
} }
// Find and unblock user (Pro plans) // Find and unblock user (Pro plans) - only if blocked for dispute, not other reasons
const users = await db const users = await db
.select({ id: user.id }) .select({ id: user.id })
.from(user) .from(user)
@@ -109,16 +108,17 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
await db await db
.update(userStats) .update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null }) .set({ billingBlocked: false, billingBlockedReason: null })
.where(eq(userStats.userId, users[0].id)) .where(and(eq(userStats.userId, users[0].id), eq(userStats.billingBlockedReason, 'dispute')))
logger.info('Unblocked user after winning dispute', { logger.info('Unblocked user after dispute resolved in our favor', {
disputeId: dispute.id, disputeId: dispute.id,
userId: users[0].id, userId: users[0].id,
status: dispute.status,
}) })
return return
} }
// Find and unblock org owner (Team/Enterprise) // Find and unblock all org members (Team/Enterprise) - consistent with payment success
const subs = await db const subs = await db
.select({ referenceId: subscription.referenceId }) .select({ referenceId: subscription.referenceId })
.from(subscription) .from(subscription)
@@ -127,24 +127,13 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
if (subs.length > 0) { if (subs.length > 0) {
const orgId = subs[0].referenceId const orgId = subs[0].referenceId
const memberCount = await unblockOrgMembers(orgId, 'dispute')
const owners = await db logger.info('Unblocked all org members after dispute resolved in our favor', {
.select({ userId: member.userId })
.from(member)
.where(and(eq(member.organizationId, orgId), eq(member.role, 'owner')))
.limit(1)
if (owners.length > 0) {
await db
.update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null })
.where(eq(userStats.userId, owners[0].userId))
logger.info('Unblocked org owner after winning dispute', {
disputeId: dispute.id, disputeId: dispute.id,
ownerId: owners[0].userId,
organizationId: orgId, organizationId: orgId,
memberCount,
status: dispute.status,
}) })
} }
} }
}

View File

@@ -8,12 +8,13 @@ import {
userStats, userStats,
} from '@sim/db/schema' } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, inArray } from 'drizzle-orm' import { and, eq, inArray, isNull, ne, or } from 'drizzle-orm'
import type Stripe from 'stripe' import type Stripe from 'stripe'
import { getEmailSubject, PaymentFailedEmail, renderCreditPurchaseEmail } from '@/components/emails' import { getEmailSubject, PaymentFailedEmail, renderCreditPurchaseEmail } from '@/components/emails'
import { calculateSubscriptionOverage } from '@/lib/billing/core/billing' import { calculateSubscriptionOverage } from '@/lib/billing/core/billing'
import { addCredits, getCreditBalance, removeCredits } from '@/lib/billing/credits/balance' import { addCredits, getCreditBalance, removeCredits } from '@/lib/billing/credits/balance'
import { setUsageLimitForCredits } from '@/lib/billing/credits/purchase' import { setUsageLimitForCredits } from '@/lib/billing/credits/purchase'
import { blockOrgMembers, unblockOrgMembers } from '@/lib/billing/organizations/membership'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
import { sendEmail } from '@/lib/messaging/email/mailer' import { sendEmail } from '@/lib/messaging/email/mailer'
@@ -502,24 +503,7 @@ export async function handleInvoicePaymentSucceeded(event: Stripe.Event) {
} }
if (sub.plan === 'team' || sub.plan === 'enterprise') { if (sub.plan === 'team' || sub.plan === 'enterprise') {
const members = await db await unblockOrgMembers(sub.referenceId, 'payment_failed')
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, sub.referenceId))
const memberIds = members.map((m) => m.userId)
if (memberIds.length > 0) {
// Only unblock users blocked for payment_failed, not disputes
await db
.update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null })
.where(
and(
inArray(userStats.userId, memberIds),
eq(userStats.billingBlockedReason, 'payment_failed')
)
)
}
} else { } else {
// Only unblock users blocked for payment_failed, not disputes // Only unblock users blocked for payment_failed, not disputes
await db await db
@@ -616,28 +600,26 @@ export async function handleInvoicePaymentFailed(event: Stripe.Event) {
if (records.length > 0) { if (records.length > 0) {
const sub = records[0] const sub = records[0]
if (sub.plan === 'team' || sub.plan === 'enterprise') { if (sub.plan === 'team' || sub.plan === 'enterprise') {
const members = await db const memberCount = await blockOrgMembers(sub.referenceId, 'payment_failed')
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, sub.referenceId))
const memberIds = members.map((m) => m.userId)
if (memberIds.length > 0) {
await db
.update(userStats)
.set({ billingBlocked: true, billingBlockedReason: 'payment_failed' })
.where(inArray(userStats.userId, memberIds))
}
logger.info('Blocked team/enterprise members due to payment failure', { logger.info('Blocked team/enterprise members due to payment failure', {
organizationId: sub.referenceId, organizationId: sub.referenceId,
memberCount: members.length, memberCount,
isOverageInvoice, isOverageInvoice,
}) })
} else { } else {
// Don't overwrite dispute blocks (dispute > payment_failed priority)
await db await db
.update(userStats) .update(userStats)
.set({ billingBlocked: true, billingBlockedReason: 'payment_failed' }) .set({ billingBlocked: true, billingBlockedReason: 'payment_failed' })
.where(eq(userStats.userId, sub.referenceId)) .where(
and(
eq(userStats.userId, sub.referenceId),
or(
ne(userStats.billingBlockedReason, 'dispute'),
isNull(userStats.billingBlockedReason)
)
)
)
logger.info('Blocked user due to payment failure', { logger.info('Blocked user due to payment failure', {
userId: sub.referenceId, userId: sub.referenceId,
isOverageInvoice, isOverageInvoice,

View File

@@ -3,6 +3,7 @@ import { member, organization, subscription } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, ne } from 'drizzle-orm' import { and, eq, ne } from 'drizzle-orm'
import { calculateSubscriptionOverage } from '@/lib/billing/core/billing' import { calculateSubscriptionOverage } from '@/lib/billing/core/billing'
import { hasActiveSubscription } from '@/lib/billing/core/subscription'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage' import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
import { restoreUserProSubscription } from '@/lib/billing/organizations/membership' import { restoreUserProSubscription } from '@/lib/billing/organizations/membership'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
@@ -52,14 +53,37 @@ async function restoreMemberProSubscriptions(organizationId: string): Promise<nu
/** /**
* Cleanup organization when team/enterprise subscription is deleted. * Cleanup organization when team/enterprise subscription is deleted.
* - Checks if other active subscriptions point to this org (skip deletion if so)
* - Restores member Pro subscriptions * - Restores member Pro subscriptions
* - Deletes the organization * - Deletes the organization (only if no other active subs)
* - Syncs usage limits for former members (resets to free or Pro tier) * - Syncs usage limits for former members (resets to free or Pro tier)
*/ */
async function cleanupOrganizationSubscription(organizationId: string): Promise<{ async function cleanupOrganizationSubscription(organizationId: string): Promise<{
restoredProCount: number restoredProCount: number
membersSynced: number membersSynced: number
organizationDeleted: boolean
}> { }> {
// Check if other active subscriptions still point to this org
// Note: The subscription being deleted is already marked as 'canceled' by better-auth
// before this handler runs, so we only find truly active ones
if (await hasActiveSubscription(organizationId)) {
logger.info('Skipping organization deletion - other active subscriptions exist', {
organizationId,
})
// Still sync limits for members since this subscription was deleted
const memberUserIds = await db
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, organizationId))
for (const m of memberUserIds) {
await syncUsageLimitsFromSubscription(m.userId)
}
return { restoredProCount: 0, membersSynced: memberUserIds.length, organizationDeleted: false }
}
// Get member userIds before deletion (needed for limit syncing after org deletion) // Get member userIds before deletion (needed for limit syncing after org deletion)
const memberUserIds = await db const memberUserIds = await db
.select({ userId: member.userId }) .select({ userId: member.userId })
@@ -75,7 +99,7 @@ async function cleanupOrganizationSubscription(organizationId: string): Promise<
await syncUsageLimitsFromSubscription(m.userId) await syncUsageLimitsFromSubscription(m.userId)
} }
return { restoredProCount, membersSynced: memberUserIds.length } return { restoredProCount, membersSynced: memberUserIds.length, organizationDeleted: true }
} }
/** /**
@@ -172,15 +196,14 @@ export async function handleSubscriptionDeleted(subscription: {
referenceId: subscription.referenceId, referenceId: subscription.referenceId,
}) })
const { restoredProCount, membersSynced } = await cleanupOrganizationSubscription( const { restoredProCount, membersSynced, organizationDeleted } =
subscription.referenceId await cleanupOrganizationSubscription(subscription.referenceId)
)
logger.info('Successfully processed enterprise subscription cancellation', { logger.info('Successfully processed enterprise subscription cancellation', {
subscriptionId: subscription.id, subscriptionId: subscription.id,
stripeSubscriptionId, stripeSubscriptionId,
restoredProCount, restoredProCount,
organizationDeleted: true, organizationDeleted,
membersSynced, membersSynced,
}) })
return return
@@ -297,7 +320,7 @@ export async function handleSubscriptionDeleted(subscription: {
const cleanup = await cleanupOrganizationSubscription(subscription.referenceId) const cleanup = await cleanupOrganizationSubscription(subscription.referenceId)
restoredProCount = cleanup.restoredProCount restoredProCount = cleanup.restoredProCount
membersSynced = cleanup.membersSynced membersSynced = cleanup.membersSynced
organizationDeleted = true organizationDeleted = cleanup.organizationDeleted
} else if (subscription.plan === 'pro') { } else if (subscription.plan === 'pro') {
await syncUsageLimitsFromSubscription(subscription.referenceId) await syncUsageLimitsFromSubscription(subscription.referenceId)
membersSynced = 1 membersSynced = 1

View File

@@ -5,11 +5,9 @@ import type { ToolUIConfig } from './ui-config'
const baseToolLogger = createLogger('BaseClientTool') const baseToolLogger = createLogger('BaseClientTool')
/** Default timeout for tool execution (5 minutes) */ const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000
const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000
/** Timeout for tools that run workflows (10 minutes) */ export const WORKFLOW_EXECUTION_TIMEOUT_MS = 5 * 60 * 1000
export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000
// Client tool call states used by the new runtime // Client tool call states used by the new runtime
export enum ClientToolCallState { export enum ClientToolCallState {

View File

@@ -170,6 +170,11 @@ export const env = createEnv({
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'),
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'),
// Knowledge Base Processing Configuration - Shared across all processing methods // Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes) KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts

View File

@@ -0,0 +1 @@
export * from './types'

View File

@@ -0,0 +1,122 @@
import { env } from '@/lib/core/config/env'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
export interface ExecutionTimeoutConfig {
sync: number
async: number
}
const DEFAULT_SYNC_TIMEOUTS = {
free: 300,
pro: 3600,
team: 3600,
enterprise: 3600,
} as const
const ASYNC_TIMEOUT_SECONDS = 5400
function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
const envVarMap: Record<SubscriptionPlan, string | undefined> = {
free: env.EXECUTION_TIMEOUT_FREE,
pro: env.EXECUTION_TIMEOUT_PRO,
team: env.EXECUTION_TIMEOUT_TEAM,
enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE,
}
return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS[plan]) * 1000
}
export const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
free: {
sync: getSyncTimeoutForPlan('free'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
pro: {
sync: getSyncTimeoutForPlan('pro'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
team: {
sync: getSyncTimeoutForPlan('team'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
enterprise: {
sync: getSyncTimeoutForPlan('enterprise'),
async: ASYNC_TIMEOUT_SECONDS * 1000,
},
}
export function getExecutionTimeout(
plan: SubscriptionPlan | undefined,
type: 'sync' | 'async' = 'sync'
): number {
return EXECUTION_TIMEOUTS[plan || 'free'][type]
}
export function getExecutionTimeoutSeconds(
plan: SubscriptionPlan | undefined,
type: 'sync' | 'async' = 'sync'
): number {
return Math.floor(getExecutionTimeout(plan, type) / 1000)
}
export function getMaxExecutionTimeout(): number {
return EXECUTION_TIMEOUTS.enterprise.async
}
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
export class ExecutionTimeoutError extends Error {
constructor(
public readonly timeoutMs: number,
public readonly plan?: SubscriptionPlan
) {
const timeoutSeconds = Math.floor(timeoutMs / 1000)
const timeoutMinutes = Math.floor(timeoutSeconds / 60)
const displayTime =
timeoutMinutes > 0
? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
: `${timeoutSeconds} seconds`
super(`Execution timed out after ${displayTime}`)
this.name = 'ExecutionTimeoutError'
}
}
export function isTimeoutError(error: unknown): boolean {
if (error instanceof ExecutionTimeoutError) return true
if (!(error instanceof Error)) return false
const name = error.name.toLowerCase()
const message = error.message.toLowerCase()
return (
name === 'timeouterror' ||
name === 'aborterror' ||
message.includes('timeout') ||
message.includes('timed out') ||
message.includes('aborted')
)
}
export function createTimeoutError(
timeoutMs: number,
plan?: SubscriptionPlan
): ExecutionTimeoutError {
return new ExecutionTimeoutError(timeoutMs, plan)
}
export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string {
if (error instanceof ExecutionTimeoutError) {
return error.message
}
if (timeoutMs) {
const timeoutSeconds = Math.floor(timeoutMs / 1000)
const timeoutMinutes = Math.floor(timeoutSeconds / 60)
const displayTime =
timeoutMinutes > 0
? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
: `${timeoutSeconds} seconds`
return `Execution timed out after ${displayTime}`
}
return 'Execution timed out'
}

View File

@@ -1,7 +1,3 @@
/** import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
* Execution timeout constants
*
* DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes)
*/
export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds) export { DEFAULT_EXECUTION_TIMEOUT_MS }

View File

@@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm' import { eq } from 'drizzle-orm'
import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor' import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getExecutionTimeout } from '@/lib/core/execution-limits'
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter' import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { LoggingSession } from '@/lib/logs/execution/logging-session' import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils' import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import type { CoreTriggerType } from '@/stores/logs/filters/types' import type { CoreTriggerType } from '@/stores/logs/filters/types'
@@ -133,10 +135,10 @@ export interface PreprocessExecutionResult {
success: boolean success: boolean
error?: { error?: {
message: string message: string
statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500) statusCode: number
logCreated: boolean // Whether error was logged to execution_logs logCreated: boolean
} }
actorUserId?: string // The user ID that will be billed actorUserId?: string
workflowRecord?: WorkflowRecord workflowRecord?: WorkflowRecord
userSubscription?: SubscriptionInfo | null userSubscription?: SubscriptionInfo | null
rateLimitInfo?: { rateLimitInfo?: {
@@ -144,6 +146,10 @@ export interface PreprocessExecutionResult {
remaining: number remaining: number
resetAt: Date resetAt: Date
} }
executionTimeout?: {
sync: number
async: number
}
} }
type WorkflowRecord = typeof workflow.$inferSelect type WorkflowRecord = typeof workflow.$inferSelect
@@ -484,12 +490,17 @@ export async function preprocessExecution(
triggerType, triggerType,
}) })
const plan = userSubscription?.plan as SubscriptionPlan | undefined
return { return {
success: true, success: true,
actorUserId, actorUserId,
workflowRecord, workflowRecord,
userSubscription, userSubscription,
rateLimitInfo, rateLimitInfo,
executionTimeout: {
sync: getExecutionTimeout(plan, 'sync'),
async: getExecutionTimeout(plan, 'async'),
},
} }
} }

View File

@@ -33,6 +33,7 @@ import type {
WorkflowExecutionSnapshot, WorkflowExecutionSnapshot,
WorkflowState, WorkflowState,
} from '@/lib/logs/types' } from '@/lib/logs/types'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
export interface ToolCall { export interface ToolCall {
name: string name: string
@@ -503,7 +504,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
} }
try { try {
// Get the workflow record to get the userId // Get the workflow record to get workspace and fallback userId
const [workflowRecord] = await db const [workflowRecord] = await db
.select() .select()
.from(workflow) .from(workflow)
@@ -515,7 +516,12 @@ export class ExecutionLogger implements IExecutionLoggerService {
return return
} }
const userId = workflowRecord.userId let billingUserId: string | null = null
if (workflowRecord.workspaceId) {
billingUserId = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
}
const userId = billingUserId || workflowRecord.userId
const costToStore = costSummary.totalCost const costToStore = costSummary.totalCost
const existing = await db.select().from(userStats).where(eq(userStats.userId, userId)) const existing = await db.select().from(userStats).where(eq(userStats.userId, userId))

View File

@@ -776,11 +776,16 @@ export class LoggingSession {
await db await db
.update(workflowExecutionLogs) .update(workflowExecutionLogs)
.set({ .set({
level: 'error',
status: 'failed', status: 'failed',
executionData: sql`jsonb_set( executionData: sql`jsonb_set(
jsonb_set(
COALESCE(execution_data, '{}'::jsonb), COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'], ARRAY['error'],
to_jsonb(${message}::text) to_jsonb(${message}::text)
),
ARRAY['finalOutput'],
jsonb_build_object('error', ${message}::text)
)`, )`,
}) })
.where(eq(workflowExecutionLogs.executionId, executionId)) .where(eq(workflowExecutionLogs.executionId, executionId))

View File

@@ -12,6 +12,7 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js' import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js' import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { import {
McpConnectionError, McpConnectionError,
type McpConnectionStatus, type McpConnectionStatus,
@@ -202,7 +203,7 @@ export class McpClient {
const sdkResult = await this.client.callTool( const sdkResult = await this.client.callTool(
{ name: toolCall.name, arguments: toolCall.arguments }, { name: toolCall.name, arguments: toolCall.arguments },
undefined, undefined,
{ timeout: 600000 } // 10 minutes - override SDK's 60s default { timeout: getMaxExecutionTimeout() }
) )
return sdkResult as McpToolResult return sdkResult as McpToolResult

View File

@@ -34,7 +34,7 @@ export function sanitizeHeaders(
* Client-safe MCP constants * Client-safe MCP constants
*/ */
export const MCP_CLIENT_CONSTANTS = { export const MCP_CLIENT_CONSTANTS = {
CLIENT_TIMEOUT: 600000, CLIENT_TIMEOUT: 5 * 60 * 1000,
MAX_RETRIES: 3, MAX_RETRIES: 3,
RECONNECT_DELAY: 1000, RECONNECT_DELAY: 1000,
} as const } as const

View File

@@ -81,8 +81,8 @@ describe('generateMcpServerId', () => {
}) })
describe('MCP_CONSTANTS', () => { describe('MCP_CONSTANTS', () => {
it.concurrent('has correct execution timeout (10 minutes)', () => { it.concurrent('has correct execution timeout (5 minutes)', () => {
expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000) expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(300000)
}) })
it.concurrent('has correct cache timeout (5 minutes)', () => { it.concurrent('has correct cache timeout (5 minutes)', () => {
@@ -107,8 +107,8 @@ describe('MCP_CONSTANTS', () => {
}) })
describe('MCP_CLIENT_CONSTANTS', () => { describe('MCP_CLIENT_CONSTANTS', () => {
it.concurrent('has correct client timeout (10 minutes)', () => { it.concurrent('has correct client timeout (5 minutes)', () => {
expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000) expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(300000)
}) })
it.concurrent('has correct auto refresh interval (5 minutes)', () => { it.concurrent('has correct auto refresh interval (5 minutes)', () => {

View File

@@ -1,12 +1,11 @@
import { NextResponse } from 'next/server' import { NextResponse } from 'next/server'
import { DEFAULT_EXECUTION_TIMEOUT_MS, getExecutionTimeout } from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import type { McpApiResponse } from '@/lib/mcp/types' import type { McpApiResponse } from '@/lib/mcp/types'
import { isMcpTool, MCP } from '@/executor/constants' import { isMcpTool, MCP } from '@/executor/constants'
/**
* MCP-specific constants
*/
export const MCP_CONSTANTS = { export const MCP_CONSTANTS = {
EXECUTION_TIMEOUT: 600000, EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
CACHE_TIMEOUT: 5 * 60 * 1000, CACHE_TIMEOUT: 5 * 60 * 1000,
DEFAULT_RETRIES: 3, DEFAULT_RETRIES: 3,
DEFAULT_CONNECTION_TIMEOUT: 30000, DEFAULT_CONNECTION_TIMEOUT: 30000,
@@ -14,6 +13,10 @@ export const MCP_CONSTANTS = {
MAX_CONSECUTIVE_FAILURES: 3, MAX_CONSECUTIVE_FAILURES: 3,
} as const } as const
export function getMcpExecutionTimeout(plan?: SubscriptionPlan): number {
return getExecutionTimeout(plan, 'sync')
}
/** /**
* Core MCP tool parameter keys that are metadata, not user-entered test values. * Core MCP tool parameter keys that are metadata, not user-entered test values.
* These should be preserved when cleaning up params during schema updates. * These should be preserved when cleaning up params during schema updates.
@@ -45,11 +48,8 @@ export function sanitizeHeaders(
) )
} }
/**
* Client-safe MCP constants
*/
export const MCP_CLIENT_CONSTANTS = { export const MCP_CLIENT_CONSTANTS = {
CLIENT_TIMEOUT: 600000, CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
AUTO_REFRESH_INTERVAL: 5 * 60 * 1000, AUTO_REFRESH_INTERVAL: 5 * 60 * 1000,
} as const } as const

View File

@@ -62,9 +62,6 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent {
} }
} }
/**
* Execution cancelled event
*/
export interface ExecutionCancelledEvent extends BaseExecutionEvent { export interface ExecutionCancelledEvent extends BaseExecutionEvent {
type: 'execution:cancelled' type: 'execution:cancelled'
workflowId: string workflowId: string
@@ -171,9 +168,6 @@ export type ExecutionEvent =
| StreamChunkEvent | StreamChunkEvent
| StreamDoneEvent | StreamDoneEvent
/**
* Extracted data types for use in callbacks
*/
export type ExecutionStartedData = ExecutionStartedEvent['data'] export type ExecutionStartedData = ExecutionStartedEvent['data']
export type ExecutionCompletedData = ExecutionCompletedEvent['data'] export type ExecutionCompletedData = ExecutionCompletedEvent['data']
export type ExecutionErrorData = ExecutionErrorEvent['data'] export type ExecutionErrorData = ExecutionErrorEvent['data']

View File

@@ -1,8 +1,9 @@
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { RunActorParams, RunActorResult } from '@/tools/apify/types' import type { RunActorParams, RunActorResult } from '@/tools/apify/types'
import type { ToolConfig } from '@/tools/types' import type { ToolConfig } from '@/tools/types'
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const apifyRunActorAsyncTool: ToolConfig<RunActorParams, RunActorResult> = { export const apifyRunActorAsyncTool: ToolConfig<RunActorParams, RunActorResult> = {
id: 'apify_run_actor_async', id: 'apify_run_actor_async',

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types' import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
import type { ToolConfig, ToolResponse } from '@/tools/types' import type { ToolConfig, ToolResponse } from '@/tools/types'
const logger = createLogger('BrowserUseTool') const logger = createLogger('BrowserUseTool')
const POLL_INTERVAL_MS = 5000 const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 600000 // 10 minutes const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
const MAX_CONSECUTIVE_ERRORS = 3 const MAX_CONSECUTIVE_ERRORS = 3
async function createSessionWithProfile( async function createSessionWithProfile(

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types' import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types'
import type { ToolConfig } from '@/tools/types' import type { ToolConfig } from '@/tools/types'
const logger = createLogger('ExaResearchTool') const logger = createLogger('ExaResearchTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const researchTool: ToolConfig<ExaResearchParams, ExaResearchResponse> = { export const researchTool: ToolConfig<ExaResearchParams, ExaResearchResponse> = {
id: 'exa_research', id: 'exa_research',

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types' import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types' import type { ToolConfig } from '@/tools/types'
const logger = createLogger('FirecrawlAgentTool') const logger = createLogger('FirecrawlAgentTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const agentTool: ToolConfig<AgentParams, AgentResponse> = { export const agentTool: ToolConfig<AgentParams, AgentResponse> = {
id: 'firecrawl_agent', id: 'firecrawl_agent',

View File

@@ -1,12 +1,13 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types' import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types'
import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types' import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types' import type { ToolConfig } from '@/tools/types'
const logger = createLogger('FirecrawlCrawlTool') const logger = createLogger('FirecrawlCrawlTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const crawlTool: ToolConfig<FirecrawlCrawlParams, FirecrawlCrawlResponse> = { export const crawlTool: ToolConfig<FirecrawlCrawlParams, FirecrawlCrawlResponse> = {
id: 'firecrawl_crawl', id: 'firecrawl_crawl',

View File

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types' import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types' import type { ToolConfig } from '@/tools/types'
const logger = createLogger('FirecrawlExtractTool') const logger = createLogger('FirecrawlExtractTool')
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS
export const extractTool: ToolConfig<ExtractParams, ExtractResponse> = { export const extractTool: ToolConfig<ExtractParams, ExtractResponse> = {
id: 'firecrawl_extract', id: 'firecrawl_extract',

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { generateInternalToken } from '@/lib/auth/internal' import { generateInternalToken } from '@/lib/auth/internal'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import { secureFetchWithPinnedIP, validateUrlWithDNS } from '@/lib/core/security/input-validation' import { secureFetchWithPinnedIP, validateUrlWithDNS } from '@/lib/core/security/input-validation'
import { generateRequestId } from '@/lib/core/utils/request' import { generateRequestId } from '@/lib/core/utils/request'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
@@ -625,9 +626,8 @@ async function executeToolRequest(
let response: Response let response: Response
if (isInternalRoute) { if (isInternalRoute) {
// Set up AbortController for timeout support on internal routes
const controller = new AbortController() const controller = new AbortController()
const timeout = requestParams.timeout || 300000 const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS
const timeoutId = setTimeout(() => controller.abort(), timeout) const timeoutId = setTimeout(() => controller.abort(), timeout)
try { try {

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
import { AGENT, isCustomTool } from '@/executor/constants' import { AGENT, isCustomTool } from '@/executor/constants'
import { getCustomTool } from '@/hooks/queries/custom-tools' import { getCustomTool } from '@/hooks/queries/custom-tools'
@@ -123,9 +124,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record<string, any
} }
} }
// Get timeout from params (if specified) and validate const MAX_TIMEOUT_MS = getMaxExecutionTimeout()
// Must be a finite positive number, max 600000ms (10 minutes) as documented
const MAX_TIMEOUT_MS = 600000
const rawTimeout = params.timeout const rawTimeout = params.timeout
const timeout = rawTimeout != null ? Number(rawTimeout) : undefined const timeout = rawTimeout != null ? Number(rawTimeout) : undefined
const validTimeout = const validTimeout =

View File

@@ -6,7 +6,7 @@ export default defineConfig({
project: env.TRIGGER_PROJECT_ID!, project: env.TRIGGER_PROJECT_ID!,
runtime: 'node', runtime: 'node',
logLevel: 'log', logLevel: 'log',
maxDuration: 600, maxDuration: 5400,
retries: { retries: {
enabledInDev: false, enabledInDev: false,
default: { default: {