Compare commits

..

38 Commits

Author SHA1 Message Date
Waleed
14089f7dbb v0.6.14: performance improvements, connectors UX, collapsed sidebar actions 2026-03-27 13:07:59 -07:00
Waleed
e615816dce v0.6.13: emcn standardization, granola and ketch integrations, security hardening, connectors improvements 2026-03-27 00:16:37 -07:00
Waleed
ca87d7ce29 v0.6.12: billing, blogs UI 2026-03-26 01:19:23 -07:00
Waleed
6bebbc5e29 v0.6.11: billing fixes, rippling, hubspot, UI improvements, demo modal 2026-03-25 22:54:56 -07:00
Waleed
7b572f1f61 v0.6.10: tour fix, connectors reliability improvements, tooltip gif fixes 2026-03-24 21:38:19 -07:00
Vikhyath Mondreti
ed9a71f0af v0.6.9: general ux improvements for tables, mothership 2026-03-24 17:03:24 -07:00
Siddharth Ganesan
c78c870fda v0.6.8: mothership tool loop
v0.6.8: mothership tool loop
2026-03-24 04:06:19 -07:00
Waleed
19442f19e2 v0.6.7: kb improvements, edge z index fix, captcha, new trust center, block classifications 2026-03-21 12:43:33 -07:00
Waleed
1731a4d7f0 v0.6.6: landing improvements, styling consistency, mothership table renaming 2026-03-19 23:58:30 -07:00
Waleed
9fcd02fd3b v0.6.5: email validation, integrations page, mothership and custom tool fixes 2026-03-19 16:08:30 -07:00
Waleed
ff7b5b528c v0.6.4: subflows, docusign, ashby new tools, box, workday, billing bug fixes 2026-03-18 23:12:36 -07:00
Waleed
30f2d1a0fc v0.6.3: hubspot integration, kb block improvements 2026-03-18 11:19:55 -07:00
Waleed
4bd0731871 v0.6.2: mothership stability, chat iframe embedding, KB upserts, new blog post 2026-03-18 03:29:39 -07:00
Waleed
4f3bc37fe4 v0.6.1: added better auth admin plugin 2026-03-17 15:16:16 -07:00
Waleed
84d6fdc423 v0.6: mothership, tables, connectors 2026-03-17 12:21:15 -07:00
Vikhyath Mondreti
4c12914d35 v0.5.113: jira, ashby, google ads, grain updates 2026-03-12 22:54:25 -07:00
Waleed
e9bdc57616 v0.5.112: trace spans improvements, fathom integration, jira fixes, canvas navigation updates 2026-03-12 13:30:20 -07:00
Vikhyath Mondreti
36612ae42a v0.5.111: non-polling webhook execs off trigger.dev, gmail subject headers, webhook trigger configs (#3530) 2026-03-11 17:47:28 -07:00
Waleed
1c2c2c65d4 v0.5.110: webhook execution speedups, SSRF patches 2026-03-11 15:00:24 -07:00
Waleed
ecd3536a72 v0.5.109: obsidian and evernote integrations, slack fixes, remove memory instrumentation 2026-03-09 10:40:37 -07:00
Vikhyath Mondreti
8c0a2e04b1 v0.5.108: workflow input params in agent tools, bun upgrade, dropdown selectors for 14 blocks 2026-03-06 21:02:25 -08:00
Waleed
6586c5ce40 v0.5.107: new reddit, slack tools 2026-03-05 22:48:20 -08:00
Vikhyath Mondreti
3ce947566d v0.5.106: condition block and legacy kbs fixes, GPT 5.4 2026-03-05 17:30:05 -08:00
Waleed
70c36cb7aa v0.5.105: slack remove reaction, nested subflow locks fix, servicenow pagination, memory improvements 2026-03-04 22:38:26 -08:00
Waleed
f1ec5fe824 v0.5.104: memory improvements, nested subflows, careers page redirect, brandfetch, google meet 2026-03-03 23:45:29 -08:00
Waleed
e07e3c34cc v0.5.103: memory util instrumentation, API docs, amplitude, google pagespeed insights, pagerduty 2026-03-01 23:27:02 -08:00
Waleed
0d2e6ff31d v0.5.102: new integrations, new tools, ci speedups, memory leak instrumentation 2026-02-28 12:48:10 -08:00
Waleed
4fd0989264 v0.5.101: circular dependency mitigation, confluence enhancements, google tasks and bigquery integrations, workflow lock 2026-02-26 15:04:53 -08:00
Waleed
67f8a687f6 v0.5.100: multiple credentials, 40% speedup, gong, attio, audit log improvements 2026-02-25 00:28:25 -08:00
Waleed
af592349d3 v0.5.99: local dev improvements, live workflow logs in terminal 2026-02-23 00:24:49 -08:00
Waleed
0d86ea01f0 v0.5.98: change detection improvements, rate limit and code execution fixes, removed retired models, hex integration 2026-02-21 18:07:40 -08:00
Waleed
115f04e989 v0.5.97: oidc discovery for copilot mcp 2026-02-21 02:06:25 -08:00
Waleed
34d92fae89 v0.5.96: sim oauth provider, slack ephemeral message tool and blockkit support 2026-02-20 18:22:20 -08:00
Waleed
67aa4bb332 v0.5.95: gemini 3.1 pro, cloudflare, dataverse, revenuecat, redis, upstash, algolia tools; isolated-vm robustness improvements, tables backend (#3271)
* feat(tools): advanced fields for youtube, vercel; added cloudflare and dataverse tools (#3257)

* refactor(vercel): mark optional fields as advanced mode

Move optional/power-user fields behind the advanced toggle:
- List Deployments: project filter, target, state
- Create Deployment: project ID override, redeploy from, target
- List Projects: search
- Create/Update Project: framework, build/output/install commands
- Env Vars: variable type
- Webhooks: project IDs filter
- Checks: path, details URL
- Team Members: role filter
- All operations: team ID scope

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* style(youtube): mark optional params as advanced mode

Hide pagination, sort order, and filter fields behind the advanced
toggle for a cleaner default UX across all YouTube operations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* added advanced fields for vercel and youtube, added cloudflare and dataverse block

* addded desc for dataverse

* add more tools

* ack comment

* more

* ops

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* feat(tables): added tables (#2867)

* updates

* required

* trashy table viewer

* updates

* updates

* filtering ui

* updates

* updates

* updates

* one input mode

* format

* fix lints

* improved errors

* updates

* updates

* chages

* doc strings

* breaking down file

* update comments with ai

* updates

* comments

* changes

* revert

* updates

* dedupe

* updates

* updates

* updates

* refactoring

* renames & refactors

* refactoring

* updates

* undo

* update db

* wand

* updates

* fix comments

* fixes

* simplify comments

* u[dates

* renames

* better comments

* validation

* updates

* updates

* updates

* fix sorting

* fix appearnce

* updating prompt to make it user sort

* rm

* updates

* rename

* comments

* clean comments

* simplicifcaiton

* updates

* updates

* refactor

* reduced type confusion

* undo

* rename

* undo changes

* undo

* simplify

* updates

* updates

* revert

* updates

* db updates

* type fix

* fix

* fix error handling

* updates

* docs

* docs

* updates

* rename

* dedupe

* revert

* uncook

* updates

* fix

* fix

* fix

* fix

* prepare merge

* readd migrations

* add back missed code

* migrate enrichment logic to general abstraction

* address bugbot concerns

* adhere to size limits for tables

* remove conflicting migration

* add back migrations

* fix tables auth

* fix permissive auth

* fix lint

* reran migrations

* migrate to use tanstack query for all server state

* update table-selector

* update names

* added tables to permission groups, updated subblock types

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: waleed <walif6@gmail.com>

* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running (#3259)

* fix(snapshot): changed insert to upsert when concurrent identical child workflows are running

* fixed ci tests failing

* fix(workflows): disallow duplicate workflow names at the same folder level (#3260)

* feat(tools): added redis, upstash, algolia, and revenuecat (#3261)

* feat(tools): added redis, upstash, algolia, and revenuecat

* ack comment

* feat(models): add gemini-3.1-pro-preview and update gemini-3-pro thinking levels (#3263)

* fix(audit-log): lazily resolve actor name/email when missing (#3262)

* fix(blocks): move type coercions from tools.config.tool to tools.config.params (#3264)

* fix(blocks): move type coercions from tools.config.tool to tools.config.params

Number() coercions in tools.config.tool ran at serialization time before
variable resolution, destroying dynamic references like <block.result.count>
by converting them to NaN/null. Moved all coercions to tools.config.params
which runs at execution time after variables are resolved.

Fixed in 15 blocks: exa, arxiv, sentry, incidentio, wikipedia, ahrefs,
posthog, elasticsearch, dropbox, hunter, lemlist, spotify, youtube, grafana,
parallel. Also added mode: 'advanced' to optional exa fields.

Closes #3258

* fix(blocks): address PR review — move remaining param mutations from tool() to params()

- Moved field mappings from tool() to params() in grafana, posthog,
  lemlist, spotify, dropbox (same dynamic reference bug)
- Fixed parallel.ts excerpts/full_content boolean logic
- Fixed parallel.ts search_queries empty case (must set undefined)
- Fixed elasticsearch.ts timeout not included when already ends with 's'
- Restored dropbox.ts tool() switch for proper default fallback

* fix(blocks): restore field renames to tool() for serialization-time validation

Field renames (e.g. personalApiKey→apiKey) must be in tool() because
validateRequiredFieldsBeforeExecution calls selectToolId()→tool() then
checks renamed field names on params. Only type coercions (Number(),
boolean) stay in params() to avoid destroying dynamic variable references.

* improvement(resolver): resovled empty sentinel to not pass through unexecuted valid refs to text inputs (#3266)

* fix(blocks): add required constraint for serviceDeskId in JSM block (#3268)

* fix(blocks): add required constraint for serviceDeskId in JSM block

* fix(blocks): rename custom field values to request field values in JSM create request

* fix(trigger): add isolated-vm support to trigger.dev container builds (#3269)

Scheduled workflow executions running in trigger.dev containers were
failing to spawn isolated-vm workers because the native module wasn't
available in the container. This caused loop condition evaluation to
silently fail and exit after one iteration.

- Add isolated-vm to build.external and additionalPackages in trigger config
- Include isolated-vm-worker.cjs via additionalFiles for child process spawning
- Add fallback path resolution for worker file in trigger.dev environment

* fix(tables): hide tables from sidebar and block registry (#3270)

* fix(tables): hide tables from sidebar and block registry

* fix(trigger): add isolated-vm support to trigger.dev container builds (#3269)

Scheduled workflow executions running in trigger.dev containers were
failing to spawn isolated-vm workers because the native module wasn't
available in the container. This caused loop condition evaluation to
silently fail and exit after one iteration.

- Add isolated-vm to build.external and additionalPackages in trigger config
- Include isolated-vm-worker.cjs via additionalFiles for child process spawning
- Add fallback path resolution for worker file in trigger.dev environment

* lint

* fix(trigger): update node version to align with main app (#3272)

* fix(build): fix corrupted sticky disk cache on blacksmith (#3273)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Lakee Sivaraya <71339072+lakeesiv@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
2026-02-20 13:43:07 -08:00
Waleed
15ace5e63f v0.5.94: vercel integration, folder insertion, migrated tracking redirects to rewrites 2026-02-18 16:53:34 -08:00
Waleed
fdca73679d v0.5.93: NextJS config changes, MCP and Blocks whitelisting, copilot keyboard shortcuts, audit logs 2026-02-18 12:10:05 -08:00
Waleed
da46a387c9 v0.5.92: shortlinks, copilot scrolling stickiness, pagination 2026-02-17 15:13:21 -08:00
Waleed
b7e377ec4b v0.5.91: docs i18n, turborepo upgrade 2026-02-16 00:36:05 -08:00
117 changed files with 1113 additions and 6438 deletions

View File

@@ -195,17 +195,6 @@ By default, your usage is capped at the credits included in your plan. To allow
Max (individual) shares the same rate limits as team plans. Team plans (Pro or Max for Teams) use the Max-tier rate limits.
### Concurrent Execution Limits
| Plan | Concurrent Executions |
|------|----------------------|
| **Free** | 5 |
| **Pro** | 50 |
| **Max / Team** | 200 |
| **Enterprise** | 200 (customizable) |
Concurrent execution limits control how many workflow executions can run simultaneously within a workspace. When the limit is reached, new executions are queued and admitted as running executions complete. Manual runs from the editor are not subject to these limits.
### File Storage
| Plan | Storage |

View File

@@ -25,7 +25,6 @@ const PRICING_TIERS: PricingTier[] = [
'5GB file storage',
'3 tables · 1,000 rows each',
'5 min execution limit',
'5 concurrent/workspace',
'7-day log retention',
'CLI/SDK/MCP Access',
],
@@ -43,7 +42,6 @@ const PRICING_TIERS: PricingTier[] = [
'50GB file storage',
'25 tables · 5,000 rows each',
'50 min execution · 150 runs/min',
'50 concurrent/workspace',
'Unlimited log retention',
'CLI/SDK/MCP Access',
],
@@ -61,7 +59,6 @@ const PRICING_TIERS: PricingTier[] = [
'500GB file storage',
'25 tables · 5,000 rows each',
'50 min execution · 300 runs/min',
'200 concurrent/workspace',
'Unlimited log retention',
'CLI/SDK/MCP Access',
],
@@ -78,7 +75,6 @@ const PRICING_TIERS: PricingTier[] = [
'Custom file storage',
'10,000 tables · 1M rows each',
'Custom execution limits',
'Custom concurrency limits',
'Unlimited log retention',
'SSO & SCIM · SOC2 & HIPAA',
'Self hosting · Dedicated support',

View File

@@ -5,7 +5,6 @@ import { and, desc, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import {
@@ -540,26 +539,10 @@ export async function POST(req: NextRequest) {
return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
}
const nsExecutionId = crypto.randomUUID()
const nsRunId = crypto.randomUUID()
if (actualChatId) {
await createRunSegment({
id: nsRunId,
executionId: nsExecutionId,
chatId: actualChatId,
userId: authenticatedUserId,
workflowId,
streamId: userMessageIdToUse,
}).catch(() => {})
}
const nonStreamingResult = await orchestrateCopilotStream(requestPayload, {
userId: authenticatedUserId,
workflowId,
chatId: actualChatId,
executionId: nsExecutionId,
runId: nsRunId,
goRoute: '/api/copilot',
autoExecuteTools: true,
interactive: true,

View File

@@ -1,160 +0,0 @@
/**
* @vitest-environment node
*/
import type { NextRequest } from 'next/server'
import { beforeEach, describe, expect, it, vi } from 'vitest'
const {
mockCheckHybridAuth,
mockGetDispatchJobRecord,
mockGetJobQueue,
mockVerifyWorkflowAccess,
mockGetWorkflowById,
} = vi.hoisted(() => ({
mockCheckHybridAuth: vi.fn(),
mockGetDispatchJobRecord: vi.fn(),
mockGetJobQueue: vi.fn(),
mockVerifyWorkflowAccess: vi.fn(),
mockGetWorkflowById: vi.fn(),
}))
vi.mock('@sim/logger', () => ({
createLogger: () => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}),
}))
vi.mock('@/lib/auth/hybrid', () => ({
checkHybridAuth: mockCheckHybridAuth,
}))
vi.mock('@/lib/core/async-jobs', () => ({
JOB_STATUS: {
PENDING: 'pending',
PROCESSING: 'processing',
COMPLETED: 'completed',
FAILED: 'failed',
},
getJobQueue: mockGetJobQueue,
}))
vi.mock('@/lib/core/workspace-dispatch/store', () => ({
getDispatchJobRecord: mockGetDispatchJobRecord,
}))
vi.mock('@/lib/core/utils/request', () => ({
generateRequestId: vi.fn().mockReturnValue('request-1'),
}))
vi.mock('@/socket/middleware/permissions', () => ({
verifyWorkflowAccess: mockVerifyWorkflowAccess,
}))
vi.mock('@/lib/workflows/utils', () => ({
getWorkflowById: mockGetWorkflowById,
}))
import { GET } from './route'
function createMockRequest(): NextRequest {
return {
headers: {
get: () => null,
},
} as NextRequest
}
describe('GET /api/jobs/[jobId]', () => {
beforeEach(() => {
vi.clearAllMocks()
mockCheckHybridAuth.mockResolvedValue({
success: true,
userId: 'user-1',
apiKeyType: undefined,
workspaceId: undefined,
})
mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: true })
mockGetWorkflowById.mockResolvedValue({
id: 'workflow-1',
workspaceId: 'workspace-1',
})
mockGetJobQueue.mockResolvedValue({
getJob: vi.fn().mockResolvedValue(null),
})
})
it('returns dispatcher-aware waiting status with metadata', async () => {
mockGetDispatchJobRecord.mockResolvedValue({
id: 'dispatch-1',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: {
workflowId: 'workflow-1',
},
priority: 10,
status: 'waiting',
createdAt: 1000,
admittedAt: 2000,
})
const response = await GET(createMockRequest(), {
params: Promise.resolve({ jobId: 'dispatch-1' }),
})
const body = await response.json()
expect(response.status).toBe(200)
expect(body.status).toBe('waiting')
expect(body.metadata.queueName).toBe('workflow-execution')
expect(body.metadata.lane).toBe('runtime')
expect(body.metadata.workspaceId).toBe('workspace-1')
})
it('returns completed output from dispatch state', async () => {
mockGetDispatchJobRecord.mockResolvedValue({
id: 'dispatch-2',
workspaceId: 'workspace-1',
lane: 'interactive',
queueName: 'workflow-execution',
bullmqJobName: 'direct-workflow-execution',
bullmqPayload: {},
metadata: {
workflowId: 'workflow-1',
},
priority: 1,
status: 'completed',
createdAt: 1000,
startedAt: 2000,
completedAt: 7000,
output: { success: true },
})
const response = await GET(createMockRequest(), {
params: Promise.resolve({ jobId: 'dispatch-2' }),
})
const body = await response.json()
expect(response.status).toBe(200)
expect(body.status).toBe('completed')
expect(body.output).toEqual({ success: true })
expect(body.metadata.duration).toBe(5000)
})
it('returns 404 when neither dispatch nor BullMQ job exists', async () => {
mockGetDispatchJobRecord.mockResolvedValue(null)
const response = await GET(createMockRequest(), {
params: Promise.resolve({ jobId: 'missing-job' }),
})
expect(response.status).toBe(404)
})
})

View File

@@ -1,10 +1,8 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getJobQueue } from '@/lib/core/async-jobs'
import { getJobQueue, JOB_STATUS } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
import { getDispatchJobRecord } from '@/lib/core/workspace-dispatch/store'
import { createErrorResponse } from '@/app/api/workflows/utils'
const logger = createLogger('TaskStatusAPI')
@@ -25,54 +23,68 @@ export async function GET(
const authenticatedUserId = authResult.userId
const dispatchJob = await getDispatchJobRecord(taskId)
const jobQueue = await getJobQueue()
const job = dispatchJob ? null : await jobQueue.getJob(taskId)
const job = await jobQueue.getJob(taskId)
if (!job && !dispatchJob) {
if (!job) {
return createErrorResponse('Task not found', 404)
}
const metadataToCheck = dispatchJob?.metadata ?? job?.metadata
if (metadataToCheck?.workflowId) {
if (job.metadata?.workflowId) {
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const accessCheck = await verifyWorkflowAccess(
authenticatedUserId,
metadataToCheck.workflowId as string
job.metadata.workflowId as string
)
if (!accessCheck.hasAccess) {
logger.warn(`[${requestId}] Access denied to workflow ${metadataToCheck.workflowId}`)
logger.warn(`[${requestId}] Access denied to workflow ${job.metadata.workflowId}`)
return createErrorResponse('Access denied', 403)
}
if (authResult.apiKeyType === 'workspace' && authResult.workspaceId) {
const { getWorkflowById } = await import('@/lib/workflows/utils')
const workflow = await getWorkflowById(metadataToCheck.workflowId as string)
const workflow = await getWorkflowById(job.metadata.workflowId as string)
if (!workflow?.workspaceId || workflow.workspaceId !== authResult.workspaceId) {
return createErrorResponse('API key is not authorized for this workspace', 403)
}
}
} else if (metadataToCheck?.userId && metadataToCheck.userId !== authenticatedUserId) {
logger.warn(`[${requestId}] Access denied to user ${metadataToCheck.userId}`)
} else if (job.metadata?.userId && job.metadata.userId !== authenticatedUserId) {
logger.warn(`[${requestId}] Access denied to user ${job.metadata.userId}`)
return createErrorResponse('Access denied', 403)
} else if (!metadataToCheck?.userId && !metadataToCheck?.workflowId) {
} else if (!job.metadata?.userId && !job.metadata?.workflowId) {
logger.warn(`[${requestId}] Access denied to job ${taskId}`)
return createErrorResponse('Access denied', 403)
}
const presented = presentDispatchOrJobStatus(dispatchJob, job)
const mappedStatus = job.status === JOB_STATUS.PENDING ? 'queued' : job.status
const response: any = {
success: true,
taskId,
status: presented.status,
metadata: presented.metadata,
status: mappedStatus,
metadata: {
startedAt: job.startedAt,
},
}
if (presented.output !== undefined) response.output = presented.output
if (presented.error !== undefined) response.error = presented.error
if (presented.estimatedDuration !== undefined) {
response.estimatedDuration = presented.estimatedDuration
if (job.status === JOB_STATUS.COMPLETED) {
response.output = job.output
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
}
if (job.status === JOB_STATUS.FAILED) {
response.error = job.error
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
}
if (job.status === JOB_STATUS.PROCESSING || job.status === JOB_STATUS.PENDING) {
response.estimatedDuration = 300000
}
return NextResponse.json(response)

View File

@@ -18,7 +18,6 @@ import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { validateOAuthAccessToken } from '@/lib/auth/oauth-token'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
import { ORCHESTRATION_TIMEOUT_MS, SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
@@ -728,25 +727,10 @@ async function handleBuildToolCall(
chatId,
}
const executionId = crypto.randomUUID()
const runId = crypto.randomUUID()
const messageId = requestPayload.messageId as string
await createRunSegment({
id: runId,
executionId,
chatId,
userId,
workflowId: resolved.workflowId,
streamId: messageId,
}).catch(() => {})
const result = await orchestrateCopilotStream(requestPayload, {
userId,
workflowId: resolved.workflowId,
chatId,
executionId,
runId,
goRoute: '/api/mcp',
autoExecuteTools: true,
timeout: ORCHESTRATION_TIMEOUT_MS,

View File

@@ -2,7 +2,6 @@ import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkInternalAuth } from '@/lib/auth/hybrid'
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat-payload'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
@@ -72,24 +71,10 @@ export async function POST(req: NextRequest) {
...(userPermission ? { userPermission } : {}),
}
const executionId = crypto.randomUUID()
const runId = crypto.randomUUID()
await createRunSegment({
id: runId,
executionId,
chatId: effectiveChatId,
userId,
workspaceId,
streamId: messageId,
}).catch(() => {})
const result = await orchestrateCopilotStream(requestPayload, {
userId,
workspaceId,
chatId: effectiveChatId,
executionId,
runId,
goRoute: '/api/mothership/execute',
autoExecuteTools: true,
interactive: false,

View File

@@ -9,12 +9,10 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
const {
mockVerifyCronAuth,
mockExecuteScheduleJob,
mockExecuteJobInline,
mockFeatureFlags,
mockDbReturning,
mockDbUpdate,
mockEnqueue,
mockEnqueueWorkspaceDispatch,
mockStartJob,
mockCompleteJob,
mockMarkJobFailed,
@@ -24,7 +22,6 @@ const {
const mockDbSet = vi.fn().mockReturnValue({ where: mockDbWhere })
const mockDbUpdate = vi.fn().mockReturnValue({ set: mockDbSet })
const mockEnqueue = vi.fn().mockResolvedValue('job-id-1')
const mockEnqueueWorkspaceDispatch = vi.fn().mockResolvedValue('job-id-1')
const mockStartJob = vi.fn().mockResolvedValue(undefined)
const mockCompleteJob = vi.fn().mockResolvedValue(undefined)
const mockMarkJobFailed = vi.fn().mockResolvedValue(undefined)
@@ -32,7 +29,6 @@ const {
return {
mockVerifyCronAuth: vi.fn().mockReturnValue(null),
mockExecuteScheduleJob: vi.fn().mockResolvedValue(undefined),
mockExecuteJobInline: vi.fn().mockResolvedValue(undefined),
mockFeatureFlags: {
isTriggerDevEnabled: false,
isHosted: false,
@@ -42,7 +38,6 @@ const {
mockDbReturning,
mockDbUpdate,
mockEnqueue,
mockEnqueueWorkspaceDispatch,
mockStartJob,
mockCompleteJob,
mockMarkJobFailed,
@@ -55,8 +50,6 @@ vi.mock('@/lib/auth/internal', () => ({
vi.mock('@/background/schedule-execution', () => ({
executeScheduleJob: mockExecuteScheduleJob,
executeJobInline: mockExecuteJobInline,
releaseScheduleLock: vi.fn().mockResolvedValue(undefined),
}))
vi.mock('@/lib/core/config/feature-flags', () => mockFeatureFlags)
@@ -75,22 +68,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
shouldExecuteInline: vi.fn().mockReturnValue(false),
}))
vi.mock('@/lib/core/bullmq', () => ({
isBullMQEnabled: vi.fn().mockReturnValue(true),
createBullMQJobData: vi.fn((payload: unknown) => ({ payload })),
}))
vi.mock('@/lib/core/workspace-dispatch', () => ({
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
}))
vi.mock('@/lib/workflows/utils', () => ({
getWorkflowById: vi.fn().mockResolvedValue({
id: 'workflow-1',
workspaceId: 'workspace-1',
}),
}))
vi.mock('drizzle-orm', () => ({
and: vi.fn((...conditions: unknown[]) => ({ type: 'and', conditions })),
eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })),
@@ -165,18 +142,6 @@ const MULTIPLE_SCHEDULES = [
},
]
const SINGLE_JOB = [
{
id: 'job-1',
cronExpression: '0 * * * *',
failedCount: 0,
lastQueuedAt: undefined,
sourceUserId: 'user-1',
sourceWorkspaceId: 'workspace-1',
sourceType: 'job',
},
]
function createMockRequest(): NextRequest {
const mockHeaders = new Map([
['authorization', 'Bearer test-cron-secret'],
@@ -246,44 +211,30 @@ describe('Scheduled Workflow Execution API Route', () => {
expect(data).toHaveProperty('executedCount', 2)
})
it('should queue mothership jobs to BullMQ when available', async () => {
mockDbReturning.mockReturnValueOnce([]).mockReturnValueOnce(SINGLE_JOB)
const response = await GET(createMockRequest())
expect(response.status).toBe(200)
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
expect.objectContaining({
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'mothership-job-execution',
bullmqJobName: 'mothership-job-execution',
bullmqPayload: {
payload: {
scheduleId: 'job-1',
cronExpression: '0 * * * *',
failedCount: 0,
now: expect.any(String),
},
},
})
)
expect(mockExecuteJobInline).not.toHaveBeenCalled()
})
it('should enqueue preassigned correlation metadata for schedules', async () => {
mockDbReturning.mockReturnValue(SINGLE_SCHEDULE)
const response = await GET(createMockRequest())
expect(response.status).toBe(200)
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
expect(mockEnqueue).toHaveBeenCalledWith(
'schedule-execution',
expect.objectContaining({
id: 'schedule-execution-1',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'schedule-execution',
bullmqJobName: 'schedule-execution',
scheduleId: 'schedule-1',
workflowId: 'workflow-1',
executionId: 'schedule-execution-1',
requestId: 'test-request-id',
correlation: {
executionId: 'schedule-execution-1',
requestId: 'test-request-id',
source: 'schedule',
workflowId: 'workflow-1',
scheduleId: 'schedule-1',
triggerType: 'schedule',
scheduledFor: '2025-01-01T00:00:00.000Z',
},
}),
{
metadata: {
workflowId: 'workflow-1',
correlation: {
@@ -296,7 +247,7 @@ describe('Scheduled Workflow Execution API Route', () => {
scheduledFor: '2025-01-01T00:00:00.000Z',
},
},
})
}
)
})
})

View File

@@ -5,9 +5,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { verifyCronAuth } from '@/lib/auth/internal'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { generateRequestId } from '@/lib/core/utils/request'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import {
executeJobInline,
executeScheduleJob,
@@ -75,8 +73,6 @@ export async function GET(request: NextRequest) {
cronExpression: workflowSchedule.cronExpression,
failedCount: workflowSchedule.failedCount,
lastQueuedAt: workflowSchedule.lastQueuedAt,
sourceWorkspaceId: workflowSchedule.sourceWorkspaceId,
sourceUserId: workflowSchedule.sourceUserId,
sourceType: workflowSchedule.sourceType,
})
@@ -115,40 +111,9 @@ export async function GET(request: NextRequest) {
}
try {
const { getWorkflowById } = await import('@/lib/workflows/utils')
const resolvedWorkflow = schedule.workflowId
? await getWorkflowById(schedule.workflowId)
: null
const resolvedWorkspaceId = resolvedWorkflow?.workspaceId
let jobId: string
if (isBullMQEnabled()) {
if (!resolvedWorkspaceId) {
throw new Error(
`Missing workspace for scheduled workflow ${schedule.workflowId}; refusing to bypass workspace admission`
)
}
jobId = await enqueueWorkspaceDispatch({
id: executionId,
workspaceId: resolvedWorkspaceId,
lane: 'runtime',
queueName: 'schedule-execution',
bullmqJobName: 'schedule-execution',
bullmqPayload: createBullMQJobData(payload, {
workflowId: schedule.workflowId ?? undefined,
correlation,
}),
metadata: {
workflowId: schedule.workflowId ?? undefined,
correlation,
},
})
} else {
jobId = await jobQueue.enqueue('schedule-execution', payload, {
metadata: { workflowId: schedule.workflowId ?? undefined, correlation },
})
}
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
metadata: { workflowId: schedule.workflowId ?? undefined, correlation },
})
logger.info(
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
)
@@ -200,7 +165,7 @@ export async function GET(request: NextRequest) {
}
})
// Mothership jobs use BullMQ when available, otherwise direct inline execution.
// Jobs always execute inline (no TriggerDev)
const jobPromises = dueJobs.map(async (job) => {
const queueTime = job.lastQueuedAt ?? queuedAt
const payload = {
@@ -211,24 +176,7 @@ export async function GET(request: NextRequest) {
}
try {
if (isBullMQEnabled()) {
if (!job.sourceWorkspaceId || !job.sourceUserId) {
throw new Error(`Mothership job ${job.id} is missing workspace/user ownership`)
}
await enqueueWorkspaceDispatch({
workspaceId: job.sourceWorkspaceId!,
lane: 'runtime',
queueName: 'mothership-job-execution',
bullmqJobName: 'mothership-job-execution',
bullmqPayload: createBullMQJobData(payload),
metadata: {
userId: job.sourceUserId,
},
})
} else {
await executeJobInline(payload)
}
await executeJobInline(payload)
} catch (error) {
logger.error(`[${requestId}] Job execution failed for ${job.id}`, {
error: error instanceof Error ? error.message : String(error),

View File

@@ -1,7 +1,6 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
@@ -105,24 +104,10 @@ export async function POST(req: NextRequest) {
chatId,
}
const executionId = crypto.randomUUID()
const runId = crypto.randomUUID()
await createRunSegment({
id: runId,
executionId,
chatId,
userId: auth.userId,
workflowId: resolved.workflowId,
streamId: messageId,
}).catch(() => {})
const result = await orchestrateCopilotStream(requestPayload, {
userId: auth.userId,
workflowId: resolved.workflowId,
chatId,
executionId,
runId,
goRoute: '/api/mcp',
autoExecuteTools: parsed.autoExecuteTools,
timeout: parsed.timeout,

View File

@@ -1,8 +1,6 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
import { generateRequestId } from '@/lib/core/utils/request'
import { DispatchQueueFullError } from '@/lib/core/workspace-dispatch'
import {
checkWebhookPreprocessing,
findAllWebhooksForPath,
@@ -43,25 +41,10 @@ export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ path: string }> }
) {
const ticket = tryAdmit()
if (!ticket) {
return admissionRejectedResponse()
}
try {
return await handleWebhookPost(request, params)
} finally {
ticket.release()
}
}
async function handleWebhookPost(
request: NextRequest,
params: Promise<{ path: string }>
): Promise<NextResponse> {
const requestId = generateRequestId()
const { path } = await params
// Handle provider challenges before body parsing (Microsoft Graph validationToken, etc.)
const earlyChallenge = await handleProviderChallenges({}, request, requestId, path)
if (earlyChallenge) {
return earlyChallenge
@@ -157,30 +140,17 @@ async function handleWebhookPost(
continue
}
try {
const response = await queueWebhookExecution(foundWebhook, foundWorkflow, body, request, {
requestId,
path,
actorUserId: preprocessResult.actorUserId,
executionId: preprocessResult.executionId,
correlation: preprocessResult.correlation,
})
responses.push(response)
} catch (error) {
if (error instanceof DispatchQueueFullError) {
return NextResponse.json(
{
error: 'Service temporarily at capacity',
message: error.message,
retryAfterSeconds: 10,
},
{ status: 503, headers: { 'Retry-After': '10' } }
)
}
throw error
}
const response = await queueWebhookExecution(foundWebhook, foundWorkflow, body, request, {
requestId,
path,
actorUserId: preprocessResult.actorUserId,
executionId: preprocessResult.executionId,
correlation: preprocessResult.correlation,
})
responses.push(response)
}
// Return the last successful response, or a combined response for multiple webhooks
if (responses.length === 0) {
return new NextResponse('No webhooks processed successfully', { status: 500 })
}

View File

@@ -10,18 +10,15 @@ const {
mockAuthorizeWorkflowByWorkspacePermission,
mockPreprocessExecution,
mockEnqueue,
mockEnqueueWorkspaceDispatch,
} = vi.hoisted(() => ({
mockCheckHybridAuth: vi.fn(),
mockAuthorizeWorkflowByWorkspacePermission: vi.fn(),
mockPreprocessExecution: vi.fn(),
mockEnqueue: vi.fn().mockResolvedValue('job-123'),
mockEnqueueWorkspaceDispatch: vi.fn().mockResolvedValue('job-123'),
}))
vi.mock('@/lib/auth/hybrid', () => ({
checkHybridAuth: mockCheckHybridAuth,
hasExternalApiCredentials: vi.fn().mockReturnValue(true),
AuthType: {
SESSION: 'session',
API_KEY: 'api_key',
@@ -47,16 +44,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
markJobFailed: vi.fn(),
}),
shouldExecuteInline: vi.fn().mockReturnValue(false),
shouldUseBullMQ: vi.fn().mockReturnValue(true),
}))
vi.mock('@/lib/core/bullmq', () => ({
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ payload, metadata })),
}))
vi.mock('@/lib/core/workspace-dispatch', () => ({
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
waitForDispatchJob: vi.fn(),
}))
vi.mock('@/lib/core/utils/request', () => ({
@@ -145,13 +132,22 @@ describe('workflow execute async route', () => {
expect(response.status).toBe(202)
expect(body.executionId).toBe('execution-123')
expect(body.jobId).toBe('job-123')
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
expect(mockEnqueue).toHaveBeenCalledWith(
'workflow-execution',
expect.objectContaining({
id: 'execution-123',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
workflowId: 'workflow-1',
userId: 'actor-1',
executionId: 'execution-123',
requestId: 'req-12345678',
correlation: {
executionId: 'execution-123',
requestId: 'req-12345678',
source: 'workflow',
workflowId: 'workflow-1',
triggerType: 'manual',
},
}),
{
metadata: {
workflowId: 'workflow-1',
userId: 'actor-1',
@@ -163,7 +159,7 @@ describe('workflow execute async route', () => {
triggerType: 'manual',
},
},
})
}
)
})
})

View File

@@ -2,10 +2,8 @@ import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { AuthType, checkHybridAuth, hasExternalApiCredentials } from '@/lib/auth/hybrid'
import { admissionRejectedResponse, tryAdmit } from '@/lib/core/admission/gate'
import { getJobQueue, shouldExecuteInline, shouldUseBullMQ } from '@/lib/core/async-jobs'
import { createBullMQJobData } from '@/lib/core/bullmq'
import { AuthType, checkHybridAuth } from '@/lib/auth/hybrid'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import {
createTimeoutAbortController,
getTimeoutErrorMessage,
@@ -14,13 +12,6 @@ import {
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import {
DispatchQueueFullError,
enqueueWorkspaceDispatch,
type WorkspaceDispatchLane,
waitForDispatchJob,
} from '@/lib/core/workspace-dispatch'
import { createBufferedExecutionStream } from '@/lib/execution/buffered-stream'
import {
buildNextCallChain,
parseCallChain,
@@ -42,11 +33,6 @@ import {
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import {
DIRECT_WORKFLOW_JOB_NAME,
type QueuedWorkflowExecutionPayload,
type QueuedWorkflowExecutionResult,
} from '@/lib/workflows/executor/queued-workflow-execution'
import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
@@ -118,8 +104,6 @@ const ExecuteWorkflowSchema = z.object({
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
const INLINE_TRIGGER_TYPES = new Set<CoreTriggerType>(['manual', 'workflow'])
function resolveOutputIds(
selectedOutputs: string[] | undefined,
blocks: Record<string, any>
@@ -177,7 +161,6 @@ type AsyncExecutionParams = {
requestId: string
workflowId: string
userId: string
workspaceId: string
input: any
triggerType: CoreTriggerType
executionId: string
@@ -185,8 +168,7 @@ type AsyncExecutionParams = {
}
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
const { requestId, workflowId, userId, workspaceId, input, triggerType, executionId, callChain } =
params
const { requestId, workflowId, userId, input, triggerType, executionId, callChain } = params
const correlation = {
executionId,
@@ -199,7 +181,6 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
const payload: WorkflowExecutionPayload = {
workflowId,
userId,
workspaceId,
input,
triggerType,
executionId,
@@ -209,42 +190,22 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
}
try {
const useBullMQ = shouldUseBullMQ()
const jobQueue = useBullMQ ? null : await getJobQueue()
const jobId = useBullMQ
? await enqueueWorkspaceDispatch({
id: executionId,
workspaceId,
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: createBullMQJobData(payload, {
workflowId,
userId,
correlation,
}),
metadata: {
workflowId,
userId,
correlation,
},
})
: await jobQueue!.enqueue('workflow-execution', payload, {
metadata: { workflowId, userId, correlation },
})
const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('workflow-execution', payload, {
metadata: { workflowId, userId, correlation },
})
logger.info(`[${requestId}] Queued async workflow execution`, {
workflowId,
jobId,
})
if (shouldExecuteInline() && jobQueue) {
const inlineJobQueue = jobQueue
if (shouldExecuteInline()) {
void (async () => {
try {
await inlineJobQueue.startJob(jobId)
await jobQueue.startJob(jobId)
const output = await executeWorkflowJob(payload)
await inlineJobQueue.completeJob(jobId, output)
await jobQueue.completeJob(jobId, output)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error(`[${requestId}] Async workflow execution failed`, {
@@ -252,7 +213,7 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
error: errorMessage,
})
try {
await inlineJobQueue.markJobFailed(jobId, errorMessage)
await jobQueue.markJobFailed(jobId, errorMessage)
} catch (markFailedError) {
logger.error(`[${requestId}] Failed to mark job as failed`, {
jobId,
@@ -278,17 +239,6 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
{ status: 202 }
)
} catch (error: any) {
if (error instanceof DispatchQueueFullError) {
return NextResponse.json(
{
error: 'Service temporarily at capacity',
message: error.message,
retryAfterSeconds: 10,
},
{ status: 503, headers: { 'Retry-After': '10' } }
)
}
logger.error(`[${requestId}] Failed to queue async execution`, error)
return NextResponse.json(
{ error: `Failed to queue async execution: ${error.message}` },
@@ -297,31 +247,6 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
}
}
async function enqueueDirectWorkflowExecution(
payload: QueuedWorkflowExecutionPayload,
priority: number,
lane: WorkspaceDispatchLane
) {
return enqueueWorkspaceDispatch({
id: payload.metadata.executionId,
workspaceId: payload.metadata.workspaceId,
lane,
queueName: 'workflow-execution',
bullmqJobName: DIRECT_WORKFLOW_JOB_NAME,
bullmqPayload: createBullMQJobData(payload, {
workflowId: payload.metadata.workflowId,
userId: payload.metadata.userId,
correlation: payload.metadata.correlation,
}),
metadata: {
workflowId: payload.metadata.workflowId,
userId: payload.metadata.userId,
correlation: payload.metadata.correlation,
},
priority,
})
}
/**
* POST /api/workflows/[id]/execute
*
@@ -329,27 +254,6 @@ async function enqueueDirectWorkflowExecution(
* Supports both SSE streaming (for interactive/manual runs) and direct JSON responses (for background jobs).
*/
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const isSessionRequest = req.headers.has('cookie') && !hasExternalApiCredentials(req.headers)
if (isSessionRequest) {
return handleExecutePost(req, params)
}
const ticket = tryAdmit()
if (!ticket) {
return admissionRejectedResponse()
}
try {
return await handleExecutePost(req, params)
} finally {
ticket.release()
}
}
async function handleExecutePost(
req: NextRequest,
params: Promise<{ id: string }>
): Promise<NextResponse | Response> {
const requestId = generateRequestId()
const { id: workflowId } = await params
@@ -680,7 +584,6 @@ async function handleExecutePost(
requestId,
workflowId,
userId: actorUserId,
workspaceId,
input,
triggerType: loggingTriggerType,
executionId,
@@ -773,116 +676,30 @@ async function handleExecutePost(
if (!enableSSE) {
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId,
workspaceId,
userId: actorUserId,
sessionUserId: isClientSession ? userId : undefined,
workflowUserId: workflow.userId,
triggerType,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
isClientSession,
enforceCredentialAccess: useAuthenticatedUserAsActor,
workflowStateOverride: effectiveWorkflowStateOverride,
callChain,
}
const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
if (shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)) {
try {
const dispatchJobId = await enqueueDirectWorkflowExecution(
{
workflow,
metadata,
input: processedInput,
variables: executionVariables,
selectedOutputs,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
timeoutMs: preprocessResult.executionTimeout?.sync,
runFromBlock: resolvedRunFromBlock,
},
5,
'interactive'
)
const resultRecord = await waitForDispatchJob(
dispatchJobId,
(preprocessResult.executionTimeout?.sync ?? 300000) + 30000
)
if (resultRecord.status === 'failed') {
return NextResponse.json(
{
success: false,
executionId,
error: resultRecord.error ?? 'Workflow execution failed',
},
{ status: 500 }
)
}
const result = resultRecord.output as QueuedWorkflowExecutionResult
const resultForResponseBlock = {
success: result.success,
logs: result.logs,
output: result.output,
}
if (
auth.authType !== AuthType.INTERNAL_JWT &&
workflowHasResponseBlock(resultForResponseBlock)
) {
return createHttpResponseFromBlock(resultForResponseBlock)
}
return NextResponse.json(
{
success: result.success,
executionId,
output: result.output,
error: result.error,
metadata: result.metadata,
},
{ status: result.statusCode ?? 200 }
)
} catch (error: unknown) {
if (error instanceof DispatchQueueFullError) {
return NextResponse.json(
{
error: 'Service temporarily at capacity',
message: error.message,
retryAfterSeconds: 10,
},
{ status: 503, headers: { 'Retry-After': '10' } }
)
}
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Queued non-SSE execution failed: ${errorMessage}`)
return NextResponse.json(
{
success: false,
error: errorMessage,
},
{ status: 500 }
)
}
}
const timeoutController = createTimeoutAbortController(
preprocessResult.executionTimeout?.sync
)
try {
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId,
workspaceId,
userId: actorUserId,
sessionUserId: isClientSession ? userId : undefined,
workflowUserId: workflow.userId,
triggerType,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
isClientSession,
enforceCredentialAccess: useAuthenticatedUserAsActor,
workflowStateOverride: effectiveWorkflowStateOverride,
callChain,
}
const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
const snapshot = new ExecutionSnapshot(
metadata,
workflow,
@@ -992,53 +809,6 @@ async function handleExecutePost(
}
if (shouldUseDraftState) {
const shouldDispatchViaQueue = shouldUseBullMQ() && !INLINE_TRIGGER_TYPES.has(triggerType)
if (shouldDispatchViaQueue) {
const metadata: ExecutionMetadata = {
requestId,
executionId,
workflowId,
workspaceId,
userId: actorUserId,
sessionUserId: isClientSession ? userId : undefined,
workflowUserId: workflow.userId,
triggerType,
useDraftState: shouldUseDraftState,
startTime: new Date().toISOString(),
isClientSession,
enforceCredentialAccess: useAuthenticatedUserAsActor,
workflowStateOverride: effectiveWorkflowStateOverride,
callChain,
}
const executionVariables = cachedWorkflowData?.variables ?? workflow.variables ?? {}
await enqueueDirectWorkflowExecution(
{
workflow,
metadata,
input: processedInput,
variables: executionVariables,
selectedOutputs,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
timeoutMs: preprocessResult.executionTimeout?.sync,
runFromBlock: resolvedRunFromBlock,
streamEvents: true,
},
1,
'interactive'
)
return new NextResponse(createBufferedExecutionStream(executionId), {
headers: {
...SSE_HEADERS,
'X-Execution-Id': executionId,
},
})
}
logger.info(`[${requestId}] Using SSE console log streaming (manual execution)`)
} else {
logger.info(`[${requestId}] Using streaming API response`)
@@ -1507,17 +1277,6 @@ async function handleExecutePost(
},
})
} catch (error: any) {
if (error instanceof DispatchQueueFullError) {
return NextResponse.json(
{
error: 'Service temporarily at capacity',
message: error.message,
retryAfterSeconds: 10,
},
{ status: 503, headers: { 'Retry-After': '10' } }
)
}
logger.error(`[${requestId}] Failed to start workflow execution:`, error)
return NextResponse.json(
{ error: error.message || 'Failed to start workflow execution' },

View File

@@ -8,7 +8,7 @@ import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { getNextWorkflowColor } from '@/lib/workflows/colors'
import { deduplicateWorkflowName, listWorkflows, type WorkflowScope } from '@/lib/workflows/utils'
import { listWorkflows, type WorkflowScope } from '@/lib/workflows/utils'
import { getUserEntityPermissions, workspaceExists } from '@/lib/workspaces/permissions/utils'
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
@@ -25,7 +25,6 @@ const CreateWorkflowSchema = z.object({
workspaceId: z.string().optional(),
folderId: z.string().nullable().optional(),
sortOrder: z.number().int().optional(),
deduplicate: z.boolean().optional(),
})
// GET /api/workflows - Get workflows for user (optionally filtered by workspaceId)
@@ -127,13 +126,12 @@ export async function POST(req: NextRequest) {
const body = await req.json()
const {
id: clientId,
name: requestedName,
name,
description,
color,
workspaceId,
folderId,
sortOrder: providedSortOrder,
deduplicate,
} = CreateWorkflowSchema.parse(body)
if (!workspaceId) {
@@ -164,6 +162,19 @@ export async function POST(req: NextRequest) {
logger.info(`[${requestId}] Creating workflow ${workflowId} for user ${userId}`)
import('@/lib/core/telemetry')
.then(({ PlatformEvents }) => {
PlatformEvents.workflowCreated({
workflowId,
name,
workspaceId: workspaceId || undefined,
folderId: folderId || undefined,
})
})
.catch(() => {
// Silently fail
})
let sortOrder: number
if (providedSortOrder !== undefined) {
sortOrder = providedSortOrder
@@ -203,49 +214,30 @@ export async function POST(req: NextRequest) {
sortOrder = minSortOrder != null ? minSortOrder - 1 : 0
}
let name = requestedName
const duplicateConditions = [
eq(workflow.workspaceId, workspaceId),
isNull(workflow.archivedAt),
eq(workflow.name, name),
]
if (deduplicate) {
name = await deduplicateWorkflowName(requestedName, workspaceId, folderId)
if (folderId) {
duplicateConditions.push(eq(workflow.folderId, folderId))
} else {
const duplicateConditions = [
eq(workflow.workspaceId, workspaceId),
isNull(workflow.archivedAt),
eq(workflow.name, requestedName),
]
if (folderId) {
duplicateConditions.push(eq(workflow.folderId, folderId))
} else {
duplicateConditions.push(isNull(workflow.folderId))
}
const [duplicateWorkflow] = await db
.select({ id: workflow.id })
.from(workflow)
.where(and(...duplicateConditions))
.limit(1)
if (duplicateWorkflow) {
return NextResponse.json(
{ error: `A workflow named "${requestedName}" already exists in this folder` },
{ status: 409 }
)
}
duplicateConditions.push(isNull(workflow.folderId))
}
import('@/lib/core/telemetry')
.then(({ PlatformEvents }) => {
PlatformEvents.workflowCreated({
workflowId,
name,
workspaceId: workspaceId || undefined,
folderId: folderId || undefined,
})
})
.catch(() => {
// Silently fail
})
const [duplicateWorkflow] = await db
.select({ id: workflow.id })
.from(workflow)
.where(and(...duplicateConditions))
.limit(1)
if (duplicateWorkflow) {
return NextResponse.json(
{ error: `A workflow named "${name}" already exists in this folder` },
{ status: 409 }
)
}
await db.insert(workflow).values({
id: workflowId,

View File

@@ -82,7 +82,6 @@ const TOOL_ICONS: Record<MothershipToolName | SubagentName | 'mothership', IconC
create_job: Calendar,
manage_job: Calendar,
update_job_history: Calendar,
job_respond: Calendar,
// Management
manage_mcp_tool: Settings,
manage_skill: Asterisk,

View File

@@ -54,7 +54,6 @@ export function Home({ chatId }: HomeProps = {}) {
description,
color,
workspaceId,
deduplicate: true,
}),
})

View File

@@ -1213,30 +1213,31 @@ export function useChat(
}
flush()
if (shouldOpenGenericResource(name)) {
if (!genericEntryMap.has(id)) {
const entryIdx = appendGenericEntry({
toolCallId: id,
toolName: name,
displayTitle: displayTitle ?? name,
status: 'executing',
params: args,
})
genericEntryMap.set(id, entryIdx)
const opened = addResource({ type: 'generic', id: 'results', title: 'Results' })
if (opened) onResourceEventRef.current?.()
else setActiveResourceId('results')
} else {
const entryIdx = genericEntryMap.get(id)
if (entryIdx !== undefined) {
updateGenericEntry(entryIdx, {
toolName: name,
...(displayTitle && { displayTitle }),
...(args && { params: args }),
})
}
}
}
// TODO: Uncomment when rich UI for Results tab is ready
// if (shouldOpenGenericResource(name)) {
// if (!genericEntryMap.has(id)) {
// const entryIdx = appendGenericEntry({
// toolCallId: id,
// toolName: name,
// displayTitle: displayTitle ?? name,
// status: 'executing',
// params: args,
// })
// genericEntryMap.set(id, entryIdx)
// const opened = addResource({ type: 'generic', id: 'results', title: 'Results' })
// if (opened) onResourceEventRef.current?.()
// else setActiveResourceId('results')
// } else {
// const entryIdx = genericEntryMap.get(id)
// if (entryIdx !== undefined) {
// updateGenericEntry(entryIdx, {
// toolName: name,
// ...(displayTitle && { displayTitle }),
// ...(args && { params: args }),
// })
// }
// }
// }
if (
parsed.type === 'tool_call' &&
@@ -1333,17 +1334,18 @@ export function useChat(
flush()
}
if (toolName && shouldOpenGenericResource(toolName)) {
const entryIdx = genericEntryMap.get(id)
if (entryIdx !== undefined) {
const entry = genericResourceDataRef.current.entries[entryIdx]
if (entry) {
updateGenericEntry(entryIdx, {
streamingArgs: (entry.streamingArgs ?? '') + delta,
})
}
}
}
// TODO: Uncomment when rich UI for Results tab is ready
// if (toolName && shouldOpenGenericResource(toolName)) {
// const entryIdx = genericEntryMap.get(id)
// if (entryIdx !== undefined) {
// const entry = genericResourceDataRef.current.entries[entryIdx]
// if (entry) {
// updateGenericEntry(entryIdx, {
// streamingArgs: (entry.streamingArgs ?? '') + delta,
// })
// }
// }
// }
break
}
@@ -1452,32 +1454,33 @@ export function useChat(
}
}
if (
shouldOpenGenericResource(tc.name) ||
(isDeferredResourceTool(tc.name) && extractedResources.length === 0)
) {
const entryIdx = genericEntryMap.get(id)
if (entryIdx !== undefined) {
updateGenericEntry(entryIdx, {
status: tc.status,
result: tc.result ?? undefined,
streamingArgs: undefined,
})
} else {
const newIdx = appendGenericEntry({
toolCallId: id,
toolName: tc.name,
displayTitle: tc.displayTitle ?? tc.name,
status: tc.status,
params: toolArgsMap.get(id) as Record<string, unknown> | undefined,
result: tc.result ?? undefined,
})
genericEntryMap.set(id, newIdx)
if (addResource({ type: 'generic', id: 'results', title: 'Results' })) {
onResourceEventRef.current?.()
}
}
}
// TODO: Uncomment when rich UI for Results tab is ready
// if (
// shouldOpenGenericResource(tc.name) ||
// (isDeferredResourceTool(tc.name) && extractedResources.length === 0)
// ) {
// const entryIdx = genericEntryMap.get(id)
// if (entryIdx !== undefined) {
// updateGenericEntry(entryIdx, {
// status: tc.status,
// result: tc.result ?? undefined,
// streamingArgs: undefined,
// })
// } else {
// const newIdx = appendGenericEntry({
// toolCallId: id,
// toolName: tc.name,
// displayTitle: tc.displayTitle ?? tc.name,
// status: tc.status,
// params: toolArgsMap.get(id) as Record<string, unknown> | undefined,
// result: tc.result ?? undefined,
// })
// genericEntryMap.set(id, newIdx)
// if (addResource({ type: 'generic', id: 'results', title: 'Results' })) {
// onResourceEventRef.current?.()
// }
// }
// }
}
break
@@ -1574,12 +1577,13 @@ export function useChat(
}
flush()
if (toolCallName && shouldOpenGenericResource(toolCallName)) {
const entryIdx = genericEntryMap.get(id)
if (entryIdx !== undefined) {
updateGenericEntry(entryIdx, { status: 'error', streamingArgs: undefined })
}
}
// TODO: Uncomment when rich UI for Results tab is ready
// if (toolCallName && shouldOpenGenericResource(toolCallName)) {
// const entryIdx = genericEntryMap.get(id)
// if (entryIdx !== undefined) {
// updateGenericEntry(entryIdx, { status: 'error', streamingArgs: undefined })
// }
// }
}
break
}

View File

@@ -98,7 +98,6 @@ export type MothershipToolName =
| 'create_job'
| 'complete_job'
| 'update_job_history'
| 'job_respond'
| 'download_to_workspace_file'
| 'materialize_file'
| 'context_write'
@@ -394,7 +393,6 @@ export const TOOL_UI_METADATA: Record<MothershipToolName, ToolUIMetadata> = {
create_job: { title: 'Creating job', phaseLabel: 'Resource', phase: 'resource' },
manage_job: { title: 'Updating job', phaseLabel: 'Management', phase: 'management' },
update_job_history: { title: 'Updating job', phaseLabel: 'Management', phase: 'management' },
job_respond: { title: 'Explaining job scheduled', phaseLabel: 'Execution', phase: 'execution' },
// Management
manage_mcp_tool: { title: 'Updating integration', phaseLabel: 'Management', phase: 'management' },
manage_skill: { title: 'Updating skill', phaseLabel: 'Management', phase: 'management' },

View File

@@ -56,7 +56,6 @@ export function useImportWorkflow({ workspaceId }: UseImportWorkflowProps) {
workspaceId,
folderId,
sortOrder,
deduplicate: true,
}),
})

View File

@@ -176,7 +176,6 @@ export function useImportWorkspace({ onSuccess }: UseImportWorkspaceProps = {})
color: workflowColor,
workspaceId: newWorkspace.id,
folderId: targetFolderId,
deduplicate: true,
}),
})

View File

@@ -303,7 +303,6 @@ async function runWorkflowExecution({
export type ScheduleExecutionPayload = {
scheduleId: string
workflowId: string
workspaceId?: string
executionId?: string
requestId?: string
correlation?: AsyncExecutionCorrelation

View File

@@ -36,7 +36,6 @@ export function buildWorkflowCorrelation(
export type WorkflowExecutionPayload = {
workflowId: string
userId: string
workspaceId?: string
input?: any
triggerType?: CoreTriggerType
executionId?: string

View File

@@ -1,5 +1,5 @@
import { createHmac } from 'crypto'
import { db, workflowExecutionLogs } from '@sim/db'
import { db } from '@sim/db'
import {
account,
workspaceNotificationDelivery,
@@ -17,14 +17,11 @@ import {
import { checkUsageStatus } from '@/lib/billing/calculations/usage-monitor'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { dollarsToCredits } from '@/lib/billing/credits/conversion'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { acquireLock } from '@/lib/core/config/redis'
import { RateLimiter } from '@/lib/core/rate-limiter'
import { decryptSecret } from '@/lib/core/security/encryption'
import { secureFetchWithValidation } from '@/lib/core/security/input-validation.server'
import { formatDuration } from '@/lib/core/utils/formatting'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
import { sendEmail } from '@/lib/messaging/email/mailer'
import type { AlertConfig } from '@/lib/notifications/alert-rules'
@@ -35,7 +32,6 @@ const logger = createLogger('WorkspaceNotificationDelivery')
const MAX_ATTEMPTS = 5
const RETRY_DELAYS = [5 * 1000, 15 * 1000, 60 * 1000, 3 * 60 * 1000, 10 * 60 * 1000]
const NOTIFICATION_DISPATCH_LOCK_TTL_SECONDS = 3
function getRetryDelayWithJitter(baseDelay: number): number {
const jitter = Math.random() * 0.1 * baseDelay
@@ -490,170 +486,12 @@ async function updateDeliveryStatus(
export interface NotificationDeliveryParams {
deliveryId: string
subscriptionId: string
workspaceId: string
notificationType: 'webhook' | 'email' | 'slack'
log: WorkflowExecutionLog
alertConfig?: AlertConfig
}
export type NotificationDeliveryResult =
| { status: 'success' | 'skipped' | 'failed' }
| { status: 'retry'; retryDelayMs: number }
async function buildRetryLog(params: NotificationDeliveryParams): Promise<WorkflowExecutionLog> {
const conditions = [eq(workflowExecutionLogs.executionId, params.log.executionId)]
if (params.log.workflowId) {
conditions.push(eq(workflowExecutionLogs.workflowId, params.log.workflowId))
}
const [storedLog] = await db
.select()
.from(workflowExecutionLogs)
.where(and(...conditions))
.limit(1)
if (storedLog) {
return storedLog as unknown as WorkflowExecutionLog
}
const now = new Date().toISOString()
return {
id: `retry_log_${params.deliveryId}`,
workflowId: params.log.workflowId,
executionId: params.log.executionId,
stateSnapshotId: '',
level: 'info',
trigger: 'system',
startedAt: now,
endedAt: now,
totalDurationMs: 0,
executionData: {},
cost: { total: 0 },
createdAt: now,
}
}
export async function enqueueNotificationDeliveryDispatch(
params: NotificationDeliveryParams
): Promise<boolean> {
if (!isBullMQEnabled()) {
return false
}
const lockAcquired = await acquireLock(
`workspace-notification-dispatch:${params.deliveryId}`,
params.deliveryId,
NOTIFICATION_DISPATCH_LOCK_TTL_SECONDS
)
if (!lockAcquired) {
return false
}
await enqueueWorkspaceDispatch({
workspaceId: params.workspaceId,
lane: 'lightweight',
queueName: 'workspace-notification-delivery',
bullmqJobName: 'workspace-notification-delivery',
bullmqPayload: createBullMQJobData(params),
metadata: {
workflowId: params.log.workflowId ?? undefined,
},
})
return true
}
const STUCK_IN_PROGRESS_THRESHOLD_MS = 5 * 60 * 1000
export async function sweepPendingNotificationDeliveries(limit = 50): Promise<number> {
if (!isBullMQEnabled()) {
return 0
}
const stuckThreshold = new Date(Date.now() - STUCK_IN_PROGRESS_THRESHOLD_MS)
await db
.update(workspaceNotificationDelivery)
.set({
status: 'pending',
updatedAt: new Date(),
})
.where(
and(
eq(workspaceNotificationDelivery.status, 'in_progress'),
lte(workspaceNotificationDelivery.lastAttemptAt, stuckThreshold)
)
)
const dueDeliveries = await db
.select({
deliveryId: workspaceNotificationDelivery.id,
subscriptionId: workspaceNotificationDelivery.subscriptionId,
workflowId: workspaceNotificationDelivery.workflowId,
executionId: workspaceNotificationDelivery.executionId,
workspaceId: workspaceNotificationSubscription.workspaceId,
alertConfig: workspaceNotificationSubscription.alertConfig,
notificationType: workspaceNotificationSubscription.notificationType,
})
.from(workspaceNotificationDelivery)
.innerJoin(
workspaceNotificationSubscription,
eq(workspaceNotificationDelivery.subscriptionId, workspaceNotificationSubscription.id)
)
.where(
and(
eq(workspaceNotificationDelivery.status, 'pending'),
or(
isNull(workspaceNotificationDelivery.nextAttemptAt),
lte(workspaceNotificationDelivery.nextAttemptAt, new Date())
)
)
)
.limit(limit)
let enqueued = 0
for (const delivery of dueDeliveries) {
const params: NotificationDeliveryParams = {
deliveryId: delivery.deliveryId,
subscriptionId: delivery.subscriptionId,
workspaceId: delivery.workspaceId,
notificationType: delivery.notificationType,
log: await buildRetryLog({
deliveryId: delivery.deliveryId,
subscriptionId: delivery.subscriptionId,
workspaceId: delivery.workspaceId,
notificationType: delivery.notificationType,
log: {
id: '',
workflowId: delivery.workflowId,
executionId: delivery.executionId,
stateSnapshotId: '',
level: 'info',
trigger: 'system',
startedAt: '',
endedAt: '',
totalDurationMs: 0,
executionData: {},
cost: { total: 0 },
createdAt: '',
},
alertConfig: (delivery.alertConfig as AlertConfig | null) ?? undefined,
}),
alertConfig: (delivery.alertConfig as AlertConfig | null) ?? undefined,
}
if (await enqueueNotificationDeliveryDispatch(params)) {
enqueued += 1
}
}
return enqueued
}
export async function executeNotificationDelivery(
params: NotificationDeliveryParams
): Promise<NotificationDeliveryResult> {
export async function executeNotificationDelivery(params: NotificationDeliveryParams) {
const { deliveryId, subscriptionId, notificationType, log, alertConfig } = params
try {
@@ -666,7 +504,7 @@ export async function executeNotificationDelivery(
if (!subscription || !subscription.active) {
logger.warn(`Subscription ${subscriptionId} not found or inactive`)
await updateDeliveryStatus(deliveryId, 'failed', 'Subscription not found or inactive')
return { status: 'failed' }
return
}
const claimed = await db
@@ -691,7 +529,7 @@ export async function executeNotificationDelivery(
if (claimed.length === 0) {
logger.info(`Delivery ${deliveryId} not claimable`)
return { status: 'skipped' }
return
}
const attempts = claimed[0].attempts
@@ -701,7 +539,7 @@ export async function executeNotificationDelivery(
if (!payload) {
await updateDeliveryStatus(deliveryId, 'failed', 'Workflow was archived or deleted')
logger.info(`Skipping delivery ${deliveryId} - workflow was archived or deleted`)
return { status: 'failed' }
return
}
let result: { success: boolean; status?: number; error?: string }
@@ -723,35 +561,39 @@ export async function executeNotificationDelivery(
if (result.success) {
await updateDeliveryStatus(deliveryId, 'success', undefined, result.status)
logger.info(`${notificationType} notification delivered successfully`, { deliveryId })
return { status: 'success' }
}
if (attempts < MAX_ATTEMPTS) {
const retryDelay = getRetryDelayWithJitter(
RETRY_DELAYS[attempts - 1] || RETRY_DELAYS[RETRY_DELAYS.length - 1]
)
const nextAttemptAt = new Date(Date.now() + retryDelay)
} else {
if (attempts < MAX_ATTEMPTS) {
const retryDelay = getRetryDelayWithJitter(
RETRY_DELAYS[attempts - 1] || RETRY_DELAYS[RETRY_DELAYS.length - 1]
)
const nextAttemptAt = new Date(Date.now() + retryDelay)
await updateDeliveryStatus(deliveryId, 'pending', result.error, result.status, nextAttemptAt)
await updateDeliveryStatus(
deliveryId,
'pending',
result.error,
result.status,
nextAttemptAt
)
logger.info(
`${notificationType} notification failed, scheduled retry ${attempts}/${MAX_ATTEMPTS}`,
{
logger.info(
`${notificationType} notification failed, scheduled retry ${attempts}/${MAX_ATTEMPTS}`,
{
deliveryId,
error: result.error,
}
)
} else {
await updateDeliveryStatus(deliveryId, 'failed', result.error, result.status)
logger.error(`${notificationType} notification failed after ${MAX_ATTEMPTS} attempts`, {
deliveryId,
error: result.error,
}
)
return { status: 'retry', retryDelayMs: retryDelay }
})
}
}
await updateDeliveryStatus(deliveryId, 'failed', result.error, result.status)
logger.error(`${notificationType} notification failed after ${MAX_ATTEMPTS} attempts`, {
deliveryId,
error: result.error,
})
return { status: 'failed' }
} catch (error) {
logger.error('Notification delivery failed', { deliveryId, error })
await updateDeliveryStatus(deliveryId, 'failed', 'Internal error')
return { status: 'failed' }
}
}

View File

@@ -1532,7 +1532,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
projectId: effectiveProjectId || undefined,
includeArchived: params.includeArchived,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_get_issue':
@@ -1599,7 +1599,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
teamId: effectiveTeamId,
includeArchived: params.includeArchived,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_add_label_to_issue':
@@ -1650,7 +1650,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
issueId: params.issueId.trim(),
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_list_projects':
@@ -1659,7 +1659,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
teamId: effectiveTeamId,
includeArchived: params.includeArchived,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_get_project':
@@ -1714,7 +1714,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_get_viewer':
@@ -1725,7 +1725,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
teamId: effectiveTeamId,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_create_label':
@@ -1764,7 +1764,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
teamId: effectiveTeamId,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_create_workflow_state':
@@ -1795,7 +1795,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
teamId: effectiveTeamId,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_get_cycle':
@@ -1860,7 +1860,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
issueId: params.issueId.trim(),
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_update_attachment':
@@ -1901,7 +1901,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
issueId: params.issueId.trim(),
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_delete_issue_relation':
@@ -1927,7 +1927,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_create_project_update':
@@ -1949,14 +1949,14 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
projectId: effectiveProjectId,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_list_notifications':
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_update_notification':
@@ -1988,7 +1988,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
includeArchived: false,
}
@@ -2023,7 +2023,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
includeArchived: false,
}
@@ -2117,7 +2117,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
// Customer Tier Operations
@@ -2159,7 +2159,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
// Project Management Operations
@@ -2212,7 +2212,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
projectId: effectiveProjectId || undefined,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
case 'linear_add_label_to_project':
@@ -2277,7 +2277,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
...baseParams,
projectId: params.projectIdForMilestone.trim(),
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
// Project Status Operations
@@ -2328,7 +2328,7 @@ Return ONLY the date string in YYYY-MM-DD format - no explanations, no quotes, n
return {
...baseParams,
first: params.first ? Number(params.first) : undefined,
after: params.after?.trim() || undefined,
after: params.after,
}
default:

View File

@@ -164,7 +164,6 @@ interface CreateWorkflowVariables {
folderId?: string | null
sortOrder?: number
id?: string
deduplicate?: boolean
}
interface CreateWorkflowResult {
@@ -301,8 +300,7 @@ export function useCreateWorkflow() {
return useMutation({
mutationFn: async (variables: CreateWorkflowVariables): Promise<CreateWorkflowResult> => {
const { workspaceId, name, description, color, folderId, sortOrder, id, deduplicate } =
variables
const { workspaceId, name, description, color, folderId, sortOrder, id } = variables
logger.info(`Creating new workflow in workspace: ${workspaceId}`)
@@ -317,7 +315,6 @@ export function useCreateWorkflow() {
workspaceId,
folderId: folderId || null,
sortOrder,
deduplicate,
}),
})

View File

@@ -14,20 +14,6 @@ export const AuthType = {
export type AuthTypeValue = (typeof AuthType)[keyof typeof AuthType]
const API_KEY_HEADER = 'x-api-key'
const BEARER_PREFIX = 'Bearer '
/**
* Lightweight header-only check for whether a request carries external API credentials.
* Does NOT validate the credentials — only inspects headers to classify the request
* as programmatic API traffic vs interactive session traffic.
*/
export function hasExternalApiCredentials(headers: Headers): boolean {
if (headers.has(API_KEY_HEADER)) return true
const auth = headers.get('authorization')
return auth !== null && auth.startsWith(BEARER_PREFIX)
}
export interface AuthResult {
success: boolean
userId?: string

View File

@@ -13,7 +13,7 @@ import {
isPro,
isTeam,
} from '@/lib/billing/plan-helpers'
import { parseEnterpriseSubscriptionMetadata } from '@/lib/billing/types'
import type { EnterpriseSubscriptionMetadata } from '@/lib/billing/types'
import { env } from '@/lib/core/config/env'
export const ENTITLED_SUBSCRIPTION_STATUSES = ['active', 'past_due'] as const
@@ -80,15 +80,27 @@ export function checkEnterprisePlan(subscription: any): boolean {
return isEnterprise(subscription?.plan) && hasPaidSubscriptionStatus(subscription?.status)
}
/**
* Type guard to check if metadata is valid EnterpriseSubscriptionMetadata
*/
function isEnterpriseMetadata(metadata: unknown): metadata is EnterpriseSubscriptionMetadata {
return (
!!metadata &&
typeof metadata === 'object' &&
'seats' in metadata &&
typeof (metadata as EnterpriseSubscriptionMetadata).seats === 'string'
)
}
export function getEffectiveSeats(subscription: any): number {
if (!subscription) {
return 0
}
if (isEnterprise(subscription.plan)) {
const metadata = parseEnterpriseSubscriptionMetadata(subscription.metadata)
if (metadata) {
return metadata.seats
const metadata = subscription.metadata as EnterpriseSubscriptionMetadata | null
if (isEnterpriseMetadata(metadata)) {
return Number.parseInt(metadata.seats, 10)
}
return 0
}

View File

@@ -2,47 +2,18 @@
* Billing System Types
* Centralized type definitions for the billing system
*/
import { z } from 'zod'
export const enterpriseSubscriptionMetadataSchema = z.object({
plan: z
.string()
.transform((v) => v.toLowerCase())
.pipe(z.literal('enterprise')),
export interface EnterpriseSubscriptionMetadata {
plan: 'enterprise'
// The referenceId must be provided in Stripe metadata to link to the organization
// This gets stored in the subscription.referenceId column
referenceId: z.string().min(1),
referenceId: string
// The fixed monthly price for this enterprise customer (as string from Stripe metadata)
// This will be used to set the organization's usage limit
monthlyPrice: z.coerce.number().positive(),
// Number of seats for invitation limits (not for billing)
seats: z.coerce.number().int().positive(),
// Optional custom workspace concurrency limit for enterprise workspaces
workspaceConcurrencyLimit: z.coerce.number().int().positive().optional(),
})
export type EnterpriseSubscriptionMetadata = z.infer<typeof enterpriseSubscriptionMetadataSchema>
const enterpriseWorkspaceConcurrencyMetadataSchema = z.object({
workspaceConcurrencyLimit: z.coerce.number().int().positive().optional(),
})
export type EnterpriseWorkspaceConcurrencyMetadata = z.infer<
typeof enterpriseWorkspaceConcurrencyMetadataSchema
>
export function parseEnterpriseSubscriptionMetadata(
value: unknown
): EnterpriseSubscriptionMetadata | null {
const result = enterpriseSubscriptionMetadataSchema.safeParse(value)
return result.success ? result.data : null
}
export function parseEnterpriseWorkspaceConcurrencyMetadata(
value: unknown
): EnterpriseWorkspaceConcurrencyMetadata | null {
const result = enterpriseWorkspaceConcurrencyMetadataSchema.safeParse(value)
return result.success ? result.data : null
monthlyPrice: string
// Number of seats for invitation limits (not for billing) (as string from Stripe metadata)
// We set Stripe quantity to 1 and use this for actual seat count
seats: string
}
export interface UsageData {

View File

@@ -6,10 +6,26 @@ import type Stripe from 'stripe'
import { getEmailSubject, renderEnterpriseSubscriptionEmail } from '@/components/emails'
import { sendEmail } from '@/lib/messaging/email/mailer'
import { getFromEmailAddress } from '@/lib/messaging/email/utils'
import { parseEnterpriseSubscriptionMetadata } from '../types'
import type { EnterpriseSubscriptionMetadata } from '../types'
const logger = createLogger('BillingEnterprise')
function isEnterpriseMetadata(value: unknown): value is EnterpriseSubscriptionMetadata {
return (
!!value &&
typeof value === 'object' &&
'plan' in value &&
'referenceId' in value &&
'monthlyPrice' in value &&
'seats' in value &&
typeof value.plan === 'string' &&
value.plan.toLowerCase() === 'enterprise' &&
typeof value.referenceId === 'string' &&
typeof value.monthlyPrice === 'string' &&
typeof value.seats === 'string'
)
}
export async function handleManualEnterpriseSubscription(event: Stripe.Event) {
const stripeSubscription = event.data.object as Stripe.Subscription
@@ -47,16 +63,37 @@ export async function handleManualEnterpriseSubscription(event: Stripe.Event) {
throw new Error('Unable to resolve referenceId for subscription')
}
const enterpriseMetadata = parseEnterpriseSubscriptionMetadata(metadata)
if (!enterpriseMetadata) {
if (!isEnterpriseMetadata(metadata)) {
logger.error('[subscription.created] Invalid enterprise metadata shape', {
subscriptionId: stripeSubscription.id,
metadata,
})
throw new Error('Invalid enterprise metadata for subscription')
}
const enterpriseMetadata = metadata
const metadataJson: Record<string, unknown> = { ...enterpriseMetadata }
const { seats, monthlyPrice } = enterpriseMetadata
// Extract and parse seats and monthly price from metadata (they come as strings from Stripe)
const seats = Number.parseInt(enterpriseMetadata.seats, 10)
const monthlyPrice = Number.parseFloat(enterpriseMetadata.monthlyPrice)
if (!seats || seats <= 0 || Number.isNaN(seats)) {
logger.error('[subscription.created] Invalid or missing seats in enterprise metadata', {
subscriptionId: stripeSubscription.id,
seatsRaw: enterpriseMetadata.seats,
seatsParsed: seats,
})
throw new Error('Enterprise subscription must include valid seats in metadata')
}
if (!monthlyPrice || monthlyPrice <= 0 || Number.isNaN(monthlyPrice)) {
logger.error('[subscription.created] Invalid or missing monthlyPrice in enterprise metadata', {
subscriptionId: stripeSubscription.id,
monthlyPriceRaw: enterpriseMetadata.monthlyPrice,
monthlyPriceParsed: monthlyPrice,
})
throw new Error('Enterprise subscription must include valid monthlyPrice in metadata')
}
// Get the first subscription item which contains the period information
const referenceItem = stripeSubscription.items?.data?.[0]
@@ -80,7 +117,7 @@ export async function handleManualEnterpriseSubscription(event: Stripe.Event) {
? new Date(stripeSubscription.trial_start * 1000)
: null,
trialEnd: stripeSubscription.trial_end ? new Date(stripeSubscription.trial_end * 1000) : null,
metadata: metadata as Record<string, unknown>,
metadata: metadataJson,
}
const existing = await db

View File

@@ -1,146 +0,0 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
const {
mockGetHighestPrioritySubscription,
mockGetWorkspaceBilledAccountUserId,
mockFeatureFlags,
mockRedisGet,
mockRedisSet,
mockRedisDel,
mockRedisKeys,
mockGetRedisClient,
} = vi.hoisted(() => ({
mockGetHighestPrioritySubscription: vi.fn(),
mockGetWorkspaceBilledAccountUserId: vi.fn(),
mockFeatureFlags: {
isBillingEnabled: true,
},
mockRedisGet: vi.fn(),
mockRedisSet: vi.fn(),
mockRedisDel: vi.fn(),
mockRedisKeys: vi.fn(),
mockGetRedisClient: vi.fn(),
}))
vi.mock('@sim/logger', () => ({
createLogger: () => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}),
}))
vi.mock('@/lib/billing/core/plan', () => ({
getHighestPrioritySubscription: mockGetHighestPrioritySubscription,
}))
vi.mock('@/lib/workspaces/utils', () => ({
getWorkspaceBilledAccountUserId: mockGetWorkspaceBilledAccountUserId,
}))
vi.mock('@/lib/core/config/redis', () => ({
getRedisClient: mockGetRedisClient,
}))
vi.mock('@/lib/core/config/feature-flags', () => mockFeatureFlags)
import {
getWorkspaceConcurrencyLimit,
resetWorkspaceConcurrencyLimitCache,
} from '@/lib/billing/workspace-concurrency'
describe('workspace concurrency billing', () => {
beforeEach(() => {
vi.clearAllMocks()
mockFeatureFlags.isBillingEnabled = true
mockRedisGet.mockResolvedValue(null)
mockRedisSet.mockResolvedValue('OK')
mockRedisDel.mockResolvedValue(1)
mockRedisKeys.mockResolvedValue([])
mockGetRedisClient.mockReturnValue({
get: mockRedisGet,
set: mockRedisSet,
del: mockRedisDel,
keys: mockRedisKeys,
})
})
it('returns free tier when no billed account exists', async () => {
mockGetWorkspaceBilledAccountUserId.mockResolvedValue(null)
await expect(getWorkspaceConcurrencyLimit('workspace-1')).resolves.toBe(5)
})
it('returns pro limit for pro billing accounts', async () => {
mockGetWorkspaceBilledAccountUserId.mockResolvedValue('user-1')
mockGetHighestPrioritySubscription.mockResolvedValue({
plan: 'pro_6000',
metadata: null,
})
await expect(getWorkspaceConcurrencyLimit('workspace-1')).resolves.toBe(50)
})
it('returns max limit for max plan tiers', async () => {
mockGetWorkspaceBilledAccountUserId.mockResolvedValue('user-1')
mockGetHighestPrioritySubscription.mockResolvedValue({
plan: 'pro_25000',
metadata: null,
})
await expect(getWorkspaceConcurrencyLimit('workspace-1')).resolves.toBe(200)
})
it('returns max limit for legacy team plans', async () => {
mockGetWorkspaceBilledAccountUserId.mockResolvedValue('user-1')
mockGetHighestPrioritySubscription.mockResolvedValue({
plan: 'team',
metadata: null,
})
await expect(getWorkspaceConcurrencyLimit('workspace-1')).resolves.toBe(200)
})
it('returns enterprise metadata override when present', async () => {
mockGetWorkspaceBilledAccountUserId.mockResolvedValue('user-1')
mockGetHighestPrioritySubscription.mockResolvedValue({
plan: 'enterprise',
metadata: {
workspaceConcurrencyLimit: '350',
},
})
await expect(getWorkspaceConcurrencyLimit('workspace-1')).resolves.toBe(350)
})
it('uses free-tier limit when billing is disabled', async () => {
mockFeatureFlags.isBillingEnabled = false
mockGetWorkspaceBilledAccountUserId.mockResolvedValue('user-1')
mockGetHighestPrioritySubscription.mockResolvedValue({
plan: 'pro_25000',
metadata: {
workspaceConcurrencyLimit: 999,
},
})
await expect(getWorkspaceConcurrencyLimit('workspace-1')).resolves.toBe(5)
})
it('uses redis cache when available', async () => {
mockRedisGet.mockResolvedValueOnce('123')
await expect(getWorkspaceConcurrencyLimit('workspace-1')).resolves.toBe(123)
expect(mockGetWorkspaceBilledAccountUserId).not.toHaveBeenCalled()
})
it('can clear a specific workspace cache entry', async () => {
await resetWorkspaceConcurrencyLimitCache('workspace-1')
expect(mockRedisDel).toHaveBeenCalledWith('workspace-concurrency-limit:workspace-1')
})
})

View File

@@ -1,170 +0,0 @@
import { createLogger } from '@sim/logger'
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getPlanTierCredits, isEnterprise, isPro, isTeam } from '@/lib/billing/plan-helpers'
import { parseEnterpriseWorkspaceConcurrencyMetadata } from '@/lib/billing/types'
import { env } from '@/lib/core/config/env'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import { getRedisClient } from '@/lib/core/config/redis'
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
const logger = createLogger('WorkspaceConcurrencyBilling')
const CACHE_TTL_MS = 60_000
const CACHE_TTL_SECONDS = Math.floor(CACHE_TTL_MS / 1000)
interface CacheEntry {
value: number
expiresAt: number
}
const inMemoryConcurrencyCache = new Map<string, CacheEntry>()
function cacheKey(workspaceId: string): string {
return `workspace-concurrency-limit:${workspaceId}`
}
function parsePositiveLimit(value: unknown): number | null {
if (typeof value === 'number' && Number.isFinite(value) && value > 0) {
return Math.floor(value)
}
if (typeof value === 'string') {
const parsed = Number.parseInt(value, 10)
if (Number.isFinite(parsed) && parsed > 0) {
return parsed
}
}
return null
}
function getFreeConcurrencyLimit(): number {
return Number.parseInt(env.WORKSPACE_CONCURRENCY_FREE, 10) || 5
}
function getProConcurrencyLimit(): number {
return Number.parseInt(env.WORKSPACE_CONCURRENCY_PRO, 10) || 50
}
function getTeamConcurrencyLimit(): number {
return Number.parseInt(env.WORKSPACE_CONCURRENCY_TEAM, 10) || 200
}
function getEnterpriseDefaultConcurrencyLimit(): number {
return Number.parseInt(env.WORKSPACE_CONCURRENCY_ENTERPRISE, 10) || 200
}
function getEnterpriseConcurrencyLimit(metadata: unknown): number {
const enterpriseMetadata = parseEnterpriseWorkspaceConcurrencyMetadata(metadata)
return enterpriseMetadata?.workspaceConcurrencyLimit ?? getEnterpriseDefaultConcurrencyLimit()
}
function getPlanConcurrencyLimit(plan: string | null | undefined, metadata: unknown): number {
if (!isBillingEnabled) {
return getFreeConcurrencyLimit()
}
if (!plan) {
return getFreeConcurrencyLimit()
}
if (isEnterprise(plan)) {
return getEnterpriseConcurrencyLimit(metadata)
}
if (isTeam(plan)) {
return getTeamConcurrencyLimit()
}
const credits = getPlanTierCredits(plan)
if (credits >= 25_000) {
return getTeamConcurrencyLimit()
}
if (isPro(plan)) {
return getProConcurrencyLimit()
}
return getFreeConcurrencyLimit()
}
export async function getWorkspaceConcurrencyLimit(workspaceId: string): Promise<number> {
const redis = getRedisClient()
if (redis) {
const cached = await redis.get(cacheKey(workspaceId))
const cachedValue = parsePositiveLimit(cached)
if (cachedValue !== null) {
return cachedValue
}
} else {
const cached = inMemoryConcurrencyCache.get(workspaceId)
if (cached && cached.expiresAt > Date.now()) {
return cached.value
}
}
try {
const billedAccountUserId = await getWorkspaceBilledAccountUserId(workspaceId)
if (!billedAccountUserId) {
if (redis) {
await redis.set(
cacheKey(workspaceId),
String(getFreeConcurrencyLimit()),
'EX',
CACHE_TTL_SECONDS
)
} else {
inMemoryConcurrencyCache.set(workspaceId, {
value: getFreeConcurrencyLimit(),
expiresAt: Date.now() + CACHE_TTL_MS,
})
}
return getFreeConcurrencyLimit()
}
const subscription = await getHighestPrioritySubscription(billedAccountUserId)
const limit = getPlanConcurrencyLimit(subscription?.plan, subscription?.metadata)
if (redis) {
await redis.set(cacheKey(workspaceId), String(limit), 'EX', CACHE_TTL_SECONDS)
} else {
inMemoryConcurrencyCache.set(workspaceId, {
value: limit,
expiresAt: Date.now() + CACHE_TTL_MS,
})
}
return limit
} catch (error) {
logger.error('Failed to resolve workspace concurrency limit, using free tier', {
workspaceId,
error,
})
return getFreeConcurrencyLimit()
}
}
export async function resetWorkspaceConcurrencyLimitCache(workspaceId?: string): Promise<void> {
if (!workspaceId) {
inMemoryConcurrencyCache.clear()
} else {
inMemoryConcurrencyCache.delete(workspaceId)
}
const redis = getRedisClient()
if (!redis) {
return
}
if (workspaceId) {
await redis.del(cacheKey(workspaceId))
return
}
const keys = await redis.keys('workspace-concurrency-limit:*')
if (keys.length > 0) {
await redis.del(...keys)
}
}

View File

@@ -1,62 +0,0 @@
import { createLogger } from '@sim/logger'
import { NextResponse } from 'next/server'
import { env } from '@/lib/core/config/env'
const logger = createLogger('AdmissionGate')
const MAX_INFLIGHT = Number.parseInt(env.ADMISSION_GATE_MAX_INFLIGHT ?? '') || 500
let inflight = 0
export interface AdmissionTicket {
release: () => void
}
/**
* Attempts to admit a request through the in-process gate.
* Returns a ticket with a release() handle on success, or null if at capacity.
* Zero external calls — purely in-process atomic counter. Each pod maintains its
* own counter, so the effective aggregate limit across N pods is N × MAX_INFLIGHT.
* Configure ADMISSION_GATE_MAX_INFLIGHT per pod based on what each pod can sustain.
*/
export function tryAdmit(): AdmissionTicket | null {
if (inflight >= MAX_INFLIGHT) {
return null
}
inflight++
let released = false
return {
release() {
if (released) return
released = true
inflight--
},
}
}
/**
* Returns a 429 response for requests rejected by the admission gate.
*/
export function admissionRejectedResponse(): NextResponse {
logger.warn('Admission gate rejecting request', { inflight, maxInflight: MAX_INFLIGHT })
return NextResponse.json(
{
error: 'Too many requests',
message: 'Server is at capacity. Please retry shortly.',
retryAfterSeconds: 5,
},
{
status: 429,
headers: { 'Retry-After': '5' },
}
)
}
/**
* Returns the current gate metrics for observability.
*/
export function getAdmissionGateStatus(): { inflight: number; maxInflight: number } {
return { inflight, maxInflight: MAX_INFLIGHT }
}

View File

@@ -1,106 +0,0 @@
import { createLogger } from '@sim/logger'
import type { Job as BullMQJob } from 'bullmq'
import {
type EnqueueOptions,
JOB_STATUS,
type Job,
type JobQueueBackend,
type JobStatus,
type JobType,
} from '@/lib/core/async-jobs/types'
import { type BullMQJobData, createBullMQJobData, getBullMQQueue } from '@/lib/core/bullmq'
const logger = createLogger('BullMQJobQueue')
function mapBullMQStatus(status: string): JobStatus {
switch (status) {
case 'active':
return JOB_STATUS.PROCESSING
case 'completed':
return JOB_STATUS.COMPLETED
case 'failed':
return JOB_STATUS.FAILED
default:
return JOB_STATUS.PENDING
}
}
async function toJob(
queueType: JobType,
bullJob: BullMQJob<BullMQJobData<unknown>> | null
): Promise<Job | null> {
if (!bullJob) {
return null
}
const status = mapBullMQStatus(await bullJob.getState())
return {
id: bullJob.id ?? '',
type: queueType,
payload: bullJob.data.payload,
status,
createdAt: new Date(bullJob.timestamp),
startedAt: bullJob.processedOn ? new Date(bullJob.processedOn) : undefined,
completedAt: bullJob.finishedOn ? new Date(bullJob.finishedOn) : undefined,
attempts: bullJob.attemptsMade,
maxAttempts: bullJob.opts.attempts ?? 1,
error: bullJob.failedReason || undefined,
output: bullJob.returnvalue,
metadata: bullJob.data.metadata ?? {},
}
}
export class BullMQJobQueue implements JobQueueBackend {
async enqueue<TPayload>(
type: JobType,
payload: TPayload,
options?: EnqueueOptions
): Promise<string> {
const queue = getBullMQQueue(type)
const job = await queue.add(
options?.name ?? type,
createBullMQJobData(payload, options?.metadata),
{
jobId: options?.jobId,
attempts: options?.maxAttempts,
priority: options?.priority,
delay: options?.delayMs,
}
)
logger.debug('Enqueued job via BullMQ', {
jobId: job.id,
type,
name: options?.name ?? type,
})
return String(job.id)
}
async getJob(jobId: string): Promise<Job | null> {
const workflowJob = await getBullMQQueue('workflow-execution').getJob(jobId)
if (workflowJob) {
return toJob('workflow-execution', workflowJob)
}
const webhookJob = await getBullMQQueue('webhook-execution').getJob(jobId)
if (webhookJob) {
return toJob('webhook-execution', webhookJob)
}
const scheduleJob = await getBullMQQueue('schedule-execution').getJob(jobId)
if (scheduleJob) {
return toJob('schedule-execution', scheduleJob)
}
return null
}
async startJob(_jobId: string): Promise<void> {}
async completeJob(_jobId: string, _output: unknown): Promise<void> {}
async markJobFailed(_jobId: string, _error: string): Promise<void> {}
}

View File

@@ -1,3 +1,3 @@
export { BullMQJobQueue } from './bullmq'
export { DatabaseJobQueue } from './database'
export { RedisJobQueue } from './redis'
export { TriggerDevJobQueue } from './trigger-dev'

View File

@@ -0,0 +1,176 @@
/**
* @vitest-environment node
*/
import { createMockRedis, loggerMock, type MockRedis } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
vi.mock('@sim/logger', () => loggerMock)
import {
JOB_MAX_LIFETIME_SECONDS,
JOB_RETENTION_SECONDS,
JOB_STATUS,
} from '@/lib/core/async-jobs/types'
import { RedisJobQueue } from './redis'
describe('RedisJobQueue', () => {
let mockRedis: MockRedis
let queue: RedisJobQueue
beforeEach(() => {
vi.clearAllMocks()
mockRedis = createMockRedis()
queue = new RedisJobQueue(mockRedis as never)
})
describe('enqueue', () => {
it.concurrent('should create a job with pending status', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = await localQueue.enqueue('workflow-execution', { test: 'data' })
expect(jobId).toMatch(/^run_/)
expect(localRedis.hset).toHaveBeenCalledTimes(1)
const [key, data] = localRedis.hset.mock.calls[0]
expect(key).toBe(`async-jobs:job:${jobId}`)
expect(data.status).toBe(JOB_STATUS.PENDING)
expect(data.type).toBe('workflow-execution')
})
it.concurrent('should set max lifetime TTL on enqueue', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = await localQueue.enqueue('workflow-execution', { test: 'data' })
expect(localRedis.expire).toHaveBeenCalledWith(
`async-jobs:job:${jobId}`,
JOB_MAX_LIFETIME_SECONDS
)
})
})
describe('completeJob', () => {
it.concurrent('should set status to completed and set TTL', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = 'run_test123'
await localQueue.completeJob(jobId, { result: 'success' })
expect(localRedis.hset).toHaveBeenCalledWith(`async-jobs:job:${jobId}`, {
status: JOB_STATUS.COMPLETED,
completedAt: expect.any(String),
output: JSON.stringify({ result: 'success' }),
updatedAt: expect.any(String),
})
expect(localRedis.expire).toHaveBeenCalledWith(
`async-jobs:job:${jobId}`,
JOB_RETENTION_SECONDS
)
})
it.concurrent('should set TTL to 24 hours (86400 seconds)', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
await localQueue.completeJob('run_test123', {})
expect(localRedis.expire).toHaveBeenCalledWith(expect.any(String), 86400)
})
})
describe('markJobFailed', () => {
it.concurrent('should set status to failed and set TTL', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = 'run_test456'
const error = 'Something went wrong'
await localQueue.markJobFailed(jobId, error)
expect(localRedis.hset).toHaveBeenCalledWith(`async-jobs:job:${jobId}`, {
status: JOB_STATUS.FAILED,
completedAt: expect.any(String),
error,
updatedAt: expect.any(String),
})
expect(localRedis.expire).toHaveBeenCalledWith(
`async-jobs:job:${jobId}`,
JOB_RETENTION_SECONDS
)
})
it.concurrent('should set TTL to 24 hours (86400 seconds)', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
await localQueue.markJobFailed('run_test456', 'error')
expect(localRedis.expire).toHaveBeenCalledWith(expect.any(String), 86400)
})
})
describe('startJob', () => {
it.concurrent('should not set TTL when starting a job', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
await localQueue.startJob('run_test789')
expect(localRedis.hset).toHaveBeenCalled()
expect(localRedis.expire).not.toHaveBeenCalled()
})
})
describe('getJob', () => {
it.concurrent('should return null for non-existent job', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
localRedis.hgetall.mockResolvedValue({})
const job = await localQueue.getJob('run_nonexistent')
expect(job).toBeNull()
})
it.concurrent('should deserialize job data correctly', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const now = new Date()
localRedis.hgetall.mockResolvedValue({
id: 'run_test',
type: 'workflow-execution',
payload: JSON.stringify({ foo: 'bar' }),
status: JOB_STATUS.COMPLETED,
createdAt: now.toISOString(),
startedAt: now.toISOString(),
completedAt: now.toISOString(),
attempts: '1',
maxAttempts: '3',
error: '',
output: JSON.stringify({ result: 'ok' }),
metadata: JSON.stringify({ workflowId: 'wf_123' }),
})
const job = await localQueue.getJob('run_test')
expect(job).not.toBeNull()
expect(job?.id).toBe('run_test')
expect(job?.type).toBe('workflow-execution')
expect(job?.payload).toEqual({ foo: 'bar' })
expect(job?.status).toBe(JOB_STATUS.COMPLETED)
expect(job?.output).toEqual({ result: 'ok' })
expect(job?.metadata.workflowId).toBe('wf_123')
})
})
})
describe('JOB_RETENTION_SECONDS', () => {
it.concurrent('should be 24 hours in seconds', async () => {
expect(JOB_RETENTION_SECONDS).toBe(24 * 60 * 60)
expect(JOB_RETENTION_SECONDS).toBe(86400)
})
})

View File

@@ -0,0 +1,146 @@
import { createLogger } from '@sim/logger'
import type Redis from 'ioredis'
import {
type EnqueueOptions,
JOB_MAX_LIFETIME_SECONDS,
JOB_RETENTION_SECONDS,
JOB_STATUS,
type Job,
type JobMetadata,
type JobQueueBackend,
type JobStatus,
type JobType,
} from '@/lib/core/async-jobs/types'
const logger = createLogger('RedisJobQueue')
const KEYS = {
job: (id: string) => `async-jobs:job:${id}`,
} as const
function serializeJob(job: Job): Record<string, string> {
return {
id: job.id,
type: job.type,
payload: JSON.stringify(job.payload),
status: job.status,
createdAt: job.createdAt.toISOString(),
startedAt: job.startedAt?.toISOString() ?? '',
completedAt: job.completedAt?.toISOString() ?? '',
attempts: job.attempts.toString(),
maxAttempts: job.maxAttempts.toString(),
error: job.error ?? '',
output: job.output !== undefined ? JSON.stringify(job.output) : '',
metadata: JSON.stringify(job.metadata),
updatedAt: new Date().toISOString(),
}
}
function deserializeJob(data: Record<string, string>): Job | null {
if (!data || !data.id) return null
try {
return {
id: data.id,
type: data.type as JobType,
payload: JSON.parse(data.payload),
status: data.status as JobStatus,
createdAt: new Date(data.createdAt),
startedAt: data.startedAt ? new Date(data.startedAt) : undefined,
completedAt: data.completedAt ? new Date(data.completedAt) : undefined,
attempts: Number.parseInt(data.attempts, 10),
maxAttempts: Number.parseInt(data.maxAttempts, 10),
error: data.error || undefined,
output: data.output ? JSON.parse(data.output) : undefined,
metadata: JSON.parse(data.metadata) as JobMetadata,
}
} catch (error) {
logger.error('Failed to deserialize job', { error, data })
return null
}
}
export class RedisJobQueue implements JobQueueBackend {
private redis: Redis
constructor(redis: Redis) {
this.redis = redis
}
async enqueue<TPayload>(
type: JobType,
payload: TPayload,
options?: EnqueueOptions
): Promise<string> {
const jobId = `run_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`
const now = new Date()
const job: Job<TPayload> = {
id: jobId,
type,
payload,
status: JOB_STATUS.PENDING,
createdAt: now,
attempts: 0,
maxAttempts: options?.maxAttempts ?? 3,
metadata: options?.metadata ?? {},
}
const key = KEYS.job(jobId)
const serialized = serializeJob(job as Job)
await this.redis.hset(key, serialized)
await this.redis.expire(key, JOB_MAX_LIFETIME_SECONDS)
logger.debug('Enqueued job', { jobId, type })
return jobId
}
async getJob(jobId: string): Promise<Job | null> {
const data = await this.redis.hgetall(KEYS.job(jobId))
return deserializeJob(data)
}
async startJob(jobId: string): Promise<void> {
const now = new Date()
const key = KEYS.job(jobId)
await this.redis.hset(key, {
status: JOB_STATUS.PROCESSING,
startedAt: now.toISOString(),
updatedAt: now.toISOString(),
})
await this.redis.hincrby(key, 'attempts', 1)
logger.debug('Started job', { jobId })
}
async completeJob(jobId: string, output: unknown): Promise<void> {
const now = new Date()
const key = KEYS.job(jobId)
await this.redis.hset(key, {
status: JOB_STATUS.COMPLETED,
completedAt: now.toISOString(),
output: JSON.stringify(output),
updatedAt: now.toISOString(),
})
await this.redis.expire(key, JOB_RETENTION_SECONDS)
logger.debug('Completed job', { jobId })
}
async markJobFailed(jobId: string, error: string): Promise<void> {
const now = new Date()
const key = KEYS.job(jobId)
await this.redis.hset(key, {
status: JOB_STATUS.FAILED,
completedAt: now.toISOString(),
error,
updatedAt: now.toISOString(),
})
await this.redis.expire(key, JOB_RETENTION_SECONDS)
logger.debug('Marked job as failed', { jobId })
}
}

View File

@@ -1,7 +1,7 @@
import { createLogger } from '@sim/logger'
import type { AsyncBackendType, JobQueueBackend } from '@/lib/core/async-jobs/types'
import { isBullMQEnabled } from '@/lib/core/bullmq'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('AsyncJobsConfig')
@@ -11,15 +11,16 @@ let cachedInlineBackend: JobQueueBackend | null = null
/**
* Determines which async backend to use based on environment configuration.
* Follows the fallback chain: trigger.dev → bullmq → database
* Follows the fallback chain: trigger.dev → redis → database
*/
export function getAsyncBackendType(): AsyncBackendType {
if (isTriggerDevEnabled) {
return 'trigger-dev'
}
if (isBullMQEnabled()) {
return 'bullmq'
const redis = getRedisClient()
if (redis) {
return 'redis'
}
return 'database'
@@ -42,9 +43,13 @@ export async function getJobQueue(): Promise<JobQueueBackend> {
cachedBackend = new TriggerDevJobQueue()
break
}
case 'bullmq': {
const { BullMQJobQueue } = await import('@/lib/core/async-jobs/backends/bullmq')
cachedBackend = new BullMQJobQueue()
case 'redis': {
const redis = getRedisClient()
if (!redis) {
throw new Error('Redis client not available but redis backend was selected')
}
const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis')
cachedBackend = new RedisJobQueue(redis)
break
}
case 'database': {
@@ -57,10 +62,6 @@ export async function getJobQueue(): Promise<JobQueueBackend> {
cachedBackendType = type
logger.info(`Async job backend initialized: ${type}`)
if (!cachedBackend) {
throw new Error(`Failed to initialize async backend: ${type}`)
}
return cachedBackend
}
@@ -72,19 +73,20 @@ export function getCurrentBackendType(): AsyncBackendType | null {
}
/**
* Gets a job queue backend that bypasses Trigger.dev (BullMQ -> Database).
* Used for execution paths that must avoid Trigger.dev cold starts.
* Gets a job queue backend that bypasses Trigger.dev (Redis -> Database).
* Used for non-polling webhooks that should always execute inline.
*/
export async function getInlineJobQueue(): Promise<JobQueueBackend> {
if (cachedInlineBackend) {
return cachedInlineBackend
}
const redis = getRedisClient()
let type: string
if (isBullMQEnabled()) {
const { BullMQJobQueue } = await import('@/lib/core/async-jobs/backends/bullmq')
cachedInlineBackend = new BullMQJobQueue()
type = 'bullmq'
if (redis) {
const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis')
cachedInlineBackend = new RedisJobQueue(redis)
type = 'redis'
} else {
const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database')
cachedInlineBackend = new DatabaseJobQueue()
@@ -96,15 +98,11 @@ export async function getInlineJobQueue(): Promise<JobQueueBackend> {
}
/**
* Checks if jobs should be executed inline in-process.
* Database fallback is the only mode that still relies on inline execution.
* Checks if jobs should be executed inline (fire-and-forget).
* For Redis/DB backends, we execute inline. Trigger.dev handles execution itself.
*/
export function shouldExecuteInline(): boolean {
return getAsyncBackendType() === 'database'
}
export function shouldUseBullMQ(): boolean {
return isBullMQEnabled()
return getAsyncBackendType() !== 'trigger-dev'
}
/**

View File

@@ -5,7 +5,6 @@ export {
getJobQueue,
resetJobQueueCache,
shouldExecuteInline,
shouldUseBullMQ,
} from './config'
export type {
AsyncBackendType,

View File

@@ -62,10 +62,6 @@ export interface JobMetadata {
export interface EnqueueOptions {
maxAttempts?: number
metadata?: JobMetadata
jobId?: string
priority?: number
name?: string
delayMs?: number
}
/**
@@ -99,4 +95,4 @@ export interface JobQueueBackend {
markJobFailed(jobId: string, error: string): Promise<void>
}
export type AsyncBackendType = 'trigger-dev' | 'bullmq' | 'database'
export type AsyncBackendType = 'trigger-dev' | 'redis' | 'database'

View File

@@ -1,29 +0,0 @@
import type { ConnectionOptions } from 'bullmq'
import { env } from '@/lib/core/config/env'
export function isBullMQEnabled(): boolean {
return Boolean(env.REDIS_URL)
}
export function getBullMQConnectionOptions(): ConnectionOptions {
if (!env.REDIS_URL) {
throw new Error('BullMQ requires REDIS_URL')
}
const redisUrl = new URL(env.REDIS_URL)
const isTls = redisUrl.protocol === 'rediss:'
const port = redisUrl.port ? Number.parseInt(redisUrl.port, 10) : 6379
const dbPath = redisUrl.pathname.replace('/', '')
const db = dbPath ? Number.parseInt(dbPath, 10) : undefined
return {
host: redisUrl.hostname,
port,
username: redisUrl.username || undefined,
password: redisUrl.password || undefined,
db: Number.isFinite(db) ? db : undefined,
maxRetriesPerRequest: null,
enableReadyCheck: false,
...(isTls ? { tls: {} } : {}),
}
}

View File

@@ -1,16 +0,0 @@
export { getBullMQConnectionOptions, isBullMQEnabled } from './connection'
export {
type BullMQJobData,
createBullMQJobData,
getBullMQQueue,
getBullMQQueueByName,
getKnowledgeConnectorSyncQueue,
getKnowledgeDocumentProcessingQueue,
getMothershipJobExecutionQueue,
getWorkflowQueueEvents,
getWorkspaceNotificationDeliveryQueue,
KNOWLEDGE_CONNECTOR_SYNC_QUEUE,
KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE,
MOTHERSHIP_JOB_EXECUTION_QUEUE,
WORKSPACE_NOTIFICATION_DELIVERY_QUEUE,
} from './queues'

View File

@@ -1,196 +0,0 @@
import { Queue, QueueEvents } from 'bullmq'
import type { JobMetadata, JobType } from '@/lib/core/async-jobs/types'
import { getBullMQConnectionOptions } from '@/lib/core/bullmq/connection'
import type { WorkspaceDispatchQueueName } from '@/lib/core/workspace-dispatch/types'
export const KNOWLEDGE_CONNECTOR_SYNC_QUEUE = 'knowledge-connector-sync' as const
export const KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE = 'knowledge-process-document' as const
export const MOTHERSHIP_JOB_EXECUTION_QUEUE = 'mothership-job-execution' as const
export const WORKSPACE_NOTIFICATION_DELIVERY_QUEUE = 'workspace-notification-delivery' as const
export interface BullMQJobData<TPayload> {
payload: TPayload
metadata?: JobMetadata
}
let workflowQueueInstance: Queue | null = null
let webhookQueueInstance: Queue | null = null
let scheduleQueueInstance: Queue | null = null
let knowledgeConnectorSyncQueueInstance: Queue | null = null
let knowledgeDocumentProcessingQueueInstance: Queue | null = null
let mothershipJobExecutionQueueInstance: Queue | null = null
let workspaceNotificationDeliveryQueueInstance: Queue | null = null
let workflowQueueEventsInstance: QueueEvents | null = null
function getQueueDefaultOptions(type: JobType) {
switch (type) {
case 'workflow-execution':
return {
attempts: 3,
backoff: { type: 'exponential' as const, delay: 1000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
}
case 'webhook-execution':
return {
attempts: 2,
backoff: { type: 'exponential' as const, delay: 2000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 3 * 24 * 60 * 60 },
}
case 'schedule-execution':
return {
attempts: 2,
backoff: { type: 'exponential' as const, delay: 5000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 3 * 24 * 60 * 60 },
}
}
}
function createQueue(type: JobType): Queue {
return new Queue(type, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: getQueueDefaultOptions(type),
})
}
function createNamedQueue(
name:
| typeof KNOWLEDGE_CONNECTOR_SYNC_QUEUE
| typeof KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE
| typeof MOTHERSHIP_JOB_EXECUTION_QUEUE
| typeof WORKSPACE_NOTIFICATION_DELIVERY_QUEUE
): Queue {
switch (name) {
case KNOWLEDGE_CONNECTOR_SYNC_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
},
})
case KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
},
})
case MOTHERSHIP_JOB_EXECUTION_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 1,
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
},
})
case WORKSPACE_NOTIFICATION_DELIVERY_QUEUE:
return new Queue(name, {
connection: getBullMQConnectionOptions(),
defaultJobOptions: {
attempts: 1,
removeOnComplete: { age: 24 * 60 * 60 },
removeOnFail: { age: 7 * 24 * 60 * 60 },
},
})
}
}
export function getBullMQQueue(type: JobType): Queue {
switch (type) {
case 'workflow-execution':
if (!workflowQueueInstance) {
workflowQueueInstance = createQueue(type)
}
return workflowQueueInstance
case 'webhook-execution':
if (!webhookQueueInstance) {
webhookQueueInstance = createQueue(type)
}
return webhookQueueInstance
case 'schedule-execution':
if (!scheduleQueueInstance) {
scheduleQueueInstance = createQueue(type)
}
return scheduleQueueInstance
}
}
export function getBullMQQueueByName(queueName: WorkspaceDispatchQueueName): Queue {
switch (queueName) {
case 'workflow-execution':
case 'webhook-execution':
case 'schedule-execution':
return getBullMQQueue(queueName)
case KNOWLEDGE_CONNECTOR_SYNC_QUEUE:
return getKnowledgeConnectorSyncQueue()
case KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE:
return getKnowledgeDocumentProcessingQueue()
case MOTHERSHIP_JOB_EXECUTION_QUEUE:
return getMothershipJobExecutionQueue()
case WORKSPACE_NOTIFICATION_DELIVERY_QUEUE:
return getWorkspaceNotificationDeliveryQueue()
}
}
export function getWorkflowQueueEvents(): QueueEvents {
if (!workflowQueueEventsInstance) {
workflowQueueEventsInstance = new QueueEvents('workflow-execution', {
connection: getBullMQConnectionOptions(),
})
}
return workflowQueueEventsInstance
}
export function getKnowledgeConnectorSyncQueue(): Queue {
if (!knowledgeConnectorSyncQueueInstance) {
knowledgeConnectorSyncQueueInstance = createNamedQueue(KNOWLEDGE_CONNECTOR_SYNC_QUEUE)
}
return knowledgeConnectorSyncQueueInstance
}
export function getKnowledgeDocumentProcessingQueue(): Queue {
if (!knowledgeDocumentProcessingQueueInstance) {
knowledgeDocumentProcessingQueueInstance = createNamedQueue(KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE)
}
return knowledgeDocumentProcessingQueueInstance
}
export function getMothershipJobExecutionQueue(): Queue {
if (!mothershipJobExecutionQueueInstance) {
mothershipJobExecutionQueueInstance = createNamedQueue(MOTHERSHIP_JOB_EXECUTION_QUEUE)
}
return mothershipJobExecutionQueueInstance
}
export function getWorkspaceNotificationDeliveryQueue(): Queue {
if (!workspaceNotificationDeliveryQueueInstance) {
workspaceNotificationDeliveryQueueInstance = createNamedQueue(
WORKSPACE_NOTIFICATION_DELIVERY_QUEUE
)
}
return workspaceNotificationDeliveryQueueInstance
}
export function createBullMQJobData<TPayload>(
payload: TPayload,
metadata?: JobMetadata
): BullMQJobData<TPayload> {
return {
payload,
metadata: metadata ?? {},
}
}

View File

@@ -183,11 +183,6 @@ export const env = createEnv({
// Data Retention
FREE_PLAN_LOG_RETENTION_DAYS: z.string().optional(), // Log retention days for free plan users
// Admission & Burst Protection
ADMISSION_GATE_MAX_INFLIGHT: z.string().optional().default('500'), // Max concurrent in-flight execution requests per pod
DISPATCH_MAX_QUEUE_PER_WORKSPACE: z.string().optional().default('1000'), // Max queued dispatch jobs per workspace
DISPATCH_MAX_QUEUE_GLOBAL: z.string().optional().default('50000'), // Max queued dispatch jobs globally
// Rate Limiting Configuration
RATE_LIMIT_WINDOW_MS: z.string().optional().default('60000'), // Rate limit window duration in milliseconds (default: 1 minute)
MANUAL_EXECUTION_LIMIT: z.string().optional().default('999999'),// Manual execution bypass value (effectively unlimited)
@@ -199,10 +194,6 @@ export const env = createEnv({
RATE_LIMIT_TEAM_ASYNC: z.string().optional().default('2500'), // Team tier async API executions per minute
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
WORKSPACE_CONCURRENCY_FREE: z.string().optional().default('5'), // Free tier concurrent workspace executions
WORKSPACE_CONCURRENCY_PRO: z.string().optional().default('50'), // Pro tier concurrent workspace executions
WORKSPACE_CONCURRENCY_TEAM: z.string().optional().default('200'), // Team/Max tier concurrent workspace executions
WORKSPACE_CONCURRENCY_ENTERPRISE: z.string().optional().default('200'), // Enterprise default concurrent workspace executions
// Timeout Configuration
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'), // 5 minutes

View File

@@ -1,80 +0,0 @@
import type {
WorkspaceDispatchClaimResult,
WorkspaceDispatchEnqueueInput,
WorkspaceDispatchJobRecord,
WorkspaceDispatchLane,
} from '@/lib/core/workspace-dispatch/types'
export interface WorkspaceDispatchStorageAdapter {
saveDispatchJob(record: WorkspaceDispatchJobRecord): Promise<void>
getDispatchJobRecord(jobId: string): Promise<WorkspaceDispatchJobRecord | null>
listDispatchJobsByStatuses(
statuses: readonly WorkspaceDispatchJobRecord['status'][]
): Promise<WorkspaceDispatchJobRecord[]>
updateDispatchJobRecord(
jobId: string,
updater: (record: WorkspaceDispatchJobRecord) => WorkspaceDispatchJobRecord
): Promise<WorkspaceDispatchJobRecord | null>
enqueueWorkspaceDispatchJob(
input: WorkspaceDispatchEnqueueInput
): Promise<WorkspaceDispatchJobRecord>
restoreWorkspaceDispatchJob(record: WorkspaceDispatchJobRecord): Promise<void>
claimWorkspaceJob(
workspaceId: string,
options: {
lanes: readonly WorkspaceDispatchLane[]
concurrencyLimit: number
leaseId: string
now: number
leaseTtlMs: number
}
): Promise<WorkspaceDispatchClaimResult>
getWorkspaceQueueDepth(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<number>
getGlobalQueueDepth(): Promise<number>
reconcileGlobalQueueDepth(knownCount: number): Promise<void>
popNextWorkspaceId(): Promise<string | null>
getQueuedWorkspaceCount(): Promise<number>
hasActiveWorkspace(workspaceId: string): Promise<boolean>
ensureWorkspaceActive(workspaceId: string, readyAt?: number): Promise<void>
requeueWorkspaceId(workspaceId: string): Promise<void>
workspaceHasPendingJobs(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<boolean>
getNextWorkspaceJob(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<WorkspaceDispatchJobRecord | null>
removeWorkspaceJobFromLane(
workspaceId: string,
lane: WorkspaceDispatchLane,
jobId: string
): Promise<void>
cleanupExpiredWorkspaceLeases(workspaceId: string): Promise<void>
countActiveWorkspaceLeases(workspaceId: string): Promise<number>
hasWorkspaceLease(workspaceId: string, leaseId: string): Promise<boolean>
createWorkspaceLease(workspaceId: string, leaseId: string, ttlMs: number): Promise<number>
refreshWorkspaceLease(workspaceId: string, leaseId: string, ttlMs: number): Promise<number>
releaseWorkspaceLease(workspaceId: string, leaseId: string): Promise<void>
removeWorkspaceIfIdle(workspaceId: string, lanes: readonly WorkspaceDispatchLane[]): Promise<void>
markDispatchJobAdmitted(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void>
markDispatchJobAdmitting(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void>
markDispatchJobRunning(jobId: string): Promise<void>
markDispatchJobCompleted(jobId: string, output: unknown): Promise<void>
markDispatchJobFailed(jobId: string, error: string): Promise<void>
clear(): Promise<void>
dispose(): void
}

View File

@@ -1,175 +0,0 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
const { mockGetWorkspaceConcurrencyLimit, mockAcquireLock, mockReleaseLock } = vi.hoisted(() => ({
mockGetWorkspaceConcurrencyLimit: vi.fn(),
mockAcquireLock: vi.fn(),
mockReleaseLock: vi.fn(),
}))
vi.mock('@/lib/billing/workspace-concurrency', () => ({
getWorkspaceConcurrencyLimit: mockGetWorkspaceConcurrencyLimit,
}))
vi.mock('@/lib/core/config/redis', () => ({
acquireLock: mockAcquireLock,
releaseLock: mockReleaseLock,
getRedisClient: vi.fn().mockReturnValue(null),
}))
vi.mock('@/lib/core/bullmq', () => ({
getBullMQQueueByName: vi.fn().mockReturnValue({
add: vi.fn().mockResolvedValue({ id: 'bullmq-1' }),
}),
}))
import { MemoryWorkspaceDispatchStorage } from '@/lib/core/workspace-dispatch/memory-store'
import {
DISPATCH_SCAN_RESULTS,
dispatchNextAdmissibleWorkspaceJob,
} from '@/lib/core/workspace-dispatch/planner'
import {
enqueueWorkspaceDispatchJob,
setWorkspaceDispatchStorageAdapter,
} from '@/lib/core/workspace-dispatch/store'
describe('workspace dispatch integration (memory-backed)', () => {
let store: MemoryWorkspaceDispatchStorage
beforeEach(async () => {
vi.clearAllMocks()
store = new MemoryWorkspaceDispatchStorage()
setWorkspaceDispatchStorageAdapter(store)
mockGetWorkspaceConcurrencyLimit.mockResolvedValue(5)
mockAcquireLock.mockResolvedValue(true)
mockReleaseLock.mockResolvedValue(true)
})
async function enqueue(
workspaceId: string,
overrides: { lane?: string; delayMs?: number; priority?: number } = {}
) {
return enqueueWorkspaceDispatchJob({
workspaceId,
lane: (overrides.lane ?? 'runtime') as 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: { payload: { workflowId: 'wf-1' } },
metadata: { workflowId: 'wf-1' },
delayMs: overrides.delayMs,
priority: overrides.priority,
})
}
it('admits jobs round-robin across workspaces', async () => {
await enqueue('ws-a')
await enqueue('ws-b')
await enqueue('ws-a')
const r1 = await dispatchNextAdmissibleWorkspaceJob()
const r2 = await dispatchNextAdmissibleWorkspaceJob()
const r3 = await dispatchNextAdmissibleWorkspaceJob()
expect(r1).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
expect(r2).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
expect(r3).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
})
it('respects workspace concurrency limits', async () => {
mockGetWorkspaceConcurrencyLimit.mockResolvedValue(1)
await enqueue('ws-a')
await enqueue('ws-a')
const r1 = await dispatchNextAdmissibleWorkspaceJob()
expect(r1).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
const r2 = await dispatchNextAdmissibleWorkspaceJob()
expect(r2).toBe(DISPATCH_SCAN_RESULTS.NO_PROGRESS)
})
it('skips delayed jobs and admits ready ones in same lane', async () => {
await enqueue('ws-a', { delayMs: 60_000 })
await enqueue('ws-a', { delayMs: 0 })
const r1 = await dispatchNextAdmissibleWorkspaceJob()
expect(r1).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
})
it('returns delayed when all jobs are delayed', async () => {
await enqueue('ws-a', { delayMs: 60_000 })
const r1 = await dispatchNextAdmissibleWorkspaceJob()
expect(r1).toBe(DISPATCH_SCAN_RESULTS.NO_PROGRESS)
})
it('returns no_workspace when queue is empty', async () => {
const result = await dispatchNextAdmissibleWorkspaceJob()
expect(result).toBe(DISPATCH_SCAN_RESULTS.NO_WORKSPACE)
})
it('lease cleanup frees capacity for new admissions', async () => {
mockGetWorkspaceConcurrencyLimit.mockResolvedValue(1)
const record = await enqueue('ws-a')
await enqueue('ws-a')
const r1 = await dispatchNextAdmissibleWorkspaceJob()
expect(r1).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
const updated = await store.getDispatchJobRecord(record.id)
if (updated?.lease) {
await store.releaseWorkspaceLease('ws-a', updated.lease.leaseId)
}
const r2 = await dispatchNextAdmissibleWorkspaceJob()
expect(r2).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
})
it('expired leases are cleaned up during claim', async () => {
mockGetWorkspaceConcurrencyLimit.mockResolvedValue(1)
await enqueue('ws-a')
await enqueue('ws-a')
const claimResult = await store.claimWorkspaceJob('ws-a', {
lanes: ['runtime'],
concurrencyLimit: 1,
leaseId: 'old-lease',
now: Date.now(),
leaseTtlMs: 1,
})
expect(claimResult.type).toBe('admitted')
await new Promise((resolve) => setTimeout(resolve, 10))
const r2 = await dispatchNextAdmissibleWorkspaceJob()
expect(r2).toBe(DISPATCH_SCAN_RESULTS.ADMITTED)
})
it('recovers job to waiting via restoreWorkspaceDispatchJob', async () => {
const record = await enqueue('ws-a')
await store.claimWorkspaceJob('ws-a', {
lanes: ['runtime'],
concurrencyLimit: 1,
leaseId: 'lease-1',
now: Date.now(),
leaseTtlMs: 1000,
})
await store.markDispatchJobAdmitted(record.id, 'ws-a', 'lease-1', Date.now() + 10000)
const admitted = await store.getDispatchJobRecord(record.id)
expect(admitted).toBeDefined()
const resetRecord = { ...admitted!, status: 'waiting' as const, lease: undefined }
await store.restoreWorkspaceDispatchJob(resetRecord)
const restored = await store.getDispatchJobRecord(record.id)
expect(restored?.status).toBe('waiting')
expect(restored?.lease).toBeUndefined()
})
})

View File

@@ -1,156 +0,0 @@
import { createLogger } from '@sim/logger'
import { env } from '@/lib/core/config/env'
import {
enqueueWorkspaceDispatchJob,
getDispatchJobRecord,
getGlobalQueueDepth,
getQueuedWorkspaceCount,
getWorkspaceQueueDepth,
} from '@/lib/core/workspace-dispatch/store'
import {
WORKSPACE_DISPATCH_LANES,
type WorkspaceDispatchEnqueueInput,
type WorkspaceDispatchJobRecord,
} from '@/lib/core/workspace-dispatch/types'
import { DISPATCH_SCAN_RESULTS, dispatchNextAdmissibleWorkspaceJob } from './planner'
import { reconcileWorkspaceDispatchState } from './reconciler'
const logger = createLogger('WorkspaceDispatcher')
const WAIT_POLL_INTERVAL_MS = 250
const RECONCILE_INTERVAL_MS = 30_000
const MAX_QUEUE_PER_WORKSPACE = Number.parseInt(env.DISPATCH_MAX_QUEUE_PER_WORKSPACE ?? '') || 1000
const MAX_QUEUE_GLOBAL = Number.parseInt(env.DISPATCH_MAX_QUEUE_GLOBAL ?? '') || 50_000
let dispatcherRunning = false
let dispatcherWakePending = false
let lastReconcileAt = 0
async function runDispatcherLoop(): Promise<void> {
if (dispatcherRunning) {
dispatcherWakePending = true
return
}
dispatcherRunning = true
try {
const now = Date.now()
if (now - lastReconcileAt >= RECONCILE_INTERVAL_MS) {
await reconcileWorkspaceDispatchState()
lastReconcileAt = now
}
do {
dispatcherWakePending = false
const queuedWorkspaces = await getQueuedWorkspaceCount()
if (queuedWorkspaces === 0) {
continue
}
let admitted = 0
let scanned = 0
const loopStartMs = Date.now()
for (let index = 0; index < queuedWorkspaces; index++) {
scanned++
const result = await dispatchNextAdmissibleWorkspaceJob()
if (result === DISPATCH_SCAN_RESULTS.ADMITTED) {
admitted++
}
if (result === DISPATCH_SCAN_RESULTS.NO_WORKSPACE) {
break
}
}
if (admitted > 0) {
dispatcherWakePending = true
}
if (admitted > 0 || scanned > 0) {
logger.info('Dispatcher pass', {
admitted,
scanned,
queuedWorkspaces,
durationMs: Date.now() - loopStartMs,
})
}
} while (dispatcherWakePending)
} catch (error) {
logger.error('Workspace dispatcher loop failed', { error })
} finally {
dispatcherRunning = false
}
}
export class DispatchQueueFullError extends Error {
readonly statusCode = 503
constructor(
readonly scope: 'workspace' | 'global',
readonly depth: number,
readonly limit: number
) {
super(
scope === 'workspace'
? `Workspace queue is at capacity (${depth}/${limit})`
: `Global dispatch queue is at capacity (${depth}/${limit})`
)
this.name = 'DispatchQueueFullError'
}
}
export async function enqueueWorkspaceDispatch(
input: WorkspaceDispatchEnqueueInput
): Promise<string> {
const [workspaceDepth, globalDepth] = await Promise.all([
getWorkspaceQueueDepth(input.workspaceId, WORKSPACE_DISPATCH_LANES),
getGlobalQueueDepth(),
])
if (workspaceDepth >= MAX_QUEUE_PER_WORKSPACE) {
logger.warn('Workspace dispatch queue at capacity', {
workspaceId: input.workspaceId,
depth: workspaceDepth,
limit: MAX_QUEUE_PER_WORKSPACE,
})
throw new DispatchQueueFullError('workspace', workspaceDepth, MAX_QUEUE_PER_WORKSPACE)
}
if (globalDepth >= MAX_QUEUE_GLOBAL) {
logger.warn('Global dispatch queue at capacity', {
depth: globalDepth,
limit: MAX_QUEUE_GLOBAL,
})
throw new DispatchQueueFullError('global', globalDepth, MAX_QUEUE_GLOBAL)
}
const record = await enqueueWorkspaceDispatchJob(input)
void runDispatcherLoop()
return record.id
}
export async function wakeWorkspaceDispatcher(): Promise<void> {
await runDispatcherLoop()
}
export async function waitForDispatchJob(
dispatchJobId: string,
timeoutMs: number
): Promise<WorkspaceDispatchJobRecord> {
const deadline = Date.now() + timeoutMs
while (Date.now() < deadline) {
const record = await getDispatchJobRecord(dispatchJobId)
if (!record) {
throw new Error(`Dispatch job not found: ${dispatchJobId}`)
}
if (record.status === 'completed' || record.status === 'failed') {
return record
}
await new Promise((resolve) => setTimeout(resolve, WAIT_POLL_INTERVAL_MS))
}
throw new Error(`Timed out waiting for dispatch job ${dispatchJobId}`)
}

View File

@@ -1,42 +0,0 @@
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
import type { WorkspaceDispatchStorageAdapter } from '@/lib/core/workspace-dispatch/adapter'
import { MemoryWorkspaceDispatchStorage } from '@/lib/core/workspace-dispatch/memory-store'
import { RedisWorkspaceDispatchStorage } from '@/lib/core/workspace-dispatch/redis-store'
const logger = createLogger('WorkspaceDispatchFactory')
let cachedAdapter: WorkspaceDispatchStorageAdapter | null = null
export function createWorkspaceDispatchStorageAdapter(): WorkspaceDispatchStorageAdapter {
if (cachedAdapter) {
return cachedAdapter
}
const redis = getRedisClient()
if (redis) {
logger.info('Workspace dispatcher: Using Redis storage')
const adapter = new RedisWorkspaceDispatchStorage(redis)
cachedAdapter = adapter
return adapter
}
logger.warn(
'Workspace dispatcher: Using in-memory storage; distributed fairness is disabled in multi-process deployments'
)
const adapter = new MemoryWorkspaceDispatchStorage()
cachedAdapter = adapter
return adapter
}
export function setWorkspaceDispatchStorageAdapter(adapter: WorkspaceDispatchStorageAdapter): void {
cachedAdapter = adapter
}
export function resetWorkspaceDispatchStorageAdapter(): void {
if (cachedAdapter) {
cachedAdapter.dispose()
cachedAdapter = null
}
}

View File

@@ -1,32 +0,0 @@
export type { WorkspaceDispatchStorageAdapter } from './adapter'
export {
DispatchQueueFullError,
enqueueWorkspaceDispatch,
waitForDispatchJob,
wakeWorkspaceDispatcher,
} from './dispatcher'
export {
createWorkspaceDispatchStorageAdapter,
resetWorkspaceDispatchStorageAdapter,
} from './factory'
export {
markDispatchJobAdmitted,
markDispatchJobAdmitting,
markDispatchJobCompleted,
markDispatchJobFailed,
markDispatchJobRunning,
refreshWorkspaceLease,
releaseWorkspaceLease,
} from './store'
export {
WORKSPACE_DISPATCH_LANES,
WORKSPACE_DISPATCH_STATUSES,
type WorkspaceDispatchEnqueueInput,
type WorkspaceDispatchJobContext,
type WorkspaceDispatchJobRecord,
type WorkspaceDispatchLane,
type WorkspaceDispatchLeaseInfo,
type WorkspaceDispatchQueueName,
type WorkspaceDispatchStatus,
} from './types'
export { getDispatchRuntimeMetadata, runDispatchedJob } from './worker'

View File

@@ -1,65 +0,0 @@
/**
* @vitest-environment node
*/
import { afterEach, describe, expect, it } from 'vitest'
import { MemoryWorkspaceDispatchStorage } from '@/lib/core/workspace-dispatch/memory-store'
describe('memory workspace dispatch storage', () => {
const store = new MemoryWorkspaceDispatchStorage()
afterEach(async () => {
await store.clear()
})
it('claims a runnable job and marks it admitting with a lease', async () => {
const record = await store.enqueueWorkspaceDispatchJob({
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: { payload: { workflowId: 'workflow-1' } },
metadata: {
workflowId: 'workflow-1',
},
})
const result = await store.claimWorkspaceJob('workspace-1', {
lanes: ['runtime'],
concurrencyLimit: 1,
leaseId: 'lease-1',
now: Date.now(),
leaseTtlMs: 1000,
})
expect(result.type).toBe('admitted')
if (result.type === 'admitted') {
expect(result.record.id).toBe(record.id)
expect(result.record.status).toBe('admitting')
expect(result.record.lease?.leaseId).toBe('lease-1')
}
})
it('returns delayed when only delayed jobs exist', async () => {
await store.enqueueWorkspaceDispatchJob({
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: { payload: { workflowId: 'workflow-1' } },
metadata: {
workflowId: 'workflow-1',
},
delayMs: 5000,
})
const result = await store.claimWorkspaceJob('workspace-1', {
lanes: ['runtime'],
concurrencyLimit: 1,
leaseId: 'lease-2',
now: Date.now(),
leaseTtlMs: 1000,
})
expect(result.type).toBe('delayed')
})
})

View File

@@ -1,505 +0,0 @@
import { createLogger } from '@sim/logger'
import type { WorkspaceDispatchStorageAdapter } from '@/lib/core/workspace-dispatch/adapter'
import {
WORKSPACE_DISPATCH_CLAIM_RESULTS,
type WorkspaceDispatchClaimResult,
type WorkspaceDispatchEnqueueInput,
type WorkspaceDispatchJobRecord,
type WorkspaceDispatchLane,
} from '@/lib/core/workspace-dispatch/types'
const logger = createLogger('WorkspaceDispatchMemoryStore')
const JOB_TTL_MS = 48 * 60 * 60 * 1000
export class MemoryWorkspaceDispatchStorage implements WorkspaceDispatchStorageAdapter {
private jobs = new Map<string, WorkspaceDispatchJobRecord>()
private workspaceOrder: string[] = []
private laneQueues = new Map<string, string[]>()
private leases = new Map<string, Map<string, number>>()
private cleanupInterval: NodeJS.Timeout | null = null
constructor() {
this.cleanupInterval = setInterval(() => {
void this.clearExpiredState()
}, 60_000)
this.cleanupInterval.unref()
}
private queueKey(workspaceId: string, lane: WorkspaceDispatchLane): string {
return `${workspaceId}:${lane}`
}
private ensureWorkspaceQueued(workspaceId: string): void {
if (!this.workspaceOrder.includes(workspaceId)) {
this.workspaceOrder.push(workspaceId)
}
}
private getLaneQueue(workspaceId: string, lane: WorkspaceDispatchLane): string[] {
const key = this.queueKey(workspaceId, lane)
const existing = this.laneQueues.get(key)
if (existing) {
return existing
}
const queue: string[] = []
this.laneQueues.set(key, queue)
return queue
}
private sortQueue(queue: string[]): void {
queue.sort((leftId, rightId) => {
const left = this.jobs.get(leftId)
const right = this.jobs.get(rightId)
if (!left || !right) {
return 0
}
if (left.priority !== right.priority) {
return left.priority - right.priority
}
return left.createdAt - right.createdAt
})
}
private getLeaseMap(workspaceId: string): Map<string, number> {
const existing = this.leases.get(workspaceId)
if (existing) {
return existing
}
const leaseMap = new Map<string, number>()
this.leases.set(workspaceId, leaseMap)
return leaseMap
}
private async clearExpiredState(): Promise<void> {
const now = Date.now()
for (const [jobId, record] of this.jobs.entries()) {
if (
(record.status === 'completed' || record.status === 'failed') &&
record.completedAt &&
now - record.completedAt > JOB_TTL_MS
) {
this.jobs.delete(jobId)
}
}
for (const [workspaceId, leaseMap] of this.leases.entries()) {
for (const [leaseId, expiresAt] of leaseMap.entries()) {
if (expiresAt <= now) {
leaseMap.delete(leaseId)
}
}
if (leaseMap.size === 0) {
this.leases.delete(workspaceId)
}
}
}
async saveDispatchJob(record: WorkspaceDispatchJobRecord): Promise<void> {
this.jobs.set(record.id, record)
}
async getDispatchJobRecord(jobId: string): Promise<WorkspaceDispatchJobRecord | null> {
return this.jobs.get(jobId) ?? null
}
async listDispatchJobsByStatuses(
statuses: readonly WorkspaceDispatchJobRecord['status'][]
): Promise<WorkspaceDispatchJobRecord[]> {
return Array.from(this.jobs.values()).filter((record) => statuses.includes(record.status))
}
private static readonly TERMINAL_STATUSES = new Set(['completed', 'failed'])
async updateDispatchJobRecord(
jobId: string,
updater: (record: WorkspaceDispatchJobRecord) => WorkspaceDispatchJobRecord
): Promise<WorkspaceDispatchJobRecord | null> {
const current = this.jobs.get(jobId)
if (!current) {
return null
}
const updated = updater(current)
if (
MemoryWorkspaceDispatchStorage.TERMINAL_STATUSES.has(current.status) &&
!MemoryWorkspaceDispatchStorage.TERMINAL_STATUSES.has(updated.status)
) {
return current
}
this.jobs.set(jobId, updated)
return updated
}
async enqueueWorkspaceDispatchJob(
input: WorkspaceDispatchEnqueueInput
): Promise<WorkspaceDispatchJobRecord> {
const id = input.id ?? `dispatch_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`
const createdAt = Date.now()
const record: WorkspaceDispatchJobRecord = {
id,
workspaceId: input.workspaceId,
lane: input.lane,
queueName: input.queueName,
bullmqJobName: input.bullmqJobName,
bullmqPayload: input.bullmqPayload,
metadata: input.metadata,
priority: input.priority ?? 100,
maxAttempts: input.maxAttempts,
delayMs: input.delayMs,
status: 'waiting',
createdAt,
}
this.jobs.set(id, record)
const queue = this.getLaneQueue(record.workspaceId, record.lane)
queue.push(id)
this.sortQueue(queue)
this.ensureWorkspaceQueued(record.workspaceId)
return record
}
async restoreWorkspaceDispatchJob(record: WorkspaceDispatchJobRecord): Promise<void> {
this.jobs.set(record.id, record)
const queue = this.getLaneQueue(record.workspaceId, record.lane)
if (!queue.includes(record.id)) {
queue.push(record.id)
this.sortQueue(queue)
}
this.ensureWorkspaceQueued(record.workspaceId)
}
async claimWorkspaceJob(
workspaceId: string,
options: {
lanes: readonly WorkspaceDispatchLane[]
concurrencyLimit: number
leaseId: string
now: number
leaseTtlMs: number
}
): Promise<WorkspaceDispatchClaimResult> {
await this.cleanupExpiredWorkspaceLeases(workspaceId)
if (this.getLeaseMap(workspaceId).size >= options.concurrencyLimit) {
this.ensureWorkspaceQueued(workspaceId)
return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED }
}
let selectedRecord: WorkspaceDispatchJobRecord | null = null
let selectedLane: WorkspaceDispatchLane | null = null
let nextReadyAt: number | null = null
for (const lane of options.lanes) {
const queue = this.getLaneQueue(workspaceId, lane)
for (let scanIndex = 0; scanIndex < queue.length && scanIndex < 20; ) {
const jobId = queue[scanIndex]
const record = this.jobs.get(jobId)
if (!record) {
queue.splice(scanIndex, 1)
continue
}
const readyAt = record.createdAt + (record.delayMs ?? 0)
if (readyAt <= options.now) {
selectedRecord = record
selectedLane = lane
queue.splice(scanIndex, 1)
break
}
nextReadyAt = nextReadyAt ? Math.min(nextReadyAt, readyAt) : readyAt
scanIndex++
}
if (selectedRecord) {
break
}
}
if (!selectedRecord || !selectedLane) {
const hasPending = await this.workspaceHasPendingJobs(workspaceId, options.lanes)
if (!hasPending) {
this.workspaceOrder = this.workspaceOrder.filter((value) => value !== workspaceId)
return { type: WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY }
}
this.ensureWorkspaceQueued(workspaceId)
return {
type: WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED,
nextReadyAt: nextReadyAt ?? options.now,
}
}
const leaseExpiresAt = options.now + options.leaseTtlMs
this.getLeaseMap(workspaceId).set(options.leaseId, leaseExpiresAt)
const updatedRecord: WorkspaceDispatchJobRecord = {
...selectedRecord,
status: 'admitting',
lease: {
workspaceId,
leaseId: options.leaseId,
},
metadata: {
...selectedRecord.metadata,
dispatchLeaseExpiresAt: leaseExpiresAt,
},
}
this.jobs.set(updatedRecord.id, updatedRecord)
const hasPending = await this.workspaceHasPendingJobs(workspaceId, options.lanes)
if (hasPending) {
this.ensureWorkspaceQueued(workspaceId)
} else {
this.workspaceOrder = this.workspaceOrder.filter((value) => value !== workspaceId)
}
return {
type: WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED,
record: updatedRecord,
leaseId: options.leaseId,
leaseExpiresAt,
}
}
async getWorkspaceQueueDepth(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<number> {
let depth = 0
for (const lane of lanes) {
depth += this.getLaneQueue(workspaceId, lane).length
}
return depth
}
async getGlobalQueueDepth(): Promise<number> {
const terminalStatuses = new Set(['completed', 'failed'])
let count = 0
for (const job of this.jobs.values()) {
if (!terminalStatuses.has(job.status)) {
count++
}
}
return count
}
async reconcileGlobalQueueDepth(_knownCount: number): Promise<void> {
// no-op: memory store computes depth on the fly
}
async popNextWorkspaceId(): Promise<string | null> {
const now = Date.now()
const maxScans = this.workspaceOrder.length
for (let i = 0; i < maxScans; i++) {
const id = this.workspaceOrder.shift()
if (!id) return null
const readyAt = this.workspaceReadyAt.get(id)
if (readyAt && readyAt > now) {
this.workspaceOrder.push(id)
continue
}
this.workspaceReadyAt.delete(id)
return id
}
return null
}
async getQueuedWorkspaceCount(): Promise<number> {
return this.workspaceOrder.length
}
async hasActiveWorkspace(workspaceId: string): Promise<boolean> {
return this.workspaceOrder.includes(workspaceId)
}
private workspaceReadyAt = new Map<string, number>()
async ensureWorkspaceActive(workspaceId: string, readyAt?: number): Promise<void> {
if (readyAt && readyAt > Date.now()) {
this.workspaceReadyAt.set(workspaceId, readyAt)
}
this.ensureWorkspaceQueued(workspaceId)
}
async requeueWorkspaceId(workspaceId: string): Promise<void> {
this.ensureWorkspaceQueued(workspaceId)
}
async workspaceHasPendingJobs(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<boolean> {
return lanes.some((lane) => this.getLaneQueue(workspaceId, lane).length > 0)
}
async getNextWorkspaceJob(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<WorkspaceDispatchJobRecord | null> {
for (const lane of lanes) {
const queue = this.getLaneQueue(workspaceId, lane)
while (queue.length > 0) {
const jobId = queue[0]
const job = this.jobs.get(jobId)
if (job) {
return job
}
queue.shift()
}
}
return null
}
async removeWorkspaceJobFromLane(
workspaceId: string,
lane: WorkspaceDispatchLane,
jobId: string
): Promise<void> {
const queue = this.getLaneQueue(workspaceId, lane)
const index = queue.indexOf(jobId)
if (index >= 0) {
queue.splice(index, 1)
}
}
async cleanupExpiredWorkspaceLeases(workspaceId: string): Promise<void> {
const leaseMap = this.getLeaseMap(workspaceId)
const now = Date.now()
for (const [leaseId, expiresAt] of leaseMap.entries()) {
if (expiresAt <= now) {
leaseMap.delete(leaseId)
}
}
}
async countActiveWorkspaceLeases(workspaceId: string): Promise<number> {
await this.cleanupExpiredWorkspaceLeases(workspaceId)
return this.getLeaseMap(workspaceId).size
}
async hasWorkspaceLease(workspaceId: string, leaseId: string): Promise<boolean> {
await this.cleanupExpiredWorkspaceLeases(workspaceId)
return this.getLeaseMap(workspaceId).has(leaseId)
}
async createWorkspaceLease(workspaceId: string, leaseId: string, ttlMs: number): Promise<number> {
const expiresAt = Date.now() + ttlMs
this.getLeaseMap(workspaceId).set(leaseId, expiresAt)
return expiresAt
}
async refreshWorkspaceLease(
workspaceId: string,
leaseId: string,
ttlMs: number
): Promise<number> {
return this.createWorkspaceLease(workspaceId, leaseId, ttlMs)
}
async releaseWorkspaceLease(workspaceId: string, leaseId: string): Promise<void> {
this.getLeaseMap(workspaceId).delete(leaseId)
}
async removeWorkspaceIfIdle(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<void> {
const hasPending = await this.workspaceHasPendingJobs(workspaceId, lanes)
if (!hasPending) {
this.workspaceOrder = this.workspaceOrder.filter((value) => value !== workspaceId)
}
}
async markDispatchJobAdmitted(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'admitted',
admittedAt: Date.now(),
lease: {
workspaceId,
leaseId,
},
metadata: {
...record.metadata,
dispatchLeaseExpiresAt: leaseExpiresAt,
},
}))
}
async markDispatchJobAdmitting(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'admitting',
lease: {
workspaceId,
leaseId,
},
metadata: {
...record.metadata,
dispatchLeaseExpiresAt: leaseExpiresAt,
},
}))
}
async markDispatchJobRunning(jobId: string): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'running',
startedAt: record.startedAt ?? Date.now(),
}))
}
async markDispatchJobCompleted(jobId: string, output: unknown): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'completed',
completedAt: Date.now(),
output,
}))
}
async markDispatchJobFailed(jobId: string, error: string): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'failed',
completedAt: Date.now(),
error,
}))
}
async clear(): Promise<void> {
this.jobs.clear()
this.workspaceOrder = []
this.laneQueues.clear()
this.leases.clear()
this.workspaceReadyAt.clear()
}
dispose(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval)
this.cleanupInterval = null
}
void this.clear().catch((error) => {
logger.error('Failed to clear memory workspace dispatch storage', { error })
})
}
}

View File

@@ -1,154 +0,0 @@
import { createLogger } from '@sim/logger'
import { getWorkspaceConcurrencyLimit } from '@/lib/billing/workspace-concurrency'
import { type BullMQJobData, getBullMQQueueByName } from '@/lib/core/bullmq'
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
import {
claimWorkspaceJob,
markDispatchJobAdmitted,
popNextWorkspaceId,
releaseWorkspaceLease,
removeWorkspaceIfIdle,
requeueWorkspaceId,
} from '@/lib/core/workspace-dispatch/store'
import {
WORKSPACE_DISPATCH_CLAIM_RESULTS,
WORKSPACE_DISPATCH_LANES,
type WorkspaceDispatchJobRecord,
} from '@/lib/core/workspace-dispatch/types'
const logger = createLogger('WorkspaceDispatchPlanner')
const LEASE_TTL_MS = 15 * 60 * 1000
const WORKSPACE_CLAIM_LOCK_TTL_SECONDS = 10
export const DISPATCH_SCAN_RESULTS = {
NO_WORKSPACE: 'no_workspace',
NO_PROGRESS: 'no_progress',
ADMITTED: 'admitted',
} as const
export type DispatchScanResult = (typeof DISPATCH_SCAN_RESULTS)[keyof typeof DISPATCH_SCAN_RESULTS]
function attachDispatchMetadata(
bullmqPayload: unknown,
record: WorkspaceDispatchJobRecord,
leaseId: string,
leaseExpiresAt: number
): BullMQJobData<unknown> {
if (
bullmqPayload &&
typeof bullmqPayload === 'object' &&
'payload' in bullmqPayload &&
'metadata' in bullmqPayload
) {
const data = bullmqPayload as BullMQJobData<unknown>
return {
payload: data.payload,
metadata: {
...(data.metadata ?? {}),
dispatchJobId: record.id,
dispatchWorkspaceId: record.workspaceId,
dispatchLeaseId: leaseId,
dispatchLeaseExpiresAt: leaseExpiresAt,
},
}
}
return {
payload: bullmqPayload,
metadata: {
...record.metadata,
dispatchJobId: record.id,
dispatchWorkspaceId: record.workspaceId,
dispatchLeaseId: leaseId,
dispatchLeaseExpiresAt: leaseExpiresAt,
},
}
}
async function finalizeAdmittedJob(
record: WorkspaceDispatchJobRecord,
leaseId: string,
leaseExpiresAt: number
): Promise<void> {
try {
await getBullMQQueueByName(record.queueName).add(
record.bullmqJobName,
attachDispatchMetadata(record.bullmqPayload, record, leaseId, leaseExpiresAt),
{
jobId: record.id,
attempts: record.maxAttempts,
priority: record.priority,
}
)
await markDispatchJobAdmitted(record.id, record.workspaceId, leaseId, leaseExpiresAt)
} catch (error) {
await releaseWorkspaceLease(record.workspaceId, leaseId).catch(() => undefined)
throw error
}
}
export async function dispatchNextAdmissibleWorkspaceJob(): Promise<DispatchScanResult> {
const workspaceId = await popNextWorkspaceId()
if (!workspaceId) {
return DISPATCH_SCAN_RESULTS.NO_WORKSPACE
}
const lockValue = `lock_${crypto.randomUUID()}`
try {
const lockKey = `workspace-dispatch:claim-lock:${workspaceId}`
const acquired = await acquireLock(lockKey, lockValue, WORKSPACE_CLAIM_LOCK_TTL_SECONDS)
if (!acquired) {
await requeueWorkspaceId(workspaceId)
return DISPATCH_SCAN_RESULTS.NO_PROGRESS
}
const limit = await getWorkspaceConcurrencyLimit(workspaceId)
const leaseId = `lease_${crypto.randomUUID()}`
const claimResult = await claimWorkspaceJob(workspaceId, {
lanes: WORKSPACE_DISPATCH_LANES,
concurrencyLimit: limit,
leaseId,
now: Date.now(),
leaseTtlMs: LEASE_TTL_MS,
})
switch (claimResult.type) {
case WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED:
logger.debug('Workspace concurrency limit reached', { workspaceId, limit })
await requeueWorkspaceId(workspaceId)
return DISPATCH_SCAN_RESULTS.NO_PROGRESS
case WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED:
logger.debug('Workspace has only delayed jobs', {
workspaceId,
nextReadyAt: claimResult.nextReadyAt,
})
return DISPATCH_SCAN_RESULTS.NO_PROGRESS
case WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY:
await removeWorkspaceIfIdle(workspaceId, WORKSPACE_DISPATCH_LANES)
return DISPATCH_SCAN_RESULTS.NO_PROGRESS
case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED:
logger.info('Admitting workspace job', {
workspaceId,
dispatchJobId: claimResult.record.id,
lane: claimResult.record.lane,
queueName: claimResult.record.queueName,
})
await finalizeAdmittedJob(
claimResult.record,
claimResult.leaseId,
claimResult.leaseExpiresAt
)
return DISPATCH_SCAN_RESULTS.ADMITTED
}
} catch (error) {
logger.error('Failed to dispatch workspace job', { workspaceId, error })
await requeueWorkspaceId(workspaceId)
return DISPATCH_SCAN_RESULTS.NO_PROGRESS
} finally {
await releaseLock(`workspace-dispatch:claim-lock:${workspaceId}`, lockValue).catch(
() => undefined
)
}
}

View File

@@ -1,225 +0,0 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
const {
mockGetBullMQQueueByName,
mockHasActiveWorkspace,
mockEnsureWorkspaceActive,
mockHasWorkspaceLease,
mockListDispatchJobsByStatuses,
mockMarkDispatchJobAdmitted,
mockMarkDispatchJobCompleted,
mockMarkDispatchJobFailed,
mockRefreshWorkspaceLease,
mockReleaseWorkspaceLease,
mockRemoveWorkspaceJobFromLane,
mockRestoreWorkspaceDispatchJob,
mockWakeWorkspaceDispatcher,
} = vi.hoisted(() => ({
mockGetBullMQQueueByName: vi.fn(),
mockHasActiveWorkspace: vi.fn(),
mockEnsureWorkspaceActive: vi.fn(),
mockHasWorkspaceLease: vi.fn(),
mockListDispatchJobsByStatuses: vi.fn(),
mockMarkDispatchJobAdmitted: vi.fn(),
mockMarkDispatchJobCompleted: vi.fn(),
mockMarkDispatchJobFailed: vi.fn(),
mockRefreshWorkspaceLease: vi.fn(),
mockReleaseWorkspaceLease: vi.fn(),
mockRemoveWorkspaceJobFromLane: vi.fn(),
mockRestoreWorkspaceDispatchJob: vi.fn(),
mockWakeWorkspaceDispatcher: vi.fn(),
}))
vi.mock('@sim/logger', () => ({
createLogger: () => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}),
}))
vi.mock('@/lib/core/bullmq', () => ({
getBullMQQueueByName: mockGetBullMQQueueByName,
}))
vi.mock('@/lib/core/workspace-dispatch/store', () => ({
ensureWorkspaceActive: mockEnsureWorkspaceActive,
hasActiveWorkspace: mockHasActiveWorkspace,
hasWorkspaceLease: mockHasWorkspaceLease,
listDispatchJobsByStatuses: mockListDispatchJobsByStatuses,
markDispatchJobAdmitted: mockMarkDispatchJobAdmitted,
markDispatchJobCompleted: mockMarkDispatchJobCompleted,
markDispatchJobFailed: mockMarkDispatchJobFailed,
reconcileGlobalQueueDepth: vi.fn().mockResolvedValue(undefined),
refreshWorkspaceLease: mockRefreshWorkspaceLease,
releaseWorkspaceLease: mockReleaseWorkspaceLease,
removeWorkspaceJobFromLane: mockRemoveWorkspaceJobFromLane,
restoreWorkspaceDispatchJob: mockRestoreWorkspaceDispatchJob,
}))
vi.mock('@/lib/core/workspace-dispatch/dispatcher', () => ({
wakeWorkspaceDispatcher: mockWakeWorkspaceDispatcher,
}))
import { reconcileWorkspaceDispatchState } from '@/lib/core/workspace-dispatch/reconciler'
describe('workspace dispatch reconciler', () => {
beforeEach(() => {
vi.clearAllMocks()
mockHasActiveWorkspace.mockResolvedValue(true)
mockRemoveWorkspaceJobFromLane.mockResolvedValue(undefined)
})
it('marks dispatch job completed when BullMQ job is completed', async () => {
mockListDispatchJobsByStatuses.mockResolvedValue([
{
id: 'dispatch-1',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: {},
priority: 10,
status: 'running',
createdAt: 1,
lease: {
workspaceId: 'workspace-1',
leaseId: 'lease-1',
},
},
])
mockGetBullMQQueueByName.mockReturnValue({
getJob: vi.fn().mockResolvedValue({
getState: vi.fn().mockResolvedValue('completed'),
returnvalue: { ok: true },
}),
})
await reconcileWorkspaceDispatchState()
expect(mockMarkDispatchJobCompleted).toHaveBeenCalledWith('dispatch-1', { ok: true })
expect(mockReleaseWorkspaceLease).toHaveBeenCalledWith('workspace-1', 'lease-1')
expect(mockWakeWorkspaceDispatcher).toHaveBeenCalled()
})
it('restores admitted jobs to waiting when lease and BullMQ job are gone', async () => {
mockListDispatchJobsByStatuses.mockResolvedValue([
{
id: 'dispatch-2',
workspaceId: 'workspace-2',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: {},
priority: 10,
status: 'admitted',
createdAt: 1,
admittedAt: 2,
lease: {
workspaceId: 'workspace-2',
leaseId: 'lease-2',
},
},
])
mockGetBullMQQueueByName.mockReturnValue({
getJob: vi.fn().mockResolvedValue(null),
})
mockHasWorkspaceLease.mockResolvedValue(false)
await reconcileWorkspaceDispatchState()
expect(mockRestoreWorkspaceDispatchJob).toHaveBeenCalledWith(
expect.objectContaining({
id: 'dispatch-2',
status: 'waiting',
lease: undefined,
})
)
expect(mockWakeWorkspaceDispatcher).toHaveBeenCalled()
})
it('reacquires the lease for a live admitting BullMQ job', async () => {
mockListDispatchJobsByStatuses.mockResolvedValue([
{
id: 'dispatch-3',
workspaceId: 'workspace-3',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: {
dispatchLeaseExpiresAt: 12345,
},
priority: 10,
status: 'admitting',
createdAt: 1,
lease: {
workspaceId: 'workspace-3',
leaseId: 'lease-3',
},
},
])
mockGetBullMQQueueByName.mockReturnValue({
getJob: vi.fn().mockResolvedValue({
getState: vi.fn().mockResolvedValue('active'),
}),
})
mockHasWorkspaceLease.mockResolvedValue(false)
await reconcileWorkspaceDispatchState()
expect(mockRefreshWorkspaceLease).toHaveBeenCalledWith('workspace-3', 'lease-3', 15 * 60 * 1000)
expect(mockMarkDispatchJobAdmitted).toHaveBeenCalledWith(
'dispatch-3',
'workspace-3',
'lease-3',
12345
)
expect(mockRemoveWorkspaceJobFromLane).toHaveBeenCalledWith(
'workspace-3',
'runtime',
'dispatch-3'
)
})
it('releases leaked lease and restores waiting when BullMQ job is gone but lease remains', async () => {
mockListDispatchJobsByStatuses.mockResolvedValue([
{
id: 'dispatch-4',
workspaceId: 'workspace-4',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: {},
priority: 10,
status: 'running',
createdAt: 1,
lease: {
workspaceId: 'workspace-4',
leaseId: 'lease-4',
},
},
])
mockGetBullMQQueueByName.mockReturnValue({
getJob: vi.fn().mockResolvedValue(null),
})
mockHasWorkspaceLease.mockResolvedValue(true)
await reconcileWorkspaceDispatchState()
expect(mockReleaseWorkspaceLease).toHaveBeenCalledWith('workspace-4', 'lease-4')
expect(mockRestoreWorkspaceDispatchJob).toHaveBeenCalledWith(
expect.objectContaining({
id: 'dispatch-4',
status: 'waiting',
})
)
})
})

View File

@@ -1,226 +0,0 @@
import { createLogger } from '@sim/logger'
import { getBullMQQueueByName } from '@/lib/core/bullmq'
import {
ensureWorkspaceActive,
hasActiveWorkspace,
hasWorkspaceLease,
listDispatchJobsByStatuses,
markDispatchJobAdmitted,
markDispatchJobCompleted,
markDispatchJobFailed,
markDispatchJobRunning,
reconcileGlobalQueueDepth,
refreshWorkspaceLease,
releaseWorkspaceLease,
removeWorkspaceJobFromLane,
restoreWorkspaceDispatchJob,
} from '@/lib/core/workspace-dispatch/store'
import type { WorkspaceDispatchJobRecord } from '@/lib/core/workspace-dispatch/types'
import { wakeWorkspaceDispatcher } from './dispatcher'
const logger = createLogger('WorkspaceDispatchReconciler')
const LEASE_TTL_MS = 15 * 60 * 1000
function resetToWaiting(record: WorkspaceDispatchJobRecord): WorkspaceDispatchJobRecord {
return {
...record,
status: 'waiting',
admittedAt: undefined,
startedAt: undefined,
completedAt: undefined,
output: undefined,
error: undefined,
lease: undefined,
}
}
async function reconcileTerminalBullMQState(record: WorkspaceDispatchJobRecord): Promise<boolean> {
const queue = getBullMQQueueByName(record.queueName)
const job = await queue.getJob(record.id)
if (!job) {
return false
}
const state = await job.getState()
if (state === 'completed') {
await markDispatchJobCompleted(record.id, job.returnvalue)
if (record.lease) {
await releaseWorkspaceLease(record.lease.workspaceId, record.lease.leaseId)
}
return true
}
if (state === 'failed' && job.attemptsMade >= (job.opts.attempts ?? 1)) {
await markDispatchJobFailed(record.id, job.failedReason || 'Job failed')
if (record.lease) {
await releaseWorkspaceLease(record.lease.workspaceId, record.lease.leaseId)
}
return true
}
return false
}
async function reconcileStrandedDispatchJob(record: WorkspaceDispatchJobRecord): Promise<boolean> {
if (!record.lease && record.status !== 'waiting') {
await restoreWorkspaceDispatchJob(resetToWaiting(record))
return true
}
if (!record.lease) {
return false
}
const hasLease = await hasWorkspaceLease(record.lease.workspaceId, record.lease.leaseId)
const queue = getBullMQQueueByName(record.queueName)
const job = await queue.getJob(record.id)
if (hasLease) {
if (!job) {
await releaseWorkspaceLease(record.lease.workspaceId, record.lease.leaseId)
await restoreWorkspaceDispatchJob(resetToWaiting(record))
return true
}
return false
}
if (job) {
if (record.status === 'admitting') {
await refreshWorkspaceLease(record.lease.workspaceId, record.lease.leaseId, LEASE_TTL_MS)
await markDispatchJobAdmitted(
record.id,
record.lease.workspaceId,
record.lease.leaseId,
(record.metadata as { dispatchLeaseExpiresAt?: number }).dispatchLeaseExpiresAt ??
Date.now()
)
await removeWorkspaceJobFromLane(record.workspaceId, record.lane, record.id).catch(
() => undefined
)
return true
}
await refreshWorkspaceLease(record.lease.workspaceId, record.lease.leaseId, LEASE_TTL_MS)
if (record.status === 'admitted') {
await markDispatchJobRunning(record.id)
return true
}
return false
}
await restoreWorkspaceDispatchJob(resetToWaiting(record))
return true
}
async function reconcileTerminalDispatchLease(
record: WorkspaceDispatchJobRecord
): Promise<boolean> {
if ((record.status !== 'completed' && record.status !== 'failed') || !record.lease) {
return false
}
const hasLease = await hasWorkspaceLease(record.lease.workspaceId, record.lease.leaseId)
if (!hasLease) {
return false
}
await releaseWorkspaceLease(record.lease.workspaceId, record.lease.leaseId)
return true
}
async function reconcileWaitingWorkspaceTracking(
waitingJobs: WorkspaceDispatchJobRecord[]
): Promise<boolean> {
let changed = false
const earliestByWorkspace = new Map<string, number>()
for (const record of waitingJobs) {
const readyAt = record.createdAt + (record.delayMs ?? 0)
const current = earliestByWorkspace.get(record.workspaceId)
if (current === undefined || readyAt < current) {
earliestByWorkspace.set(record.workspaceId, readyAt)
}
}
for (const [workspaceId, nextReadyAt] of earliestByWorkspace.entries()) {
const active = await hasActiveWorkspace(workspaceId)
if (!active) {
await ensureWorkspaceActive(workspaceId, nextReadyAt)
changed = true
}
}
return changed
}
export async function reconcileWorkspaceDispatchState(): Promise<void> {
const allJobs = await listDispatchJobsByStatuses([
'waiting',
'admitting',
'admitted',
'running',
'completed',
'failed',
])
const activeJobs: WorkspaceDispatchJobRecord[] = []
const waitingJobs: WorkspaceDispatchJobRecord[] = []
const terminalJobs: WorkspaceDispatchJobRecord[] = []
let nonTerminalCount = 0
for (const job of allJobs) {
switch (job.status) {
case 'admitting':
case 'admitted':
case 'running':
activeJobs.push(job)
nonTerminalCount++
break
case 'waiting':
waitingJobs.push(job)
nonTerminalCount++
break
case 'completed':
case 'failed':
terminalJobs.push(job)
break
}
}
let changed = false
for (const record of activeJobs) {
const terminal = await reconcileTerminalBullMQState(record)
if (terminal) {
changed = true
continue
}
const restored = await reconcileStrandedDispatchJob(record)
if (restored) {
changed = true
}
}
if (await reconcileWaitingWorkspaceTracking(waitingJobs)) {
changed = true
}
for (const record of terminalJobs) {
if (await reconcileTerminalDispatchLease(record)) {
changed = true
}
}
await reconcileGlobalQueueDepth(nonTerminalCount).catch((error) => {
logger.error('Failed to reconcile global queue depth', { error })
})
if (changed) {
logger.info('Workspace dispatch reconciliation updated state', {
activeJobsInspected: activeJobs.length,
waitingJobsInspected: waitingJobs.length,
terminalJobsInspected: terminalJobs.length,
})
await wakeWorkspaceDispatcher()
}
}

View File

@@ -1,577 +0,0 @@
import { createLogger } from '@sim/logger'
import type Redis from 'ioredis'
import type { WorkspaceDispatchStorageAdapter } from '@/lib/core/workspace-dispatch/adapter'
import {
WORKSPACE_DISPATCH_CLAIM_RESULTS,
type WorkspaceDispatchClaimResult,
type WorkspaceDispatchEnqueueInput,
type WorkspaceDispatchJobRecord,
type WorkspaceDispatchLane,
} from '@/lib/core/workspace-dispatch/types'
const logger = createLogger('WorkspaceDispatchRedisStore')
const DISPATCH_PREFIX = 'workspace-dispatch:v1'
const JOB_TTL_SECONDS = 48 * 60 * 60
const SEQUENCE_KEY = `${DISPATCH_PREFIX}:sequence`
const ACTIVE_WORKSPACES_KEY = `${DISPATCH_PREFIX}:workspaces`
const GLOBAL_DEPTH_KEY = `${DISPATCH_PREFIX}:global-depth`
const CLAIM_JOB_SCRIPT = `
local workspaceId = ARGV[1]
local now = tonumber(ARGV[2])
local concurrencyLimit = tonumber(ARGV[3])
local leaseId = ARGV[4]
local leaseExpiresAt = tonumber(ARGV[5])
local lanes = cjson.decode(ARGV[6])
local sequenceKey = ARGV[7]
local activeWorkspacesKey = ARGV[8]
local jobPrefix = ARGV[9]
local workspacePrefix = ARGV[10]
local jobTtlSeconds = tonumber(ARGV[11])
local function laneKey(lane)
return workspacePrefix .. workspaceId .. ':lane:' .. lane
end
local function leaseKey()
return workspacePrefix .. workspaceId .. ':leases'
end
local function workspaceHasPending()
local minReadyAt = nil
local hasPending = false
for _, lane in ipairs(lanes) do
local ids = redis.call('ZRANGE', laneKey(lane), 0, 0)
if #ids > 0 then
local raw = redis.call('GET', jobPrefix .. ids[1])
if raw then
hasPending = true
local record = cjson.decode(raw)
local readyAt = (record.createdAt or 0) + (record.delayMs or 0)
if (minReadyAt == nil) or (readyAt < minReadyAt) then
minReadyAt = readyAt
end
else
redis.call('ZREM', laneKey(lane), ids[1])
end
end
end
return hasPending, minReadyAt
end
redis.call('ZREMRANGEBYSCORE', leaseKey(), 0, now)
local activeLeaseCount = redis.call('ZCARD', leaseKey())
if activeLeaseCount >= concurrencyLimit then
return cjson.encode({ type = 'limit_reached' })
end
local selectedId = nil
local selectedLane = nil
local selectedRecord = nil
local delayedNextReadyAt = nil
local maxScanPerLane = 20
for _, lane in ipairs(lanes) do
local ids = redis.call('ZRANGE', laneKey(lane), 0, maxScanPerLane - 1)
for _, candidateId in ipairs(ids) do
local raw = redis.call('GET', jobPrefix .. candidateId)
if raw then
local record = cjson.decode(raw)
local readyAt = (record.createdAt or 0) + (record.delayMs or 0)
if readyAt <= now then
selectedId = candidateId
selectedLane = lane
selectedRecord = record
break
end
if (delayedNextReadyAt == nil) or (readyAt < delayedNextReadyAt) then
delayedNextReadyAt = readyAt
end
else
redis.call('ZREM', laneKey(lane), candidateId)
end
end
if selectedRecord then
break
end
end
if selectedRecord == nil then
local hasPending, minReadyAt = workspaceHasPending()
if not hasPending then
return cjson.encode({ type = 'empty' })
end
local sequence = redis.call('INCR', sequenceKey)
local score = sequence
if minReadyAt ~= nil and minReadyAt > now then
score = minReadyAt * 1000000 + sequence
end
redis.call('ZADD', activeWorkspacesKey, score, workspaceId)
return cjson.encode({
type = 'delayed',
nextReadyAt = delayedNextReadyAt or minReadyAt or now
})
end
redis.call('ZADD', leaseKey(), leaseExpiresAt, leaseId)
selectedRecord.status = 'admitting'
selectedRecord.lease = {
workspaceId = workspaceId,
leaseId = leaseId
}
if selectedRecord.metadata == nil then
selectedRecord.metadata = {}
end
selectedRecord.metadata.dispatchLeaseExpiresAt = leaseExpiresAt
redis.call('SET', jobPrefix .. selectedId, cjson.encode(selectedRecord), 'EX', jobTtlSeconds)
redis.call('ZREM', laneKey(selectedLane), selectedId)
local hasPending, minReadyAt = workspaceHasPending()
if hasPending then
local sequence = redis.call('INCR', sequenceKey)
local score = sequence
if minReadyAt ~= nil and minReadyAt > now then
score = minReadyAt * 1000000 + sequence
end
redis.call('ZADD', activeWorkspacesKey, score, workspaceId)
end
return cjson.encode({
type = 'admitted',
record = selectedRecord,
leaseId = leaseId,
leaseExpiresAt = leaseExpiresAt
})
`
function jobKey(jobId: string): string {
return `${DISPATCH_PREFIX}:job:${jobId}`
}
function workspaceLaneKey(workspaceId: string, lane: WorkspaceDispatchLane): string {
return `${DISPATCH_PREFIX}:workspace:${workspaceId}:lane:${lane}`
}
function workspaceLeaseKey(workspaceId: string): string {
return `${DISPATCH_PREFIX}:workspace:${workspaceId}:leases`
}
function createPriorityScore(priority: number, sequence: number): number {
return priority * 1_000_000_000_000 + sequence
}
export class RedisWorkspaceDispatchStorage implements WorkspaceDispatchStorageAdapter {
constructor(private redis: Redis) {}
private async nextSequence(): Promise<number> {
return this.redis.incr(SEQUENCE_KEY)
}
async saveDispatchJob(record: WorkspaceDispatchJobRecord): Promise<void> {
await this.redis.set(jobKey(record.id), JSON.stringify(record), 'EX', JOB_TTL_SECONDS)
}
async getDispatchJobRecord(jobId: string): Promise<WorkspaceDispatchJobRecord | null> {
const raw = await this.redis.get(jobKey(jobId))
if (!raw) {
return null
}
try {
return JSON.parse(raw) as WorkspaceDispatchJobRecord
} catch (error) {
logger.warn('Corrupted dispatch job record, deleting', { jobId, error })
await this.redis.del(jobKey(jobId))
return null
}
}
async listDispatchJobsByStatuses(
statuses: readonly WorkspaceDispatchJobRecord['status'][]
): Promise<WorkspaceDispatchJobRecord[]> {
let cursor = '0'
const jobs: WorkspaceDispatchJobRecord[] = []
do {
const [nextCursor, keys] = await this.redis.scan(
cursor,
'MATCH',
`${DISPATCH_PREFIX}:job:*`,
'COUNT',
100
)
cursor = nextCursor
if (keys.length === 0) {
continue
}
const values = await this.redis.mget(...keys)
for (const value of values) {
if (!value) {
continue
}
try {
const record = JSON.parse(value) as WorkspaceDispatchJobRecord
if (statuses.includes(record.status)) {
jobs.push(record)
}
} catch {
// Best effort during reconciliation scans.
}
}
} while (cursor !== '0')
return jobs
}
private static readonly TERMINAL_STATUSES = new Set(['completed', 'failed'])
async updateDispatchJobRecord(
jobId: string,
updater: (record: WorkspaceDispatchJobRecord) => WorkspaceDispatchJobRecord
): Promise<WorkspaceDispatchJobRecord | null> {
const current = await this.getDispatchJobRecord(jobId)
if (!current) {
return null
}
const updated = updater(current)
if (
RedisWorkspaceDispatchStorage.TERMINAL_STATUSES.has(current.status) &&
!RedisWorkspaceDispatchStorage.TERMINAL_STATUSES.has(updated.status)
) {
return current
}
await this.saveDispatchJob(updated)
return updated
}
async enqueueWorkspaceDispatchJob(
input: WorkspaceDispatchEnqueueInput
): Promise<WorkspaceDispatchJobRecord> {
const id = input.id ?? `dispatch_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`
const createdAt = Date.now()
const sequence = await this.nextSequence()
const record: WorkspaceDispatchJobRecord = {
id,
workspaceId: input.workspaceId,
lane: input.lane,
queueName: input.queueName,
bullmqJobName: input.bullmqJobName,
bullmqPayload: input.bullmqPayload,
metadata: input.metadata,
priority: input.priority ?? 100,
maxAttempts: input.maxAttempts,
delayMs: input.delayMs,
status: 'waiting',
createdAt,
}
const score = createPriorityScore(record.priority, sequence)
const pipeline = this.redis.pipeline()
pipeline.set(jobKey(id), JSON.stringify(record), 'EX', JOB_TTL_SECONDS)
pipeline.zadd(workspaceLaneKey(record.workspaceId, record.lane), score, id)
pipeline.zadd(ACTIVE_WORKSPACES_KEY, 'NX', sequence, record.workspaceId)
pipeline.incr(GLOBAL_DEPTH_KEY)
await pipeline.exec()
return record
}
async restoreWorkspaceDispatchJob(record: WorkspaceDispatchJobRecord): Promise<void> {
const sequence = await this.nextSequence()
const score = createPriorityScore(record.priority, sequence)
const pipeline = this.redis.pipeline()
pipeline.set(jobKey(record.id), JSON.stringify(record), 'EX', JOB_TTL_SECONDS)
pipeline.zadd(workspaceLaneKey(record.workspaceId, record.lane), score, record.id)
pipeline.zadd(ACTIVE_WORKSPACES_KEY, 'NX', sequence, record.workspaceId)
await pipeline.exec()
}
async claimWorkspaceJob(
workspaceId: string,
options: {
lanes: readonly WorkspaceDispatchLane[]
concurrencyLimit: number
leaseId: string
now: number
leaseTtlMs: number
}
): Promise<WorkspaceDispatchClaimResult> {
const raw = await this.redis.eval(
CLAIM_JOB_SCRIPT,
0,
workspaceId,
String(options.now),
String(options.concurrencyLimit),
options.leaseId,
String(options.now + options.leaseTtlMs),
JSON.stringify(options.lanes),
SEQUENCE_KEY,
ACTIVE_WORKSPACES_KEY,
`${DISPATCH_PREFIX}:job:`,
`${DISPATCH_PREFIX}:workspace:`,
String(JOB_TTL_SECONDS)
)
const parsed = JSON.parse(String(raw)) as WorkspaceDispatchClaimResult
switch (parsed.type) {
case WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED:
case WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED:
case WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED:
case WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY:
return parsed
default:
throw new Error(
`Unknown dispatch claim result: ${String((parsed as { type?: string }).type)}`
)
}
}
async getWorkspaceQueueDepth(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<number> {
if (lanes.length === 0) return 0
const pipeline = this.redis.pipeline()
for (const lane of lanes) {
pipeline.zcard(workspaceLaneKey(workspaceId, lane))
}
const results = await pipeline.exec()
let depth = 0
for (const result of results ?? []) {
if (result && !result[0]) {
depth += (result[1] as number) ?? 0
}
}
return depth
}
async getGlobalQueueDepth(): Promise<number> {
const count = await this.redis.get(GLOBAL_DEPTH_KEY)
return count ? Math.max(0, Number.parseInt(count, 10)) : 0
}
async reconcileGlobalQueueDepth(knownCount: number): Promise<void> {
await this.redis.set(GLOBAL_DEPTH_KEY, knownCount)
}
async popNextWorkspaceId(): Promise<string | null> {
const result = await this.redis.zpopmin(ACTIVE_WORKSPACES_KEY)
if (!result || result.length === 0) {
return null
}
return result[0] ?? null
}
async getQueuedWorkspaceCount(): Promise<number> {
return this.redis.zcard(ACTIVE_WORKSPACES_KEY)
}
async hasActiveWorkspace(workspaceId: string): Promise<boolean> {
return (await this.redis.zscore(ACTIVE_WORKSPACES_KEY, workspaceId)) !== null
}
async ensureWorkspaceActive(workspaceId: string, readyAt?: number): Promise<void> {
const sequence = await this.nextSequence()
const score = readyAt && readyAt > Date.now() ? readyAt * 1_000_000 + sequence : sequence
await this.redis.zadd(ACTIVE_WORKSPACES_KEY, 'NX', score, workspaceId)
}
async requeueWorkspaceId(workspaceId: string): Promise<void> {
const sequence = await this.nextSequence()
await this.redis.zadd(ACTIVE_WORKSPACES_KEY, sequence, workspaceId)
}
async workspaceHasPendingJobs(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<boolean> {
for (const lane of lanes) {
const count = await this.redis.zcard(workspaceLaneKey(workspaceId, lane))
if (count > 0) {
return true
}
}
return false
}
async getNextWorkspaceJob(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<WorkspaceDispatchJobRecord | null> {
for (const lane of lanes) {
const ids = await this.redis.zrange(workspaceLaneKey(workspaceId, lane), 0, 0)
if (ids.length === 0) {
continue
}
const record = await this.getDispatchJobRecord(ids[0])
if (!record) {
await this.redis.zrem(workspaceLaneKey(workspaceId, lane), ids[0])
continue
}
return record
}
return null
}
async removeWorkspaceJobFromLane(
workspaceId: string,
lane: WorkspaceDispatchLane,
jobId: string
): Promise<void> {
await this.redis.zrem(workspaceLaneKey(workspaceId, lane), jobId)
}
async cleanupExpiredWorkspaceLeases(workspaceId: string): Promise<void> {
await this.redis.zremrangebyscore(workspaceLeaseKey(workspaceId), 0, Date.now())
}
async countActiveWorkspaceLeases(workspaceId: string): Promise<number> {
await this.cleanupExpiredWorkspaceLeases(workspaceId)
return this.redis.zcard(workspaceLeaseKey(workspaceId))
}
async hasWorkspaceLease(workspaceId: string, leaseId: string): Promise<boolean> {
await this.cleanupExpiredWorkspaceLeases(workspaceId)
return (await this.redis.zscore(workspaceLeaseKey(workspaceId), leaseId)) !== null
}
async createWorkspaceLease(workspaceId: string, leaseId: string, ttlMs: number): Promise<number> {
const expiresAt = Date.now() + ttlMs
await this.redis.zadd(workspaceLeaseKey(workspaceId), expiresAt, leaseId)
return expiresAt
}
async refreshWorkspaceLease(
workspaceId: string,
leaseId: string,
ttlMs: number
): Promise<number> {
return this.createWorkspaceLease(workspaceId, leaseId, ttlMs)
}
async releaseWorkspaceLease(workspaceId: string, leaseId: string): Promise<void> {
await this.redis.zrem(workspaceLeaseKey(workspaceId), leaseId)
}
async removeWorkspaceIfIdle(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<void> {
const hasPendingJobs = await this.workspaceHasPendingJobs(workspaceId, lanes)
if (!hasPendingJobs) {
await this.redis.zrem(ACTIVE_WORKSPACES_KEY, workspaceId)
}
}
async markDispatchJobAdmitted(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'admitted',
admittedAt: Date.now(),
lease: {
workspaceId,
leaseId,
},
metadata: {
...record.metadata,
dispatchLeaseExpiresAt: leaseExpiresAt,
},
}))
}
async markDispatchJobAdmitting(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'admitting',
lease: {
workspaceId,
leaseId,
},
metadata: {
...record.metadata,
dispatchLeaseExpiresAt: leaseExpiresAt,
},
}))
}
async markDispatchJobRunning(jobId: string): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'running',
startedAt: record.startedAt ?? Date.now(),
}))
}
async markDispatchJobCompleted(jobId: string, output: unknown): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'completed',
completedAt: Date.now(),
output,
}))
await this.redis.decr(GLOBAL_DEPTH_KEY).catch(() => undefined)
}
async markDispatchJobFailed(jobId: string, error: string): Promise<void> {
await this.updateDispatchJobRecord(jobId, (record) => ({
...record,
status: 'failed',
completedAt: Date.now(),
error,
}))
await this.redis.decr(GLOBAL_DEPTH_KEY).catch(() => undefined)
}
async clear(): Promise<void> {
let cursor = '0'
const keys: string[] = []
do {
const [nextCursor, foundKeys] = await this.redis.scan(
cursor,
'MATCH',
`${DISPATCH_PREFIX}:*`,
'COUNT',
100
)
cursor = nextCursor
keys.push(...foundKeys)
} while (cursor !== '0')
if (keys.length > 0) {
await this.redis.del(...keys)
}
}
dispose(): void {
logger.info('Redis workspace dispatch storage disposed')
}
}

View File

@@ -1,102 +0,0 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
describe('workspace dispatch status presentation', () => {
it('presents waiting dispatch jobs with queue metadata', () => {
const result = presentDispatchOrJobStatus(
{
id: 'dispatch-1',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: { workflowId: 'workflow-1' },
priority: 10,
status: 'waiting',
createdAt: 1000,
},
null
)
expect(result).toEqual({
status: 'waiting',
metadata: {
createdAt: new Date(1000),
admittedAt: undefined,
startedAt: undefined,
completedAt: undefined,
queueName: 'workflow-execution',
lane: 'runtime',
workspaceId: 'workspace-1',
},
estimatedDuration: 300000,
})
})
it('presents admitting dispatch jobs distinctly', () => {
const result = presentDispatchOrJobStatus(
{
id: 'dispatch-1a',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: { workflowId: 'workflow-1' },
priority: 10,
status: 'admitting',
createdAt: 1000,
},
null
)
expect(result.status).toBe('admitting')
expect(result.estimatedDuration).toBe(300000)
})
it('presents completed dispatch jobs with output and duration', () => {
const result = presentDispatchOrJobStatus(
{
id: 'dispatch-2',
workspaceId: 'workspace-1',
lane: 'interactive',
queueName: 'workflow-execution',
bullmqJobName: 'direct-workflow-execution',
bullmqPayload: {},
metadata: { workflowId: 'workflow-1' },
priority: 1,
status: 'completed',
createdAt: 1000,
admittedAt: 1500,
startedAt: 2000,
completedAt: 7000,
output: { success: true },
},
null
)
expect(result.status).toBe('completed')
expect(result.output).toEqual({ success: true })
expect(result.metadata.duration).toBe(5000)
})
it('falls back to legacy job status when no dispatch record exists', () => {
const result = presentDispatchOrJobStatus(null, {
id: 'job-1',
type: 'workflow-execution',
payload: {},
status: 'pending',
createdAt: new Date(1000),
attempts: 0,
maxAttempts: 3,
metadata: {},
})
expect(result.status).toBe('queued')
expect(result.estimatedDuration).toBe(300000)
})
})

View File

@@ -1,110 +0,0 @@
import type { Job, JobStatus } from '@/lib/core/async-jobs/types'
import type { WorkspaceDispatchJobRecord } from '@/lib/core/workspace-dispatch/types'
export type DispatchPresentedStatus =
| 'waiting'
| 'admitting'
| 'admitted'
| 'running'
| 'completed'
| 'failed'
| 'queued'
| JobStatus
export interface DispatchStatusPresentation {
status: DispatchPresentedStatus
metadata: {
createdAt?: Date
admittedAt?: Date
startedAt?: Date
completedAt?: Date
queueName?: string
lane?: string
workspaceId?: string
duration?: number
}
output?: unknown
error?: string
estimatedDuration?: number
}
export function presentDispatchOrJobStatus(
dispatchJob: WorkspaceDispatchJobRecord | null,
job: Job | null
): DispatchStatusPresentation {
if (dispatchJob) {
const startedAt = dispatchJob.startedAt ? new Date(dispatchJob.startedAt) : undefined
const completedAt = dispatchJob.completedAt ? new Date(dispatchJob.completedAt) : undefined
const response: DispatchStatusPresentation = {
status: dispatchJob.status,
metadata: {
createdAt: new Date(dispatchJob.createdAt),
admittedAt: dispatchJob.admittedAt ? new Date(dispatchJob.admittedAt) : undefined,
startedAt,
completedAt,
queueName: dispatchJob.queueName,
lane: dispatchJob.lane,
workspaceId: dispatchJob.workspaceId,
},
}
if (startedAt && completedAt) {
response.metadata.duration = completedAt.getTime() - startedAt.getTime()
}
if (dispatchJob.status === 'completed') {
response.output = dispatchJob.output
}
if (dispatchJob.status === 'failed') {
response.error = dispatchJob.error
}
if (
dispatchJob.status === 'waiting' ||
dispatchJob.status === 'admitting' ||
dispatchJob.status === 'admitted' ||
dispatchJob.status === 'running'
) {
response.estimatedDuration = 300000
}
return response
}
if (!job) {
return {
status: 'queued',
metadata: {},
}
}
const mappedStatus = job.status === 'pending' ? 'queued' : job.status
const response: DispatchStatusPresentation = {
status: mappedStatus,
metadata: {
createdAt: job.createdAt,
startedAt: job.startedAt,
completedAt: job.completedAt,
},
}
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
if (job.status === 'completed') {
response.output = job.output
}
if (job.status === 'failed') {
response.error = job.error
}
if (job.status === 'processing' || job.status === 'pending') {
response.estimatedDuration = 300000
}
return response
}

View File

@@ -1,193 +0,0 @@
import type { WorkspaceDispatchStorageAdapter } from '@/lib/core/workspace-dispatch/adapter'
import {
setWorkspaceDispatchStorageAdapter as _setAdapter,
createWorkspaceDispatchStorageAdapter,
} from '@/lib/core/workspace-dispatch/factory'
import type {
WorkspaceDispatchClaimResult,
WorkspaceDispatchEnqueueInput,
WorkspaceDispatchJobRecord,
WorkspaceDispatchLane,
} from '@/lib/core/workspace-dispatch/types'
function getAdapter() {
return createWorkspaceDispatchStorageAdapter()
}
export function setWorkspaceDispatchStorageAdapter(adapter: WorkspaceDispatchStorageAdapter): void {
_setAdapter(adapter)
}
export async function saveDispatchJob(record: WorkspaceDispatchJobRecord): Promise<void> {
return getAdapter().saveDispatchJob(record)
}
export async function getDispatchJobRecord(
jobId: string
): Promise<WorkspaceDispatchJobRecord | null> {
return getAdapter().getDispatchJobRecord(jobId)
}
export async function listDispatchJobsByStatuses(
statuses: readonly WorkspaceDispatchJobRecord['status'][]
): Promise<WorkspaceDispatchJobRecord[]> {
return getAdapter().listDispatchJobsByStatuses(statuses)
}
export async function updateDispatchJobRecord(
jobId: string,
updater: (record: WorkspaceDispatchJobRecord) => WorkspaceDispatchJobRecord
): Promise<WorkspaceDispatchJobRecord | null> {
return getAdapter().updateDispatchJobRecord(jobId, updater)
}
export async function enqueueWorkspaceDispatchJob(
input: WorkspaceDispatchEnqueueInput
): Promise<WorkspaceDispatchJobRecord> {
return getAdapter().enqueueWorkspaceDispatchJob(input)
}
export async function restoreWorkspaceDispatchJob(
record: WorkspaceDispatchJobRecord
): Promise<void> {
return getAdapter().restoreWorkspaceDispatchJob(record)
}
export async function claimWorkspaceJob(
workspaceId: string,
options: {
lanes: readonly WorkspaceDispatchLane[]
concurrencyLimit: number
leaseId: string
now: number
leaseTtlMs: number
}
): Promise<WorkspaceDispatchClaimResult> {
return getAdapter().claimWorkspaceJob(workspaceId, options)
}
export async function getWorkspaceQueueDepth(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<number> {
return getAdapter().getWorkspaceQueueDepth(workspaceId, lanes)
}
export async function getGlobalQueueDepth(): Promise<number> {
return getAdapter().getGlobalQueueDepth()
}
export async function reconcileGlobalQueueDepth(knownCount: number): Promise<void> {
return getAdapter().reconcileGlobalQueueDepth(knownCount)
}
export async function popNextWorkspaceId(): Promise<string | null> {
return getAdapter().popNextWorkspaceId()
}
export async function getQueuedWorkspaceCount(): Promise<number> {
return getAdapter().getQueuedWorkspaceCount()
}
export async function hasActiveWorkspace(workspaceId: string): Promise<boolean> {
return getAdapter().hasActiveWorkspace(workspaceId)
}
export async function ensureWorkspaceActive(workspaceId: string, readyAt?: number): Promise<void> {
return getAdapter().ensureWorkspaceActive(workspaceId, readyAt)
}
export async function requeueWorkspaceId(workspaceId: string): Promise<void> {
return getAdapter().requeueWorkspaceId(workspaceId)
}
export async function workspaceHasPendingJobs(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<boolean> {
return getAdapter().workspaceHasPendingJobs(workspaceId, lanes)
}
export async function getNextWorkspaceJob(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<WorkspaceDispatchJobRecord | null> {
return getAdapter().getNextWorkspaceJob(workspaceId, lanes)
}
export async function removeWorkspaceJobFromLane(
workspaceId: string,
lane: WorkspaceDispatchLane,
jobId: string
): Promise<void> {
return getAdapter().removeWorkspaceJobFromLane(workspaceId, lane, jobId)
}
export async function cleanupExpiredWorkspaceLeases(workspaceId: string): Promise<void> {
return getAdapter().cleanupExpiredWorkspaceLeases(workspaceId)
}
export async function countActiveWorkspaceLeases(workspaceId: string): Promise<number> {
return getAdapter().countActiveWorkspaceLeases(workspaceId)
}
export async function hasWorkspaceLease(workspaceId: string, leaseId: string): Promise<boolean> {
return getAdapter().hasWorkspaceLease(workspaceId, leaseId)
}
export async function createWorkspaceLease(
workspaceId: string,
leaseId: string,
ttlMs: number
): Promise<number> {
return getAdapter().createWorkspaceLease(workspaceId, leaseId, ttlMs)
}
export async function refreshWorkspaceLease(
workspaceId: string,
leaseId: string,
ttlMs: number
): Promise<number> {
return getAdapter().refreshWorkspaceLease(workspaceId, leaseId, ttlMs)
}
export async function releaseWorkspaceLease(workspaceId: string, leaseId: string): Promise<void> {
return getAdapter().releaseWorkspaceLease(workspaceId, leaseId)
}
export async function removeWorkspaceIfIdle(
workspaceId: string,
lanes: readonly WorkspaceDispatchLane[]
): Promise<void> {
return getAdapter().removeWorkspaceIfIdle(workspaceId, lanes)
}
export async function markDispatchJobAdmitted(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void> {
return getAdapter().markDispatchJobAdmitted(jobId, workspaceId, leaseId, leaseExpiresAt)
}
export async function markDispatchJobAdmitting(
jobId: string,
workspaceId: string,
leaseId: string,
leaseExpiresAt: number
): Promise<void> {
return getAdapter().markDispatchJobAdmitting(jobId, workspaceId, leaseId, leaseExpiresAt)
}
export async function markDispatchJobRunning(jobId: string): Promise<void> {
return getAdapter().markDispatchJobRunning(jobId)
}
export async function markDispatchJobCompleted(jobId: string, output: unknown): Promise<void> {
return getAdapter().markDispatchJobCompleted(jobId, output)
}
export async function markDispatchJobFailed(jobId: string, error: string): Promise<void> {
return getAdapter().markDispatchJobFailed(jobId, error)
}

View File

@@ -1,107 +0,0 @@
import type { JobMetadata, JobType } from '@/lib/core/async-jobs/types'
import type {
KNOWLEDGE_CONNECTOR_SYNC_QUEUE,
KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE,
MOTHERSHIP_JOB_EXECUTION_QUEUE,
WORKSPACE_NOTIFICATION_DELIVERY_QUEUE,
} from '@/lib/core/bullmq/queues'
export const WORKSPACE_DISPATCH_LANES = [
'interactive',
'runtime',
'knowledge',
'lightweight',
] as const
export type WorkspaceDispatchLane = (typeof WORKSPACE_DISPATCH_LANES)[number]
export type WorkspaceDispatchQueueName =
| JobType
| typeof KNOWLEDGE_CONNECTOR_SYNC_QUEUE
| typeof KNOWLEDGE_DOCUMENT_PROCESSING_QUEUE
| typeof MOTHERSHIP_JOB_EXECUTION_QUEUE
| typeof WORKSPACE_NOTIFICATION_DELIVERY_QUEUE
export const WORKSPACE_DISPATCH_STATUSES = {
WAITING: 'waiting',
ADMITTING: 'admitting',
ADMITTED: 'admitted',
RUNNING: 'running',
COMPLETED: 'completed',
FAILED: 'failed',
} as const
export type WorkspaceDispatchStatus =
(typeof WORKSPACE_DISPATCH_STATUSES)[keyof typeof WORKSPACE_DISPATCH_STATUSES]
export interface WorkspaceDispatchLeaseInfo {
workspaceId: string
leaseId: string
}
export interface WorkspaceDispatchJobContext {
dispatchJobId: string
workspaceId: string
lane: WorkspaceDispatchLane
queueName: WorkspaceDispatchQueueName
bullmqJobName: string
priority: number
}
export interface WorkspaceDispatchJobRecord {
id: string
workspaceId: string
lane: WorkspaceDispatchLane
queueName: WorkspaceDispatchQueueName
bullmqJobName: string
bullmqPayload: unknown
metadata: JobMetadata
priority: number
maxAttempts?: number
delayMs?: number
status: WorkspaceDispatchStatus
createdAt: number
admittedAt?: number
startedAt?: number
completedAt?: number
output?: unknown
error?: string
lease?: WorkspaceDispatchLeaseInfo
}
export interface WorkspaceDispatchEnqueueInput {
id?: string
workspaceId: string
lane: WorkspaceDispatchLane
queueName: WorkspaceDispatchQueueName
bullmqJobName: string
bullmqPayload: unknown
metadata: JobMetadata
priority?: number
maxAttempts?: number
delayMs?: number
}
export const WORKSPACE_DISPATCH_CLAIM_RESULTS = {
ADMITTED: 'admitted',
LIMIT_REACHED: 'limit_reached',
DELAYED: 'delayed',
EMPTY: 'empty',
} as const
export type WorkspaceDispatchClaimResult =
| {
type: typeof WORKSPACE_DISPATCH_CLAIM_RESULTS.ADMITTED
record: WorkspaceDispatchJobRecord
leaseId: string
leaseExpiresAt: number
}
| {
type:
| typeof WORKSPACE_DISPATCH_CLAIM_RESULTS.LIMIT_REACHED
| typeof WORKSPACE_DISPATCH_CLAIM_RESULTS.EMPTY
}
| {
type: typeof WORKSPACE_DISPATCH_CLAIM_RESULTS.DELAYED
nextReadyAt: number
}

View File

@@ -1,98 +0,0 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
const {
mockMarkDispatchJobCompleted,
mockMarkDispatchJobFailed,
mockMarkDispatchJobRunning,
mockReleaseWorkspaceLease,
mockWakeWorkspaceDispatcher,
} = vi.hoisted(() => ({
mockMarkDispatchJobCompleted: vi.fn(),
mockMarkDispatchJobFailed: vi.fn(),
mockMarkDispatchJobRunning: vi.fn(),
mockReleaseWorkspaceLease: vi.fn(),
mockWakeWorkspaceDispatcher: vi.fn(),
}))
vi.mock('@sim/logger', () => ({
createLogger: () => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}),
}))
vi.mock('@/lib/core/workspace-dispatch', () => ({
markDispatchJobCompleted: mockMarkDispatchJobCompleted,
markDispatchJobFailed: mockMarkDispatchJobFailed,
markDispatchJobRunning: mockMarkDispatchJobRunning,
releaseWorkspaceLease: mockReleaseWorkspaceLease,
wakeWorkspaceDispatcher: mockWakeWorkspaceDispatcher,
}))
import { getDispatchRuntimeMetadata, runDispatchedJob } from '@/lib/core/workspace-dispatch/worker'
describe('workspace dispatch worker lifecycle', () => {
beforeEach(() => {
vi.clearAllMocks()
})
it('returns null for missing metadata', () => {
expect(getDispatchRuntimeMetadata(undefined)).toBeNull()
})
it('extracts dispatch runtime metadata when all fields are present', () => {
expect(
getDispatchRuntimeMetadata({
dispatchJobId: 'dispatch-1',
dispatchWorkspaceId: 'workspace-1',
dispatchLeaseId: 'lease-1',
})
).toEqual({
dispatchJobId: 'dispatch-1',
dispatchWorkspaceId: 'workspace-1',
dispatchLeaseId: 'lease-1',
})
})
it('marks running, completed, releases lease, and wakes dispatcher on success', async () => {
const result = await runDispatchedJob(
{
dispatchJobId: 'dispatch-1',
dispatchWorkspaceId: 'workspace-1',
dispatchLeaseId: 'lease-1',
},
async () => ({ success: true })
)
expect(result).toEqual({ success: true })
expect(mockMarkDispatchJobRunning).toHaveBeenCalledWith('dispatch-1')
expect(mockMarkDispatchJobCompleted).toHaveBeenCalledWith('dispatch-1', { success: true })
expect(mockReleaseWorkspaceLease).toHaveBeenCalledWith('workspace-1', 'lease-1')
expect(mockWakeWorkspaceDispatcher).toHaveBeenCalled()
})
it('marks failed and still releases lease on error', async () => {
await expect(
runDispatchedJob(
{
dispatchJobId: 'dispatch-2',
dispatchWorkspaceId: 'workspace-2',
dispatchLeaseId: 'lease-2',
},
async () => {
throw new Error('boom')
}
)
).rejects.toThrow('boom')
expect(mockMarkDispatchJobRunning).toHaveBeenCalledWith('dispatch-2')
expect(mockMarkDispatchJobFailed).toHaveBeenCalledWith('dispatch-2', 'boom')
expect(mockReleaseWorkspaceLease).toHaveBeenCalledWith('workspace-2', 'lease-2')
expect(mockWakeWorkspaceDispatcher).toHaveBeenCalled()
})
})

View File

@@ -1,104 +0,0 @@
import { createLogger } from '@sim/logger'
import {
markDispatchJobCompleted,
markDispatchJobFailed,
markDispatchJobRunning,
refreshWorkspaceLease,
releaseWorkspaceLease,
wakeWorkspaceDispatcher,
} from '@/lib/core/workspace-dispatch'
const logger = createLogger('WorkspaceDispatchWorker')
interface DispatchRuntimeMetadata {
dispatchJobId: string
dispatchWorkspaceId: string
dispatchLeaseId: string
}
interface RunDispatchedJobOptions {
isFinalAttempt?: boolean
leaseTtlMs?: number
}
const DEFAULT_LEASE_TTL_MS = 15 * 60 * 1000
const LEASE_HEARTBEAT_INTERVAL_MS = 60_000
export function getDispatchRuntimeMetadata(metadata: unknown): DispatchRuntimeMetadata | null {
if (!metadata || typeof metadata !== 'object') {
return null
}
const value = metadata as Partial<DispatchRuntimeMetadata>
if (!value.dispatchJobId || !value.dispatchWorkspaceId || !value.dispatchLeaseId) {
return null
}
return {
dispatchJobId: value.dispatchJobId,
dispatchWorkspaceId: value.dispatchWorkspaceId,
dispatchLeaseId: value.dispatchLeaseId,
}
}
export async function runDispatchedJob<T>(
metadata: unknown,
run: () => Promise<T>,
options: RunDispatchedJobOptions = {}
): Promise<T> {
const dispatchMetadata = getDispatchRuntimeMetadata(metadata)
if (!dispatchMetadata) {
return run()
}
const leaseTtlMs = options.leaseTtlMs ?? DEFAULT_LEASE_TTL_MS
const isFinalAttempt = options.isFinalAttempt ?? true
await markDispatchJobRunning(dispatchMetadata.dispatchJobId)
let heartbeatTimer: NodeJS.Timeout | null = setInterval(() => {
void refreshWorkspaceLease(
dispatchMetadata.dispatchWorkspaceId,
dispatchMetadata.dispatchLeaseId,
leaseTtlMs
).catch((error) => {
logger.error('Failed to refresh dispatch lease', { error, dispatchMetadata })
})
}, LEASE_HEARTBEAT_INTERVAL_MS)
heartbeatTimer.unref()
let succeeded = false
try {
const result = await run()
succeeded = true
await markDispatchJobCompleted(dispatchMetadata.dispatchJobId, result)
return result
} catch (error) {
if (isFinalAttempt && !succeeded) {
await markDispatchJobFailed(
dispatchMetadata.dispatchJobId,
error instanceof Error ? error.message : String(error)
)
}
throw error
} finally {
if (heartbeatTimer) {
clearInterval(heartbeatTimer)
heartbeatTimer = null
}
const shouldReleaseLease = succeeded || isFinalAttempt
if (shouldReleaseLease) {
try {
await releaseWorkspaceLease(
dispatchMetadata.dispatchWorkspaceId,
dispatchMetadata.dispatchLeaseId
)
await wakeWorkspaceDispatcher()
} catch (error) {
logger.error('Failed to release dispatch lease', { error, dispatchMetadata })
}
}
}
}

View File

@@ -1,111 +0,0 @@
import { createLogger } from '@sim/logger'
import {
type ExecutionStreamStatus,
getExecutionMeta,
readExecutionEvents,
} from '@/lib/execution/event-buffer'
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
const logger = createLogger('BufferedExecutionStream')
const POLL_INTERVAL_MS = 500
const MAX_POLL_DURATION_MS = 10 * 60 * 1000
function isTerminalStatus(status: ExecutionStreamStatus): boolean {
return status === 'complete' || status === 'error' || status === 'cancelled'
}
export function createBufferedExecutionStream(
executionId: string,
initialEventId = 0
): ReadableStream<Uint8Array> {
const encoder = new TextEncoder()
let closed = false
return new ReadableStream<Uint8Array>({
async start(controller) {
let lastEventId = initialEventId
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
const enqueue = (text: string) => {
if (closed) {
return
}
try {
controller.enqueue(encoder.encode(text))
} catch {
closed = true
}
}
try {
const initialEvents = await readExecutionEvents(executionId, lastEventId)
for (const entry of initialEvents) {
if (closed) {
return
}
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
while (!closed && Date.now() < pollDeadline) {
const meta = await getExecutionMeta(executionId)
if (meta && isTerminalStatus(meta.status)) {
const finalEvents = await readExecutionEvents(executionId, lastEventId)
for (const entry of finalEvents) {
if (closed) {
return
}
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
enqueue('data: [DONE]\n\n')
controller.close()
return
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
if (closed) {
return
}
const newEvents = await readExecutionEvents(executionId, lastEventId)
for (const entry of newEvents) {
if (closed) {
return
}
enqueue(formatSSEEvent(entry.event))
lastEventId = entry.eventId
}
}
if (!closed) {
logger.warn('Buffered execution stream deadline reached', { executionId })
enqueue('data: [DONE]\n\n')
controller.close()
}
} catch (error) {
logger.error('Buffered execution stream failed', {
executionId,
error: error instanceof Error ? error.message : String(error),
})
if (!closed) {
try {
controller.close()
} catch {}
}
}
},
cancel() {
closed = true
logger.info('Client disconnected from buffered execution stream', { executionId })
},
})
}

View File

@@ -8,9 +8,7 @@ import {
import { createLogger } from '@sim/logger'
import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm'
import { decryptApiKey } from '@/lib/api-key/crypto'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import type { DocumentData } from '@/lib/knowledge/documents/service'
import {
hardDeleteDocuments,
@@ -142,7 +140,8 @@ export function resolveTagMapping(
}
/**
* Dispatch a connector sync using the configured background execution backend.
* Dispatch a connector sync — uses Trigger.dev when available,
* otherwise falls back to direct executeSync.
*/
export async function dispatchSync(
connectorId: string,
@@ -160,38 +159,6 @@ export async function dispatchSync(
{ tags: [`connector:${connectorId}`] }
)
logger.info(`Dispatched connector sync to Trigger.dev`, { connectorId, requestId })
} else if (isBullMQEnabled()) {
const connectorRows = await db
.select({
workspaceId: knowledgeBase.workspaceId,
userId: knowledgeBase.userId,
})
.from(knowledgeConnector)
.innerJoin(knowledgeBase, eq(knowledgeBase.id, knowledgeConnector.knowledgeBaseId))
.where(eq(knowledgeConnector.id, connectorId))
.limit(1)
const workspaceId = connectorRows[0]?.workspaceId
const userId = connectorRows[0]?.userId
if (!workspaceId || !userId) {
throw new Error(`No workspace found for connector ${connectorId}`)
}
await enqueueWorkspaceDispatch({
workspaceId,
lane: 'knowledge',
queueName: 'knowledge-connector-sync',
bullmqJobName: 'knowledge-connector-sync',
bullmqPayload: createBullMQJobData({
connectorId,
fullSync: options?.fullSync,
requestId,
}),
metadata: {
userId,
},
})
logger.info(`Dispatched connector sync to BullMQ`, { connectorId, requestId })
} else {
executeSync(connectorId, { fullSync: options?.fullSync }).catch((error) => {
logger.error(`Sync failed for connector ${connectorId}`, {

View File

@@ -5,10 +5,9 @@ import { type Chunk, JsonYamlChunker, StructuredDataChunker, TextChunker } from
import { env } from '@/lib/core/config/env'
import { parseBuffer, parseFile } from '@/lib/file-parsers'
import type { FileParseMetadata } from '@/lib/file-parsers/types'
import { resolveParserExtension } from '@/lib/knowledge/documents/parser-extension'
import { retryWithExponentialBackoff } from '@/lib/knowledge/documents/utils'
import { StorageService } from '@/lib/uploads'
import { isInternalFileUrl } from '@/lib/uploads/utils/file-utils'
import { getExtensionFromMimeType, isInternalFileUrl } from '@/lib/uploads/utils/file-utils'
import { downloadFileFromUrl } from '@/lib/uploads/utils/file-utils.server'
import { mistralParserTool } from '@/tools/mistral/parser'
@@ -760,7 +759,10 @@ async function parseDataURI(fileUrl: string, filename: string, mimeType: string)
: decodeURIComponent(base64Data)
}
const extension = resolveParserExtension(filename, mimeType, 'txt')
const extension =
(filename.includes('.') ? filename.split('.').pop()?.toLowerCase() : undefined) ||
getExtensionFromMimeType(mimeType) ||
'txt'
const buffer = Buffer.from(base64Data, 'base64')
const result = await parseBuffer(buffer, extension)
return result.content
@@ -773,7 +775,14 @@ async function parseHttpFile(
): Promise<{ content: string; metadata?: FileParseMetadata }> {
const buffer = await downloadFileWithTimeout(fileUrl)
const extension = resolveParserExtension(filename, mimeType)
let extension = filename.includes('.') ? filename.split('.').pop()?.toLowerCase() : undefined
if (!extension && mimeType) {
extension = getExtensionFromMimeType(mimeType) ?? undefined
}
if (!extension) {
throw new Error(`Could not determine file type for: ${filename}`)
}
const result = await parseBuffer(buffer, extension)
return result
}

View File

@@ -1,27 +0,0 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { resolveParserExtension } from '@/lib/knowledge/documents/parser-extension'
describe('resolveParserExtension', () => {
it('uses a supported filename extension when present', () => {
expect(resolveParserExtension('report.pdf', 'application/pdf')).toBe('pdf')
})
it('falls back to mime type when filename has no extension', () => {
expect(
resolveParserExtension('[Business] Your Thursday morning trip with Uber', 'text/plain')
).toBe('txt')
})
it('falls back to mime type when filename extension is unsupported', () => {
expect(resolveParserExtension('uber-message.business', 'text/plain')).toBe('txt')
})
it('throws when neither filename nor mime type resolves to a supported parser', () => {
expect(() =>
resolveParserExtension('uber-message.unknown', 'application/octet-stream')
).toThrow('Unsupported file type')
})
})

View File

@@ -1,55 +0,0 @@
import { getExtensionFromMimeType } from '@/lib/uploads/utils/file-utils'
const SUPPORTED_FILE_TYPES = [
'pdf',
'csv',
'docx',
'doc',
'txt',
'md',
'xlsx',
'xls',
'pptx',
'ppt',
'html',
'htm',
'json',
'yaml',
'yml',
] as const
const SUPPORTED_FILE_TYPES_TEXT = SUPPORTED_FILE_TYPES.join(', ')
function isSupportedParserExtension(extension: string): boolean {
return SUPPORTED_FILE_TYPES.includes(extension as (typeof SUPPORTED_FILE_TYPES)[number])
}
export function resolveParserExtension(
filename: string,
mimeType?: string,
fallback?: string
): string {
const raw = filename.includes('.') ? filename.split('.').pop()?.toLowerCase() : undefined
const filenameExtension = raw && /^[a-z0-9]+$/.test(raw) ? raw : undefined
if (filenameExtension && isSupportedParserExtension(filenameExtension)) {
return filenameExtension
}
const mimeExtension = mimeType ? getExtensionFromMimeType(mimeType) : undefined
if (mimeExtension && isSupportedParserExtension(mimeExtension)) {
return mimeExtension
}
if (fallback) {
return fallback
}
if (filenameExtension) {
throw new Error(
`Unsupported file type: ${filenameExtension}. Supported types are: ${SUPPORTED_FILE_TYPES_TEXT}`
)
}
throw new Error(`Could not determine file type for ${filename || 'document'}`)
}

View File

@@ -0,0 +1,227 @@
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
const logger = createLogger('DocumentQueue')
interface QueueJob<T = unknown> {
id: string
type: string
data: T
timestamp: number
attempts: number
maxAttempts: number
}
interface QueueConfig {
maxConcurrent: number
retryDelay: number
maxRetries: number
}
/**
* Document processing queue that uses either Redis or in-memory storage.
* Storage method is determined once at construction based on configuration.
* No switching on transient errors.
*/
export class DocumentProcessingQueue {
private config: QueueConfig
private storageMethod: StorageMethod
private processing = new Map<string, Promise<void>>()
private inMemoryQueue: QueueJob[] = []
private inMemoryProcessing = 0
private processingStarted = false
constructor(config: QueueConfig) {
this.config = config
this.storageMethod = getStorageMethod()
logger.info(`DocumentProcessingQueue using ${this.storageMethod} storage`)
}
async addJob<T>(type: string, data: T, options: { maxAttempts?: number } = {}): Promise<string> {
const job: QueueJob = {
id: `${type}-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`,
type,
data,
timestamp: Date.now(),
attempts: 0,
maxAttempts: options.maxAttempts || this.config.maxRetries,
}
if (this.storageMethod === 'redis') {
const redis = getRedisClient()
if (!redis) {
throw new Error('Redis configured but client unavailable')
}
await redis.lpush('document-queue', JSON.stringify(job))
logger.info(`Job ${job.id} added to Redis queue`)
} else {
this.inMemoryQueue.push(job)
logger.info(`Job ${job.id} added to in-memory queue`)
}
return job.id
}
async processJobs(processor: (job: QueueJob) => Promise<void>): Promise<void> {
if (this.processingStarted) {
logger.info('Queue processing already started, skipping')
return
}
this.processingStarted = true
logger.info(`Starting queue processing (${this.storageMethod})`)
if (this.storageMethod === 'redis') {
await this.processRedisJobs(processor)
} else {
await this.processInMemoryJobs(processor)
}
}
private async processRedisJobs(processor: (job: QueueJob) => Promise<void>) {
const redis = getRedisClient()
if (!redis) {
throw new Error('Redis configured but client unavailable')
}
const processJobsContinuously = async () => {
while (true) {
if (this.processing.size >= this.config.maxConcurrent) {
await new Promise((resolve) => setTimeout(resolve, 100))
continue
}
try {
const result = await redis.rpop('document-queue')
if (!result) {
await new Promise((resolve) => setTimeout(resolve, 500))
continue
}
const job: QueueJob = JSON.parse(result)
const promise = this.executeJob(job, processor)
this.processing.set(job.id, promise)
promise.finally(() => {
this.processing.delete(job.id)
})
} catch (error: any) {
logger.error('Error processing Redis job:', error)
await new Promise((resolve) => setTimeout(resolve, 1000))
}
}
}
const processors = Array(this.config.maxConcurrent)
.fill(null)
.map(() => processJobsContinuously())
Promise.allSettled(processors).catch((error) => {
logger.error('Error in Redis queue processors:', error)
})
}
private async processInMemoryJobs(processor: (job: QueueJob) => Promise<void>) {
const processInMemoryContinuously = async () => {
while (true) {
if (this.inMemoryProcessing >= this.config.maxConcurrent) {
await new Promise((resolve) => setTimeout(resolve, 100))
continue
}
const job = this.inMemoryQueue.shift()
if (!job) {
await new Promise((resolve) => setTimeout(resolve, 500))
continue
}
this.inMemoryProcessing++
this.executeJob(job, processor).finally(() => {
this.inMemoryProcessing--
})
}
}
const processors = Array(this.config.maxConcurrent)
.fill(null)
.map(() => processInMemoryContinuously())
Promise.allSettled(processors).catch((error) => {
logger.error('Error in in-memory queue processors:', error)
})
}
private async executeJob(
job: QueueJob,
processor: (job: QueueJob) => Promise<void>
): Promise<void> {
try {
job.attempts++
logger.info(`Processing job ${job.id} (attempt ${job.attempts}/${job.maxAttempts})`)
await processor(job)
logger.info(`Job ${job.id} completed successfully`)
} catch (error) {
logger.error(`Job ${job.id} failed (attempt ${job.attempts}):`, error)
if (job.attempts < job.maxAttempts) {
const delay = this.config.retryDelay * 2 ** (job.attempts - 1)
setTimeout(async () => {
if (this.storageMethod === 'redis') {
const redis = getRedisClient()
if (!redis) {
logger.error('Redis unavailable for retry, job lost:', job.id)
return
}
await redis.lpush('document-queue', JSON.stringify(job))
} else {
this.inMemoryQueue.push(job)
}
}, delay)
logger.info(`Job ${job.id} will retry in ${delay}ms`)
} else {
logger.error(`Job ${job.id} failed permanently after ${job.attempts} attempts`)
}
}
}
async getQueueStats(): Promise<{
pending: number
processing: number
storageMethod: StorageMethod
}> {
let pending = 0
if (this.storageMethod === 'redis') {
const redis = getRedisClient()
if (redis) {
pending = await redis.llen('document-queue')
}
} else {
pending = this.inMemoryQueue.length
}
return {
pending,
processing: this.storageMethod === 'redis' ? this.processing.size : this.inMemoryProcessing,
storageMethod: this.storageMethod,
}
}
async clearQueue(): Promise<void> {
if (this.storageMethod === 'redis') {
const redis = getRedisClient()
if (redis) {
await redis.del('document-queue')
logger.info('Redis queue cleared')
}
}
this.inMemoryQueue.length = 0
logger.info('In-memory queue cleared')
}
}

View File

@@ -25,11 +25,10 @@ import {
type SQL,
sql,
} from 'drizzle-orm'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { env } from '@/lib/core/config/env'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import { getStorageMethod, isRedisStorage } from '@/lib/core/storage'
import { processDocument } from '@/lib/knowledge/documents/document-processor'
import { DocumentProcessingQueue } from '@/lib/knowledge/documents/queue'
import type { DocumentSortField, SortOrder } from '@/lib/knowledge/documents/types'
import { generateEmbeddings } from '@/lib/knowledge/embeddings'
import {
@@ -88,8 +87,22 @@ const REDIS_PROCESSING_CONFIG = {
delayBetweenDocuments: env.KB_CONFIG_DELAY_BETWEEN_DOCUMENTS || 50,
}
let documentQueue: DocumentProcessingQueue | null = null
export function getDocumentQueue(): DocumentProcessingQueue {
if (!documentQueue) {
const config = isRedisStorage() ? REDIS_PROCESSING_CONFIG : PROCESSING_CONFIG
documentQueue = new DocumentProcessingQueue({
maxConcurrent: config.maxConcurrentDocuments,
retryDelay: env.KB_CONFIG_MIN_TIMEOUT || 1000,
maxRetries: env.KB_CONFIG_MAX_ATTEMPTS || 3,
})
}
return documentQueue
}
export function getProcessingConfig() {
return isBullMQEnabled() ? REDIS_PROCESSING_CONFIG : PROCESSING_CONFIG
return isRedisStorage() ? REDIS_PROCESSING_CONFIG : PROCESSING_CONFIG
}
export interface DocumentData {
@@ -121,54 +134,6 @@ export interface DocumentJobData {
requestId: string
}
export async function dispatchDocumentProcessingJob(payload: DocumentJobData): Promise<void> {
if (isTriggerAvailable()) {
await tasks.trigger('knowledge-process-document', payload)
return
}
if (isBullMQEnabled()) {
const workspaceRows = await db
.select({
workspaceId: knowledgeBase.workspaceId,
userId: knowledgeBase.userId,
})
.from(knowledgeBase)
.where(and(eq(knowledgeBase.id, payload.knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
.limit(1)
const workspaceId = workspaceRows[0]?.workspaceId
const userId = workspaceRows[0]?.userId
if (!workspaceId || !userId) {
throw new Error(`Knowledge base not found: ${payload.knowledgeBaseId}`)
}
await enqueueWorkspaceDispatch({
workspaceId,
lane: 'knowledge',
queueName: 'knowledge-process-document',
bullmqJobName: 'knowledge-process-document',
bullmqPayload: createBullMQJobData(payload),
metadata: {
userId,
},
})
return
}
void processDocumentAsync(
payload.knowledgeBaseId,
payload.documentId,
payload.docData,
payload.processingOptions
).catch((error) => {
logger.error(`[${payload.requestId}] Direct document processing failed`, {
documentId: payload.documentId,
error: error instanceof Error ? error.message : String(error),
})
})
}
export interface DocumentTagData {
tagName: string
fieldType: string
@@ -357,7 +322,7 @@ export async function processDocumentTags(
}
/**
* Process documents with the configured background execution backend.
* Process documents with best available method: Trigger.dev > Redis queue > in-memory concurrency control
*/
export async function processDocumentsWithQueue(
createdDocuments: DocumentData[],
@@ -365,29 +330,76 @@ export async function processDocumentsWithQueue(
processingOptions: ProcessingOptions,
requestId: string
): Promise<void> {
const jobPayloads = createdDocuments.map<DocumentJobData>((doc) => ({
knowledgeBaseId,
documentId: doc.documentId,
docData: {
filename: doc.filename,
fileUrl: doc.fileUrl,
fileSize: doc.fileSize,
mimeType: doc.mimeType,
},
processingOptions,
requestId,
}))
// Priority 1: Trigger.dev
if (isTriggerAvailable()) {
try {
logger.info(
`[${requestId}] Using Trigger.dev background processing for ${createdDocuments.length} documents`
)
const triggerPayloads = createdDocuments.map((doc) => ({
knowledgeBaseId,
documentId: doc.documentId,
docData: {
filename: doc.filename,
fileUrl: doc.fileUrl,
fileSize: doc.fileSize,
mimeType: doc.mimeType,
},
processingOptions,
requestId,
}))
const result = await processDocumentsWithTrigger(triggerPayloads, requestId)
if (result.success) {
logger.info(
`[${requestId}] Successfully triggered background processing: ${result.message}`
)
return
}
logger.warn(`[${requestId}] Trigger.dev failed: ${result.message}, falling back to Redis`)
} catch (error) {
logger.warn(`[${requestId}] Trigger.dev processing failed, falling back to Redis:`, error)
}
}
// Priority 2: Queue-based processing (Redis or in-memory based on storage method)
const queue = getDocumentQueue()
const storageMethod = getStorageMethod()
logger.info(
`[${requestId}] Dispatching background processing for ${jobPayloads.length} documents`,
{
backend: isTriggerAvailable() ? 'trigger-dev' : isBullMQEnabled() ? 'bullmq' : 'direct',
}
`[${requestId}] Using ${storageMethod} queue for ${createdDocuments.length} documents`
)
await Promise.all(jobPayloads.map((payload) => dispatchDocumentProcessingJob(payload)))
const jobPromises = createdDocuments.map((doc) =>
queue.addJob<DocumentJobData>('process-document', {
knowledgeBaseId,
documentId: doc.documentId,
docData: {
filename: doc.filename,
fileUrl: doc.fileUrl,
fileSize: doc.fileSize,
mimeType: doc.mimeType,
},
processingOptions,
requestId,
})
)
logger.info(`[${requestId}] All documents dispatched for processing`)
await Promise.all(jobPromises)
queue
.processJobs(async (job) => {
const data = job.data as DocumentJobData
const { knowledgeBaseId, documentId, docData, processingOptions } = data
await processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions)
})
.catch((error) => {
logger.error(`[${requestId}] Error in queue processing:`, error)
})
logger.info(`[${requestId}] All documents queued for processing`)
return
}
@@ -647,7 +659,7 @@ export async function processDocumentAsync(
* Check if Trigger.dev is available and configured
*/
export function isTriggerAvailable(): boolean {
return Boolean(env.TRIGGER_SECRET_KEY) && isTriggerDevEnabled
return !!(env.TRIGGER_SECRET_KEY && env.TRIGGER_DEV_ENABLED !== false)
}
/**

View File

@@ -12,7 +12,6 @@ import {
} from '@/lib/notifications/alert-rules'
import { getActiveWorkflowContext } from '@/lib/workflows/active-context'
import {
enqueueNotificationDeliveryDispatch,
executeNotificationDelivery,
workspaceNotificationDeliveryTask,
} from '@/background/workspace-notification-delivery'
@@ -132,7 +131,6 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog):
const payload = {
deliveryId,
subscriptionId: subscription.id,
workspaceId,
notificationType: subscription.notificationType,
log: notificationLog,
alertConfig: alertConfig || undefined,
@@ -143,10 +141,6 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog):
logger.info(
`Enqueued ${subscription.notificationType} notification ${deliveryId} via Trigger.dev`
)
} else if (await enqueueNotificationDeliveryDispatch(payload)) {
logger.info(
`Enqueued ${subscription.notificationType} notification ${deliveryId} via BullMQ`
)
} else {
void executeNotificationDelivery(payload).catch((error) => {
logger.error(`Direct notification delivery failed for ${deliveryId}`, { error })

View File

@@ -1,7 +1,6 @@
import { copilotChats, db, mothershipInboxTask, permissions, user, workspace } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, eq, sql } from 'drizzle-orm'
import { createRunSegment } from '@/lib/copilot/async-runs/repository'
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat-payload'
import { requestChatTitle } from '@/lib/copilot/chat-streaming'
@@ -188,27 +187,10 @@ export async function executeInboxTask(taskId: string): Promise<void> {
...(fileAttachments.length > 0 ? { fileAttachments } : {}),
}
const executionId = crypto.randomUUID()
const runId = crypto.randomUUID()
const runStreamId = crypto.randomUUID()
if (chatId) {
await createRunSegment({
id: runId,
executionId,
chatId,
userId,
workspaceId: ws.id,
streamId: runStreamId,
}).catch(() => {})
}
const result = await orchestrateCopilotStream(requestPayload, {
userId,
workspaceId: ws.id,
chatId: chatId ?? undefined,
executionId,
runId,
goRoute: '/api/mothership/execute',
autoExecuteTools: true,
interactive: false,

View File

@@ -12,7 +12,6 @@ import { v4 as uuidv4 } from 'uuid'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { TRIGGER_TYPES } from '@/lib/workflows/triggers/triggers'
import {
enqueueNotificationDeliveryDispatch,
executeNotificationDelivery,
workspaceNotificationDeliveryTask,
} from '@/background/workspace-notification-delivery'
@@ -182,7 +181,6 @@ async function checkWorkflowInactivity(
const payload = {
deliveryId,
subscriptionId: subscription.id,
workspaceId: workflowData.workspaceId,
notificationType: subscription.notificationType,
log: mockLog,
alertConfig,
@@ -190,7 +188,6 @@ async function checkWorkflowInactivity(
if (isTriggerDevEnabled) {
await workspaceNotificationDeliveryTask.trigger(payload)
} else if (await enqueueNotificationDeliveryDispatch(payload)) {
} else {
void executeNotificationDelivery(payload).catch((error) => {
logger.error(`Direct notification delivery failed for ${deliveryId}`, { error })

View File

@@ -5,8 +5,8 @@ import {
generatePresignedDownloadUrl,
generatePresignedUploadUrl,
} from '@/lib/uploads/core/storage-service'
import type { PresignedUrlResponse } from '@/lib/uploads/shared/types'
import { isImageFileType } from '@/lib/uploads/utils/file-utils'
import type { PresignedUrlResponse } from '@/lib/uploads/shared/types'
const logger = createLogger('CopilotFileManager')

View File

@@ -13,10 +13,14 @@ import {
incrementStorageUsage,
} from '@/lib/billing/storage'
import { normalizeVfsSegment } from '@/lib/copilot/vfs/normalize-segment'
import {
downloadFile,
hasCloudStorage,
uploadFile,
} from '@/lib/uploads/core/storage-service'
import { getFileMetadataByKey, insertFileMetadata } from '@/lib/uploads/server/metadata'
import { getPostgresErrorCode } from '@/lib/core/utils/pg-error'
import { generateRestoreName } from '@/lib/core/utils/restore-name'
import { downloadFile, hasCloudStorage, uploadFile } from '@/lib/uploads/core/storage-service'
import { getFileMetadataByKey, insertFileMetadata } from '@/lib/uploads/server/metadata'
import { isUuid, sanitizeFileName } from '@/executor/constants'
import type { UserFile } from '@/executor/types'
@@ -252,10 +256,7 @@ export async function uploadWorkspaceFile(
}
}
logger.error(
`Failed to upload workspace file after ${MAX_UPLOAD_UNIQUE_RETRIES} attempts`,
lastError
)
logger.error(`Failed to upload workspace file after ${MAX_UPLOAD_UNIQUE_RETRIES} attempts`, lastError)
throw new FileConflictError(fileName)
}
@@ -277,13 +278,7 @@ export async function trackChatUpload(
const updated = await db
.update(workspaceFiles)
.set({ chatId, context: 'mothership' })
.where(
and(
eq(workspaceFiles.key, s3Key),
eq(workspaceFiles.workspaceId, workspaceId),
isNull(workspaceFiles.deletedAt)
)
)
.where(and(eq(workspaceFiles.key, s3Key), eq(workspaceFiles.workspaceId, workspaceId), isNull(workspaceFiles.deletedAt)))
.returning({ id: workspaceFiles.id })
if (updated.length > 0) {
@@ -350,10 +345,7 @@ export async function listWorkspaceFiles(
.from(workspaceFiles)
.where(
scope === 'all'
? and(
eq(workspaceFiles.workspaceId, workspaceId),
eq(workspaceFiles.context, 'workspace')
)
? and(eq(workspaceFiles.workspaceId, workspaceId), eq(workspaceFiles.context, 'workspace'))
: scope === 'archived'
? and(
eq(workspaceFiles.workspaceId, workspaceId),
@@ -422,9 +414,7 @@ export function normalizeWorkspaceFileReference(fileReference: string): string {
/**
* Canonical sandbox mount path for an existing workspace file.
*/
export function getSandboxWorkspaceFilePath(
file: Pick<WorkspaceFileRecord, 'id' | 'name'>
): string {
export function getSandboxWorkspaceFilePath(file: Pick<WorkspaceFileRecord, 'id' | 'name'>): string {
return `/home/user/files/${file.id}/${file.name}`
}

View File

@@ -244,13 +244,13 @@ async function hydrateValue(
* Hydrates UserFile objects within a value to include base64 content.
* Returns the original structure with UserFile.base64 set where available.
*/
export async function hydrateUserFilesWithBase64<T>(
value: T,
export async function hydrateUserFilesWithBase64(
value: unknown,
options: Base64HydrationOptions
): Promise<T> {
): Promise<unknown> {
const logger = getHydrationLogger(options)
const state = createHydrationState(options, logger)
return (await hydrateValue(value, options, state, logger)) as T
return hydrateValue(value, options, state, logger)
}
function isPlainObject(value: unknown): value is Record<string, unknown> {

View File

@@ -137,13 +137,12 @@ export interface FileValidationError {
* Validate if a file type is supported for document processing
*/
export function validateFileType(fileName: string, mimeType: string): FileValidationError | null {
const raw = path.extname(fileName).toLowerCase().substring(1)
const extension = (/^[a-z0-9]+$/.test(raw) ? raw : '') as SupportedDocumentExtension
const extension = path.extname(fileName).toLowerCase().substring(1) as SupportedDocumentExtension
if (!SUPPORTED_DOCUMENT_EXTENSIONS.includes(extension)) {
return {
code: 'UNSUPPORTED_FILE_TYPE',
message: `Unsupported file type${extension ? `: ${extension}` : ` for "${fileName}"`}. Supported types are: ${SUPPORTED_DOCUMENT_EXTENSIONS.join(', ')}`,
message: `Unsupported file type: ${extension}. Supported types are: ${SUPPORTED_DOCUMENT_EXTENSIONS.join(', ')}`,
supportedTypes: [...SUPPORTED_DOCUMENT_EXTENSIONS],
}
}
@@ -222,8 +221,7 @@ export function validateMediaFileType(
fileName: string,
mimeType: string
): FileValidationError | null {
const raw = path.extname(fileName).toLowerCase().substring(1)
const extension = /^[a-z0-9]+$/.test(raw) ? raw : ''
const extension = path.extname(fileName).toLowerCase().substring(1)
const isAudio = SUPPORTED_AUDIO_EXTENSIONS.includes(extension as SupportedAudioExtension)
const isVideo = SUPPORTED_VIDEO_EXTENSIONS.includes(extension as SupportedVideoExtension)
@@ -231,7 +229,7 @@ export function validateMediaFileType(
if (!isAudio && !isVideo) {
return {
code: 'UNSUPPORTED_FILE_TYPE',
message: `Unsupported media file type${extension ? `: ${extension}` : ` for "${fileName}"`}. Supported audio types: ${SUPPORTED_AUDIO_EXTENSIONS.join(', ')}. Supported video types: ${SUPPORTED_VIDEO_EXTENSIONS.join(', ')}`,
message: `Unsupported media file type: ${extension}. Supported audio types: ${SUPPORTED_AUDIO_EXTENSIONS.join(', ')}. Supported video types: ${SUPPORTED_VIDEO_EXTENSIONS.join(', ')}`,
supportedTypes: [...SUPPORTED_AUDIO_EXTENSIONS, ...SUPPORTED_VIDEO_EXTENSIONS],
}
}

View File

@@ -9,14 +9,12 @@ const {
mockUuidV4,
mockPreprocessExecution,
mockEnqueue,
mockEnqueueWorkspaceDispatch,
mockGetJobQueue,
mockShouldExecuteInline,
} = vi.hoisted(() => ({
mockUuidV4: vi.fn(),
mockPreprocessExecution: vi.fn(),
mockEnqueue: vi.fn(),
mockEnqueueWorkspaceDispatch: vi.fn(),
mockGetJobQueue: vi.fn(),
mockShouldExecuteInline: vi.fn(),
}))
@@ -64,15 +62,6 @@ vi.mock('@/lib/core/async-jobs', () => ({
shouldExecuteInline: mockShouldExecuteInline,
}))
vi.mock('@/lib/core/bullmq', () => ({
isBullMQEnabled: vi.fn().mockReturnValue(true),
createBullMQJobData: vi.fn((payload: unknown, metadata?: unknown) => ({ payload, metadata })),
}))
vi.mock('@/lib/core/workspace-dispatch', () => ({
enqueueWorkspaceDispatch: mockEnqueueWorkspaceDispatch,
}))
vi.mock('@/lib/core/config/feature-flags', () => ({
isProd: false,
}))
@@ -153,7 +142,6 @@ describe('webhook processor execution identity', () => {
actorUserId: 'actor-user-1',
})
mockEnqueue.mockResolvedValue('job-1')
mockEnqueueWorkspaceDispatch.mockResolvedValue('job-1')
mockGetJobQueue.mockResolvedValue({ enqueue: mockEnqueue })
mockShouldExecuteInline.mockReturnValue(false)
mockUuidV4.mockReturnValue('generated-execution-id')
@@ -214,15 +202,15 @@ describe('webhook processor execution identity', () => {
)
expect(mockUuidV4).toHaveBeenCalledTimes(1)
expect(mockEnqueueWorkspaceDispatch).toHaveBeenCalledWith(
expect(mockEnqueue).toHaveBeenCalledWith(
'webhook-execution',
expect.objectContaining({
executionId: 'generated-execution-id',
requestId: 'request-1',
correlation: preprocessingResult.correlation,
}),
expect.objectContaining({
id: 'generated-execution-id',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'webhook-execution',
metadata: expect.objectContaining({
workflowId: 'workflow-1',
userId: 'actor-user-1',
correlation: preprocessingResult.correlation,
}),
})

View File

@@ -7,10 +7,8 @@ import { v4 as uuidv4 } from 'uuid'
import { isOrganizationOnTeamOrEnterprisePlan } from '@/lib/billing/core/subscription'
import { getInlineJobQueue, getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
import { createBullMQJobData, isBullMQEnabled } from '@/lib/core/bullmq'
import { isProd } from '@/lib/core/config/feature-flags'
import { safeCompare } from '@/lib/core/security/encryption'
import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import {
@@ -1266,91 +1264,53 @@ export async function queueWebhookExecution(
const isPolling = isPollingWebhookProvider(payload.provider)
if (isPolling && !shouldExecuteInline()) {
const jobId = isBullMQEnabled()
? await enqueueWorkspaceDispatch({
id: executionId,
workspaceId: foundWorkflow.workspaceId,
lane: 'runtime',
queueName: 'webhook-execution',
bullmqJobName: 'webhook-execution',
bullmqPayload: createBullMQJobData(payload, {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
}),
metadata: {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
},
})
: await (await getJobQueue()).enqueue('webhook-execution', payload, {
metadata: {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
},
})
const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
},
})
logger.info(
`[${options.requestId}] Queued polling webhook execution task ${jobId} for ${foundWebhook.provider} webhook via job queue`
)
} else {
const jobQueue = await getInlineJobQueue()
const jobId = isBullMQEnabled()
? await enqueueWorkspaceDispatch({
id: executionId,
workspaceId: foundWorkflow.workspaceId,
lane: 'runtime',
queueName: 'webhook-execution',
bullmqJobName: 'webhook-execution',
bullmqPayload: createBullMQJobData(payload, {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
}),
metadata: {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
},
})
: await jobQueue.enqueue('webhook-execution', payload, {
metadata: {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
},
})
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: {
workflowId: foundWorkflow.id,
userId: actorUserId,
correlation,
},
})
logger.info(
`[${options.requestId}] Queued ${foundWebhook.provider} webhook execution ${jobId} via inline backend`
`[${options.requestId}] Executing ${foundWebhook.provider} webhook ${jobId} inline`
)
if (shouldExecuteInline()) {
void (async () => {
void (async () => {
try {
await jobQueue.startJob(jobId)
const output = await executeWebhookJob(payload)
await jobQueue.completeJob(jobId, output)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error(`[${options.requestId}] Webhook execution failed`, {
jobId,
error: errorMessage,
})
try {
await jobQueue.startJob(jobId)
const output = await executeWebhookJob(payload)
await jobQueue.completeJob(jobId, output)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error(`[${options.requestId}] Webhook execution failed`, {
await jobQueue.markJobFailed(jobId, errorMessage)
} catch (markFailedError) {
logger.error(`[${options.requestId}] Failed to mark job as failed`, {
jobId,
error: errorMessage,
error:
markFailedError instanceof Error
? markFailedError.message
: String(markFailedError),
})
try {
await jobQueue.markJobFailed(jobId, errorMessage)
} catch (markFailedError) {
logger.error(`[${options.requestId}] Failed to mark job as failed`, {
jobId,
error:
markFailedError instanceof Error
? markFailedError.message
: String(markFailedError),
})
}
}
})()
}
}
})()
}
if (foundWebhook.provider === 'microsoft-teams') {

View File

@@ -242,17 +242,18 @@ export interface SSECallbackOptions {
}
/**
* Creates execution callbacks using a provided event sink.
* Creates SSE callbacks for workflow execution streaming
*/
export function createExecutionCallbacks(options: {
executionId: string
workflowId: string
sendEvent: (event: ExecutionEvent) => void | Promise<void>
}) {
const { executionId, workflowId, sendEvent } = options
export function createSSECallbacks(options: SSECallbackOptions) {
const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options
const sendBufferedEvent = async (event: ExecutionEvent) => {
await sendEvent(event)
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed()) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
setStreamClosed()
}
}
const onBlockStart = async (
@@ -263,7 +264,7 @@ export function createExecutionCallbacks(options: {
iterationContext?: IterationContext,
childWorkflowContext?: ChildWorkflowContext
) => {
await sendBufferedEvent({
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
@@ -330,7 +331,7 @@ export function createExecutionCallbacks(options: {
: {}
if (hasError) {
await sendBufferedEvent({
sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
@@ -351,7 +352,7 @@ export function createExecutionCallbacks(options: {
},
})
} else {
await sendBufferedEvent({
sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
@@ -385,7 +386,7 @@ export function createExecutionCallbacks(options: {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
await sendBufferedEvent({
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
@@ -393,7 +394,7 @@ export function createExecutionCallbacks(options: {
data: { blockId, chunk },
})
}
await sendBufferedEvent({
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
@@ -413,7 +414,7 @@ export function createExecutionCallbacks(options: {
iterationContext?: IterationContext,
executionOrder?: number
) => {
void sendBufferedEvent({
sendEvent({
type: 'block:childWorkflowStarted',
timestamp: new Date().toISOString(),
executionId,
@@ -430,33 +431,5 @@ export function createExecutionCallbacks(options: {
})
}
return {
sendEvent: sendBufferedEvent,
onBlockStart,
onBlockComplete,
onStream,
onChildWorkflowInstanceReady,
}
}
/**
* Creates SSE callbacks for workflow execution streaming
*/
export function createSSECallbacks(options: SSECallbackOptions) {
const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed()) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
setStreamClosed()
}
}
return createExecutionCallbacks({
executionId,
workflowId,
sendEvent,
})
return { sendEvent, onBlockStart, onBlockComplete, onStream, onChildWorkflowInstanceReady }
}

View File

@@ -1,339 +0,0 @@
import { createLogger } from '@sim/logger'
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import {
cleanupExecutionBase64Cache,
hydrateUserFilesWithBase64,
} from '@/lib/uploads/utils/user-file-base64.server'
import {
executeWorkflowCore,
wasExecutionFinalizedByCore,
} from '@/lib/workflows/executor/execution-core'
import {
createExecutionCallbacks,
type ExecutionEvent,
} from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import type { BlockLog, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
const logger = createLogger('QueuedWorkflowExecution')
export const DIRECT_WORKFLOW_JOB_NAME = 'direct-workflow-execution'
export interface QueuedWorkflowExecutionPayload {
workflow: Record<string, any>
metadata: ExecutionMetadata
input: unknown
variables: Record<string, any>
selectedOutputs?: string[]
includeFileBase64?: boolean
base64MaxBytes?: number
stopAfterBlockId?: string
timeoutMs?: number
runFromBlock?: {
startBlockId: string
sourceSnapshot: SerializableExecutionState
}
streamEvents?: boolean
}
export interface QueuedWorkflowExecutionResult {
success: boolean
executionId: string
output: NormalizedBlockOutput
error?: string
logs?: BlockLog[]
status: 'success' | 'cancelled' | 'paused' | 'failed'
statusCode?: number
metadata?: {
duration?: number
startTime?: string
endTime?: string
}
}
function buildResult(
status: QueuedWorkflowExecutionResult['status'],
result: {
success: boolean
output: NormalizedBlockOutput
error?: string
logs?: BlockLog[]
metadata?: {
duration?: number
startTime?: string
endTime?: string
}
},
executionId: string,
statusCode?: number
): QueuedWorkflowExecutionResult {
return {
success: result.success,
executionId,
output: result.output,
error: result.error,
logs: result.logs,
status,
statusCode,
metadata: result.metadata,
}
}
export async function executeQueuedWorkflowJob(
payload: QueuedWorkflowExecutionPayload
): Promise<QueuedWorkflowExecutionResult> {
const { metadata } = payload
const { executionId, requestId, workflowId, triggerType } = metadata
const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId)
const timeoutController = createTimeoutAbortController(payload.timeoutMs)
const eventWriter = payload.streamEvents ? createExecutionEventWriter(executionId) : null
if (payload.streamEvents) {
await setExecutionMeta(executionId, {
status: 'active',
userId: metadata.userId,
workflowId,
})
}
try {
const snapshot = new ExecutionSnapshot(
metadata,
payload.workflow,
payload.input,
payload.variables,
payload.selectedOutputs ?? []
)
let callbacks = {}
if (eventWriter) {
const executionCallbacks = createExecutionCallbacks({
executionId,
workflowId,
sendEvent: async (event: ExecutionEvent) => {
await eventWriter.write(event)
},
})
callbacks = {
onBlockStart: executionCallbacks.onBlockStart,
onBlockComplete: executionCallbacks.onBlockComplete,
onStream: executionCallbacks.onStream,
onChildWorkflowInstanceReady: executionCallbacks.onChildWorkflowInstanceReady,
}
await executionCallbacks.sendEvent({
type: 'execution:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
startTime: metadata.startTime,
},
})
}
const result = await executeWorkflowCore({
snapshot,
callbacks,
loggingSession,
includeFileBase64: payload.includeFileBase64,
base64MaxBytes: payload.base64MaxBytes,
stopAfterBlockId: payload.stopAfterBlockId,
runFromBlock: payload.runFromBlock,
abortSignal: timeoutController.signal,
})
if (
result.status === 'cancelled' &&
timeoutController.isTimedOut() &&
timeoutController.timeoutMs
) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
await loggingSession.markAsFailed(timeoutErrorMessage)
if (eventWriter) {
await eventWriter.write({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
},
})
await setExecutionMeta(executionId, { status: 'error' })
}
return buildResult(
'cancelled',
{
success: false,
output: result.output,
error: timeoutErrorMessage,
logs: result.logs,
metadata: result.metadata
? {
duration: result.metadata.duration,
startTime: result.metadata.startTime,
endTime: result.metadata.endTime,
}
: undefined,
},
executionId,
408
)
}
if (result.status === 'paused') {
if (!result.snapshotSeed) {
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
}
const outputWithBase64 = payload.includeFileBase64
? await hydrateUserFilesWithBase64(result.output, {
requestId,
executionId,
maxBytes: payload.base64MaxBytes,
})
: result.output
if (eventWriter) {
if (result.status === 'cancelled') {
await eventWriter.write({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
duration: result.metadata?.duration || 0,
},
})
await setExecutionMeta(executionId, { status: 'cancelled' })
} else {
await eventWriter.write({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: outputWithBase64,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || metadata.startTime,
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
await setExecutionMeta(executionId, { status: 'complete' })
}
}
return buildResult(
result.status === 'paused'
? 'paused'
: result.status === 'cancelled'
? 'cancelled'
: 'success',
{
success: result.success,
output: outputWithBase64,
error: result.error,
logs: result.logs,
metadata: result.metadata
? {
duration: result.metadata.duration,
startTime: result.metadata.startTime,
endTime: result.metadata.endTime,
}
: undefined,
},
executionId
)
} catch (error) {
logger.error('Queued workflow execution failed', {
workflowId,
executionId,
error: error instanceof Error ? error.message : String(error),
})
if (!wasExecutionFinalizedByCore(error, executionId)) {
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
await loggingSession.safeCompleteWithError({
error: {
message: error instanceof Error ? error.message : String(error),
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
}
if (eventWriter) {
await eventWriter.write({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: error instanceof Error ? error.message : String(error),
duration: 0,
},
})
await setExecutionMeta(executionId, { status: 'error' })
}
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
return buildResult(
'failed',
{
success: false,
output: executionResult?.output ?? {},
error: executionResult?.error || (error instanceof Error ? error.message : String(error)),
logs: executionResult?.logs,
metadata: executionResult?.metadata
? {
duration: executionResult.metadata.duration,
startTime: executionResult.metadata.startTime,
endTime: executionResult.metadata.endTime,
}
: undefined,
},
executionId,
500
)
} finally {
timeoutController.cleanup()
if (eventWriter) {
await eventWriter.close()
}
await cleanupExecutionBase64Cache(executionId).catch((error) => {
logger.error('Failed to cleanup queued workflow base64 cache', {
executionId,
error: error instanceof Error ? error.message : String(error),
})
})
}
}

View File

@@ -249,9 +249,7 @@ export async function updateWorkflowRunCounts(workflowId: string, runs = 1) {
}
}
export const workflowHasResponseBlock = (
executionResult: Pick<ExecutionResult, 'success' | 'logs'>
): boolean => {
export const workflowHasResponseBlock = (executionResult: ExecutionResult): boolean => {
if (!executionResult?.logs || !Array.isArray(executionResult.logs) || !executionResult.success) {
return false
}
@@ -263,9 +261,7 @@ export const workflowHasResponseBlock = (
return responseBlock !== undefined
}
export const createHttpResponseFromBlock = (
executionResult: Pick<ExecutionResult, 'output'>
): NextResponse => {
export const createHttpResponseFromBlock = (executionResult: ExecutionResult): NextResponse => {
const { data = {}, status = 200, headers = {} } = executionResult.output
const responseHeaders = new Headers({

View File

@@ -11,16 +11,10 @@
"dev": "next dev --port 3000",
"dev:webpack": "next dev --webpack",
"dev:sockets": "bun run socket/index.ts",
"dev:worker": "bun run worker/index.ts",
"dev:full": "bunx concurrently -n \"App,Realtime,Worker\" -c \"cyan,magenta,yellow\" \"bun run dev\" \"bun run dev:sockets\" \"bun run dev:worker\"",
"load:workflow": "bun run load:workflow:baseline",
"load:workflow:baseline": "BASE_URL=${BASE_URL:-http://localhost:3000} WARMUP_DURATION=${WARMUP_DURATION:-10} WARMUP_RATE=${WARMUP_RATE:-2} PEAK_RATE=${PEAK_RATE:-8} HOLD_DURATION=${HOLD_DURATION:-20} bunx artillery run scripts/load/workflow-concurrency.yml",
"load:workflow:waves": "BASE_URL=${BASE_URL:-http://localhost:3000} WAVE_ONE_DURATION=${WAVE_ONE_DURATION:-10} WAVE_ONE_RATE=${WAVE_ONE_RATE:-6} QUIET_DURATION=${QUIET_DURATION:-5} WAVE_TWO_DURATION=${WAVE_TWO_DURATION:-15} WAVE_TWO_RATE=${WAVE_TWO_RATE:-8} WAVE_THREE_DURATION=${WAVE_THREE_DURATION:-20} WAVE_THREE_RATE=${WAVE_THREE_RATE:-10} bunx artillery run scripts/load/workflow-waves.yml",
"load:workflow:isolation": "BASE_URL=${BASE_URL:-http://localhost:3000} ISOLATION_DURATION=${ISOLATION_DURATION:-30} TOTAL_RATE=${TOTAL_RATE:-9} WORKSPACE_A_WEIGHT=${WORKSPACE_A_WEIGHT:-8} WORKSPACE_B_WEIGHT=${WORKSPACE_B_WEIGHT:-1} bunx artillery run scripts/load/workflow-isolation.yml",
"dev:full": "bunx concurrently -n \"App,Realtime\" -c \"cyan,magenta\" \"bun run dev\" \"bun run dev:sockets\"",
"build": "bun run build:pptx-worker && next build",
"build:pptx-worker": "bun build ./lib/execution/pptx-worker.cjs --target=node --format=cjs --outfile ./dist/pptx-worker.cjs",
"start": "next start",
"worker": "NODE_ENV=production bun run worker/index.ts",
"prepare": "cd ../.. && bun husky",
"test": "vitest run",
"test:watch": "vitest",
@@ -98,7 +92,6 @@
"better-auth-harmony": "1.3.1",
"binary-extensions": "^2.0.0",
"browser-image-compression": "^2.0.2",
"bullmq": "5.71.0",
"chalk": "5.6.2",
"chart.js": "4.5.1",
"cheerio": "1.1.2",

View File

@@ -1,113 +0,0 @@
# Workflow Load Tests
These local-only Artillery scenarios exercise `POST /api/workflows/[id]/execute` in async mode.
## Requirements
- The app should be running locally, for example with `bun run dev:full`
- Each scenario needs valid workflow IDs and API keys
- All scenarios default to `http://localhost:3000`
The default rates are tuned for these local limits:
- `ADMISSION_GATE_MAX_INFLIGHT=500`
- `DISPATCH_MAX_QUEUE_PER_WORKSPACE=1000`
- `DISPATCH_MAX_QUEUE_GLOBAL=50000`
- `WORKSPACE_CONCURRENCY_FREE=5`
- `WORKSPACE_CONCURRENCY_PRO=50`
- `WORKSPACE_CONCURRENCY_TEAM=200`
- `WORKSPACE_CONCURRENCY_ENTERPRISE=200`
That means the defaults are intentionally aimed at forcing queueing for a Free workspace without overwhelming a single local dev server process.
## Baseline Concurrency
Use this to ramp traffic into one workflow and observe normal queueing behavior.
Default profile:
- Starts at `2` requests per second
- Ramps to `8` requests per second
- Holds there for `20` seconds
- Good for validating queueing against a Free workspace concurrency of `5`
```bash
WORKFLOW_ID=<workflow-id> \
SIM_API_KEY=<api-key> \
bun run load:workflow:baseline
```
Optional variables:
- `BASE_URL`
- `WARMUP_DURATION`
- `WARMUP_RATE`
- `PEAK_RATE`
- `HOLD_DURATION`
For higher-plan workspaces, a good local starting point is:
- Pro: `PEAK_RATE=20` to `40`
- Team or Enterprise: `PEAK_RATE=50` to `100`
## Queueing Waves
Use this to send repeated bursts to one workflow in the same workspace.
Default profile:
- Wave 1: `6` requests per second for `10` seconds
- Wave 2: `8` requests per second for `15` seconds
- Wave 3: `10` requests per second for `20` seconds
- Quiet gaps: `5` seconds
```bash
WORKFLOW_ID=<workflow-id> \
SIM_API_KEY=<api-key> \
bun run load:workflow:waves
```
Optional variables:
- `BASE_URL`
- `WAVE_ONE_DURATION`
- `WAVE_ONE_RATE`
- `QUIET_DURATION`
- `WAVE_TWO_DURATION`
- `WAVE_TWO_RATE`
- `WAVE_THREE_DURATION`
- `WAVE_THREE_RATE`
## Two-Workspace Isolation
Use this to send mixed traffic to two workflows from different workspaces and compare whether one workspace's queue pressure appears to affect the other.
Default profile:
- Total rate: `9` requests per second for `30` seconds
- Weight split: `8:1`
- In practice this sends heavy pressure to workspace A while still sending a light stream to workspace B
```bash
WORKFLOW_ID_A=<workspace-a-workflow-id> \
SIM_API_KEY_A=<workspace-a-api-key> \
WORKFLOW_ID_B=<workspace-b-workflow-id> \
SIM_API_KEY_B=<workspace-b-api-key> \
bun run load:workflow:isolation
```
Optional variables:
- `BASE_URL`
- `ISOLATION_DURATION`
- `TOTAL_RATE`
- `WORKSPACE_A_WEIGHT`
- `WORKSPACE_B_WEIGHT`
## Notes
- `load:workflow` is an alias for `load:workflow:baseline`
- All scenarios send `x-execution-mode: async`
- Artillery output will show request counts and response codes, which is usually enough for quick local verification
- At these defaults, you should observe queueing behavior before you approach `ADMISSION_GATE_MAX_INFLIGHT=500` or `DISPATCH_MAX_QUEUE_PER_WORKSPACE=1000`
- If you still see lots of `429` or `ETIMEDOUT` responses locally, lower the rates again before increasing durations

View File

@@ -1,24 +0,0 @@
config:
target: "{{ $processEnvironment.BASE_URL }}"
phases:
- duration: "{{ $processEnvironment.WARMUP_DURATION }}"
arrivalRate: "{{ $processEnvironment.WARMUP_RATE }}"
rampTo: "{{ $processEnvironment.PEAK_RATE }}"
name: baseline-ramp
- duration: "{{ $processEnvironment.HOLD_DURATION }}"
arrivalRate: "{{ $processEnvironment.PEAK_RATE }}"
name: baseline-hold
defaults:
headers:
content-type: application/json
x-api-key: "{{ $processEnvironment.SIM_API_KEY }}"
x-execution-mode: async
scenarios:
- name: baseline-workflow-concurrency
flow:
- post:
url: "/api/workflows/{{ $processEnvironment.WORKFLOW_ID }}/execute"
json:
input:
source: artillery-baseline
runId: "{{ $uuid }}"

View File

@@ -1,35 +0,0 @@
config:
target: "{{ $processEnvironment.BASE_URL }}"
phases:
- duration: "{{ $processEnvironment.ISOLATION_DURATION }}"
arrivalRate: "{{ $processEnvironment.TOTAL_RATE }}"
name: mixed-workspace-load
defaults:
headers:
content-type: application/json
x-execution-mode: async
scenarios:
- name: workspace-a-traffic
weight: "{{ $processEnvironment.WORKSPACE_A_WEIGHT }}"
flow:
- post:
url: "/api/workflows/{{ $processEnvironment.WORKFLOW_ID_A }}/execute"
headers:
x-api-key: "{{ $processEnvironment.SIM_API_KEY_A }}"
json:
input:
source: artillery-isolation
workspace: a
runId: "{{ $uuid }}"
- name: workspace-b-traffic
weight: "{{ $processEnvironment.WORKSPACE_B_WEIGHT }}"
flow:
- post:
url: "/api/workflows/{{ $processEnvironment.WORKFLOW_ID_B }}/execute"
headers:
x-api-key: "{{ $processEnvironment.SIM_API_KEY_B }}"
json:
input:
source: artillery-isolation
workspace: b
runId: "{{ $uuid }}"

View File

@@ -1,33 +0,0 @@
config:
target: "{{ $processEnvironment.BASE_URL }}"
phases:
- duration: "{{ $processEnvironment.WAVE_ONE_DURATION }}"
arrivalRate: "{{ $processEnvironment.WAVE_ONE_RATE }}"
name: wave-one
- duration: "{{ $processEnvironment.QUIET_DURATION }}"
arrivalRate: 1
name: quiet-gap
- duration: "{{ $processEnvironment.WAVE_TWO_DURATION }}"
arrivalRate: "{{ $processEnvironment.WAVE_TWO_RATE }}"
name: wave-two
- duration: "{{ $processEnvironment.QUIET_DURATION }}"
arrivalRate: 1
name: quiet-gap-two
- duration: "{{ $processEnvironment.WAVE_THREE_DURATION }}"
arrivalRate: "{{ $processEnvironment.WAVE_THREE_RATE }}"
name: wave-three
defaults:
headers:
content-type: application/json
x-api-key: "{{ $processEnvironment.SIM_API_KEY }}"
x-execution-mode: async
scenarios:
- name: workflow-queue-waves
flow:
- post:
url: "/api/workflows/{{ $processEnvironment.WORKFLOW_ID }}/execute"
json:
input:
source: artillery-waves
runId: "{{ $uuid }}"
waveProfile: single-workspace

View File

@@ -76,7 +76,7 @@ export const linearListAttachmentsTool: ToolConfig<
variables: {
issueId: params.issueId,
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -76,7 +76,7 @@ export const linearListCommentsTool: ToolConfig<
variables: {
issueId: params.issueId,
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -89,7 +89,7 @@ export const linearListCustomerRequestsTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
includeArchived: params.includeArchived || false,
},
}),

View File

@@ -70,7 +70,7 @@ export const linearListCustomerStatusesTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -69,7 +69,7 @@ export const linearListCustomerTiersTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -76,7 +76,7 @@ export const linearListCustomersTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
includeArchived: params.includeArchived || false,
},
}),

View File

@@ -80,7 +80,7 @@ export const linearListCyclesTool: ToolConfig<LinearListCyclesParams, LinearList
variables: {
filter: Object.keys(filter).length > 0 ? filter : undefined,
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}
},

View File

@@ -71,7 +71,7 @@ export const linearListFavoritesTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -79,7 +79,7 @@ export const linearListIssueRelationsTool: ToolConfig<
variables: {
issueId: params.issueId,
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -80,7 +80,7 @@ export const linearListLabelsTool: ToolConfig<LinearListLabelsParams, LinearList
variables: {
filter: Object.keys(filter).length > 0 ? filter : undefined,
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}
},

View File

@@ -70,7 +70,7 @@ export const linearListNotificationsTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -82,7 +82,7 @@ export const linearListProjectLabelsTool: ToolConfig<
variables: {
id: params.projectId.trim(),
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}
}
@@ -110,7 +110,7 @@ export const linearListProjectLabelsTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}
},

View File

@@ -82,7 +82,7 @@ export const linearListProjectMilestonesTool: ToolConfig<
variables: {
id: params.projectId,
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -71,7 +71,7 @@ export const linearListProjectStatusesTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -77,7 +77,7 @@ export const linearListProjectUpdatesTool: ToolConfig<
variables: {
projectId: params.projectId,
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
},
}),
},

View File

@@ -94,7 +94,7 @@ export const linearListProjectsTool: ToolConfig<
`,
variables: {
first: params.first ? Number(params.first) : 50,
after: params.after?.trim() || undefined,
after: params.after,
includeArchived: params.includeArchived || false,
},
}

Some files were not shown because too many files have changed in this diff Show More