Compare commits

..

3 Commits

Author SHA1 Message Date
Vikhyath Mondreti
9fc3684b3b consolidate 2026-02-02 17:31:37 -08:00
Vikhyath Mondreti
fd57927746 bugbot comment 2026-02-02 10:55:10 -08:00
Vikhyath Mondreti
a213ad98cf improvement(rooms): redis client closed should fail fast 2026-02-02 09:59:22 -08:00
31 changed files with 418 additions and 836 deletions

View File

@@ -8,7 +8,6 @@ import { verifyCronAuth } from '@/lib/auth/internal'
const logger = createLogger('CleanupStaleExecutions') const logger = createLogger('CleanupStaleExecutions')
const STALE_THRESHOLD_MINUTES = 30 const STALE_THRESHOLD_MINUTES = 30
const MAX_INT32 = 2_147_483_647
export async function GET(request: NextRequest) { export async function GET(request: NextRequest) {
try { try {
@@ -46,14 +45,13 @@ export async function GET(request: NextRequest) {
try { try {
const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime() const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime()
const staleDurationMinutes = Math.round(staleDurationMs / 60000) const staleDurationMinutes = Math.round(staleDurationMs / 60000)
const totalDurationMs = Math.min(staleDurationMs, MAX_INT32)
await db await db
.update(workflowExecutionLogs) .update(workflowExecutionLogs)
.set({ .set({
status: 'failed', status: 'failed',
endedAt: new Date(), endedAt: new Date(),
totalDurationMs, totalDurationMs: staleDurationMs,
executionData: sql`jsonb_set( executionData: sql`jsonb_set(
COALESCE(execution_data, '{}'::jsonb), COALESCE(execution_data, '{}'::jsonb),
ARRAY['error'], ARRAY['error'],

View File

@@ -284,7 +284,7 @@ async function handleToolsCall(
content: [ content: [
{ type: 'text', text: JSON.stringify(executeResult.output || executeResult, null, 2) }, { type: 'text', text: JSON.stringify(executeResult.output || executeResult, null, 2) },
], ],
isError: executeResult.success === false, isError: !executeResult.success,
} }
return NextResponse.json(createResponse(id, result)) return NextResponse.json(createResponse(id, result))

View File

@@ -20,7 +20,6 @@ import { z } from 'zod'
import { getEmailSubject, renderInvitationEmail } from '@/components/emails' import { getEmailSubject, renderInvitationEmail } from '@/components/emails'
import { getSession } from '@/lib/auth' import { getSession } from '@/lib/auth'
import { hasAccessControlAccess } from '@/lib/billing' import { hasAccessControlAccess } from '@/lib/billing'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
import { sendEmail } from '@/lib/messaging/email/mailer' import { sendEmail } from '@/lib/messaging/email/mailer'
@@ -502,18 +501,6 @@ export async function PUT(
} }
} }
if (status === 'accepted') {
try {
await syncUsageLimitsFromSubscription(session.user.id)
} catch (syncError) {
logger.error('Failed to sync usage limits after joining org', {
userId: session.user.id,
organizationId,
error: syncError,
})
}
}
logger.info(`Organization invitation ${status}`, { logger.info(`Organization invitation ${status}`, {
organizationId, organizationId,
invitationId, invitationId,

View File

@@ -5,7 +5,6 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server' import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod' import { z } from 'zod'
import { getSession } from '@/lib/auth' import { getSession } from '@/lib/auth'
import { hasActiveSubscription } from '@/lib/billing'
const logger = createLogger('SubscriptionTransferAPI') const logger = createLogger('SubscriptionTransferAPI')
@@ -89,14 +88,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
) )
} }
// Check if org already has an active subscription (prevent duplicates)
if (await hasActiveSubscription(organizationId)) {
return NextResponse.json(
{ error: 'Organization already has an active subscription' },
{ status: 409 }
)
}
await db await db
.update(subscription) .update(subscription)
.set({ referenceId: organizationId }) .set({ referenceId: organizationId })

View File

@@ -203,10 +203,6 @@ export const PATCH = withAdminAuthParams<RouteParams>(async (request, context) =
} }
updateData.billingBlocked = body.billingBlocked updateData.billingBlocked = body.billingBlocked
// Clear the reason when unblocking
if (body.billingBlocked === false) {
updateData.billingBlockedReason = null
}
updated.push('billingBlocked') updated.push('billingBlocked')
} }

View File

@@ -1,4 +1,6 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server' import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid' import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod' import { z } from 'zod'
@@ -6,7 +8,6 @@ import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request' import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse' import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation' import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session' import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core' import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events' import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
@@ -74,31 +75,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const { startBlockId, sourceSnapshot, input } = validation.data const { startBlockId, sourceSnapshot, input } = validation.data
const executionId = uuidv4() const executionId = uuidv4()
// Run preprocessing checks (billing, rate limits, usage limits) const [workflowRecord] = await db
const preprocessResult = await preprocessExecution({ .select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
workflowId, .from(workflowTable)
userId, .where(eq(workflowTable.id, workflowId))
triggerType: 'manual', .limit(1)
executionId,
requestId,
checkRateLimit: false, // Manual executions don't rate limit
checkDeployment: false, // Run-from-block doesn't require deployment
})
if (!preprocessResult.success) {
const { error } = preprocessResult
logger.warn(`[${requestId}] Preprocessing failed for run-from-block`, {
workflowId,
error: error?.message,
statusCode: error?.statusCode,
})
return NextResponse.json(
{ error: error?.message || 'Execution blocked' },
{ status: error?.statusCode || 500 }
)
}
const workflowRecord = preprocessResult.workflowRecord
if (!workflowRecord?.workspaceId) { if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 }) return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
} }
@@ -110,7 +92,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflowId, workflowId,
startBlockId, startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length, executedBlocksCount: sourceSnapshot.executedBlocks.length,
billingActorUserId: preprocessResult.actorUserId,
}) })
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId) const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)

View File

