mirror of https://github.com/simstudioai/sim.git
synced 2026-02-04 19:55:08 -05:00

Compare commits: feat/timeo ... fix/restor
6 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 0db87778c4 | |
| | 8d846c5983 | |
| | 362f4c2918 | |
| | c77e351067 | |
| | a627faabe7 | |
| | f811594875 | |
@@ -213,6 +213,25 @@ Different subscription plans have different usage limits:
| **Team** | $40/seat (pooled, adjustable) | 300 sync, 2,500 async |
| **Enterprise** | Custom | Custom |

## Execution Time Limits

Workflows have maximum execution time limits based on your subscription plan:

| Plan | Sync Execution | Async Execution |
|------|----------------|-----------------|
| **Free** | 5 minutes | 10 minutes |
| **Pro** | 50 minutes | 90 minutes |
| **Team** | 50 minutes | 90 minutes |
| **Enterprise** | 50 minutes | 90 minutes |

**Sync executions** run immediately and return results directly. These are triggered via the API with `async: false` (default) or through the UI.

**Async executions** (triggered via API with `async: true`, webhooks, or schedules) run in the background. Async time limits are up to 2x the sync limit, capped at 90 minutes.
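For example, an async run can be requested by setting `async: true` on the execution request. A minimal sketch follows; the endpoint path, auth header, and response field names are assumptions for illustration and are not taken from this diff:

```ts
// Hypothetical example: request a background (async) execution of a deployed workflow.
// The URL, API-key header, and response shape are assumptions, not confirmed by this compare.
const res = await fetch(`https://sim.example.com/api/workflows/${workflowId}/execute`, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'X-API-Key': process.env.SIM_API_KEY ?? '', // auth mechanism assumed
  },
  // async: true queues the run in the background instead of waiting up to the sync limit
  body: JSON.stringify({ input: { topic: 'weekly report' }, async: true }),
})

const { taskId } = await res.json()
// Poll the returned task/job ID until it reports completed or failed.
```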

<Callout type="info">
If a workflow exceeds its time limit, it will be terminated and marked as failed with a timeout error. Design long-running workflows to use async execution or break them into smaller workflows.
</Callout>

## Billing Model

Sim uses a **base subscription + overage** billing model:

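Several of the diffs below replace hard-coded timeouts with helpers from `@/lib/core/execution-limits` (`getExecutionTimeout`, `getMaxExecutionTimeout`, `DEFAULT_EXECUTION_TIMEOUT_MS`). That module is not included in this compare; a minimal sketch consistent with the limits table above (shape and default value assumed) might look like:

```ts
// Hypothetical sketch of the execution-limit helpers used in the diffs below.
// The real module lives at @/lib/core/execution-limits; its exact shape and the
// default value are assumptions here. Only the per-plan limits mirror the table above.
type SubscriptionPlan = 'free' | 'pro' | 'team' | 'enterprise'
type ExecutionKind = 'sync' | 'async'

const MINUTE_MS = 60_000

const EXECUTION_LIMITS_MS: Record<SubscriptionPlan, Record<ExecutionKind, number>> = {
  free: { sync: 5 * MINUTE_MS, async: 10 * MINUTE_MS },
  pro: { sync: 50 * MINUTE_MS, async: 90 * MINUTE_MS },
  team: { sync: 50 * MINUTE_MS, async: 90 * MINUTE_MS },
  enterprise: { sync: 50 * MINUTE_MS, async: 90 * MINUTE_MS },
}

// Fallback used where no plan is known (value assumed).
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_LIMITS_MS.free.async

export function getExecutionTimeout(
  plan: SubscriptionPlan | undefined,
  kind: ExecutionKind
): number {
  return EXECUTION_LIMITS_MS[plan ?? 'free'][kind]
}

// Largest limit across all plans; used for cron thresholds and fetch AbortSignal timeouts.
export function getMaxExecutionTimeout(): number {
  return EXECUTION_LIMITS_MS.enterprise.async
}
```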
@@ -11,7 +11,7 @@ import {
  Database,
  DollarSign,
  HardDrive,
  Workflow,
  Timer,
} from 'lucide-react'
import { useRouter } from 'next/navigation'
import { cn } from '@/lib/core/utils/cn'
@@ -44,7 +44,7 @@ interface PricingTier {
const FREE_PLAN_FEATURES: PricingFeature[] = [
  { icon: DollarSign, text: '$20 usage limit' },
  { icon: HardDrive, text: '5GB file storage' },
  { icon: Workflow, text: 'Public template access' },
  { icon: Timer, text: '5 min execution limit' },
  { icon: Database, text: 'Limited log retention' },
  { icon: Code2, text: 'CLI/SDK Access' },
]

@@ -21,6 +21,7 @@ const UpdateCreatorProfileSchema = z.object({
  name: z.string().min(1, 'Name is required').max(100, 'Max 100 characters').optional(),
  profileImageUrl: z.string().optional().or(z.literal('')),
  details: CreatorProfileDetailsSchema.optional(),
  verified: z.boolean().optional(), // Verification status (super users only)
})

// Helper to check if user has permission to manage profile
@@ -97,11 +98,29 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
      return NextResponse.json({ error: 'Profile not found' }, { status: 404 })
    }

    // Check permissions
    const canEdit = await hasPermission(session.user.id, existing[0])
    if (!canEdit) {
      logger.warn(`[${requestId}] User denied permission to update profile: ${id}`)
      return NextResponse.json({ error: 'Access denied' }, { status: 403 })
    // Verification changes require super user permission
    if (data.verified !== undefined) {
      const { verifyEffectiveSuperUser } = await import('@/lib/templates/permissions')
      const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
      if (!effectiveSuperUser) {
        logger.warn(`[${requestId}] Non-super user attempted to change creator verification: ${id}`)
        return NextResponse.json(
          { error: 'Only super users can change verification status' },
          { status: 403 }
        )
      }
    }

    // For non-verified updates, check regular permissions
    const hasNonVerifiedUpdates =
      data.name !== undefined || data.profileImageUrl !== undefined || data.details !== undefined

    if (hasNonVerifiedUpdates) {
      const canEdit = await hasPermission(session.user.id, existing[0])
      if (!canEdit) {
        logger.warn(`[${requestId}] User denied permission to update profile: ${id}`)
        return NextResponse.json({ error: 'Access denied' }, { status: 403 })
      }
    }

    const updateData: any = {
@@ -111,6 +130,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
    if (data.name !== undefined) updateData.name = data.name
    if (data.profileImageUrl !== undefined) updateData.profileImageUrl = data.profileImageUrl
    if (data.details !== undefined) updateData.details = data.details
    if (data.verified !== undefined) updateData.verified = data.verified

    const updated = await db
      .update(templateCreators)

@@ -1,113 +0,0 @@
import { db } from '@sim/db'
import { templateCreators } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { generateRequestId } from '@/lib/core/utils/request'
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'

const logger = createLogger('CreatorVerificationAPI')

export const revalidate = 0

// POST /api/creators/[id]/verify - Verify a creator (super users only)
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const requestId = generateRequestId()
  const { id } = await params

  try {
    const session = await getSession()
    if (!session?.user?.id) {
      logger.warn(`[${requestId}] Unauthorized verification attempt for creator: ${id}`)
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    // Check if user is a super user
    const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
    if (!effectiveSuperUser) {
      logger.warn(`[${requestId}] Non-super user attempted to verify creator: ${id}`)
      return NextResponse.json({ error: 'Only super users can verify creators' }, { status: 403 })
    }

    // Check if creator exists
    const existingCreator = await db
      .select()
      .from(templateCreators)
      .where(eq(templateCreators.id, id))
      .limit(1)

    if (existingCreator.length === 0) {
      logger.warn(`[${requestId}] Creator not found for verification: ${id}`)
      return NextResponse.json({ error: 'Creator not found' }, { status: 404 })
    }

    // Update creator verified status to true
    await db
      .update(templateCreators)
      .set({ verified: true, updatedAt: new Date() })
      .where(eq(templateCreators.id, id))

    logger.info(`[${requestId}] Creator verified: ${id} by super user: ${session.user.id}`)

    return NextResponse.json({
      message: 'Creator verified successfully',
      creatorId: id,
    })
  } catch (error) {
    logger.error(`[${requestId}] Error verifying creator ${id}`, error)
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}

// DELETE /api/creators/[id]/verify - Unverify a creator (super users only)
export async function DELETE(
  request: NextRequest,
  { params }: { params: Promise<{ id: string }> }
) {
  const requestId = generateRequestId()
  const { id } = await params

  try {
    const session = await getSession()
    if (!session?.user?.id) {
      logger.warn(`[${requestId}] Unauthorized unverification attempt for creator: ${id}`)
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    // Check if user is a super user
    const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
    if (!effectiveSuperUser) {
      logger.warn(`[${requestId}] Non-super user attempted to unverify creator: ${id}`)
      return NextResponse.json({ error: 'Only super users can unverify creators' }, { status: 403 })
    }

    // Check if creator exists
    const existingCreator = await db
      .select()
      .from(templateCreators)
      .where(eq(templateCreators.id, id))
      .limit(1)

    if (existingCreator.length === 0) {
      logger.warn(`[${requestId}] Creator not found for unverification: ${id}`)
      return NextResponse.json({ error: 'Creator not found' }, { status: 404 })
    }

    // Update creator verified status to false
    await db
      .update(templateCreators)
      .set({ verified: false, updatedAt: new Date() })
      .where(eq(templateCreators.id, id))

    logger.info(`[${requestId}] Creator unverified: ${id} by super user: ${session.user.id}`)

    return NextResponse.json({
      message: 'Creator unverified successfully',
      creatorId: id,
    })
  } catch (error) {
    logger.error(`[${requestId}] Error unverifying creator ${id}`, error)
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}
@@ -1,13 +1,16 @@
import { db } from '@sim/db'
import { asyncJobs, db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt, sql } from 'drizzle-orm'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { JOB_RETENTION_HOURS, JOB_STATUS } from '@/lib/core/async-jobs'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'

const logger = createLogger('CleanupStaleExecutions')

const STALE_THRESHOLD_MINUTES = 30
const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000
const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000)
const MAX_INT32 = 2_147_483_647

export async function GET(request: NextRequest) {
@@ -78,12 +81,102 @@ export async function GET(request: NextRequest) {

    logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`)

    // Clean up stale async jobs (stuck in processing)
    let asyncJobsMarkedFailed = 0

    try {
      const staleAsyncJobs = await db
        .update(asyncJobs)
        .set({
          status: JOB_STATUS.FAILED,
          completedAt: new Date(),
          error: `Job terminated: stuck in processing for more than ${STALE_THRESHOLD_MINUTES} minutes`,
          updatedAt: new Date(),
        })
        .where(
          and(eq(asyncJobs.status, JOB_STATUS.PROCESSING), lt(asyncJobs.startedAt, staleThreshold))
        )
        .returning({ id: asyncJobs.id })

      asyncJobsMarkedFailed = staleAsyncJobs.length
      if (asyncJobsMarkedFailed > 0) {
        logger.info(`Marked ${asyncJobsMarkedFailed} stale async jobs as failed`)
      }
    } catch (error) {
      logger.error('Failed to clean up stale async jobs:', {
        error: error instanceof Error ? error.message : String(error),
      })
    }

    // Clean up stale pending jobs (never started, e.g., due to server crash before startJob())
    let stalePendingJobsMarkedFailed = 0

    try {
      const stalePendingJobs = await db
        .update(asyncJobs)
        .set({
          status: JOB_STATUS.FAILED,
          completedAt: new Date(),
          error: `Job terminated: stuck in pending state for more than ${STALE_THRESHOLD_MINUTES} minutes (never started)`,
          updatedAt: new Date(),
        })
        .where(
          and(eq(asyncJobs.status, JOB_STATUS.PENDING), lt(asyncJobs.createdAt, staleThreshold))
        )
        .returning({ id: asyncJobs.id })

      stalePendingJobsMarkedFailed = stalePendingJobs.length
      if (stalePendingJobsMarkedFailed > 0) {
        logger.info(`Marked ${stalePendingJobsMarkedFailed} stale pending jobs as failed`)
      }
    } catch (error) {
      logger.error('Failed to clean up stale pending jobs:', {
        error: error instanceof Error ? error.message : String(error),
      })
    }

    // Delete completed/failed jobs older than retention period
    const retentionThreshold = new Date(Date.now() - JOB_RETENTION_HOURS * 60 * 60 * 1000)
    let asyncJobsDeleted = 0

    try {
      const deletedJobs = await db
        .delete(asyncJobs)
        .where(
          and(
            inArray(asyncJobs.status, [JOB_STATUS.COMPLETED, JOB_STATUS.FAILED]),
            lt(asyncJobs.completedAt, retentionThreshold)
          )
        )
        .returning({ id: asyncJobs.id })

      asyncJobsDeleted = deletedJobs.length
      if (asyncJobsDeleted > 0) {
        logger.info(
          `Deleted ${asyncJobsDeleted} old async jobs (retention: ${JOB_RETENTION_HOURS}h)`
        )
      }
    } catch (error) {
      logger.error('Failed to delete old async jobs:', {
        error: error instanceof Error ? error.message : String(error),
      })
    }

    return NextResponse.json({
      success: true,
      found: staleExecutions.length,
      cleaned,
      failed,
      thresholdMinutes: STALE_THRESHOLD_MINUTES,
      executions: {
        found: staleExecutions.length,
        cleaned,
        failed,
        thresholdMinutes: STALE_THRESHOLD_MINUTES,
      },
      asyncJobs: {
        staleProcessingMarkedFailed: asyncJobsMarkedFailed,
        stalePendingMarkedFailed: stalePendingJobsMarkedFailed,
        oldDeleted: asyncJobsDeleted,
        staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
        retentionHours: JOB_RETENTION_HOURS,
      },
    })
  } catch (error) {
    logger.error('Error in stale execution cleanup job:', error)

@@ -1,7 +1,7 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { runs } from '@trigger.dev/sdk'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { getJobQueue, JOB_STATUS } from '@/lib/core/async-jobs'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { createErrorResponse } from '@/app/api/workflows/utils'
|
||||
|
||||
@@ -15,8 +15,6 @@ export async function GET(
|
||||
const requestId = generateRequestId()
|
||||
|
||||
try {
|
||||
logger.debug(`[${requestId}] Getting status for task: ${taskId}`)
|
||||
|
||||
const authResult = await checkHybridAuth(request, { requireWorkflowId: false })
|
||||
if (!authResult.success || !authResult.userId) {
|
||||
logger.warn(`[${requestId}] Unauthorized task status request`)
|
||||
@@ -25,76 +23,60 @@ export async function GET(
|
||||
|
||||
const authenticatedUserId = authResult.userId
|
||||
|
||||
const run = await runs.retrieve(taskId)
|
||||
const jobQueue = await getJobQueue()
|
||||
const job = await jobQueue.getJob(taskId)
|
||||
|
||||
logger.debug(`[${requestId}] Task ${taskId} status: ${run.status}`)
|
||||
|
||||
const payload = run.payload as any
|
||||
if (payload?.workflowId) {
|
||||
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
|
||||
const accessCheck = await verifyWorkflowAccess(authenticatedUserId, payload.workflowId)
|
||||
if (!accessCheck.hasAccess) {
|
||||
logger.warn(`[${requestId}] User ${authenticatedUserId} denied access to task ${taskId}`, {
|
||||
workflowId: payload.workflowId,
|
||||
})
|
||||
return createErrorResponse('Access denied', 403)
|
||||
}
|
||||
logger.debug(`[${requestId}] User ${authenticatedUserId} has access to task ${taskId}`)
|
||||
} else {
|
||||
if (payload?.userId && payload.userId !== authenticatedUserId) {
|
||||
logger.warn(
|
||||
`[${requestId}] User ${authenticatedUserId} attempted to access task ${taskId} owned by ${payload.userId}`
|
||||
)
|
||||
return createErrorResponse('Access denied', 403)
|
||||
}
|
||||
if (!payload?.userId) {
|
||||
logger.warn(
|
||||
`[${requestId}] Task ${taskId} has no ownership information in payload. Denying access for security.`
|
||||
)
|
||||
return createErrorResponse('Access denied', 403)
|
||||
}
|
||||
if (!job) {
|
||||
return createErrorResponse('Task not found', 404)
|
||||
}
|
||||
|
||||
const statusMap = {
|
||||
QUEUED: 'queued',
|
||||
WAITING_FOR_DEPLOY: 'queued',
|
||||
EXECUTING: 'processing',
|
||||
RESCHEDULED: 'processing',
|
||||
FROZEN: 'processing',
|
||||
COMPLETED: 'completed',
|
||||
CANCELED: 'cancelled',
|
||||
FAILED: 'failed',
|
||||
CRASHED: 'failed',
|
||||
INTERRUPTED: 'failed',
|
||||
SYSTEM_FAILURE: 'failed',
|
||||
EXPIRED: 'failed',
|
||||
} as const
|
||||
if (job.metadata?.workflowId) {
|
||||
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
|
||||
const accessCheck = await verifyWorkflowAccess(
|
||||
authenticatedUserId,
|
||||
job.metadata.workflowId as string
|
||||
)
|
||||
if (!accessCheck.hasAccess) {
|
||||
logger.warn(`[${requestId}] Access denied to workflow ${job.metadata.workflowId}`)
|
||||
return createErrorResponse('Access denied', 403)
|
||||
}
|
||||
} else if (job.metadata?.userId && job.metadata.userId !== authenticatedUserId) {
|
||||
logger.warn(`[${requestId}] Access denied to user ${job.metadata.userId}`)
|
||||
return createErrorResponse('Access denied', 403)
|
||||
} else if (!job.metadata?.userId && !job.metadata?.workflowId) {
|
||||
logger.warn(`[${requestId}] Access denied to job ${taskId}`)
|
||||
return createErrorResponse('Access denied', 403)
|
||||
}
|
||||
|
||||
const mappedStatus = statusMap[run.status as keyof typeof statusMap] || 'unknown'
|
||||
const mappedStatus = job.status === JOB_STATUS.PENDING ? 'queued' : job.status
|
||||
|
||||
const response: any = {
|
||||
success: true,
|
||||
taskId,
|
||||
status: mappedStatus,
|
||||
metadata: {
|
||||
startedAt: run.startedAt,
|
||||
startedAt: job.startedAt,
|
||||
},
|
||||
}
|
||||
|
||||
if (mappedStatus === 'completed') {
|
||||
response.output = run.output // This contains the workflow execution results
|
||||
response.metadata.completedAt = run.finishedAt
|
||||
response.metadata.duration = run.durationMs
|
||||
if (job.status === JOB_STATUS.COMPLETED) {
|
||||
response.output = job.output
|
||||
response.metadata.completedAt = job.completedAt
|
||||
if (job.startedAt && job.completedAt) {
|
||||
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
|
||||
}
|
||||
}
|
||||
|
||||
if (mappedStatus === 'failed') {
|
||||
response.error = run.error
|
||||
response.metadata.completedAt = run.finishedAt
|
||||
response.metadata.duration = run.durationMs
|
||||
if (job.status === JOB_STATUS.FAILED) {
|
||||
response.error = job.error
|
||||
response.metadata.completedAt = job.completedAt
|
||||
if (job.startedAt && job.completedAt) {
|
||||
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
|
||||
}
|
||||
}
|
||||
|
||||
if (mappedStatus === 'processing' || mappedStatus === 'queued') {
|
||||
response.estimatedDuration = 180000 // 3 minutes max from our config
|
||||
if (job.status === JOB_STATUS.PROCESSING || job.status === JOB_STATUS.PENDING) {
|
||||
response.estimatedDuration = 180000
|
||||
}
|
||||
|
||||
return NextResponse.json(response)
|
||||
|
||||
@@ -21,6 +21,7 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateInternalToken } from '@/lib/auth/internal'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls'

const logger = createLogger('WorkflowMcpServeAPI')
@@ -264,7 +265,7 @@ async function handleToolsCall(
    method: 'POST',
    headers,
    body: JSON.stringify({ input: params.arguments || {}, triggerType: 'mcp' }),
    signal: AbortSignal.timeout(600000), // 10 minute timeout
    signal: AbortSignal.timeout(getMaxExecutionTimeout()),
  })

  const executeResult = await response.json()

@@ -1,5 +1,8 @@
import { createLogger } from '@sim/logger'
import type { NextRequest } from 'next/server'
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getExecutionTimeout } from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
import { mcpService } from '@/lib/mcp/service'
import type { McpTool, McpToolCall, McpToolResult } from '@/lib/mcp/types'
@@ -7,7 +10,6 @@ import {
  categorizeError,
  createMcpErrorResponse,
  createMcpSuccessResponse,
  MCP_CONSTANTS,
  validateStringParam,
} from '@/lib/mcp/utils'

@@ -171,13 +173,16 @@ export const POST = withMcpAuth('read')(
        arguments: args,
      }

      const userSubscription = await getHighestPrioritySubscription(userId)
      const executionTimeout = getExecutionTimeout(
        userSubscription?.plan as SubscriptionPlan | undefined,
        'sync'
      )

      const result = await Promise.race([
        mcpService.executeTool(userId, serverId, toolCall, workspaceId),
        new Promise<never>((_, reject) =>
          setTimeout(
            () => reject(new Error('Tool execution timeout')),
            MCP_CONSTANTS.EXECUTION_TIMEOUT
          )
          setTimeout(() => reject(new Error('Tool execution timeout')), executionTimeout)
        ),
      ])

@@ -1,10 +1,9 @@
|
||||
import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { tasks } from '@trigger.dev/sdk'
|
||||
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { verifyCronAuth } from '@/lib/auth/internal'
|
||||
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { executeScheduleJob } from '@/background/schedule-execution'
|
||||
|
||||
@@ -55,72 +54,67 @@ export async function GET(request: NextRequest) {
|
||||
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
|
||||
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
|
||||
|
||||
if (isTriggerDevEnabled) {
|
||||
const triggerPromises = dueSchedules.map(async (schedule) => {
|
||||
const queueTime = schedule.lastQueuedAt ?? queuedAt
|
||||
const jobQueue = await getJobQueue()
|
||||
|
||||
try {
|
||||
const payload = {
|
||||
scheduleId: schedule.id,
|
||||
workflowId: schedule.workflowId,
|
||||
blockId: schedule.blockId || undefined,
|
||||
cronExpression: schedule.cronExpression || undefined,
|
||||
lastRanAt: schedule.lastRanAt?.toISOString(),
|
||||
failedCount: schedule.failedCount || 0,
|
||||
now: queueTime.toISOString(),
|
||||
scheduledFor: schedule.nextRunAt?.toISOString(),
|
||||
}
|
||||
const queuePromises = dueSchedules.map(async (schedule) => {
|
||||
const queueTime = schedule.lastQueuedAt ?? queuedAt
|
||||
|
||||
const handle = await tasks.trigger('schedule-execution', payload)
|
||||
logger.info(
|
||||
`[${requestId}] Queued schedule execution task ${handle.id} for workflow ${schedule.workflowId}`
|
||||
)
|
||||
return handle
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to trigger schedule execution for workflow ${schedule.workflowId}`,
|
||||
error
|
||||
)
|
||||
return null
|
||||
}
|
||||
})
|
||||
const payload = {
|
||||
scheduleId: schedule.id,
|
||||
workflowId: schedule.workflowId,
|
||||
blockId: schedule.blockId || undefined,
|
||||
cronExpression: schedule.cronExpression || undefined,
|
||||
lastRanAt: schedule.lastRanAt?.toISOString(),
|
||||
failedCount: schedule.failedCount || 0,
|
||||
now: queueTime.toISOString(),
|
||||
scheduledFor: schedule.nextRunAt?.toISOString(),
|
||||
}
|
||||
|
||||
await Promise.allSettled(triggerPromises)
|
||||
|
||||
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`)
|
||||
} else {
|
||||
const directExecutionPromises = dueSchedules.map(async (schedule) => {
|
||||
const queueTime = schedule.lastQueuedAt ?? queuedAt
|
||||
|
||||
const payload = {
|
||||
scheduleId: schedule.id,
|
||||
workflowId: schedule.workflowId,
|
||||
blockId: schedule.blockId || undefined,
|
||||
cronExpression: schedule.cronExpression || undefined,
|
||||
lastRanAt: schedule.lastRanAt?.toISOString(),
|
||||
failedCount: schedule.failedCount || 0,
|
||||
now: queueTime.toISOString(),
|
||||
scheduledFor: schedule.nextRunAt?.toISOString(),
|
||||
}
|
||||
|
||||
void executeScheduleJob(payload).catch((error) => {
|
||||
logger.error(
|
||||
`[${requestId}] Direct schedule execution failed for workflow ${schedule.workflowId}`,
|
||||
error
|
||||
)
|
||||
try {
|
||||
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
|
||||
metadata: { workflowId: schedule.workflowId },
|
||||
})
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Queued direct schedule execution for workflow ${schedule.workflowId} (Trigger.dev disabled)`
|
||||
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`
|
||||
)
|
||||
})
|
||||
|
||||
await Promise.allSettled(directExecutionPromises)
|
||||
if (shouldExecuteInline()) {
|
||||
void (async () => {
|
||||
try {
|
||||
await jobQueue.startJob(jobId)
|
||||
const output = await executeScheduleJob(payload)
|
||||
await jobQueue.completeJob(jobId, output)
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
logger.error(
|
||||
`[${requestId}] Schedule execution failed for workflow ${schedule.workflowId}`,
|
||||
{ jobId, error: errorMessage }
|
||||
)
|
||||
try {
|
||||
await jobQueue.markJobFailed(jobId, errorMessage)
|
||||
} catch (markFailedError) {
|
||||
logger.error(`[${requestId}] Failed to mark job as failed`, {
|
||||
jobId,
|
||||
error:
|
||||
markFailedError instanceof Error
|
||||
? markFailedError.message
|
||||
: String(markFailedError),
|
||||
})
|
||||
}
|
||||
}
|
||||
})()
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to queue schedule execution for workflow ${schedule.workflowId}`,
|
||||
error
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
logger.info(
|
||||
`[${requestId}] Queued ${dueSchedules.length} direct schedule executions (Trigger.dev disabled)`
|
||||
)
|
||||
}
|
||||
await Promise.allSettled(queuePromises)
|
||||
|
||||
logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions`)
|
||||
|
||||
return NextResponse.json({
|
||||
message: 'Scheduled workflow executions processed',
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
import { db } from '@sim/db'
|
||||
import { templates } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'
|
||||
|
||||
const logger = createLogger('TemplateApprovalAPI')
|
||||
|
||||
export const revalidate = 0
|
||||
|
||||
/**
|
||||
* POST /api/templates/[id]/approve - Approve a template (super users only)
|
||||
*/
|
||||
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
|
||||
const requestId = generateRequestId()
|
||||
const { id } = await params
|
||||
|
||||
try {
|
||||
const session = await getSession()
|
||||
if (!session?.user?.id) {
|
||||
logger.warn(`[${requestId}] Unauthorized template approval attempt for ID: ${id}`)
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
|
||||
if (!effectiveSuperUser) {
|
||||
logger.warn(`[${requestId}] Non-super user attempted to approve template: ${id}`)
|
||||
return NextResponse.json({ error: 'Only super users can approve templates' }, { status: 403 })
|
||||
}
|
||||
|
||||
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
|
||||
if (existingTemplate.length === 0) {
|
||||
logger.warn(`[${requestId}] Template not found for approval: ${id}`)
|
||||
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
await db
|
||||
.update(templates)
|
||||
.set({ status: 'approved', updatedAt: new Date() })
|
||||
.where(eq(templates.id, id))
|
||||
|
||||
logger.info(`[${requestId}] Template approved: ${id} by super user: ${session.user.id}`)
|
||||
|
||||
return NextResponse.json({
|
||||
message: 'Template approved successfully',
|
||||
templateId: id,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error approving template ${id}`, error)
|
||||
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* DELETE /api/templates/[id]/approve - Unapprove a template (super users only)
|
||||
*/
|
||||
export async function DELETE(
|
||||
_request: NextRequest,
|
||||
{ params }: { params: Promise<{ id: string }> }
|
||||
) {
|
||||
const requestId = generateRequestId()
|
||||
const { id } = await params
|
||||
|
||||
try {
|
||||
const session = await getSession()
|
||||
if (!session?.user?.id) {
|
||||
logger.warn(`[${requestId}] Unauthorized template rejection attempt for ID: ${id}`)
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
|
||||
if (!effectiveSuperUser) {
|
||||
logger.warn(`[${requestId}] Non-super user attempted to reject template: ${id}`)
|
||||
return NextResponse.json({ error: 'Only super users can reject templates' }, { status: 403 })
|
||||
}
|
||||
|
||||
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
|
||||
if (existingTemplate.length === 0) {
|
||||
logger.warn(`[${requestId}] Template not found for rejection: ${id}`)
|
||||
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
await db
|
||||
.update(templates)
|
||||
.set({ status: 'rejected', updatedAt: new Date() })
|
||||
.where(eq(templates.id, id))
|
||||
|
||||
logger.info(`[${requestId}] Template rejected: ${id} by super user: ${session.user.id}`)
|
||||
|
||||
return NextResponse.json({
|
||||
message: 'Template rejected successfully',
|
||||
templateId: id,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error rejecting template ${id}`, error)
|
||||
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
|
||||
}
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
import { db } from '@sim/db'
|
||||
import { templates } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { verifyEffectiveSuperUser } from '@/lib/templates/permissions'
|
||||
|
||||
const logger = createLogger('TemplateRejectionAPI')
|
||||
|
||||
export const revalidate = 0
|
||||
|
||||
/**
|
||||
* POST /api/templates/[id]/reject - Reject a template (super users only)
|
||||
*/
|
||||
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
|
||||
const requestId = generateRequestId()
|
||||
const { id } = await params
|
||||
|
||||
try {
|
||||
const session = await getSession()
|
||||
if (!session?.user?.id) {
|
||||
logger.warn(`[${requestId}] Unauthorized template rejection attempt for ID: ${id}`)
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
|
||||
if (!effectiveSuperUser) {
|
||||
logger.warn(`[${requestId}] Non-super user attempted to reject template: ${id}`)
|
||||
return NextResponse.json({ error: 'Only super users can reject templates' }, { status: 403 })
|
||||
}
|
||||
|
||||
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
|
||||
if (existingTemplate.length === 0) {
|
||||
logger.warn(`[${requestId}] Template not found for rejection: ${id}`)
|
||||
return NextResponse.json({ error: 'Template not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
await db
|
||||
.update(templates)
|
||||
.set({ status: 'rejected', updatedAt: new Date() })
|
||||
.where(eq(templates.id, id))
|
||||
|
||||
logger.info(`[${requestId}] Template rejected: ${id} by super user: ${session.user.id}`)
|
||||
|
||||
return NextResponse.json({
|
||||
message: 'Template rejected successfully',
|
||||
templateId: id,
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Error rejecting template ${id}`, error)
|
||||
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
|
||||
}
|
||||
}
|
||||
@@ -106,6 +106,7 @@ const updateTemplateSchema = z.object({
|
||||
creatorId: z.string().optional(), // Creator profile ID
|
||||
tags: z.array(z.string()).max(10, 'Maximum 10 tags allowed').optional(),
|
||||
updateState: z.boolean().optional(), // Explicitly request state update from current workflow
|
||||
status: z.enum(['approved', 'rejected', 'pending']).optional(), // Status change (super users only)
|
||||
})
|
||||
|
||||
// PUT /api/templates/[id] - Update a template
|
||||
@@ -131,7 +132,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
|
||||
)
|
||||
}
|
||||
|
||||
const { name, details, creatorId, tags, updateState } = validationResult.data
|
||||
const { name, details, creatorId, tags, updateState, status } = validationResult.data
|
||||
|
||||
const existingTemplate = await db.select().from(templates).where(eq(templates.id, id)).limit(1)
|
||||
|
||||
@@ -142,21 +143,44 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
|
||||
|
||||
const template = existingTemplate[0]
|
||||
|
||||
if (!template.creatorId) {
|
||||
logger.warn(`[${requestId}] Template ${id} has no creator, denying update`)
|
||||
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
|
||||
// Status changes require super user permission
|
||||
if (status !== undefined) {
|
||||
const { verifyEffectiveSuperUser } = await import('@/lib/templates/permissions')
|
||||
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
|
||||
if (!effectiveSuperUser) {
|
||||
logger.warn(`[${requestId}] Non-super user attempted to change template status: ${id}`)
|
||||
return NextResponse.json(
|
||||
{ error: 'Only super users can change template status' },
|
||||
{ status: 403 }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const { verifyCreatorPermission } = await import('@/lib/templates/permissions')
|
||||
const { hasPermission, error: permissionError } = await verifyCreatorPermission(
|
||||
session.user.id,
|
||||
template.creatorId,
|
||||
'admin'
|
||||
)
|
||||
// For non-status updates, verify creator permission
|
||||
const hasNonStatusUpdates =
|
||||
name !== undefined ||
|
||||
details !== undefined ||
|
||||
creatorId !== undefined ||
|
||||
tags !== undefined ||
|
||||
updateState
|
||||
|
||||
if (!hasPermission) {
|
||||
logger.warn(`[${requestId}] User denied permission to update template ${id}`)
|
||||
return NextResponse.json({ error: permissionError || 'Access denied' }, { status: 403 })
|
||||
if (hasNonStatusUpdates) {
|
||||
if (!template.creatorId) {
|
||||
logger.warn(`[${requestId}] Template ${id} has no creator, denying update`)
|
||||
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
|
||||
}
|
||||
|
||||
const { verifyCreatorPermission } = await import('@/lib/templates/permissions')
|
||||
const { hasPermission, error: permissionError } = await verifyCreatorPermission(
|
||||
session.user.id,
|
||||
template.creatorId,
|
||||
'admin'
|
||||
)
|
||||
|
||||
if (!hasPermission) {
|
||||
logger.warn(`[${requestId}] User denied permission to update template ${id}`)
|
||||
return NextResponse.json({ error: permissionError || 'Access denied' }, { status: 403 })
|
||||
}
|
||||
}
|
||||
|
||||
const updateData: any = {
|
||||
@@ -167,6 +191,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
|
||||
if (details !== undefined) updateData.details = details
|
||||
if (tags !== undefined) updateData.tags = tags
|
||||
if (creatorId !== undefined) updateData.creatorId = creatorId
|
||||
if (status !== undefined) updateData.status = status
|
||||
|
||||
if (updateState && template.workflowId) {
|
||||
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
|
||||
|
||||
@@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { httpHeaderSafeJson } from '@/lib/core/utils/validation'
|
||||
import { FileInputSchema } from '@/lib/uploads/utils/file-schemas'
|
||||
import { processFilesToUserFiles, type RawFileInput } from '@/lib/uploads/utils/file-utils'
|
||||
import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server'
|
||||
@@ -11,16 +12,6 @@ export const dynamic = 'force-dynamic'
|
||||
|
||||
const logger = createLogger('DropboxUploadAPI')
|
||||
|
||||
/**
|
||||
* Escapes non-ASCII characters in JSON string for HTTP header safety.
|
||||
* Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX.
|
||||
*/
|
||||
function httpHeaderSafeJson(value: object): string {
|
||||
return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
|
||||
return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4)
|
||||
})
|
||||
}
|
||||
|
||||
const DropboxUploadSchema = z.object({
|
||||
accessToken: z.string().min(1, 'Access token is required'),
|
||||
path: z.string().min(1, 'Destination path is required'),
|
||||
|
||||
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { extractAudioFromVideo, isVideoFile } from '@/lib/audio/extractor'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import {
|
||||
secureFetchWithPinnedIP,
|
||||
validateUrlWithDNS,
|
||||
@@ -636,7 +637,8 @@ async function transcribeWithAssemblyAI(
|
||||
|
||||
let transcript: any
|
||||
let attempts = 0
|
||||
const maxAttempts = 60 // 5 minutes with 5-second intervals
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(DEFAULT_EXECUTION_TIMEOUT_MS / pollIntervalMs)
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
const statusResponse = await fetch(`https://api.assemblyai.com/v2/transcript/${id}`, {
|
||||
|
||||
@@ -3,6 +3,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import { validateAwsRegion, validateS3BucketName } from '@/lib/core/security/input-validation'
|
||||
import {
|
||||
secureFetchWithPinnedIP,
|
||||
@@ -226,8 +227,8 @@ async function pollForJobCompletion(
|
||||
useAnalyzeDocument: boolean,
|
||||
requestId: string
|
||||
): Promise<Record<string, unknown>> {
|
||||
const pollIntervalMs = 5000 // 5 seconds between polls
|
||||
const maxPollTimeMs = 180000 // 3 minutes maximum polling time
|
||||
const pollIntervalMs = 5000
|
||||
const maxPollTimeMs = DEFAULT_EXECUTION_TIMEOUT_MS
|
||||
const maxAttempts = Math.ceil(maxPollTimeMs / pollIntervalMs)
|
||||
|
||||
const getTarget = useAnalyzeDocument
|
||||
|
||||
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { NextResponse } from 'next/server'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import { validateAlphanumericId } from '@/lib/core/security/input-validation'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { StorageService } from '@/lib/uploads'
|
||||
@@ -60,7 +61,7 @@ export async function POST(request: NextRequest) {
|
||||
text,
|
||||
model_id: modelId,
|
||||
}),
|
||||
signal: AbortSignal.timeout(60000),
|
||||
signal: AbortSignal.timeout(DEFAULT_EXECUTION_TIMEOUT_MS),
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { checkInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
|
||||
import { downloadFileFromStorage } from '@/lib/uploads/utils/file-utils.server'
|
||||
import type { UserFile } from '@/executor/types'
|
||||
import type { VideoRequestBody } from '@/tools/video/types'
|
||||
@@ -326,11 +327,12 @@ async function generateWithRunway(
|
||||
|
||||
logger.info(`[${requestId}] Runway task created: ${taskId}`)
|
||||
|
||||
const maxAttempts = 120 // 10 minutes with 5-second intervals
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(5000) // Poll every 5 seconds
|
||||
await sleep(pollIntervalMs)
|
||||
|
||||
const statusResponse = await fetch(`https://api.dev.runwayml.com/v1/tasks/${taskId}`, {
|
||||
headers: {
|
||||
@@ -370,7 +372,7 @@ async function generateWithRunway(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Runway generation timed out after 10 minutes')
|
||||
throw new Error('Runway generation timed out')
|
||||
}
|
||||
|
||||
async function generateWithVeo(
|
||||
@@ -429,11 +431,12 @@ async function generateWithVeo(
|
||||
|
||||
logger.info(`[${requestId}] Veo operation created: ${operationName}`)
|
||||
|
||||
const maxAttempts = 60 // 5 minutes with 5-second intervals
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(5000)
|
||||
await sleep(pollIntervalMs)
|
||||
|
||||
const statusResponse = await fetch(
|
||||
`https://generativelanguage.googleapis.com/v1beta/${operationName}`,
|
||||
@@ -485,7 +488,7 @@ async function generateWithVeo(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Veo generation timed out after 5 minutes')
|
||||
throw new Error('Veo generation timed out')
|
||||
}
|
||||
|
||||
async function generateWithLuma(
|
||||
@@ -541,11 +544,12 @@ async function generateWithLuma(
|
||||
|
||||
logger.info(`[${requestId}] Luma generation created: ${generationId}`)
|
||||
|
||||
const maxAttempts = 120 // 10 minutes
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(5000)
|
||||
await sleep(pollIntervalMs)
|
||||
|
||||
const statusResponse = await fetch(
|
||||
`https://api.lumalabs.ai/dream-machine/v1/generations/${generationId}`,
|
||||
@@ -592,7 +596,7 @@ async function generateWithLuma(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Luma generation timed out after 10 minutes')
|
||||
throw new Error('Luma generation timed out')
|
||||
}
|
||||
|
||||
async function generateWithMiniMax(
|
||||
@@ -658,14 +662,13 @@ async function generateWithMiniMax(
|
||||
|
||||
logger.info(`[${requestId}] MiniMax task created: ${taskId}`)
|
||||
|
||||
// Poll for completion (6-10 minutes typical)
|
||||
const maxAttempts = 120 // 10 minutes with 5-second intervals
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(5000)
|
||||
await sleep(pollIntervalMs)
|
||||
|
||||
// Query task status
|
||||
const statusResponse = await fetch(
|
||||
`https://api.minimax.io/v1/query/video_generation?task_id=${taskId}`,
|
||||
{
|
||||
@@ -743,7 +746,7 @@ async function generateWithMiniMax(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('MiniMax generation timed out after 10 minutes')
|
||||
throw new Error('MiniMax generation timed out')
|
||||
}
|
||||
|
||||
// Helper function to strip subpaths from Fal.ai model IDs for status/result endpoints
|
||||
@@ -861,11 +864,12 @@ async function generateWithFalAI(
|
||||
// Get base model ID (without subpath) for status and result endpoints
|
||||
const baseModelId = getBaseModelId(falModelId)
|
||||
|
||||
const maxAttempts = 96 // 8 minutes with 5-second intervals
|
||||
const pollIntervalMs = 5000
|
||||
const maxAttempts = Math.ceil(getMaxExecutionTimeout() / pollIntervalMs)
|
||||
let attempts = 0
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
await sleep(5000)
|
||||
await sleep(pollIntervalMs)
|
||||
|
||||
const statusResponse = await fetch(
|
||||
`https://queue.fal.run/${baseModelId}/requests/${requestIdFal}/status`,
|
||||
@@ -938,7 +942,7 @@ async function generateWithFalAI(
|
||||
attempts++
|
||||
}
|
||||
|
||||
throw new Error('Fal.ai generation timed out after 8 minutes')
|
||||
throw new Error('Fal.ai generation timed out')
|
||||
}
|
||||
|
||||
function getVideoDimensions(
|
||||
|
||||
@@ -1,190 +0,0 @@
|
||||
import { db, workflowDeploymentVersion } from '@sim/db'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
|
||||
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
|
||||
import {
|
||||
cleanupDeploymentVersion,
|
||||
createSchedulesForDeploy,
|
||||
validateWorkflowSchedules,
|
||||
} from '@/lib/workflows/schedules'
|
||||
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
|
||||
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
const logger = createLogger('WorkflowActivateDeploymentAPI')
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
export const runtime = 'nodejs'
|
||||
|
||||
export async function POST(
|
||||
request: NextRequest,
|
||||
{ params }: { params: Promise<{ id: string; version: string }> }
|
||||
) {
|
||||
const requestId = generateRequestId()
|
||||
const { id, version } = await params
|
||||
|
||||
try {
|
||||
const {
|
||||
error,
|
||||
session,
|
||||
workflow: workflowData,
|
||||
} = await validateWorkflowPermissions(id, requestId, 'admin')
|
||||
if (error) {
|
||||
return createErrorResponse(error.message, error.status)
|
||||
}
|
||||
|
||||
const actorUserId = session?.user?.id
|
||||
if (!actorUserId) {
|
||||
logger.warn(`[${requestId}] Unable to resolve actor user for deployment activation: ${id}`)
|
||||
return createErrorResponse('Unable to determine activating user', 400)
|
||||
}
|
||||
|
||||
const versionNum = Number(version)
|
||||
if (!Number.isFinite(versionNum)) {
|
||||
return createErrorResponse('Invalid version number', 400)
|
||||
}
|
||||
|
||||
const [versionRow] = await db
|
||||
.select({
|
||||
id: workflowDeploymentVersion.id,
|
||||
state: workflowDeploymentVersion.state,
|
||||
})
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, id),
|
||||
eq(workflowDeploymentVersion.version, versionNum)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
if (!versionRow?.state) {
|
||||
return createErrorResponse('Deployment version not found', 404)
|
||||
}
|
||||
|
||||
const [currentActiveVersion] = await db
|
||||
.select({ id: workflowDeploymentVersion.id })
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, id),
|
||||
eq(workflowDeploymentVersion.isActive, true)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
const previousVersionId = currentActiveVersion?.id
|
||||
|
||||
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
|
||||
const blocks = deployedState.blocks
|
||||
if (!blocks || typeof blocks !== 'object') {
|
||||
return createErrorResponse('Invalid deployed state structure', 500)
|
||||
}
|
||||
|
||||
const scheduleValidation = validateWorkflowSchedules(blocks)
|
||||
if (!scheduleValidation.isValid) {
|
||||
return createErrorResponse(`Invalid schedule configuration: ${scheduleValidation.error}`, 400)
|
||||
}
|
||||
|
||||
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
|
||||
request,
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
blocks,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
previousVersionId,
|
||||
forceRecreateSubscriptions: true,
|
||||
})
|
||||
|
||||
if (!triggerSaveResult.success) {
|
||||
return createErrorResponse(
|
||||
triggerSaveResult.error?.message || 'Failed to sync trigger configuration',
|
||||
triggerSaveResult.error?.status || 500
|
||||
)
|
||||
}
|
||||
|
||||
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
|
||||
|
||||
if (!scheduleResult.success) {
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
|
||||
}
|
||||
|
||||
const result = await activateWorkflowVersion({ workflowId: id, version: versionNum })
|
||||
if (!result.success) {
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
|
||||
}
|
||||
|
||||
if (previousVersionId && previousVersionId !== versionRow.id) {
|
||||
try {
|
||||
logger.info(
|
||||
`[${requestId}] Cleaning up previous version ${previousVersionId} webhooks/schedules`
|
||||
)
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: previousVersionId,
|
||||
skipExternalCleanup: true,
|
||||
})
|
||||
logger.info(`[${requestId}] Previous version cleanup completed`)
|
||||
} catch (cleanupError) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
await syncMcpToolsForWorkflow({
|
||||
workflowId: id,
|
||||
requestId,
|
||||
state: versionRow.state,
|
||||
context: 'activate',
|
||||
})
|
||||
|
||||
return createSuccessResponse({
|
||||
success: true,
|
||||
deployedAt: result.deployedAt,
|
||||
warnings: triggerSaveResult.warnings,
|
||||
})
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Error activating deployment for workflow: ${id}`, error)
|
||||
return createErrorResponse(error.message || 'Failed to activate deployment', 500)
|
||||
}
|
||||
}
|
||||
@@ -4,8 +4,17 @@ import { and, eq } from 'drizzle-orm'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { syncMcpToolsForWorkflow } from '@/lib/mcp/workflow-mcp-sync'
|
||||
import { restorePreviousVersionWebhooks, saveTriggerWebhooksForDeploy } from '@/lib/webhooks/deploy'
|
||||
import { activateWorkflowVersion } from '@/lib/workflows/persistence/utils'
|
||||
import {
|
||||
cleanupDeploymentVersion,
|
||||
createSchedulesForDeploy,
|
||||
validateWorkflowSchedules,
|
||||
} from '@/lib/workflows/schedules'
|
||||
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
|
||||
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
const logger = createLogger('WorkflowDeploymentVersionAPI')
|
||||
|
||||
@@ -23,10 +32,14 @@ const patchBodySchema = z
|
||||
.max(500, 'Description must be 500 characters or less')
|
||||
.nullable()
|
||||
.optional(),
|
||||
isActive: z.literal(true).optional(), // Set to true to activate this version
|
||||
})
|
||||
.refine((data) => data.name !== undefined || data.description !== undefined, {
|
||||
message: 'At least one of name or description must be provided',
|
||||
})
|
||||
.refine(
|
||||
(data) => data.name !== undefined || data.description !== undefined || data.isActive === true,
|
||||
{
|
||||
message: 'At least one of name, description, or isActive must be provided',
|
||||
}
|
||||
)
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
export const runtime = 'nodejs'
|
||||
@@ -82,7 +95,22 @@ export async function PATCH(
|
||||
const { id, version } = await params
|
||||
|
||||
try {
|
||||
const { error } = await validateWorkflowPermissions(id, requestId, 'write')
|
||||
const body = await request.json()
|
||||
const validation = patchBodySchema.safeParse(body)
|
||||
|
||||
if (!validation.success) {
|
||||
return createErrorResponse(validation.error.errors[0]?.message || 'Invalid request body', 400)
|
||||
}
|
||||
|
||||
const { name, description, isActive } = validation.data
|
||||
|
||||
// Activation requires admin permission, other updates require write
|
||||
const requiredPermission = isActive ? 'admin' : 'write'
|
||||
const {
|
||||
error,
|
||||
session,
|
||||
workflow: workflowData,
|
||||
} = await validateWorkflowPermissions(id, requestId, requiredPermission)
|
||||
if (error) {
|
||||
return createErrorResponse(error.message, error.status)
|
||||
}
|
||||
@@ -92,15 +120,193 @@ export async function PATCH(
|
||||
return createErrorResponse('Invalid version', 400)
|
||||
}
|
||||
|
||||
const body = await request.json()
|
||||
const validation = patchBodySchema.safeParse(body)
|
||||
// Handle activation
|
||||
if (isActive) {
|
||||
const actorUserId = session?.user?.id
|
||||
if (!actorUserId) {
|
||||
logger.warn(`[${requestId}] Unable to resolve actor user for deployment activation: ${id}`)
|
||||
return createErrorResponse('Unable to determine activating user', 400)
|
||||
}
|
||||
|
||||
if (!validation.success) {
|
||||
return createErrorResponse(validation.error.errors[0]?.message || 'Invalid request body', 400)
|
||||
const [versionRow] = await db
|
||||
.select({
|
||||
id: workflowDeploymentVersion.id,
|
||||
state: workflowDeploymentVersion.state,
|
||||
})
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, id),
|
||||
eq(workflowDeploymentVersion.version, versionNum)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
if (!versionRow?.state) {
|
||||
return createErrorResponse('Deployment version not found', 404)
|
||||
}
|
||||
|
||||
const [currentActiveVersion] = await db
|
||||
.select({ id: workflowDeploymentVersion.id })
|
||||
.from(workflowDeploymentVersion)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, id),
|
||||
eq(workflowDeploymentVersion.isActive, true)
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
|
||||
const previousVersionId = currentActiveVersion?.id
|
||||
|
||||
const deployedState = versionRow.state as { blocks?: Record<string, BlockState> }
|
||||
const blocks = deployedState.blocks
|
||||
if (!blocks || typeof blocks !== 'object') {
|
||||
return createErrorResponse('Invalid deployed state structure', 500)
|
||||
}
|
||||
|
||||
const scheduleValidation = validateWorkflowSchedules(blocks)
|
||||
if (!scheduleValidation.isValid) {
|
||||
return createErrorResponse(
|
||||
`Invalid schedule configuration: ${scheduleValidation.error}`,
|
||||
400
|
||||
)
|
||||
}
|
||||
|
||||
const triggerSaveResult = await saveTriggerWebhooksForDeploy({
|
||||
request,
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
blocks,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
previousVersionId,
|
||||
forceRecreateSubscriptions: true,
|
||||
})
|
||||
|
||||
if (!triggerSaveResult.success) {
|
||||
return createErrorResponse(
|
||||
triggerSaveResult.error?.message || 'Failed to sync trigger configuration',
|
||||
triggerSaveResult.error?.status || 500
|
||||
)
|
||||
}
|
||||
|
||||
const scheduleResult = await createSchedulesForDeploy(id, blocks, db, versionRow.id)
|
||||
|
||||
if (!scheduleResult.success) {
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
return createErrorResponse(scheduleResult.error || 'Failed to sync schedules', 500)
|
||||
}
|
||||
|
||||
const result = await activateWorkflowVersion({ workflowId: id, version: versionNum })
|
||||
if (!result.success) {
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: versionRow.id,
|
||||
})
|
||||
if (previousVersionId) {
|
||||
await restorePreviousVersionWebhooks({
|
||||
request,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
userId: actorUserId,
|
||||
previousVersionId,
|
||||
requestId,
|
||||
})
|
||||
}
|
||||
return createErrorResponse(result.error || 'Failed to activate deployment', 400)
|
||||
}
|
||||
|
||||
if (previousVersionId && previousVersionId !== versionRow.id) {
|
||||
try {
|
||||
logger.info(
|
||||
`[${requestId}] Cleaning up previous version ${previousVersionId} webhooks/schedules`
|
||||
)
|
||||
await cleanupDeploymentVersion({
|
||||
workflowId: id,
|
||||
workflow: workflowData as Record<string, unknown>,
|
||||
requestId,
|
||||
deploymentVersionId: previousVersionId,
|
||||
skipExternalCleanup: true,
|
||||
})
|
||||
logger.info(`[${requestId}] Previous version cleanup completed`)
|
||||
} catch (cleanupError) {
|
||||
logger.error(
|
||||
`[${requestId}] Failed to clean up previous version ${previousVersionId}`,
|
||||
cleanupError
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
await syncMcpToolsForWorkflow({
|
||||
workflowId: id,
|
||||
requestId,
|
||||
state: versionRow.state,
|
||||
context: 'activate',
|
||||
})
|
||||
|
||||
// Apply name/description updates if provided alongside activation
|
||||
let updatedName: string | null | undefined
|
||||
let updatedDescription: string | null | undefined
|
||||
if (name !== undefined || description !== undefined) {
|
||||
const activationUpdateData: { name?: string; description?: string | null } = {}
|
||||
if (name !== undefined) {
|
||||
activationUpdateData.name = name
|
||||
}
|
||||
if (description !== undefined) {
|
||||
activationUpdateData.description = description
|
||||
}
|
||||
|
||||
const [updated] = await db
|
||||
.update(workflowDeploymentVersion)
|
||||
.set(activationUpdateData)
|
||||
.where(
|
||||
and(
|
||||
eq(workflowDeploymentVersion.workflowId, id),
|
||||
eq(workflowDeploymentVersion.version, versionNum)
|
||||
)
|
||||
)
|
||||
.returning({
|
||||
name: workflowDeploymentVersion.name,
|
||||
description: workflowDeploymentVersion.description,
|
||||
})
|
||||
|
||||
if (updated) {
|
||||
updatedName = updated.name
|
||||
updatedDescription = updated.description
|
||||
logger.info(
|
||||
`[${requestId}] Updated deployment version ${version} metadata during activation`,
|
||||
{ name: activationUpdateData.name, description: activationUpdateData.description }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return createSuccessResponse({
|
||||
success: true,
|
||||
deployedAt: result.deployedAt,
|
||||
warnings: triggerSaveResult.warnings,
|
||||
...(updatedName !== undefined && { name: updatedName }),
|
||||
...(updatedDescription !== undefined && { description: updatedDescription }),
|
||||
})
|
||||
}
|
||||
|
||||
const { name, description } = validation.data
|
||||
|
||||
// Handle name/description updates
|
||||
const updateData: { name?: string; description?: string | null } = {}
|
||||
if (name !== undefined) {
|
||||
updateData.name = name
|
||||
|
||||
@@ -1,235 +0,0 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { z } from 'zod'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { markExecutionCancelled } from '@/lib/execution/cancellation'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
|
||||
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
|
||||
const logger = createLogger('ExecuteFromBlockAPI')
|
||||
|
||||
const ExecuteFromBlockSchema = z.object({
|
||||
startBlockId: z.string().min(1, 'Start block ID is required'),
|
||||
sourceSnapshot: z.object({
|
||||
blockStates: z.record(z.any()),
|
||||
executedBlocks: z.array(z.string()),
|
||||
blockLogs: z.array(z.any()),
|
||||
decisions: z.object({
|
||||
router: z.record(z.string()),
|
||||
condition: z.record(z.string()),
|
||||
}),
|
||||
completedLoops: z.array(z.string()),
|
||||
loopExecutions: z.record(z.any()).optional(),
|
||||
parallelExecutions: z.record(z.any()).optional(),
|
||||
parallelBlockMapping: z.record(z.any()).optional(),
|
||||
activeExecutionPath: z.array(z.string()),
|
||||
}),
|
||||
input: z.any().optional(),
|
||||
})
|
||||
|
||||
export const runtime = 'nodejs'
|
||||
export const dynamic = 'force-dynamic'
|
||||
|
||||
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
|
||||
const requestId = generateRequestId()
|
||||
const { id: workflowId } = await params
|
||||
|
||||
try {
|
||||
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
|
||||
if (!auth.success || !auth.userId) {
|
||||
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
const userId = auth.userId
|
||||
|
||||
let body: unknown
|
||||
try {
|
||||
body = await req.json()
|
||||
} catch {
|
||||
return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
|
||||
}
|
||||
|
||||
const validation = ExecuteFromBlockSchema.safeParse(body)
|
||||
if (!validation.success) {
|
||||
logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
|
||||
return NextResponse.json(
|
||||
{
|
||||
error: 'Invalid request body',
|
||||
details: validation.error.errors.map((e) => ({
|
||||
path: e.path.join('.'),
|
||||
message: e.message,
|
||||
})),
|
||||
},
|
||||
{ status: 400 }
|
||||
)
|
||||
}
|
||||
|
||||
const { startBlockId, sourceSnapshot, input } = validation.data
|
||||
const executionId = uuidv4()
|
||||
|
||||
// Run preprocessing checks (billing, rate limits, usage limits)
|
||||
const preprocessResult = await preprocessExecution({
|
||||
workflowId,
|
||||
userId,
|
||||
triggerType: 'manual',
|
||||
executionId,
|
||||
requestId,
|
||||
checkRateLimit: false, // Manual executions are not rate limited
|
||||
checkDeployment: false, // Run-from-block doesn't require deployment
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
const { error } = preprocessResult
|
||||
logger.warn(`[${requestId}] Preprocessing failed for run-from-block`, {
|
||||
workflowId,
|
||||
error: error?.message,
|
||||
statusCode: error?.statusCode,
|
||||
})
|
||||
return NextResponse.json(
|
||||
{ error: error?.message || 'Execution blocked' },
|
||||
{ status: error?.statusCode || 500 }
|
||||
)
|
||||
}
|
||||
|
||||
const workflowRecord = preprocessResult.workflowRecord
|
||||
if (!workflowRecord?.workspaceId) {
|
||||
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
|
||||
}
|
||||
|
||||
const workspaceId = workflowRecord.workspaceId
|
||||
const workflowUserId = workflowRecord.userId
|
||||
|
||||
logger.info(`[${requestId}] Starting run-from-block execution`, {
|
||||
workflowId,
|
||||
startBlockId,
|
||||
executedBlocksCount: sourceSnapshot.executedBlocks.length,
|
||||
billingActorUserId: preprocessResult.actorUserId,
|
||||
})
|
||||
|
||||
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
|
||||
const abortController = new AbortController()
|
||||
let isStreamClosed = false
|
||||
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({
|
||||
executionId,
|
||||
workflowId,
|
||||
controller,
|
||||
isStreamClosed: () => isStreamClosed,
|
||||
setStreamClosed: () => {
|
||||
isStreamClosed = true
|
||||
},
|
||||
})
|
||||
|
||||
const metadata: ExecutionMetadata = {
|
||||
requestId,
|
||||
workflowId,
|
||||
userId,
|
||||
executionId,
|
||||
triggerType: 'manual',
|
||||
workspaceId,
|
||||
workflowUserId,
|
||||
useDraftState: true,
|
||||
isClientSession: true,
|
||||
startTime: new Date().toISOString(),
|
||||
}
|
||||
|
||||
const snapshot = new ExecutionSnapshot(metadata, {}, input || {}, {})
|
||||
|
||||
try {
|
||||
const startTime = new Date()
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:started',
|
||||
timestamp: startTime.toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: { startTime: startTime.toISOString() },
|
||||
})
|
||||
|
||||
const result = await executeWorkflowCore({
|
||||
snapshot,
|
||||
loggingSession,
|
||||
abortSignal: abortController.signal,
|
||||
runFromBlock: {
|
||||
startBlockId,
|
||||
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
|
||||
},
|
||||
callbacks: { onBlockStart, onBlockComplete, onStream },
|
||||
})
|
||||
|
||||
if (result.status === 'cancelled') {
|
||||
sendEvent({
|
||||
type: 'execution:cancelled',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: { duration: result.metadata?.duration || 0 },
|
||||
})
|
||||
} else {
|
||||
sendEvent({
|
||||
type: 'execution:completed',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
success: result.success,
|
||||
output: result.output,
|
||||
duration: result.metadata?.duration || 0,
|
||||
startTime: result.metadata?.startTime || startTime.toISOString(),
|
||||
endTime: result.metadata?.endTime || new Date().toISOString(),
|
||||
},
|
||||
})
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
error: executionResult?.error || errorMessage,
|
||||
duration: executionResult?.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
if (!isStreamClosed) {
|
||||
try {
|
||||
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
|
||||
controller.close()
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
isStreamClosed = true
|
||||
abortController.abort()
|
||||
markExecutionCancelled(executionId).catch(() => {})
|
||||
},
|
||||
})
|
||||
|
||||
return new NextResponse(stream, {
|
||||
headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
|
||||
})
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
|
||||
return NextResponse.json(
|
||||
{ error: errorMessage || 'Failed to start execution' },
|
||||
{ status: 500 }
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -1,10 +1,14 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { tasks } from '@trigger.dev/sdk'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
|
||||
import { z } from 'zod'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
|
||||
import {
|
||||
createTimeoutAbortController,
|
||||
getTimeoutErrorMessage,
|
||||
isTimeoutError,
|
||||
} from '@/lib/core/execution-limits'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
@@ -12,6 +16,7 @@ import { markExecutionCancelled } from '@/lib/execution/cancellation'
|
||||
import { processInputFileFields } from '@/lib/execution/files'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import {
|
||||
cleanupExecutionBase64Cache,
|
||||
hydrateUserFilesWithBase64,
|
||||
@@ -25,7 +30,7 @@ import {
|
||||
} from '@/lib/workflows/persistence/utils'
|
||||
import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
|
||||
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
|
||||
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
|
||||
import { executeWorkflowJob, type WorkflowExecutionPayload } from '@/background/workflow-execution'
|
||||
import { normalizeName } from '@/executor/constants'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
|
||||
@@ -54,6 +59,25 @@ const ExecuteWorkflowSchema = z.object({
|
||||
})
|
||||
.optional(),
|
||||
stopAfterBlockId: z.string().optional(),
|
||||
runFromBlock: z
|
||||
.object({
|
||||
startBlockId: z.string().min(1, 'Start block ID is required'),
|
||||
sourceSnapshot: z.object({
|
||||
blockStates: z.record(z.any()),
|
||||
executedBlocks: z.array(z.string()),
|
||||
blockLogs: z.array(z.any()),
|
||||
decisions: z.object({
|
||||
router: z.record(z.string()),
|
||||
condition: z.record(z.string()),
|
||||
}),
|
||||
completedLoops: z.array(z.string()),
|
||||
loopExecutions: z.record(z.any()).optional(),
|
||||
parallelExecutions: z.record(z.any()).optional(),
|
||||
parallelBlockMapping: z.record(z.any()).optional(),
|
||||
activeExecutionPath: z.array(z.string()),
|
||||
}),
|
||||
})
|
||||
.optional(),
|
||||
})
|
||||
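As a rough illustration of the new optional runFromBlock field, the body below satisfies ExecuteWorkflowSchema; every ID and output value is a made-up placeholder, and a real sourceSnapshot would come from a previous execution.

// Illustrative request body only - not taken from the repository.
const body = {
  runFromBlock: {
    startBlockId: 'block-resume-from', // hypothetical block ID
    sourceSnapshot: {
      blockStates: { 'block-upstream': { output: { text: 'hello' } } },
      executedBlocks: ['block-upstream'],
      blockLogs: [],
      decisions: { router: {}, condition: {} },
      completedLoops: [],
      activeExecutionPath: ['block-upstream', 'block-resume-from'],
    },
  },
}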
|
||||
export const runtime = 'nodejs'
|
||||
@@ -118,45 +142,66 @@ type AsyncExecutionParams = {
|
||||
userId: string
|
||||
input: any
|
||||
triggerType: CoreTriggerType
|
||||
executionId: string
|
||||
}
|
||||
|
||||
/**
 * Handles async workflow execution by queueing a background job.
 * Returns immediately with a 202 Accepted response containing the job ID.
 */
async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextResponse> {
  const { requestId, workflowId, userId, input, triggerType } = params

  if (!isTriggerDevEnabled) {
    logger.warn(`[${requestId}] Async mode requested but TRIGGER_DEV_ENABLED is false`)
    return NextResponse.json(
      { error: 'Async execution is not enabled. Set TRIGGER_DEV_ENABLED=true to use async mode.' },
      { status: 400 }
    )
  }
  const { requestId, workflowId, userId, input, triggerType, executionId } = params
|
||||
const payload: WorkflowExecutionPayload = {
|
||||
workflowId,
|
||||
userId,
|
||||
input,
|
||||
triggerType,
|
||||
executionId,
|
||||
}
|
||||
|
||||
try {
|
||||
const handle = await tasks.trigger('workflow-execution', payload)
|
||||
const jobQueue = await getJobQueue()
|
||||
const jobId = await jobQueue.enqueue('workflow-execution', payload, {
|
||||
metadata: { workflowId, userId },
|
||||
})
|
||||
|
||||
logger.info(`[${requestId}] Queued async workflow execution`, {
|
||||
workflowId,
|
||||
jobId: handle.id,
|
||||
jobId,
|
||||
})
|
||||
|
||||
if (shouldExecuteInline()) {
|
||||
void (async () => {
|
||||
try {
|
||||
await jobQueue.startJob(jobId)
|
||||
const output = await executeWorkflowJob(payload)
|
||||
await jobQueue.completeJob(jobId, output)
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
logger.error(`[${requestId}] Async workflow execution failed`, {
|
||||
jobId,
|
||||
error: errorMessage,
|
||||
})
|
||||
try {
|
||||
await jobQueue.markJobFailed(jobId, errorMessage)
|
||||
} catch (markFailedError) {
|
||||
logger.error(`[${requestId}] Failed to mark job as failed`, {
|
||||
jobId,
|
||||
error:
|
||||
markFailedError instanceof Error
|
||||
? markFailedError.message
|
||||
: String(markFailedError),
|
||||
})
|
||||
}
|
||||
}
|
||||
})()
|
||||
}
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: true,
|
||||
async: true,
|
||||
jobId: handle.id,
|
||||
jobId,
|
||||
executionId,
|
||||
message: 'Workflow execution queued',
|
||||
statusUrl: `${getBaseUrl()}/api/jobs/${handle.id}`,
|
||||
statusUrl: `${getBaseUrl()}/api/jobs/${jobId}`,
|
||||
},
|
||||
{ status: 202 }
|
||||
)
|
||||
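A sketch of how a caller might consume the 202 response above; the queued-response fields match the JSON returned by handleAsyncExecution, but the shape of the /api/jobs/:id response is an assumption and may differ.

// Hedged example: poll the status URL returned with the 202 Accepted response.
const queued = await res.json() // { success, async, jobId, executionId, message, statusUrl }
const status = await fetch(queued.statusUrl).then((r) => r.json())
console.log(status) // exact status payload depends on the job queue implementation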
@@ -224,6 +269,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
base64MaxBytes,
|
||||
workflowStateOverride,
|
||||
stopAfterBlockId,
|
||||
runFromBlock,
|
||||
} = validation.data
|
||||
|
||||
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
|
||||
@@ -240,6 +286,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
base64MaxBytes,
|
||||
workflowStateOverride,
|
||||
stopAfterBlockId: _stopAfterBlockId,
|
||||
runFromBlock: _runFromBlock,
|
||||
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
|
||||
...rest
|
||||
} = body
|
||||
@@ -318,6 +365,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
userId: actorUserId,
|
||||
input,
|
||||
triggerType: loggingTriggerType,
|
||||
executionId,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -405,6 +453,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
if (!enableSSE) {
|
||||
logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`)
|
||||
const timeoutController = createTimeoutAbortController(
|
||||
preprocessResult.executionTimeout?.sync
|
||||
)
|
||||
|
||||
try {
|
||||
const metadata: ExecutionMetadata = {
|
||||
requestId,
|
||||
@@ -438,8 +490,38 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId,
|
||||
runFromBlock,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
|
||||
if (
|
||||
result.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Non-SSE execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
output: result.output,
|
||||
error: timeoutErrorMessage,
|
||||
metadata: result.metadata
|
||||
? {
|
||||
duration: result.metadata.duration,
|
||||
startTime: result.metadata.startTime,
|
||||
endTime: result.metadata.endTime,
|
||||
}
|
||||
: undefined,
|
||||
},
|
||||
{ status: 408 }
|
||||
)
|
||||
}
|
||||
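For context on the helpers used in this hunk, here is a hedged sketch of the contract the route appears to rely on, inferred only from the calls in this diff (signal, timeoutMs, isTimedOut, abort, cleanup); the real '@/lib/core/execution-limits' module may be implemented differently.

// Sketch only - assumed shape, not the actual implementation.
interface TimeoutAbortControllerSketch {
  signal: AbortSignal
  timeoutMs?: number
  isTimedOut(): boolean
  abort(): void
  cleanup(): void
}

function createTimeoutAbortControllerSketch(timeoutMs?: number): TimeoutAbortControllerSketch {
  const controller = new AbortController()
  let timedOut = false
  // When the limit elapses, abort the signal; the executor reports a cancellation,
  // which the route then maps to a 408 timeout response as shown above.
  const timer = timeoutMs
    ? setTimeout(() => {
        timedOut = true
        controller.abort()
      }, timeoutMs)
    : undefined
  return {
    signal: controller.signal,
    timeoutMs,
    isTimedOut: () => timedOut,
    abort: () => controller.abort(),
    cleanup: () => {
      if (timer) clearTimeout(timer)
    },
  }
}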
|
||||
const outputWithBase64 = includeFileBase64
|
||||
? ((await hydrateUserFilesWithBase64(result.output, {
|
||||
requestId,
|
||||
@@ -450,9 +532,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
const resultWithBase64 = { ...result, output: outputWithBase64 }
|
||||
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
|
||||
const hasResponseBlock = workflowHasResponseBlock(resultWithBase64)
|
||||
if (hasResponseBlock) {
|
||||
return createHttpResponseFromBlock(resultWithBase64)
|
||||
@@ -460,6 +539,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
|
||||
const filteredResult = {
|
||||
success: result.success,
|
||||
executionId,
|
||||
output: outputWithBase64,
|
||||
error: result.error,
|
||||
metadata: result.metadata
|
||||
@@ -474,10 +554,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
return NextResponse.json(filteredResult)
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
|
||||
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
|
||||
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
totalDurationMs: executionResult?.metadata?.duration,
|
||||
error: { message: errorMessage },
|
||||
traceSpans: executionResult?.logs as any,
|
||||
})
|
||||
|
||||
return NextResponse.json(
|
||||
{
|
||||
success: false,
|
||||
@@ -493,6 +580,15 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
},
|
||||
{ status: 500 }
|
||||
)
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
if (executionId) {
|
||||
try {
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -506,7 +602,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
cachedWorkflowData?.blocks || {}
|
||||
)
|
||||
const streamVariables = cachedWorkflowData?.variables ?? (workflow as any).variables
|
||||
|
||||
const stream = await createStreamingResponse({
|
||||
requestId,
|
||||
workflow: {
|
||||
@@ -524,6 +619,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
timeoutMs: preprocessResult.executionTimeout?.sync,
|
||||
},
|
||||
executionId,
|
||||
})
|
||||
@@ -535,7 +631,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
const encoder = new TextEncoder()
|
||||
const abortController = new AbortController()
|
||||
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
|
||||
let isStreamClosed = false
|
||||
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
@@ -731,10 +827,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
onStream,
|
||||
},
|
||||
loggingSession,
|
||||
abortSignal: abortController.signal,
|
||||
abortSignal: timeoutController.signal,
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
stopAfterBlockId,
|
||||
runFromBlock,
|
||||
})
|
||||
|
||||
if (result.status === 'paused') {
|
||||
@@ -767,16 +864,37 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
}
|
||||
|
||||
if (result.status === 'cancelled') {
|
||||
logger.info(`[${requestId}] Workflow execution was cancelled`)
|
||||
sendEvent({
|
||||
type: 'execution:cancelled',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
duration: result.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
if (timeoutController.isTimedOut() && timeoutController.timeoutMs) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Workflow execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
error: timeoutErrorMessage,
|
||||
duration: result.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
logger.info(`[${requestId}] Workflow execution was cancelled`)
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:cancelled',
|
||||
timestamp: new Date().toISOString(),
|
||||
executionId,
|
||||
workflowId,
|
||||
data: {
|
||||
duration: result.metadata?.duration || 0,
|
||||
},
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -799,14 +917,26 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
endTime: result.metadata?.endTime || new Date().toISOString(),
|
||||
},
|
||||
})
|
||||
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
||||
const isTimeout = isTimeoutError(error) || timeoutController.isTimedOut()
|
||||
const errorMessage = isTimeout
|
||||
? getTimeoutErrorMessage(error, timeoutController.timeoutMs)
|
||||
: error instanceof Error
|
||||
? error.message
|
||||
: 'Unknown error'
|
||||
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })
|
||||
|
||||
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
|
||||
const { traceSpans, totalDuration } = executionResult
|
||||
? buildTraceSpans(executionResult)
|
||||
: { traceSpans: [], totalDuration: 0 }
|
||||
|
||||
await loggingSession.safeCompleteWithError({
|
||||
totalDurationMs: totalDuration || executionResult?.metadata?.duration,
|
||||
error: { message: errorMessage },
|
||||
traceSpans,
|
||||
})
|
||||
|
||||
sendEvent({
|
||||
type: 'execution:error',
|
||||
@@ -819,20 +949,23 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
if (executionId) {
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
}
|
||||
if (!isStreamClosed) {
|
||||
try {
|
||||
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
|
||||
controller.close()
|
||||
} catch {
|
||||
// Stream already closed - nothing to do
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
isStreamClosed = true
|
||||
timeoutController.cleanup()
|
||||
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
|
||||
abortController.abort()
|
||||
timeoutController.abort()
|
||||
markExecutionCancelled(executionId).catch(() => {})
|
||||
},
|
||||
})
|
||||
|
||||
@@ -508,8 +508,10 @@ export default function TemplateDetails({ isWorkspaceContext = false }: Template
|
||||
|
||||
setIsApproving(true)
|
||||
try {
|
||||
const response = await fetch(`/api/templates/${template.id}/approve`, {
|
||||
method: 'POST',
|
||||
const response = await fetch(`/api/templates/${template.id}`, {
|
||||
method: 'PUT',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ status: 'approved' }),
|
||||
})
|
||||
|
||||
if (response.ok) {
|
||||
@@ -531,8 +533,10 @@ export default function TemplateDetails({ isWorkspaceContext = false }: Template
|
||||
|
||||
setIsRejecting(true)
|
||||
try {
|
||||
const response = await fetch(`/api/templates/${template.id}/reject`, {
|
||||
method: 'POST',
|
||||
const response = await fetch(`/api/templates/${template.id}`, {
|
||||
method: 'PUT',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ status: 'rejected' }),
|
||||
})
|
||||
|
||||
if (response.ok) {
|
||||
@@ -554,10 +558,11 @@ export default function TemplateDetails({ isWorkspaceContext = false }: Template
|
||||
|
||||
setIsVerifying(true)
|
||||
try {
|
||||
const endpoint = `/api/creators/${template.creator.id}/verify`
|
||||
const method = template.creator.verified ? 'DELETE' : 'POST'
|
||||
|
||||
const response = await fetch(endpoint, { method })
|
||||
const response = await fetch(`/api/creators/${template.creator.id}`, {
|
||||
method: 'PUT',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ verified: !template.creator.verified }),
|
||||
})
|
||||
|
||||
if (response.ok) {
|
||||
// Refresh page to show updated verification status
|
||||
|
||||
@@ -12,7 +12,6 @@ import {
|
||||
Tooltip,
|
||||
} from '@/components/emcn'
|
||||
import { Skeleton } from '@/components/ui'
|
||||
import { getEnv, isTruthy } from '@/lib/core/config/env'
|
||||
import { OutputSelect } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select'
|
||||
|
||||
interface WorkflowDeploymentInfo {
|
||||
@@ -78,7 +77,6 @@ export function ApiDeploy({
|
||||
async: false,
|
||||
})
|
||||
|
||||
const isAsyncEnabled = isTruthy(getEnv('NEXT_PUBLIC_TRIGGER_DEV_ENABLED'))
|
||||
const info = deploymentInfo ? { ...deploymentInfo, needsRedeployment } : null
|
||||
|
||||
const getBaseEndpoint = () => {
|
||||
@@ -272,7 +270,7 @@ response = requests.post(
|
||||
)
|
||||
|
||||
job = response.json()
|
||||
print(job) # Contains job_id for status checking`
|
||||
print(job) # Contains jobId and executionId`
|
||||
|
||||
case 'javascript':
|
||||
return `const response = await fetch("${endpoint}", {
|
||||
@@ -286,7 +284,7 @@ print(job) # Contains job_id for status checking`
|
||||
});
|
||||
|
||||
const job = await response.json();
|
||||
console.log(job); // Contains job_id for status checking`
|
||||
console.log(job); // Contains jobId and executionId`
|
||||
|
||||
case 'typescript':
|
||||
return `const response = await fetch("${endpoint}", {
|
||||
@@ -299,8 +297,8 @@ console.log(job); // Contains job_id for status checking`
|
||||
body: JSON.stringify(${JSON.stringify(payload)})
|
||||
});
|
||||
|
||||
const job: { job_id: string } = await response.json();
|
||||
console.log(job); // Contains job_id for status checking`
|
||||
const job: { jobId: string; executionId: string } = await response.json();
|
||||
console.log(job); // Contains jobId and executionId`
|
||||
|
||||
default:
|
||||
return ''
|
||||
@@ -539,55 +537,49 @@ console.log(limits);`
|
||||
/>
|
||||
</div>
|
||||
|
||||
{isAsyncEnabled && (
|
||||
<div>
|
||||
<div className='mb-[6.5px] flex items-center justify-between'>
|
||||
<Label className='block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
|
||||
Run workflow (async)
|
||||
</Label>
|
||||
<div className='flex items-center gap-[6px]'>
|
||||
<Tooltip.Root>
|
||||
<Tooltip.Trigger asChild>
|
||||
<Button
|
||||
variant='ghost'
|
||||
onClick={() => handleCopy('async', getAsyncCommand())}
|
||||
aria-label='Copy command'
|
||||
className='!p-1.5 -my-1.5'
|
||||
>
|
||||
{copied.async ? (
|
||||
<Check className='h-3 w-3' />
|
||||
) : (
|
||||
<Clipboard className='h-3 w-3' />
|
||||
)}
|
||||
</Button>
|
||||
</Tooltip.Trigger>
|
||||
<Tooltip.Content>
|
||||
<span>{copied.async ? 'Copied' : 'Copy'}</span>
|
||||
</Tooltip.Content>
|
||||
</Tooltip.Root>
|
||||
<Combobox
|
||||
size='sm'
|
||||
className='!w-fit !py-[2px] min-w-[100px] rounded-[6px] px-[9px]'
|
||||
options={[
|
||||
{ label: 'Execute Job', value: 'execute' },
|
||||
{ label: 'Check Status', value: 'status' },
|
||||
{ label: 'Rate Limits', value: 'rate-limits' },
|
||||
]}
|
||||
value={asyncExampleType}
|
||||
onChange={(value) => setAsyncExampleType(value as AsyncExampleType)}
|
||||
align='end'
|
||||
dropdownWidth={160}
|
||||
/>
|
||||
</div>
|
||||
<div>
|
||||
<div className='mb-[6.5px] flex items-center justify-between'>
|
||||
<Label className='block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
|
||||
Run workflow (async)
|
||||
</Label>
|
||||
<div className='flex items-center gap-[6px]'>
|
||||
<Tooltip.Root>
|
||||
<Tooltip.Trigger asChild>
|
||||
<Button
|
||||
variant='ghost'
|
||||
onClick={() => handleCopy('async', getAsyncCommand())}
|
||||
aria-label='Copy command'
|
||||
className='!p-1.5 -my-1.5'
|
||||
>
|
||||
{copied.async ? <Check className='h-3 w-3' /> : <Clipboard className='h-3 w-3' />}
|
||||
</Button>
|
||||
</Tooltip.Trigger>
|
||||
<Tooltip.Content>
|
||||
<span>{copied.async ? 'Copied' : 'Copy'}</span>
|
||||
</Tooltip.Content>
|
||||
</Tooltip.Root>
|
||||
<Combobox
|
||||
size='sm'
|
||||
className='!w-fit !py-[2px] min-w-[100px] rounded-[6px] px-[9px]'
|
||||
options={[
|
||||
{ label: 'Execute Job', value: 'execute' },
|
||||
{ label: 'Check Status', value: 'status' },
|
||||
{ label: 'Rate Limits', value: 'rate-limits' },
|
||||
]}
|
||||
value={asyncExampleType}
|
||||
onChange={(value) => setAsyncExampleType(value as AsyncExampleType)}
|
||||
align='end'
|
||||
dropdownWidth={160}
|
||||
/>
|
||||
</div>
|
||||
<Code.Viewer
|
||||
code={getAsyncCommand()}
|
||||
language={LANGUAGE_SYNTAX[language]}
|
||||
wrapText
|
||||
className='!min-h-0 rounded-[4px] border border-[var(--border-1)]'
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
<Code.Viewer
|
||||
code={getAsyncCommand()}
|
||||
language={LANGUAGE_SYNTAX[language]}
|
||||
wrapText
|
||||
className='!min-h-0 rounded-[4px] border border-[var(--border-1)]'
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type React from 'react'
|
||||
import { RepeatIcon, SplitIcon } from 'lucide-react'
|
||||
import { AlertTriangleIcon, BanIcon, RepeatIcon, SplitIcon, XCircleIcon } from 'lucide-react'
|
||||
import { getBlock } from '@/blocks'
|
||||
import { TERMINAL_BLOCK_COLUMN_WIDTH } from '@/stores/constants'
|
||||
import type { ConsoleEntry } from '@/stores/terminal'
|
||||
@@ -12,6 +12,15 @@ const SUBFLOW_COLORS = {
  parallel: '#FEE12B',
} as const

/**
 * Special block type colors for errors and system messages
 */
const SPECIAL_BLOCK_COLORS = {
  error: '#ef4444',
  validation: '#f59e0b',
  cancelled: '#6b7280',
} as const
|
||||
/**
|
||||
* Retrieves the icon component for a given block type
|
||||
*/
|
||||
@@ -32,6 +41,18 @@ export function getBlockIcon(
|
||||
return SplitIcon
|
||||
}
|
||||
|
||||
if (blockType === 'error') {
|
||||
return XCircleIcon
|
||||
}
|
||||
|
||||
if (blockType === 'validation') {
|
||||
return AlertTriangleIcon
|
||||
}
|
||||
|
||||
if (blockType === 'cancelled') {
|
||||
return BanIcon
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
@@ -50,6 +71,16 @@ export function getBlockColor(blockType: string): string {
|
||||
if (blockType === 'parallel') {
|
||||
return SUBFLOW_COLORS.parallel
|
||||
}
|
||||
// Special block types for errors and system messages
|
||||
if (blockType === 'error') {
|
||||
return SPECIAL_BLOCK_COLORS.error
|
||||
}
|
||||
if (blockType === 'validation') {
|
||||
return SPECIAL_BLOCK_COLORS.validation
|
||||
}
|
||||
if (blockType === 'cancelled') {
|
||||
return SPECIAL_BLOCK_COLORS.cancelled
|
||||
}
|
||||
return '#6b7280'
|
||||
}
|
||||
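A quick usage note on the additions above, using only values visible in this file:

// Illustrative: how the new special block types resolve through getBlockColor.
getBlockColor('error') // '#ef4444' (SPECIAL_BLOCK_COLORS.error)
getBlockColor('validation') // '#f59e0b'
getBlockColor('cancelled') // '#6b7280'
getBlockColor('anything-else') // falls back to '#6b7280'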
|
||||
|
||||
@@ -4,6 +4,11 @@ import { useQueryClient } from '@tanstack/react-query'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import { processStreamingBlockLogs } from '@/lib/tokenization'
|
||||
import type {
|
||||
BlockCompletedData,
|
||||
BlockErrorData,
|
||||
BlockStartedData,
|
||||
} from '@/lib/workflows/executor/execution-events'
|
||||
import {
|
||||
extractTriggerMockPayload,
|
||||
selectBestTrigger,
|
||||
@@ -17,7 +22,13 @@ import {
|
||||
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
|
||||
import { getBlock } from '@/blocks'
|
||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||
import type {
|
||||
BlockLog,
|
||||
BlockState,
|
||||
ExecutionResult,
|
||||
NormalizedBlockOutput,
|
||||
StreamingExecution,
|
||||
} from '@/executor/types'
|
||||
import { hasExecutionResult } from '@/executor/utils/errors'
|
||||
import { coerceValue } from '@/executor/utils/start-block'
|
||||
import { subscriptionKeys } from '@/hooks/queries/subscription'
|
||||
@@ -27,7 +38,7 @@ import { useExecutionStore } from '@/stores/execution'
|
||||
import { useNotificationStore } from '@/stores/notifications'
|
||||
import { useVariablesStore } from '@/stores/panel'
|
||||
import { useEnvironmentStore } from '@/stores/settings/environment'
|
||||
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
|
||||
import { useTerminalConsoleStore } from '@/stores/terminal'
|
||||
import { useWorkflowDiffStore } from '@/stores/workflow-diff'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { mergeSubblockState } from '@/stores/workflows/utils'
|
||||
@@ -41,6 +52,19 @@ interface DebugValidationResult {
|
||||
error?: string
|
||||
}
|
||||
|
||||
interface BlockEventHandlerConfig {
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
workflowEdges: Array<{ id: string; target: string }>
|
||||
activeBlocksSet: Set<string>
|
||||
accumulatedBlockLogs: BlockLog[]
|
||||
accumulatedBlockStates: Map<string, BlockState>
|
||||
executedBlockIds: Set<string>
|
||||
consoleMode: 'update' | 'add'
|
||||
includeStartConsoleEntry: boolean
|
||||
onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
|
||||
}
|
||||
|
||||
const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
@@ -149,6 +173,340 @@ export function useWorkflowExecution() {
|
||||
setActiveBlocks,
|
||||
])
|
||||
|
||||
  /**
   * Builds timing fields for execution-level console entries.
   */
  const buildExecutionTiming = useCallback((durationMs?: number) => {
    const normalizedDuration = durationMs || 0
    return {
      durationMs: normalizedDuration,
      startedAt: new Date(Date.now() - normalizedDuration).toISOString(),
      endedAt: new Date().toISOString(),
    }
  }, [])
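For example, a 1500 ms execution produces a back-dated window, since only the duration is known at this point:

// Illustrative output of buildExecutionTiming(1500); timestamps shown are example values.
// {
//   durationMs: 1500,
//   startedAt: '2025-01-01T00:00:00.000Z', // Date.now() - 1500
//   endedAt: '2025-01-01T00:00:01.500Z',   // now
// }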
|
||||
/**
|
||||
* Adds an execution-level error entry to the console when appropriate.
|
||||
*/
|
||||
const addExecutionErrorConsoleEntry = useCallback(
|
||||
(params: {
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
error?: string
|
||||
durationMs?: number
|
||||
blockLogs: BlockLog[]
|
||||
isPreExecutionError?: boolean
|
||||
}) => {
|
||||
if (!params.workflowId) return
|
||||
|
||||
const hasBlockError = params.blockLogs.some((log) => log.error)
|
||||
const isPreExecutionError = params.isPreExecutionError ?? false
|
||||
if (!isPreExecutionError && hasBlockError) {
|
||||
return
|
||||
}
|
||||
|
||||
const errorMessage = params.error || 'Execution failed'
|
||||
const isTimeout = errorMessage.toLowerCase().includes('timed out')
|
||||
const timing = buildExecutionTiming(params.durationMs)
|
||||
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: errorMessage,
|
||||
durationMs: timing.durationMs,
|
||||
startedAt: timing.startedAt,
|
||||
executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER,
|
||||
endedAt: timing.endedAt,
|
||||
workflowId: params.workflowId,
|
||||
blockId: isPreExecutionError
|
||||
? 'validation'
|
||||
: isTimeout
|
||||
? 'timeout-error'
|
||||
: 'execution-error',
|
||||
executionId: params.executionId,
|
||||
blockName: isPreExecutionError
|
||||
? 'Workflow Validation'
|
||||
: isTimeout
|
||||
? 'Timeout Error'
|
||||
: 'Execution Error',
|
||||
blockType: isPreExecutionError ? 'validation' : 'error',
|
||||
})
|
||||
},
|
||||
[addConsole, buildExecutionTiming]
|
||||
)
|
||||
|
||||
/**
|
||||
* Adds an execution-level cancellation entry to the console.
|
||||
*/
|
||||
const addExecutionCancelledConsoleEntry = useCallback(
|
||||
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
|
||||
if (!params.workflowId) return
|
||||
|
||||
const timing = buildExecutionTiming(params.durationMs)
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: 'Execution was cancelled',
|
||||
durationMs: timing.durationMs,
|
||||
startedAt: timing.startedAt,
|
||||
executionOrder: Number.MAX_SAFE_INTEGER,
|
||||
endedAt: timing.endedAt,
|
||||
workflowId: params.workflowId,
|
||||
blockId: 'cancelled',
|
||||
executionId: params.executionId,
|
||||
blockName: 'Execution Cancelled',
|
||||
blockType: 'cancelled',
|
||||
})
|
||||
},
|
||||
[addConsole, buildExecutionTiming]
|
||||
)
|
||||
|
||||
/**
|
||||
* Handles workflow-level execution errors for console output.
|
||||
*/
|
||||
const handleExecutionErrorConsole = useCallback(
|
||||
(params: {
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
error?: string
|
||||
durationMs?: number
|
||||
blockLogs: BlockLog[]
|
||||
isPreExecutionError?: boolean
|
||||
}) => {
|
||||
if (params.workflowId) {
|
||||
cancelRunningEntries(params.workflowId)
|
||||
}
|
||||
addExecutionErrorConsoleEntry(params)
|
||||
},
|
||||
[addExecutionErrorConsoleEntry, cancelRunningEntries]
|
||||
)
|
||||
|
||||
/**
|
||||
* Handles workflow-level execution cancellations for console output.
|
||||
*/
|
||||
const handleExecutionCancelledConsole = useCallback(
|
||||
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
|
||||
if (params.workflowId) {
|
||||
cancelRunningEntries(params.workflowId)
|
||||
}
|
||||
addExecutionCancelledConsoleEntry(params)
|
||||
},
|
||||
[addExecutionCancelledConsoleEntry, cancelRunningEntries]
|
||||
)
|
||||
|
||||
const buildBlockEventHandlers = useCallback(
|
||||
(config: BlockEventHandlerConfig) => {
|
||||
const {
|
||||
workflowId,
|
||||
executionId,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode,
|
||||
includeStartConsoleEntry,
|
||||
onBlockCompleteCallback,
|
||||
} = config
|
||||
|
||||
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
|
||||
if (isActive) {
|
||||
activeBlocksSet.add(blockId)
|
||||
} else {
|
||||
activeBlocksSet.delete(blockId)
|
||||
}
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
}
|
||||
|
||||
const markIncomingEdges = (blockId: string) => {
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
|
||||
incomingEdges.forEach((edge) => {
|
||||
setEdgeRunStatus(edge.id, 'success')
|
||||
})
|
||||
}
|
||||
|
||||
const isContainerBlockType = (blockType?: string) => {
|
||||
return blockType === 'loop' || blockType === 'parallel'
|
||||
}
|
||||
|
||||
const createBlockLogEntry = (
|
||||
data: BlockCompletedData | BlockErrorData,
|
||||
options: { success: boolean; output?: unknown; error?: string }
|
||||
): BlockLog => ({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: options.output ?? {},
|
||||
success: options.success,
|
||||
error: options.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
})
|
||||
|
||||
const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
}
|
||||
|
||||
const addConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
if (!workflowId) return
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: data.endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
}
|
||||
|
||||
const updateConsoleEntry = (data: BlockCompletedData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
}
|
||||
|
||||
const updateConsoleErrorEntry = (data: BlockErrorData) => {
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt: data.startedAt,
|
||||
endedAt: data.endedAt,
|
||||
isRunning: false,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
}
|
||||
|
||||
const onBlockStarted = (data: BlockStartedData) => {
|
||||
updateActiveBlocks(data.blockId, true)
|
||||
markIncomingEdges(data.blockId)
|
||||
|
||||
if (!includeStartConsoleEntry || !workflowId) return
|
||||
|
||||
const startedAt = new Date().toISOString()
|
||||
addConsole({
|
||||
input: {},
|
||||
output: undefined,
|
||||
success: undefined,
|
||||
durationMs: undefined,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: undefined,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
isRunning: true,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
}
|
||||
|
||||
const onBlockCompleted = (data: BlockCompletedData) => {
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
setBlockRunStatus(data.blockId, 'success')
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
if (isContainerBlockType(data.blockType)) {
|
||||
return
|
||||
}
|
||||
|
||||
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleEntry(data)
|
||||
} else {
|
||||
addConsoleEntry(data, data.output as NormalizedBlockOutput)
|
||||
}
|
||||
|
||||
if (onBlockCompleteCallback) {
|
||||
onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
|
||||
logger.error('Error in onBlockComplete callback:', error)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const onBlockError = (data: BlockErrorData) => {
|
||||
updateActiveBlocks(data.blockId, false)
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
accumulatedBlockLogs.push(
|
||||
createBlockLogEntry(data, { success: false, output: {}, error: data.error })
|
||||
)
|
||||
|
||||
if (consoleMode === 'update') {
|
||||
updateConsoleErrorEntry(data)
|
||||
} else {
|
||||
addConsoleErrorEntry(data)
|
||||
}
|
||||
}
|
||||
|
||||
return { onBlockStarted, onBlockCompleted, onBlockError }
|
||||
},
|
||||
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
|
||||
)
|
||||
|
||||
/**
|
||||
* Checks if debug session is complete based on execution result
|
||||
*/
|
||||
@@ -789,7 +1147,12 @@ export function useWorkflowExecution() {
|
||||
const startBlock = TriggerUtils.findStartBlock(filteredStates, 'chat')
|
||||
|
||||
if (!startBlock) {
|
||||
throw new Error(TriggerUtils.getTriggerValidationMessage('chat', 'missing'))
|
||||
throw new WorkflowValidationError(
|
||||
TriggerUtils.getTriggerValidationMessage('chat', 'missing'),
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
}
|
||||
|
||||
startBlockId = startBlock.blockId
|
||||
@@ -800,7 +1163,12 @@ export function useWorkflowExecution() {
|
||||
})
|
||||
|
||||
if (candidates.length === 0) {
|
||||
const error = new Error('Workflow requires at least one trigger block to execute')
|
||||
const error = new WorkflowValidationError(
|
||||
'Workflow requires at least one trigger block to execute',
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
logger.error('No trigger blocks found for manual run', {
|
||||
allBlockTypes: Object.values(filteredStates).map((b) => b.type),
|
||||
})
|
||||
@@ -813,7 +1181,12 @@ export function useWorkflowExecution() {
|
||||
(candidate) => candidate.path === StartBlockPath.SPLIT_API
|
||||
)
|
||||
if (apiCandidates.length > 1) {
|
||||
const error = new Error('Multiple API Trigger blocks found. Keep only one.')
|
||||
const error = new WorkflowValidationError(
|
||||
'Multiple API Trigger blocks found. Keep only one.',
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
logger.error('Multiple API triggers found')
|
||||
setIsExecuting(false)
|
||||
throw error
|
||||
@@ -833,7 +1206,12 @@ export function useWorkflowExecution() {
|
||||
const outgoingConnections = workflowEdges.filter((edge) => edge.source === startBlockId)
|
||||
if (outgoingConnections.length === 0) {
|
||||
const triggerName = selectedTrigger.name || selectedTrigger.type
|
||||
const error = new Error(`${triggerName} must be connected to other blocks to execute`)
|
||||
const error = new WorkflowValidationError(
|
||||
`${triggerName} must be connected to other blocks to execute`,
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
logger.error('Trigger has no outgoing connections', { triggerName, startBlockId })
|
||||
setIsExecuting(false)
|
||||
throw error
|
||||
@@ -859,7 +1237,12 @@ export function useWorkflowExecution() {
|
||||
|
||||
// If we don't have a valid startBlockId at this point, throw an error
|
||||
if (!startBlockId) {
|
||||
const error = new Error('No valid trigger block found to start execution')
|
||||
const error = new WorkflowValidationError(
|
||||
'No valid trigger block found to start execution',
|
||||
'validation',
|
||||
'validation',
|
||||
'Workflow Validation'
|
||||
)
|
||||
logger.error('No startBlockId found after trigger search')
|
||||
setIsExecuting(false)
|
||||
throw error
|
||||
@@ -892,6 +1275,19 @@ export function useWorkflowExecution() {
|
||||
|
||||
// Execute the workflow
|
||||
try {
|
||||
const blockHandlers = buildBlockEventHandlers({
|
||||
workflowId: activeWorkflowId,
|
||||
executionId,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode: 'update',
|
||||
includeStartConsoleEntry: true,
|
||||
onBlockCompleteCallback: onBlockComplete,
|
||||
})
|
||||
|
||||
await executionStream.execute({
|
||||
workflowId: activeWorkflowId,
|
||||
input: finalWorkflowInput,
|
||||
@@ -914,145 +1310,9 @@ export function useWorkflowExecution() {
|
||||
logger.info('Server execution started:', data)
|
||||
},
|
||||
|
||||
onBlockStarted: (data) => {
|
||||
activeBlocksSet.add(data.blockId)
|
||||
// Create a new Set to trigger React re-render
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
// Track edges that led to this block as soon as execution starts
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
|
||||
incomingEdges.forEach((edge) => {
|
||||
setEdgeRunStatus(edge.id, 'success')
|
||||
})
|
||||
|
||||
// Add entry to terminal immediately with isRunning=true
|
||||
// Use server-provided executionOrder to ensure correct sort order
|
||||
const startedAt = new Date().toISOString()
|
||||
addConsole({
|
||||
input: {},
|
||||
output: undefined,
|
||||
success: undefined,
|
||||
durationMs: undefined,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt: undefined,
|
||||
workflowId: activeWorkflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
isRunning: true,
|
||||
// Pass through iteration context for subflow grouping
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
},
|
||||
|
||||
onBlockCompleted: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
setBlockRunStatus(data.blockId, 'success')
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
|
||||
if (isContainerBlock) return
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
// Update existing console entry (created in onBlockStarted) with completion data
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
endedAt,
|
||||
isRunning: false,
|
||||
// Pass through iteration context for subflow grouping
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
|
||||
// Call onBlockComplete callback if provided
|
||||
if (onBlockComplete) {
|
||||
onBlockComplete(data.blockId, data.output).catch((error) => {
|
||||
logger.error('Error in onBlockComplete callback:', error)
|
||||
})
|
||||
}
|
||||
},
|
||||
|
||||
onBlockError: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
// Create a new Set to trigger React re-render
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
// Track failed block execution in run path
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
// Accumulate block error log for the execution result
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
// Update existing console entry (created in onBlockStarted) with error data
|
||||
updateConsole(
|
||||
data.blockId,
|
||||
{
|
||||
input: data.input || {},
|
||||
replaceOutput: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
endedAt,
|
||||
isRunning: false,
|
||||
// Pass through iteration context for subflow grouping
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
},
|
||||
onBlockStarted: blockHandlers.onBlockStarted,
|
||||
onBlockCompleted: blockHandlers.onBlockCompleted,
|
||||
onBlockError: blockHandlers.onBlockError,
|
||||
|
||||
onStreamChunk: (data) => {
|
||||
const existing = streamedContent.get(data.blockId) || ''
|
||||
@@ -1157,39 +1417,23 @@ export function useWorkflowExecution() {
|
||||
logs: accumulatedBlockLogs,
|
||||
}
|
||||
|
||||
// Only add workflow-level error if no blocks have executed yet
|
||||
// This catches pre-execution errors (validation, serialization, etc.)
|
||||
// Block execution errors are already logged via onBlockError callback
|
||||
const { entries } = useTerminalConsoleStore.getState()
|
||||
const existingLogs = entries.filter(
|
||||
(log: ConsoleEntry) => log.executionId === executionId
|
||||
)
|
||||
|
||||
if (existingLogs.length === 0) {
|
||||
// No blocks executed yet - this is a pre-execution error
|
||||
// Use 0 for executionOrder so validation errors appear first
|
||||
addConsole({
|
||||
input: {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.duration || 0,
|
||||
startedAt: new Date(Date.now() - (data.duration || 0)).toISOString(),
|
||||
executionOrder: 0,
|
||||
endedAt: new Date().toISOString(),
|
||||
workflowId: activeWorkflowId,
|
||||
blockId: 'validation',
|
||||
executionId,
|
||||
blockName: 'Workflow Validation',
|
||||
blockType: 'validation',
|
||||
})
|
||||
}
|
||||
const isPreExecutionError = accumulatedBlockLogs.length === 0
|
||||
handleExecutionErrorConsole({
|
||||
workflowId: activeWorkflowId,
|
||||
executionId,
|
||||
error: data.error,
|
||||
durationMs: data.duration,
|
||||
blockLogs: accumulatedBlockLogs,
|
||||
isPreExecutionError,
|
||||
})
|
||||
},
|
||||
|
||||
onExecutionCancelled: () => {
|
||||
if (activeWorkflowId) {
|
||||
cancelRunningEntries(activeWorkflowId)
|
||||
}
|
||||
onExecutionCancelled: (data) => {
|
||||
handleExecutionCancelledConsole({
|
||||
workflowId: activeWorkflowId,
|
||||
executionId,
|
||||
durationMs: data?.duration,
|
||||
})
|
||||
},
|
||||
},
|
||||
})
|
||||
@@ -1585,115 +1829,27 @@ export function useWorkflowExecution() {
|
||||
const activeBlocksSet = new Set<string>()
|
||||
|
||||
try {
|
||||
const blockHandlers = buildBlockEventHandlers({
|
||||
workflowId,
|
||||
executionId,
|
||||
workflowEdges,
|
||||
activeBlocksSet,
|
||||
accumulatedBlockLogs,
|
||||
accumulatedBlockStates,
|
||||
executedBlockIds,
|
||||
consoleMode: 'add',
|
||||
includeStartConsoleEntry: false,
|
||||
})
|
||||
|
||||
await executionStream.executeFromBlock({
|
||||
workflowId,
|
||||
startBlockId: blockId,
|
||||
sourceSnapshot: effectiveSnapshot,
|
||||
input: workflowInput,
|
||||
callbacks: {
|
||||
onBlockStarted: (data) => {
|
||||
activeBlocksSet.add(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
|
||||
incomingEdges.forEach((edge) => {
|
||||
setEdgeRunStatus(edge.id, 'success')
|
||||
})
|
||||
},
|
||||
|
||||
onBlockCompleted: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(data.blockId, 'success')
|
||||
|
||||
executedBlockIds.add(data.blockId)
|
||||
accumulatedBlockStates.set(data.blockId, {
|
||||
output: data.output,
|
||||
executed: true,
|
||||
executionTime: data.durationMs,
|
||||
})
|
||||
|
||||
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
|
||||
if (isContainerBlock) return
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: data.output,
|
||||
success: true,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
},
|
||||
|
||||
onBlockError: (data) => {
|
||||
activeBlocksSet.delete(data.blockId)
|
||||
setActiveBlocks(new Set(activeBlocksSet))
|
||||
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
blockName: data.blockName || 'Unknown Block',
|
||||
blockType: data.blockType || 'unknown',
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
executionOrder: data.executionOrder,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
endedAt,
|
||||
})
|
||||
|
||||
addConsole({
|
||||
input: data.input || {},
|
||||
output: {},
|
||||
success: false,
|
||||
error: data.error,
|
||||
durationMs: data.durationMs,
|
||||
startedAt,
|
||||
executionOrder: data.executionOrder,
|
||||
endedAt,
|
||||
workflowId,
|
||||
blockId: data.blockId,
|
||||
executionId,
|
||||
blockName: data.blockName,
|
||||
blockType: data.blockType,
|
||||
iterationCurrent: data.iterationCurrent,
|
||||
iterationTotal: data.iterationTotal,
|
||||
iterationType: data.iterationType,
|
||||
})
|
||||
},
|
||||
onBlockStarted: blockHandlers.onBlockStarted,
|
||||
onBlockCompleted: blockHandlers.onBlockCompleted,
|
||||
onBlockError: blockHandlers.onBlockError,
|
||||
|
||||
onExecutionCompleted: (data) => {
|
||||
if (data.success) {
|
||||
@@ -1736,17 +1892,23 @@ export function useWorkflowExecution() {
|
||||
'Workflow was modified. Run the workflow again to enable running from block.',
|
||||
workflowId,
|
||||
})
|
||||
} else {
|
||||
addNotification({
|
||||
level: 'error',
|
||||
message: data.error || 'Run from block failed',
|
||||
workflowId,
|
||||
})
|
||||
}
|
||||
|
||||
handleExecutionErrorConsole({
|
||||
workflowId,
|
||||
executionId,
|
||||
error: data.error,
|
||||
durationMs: data.duration,
|
||||
blockLogs: accumulatedBlockLogs,
|
||||
})
|
||||
},
|
||||
|
||||
onExecutionCancelled: () => {
|
||||
cancelRunningEntries(workflowId)
|
||||
onExecutionCancelled: (data) => {
|
||||
handleExecutionCancelledConsole({
|
||||
workflowId,
|
||||
executionId,
|
||||
durationMs: data?.duration,
|
||||
})
|
||||
},
|
||||
},
|
||||
})
|
||||
@@ -1768,8 +1930,9 @@ export function useWorkflowExecution() {
|
||||
setBlockRunStatus,
|
||||
setEdgeRunStatus,
|
||||
addNotification,
|
||||
addConsole,
|
||||
cancelRunningEntries,
|
||||
buildBlockEventHandlers,
|
||||
handleExecutionErrorConsole,
|
||||
handleExecutionCancelledConsole,
|
||||
executionStream,
|
||||
]
|
||||
)
|
||||
|
||||
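The hunks above replace three near-identical inline callbacks (onBlockStarted, onBlockCompleted, onBlockError) with handlers returned by buildBlockEventHandlers, so the full-run and run-from-block paths share one implementation. A minimal sketch of such a factory, using only the option names visible in the diff and simplified assumed types; the real hook implementation is more involved:

// Sketch only: illustrates the shared-handler pattern, not the actual Sim implementation.
interface BlockEvent {
  blockId: string
  blockName?: string
  blockType?: string
  input?: Record<string, unknown>
  output?: unknown
  error?: string
  durationMs?: number
  startedAt?: string
  endedAt?: string
  executionOrder?: number
}

interface BuildBlockEventHandlersOptions {
  workflowId: string
  executionId: string
  activeBlocksSet: Set<string>
  executedBlockIds: Set<string>
  accumulatedBlockLogs: Array<Record<string, unknown>>
  consoleMode: 'add' | 'update'
  includeStartConsoleEntry: boolean
  onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
}

function buildBlockEventHandlers(options: BuildBlockEventHandlersOptions) {
  const { activeBlocksSet, executedBlockIds, accumulatedBlockLogs } = options

  return {
    onBlockStarted: (data: BlockEvent) => {
      activeBlocksSet.add(data.blockId)
      // 'update' mode would also create a running console entry here when
      // includeStartConsoleEntry is true.
    },
    onBlockCompleted: (data: BlockEvent) => {
      activeBlocksSet.delete(data.blockId)
      executedBlockIds.add(data.blockId)
      accumulatedBlockLogs.push({ ...data, success: true })
      void options.onBlockCompleteCallback?.(data.blockId, data.output)
    },
    onBlockError: (data: BlockEvent) => {
      activeBlocksSet.delete(data.blockId)
      accumulatedBlockLogs.push({ ...data, success: false })
    },
  }
}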
@@ -1,11 +1,11 @@
|
||||
import {
|
||||
Building2,
|
||||
Clock,
|
||||
Database,
|
||||
HardDrive,
|
||||
HeadphonesIcon,
|
||||
Server,
|
||||
ShieldCheck,
|
||||
Timer,
|
||||
Users,
|
||||
Zap,
|
||||
} from 'lucide-react'
|
||||
@@ -15,8 +15,8 @@ import type { PlanFeature } from '@/app/workspace/[workspaceId]/w/components/sid
|
||||
export const PRO_PLAN_FEATURES: PlanFeature[] = [
|
||||
{ icon: Zap, text: '150 runs per minute (sync)' },
|
||||
{ icon: Clock, text: '1,000 runs per minute (async)' },
|
||||
{ icon: Timer, text: '50 min sync execution limit' },
|
||||
{ icon: HardDrive, text: '50GB file storage' },
|
||||
{ icon: Building2, text: 'Unlimited workspaces' },
|
||||
{ icon: Users, text: 'Unlimited invites' },
|
||||
{ icon: Database, text: 'Unlimited log retention' },
|
||||
]
|
||||
@@ -24,8 +24,8 @@ export const PRO_PLAN_FEATURES: PlanFeature[] = [
|
||||
export const TEAM_PLAN_FEATURES: PlanFeature[] = [
|
||||
{ icon: Zap, text: '300 runs per minute (sync)' },
|
||||
{ icon: Clock, text: '2,500 runs per minute (async)' },
|
||||
{ icon: Timer, text: '50 min sync execution limit' },
|
||||
{ icon: HardDrive, text: '500GB file storage (pooled)' },
|
||||
{ icon: Building2, text: 'Unlimited workspaces' },
|
||||
{ icon: Users, text: 'Unlimited invites' },
|
||||
{ icon: Database, text: 'Unlimited log retention' },
|
||||
{ icon: SlackMonoIcon, text: 'Dedicated Slack channel' },
|
||||
|
||||
@@ -14,6 +14,7 @@ import { createLogger } from '@sim/logger'
|
||||
import { useParams } from 'next/navigation'
|
||||
import { io, type Socket } from 'socket.io-client'
|
||||
import { getEnv } from '@/lib/core/config/env'
|
||||
import { useOperationQueueStore } from '@/stores/operation-queue/store'
|
||||
|
||||
const logger = createLogger('SocketContext')
|
||||
|
||||
@@ -138,6 +139,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
const [authFailed, setAuthFailed] = useState(false)
|
||||
const initializedRef = useRef(false)
|
||||
const socketRef = useRef<Socket | null>(null)
|
||||
const triggerOfflineMode = useOperationQueueStore((state) => state.triggerOfflineMode)
|
||||
|
||||
const params = useParams()
|
||||
const urlWorkflowId = params?.workflowId as string | undefined
|
||||
@@ -341,9 +343,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
|
||||
})
|
||||
})
|
||||
|
||||
socketInstance.on('join-workflow-error', ({ error }) => {
|
||||
socketInstance.on('join-workflow-error', ({ error, code }) => {
|
||||
isRejoiningRef.current = false
|
||||
logger.error('Failed to join workflow:', error)
|
||||
logger.error('Failed to join workflow:', { error, code })
|
||||
if (code === 'ROOM_MANAGER_UNAVAILABLE') {
|
||||
triggerOfflineMode()
|
||||
}
|
||||
})
|
||||
|
||||
socketInstance.on('workflow-operation', (data) => {
|
||||
|
||||
@@ -4,6 +4,7 @@ import { task } from '@trigger.dev/sdk'
|
||||
import { Cron } from 'croner'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
@@ -120,6 +121,7 @@ async function runWorkflowExecution({
|
||||
loggingSession,
|
||||
requestId,
|
||||
executionId,
|
||||
asyncTimeout,
|
||||
}: {
|
||||
payload: ScheduleExecutionPayload
|
||||
workflowRecord: WorkflowRecord
|
||||
@@ -127,6 +129,7 @@ async function runWorkflowExecution({
|
||||
loggingSession: LoggingSession
|
||||
requestId: string
|
||||
executionId: string
|
||||
asyncTimeout?: number
|
||||
}): Promise<RunWorkflowResult> {
|
||||
try {
|
||||
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
|
||||
@@ -181,15 +184,33 @@ async function runWorkflowExecution({
|
||||
[]
|
||||
)
|
||||
|
||||
const executionResult = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
})
|
||||
const timeoutController = createTimeoutAbortController(asyncTimeout)
|
||||
|
||||
if (executionResult.status === 'paused') {
|
||||
let executionResult
|
||||
try {
|
||||
executionResult = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
|
||||
if (
|
||||
executionResult.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Scheduled workflow execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (executionResult.status === 'paused') {
|
||||
if (!executionResult.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
@@ -453,6 +474,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
|
||||
loggingSession,
|
||||
requestId,
|
||||
executionId,
|
||||
asyncTimeout: preprocessResult.executionTimeout?.async,
|
||||
})
|
||||
|
||||
if (executionResult.status === 'skip') {
|
||||
|
||||
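The scheduled-run path above relies on only four members of the object returned by createTimeoutAbortController: signal, timeoutMs, isTimedOut(), and cleanup(). A hedged sketch of a controller with that surface, assuming the real helper in '@/lib/core/execution-limits' may differ in detail:

// Sketch: wraps an AbortController with an optional timeout.
// When no timeout is configured, the signal simply never fires.
export function createTimeoutAbortController(timeoutMs?: number) {
  const controller = new AbortController()
  let timedOut = false
  let timer: ReturnType<typeof setTimeout> | undefined

  if (timeoutMs && timeoutMs > 0) {
    timer = setTimeout(() => {
      timedOut = true
      controller.abort() // surfaces as a 'cancelled' execution result
    }, timeoutMs)
  }

  return {
    signal: controller.signal,
    timeoutMs,
    isTimedOut: () => timedOut,
    cleanup: () => {
      if (timer) clearTimeout(timer)
    },
  }
}

The cancelled-plus-isTimedOut() check in the job code then distinguishes a timeout from a user-initiated cancellation.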
@@ -4,7 +4,14 @@ import { createLogger } from '@sim/logger'
|
||||
import { task } from '@trigger.dev/sdk'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing'
|
||||
import {
|
||||
createTimeoutAbortController,
|
||||
getExecutionTimeout,
|
||||
getTimeoutErrorMessage,
|
||||
} from '@/lib/core/execution-limits'
|
||||
import { IdempotencyService, webhookIdempotency } from '@/lib/core/idempotency'
|
||||
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
|
||||
import { processExecutionFiles } from '@/lib/execution/files'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
@@ -134,7 +141,13 @@ async function executeWebhookJobInternal(
|
||||
requestId
|
||||
)
|
||||
|
||||
// Track deploymentVersionId at function scope so it's available in catch block
|
||||
const userSubscription = await getHighestPrioritySubscription(payload.userId)
|
||||
const asyncTimeout = getExecutionTimeout(
|
||||
userSubscription?.plan as SubscriptionPlan | undefined,
|
||||
'async'
|
||||
)
|
||||
const timeoutController = createTimeoutAbortController(asyncTimeout)
|
||||
|
||||
let deploymentVersionId: string | undefined
|
||||
|
||||
try {
|
||||
@@ -241,11 +254,22 @@ async function executeWebhookJobInternal(
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true, // Enable base64 hydration
|
||||
base64MaxBytes: undefined, // Use default limit
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
|
||||
if (executionResult.status === 'paused') {
|
||||
if (
|
||||
executionResult.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Airtable webhook execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (executionResult.status === 'paused') {
|
||||
if (!executionResult.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
@@ -497,9 +521,20 @@ async function executeWebhookJobInternal(
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
|
||||
if (executionResult.status === 'paused') {
|
||||
if (
|
||||
executionResult.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Webhook execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (executionResult.status === 'paused') {
|
||||
if (!executionResult.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
@@ -601,6 +636,8 @@ async function executeWebhookJobInternal(
|
||||
}
|
||||
|
||||
throw error
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { task } from '@trigger.dev/sdk'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
@@ -19,6 +20,7 @@ export type WorkflowExecutionPayload = {
|
||||
userId: string
|
||||
input?: any
|
||||
triggerType?: CoreTriggerType
|
||||
executionId?: string
|
||||
metadata?: Record<string, any>
|
||||
}
|
||||
|
||||
@@ -29,7 +31,7 @@ export type WorkflowExecutionPayload = {
|
||||
*/
|
||||
export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
const workflowId = payload.workflowId
|
||||
const executionId = uuidv4()
|
||||
const executionId = payload.executionId || uuidv4()
|
||||
const requestId = executionId.slice(0, 8)
|
||||
|
||||
logger.info(`[${requestId}] Starting workflow execution job: ${workflowId}`, {
|
||||
@@ -103,15 +105,33 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
[]
|
||||
)
|
||||
|
||||
const result = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
})
|
||||
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.async)
|
||||
|
||||
if (result.status === 'paused') {
|
||||
let result
|
||||
try {
|
||||
result = await executeWorkflowCore({
|
||||
snapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
includeFileBase64: true,
|
||||
base64MaxBytes: undefined,
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
|
||||
if (
|
||||
result.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Workflow execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
} else if (result.status === 'paused') {
|
||||
if (!result.snapshotSeed) {
|
||||
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
|
||||
executionId,
|
||||
|
||||
@@ -143,11 +143,147 @@ export const MistralParseBlock: BlockConfig<MistralParserOutput> = {
|
||||
},
|
||||
}
|
||||
|
||||
/**
|
||||
* V2 Block - Restored from main branch for backwards compatibility
|
||||
* Hidden from toolbar, uses filePath subblock ID for advanced mode
|
||||
*/
|
||||
export const MistralParseV2Block: BlockConfig<MistralParserOutput> = {
|
||||
...MistralParseBlock,
|
||||
type: 'mistral_parse_v2',
|
||||
name: 'Mistral Parser',
|
||||
description: 'Extract text from PDF documents',
|
||||
hideFromToolbar: true,
|
||||
subBlocks: [
|
||||
{
|
||||
id: 'fileUpload',
|
||||
title: 'PDF Document',
|
||||
type: 'file-upload' as SubBlockType,
|
||||
canonicalParamId: 'document',
|
||||
acceptedTypes: 'application/pdf',
|
||||
placeholder: 'Upload a PDF document',
|
||||
mode: 'basic',
|
||||
maxSize: 50,
|
||||
},
|
||||
{
|
||||
id: 'filePath',
|
||||
title: 'PDF Document',
|
||||
type: 'short-input' as SubBlockType,
|
||||
canonicalParamId: 'document',
|
||||
placeholder: 'Document URL',
|
||||
mode: 'advanced',
|
||||
},
|
||||
{
|
||||
id: 'resultType',
|
||||
title: 'Output Format',
|
||||
type: 'dropdown',
|
||||
options: [
|
||||
{ id: 'markdown', label: 'Markdown' },
|
||||
{ id: 'text', label: 'Plain Text' },
|
||||
{ id: 'json', label: 'JSON' },
|
||||
],
|
||||
},
|
||||
{
|
||||
id: 'pages',
|
||||
title: 'Specific Pages',
|
||||
type: 'short-input',
|
||||
placeholder: 'e.g. 0,1,2 (leave empty for all pages)',
|
||||
},
|
||||
{
|
||||
id: 'apiKey',
|
||||
title: 'API Key',
|
||||
type: 'short-input' as SubBlockType,
|
||||
placeholder: 'Enter your Mistral API key',
|
||||
password: true,
|
||||
required: true,
|
||||
},
|
||||
],
|
||||
tools: {
|
||||
access: ['mistral_parser_v2'],
|
||||
config: {
|
||||
tool: createVersionedToolSelector({
|
||||
baseToolSelector: () => 'mistral_parser',
|
||||
suffix: '_v2',
|
||||
fallbackToolId: 'mistral_parser_v2',
|
||||
}),
|
||||
params: (params) => {
|
||||
if (!params || !params.apiKey || params.apiKey.trim() === '') {
|
||||
throw new Error('Mistral API key is required')
|
||||
}
|
||||
|
||||
const parameters: Record<string, unknown> = {
|
||||
apiKey: params.apiKey.trim(),
|
||||
resultType: params.resultType || 'markdown',
|
||||
}
|
||||
|
||||
// Original V2 pattern: fileUpload (basic) or filePath (advanced) or document (wired)
|
||||
const documentInput = params.fileUpload || params.filePath || params.document
|
||||
if (!documentInput) {
|
||||
throw new Error('PDF document is required')
|
||||
}
|
||||
// Smart handling: object → fileUpload param, string → filePath param
|
||||
if (typeof documentInput === 'object') {
|
||||
parameters.fileUpload = documentInput
|
||||
} else if (typeof documentInput === 'string') {
|
||||
parameters.filePath = documentInput.trim()
|
||||
}
|
||||
|
||||
let pagesArray: number[] | undefined
|
||||
if (params.pages && params.pages.trim() !== '') {
|
||||
try {
|
||||
pagesArray = params.pages
|
||||
.split(',')
|
||||
.map((p: string) => p.trim())
|
||||
.filter((p: string) => p.length > 0)
|
||||
.map((p: string) => {
|
||||
const num = Number.parseInt(p, 10)
|
||||
if (Number.isNaN(num) || num < 0) {
|
||||
throw new Error(`Invalid page number: ${p}`)
|
||||
}
|
||||
return num
|
||||
})
|
||||
|
||||
if (pagesArray && pagesArray.length === 0) {
|
||||
pagesArray = undefined
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
throw new Error(`Page number format error: ${errorMessage}`)
|
||||
}
|
||||
}
|
||||
|
||||
if (pagesArray && pagesArray.length > 0) {
|
||||
parameters.pages = pagesArray
|
||||
}
|
||||
|
||||
return parameters
|
||||
},
|
||||
},
|
||||
},
|
||||
inputs: {
|
||||
document: { type: 'json', description: 'Document input (file upload or URL reference)' },
|
||||
filePath: { type: 'string', description: 'PDF document URL (advanced mode)' },
|
||||
fileUpload: { type: 'json', description: 'Uploaded PDF file (basic mode)' },
|
||||
apiKey: { type: 'string', description: 'Mistral API key' },
|
||||
resultType: { type: 'string', description: 'Output format type' },
|
||||
pages: { type: 'string', description: 'Page selection' },
|
||||
},
|
||||
outputs: {
|
||||
pages: { type: 'array', description: 'Array of page objects from Mistral OCR' },
|
||||
model: { type: 'string', description: 'Mistral OCR model identifier' },
|
||||
usage_info: { type: 'json', description: 'Usage statistics from the API' },
|
||||
document_annotation: { type: 'string', description: 'Structured annotation data' },
|
||||
},
|
||||
}
|
||||
|
||||
/**
|
||||
* V3 Block - New file handling pattern with UserFile normalization
|
||||
* Uses fileReference subblock ID with canonicalParamId for proper file handling
|
||||
*/
|
||||
export const MistralParseV3Block: BlockConfig<MistralParserOutput> = {
|
||||
...MistralParseBlock,
|
||||
type: 'mistral_parse_v3',
|
||||
name: 'Mistral Parser',
|
||||
description: 'Extract text from PDF documents',
|
||||
hideFromToolbar: false,
|
||||
subBlocks: [
|
||||
{
|
||||
@@ -196,13 +332,9 @@ export const MistralParseV2Block: BlockConfig<MistralParserOutput> = {
|
||||
},
|
||||
],
|
||||
tools: {
|
||||
access: ['mistral_parser_v2'],
|
||||
access: ['mistral_parser_v3'],
|
||||
config: {
|
||||
tool: createVersionedToolSelector({
|
||||
baseToolSelector: () => 'mistral_parser',
|
||||
suffix: '_v2',
|
||||
fallbackToolId: 'mistral_parser_v2',
|
||||
}),
|
||||
tool: () => 'mistral_parser_v3',
|
||||
params: (params) => {
|
||||
if (!params || !params.apiKey || params.apiKey.trim() === '') {
|
||||
throw new Error('Mistral API key is required')
|
||||
@@ -213,6 +345,7 @@ export const MistralParseV2Block: BlockConfig<MistralParserOutput> = {
|
||||
resultType: params.resultType || 'markdown',
|
||||
}
|
||||
|
||||
// V3 pattern: normalize file inputs from basic/advanced modes
|
||||
const documentInput = normalizeFileInput(
|
||||
params.fileUpload || params.fileReference || params.document,
|
||||
{ single: true }
|
||||
|
||||
@@ -79,7 +79,11 @@ import { MemoryBlock } from '@/blocks/blocks/memory'
|
||||
import { MicrosoftExcelBlock, MicrosoftExcelV2Block } from '@/blocks/blocks/microsoft_excel'
|
||||
import { MicrosoftPlannerBlock } from '@/blocks/blocks/microsoft_planner'
|
||||
import { MicrosoftTeamsBlock } from '@/blocks/blocks/microsoft_teams'
|
||||
import { MistralParseBlock, MistralParseV2Block } from '@/blocks/blocks/mistral_parse'
|
||||
import {
|
||||
MistralParseBlock,
|
||||
MistralParseV2Block,
|
||||
MistralParseV3Block,
|
||||
} from '@/blocks/blocks/mistral_parse'
|
||||
import { MongoDBBlock } from '@/blocks/blocks/mongodb'
|
||||
import { MySQLBlock } from '@/blocks/blocks/mysql'
|
||||
import { Neo4jBlock } from '@/blocks/blocks/neo4j'
|
||||
@@ -255,6 +259,7 @@ export const registry: Record<string, BlockConfig> = {
|
||||
microsoft_teams: MicrosoftTeamsBlock,
|
||||
mistral_parse: MistralParseBlock,
|
||||
mistral_parse_v2: MistralParseV2Block,
|
||||
mistral_parse_v3: MistralParseV3Block,
|
||||
mongodb: MongoDBBlock,
|
||||
mysql: MySQLBlock,
|
||||
neo4j: Neo4jBlock,
|
||||
|
||||
@@ -1,3 +1,4 @@
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import type { LoopType, ParallelType } from '@/lib/workflows/types'

/**
@@ -187,8 +188,12 @@ export const HTTP = {

export const AGENT = {
DEFAULT_MODEL: 'claude-sonnet-4-5',
DEFAULT_FUNCTION_TIMEOUT: 600000,
REQUEST_TIMEOUT: 600000,
get DEFAULT_FUNCTION_TIMEOUT() {
return getMaxExecutionTimeout()
},
get REQUEST_TIMEOUT() {
return getMaxExecutionTimeout()
},
CUSTOM_TOOL_PREFIX: 'custom_',
} as const
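Replacing the fixed DEFAULT_FUNCTION_TIMEOUT and REQUEST_TIMEOUT values with getters means getMaxExecutionTimeout() is evaluated on every access rather than once at module load. A small stand-alone illustration of the difference (the configuration variable here is an assumption for the example, not part of the diff):

// Assumed stand-in for configuration that can change after the module is loaded.
let configuredTimeoutMs = 50 * 60 * 1000

const EAGER = { TIMEOUT: configuredTimeoutMs } // captured once, at evaluation time
const LAZY = {
  get TIMEOUT() {
    return configuredTimeoutMs // re-read on every access
  },
}

configuredTimeoutMs = 90 * 60 * 1000
console.log(EAGER.TIMEOUT) // 3000000 (stale)
console.log(LAZY.TIMEOUT)  // 5400000 (current)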
@@ -162,6 +162,8 @@ export class ExecutionEngine {
|
||||
}
|
||||
}
|
||||
|
||||
this.finalizeIncompleteLogs()
|
||||
|
||||
const errorMessage = normalizeError(error)
|
||||
logger.error('Execution failed', { error: errorMessage })
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
const { signal, executionId } = options
|
||||
const useRedis = isRedisCancellationEnabled() && !!executionId
|
||||
|
||||
if (!useRedis && signal?.aborted) {
|
||||
if (signal?.aborted) {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
const cleanup = () => {
|
||||
if (mainTimeoutId) clearTimeout(mainTimeoutId)
|
||||
if (checkIntervalId) clearInterval(checkIntervalId)
|
||||
if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
|
||||
if (signal) signal.removeEventListener('abort', onAbort)
|
||||
}
|
||||
|
||||
const onAbort = () => {
|
||||
@@ -37,6 +37,10 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
resolve(false)
|
||||
}
|
||||
|
||||
if (signal) {
|
||||
signal.addEventListener('abort', onAbort, { once: true })
|
||||
}
|
||||
|
||||
if (useRedis) {
|
||||
checkIntervalId = setInterval(async () => {
|
||||
if (resolved) return
|
||||
@@ -49,8 +53,6 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
|
||||
}
|
||||
} catch {}
|
||||
}, CANCELLATION_CHECK_INTERVAL_MS)
|
||||
} else if (signal) {
|
||||
signal.addEventListener('abort', onAbort, { once: true })
|
||||
}
|
||||
|
||||
mainTimeoutId = setTimeout(() => {
|
||||
|
||||
@@ -126,6 +126,7 @@ export class WorkflowBlockHandler implements BlockHandler {
|
||||
workspaceId: ctx.workspaceId,
|
||||
userId: ctx.userId,
|
||||
executionId: ctx.executionId,
|
||||
abortSignal: ctx.abortSignal,
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@@ -229,6 +229,10 @@ export function addSubflowErrorLog(
|
||||
}
|
||||
ctx.blockLogs.push(blockLog)
|
||||
|
||||
if (contextExtensions?.onBlockStart) {
|
||||
contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder)
|
||||
}
|
||||
|
||||
if (contextExtensions?.onBlockComplete) {
|
||||
contextExtensions.onBlockComplete(blockId, blockName, blockType, {
|
||||
input: inputData,
|
||||
|
||||
@@ -549,11 +549,12 @@ export function useActivateDeploymentVersion() {
workflowId,
version,
}: ActivateVersionVariables): Promise<ActivateVersionResult> => {
const response = await fetch(`/api/workflows/${workflowId}/deployments/${version}/activate`, {
method: 'POST',
const response = await fetch(`/api/workflows/${workflowId}/deployments/${version}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ isActive: true }),
})

if (!response.ok) {
@@ -211,12 +211,16 @@ export function useExecutionStream() {
currentExecutionRef.current = null

try {
const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ startBlockId, sourceSnapshot, input }),
body: JSON.stringify({
stream: true,
input,
runFromBlock: { startBlockId, sourceSnapshot },
}),
signal: abortController.signal,
})
@@ -1,7 +1,10 @@
export { AGENT_CARD_PATH } from '@a2a-js/sdk'

import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'

export const A2A_PROTOCOL_VERSION = '0.3.0'

export const A2A_DEFAULT_TIMEOUT = 300000
export const A2A_DEFAULT_TIMEOUT = DEFAULT_EXECUTION_TIMEOUT_MS

/**
* Maximum number of messages stored per task in the database.

@@ -5,10 +5,8 @@ import type { ToolUIConfig } from './ui-config'

const baseToolLogger = createLogger('BaseClientTool')

/** Default timeout for tool execution (5 minutes) */
const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000
const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000

/** Timeout for tools that run workflows (10 minutes) */
export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000

// Client tool call states used by the new runtime
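The constants above are plain millisecond values; applying one to an actual request is left to the caller. A hedged example of enforcing DEFAULT_TOOL_TIMEOUT_MS on a fetch-based tool call (the endpoint and wrapper are illustrative, not the BaseClientTool implementation):

const DEFAULT_TOOL_TIMEOUT_MS = 5 * 60 * 1000

// Illustrative wrapper: abort the call if the tool takes longer than the default timeout.
async function callToolWithTimeout(url: string, body: unknown): Promise<unknown> {
  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
    signal: AbortSignal.timeout(DEFAULT_TOOL_TIMEOUT_MS),
  })
  if (!response.ok) {
    throw new Error(`Tool call failed with status ${response.status}`)
  }
  return response.json()
}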
apps/sim/lib/core/async-jobs/backends/database.ts (new file, 113 lines)
@@ -0,0 +1,113 @@
|
||||
import { asyncJobs, db } from '@sim/db'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq, sql } from 'drizzle-orm'
|
||||
import {
|
||||
type EnqueueOptions,
|
||||
JOB_STATUS,
|
||||
type Job,
|
||||
type JobMetadata,
|
||||
type JobQueueBackend,
|
||||
type JobStatus,
|
||||
type JobType,
|
||||
} from '@/lib/core/async-jobs/types'
|
||||
|
||||
const logger = createLogger('DatabaseJobQueue')
|
||||
|
||||
type AsyncJobRow = typeof asyncJobs.$inferSelect
|
||||
|
||||
function rowToJob(row: AsyncJobRow): Job {
|
||||
return {
|
||||
id: row.id,
|
||||
type: row.type as JobType,
|
||||
payload: row.payload,
|
||||
status: row.status as JobStatus,
|
||||
createdAt: row.createdAt,
|
||||
startedAt: row.startedAt ?? undefined,
|
||||
completedAt: row.completedAt ?? undefined,
|
||||
attempts: row.attempts,
|
||||
maxAttempts: row.maxAttempts,
|
||||
error: row.error ?? undefined,
|
||||
output: row.output as unknown,
|
||||
metadata: (row.metadata ?? {}) as JobMetadata,
|
||||
}
|
||||
}
|
||||
|
||||
export class DatabaseJobQueue implements JobQueueBackend {
|
||||
async enqueue<TPayload>(
|
||||
type: JobType,
|
||||
payload: TPayload,
|
||||
options?: EnqueueOptions
|
||||
): Promise<string> {
|
||||
const jobId = `run_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`
|
||||
const now = new Date()
|
||||
|
||||
await db.insert(asyncJobs).values({
|
||||
id: jobId,
|
||||
type,
|
||||
payload: payload as Record<string, unknown>,
|
||||
status: JOB_STATUS.PENDING,
|
||||
createdAt: now,
|
||||
attempts: 0,
|
||||
maxAttempts: options?.maxAttempts ?? 3,
|
||||
metadata: (options?.metadata ?? {}) as Record<string, unknown>,
|
||||
updatedAt: now,
|
||||
})
|
||||
|
||||
logger.debug('Enqueued job', { jobId, type })
|
||||
return jobId
|
||||
}
|
||||
|
||||
async getJob(jobId: string): Promise<Job | null> {
|
||||
const [row] = await db.select().from(asyncJobs).where(eq(asyncJobs.id, jobId)).limit(1)
|
||||
|
||||
return row ? rowToJob(row) : null
|
||||
}
|
||||
|
||||
async startJob(jobId: string): Promise<void> {
|
||||
const now = new Date()
|
||||
|
||||
await db
|
||||
.update(asyncJobs)
|
||||
.set({
|
||||
status: JOB_STATUS.PROCESSING,
|
||||
startedAt: now,
|
||||
attempts: sql`${asyncJobs.attempts} + 1`,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(asyncJobs.id, jobId))
|
||||
|
||||
logger.debug('Started job', { jobId })
|
||||
}
|
||||
|
||||
async completeJob(jobId: string, output: unknown): Promise<void> {
|
||||
const now = new Date()
|
||||
|
||||
await db
|
||||
.update(asyncJobs)
|
||||
.set({
|
||||
status: JOB_STATUS.COMPLETED,
|
||||
completedAt: now,
|
||||
output: output as Record<string, unknown>,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(asyncJobs.id, jobId))
|
||||
|
||||
logger.debug('Completed job', { jobId })
|
||||
}
|
||||
|
||||
async markJobFailed(jobId: string, error: string): Promise<void> {
|
||||
const now = new Date()
|
||||
|
||||
await db
|
||||
.update(asyncJobs)
|
||||
.set({
|
||||
status: JOB_STATUS.FAILED,
|
||||
completedAt: now,
|
||||
error,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(asyncJobs.id, jobId))
|
||||
|
||||
logger.debug('Marked job as failed', { jobId })
|
||||
}
|
||||
}
|
||||
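One detail in DatabaseJobQueue.startJob above: attempts is incremented through a sql template, so the increment happens inside the UPDATE statement itself and stays correct when two workers touch the same job. The read-modify-write alternative, sketched here only for contrast, is racy:

import { asyncJobs, db } from '@sim/db'
import { eq } from 'drizzle-orm'

// Racy alternative (for contrast only): another worker can bump attempts
// between the read and the write, losing one of the increments.
async function incrementAttemptsNonAtomically(jobId: string): Promise<void> {
  const [row] = await db.select().from(asyncJobs).where(eq(asyncJobs.id, jobId)).limit(1)
  await db
    .update(asyncJobs)
    .set({ attempts: (row?.attempts ?? 0) + 1, updatedAt: new Date() })
    .where(eq(asyncJobs.id, jobId))
}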
apps/sim/lib/core/async-jobs/backends/index.ts (new file, 3 lines)
@@ -0,0 +1,3 @@
|
||||
export { DatabaseJobQueue } from './database'
|
||||
export { RedisJobQueue } from './redis'
|
||||
export { TriggerDevJobQueue } from './trigger-dev'
|
||||
apps/sim/lib/core/async-jobs/backends/redis.test.ts (new file, 176 lines)
@@ -0,0 +1,176 @@
|
||||
/**
|
||||
* @vitest-environment node
|
||||
*/
|
||||
import { createMockRedis, loggerMock, type MockRedis } from '@sim/testing'
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
vi.mock('@sim/logger', () => loggerMock)
|
||||
|
||||
import {
|
||||
JOB_MAX_LIFETIME_SECONDS,
|
||||
JOB_RETENTION_SECONDS,
|
||||
JOB_STATUS,
|
||||
} from '@/lib/core/async-jobs/types'
|
||||
import { RedisJobQueue } from './redis'
|
||||
|
||||
describe('RedisJobQueue', () => {
|
||||
let mockRedis: MockRedis
|
||||
let queue: RedisJobQueue
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
mockRedis = createMockRedis()
|
||||
queue = new RedisJobQueue(mockRedis as never)
|
||||
})
|
||||
|
||||
describe('enqueue', () => {
|
||||
it.concurrent('should create a job with pending status', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
|
||||
const jobId = await localQueue.enqueue('workflow-execution', { test: 'data' })
|
||||
|
||||
expect(jobId).toMatch(/^run_/)
|
||||
expect(localRedis.hset).toHaveBeenCalledTimes(1)
|
||||
|
||||
const [key, data] = localRedis.hset.mock.calls[0]
|
||||
expect(key).toBe(`async-jobs:job:${jobId}`)
|
||||
expect(data.status).toBe(JOB_STATUS.PENDING)
|
||||
expect(data.type).toBe('workflow-execution')
|
||||
})
|
||||
|
||||
it.concurrent('should set max lifetime TTL on enqueue', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
|
||||
const jobId = await localQueue.enqueue('workflow-execution', { test: 'data' })
|
||||
|
||||
expect(localRedis.expire).toHaveBeenCalledWith(
|
||||
`async-jobs:job:${jobId}`,
|
||||
JOB_MAX_LIFETIME_SECONDS
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
describe('completeJob', () => {
|
||||
it.concurrent('should set status to completed and set TTL', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
const jobId = 'run_test123'
|
||||
|
||||
await localQueue.completeJob(jobId, { result: 'success' })
|
||||
|
||||
expect(localRedis.hset).toHaveBeenCalledWith(`async-jobs:job:${jobId}`, {
|
||||
status: JOB_STATUS.COMPLETED,
|
||||
completedAt: expect.any(String),
|
||||
output: JSON.stringify({ result: 'success' }),
|
||||
updatedAt: expect.any(String),
|
||||
})
|
||||
expect(localRedis.expire).toHaveBeenCalledWith(
|
||||
`async-jobs:job:${jobId}`,
|
||||
JOB_RETENTION_SECONDS
|
||||
)
|
||||
})
|
||||
|
||||
it.concurrent('should set TTL to 24 hours (86400 seconds)', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
|
||||
await localQueue.completeJob('run_test123', {})
|
||||
|
||||
expect(localRedis.expire).toHaveBeenCalledWith(expect.any(String), 86400)
|
||||
})
|
||||
})
|
||||
|
||||
describe('markJobFailed', () => {
|
||||
it.concurrent('should set status to failed and set TTL', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
const jobId = 'run_test456'
|
||||
const error = 'Something went wrong'
|
||||
|
||||
await localQueue.markJobFailed(jobId, error)
|
||||
|
||||
expect(localRedis.hset).toHaveBeenCalledWith(`async-jobs:job:${jobId}`, {
|
||||
status: JOB_STATUS.FAILED,
|
||||
completedAt: expect.any(String),
|
||||
error,
|
||||
updatedAt: expect.any(String),
|
||||
})
|
||||
expect(localRedis.expire).toHaveBeenCalledWith(
|
||||
`async-jobs:job:${jobId}`,
|
||||
JOB_RETENTION_SECONDS
|
||||
)
|
||||
})
|
||||
|
||||
it.concurrent('should set TTL to 24 hours (86400 seconds)', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
|
||||
await localQueue.markJobFailed('run_test456', 'error')
|
||||
|
||||
expect(localRedis.expire).toHaveBeenCalledWith(expect.any(String), 86400)
|
||||
})
|
||||
})
|
||||
|
||||
describe('startJob', () => {
|
||||
it.concurrent('should not set TTL when starting a job', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
|
||||
await localQueue.startJob('run_test789')
|
||||
|
||||
expect(localRedis.hset).toHaveBeenCalled()
|
||||
expect(localRedis.expire).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
describe('getJob', () => {
|
||||
it.concurrent('should return null for non-existent job', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
localRedis.hgetall.mockResolvedValue({})
|
||||
|
||||
const job = await localQueue.getJob('run_nonexistent')
|
||||
|
||||
expect(job).toBeNull()
|
||||
})
|
||||
|
||||
it.concurrent('should deserialize job data correctly', async () => {
|
||||
const localRedis = createMockRedis()
|
||||
const localQueue = new RedisJobQueue(localRedis as never)
|
||||
const now = new Date()
|
||||
localRedis.hgetall.mockResolvedValue({
|
||||
id: 'run_test',
|
||||
type: 'workflow-execution',
|
||||
payload: JSON.stringify({ foo: 'bar' }),
|
||||
status: JOB_STATUS.COMPLETED,
|
||||
createdAt: now.toISOString(),
|
||||
startedAt: now.toISOString(),
|
||||
completedAt: now.toISOString(),
|
||||
attempts: '1',
|
||||
maxAttempts: '3',
|
||||
error: '',
|
||||
output: JSON.stringify({ result: 'ok' }),
|
||||
metadata: JSON.stringify({ workflowId: 'wf_123' }),
|
||||
})
|
||||
|
||||
const job = await localQueue.getJob('run_test')
|
||||
|
||||
expect(job).not.toBeNull()
|
||||
expect(job?.id).toBe('run_test')
|
||||
expect(job?.type).toBe('workflow-execution')
|
||||
expect(job?.payload).toEqual({ foo: 'bar' })
|
||||
expect(job?.status).toBe(JOB_STATUS.COMPLETED)
|
||||
expect(job?.output).toEqual({ result: 'ok' })
|
||||
expect(job?.metadata.workflowId).toBe('wf_123')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('JOB_RETENTION_SECONDS', () => {
|
||||
it.concurrent('should be 24 hours in seconds', async () => {
|
||||
expect(JOB_RETENTION_SECONDS).toBe(24 * 60 * 60)
|
||||
expect(JOB_RETENTION_SECONDS).toBe(86400)
|
||||
})
|
||||
})
|
||||
apps/sim/lib/core/async-jobs/backends/redis.ts (new file, 146 lines)
@@ -0,0 +1,146 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type Redis from 'ioredis'
|
||||
import {
|
||||
type EnqueueOptions,
|
||||
JOB_MAX_LIFETIME_SECONDS,
|
||||
JOB_RETENTION_SECONDS,
|
||||
JOB_STATUS,
|
||||
type Job,
|
||||
type JobMetadata,
|
||||
type JobQueueBackend,
|
||||
type JobStatus,
|
||||
type JobType,
|
||||
} from '@/lib/core/async-jobs/types'
|
||||
|
||||
const logger = createLogger('RedisJobQueue')
|
||||
|
||||
const KEYS = {
|
||||
job: (id: string) => `async-jobs:job:${id}`,
|
||||
} as const
|
||||
|
||||
function serializeJob(job: Job): Record<string, string> {
|
||||
return {
|
||||
id: job.id,
|
||||
type: job.type,
|
||||
payload: JSON.stringify(job.payload),
|
||||
status: job.status,
|
||||
createdAt: job.createdAt.toISOString(),
|
||||
startedAt: job.startedAt?.toISOString() ?? '',
|
||||
completedAt: job.completedAt?.toISOString() ?? '',
|
||||
attempts: job.attempts.toString(),
|
||||
maxAttempts: job.maxAttempts.toString(),
|
||||
error: job.error ?? '',
|
||||
output: job.output !== undefined ? JSON.stringify(job.output) : '',
|
||||
metadata: JSON.stringify(job.metadata),
|
||||
updatedAt: new Date().toISOString(),
|
||||
}
|
||||
}
|
||||
|
||||
function deserializeJob(data: Record<string, string>): Job | null {
|
||||
if (!data || !data.id) return null
|
||||
|
||||
try {
|
||||
return {
|
||||
id: data.id,
|
||||
type: data.type as JobType,
|
||||
payload: JSON.parse(data.payload),
|
||||
status: data.status as JobStatus,
|
||||
createdAt: new Date(data.createdAt),
|
||||
startedAt: data.startedAt ? new Date(data.startedAt) : undefined,
|
||||
completedAt: data.completedAt ? new Date(data.completedAt) : undefined,
|
||||
attempts: Number.parseInt(data.attempts, 10),
|
||||
maxAttempts: Number.parseInt(data.maxAttempts, 10),
|
||||
error: data.error || undefined,
|
||||
output: data.output ? JSON.parse(data.output) : undefined,
|
||||
metadata: JSON.parse(data.metadata) as JobMetadata,
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to deserialize job', { error, data })
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export class RedisJobQueue implements JobQueueBackend {
|
||||
private redis: Redis
|
||||
|
||||
constructor(redis: Redis) {
|
||||
this.redis = redis
|
||||
}
|
||||
|
||||
async enqueue<TPayload>(
|
||||
type: JobType,
|
||||
payload: TPayload,
|
||||
options?: EnqueueOptions
|
||||
): Promise<string> {
|
||||
const jobId = `run_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`
|
||||
const now = new Date()
|
||||
|
||||
const job: Job<TPayload> = {
|
||||
id: jobId,
|
||||
type,
|
||||
payload,
|
||||
status: JOB_STATUS.PENDING,
|
||||
createdAt: now,
|
||||
attempts: 0,
|
||||
maxAttempts: options?.maxAttempts ?? 3,
|
||||
metadata: options?.metadata ?? {},
|
||||
}
|
||||
|
||||
const key = KEYS.job(jobId)
|
||||
const serialized = serializeJob(job as Job)
|
||||
await this.redis.hset(key, serialized)
|
||||
await this.redis.expire(key, JOB_MAX_LIFETIME_SECONDS)
|
||||
|
||||
logger.debug('Enqueued job', { jobId, type })
|
||||
return jobId
|
||||
}
|
||||
|
||||
async getJob(jobId: string): Promise<Job | null> {
|
||||
const data = await this.redis.hgetall(KEYS.job(jobId))
|
||||
return deserializeJob(data)
|
||||
}
|
||||
|
||||
async startJob(jobId: string): Promise<void> {
|
||||
const now = new Date()
|
||||
const key = KEYS.job(jobId)
|
||||
|
||||
await this.redis.hset(key, {
|
||||
status: JOB_STATUS.PROCESSING,
|
||||
startedAt: now.toISOString(),
|
||||
updatedAt: now.toISOString(),
|
||||
})
|
||||
await this.redis.hincrby(key, 'attempts', 1)
|
||||
|
||||
logger.debug('Started job', { jobId })
|
||||
}
|
||||
|
||||
async completeJob(jobId: string, output: unknown): Promise<void> {
|
||||
const now = new Date()
|
||||
const key = KEYS.job(jobId)
|
||||
|
||||
await this.redis.hset(key, {
|
||||
status: JOB_STATUS.COMPLETED,
|
||||
completedAt: now.toISOString(),
|
||||
output: JSON.stringify(output),
|
||||
updatedAt: now.toISOString(),
|
||||
})
|
||||
await this.redis.expire(key, JOB_RETENTION_SECONDS)
|
||||
|
||||
logger.debug('Completed job', { jobId })
|
||||
}
|
||||
|
||||
async markJobFailed(jobId: string, error: string): Promise<void> {
|
||||
const now = new Date()
|
||||
const key = KEYS.job(jobId)
|
||||
|
||||
await this.redis.hset(key, {
|
||||
status: JOB_STATUS.FAILED,
|
||||
completedAt: now.toISOString(),
|
||||
error,
|
||||
updatedAt: now.toISOString(),
|
||||
})
|
||||
await this.redis.expire(key, JOB_RETENTION_SECONDS)
|
||||
|
||||
logger.debug('Marked job as failed', { jobId })
|
||||
}
|
||||
}
|
||||
apps/sim/lib/core/async-jobs/backends/trigger-dev.ts (new file, 119 lines)
@@ -0,0 +1,119 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { runs, tasks } from '@trigger.dev/sdk'
|
||||
import {
|
||||
type EnqueueOptions,
|
||||
JOB_STATUS,
|
||||
type Job,
|
||||
type JobMetadata,
|
||||
type JobQueueBackend,
|
||||
type JobStatus,
|
||||
type JobType,
|
||||
} from '@/lib/core/async-jobs/types'
|
||||
|
||||
const logger = createLogger('TriggerDevJobQueue')
|
||||
|
||||
/**
|
||||
* Maps trigger.dev task IDs to our JobType
|
||||
*/
|
||||
const JOB_TYPE_TO_TASK_ID: Record<JobType, string> = {
|
||||
'workflow-execution': 'workflow-execution',
|
||||
'schedule-execution': 'schedule-execution',
|
||||
'webhook-execution': 'webhook-execution',
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps trigger.dev run status to our JobStatus
|
||||
*/
|
||||
function mapTriggerDevStatus(status: string): JobStatus {
|
||||
switch (status) {
|
||||
case 'QUEUED':
|
||||
case 'WAITING_FOR_DEPLOY':
|
||||
return JOB_STATUS.PENDING
|
||||
case 'EXECUTING':
|
||||
case 'RESCHEDULED':
|
||||
case 'FROZEN':
|
||||
return JOB_STATUS.PROCESSING
|
||||
case 'COMPLETED':
|
||||
return JOB_STATUS.COMPLETED
|
||||
case 'CANCELED':
|
||||
case 'FAILED':
|
||||
case 'CRASHED':
|
||||
case 'INTERRUPTED':
|
||||
case 'SYSTEM_FAILURE':
|
||||
case 'EXPIRED':
|
||||
return JOB_STATUS.FAILED
|
||||
default:
|
||||
return JOB_STATUS.PENDING
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adapter that wraps the trigger.dev SDK to conform to JobQueueBackend interface.
|
||||
*/
|
||||
export class TriggerDevJobQueue implements JobQueueBackend {
|
||||
async enqueue<TPayload>(
|
||||
type: JobType,
|
||||
payload: TPayload,
|
||||
options?: EnqueueOptions
|
||||
): Promise<string> {
|
||||
const taskId = JOB_TYPE_TO_TASK_ID[type]
|
||||
if (!taskId) {
|
||||
throw new Error(`Unknown job type: ${type}`)
|
||||
}
|
||||
|
||||
const enrichedPayload =
|
||||
options?.metadata && typeof payload === 'object' && payload !== null
|
||||
? { ...payload, ...options.metadata }
|
||||
: payload
|
||||
|
||||
const handle = await tasks.trigger(taskId, enrichedPayload)
|
||||
|
||||
logger.debug('Enqueued job via trigger.dev', { jobId: handle.id, type, taskId })
|
||||
return handle.id
|
||||
}
|
||||
|
||||
async getJob(jobId: string): Promise<Job | null> {
|
||||
try {
|
||||
const run = await runs.retrieve(jobId)
|
||||
|
||||
const payload = run.payload as Record<string, unknown>
|
||||
const metadata: JobMetadata = {
|
||||
workflowId: payload?.workflowId as string | undefined,
|
||||
userId: payload?.userId as string | undefined,
|
||||
}
|
||||
|
||||
return {
|
||||
id: jobId,
|
||||
type: run.taskIdentifier as JobType,
|
||||
payload: run.payload,
|
||||
status: mapTriggerDevStatus(run.status),
|
||||
createdAt: run.createdAt ? new Date(run.createdAt) : new Date(),
|
||||
startedAt: run.startedAt ? new Date(run.startedAt) : undefined,
|
||||
completedAt: run.finishedAt ? new Date(run.finishedAt) : undefined,
|
||||
attempts: run.attemptCount ?? 1,
|
||||
maxAttempts: 3,
|
||||
error: run.error?.message,
|
||||
output: run.output as unknown,
|
||||
metadata,
|
||||
}
|
||||
} catch (error) {
|
||||
const isNotFound =
|
||||
(error instanceof Error && error.message.toLowerCase().includes('not found')) ||
|
||||
(error && typeof error === 'object' && 'status' in error && error.status === 404)
|
||||
|
||||
if (isNotFound) {
|
||||
logger.debug('Job not found in trigger.dev', { jobId })
|
||||
return null
|
||||
}
|
||||
|
||||
logger.error('Failed to get job from trigger.dev', { jobId, error })
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
async startJob(_jobId: string): Promise<void> {}
|
||||
|
||||
async completeJob(_jobId: string, _output: unknown): Promise<void> {}
|
||||
|
||||
async markJobFailed(_jobId: string, _error: string): Promise<void> {}
|
||||
}
|
||||
apps/sim/lib/core/async-jobs/config.ts (new file, 88 lines)
@@ -0,0 +1,88 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { AsyncBackendType, JobQueueBackend } from '@/lib/core/async-jobs/types'
|
||||
import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { getRedisClient } from '@/lib/core/config/redis'
|
||||
|
||||
const logger = createLogger('AsyncJobsConfig')
|
||||
|
||||
let cachedBackend: JobQueueBackend | null = null
|
||||
let cachedBackendType: AsyncBackendType | null = null
|
||||
|
||||
/**
|
||||
* Determines which async backend to use based on environment configuration.
|
||||
* Follows the fallback chain: trigger.dev → redis → database
|
||||
*/
|
||||
export function getAsyncBackendType(): AsyncBackendType {
|
||||
if (isTriggerDevEnabled) {
|
||||
return 'trigger-dev'
|
||||
}
|
||||
|
||||
const redis = getRedisClient()
|
||||
if (redis) {
|
||||
return 'redis'
|
||||
}
|
||||
|
||||
return 'database'
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the job queue backend singleton.
|
||||
* Creates the appropriate backend based on environment configuration.
|
||||
*/
|
||||
export async function getJobQueue(): Promise<JobQueueBackend> {
|
||||
if (cachedBackend) {
|
||||
return cachedBackend
|
||||
}
|
||||
|
||||
const type = getAsyncBackendType()
|
||||
|
||||
switch (type) {
|
||||
case 'trigger-dev': {
|
||||
const { TriggerDevJobQueue } = await import('@/lib/core/async-jobs/backends/trigger-dev')
|
||||
cachedBackend = new TriggerDevJobQueue()
|
||||
break
|
||||
}
|
||||
case 'redis': {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
throw new Error('Redis client not available but redis backend was selected')
|
||||
}
|
||||
const { RedisJobQueue } = await import('@/lib/core/async-jobs/backends/redis')
|
||||
cachedBackend = new RedisJobQueue(redis)
|
||||
break
|
||||
}
|
||||
case 'database': {
|
||||
const { DatabaseJobQueue } = await import('@/lib/core/async-jobs/backends/database')
|
||||
cachedBackend = new DatabaseJobQueue()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
cachedBackendType = type
|
||||
logger.info(`Async job backend initialized: ${type}`)
|
||||
|
||||
return cachedBackend
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current backend type (for logging/debugging)
|
||||
*/
|
||||
export function getCurrentBackendType(): AsyncBackendType | null {
|
||||
return cachedBackendType
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if jobs should be executed inline (fire-and-forget).
|
||||
* For Redis/DB backends, we execute inline. Trigger.dev handles execution itself.
|
||||
*/
|
||||
export function shouldExecuteInline(): boolean {
|
||||
return getAsyncBackendType() !== 'trigger-dev'
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets the cached backend (useful for testing)
|
||||
*/
|
||||
export function resetJobQueueCache(): void {
|
||||
cachedBackend = null
|
||||
cachedBackendType = null
|
||||
}
|
||||
apps/sim/lib/core/async-jobs/index.ts (new file, 22 lines)
@@ -0,0 +1,22 @@
|
||||
export {
|
||||
getAsyncBackendType,
|
||||
getCurrentBackendType,
|
||||
getJobQueue,
|
||||
resetJobQueueCache,
|
||||
shouldExecuteInline,
|
||||
} from './config'
|
||||
export type {
|
||||
AsyncBackendType,
|
||||
EnqueueOptions,
|
||||
Job,
|
||||
JobMetadata,
|
||||
JobQueueBackend,
|
||||
JobStatus,
|
||||
JobType,
|
||||
} from './types'
|
||||
export {
|
||||
JOB_MAX_LIFETIME_SECONDS,
|
||||
JOB_RETENTION_HOURS,
|
||||
JOB_RETENTION_SECONDS,
|
||||
JOB_STATUS,
|
||||
} from './types'
|
||||
apps/sim/lib/core/async-jobs/types.test.ts (new file, 32 lines)
@@ -0,0 +1,32 @@
/**
 * @vitest-environment node
 */
import { describe, expect, it } from 'vitest'
import { JOB_MAX_LIFETIME_SECONDS, JOB_RETENTION_HOURS, JOB_RETENTION_SECONDS } from './types'

describe('Job retention constants', () => {
  it.concurrent('JOB_RETENTION_HOURS should be 24', async () => {
    expect(JOB_RETENTION_HOURS).toBe(24)
  })

  it.concurrent('JOB_RETENTION_SECONDS should be derived from JOB_RETENTION_HOURS', async () => {
    expect(JOB_RETENTION_SECONDS).toBe(JOB_RETENTION_HOURS * 60 * 60)
  })

  it.concurrent('JOB_RETENTION_SECONDS should equal 86400 (24 hours)', async () => {
    expect(JOB_RETENTION_SECONDS).toBe(86400)
  })

  it.concurrent('constants should be consistent with each other', async () => {
    const hoursToSeconds = JOB_RETENTION_HOURS * 60 * 60
    expect(JOB_RETENTION_SECONDS).toBe(hoursToSeconds)
  })

  it.concurrent(
    'JOB_MAX_LIFETIME_SECONDS should be greater than JOB_RETENTION_SECONDS',
    async () => {
      expect(JOB_MAX_LIFETIME_SECONDS).toBeGreaterThan(JOB_RETENTION_SECONDS)
      expect(JOB_MAX_LIFETIME_SECONDS).toBe(48 * 60 * 60)
    }
  )
})
82
apps/sim/lib/core/async-jobs/types.ts
Normal file
@@ -0,0 +1,82 @@
|
||||
/**
|
||||
* Types and constants for the async job queue system
|
||||
*/
|
||||
|
||||
/** Retention period for completed/failed jobs (in hours) */
|
||||
export const JOB_RETENTION_HOURS = 24
|
||||
|
||||
/** Retention period for completed/failed jobs (in seconds, for Redis TTL) */
|
||||
export const JOB_RETENTION_SECONDS = JOB_RETENTION_HOURS * 60 * 60
|
||||
|
||||
/** Max lifetime for jobs in Redis (in seconds) - cleanup for stuck pending/processing jobs */
|
||||
export const JOB_MAX_LIFETIME_SECONDS = 48 * 60 * 60
|
||||
|
||||
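A sketch of how a Redis-backed store might apply these TTLs when persisting a job record — the key layout and client wiring here are hypothetical; only the constants and the `Job` type come from this module.

```ts
import { createClient } from 'redis'
import { JOB_MAX_LIFETIME_SECONDS, JOB_RETENTION_SECONDS, type Job } from '@/lib/core/async-jobs'

async function persistJob(redis: ReturnType<typeof createClient>, job: Job): Promise<void> {
  const key = `jobs:${job.id}` // hypothetical key layout
  // Pending/processing jobs get the long safety TTL so stuck jobs eventually expire.
  await redis.set(key, JSON.stringify(job), { EX: JOB_MAX_LIFETIME_SECONDS })
  // Completed/failed jobs only need the normal retention window.
  if (job.status === 'completed' || job.status === 'failed') {
    await redis.expire(key, JOB_RETENTION_SECONDS)
  }
}
```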
export const JOB_STATUS = {
|
||||
PENDING: 'pending',
|
||||
PROCESSING: 'processing',
|
||||
COMPLETED: 'completed',
|
||||
FAILED: 'failed',
|
||||
} as const
|
||||
|
||||
export type JobStatus = (typeof JOB_STATUS)[keyof typeof JOB_STATUS]
|
||||
|
||||
export type JobType = 'workflow-execution' | 'schedule-execution' | 'webhook-execution'
|
||||
|
||||
export interface Job<TPayload = unknown, TOutput = unknown> {
|
||||
id: string
|
||||
type: JobType
|
||||
payload: TPayload
|
||||
status: JobStatus
|
||||
createdAt: Date
|
||||
startedAt?: Date
|
||||
completedAt?: Date
|
||||
attempts: number
|
||||
maxAttempts: number
|
||||
error?: string
|
||||
output?: TOutput
|
||||
metadata: JobMetadata
|
||||
}
|
||||
|
||||
export interface JobMetadata {
|
||||
workflowId?: string
|
||||
userId?: string
|
||||
[key: string]: unknown
|
||||
}
|
||||
|
||||
export interface EnqueueOptions {
|
||||
maxAttempts?: number
|
||||
metadata?: JobMetadata
|
||||
}
|
||||
|
||||
/**
|
||||
* Backend interface for job queue implementations.
|
||||
* All backends must implement this interface.
|
||||
*/
|
||||
export interface JobQueueBackend {
|
||||
/**
|
||||
* Add a job to the queue
|
||||
*/
|
||||
enqueue<TPayload>(type: JobType, payload: TPayload, options?: EnqueueOptions): Promise<string>
|
||||
|
||||
/**
|
||||
* Get a job by ID
|
||||
*/
|
||||
getJob(jobId: string): Promise<Job | null>
|
||||
|
||||
/**
|
||||
* Mark a job as started/processing
|
||||
*/
|
||||
startJob(jobId: string): Promise<void>
|
||||
|
||||
/**
|
||||
* Mark a job as completed with output
|
||||
*/
|
||||
completeJob(jobId: string, output: unknown): Promise<void>
|
||||
|
||||
/**
|
||||
* Mark a job as failed with error message
|
||||
*/
|
||||
markJobFailed(jobId: string, error: string): Promise<void>
|
||||
}
|
||||
|
||||
export type AsyncBackendType = 'trigger-dev' | 'redis' | 'database'
|
||||
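To make the `JobQueueBackend` contract concrete, here is a minimal in-memory implementation sketch. It is purely illustrative — the real backends in this change are Trigger.dev, Redis, and the database.

```ts
import { randomUUID } from 'node:crypto'
import {
  JOB_STATUS,
  type EnqueueOptions,
  type Job,
  type JobQueueBackend,
  type JobType,
} from '@/lib/core/async-jobs'

class InMemoryJobQueue implements JobQueueBackend {
  private jobs = new Map<string, Job>()

  async enqueue<TPayload>(type: JobType, payload: TPayload, options?: EnqueueOptions) {
    const id = randomUUID()
    this.jobs.set(id, {
      id,
      type,
      payload,
      status: JOB_STATUS.PENDING,
      createdAt: new Date(),
      attempts: 0,
      maxAttempts: options?.maxAttempts ?? 1,
      metadata: options?.metadata ?? {},
    })
    return id
  }

  async getJob(jobId: string) {
    return this.jobs.get(jobId) ?? null
  }

  async startJob(jobId: string) {
    const job = this.jobs.get(jobId)
    if (!job) return
    job.status = JOB_STATUS.PROCESSING
    job.startedAt = new Date()
    job.attempts += 1
  }

  async completeJob(jobId: string, output: unknown) {
    const job = this.jobs.get(jobId)
    if (!job) return
    job.status = JOB_STATUS.COMPLETED
    job.completedAt = new Date()
    job.output = output
  }

  async markJobFailed(jobId: string, error: string) {
    const job = this.jobs.get(jobId)
    if (!job) return
    job.status = JOB_STATUS.FAILED
    job.completedAt = new Date()
    job.error = error
  }
}
```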
@@ -170,6 +170,16 @@ export const env = createEnv({
|
||||
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('600'), // Enterprise tier sync API executions per minute
|
||||
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('5000'), // Enterprise tier async API executions per minute
|
||||
|
||||
// Timeout Configuration
|
||||
EXECUTION_TIMEOUT_FREE: z.string().optional().default('300'), // 5 minutes
|
||||
EXECUTION_TIMEOUT_PRO: z.string().optional().default('3000'), // 50 minutes
|
||||
EXECUTION_TIMEOUT_TEAM: z.string().optional().default('3000'), // 50 minutes
|
||||
EXECUTION_TIMEOUT_ENTERPRISE: z.string().optional().default('3000'), // 50 minutes
|
||||
EXECUTION_TIMEOUT_ASYNC_FREE: z.string().optional().default('5400'), // 90 minutes
|
||||
EXECUTION_TIMEOUT_ASYNC_PRO: z.string().optional().default('5400'), // 90 minutes
|
||||
EXECUTION_TIMEOUT_ASYNC_TEAM: z.string().optional().default('5400'), // 90 minutes
|
||||
EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: z.string().optional().default('5400'), // 90 minutes
|
||||
|
||||
// Knowledge Base Processing Configuration - Shared across all processing methods
|
||||
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
|
||||
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
|
||||
@@ -340,7 +350,6 @@ export const env = createEnv({
|
||||
NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: z.string().regex(/^#[0-9A-Fa-f]{6}$/).optional(), // Brand background color (hex format)
|
||||
|
||||
// Feature Flags
|
||||
NEXT_PUBLIC_TRIGGER_DEV_ENABLED: z.boolean().optional(), // Client-side gate for async executions UI
|
||||
NEXT_PUBLIC_SSO_ENABLED: z.boolean().optional(), // Enable SSO login UI components
|
||||
NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED: z.boolean().optional(), // Enable credential sets (email polling) on self-hosted
|
||||
NEXT_PUBLIC_ACCESS_CONTROL_ENABLED: z.boolean().optional(), // Enable access control (permission groups) on self-hosted
|
||||
@@ -372,7 +381,6 @@ export const env = createEnv({
|
||||
NEXT_PUBLIC_BRAND_ACCENT_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_COLOR,
|
||||
NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR: process.env.NEXT_PUBLIC_BRAND_ACCENT_HOVER_COLOR,
|
||||
NEXT_PUBLIC_BRAND_BACKGROUND_COLOR: process.env.NEXT_PUBLIC_BRAND_BACKGROUND_COLOR,
|
||||
NEXT_PUBLIC_TRIGGER_DEV_ENABLED: process.env.NEXT_PUBLIC_TRIGGER_DEV_ENABLED,
|
||||
NEXT_PUBLIC_SSO_ENABLED: process.env.NEXT_PUBLIC_SSO_ENABLED,
|
||||
NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED: process.env.NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED,
|
||||
NEXT_PUBLIC_ACCESS_CONTROL_ENABLED: process.env.NEXT_PUBLIC_ACCESS_CONTROL_ENABLED,
|
||||
|
||||
1
apps/sim/lib/core/execution-limits/index.ts
Normal file
@@ -0,0 +1 @@
export * from './types'
147
apps/sim/lib/core/execution-limits/types.ts
Normal file
@@ -0,0 +1,147 @@
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
|
||||
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
|
||||
|
||||
interface ExecutionTimeoutConfig {
|
||||
sync: number
|
||||
async: number
|
||||
}
|
||||
|
||||
const DEFAULT_SYNC_TIMEOUTS_SECONDS = {
|
||||
free: 300,
|
||||
pro: 3000,
|
||||
team: 3000,
|
||||
enterprise: 3000,
|
||||
} as const
|
||||
|
||||
const DEFAULT_ASYNC_TIMEOUTS_SECONDS = {
|
||||
free: 5400,
|
||||
pro: 5400,
|
||||
team: 5400,
|
||||
enterprise: 5400,
|
||||
} as const
|
||||
|
||||
function getSyncTimeoutForPlan(plan: SubscriptionPlan): number {
|
||||
const envVarMap: Record<SubscriptionPlan, string | undefined> = {
|
||||
free: env.EXECUTION_TIMEOUT_FREE,
|
||||
pro: env.EXECUTION_TIMEOUT_PRO,
|
||||
team: env.EXECUTION_TIMEOUT_TEAM,
|
||||
enterprise: env.EXECUTION_TIMEOUT_ENTERPRISE,
|
||||
}
|
||||
return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_SYNC_TIMEOUTS_SECONDS[plan]) * 1000
|
||||
}
|
||||
|
||||
function getAsyncTimeoutForPlan(plan: SubscriptionPlan): number {
|
||||
const envVarMap: Record<SubscriptionPlan, string | undefined> = {
|
||||
free: env.EXECUTION_TIMEOUT_ASYNC_FREE,
|
||||
pro: env.EXECUTION_TIMEOUT_ASYNC_PRO,
|
||||
team: env.EXECUTION_TIMEOUT_ASYNC_TEAM,
|
||||
enterprise: env.EXECUTION_TIMEOUT_ASYNC_ENTERPRISE,
|
||||
}
|
||||
return (Number.parseInt(envVarMap[plan] || '') || DEFAULT_ASYNC_TIMEOUTS_SECONDS[plan]) * 1000
|
||||
}
|
||||
|
||||
const EXECUTION_TIMEOUTS: Record<SubscriptionPlan, ExecutionTimeoutConfig> = {
|
||||
free: {
|
||||
sync: getSyncTimeoutForPlan('free'),
|
||||
async: getAsyncTimeoutForPlan('free'),
|
||||
},
|
||||
pro: {
|
||||
sync: getSyncTimeoutForPlan('pro'),
|
||||
async: getAsyncTimeoutForPlan('pro'),
|
||||
},
|
||||
team: {
|
||||
sync: getSyncTimeoutForPlan('team'),
|
||||
async: getAsyncTimeoutForPlan('team'),
|
||||
},
|
||||
enterprise: {
|
||||
sync: getSyncTimeoutForPlan('enterprise'),
|
||||
async: getAsyncTimeoutForPlan('enterprise'),
|
||||
},
|
||||
}
|
||||
|
||||
export function getExecutionTimeout(
|
||||
plan: SubscriptionPlan | undefined,
|
||||
type: 'sync' | 'async' = 'sync'
|
||||
): number {
|
||||
if (!isBillingEnabled) {
|
||||
return EXECUTION_TIMEOUTS.enterprise[type]
|
||||
}
|
||||
return EXECUTION_TIMEOUTS[plan || 'free'][type]
|
||||
}
|
||||
|
||||
export function getMaxExecutionTimeout(): number {
|
||||
return EXECUTION_TIMEOUTS.enterprise.async
|
||||
}
|
||||
|
||||
export const DEFAULT_EXECUTION_TIMEOUT_MS = isBillingEnabled
|
||||
? EXECUTION_TIMEOUTS.free.sync
|
||||
: EXECUTION_TIMEOUTS.enterprise.sync
|
||||
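Assuming the default env values above and billing enabled, these helpers resolve roughly as follows (all return values in milliseconds):

```ts
import { getExecutionTimeout, getMaxExecutionTimeout } from '@/lib/core/execution-limits'

getExecutionTimeout('free', 'sync') // 300 * 1000  -> 5 minutes
getExecutionTimeout('pro', 'async') // 5400 * 1000 -> 90 minutes
getExecutionTimeout(undefined) // no subscription -> falls back to the free sync limit
getMaxExecutionTimeout() // 5400 * 1000 -> 90 minutes (enterprise async cap)
```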
|
||||
export function isTimeoutError(error: unknown): boolean {
|
||||
if (!error) return false
|
||||
|
||||
if (error instanceof Error) {
|
||||
return error.name === 'TimeoutError'
|
||||
}
|
||||
|
||||
if (typeof error === 'object' && 'name' in error) {
|
||||
return (error as { name: string }).name === 'TimeoutError'
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
export function getTimeoutErrorMessage(error: unknown, timeoutMs?: number): string {
|
||||
if (timeoutMs) {
|
||||
const timeoutSeconds = Math.floor(timeoutMs / 1000)
|
||||
const timeoutMinutes = Math.floor(timeoutSeconds / 60)
|
||||
const displayTime =
|
||||
timeoutMinutes > 0
|
||||
? `${timeoutMinutes} minute${timeoutMinutes > 1 ? 's' : ''}`
|
||||
: `${timeoutSeconds} seconds`
|
||||
return `Execution timed out after ${displayTime}`
|
||||
}
|
||||
|
||||
return 'Execution timed out'
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to create an AbortController with timeout handling.
|
||||
* Centralizes the timeout abort pattern used across execution paths.
|
||||
*/
|
||||
export interface TimeoutAbortController {
|
||||
/** The AbortSignal to pass to execution functions */
|
||||
signal: AbortSignal
|
||||
/** Returns true if the abort was triggered by timeout (not user cancellation) */
|
||||
isTimedOut: () => boolean
|
||||
/** Cleanup function - call in finally block to clear the timeout */
|
||||
cleanup: () => void
|
||||
/** Manually abort the execution (for user cancellation) */
|
||||
abort: () => void
|
||||
/** The timeout duration in milliseconds (undefined if no timeout) */
|
||||
timeoutMs: number | undefined
|
||||
}
|
||||
|
||||
export function createTimeoutAbortController(timeoutMs?: number): TimeoutAbortController {
|
||||
const abortController = new AbortController()
|
||||
let isTimedOut = false
|
||||
let timeoutId: NodeJS.Timeout | undefined
|
||||
|
||||
if (timeoutMs) {
|
||||
timeoutId = setTimeout(() => {
|
||||
isTimedOut = true
|
||||
abortController.abort()
|
||||
}, timeoutMs)
|
||||
}
|
||||
|
||||
return {
|
||||
signal: abortController.signal,
|
||||
isTimedOut: () => isTimedOut,
|
||||
cleanup: () => {
|
||||
if (timeoutId) clearTimeout(timeoutId)
|
||||
},
|
||||
abort: () => abortController.abort(),
|
||||
timeoutMs,
|
||||
}
|
||||
}
|
||||
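The intended call pattern, mirroring the resume and streaming paths later in this change — `runWorkflow` is a placeholder for any abortable execution function such as `executeWorkflowCore`:

```ts
import {
  createTimeoutAbortController,
  getExecutionTimeout,
  getTimeoutErrorMessage,
} from '@/lib/core/execution-limits'
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'

async function runWithTimeout(
  runWorkflow: (signal: AbortSignal) => Promise<{ status: string }>,
  plan: SubscriptionPlan | undefined
) {
  const timeoutController = createTimeoutAbortController(getExecutionTimeout(plan, 'async'))

  let result: { status: string }
  try {
    result = await runWorkflow(timeoutController.signal)
  } finally {
    timeoutController.cleanup() // always clear the pending setTimeout
  }

  if (result.status === 'cancelled' && timeoutController.isTimedOut() && timeoutController.timeoutMs) {
    // The abort came from the timeout, not from a user cancellation.
    throw new Error(getTimeoutErrorMessage(null, timeoutController.timeoutMs))
  }

  return result
}
```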
@@ -4,6 +4,7 @@ import { idempotencyKey } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { getRedisClient } from '@/lib/core/config/redis'
|
||||
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
|
||||
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
|
||||
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'
|
||||
|
||||
@@ -36,9 +37,9 @@ export interface AtomicClaimResult {
|
||||
storageMethod: StorageMethod
|
||||
}
|
||||
|
||||
const DEFAULT_TTL = 60 * 60 * 24 * 7 // 7 days
|
||||
const DEFAULT_TTL = 60 * 60 * 24 * 7
|
||||
const REDIS_KEY_PREFIX = 'idempotency:'
|
||||
const MAX_WAIT_TIME_MS = 300000 // 5 minutes max wait
|
||||
const MAX_WAIT_TIME_MS = getMaxExecutionTimeout()
|
||||
const POLL_INTERVAL_MS = 1000
|
||||
|
||||
/**
|
||||
|
||||
@@ -50,3 +50,13 @@ export function getInvalidCharacters(name: string): string[] {
|
||||
const invalidChars = name.match(/[^a-zA-Z0-9_\s]/g)
|
||||
return invalidChars ? [...new Set(invalidChars)] : []
|
||||
}
|
||||
|
||||
/**
|
||||
* Escapes non-ASCII characters in JSON string for HTTP header safety.
|
||||
* Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX.
|
||||
*/
|
||||
export function httpHeaderSafeJson(value: object): string {
|
||||
return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
|
||||
return `\\u${(`0000${c.charCodeAt(0).toString(16)}`).slice(-4)}`
|
||||
})
|
||||
}
|
||||
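For example (illustrative input), a non-ASCII path is escaped so the JSON can travel in a Dropbox-API-Arg header:

```ts
// Using the helper defined above:
const headerValue = httpHeaderSafeJson({ path: '/docs/Résumé.pdf' })
// headerValue === '{"path":"/docs/R\\u00e9sum\\u00e9.pdf"}'
```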
|
||||
@@ -1,7 +1,3 @@
|
||||
/**
|
||||
* Execution timeout constants
|
||||
*
|
||||
* DEFAULT_EXECUTION_TIMEOUT_MS: The default timeout for executing user code (10 minutes)
|
||||
*/
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
|
||||
export const DEFAULT_EXECUTION_TIMEOUT_MS = 600000 // 10 minutes (600 seconds)
|
||||
export { DEFAULT_EXECUTION_TIMEOUT_MS }
|
||||
|
||||
@@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
|
||||
import { getExecutionTimeout } from '@/lib/core/execution-limits'
|
||||
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
|
||||
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
|
||||
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
@@ -133,10 +135,10 @@ export interface PreprocessExecutionResult {
|
||||
success: boolean
|
||||
error?: {
|
||||
message: string
|
||||
statusCode: number // HTTP status code (401, 402, 403, 404, 429, 500)
|
||||
logCreated: boolean // Whether error was logged to execution_logs
|
||||
statusCode: number
|
||||
logCreated: boolean
|
||||
}
|
||||
actorUserId?: string // The user ID that will be billed
|
||||
actorUserId?: string
|
||||
workflowRecord?: WorkflowRecord
|
||||
userSubscription?: SubscriptionInfo | null
|
||||
rateLimitInfo?: {
|
||||
@@ -144,6 +146,10 @@ export interface PreprocessExecutionResult {
|
||||
remaining: number
|
||||
resetAt: Date
|
||||
}
|
||||
executionTimeout?: {
|
||||
sync: number
|
||||
async: number
|
||||
}
|
||||
}
|
||||
|
||||
type WorkflowRecord = typeof workflow.$inferSelect
|
||||
@@ -484,12 +490,17 @@ export async function preprocessExecution(
|
||||
triggerType,
|
||||
})
|
||||
|
||||
const plan = userSubscription?.plan as SubscriptionPlan | undefined
|
||||
return {
|
||||
success: true,
|
||||
actorUserId,
|
||||
workflowRecord,
|
||||
userSubscription,
|
||||
rateLimitInfo,
|
||||
executionTimeout: {
|
||||
sync: getExecutionTimeout(plan, 'sync'),
|
||||
async: getExecutionTimeout(plan, 'async'),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -261,10 +261,14 @@ export class ExecutionLogger implements IExecutionLoggerService {
|
||||
models: costSummary.models,
|
||||
}
|
||||
|
||||
const totalDuration =
|
||||
const rawDurationMs =
|
||||
isResume && existingLog?.startedAt
|
||||
? new Date(endedAt).getTime() - new Date(existingLog.startedAt).getTime()
|
||||
: totalDurationMs
|
||||
const totalDuration =
|
||||
typeof rawDurationMs === 'number' && Number.isFinite(rawDurationMs)
|
||||
? Math.max(0, Math.round(rawDurationMs))
|
||||
: 0
|
||||
|
||||
const [updatedLog] = await db
|
||||
.update(workflowExecutionLogs)
|
||||
|
||||
@@ -776,11 +776,16 @@ export class LoggingSession {
|
||||
await db
|
||||
.update(workflowExecutionLogs)
|
||||
.set({
|
||||
level: 'error',
|
||||
status: 'failed',
|
||||
executionData: sql`jsonb_set(
|
||||
COALESCE(execution_data, '{}'::jsonb),
|
||||
ARRAY['error'],
|
||||
to_jsonb(${message}::text)
|
||||
jsonb_set(
|
||||
COALESCE(execution_data, '{}'::jsonb),
|
||||
ARRAY['error'],
|
||||
to_jsonb(${message}::text)
|
||||
),
|
||||
ARRAY['finalOutput'],
|
||||
jsonb_build_object('error', ${message}::text)
|
||||
)`,
|
||||
})
|
||||
.where(eq(workflowExecutionLogs.executionId, executionId))
|
||||
|
||||
@@ -12,6 +12,7 @@ import { Client } from '@modelcontextprotocol/sdk/client/index.js'
|
||||
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'
|
||||
import type { ListToolsResult, Tool } from '@modelcontextprotocol/sdk/types.js'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
|
||||
import {
|
||||
McpConnectionError,
|
||||
type McpConnectionStatus,
|
||||
@@ -155,7 +156,7 @@ export class McpClient {
|
||||
return result.tools.map((tool: Tool) => ({
|
||||
name: tool.name,
|
||||
description: tool.description,
|
||||
inputSchema: tool.inputSchema,
|
||||
inputSchema: tool.inputSchema as McpTool['inputSchema'],
|
||||
serverId: this.config.id,
|
||||
serverName: this.config.name,
|
||||
}))
|
||||
@@ -202,7 +203,7 @@ export class McpClient {
|
||||
const sdkResult = await this.client.callTool(
|
||||
{ name: toolCall.name, arguments: toolCall.arguments },
|
||||
undefined,
|
||||
{ timeout: 600000 } // 10 minutes - override SDK's 60s default
|
||||
{ timeout: getMaxExecutionTimeout() }
|
||||
)
|
||||
|
||||
return sdkResult as McpToolResult
|
||||
|
||||
@@ -32,9 +32,11 @@ export function sanitizeHeaders(
|
||||
|
||||
/**
|
||||
* Client-safe MCP constants
|
||||
* Note: CLIENT_TIMEOUT should match DEFAULT_EXECUTION_TIMEOUT_MS from @/lib/core/execution-limits
|
||||
* (5 minutes = 300 seconds for free tier). Keep in sync if that value changes.
|
||||
*/
|
||||
export const MCP_CLIENT_CONSTANTS = {
|
||||
CLIENT_TIMEOUT: 600000,
|
||||
CLIENT_TIMEOUT: 5 * 60 * 1000, // 5 minutes - matches DEFAULT_EXECUTION_TIMEOUT_MS
|
||||
MAX_RETRIES: 3,
|
||||
RECONNECT_DELAY: 1000,
|
||||
} as const
|
||||
|
||||
@@ -57,14 +57,29 @@ export interface McpSecurityPolicy {
|
||||
auditLevel: 'none' | 'basic' | 'detailed'
|
||||
}
|
||||
|
||||
/**
|
||||
* JSON Schema property definition for tool parameters.
|
||||
* Follows JSON Schema specification with description support.
|
||||
*/
|
||||
export interface McpToolSchemaProperty {
|
||||
type: string
|
||||
description?: string
|
||||
items?: McpToolSchemaProperty
|
||||
properties?: Record<string, McpToolSchemaProperty>
|
||||
required?: string[]
|
||||
enum?: Array<string | number | boolean>
|
||||
default?: unknown
|
||||
}
|
||||
|
||||
/**
|
||||
* JSON Schema for tool input parameters.
|
||||
* Aligns with MCP SDK's Tool.inputSchema structure.
|
||||
*/
|
||||
export interface McpToolSchema {
|
||||
type: 'object'
|
||||
properties?: Record<string, unknown>
|
||||
properties?: Record<string, McpToolSchemaProperty>
|
||||
required?: string[]
|
||||
description?: string
|
||||
}
|
||||
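To make the shape concrete, a hypothetical tool schema that type-checks against these interfaces (assuming they are exported from `@/lib/mcp/types`):

```ts
import type { McpToolSchema } from '@/lib/mcp/types'

const searchToolSchema: McpToolSchema = {
  type: 'object',
  description: 'Search documents by keyword',
  properties: {
    query: { type: 'string', description: 'Search terms' },
    limit: { type: 'number', description: 'Maximum number of results', default: 10 },
    tags: {
      type: 'array',
      description: 'Optional tag filter',
      items: { type: 'string', enum: ['draft', 'published'] },
    },
  },
  required: ['query'],
}
```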
|
||||
/**
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import {
|
||||
categorizeError,
|
||||
createMcpToolId,
|
||||
@@ -81,8 +82,8 @@ describe('generateMcpServerId', () => {
|
||||
})
|
||||
|
||||
describe('MCP_CONSTANTS', () => {
|
||||
it.concurrent('has correct execution timeout (10 minutes)', () => {
|
||||
expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(600000)
|
||||
it.concurrent('has correct execution timeout', () => {
|
||||
expect(MCP_CONSTANTS.EXECUTION_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS)
|
||||
})
|
||||
|
||||
it.concurrent('has correct cache timeout (5 minutes)', () => {
|
||||
@@ -107,8 +108,8 @@ describe('MCP_CONSTANTS', () => {
|
||||
})
|
||||
|
||||
describe('MCP_CLIENT_CONSTANTS', () => {
|
||||
it.concurrent('has correct client timeout (10 minutes)', () => {
|
||||
expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(600000)
|
||||
it.concurrent('has correct client timeout', () => {
|
||||
expect(MCP_CLIENT_CONSTANTS.CLIENT_TIMEOUT).toBe(DEFAULT_EXECUTION_TIMEOUT_MS)
|
||||
})
|
||||
|
||||
it.concurrent('has correct auto refresh interval (5 minutes)', () => {
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
import { NextResponse } from 'next/server'
|
||||
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
|
||||
import type { McpApiResponse } from '@/lib/mcp/types'
|
||||
import { isMcpTool, MCP } from '@/executor/constants'
|
||||
|
||||
/**
|
||||
* MCP-specific constants
|
||||
*/
|
||||
export const MCP_CONSTANTS = {
|
||||
EXECUTION_TIMEOUT: 600000,
|
||||
EXECUTION_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
|
||||
CACHE_TIMEOUT: 5 * 60 * 1000,
|
||||
DEFAULT_RETRIES: 3,
|
||||
DEFAULT_CONNECTION_TIMEOUT: 30000,
|
||||
@@ -45,11 +43,8 @@ export function sanitizeHeaders(
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Client-safe MCP constants
|
||||
*/
|
||||
export const MCP_CLIENT_CONSTANTS = {
|
||||
CLIENT_TIMEOUT: 600000,
|
||||
CLIENT_TIMEOUT: DEFAULT_EXECUTION_TIMEOUT_MS,
|
||||
AUTO_REFRESH_INTERVAL: 5 * 60 * 1000,
|
||||
} as const
|
||||
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import { db, webhook, workflow, workflowDeploymentVersion } from '@sim/db'
|
||||
import { credentialSet, subscription } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { tasks } from '@trigger.dev/sdk'
|
||||
import { and, eq, isNull, or } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import { checkEnterprisePlan, checkTeamPlan } from '@/lib/billing/subscriptions/utils'
|
||||
import { isProd, isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
|
||||
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
|
||||
import { isProd } from '@/lib/core/config/feature-flags'
|
||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { convertSquareBracketsToTwiML } from '@/lib/webhooks/utils'
|
||||
@@ -1015,18 +1015,39 @@ export async function queueWebhookExecution(
|
||||
...(credentialId ? { credentialId } : {}),
|
||||
}
|
||||
|
||||
if (isTriggerDevEnabled) {
|
||||
const handle = await tasks.trigger('webhook-execution', payload)
|
||||
logger.info(
|
||||
`[${options.requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook`
|
||||
)
|
||||
} else {
|
||||
void executeWebhookJob(payload).catch((error) => {
|
||||
logger.error(`[${options.requestId}] Direct webhook execution failed`, error)
|
||||
})
|
||||
logger.info(
|
||||
`[${options.requestId}] Queued direct webhook execution for ${foundWebhook.provider} webhook (Trigger.dev disabled)`
|
||||
)
|
||||
const jobQueue = await getJobQueue()
|
||||
const jobId = await jobQueue.enqueue('webhook-execution', payload, {
|
||||
metadata: { workflowId: foundWorkflow.id, userId: foundWorkflow.userId },
|
||||
})
|
||||
logger.info(
|
||||
`[${options.requestId}] Queued webhook execution task ${jobId} for ${foundWebhook.provider} webhook`
|
||||
)
|
||||
|
||||
if (shouldExecuteInline()) {
|
||||
void (async () => {
|
||||
try {
|
||||
await jobQueue.startJob(jobId)
|
||||
const output = await executeWebhookJob(payload)
|
||||
await jobQueue.completeJob(jobId, output)
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
logger.error(`[${options.requestId}] Webhook execution failed`, {
|
||||
jobId,
|
||||
error: errorMessage,
|
||||
})
|
||||
try {
|
||||
await jobQueue.markJobFailed(jobId, errorMessage)
|
||||
} catch (markFailedError) {
|
||||
logger.error(`[${options.requestId}] Failed to mark job as failed`, {
|
||||
jobId,
|
||||
error:
|
||||
markFailedError instanceof Error
|
||||
? markFailedError.message
|
||||
: String(markFailedError),
|
||||
})
|
||||
}
|
||||
}
|
||||
})()
|
||||
}
|
||||
|
||||
if (foundWebhook.provider === 'microsoft-teams') {
|
||||
|
||||
@@ -19,6 +19,7 @@ export interface ExecuteWorkflowOptions {
|
||||
skipLoggingComplete?: boolean
|
||||
includeFileBase64?: boolean
|
||||
base64MaxBytes?: number
|
||||
abortSignal?: AbortSignal
|
||||
}
|
||||
|
||||
export interface WorkflowInfo {
|
||||
@@ -82,6 +83,7 @@ export async function executeWorkflow(
|
||||
loggingSession,
|
||||
includeFileBase64: streamConfig?.includeFileBase64,
|
||||
base64MaxBytes: streamConfig?.base64MaxBytes,
|
||||
abortSignal: streamConfig?.abortSignal,
|
||||
})
|
||||
|
||||
if (result.status === 'paused') {
|
||||
|
||||
@@ -58,9 +58,6 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execution cancelled event
|
||||
*/
|
||||
export interface ExecutionCancelledEvent extends BaseExecutionEvent {
|
||||
type: 'execution:cancelled'
|
||||
workflowId: string
|
||||
@@ -167,9 +164,6 @@ export type ExecutionEvent =
|
||||
| StreamChunkEvent
|
||||
| StreamDoneEvent
|
||||
|
||||
/**
|
||||
* Extracted data types for use in callbacks
|
||||
*/
|
||||
export type ExecutionStartedData = ExecutionStartedEvent['data']
|
||||
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
|
||||
export type ExecutionErrorData = ExecutionErrorEvent['data']
|
||||
|
||||
@@ -4,6 +4,7 @@ import { pausedExecutions, resumeQueue, workflowExecutionLogs } from '@sim/db/sc
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, asc, desc, eq, inArray, lt, type SQL, sql } from 'drizzle-orm'
|
||||
import type { Edge } from 'reactflow'
|
||||
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
|
||||
@@ -771,14 +772,39 @@ export class PauseResumeManager {
|
||||
actorUserId: metadata.userId,
|
||||
})
|
||||
|
||||
return await executeWorkflowCore({
|
||||
snapshot: resumeSnapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
skipLogCreation: true, // Reuse existing log entry
|
||||
includeFileBase64: true, // Enable base64 hydration
|
||||
base64MaxBytes: undefined, // Use default limit
|
||||
})
|
||||
const timeoutController = createTimeoutAbortController(
|
||||
preprocessingResult.executionTimeout?.async
|
||||
)
|
||||
|
||||
let result: ExecutionResult
|
||||
try {
|
||||
result = await executeWorkflowCore({
|
||||
snapshot: resumeSnapshot,
|
||||
callbacks: {},
|
||||
loggingSession,
|
||||
skipLogCreation: true, // Reuse existing log entry
|
||||
includeFileBase64: true, // Enable base64 hydration
|
||||
base64MaxBytes: undefined, // Use default limit
|
||||
abortSignal: timeoutController.signal,
|
||||
})
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
|
||||
if (
|
||||
result.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info('Resume execution timed out', {
|
||||
resumeExecutionId,
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
await loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
private static async markResumeCompleted(args: {
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core/execution-limits'
|
||||
import {
|
||||
extractBlockIdFromOutputId,
|
||||
extractPathFromOutputId,
|
||||
@@ -32,6 +33,7 @@ export interface StreamingConfig {
|
||||
workflowTriggerType?: 'api' | 'chat'
|
||||
includeFileBase64?: boolean
|
||||
base64MaxBytes?: number
|
||||
timeoutMs?: number
|
||||
}
|
||||
|
||||
export interface StreamingResponseOptions {
|
||||
@@ -169,6 +171,7 @@ export async function createStreamingResponse(
|
||||
options: StreamingResponseOptions
|
||||
): Promise<ReadableStream> {
|
||||
const { requestId, workflow, input, executingUserId, streamConfig, executionId } = options
|
||||
const timeoutController = createTimeoutAbortController(streamConfig.timeoutMs)
|
||||
|
||||
return new ReadableStream({
|
||||
async start(controller) {
|
||||
@@ -284,6 +287,7 @@ export async function createStreamingResponse(
|
||||
skipLoggingComplete: true,
|
||||
includeFileBase64: streamConfig.includeFileBase64,
|
||||
base64MaxBytes: streamConfig.base64MaxBytes,
|
||||
abortSignal: timeoutController.signal,
|
||||
},
|
||||
executionId
|
||||
)
|
||||
@@ -293,18 +297,34 @@ export async function createStreamingResponse(
|
||||
processStreamingBlockLogs(result.logs, state.streamedContent)
|
||||
}
|
||||
|
||||
await completeLoggingSession(result)
|
||||
if (
|
||||
result.status === 'cancelled' &&
|
||||
timeoutController.isTimedOut() &&
|
||||
timeoutController.timeoutMs
|
||||
) {
|
||||
const timeoutErrorMessage = getTimeoutErrorMessage(null, timeoutController.timeoutMs)
|
||||
logger.info(`[${requestId}] Streaming execution timed out`, {
|
||||
timeoutMs: timeoutController.timeoutMs,
|
||||
})
|
||||
if (result._streamingMetadata?.loggingSession) {
|
||||
await result._streamingMetadata.loggingSession.markAsFailed(timeoutErrorMessage)
|
||||
}
|
||||
controller.enqueue(encodeSSE({ event: 'error', error: timeoutErrorMessage }))
|
||||
} else {
|
||||
await completeLoggingSession(result)
|
||||
|
||||
const minimalResult = await buildMinimalResult(
|
||||
result,
|
||||
streamConfig.selectedOutputs,
|
||||
state.streamedContent,
|
||||
requestId,
|
||||
streamConfig.includeFileBase64 ?? true,
|
||||
streamConfig.base64MaxBytes
|
||||
)
|
||||
const minimalResult = await buildMinimalResult(
|
||||
result,
|
||||
streamConfig.selectedOutputs,
|
||||
state.streamedContent,
|
||||
requestId,
|
||||
streamConfig.includeFileBase64 ?? true,
|
||||
streamConfig.base64MaxBytes
|
||||
)
|
||||
|
||||
controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
|
||||
}
|
||||
|
||||
controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
|
||||
controller.enqueue(encodeSSE('[DONE]'))
|
||||
|
||||
if (executionId) {
|
||||
@@ -323,6 +343,20 @@ export async function createStreamingResponse(
|
||||
}
|
||||
|
||||
controller.close()
|
||||
} finally {
|
||||
timeoutController.cleanup()
|
||||
}
|
||||
},
|
||||
async cancel(reason) {
|
||||
logger.info(`[${requestId}] Streaming response cancelled`, { reason })
|
||||
timeoutController.abort()
|
||||
timeoutController.cleanup()
|
||||
if (executionId) {
|
||||
try {
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error })
|
||||
}
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
@@ -6,7 +6,6 @@ import { MAX_TOOL_ITERATIONS } from '@/providers'
|
||||
import {
|
||||
checkForForcedToolUsage,
|
||||
createReadableStreamFromAnthropicStream,
|
||||
generateToolUseId,
|
||||
} from '@/providers/anthropic/utils'
|
||||
import {
|
||||
getMaxOutputTokensForModel,
|
||||
@@ -433,11 +432,32 @@ export const anthropicProvider: ProviderConfig = {
|
||||
|
||||
const executionResults = await Promise.allSettled(toolExecutionPromises)
|
||||
|
||||
// Collect all tool_use and tool_result blocks for batching
|
||||
const toolUseBlocks: Array<{
|
||||
type: 'tool_use'
|
||||
id: string
|
||||
name: string
|
||||
input: Record<string, unknown>
|
||||
}> = []
|
||||
const toolResultBlocks: Array<{
|
||||
type: 'tool_result'
|
||||
tool_use_id: string
|
||||
content: string
|
||||
}> = []
|
||||
|
||||
for (const settledResult of executionResults) {
|
||||
if (settledResult.status === 'rejected' || !settledResult.value) continue
|
||||
|
||||
const { toolName, toolArgs, toolParams, result, startTime, endTime, duration } =
|
||||
settledResult.value
|
||||
const {
|
||||
toolUse,
|
||||
toolName,
|
||||
toolArgs,
|
||||
toolParams,
|
||||
result,
|
||||
startTime,
|
||||
endTime,
|
||||
duration,
|
||||
} = settledResult.value
|
||||
|
||||
timeSegments.push({
|
||||
type: 'tool',
|
||||
@@ -447,7 +467,7 @@ export const anthropicProvider: ProviderConfig = {
|
||||
duration: duration,
|
||||
})
|
||||
|
||||
let resultContent: any
|
||||
let resultContent: unknown
|
||||
if (result.success) {
|
||||
toolResults.push(result.output)
|
||||
resultContent = result.output
|
||||
@@ -469,29 +489,34 @@ export const anthropicProvider: ProviderConfig = {
|
||||
success: result.success,
|
||||
})
|
||||
|
||||
const toolUseId = generateToolUseId(toolName)
|
||||
|
||||
currentMessages.push({
|
||||
role: 'assistant',
|
||||
content: [
|
||||
{
|
||||
type: 'tool_use',
|
||||
id: toolUseId,
|
||||
name: toolName,
|
||||
input: toolArgs,
|
||||
} as any,
|
||||
],
|
||||
// Add to batched arrays using the ORIGINAL ID from Claude's response
|
||||
toolUseBlocks.push({
|
||||
type: 'tool_use',
|
||||
id: toolUse.id,
|
||||
name: toolName,
|
||||
input: toolArgs,
|
||||
})
|
||||
|
||||
toolResultBlocks.push({
|
||||
type: 'tool_result',
|
||||
tool_use_id: toolUse.id,
|
||||
content: JSON.stringify(resultContent),
|
||||
})
|
||||
}
|
||||
|
||||
// Add ONE assistant message with ALL tool_use blocks
|
||||
if (toolUseBlocks.length > 0) {
|
||||
currentMessages.push({
|
||||
role: 'assistant',
|
||||
content: toolUseBlocks as unknown as Anthropic.Messages.ContentBlock[],
|
||||
})
|
||||
}
|
||||
|
||||
// Add ONE user message with ALL tool_result blocks
|
||||
if (toolResultBlocks.length > 0) {
|
||||
currentMessages.push({
|
||||
role: 'user',
|
||||
content: [
|
||||
{
|
||||
type: 'tool_result',
|
||||
tool_use_id: toolUseId,
|
||||
content: JSON.stringify(resultContent),
|
||||
} as any,
|
||||
],
|
||||
content: toolResultBlocks as unknown as Anthropic.Messages.ContentBlockParam[],
|
||||
})
|
||||
}
|
||||
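With batching, a turn that ran two tools now produces exactly two messages — roughly the shape below (IDs and contents are made up). This matches Anthropic's requirement that every tool_use block be answered by a tool_result with the same ID in the following user message.

```ts
const batchedTurn = [
  {
    role: 'assistant',
    content: [
      { type: 'tool_use', id: 'toolu_01A', name: 'get_weather', input: { city: 'Paris' } },
      { type: 'tool_use', id: 'toolu_01B', name: 'get_time', input: { tz: 'Europe/Paris' } },
    ],
  },
  {
    role: 'user',
    content: [
      { type: 'tool_result', tool_use_id: 'toolu_01A', content: '{"tempC":12}' },
      { type: 'tool_result', tool_use_id: 'toolu_01B', content: '{"time":"14:05"}' },
    ],
  },
]
```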
|
||||
@@ -777,6 +802,8 @@ export const anthropicProvider: ProviderConfig = {
|
||||
const toolCallStartTime = Date.now()
|
||||
const toolName = toolUse.name
|
||||
const toolArgs = toolUse.input as Record<string, any>
|
||||
// Preserve the original tool_use ID from Claude's response
|
||||
const toolUseId = toolUse.id
|
||||
|
||||
try {
|
||||
const tool = request.tools?.find((t) => t.id === toolName)
|
||||
@@ -787,6 +814,7 @@ export const anthropicProvider: ProviderConfig = {
|
||||
const toolCallEndTime = Date.now()
|
||||
|
||||
return {
|
||||
toolUseId,
|
||||
toolName,
|
||||
toolArgs,
|
||||
toolParams,
|
||||
@@ -800,6 +828,7 @@ export const anthropicProvider: ProviderConfig = {
|
||||
logger.error('Error processing tool call:', { error, toolName })
|
||||
|
||||
return {
|
||||
toolUseId,
|
||||
toolName,
|
||||
toolArgs,
|
||||
toolParams: {},
|
||||
@@ -817,11 +846,32 @@ export const anthropicProvider: ProviderConfig = {
|
||||
|
||||
const executionResults = await Promise.allSettled(toolExecutionPromises)
|
||||
|
||||
// Collect all tool_use and tool_result blocks for batching
|
||||
const toolUseBlocks: Array<{
|
||||
type: 'tool_use'
|
||||
id: string
|
||||
name: string
|
||||
input: Record<string, unknown>
|
||||
}> = []
|
||||
const toolResultBlocks: Array<{
|
||||
type: 'tool_result'
|
||||
tool_use_id: string
|
||||
content: string
|
||||
}> = []
|
||||
|
||||
for (const settledResult of executionResults) {
|
||||
if (settledResult.status === 'rejected' || !settledResult.value) continue
|
||||
|
||||
const { toolName, toolArgs, toolParams, result, startTime, endTime, duration } =
|
||||
settledResult.value
|
||||
const {
|
||||
toolUseId,
|
||||
toolName,
|
||||
toolArgs,
|
||||
toolParams,
|
||||
result,
|
||||
startTime,
|
||||
endTime,
|
||||
duration,
|
||||
} = settledResult.value
|
||||
|
||||
timeSegments.push({
|
||||
type: 'tool',
|
||||
@@ -831,7 +881,7 @@ export const anthropicProvider: ProviderConfig = {
|
||||
duration: duration,
|
||||
})
|
||||
|
||||
let resultContent: any
|
||||
let resultContent: unknown
|
||||
if (result.success) {
|
||||
toolResults.push(result.output)
|
||||
resultContent = result.output
|
||||
@@ -853,29 +903,34 @@ export const anthropicProvider: ProviderConfig = {
|
||||
success: result.success,
|
||||
})
|
||||
|
||||
const toolUseId = generateToolUseId(toolName)
|
||||
|
||||
currentMessages.push({
|
||||
role: 'assistant',
|
||||
content: [
|
||||
{
|
||||
type: 'tool_use',
|
||||
id: toolUseId,
|
||||
name: toolName,
|
||||
input: toolArgs,
|
||||
} as any,
|
||||
],
|
||||
// Add to batched arrays using the ORIGINAL ID from Claude's response
|
||||
toolUseBlocks.push({
|
||||
type: 'tool_use',
|
||||
id: toolUseId,
|
||||
name: toolName,
|
||||
input: toolArgs,
|
||||
})
|
||||
|
||||
toolResultBlocks.push({
|
||||
type: 'tool_result',
|
||||
tool_use_id: toolUseId,
|
||||
content: JSON.stringify(resultContent),
|
||||
})
|
||||
}
|
||||
|
||||
// Add ONE assistant message with ALL tool_use blocks
|
||||
if (toolUseBlocks.length > 0) {
|
||||
currentMessages.push({
|
||||
role: 'assistant',
|
||||
content: toolUseBlocks as unknown as Anthropic.Messages.ContentBlock[],
|
||||
})
|
||||
}
|
||||
|
||||
// Add ONE user message with ALL tool_result blocks
|
||||
if (toolResultBlocks.length > 0) {
|
||||
currentMessages.push({
|
||||
role: 'user',
|
||||
content: [
|
||||
{
|
||||
type: 'tool_result',
|
||||
tool_use_id: toolUseId,
|
||||
content: JSON.stringify(resultContent),
|
||||
} as any,
|
||||
],
|
||||
content: toolResultBlocks as unknown as Anthropic.Messages.ContentBlockParam[],
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1061,7 +1116,7 @@ export const anthropicProvider: ProviderConfig = {
|
||||
startTime: tc.startTime,
|
||||
endTime: tc.endTime,
|
||||
duration: tc.duration,
|
||||
result: tc.result,
|
||||
result: tc.result as Record<string, unknown> | undefined,
|
||||
}))
|
||||
: undefined,
|
||||
toolResults: toolResults.length > 0 ? toolResults : undefined,
|
||||
|
||||
@@ -67,8 +67,17 @@ export function checkForForcedToolUsage(
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a unique tool use ID for Bedrock.
|
||||
* AWS Bedrock requires toolUseId to be 1-64 characters, pattern [a-zA-Z0-9_-]+
|
||||
*/
|
||||
export function generateToolUseId(toolName: string): string {
|
||||
return `${toolName}-${Date.now()}-${Math.random().toString(36).substring(2, 7)}`
|
||||
const timestamp = Date.now().toString(36) // Base36 timestamp (9 chars)
|
||||
const random = Math.random().toString(36).substring(2, 7) // 5 random chars
|
||||
const suffix = `-${timestamp}-${random}` // ~15 chars
|
||||
const maxNameLength = 64 - suffix.length
|
||||
const truncatedName = toolName.substring(0, maxNameLength).replace(/[^a-zA-Z0-9_-]/g, '_')
|
||||
return `${truncatedName}${suffix}`
|
||||
}
|
||||
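For instance (illustrative output), a long or punctuated tool name is sanitized and truncated so the generated ID always satisfies Bedrock's 1–64 character `[a-zA-Z0-9_-]+` constraint:

```ts
generateToolUseId('google.sheets read range')
// => e.g. 'google_sheets_read_range-mf3k9a2b-x7q4z' (sanitized name + ~15-char suffix)

generateToolUseId('a'.repeat(100)).length <= 64 // true: the name is truncated to fit
```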
|
||||
/**
|
||||
|
||||
@@ -76,7 +76,7 @@ export const deepseekProvider: ProviderConfig = {
|
||||
: undefined
|
||||
|
||||
const payload: any = {
|
||||
model: 'deepseek-chat',
|
||||
model: request.model,
|
||||
messages: allMessages,
|
||||
}
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ import {
|
||||
convertUsageMetadata,
|
||||
createReadableStreamFromGeminiStream,
|
||||
ensureStructResponse,
|
||||
extractFunctionCallPart,
|
||||
extractAllFunctionCallParts,
|
||||
extractTextContent,
|
||||
mapToThinkingLevel,
|
||||
} from '@/providers/google/utils'
|
||||
@@ -32,7 +32,7 @@ import {
|
||||
prepareToolsWithUsageControl,
|
||||
} from '@/providers/utils'
|
||||
import { executeTool } from '@/tools'
|
||||
import type { ExecutionState, GeminiProviderType, GeminiUsage, ParsedFunctionCall } from './types'
|
||||
import type { ExecutionState, GeminiProviderType, GeminiUsage } from './types'
|
||||
|
||||
/**
|
||||
* Creates initial execution state
|
||||
@@ -79,101 +79,168 @@ function createInitialState(
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a tool call and updates state
|
||||
* Executes multiple tool calls in parallel and updates state.
|
||||
* Per Gemini docs, all function calls from a single response should be executed
|
||||
* together, with one model message containing all function calls and one user
|
||||
* message containing all function responses.
|
||||
*/
|
||||
async function executeToolCall(
|
||||
functionCallPart: Part,
|
||||
functionCall: ParsedFunctionCall,
|
||||
async function executeToolCallsBatch(
|
||||
functionCallParts: Part[],
|
||||
request: ProviderRequest,
|
||||
state: ExecutionState,
|
||||
forcedTools: string[],
|
||||
logger: ReturnType<typeof createLogger>
|
||||
): Promise<{ success: boolean; state: ExecutionState }> {
|
||||
const toolCallStartTime = Date.now()
|
||||
const toolName = functionCall.name
|
||||
|
||||
const tool = request.tools?.find((t) => t.id === toolName)
|
||||
if (!tool) {
|
||||
logger.warn(`Tool ${toolName} not found in registry, skipping`)
|
||||
if (functionCallParts.length === 0) {
|
||||
return { success: false, state }
|
||||
}
|
||||
|
||||
try {
|
||||
const { toolParams, executionParams } = prepareToolExecution(tool, functionCall.args, request)
|
||||
const result = await executeTool(toolName, executionParams)
|
||||
const toolCallEndTime = Date.now()
|
||||
const duration = toolCallEndTime - toolCallStartTime
|
||||
const executionPromises = functionCallParts.map(async (part) => {
|
||||
const toolCallStartTime = Date.now()
|
||||
const functionCall = part.functionCall!
|
||||
const toolName = functionCall.name ?? ''
|
||||
const args = (functionCall.args ?? {}) as Record<string, unknown>
|
||||
|
||||
const resultContent: Record<string, unknown> = result.success
|
||||
? ensureStructResponse(result.output)
|
||||
: { error: true, message: result.error || 'Tool execution failed', tool: toolName }
|
||||
|
||||
const toolCall: FunctionCallResponse = {
|
||||
name: toolName,
|
||||
arguments: toolParams,
|
||||
startTime: new Date(toolCallStartTime).toISOString(),
|
||||
endTime: new Date(toolCallEndTime).toISOString(),
|
||||
duration,
|
||||
result: resultContent,
|
||||
const tool = request.tools?.find((t) => t.id === toolName)
|
||||
if (!tool) {
|
||||
logger.warn(`Tool ${toolName} not found in registry, skipping`)
|
||||
return {
|
||||
success: false,
|
||||
part,
|
||||
toolName,
|
||||
args,
|
||||
resultContent: { error: true, message: `Tool ${toolName} not found`, tool: toolName },
|
||||
toolParams: {},
|
||||
startTime: toolCallStartTime,
|
||||
endTime: Date.now(),
|
||||
duration: Date.now() - toolCallStartTime,
|
||||
}
|
||||
}
|
||||
|
||||
const updatedContents: Content[] = [
|
||||
...state.contents,
|
||||
{
|
||||
role: 'model',
|
||||
parts: [functionCallPart],
|
||||
},
|
||||
{
|
||||
role: 'user',
|
||||
parts: [
|
||||
{
|
||||
functionResponse: {
|
||||
name: functionCall.name,
|
||||
response: resultContent,
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
]
|
||||
try {
|
||||
const { toolParams, executionParams } = prepareToolExecution(tool, args, request)
|
||||
const result = await executeTool(toolName, executionParams)
|
||||
const toolCallEndTime = Date.now()
|
||||
const duration = toolCallEndTime - toolCallStartTime
|
||||
|
||||
const forcedToolCheck = checkForForcedToolUsage(
|
||||
[{ name: functionCall.name, args: functionCall.args }],
|
||||
state.currentToolConfig,
|
||||
forcedTools,
|
||||
state.usedForcedTools
|
||||
)
|
||||
const resultContent: Record<string, unknown> = result.success
|
||||
? ensureStructResponse(result.output)
|
||||
: { error: true, message: result.error || 'Tool execution failed', tool: toolName }
|
||||
|
||||
return {
|
||||
success: true,
|
||||
state: {
|
||||
...state,
|
||||
contents: updatedContents,
|
||||
toolCalls: [...state.toolCalls, toolCall],
|
||||
toolResults: result.success
|
||||
? [...state.toolResults, result.output as Record<string, unknown>]
|
||||
: state.toolResults,
|
||||
toolsTime: state.toolsTime + duration,
|
||||
timeSegments: [
|
||||
...state.timeSegments,
|
||||
{
|
||||
type: 'tool',
|
||||
name: toolName,
|
||||
startTime: toolCallStartTime,
|
||||
endTime: toolCallEndTime,
|
||||
duration,
|
||||
},
|
||||
],
|
||||
usedForcedTools: forcedToolCheck?.usedForcedTools ?? state.usedForcedTools,
|
||||
currentToolConfig: forcedToolCheck?.nextToolConfig ?? state.currentToolConfig,
|
||||
},
|
||||
return {
|
||||
success: result.success,
|
||||
part,
|
||||
toolName,
|
||||
args,
|
||||
resultContent,
|
||||
toolParams,
|
||||
result,
|
||||
startTime: toolCallStartTime,
|
||||
endTime: toolCallEndTime,
|
||||
duration,
|
||||
}
|
||||
} catch (error) {
|
||||
const toolCallEndTime = Date.now()
|
||||
logger.error('Error processing function call:', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
functionName: toolName,
|
||||
})
|
||||
return {
|
||||
success: false,
|
||||
part,
|
||||
toolName,
|
||||
args,
|
||||
resultContent: {
|
||||
error: true,
|
||||
message: error instanceof Error ? error.message : 'Tool execution failed',
|
||||
tool: toolName,
|
||||
},
|
||||
toolParams: {},
|
||||
startTime: toolCallStartTime,
|
||||
endTime: toolCallEndTime,
|
||||
duration: toolCallEndTime - toolCallStartTime,
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error processing function call:', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
functionName: toolName,
|
||||
})
|
||||
})
|
||||
|
||||
const results = await Promise.all(executionPromises)
|
||||
|
||||
// Check if at least one tool was found (not all failed due to missing tools)
|
||||
const hasValidResults = results.some((r) => r.result !== undefined)
|
||||
if (!hasValidResults && results.every((r) => !r.success)) {
|
||||
return { success: false, state }
|
||||
}
|
||||
|
||||
// Build batched messages per Gemini spec:
|
||||
// ONE model message with ALL function call parts
|
||||
// ONE user message with ALL function responses
|
||||
const modelParts: Part[] = results.map((r) => r.part)
|
||||
const userParts: Part[] = results.map((r) => ({
|
||||
functionResponse: {
|
||||
name: r.toolName,
|
||||
response: r.resultContent,
|
||||
},
|
||||
}))
|
||||
|
||||
const updatedContents: Content[] = [
|
||||
...state.contents,
|
||||
{ role: 'model', parts: modelParts },
|
||||
{ role: 'user', parts: userParts },
|
||||
]
|
||||
|
||||
// Collect all tool calls and results
|
||||
const newToolCalls: FunctionCallResponse[] = []
|
||||
const newToolResults: Record<string, unknown>[] = []
|
||||
const newTimeSegments: ExecutionState['timeSegments'] = []
|
||||
let totalToolsTime = 0
|
||||
|
||||
for (const r of results) {
|
||||
newToolCalls.push({
|
||||
name: r.toolName,
|
||||
arguments: r.toolParams,
|
||||
startTime: new Date(r.startTime).toISOString(),
|
||||
endTime: new Date(r.endTime).toISOString(),
|
||||
duration: r.duration,
|
||||
result: r.resultContent,
|
||||
})
|
||||
|
||||
if (r.success && r.result?.output) {
|
||||
newToolResults.push(r.result.output as Record<string, unknown>)
|
||||
}
|
||||
|
||||
newTimeSegments.push({
|
||||
type: 'tool',
|
||||
name: r.toolName,
|
||||
startTime: r.startTime,
|
||||
endTime: r.endTime,
|
||||
duration: r.duration,
|
||||
})
|
||||
|
||||
totalToolsTime += r.duration
|
||||
}
|
||||
|
||||
// Check forced tool usage for all executed tools
|
||||
const executedToolsInfo = results.map((r) => ({ name: r.toolName, args: r.args }))
|
||||
const forcedToolCheck = checkForForcedToolUsage(
|
||||
executedToolsInfo,
|
||||
state.currentToolConfig,
|
||||
forcedTools,
|
||||
state.usedForcedTools
|
||||
)
|
||||
|
||||
return {
|
||||
success: true,
|
||||
state: {
|
||||
...state,
|
||||
contents: updatedContents,
|
||||
toolCalls: [...state.toolCalls, ...newToolCalls],
|
||||
toolResults: [...state.toolResults, ...newToolResults],
|
||||
toolsTime: state.toolsTime + totalToolsTime,
|
||||
timeSegments: [...state.timeSegments, ...newTimeSegments],
|
||||
usedForcedTools: forcedToolCheck?.usedForcedTools ?? state.usedForcedTools,
|
||||
currentToolConfig: forcedToolCheck?.nextToolConfig ?? state.currentToolConfig,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
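The resulting conversation history for a turn with two function calls then looks roughly like this (names and values are illustrative): one `model` entry carrying all functionCall parts, followed by one `user` entry carrying the matching functionResponse parts.

```ts
const batchedContents = [
  {
    role: 'model',
    parts: [
      { functionCall: { name: 'get_weather', args: { city: 'Paris' } } },
      { functionCall: { name: 'get_time', args: { tz: 'Europe/Paris' } } },
    ],
  },
  {
    role: 'user',
    parts: [
      { functionResponse: { name: 'get_weather', response: { tempC: 12 } } },
      { functionResponse: { name: 'get_time', response: { time: '14:05' } } },
    ],
  },
]
```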
||||
/**
|
||||
@@ -506,27 +573,25 @@ export async function executeGeminiRequest(
|
||||
// Tool execution loop
|
||||
const functionCalls = response.functionCalls
|
||||
if (functionCalls?.length) {
|
||||
logger.info(`Received function call from Gemini: ${functionCalls[0].name}`)
|
||||
const functionNames = functionCalls.map((fc) => fc.name).join(', ')
|
||||
logger.info(`Received ${functionCalls.length} function call(s) from Gemini: ${functionNames}`)
|
||||
|
||||
while (state.iterationCount < MAX_TOOL_ITERATIONS) {
|
||||
const functionCallPart = extractFunctionCallPart(currentResponse.candidates?.[0])
|
||||
if (!functionCallPart?.functionCall) {
|
||||
// Extract ALL function call parts from the response (Gemini can return multiple)
|
||||
const functionCallParts = extractAllFunctionCallParts(currentResponse.candidates?.[0])
|
||||
if (functionCallParts.length === 0) {
|
||||
content = extractTextContent(currentResponse.candidates?.[0])
|
||||
break
|
||||
}
|
||||
|
||||
const functionCall: ParsedFunctionCall = {
|
||||
name: functionCallPart.functionCall.name ?? '',
|
||||
args: (functionCallPart.functionCall.args ?? {}) as Record<string, unknown>,
|
||||
}
|
||||
|
||||
const callNames = functionCallParts.map((p) => p.functionCall?.name ?? 'unknown').join(', ')
|
||||
logger.info(
|
||||
`Processing function call: ${functionCall.name} (iteration ${state.iterationCount + 1})`
|
||||
`Processing ${functionCallParts.length} function call(s): ${callNames} (iteration ${state.iterationCount + 1})`
|
||||
)
|
||||
|
||||
const { success, state: updatedState } = await executeToolCall(
|
||||
functionCallPart,
|
||||
functionCall,
|
||||
// Execute ALL function calls in this batch
|
||||
const { success, state: updatedState } = await executeToolCallsBatch(
|
||||
functionCallParts,
|
||||
request,
|
||||
state,
|
||||
forcedTools,
|
||||
|
||||
@@ -109,6 +109,7 @@ export function extractFunctionCall(candidate: Candidate | undefined): ParsedFun
|
||||
|
||||
/**
|
||||
* Extracts the full Part containing the function call (preserves thoughtSignature)
|
||||
* @deprecated Use extractAllFunctionCallParts for proper multi-tool handling
|
||||
*/
|
||||
export function extractFunctionCallPart(candidate: Candidate | undefined): Part | null {
|
||||
if (!candidate?.content?.parts) return null
|
||||
@@ -122,6 +123,17 @@ export function extractFunctionCallPart(candidate: Candidate | undefined): Part
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts ALL Parts containing function calls from a candidate.
|
||||
* Gemini can return multiple function calls in a single response,
|
||||
* and all should be executed before continuing the conversation.
|
||||
*/
|
||||
export function extractAllFunctionCallParts(candidate: Candidate | undefined): Part[] {
|
||||
if (!candidate?.content?.parts) return []
|
||||
|
||||
return candidate.content.parts.filter((part) => part.functionCall)
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts usage metadata from SDK response to our format.
|
||||
* Per Gemini docs, total = promptTokenCount + candidatesTokenCount + toolUsePromptTokenCount + thoughtsTokenCount
|
||||
|
||||
@@ -320,6 +320,7 @@ export const groqProvider: ProviderConfig = {
|
||||
currentMessages.push({
|
||||
role: 'tool',
|
||||
tool_call_id: toolCall.id,
|
||||
name: toolName,
|
||||
content: JSON.stringify(resultContent),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -383,6 +383,7 @@ export const mistralProvider: ProviderConfig = {
|
||||
currentMessages.push({
|
||||
role: 'tool',
|
||||
tool_call_id: toolCall.id,
|
||||
name: toolName,
|
||||
content: JSON.stringify(resultContent),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -12,39 +12,70 @@ import {
|
||||
import { persistWorkflowOperation } from '@/socket/database/operations'
|
||||
import type { AuthenticatedSocket } from '@/socket/middleware/auth'
|
||||
import { checkRolePermission } from '@/socket/middleware/permissions'
|
||||
import type { IRoomManager } from '@/socket/rooms'
|
||||
import type { IRoomManager, UserSession } from '@/socket/rooms'
|
||||
import { WorkflowOperationSchema } from '@/socket/validation/schemas'
|
||||
|
||||
const logger = createLogger('OperationsHandlers')
|
||||
|
||||
export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
|
||||
socket.on('workflow-operation', async (data) => {
|
||||
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||
const session = await roomManager.getUserSession(socket.id)
|
||||
|
||||
if (!workflowId || !session) {
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'SESSION_ERROR',
|
||||
message: 'Session expired, please rejoin workflow',
|
||||
})
|
||||
if (data?.operationId) {
|
||||
socket.emit('operation-failed', { operationId: data.operationId, error: 'Session expired' })
|
||||
const emitOperationError = (
|
||||
forbidden: { type: string; message: string; operation?: string; target?: string },
|
||||
failed?: { error: string; retryable?: boolean }
|
||||
) => {
|
||||
socket.emit('operation-forbidden', forbidden)
|
||||
if (failed && data?.operationId) {
|
||||
socket.emit('operation-failed', { operationId: data.operationId, ...failed })
|
||||
}
|
||||
}
|
||||
|
||||
if (!roomManager.isReady()) {
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
||||
{ error: 'Realtime unavailable', retryable: true }
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
|
||||
let workflowId: string | null = null
|
||||
let session: UserSession | null = null
|
||||
|
||||
try {
|
||||
workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
|
||||
session = await roomManager.getUserSession(socket.id)
|
||||
} catch (error) {
|
||||
logger.error('Error loading session for workflow operation:', error)
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
||||
{ error: 'Realtime unavailable', retryable: true }
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if (!workflowId || !session) {
|
||||
emitOperationError(
|
||||
{ type: 'SESSION_ERROR', message: 'Session expired, please rejoin workflow' },
|
||||
{ error: 'Session expired' }
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
let hasRoom = false
|
||||
try {
|
||||
hasRoom = await roomManager.hasWorkflowRoom(workflowId)
|
||||
} catch (error) {
|
||||
logger.error('Error checking workflow room:', error)
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_MANAGER_UNAVAILABLE', message: 'Realtime unavailable' },
|
||||
{ error: 'Realtime unavailable', retryable: true }
|
||||
)
|
||||
return
|
||||
}
|
||||
if (!hasRoom) {
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'ROOM_NOT_FOUND',
|
||||
message: 'Workflow room not found',
|
||||
})
|
||||
if (data?.operationId) {
|
||||
socket.emit('operation-failed', {
|
||||
operationId: data.operationId,
|
||||
error: 'Workflow room not found',
|
||||
})
|
||||
}
|
||||
emitOperationError(
|
||||
{ type: 'ROOM_NOT_FOUND', message: 'Workflow room not found' },
|
||||
{ error: 'Workflow room not found' }
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -77,15 +108,15 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
|
||||
// Check permissions from cached role for all other operations
|
||||
if (!userPresence) {
|
||||
logger.warn(`User presence not found for socket ${socket.id}`)
|
||||
socket.emit('operation-forbidden', {
|
||||
type: 'SESSION_ERROR',
|
||||
message: 'User session not found',
|
||||
operation,
|
||||
target,
|
||||
})
|
||||
if (operationId) {
|
||||
socket.emit('operation-failed', { operationId, error: 'User session not found' })
|
||||
}
|
||||
emitOperationError(
|
||||
{
|
||||
type: 'SESSION_ERROR',
|
||||
message: 'User session not found',
|
||||
operation,
|
||||
target,
|
||||
},
|
||||
{ error: 'User session not found' }
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -97,7 +128,7 @@ export function setupOperationsHandlers(socket: AuthenticatedSocket, roomManager
|
||||
logger.warn(
|
||||
`User ${session.userId} (role: ${userPresence.role}) forbidden from ${operation} on ${target}`
|
||||
)
|
||||
socket.emit('operation-forbidden', {
|
||||
emitOperationError({
|
||||
type: 'INSUFFICIENT_PERMISSIONS',
|
||||
message: `${permissionCheck.reason} on '${target}'`,
|
||||
operation,
|
||||
|
||||
@@ -48,6 +48,21 @@ export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager:
      operationId,
    } = data

    if (!roomManager.isReady()) {
      socket.emit('operation-forbidden', {
        type: 'ROOM_MANAGER_UNAVAILABLE',
        message: 'Realtime unavailable',
      })
      if (operationId) {
        socket.emit('operation-failed', {
          operationId,
          error: 'Realtime unavailable',
          retryable: true,
        })
      }
      return
    }

    try {
      const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
      const session = await roomManager.getUserSession(socket.id)

@@ -37,6 +37,21 @@ export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager:
  socket.on('variable-update', async (data) => {
    const { workflowId: payloadWorkflowId, variableId, field, value, timestamp, operationId } = data

    if (!roomManager.isReady()) {
      socket.emit('operation-forbidden', {
        type: 'ROOM_MANAGER_UNAVAILABLE',
        message: 'Realtime unavailable',
      })
      if (operationId) {
        socket.emit('operation-failed', {
          operationId,
          error: 'Realtime unavailable',
          retryable: true,
        })
      }
      return
    }

    try {
      const sessionWorkflowId = await roomManager.getWorkflowIdForSocket(socket.id)
      const session = await roomManager.getUserSession(socket.id)

@@ -20,6 +20,15 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
      return
    }

    if (!roomManager.isReady()) {
      logger.warn(`Join workflow rejected: Room manager unavailable`)
      socket.emit('join-workflow-error', {
        error: 'Realtime unavailable',
        code: 'ROOM_MANAGER_UNAVAILABLE',
      })
      return
    }

    logger.info(`Join workflow request from ${userId} (${userName}) for workflow ${workflowId}`)

    // Verify workflow access
@@ -128,12 +137,20 @@ export function setupWorkflowHandlers(socket: AuthenticatedSocket, roomManager:
      // Undo socket.join and room manager entry if any operation failed
      socket.leave(workflowId)
      await roomManager.removeUserFromRoom(socket.id)
      socket.emit('join-workflow-error', { error: 'Failed to join workflow' })
      const isReady = roomManager.isReady()
      socket.emit('join-workflow-error', {
        error: isReady ? 'Failed to join workflow' : 'Realtime unavailable',
        code: isReady ? undefined : 'ROOM_MANAGER_UNAVAILABLE',
      })
    }
  })

  socket.on('leave-workflow', async () => {
    try {
      if (!roomManager.isReady()) {
        return
      }

      const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
      const session = await roomManager.getUserSession(socket.id)

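On the client, the new `code` field makes the "realtime backend not ready" case distinguishable from an ordinary join failure. A minimal client-side sketch, assuming a socket.io client; the event name and error code come from the hunk above, while the retry delay and the `rejoinWorkflow` / `showError` helpers are hypothetical and only for illustration.

```ts
// Hypothetical client-side handling of the new error shape.
socket.on('join-workflow-error', (payload: { error: string; code?: string }) => {
  if (payload.code === 'ROOM_MANAGER_UNAVAILABLE') {
    // Transient: the realtime layer (e.g. its Redis backend) is not ready yet.
    setTimeout(() => rejoinWorkflow(), 2000)
  } else {
    // Non-transient for this attempt: surface the error to the user.
    showError(payload.error)
  }
})
```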
@@ -26,6 +26,10 @@ export class MemoryRoomManager implements IRoomManager {
    logger.info('MemoryRoomManager initialized (single-pod mode)')
  }

  isReady(): boolean {
    return true
  }

  async shutdown(): Promise<void> {
    this.workflowRooms.clear()
    this.socketToWorkflow.clear()

@@ -96,17 +96,6 @@ export class RedisRoomManager implements IRoomManager {
    this._io = io
    this.redis = createClient({
      url: redisUrl,
      socket: {
        reconnectStrategy: (retries) => {
          if (retries > 10) {
            logger.error('Redis reconnection failed after 10 attempts')
            return new Error('Redis reconnection failed')
          }
          const delay = Math.min(retries * 100, 3000)
          logger.warn(`Redis reconnecting in ${delay}ms (attempt ${retries})`)
          return delay
        },
      },
    })

    this.redis.on('error', (err) => {
@@ -122,12 +111,21 @@ export class RedisRoomManager implements IRoomManager {
      logger.info('Redis client ready')
      this.isConnected = true
    })

    this.redis.on('end', () => {
      logger.warn('Redis client connection closed')
      this.isConnected = false
    })
  }

  get io(): Server {
    return this._io
  }

  isReady(): boolean {
    return this.isConnected
  }

  async initialize(): Promise<void> {
    if (this.isConnected) return

@@ -48,6 +48,11 @@ export interface IRoomManager {
   */
  initialize(): Promise<void>

  /**
   * Whether the room manager is ready to serve requests
   */
  isReady(): boolean

  /**
   * Clean shutdown
   */

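For unit-testing the new guards, a test double only needs the readiness part of this contract. A minimal sketch, assuming the remaining `IRoomManager` members (not shown in this hunk) are not needed by the code under test and can be left to a cast:

```ts
// Hypothetical test helper: a room manager that reports itself as unavailable,
// which drives every handler above down its 'Realtime unavailable' branch.
function createUnavailableRoomManager(): IRoomManager {
  const stub = {
    initialize: async () => {},
    isReady: () => false,
    shutdown: async () => {},
  }
  // The real interface has more members than this diff shows; cast for brevity.
  return stub as unknown as IRoomManager
}
```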
@@ -85,6 +85,11 @@ export function createHttpHandler(roomManager: IRoomManager, logger: Logger) {
      res.end(JSON.stringify({ error: authResult.error }))
      return
    }

    if (!roomManager.isReady()) {
      sendError(res, 'Room manager unavailable', 503)
      return
    }
  }

  // Handle workflow deletion notifications from the main API

@@ -1,8 +1,9 @@
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { RunActorParams, RunActorResult } from '@/tools/apify/types'
import type { ToolConfig } from '@/tools/types'

const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

export const apifyRunActorAsyncTool: ToolConfig<RunActorParams, RunActorResult> = {
  id: 'apify_run_actor_async',

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
import type { ToolConfig, ToolResponse } from '@/tools/types'

const logger = createLogger('BrowserUseTool')

const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 600000 // 10 minutes
const MAX_POLL_TIME_MS = getMaxExecutionTimeout()
const MAX_CONSECUTIVE_ERRORS = 3

async function createSessionWithProfile(

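The `@/lib/core/execution-limits` module itself is not included in this diff; only its exported names (`DEFAULT_EXECUTION_TIMEOUT_MS`, `getMaxExecutionTimeout`) and the `EXECUTION_TIMEOUT_*` environment variables further down are visible. A rough sketch of how such a module could be wired, purely as an illustration of the intent and not the repository's actual implementation; the 300000 ms default mirrors the constant being replaced, and the 5400 s cap mirrors the async timeouts in the Helm values below.

```ts
// Hypothetical sketch of '@/lib/core/execution-limits'.
// Env vars hold seconds (see the Helm values later in this diff); callers use ms.
export const DEFAULT_EXECUTION_TIMEOUT_MS = 300_000 // 5 minutes

export function getMaxExecutionTimeout(): number {
  // Assumed semantics: return the largest configured async timeout so pollers
  // never outlive the workflow execution that started them.
  const seconds = Number(process.env.EXECUTION_TIMEOUT_ASYNC_ENTERPRISE ?? 5400)
  return seconds * 1000 // 90 minutes with the default configuration
}
```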
@@ -1,16 +1,7 @@
import { httpHeaderSafeJson } from '@/lib/core/utils/validation'
import type { DropboxDownloadParams, DropboxDownloadResponse } from '@/tools/dropbox/types'
import type { ToolConfig } from '@/tools/types'

/**
 * Escapes non-ASCII characters in JSON string for HTTP header safety.
 * Dropbox API requires characters 0x7F and all non-ASCII to be escaped as \uXXXX.
 */
function httpHeaderSafeJson(value: object): string {
  return JSON.stringify(value).replace(/[\u007f-\uffff]/g, (c) => {
    return '\\u' + ('0000' + c.charCodeAt(0).toString(16)).slice(-4)
  })
}

export const dropboxDownloadTool: ToolConfig<DropboxDownloadParams, DropboxDownloadResponse> = {
  id: 'dropbox_download',
  name: 'Dropbox Download File',

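This hunk drops the local helper in favor of the shared one in `@/lib/core/utils/validation`; the behavior is unchanged. As a worked example of the escaping shown above, a path with non-ASCII characters becomes plain ASCII, which is what Dropbox expects for JSON passed in request headers (typically the `Dropbox-API-Arg` header):

```ts
// Same escaping as the helper above: every character >= 0x7F becomes a \uXXXX escape.
httpHeaderSafeJson({ path: '/reports/résumé.pdf' })
// returns the ASCII-only string: {"path":"/reports/r\u00e9sum\u00e9.pdf"}
```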
@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExaResearchParams, ExaResearchResponse } from '@/tools/exa/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('ExaResearchTool')

const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

export const researchTool: ToolConfig<ExaResearchParams, ExaResearchResponse> = {
  id: 'exa_research',

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { AgentParams, AgentResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('FirecrawlAgentTool')

const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

export const agentTool: ToolConfig<AgentParams, AgentResponse> = {
  id: 'firecrawl_agent',

@@ -1,12 +1,13 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { FirecrawlCrawlParams, FirecrawlCrawlResponse } from '@/tools/firecrawl/types'
import { CRAWLED_PAGE_OUTPUT_PROPERTIES } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('FirecrawlCrawlTool')

const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

export const crawlTool: ToolConfig<FirecrawlCrawlParams, FirecrawlCrawlResponse> = {
  id: 'firecrawl_crawl',

@@ -1,11 +1,12 @@
import { createLogger } from '@sim/logger'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import type { ExtractParams, ExtractResponse } from '@/tools/firecrawl/types'
import type { ToolConfig } from '@/tools/types'

const logger = createLogger('FirecrawlExtractTool')

const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 300000 // 5 minutes maximum polling time
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = DEFAULT_EXECUTION_TIMEOUT_MS

export const extractTool: ToolConfig<ExtractParams, ExtractResponse> = {
  id: 'firecrawl_extract',

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { generateInternalToken } from '@/lib/auth/internal'
import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/core/execution-limits'
import {
  secureFetchWithPinnedIP,
  validateUrlWithDNS,
@@ -628,9 +629,8 @@ async function executeToolRequest(
  let response: Response

  if (isInternalRoute) {
    // Set up AbortController for timeout support on internal routes
    const controller = new AbortController()
    const timeout = requestParams.timeout || 300000
    const timeout = requestParams.timeout || DEFAULT_EXECUTION_TIMEOUT_MS
    const timeoutId = setTimeout(() => controller.abort(), timeout)

    try {

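The hunk ends before the request and its cleanup. The usual shape of this AbortController pattern looks roughly like the sketch below; it is a generic illustration using plain `fetch`, with `url`, `requestInit`, and `timeout` assumed from the surrounding function, not the file's actual code.

```ts
// Generic timeout-guarded request. clearTimeout in finally prevents the abort
// timer from firing after the request has already settled.
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), timeout)
try {
  response = await fetch(url, { ...requestInit, signal: controller.signal })
} finally {
  clearTimeout(timeoutId)
}
```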
@@ -1,3 +1,3 @@
import { mistralParserTool, mistralParserV2Tool } from '@/tools/mistral/parser'
import { mistralParserTool, mistralParserV2Tool, mistralParserV3Tool } from '@/tools/mistral/parser'

export { mistralParserTool, mistralParserV2Tool }
export { mistralParserTool, mistralParserV2Tool, mistralParserV3Tool }

@@ -349,74 +349,14 @@ export const mistralParserTool: ToolConfig<MistralParserInput, MistralParserOutp
  },
}

const mistralParserV2Params = {
  file: {
    type: 'file',
    required: true,
    visibility: 'hidden',
    description: 'File data from a previous block',
  },
  resultType: mistralParserTool.params.resultType,
  includeImageBase64: mistralParserTool.params.includeImageBase64,
  pages: mistralParserTool.params.pages,
  imageLimit: mistralParserTool.params.imageLimit,
  imageMinSize: mistralParserTool.params.imageMinSize,
  apiKey: mistralParserTool.params.apiKey,
} satisfies ToolConfig['params']

export const mistralParserV2Tool: ToolConfig<MistralParserV2Input, MistralParserV2Output> = {
export const mistralParserV2Tool: ToolConfig<MistralParserInput, MistralParserV2Output> = {
  id: 'mistral_parser_v2',
  name: 'Mistral PDF Parser',
  description: 'Parse PDF documents using Mistral OCR API',
  version: '2.0.0',

  params: mistralParserV2Params,
  request: {
    url: '/api/tools/mistral/parse',
    method: 'POST',
    headers: (params) => {
      return {
        'Content-Type': 'application/json',
        Accept: 'application/json',
        Authorization: `Bearer ${params.apiKey}`,
      }
    },
    body: (params) => {
      if (!params || typeof params !== 'object') {
        throw new Error('Invalid parameters: Parameters must be provided as an object')
      }
      if (!params.apiKey || typeof params.apiKey !== 'string' || params.apiKey.trim() === '') {
        throw new Error('Missing or invalid API key: A valid Mistral API key is required')
      }

      const file = params.file
      if (!file || typeof file !== 'object') {
        throw new Error('File input is required')
      }

      const requestBody: Record<string, unknown> = {
        apiKey: params.apiKey,
        resultType: params.resultType || 'markdown',
      }

      requestBody.file = file

      if (params.pages) {
        requestBody.pages = params.pages
      }
      if (params.includeImageBase64 !== undefined) {
        requestBody.includeImageBase64 = params.includeImageBase64
      }
      if (params.imageLimit !== undefined) {
        requestBody.imageLimit = params.imageLimit
      }
      if (params.imageMinSize !== undefined) {
        requestBody.imageMinSize = params.imageMinSize
      }

      return requestBody
    },
  },
  params: mistralParserTool.params,
  request: mistralParserTool.request,

  transformResponse: async (response: Response) => {
    let ocrResult
@@ -543,3 +483,73 @@ export const mistralParserV2Tool: ToolConfig<MistralParserV2Input, MistralParser
    },
  },
}

/**
 * V3 tool - Updated for new file handling pattern with UserFile normalization
 * Used by MistralParseV3Block which uses fileUpload (basic) and fileReference (advanced) subblocks
 */
export const mistralParserV3Tool: ToolConfig<MistralParserV2Input, MistralParserV2Output> = {
  ...mistralParserV2Tool,
  id: 'mistral_parser_v3',
  version: '3.0.0',
  params: {
    file: {
      type: 'file',
      required: true,
      visibility: 'hidden',
      description: 'Normalized UserFile from file upload or file reference',
    },
    resultType: mistralParserTool.params.resultType,
    includeImageBase64: mistralParserTool.params.includeImageBase64,
    pages: mistralParserTool.params.pages,
    imageLimit: mistralParserTool.params.imageLimit,
    imageMinSize: mistralParserTool.params.imageMinSize,
    apiKey: mistralParserTool.params.apiKey,
  },
  request: {
    url: '/api/tools/mistral/parse',
    method: 'POST',
    headers: (params) => {
      return {
        'Content-Type': 'application/json',
        Accept: 'application/json',
        Authorization: `Bearer ${params.apiKey}`,
      }
    },
    body: (params) => {
      if (!params || typeof params !== 'object') {
        throw new Error('Invalid parameters: Parameters must be provided as an object')
      }
      if (!params.apiKey || typeof params.apiKey !== 'string' || params.apiKey.trim() === '') {
        throw new Error('Missing or invalid API key: A valid Mistral API key is required')
      }

      // V3 expects normalized UserFile object via `file` param
      const file = params.file
      if (!file || typeof file !== 'object') {
        throw new Error('File input is required: provide a file upload or file reference')
      }

      const requestBody: Record<string, unknown> = {
        apiKey: params.apiKey,
        resultType: params.resultType || 'markdown',
        file: file,
      }

      if (params.pages) {
        requestBody.pages = params.pages
      }
      if (params.includeImageBase64 !== undefined) {
        requestBody.includeImageBase64 = params.includeImageBase64
      }
      if (params.imageLimit !== undefined) {
        requestBody.imageLimit = params.imageLimit
      }
      if (params.imageMinSize !== undefined) {
        requestBody.imageMinSize = params.imageMinSize
      }

      return requestBody
    },
  },
}

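As a usage illustration, the V3 `body` builder above can be exercised directly. The `UserFile` shape below is a guess for illustration only; its real definition is not part of this diff, and the API key is a placeholder.

```ts
// Hypothetical invocation of the request-body builder shown above.
const body = mistralParserV3Tool.request.body({
  apiKey: 'MISTRAL_API_KEY_PLACEHOLDER',
  resultType: 'markdown',
  // Assumed UserFile-like object; the actual normalized shape is defined elsewhere in the repo.
  file: { name: 'contract.pdf', url: 'https://example.com/contract.pdf', type: 'application/pdf' },
})
// body => { apiKey: '…', resultType: 'markdown', file: { … } }
```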
@@ -1093,7 +1093,7 @@ import {
  microsoftTeamsWriteChannelTool,
  microsoftTeamsWriteChatTool,
} from '@/tools/microsoft_teams'
import { mistralParserTool, mistralParserV2Tool } from '@/tools/mistral'
import { mistralParserTool, mistralParserV2Tool, mistralParserV3Tool } from '@/tools/mistral'
import {
  mongodbDeleteTool,
  mongodbExecuteTool,
@@ -2684,6 +2684,7 @@ export const tools: Record<string, ToolConfig> = {
  apollo_email_accounts: apolloEmailAccountsTool,
  mistral_parser: mistralParserTool,
  mistral_parser_v2: mistralParserV2Tool,
  mistral_parser_v3: mistralParserV3Tool,
  reducto_parser: reductoParserTool,
  reducto_parser_v2: reductoParserV2Tool,
  textract_parser: textractParserTool,

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { AGENT, isCustomTool } from '@/executor/constants'
import { getCustomTool } from '@/hooks/queries/custom-tools'
@@ -122,9 +123,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record<string, any
  }
}

// Get timeout from params (if specified) and validate
// Must be a finite positive number, max 600000ms (10 minutes) as documented
const MAX_TIMEOUT_MS = 600000
const MAX_TIMEOUT_MS = getMaxExecutionTimeout()
const rawTimeout = params.timeout
const timeout = rawTimeout != null ? Number(rawTimeout) : undefined
const validTimeout =

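The hunk cuts off mid-expression. The validation the removed comment describes (finite, positive, capped at the configured maximum) would typically look like the sketch below; this is a reconstruction for illustration, not the file's actual code.

```ts
// Hypothetical completion of the truncated expression: accept only finite,
// positive timeouts and cap them at the plan-derived maximum.
const validTimeout =
  timeout !== undefined && Number.isFinite(timeout) && timeout > 0
    ? Math.min(timeout, MAX_TIMEOUT_MS)
    : undefined
```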
@@ -6,7 +6,7 @@ export default defineConfig({
  project: env.TRIGGER_PROJECT_ID!,
  runtime: 'node',
  logLevel: 'log',
  maxDuration: 600,
  maxDuration: 5400,
  retries: {
    enabledInDev: false,
    default: {

bun.lock
@@ -1,6 +1,5 @@
{
  "lockfileVersion": 1,
  "configVersion": 1,
  "workspaces": {
    "": {
      "name": "simstudio",

@@ -127,6 +127,18 @@ app:
    RATE_LIMIT_WINDOW_MS: "60000" # Rate limit window duration (1 minute)
    RATE_LIMIT_FREE_SYNC: "50" # Sync API executions per minute
    RATE_LIMIT_FREE_ASYNC: "200" # Async API executions per minute

    # Execution Timeout Configuration (in seconds)
    # Sync timeouts apply to synchronous API calls
    EXECUTION_TIMEOUT_FREE: "300" # Free tier sync timeout (5 minutes)
    EXECUTION_TIMEOUT_PRO: "3000" # Pro tier sync timeout (50 minutes)
    EXECUTION_TIMEOUT_TEAM: "3000" # Team tier sync timeout (50 minutes)
    EXECUTION_TIMEOUT_ENTERPRISE: "3000" # Enterprise tier sync timeout (50 minutes)
    # Async timeouts apply to async/background job executions
    EXECUTION_TIMEOUT_ASYNC_FREE: "5400" # Free tier async timeout (90 minutes)
    EXECUTION_TIMEOUT_ASYNC_PRO: "5400" # Pro tier async timeout (90 minutes)
    EXECUTION_TIMEOUT_ASYNC_TEAM: "5400" # Team tier async timeout (90 minutes)
    EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: "5400" # Enterprise tier async timeout (90 minutes)

    # UI Branding & Whitelabeling Configuration
    NEXT_PUBLIC_BRAND_NAME: "Sim" # Custom brand name

packages/db/migrations/0151_stale_screwball.sql (new file)
@@ -0,0 +1,19 @@
CREATE TABLE "async_jobs" (
  "id" text PRIMARY KEY NOT NULL,
  "type" text NOT NULL,
  "payload" jsonb NOT NULL,
  "status" text DEFAULT 'pending' NOT NULL,
  "created_at" timestamp DEFAULT now() NOT NULL,
  "started_at" timestamp,
  "completed_at" timestamp,
  "run_at" timestamp,
  "attempts" integer DEFAULT 0 NOT NULL,
  "max_attempts" integer DEFAULT 3 NOT NULL,
  "error" text,
  "output" jsonb,
  "metadata" jsonb DEFAULT '{}' NOT NULL,
  "updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE INDEX "async_jobs_status_started_at_idx" ON "async_jobs" USING btree ("status","started_at");--> statement-breakpoint
CREATE INDEX "async_jobs_status_completed_at_idx" ON "async_jobs" USING btree ("status","completed_at");
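The migration only defines storage; the worker that consumes `async_jobs` is not part of this diff. A minimal sketch of how a poller might claim one due job against this schema, assuming a node-postgres pool; the `'running'` status value and the ordering policy are assumptions for illustration.

```ts
import { Pool } from 'pg'

const pool = new Pool() // connection settings come from the usual PG* env vars

// Claim one due, pending job. FOR UPDATE SKIP LOCKED lets multiple workers
// poll the same table without handing out the same job twice.
async function claimJob() {
  const { rows } = await pool.query(
    `UPDATE "async_jobs"
        SET "status" = 'running',
            "started_at" = now(),
            "attempts" = "attempts" + 1,
            "updated_at" = now()
      WHERE "id" = (
        SELECT "id" FROM "async_jobs"
         WHERE "status" = 'pending'
           AND ("run_at" IS NULL OR "run_at" <= now())
         ORDER BY "created_at"
         FOR UPDATE SKIP LOCKED
         LIMIT 1
      )
      RETURNING *`
  )
  return rows[0] ?? null
}
```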
Some files were not shown because too many files have changed in this diff.