Compare commits

...

24 Commits

Author SHA1 Message Date
Cursor Agent
9f30287eb9 fix(mcp): tighten resilience pipeline behavior 2026-03-10 00:17:49 +00:00
Waleed
ecd3536a72 v0.5.109: obsidian and evernote integrations, slack fixes, remove memory instrumentation 2026-03-09 10:40:37 -07:00
Theodore Li
635179d696 Revert "feat(hosted key): Add exa hosted key (#3221)" (#3495)
This reverts commit 158d5236bc.

Co-authored-by: Theodore Li <teddy@zenobiapay.com>
2026-03-09 10:31:54 -07:00
Waleed
f88926a6a8 fix(webhooks): return empty 200 for Slack to close modals cleanly (#3492)
* fix(webhooks): return empty 200 for Slack to close modals cleanly

* fix(webhooks): add clarifying comment on Slack error path trade-off
2026-03-09 10:11:36 -07:00
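The fix above relies on Slack's modal contract: a `view_submission` request only closes the modal cleanly when the app replies with HTTP 200 and an empty body (any non-empty body must be a valid `response_action` payload). A minimal standalone sketch — the handler name and shape are illustrative, not the project's actual route:

```typescript
// Minimal sketch: acknowledge a Slack view_submission so the modal closes.
// An empty 200 means "accepted, close the modal"; returning JSON here would
// have to be a response_action payload or Slack shows an error in the modal.
export function slackModalAck(): Response {
  // Web-standard Response (available in Bun, Node 18+, and Next.js runtimes)
  return new Response(null, { status: 200 })
}
```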
Waleed
690b47a0bf chore(monitoring): remove SSE connection tracking and Bun.gc debug instrumentation (#3472) 2026-03-08 17:27:05 -07:00
Theodore Li
158d5236bc feat(hosted key): Add exa hosted key (#3221)
* feat(hosted keys): Implement serper hosted key

* Handle required fields correctly for hosted keys

* Add rate limiting (3 tries, exponential backoff)

* Add custom pricing, switch to exa as first hosted key

* Add telemetry

* Consolidate byok type definitions

* Add warning comment if default calculation is used

* Record usage to user stats table

* Fix unit tests, use cost property

* Include more metadata in cost output

* Fix disabled tests

* Fix spacing

* Fix lint

* Move knowledge cost restructuring away from generic block handler

* Migrate knowledge unit tests

* Lint

* Fix broken tests

* Add user based hosted key throttling

* Refactor hosted key handling. Add optimistic handling of throttling for custom throttle rules.

* Remove research as hosted key. Recommend BYOK if throttling occurs

* Make adding api keys adjustable via env vars

* Remove vestigial fields from research

* Make billing actor id required for throttling

* Switch to round robin for api key distribution

* Add helper method for adding hosted key cost

* Strip leading double underscores to avoid breaking change

* Lint fix

* Remove falsy check in favor of explicit null check

* Add more detailed metrics for different throttling types

* Fix _costDollars field

* Handle hosted agent tool calls

* Fail loudly if cost field isn't found

* Remove any type

* Fix type error

* Fix lint

* Fix usage log double logging data

* Fix test

---------

Co-authored-by: Theodore Li <teddy@zenobiapay.com>
2026-03-07 13:06:57 -05:00
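The rate-limiting bullet above ("3 tries, exponential backoff") follows the usual doubling schedule. A hedged, dependency-free sketch of just the delay calculation — the real implementation's base delay and jitter strategy are not shown in this log:

```typescript
// Compute exponential backoff delays: base, base*2, base*4, ...
// One entry per retry attempt.
export function backoffDelaysMs(tries: number, baseMs: number): number[] {
  return Array.from({ length: tries }, (_, attempt) => baseMs * 2 ** attempt)
}
```

With 3 tries and a 250 ms base this yields 250, 500, and 1000 ms; production retry loops usually add random jitter on top to avoid thundering-herd retries.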
Vikhyath Mondreti
8c0a2e04b1 v0.5.108: workflow input params in agent tools, bun upgrade, dropdown selectors for 14 blocks 2026-03-06 21:02:25 -08:00
Waleed
6586c5ce40 v0.5.107: new reddit, slack tools 2026-03-05 22:48:20 -08:00
Vikhyath Mondreti
3ce947566d v0.5.106: condition block and legacy kbs fixes, GPT 5.4 2026-03-05 17:30:05 -08:00
Waleed
70c36cb7aa v0.5.105: slack remove reaction, nested subflow locks fix, servicenow pagination, memory improvements 2026-03-04 22:38:26 -08:00
Waleed
f1ec5fe824 v0.5.104: memory improvements, nested subflows, careers page redirect, brandfetch, google meet 2026-03-03 23:45:29 -08:00
Waleed
e07e3c34cc v0.5.103: memory util instrumentation, API docs, amplitude, google pagespeed insights, pagerduty 2026-03-01 23:27:02 -08:00
Waleed
0d2e6ff31d v0.5.102: new integrations, new tools, ci speedups, memory leak instrumentation 2026-02-28 12:48:10 -08:00
Waleed
4fd0989264 v0.5.101: circular dependency mitigation, confluence enhancements, google tasks and bigquery integrations, workflow lock 2026-02-26 15:04:53 -08:00
Waleed
67f8a687f6 v0.5.100: multiple credentials, 40% speedup, gong, attio, audit log improvements 2026-02-25 00:28:25 -08:00
Waleed
af592349d3 v0.5.99: local dev improvements, live workflow logs in terminal 2026-02-23 00:24:49 -08:00
Waleed
0d86ea01f0 v0.5.98: change detection improvements, rate limit and code execution fixes, removed retired models, hex integration 2026-02-21 18:07:40 -08:00
Waleed
115f04e989 v0.5.97: oidc discovery for copilot mcp 2026-02-21 02:06:25 -08:00
Waleed
34d92fae89 v0.5.96: sim oauth provider, slack ephemeral message tool and blockkit support 2026-02-20 18:22:20 -08:00
Waleed
67aa4bb332 v0.5.95: gemini 3.1 pro, cloudflare, dataverse, revenuecat, redis, upstash, algolia tools; isolated-vm robustness improvements, tables backend (#3271)
* feat(tools): advanced fields for youtube, vercel; added cloudflare and dataverse tools (#3257)

* refactor(vercel): mark optional fields as advanced mode

Move optional/power-user fields behind the advanced toggle:
- List Deployments: project filter, target, state
- Create Deployment: project ID override, redeploy from, target
- List Projects: search
- Create/Update Project: framework, build/output/install commands
- Env Vars: variable type
- Webhooks: project IDs filter
- Checks: path, details URL
- Team Members: role filter
- All operations: team ID scope

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* style(youtube): mark optional params as advanced mode

Hide pagination, sort order, and filter fields behind the advanced
toggle for a cleaner default UX across all YouTube operations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* added advanced fields for vercel and youtube, added cloudflare and dataverse block

* added desc for dataverse

* add more tools

* ack comment

* more

* ops

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* feat(tables): added tables (#2867)

* updates

* required

* trashy table viewer

* updates

* updates

* filtering ui

* updates

* updates

* updates

* one input mode

* format

* fix lints

* improved errors

* updates

* updates

* changes

* doc strings

* breaking down file

* update comments with ai

* updates

* comments

* changes

* revert

* updates

* dedupe

* updates

* updates

* updates

* refactoring

* renames & refactors

* refactoring

* updates

* undo

* update db

* wand

* updates

* fix comments

* fixes

* simplify comments

* updates

* renames

* better comments

* validation

* updates

* updates

* updates

* fix sorting

* fix appearance

* updating prompt to make it user sort

* rm

* updates

* rename

* comments

* clean comments

* simplification

* updates

* updates

* refactor

* reduced type confusion

* undo

* rename

* undo changes

* undo

* simplify

* updates

* updates

* revert

* updates

* db updates

* type fix

* fix

* fix error handling

* updates

* docs

* docs

* updates

* rename

* dedupe

* revert

* uncook

* updates

* fix

* fix

* fix

* fix

* prepare merge

* readd migrations

* add back missed code

* migrate enrichment logic to general abstraction

* address bugbot concerns

* adhere to size limits for tables

* remove conflicting migration

* add back migrations

* fix tables auth

* fix permissive auth

* fix lint

* reran migrations

* migrate to use tanstack query for all server state

* update table-selector

* update names

* added tables to permission groups, updated subblock types

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: waleed <walif6@gmail.com>

* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running (#3259)

* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running

* fixed ci tests failing

* fix(workflows): disallow duplicate workflow names at the same folder level (#3260)

* feat(tools): added redis, upstash, algolia, and revenuecat (#3261)

* feat(tools): added redis, upstash, algolia, and revenuecat

* ack comment

* feat(models): add gemini-3.1-pro-preview and update gemini-3-pro thinking levels (#3263)

* fix(audit-log): lazily resolve actor name/email when missing (#3262)

* fix(blocks): move type coercions from tools.config.tool to tools.config.params (#3264)

* fix(blocks): move type coercions from tools.config.tool to tools.config.params

Number() coercions in tools.config.tool ran at serialization time before
variable resolution, destroying dynamic references like <block.result.count>
by converting them to NaN/null. Moved all coercions to tools.config.params
which runs at execution time after variables are resolved.

Fixed in 15 blocks: exa, arxiv, sentry, incidentio, wikipedia, ahrefs,
posthog, elasticsearch, dropbox, hunter, lemlist, spotify, youtube, grafana,
parallel. Also added mode: 'advanced' to optional exa fields.

Closes #3258

* fix(blocks): address PR review — move remaining param mutations from tool() to params()

- Moved field mappings from tool() to params() in grafana, posthog,
  lemlist, spotify, dropbox (same dynamic reference bug)
- Fixed parallel.ts excerpts/full_content boolean logic
- Fixed parallel.ts search_queries empty case (must set undefined)
- Fixed elasticsearch.ts timeout not included when already ends with 's'
- Restored dropbox.ts tool() switch for proper default fallback

* fix(blocks): restore field renames to tool() for serialization-time validation

Field renames (e.g. personalApiKey→apiKey) must be in tool() because
validateRequiredFieldsBeforeExecution calls selectToolId()→tool() then
checks renamed field names on params. Only type coercions (Number(),
boolean) stay in params() to avoid destroying dynamic variable references.

* improvement(resolver): resolved empty sentinel to not pass through unexecuted valid refs to text inputs (#3266)

* fix(blocks): add required constraint for serviceDeskId in JSM block (#3268)

* fix(blocks): add required constraint for serviceDeskId in JSM block

* fix(blocks): rename custom field values to request field values in JSM create request

* fix(trigger): add isolated-vm support to trigger.dev container builds (#3269)

Scheduled workflow executions running in trigger.dev containers were
failing to spawn isolated-vm workers because the native module wasn't
available in the container. This caused loop condition evaluation to
silently fail and exit after one iteration.

- Add isolated-vm to build.external and additionalPackages in trigger config
- Include isolated-vm-worker.cjs via additionalFiles for child process spawning
- Add fallback path resolution for worker file in trigger.dev environment

* fix(tables): hide tables from sidebar and block registry (#3270)

* fix(tables): hide tables from sidebar and block registry

* fix(trigger): add isolated-vm support to trigger.dev container builds (#3269)

Scheduled workflow executions running in trigger.dev containers were
failing to spawn isolated-vm workers because the native module wasn't
available in the container. This caused loop condition evaluation to
silently fail and exit after one iteration.

- Add isolated-vm to build.external and additionalPackages in trigger config
- Include isolated-vm-worker.cjs via additionalFiles for child process spawning
- Add fallback path resolution for worker file in trigger.dev environment

* lint

* fix(trigger): update node version to align with main app (#3272)

* fix(build): fix corrupted sticky disk cache on blacksmith (#3273)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Lakee Sivaraya <71339072+lakeesiv@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
2026-02-20 13:43:07 -08:00
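The coercion-timing bug described in the `tools.config.tool` → `tools.config.params` fix above can be reproduced in a few lines: running `Number()` before variable resolution turns a reference like `<block.result.count>` into `NaN`, while coercing after resolution works. A simplified standalone model — the resolver here is a stand-in, not Sim's actual one:

```typescript
// Stand-in variable resolver: replaces <ref> placeholders with runtime values.
function resolve(value: string, vars: Record<string, string>): string {
  return value.replace(/<([^>]+)>/g, (_, key) => vars[key] ?? '')
}

const raw = '<block.result.count>'
const vars = { 'block.result.count': '42' }

// Serialization-time coercion (the bug): the placeholder is not a number yet,
// so the dynamic reference is destroyed before it can ever be resolved.
const coercedTooEarly = Number(raw)

// Execution-time coercion (the fix): resolve first, then coerce.
const coercedAfterResolve = Number(resolve(raw, vars))
```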
Waleed
15ace5e63f v0.5.94: vercel integration, folder insertion, migrated tracking redirects to rewrites 2026-02-18 16:53:34 -08:00
Waleed
fdca73679d v0.5.93: NextJS config changes, MCP and Blocks whitelisting, copilot keyboard shortcuts, audit logs 2026-02-18 12:10:05 -08:00
Waleed
da46a387c9 v0.5.92: shortlinks, copilot scrolling stickiness, pagination 2026-02-17 15:13:21 -08:00
Waleed
b7e377ec4b v0.5.91: docs i18n, turborepo upgrade 2026-02-16 00:36:05 -08:00
14 changed files with 478 additions and 116 deletions

View File

@@ -19,7 +19,6 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
import {
@@ -631,11 +630,9 @@ async function handleMessageStream(
}
const encoder = new TextEncoder()
let messageStreamDecremented = false
const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-message')
const sendEvent = (event: string, data: unknown) => {
try {
const jsonRpcResponse = {
@@ -845,19 +842,10 @@ async function handleMessageStream(
})
} finally {
await releaseLock(lockKey, lockValue)
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
controller.close()
}
},
cancel() {
if (!messageStreamDecremented) {
messageStreamDecremented = true
decrementSSEConnections('a2a-message')
}
},
cancel() {},
})
return new NextResponse(stream, {
@@ -1042,22 +1030,16 @@ async function handleTaskResubscribe(
{ once: true }
)
let sseDecremented = false
const cleanup = () => {
isCancelled = true
if (pollTimeoutId) {
clearTimeout(pollTimeoutId)
pollTimeoutId = null
}
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('a2a-resubscribe')
}
}
const stream = new ReadableStream({
async start(controller) {
incrementSSEConnections('a2a-resubscribe')
const sendEvent = (event: string, data: unknown): boolean => {
if (isCancelled || abortSignal.aborted) return false
try {

View File

@@ -14,7 +14,6 @@ import { getSession } from '@/lib/auth'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { mcpPubSub } from '@/lib/mcp/pubsub'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('McpEventsSSE')
@@ -50,14 +49,11 @@ export async function GET(request: NextRequest) {
for (const unsub of unsubscribers) {
unsub()
}
decrementSSEConnections('mcp-events')
logger.info(`SSE connection closed for workspace ${workspaceId}`)
}
const stream = new ReadableStream({
start(controller) {
incrementSSEConnections('mcp-events')
const send = (eventName: string, data: Record<string, unknown>) => {
if (cleaned) return
try {

View File

@@ -10,7 +10,6 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
import { env } from '@/lib/core/config/env'
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { enrichTableSchema } from '@/lib/table/llm/wand'
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
@@ -331,14 +330,10 @@ export async function POST(req: NextRequest) {
const encoder = new TextEncoder()
const decoder = new TextDecoder()
let wandStreamClosed = false
const readable = new ReadableStream({
async start(controller) {
incrementSSEConnections('wand')
const reader = response.body?.getReader()
if (!reader) {
wandStreamClosed = true
decrementSSEConnections('wand')
controller.close()
return
}
@@ -483,18 +478,9 @@ export async function POST(req: NextRequest) {
controller.close()
} finally {
reader.releaseLock()
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
}
},
cancel() {
if (!wandStreamClosed) {
wandStreamClosed = true
decrementSSEConnections('wand')
}
},
cancel() {},
})
return new Response(readable, {

View File

@@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import {
cleanupExecutionBase64Cache,
hydrateUserFilesWithBase64,
@@ -764,7 +763,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const encoder = new TextEncoder()
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
let isStreamClosed = false
let sseDecremented = false
const eventWriter = createExecutionEventWriter(executionId)
setExecutionMeta(executionId, {
@@ -775,7 +773,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('workflow-execute')
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
const sendEvent = (event: ExecutionEvent) => {
@@ -1159,10 +1156,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
if (executionId) {
await cleanupExecutionBase64Cache(executionId)
}
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
@@ -1174,10 +1167,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client disconnected from SSE stream`)
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('workflow-execute')
}
},
})

View File

@@ -7,7 +7,6 @@ import {
getExecutionMeta,
readExecutionEvents,
} from '@/lib/execution/event-buffer'
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
@@ -74,10 +73,8 @@ export async function GET(
let closed = false
let sseDecremented = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
incrementSSEConnections('execution-stream-reconnect')
let lastEventId = fromEventId
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
@@ -145,20 +142,11 @@ export async function GET(
controller.close()
} catch {}
}
} finally {
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
}
},
cancel() {
closed = true
logger.info('Client disconnected from reconnection stream', { executionId })
if (!sseDecremented) {
sseDecremented = true
decrementSSEConnections('execution-stream-reconnect')
}
},
})

View File

@@ -0,0 +1,143 @@
import { createLogger } from '@sim/logger'
import type { McpToolResult } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'
// Cap on tracked per-server states; the oldest entry is evicted beyond this.
const MAX_SERVER_STATES = 1000
export type CircuitState = 'CLOSED' | 'OPEN' | 'HALF-OPEN'
export interface CircuitBreakerConfig {
/** Number of failures before tripping to OPEN */
failureThreshold: number
/** How long to wait in OPEN before transitioning to HALF-OPEN (ms) */
resetTimeoutMs: number
}
interface ServerState {
state: CircuitState
failures: number
nextAttemptMs: number
isHalfOpenProbing: boolean
}
const logger = createLogger('mcp:resilience:circuit-breaker')
export class CircuitBreakerMiddleware implements McpMiddleware {
// Use a Map to maintain insertion order for standard LRU-like eviction if necessary.
// We constrain it to prevent memory leaks if thousands of ephemeral servers connect.
private registry = new Map<string, ServerState>()
private config: CircuitBreakerConfig
constructor(config: Partial<CircuitBreakerConfig> = {}) {
this.config = {
failureThreshold: config.failureThreshold ?? 5,
resetTimeoutMs: config.resetTimeoutMs ?? 30000,
}
}
private getState(serverId: string): ServerState {
let state = this.registry.get(serverId)
if (!state) {
state = {
state: 'CLOSED',
failures: 0,
nextAttemptMs: 0,
isHalfOpenProbing: false,
}
this.registry.set(serverId, state)
this.evictIfNecessary()
}
return state
}
private evictIfNecessary() {
if (this.registry.size > MAX_SERVER_STATES) {
// Evict the oldest entry (first inserted)
const firstKey = this.registry.keys().next().value
if (firstKey) {
this.registry.delete(firstKey)
}
}
}
async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise<McpToolResult> {
const { serverId, toolCall } = context
const serverState = this.getState(serverId)
// 1. Check current state and evaluate timeouts
if (serverState.state === 'OPEN') {
if (Date.now() > serverState.nextAttemptMs) {
// Time to try again, enter HALF-OPEN
logger.info(`Circuit breaker entering HALF-OPEN for server ${serverId}`)
serverState.state = 'HALF-OPEN'
serverState.isHalfOpenProbing = false
} else {
// Fast-fail
throw new Error(
`Circuit breaker is OPEN for server ${serverId}. Fast-failing request to ${toolCall.name}.`
)
}
}
if (serverState.state === 'HALF-OPEN') {
if (serverState.isHalfOpenProbing) {
// Another request is already probing. Fast-fail concurrent requests.
throw new Error(
`Circuit breaker is HALF-OPEN for server ${serverId}. A probe request is currently executing. Fast-failing concurrent request to ${toolCall.name}.`
)
}
// We are the chosen ones. Lock it down.
serverState.isHalfOpenProbing = true
}
try {
// 2. Invoke the next layer
const result = await next(context)
// 3. Handle result parsing (isError = true counts as failure for us)
if (result.isError) {
this.recordFailure(serverId, serverState)
} else {
this.recordSuccess(serverId, serverState)
}
return result
} catch (error) {
// Note: we record failure on ANY exception
this.recordFailure(serverId, serverState)
throw error // Re-throw to caller
}
}
private recordSuccess(serverId: string, state: ServerState) {
if (state.state !== 'CLOSED') {
logger.info(`Circuit breaker reset to CLOSED for server ${serverId}`)
}
state.state = 'CLOSED'
state.failures = 0
state.isHalfOpenProbing = false
}
private recordFailure(serverId: string, state: ServerState) {
if (state.state === 'HALF-OPEN') {
// The probe failed! Trip immediately back to OPEN.
logger.warn(`Circuit breaker probe failed. Tripping back to OPEN for server ${serverId}`)
this.tripToOpen(state)
} else if (state.state === 'CLOSED') {
state.failures++
if (state.failures >= this.config.failureThreshold) {
logger.error(
`Circuit breaker failure threshold reached (${state.failures}/${this.config.failureThreshold}). Tripping to OPEN for server ${serverId}`
)
this.tripToOpen(state)
}
}
}
private tripToOpen(state: ServerState) {
state.state = 'OPEN'
state.isHalfOpenProbing = false
state.nextAttemptMs = Date.now() + this.config.resetTimeoutMs
}
}

View File

@@ -0,0 +1,42 @@
import type { McpToolResult } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'
export class ResiliencePipeline {
private middlewares: McpMiddleware[] = []
/**
* Add a middleware to the pipeline chain.
*/
use(middleware: McpMiddleware): this {
this.middlewares.push(middleware)
return this
}
/**
* Execute the pipeline, processing the context through all middlewares,
* and finally invoking the terminal handler.
*/
async execute(
context: McpExecutionContext,
finalHandler: McpMiddlewareNext
): Promise<McpToolResult> {
let index = -1
const dispatch = async (i: number, currentContext: McpExecutionContext): Promise<McpToolResult> => {
if (i <= index) {
throw new Error('next() called multiple times')
}
index = i
// If we reached the end of the middlewares, call the final handler
if (i === this.middlewares.length) {
return finalHandler(currentContext)
}
const middleware = this.middlewares[i]
return middleware.execute(currentContext, (nextContext) => dispatch(i + 1, nextContext))
}
return dispatch(0, context)
}
}
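The `dispatch` recursion above is the classic "onion" composition (koa-compose style): middleware i wraps everything after it, and the final handler sits at the core. A synchronous miniature makes the call order visible — the real pipeline is async and carries an MCP context; this sketch uses plain strings:

```typescript
type Handler = (input: string) => string
type Middleware = (input: string, next: Handler) => string

// Compose middlewares so the first one added is the outermost layer.
function compose(middlewares: Middleware[], final: Handler): Handler {
  return middlewares.reduceRight<Handler>(
    (next, mw) => (input) => mw(input, next),
    final
  )
}

const order: string[] = []
const logA: Middleware = (input, next) => {
  order.push('A:before')
  const out = next(input)
  order.push('A:after')
  return out
}
const logB: Middleware = (input, next) => {
  order.push('B:before')
  const out = next(input)
  order.push('B:after')
  return out
}

// The core handler plays the role of the pipeline's finalHandler.
const run = compose([logA, logB], (input) => {
  order.push('core')
  return input.toUpperCase()
})
```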

View File

@@ -0,0 +1,155 @@
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import { createMcpToolId } from '@/lib/mcp/shared'
import type { McpTool, McpToolResult, McpToolSchema, McpToolSchemaProperty } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'
const logger = createLogger('mcp:schema-validator')
export type ToolProvider = (
serverId: string,
toolName: string
) => McpTool | undefined | Promise<McpTool | undefined>
export class SchemaValidatorMiddleware implements McpMiddleware {
private schemaCache = new Map<string, z.ZodTypeAny>()
private toolProvider?: ToolProvider
constructor(options?: { toolProvider?: ToolProvider }) {
this.toolProvider = options?.toolProvider
}
/**
* Cache a tool's schema explicitly (e.g. during server discovery)
*/
cacheTool(tool: McpTool) {
const toolId = createMcpToolId(tool.serverId, tool.name)
const zodSchema = this.compileSchema(tool.inputSchema)
this.schemaCache.set(toolId, zodSchema)
}
/**
* Clear caches, either for a specific tool or globally.
*/
clearCache(toolId?: string) {
if (toolId) {
this.schemaCache.delete(toolId)
} else {
this.schemaCache.clear()
}
}
async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise<McpToolResult> {
const { toolCall } = context
const toolName = toolCall.name
const toolId = createMcpToolId(context.serverId, toolName)
let zodSchema = this.schemaCache.get(toolId)
if (!zodSchema && this.toolProvider) {
const tool = await this.toolProvider(context.serverId, toolName)
if (tool) {
zodSchema = this.compileSchema(tool.inputSchema)
this.schemaCache.set(toolId, zodSchema)
}
}
if (zodSchema) {
const parseResult = await zodSchema.safeParseAsync(toolCall.arguments)
if (!parseResult.success) {
// Return natively formatted error payload
const errorDetails = parseResult.error.errors
.map((e) => `${e.path.join('.') || 'root'}: ${e.message}`)
.join(', ')
logger.warn('Schema validation failed', { toolName, error: errorDetails })
return {
isError: true,
content: [
{
type: 'text',
text: `Schema validation failed: [${errorDetails}]`,
},
],
}
}
// Sync successfully parsed / defaulted arguments back to context
context.toolCall.arguments = parseResult.data
}
return next(context)
}
private compileSchema(schema: McpToolSchema): z.ZodObject<any> {
return this.compileObject(schema.properties || {}, schema.required || []) as z.ZodObject<any>
}
private compileObject(
properties: Record<string, McpToolSchemaProperty>,
required: string[]
): z.ZodTypeAny {
const shape: Record<string, z.ZodTypeAny> = {}
for (const [key, prop] of Object.entries(properties)) {
let zodType = this.compileProperty(prop)
if (!required.includes(key)) {
zodType = zodType.optional()
}
shape[key] = zodType
}
return z.object(shape)
}
private compileProperty(prop: McpToolSchemaProperty): z.ZodTypeAny {
let baseType: z.ZodTypeAny = z.any()
switch (prop.type) {
case 'string':
baseType = z.string()
break
case 'number':
case 'integer':
baseType = z.number()
break
case 'boolean':
baseType = z.boolean()
break
case 'array':
if (prop.items) {
baseType = z.array(this.compileProperty(prop.items))
} else {
baseType = z.array(z.any())
}
break
case 'object':
baseType = this.compileObject(prop.properties || {}, prop.required || [])
break
}
// Apply Enum mappings
if (prop.enum && prop.enum.length > 0) {
if (prop.enum.length === 1) {
baseType = z.literal(prop.enum[0])
} else {
// We use mapped literals injected into an array
const literals = prop.enum.map((e) => z.literal(e))
baseType = z.union(literals as any)
}
}
if (prop.description) {
baseType = baseType.describe(prop.description)
}
if (prop.default !== undefined) {
baseType = baseType.default(prop.default)
}
return baseType
}
}
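The `compileProperty` switch above maps JSON-Schema-style property types onto zod validators, recursing for arrays and narrowing with enums. The same idea can be sketched without the zod dependency as a compiler from property descriptors to predicate functions — a hedged illustration of the approach, not this file's API:

```typescript
interface PropertySpec {
  type: 'string' | 'number' | 'boolean' | 'array'
  items?: PropertySpec
  enum?: unknown[]
}

type Check = (value: unknown) => boolean

// Compile a property descriptor into a validation predicate.
function compileCheck(prop: PropertySpec): Check {
  let base: Check
  switch (prop.type) {
    case 'string':
      base = (v) => typeof v === 'string'
      break
    case 'number':
      base = (v) => typeof v === 'number'
      break
    case 'boolean':
      base = (v) => typeof v === 'boolean'
      break
    case 'array': {
      // Recurse into item schema, mirroring the zod z.array(...) branch.
      const item = prop.items ? compileCheck(prop.items) : () => true
      base = (v) => Array.isArray(v) && v.every(item)
      break
    }
    default:
      base = () => true // unknown types pass through, like z.any()
  }
  // Enums narrow the base type to a fixed set of allowed values.
  if (prop.enum && prop.enum.length > 0) {
    const allowed = new Set(prop.enum)
    const prev = base
    base = (v) => prev(v) && allowed.has(v)
  }
  return base
}
```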

View File

@@ -0,0 +1,53 @@
import { createLogger } from '@sim/logger'
import type { McpToolResult } from '@/lib/mcp/types'
import type { McpExecutionContext, McpMiddleware, McpMiddlewareNext } from './types'
const logger = createLogger('mcp:telemetry')
export class TelemetryMiddleware implements McpMiddleware {
async execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise<McpToolResult> {
const startTime = performance.now()
try {
const result = await next(context)
const latency_ms = Math.round(performance.now() - startTime)
const isError = result.isError === true
logger.info('MCP Tool Execution Completed', {
toolName: context.toolCall.name,
serverId: context.serverId,
workspaceId: context.workspaceId,
latency_ms,
success: !isError,
...(isError && { failure_reason: 'TOOL_ERROR' }),
})
return result
} catch (error) {
const latency_ms = Math.round(performance.now() - startTime)
// Attempt to determine failure reason based on error
let failure_reason = 'API_500' // General failure fallback
if (error instanceof Error) {
const lowerMsg = error.message.toLowerCase()
if (error.name === 'TimeoutError' || lowerMsg.includes('timeout')) {
failure_reason = 'TIMEOUT'
} else if (lowerMsg.includes('validation') || error.name === 'ZodError') {
failure_reason = 'VALIDATION_ERROR'
}
}
logger.error('MCP Tool Execution Failed', {
toolName: context.toolCall.name,
serverId: context.serverId,
workspaceId: context.workspaceId,
latency_ms,
failure_reason,
err: error instanceof Error ? error.message : String(error),
})
throw error // Re-throw to allow upstream handling (e.g. circuit breaker)
}
}
}
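The failure-classification branch above (timeout vs. validation vs. generic fallback) is easy to isolate as a pure function, which also makes it unit-testable. A sketch mirroring the heuristics in the catch block, with the labels and fallback taken from the code above:

```typescript
// Classify an error into a telemetry failure_reason, mirroring the
// heuristics in TelemetryMiddleware's catch block.
function classifyFailure(error: unknown): 'TIMEOUT' | 'VALIDATION_ERROR' | 'API_500' {
  if (error instanceof Error) {
    const lowerMsg = error.message.toLowerCase()
    if (error.name === 'TimeoutError' || lowerMsg.includes('timeout')) return 'TIMEOUT'
    if (lowerMsg.includes('validation') || error.name === 'ZodError') return 'VALIDATION_ERROR'
  }
  return 'API_500' // general failure fallback
}
```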

View File

@@ -0,0 +1,32 @@
import type { McpToolCall, McpToolResult } from '@/lib/mcp/types'
/**
* Context passed through the Resilience Pipeline
*/
export interface McpExecutionContext {
toolCall: McpToolCall
serverId: string
userId: string
workspaceId: string
/**
* Additional parameters passed directly by the executeTool caller
*/
extraHeaders?: Record<string, string>
}
/**
* Standardized function signature for invoking the NEXT component in the pipeline
*/
export type McpMiddlewareNext = (context: McpExecutionContext) => Promise<McpToolResult>
/**
* Interface that all Resilience Middlewares must implement
*/
export interface McpMiddleware {
/**
* Execute the middleware logic
* @param context The current execution context
* @param next The next middleware/tool in the chain
*/
execute(context: McpExecutionContext, next: McpMiddlewareNext): Promise<McpToolResult>
}

View File

@@ -11,6 +11,10 @@ import { generateRequestId } from '@/lib/core/utils/request'
import { McpClient } from '@/lib/mcp/client'
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
import { isMcpDomainAllowed, validateMcpDomain } from '@/lib/mcp/domain-check'
import { CircuitBreakerMiddleware } from '@/lib/mcp/resilience/circuit-breaker'
import { ResiliencePipeline } from '@/lib/mcp/resilience/pipeline'
import { SchemaValidatorMiddleware } from '@/lib/mcp/resilience/schema-validator'
import { TelemetryMiddleware } from '@/lib/mcp/resilience/telemetry'
import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config'
import {
createMcpCacheAdapter,
@@ -35,10 +39,23 @@ class McpService {
private readonly cacheTimeout = MCP_CONSTANTS.CACHE_TIMEOUT
private unsubscribeConnectionManager?: () => void
private pipeline: ResiliencePipeline
private schemaValidator: SchemaValidatorMiddleware
private circuitBreaker: CircuitBreakerMiddleware
private telemetry: TelemetryMiddleware
constructor() {
this.cacheAdapter = createMcpCacheAdapter()
logger.info(`MCP Service initialized with ${getMcpCacheType()} cache`)
this.schemaValidator = new SchemaValidatorMiddleware()
this.circuitBreaker = new CircuitBreakerMiddleware()
this.telemetry = new TelemetryMiddleware()
this.pipeline = new ResiliencePipeline()
.use(this.telemetry)
.use(this.schemaValidator)
.use(this.circuitBreaker)
if (mcpConnectionManager) {
this.unsubscribeConnectionManager = mcpConnectionManager.subscribe((event) => {
this.clearCache(event.workspaceId)
@@ -191,15 +208,23 @@ class McpService {
       if (extraHeaders && Object.keys(extraHeaders).length > 0) {
         resolvedConfig.headers = { ...resolvedConfig.headers, ...extraHeaders }
       }

-      const client = await this.createClient(resolvedConfig)
-      try {
-        const result = await client.callTool(toolCall)
-        logger.info(`[${requestId}] Successfully executed tool ${toolCall.name}`)
-        return result
-      } finally {
-        await client.disconnect()
-      }
+      const context = {
+        serverId,
+        workspaceId,
+        userId,
+        toolCall,
+        extraHeaders,
+      }
+      const result = await this.pipeline.execute(context, async (ctx) => {
+        const client = await this.createClient(resolvedConfig)
+        try {
+          return await client.callTool(ctx.toolCall)
+        } finally {
+          await client.disconnect()
+        }
+      })
+      logger.info(`[${requestId}] Successfully executed tool ${toolCall.name}`)
+      return result
     } catch (error) {
       if (this.isSessionError(error) && attempt < maxRetries - 1) {
         logger.warn(
@@ -322,6 +347,7 @@ class McpService {
     try {
       const cached = await this.cacheAdapter.get(cacheKey)
       if (cached) {
+        cached.tools.forEach((t: McpTool) => this.schemaValidator.cacheTool(t))
         return cached.tools
       }
     } catch (error) {
@@ -414,6 +440,7 @@ class McpService {
       logger.info(
         `[${requestId}] Discovered ${allTools.length} tools from ${servers.length - failedCount}/${servers.length} servers`
       )
+      allTools.forEach((t: McpTool) => this.schemaValidator.cacheTool(t))
       return allTools
     } catch (error) {
       logger.error(`[${requestId}] Failed to discover MCP tools for user ${userId}:`, error)
@@ -450,6 +477,7 @@ class McpService {
     try {
       const tools = await client.listTools()
       logger.info(`[${requestId}] Discovered ${tools.length} tools from server ${config.name}`)
+      tools.forEach((t: McpTool) => this.schemaValidator.cacheTool(t))
       return tools
     } finally {
       await client.disconnect()
@@ -533,6 +561,7 @@ class McpService {
         await this.cacheAdapter.clear()
         logger.debug('Cleared all MCP tool cache')
       }
+      this.schemaValidator.clearCache()
     } catch (error) {
       logger.warn('Failed to clear cache:', error)
     }
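The constructor wires the pipeline as telemetry → schema validation → circuit breaker, so the first middleware registered wraps everything that follows. A minimal sketch of how such a `.use()` pipeline composes — illustrative stand-in types only, not the actual `ResiliencePipeline` from `@/lib/mcp/resilience/pipeline`:

```typescript
// Onion-style middleware composition: middlewares[0] ends up outermost.
// All names here are simplified stand-ins for the real MCP types.
type Ctx = { toolCall: { name: string } }
type Next = (ctx: Ctx) => Promise<string>
type Middleware = (ctx: Ctx, next: Next) => Promise<string>

class Pipeline {
  private middlewares: Middleware[] = []

  use(mw: Middleware): this {
    this.middlewares.push(mw)
    return this
  }

  execute(ctx: Ctx, handler: Next): Promise<string> {
    // Fold right-to-left so the first .use() call wraps all later ones.
    const chain = this.middlewares.reduceRight<Next>(
      (next, mw) => (c) => mw(c, next),
      handler
    )
    return chain(ctx)
  }
}

const order: string[] = []
const tag = (name: string): Middleware => async (ctx, next) => {
  order.push(`${name}:before`)
  const out = await next(ctx)
  order.push(`${name}:after`)
  return out
}

const pipeline = new Pipeline().use(tag('telemetry')).use(tag('validator')).use(tag('breaker'))
pipeline.execute({ toolCall: { name: 'demo' } }, async () => 'ok').then((result) => {
  console.log(result) // → "ok"
  console.log(order.join(' '))
  // → "telemetry:before validator:before breaker:before breaker:after validator:after telemetry:after"
})
```

With this ordering, telemetry observes every call outcome, including requests the circuit breaker rejects before they reach a server.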

View File

@@ -1,16 +1,10 @@
 /**
- * Periodic memory telemetry for diagnosing heap growth in production.
- * Logs process.memoryUsage(), V8 heap stats, and active SSE connection
- * counts every 60s, enabling correlation between connection leaks and
- * memory spikes.
+ * Periodic memory telemetry for monitoring heap growth in production.
+ * Logs process.memoryUsage() and V8 heap stats every 60s.
  */
 import v8 from 'node:v8'
 import { createLogger } from '@sim/logger'
-import {
-  getActiveSSEConnectionCount,
-  getActiveSSEConnectionsByRoute,
-} from '@/lib/monitoring/sse-connections'

 const logger = createLogger('MemoryTelemetry', { logLevel: 'INFO' })
@@ -23,16 +17,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
   started = true

   const timer = setInterval(() => {
-    // Trigger opportunistic (non-blocking) garbage collection if running on Bun.
-    // This signals JSC GC + mimalloc page purge without blocking the event loop,
-    // helping reclaim RSS that mimalloc otherwise retains under sustained load.
-    const bunGlobal = (globalThis as Record<string, unknown>).Bun as
-      | { gc?: (force: boolean) => void }
-      | undefined
-    if (typeof bunGlobal?.gc === 'function') {
-      bunGlobal.gc(false)
-    }
     const mem = process.memoryUsage()
     const heap = v8.getHeapStatistics()
@@ -49,8 +33,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
         ? process.getActiveResourcesInfo().length
         : -1,
       uptimeMin: Math.round(process.uptime() / 60),
-      activeSSEConnections: getActiveSSEConnectionCount(),
-      sseByRoute: getActiveSSEConnectionsByRoute(),
     })
   }, intervalMs)

   timer.unref()
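After this change the telemetry loop is just an interval that samples `process.memoryUsage()` and `v8.getHeapStatistics()`, then `unref()`s the timer so it can never keep the process alive on its own. A condensed standalone sketch of that surviving behavior (the function name and log shape here are illustrative, not the file's exports):

```typescript
import v8 from 'node:v8'

// Sample basic memory stats on an interval. The injectable `log` parameter
// is an assumption for testability; the real file logs via @sim/logger.
export function startMemoryTelemetrySketch(
  intervalMs = 60_000,
  log: (sample: Record<string, number>) => void = console.log
) {
  const timer = setInterval(() => {
    const mem = process.memoryUsage()
    const heap = v8.getHeapStatistics()
    log({
      rssMb: Math.round(mem.rss / 1024 / 1024),
      heapUsedMb: Math.round(mem.heapUsed / 1024 / 1024),
      heapLimitMb: Math.round(heap.heap_size_limit / 1024 / 1024),
      uptimeMin: Math.round(process.uptime() / 60),
    })
  }, intervalMs)
  // unref() lets the process exit even while the interval is scheduled.
  timer.unref()
  return timer
}
```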

View File

@@ -1,27 +0,0 @@
/**
* Tracks active SSE connections by route for memory leak diagnostics.
* Logged alongside periodic memory telemetry to correlate connection
* counts with heap growth.
*/
const connections = new Map<string, number>()
export function incrementSSEConnections(route: string) {
connections.set(route, (connections.get(route) ?? 0) + 1)
}
export function decrementSSEConnections(route: string) {
const count = (connections.get(route) ?? 0) - 1
if (count <= 0) connections.delete(route)
else connections.set(route, count)
}
export function getActiveSSEConnectionCount(): number {
let total = 0
for (const count of connections.values()) total += count
return total
}
export function getActiveSSEConnectionsByRoute(): Record<string, number> {
return Object.fromEntries(connections)
}

View File

@@ -1166,6 +1166,12 @@ export async function queueWebhookExecution(
     })
   }

+  // Slack requires an empty 200 for interactive payloads (view_submission, block_actions, etc.)
+  // A JSON body like {"message":"..."} is not a recognized response format and causes modal errors
+  if (foundWebhook.provider === 'slack') {
+    return new NextResponse(null, { status: 200 })
+  }
+
   // Twilio Voice requires TwiML XML response
   if (foundWebhook.provider === 'twilio_voice') {
     const providerConfig = (foundWebhook.providerConfig as Record<string, any>) || {}
@@ -1211,6 +1217,12 @@
     )
   }

+  if (foundWebhook.provider === 'slack') {
+    // Return empty 200 to avoid Slack showing an error dialog to the user,
+    // even though processing failed. The error is already logged above.
+    return new NextResponse(null, { status: 200 })
+  }
+
   if (foundWebhook.provider === 'twilio_voice') {
     const errorTwiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
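The Slack branches above both rely on the same Slack contract: interactivity callbacks must be acknowledged with a bodyless HTTP 200, or Slack surfaces an error in the open modal. Since `NextResponse` extends the WHATWG `Response`, the shape can be sketched with the standard class (function names here are illustrative):

```typescript
// Correct acknowledgement for Slack interactive payloads: status 200, no body.
function slackAck(): Response {
  return new Response(null, { status: 200 })
}

// What NOT to return: an arbitrary JSON body. Slack treats unrecognized
// bodies on interactivity callbacks as errors rather than ignoring them.
function jsonError(message: string): Response {
  return new Response(JSON.stringify({ message }), {
    status: 200,
    headers: { 'content-type': 'application/json' },
  })
}

const ack = slackAck()
console.log(ack.status) // → 200
ack.text().then((body) => console.log(body === '')) // → true
console.log(jsonError('boom').headers.get('content-type')) // → "application/json"
```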