@@ -1,7 +1,7 @@
import type React from 'react' import type React from 'react'
import { memo, useCallback, useEffect, useMemo, useRef, useState } from 'react' import { memo, useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { ChevronRight, Loader2, ServerIcon, WrenchIcon, XIcon } from 'lucide-react' import { Loader2, WrenchIcon, XIcon } from 'lucide-react'
import { useParams } from 'next/navigation' import { useParams } from 'next/navigation'
import { import {
Badge, Badge,
@@ -111,33 +111,18 @@ interface ToolInputProps {
* Represents a tool selected and configured in the workflow * Represents a tool selected and configured in the workflow
* *
* @remarks * @remarks
* Valid types include:
* - Standard block types (e.g., 'api', 'search', 'function')
* - 'custom-tool': User-defined tools with custom code
* - 'mcp': Individual MCP tool from a connected server
* - 'mcp-server': All tools from an MCP server (agent discovery mode).
* At execution time, this expands into individual tool definitions for
* all tools available on the server.
*
* For custom tools (new format), we only store: type, customToolId, usageControl, isExpanded. * For custom tools (new format), we only store: type, customToolId, usageControl, isExpanded.
* Everything else (title, schema, code) is loaded dynamically from the database. * Everything else (title, schema, code) is loaded dynamically from the database.
* Legacy custom tools with inline schema/code are still supported for backwards compatibility. * Legacy custom tools with inline schema/code are still supported for backwards compatibility.
*/ */
interface StoredTool { interface StoredTool {
/** /** Block type identifier */
* Block type identifier.
* 'mcp-server' enables server-level selection where all tools from
* the server are made available to the LLM at execution time.
*/
type: string type: string
/** Display title for the tool (optional for new custom tool format) */ /** Display title for the tool (optional for new custom tool format) */
title?: string title?: string
/** Direct tool ID for execution (optional for new custom tool format) */ /** Direct tool ID for execution (optional for new custom tool format) */
toolId?: string toolId?: string
/** /** Parameter values configured by the user (optional for new custom tool format) */
* Parameter values configured by the user.
* For 'mcp-server' type, includes: serverId, serverUrl, serverName, toolCount
*/
params?: Record<string, string> params?: Record<string, string>
/** Whether the tool details are expanded in UI */ /** Whether the tool details are expanded in UI */
isExpanded?: boolean isExpanded?: boolean
@@ -1022,7 +1007,6 @@ export const ToolInput = memo(function ToolInput({
const [draggedIndex, setDraggedIndex] = useState<number | null>(null) const [draggedIndex, setDraggedIndex] = useState<number | null>(null)
const [dragOverIndex, setDragOverIndex] = useState<number | null>(null) const [dragOverIndex, setDragOverIndex] = useState<number | null>(null)
const [usageControlPopoverIndex, setUsageControlPopoverIndex] = useState<number | null>(null) const [usageControlPopoverIndex, setUsageControlPopoverIndex] = useState<number | null>(null)
const [expandedMcpServers, setExpandedMcpServers] = useState<Set<string>>(new Set())
const value = isPreview ? previewValue : storeValue const value = isPreview ? previewValue : storeValue
@@ -1252,18 +1236,6 @@ export const ToolInput = memo(function ToolInput({
return selectedTools.some((tool) => tool.type === 'mcp' && tool.toolId === mcpToolId) return selectedTools.some((tool) => tool.type === 'mcp' && tool.toolId === mcpToolId)
} }
/**
* Checks if an MCP server is already selected (all tools mode).
*
* @param serverId - The MCP server identifier to check
* @returns `true` if the MCP server is already selected
*/
const isMcpServerAlreadySelected = (serverId: string): boolean => {
return selectedTools.some(
(tool) => tool.type === 'mcp-server' && tool.params?.serverId === serverId
)
}
/** /**
* Checks if a custom tool is already selected. * Checks if a custom tool is already selected.
* *
@@ -1288,37 +1260,6 @@ export const ToolInput = memo(function ToolInput({
) )
} }
/**
* Groups MCP tools by their parent server.
*
* @returns Map of serverId to array of tools
*/
const mcpToolsByServer = useMemo(() => {
const grouped = new Map<string, typeof availableMcpTools>()
for (const tool of availableMcpTools) {
if (!grouped.has(tool.serverId)) {
grouped.set(tool.serverId, [])
}
grouped.get(tool.serverId)!.push(tool)
}
return grouped
}, [availableMcpTools])
/**
* Toggles the expanded state of an MCP server in the dropdown.
*/
const toggleMcpServerExpanded = useCallback((serverId: string) => {
setExpandedMcpServers((prev) => {
const next = new Set(prev)
if (next.has(serverId)) {
next.delete(serverId)
} else {
next.add(serverId)
}
return next
})
}, [])
/** /**
* Checks if a block supports multiple operations. * Checks if a block supports multiple operations.
* *
@@ -1864,125 +1805,41 @@ export const ToolInput = memo(function ToolInput({
}) })
} }
// MCP Servers section - grouped by server with expandable folders // MCP Tools section
if (!permissionConfig.disableMcpTools && mcpToolsByServer.size > 0) { if (!permissionConfig.disableMcpTools && availableMcpTools.length > 0) {
// Create items for each server (as expandable folders)
const serverItems: ComboboxOption[] = []
for (const [serverId, tools] of mcpToolsByServer) {
const server = mcpServers.find((s) => s.id === serverId)
const serverName = tools[0]?.serverName || server?.name || 'Unknown Server'
const isExpanded = expandedMcpServers.has(serverId)
const serverAlreadySelected = isMcpServerAlreadySelected(serverId)
const toolCount = tools.length
// Server folder header (clickable to expand/collapse)
serverItems.push({
label: serverName,
value: `mcp-server-folder-${serverId}`,
iconElement: (
<div className='flex items-center gap-[4px]'>
<ChevronRight
className={cn(
'h-[12px] w-[12px] text-[var(--text-tertiary)] transition-transform',
isExpanded && 'rotate-90'
)}
/>
<div
className='flex h-[16px] w-[16px] flex-shrink-0 items-center justify-center rounded-[4px]'
style={{ background: '#6366F1' }}
>
<ServerIcon className='h-[10px] w-[10px] text-white' />
</div>
</div>
),
onSelect: () => {
toggleMcpServerExpanded(serverId)
},
disabled: false,
keepOpen: true, // Keep dropdown open when toggling folder expansion
})
// If expanded, show "Use all tools" option and individual tools
if (isExpanded) {
// "Use all tools from server" option
serverItems.push({
label: `Use all ${toolCount} tools`,
value: `mcp-server-all-${serverId}`,
iconElement: (
<div className='ml-[20px] flex h-[16px] w-[16px] flex-shrink-0 items-center justify-center rounded-[4px] bg-[#6366F1]'>
<McpIcon className='h-[10px] w-[10px] text-white' />
</div>
),
onSelect: () => {
if (serverAlreadySelected) return
// Remove any individual tools from this server that were previously selected
const filteredTools = selectedTools.filter(
(tool) => !(tool.type === 'mcp' && tool.params?.serverId === serverId)
)
const newTool: StoredTool = {
type: 'mcp-server',
title: `${serverName} (all tools)`,
toolId: `mcp-server-${serverId}`,
params: {
serverId,
...(server?.url && { serverUrl: server.url }),
serverName,
toolCount: String(toolCount),
},
isExpanded: false,
usageControl: 'auto',
}
setStoreValue([
...filteredTools.map((tool) => ({ ...tool, isExpanded: false })),
newTool,
])
setOpen(false)
},
disabled: isPreview || disabled || serverAlreadySelected,
})
// Individual tools from this server
for (const mcpTool of tools) {
const alreadySelected = isMcpToolAlreadySelected(mcpTool.id) || serverAlreadySelected
serverItems.push({
label: mcpTool.name,
value: `mcp-${mcpTool.id}`,
iconElement: (
<div className='ml-[20px]'>
{createToolIcon(mcpTool.bgColor || '#6366F1', mcpTool.icon || McpIcon)}
</div>
),
onSelect: () => {
if (alreadySelected) return
const newTool: StoredTool = {
type: 'mcp',
title: mcpTool.name,
toolId: mcpTool.id,
params: {
serverId: mcpTool.serverId,
...(server?.url && { serverUrl: server.url }),
toolName: mcpTool.name,
serverName: mcpTool.serverName,
},
isExpanded: true,
usageControl: 'auto',
schema: {
...mcpTool.inputSchema,
description: mcpTool.description,
},
}
handleMcpToolSelect(newTool, true)
},
disabled: isPreview || disabled || alreadySelected,
})
}
}
}
groups.push({ groups.push({
section: 'MCP Servers', section: 'MCP Tools',
items: serverItems, items: availableMcpTools.map((mcpTool) => {
const server = mcpServers.find((s) => s.id === mcpTool.serverId)
const alreadySelected = isMcpToolAlreadySelected(mcpTool.id)
return {
label: mcpTool.name,
value: `mcp-${mcpTool.id}`,
iconElement: createToolIcon(mcpTool.bgColor || '#6366F1', mcpTool.icon || McpIcon),
onSelect: () => {
if (alreadySelected) return
const newTool: StoredTool = {
type: 'mcp',
title: mcpTool.name,
toolId: mcpTool.id,
params: {
serverId: mcpTool.serverId,
...(server?.url && { serverUrl: server.url }),
toolName: mcpTool.name,
serverName: mcpTool.serverName,
},
isExpanded: true,
usageControl: 'auto',
schema: {
...mcpTool.inputSchema,
description: mcpTool.description,
},
}
handleMcpToolSelect(newTool, true)
},
disabled: isPreview || disabled || alreadySelected,
}
}),
}) })
} }
@@ -2065,8 +1922,6 @@ export const ToolInput = memo(function ToolInput({
customTools, customTools,
availableMcpTools, availableMcpTools,
mcpServers, mcpServers,
mcpToolsByServer,
expandedMcpServers,
toolBlocks, toolBlocks,
isPreview, isPreview,
disabled, disabled,
@@ -2080,10 +1935,8 @@ export const ToolInput = memo(function ToolInput({
getToolIdForOperation, getToolIdForOperation,
isToolAlreadySelected, isToolAlreadySelected,
isMcpToolAlreadySelected, isMcpToolAlreadySelected,
isMcpServerAlreadySelected,
isCustomToolAlreadySelected, isCustomToolAlreadySelected,
isWorkflowAlreadySelected, isWorkflowAlreadySelected,
toggleMcpServerExpanded,
]) ])
const toolRequiresOAuth = (toolId: string): boolean => { const toolRequiresOAuth = (toolId: string): boolean => {
@@ -2510,25 +2363,24 @@ export const ToolInput = memo(function ToolInput({
{/* Selected Tools List */} {/* Selected Tools List */}
{selectedTools.length > 0 && {selectedTools.length > 0 &&
selectedTools.map((tool, toolIndex) => { selectedTools.map((tool, toolIndex) => {
// Handle custom tools, MCP tools, MCP servers, and workflow tools differently // Handle custom tools, MCP tools, and workflow tools differently
const isCustomTool = tool.type === 'custom-tool' const isCustomTool = tool.type === 'custom-tool'
const isMcpTool = tool.type === 'mcp' const isMcpTool = tool.type === 'mcp'
const isMcpServer = tool.type === 'mcp-server'
const isWorkflowTool = tool.type === 'workflow' const isWorkflowTool = tool.type === 'workflow'
const toolBlock = const toolBlock =
!isCustomTool && !isMcpTool && !isMcpServer !isCustomTool && !isMcpTool
? toolBlocks.find((block) => block.type === tool.type) ? toolBlocks.find((block) => block.type === tool.type)
: null : null
// Get the current tool ID (may change based on operation) // Get the current tool ID (may change based on operation)
const currentToolId = const currentToolId =
!isCustomTool && !isMcpTool && !isMcpServer !isCustomTool && !isMcpTool
? getToolIdForOperation(tool.type, tool.operation) || tool.toolId || '' ? getToolIdForOperation(tool.type, tool.operation) || tool.toolId || ''
: tool.toolId || '' : tool.toolId || ''
// Get tool parameters using the new utility with block type for UI components // Get tool parameters using the new utility with block type for UI components
const toolParams = const toolParams =
!isCustomTool && !isMcpTool && !isMcpServer && currentToolId !isCustomTool && !isMcpTool && currentToolId
? getToolParametersConfig(currentToolId, tool.type, { ? getToolParametersConfig(currentToolId, tool.type, {
operation: tool.operation, operation: tool.operation,
...tool.params, ...tool.params,
@@ -2597,32 +2449,21 @@ export const ToolInput = memo(function ToolInput({
? customToolParams ? customToolParams
: isMcpTool : isMcpTool
? mcpToolParams ? mcpToolParams
: isMcpServer : toolParams?.userInputParameters || []
? [] // MCP servers have no user-configurable params
: toolParams?.userInputParameters || []
// Check if tool requires OAuth // Check if tool requires OAuth
const requiresOAuth = const requiresOAuth =
!isCustomTool && !isCustomTool && !isMcpTool && currentToolId && toolRequiresOAuth(currentToolId)
!isMcpTool &&
!isMcpServer &&
currentToolId &&
toolRequiresOAuth(currentToolId)
const oauthConfig = const oauthConfig =
!isCustomTool && !isMcpTool && !isMcpServer && currentToolId !isCustomTool && !isMcpTool && currentToolId ? getToolOAuthConfig(currentToolId) : null
? getToolOAuthConfig(currentToolId)
: null
// Determine if tool has expandable body content // Determine if tool has expandable body content
const hasOperations = const hasOperations = !isCustomTool && !isMcpTool && hasMultipleOperations(tool.type)
!isCustomTool && !isMcpTool && !isMcpServer && hasMultipleOperations(tool.type)
const filteredDisplayParams = displayParams.filter((param) => const filteredDisplayParams = displayParams.filter((param) =>
evaluateParameterCondition(param, tool) evaluateParameterCondition(param, tool)
) )
// MCP servers are expandable to show tool list const hasToolBody =
const hasToolBody = isMcpServer hasOperations || (requiresOAuth && oauthConfig) || filteredDisplayParams.length > 0
? true
: hasOperations || (requiresOAuth && oauthConfig) || filteredDisplayParams.length > 0
// Only show expansion if tool has body content // Only show expansion if tool has body content
const isExpandedForDisplay = hasToolBody const isExpandedForDisplay = hasToolBody
@@ -2631,11 +2472,6 @@ export const ToolInput = memo(function ToolInput({
: !!tool.isExpanded : !!tool.isExpanded
: false : false
// For MCP servers, get the list of tools for display
const mcpServerTools = isMcpServer
? availableMcpTools.filter((t) => t.serverId === tool.params?.serverId)
: []
return ( return (
<div <div
key={`${tool.customToolId || tool.toolId || toolIndex}-${toolIndex}`} key={`${tool.customToolId || tool.toolId || toolIndex}-${toolIndex}`}
@@ -2672,7 +2508,7 @@ export const ToolInput = memo(function ToolInput({
style={{ style={{
backgroundColor: isCustomTool backgroundColor: isCustomTool
? '#3B82F6' ? '#3B82F6'
: isMcpTool || isMcpServer : isMcpTool
? mcpTool?.bgColor || '#6366F1' ? mcpTool?.bgColor || '#6366F1'
: isWorkflowTool : isWorkflowTool
? '#6366F1' ? '#6366F1'
@@ -2683,8 +2519,6 @@ export const ToolInput = memo(function ToolInput({
<WrenchIcon className='h-[10px] w-[10px] text-white' /> <WrenchIcon className='h-[10px] w-[10px] text-white' />
) : isMcpTool ? ( ) : isMcpTool ? (
<IconComponent icon={McpIcon} className='h-[10px] w-[10px] text-white' /> <IconComponent icon={McpIcon} className='h-[10px] w-[10px] text-white' />
) : isMcpServer ? (
<ServerIcon className='h-[10px] w-[10px] text-white' />
) : isWorkflowTool ? ( ) : isWorkflowTool ? (
<IconComponent icon={WorkflowIcon} className='h-[10px] w-[10px] text-white' /> <IconComponent icon={WorkflowIcon} className='h-[10px] w-[10px] text-white' />
) : ( ) : (
@@ -2697,11 +2531,6 @@ export const ToolInput = memo(function ToolInput({
<span className='truncate font-medium text-[13px] text-[var(--text-primary)]'> <span className='truncate font-medium text-[13px] text-[var(--text-primary)]'>
{isCustomTool ? customToolTitle : tool.title} {isCustomTool ? customToolTitle : tool.title}
</span> </span>
{isMcpServer && (
<Badge variant='default' size='sm'>
{tool.params?.toolCount || mcpServerTools.length} tools
</Badge>
)}
{isMcpTool && {isMcpTool &&
!mcpDataLoading && !mcpDataLoading &&
(() => { (() => {
@@ -2807,53 +2636,31 @@ export const ToolInput = memo(function ToolInput({
{!isCustomTool && isExpandedForDisplay && ( {!isCustomTool && isExpandedForDisplay && (
<div className='flex flex-col gap-[10px] overflow-visible rounded-b-[4px] border-[var(--border-1)] border-t px-[8px] py-[8px]'> <div className='flex flex-col gap-[10px] overflow-visible rounded-b-[4px] border-[var(--border-1)] border-t px-[8px] py-[8px]'>
{/* MCP Server tool list (read-only) */}
{isMcpServer && mcpServerTools.length > 0 && (
<div className='flex flex-col gap-[4px]'>
<div className='font-medium text-[12px] text-[var(--text-tertiary)]'>
Available tools:
</div>
<div className='flex flex-wrap gap-[4px]'>
{mcpServerTools.map((serverTool) => (
<Badge
key={serverTool.id}
variant='outline'
size='sm'
className='text-[11px]'
>
{serverTool.name}
</Badge>
))}
</div>
</div>
)}
{/* Operation dropdown for tools with multiple operations */} {/* Operation dropdown for tools with multiple operations */}
{!isMcpServer && {(() => {
(() => { const hasOperations = hasMultipleOperations(tool.type)
const hasOperations = hasMultipleOperations(tool.type) const operationOptions = hasOperations ? getOperationOptions(tool.type) : []
const operationOptions = hasOperations ? getOperationOptions(tool.type) : []
return hasOperations && operationOptions.length > 0 ? ( return hasOperations && operationOptions.length > 0 ? (
<div className='relative space-y-[6px]'> <div className='relative space-y-[6px]'>
<div className='font-medium text-[13px] text-[var(--text-primary)]'> <div className='font-medium text-[13px] text-[var(--text-primary)]'>
Operation Operation
</div>
<Combobox
options={operationOptions
.filter((option) => option.id !== '')
.map((option) => ({
label: option.label,
value: option.id,
}))}
value={tool.operation || operationOptions[0].id}
onChange={(value) => handleOperationChange(toolIndex, value)}
placeholder='Select operation'
disabled={disabled}
/>
</div> </div>
) : null <Combobox
})()} options={operationOptions
.filter((option) => option.id !== '')
.map((option) => ({
label: option.label,
value: option.id,
}))}
value={tool.operation || operationOptions[0].id}
onChange={(value) => handleOperationChange(toolIndex, value)}
placeholder='Select operation'
disabled={disabled}
/>
</div>
) : null
})()}
{/* OAuth credential selector if required */} {/* OAuth credential selector if required */}
{requiresOAuth && oauthConfig && ( {requiresOAuth && oauthConfig && (

View File

@@ -50,12 +50,6 @@ import { useSubBlockStore } from '@/stores/workflows/subblock/store'
/** Stable empty object to avoid creating new references */ /** Stable empty object to avoid creating new references */
const EMPTY_SUBBLOCK_VALUES = {} as Record<string, any> const EMPTY_SUBBLOCK_VALUES = {} as Record<string, any>
/** Shared style for dashed divider lines */
const DASHED_DIVIDER_STYLE = {
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
} as const
/** /**
* Icon component for rendering block icons. * Icon component for rendering block icons.
* *
@@ -95,23 +89,31 @@ export function Editor() {
const blockConfig = currentBlock ? getBlock(currentBlock.type) : null const blockConfig = currentBlock ? getBlock(currentBlock.type) : null
const title = currentBlock?.name || 'Editor' const title = currentBlock?.name || 'Editor'
// Check if selected block is a subflow (loop or parallel)
const isSubflow = const isSubflow =
currentBlock && (currentBlock.type === 'loop' || currentBlock.type === 'parallel') currentBlock && (currentBlock.type === 'loop' || currentBlock.type === 'parallel')
// Get subflow display properties from configs
const subflowConfig = isSubflow ? (currentBlock.type === 'loop' ? LoopTool : ParallelTool) : null const subflowConfig = isSubflow ? (currentBlock.type === 'loop' ? LoopTool : ParallelTool) : null
// Check if selected block is a workflow block
const isWorkflowBlock = const isWorkflowBlock =
currentBlock && (currentBlock.type === 'workflow' || currentBlock.type === 'workflow_input') currentBlock && (currentBlock.type === 'workflow' || currentBlock.type === 'workflow_input')
// Get workspace ID from params
const params = useParams() const params = useParams()
const workspaceId = params.workspaceId as string const workspaceId = params.workspaceId as string
// Refs for resize functionality
const subBlocksRef = useRef<HTMLDivElement>(null) const subBlocksRef = useRef<HTMLDivElement>(null)
// Get user permissions
const userPermissions = useUserPermissionsContext() const userPermissions = useUserPermissionsContext()
// Get active workflow ID
const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId) const activeWorkflowId = useWorkflowRegistry((state) => state.activeWorkflowId)
// Get block properties (advanced/trigger modes)
const { advancedMode, triggerMode } = useEditorBlockProperties( const { advancedMode, triggerMode } = useEditorBlockProperties(
currentBlockId, currentBlockId,
currentWorkflow.isSnapshotView currentWorkflow.isSnapshotView
@@ -143,9 +145,10 @@ export function Editor() {
[subBlocksForCanonical] [subBlocksForCanonical]
) )
const canonicalModeOverrides = currentBlock?.data?.canonicalModes const canonicalModeOverrides = currentBlock?.data?.canonicalModes
const advancedValuesPresent = useMemo( const advancedValuesPresent = hasAdvancedValues(
() => hasAdvancedValues(subBlocksForCanonical, blockSubBlockValues, canonicalIndex), subBlocksForCanonical,
[subBlocksForCanonical, blockSubBlockValues, canonicalIndex] blockSubBlockValues,
canonicalIndex
) )
const displayAdvancedOptions = userPermissions.canEdit const displayAdvancedOptions = userPermissions.canEdit
? advancedMode ? advancedMode
@@ -153,9 +156,11 @@ export function Editor() {
const hasAdvancedOnlyFields = useMemo(() => { const hasAdvancedOnlyFields = useMemo(() => {
for (const subBlock of subBlocksForCanonical) { for (const subBlock of subBlocksForCanonical) {
// Must be standalone advanced (mode: 'advanced' without canonicalParamId)
if (subBlock.mode !== 'advanced') continue if (subBlock.mode !== 'advanced') continue
if (canonicalIndex.canonicalIdBySubBlockId[subBlock.id]) continue if (canonicalIndex.canonicalIdBySubBlockId[subBlock.id]) continue
// Check condition - skip if condition not met for current values
if ( if (
subBlock.condition && subBlock.condition &&
!evaluateSubBlockCondition(subBlock.condition, blockSubBlockValues) !evaluateSubBlockCondition(subBlock.condition, blockSubBlockValues)
@@ -168,6 +173,7 @@ export function Editor() {
return false return false
}, [subBlocksForCanonical, canonicalIndex.canonicalIdBySubBlockId, blockSubBlockValues]) }, [subBlocksForCanonical, canonicalIndex.canonicalIdBySubBlockId, blockSubBlockValues])
// Get subblock layout using custom hook
const { subBlocks, stateToUse: subBlockState } = useEditorSubblockLayout( const { subBlocks, stateToUse: subBlockState } = useEditorSubblockLayout(
blockConfig || ({} as any), blockConfig || ({} as any),
currentBlockId || '', currentBlockId || '',
@@ -200,34 +206,31 @@ export function Editor() {
return { regularSubBlocks: regular, advancedOnlySubBlocks: advancedOnly } return { regularSubBlocks: regular, advancedOnlySubBlocks: advancedOnly }
}, [subBlocks, canonicalIndex.canonicalIdBySubBlockId]) }, [subBlocks, canonicalIndex.canonicalIdBySubBlockId])
// Get block connections
const { incomingConnections, hasIncomingConnections } = useBlockConnections(currentBlockId || '') const { incomingConnections, hasIncomingConnections } = useBlockConnections(currentBlockId || '')
// Connections resize hook
const { handleMouseDown: handleConnectionsResizeMouseDown, isResizing } = useConnectionsResize({ const { handleMouseDown: handleConnectionsResizeMouseDown, isResizing } = useConnectionsResize({
subBlocksRef, subBlocksRef,
}) })
// Collaborative actions
const { const {
collaborativeSetBlockCanonicalMode, collaborativeSetBlockCanonicalMode,
collaborativeUpdateBlockName, collaborativeUpdateBlockName,
collaborativeToggleBlockAdvancedMode, collaborativeToggleBlockAdvancedMode,
} = useCollaborativeWorkflow() } = useCollaborativeWorkflow()
// Advanced mode toggle handler
const handleToggleAdvancedMode = useCallback(() => { const handleToggleAdvancedMode = useCallback(() => {
if (!currentBlockId || !userPermissions.canEdit) return if (!currentBlockId || !userPermissions.canEdit) return
collaborativeToggleBlockAdvancedMode(currentBlockId) collaborativeToggleBlockAdvancedMode(currentBlockId)
}, [currentBlockId, userPermissions.canEdit, collaborativeToggleBlockAdvancedMode]) }, [currentBlockId, userPermissions.canEdit, collaborativeToggleBlockAdvancedMode])
// Rename state
const [isRenaming, setIsRenaming] = useState(false) const [isRenaming, setIsRenaming] = useState(false)
const [editedName, setEditedName] = useState('') const [editedName, setEditedName] = useState('')
const nameInputRef = useRef<HTMLInputElement>(null)
/**
* Ref callback that auto-selects the input text when mounted.
*/
const nameInputRefCallback = useCallback((element: HTMLInputElement | null) => {
if (element) {
element.select()
}
}, [])
/** /**
* Handles starting the rename process. * Handles starting the rename process.
@@ -248,6 +251,7 @@ export function Editor() {
if (trimmedName && trimmedName !== currentBlock?.name) { if (trimmedName && trimmedName !== currentBlock?.name) {
const result = collaborativeUpdateBlockName(currentBlockId, trimmedName) const result = collaborativeUpdateBlockName(currentBlockId, trimmedName)
if (!result.success) { if (!result.success) {
// Keep rename mode open on error so user can correct the name
return return
} }
} }
@@ -262,6 +266,14 @@ export function Editor() {
setEditedName('') setEditedName('')
}, []) }, [])
// Focus input when entering rename mode
useEffect(() => {
if (isRenaming && nameInputRef.current) {
nameInputRef.current.select()
}
}, [isRenaming])
// Trigger rename mode when signaled from context menu
useEffect(() => { useEffect(() => {
if (shouldFocusRename && currentBlock) { if (shouldFocusRename && currentBlock) {
handleStartRename() handleStartRename()
@@ -272,13 +284,17 @@ export function Editor() {
/** /**
* Handles opening documentation link in a new secure tab. * Handles opening documentation link in a new secure tab.
*/ */
const handleOpenDocs = useCallback(() => { const handleOpenDocs = () => {
const docsLink = isSubflow ? subflowConfig?.docsLink : blockConfig?.docsLink const docsLink = isSubflow ? subflowConfig?.docsLink : blockConfig?.docsLink
window.open(docsLink || 'https://docs.sim.ai/quick-reference', '_blank', 'noopener,noreferrer') if (docsLink) {
}, [isSubflow, subflowConfig?.docsLink, blockConfig?.docsLink]) window.open(docsLink, '_blank', 'noopener,noreferrer')
}
}
// Get child workflow ID for workflow blocks
const childWorkflowId = isWorkflowBlock ? blockSubBlockValues?.workflowId : null const childWorkflowId = isWorkflowBlock ? blockSubBlockValues?.workflowId : null
// Fetch child workflow state for preview (only for workflow blocks with a selected workflow)
const { data: childWorkflowState, isLoading: isLoadingChildWorkflow } = const { data: childWorkflowState, isLoading: isLoadingChildWorkflow } =
useWorkflowState(childWorkflowId) useWorkflowState(childWorkflowId)
@@ -291,6 +307,7 @@ export function Editor() {
} }
}, [childWorkflowId, workspaceId]) }, [childWorkflowId, workspaceId])
// Determine if connections are at minimum height (collapsed state)
const isConnectionsAtMinHeight = connectionsHeight <= 35 const isConnectionsAtMinHeight = connectionsHeight <= 35
return ( return (
@@ -311,7 +328,7 @@ export function Editor() {
)} )}
{isRenaming ? ( {isRenaming ? (
<input <input
ref={nameInputRefCallback} ref={nameInputRef}
type='text' type='text'
value={editedName} value={editedName}
onChange={(e) => setEditedName(e.target.value)} onChange={(e) => setEditedName(e.target.value)}
@@ -382,21 +399,23 @@ export function Editor() {
</Tooltip.Content> </Tooltip.Content>
</Tooltip.Root> </Tooltip.Root>
)} */} )} */}
<Tooltip.Root> {currentBlock && (isSubflow ? subflowConfig?.docsLink : blockConfig?.docsLink) && (
<Tooltip.Trigger asChild> <Tooltip.Root>
<Button <Tooltip.Trigger asChild>
variant='ghost' <Button
className='p-0' variant='ghost'
onClick={handleOpenDocs} className='p-0'
aria-label='Open documentation' onClick={handleOpenDocs}
> aria-label='Open documentation'
<BookOpen className='h-[14px] w-[14px]' /> >
</Button> <BookOpen className='h-[14px] w-[14px]' />
</Tooltip.Trigger> </Button>
<Tooltip.Content side='top'> </Tooltip.Trigger>
<p>Open docs</p> <Tooltip.Content side='top'>
</Tooltip.Content> <p>Open docs</p>
</Tooltip.Root> </Tooltip.Content>
</Tooltip.Root>
)}
</div> </div>
</div> </div>
@@ -476,7 +495,13 @@ export function Editor() {
</div> </div>
</div> </div>
<div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'> <div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'>
<div className='h-[1.25px]' style={DASHED_DIVIDER_STYLE} /> <div
className='h-[1.25px]'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
</> </>
)} )}
@@ -541,7 +566,13 @@ export function Editor() {
/> />
{showDivider && ( {showDivider && (
<div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'> <div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'>
<div className='h-[1.25px]' style={DASHED_DIVIDER_STYLE} /> <div
className='h-[1.25px]'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
)} )}
</div> </div>
@@ -550,7 +581,13 @@ export function Editor() {
{hasAdvancedOnlyFields && userPermissions.canEdit && ( {hasAdvancedOnlyFields && userPermissions.canEdit && (
<div className='flex items-center gap-[10px] px-[2px] pt-[14px] pb-[12px]'> <div className='flex items-center gap-[10px] px-[2px] pt-[14px] pb-[12px]'>
<div className='h-[1.25px] flex-1' style={DASHED_DIVIDER_STYLE} /> <div
className='h-[1.25px] flex-1'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
<button <button
type='button' type='button'
onClick={handleToggleAdvancedMode} onClick={handleToggleAdvancedMode}
@@ -563,7 +600,13 @@ export function Editor() {
className={`h-[14px] w-[14px] transition-transform duration-200 ${displayAdvancedOptions ? 'rotate-180' : ''}`} className={`h-[14px] w-[14px] transition-transform duration-200 ${displayAdvancedOptions ? 'rotate-180' : ''}`}
/> />
</button> </button>
<div className='h-[1.25px] flex-1' style={DASHED_DIVIDER_STYLE} /> <div
className='h-[1.25px] flex-1'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
)} )}
@@ -587,7 +630,13 @@ export function Editor() {
/> />
{index < advancedOnlySubBlocks.length - 1 && ( {index < advancedOnlySubBlocks.length - 1 && (
<div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'> <div className='subblock-divider px-[2px] pt-[16px] pb-[13px]'>
<div className='h-[1.25px]' style={DASHED_DIVIDER_STYLE} /> <div
className='h-[1.25px]'
style={{
backgroundImage:
'repeating-linear-gradient(to right, var(--border) 0px, var(--border) 6px, transparent 6px, transparent 12px)',
}}
/>
</div> </div>
)} )}
</div> </div>

View File

@@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
import { useParams } from 'next/navigation' import { useParams } from 'next/navigation'
import { io, type Socket } from 'socket.io-client' import { io, type Socket } from 'socket.io-client'
import { getEnv } from '@/lib/core/config/env' import { getEnv } from '@/lib/core/config/env'
import { useOperationQueueStore } from '@/stores/operation-queue/store'
const logger = createLogger('SocketContext') const logger = createLogger('SocketContext')
@@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [authFailed, setAuthFailed] = useState(false) const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false) const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null) const socketRef = useRef<Socket | null>(null)
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
const params = useParams() const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined const urlWorkflowId = params?.workflowId as string | undefined
@@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}) })
}) })
socketInstance.on('join-workflow-error', ({ error }) => { socketInstance.on('join-workflow-error', ({ error, code }) => {
isRejoiningRef.current = false isRejoiningRef.current = false
logger.error('Failed to join workflow:', error) logger.error('Failed to join workflow:', { error, code })
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
triggerOfflineMode()
}
}) })
socketInstance.on('workflow-operation', (data) => { socketInstance.on('workflow-operation', (data) => {

View File

@@ -52,8 +52,6 @@ export type ComboboxOption = {
onSelect?: () => void onSelect?: () => void
/** Whether this option is disabled */ /** Whether this option is disabled */
disabled?: boolean disabled?: boolean
/** When true, keep the dropdown open after selecting this option */
keepOpen?: boolean
} }
/** /**
@@ -254,15 +252,13 @@ const Combobox = memo(
* Handles selection of an option * Handles selection of an option
*/ */
const handleSelect = useCallback( const handleSelect = useCallback(
(selectedValue: string, customOnSelect?: () => void, keepOpen?: boolean) => { (selectedValue: string, customOnSelect?: () => void) => {
// If option has custom onSelect, use it instead // If option has custom onSelect, use it instead
if (customOnSelect) { if (customOnSelect) {
customOnSelect() customOnSelect()
if (!keepOpen) { setOpen(false)
setOpen(false) setHighlightedIndex(-1)
setHighlightedIndex(-1) setSearchQuery('')
setSearchQuery('')
}
return return
} }
@@ -274,13 +270,11 @@ const Combobox = memo(
onMultiSelectChange(newValues) onMultiSelectChange(newValues)
} else { } else {
onChange?.(selectedValue) onChange?.(selectedValue)
if (!keepOpen) { setOpen(false)
setOpen(false) setHighlightedIndex(-1)
setHighlightedIndex(-1) setSearchQuery('')
setSearchQuery('') if (editable && inputRef.current) {
if (editable && inputRef.current) { inputRef.current.blur()
inputRef.current.blur()
}
} }
} }
}, },
@@ -349,7 +343,7 @@ const Combobox = memo(
e.preventDefault() e.preventDefault()
const selectedOption = filteredOptions[highlightedIndex] const selectedOption = filteredOptions[highlightedIndex]
if (selectedOption && !selectedOption.disabled) { if (selectedOption && !selectedOption.disabled) {
handleSelect(selectedOption.value, selectedOption.onSelect, selectedOption.keepOpen) handleSelect(selectedOption.value, selectedOption.onSelect)
} }
} else if (!editable) { } else if (!editable) {
e.preventDefault() e.preventDefault()
@@ -674,7 +668,7 @@ const Combobox = memo(
e.preventDefault() e.preventDefault()
e.stopPropagation() e.stopPropagation()
if (!option.disabled) { if (!option.disabled) {
handleSelect(option.value, option.onSelect, option.keepOpen) handleSelect(option.value, option.onSelect)
} }
}} }}
onMouseEnter={() => onMouseEnter={() =>
@@ -749,7 +743,7 @@ const Combobox = memo(
e.preventDefault() e.preventDefault()
e.stopPropagation() e.stopPropagation()
if (!option.disabled) { if (!option.disabled) {
handleSelect(option.value, option.onSelect, option.keepOpen) handleSelect(option.value, option.onSelect)
} }
}} }}
onMouseEnter={() => !option.disabled && setHighlightedIndex(index)} onMouseEnter={() => !option.disabled && setHighlightedIndex(index)}

View File

@@ -143,7 +143,7 @@ export class AgentBlockHandler implements BlockHandler {
private async validateToolPermissions(ctx: ExecutionContext, tools: ToolInput[]): Promise<void> { private async validateToolPermissions(ctx: ExecutionContext, tools: ToolInput[]): Promise<void> {
if (!Array.isArray(tools) || tools.length === 0) return if (!Array.isArray(tools) || tools.length === 0) return
const hasMcpTools = tools.some((t) => t.type === 'mcp' || t.type === 'mcp-server') const hasMcpTools = tools.some((t) => t.type === 'mcp')
const hasCustomTools = tools.some((t) => t.type === 'custom-tool') const hasCustomTools = tools.some((t) => t.type === 'custom-tool')
if (hasMcpTools) { if (hasMcpTools) {
@@ -161,7 +161,7 @@ export class AgentBlockHandler implements BlockHandler {
): Promise<ToolInput[]> { ): Promise<ToolInput[]> {
if (!Array.isArray(tools) || tools.length === 0) return tools if (!Array.isArray(tools) || tools.length === 0) return tools
const mcpTools = tools.filter((t) => t.type === 'mcp' || t.type === 'mcp-server') const mcpTools = tools.filter((t) => t.type === 'mcp')
if (mcpTools.length === 0) return tools if (mcpTools.length === 0) return tools
const serverIds = [...new Set(mcpTools.map((t) => t.params?.serverId).filter(Boolean))] const serverIds = [...new Set(mcpTools.map((t) => t.params?.serverId).filter(Boolean))]
@@ -195,7 +195,7 @@ export class AgentBlockHandler implements BlockHandler {
} }
return tools.filter((tool) => { return tools.filter((tool) => {
if (tool.type !== 'mcp' && tool.type !== 'mcp-server') return true if (tool.type !== 'mcp') return true
const serverId = tool.params?.serverId const serverId = tool.params?.serverId
if (!serverId) return false if (!serverId) return false
return availableServerIds.has(serverId) return availableServerIds.has(serverId)
@@ -211,14 +211,11 @@ export class AgentBlockHandler implements BlockHandler {
}) })
const mcpTools: ToolInput[] = [] const mcpTools: ToolInput[] = []
const mcpServers: ToolInput[] = []
const otherTools: ToolInput[] = [] const otherTools: ToolInput[] = []
for (const tool of filtered) { for (const tool of filtered) {
if (tool.type === 'mcp') { if (tool.type === 'mcp') {
mcpTools.push(tool) mcpTools.push(tool)
} else if (tool.type === 'mcp-server') {
mcpServers.push(tool)
} else { } else {
otherTools.push(tool) otherTools.push(tool)
} }
@@ -227,12 +224,7 @@ export class AgentBlockHandler implements BlockHandler {
const otherResults = await Promise.all( const otherResults = await Promise.all(
otherTools.map(async (tool) => { otherTools.map(async (tool) => {
try { try {
if ( if (tool.type && tool.type !== 'custom-tool' && tool.type !== 'mcp') {
tool.type &&
tool.type !== 'custom-tool' &&
tool.type !== 'mcp' &&
tool.type !== 'mcp-server'
) {
await validateBlockType(ctx.userId, tool.type, ctx) await validateBlockType(ctx.userId, tool.type, ctx)
} }
if (tool.type === 'custom-tool' && (tool.schema || tool.customToolId)) { if (tool.type === 'custom-tool' && (tool.schema || tool.customToolId)) {
@@ -248,133 +240,12 @@ export class AgentBlockHandler implements BlockHandler {
const mcpResults = await this.processMcpToolsBatched(ctx, mcpTools) const mcpResults = await this.processMcpToolsBatched(ctx, mcpTools)
// Process MCP servers (all tools from server mode) const allTools = [...otherResults, ...mcpResults]
const mcpServerResults = await this.processMcpServerSelections(ctx, mcpServers)
const allTools = [...otherResults, ...mcpResults, ...mcpServerResults]
return allTools.filter( return allTools.filter(
(tool): tool is NonNullable<typeof tool> => tool !== null && tool !== undefined (tool): tool is NonNullable<typeof tool> => tool !== null && tool !== undefined
) )
} }
/**
* Process MCP server selections by discovering and formatting all tools from each server.
* This enables "agent discovery" mode where the LLM can call any tool from the server.
*/
private async processMcpServerSelections(
ctx: ExecutionContext,
mcpServerSelections: ToolInput[]
): Promise<any[]> {
if (mcpServerSelections.length === 0) return []
const results: any[] = []
for (const serverSelection of mcpServerSelections) {
const serverId = serverSelection.params?.serverId
const serverName = serverSelection.params?.serverName
const usageControl = serverSelection.usageControl || 'auto'
if (!serverId) {
logger.error('MCP server selection missing serverId:', serverSelection)
continue
}
try {
// Discover all tools from this server
const discoveredTools = await this.discoverMcpToolsForServer(ctx, serverId)
// Create tool definitions for each discovered tool
for (const mcpTool of discoveredTools) {
const created = await this.createMcpToolFromDiscoveredServerTool(
ctx,
mcpTool,
serverId,
serverName || serverId,
usageControl
)
if (created) results.push(created)
}
logger.info(
`[AgentHandler] Expanded MCP server ${serverName} into ${discoveredTools.length} tools`
)
} catch (error) {
logger.error(`[AgentHandler] Failed to process MCP server selection:`, { serverId, error })
}
}
return results
}
/**
* Create an MCP tool from server discovery for the "all tools" mode.
*/
private async createMcpToolFromDiscoveredServerTool(
ctx: ExecutionContext,
mcpTool: any,
serverId: string,
serverName: string,
usageControl: string
): Promise<any> {
const toolName = mcpTool.name
const { filterSchemaForLLM } = await import('@/tools/params')
const filteredSchema = filterSchemaForLLM(
mcpTool.inputSchema || { type: 'object', properties: {} },
{}
)
const toolId = createMcpToolId(serverId, toolName)
return {
id: toolId,
name: toolName,
description: mcpTool.description || `MCP tool ${toolName} from ${serverName}`,
parameters: filteredSchema,
params: {},
usageControl,
executeFunction: async (callParams: Record<string, any>) => {
const headers = await buildAuthHeaders()
const execUrl = buildAPIUrl('/api/mcp/tools/execute')
const execResponse = await fetch(execUrl.toString(), {
method: 'POST',
headers,
body: stringifyJSON({
serverId,
toolName,
arguments: callParams,
workspaceId: ctx.workspaceId,
workflowId: ctx.workflowId,
toolSchema: mcpTool.inputSchema,
}),
})
if (!execResponse.ok) {
throw new Error(
`MCP tool execution failed: ${execResponse.status} ${execResponse.statusText}`
)
}
const result = await execResponse.json()
if (!result.success) {
throw new Error(result.error || 'MCP tool execution failed')
}
return {
success: true,
output: result.data.output || {},
metadata: {
source: 'mcp-server',
serverId,
serverName,
toolName,
},
}
},
}
}
private async createCustomTool(ctx: ExecutionContext, tool: ToolInput): Promise<any> { private async createCustomTool(ctx: ExecutionContext, tool: ToolInput): Promise<any> {
const userProvidedParams = tool.params || {} const userProvidedParams = tool.params || {}

View File

@@ -29,36 +29,11 @@ export interface AgentInputs {
verbosity?: string verbosity?: string
} }
/**
* Represents a tool input for the agent block.
*
* @remarks
* Valid types include:
* - Standard block types (e.g., 'api', 'search', 'function')
* - 'custom-tool': User-defined tools with custom code
* - 'mcp': Individual MCP tool from a connected server
* - 'mcp-server': All tools from an MCP server (agent discovery mode).
* At execution time, this is expanded into individual tool definitions
* for all tools available on the server. This enables dynamic capability
* discovery where the LLM can call any tool from the server.
*/
export interface ToolInput { export interface ToolInput {
/**
* Tool type identifier.
* 'mcp-server' enables server-level selection where all tools from
* the server are made available to the LLM at execution time.
*/
type?: string type?: string
schema?: any schema?: any
title?: string title?: string
code?: string code?: string
/**
* Tool parameters. For 'mcp-server' type, includes:
* - serverId: The MCP server ID
* - serverUrl: The server URL (optional)
* - serverName: Human-readable server name
* - toolCount: Number of tools available (for display)
*/
params?: Record<string, any> params?: Record<string, any>
timeout?: number timeout?: number
usageControl?: 'auto' | 'force' | 'none' usageControl?: 'auto' | 'force' | 'none'

View File

@@ -1,37 +1,20 @@
import { db } from '@sim/db' import { db } from '@sim/db'
import * as schema from '@sim/db/schema' import * as schema from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm' import { and, eq } from 'drizzle-orm'
import { hasActiveSubscription } from '@/lib/billing'
const logger = createLogger('BillingAuthorization')
/** /**
* Check if a user is authorized to manage billing for a given reference ID * Check if a user is authorized to manage billing for a given reference ID
* Reference ID can be either a user ID (individual subscription) or organization ID (team subscription) * Reference ID can be either a user ID (individual subscription) or organization ID (team subscription)
*
* This function also performs duplicate subscription validation for organizations:
* - Rejects if an organization already has an active subscription (prevents duplicates)
* - Personal subscriptions (referenceId === userId) skip this check to allow upgrades
*/ */
export async function authorizeSubscriptionReference( export async function authorizeSubscriptionReference(
userId: string, userId: string,
referenceId: string referenceId: string
): Promise<boolean> { ): Promise<boolean> {
// User can always manage their own subscriptions (Pro upgrades, etc.) // User can always manage their own subscriptions
if (referenceId === userId) { if (referenceId === userId) {
return true return true
} }
// For organizations: check for existing active subscriptions to prevent duplicates
if (await hasActiveSubscription(referenceId)) {
logger.warn('Blocking checkout - active subscription already exists for organization', {
userId,
referenceId,
})
return false
}
// Check if referenceId is an organizationId the user has admin rights to // Check if referenceId is an organizationId the user has admin rights to
const members = await db const members = await db
.select() .select()

View File

@@ -25,11 +25,9 @@ export function useSubscriptionUpgrade() {
} }
let currentSubscriptionId: string | undefined let currentSubscriptionId: string | undefined
let allSubscriptions: any[] = []
try { try {
const listResult = await client.subscription.list() const listResult = await client.subscription.list()
allSubscriptions = listResult.data || [] const activePersonalSub = listResult.data?.find(
const activePersonalSub = allSubscriptions.find(
(sub: any) => sub.status === 'active' && sub.referenceId === userId (sub: any) => sub.status === 'active' && sub.referenceId === userId
) )
currentSubscriptionId = activePersonalSub?.id currentSubscriptionId = activePersonalSub?.id
@@ -52,25 +50,6 @@ export function useSubscriptionUpgrade() {
) )
if (existingOrg) { if (existingOrg) {
// Check if this org already has an active team subscription
const existingTeamSub = allSubscriptions.find(
(sub: any) =>
sub.status === 'active' &&
sub.referenceId === existingOrg.id &&
(sub.plan === 'team' || sub.plan === 'enterprise')
)
if (existingTeamSub) {
logger.warn('Organization already has an active team subscription', {
userId,
organizationId: existingOrg.id,
existingSubscriptionId: existingTeamSub.id,
})
throw new Error(
'This organization already has an active team subscription. Please manage it from the billing settings.'
)
}
logger.info('Using existing organization for team plan upgrade', { logger.info('Using existing organization for team plan upgrade', {
userId, userId,
organizationId: existingOrg.id, organizationId: existingOrg.id,

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db' import { db } from '@sim/db'
import { member, organization, subscription } from '@sim/db/schema' import { member, subscription } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, inArray } from 'drizzle-orm' import { and, eq, inArray } from 'drizzle-orm'
import { checkEnterprisePlan, checkProPlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils' import { checkEnterprisePlan, checkProPlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
@@ -26,22 +26,10 @@ export async function getHighestPrioritySubscription(userId: string) {
let orgSubs: typeof personalSubs = [] let orgSubs: typeof personalSubs = []
if (orgIds.length > 0) { if (orgIds.length > 0) {
// Verify orgs exist to filter out orphaned subscriptions orgSubs = await db
const existingOrgs = await db .select()
.select({ id: organization.id }) .from(subscription)
.from(organization) .where(and(inArray(subscription.referenceId, orgIds), eq(subscription.status, 'active')))
.where(inArray(organization.id, orgIds))
const validOrgIds = existingOrgs.map((o) => o.id)
if (validOrgIds.length > 0) {
orgSubs = await db
.select()
.from(subscription)
.where(
and(inArray(subscription.referenceId, validOrgIds), eq(subscription.status, 'active'))
)
}
} }
const allSubs = [...personalSubs, ...orgSubs] const allSubs = [...personalSubs, ...orgSubs]

View File

@@ -25,28 +25,6 @@ const logger = createLogger('SubscriptionCore')
export { getHighestPrioritySubscription } export { getHighestPrioritySubscription }
/**
* Check if a referenceId (user ID or org ID) has an active subscription
* Used for duplicate subscription prevention
*
* Fails closed: returns true on error to prevent duplicate creation
*/
export async function hasActiveSubscription(referenceId: string): Promise<boolean> {
try {
const [activeSub] = await db
.select({ id: subscription.id })
.from(subscription)
.where(and(eq(subscription.referenceId, referenceId), eq(subscription.status, 'active')))
.limit(1)
return !!activeSub
} catch (error) {
logger.error('Error checking active subscription', { error, referenceId })
// Fail closed: assume subscription exists to prevent duplicate creation
return true
}
}
/** /**
* Check if user is on Pro plan (direct or via organization) * Check if user is on Pro plan (direct or via organization)
*/ */

View File

@@ -11,7 +11,6 @@ export {
getHighestPrioritySubscription as getActiveSubscription, getHighestPrioritySubscription as getActiveSubscription,
getUserSubscriptionState as getSubscriptionState, getUserSubscriptionState as getSubscriptionState,
hasAccessControlAccess, hasAccessControlAccess,
hasActiveSubscription,
hasCredentialSetsAccess, hasCredentialSetsAccess,
hasSSOAccess, hasSSOAccess,
isEnterpriseOrgAdminOrOwner, isEnterpriseOrgAdminOrOwner,
@@ -33,11 +32,6 @@ export {
} from '@/lib/billing/core/usage' } from '@/lib/billing/core/usage'
export * from '@/lib/billing/credits/balance' export * from '@/lib/billing/credits/balance'
export * from '@/lib/billing/credits/purchase' export * from '@/lib/billing/credits/purchase'
export {
blockOrgMembers,
getOrgMemberIds,
unblockOrgMembers,
} from '@/lib/billing/organizations/membership'
export * from '@/lib/billing/subscriptions/utils' export * from '@/lib/billing/subscriptions/utils'
export { canEditUsageLimit as canEditLimit } from '@/lib/billing/subscriptions/utils' export { canEditUsageLimit as canEditLimit } from '@/lib/billing/subscriptions/utils'
export * from '@/lib/billing/types' export * from '@/lib/billing/types'

View File

@@ -8,7 +8,6 @@ import {
} from '@sim/db/schema' } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm' import { and, eq } from 'drizzle-orm'
import { hasActiveSubscription } from '@/lib/billing'
import { getPlanPricing } from '@/lib/billing/core/billing' import { getPlanPricing } from '@/lib/billing/core/billing'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage' import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
@@ -160,16 +159,6 @@ export async function ensureOrganizationForTeamSubscription(
if (existingMembership.length > 0) { if (existingMembership.length > 0) {
const membership = existingMembership[0] const membership = existingMembership[0]
if (membership.role === 'owner' || membership.role === 'admin') { if (membership.role === 'owner' || membership.role === 'admin') {
// Check if org already has an active subscription (prevent duplicates)
if (await hasActiveSubscription(membership.organizationId)) {
logger.error('Organization already has an active subscription', {
userId,
organizationId: membership.organizationId,
newSubscriptionId: subscription.id,
})
throw new Error('Organization already has an active subscription')
}
logger.info('User already owns/admins an org, using it', { logger.info('User already owns/admins an org, using it', {
userId, userId,
organizationId: membership.organizationId, organizationId: membership.organizationId,

View File

@@ -15,86 +15,13 @@ import {
userStats, userStats,
} from '@sim/db/schema' } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull, ne, or, sql } from 'drizzle-orm' import { and, eq, sql } from 'drizzle-orm'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage' import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
import { validateSeatAvailability } from '@/lib/billing/validation/seat-management' import { validateSeatAvailability } from '@/lib/billing/validation/seat-management'
const logger = createLogger('OrganizationMembership') const logger = createLogger('OrganizationMembership')
export type BillingBlockReason = 'payment_failed' | 'dispute'
/**
* Get all member user IDs for an organization
*/
export async function getOrgMemberIds(organizationId: string): Promise<string[]> {
const members = await db
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, organizationId))
return members.map((m) => m.userId)
}
/**
* Block all members of an organization for billing reasons
* Returns the number of members actually blocked
*
* Reason priority: dispute > payment_failed
* A payment_failed block won't overwrite an existing dispute block
*/
export async function blockOrgMembers(
organizationId: string,
reason: BillingBlockReason
): Promise<number> {
const memberIds = await getOrgMemberIds(organizationId)
if (memberIds.length === 0) {
return 0
}
// Don't overwrite dispute blocks with payment_failed (dispute is higher priority)
const whereClause =
reason === 'payment_failed'
? and(
inArray(userStats.userId, memberIds),
or(ne(userStats.billingBlockedReason, 'dispute'), isNull(userStats.billingBlockedReason))
)
: inArray(userStats.userId, memberIds)
const result = await db
.update(userStats)
.set({ billingBlocked: true, billingBlockedReason: reason })
.where(whereClause)
.returning({ userId: userStats.userId })
return result.length
}
/**
* Unblock all members of an organization blocked for a specific reason
* Only unblocks members blocked for the specified reason (not other reasons)
* Returns the number of members actually unblocked
*/
export async function unblockOrgMembers(
organizationId: string,
reason: BillingBlockReason
): Promise<number> {
const memberIds = await getOrgMemberIds(organizationId)
if (memberIds.length === 0) {
return 0
}
const result = await db
.update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null })
.where(and(inArray(userStats.userId, memberIds), eq(userStats.billingBlockedReason, reason)))
.returning({ userId: userStats.userId })
return result.length
}
export interface RestoreProResult { export interface RestoreProResult {
restored: boolean restored: boolean
usageRestored: boolean usageRestored: boolean

View File

@@ -1,9 +1,8 @@
import { db } from '@sim/db' import { db } from '@sim/db'
import { subscription, user, userStats } from '@sim/db/schema' import { member, subscription, user, userStats } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm' import { and, eq } from 'drizzle-orm'
import type Stripe from 'stripe' import type Stripe from 'stripe'
import { blockOrgMembers, unblockOrgMembers } from '@/lib/billing'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
const logger = createLogger('DisputeWebhooks') const logger = createLogger('DisputeWebhooks')
@@ -58,34 +57,36 @@ export async function handleChargeDispute(event: Stripe.Event): Promise<void> {
if (subs.length > 0) { if (subs.length > 0) {
const orgId = subs[0].referenceId const orgId = subs[0].referenceId
const memberCount = await blockOrgMembers(orgId, 'dispute')
if (memberCount > 0) { const owners = await db
logger.warn('Blocked all org members due to dispute', { .select({ userId: member.userId })
.from(member)
.where(and(eq(member.organizationId, orgId), eq(member.role, 'owner')))
.limit(1)
if (owners.length > 0) {
await db
.update(userStats)
.set({ billingBlocked: true, billingBlockedReason: 'dispute' })
.where(eq(userStats.userId, owners[0].userId))
logger.warn('Blocked org owner due to dispute', {
disputeId: dispute.id, disputeId: dispute.id,
ownerId: owners[0].userId,
organizationId: orgId, organizationId: orgId,
memberCount,
}) })
} }
} }
} }
/** /**
* Handles charge.dispute.closed - unblocks user if dispute was won or warning closed * Handles charge.dispute.closed - unblocks user if dispute was won
*
* Status meanings:
* - 'won': Merchant won, customer's chargeback denied → unblock
* - 'lost': Customer won, money refunded → stay blocked (they owe us)
* - 'warning_closed': Pre-dispute inquiry closed without chargeback → unblock (false alarm)
*/ */
export async function handleDisputeClosed(event: Stripe.Event): Promise<void> { export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
const dispute = event.data.object as Stripe.Dispute const dispute = event.data.object as Stripe.Dispute
// Only unblock if we won or the warning was closed without a full dispute if (dispute.status !== 'won') {
const shouldUnblock = dispute.status === 'won' || dispute.status === 'warning_closed' logger.info('Dispute not won, user remains blocked', {
if (!shouldUnblock) {
logger.info('Dispute resolved against us, user remains blocked', {
disputeId: dispute.id, disputeId: dispute.id,
status: dispute.status, status: dispute.status,
}) })
@@ -97,7 +98,7 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
return return
} }
// Find and unblock user (Pro plans) - only if blocked for dispute, not other reasons // Find and unblock user (Pro plans)
const users = await db const users = await db
.select({ id: user.id }) .select({ id: user.id })
.from(user) .from(user)
@@ -108,17 +109,16 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
await db await db
.update(userStats) .update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null }) .set({ billingBlocked: false, billingBlockedReason: null })
.where(and(eq(userStats.userId, users[0].id), eq(userStats.billingBlockedReason, 'dispute'))) .where(eq(userStats.userId, users[0].id))
logger.info('Unblocked user after dispute resolved in our favor', { logger.info('Unblocked user after winning dispute', {
disputeId: dispute.id, disputeId: dispute.id,
userId: users[0].id, userId: users[0].id,
status: dispute.status,
}) })
return return
} }
// Find and unblock all org members (Team/Enterprise) - consistent with payment success // Find and unblock org owner (Team/Enterprise)
const subs = await db const subs = await db
.select({ referenceId: subscription.referenceId }) .select({ referenceId: subscription.referenceId })
.from(subscription) .from(subscription)
@@ -127,13 +127,24 @@ export async function handleDisputeClosed(event: Stripe.Event): Promise<void> {
if (subs.length > 0) { if (subs.length > 0) {
const orgId = subs[0].referenceId const orgId = subs[0].referenceId
const memberCount = await unblockOrgMembers(orgId, 'dispute')
logger.info('Unblocked all org members after dispute resolved in our favor', { const owners = await db
disputeId: dispute.id, .select({ userId: member.userId })
organizationId: orgId, .from(member)
memberCount, .where(and(eq(member.organizationId, orgId), eq(member.role, 'owner')))
status: dispute.status, .limit(1)
})
if (owners.length > 0) {
await db
.update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null })
.where(eq(userStats.userId, owners[0].userId))
logger.info('Unblocked org owner after winning dispute', {
disputeId: dispute.id,
ownerId: owners[0].userId,
organizationId: orgId,
})
}
} }
} }

View File

@@ -8,13 +8,12 @@ import {
userStats, userStats,
} from '@sim/db/schema' } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull, ne, or } from 'drizzle-orm' import { and, eq, inArray } from 'drizzle-orm'
import type Stripe from 'stripe' import type Stripe from 'stripe'
import { getEmailSubject, PaymentFailedEmail, renderCreditPurchaseEmail } from '@/components/emails' import { getEmailSubject, PaymentFailedEmail, renderCreditPurchaseEmail } from '@/components/emails'
import { calculateSubscriptionOverage } from '@/lib/billing/core/billing' import { calculateSubscriptionOverage } from '@/lib/billing/core/billing'
import { addCredits, getCreditBalance, removeCredits } from '@/lib/billing/credits/balance' import { addCredits, getCreditBalance, removeCredits } from '@/lib/billing/credits/balance'
import { setUsageLimitForCredits } from '@/lib/billing/credits/purchase' import { setUsageLimitForCredits } from '@/lib/billing/credits/purchase'
import { blockOrgMembers, unblockOrgMembers } from '@/lib/billing/organizations/membership'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
import { getBaseUrl } from '@/lib/core/utils/urls' import { getBaseUrl } from '@/lib/core/utils/urls'
import { sendEmail } from '@/lib/messaging/email/mailer' import { sendEmail } from '@/lib/messaging/email/mailer'
@@ -503,7 +502,24 @@ export async function handleInvoicePaymentSucceeded(event: Stripe.Event) {
} }
if (sub.plan === 'team' || sub.plan === 'enterprise') { if (sub.plan === 'team' || sub.plan === 'enterprise') {
await unblockOrgMembers(sub.referenceId, 'payment_failed') const members = await db
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, sub.referenceId))
const memberIds = members.map((m) => m.userId)
if (memberIds.length > 0) {
// Only unblock users blocked for payment_failed, not disputes
await db
.update(userStats)
.set({ billingBlocked: false, billingBlockedReason: null })
.where(
and(
inArray(userStats.userId, memberIds),
eq(userStats.billingBlockedReason, 'payment_failed')
)
)
}
} else { } else {
// Only unblock users blocked for payment_failed, not disputes // Only unblock users blocked for payment_failed, not disputes
await db await db
@@ -600,26 +616,28 @@ export async function handleInvoicePaymentFailed(event: Stripe.Event) {
if (records.length > 0) { if (records.length > 0) {
const sub = records[0] const sub = records[0]
if (sub.plan === 'team' || sub.plan === 'enterprise') { if (sub.plan === 'team' || sub.plan === 'enterprise') {
const memberCount = await blockOrgMembers(sub.referenceId, 'payment_failed') const members = await db
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, sub.referenceId))
const memberIds = members.map((m) => m.userId)
if (memberIds.length > 0) {
await db
.update(userStats)
.set({ billingBlocked: true, billingBlockedReason: 'payment_failed' })
.where(inArray(userStats.userId, memberIds))
}
logger.info('Blocked team/enterprise members due to payment failure', { logger.info('Blocked team/enterprise members due to payment failure', {
organizationId: sub.referenceId, organizationId: sub.referenceId,
memberCount, memberCount: members.length,
isOverageInvoice, isOverageInvoice,
}) })
} else { } else {
// Don't overwrite dispute blocks (dispute > payment_failed priority)
await db await db
.update(userStats) .update(userStats)
.set({ billingBlocked: true, billingBlockedReason: 'payment_failed' }) .set({ billingBlocked: true, billingBlockedReason: 'payment_failed' })
.where( .where(eq(userStats.userId, sub.referenceId))
and(
eq(userStats.userId, sub.referenceId),
or(
ne(userStats.billingBlockedReason, 'dispute'),
isNull(userStats.billingBlockedReason)
)
)
)
logger.info('Blocked user due to payment failure', { logger.info('Blocked user due to payment failure', {
userId: sub.referenceId, userId: sub.referenceId,
isOverageInvoice, isOverageInvoice,

View File

@@ -3,7 +3,6 @@ import { member, organization, subscription } from '@sim/db/schema'
import { createLogger } from '@sim/logger' import { createLogger } from '@sim/logger'
import { and, eq, ne } from 'drizzle-orm' import { and, eq, ne } from 'drizzle-orm'
import { calculateSubscriptionOverage } from '@/lib/billing/core/billing' import { calculateSubscriptionOverage } from '@/lib/billing/core/billing'
import { hasActiveSubscription } from '@/lib/billing/core/subscription'
import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage' import { syncUsageLimitsFromSubscription } from '@/lib/billing/core/usage'
import { restoreUserProSubscription } from '@/lib/billing/organizations/membership' import { restoreUserProSubscription } from '@/lib/billing/organizations/membership'
import { requireStripeClient } from '@/lib/billing/stripe-client' import { requireStripeClient } from '@/lib/billing/stripe-client'
@@ -53,37 +52,14 @@ async function restoreMemberProSubscriptions(organizationId: string): Promise<nu
/** /**
* Cleanup organization when team/enterprise subscription is deleted. * Cleanup organization when team/enterprise subscription is deleted.
* - Checks if other active subscriptions point to this org (skip deletion if so)
* - Restores member Pro subscriptions * - Restores member Pro subscriptions
* - Deletes the organization (only if no other active subs) * - Deletes the organization
* - Syncs usage limits for former members (resets to free or Pro tier) * - Syncs usage limits for former members (resets to free or Pro tier)
*/ */
async function cleanupOrganizationSubscription(organizationId: string): Promise<{ async function cleanupOrganizationSubscription(organizationId: string): Promise<{
restoredProCount: number restoredProCount: number
membersSynced: number membersSynced: number
organizationDeleted: boolean
}> { }> {
// Check if other active subscriptions still point to this org
// Note: The subscription being deleted is already marked as 'canceled' by better-auth
// before this handler runs, so we only find truly active ones
if (await hasActiveSubscription(organizationId)) {
logger.info('Skipping organization deletion - other active subscriptions exist', {
organizationId,
})
// Still sync limits for members since this subscription was deleted
const memberUserIds = await db
.select({ userId: member.userId })
.from(member)
.where(eq(member.organizationId, organizationId))
for (const m of memberUserIds) {
await syncUsageLimitsFromSubscription(m.userId)
}
return { restoredProCount: 0, membersSynced: memberUserIds.length, organizationDeleted: false }
}
// Get member userIds before deletion (needed for limit syncing after org deletion) // Get member userIds before deletion (needed for limit syncing after org deletion)
const memberUserIds = await db const memberUserIds = await db
.select({ userId: member.userId }) .select({ userId: member.userId })
@@ -99,7 +75,7 @@ async function cleanupOrganizationSubscription(organizationId: string): Promise<
await syncUsageLimitsFromSubscription(m.userId) await syncUsageLimitsFromSubscription(m.userId)
} }
return { restoredProCount, membersSynced: memberUserIds.length, organizationDeleted: true } return { restoredProCount, membersSynced: memberUserIds.length }
} }
/** /**
@@ -196,14 +172,15 @@ export async function handleSubscriptionDeleted(subscription: {
referenceId: subscription.referenceId, referenceId: subscription.referenceId,
}) })
const { restoredProCount, membersSynced, organizationDeleted } = const { restoredProCount, membersSynced } = await cleanupOrganizationSubscription(
await cleanupOrganizationSubscription(subscription.referenceId) subscription.referenceId
)
logger.info('Successfully processed enterprise subscription cancellation', { logger.info('Successfully processed enterprise subscription cancellation', {
subscriptionId: subscription.id, subscriptionId: subscription.id,
stripeSubscriptionId, stripeSubscriptionId,
restoredProCount, restoredProCount,
organizationDeleted, organizationDeleted: true,
membersSynced, membersSynced,
}) })
return return
@@ -320,7 +297,7 @@ export async function handleSubscriptionDeleted(subscription: {
const cleanup = await cleanupOrganizationSubscription(subscription.referenceId) const cleanup = await cleanupOrganizationSubscription(subscription.referenceId)
restoredProCount = cleanup.restoredProCount restoredProCount = cleanup.restoredProCount
membersSynced = cleanup.membersSynced membersSynced = cleanup.membersSynced
organizationDeleted = cleanup.organizationDeleted organizationDeleted = true
} else if (subscription.plan === 'pro') { } else if (subscription.plan === 'pro') {
await syncUsageLimitsFromSubscription(subscription.referenceId) await syncUsageLimitsFromSubscription(subscription.referenceId)
membersSynced = 1 membersSynced = 1

View File

@@ -33,7 +33,6 @@ import type {
WorkflowExecutionSnapshot, WorkflowExecutionSnapshot,
WorkflowState, WorkflowState,
} from '@/lib/logs/types' } from '@/lib/logs/types'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
export interface ToolCall { export interface ToolCall {
name: string name: string
@@ -504,7 +503,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
} }
try { try {
// Get the workflow record to get workspace and fallback userId // Get the workflow record to get the userId
const [workflowRecord] = await db const [workflowRecord] = await db
.select() .select()
.from(workflow) .from(workflow)
@@ -516,12 +515,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
return return
} }
let billingUserId: string | null = null const userId = workflowRecord.userId
if (workflowRecord.workspaceId) {
billingUserId = await getWorkspaceBilledAccountUserId(workflowRecord.workspaceId)
}
const userId = billingUserId || workflowRecord.userId
const costToStore = costSummary.totalCost const costToStore = costSummary.totalCost
const existing = await db.select().from(userStats).where(eq(userStats.userId, userId)) const existing = await db.select().from(userStats).where(eq(userStats.userId, userId))

View File

@@ -12,39 +12,70 @@ import {
import { persistWorkflowOperation } from '@/socket/database/operations' import { persistWorkflowOperation } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth' import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions' import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager } from '@/socket/rooms' import type { IRoomManager, UserSession } from '@/socket/rooms'
import { WorkflowOperationSchema } from '@/socket/validation/schemas' import { WorkflowOperationSchema } from '@/socket/validation/schemas'
const logger = createLogger('OperationsHandlers') const logger = createLogger('OperationsHandlers')
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) { export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('workflow-operation', async (data) => { socket.on('workflow-operation', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) const emitOperationError = (
const session = await roomManager.getUserSession(socket.id) forbidden: { type: string; message: string; operation?: string; target?: string },
failed?: { error: string; retryable?: boolean }
if (!workflowId || !session) { ) => {
socket.emit('operation-forbidden', { socket.emit('operation-forbidden', forbidden)
type: 'SESSION_ERROR', if (failed && data?.operationId) {
message: 'Session expired, please rejoin workflow', socket.emit('operation-failed', { operationId: data.operationId, ...failed })
})
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
} }
}
if (!roomManager.isReady()) {
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return return
} }
const hasRoom = await roomManager.hasWorkflowRoom(workflowId) let workflowId: string | null = null
let session: UserSession | null = null
try {
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
session = await roomManager.getUserSession(socket.id)
} catch (error) {
logger.error('Error loading session for workflow operation:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!workflowId || !session) {
emitOperationError(
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
{ error: 'Session expired' }
)
return
}
let hasRoom = false
try {
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
} catch (error) {
logger.error('Error checking workflow room:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!hasRoom) { if (!hasRoom) {
socket.emit('operation-forbidden', { emitOperationError(
type: 'ROOM_NOT_FOUND', { type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
message: 'Workflow room not found', { error: 'Workflow room not found' }
}) )
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
return return
} }
@@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
// Check permissions from cached role for all other operations // Check permissions from cached role for all other operations
if (!userPresence) { if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`) logger.warn(`User presence not found for socket ${socket.id}`)
socket.emit('operation-forbidden', { emitOperationError(
type: 'SESSION_ERROR', {
message: 'User session not found', type: 'SESSION_ERROR',
operation, message: 'User session not found',
target, operation,
}) target,
if (operationId) { },
socket.emit('operation-failed', { operationId, error: 'User session not found' }) { error: 'User session not found' }
} )
return return
} }
@@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
logger.warn( logger.warn(
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}` `User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
) )
socket.emit('operation-forbidden', { emitOperationError({
type: 'INSUFFICIENT_PERMISSIONS', type: 'INSUFFICIENT_PERMISSIONS',
message: `${permissionCheck.reason} on '${target}'`, message: `${permissionCheck.reason} on '${target}'`,
operation, operation,

View File

@@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
operationId, operationId,
} = data } = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try { try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id) const session = await roomManager.getUserSession(socket.id)

View File

@@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
socket.on('variable-update', async (data) => { socket.on('variable-update', async (data) => {
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try { try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id) const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id) const session = await roomManager.getUserSession(socket.id)

View File

@@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
return return
} }
if (!roomManager.isReady()) {
logger.warn(`Join workflow rejected: Room manager unavailable`)
socket.emit('join-workflow-error', {
error: 'Realtime unavailable',
code: 'ROOM_MANAGER_UNAVAILABLE',
})
return
}
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`) logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
// Verify workflow access // Verify workflow access
@@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
// Undo socket.join and room manager entry if any operation failed // Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId) socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id) await roomManager.removeUserFromRoom(socket.id)
socket.emit('join-workflow-error', { error: 'Failed to join workflow' }) const isReady = roomManager.isReady()
socket.emit('join-workflow-error', {
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
})
} }
}) })
socket.on('leave-workflow', async () => { socket.on('leave-workflow', async () => {
try { try {
if (!roomManager.isReady()) {
return
}
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id) const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id) const session = await roomManager.getUserSession(socket.id)

View File

@@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
logger.info('MemoryRoomManager initialized (single-pod mode)') logger.info('MemoryRoomManager initialized (single-pod mode)')
} }
isReady(): boolean {
return true
}
async shutdown(): Promise<void> { async shutdown(): Promise<void> {
this.workflowRooms.clear() this.workflowRooms.clear()
this.socketToWorkflow.clear() this.socketToWorkflow.clear()

View File

@@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
this._io = io this._io = io
this.redis = createClient({ this.redis = createClient({
url: redisUrl, url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
}) })
this.redis.on('error', (err) => { this.redis.on('error', (err) => {
@@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
logger.info('Redis client ready') logger.info('Redis client ready')
this.isConnected = true this.isConnected = true
}) })
this.redis.on('end', () => {
logger.warn('Redis client connection closed')
this.isConnected = false
})
} }
get io(): Server { get io(): Server {
return this._io return this._io
} }
isReady(): boolean {
return this.isConnected
}
async initialize(): Promise<void> { async initialize(): Promise<void> {
if (this.isConnected) return if (this.isConnected) return

View File

@@ -48,6 +48,11 @@ export interface IRoomManager {
*/ */
initialize(): Promise<void> initialize(): Promise<void>
/**
* Whether the room manager is ready to serve requests
*/
isReady(): boolean
/** /**
* Clean shutdown * Clean shutdown
*/ */

View File

@@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
res.end(JSON.stringify({ error: authResult.error })) res.end(JSON.stringify({ error: authResult.error }))
return return
} }
if (!roomManager.isReady()) {
sendError(res, 'Room manager unavailable', 503)
return
}
} }
// Handle workflow deletion notifications from the main API // Handle workflow deletion notifications from the main API