Compare commits

6 Commits

Author SHA1 Message Date
Vikhyath Mondreti
0db87778c4 fix(mistral): restore mistral configs for v2 version 2026-02-04 15:48:21 -08:00
Waleed
8d846c5983 feat(async-jobs): async execution with job queue backends (#3134)
* feat(async-jobs): async execution with job queue backends

* added migration

* remove unused envvar, remove extraneous comments

* ack comment

* same for db

* added dedicated async envvars for timeouts, updated helm

* updated comment

* ack comment

* migrated routes to be more restful

* ack comments

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 14:52:33 -08:00
Vikhyath Mondreti
362f4c2918 improvement(timeouts): sync to 50 min, self-hosted maxed out (#3133)
* improvement(timeouts): sync to 50 min, self-hosted maxed out

* update env vars
2026-02-04 11:27:41 -08:00
Waleed
c77e351067 fix(providers): correct tool calling message format across all providers (#3132)
* fix(providers): correct tool calling message format across all providers

* fix(bedrock): correct timestamp char count in comment

* chore(gemini): remove dead executeToolCall function

* remove unused var
2026-02-04 11:02:49 -08:00
Vikhyath Mondreti
a627faabe7 feat(timeouts): execution timeout limits (#3120)
* feat(timeouts): execution timeout limits

* fix type issues

* add to docs

* update stale exec cleanup route

* update more callsites

* update tests

* address bugbot comments

* remove import expression

* support streaming and async paths

* fix streaming path

* add hitl and workflow handler

* make sync path match

* consolidate

* timeout errors

* validation errors typed

* import order

* Merge staging into feat/timeout-lims

Resolved conflicts:
- stt/route.ts: Keep both execution timeout and security imports
- textract/parse/route.ts: Keep both execution timeout and validation imports
- use-workflow-execution.ts: Keep cancellation console entry from feature branch
- input-validation.ts: Remove server functions (moved to .server.ts in staging)
- tools/index.ts: Keep execution timeout, use .server import for security

* make run from block consistent

* revert console update change

* fix subflow errors

* clean up base 64 cache correctly

* update docs

* consolidate workflow execution and run from block hook code

* remove unused constant

* fix cleanup base64 sse

* fix run from block tracespan
2026-02-04 10:26:36 -08:00
Vikhyath Mondreti
f811594875 improvement(rooms): redis client closed should fail with indicator (#3115)
* improvement(rooms): redis client closed should fail fast

* bugbot comment

* consolidate
2026-02-03 23:48:46 -08:00
62 changed files with 12772 additions and 1273 deletions

View File

@@ -220,9 +220,9 @@ Workflows have maximum execution time limits based on your subscription plan:
| Plan | Sync Execution | Async Execution |
|------|----------------|-----------------|
| **Free** | 5 minutes | 10 minutes |
-| **Pro** | 60 minutes | 90 minutes |
-| **Team** | 60 minutes | 90 minutes |
-| **Enterprise** | 60 minutes | 90 minutes |
+| **Pro** | 50 minutes | 90 minutes |
+| **Team** | 50 minutes | 90 minutes |
+| **Enterprise** | 50 minutes | 90 minutes |
**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI.
**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are up to 2x the sync limit, capped at 90 minutes.
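A minimal sketch of the async trigger flow these docs describe, assuming the execute route shape shown later in this diff; the base URL, API-key header, and input field are placeholders, not the project's documented client API.

```ts
// Hedged sketch: trigger an async run, then read the queued-job handle.
const baseUrl = 'https://sim.example.com' // placeholder
const workflowId = 'wf_123' // placeholder

const res = await fetch(`${baseUrl}/api/workflows/${workflowId}/execute`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json', 'X-API-Key': process.env.SIM_API_KEY ?? '' },
  // With API-key auth the body is the workflow input, aside from control fields like `async`.
  body: JSON.stringify({ async: true, message: 'hello' }),
})

// Per the async-jobs change below, a 202 response carries the job handle.
const job = (await res.json()) as { jobId: string; executionId: string; statusUrl: string }
console.log(job.statusUrl) // /api/jobs/:jobId, polled in the status-route sketch further down
```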

View File

@@ -21,6 +21,7 @@ const UpdateCreatorProfileSchema = z.object({
name: z.string().min(1, 'Name is required').max(100, 'Max 100 characters').optional(),
profileImageUrl: z.string().optional().or(z.literal('')),
details: CreatorProfileDetailsSchema.optional(),
verified: z.boolean().optional(), // Verification status (super users only)
})
// Helper to check if user has permission to manage profile
@@ -97,11 +98,29 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Profile not found' }, { status: 404 })
}
// Check permissions
const canEdit = await hasPermission(session.user.id, existing[0])
if (!canEdit) {
logger.warn(`[${requestId}] User denied permission to update profile: ${id}`)
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
// Verification changes require super user permission
if (data.verified !== undefined) {
const { verifyEffectiveSuperUser } = await import('@/lib/templates/permissions')
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
if (!effectiveSuperUser) {
logger.warn(`[${requestId}] Non-super user attempted to change creator verification: ${id}`)
return NextResponse.json(
{ error: 'Only super users can change verification status' },
{ status: 403 }
)
}
}
// For non-verified updates, check regular permissions
const hasNonVerifiedUpdates =
data.name !== undefined || data.profileImageUrl !== undefined || data.details !== undefined
if (hasNonVerifiedUpdates) {
const canEdit = await hasPermission(session.user.id, existing[0])
if (!canEdit) {
logger.warn(`[${requestId}] User denied permission to update profile: ${id}`)
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
}
const updateData: any = {
@@ -111,6 +130,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
if (data.name !== undefined) updateData.name = data.name
if (data.profileImageUrl !== undefined) updateData.profileImageUrl = data.profileImageUrl
if (data.details !== undefined) updateData.details = data.details
if (data.verified !== undefined) updateData.verified = data.verified
const updated = await db
.update(templateCreators)
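The dedicated verify/unverify endpoints (removed below) are replaced by this consolidated PUT; a minimal sketch of the new call, mirroring the template-details UI change later in this diff, with a placeholder creator id:

```ts
// Toggling verification now goes through PUT /api/creators/:id; the server
// returns 403 unless the caller is an effective super user.
const creatorId = 'creator_123' // placeholder
await fetch(`/api/creators/${creatorId}`, {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ verified: true }), // or false to unverify
})
```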

View File

@@ -1,113 +0,0 @@
import { db } from '@sim/db'
import { templateCreators } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { generateRequestId } from '@/lib/core/utils/request'
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'
const logger = createLogger('CreatorVerificationAPI')
export const revalidate = 0
// POST /api/creators/[id]/verify - Verify a creator (super users only)
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id } = await params
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized verification attempt for creator: ${id}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
// Check if user is a super user
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
if (!effectiveSuperUser) {
logger.warn(`[${requestId}] Non-super user attempted to verify creator: ${id}`)
return NextResponse.json({ error: 'Only super users can verify creators' }, { status: 403 })
}
// Check if creator exists
const existingCreator = await db
.select()
.from(templateCreators)
.where(eq(templateCreators.id, id))
.limit(1)
if (existingCreator.length === 0) {
logger.warn(`[${requestId}] Creator not found for verification: ${id}`)
return NextResponse.json({ error: 'Creator not found' }, { status: 404 })
}
// Update creator verified status to true
await db
.update(templateCreators)
.set({ verified: true, updatedAt: new Date() })
.where(eq(templateCreators.id, id))
logger.info(`[${requestId}] Creator verified: ${id} by super user: ${session.user.id}`)
return NextResponse.json({
message: 'Creator verified successfully',
creatorId: id,
})
} catch (error) {
logger.error(`[${requestId}] Error verifying creator ${id}`, error)
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
// DELETE /api/creators/[id]/verify - Unverify a creator (super users only)
export async function DELETE(
request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
const requestId = generateRequestId()
const { id } = await params
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized unverification attempt for creator: ${id}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
// Check if user is a super user
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
if (!effectiveSuperUser) {
logger.warn(`[${requestId}] Non-super user attempted to unverify creator: ${id}`)
return NextResponse.json({ error: 'Only super users can unverify creators' }, { status: 403 })
}
// Check if creator exists
const existingCreator = await db
.select()
.from(templateCreators)
.where(eq(templateCreators.id, id))
.limit(1)
if (existingCreator.length === 0) {
logger.warn(`[${requestId}] Creator not found for unverification: ${id}`)
return NextResponse.json({ error: 'Creator not found' }, { status: 404 })
}
// Update creator verified status to false
await db
.update(templateCreators)
.set({ verified: false, updatedAt: new Date() })
.where(eq(templateCreators.id, id))
logger.info(`[${requestId}] Creator unverified: ${id} by super user: ${session.user.id}`)
return NextResponse.json({
message: 'Creator unverified successfully',
creatorId: id,
})
} catch (error) {
logger.error(`[${requestId}] Error unverifying creator ${id}`, error)
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -1,9 +1,10 @@
import { db } from '@sim/db'
import { asyncJobs, db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt, sql } from 'drizzle-orm'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
const logger = createLogger('CleanupStaleExecutions')
@@ -80,12 +81,102 @@ export async function GET(request: NextRequest) {
logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`)
// Clean up stale async jobs (stuck in processing)
let asyncJobsMarkedFailed = 0
try {
const staleAsyncJobs = await db
.update(asyncJobs)
.set({
status: JOB_STATUS.FAILED,
completedAt: new Date(),
error: `Job terminated: stuck in processing for more than ${STALE_THRESHOLD_MINUTES} minutes`,
updatedAt: new Date(),
})
.where(
and(eq(asyncJobs.status, JOB_STATUS.PROCESSING), lt(asyncJobs.startedAt, staleThreshold))
)
.returning({ id: asyncJobs.id })
asyncJobsMarkedFailed = staleAsyncJobs.length
if (asyncJobsMarkedFailed > 0) {
logger.info(`Marked ${asyncJobsMarkedFailed} stale async jobs as failed`)
}
} catch (error) {
logger.error('Failed to clean up stale async jobs:', {
error: error instanceof Error ? error.message : String(error),
})
}
// Clean up stale pending jobs (never started, e.g., due to server crash before startJob())
let stalePendingJobsMarkedFailed = 0
try {
const stalePendingJobs = await db
.update(asyncJobs)
.set({
status: JOB_STATUS.FAILED,
completedAt: new Date(),
error: `Job terminated: stuck in pending state for more than ${STALE_THRESHOLD_MINUTES} minutes (never started)`,
updatedAt: new Date(),
})
.where(
and(eq(asyncJobs.status, JOB_STATUS.PENDING), lt(asyncJobs.createdAt, staleThreshold))
)
.returning({ id: asyncJobs.id })
stalePendingJobsMarkedFailed = stalePendingJobs.length
if (stalePendingJobsMarkedFailed > 0) {
logger.info(`Marked ${stalePendingJobsMarkedFailed} stale pending jobs as failed`)
}
} catch (error) {
logger.error('Failed to clean up stale pending jobs:', {
error: error instanceof Error ? error.message : String(error),
})
}
// Delete completed/failed jobs older than retention period
const retentionThreshold = new Date(Date.now() - JOB_RETENTION_HOURS * 60 * 60 * 1000)
let asyncJobsDeleted = 0
try {
const deletedJobs = await db
.delete(asyncJobs)
.where(
and(
inArray(asyncJobs.status, [JOB_STATUS.COMPLETED, JOB_STATUS.FAILED]),
lt(asyncJobs.completedAt, retentionThreshold)
)
)
.returning({ id: asyncJobs.id })
asyncJobsDeleted = deletedJobs.length
if (asyncJobsDeleted > 0) {
logger.info(
`Deleted ${asyncJobsDeleted} old async jobs (retention: ${JOB_RETENTION_HOURS}h)`
)
}
} catch (error) {
logger.error('Failed to delete old async jobs:', {
error: error instanceof Error ? error.message : String(error),
})
}
return NextResponse.json({
success: true,
found: staleExecutions.length,
cleaned,
failed,
thresholdMinutes: STALE_THRESHOLD_MINUTES,
executions: {
found: staleExecutions.length,
cleaned,
failed,
thresholdMinutes: STALE_THRESHOLD_MINUTES,
},
asyncJobs: {
staleProcessingMarkedFailed: asyncJobsMarkedFailed,
stalePendingMarkedFailed: stalePendingJobsMarkedFailed,
oldDeleted: asyncJobsDeleted,
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
retentionHours: JOB_RETENTION_HOURS,
},
})
} catch (error) {
logger.error('Error in stale execution cleanup job:', error)
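For reference, the cleanup response now nests execution and async-job counters; a sketch of its shape, reconstructed from the NextResponse.json call above (the type name itself is illustrative):

```ts
interface CleanupResponse {
  success: boolean
  executions: {
    found: number
    cleaned: number
    failed: number
    thresholdMinutes: number
  }
  asyncJobs: {
    staleProcessingMarkedFailed: number
    stalePendingMarkedFailed: number
    oldDeleted: number
    staleThresholdMinutes: number
    retentionHours: number
  }
}
```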

View File

@@ -1,7 +1,7 @@
import { createLogger } from '@sim/logger'
import { runs } from '@trigger.dev/sdk'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getJobQueue, JOB_STATUS } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
import { createErrorResponse } from '@/app/api/workflows/utils'
@@ -15,8 +15,6 @@ export async function GET(
const requestId = generateRequestId()
try {
logger.debug(`[${requestId}] Getting status for task: ${taskId}`)
const authResult = await checkHybridAuth(request, { requireWorkflowId: false })
if (!authResult.success || !authResult.userId) {
logger.warn(`[${requestId}] Unauthorized task status request`)
@@ -25,76 +23,60 @@ export async function GET(
const authenticatedUserId = authResult.userId
const run = await runs.retrieve(taskId)
const jobQueue = await getJobQueue()
const job = await jobQueue.getJob(taskId)
logger.debug(`[${requestId}] Task ${taskId} status: ${run.status}`)
const payload = run.payload as any
if (payload?.workflowId) {
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const accessCheck = await verifyWorkflowAccess(authenticatedUserId, payload.workflowId)
if (!accessCheck.hasAccess) {
logger.warn(`[${requestId}] User ${authenticatedUserId} denied access to task ${taskId}`, {
workflowId: payload.workflowId,
})
return createErrorResponse('Access denied', 403)
}
logger.debug(`[${requestId}] User ${authenticatedUserId} has access to task ${taskId}`)
} else {
if (payload?.userId && payload.userId !== authenticatedUserId) {
logger.warn(
`[${requestId}] User ${authenticatedUserId} attempted to access task ${taskId} owned by ${payload.userId}`
)
return createErrorResponse('Access denied', 403)
}
if (!payload?.userId) {
logger.warn(
`[${requestId}] Task ${taskId} has no ownership information in payload. Denying access for security.`
)
return createErrorResponse('Access denied', 403)
}
if (!job) {
return createErrorResponse('Task not found', 404)
}
const statusMap = {
QUEUED: 'queued',
WAITING_FOR_DEPLOY: 'queued',
EXECUTING: 'processing',
RESCHEDULED: 'processing',
FROZEN: 'processing',
COMPLETED: 'completed',
CANCELED: 'cancelled',
FAILED: 'failed',
CRASHED: 'failed',
INTERRUPTED: 'failed',
SYSTEM_FAILURE: 'failed',
EXPIRED: 'failed',
} as const
if (job.metadata?.workflowId) {
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const accessCheck = await verifyWorkflowAccess(
authenticatedUserId,
job.metadata.workflowId as string
)
if (!accessCheck.hasAccess) {
logger.warn(`[${requestId}] Access denied to workflow ${job.metadata.workflowId}`)
return createErrorResponse('Access denied', 403)
}
} else if (job.metadata?.userId && job.metadata.userId !== authenticatedUserId) {
logger.warn(`[${requestId}] Access denied to user ${job.metadata.userId}`)
return createErrorResponse('Access denied', 403)
} else if (!job.metadata?.userId && !job.metadata?.workflowId) {
logger.warn(`[${requestId}] Access denied to job ${taskId}`)
return createErrorResponse('Access denied', 403)
}
const mappedStatus = statusMap[run.status as keyof typeof statusMap] || 'unknown'
const mappedStatus = job.status === JOB_STATUS.PENDING ? 'queued' : job.status
const response: any = {
success: true,
taskId,
status: mappedStatus,
metadata: {
startedAt: run.startedAt,
startedAt: job.startedAt,
},
}
if (mappedStatus === 'completed') {
response.output = run.output // This contains the workflow execution results
response.metadata.completedAt = run.finishedAt
response.metadata.duration = run.durationMs
if (job.status === JOB_STATUS.COMPLETED) {
response.output = job.output
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
}
if (mappedStatus === 'failed') {
response.error = run.error
response.metadata.completedAt = run.finishedAt
response.metadata.duration = run.durationMs
if (job.status === JOB_STATUS.FAILED) {
response.error = job.error
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
}
if (mappedStatus === 'processing' || mappedStatus === 'queued') {
response.estimatedDuration = 180000 // 3 minutes max from our config
if (job.status === JOB_STATUS.PROCESSING || job.status === JOB_STATUS.PENDING) {
response.estimatedDuration = 180000
}
return NextResponse.json(response)
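A hedged polling sketch against the rewritten status route, assuming the queue's status constants serialize to the same lowercase strings the old Trigger.dev status map produced:

```ts
async function waitForJob(jobId: string, headers: HeadersInit): Promise<unknown> {
  for (;;) {
    const res = await fetch(`/api/jobs/${jobId}`, { headers })
    if (res.status === 404) throw new Error('Task not found')
    const body = (await res.json()) as {
      status: 'queued' | 'processing' | 'completed' | 'failed'
      output?: unknown
      error?: string
    }
    if (body.status === 'completed') return body.output
    if (body.status === 'failed') throw new Error(body.error ?? 'Job failed')
    await new Promise((resolve) => setTimeout(resolve, 2_000)) // fixed backoff between polls
  }
}
```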

View File

@@ -1,10 +1,9 @@
import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
import { createLogger } from '@sim/logger'
import { tasks } from '@trigger.dev/sdk'
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
import { executeScheduleJob } from '@/background/schedule-execution'
@@ -55,72 +54,67 @@ export async function GET(request: NextRequest) {
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
if (isTriggerDevEnabled) {
const triggerPromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt
const jobQueue = await getJobQueue()
try {
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}
const queuePromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt
const handle = await tasks.trigger('schedule-execution', payload)
logger.info(
`[${requestId}] Queued schedule execution task ${handle.id} for workflow ${schedule.workflowId}`
)
return handle
} catch (error) {
logger.error(
`[${requestId}] Failed to trigger schedule execution for workflow ${schedule.workflowId}`,
error
)
return null
}
})
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}
await Promise.allSettled(triggerPromises)
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`)
} else {
const directExecutionPromises = dueSchedules.map(async (schedule) => {
const queueTime = schedule.lastQueuedAt ?? queuedAt
const payload = {
scheduleId: schedule.id,
workflowId: schedule.workflowId,
blockId: schedule.blockId || undefined,
cronExpression: schedule.cronExpression || undefined,
lastRanAt: schedule.lastRanAt?.toISOString(),
failedCount: schedule.failedCount || 0,
now: queueTime.toISOString(),
scheduledFor: schedule.nextRunAt?.toISOString(),
}
void executeScheduleJob(payload).catch((error) => {
logger.error(
`[${requestId}] Direct schedule execution failed for workflow ${schedule.workflowId}`,
error
)
try {
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
metadata: { workflowId: schedule.workflowId },
})
logger.info(
`[${requestId}] Queued direct schedule execution for workflow ${schedule.workflowId} (Trigger.dev disabled)`
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
)
})
await Promise.allSettled(directExecutionPromises)
if (shouldExecuteInline()) {
void (async () => {
try {
await jobQueue.startJob(jobId)
const output = await executeScheduleJob(payload)
await jobQueue.completeJob(jobId, output)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error(
`[${requestId}] Schedule execution failed for workflow ${schedule.workflowId}`,
{ jobId, error: errorMessage }
)
try {
await jobQueue.markJobFailed(jobId, errorMessage)
} catch (markFailedError) {
logger.error(`[${requestId}] Failed to mark job as failed`, {
jobId,
error:
markFailedError instanceof Error
? markFailedError.message
: String(markFailedError),
})
}
}
})()
}
} catch (error) {
logger.error(
`[${requestId}] Failed to queue schedule execution for workflow ${schedule.workflowId}`,
error
)
}
})
logger.info(
`[${requestId}] Queued ${dueSchedules.length} direct schedule executions (Trigger.dev disabled)`
)
}
await Promise.allSettled(queuePromises)
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions`)
return NextResponse.json({
message: 'Scheduled workflow executions processed',
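The enqueue-then-maybe-run-inline pattern above also appears in the async execute route later in this diff; a distilled sketch, assuming only the queue methods shown in these hunks (enqueue, startJob, completeJob, markJobFailed):

```ts
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'

async function enqueueWithInlineFallback<P, O>(
  task: string,
  payload: P,
  run: (payload: P) => Promise<O>,
  metadata?: Record<string, unknown>
): Promise<string> {
  const jobQueue = await getJobQueue()
  const jobId = await jobQueue.enqueue(task, payload, { metadata })
  if (shouldExecuteInline()) {
    // No external queue backend: run the job in-process, fire-and-forget.
    void (async () => {
      try {
        await jobQueue.startJob(jobId)
        await jobQueue.completeJob(jobId, await run(payload))
      } catch (error) {
        // The routes above additionally guard markJobFailed itself with a try/catch.
        await jobQueue.markJobFailed(jobId, error instanceof Error ? error.message : String(error))
      }
    })()
  }
  return jobId
}
```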

View File

@@ -1,101 +0,0 @@
import { db } from '@sim/db'
import { templates } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { generateRequestId } from '@/lib/core/utils/request'
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'
const logger = createLogger('TemplateApprovalAPI')
export const revalidate = 0
/**
* POST /api/templates/[id]/approve - Approve a template (super users only)
*/
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id } = await params
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized template approval attempt for ID: ${id}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
if (!effectiveSuperUser) {
logger.warn(`[${requestId}] Non-super user attempted to approve template: ${id}`)
return NextResponse.json({ error: 'Only super users can approve templates' }, { status: 403 })
}
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
if (existingTemplate.length === 0) {
logger.warn(`[${requestId}] Template not found for approval: ${id}`)
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
}
await db
.update(templates)
.set({ status: 'approved', updatedAt: new Date() })
.where(eq(templates.id, id))
logger.info(`[${requestId}] Template approved: ${id} by super user: ${session.user.id}`)
return NextResponse.json({
message: 'Template approved successfully',
templateId: id,
})
} catch (error) {
logger.error(`[${requestId}] Error approving template ${id}`, error)
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
/**
* DELETE /api/templates/[id]/approve - Unapprove a template (super users only)
*/
export async function DELETE(
_request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
const requestId = generateRequestId()
const { id } = await params
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized template rejection attempt for ID: ${id}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
if (!effectiveSuperUser) {
logger.warn(`[${requestId}] Non-super user attempted to reject template: ${id}`)
return NextResponse.json({ error: 'Only super users can reject templates' }, { status: 403 })
}
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
if (existingTemplate.length === 0) {
logger.warn(`[${requestId}] Template not found for rejection: ${id}`)
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
}
await db
.update(templates)
.set({ status: 'rejected', updatedAt: new Date() })
.where(eq(templates.id, id))
logger.info(`[${requestId}] Template rejected: ${id} by super user: ${session.user.id}`)
return NextResponse.json({
message: 'Template rejected successfully',
templateId: id,
})
} catch (error) {
logger.error(`[${requestId}] Error rejecting template ${id}`, error)
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -1,55 +0,0 @@
import { db } from '@sim/db'
import { templates } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { generateRequestId } from '@/lib/core/utils/request'
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'
const logger = createLogger('TemplateRejectionAPI')
export const revalidate = 0
/**
* POST /api/templates/[id]/reject - Reject a template (super users only)
*/
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id } = await params
try {
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized template rejection attempt for ID: ${id}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
if (!effectiveSuperUser) {
logger.warn(`[${requestId}] Non-super user attempted to reject template: ${id}`)
return NextResponse.json({ error: 'Only super users can reject templates' }, { status: 403 })
}
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
if (existingTemplate.length === 0) {
logger.warn(`[${requestId}] Template not found for rejection: ${id}`)
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
}
await db
.update(templates)
.set({ status: 'rejected', updatedAt: new Date() })
.where(eq(templates.id, id))
logger.info(`[${requestId}] Template rejected: ${id} by super user: ${session.user.id}`)
return NextResponse.json({
message: 'Template rejected successfully',
templateId: id,
})
} catch (error) {
logger.error(`[${requestId}] Error rejecting template ${id}`, error)
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -106,6 +106,7 @@ const updateTemplateSchema = z.object({
creatorId: z.string().optional(), // Creator profile ID
tags: z.array(z.string()).max(10, 'Maximum 10 tags allowed').optional(),
updateState: z.boolean().optional(), // Explicitly request state update from current workflow
status: z.enum(['approved', 'rejected', 'pending']).optional(), // Status change (super users only)
})
// PUT /api/templates/[id] - Update a template
@@ -131,7 +132,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
)
}
const { name, details, creatorId, tags, updateState } = validationResult.data
const { name, details, creatorId, tags, updateState, status } = validationResult.data
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
@@ -142,21 +143,44 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
const template = existingTemplate[0]
if (!template.creatorId) {
logger.warn(`[${requestId}] Template ${id} has no creator, denying update`)
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
// Status changes require super user permission
if (status !== undefined) {
const { verifyEffectiveSuperUser } = await import('@/lib/templates/permissions')
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
if (!effectiveSuperUser) {
logger.warn(`[${requestId}] Non-super user attempted to change template status: ${id}`)
return NextResponse.json(
{ error: 'Only super users can change template status' },
{ status: 403 }
)
}
}
const { verifyCreatorPermission } = await import('@/lib/templates/permissions')
const { hasPermission, error: permissionError } = await verifyCreatorPermission(
session.user.id,
template.creatorId,
'admin'
)
// For non-status updates, verify creator permission
const hasNonStatusUpdates =
name !== undefined ||
details !== undefined ||
creatorId !== undefined ||
tags !== undefined ||
updateState
if (!hasPermission) {
logger.warn(`[${requestId}] User denied permission to update template ${id}`)
return NextResponse.json({ error: permissionError || 'Access denied' }, { status: 403 })
if (hasNonStatusUpdates) {
if (!template.creatorId) {
logger.warn(`[${requestId}] Template ${id} has no creator, denying update`)
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
const { verifyCreatorPermission } = await import('@/lib/templates/permissions')
const { hasPermission, error: permissionError } = await verifyCreatorPermission(
session.user.id,
template.creatorId,
'admin'
)
if (!hasPermission) {
logger.warn(`[${requestId}] User denied permission to update template ${id}`)
return NextResponse.json({ error: permissionError || 'Access denied' }, { status: 403 })
}
}
const updateData: any = {
@@ -167,6 +191,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
if (details !== undefined) updateData.details = details
if (tags !== undefined) updateData.tags = tags
if (creatorId !== undefined) updateData.creatorId = creatorId
if (status !== undefined) updateData.status = status
if (updateState && template.workflowId) {
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
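With the dedicated approve/reject routes removed, moderation becomes a status field on the regular template update; a minimal sketch matching the UI change later in this diff, with a placeholder template id:

```ts
const templateId = 'template_123' // placeholder
await fetch(`/api/templates/${templateId}`, {
  method: 'PUT',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ status: 'approved' }), // 'approved' | 'rejected' | 'pending'; super users only
})
```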

View File

@@ -1,190 +0,0 @@
import { db, workflowDeploymentVersion } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import type { BlockState } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowActivateDeploymentAPI')
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ id: string; version: string }> }
) {
const requestId = generateRequestId()
const { id, version } = await params
try {
const {
error,
session,
workflow: workflowData,
} = await validateWorkflowPermissions(id, requestId, 'admin')
if (error) {
return createErrorResponse(error.message, error.status)
}
const actorUserId = session?.user?.id
if (!actorUserId) {
logger.warn(`[${requestId}] Unable to resolve actor user for deployment activation: ${id}`)
return createErrorResponse('Unable to determine activating user', 400)
}
const versionNum = Number(version)
if (!Number.isFinite(versionNum)) {
return createErrorResponse('Invalid version number', 400)
}
const [versionRow] = await db
.select({
id: workflowDeploymentVersion.id,
state: workflowDeploymentVersion.state,
})
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.version, versionNum)
)
)
.limit(1)
if (!versionRow?.state) {
return createErrorResponse('Deployment version not found', 404)
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
const blocks = deployedState.blocks
if (!blocks || typeof blocks !== 'object') {
return createErrorResponse('Invalid deployed state structure', 500)
}
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId: id,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
blocks,
requestId,
deploymentVersionId: versionRow.id,
previousVersionId,
forceRecreateSubscriptions: true,
})
if (!triggerSaveResult.success) {
return createErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration',
triggerSaveResult.error?.status || 500
)
}
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
if (!scheduleResult.success) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
}
const result = await activateWorkflowVersion({ workflowId: id, version: versionNum })
if (!result.success) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
}
if (previousVersionId && previousVersionId !== versionRow.id) {
try {
logger.info(
`[${requestId}] Cleaning up previous version ${previousVersionId} webhooks/schedules`
)
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
logger.info(`[${requestId}] Previous version cleanup completed`)
} catch (cleanupError) {
logger.error(
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}
await syncMcpToolsForWorkflow({
workflowId: id,
requestId,
state: versionRow.state,
context: 'activate',
})
return createSuccessResponse({
success: true,
deployedAt: result.deployedAt,
warnings: triggerSaveResult.warnings,
})
} catch (error: any) {
logger.error(`[${requestId}] Error activating deployment for workflow: ${id}`, error)
return createErrorResponse(error.message || 'Failed to activate deployment', 500)
}
}

View File

@@ -4,8 +4,17 @@ import { and, eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { z } from 'zod'
import { generateRequestId } from '@/lib/core/utils/request'
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
import {
cleanupDeploymentVersion,
createSchedulesForDeploy,
validateWorkflowSchedules,
} from '@/lib/workflows/schedules'
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import type { BlockState } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowDeploymentVersionAPI')
@@ -23,10 +32,14 @@ const patchBodySchema = z
.max(500, 'Description must be 500 characters or less')
.nullable()
.optional(),
isActive: z.literal(true).optional(), // Set to true to activate this version
})
.refine((data) => data.name !== undefined || data.description !== undefined, {
message: 'At least one of name or description must be provided',
})
.refine(
(data) => data.name !== undefined || data.description !== undefined || data.isActive === true,
{
message: 'At least one of name, description, or isActive must be provided',
}
)
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
@@ -82,7 +95,22 @@ export async function PATCH(
const { id, version } = await params
try {
const { error } = await validateWorkflowPermissions(id, requestId, 'write')
const body = await request.json()
const validation = patchBodySchema.safeParse(body)
if (!validation.success) {
return createErrorResponse(validation.error.errors[0]?.message || 'Invalid request body', 400)
}
const { name, description, isActive } = validation.data
// Activation requires admin permission, other updates require write
const requiredPermission = isActive ? 'admin' : 'write'
const {
error,
session,
workflow: workflowData,
} = await validateWorkflowPermissions(id, requestId, requiredPermission)
if (error) {
return createErrorResponse(error.message, error.status)
}
@@ -92,15 +120,193 @@ export async function PATCH(
return createErrorResponse('Invalid version', 400)
}
const body = await request.json()
const validation = patchBodySchema.safeParse(body)
// Handle activation
if (isActive) {
const actorUserId = session?.user?.id
if (!actorUserId) {
logger.warn(`[${requestId}] Unable to resolve actor user for deployment activation: ${id}`)
return createErrorResponse('Unable to determine activating user', 400)
}
if (!validation.success) {
return createErrorResponse(validation.error.errors[0]?.message || 'Invalid request body', 400)
const [versionRow] = await db
.select({
id: workflowDeploymentVersion.id,
state: workflowDeploymentVersion.state,
})
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.version, versionNum)
)
)
.limit(1)
if (!versionRow?.state) {
return createErrorResponse('Deployment version not found', 404)
}
const [currentActiveVersion] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const previousVersionId = currentActiveVersion?.id
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
const blocks = deployedState.blocks
if (!blocks || typeof blocks !== 'object') {
return createErrorResponse('Invalid deployed state structure', 500)
}
const scheduleValidation = validateWorkflowSchedules(blocks)
if (!scheduleValidation.isValid) {
return createErrorResponse(
`Invalid schedule configuration: ${scheduleValidation.error}`,
400
)
}
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
request,
workflowId: id,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
blocks,
requestId,
deploymentVersionId: versionRow.id,
previousVersionId,
forceRecreateSubscriptions: true,
})
if (!triggerSaveResult.success) {
return createErrorResponse(
triggerSaveResult.error?.message || 'Failed to sync trigger configuration',
triggerSaveResult.error?.status || 500
)
}
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
if (!scheduleResult.success) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
}
const result = await activateWorkflowVersion({ workflowId: id, version: versionNum })
if (!result.success) {
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: versionRow.id,
})
if (previousVersionId) {
await restorePreviousVersionWebhooks({
request,
workflow: workflowData as Record<string, unknown>,
userId: actorUserId,
previousVersionId,
requestId,
})
}
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
}
if (previousVersionId && previousVersionId !== versionRow.id) {
try {
logger.info(
`[${requestId}] Cleaning up previous version ${previousVersionId} webhooks/schedules`
)
await cleanupDeploymentVersion({
workflowId: id,
workflow: workflowData as Record<string, unknown>,
requestId,
deploymentVersionId: previousVersionId,
skipExternalCleanup: true,
})
logger.info(`[${requestId}] Previous version cleanup completed`)
} catch (cleanupError) {
logger.error(
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
cleanupError
)
}
}
await syncMcpToolsForWorkflow({
workflowId: id,
requestId,
state: versionRow.state,
context: 'activate',
})
// Apply name/description updates if provided alongside activation
let updatedName: string | null | undefined
let updatedDescription: string | null | undefined
if (name !== undefined || description !== undefined) {
const activationUpdateData: { name?: string; description?: string | null } = {}
if (name !== undefined) {
activationUpdateData.name = name
}
if (description !== undefined) {
activationUpdateData.description = description
}
const [updated] = await db
.update(workflowDeploymentVersion)
.set(activationUpdateData)
.where(
and(
eq(workflowDeploymentVersion.workflowId, id),
eq(workflowDeploymentVersion.version, versionNum)
)
)
.returning({
name: workflowDeploymentVersion.name,
description: workflowDeploymentVersion.description,
})
if (updated) {
updatedName = updated.name
updatedDescription = updated.description
logger.info(
`[${requestId}] Updated deployment version ${version} metadata during activation`,
{ name: activationUpdateData.name, description: activationUpdateData.description }
)
}
}
return createSuccessResponse({
success: true,
deployedAt: result.deployedAt,
warnings: triggerSaveResult.warnings,
...(updatedName !== undefined && { name: updatedName }),
...(updatedDescription !== undefined && { description: updatedDescription }),
})
}
const { name, description } = validation.data
// Handle name/description updates
const updateData: { name?: string; description?: string | null } = {}
if (name !== undefined) {
updateData.name = name
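Activation now rides on the version PATCH instead of the removed activate endpoint; a hedged sketch, with the route path assumed from the { id, version } params and placeholder identifiers:

```ts
// isActive accepts only `true` (z.literal) and requires admin permission;
// name/description updates may ride along in the same request.
const workflowId = 'wf_123' // placeholder
const versionNum = 4 // placeholder
await fetch(`/api/workflows/${workflowId}/deployments/${versionNum}`, {
  method: 'PATCH',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ isActive: true, description: 'Promote to active' }),
})
```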

View File

@@ -1,286 +0,0 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getTimeoutErrorMessage, isTimeoutError } from '@/lib/core/execution-limits'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
const logger = createLogger('ExecuteFromBlockAPI')
const ExecuteFromBlockSchema = z.object({
startBlockId: z.string().min(1, 'Start block ID is required'),
sourceSnapshot: z.object({
blockStates: z.record(z.any()),
executedBlocks: z.array(z.string()),
blockLogs: z.array(z.any()),
decisions: z.object({
router: z.record(z.string()),
condition: z.record(z.string()),
}),
completedLoops: z.array(z.string()),
loopExecutions: z.record(z.any()).optional(),
parallelExecutions: z.record(z.any()).optional(),
parallelBlockMapping: z.record(z.any()).optional(),
activeExecutionPath: z.array(z.string()),
}),
input: z.any().optional(),
})
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id: workflowId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
let body: unknown
try {
body = await req.json()
} catch {
return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
}
const validation = ExecuteFromBlockSchema.safeParse(body)
if (!validation.success) {
logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
return NextResponse.json(
{
error: 'Invalid request body',
details: validation.error.errors.map((e) => ({
path: e.path.join('.'),
message: e.message,
})),
},
{ status: 400 }
)
}
const { startBlockId, sourceSnapshot, input } = validation.data
const executionId = uuidv4()
// Run preprocessing checks (billing, rate limits, usage limits)
const preprocessResult = await preprocessExecution({
workflowId,
userId,
triggerType: 'manual',
executionId,
requestId,
checkRateLimit: false, // Manual executions don't rate limit
checkDeployment: false, // Run-from-block doesn't require deployment
})
if (!preprocessResult.success) {
const { error } = preprocessResult
logger.warn(`[${requestId}] Preprocessing failed for run-from-block`, {
workflowId,
error: error?.message,
statusCode: error?.statusCode,
})
return NextResponse.json(
{ error: error?.message || 'Execution blocked' },
{ status: error?.statusCode || 500 }
)
}
const workflowRecord = preprocessResult.workflowRecord
if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
}
const workspaceId = workflowRecord.workspaceId
const workflowUserId = workflowRecord.userId
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
billingActorUserId: preprocessResult.actorUserId,
})
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController()
let isStreamClosed = false
let isTimedOut = false
const syncTimeout = preprocessResult.executionTimeout?.sync
let timeoutId: NodeJS.Timeout | undefined
if (syncTimeout) {
timeoutId = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, syncTimeout)
}
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({
executionId,
workflowId,
controller,
isStreamClosed: () => isStreamClosed,
setStreamClosed: () => {
isStreamClosed = true
},
})
const metadata: ExecutionMetadata = {
requestId,
workflowId,
userId,
executionId,
triggerType: 'manual',
workspaceId,
workflowUserId,
useDraftState: true,
isClientSession: true,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(metadata, {}, input || {}, {})
try {
const startTime = new Date()
sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
executionId,
workflowId,
data: { startTime: startTime.toISOString() },
})
const result = await executeWorkflowCore({
snapshot,
loggingSession,
abortSignal: abortController.signal,
runFromBlock: {
startBlockId,
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
},
callbacks: { onBlockStart, onBlockComplete, onStream },
})
if (result.status === 'cancelled') {
if (isTimedOut && syncTimeout) {
const timeoutErrorMessage = getTimeoutErrorMessage(null, syncTimeout)
logger.info(`[${requestId}] Run-from-block execution timed out`, {
timeoutMs: syncTimeout,
})
await loggingSession.markAsFailed(timeoutErrorMessage)
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: timeoutErrorMessage,
duration: result.metadata?.duration || 0,
},
})
} else {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
}
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
}
} catch (error: unknown) {
const isTimeout = isTimeoutError(error) || isTimedOut
const errorMessage = isTimeout
? getTimeoutErrorMessage(error, syncTimeout)
: error instanceof Error
? error.message
: 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`, {
isTimeout,
})
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans, totalDuration } = executionResult
? buildTraceSpans(executionResult)
: { traceSpans: [], totalDuration: 0 }
await loggingSession.safeCompleteWithError({
totalDurationMs: totalDuration || executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans,
})
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
},
})
} finally {
if (timeoutId) clearTimeout(timeoutId)
if (!isStreamClosed) {
try {
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
controller.close()
} catch {}
}
}
},
cancel() {
isStreamClosed = true
if (timeoutId) clearTimeout(timeoutId)
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})
return new NextResponse(stream, {
headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
return NextResponse.json(
{ error: errorMessage || 'Failed to start execution' },
{ status: 500 }
)
}
}

View File

@@ -1,10 +1,9 @@
import { createLogger } from '@sim/logger'
import { tasks } from '@trigger.dev/sdk'
import { type NextRequest, NextResponse } from 'next/server'
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import {
createTimeoutAbortController,
getTimeoutErrorMessage,
@@ -31,7 +30,7 @@ import {
} from '@/lib/workflows/persistence/utils'
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
import { executeWorkflowJob, type WorkflowExecutionPayload } from '@/background/workflow-execution'
import { normalizeName } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
@@ -60,6 +59,25 @@ const ExecuteWorkflowSchema = z.object({
})
.optional(),
stopAfterBlockId: z.string().optional(),
runFromBlock: z
.object({
startBlockId: z.string().min(1, 'Start block ID is required'),
sourceSnapshot: z.object({
blockStates: z.record(z.any()),
executedBlocks: z.array(z.string()),
blockLogs: z.array(z.any()),
decisions: z.object({
router: z.record(z.string()),
condition: z.record(z.string()),
}),
completedLoops: z.array(z.string()),
loopExecutions: z.record(z.any()).optional(),
parallelExecutions: z.record(z.any()).optional(),
parallelBlockMapping: z.record(z.any()).optional(),
activeExecutionPath: z.array(z.string()),
}),
})
.optional(),
})
export const runtime = 'nodejs'
@@ -124,41 +142,66 @@ type AsyncExecutionParams = {
userId: string
input: any
triggerType: CoreTriggerType
executionId: string
}
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
const { requestId, workflowId, userId, input, triggerType } = params
if (!isTriggerDevEnabled) {
logger.warn(`[${requestId}] Async mode requested but TRIGGER_DEV_ENABLED is false`)
return NextResponse.json(
{ error: 'Async execution is not enabled. Set TRIGGER_DEV_ENABLED=true to use async mode.' },
{ status: 400 }
)
}
const { requestId, workflowId, userId, input, triggerType, executionId } = params
const payload: WorkflowExecutionPayload = {
workflowId,
userId,
input,
triggerType,
executionId,
}
try {
const handle = await tasks.trigger('workflow-execution', payload)
const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('workflow-execution', payload, {
metadata: { workflowId, userId },
})
logger.info(`[${requestId}] Queued async workflow execution`, {
workflowId,
jobId: handle.id,
jobId,
})
if (shouldExecuteInline()) {
void (async () => {
try {
await jobQueue.startJob(jobId)
const output = await executeWorkflowJob(payload)
await jobQueue.completeJob(jobId, output)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error(`[${requestId}] Async workflow execution failed`, {
jobId,
error: errorMessage,
})
try {
await jobQueue.markJobFailed(jobId, errorMessage)
} catch (markFailedError) {
logger.error(`[${requestId}] Failed to mark job as failed`, {
jobId,
error:
markFailedError instanceof Error
? markFailedError.message
: String(markFailedError),
})
}
}
})()
}
return NextResponse.json(
{
success: true,
async: true,
jobId: handle.id,
jobId,
executionId,
message: 'Workflow execution queued',
statusUrl: `${getBaseUrl()}/api/jobs/${handle.id}`,
statusUrl: `${getBaseUrl()}/api/jobs/${jobId}`,
},
{ status: 202 }
)
@@ -226,6 +269,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId,
runFromBlock,
} = validation.data
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
@@ -242,6 +286,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId: _stopAfterBlockId,
runFromBlock: _runFromBlock,
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
...rest
} = body
@@ -320,6 +365,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
userId: actorUserId,
input,
triggerType: loggingTriggerType,
executionId,
})
}
@@ -444,6 +490,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
runFromBlock,
abortSignal: timeoutController.signal,
})
@@ -492,6 +539,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const filteredResult = {
success: result.success,
executionId,
output: outputWithBase64,
error: result.error,
metadata: result.metadata
@@ -783,6 +831,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
runFromBlock,
})
if (result.status === 'paused') {
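The standalone execute-from-block SSE route (deleted above) is folded into this endpoint via the optional runFromBlock field; an illustrative request body matching the schema extension, with placeholder ids and a mostly empty prior state:

```ts
const body = {
  runFromBlock: {
    startBlockId: 'block_abc', // placeholder block id
    sourceSnapshot: {
      // Carried over from a previous execution's serialized state.
      blockStates: {},
      executedBlocks: ['start_block'],
      blockLogs: [],
      decisions: { router: {}, condition: {} },
      completedLoops: [],
      activeExecutionPath: ['start_block'],
    },
  },
}
```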

View File

@@ -508,8 +508,10 @@ export default function TemplateDetails({ isWorkspaceContext = false }: Template
setIsApproving(true)
try {
const response = await fetch(`/api/templates/${template.id}/approve`, {
method: 'POST',
const response = await fetch(`/api/templates/${template.id}`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ status: 'approved' }),
})
if (response.ok) {
@@ -531,8 +533,10 @@ export default function TemplateDetails({ isWorkspaceContext = false }: Template
setIsRejecting(true)
try {
const response = await fetch(`/api/templates/${template.id}/reject`, {
method: 'POST',
const response = await fetch(`/api/templates/${template.id}`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ status: 'rejected' }),
})
if (response.ok) {
@@ -554,10 +558,11 @@ export default function TemplateDetails({ isWorkspaceContext = false }: Template
setIsVerifying(true)
try {
const endpoint = `/api/creators/${template.creator.id}/verify`
const method = template.creator.verified ? 'DELETE' : 'POST'
const response = await fetch(endpoint, { method })
const response = await fetch(`/api/creators/${template.creator.id}`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ verified: !template.creator.verified }),
})
if (response.ok) {
// Refresh page to show updated verification status

View File

@@ -12,7 +12,6 @@ import {
Tooltip,
} from '@/components/emcn'
import { Skeleton } from '@/components/ui'
import { getEnv, isTruthy } from '@/lib/core/config/env'
import { OutputSelect } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select'
interface WorkflowDeploymentInfo {
@@ -78,7 +77,6 @@ export function ApiDeploy({
async: false,
})
const isAsyncEnabled = isTruthy(getEnv('NEXT_PUBLIC_TRIGGER_DEV_ENABLED'))
const info = deploymentInfo ? { ...deploymentInfo, needsRedeployment } : null
const getBaseEndpoint = () => {
@@ -272,7 +270,7 @@ response = requests.post(
)
job = response.json()
print(job) # Contains job_id for status checking`
print(job) # Contains jobId and executionId`
case 'javascript':
return `const response = await fetch("${endpoint}", {
@@ -286,7 +284,7 @@ print(job) # Contains job_id for status checking`
});
const job = await response.json();
console.log(job); // Contains job_id for status checking`
console.log(job); // Contains jobId and executionId`
case 'typescript':
return `const response = await fetch("${endpoint}", {
@@ -299,8 +297,8 @@ console.log(job); // Contains job_id for status checking`
body: JSON.stringify(${JSON.stringify(payload)})
});
const job: { job_id: string } = await response.json();
console.log(job); // Contains job_id for status checking`
const job: { jobId: string; executionId: string } = await response.json();
console.log(job); // Contains jobId and executionId`
default:
return ''
@@ -539,55 +537,49 @@ console.log(limits);`
/>
</div>
{isAsyncEnabled && (
<div>
<div className='mb-[6.5px] flex items-center justify-between'>
<Label className='block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
Run workflow (async)
</Label>
<div className='flex items-center gap-[6px]'>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={() => handleCopy('async', getAsyncCommand())}
aria-label='Copy command'
className='!p-1.5 -my-1.5'
>
{copied.async ? (
<Check className='h-3 w-3' />
) : (
<Clipboard className='h-3 w-3' />
)}
</Button>
</Tooltip.Trigger>
<Tooltip.Content>
<span>{copied.async ? 'Copied' : 'Copy'}</span>
</Tooltip.Content>
</Tooltip.Root>
<Combobox
size='sm'
className='!w-fit !py-[2px] min-w-[100px] rounded-[6px] px-[9px]'
options={[
{ label: 'Execute Job', value: 'execute' },
{ label: 'Check Status', value: 'status' },
{ label: 'Rate Limits', value: 'rate-limits' },
]}
value={asyncExampleType}
onChange={(value) => setAsyncExampleType(value as AsyncExampleType)}
align='end'
dropdownWidth={160}
/>
</div>
<div>
<div className='mb-[6.5px] flex items-center justify-between'>
<Label className='block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
Run workflow (async)
</Label>
<div className='flex items-center gap-[6px]'>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={() => handleCopy('async', getAsyncCommand())}
aria-label='Copy command'
className='!p-1.5 -my-1.5'
>
{copied.async ? <Check className='h-3 w-3' /> : <Clipboard className='h-3 w-3' />}
</Button>
</Tooltip.Trigger>
<Tooltip.Content>
<span>{copied.async ? 'Copied' : 'Copy'}</span>
</Tooltip.Content>
</Tooltip.Root>
<Combobox
size='sm'
className='!w-fit !py-[2px] min-w-[100px] rounded-[6px] px-[9px]'
options={[
{ label: 'Execute Job', value: 'execute' },
{ label: 'Check Status', value: 'status' },
{ label: 'Rate Limits', value: 'rate-limits' },
]}
value={asyncExampleType}
onChange={(value) => setAsyncExampleType(value as AsyncExampleType)}
align='end'
dropdownWidth={160}
/>
</div>
<Code.Viewer
code={getAsyncCommand()}
language={LANGUAGE_SYNTAX[language]}
wrapText
className='!min-h-0 rounded-[4px] border border-[var(--border-1)]'
/>
</div>
)}
<Code.Viewer
code={getAsyncCommand()}
language={LANGUAGE_SYNTAX[language]}
wrapText
className='!min-h-0 rounded-[4px] border border-[var(--border-1)]'
/>
</div>
</div>
)
}

View File

@@ -15,7 +15,7 @@ import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sid
export const PRO_PLAN_FEATURES: PlanFeature[] = [
{ icon: Zap, text: '150 runs per minute (sync)' },
{ icon: Clock, text: '1,000 runs per minute (async)' },
{ icon: Timer, text: '60 min sync execution limit' },
{ icon: Timer, text: '50 min sync execution limit' },
{ icon: HardDrive, text: '50GB file storage' },
{ icon: Users, text: 'Unlimited invites' },
{ icon: Database, text: 'Unlimited log retention' },
@@ -24,7 +24,7 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [
export const TEAM_PLAN_FEATURES: PlanFeature[] = [
{ icon: Zap, text: '300 runs per minute (sync)' },
{ icon: Clock, text: '2,500 runs per minute (async)' },
{ icon: Timer, text: '60 min sync execution limit' },
{ icon: Timer, text: '50 min sync execution limit' },
{ icon: HardDrive, text: '500GB file storage (pooled)' },
{ icon: Users, text: 'Unlimited invites' },
{ icon: Database, text: 'Unlimited log retention' },

View File

@@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
import { useParams } from 'next/navigation'
import { io, type Socket } from 'socket.io-client'
import { getEnv } from '@/lib/core/config/env'
import { useOperationQueueStore } from '@/stores/operation-queue/store'
const logger = createLogger('SocketContext')
@@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null)
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
@@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
})
socketInstance.on('join-workflow-error', ({ error }) => {
socketInstance.on('join-workflow-error', ({ error, code }) => {
isRejoiningRef.current = false
logger.error('Failed to join workflow:', error)
logger.error('Failed to join workflow:', { error, code })
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
triggerOfflineMode()
}
})
socketInstance.on('workflow-operation', (data) => {

View File

@@ -20,6 +20,7 @@ export type WorkflowExecutionPayload = {
userId: string
input?: any
triggerType?: CoreTriggerType
executionId?: string
metadata?: Record<string, any>
}
@@ -30,7 +31,7 @@ export type WorkflowExecutionPayload = {
*/
export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
const workflowId = payload.workflowId
const executionId = uuidv4()
const executionId = payload.executionId || uuidv4()
const requestId = executionId.slice(0, 8)
logger.info(`[${requestId}] Starting workflow execution job: ${workflowId}`, {

View File

@@ -143,11 +143,147 @@ export const MistralParseBlock: BlockConfig<MistralParserOutput> = {
},
}
/**
* V2 Block - Restored from main branch for backwards compatibility
* Hidden from toolbar, uses filePath subblock ID for advanced mode
*/
export const MistralParseV2Block: BlockConfig<MistralParserOutput> = {
...MistralParseBlock,
type: 'mistral_parse_v2',
name: 'Mistral Parser',
description: 'Extract text from PDF documents',
hideFromToolbar: true,
subBlocks: [
{
id: 'fileUpload',
title: 'PDF Document',
type: 'file-upload' as SubBlockType,
canonicalParamId: 'document',
acceptedTypes: 'application/pdf',
placeholder: 'Upload a PDF document',
mode: 'basic',
maxSize: 50,
},
{
id: 'filePath',
title: 'PDF Document',
type: 'short-input' as SubBlockType,
canonicalParamId: 'document',
placeholder: 'Document URL',
mode: 'advanced',
},
{
id: 'resultType',
title: 'Output Format',
type: 'dropdown',
options: [
{ id: 'markdown', label: 'Markdown' },
{ id: 'text', label: 'Plain Text' },
{ id: 'json', label: 'JSON' },
],
},
{
id: 'pages',
title: 'Specific Pages',
type: 'short-input',
placeholder: 'e.g. 0,1,2 (leave empty for all pages)',
},
{
id: 'apiKey',
title: 'API Key',
type: 'short-input' as SubBlockType,
placeholder: 'Enter your Mistral API key',
password: true,
required: true,
},
],
tools: {
access: ['mistral_parser_v2'],
config: {
tool: createVersionedToolSelector({
baseToolSelector: () => 'mistral_parser',
suffix: '_v2',
fallbackToolId: 'mistral_parser_v2',
}),
params: (params) => {
if (!params || !params.apiKey || params.apiKey.trim() === '') {
throw new Error('Mistral API key is required')
}
const parameters: Record<string, unknown> = {
apiKey: params.apiKey.trim(),
resultType: params.resultType || 'markdown',
}
// Original V2 pattern: fileUpload (basic) or filePath (advanced) or document (wired)
const documentInput = params.fileUpload || params.filePath || params.document
if (!documentInput) {
throw new Error('PDF document is required')
}
// Smart handling: object → fileUpload param, string → filePath param
if (typeof documentInput === 'object') {
parameters.fileUpload = documentInput
} else if (typeof documentInput === 'string') {
parameters.filePath = documentInput.trim()
}
let pagesArray: number[] | undefined
if (params.pages && params.pages.trim() !== '') {
try {
pagesArray = params.pages
.split(',')
.map((p: string) => p.trim())
.filter((p: string) => p.length > 0)
.map((p: string) => {
const num = Number.parseInt(p, 10)
if (Number.isNaN(num) || num < 0) {
throw new Error(`Invalid page number: ${p}`)
}
return num
})
if (pagesArray && pagesArray.length === 0) {
pagesArray = undefined
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error)
throw new Error(`Page number format error: ${errorMessage}`)
}
}
if (pagesArray && pagesArray.length > 0) {
parameters.pages = pagesArray
}
return parameters
},
},
},
inputs: {
document: { type: 'json', description: 'Document input (file upload or URL reference)' },
filePath: { type: 'string', description: 'PDF document URL (advanced mode)' },
fileUpload: { type: 'json', description: 'Uploaded PDF file (basic mode)' },
apiKey: { type: 'string', description: 'Mistral API key' },
resultType: { type: 'string', description: 'Output format type' },
pages: { type: 'string', description: 'Page selection' },
},
outputs: {
pages: { type: 'array', description: 'Array of page objects from Mistral OCR' },
model: { type: 'string', description: 'Mistral OCR model identifier' },
usage_info: { type: 'json', description: 'Usage statistics from the API' },
document_annotation: { type: 'string', description: 'Structured annotation data' },
},
}
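As a standalone sketch, the page handling above accepts a comma-separated string of non-negative integers and treats an empty value as "all pages":

// Minimal re-statement of the pages parsing used in the params function above.
function parsePages(pages: string): number[] | undefined {
  const nums = pages
    .split(',')
    .map((p) => p.trim())
    .filter((p) => p.length > 0)
    .map((p) => {
      const num = Number.parseInt(p, 10)
      if (Number.isNaN(num) || num < 0) throw new Error(`Invalid page number: ${p}`)
      return num
    })
  return nums.length > 0 ? nums : undefined
}

parsePages('0, 1, 2') // => [0, 1, 2]
parsePages('')        // => undefined (all pages)
parsePages('2,x')     // throws: Invalid page number: x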
/**
* V3 Block - New file handling pattern with UserFile normalization
* Uses fileReference subblock ID with canonicalParamId for proper file handling
*/
export const MistralParseV3Block: BlockConfig<MistralParserOutput> = {
...MistralParseBlock,
type: 'mistral_parse_v3',
name: 'Mistral Parser',
description: 'Extract text from PDF documents',
hideFromToolbar: false,
subBlocks: [
{
@@ -196,13 +332,9 @@ export const MistralParseV2Block: BlockConfig<MistralParserOutput> = {
},
],
tools: {
access: ['mistral_parser_v2'],
access: ['mistral_parser_v3'],
config: {
tool: createVersionedToolSelector({
baseToolSelector: () => 'mistral_parser',
suffix: '_v2',
fallbackToolId: 'mistral_parser_v2',
}),
tool: () => 'mistral_parser_v3',
params: (params) => {
if (!params || !params.apiKey || params.apiKey.trim() === '') {
throw new Error('Mistral API key is required')
@@ -213,6 +345,7 @@ export const MistralParseV2Block: BlockConfig<MistralParserOutput> = {
resultType: params.resultType || 'markdown',
}
// V3 pattern: normalize file inputs from basic/advanced modes
const documentInput = normalizeFileInput(
params.fileUpload || params.fileReference || params.document,
{ single: true }

View File

@@ -79,7 +79,11 @@ import { MemoryBlock } from '@/blocks/blocks/memory'
import { MicrosoftExcelBlock, MicrosoftExcelV2Block } from '@/blocks/blocks/microsoft_excel'
import { MicrosoftPlannerBlock } from '@/blocks/blocks/microsoft_planner'
import { MicrosoftTeamsBlock } from '@/blocks/blocks/microsoft_teams'
import { MistralParseBlock, MistralParseV2Block } from '@/blocks/blocks/mistral_parse'
import {
MistralParseBlock,
MistralParseV2Block,
MistralParseV3Block,
} from '@/blocks/blocks/mistral_parse'
import { MongoDBBlock } from '@/blocks/blocks/mongodb'
import { MySQLBlock } from '@/blocks/blocks/mysql'
import { Neo4jBlock } from '@/blocks/blocks/neo4j'
@@ -255,6 +259,7 @@ export const registry: Record<string, BlockConfig> = {
microsoft_teams: MicrosoftTeamsBlock,
mistral_parse: MistralParseBlock,
mistral_parse_v2: MistralParseV2Block,
mistral_parse_v3: MistralParseV3Block,
mongodb: MongoDBBlock,
mysql: MySQLBlock,
neo4j: Neo4jBlock,

View File

@@ -549,11 +549,12 @@ export function useActivateDeploymentVersion() {
workflowId,
version,
}: ActivateVersionVariables): Promise<ActivateVersionResult> => {
const response = await fetch(`/api/workflows/${workflowId}/deployments/${version}/activate`, {
method: 'POST',
const response = await fetch(`/api/workflows/${workflowId}/deployments/${version}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ isActive: true }),
})
if (!response.ok) {

View File

@@ -211,12 +211,16 @@ export function useExecutionStream() {
currentExecutionRef.current = null
try {
const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ startBlockId, sourceSnapshot, input }),
body: JSON.stringify({
stream: true,
input,
runFromBlock: { startBlockId, sourceSnapshot },
}),
signal: abortController.signal,
})

View File

@@ -0,0 +1,113 @@
import { asyncJobs, db } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq, sql } from 'drizzle-orm'
import {
type EnqueueOptions,
JOB_STATUS,
type Job,
type JobMetadata,
type JobQueueBackend,
type JobStatus,
type JobType,
} from '@/lib/core/async-jobs/types'
const logger = createLogger('DatabaseJobQueue')
type AsyncJobRow = typeof asyncJobs.$inferSelect
function rowToJob(row: AsyncJobRow): Job {
return {
id: row.id,
type: row.type as JobType,
payload: row.payload,
status: row.status as JobStatus,
createdAt: row.createdAt,
startedAt: row.startedAt ?? undefined,
completedAt: row.completedAt ?? undefined,
attempts: row.attempts,
maxAttempts: row.maxAttempts,
error: row.error ?? undefined,
output: row.output as unknown,
metadata: (row.metadata ?? {}) as JobMetadata,
}
}
export class DatabaseJobQueue implements JobQueueBackend {
async enqueue<TPayload>(
type: JobType,
payload: TPayload,
options?: EnqueueOptions
): Promise<string> {
const jobId = `run_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`
const now = new Date()
await db.insert(asyncJobs).values({
id: jobId,
type,
payload: payload as Record<string, unknown>,
status: JOB_STATUS.PENDING,
createdAt: now,
attempts: 0,
maxAttempts: options?.maxAttempts ?? 3,
metadata: (options?.metadata ?? {}) as Record<string, unknown>,
updatedAt: now,
})
logger.debug('Enqueued job', { jobId, type })
return jobId
}
async getJob(jobId: string): Promise<Job | null> {
const [row] = await db.select().from(asyncJobs).where(eq(asyncJobs.id, jobId)).limit(1)
return row ? rowToJob(row) : null
}
async startJob(jobId: string): Promise<void> {
const now = new Date()
await db
.update(asyncJobs)
.set({
status: JOB_STATUS.PROCESSING,
startedAt: now,
attempts: sql`${asyncJobs.attempts} + 1`,
updatedAt: now,
})
.where(eq(asyncJobs.id, jobId))
logger.debug('Started job', { jobId })
}
async completeJob(jobId: string, output: unknown): Promise<void> {
const now = new Date()
await db
.update(asyncJobs)
.set({
status: JOB_STATUS.COMPLETED,
completedAt: now,
output: output as Record<string, unknown>,
updatedAt: now,
})
.where(eq(asyncJobs.id, jobId))
logger.debug('Completed job', { jobId })
}
async markJobFailed(jobId: string, error: string): Promise<void> {
const now = new Date()
await db
.update(asyncJobs)
.set({
status: JOB_STATUS.FAILED,
completedAt: now,
error,
updatedAt: now,
})
.where(eq(asyncJobs.id, jobId))
logger.debug('Marked job as failed', { jobId })
}
}

View File

@@ -0,0 +1,3 @@
export { DatabaseJobQueue } from './database'
export { RedisJobQueue } from './redis'
export { TriggerDevJobQueue } from './trigger-dev'

View File

@@ -0,0 +1,176 @@
/**
* @vitest-environment node
*/
import { createMockRedis, loggerMock, type MockRedis } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
vi.mock('@sim/logger', () => loggerMock)
import {
JOB_MAX_LIFETIME_SECONDS,
JOB_RETENTION_SECONDS,
JOB_STATUS,
} from '@/lib/core/async-jobs/types'
import { RedisJobQueue } from './redis'
describe('RedisJobQueue', () => {
let mockRedis: MockRedis
let queue: RedisJobQueue
beforeEach(() => {
vi.clearAllMocks()
mockRedis = createMockRedis()
queue = new RedisJobQueue(mockRedis as never)
})
describe('enqueue', () => {
it.concurrent('should create a job with pending status', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = await localQueue.enqueue('workflow-execution', { test: 'data' })
expect(jobId).toMatch(/^run_/)
expect(localRedis.hset).toHaveBeenCalledTimes(1)
const [key, data] = localRedis.hset.mock.calls[0]
expect(key).toBe(`async-jobs:job:${jobId}`)
expect(data.status).toBe(JOB_STATUS.PENDING)
expect(data.type).toBe('workflow-execution')
})
it.concurrent('should set max lifetime TTL on enqueue', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = await localQueue.enqueue('workflow-execution', { test: 'data' })
expect(localRedis.expire).toHaveBeenCalledWith(
`async-jobs:job:${jobId}`,
JOB_MAX_LIFETIME_SECONDS
)
})
})
describe('completeJob', () => {
it.concurrent('should set status to completed and set TTL', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = 'run_test123'
await localQueue.completeJob(jobId, { result: 'success' })
expect(localRedis.hset).toHaveBeenCalledWith(`async-jobs:job:${jobId}`, {
status: JOB_STATUS.COMPLETED,
completedAt: expect.any(String),
output: JSON.stringify({ result: 'success' }),
updatedAt: expect.any(String),
})
expect(localRedis.expire).toHaveBeenCalledWith(
`async-jobs:job:${jobId}`,
JOB_RETENTION_SECONDS
)
})
it.concurrent('should set TTL to 24 hours (86400 seconds)', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
await localQueue.completeJob('run_test123', {})
expect(localRedis.expire).toHaveBeenCalledWith(expect.any(String), 86400)
})
})
describe('markJobFailed', () => {
it.concurrent('should set status to failed and set TTL', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const jobId = 'run_test456'
const error = 'Something went wrong'
await localQueue.markJobFailed(jobId, error)
expect(localRedis.hset).toHaveBeenCalledWith(`async-jobs:job:${jobId}`, {
status: JOB_STATUS.FAILED,
completedAt: expect.any(String),
error,
updatedAt: expect.any(String),
})
expect(localRedis.expire).toHaveBeenCalledWith(
`async-jobs:job:${jobId}`,
JOB_RETENTION_SECONDS
)
})
it.concurrent('should set TTL to 24 hours (86400 seconds)', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
await localQueue.markJobFailed('run_test456', 'error')
expect(localRedis.expire).toHaveBeenCalledWith(expect.any(String), 86400)
})
})
describe('startJob', () => {
it.concurrent('should not set TTL when starting a job', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
await localQueue.startJob('run_test789')
expect(localRedis.hset).toHaveBeenCalled()
expect(localRedis.expire).not.toHaveBeenCalled()
})
})
describe('getJob', () => {
it.concurrent('should return null for non-existent job', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
localRedis.hgetall.mockResolvedValue({})
const job = await localQueue.getJob('run_nonexistent')
expect(job).toBeNull()
})
it.concurrent('should deserialize job data correctly', async () => {
const localRedis = createMockRedis()
const localQueue = new RedisJobQueue(localRedis as never)
const now = new Date()
localRedis.hgetall.mockResolvedValue({
id: 'run_test',
type: 'workflow-execution',
payload: JSON.stringify({ foo: 'bar' }),
status: JOB_STATUS.COMPLETED,
createdAt: now.toISOString(),
startedAt: now.toISOString(),
completedAt: now.toISOString(),
attempts: '1',
maxAttempts: '3',
error: '',
output: JSON.stringify({ result: 'ok' }),
metadata: JSON.stringify({ workflowId: 'wf_123' }),
})
const job = await localQueue.getJob('run_test')
expect(job).not.toBeNull()
expect(job?.id).toBe('run_test')
expect(job?.type).toBe('workflow-execution')
expect(job?.payload).toEqual({ foo: 'bar' })
expect(job?.status).toBe(JOB_STATUS.COMPLETED)
expect(job?.output).toEqual({ result: 'ok' })
expect(job?.metadata.workflowId).toBe('wf_123')
})
})
})
describe('JOB_RETENTION_SECONDS', () => {
it.concurrent('should be 24 hours in seconds', async () => {
expect(JOB_RETENTION_SECONDS).toBe(24 * 60 * 60)
expect(JOB_RETENTION_SECONDS).toBe(86400)
})
})

View File

@@ -0,0 +1,146 @@
import { createLogger } from '@sim/logger'
import type Redis from 'ioredis'
import {
type EnqueueOptions,
JOB_MAX_LIFETIME_SECONDS,
JOB_RETENTION_SECONDS,
JOB_STATUS,
type Job,
type JobMetadata,
type JobQueueBackend,
type JobStatus,
type JobType,
} from '@/lib/core/async-jobs/types'
const logger = createLogger('RedisJobQueue')
const KEYS = {
job: (id: string) => `async-jobs:job:${id}`,
} as const
function serializeJob(job: Job): Record<string, string> {
return {
id: job.id,
type: job.type,
payload: JSON.stringify(job.payload),
status: job.status,
createdAt: job.createdAt.toISOString(),
startedAt: job.startedAt?.toISOString() ?? '',
completedAt: job.completedAt?.toISOString() ?? '',
attempts: job.attempts.toString(),
maxAttempts: job.maxAttempts.toString(),
error: job.error ?? '',
output: job.output !== undefined ? JSON.stringify(job.output) : '',
metadata: JSON.stringify(job.metadata),
updatedAt: new Date().toISOString(),
}
}
function deserializeJob(data: Record<string, string>): Job | null {
if (!data || !data.id) return null
try {
return {
id: data.id,
type: data.type as JobType,
payload: JSON.parse(data.payload),
status: data.status as JobStatus,
createdAt: new Date(data.createdAt),
startedAt: data.startedAt ? new Date(data.startedAt) : undefined,
completedAt: data.completedAt ? new Date(data.completedAt) : undefined,
attempts: Number.parseInt(data.attempts, 10),
maxAttempts: Number.parseInt(data.maxAttempts, 10),
error: data.error || undefined,
output: data.output ? JSON.parse(data.output) : undefined,
metadata: JSON.parse(data.metadata) as JobMetadata,
}
} catch (error) {
logger.error('Failed to deserialize job', { error, data })
return null
}
}
export class RedisJobQueue implements JobQueueBackend {
private redis: Redis
constructor(redis: Redis) {
this.redis = redis
}
async enqueue<TPayload>(
type: JobType,
payload: TPayload,
options?: EnqueueOptions
): Promise<string> {
const jobId = `run_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`
const now = new Date()
const job: Job<TPayload> = {
id: jobId,
type,
payload,
status: JOB_STATUS.PENDING,
createdAt: now,
attempts: 0,
maxAttempts: options?.maxAttempts ?? 3,
metadata: options?.metadata ?? {},
}
const key = KEYS.job(jobId)
const serialized = serializeJob(job as Job)
await this.redis.hset(key, serialized)
await this.redis.expire(key, JOB_MAX_LIFETIME_SECONDS)
logger.debug('Enqueued job', { jobId, type })
return jobId
}
async getJob(jobId: string): Promise<Job | null> {
const data = await this.redis.hgetall(KEYS.job(jobId))
return deserializeJob(data)
}
async startJob(jobId: string): Promise<void> {
const now = new Date()
const key = KEYS.job(jobId)
await this.redis.hset(key, {
status: JOB_STATUS.PROCESSING,
startedAt: now.toISOString(),
updatedAt: now.toISOString(),
})
await this.redis.hincrby(key, 'attempts', 1)
logger.debug('Started job', { jobId })
}
async completeJob(jobId: string, output: unknown): Promise<void> {
const now = new Date()
const key = KEYS.job(jobId)
await this.redis.hset(key, {
status: JOB_STATUS.COMPLETED,
completedAt: now.toISOString(),
output: JSON.stringify(output),
updatedAt: now.toISOString(),
})
await this.redis.expire(key, JOB_RETENTION_SECONDS)
logger.debug('Completed job', { jobId })
}
async markJobFailed(jobId: string, error: string): Promise<void> {
const now = new Date()
const key = KEYS.job(jobId)
await this.redis.hset(key, {
status: JOB_STATUS.FAILED,
completedAt: now.toISOString(),
error,
updatedAt: now.toISOString(),
})
await this.redis.expire(key, JOB_RETENTION_SECONDS)
logger.debug('Marked job as failed', { jobId })
}
}

View File

@@ -0,0 +1,119 @@
import { createLogger } from '@sim/logger'
import { runs, tasks } from '@trigger.dev/sdk'
import {
type EnqueueOptions,
JOB_STATUS,
type Job,
type JobMetadata,
type JobQueueBackend,
type JobStatus,
type JobType,
} from '@/lib/core/async-jobs/types'
const logger = createLogger('TriggerDevJobQueue')
/**
* Maps our JobType values to trigger.dev task IDs
*/
const JOB_TYPE_TO_TASK_ID: Record<JobType, string> = {
'workflow-execution': 'workflow-execution',
'schedule-execution': 'schedule-execution',
'webhook-execution': 'webhook-execution',
}
/**
* Maps trigger.dev run status to our JobStatus
*/
function mapTriggerDevStatus(status: string): JobStatus {
switch (status) {
case 'QUEUED':
case 'WAITING_FOR_DEPLOY':
return JOB_STATUS.PENDING
case 'EXECUTING':
case 'RESCHEDULED':
case 'FROZEN':
return JOB_STATUS.PROCESSING
case 'COMPLETED':
return JOB_STATUS.COMPLETED
case 'CANCELED':
case 'FAILED':
case 'CRASHED':
case 'INTERRUPTED':
case 'SYSTEM_FAILURE':
case 'EXPIRED':
return JOB_STATUS.FAILED
default:
return JOB_STATUS.PENDING
}
}
/**
* Adapter that wraps the trigger.dev SDK to conform to JobQueueBackend interface.
*/
export class TriggerDevJobQueue implements JobQueueBackend {
async enqueue<TPayload>(
type: JobType,
payload: TPayload,
options?: EnqueueOptions
): Promise<string> {
const taskId = JOB_TYPE_TO_TASK_ID[type]
if (!taskId) {
throw new Error(`Unknown job type: ${type}`)
}
const enrichedPayload =
options?.metadata && typeof payload === 'object' && payload !== null
? { ...payload, ...options.metadata }
: payload
const handle = await tasks.trigger(taskId, enrichedPayload)
logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId })
return handle.id
}
async getJob(jobId: string): Promise<Job | null> {
try {
const run = await runs.retrieve(jobId)
const payload = run.payload as Record<string, unknown>
const metadata: JobMetadata = {
workflowId: payload?.workflowId as string | undefined,
userId: payload?.userId as string | undefined,
}
return {
id: jobId,
type: run.taskIdentifier as JobType,
payload: run.payload,
status: mapTriggerDevStatus(run.status),
createdAt: run.createdAt ? new Date(run.createdAt) : new Date(),
startedAt: run.startedAt ? new Date(run.startedAt) : undefined,
completedAt: run.finishedAt ? new Date(run.finishedAt) : undefined,
attempts: run.attemptCount ?? 1,
maxAttempts: 3,
error: run.error?.message,
output: run.output as unknown,
metadata,
}
} catch (error) {
const isNotFound =
(error instanceof Error && error.message.toLowerCase().includes('not found')) ||
(error && typeof error === 'object' && 'status' in error && error.status === 404)
if (isNotFound) {
logger.debug('Job not found in trigger.dev', { jobId })
return null
}
logger.error('Failed to get job from trigger.dev', { jobId, error })
throw error
}
}
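  // Lifecycle transitions are no-ops for this backend: trigger.dev tracks run
  // state itself, and getJob() reads it back via runs.retrieve().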
async startJob(_jobId: string): Promise<void> {}
async completeJob(_jobId: string, _output: unknown): Promise<void> {}
async markJobFailed(_jobId: string, _error: string): Promise<void> {}
}

View File

@@ -0,0 +1,88 @@
import { createLogger } from '@sim/logger'
import type { AsyncBackendType, JobQueueBackend } from '@/lib/core/async-jobs/types'
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('AsyncJobsConfig')
let cachedBackend: JobQueueBackend | null = null
let cachedBackendType: AsyncBackendType | null = null
/**
* Determines which async backend to use based on environment configuration.
* Follows the fallback chain: trigger.dev → redis → database
*/
export function getAsyncBackendType(): AsyncBackendType {
if (isTriggerDevEnabled) {
return 'trigger-dev'
}
const redis = getRedisClient()
if (redis) {
return 'redis'
}
return 'database'
}
/**
* Gets the job queue backend singleton.
* Creates the appropriate backend based on environment configuration.
*/
export async function getJobQueue(): Promise<JobQueueBackend> {
if (cachedBackend) {
return cachedBackend
}
const type = getAsyncBackendType()
switch (type) {
case 'trigger-dev': {
const { TriggerDevJobQueue } = await import('@/lib/core/async-jobs/backends/trigger-dev')
cachedBackend = new TriggerDevJobQueue()
break
}
case 'redis': {
const redis = getRedisClient()
if (!redis) {
throw new Error('Redis client not available but redis backend was selected')
}
const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis')
cachedBackend = new RedisJobQueue(redis)
break
}
case 'database': {
const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database')
cachedBackend = new DatabaseJobQueue()
break
}
}
cachedBackendType = type
logger.info(`Async job backend initialized: ${type}`)
return cachedBackend
}
/**
* Gets the current backend type (for logging/debugging)
*/
export function getCurrentBackendType(): AsyncBackendType | null {
return cachedBackendType
}
/**
* Checks if jobs should be executed inline (fire-and-forget).
* For Redis/DB backends, we execute inline. Trigger.dev handles execution itself.
*/
export function shouldExecuteInline(): boolean {
return getAsyncBackendType() !== 'trigger-dev'
}
/**
* Resets the cached backend (useful for testing)
*/
export function resetJobQueueCache(): void {
cachedBackend = null
cachedBackendType = null
}
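A quick sketch of how the fallback chain resolves on a deployment with neither trigger.dev nor Redis configured:

resetJobQueueCache()
const backend = await getJobQueue() // no trigger.dev, no Redis -> DatabaseJobQueue
getCurrentBackendType()             // 'database'
shouldExecuteInline()               // true: Redis/DB backends run jobs in-process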

View File

@@ -0,0 +1,22 @@
export {
getAsyncBackendType,
getCurrentBackendType,
getJobQueue,
resetJobQueueCache,
shouldExecuteInline,
} from './config'
export type {
AsyncBackendType,
EnqueueOptions,
Job,
JobMetadata,
JobQueueBackend,
JobStatus,
JobType,
} from './types'
export {
JOB_MAX_LIFETIME_SECONDS,
JOB_RETENTION_HOURS,
JOB_RETENTION_SECONDS,
JOB_STATUS,
} from './types'

View File

@@ -0,0 +1,32 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { JOB_MAX_LIFETIME_SECONDS, JOB_RETENTION_HOURS, JOB_RETENTION_SECONDS } from './types'
describe('Job retention constants', () => {
it.concurrent('JOB_RETENTION_HOURS should be 24', async () => {
expect(JOB_RETENTION_HOURS).toBe(24)
})
it.concurrent('JOB_RETENTION_SECONDS should be derived from JOB_RETENTION_HOURS', async () => {
expect(JOB_RETENTION_SECONDS).toBe(JOB_RETENTION_HOURS * 60 * 60)
})
it.concurrent('JOB_RETENTION_SECONDS should equal 86400 (24 hours)', async () => {
expect(JOB_RETENTION_SECONDS).toBe(86400)
})
it.concurrent('constants should be consistent with each other', async () => {
const hoursToSeconds = JOB_RETENTION_HOURS * 60 * 60
expect(JOB_RETENTION_SECONDS).toBe(hoursToSeconds)
})
it.concurrent(
'JOB_MAX_LIFETIME_SECONDS should be greater than JOB_RETENTION_SECONDS',
async () => {
expect(JOB_MAX_LIFETIME_SECONDS).toBeGreaterThan(JOB_RETENTION_SECONDS)
expect(JOB_MAX_LIFETIME_SECONDS).toBe(48 * 60 * 60)
}
)
})

View File

@@ -0,0 +1,82 @@
/**
* Types and constants for the async job queue system
*/
/** Retention period for completed/failed jobs (in hours) */
export const JOB_RETENTION_HOURS = 24
/** Retention period for completed/failed jobs (in seconds, for Redis TTL) */
export const JOB_RETENTION_SECONDS = JOB_RETENTION_HOURS * 60 * 60
/** Max lifetime for jobs in Redis (in seconds) - cleanup for stuck pending/processing jobs */
export const JOB_MAX_LIFETIME_SECONDS = 48 * 60 * 60
export const JOB_STATUS = {
PENDING: 'pending',
PROCESSING: 'processing',
COMPLETED: 'completed',
FAILED: 'failed',
} as const
export type JobStatus = (typeof JOB_STATUS)[keyof typeof JOB_STATUS]
export type JobType = 'workflow-execution' | 'schedule-execution' | 'webhook-execution'
export interface Job<TPayload = unknown, TOutput = unknown> {
id: string
type: JobType
payload: TPayload
status: JobStatus
createdAt: Date
startedAt?: Date
completedAt?: Date
attempts: number
maxAttempts: number
error?: string
output?: TOutput
metadata: JobMetadata
}
export interface JobMetadata {
workflowId?: string
userId?: string
[key: string]: unknown
}
export interface EnqueueOptions {
maxAttempts?: number
metadata?: JobMetadata
}
/**
* Backend interface for job queue implementations.
* All backends must implement this interface.
*/
export interface JobQueueBackend {
/**
* Add a job to the queue
*/
enqueue<TPayload>(type: JobType, payload: TPayload, options?: EnqueueOptions): Promise<string>
/**
* Get a job by ID
*/
getJob(jobId: string): Promise<Job | null>
/**
* Mark a job as started/processing
*/
startJob(jobId: string): Promise<void>
/**
* Mark a job as completed with output
*/
completeJob(jobId: string, output: unknown): Promise<void>
/**
* Mark a job as failed with error message
*/
markJobFailed(jobId: string, error: string): Promise<void>
}
export type AsyncBackendType = 'trigger-dev' | 'redis' | 'database'
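To illustrate the contract, a minimal in-memory backend (hypothetical, e.g. for unit tests; not one of the shipped implementations):

class InMemoryJobQueue implements JobQueueBackend {
  private jobs = new Map<string, Job>()

  async enqueue<TPayload>(type: JobType, payload: TPayload, options?: EnqueueOptions): Promise<string> {
    const id = `run_${Math.random().toString(36).slice(2, 22)}`
    this.jobs.set(id, {
      id,
      type,
      payload,
      status: JOB_STATUS.PENDING,
      createdAt: new Date(),
      attempts: 0,
      maxAttempts: options?.maxAttempts ?? 3,
      metadata: options?.metadata ?? {},
    })
    return id
  }

  async getJob(jobId: string): Promise<Job | null> {
    return this.jobs.get(jobId) ?? null
  }

  async startJob(jobId: string): Promise<void> {
    const job = this.jobs.get(jobId)
    if (job) {
      job.status = JOB_STATUS.PROCESSING
      job.startedAt = new Date()
      job.attempts += 1
    }
  }

  async completeJob(jobId: string, output: unknown): Promise<void> {
    const job = this.jobs.get(jobId)
    if (job) {
      job.status = JOB_STATUS.COMPLETED
      job.completedAt = new Date()
      job.output = output
    }
  }

  async markJobFailed(jobId: string, error: string): Promise<void> {
    const job = this.jobs.get(jobId)
    if (job) {
      job.status = JOB_STATUS.FAILED
      job.completedAt = new Date()
      job.error = error
    }
  }
}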

View File

@@ -170,10 +170,15 @@ export const env = createEnv({
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'),
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3600'),
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3600'),
// Timeout Configuration
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'), // 5 minutes
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3000'), // 50 minutes
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3000'), // 50 minutes
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3000'), // 50 minutes
EXECUTION_TIMEOUT_ASYNC_FREE: z.string().optional().default('5400'), // 90 minutes
EXECUTION_TIMEOUT_ASYNC_PRO: z.string().optional().default('5400'), // 90 minutes
EXECUTION_TIMEOUT_ASYNC_TEAM: z.string().optional().default('5400'), // 90 minutes
EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: z.string().optional().default('5400'), // 90 minutes
// Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
@@ -345,7 +350,6 @@ export const env = createEnv({
NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Brand background color (hex format)
// Feature Flags
NEXT_PUBLIC_TRIGGER_DEV_ENABLED: z.boolean().optional(), // Client-side gate for async executions UI
NEXT_PUBLIC_SSO_ENABLED: z.boolean().optional(), // Enable SSO login UI components
NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED: z.boolean().optional(), // Enable credential sets (email polling) on self-hosted
NEXT_PUBLIC_ACCESS_CONTROL_ENABLED: z.boolean().optional(), // Enable access control (permission groups) on self-hosted
@@ -377,7 +381,6 @@ export const env = createEnv({
NEXT_PUBLIC_BRAND_ACCENT_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_COLOR,
NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR,
NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: process.env.NEXT_PUBLIC_BRAND_BACKGROUND_COLOR,
NEXT_PUBLIC_TRIGGER_DEV_ENABLED: process.env.NEXT_PUBLIC_TRIGGER_DEV_ENABLED,
NEXT_PUBLIC_SSO_ENABLED: process.env.NEXT_PUBLIC_SSO_ENABLED,
NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED: process.env.NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED,
NEXT_PUBLIC_ACCESS_CONTROL_ENABLED: process.env.NEXT_PUBLIC_ACCESS_CONTROL_ENABLED,

View File

@@ -1,4 +1,5 @@
import { env } from '@/lib/core/config/env'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
interface ExecutionTimeoutConfig {
@@ -8,13 +9,17 @@ interface ExecutionTimeoutConfig {
const DEFAULT_SYNC_TIMEOUTS_SECONDS = {
free: 300,
pro: 3600,
team: 3600,
enterprise: 3600,
pro: 3000,
team: 3000,
enterprise: 3000,
} as const
const ASYNC_MULTIPLIER = 2
const MAX_ASYNC_TIMEOUT_SECONDS = 5400
const DEFAULT_ASYNC_TIMEOUTS_SECONDS = {
free: 5400,
pro: 5400,
team: 5400,
enterprise: 5400,
} as const
function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
const envVarMap: Record<SubscriptionPlan, string | undefined> = {
@@ -27,10 +32,13 @@ function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
}
function getAsyncTimeoutForPlan(plan: SubscriptionPlan): number {
const syncMs = getSyncTimeoutForPlan(plan)
const asyncMs = syncMs * ASYNC_MULTIPLIER
const maxAsyncMs = MAX_ASYNC_TIMEOUT_SECONDS * 1000
return Math.min(asyncMs, maxAsyncMs)
const envVarMap: Record<SubscriptionPlan, string | undefined> = {
free: env.EXECUTION_TIMEOUT_ASYNC_FREE,
pro: env.EXECUTION_TIMEOUT_ASYNC_PRO,
team: env.EXECUTION_TIMEOUT_ASYNC_TEAM,
enterprise: env.EXECUTION_TIMEOUT_ASYNC_ENTERPRISE,
}
return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_ASYNC_TIMEOUTS_SECONDS[plan]) * 1000
}
const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
@@ -56,6 +64,9 @@ export function getExecutionTimeout(
plan: SubscriptionPlan | undefined,
type: 'sync' | 'async' = 'sync'
): number {
if (!isBillingEnabled) {
return EXECUTION_TIMEOUTS.enterprise[type]
}
return EXECUTION_TIMEOUTS[plan || 'free'][type]
}
@@ -63,7 +74,9 @@ export function getMaxExecutionTimeout(): number {
return EXECUTION_TIMEOUTS.enterprise.async
}
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
export const DEFAULT_EXECUTION_TIMEOUT_MS = isBillingEnabled
? EXECUTION_TIMEOUTS.free.sync
: EXECUTION_TIMEOUTS.enterprise.sync
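A worked example of the arithmetic above. Env values are seconds and the returned timeouts are milliseconds; the numbers assume the defaults on a billing-enabled deployment:

getExecutionTimeout('pro', 'sync')  // 3000 s -> 3_000_000 ms (50 min)
getExecutionTimeout('pro', 'async') // 5400 s -> 5_400_000 ms (90 min)
getExecutionTimeout(undefined)      // no plan -> free sync limit: 300_000 ms (5 min)
// With billing disabled (self-hosted), every call returns the enterprise limit instead.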
export function isTimeoutError(error: unknown): boolean {
if (!error) return false

View File

@@ -156,7 +156,7 @@ export class McpClient {
return result.tools.map((tool: Tool) => ({
name: tool.name,
description: tool.description,
inputSchema: tool.inputSchema,
inputSchema: tool.inputSchema as McpTool['inputSchema'],
serverId: this.config.id,
serverName: this.config.name,
}))

View File

@@ -57,14 +57,29 @@ export interface McpSecurityPolicy {
auditLevel: 'none' | 'basic' | 'detailed'
}
/**
* JSON Schema property definition for tool parameters.
* Follows the JSON Schema specification, with description support.
*/
export interface McpToolSchemaProperty {
type: string
description?: string
items?: McpToolSchemaProperty
properties?: Record<string, McpToolSchemaProperty>
required?: string[]
enum?: Array<string | number | boolean>
default?: unknown
}
/**
* JSON Schema for tool input parameters.
* Aligns with MCP SDK's Tool.inputSchema structure.
*/
export interface McpToolSchema {
type: 'object'
properties?: Record<string, unknown>
properties?: Record<string, McpToolSchemaProperty>
required?: string[]
description?: string
}
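For illustration, a hypothetical tool schema that type-checks against these interfaces:

const searchToolSchema: McpToolSchema = {
  type: 'object',
  description: 'Search a document index',
  properties: {
    query: { type: 'string', description: 'Full-text query' },
    limit: { type: 'number', description: 'Maximum results to return', default: 10 },
    tags: {
      type: 'array',
      description: 'Optional tag filters',
      items: { type: 'string', enum: ['draft', 'published'] },
    },
  },
  required: ['query'],
}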
/**

View File

@@ -1,12 +1,12 @@
import { db, webhook, workflow, workflowDeploymentVersion } from '@sim/db'
import { credentialSet, subscription } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { tasks } from '@trigger.dev/sdk'
import { and, eq, isNull, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
import { isProd } from '@/lib/core/config/feature-flags'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
@@ -1015,18 +1015,39 @@ export async function queueWebhookExecution(
...(credentialId ? { credentialId } : {}),
}
if (isTriggerDevEnabled) {
const handle = await tasks.trigger('webhook-execution', payload)
logger.info(
`[${options.requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
)
} else {
void executeWebhookJob(payload).catch((error) => {
logger.error(`[${options.requestId}] Direct webhook execution failed`, error)
})
logger.info(
`[${options.requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
)
const jobQueue = await getJobQueue()
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
metadata: { workflowId: foundWorkflow.id, userId: foundWorkflow.userId },
})
logger.info(
`[${options.requestId}] Queued webhook execution task ${jobId} for ${foundWebhook.provider} webhook`
)
if (shouldExecuteInline()) {
void (async () => {
try {
await jobQueue.startJob(jobId)
const output = await executeWebhookJob(payload)
await jobQueue.completeJob(jobId, output)
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error(`[${options.requestId}] Webhook execution failed`, {
jobId,
error: errorMessage,
})
try {
await jobQueue.markJobFailed(jobId, errorMessage)
} catch (markFailedError) {
logger.error(`[${options.requestId}] Failed to mark job as failed`, {
jobId,
error:
markFailedError instanceof Error
? markFailedError.message
: String(markFailedError),
})
}
}
})()
}
if (foundWebhook.provider === 'microsoft-teams') {

View File

@@ -6,7 +6,6 @@ import { MAX_TOOL_ITERATIONS } from '@/providers'
import {
checkForForcedToolUsage,
createReadableStreamFromAnthropicStream,
generateToolUseId,
} from '@/providers/anthropic/utils'
import {
getMaxOutputTokensForModel,
@@ -433,11 +432,32 @@ export const anthropicProvider: ProviderConfig = {
const executionResults = await Promise.allSettled(toolExecutionPromises)
// Collect all tool_use and tool_result blocks for batching
const toolUseBlocks: Array<{
type: 'tool_use'
id: string
name: string
input: Record<string, unknown>
}> = []
const toolResultBlocks: Array<{
type: 'tool_result'
tool_use_id: string
content: string
}> = []
for (const settledResult of executionResults) {
if (settledResult.status === 'rejected' || !settledResult.value) continue
const { toolName, toolArgs, toolParams, result, startTime, endTime, duration } =
settledResult.value
const {
toolUse,
toolName,
toolArgs,
toolParams,
result,
startTime,
endTime,
duration,
} = settledResult.value
timeSegments.push({
type: 'tool',
@@ -447,7 +467,7 @@ export const anthropicProvider: ProviderConfig = {
duration: duration,
})
let resultContent: any
let resultContent: unknown
if (result.success) {
toolResults.push(result.output)
resultContent = result.output
@@ -469,29 +489,34 @@ export const anthropicProvider: ProviderConfig = {
success: result.success,
})
const toolUseId = generateToolUseId(toolName)
currentMessages.push({
role: 'assistant',
content: [
{
type: 'tool_use',
id: toolUseId,
name: toolName,
input: toolArgs,
} as any,
],
// Add to batched arrays using the ORIGINAL ID from Claude's response
toolUseBlocks.push({
type: 'tool_use',
id: toolUse.id,
name: toolName,
input: toolArgs,
})
toolResultBlocks.push({
type: 'tool_result',
tool_use_id: toolUse.id,
content: JSON.stringify(resultContent),
})
}
// Add ONE assistant message with ALL tool_use blocks
if (toolUseBlocks.length > 0) {
currentMessages.push({
role: 'assistant',
content: toolUseBlocks as unknown as Anthropic.Messages.ContentBlock[],
})
}
// Add ONE user message with ALL tool_result blocks
if (toolResultBlocks.length > 0) {
currentMessages.push({
role: 'user',
content: [
{
type: 'tool_result',
tool_use_id: toolUseId,
content: JSON.stringify(resultContent),
} as any,
],
content: toolResultBlocks as unknown as Anthropic.Messages.ContentBlockParam[],
})
}
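Concretely, for a turn where Claude requested two tools, the batched history produced above looks like this (names, IDs, and payloads hypothetical):

currentMessages.push(
  {
    role: 'assistant',
    content: [
      { type: 'tool_use', id: 'toolu_01A', name: 'search', input: { query: 'foo' } },
      { type: 'tool_use', id: 'toolu_01B', name: 'fetch_page', input: { url: 'https://example.com' } },
    ],
  },
  {
    role: 'user',
    content: [
      { type: 'tool_result', tool_use_id: 'toolu_01A', content: '{"hits":3}' },
      { type: 'tool_result', tool_use_id: 'toolu_01B', content: '{"status":200}' },
    ],
  }
)
// Each tool_result must reference the tool_use ID from the same assistant turn,
// which is why the original IDs from Claude's response are preserved.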
@@ -777,6 +802,8 @@ export const anthropicProvider: ProviderConfig = {
const toolCallStartTime = Date.now()
const toolName = toolUse.name
const toolArgs = toolUse.input as Record<string, any>
// Preserve the original tool_use ID from Claude's response
const toolUseId = toolUse.id
try {
const tool = request.tools?.find((t) => t.id === toolName)
@@ -787,6 +814,7 @@ export const anthropicProvider: ProviderConfig = {
const toolCallEndTime = Date.now()
return {
toolUseId,
toolName,
toolArgs,
toolParams,
@@ -800,6 +828,7 @@ export const anthropicProvider: ProviderConfig = {
logger.error('Error processing tool call:', { error, toolName })
return {
toolUseId,
toolName,
toolArgs,
toolParams: {},
@@ -817,11 +846,32 @@ export const anthropicProvider: ProviderConfig = {
const executionResults = await Promise.allSettled(toolExecutionPromises)
// Collect all tool_use and tool_result blocks for batching
const toolUseBlocks: Array<{
type: 'tool_use'
id: string
name: string
input: Record<string, unknown>
}> = []
const toolResultBlocks: Array<{
type: 'tool_result'
tool_use_id: string
content: string
}> = []
for (const settledResult of executionResults) {
if (settledResult.status === 'rejected' || !settledResult.value) continue
const { toolName, toolArgs, toolParams, result, startTime, endTime, duration } =
settledResult.value
const {
toolUseId,
toolName,
toolArgs,
toolParams,
result,
startTime,
endTime,
duration,
} = settledResult.value
timeSegments.push({
type: 'tool',
@@ -831,7 +881,7 @@ export const anthropicProvider: ProviderConfig = {
duration: duration,
})
let resultContent: any
let resultContent: unknown
if (result.success) {
toolResults.push(result.output)
resultContent = result.output
@@ -853,29 +903,34 @@ export const anthropicProvider: ProviderConfig = {
success: result.success,
})
const toolUseId = generateToolUseId(toolName)
currentMessages.push({
role: 'assistant',
content: [
{
type: 'tool_use',
id: toolUseId,
name: toolName,
input: toolArgs,
} as any,
],
// Add to batched arrays using the ORIGINAL ID from Claude's response
toolUseBlocks.push({
type: 'tool_use',
id: toolUseId,
name: toolName,
input: toolArgs,
})
toolResultBlocks.push({
type: 'tool_result',
tool_use_id: toolUseId,
content: JSON.stringify(resultContent),
})
}
// Add ONE assistant message with ALL tool_use blocks
if (toolUseBlocks.length > 0) {
currentMessages.push({
role: 'assistant',
content: toolUseBlocks as unknown as Anthropic.Messages.ContentBlock[],
})
}
// Add ONE user message with ALL tool_result blocks
if (toolResultBlocks.length > 0) {
currentMessages.push({
role: 'user',
content: [
{
type: 'tool_result',
tool_use_id: toolUseId,
content: JSON.stringify(resultContent),
} as any,
],
content: toolResultBlocks as unknown as Anthropic.Messages.ContentBlockParam[],
})
}
@@ -1061,7 +1116,7 @@ export const anthropicProvider: ProviderConfig = {
startTime: tc.startTime,
endTime: tc.endTime,
duration: tc.duration,
result: tc.result,
result: tc.result as Record<string, unknown> | undefined,
}))
: undefined,
toolResults: toolResults.length > 0 ? toolResults : undefined,

View File

@@ -67,8 +67,17 @@ export function checkForForcedToolUsage(
return null
}
/**
* Generates a unique tool use ID for Bedrock.
* AWS Bedrock requires toolUseId to be 1-64 characters, pattern [a-zA-Z0-9_-]+
*/
export function generateToolUseId(toolName: string): string {
return `${toolName}-${Date.now()}-${Math.random().toString(36).substring(2, 7)}`
const timestamp = Date.now().toString(36) // Base36 timestamp (8 chars)
const random = Math.random().toString(36).substring(2, 7) // 5 random chars
const suffix = `-${timestamp}-${random}` // ~15 chars
const maxNameLength = 64 - suffix.length
const truncatedName = toolName.substring(0, maxNameLength).replace(/[^a-zA-Z0-9_-]/g, '_')
return `${truncatedName}${suffix}`
}
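For example (illustrative values), a punctuated MCP-style tool name is sanitized and kept within the 64-character budget:

generateToolUseId('mcp.search/documents')
// => 'mcp_search_documents-<timestamp>-<rand>'
// e.g. 'mcp_search_documents-mf1x2k3j-a9f3q' — chars outside [a-zA-Z0-9_-] become '_'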
/**

View File

@@ -76,7 +76,7 @@ export const deepseekProvider: ProviderConfig = {
: undefined
const payload: any = {
model: 'deepseek-chat',
model: request.model,
messages: allMessages,
}

View File

@@ -20,7 +20,7 @@ import {
convertUsageMetadata,
createReadableStreamFromGeminiStream,
ensureStructResponse,
extractFunctionCallPart,
extractAllFunctionCallParts,
extractTextContent,
mapToThinkingLevel,
} from '@/providers/google/utils'
@@ -32,7 +32,7 @@ import {
prepareToolsWithUsageControl,
} from '@/providers/utils'
import { executeTool } from '@/tools'
import type { ExecutionState, GeminiProviderType, GeminiUsage, ParsedFunctionCall } from './types'
import type { ExecutionState, GeminiProviderType, GeminiUsage } from './types'
/**
* Creates initial execution state
@@ -79,101 +79,168 @@ function createInitialState(
}
/**
* Executes a tool call and updates state
* Executes multiple tool calls in parallel and updates state.
* Per Gemini docs, all function calls from a single response should be executed
* together, with one model message containing all function calls and one user
* message containing all function responses.
*/
async function executeToolCall(
functionCallPart: Part,
functionCall: ParsedFunctionCall,
async function executeToolCallsBatch(
functionCallParts: Part[],
request: ProviderRequest,
state: ExecutionState,
forcedTools: string[],
logger: ReturnType<typeof createLogger>
): Promise<{ success: boolean; state: ExecutionState }> {
const toolCallStartTime = Date.now()
const toolName = functionCall.name
const tool = request.tools?.find((t) => t.id === toolName)
if (!tool) {
logger.warn(`Tool ${toolName} not found in registry, skipping`)
if (functionCallParts.length === 0) {
return { success: false, state }
}
try {
const { toolParams, executionParams } = prepareToolExecution(tool, functionCall.args, request)
const result = await executeTool(toolName, executionParams)
const toolCallEndTime = Date.now()
const duration = toolCallEndTime - toolCallStartTime
const executionPromises = functionCallParts.map(async (part) => {
const toolCallStartTime = Date.now()
const functionCall = part.functionCall!
const toolName = functionCall.name ?? ''
const args = (functionCall.args ?? {}) as Record<string, unknown>
const resultContent: Record<string, unknown> = result.success
? ensureStructResponse(result.output)
: { error: true, message: result.error || 'Tool execution failed', tool: toolName }
const toolCall: FunctionCallResponse = {
name: toolName,
arguments: toolParams,
startTime: new Date(toolCallStartTime).toISOString(),
endTime: new Date(toolCallEndTime).toISOString(),
duration,
result: resultContent,
const tool = request.tools?.find((t) => t.id === toolName)
if (!tool) {
logger.warn(`Tool ${toolName} not found in registry, skipping`)
return {
success: false,
part,
toolName,
args,
resultContent: { error: true, message: `Tool ${toolName} not found`, tool: toolName },
toolParams: {},
startTime: toolCallStartTime,
endTime: Date.now(),
duration: Date.now() - toolCallStartTime,
}
}
const updatedContents: Content[] = [
...state.contents,
{
role: 'model',
parts: [functionCallPart],
},
{
role: 'user',
parts: [
{
functionResponse: {
name: functionCall.name,
response: resultContent,
},
},
],
},
]
try {
const { toolParams, executionParams } = prepareToolExecution(tool, args, request)
const result = await executeTool(toolName, executionParams)
const toolCallEndTime = Date.now()
const duration = toolCallEndTime - toolCallStartTime
const forcedToolCheck = checkForForcedToolUsage(
[{ name: functionCall.name, args: functionCall.args }],
state.currentToolConfig,
forcedTools,
state.usedForcedTools
)
const resultContent: Record<string, unknown> = result.success
? ensureStructResponse(result.output)
: { error: true, message: result.error || 'Tool execution failed', tool: toolName }
return {
success: true,
state: {
...state,
contents: updatedContents,
toolCalls: [...state.toolCalls, toolCall],
toolResults: result.success
? [...state.toolResults, result.output as Record<string, unknown>]
: state.toolResults,
toolsTime: state.toolsTime + duration,
timeSegments: [
...state.timeSegments,
{
type: 'tool',
name: toolName,
startTime: toolCallStartTime,
endTime: toolCallEndTime,
duration,
},
],
usedForcedTools: forcedToolCheck?.usedForcedTools ?? state.usedForcedTools,
currentToolConfig: forcedToolCheck?.nextToolConfig ?? state.currentToolConfig,
},
return {
success: result.success,
part,
toolName,
args,
resultContent,
toolParams,
result,
startTime: toolCallStartTime,
endTime: toolCallEndTime,
duration,
}
} catch (error) {
const toolCallEndTime = Date.now()
logger.error('Error processing function call:', {
error: error instanceof Error ? error.message : String(error),
functionName: toolName,
})
return {
success: false,
part,
toolName,
args,
resultContent: {
error: true,
message: error instanceof Error ? error.message : 'Tool execution failed',
tool: toolName,
},
toolParams: {},
startTime: toolCallStartTime,
endTime: toolCallEndTime,
duration: toolCallEndTime - toolCallStartTime,
}
}
} catch (error) {
logger.error('Error processing function call:', {
error: error instanceof Error ? error.message : String(error),
functionName: toolName,
})
})
const results = await Promise.all(executionPromises)
// Check if at least one tool was found (not all failed due to missing tools)
const hasValidResults = results.some((r) => r.result !== undefined)
if (!hasValidResults && results.every((r) => !r.success)) {
return { success: false, state }
}
// Build batched messages per Gemini spec:
// ONE model message with ALL function call parts
// ONE user message with ALL function responses
const modelParts: Part[] = results.map((r) => r.part)
const userParts: Part[] = results.map((r) => ({
functionResponse: {
name: r.toolName,
response: r.resultContent,
},
}))
const updatedContents: Content[] = [
...state.contents,
{ role: 'model', parts: modelParts },
{ role: 'user', parts: userParts },
]
// Collect all tool calls and results
const newToolCalls: FunctionCallResponse[] = []
const newToolResults: Record<string, unknown>[] = []
const newTimeSegments: ExecutionState['timeSegments'] = []
let totalToolsTime = 0
for (const r of results) {
newToolCalls.push({
name: r.toolName,
arguments: r.toolParams,
startTime: new Date(r.startTime).toISOString(),
endTime: new Date(r.endTime).toISOString(),
duration: r.duration,
result: r.resultContent,
})
if (r.success && r.result?.output) {
newToolResults.push(r.result.output as Record<string, unknown>)
}
newTimeSegments.push({
type: 'tool',
name: r.toolName,
startTime: r.startTime,
endTime: r.endTime,
duration: r.duration,
})
totalToolsTime += r.duration
}
// Check forced tool usage for all executed tools
const executedToolsInfo = results.map((r) => ({ name: r.toolName, args: r.args }))
const forcedToolCheck = checkForForcedToolUsage(
executedToolsInfo,
state.currentToolConfig,
forcedTools,
state.usedForcedTools
)
return {
success: true,
state: {
...state,
contents: updatedContents,
toolCalls: [...state.toolCalls, ...newToolCalls],
toolResults: [...state.toolResults, ...newToolResults],
toolsTime: state.toolsTime + totalToolsTime,
timeSegments: [...state.timeSegments, ...newTimeSegments],
usedForcedTools: forcedToolCheck?.usedForcedTools ?? state.usedForcedTools,
currentToolConfig: forcedToolCheck?.nextToolConfig ?? state.currentToolConfig,
},
}
}
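The resulting history for a two-call batch, following the convention described above (names and payloads hypothetical):

const batchedContents: Content[] = [
  {
    role: 'model',
    parts: [
      { functionCall: { name: 'search', args: { query: 'foo' } } },
      { functionCall: { name: 'fetch_page', args: { url: 'https://example.com' } } },
    ],
  },
  {
    role: 'user',
    parts: [
      { functionResponse: { name: 'search', response: { hits: 3 } } },
      { functionResponse: { name: 'fetch_page', response: { status: 200 } } },
    ],
  },
]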
/**
@@ -506,27 +573,25 @@ export async function executeGeminiRequest(
// Tool execution loop
const functionCalls = response.functionCalls
if (functionCalls?.length) {
logger.info(`Received function call from Gemini: ${functionCalls[0].name}`)
const functionNames = functionCalls.map((fc) => fc.name).join(', ')
logger.info(`Received ${functionCalls.length} function call(s) from Gemini: ${functionNames}`)
while (state.iterationCount < MAX_TOOL_ITERATIONS) {
const functionCallPart = extractFunctionCallPart(currentResponse.candidates?.[0])
if (!functionCallPart?.functionCall) {
// Extract ALL function call parts from the response (Gemini can return multiple)
const functionCallParts = extractAllFunctionCallParts(currentResponse.candidates?.[0])
if (functionCallParts.length === 0) {
content = extractTextContent(currentResponse.candidates?.[0])
break
}
const functionCall: ParsedFunctionCall = {
name: functionCallPart.functionCall.name ?? '',
args: (functionCallPart.functionCall.args ?? {}) as Record<string, unknown>,
}
const callNames = functionCallParts.map((p) => p.functionCall?.name ?? 'unknown').join(', ')
logger.info(
`Processing function call: ${functionCall.name} (iteration ${state.iterationCount + 1})`
`Processing ${functionCallParts.length} function call(s): ${callNames} (iteration ${state.iterationCount + 1})`
)
const { success, state: updatedState } = await executeToolCall(
functionCallPart,
functionCall,
// Execute ALL function calls in this batch
const { success, state: updatedState } = await executeToolCallsBatch(
functionCallParts,
request,
state,
forcedTools,


@@ -109,6 +109,7 @@ export function extractFunctionCall(candidate: Candidate | undefined): ParsedFun
/**
* Extracts the full Part containing the function call (preserves thoughtSignature)
* @deprecated Use extractAllFunctionCallParts for proper multi-tool handling
*/
export function extractFunctionCallPart(candidate: Candidate | undefined): Part | null {
if (!candidate?.content?.parts) return null
@@ -122,6 +123,17 @@ export function extractFunctionCallPart(candidate: Candidate | undefined): Part
return null
}
/**
* Extracts ALL Parts containing function calls from a candidate.
* Gemini can return multiple function calls in a single response,
* and all should be executed before continuing the conversation.
*/
export function extractAllFunctionCallParts(candidate: Candidate | undefined): Part[] {
if (!candidate?.content?.parts) return []
return candidate.content.parts.filter((part) => part.functionCall)
}
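A quick illustration of the filter with a hypothetical candidate: text parts are dropped, and every functionCall part survives in order (keeping any thoughtSignature carried on the part).

```ts
// Hypothetical candidate mixing a text part with two function calls.
const candidate = {
  content: {
    parts: [
      { text: 'Let me look that up.' },
      { functionCall: { name: 'search_docs', args: { query: 'timeouts' } } },
      { functionCall: { name: 'read_file', args: { path: 'README.md' } } },
    ],
  },
}
// extractAllFunctionCallParts(candidate) → the two functionCall parts, in order.
```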
/**
* Converts usage metadata from SDK response to our format.
* Per Gemini docs, total = promptTokenCount + candidatesTokenCount + toolUsePromptTokenCount + thoughtsTokenCount
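A short sketch of that accounting, assuming these UsageMetadata field names and that the tool-use and thoughts counters may be absent (treated as zero):

```ts
// Hypothetical usage metadata for a tool-calling response with thinking enabled.
const usage = {
  promptTokenCount: 1200,
  candidatesTokenCount: 300,
  toolUsePromptTokenCount: 80,
  thoughtsTokenCount: 150,
}
const total =
  usage.promptTokenCount +
  usage.candidatesTokenCount +
  (usage.toolUsePromptTokenCount ?? 0) +
  (usage.thoughtsTokenCount ?? 0) // 1730
```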


@@ -320,6 +320,7 @@ export const groqProvider: ProviderConfig = {
currentMessages.push({
role: 'tool',
tool_call_id: toolCall.id,
name: toolName,
content: JSON.stringify(resultContent),
})
}


@@ -383,6 +383,7 @@ export const mistralProvider: ProviderConfig = {
currentMessages.push({
role: 'tool',
tool_call_id: toolCall.id,
name: toolName,
content: JSON.stringify(resultContent),
})
}


@@ -12,39 +12,70 @@ import {
import { persistWorkflowOperation } from '@/socket/database/operations'
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
import { checkRolePermission } from '@/socket/middleware/permissions'
import type { IRoomManager } from '@/socket/rooms'
import type { IRoomManager, UserSession } from '@/socket/rooms'
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
const logger = createLogger('OperationsHandlers')
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('workflow-operation', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
if (data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
const emitOperationError = (
forbidden: { type: string; message: string; operation?: string; target?: string },
failed?: { error: string; retryable?: boolean }
) => {
socket.emit('operation-forbidden', forbidden)
if (failed && data?.operationId) {
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
}
}
if (!roomManager.isReady()) {
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
let workflowId: string | null = null
let session: UserSession | null = null
try {
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
session = await roomManager.getUserSession(socket.id)
} catch (error) {
logger.error('Error loading session for workflow operation:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!workflowId || !session) {
emitOperationError(
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
{ error: 'Session expired' }
)
return
}
let hasRoom = false
try {
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
} catch (error) {
logger.error('Error checking workflow room:', error)
emitOperationError(
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
{ error: 'Realtime unavailable', retryable: true }
)
return
}
if (!hasRoom) {
socket.emit('operation-forbidden', {
type: 'ROOM_NOT_FOUND',
message: 'Workflow room not found',
})
if (data?.operationId) {
socket.emit('operation-failed', {
operationId: data.operationId,
error: 'Workflow room not found',
})
}
emitOperationError(
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
{ error: 'Workflow room not found' }
)
return
}
@@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
// Check permissions from cached role for all other operations
if (!userPresence) {
logger.warn(`User presence not found for socket ${socket.id}`)
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'User session not found',
operation,
target,
})
if (operationId) {
socket.emit('operation-failed', { operationId, error: 'User session not found' })
}
emitOperationError(
{
type: 'SESSION_ERROR',
message: 'User session not found',
operation,
target,
},
{ error: 'User session not found' }
)
return
}
@@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
logger.warn(
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
)
socket.emit('operation-forbidden', {
emitOperationError({
type: 'INSUFFICIENT_PERMISSIONS',
message: `${permissionCheck.reason} on '${target}'`,
operation,


@@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
operationId,
} = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)


@@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
socket.on('variable-update', async (data) => {
const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data
if (!roomManager.isReady()) {
socket.emit('operation-forbidden', {
type: 'ROOM_MANAGER_UNAVAILABLE',
message: 'Realtime unavailable',
})
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Realtime unavailable',
retryable: true,
})
}
return
}
try {
const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)


@@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
return
}
if (!roomManager.isReady()) {
logger.warn(`Join workflow rejected: Room manager unavailable`)
socket.emit('join-workflow-error', {
error: 'Realtime unavailable',
code: 'ROOM_MANAGER_UNAVAILABLE',
})
return
}
logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)
// Verify workflow access
@@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
// Undo socket.join and room manager entry if any operation failed
socket.leave(workflowId)
await roomManager.removeUserFromRoom(socket.id)
socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
const isReady = roomManager.isReady()
socket.emit('join-workflow-error', {
error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
})
}
})
socket.on('leave-workflow', async () => {
try {
if (!roomManager.isReady()) {
return
}
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)


@@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
logger.info('MemoryRoomManager initialized (single-pod mode)')
}
isReady(): boolean {
return true
}
async shutdown(): Promise<void> {
this.workflowRooms.clear()
this.socketToWorkflow.clear()


@@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
this._io = io
this.redis = createClient({
url: redisUrl,
socket: {
reconnectStrategy: (retries) => {
if (retries > 10) {
logger.error('Redis reconnection failed after 10 attempts')
return new Error('Redis reconnection failed')
}
const delay = Math.min(retries * 100, 3000)
logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
return delay
},
},
})
this.redis.on('error', (err) => {
@@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
logger.info('Redis client ready')
this.isConnected = true
})
this.redis.on('end', () => {
logger.warn('Redis client connection closed')
this.isConnected = false
})
}
get io(): Server {
return this._io
}
isReady(): boolean {
return this.isConnected
}
async initialize(): Promise<void> {
if (this.isConnected) return


@@ -48,6 +48,11 @@ export interface IRoomManager {
*/
initialize(): Promise<void>
/**
* Whether the room manager is ready to serve requests
*/
isReady(): boolean
/**
* Clean shutdown
*/


@@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
res.end(JSON.stringify({ error: authResult.error }))
return
}
if (!roomManager.isReady()) {
sendError(res, 'Room manager unavailable', 503)
return
}
}
// Handle workflow deletion notifications from the main API


@@ -1,3 +1,3 @@
import { mistralParserTool, mistralParserV2Tool } from '@/tools/mistral/parser'
import { mistralParserTool, mistralParserV2Tool, mistralParserV3Tool } from '@/tools/mistral/parser'
export { mistralParserTool, mistralParserV2Tool }
export { mistralParserTool, mistralParserV2Tool, mistralParserV3Tool }


@@ -349,74 +349,14 @@ export const mistralParserTool: ToolConfig<MistralParserInput, MistralParserOutp
},
}
const mistralParserV2Params = {
file: {
type: 'file',
required: true,
visibility: 'hidden',
description: 'File data from a previous block',
},
resultType: mistralParserTool.params.resultType,
includeImageBase64: mistralParserTool.params.includeImageBase64,
pages: mistralParserTool.params.pages,
imageLimit: mistralParserTool.params.imageLimit,
imageMinSize: mistralParserTool.params.imageMinSize,
apiKey: mistralParserTool.params.apiKey,
} satisfies ToolConfig['params']
export const mistralParserV2Tool: ToolConfig<MistralParserV2Input, MistralParserV2Output> = {
export const mistralParserV2Tool: ToolConfig<MistralParserInput, MistralParserV2Output> = {
id: 'mistral_parser_v2',
name: 'Mistral PDF Parser',
description: 'Parse PDF documents using Mistral OCR API',
version: '2.0.0',
params: mistralParserV2Params,
request: {
url: '/api/tools/mistral/parse',
method: 'POST',
headers: (params) => {
return {
'Content-Type': 'application/json',
Accept: 'application/json',
Authorization: `Bearer ${params.apiKey}`,
}
},
body: (params) => {
if (!params || typeof params !== 'object') {
throw new Error('Invalid parameters: Parameters must be provided as an object')
}
if (!params.apiKey || typeof params.apiKey !== 'string' || params.apiKey.trim() === '') {
throw new Error('Missing or invalid API key: A valid Mistral API key is required')
}
const file = params.file
if (!file || typeof file !== 'object') {
throw new Error('File input is required')
}
const requestBody: Record<string, unknown> = {
apiKey: params.apiKey,
resultType: params.resultType || 'markdown',
}
requestBody.file = file
if (params.pages) {
requestBody.pages = params.pages
}
if (params.includeImageBase64 !== undefined) {
requestBody.includeImageBase64 = params.includeImageBase64
}
if (params.imageLimit !== undefined) {
requestBody.imageLimit = params.imageLimit
}
if (params.imageMinSize !== undefined) {
requestBody.imageMinSize = params.imageMinSize
}
return requestBody
},
},
params: mistralParserTool.params,
request: mistralParserTool.request,
transformResponse: async (response: Response) => {
let ocrResult
@@ -543,3 +483,73 @@ export const mistralParserV2Tool: ToolConfig<MistralParserV2Input, MistralParser
},
},
}
/**
* V3 tool - Updated for new file handling pattern with UserFile normalization
* Used by MistralParseV3Block which uses fileUpload (basic) and fileReference (advanced) subblocks
*/
export const mistralParserV3Tool: ToolConfig<MistralParserV2Input, MistralParserV2Output> = {
...mistralParserV2Tool,
id: 'mistral_parser_v3',
version: '3.0.0',
params: {
file: {
type: 'file',
required: true,
visibility: 'hidden',
description: 'Normalized UserFile from file upload or file reference',
},
resultType: mistralParserTool.params.resultType,
includeImageBase64: mistralParserTool.params.includeImageBase64,
pages: mistralParserTool.params.pages,
imageLimit: mistralParserTool.params.imageLimit,
imageMinSize: mistralParserTool.params.imageMinSize,
apiKey: mistralParserTool.params.apiKey,
},
request: {
url: '/api/tools/mistral/parse',
method: 'POST',
headers: (params) => {
return {
'Content-Type': 'application/json',
Accept: 'application/json',
Authorization: `Bearer ${params.apiKey}`,
}
},
body: (params) => {
if (!params || typeof params !== 'object') {
throw new Error('Invalid parameters: Parameters must be provided as an object')
}
if (!params.apiKey || typeof params.apiKey !== 'string' || params.apiKey.trim() === '') {
throw new Error('Missing or invalid API key: A valid Mistral API key is required')
}
// V3 expects normalized UserFile object via `file` param
const file = params.file
if (!file || typeof file !== 'object') {
throw new Error('File input is required: provide a file upload or file reference')
}
const requestBody: Record<string, unknown> = {
apiKey: params.apiKey,
resultType: params.resultType || 'markdown',
file: file,
}
if (params.pages) {
requestBody.pages = params.pages
}
if (params.includeImageBase64 !== undefined) {
requestBody.includeImageBase64 = params.includeImageBase64
}
if (params.imageLimit !== undefined) {
requestBody.imageLimit = params.imageLimit
}
if (params.imageMinSize !== undefined) {
requestBody.imageMinSize = params.imageMinSize
}
return requestBody
},
},
}
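As a concrete illustration of the body builder above, with a hypothetical UserFile shape (the exact normalized fields are an assumption, not the repo's contract):

```ts
// Hypothetical payload POSTed to /api/tools/mistral/parse by the V3 tool.
const body = {
  apiKey: '<MISTRAL_API_KEY>',
  resultType: 'markdown',
  file: {
    name: 'invoice.pdf', // assumed UserFile fields
    type: 'application/pdf',
    size: 182044,
    url: 'https://files.example.com/invoice.pdf',
  },
  pages: '1-3',
  includeImageBase64: false,
}
```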


@@ -1093,7 +1093,7 @@ import {
microsoftTeamsWriteChannelTool,
microsoftTeamsWriteChatTool,
} from '@/tools/microsoft_teams'
import { mistralParserTool, mistralParserV2Tool } from '@/tools/mistral'
import { mistralParserTool, mistralParserV2Tool, mistralParserV3Tool } from '@/tools/mistral'
import {
mongodbDeleteTool,
mongodbExecuteTool,
@@ -2684,6 +2684,7 @@ export const tools: Record<string, ToolConfig> = {
apollo_email_accounts: apolloEmailAccountsTool,
mistral_parser: mistralParserTool,
mistral_parser_v2: mistralParserV2Tool,
mistral_parser_v3: mistralParserV3Tool,
reducto_parser: reductoParserTool,
reducto_parser_v2: reductoParserV2Tool,
textract_parser: textractParserTool,


@@ -127,6 +127,18 @@ app:
RATE_LIMIT_WINDOW_MS: "60000" # Rate limit window duration (1 minute)
RATE_LIMIT_FREE_SYNC: "50" # Sync API executions per minute
RATE_LIMIT_FREE_ASYNC: "200" # Async API executions per minute
# Execution Timeout Configuration (in seconds)
# Sync timeouts apply to synchronous API calls
EXECUTION_TIMEOUT_FREE: "300" # Free tier sync timeout (5 minutes)
EXECUTION_TIMEOUT_PRO: "3000" # Pro tier sync timeout (50 minutes)
EXECUTION_TIMEOUT_TEAM: "3000" # Team tier sync timeout (50 minutes)
EXECUTION_TIMEOUT_ENTERPRISE: "3000" # Enterprise tier sync timeout (50 minutes)
# Async timeouts apply to async/background job executions
EXECUTION_TIMEOUT_ASYNC_FREE: "5400" # Free tier async timeout (90 minutes)
EXECUTION_TIMEOUT_ASYNC_PRO: "5400" # Pro tier async timeout (90 minutes)
EXECUTION_TIMEOUT_ASYNC_TEAM: "5400" # Team tier async timeout (90 minutes)
EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: "5400" # Enterprise tier async timeout (90 minutes)
# UI Branding & Whitelabeling Configuration
NEXT_PUBLIC_BRAND_NAME: "Sim" # Custom brand name


@@ -0,0 +1,19 @@
CREATE TABLE "async_jobs" (
"id" text PRIMARY KEY NOT NULL,
"type" text NOT NULL,
"payload" jsonb NOT NULL,
"status" text DEFAULT 'pending' NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"started_at" timestamp,
"completed_at" timestamp,
"run_at" timestamp,
"attempts" integer DEFAULT 0 NOT NULL,
"max_attempts" integer DEFAULT 3 NOT NULL,
"error" text,
"output" jsonb,
"metadata" jsonb DEFAULT '{}' NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE INDEX "async_jobs_status_started_at_idx" ON "async_jobs" USING btree ("status","started_at");--> statement-breakpoint
CREATE INDEX "async_jobs_status_completed_at_idx" ON "async_jobs" USING btree ("status","completed_at");

File diff suppressed because it is too large


@@ -1051,6 +1051,13 @@
"when": 1769897862156,
"tag": "0150_flimsy_hemingway",
"breakpoints": true
},
{
"idx": 151,
"version": "7",
"when": 1770239332381,
"tag": "0151_stale_screwball",
"breakpoints": true
}
]
}


@@ -2124,3 +2124,34 @@ export const permissionGroupMember = pgTable(
userIdUnique: uniqueIndex('permission_group_member_user_id_unique').on(table.userId),
})
)
/**
* Async Jobs - Queue for background job processing (Redis/DB backends)
* Used when trigger.dev is not available for async workflow executions
*/
export const asyncJobs = pgTable(
'async_jobs',
{
id: text('id').primaryKey(),
type: text('type').notNull(),
payload: jsonb('payload').notNull(),
status: text('status').notNull().default('pending'),
createdAt: timestamp('created_at').notNull().defaultNow(),
startedAt: timestamp('started_at'),
completedAt: timestamp('completed_at'),
runAt: timestamp('run_at'),
attempts: integer('attempts').notNull().default(0),
maxAttempts: integer('max_attempts').notNull().default(3),
error: text('error'),
output: jsonb('output'),
metadata: jsonb('metadata').notNull().default('{}'),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
},
(table) => ({
statusStartedAtIdx: index('async_jobs_status_started_at_idx').on(table.status, table.startedAt),
statusCompletedAtIdx: index('async_jobs_status_completed_at_idx').on(
table.status,
table.completedAt
),
})
)
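A minimal worker-side sketch of how a DB backend might claim work from this table; this is an assumption about usage, not the repo's actual queue code. The SELECT ... FOR UPDATE SKIP LOCKED subquery lets concurrent workers poll without double-claiming a job.

```ts
import { eq, sql } from 'drizzle-orm'
import { db } from '@/db' // assumed drizzle instance and import path
import { asyncJobs } from '@/db/schema'

async function claimNextJob() {
  const now = new Date()
  const [job] = await db
    .update(asyncJobs)
    .set({
      status: 'processing',
      startedAt: now,
      updatedAt: now,
      attempts: sql`${asyncJobs.attempts} + 1`,
    })
    .where(
      eq(
        asyncJobs.id,
        sql`(
          SELECT id FROM async_jobs
          WHERE status = 'pending' AND (run_at IS NULL OR run_at <= now())
          ORDER BY created_at
          LIMIT 1
          FOR UPDATE SKIP LOCKED
        )`
      )
    )
    .returning()
  return job ?? null // null when no runnable pending job exists
}
```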


@@ -45,12 +45,14 @@ export * from './assertions'
export * from './builders'
export * from './factories'
export {
clearRedisMocks,
createEnvMock,
createMockDb,
createMockFetch,
createMockFormDataRequest,
createMockGetEnv,
createMockLogger,
createMockRedis,
createMockRequest,
createMockResponse,
createMockSocket,
@@ -63,6 +65,7 @@ export {
loggerMock,
type MockAuthResult,
type MockFetchResponse,
type MockRedis,
type MockUser,
mockAuth,
mockCommonSchemas,


@@ -63,6 +63,8 @@ export {
} from './fetch.mock'
// Logger mocks
export { clearLoggerMocks, createMockLogger, getLoggerCalls, loggerMock } from './logger.mock'
// Redis mocks
export { clearRedisMocks, createMockRedis, type MockRedis } from './redis.mock'
// Request mocks
export { createMockFormDataRequest, createMockRequest } from './request.mock'
// Socket mocks


@@ -0,0 +1,80 @@
import { vi } from 'vitest'
/**
* Creates a mock Redis client with common operations.
*
* @example
* ```ts
* const redis = createMockRedis()
* const queue = new RedisJobQueue(redis as never)
*
* // After operations
* expect(redis.hset).toHaveBeenCalled()
* expect(redis.expire).toHaveBeenCalledWith('key', 86400)
* ```
*/
export function createMockRedis() {
return {
// Hash operations
hset: vi.fn().mockResolvedValue(1),
hget: vi.fn().mockResolvedValue(null),
hgetall: vi.fn().mockResolvedValue({}),
hdel: vi.fn().mockResolvedValue(1),
hmset: vi.fn().mockResolvedValue('OK'),
hincrby: vi.fn().mockResolvedValue(1),
// Key operations
get: vi.fn().mockResolvedValue(null),
set: vi.fn().mockResolvedValue('OK'),
del: vi.fn().mockResolvedValue(1),
exists: vi.fn().mockResolvedValue(0),
expire: vi.fn().mockResolvedValue(1),
ttl: vi.fn().mockResolvedValue(-1),
// List operations
lpush: vi.fn().mockResolvedValue(1),
rpush: vi.fn().mockResolvedValue(1),
lpop: vi.fn().mockResolvedValue(null),
rpop: vi.fn().mockResolvedValue(null),
lrange: vi.fn().mockResolvedValue([]),
llen: vi.fn().mockResolvedValue(0),
// Set operations
sadd: vi.fn().mockResolvedValue(1),
srem: vi.fn().mockResolvedValue(1),
smembers: vi.fn().mockResolvedValue([]),
sismember: vi.fn().mockResolvedValue(0),
// Pub/Sub
publish: vi.fn().mockResolvedValue(0),
subscribe: vi.fn().mockResolvedValue(undefined),
unsubscribe: vi.fn().mockResolvedValue(undefined),
on: vi.fn(),
// Transaction
multi: vi.fn(() => ({
exec: vi.fn().mockResolvedValue([]),
})),
// Connection
ping: vi.fn().mockResolvedValue('PONG'),
quit: vi.fn().mockResolvedValue('OK'),
disconnect: vi.fn().mockResolvedValue(undefined),
// Status
status: 'ready',
}
}
export type MockRedis = ReturnType<typeof createMockRedis>
/**
* Clears all Redis mock calls.
*/
export function clearRedisMocks(redis: MockRedis) {
Object.values(redis).forEach((value) => {
if (typeof value === 'function' && 'mockClear' in value) {
value.mockClear()
}
})
}
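Typical test wiring, as a sketch: create the mock once per file and reset call history between cases so assertions don't leak across tests (mockClear preserves the stubbed return values).

```ts
import { beforeEach, expect, it } from 'vitest'
import { clearRedisMocks, createMockRedis } from '@sim/testing/mocks' // import path assumed

const redis = createMockRedis()

beforeEach(() => {
  clearRedisMocks(redis) // resets calls; mockResolvedValue stubs stay intact
})

it('records an enqueue', async () => {
  await redis.lpush('jobs:pending', 'job-123')
  expect(redis.lpush).toHaveBeenCalledWith('jobs:pending', 'job-123')
})
```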