Files
sim/apps/sim/app/api/workflows/[id]/state/route.ts
Vikhyath Mondreti be3cdcf981 Merge pull request #3179 from simstudioai/improvement/file-download-timeouts
improvement(timeouts): files/base64 should use max timeouts + auth centralization
2026-02-10 15:57:06 -08:00

292 lines
9.6 KiB
TypeScript

import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { env } from '@/lib/core/config/env'
import { generateRequestId } from '@/lib/core/utils/request'
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/sanitization/validation'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import type { BlockState } from '@/stores/workflows/workflow/types'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
const logger = createLogger('WorkflowStateAPI')
const PositionSchema = z.object({
x: z.number(),
y: z.number(),
})
const BlockDataSchema = z.object({
parentId: z.string().optional(),
extent: z.literal('parent').optional(),
width: z.number().optional(),
height: z.number().optional(),
collection: z.unknown().optional(),
count: z.number().optional(),
loopType: z.enum(['for', 'forEach', 'while', 'doWhile']).optional(),
whileCondition: z.string().optional(),
doWhileCondition: z.string().optional(),
parallelType: z.enum(['collection', 'count']).optional(),
type: z.string().optional(),
canonicalModes: z.record(z.enum(['basic', 'advanced'])).optional(),
})
const SubBlockStateSchema = z.object({
id: z.string(),
type: z.string(),
value: z.any(),
})
const BlockOutputSchema = z.any()
const BlockStateSchema = z.object({
id: z.string(),
type: z.string(),
name: z.string(),
position: PositionSchema,
subBlocks: z.record(SubBlockStateSchema),
outputs: z.record(BlockOutputSchema),
enabled: z.boolean(),
horizontalHandles: z.boolean().optional(),
height: z.number().optional(),
advancedMode: z.boolean().optional(),
triggerMode: z.boolean().optional(),
data: BlockDataSchema.optional(),
})
const EdgeSchema = z.object({
id: z.string(),
source: z.string(),
target: z.string(),
sourceHandle: z.string().optional(),
targetHandle: z.string().optional(),
type: z.string().optional(),
animated: z.boolean().optional(),
style: z.record(z.any()).optional(),
data: z.record(z.any()).optional(),
label: z.string().optional(),
labelStyle: z.record(z.any()).optional(),
labelShowBg: z.boolean().optional(),
labelBgStyle: z.record(z.any()).optional(),
labelBgPadding: z.array(z.number()).optional(),
labelBgBorderRadius: z.number().optional(),
markerStart: z.string().optional(),
markerEnd: z.string().optional(),
})
const LoopSchema = z.object({
id: z.string(),
nodes: z.array(z.string()),
iterations: z.number(),
loopType: z.enum(['for', 'forEach', 'while', 'doWhile']),
forEachItems: z.union([z.array(z.any()), z.record(z.any()), z.string()]).optional(),
whileCondition: z.string().optional(),
doWhileCondition: z.string().optional(),
})
const ParallelSchema = z.object({
id: z.string(),
nodes: z.array(z.string()),
distribution: z.union([z.array(z.any()), z.record(z.any()), z.string()]).optional(),
count: z.number().optional(),
parallelType: z.enum(['count', 'collection']).optional(),
})
const WorkflowStateSchema = z.object({
blocks: z.record(BlockStateSchema),
edges: z.array(EdgeSchema),
loops: z.record(LoopSchema).optional(),
parallels: z.record(ParallelSchema).optional(),
lastSaved: z.number().optional(),
isDeployed: z.boolean().optional(),
deployedAt: z.coerce.date().optional(),
variables: z.any().optional(), // Workflow variables
})
/**
* PUT /api/workflows/[id]/state
* Save complete workflow state to normalized database tables
*/
export async function PUT(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const startTime = Date.now()
const { id: workflowId } = await params
try {
const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
logger.warn(`[${requestId}] Unauthorized state update attempt for workflow ${workflowId}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
const body = await request.json()
const state = WorkflowStateSchema.parse(body)
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId,
action: 'write',
})
const workflowData = authorization.workflow
if (!workflowData) {
logger.warn(`[${requestId}] Workflow ${workflowId} not found for state update`)
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const canUpdate = authorization.allowed
if (!canUpdate) {
logger.warn(
`[${requestId}] User ${userId} denied permission to update workflow state ${workflowId}`
)
return NextResponse.json(
{ error: authorization.message || 'Access denied' },
{ status: authorization.status || 403 }
)
}
// Sanitize custom tools in agent blocks before saving
const { blocks: sanitizedBlocks, warnings } = sanitizeAgentToolsInBlocks(state.blocks as any)
// Save to normalized tables
// Ensure all required fields are present for WorkflowState type
// Filter out blocks without type or name before saving
const filteredBlocks = Object.entries(sanitizedBlocks).reduce(
(acc, [blockId, block]: [string, any]) => {
if (block.type && block.name) {
// Ensure all required fields are present
acc[blockId] = {
...block,
enabled: block.enabled !== undefined ? block.enabled : true,
horizontalHandles:
block.horizontalHandles !== undefined ? block.horizontalHandles : true,
height: block.height !== undefined ? block.height : 0,
subBlocks: block.subBlocks || {},
outputs: block.outputs || {},
}
}
return acc
},
{} as typeof state.blocks
)
const typedBlocks = filteredBlocks as Record<string, BlockState>
const canonicalLoops = generateLoopBlocks(typedBlocks)
const canonicalParallels = generateParallelBlocks(typedBlocks)
const workflowState = {
blocks: filteredBlocks,
edges: state.edges,
loops: canonicalLoops,
parallels: canonicalParallels,
lastSaved: state.lastSaved || Date.now(),
isDeployed: state.isDeployed || false,
deployedAt: state.deployedAt,
}
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState as any)
if (!saveResult.success) {
logger.error(`[${requestId}] Failed to save workflow ${workflowId} state:`, saveResult.error)
return NextResponse.json(
{ error: 'Failed to save workflow state', details: saveResult.error },
{ status: 500 }
)
}
// Extract and persist custom tools to database
try {
const workspaceId = workflowData.workspaceId
if (workspaceId) {
const { saved, errors } = await extractAndPersistCustomTools(
workflowState,
workspaceId,
userId
)
if (saved > 0) {
logger.info(`[${requestId}] Persisted ${saved} custom tool(s) to database`, {
workflowId,
})
}
if (errors.length > 0) {
logger.warn(`[${requestId}] Some custom tools failed to persist`, { errors, workflowId })
}
} else {
logger.warn(
`[${requestId}] Workflow has no workspaceId, skipping custom tools persistence`,
{
workflowId,
}
)
}
} catch (error) {
logger.error(`[${requestId}] Failed to persist custom tools`, { error, workflowId })
}
// Update workflow's lastSynced timestamp and variables if provided
const updateData: any = {
lastSynced: new Date(),
updatedAt: new Date(),
}
// If variables are provided in the state, update them in the workflow record
if (state.variables !== undefined) {
updateData.variables = state.variables
}
await db.update(workflow).set(updateData).where(eq(workflow.id, workflowId))
const elapsed = Date.now() - startTime
logger.info(`[${requestId}] Successfully saved workflow ${workflowId} state in ${elapsed}ms`)
try {
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
const notifyResponse = await fetch(`${socketUrl}/api/workflow-updated`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
body: JSON.stringify({ workflowId }),
})
if (!notifyResponse.ok) {
logger.warn(
`[${requestId}] Failed to notify Socket.IO server about workflow ${workflowId} update`
)
}
} catch (notificationError) {
logger.warn(
`[${requestId}] Error notifying Socket.IO server about workflow ${workflowId} update`,
notificationError
)
}
return NextResponse.json({ success: true, warnings }, { status: 200 })
} catch (error: any) {
const elapsed = Date.now() - startTime
logger.error(
`[${requestId}] Error saving workflow ${workflowId} state after ${elapsed}ms`,
error
)
if (error instanceof z.ZodError) {
return NextResponse.json(
{ error: 'Invalid request body', details: error.errors },
{ status: 400 }
)
}
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}