This commit is contained in:
Siddharth Ganesan
2025-07-08 21:20:00 -07:00
parent bacb6f3831
commit 684a8020d4
3 changed files with 409 additions and 130 deletions

View File

@@ -0,0 +1,149 @@
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { getUserEntityPermissions } from '@/lib/permissions/utils'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
import { db } from '@/db'
import { workflow } from '@/db/schema'
const logger = createLogger('WorkflowStateAPI')
const WorkflowStateSchema = z.object({
blocks: z.record(z.any()),
edges: z.array(z.any()),
loops: z.record(z.any()).optional(),
parallels: z.record(z.any()).optional(),
lastSaved: z.number().optional(),
isDeployed: z.boolean().optional(),
deployedAt: z.date().optional(),
deploymentStatuses: z.record(z.any()).optional(),
hasActiveSchedule: z.boolean().optional(),
hasActiveWebhook: z.boolean().optional(),
})
/**
* PUT /api/workflows/[id]/state
* Save complete workflow state to normalized database tables
*/
export async function PUT(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
const startTime = Date.now()
const { id: workflowId } = await params
try {
// Get the session
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized state update attempt for workflow ${workflowId}`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const userId = session.user.id
// Parse and validate request body
const body = await request.json()
const state = WorkflowStateSchema.parse(body)
// Fetch the workflow to check ownership/access
const workflowData = await db
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.then((rows) => rows[0])
if (!workflowData) {
logger.warn(`[${requestId}] Workflow ${workflowId} not found for state update`)
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
// Check if user has permission to update this workflow
let canUpdate = false
// Case 1: User owns the workflow
if (workflowData.userId === userId) {
canUpdate = true
}
// Case 2: Workflow belongs to a workspace and user has write or admin permission
if (!canUpdate && workflowData.workspaceId) {
const userPermission = await getUserEntityPermissions(
userId,
'workspace',
workflowData.workspaceId
)
if (userPermission === 'write' || userPermission === 'admin') {
canUpdate = true
}
}
if (!canUpdate) {
logger.warn(
`[${requestId}] User ${userId} denied permission to update workflow state ${workflowId}`
)
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
// Save to normalized tables
logger.info(`[${requestId}] Saving workflow ${workflowId} state to normalized tables`)
// Ensure all required fields are present for WorkflowState type
const workflowState = {
blocks: state.blocks,
edges: state.edges,
loops: state.loops || {},
parallels: state.parallels || {},
lastSaved: state.lastSaved || Date.now(),
isDeployed: state.isDeployed || false,
deployedAt: state.deployedAt,
deploymentStatuses: state.deploymentStatuses || {},
hasActiveSchedule: state.hasActiveSchedule || false,
hasActiveWebhook: state.hasActiveWebhook || false,
}
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState)
if (!saveResult.success) {
logger.error(`[${requestId}] Failed to save workflow ${workflowId} state:`, saveResult.error)
return NextResponse.json(
{ error: 'Failed to save workflow state', details: saveResult.error },
{ status: 500 }
)
}
// Update workflow's lastSynced timestamp
await db
.update(workflow)
.set({
lastSynced: new Date(),
updatedAt: new Date(),
state: saveResult.jsonBlob // Also update JSON blob for backward compatibility
})
.where(eq(workflow.id, workflowId))
const elapsed = Date.now() - startTime
logger.info(`[${requestId}] Successfully saved workflow ${workflowId} state in ${elapsed}ms`)
return NextResponse.json({
success: true,
blocksCount: Object.keys(state.blocks).length,
edgesCount: state.edges.length
}, { status: 200 })
} catch (error: any) {
const elapsed = Date.now() - startTime
if (error instanceof z.ZodError) {
logger.warn(`[${requestId}] Invalid workflow state data for ${workflowId}`, {
errors: error.errors,
})
return NextResponse.json(
{ error: 'Invalid state data', details: error.errors },
{ status: 400 }
)
}
logger.error(`[${requestId}] Error saving workflow ${workflowId} state after ${elapsed}ms`, error)
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -51,7 +51,11 @@ export function ImportControls({ disabled = false }: ImportControlsProps) {
// Stores and hooks
const { createWorkflow } = useWorkflowRegistry()
const { collaborativeAddBlock, collaborativeAddEdge } = useCollaborativeWorkflow()
const {
collaborativeAddBlock,
collaborativeAddEdge,
collaborativeSetSubblockValue
} = useCollaborativeWorkflow()
const subBlockStore = useSubBlockStore()
const handleFileUpload = async (event: React.ChangeEvent<HTMLInputElement>) => {
@@ -116,10 +120,10 @@ export function ImportControls({ disabled = false }: ImportControlsProps) {
// Navigate to the new workflow
router.push(`/workspace/${workspaceId}/w/${newWorkflowId}`)
// Small delay to ensure navigation and workflow initialization
await new Promise((resolve) => setTimeout(resolve, 1000))
// Brief delay to ensure navigation completes
await new Promise((resolve) => setTimeout(resolve, 100))
// Import the YAML into the new workflow
// Import the YAML into the new workflow (creates complete state and saves directly to DB)
const result = await importWorkflowFromYaml(yamlContent, {
addBlock: collaborativeAddBlock,
addEdge: collaborativeAddEdge,
@@ -128,7 +132,8 @@ export function ImportControls({ disabled = false }: ImportControlsProps) {
window.dispatchEvent(new CustomEvent('trigger-auto-layout'))
},
setSubBlockValue: (blockId: string, subBlockId: string, value: any) => {
subBlockStore.setValue(blockId, subBlockId, value)
// Use the collaborative function - the same one called when users type into fields
collaborativeSetSubblockValue(blockId, subBlockId, value)
},
getExistingBlocks: () => {
// This will be called after navigation, so we need to get blocks from the store
@@ -140,12 +145,9 @@ export function ImportControls({ disabled = false }: ImportControlsProps) {
setImportResult(result)
if (result.success) {
// Close dialog on success
setTimeout(() => {
setShowYamlDialog(false)
setYamlContent('')
setImportResult(null)
}, 2000)
setYamlContent('')
setShowYamlDialog(false)
logger.info('YAML import completed successfully')
}
} catch (error) {
logger.error('Failed to import YAML workflow:', error)

View File

@@ -1,6 +1,7 @@
import { load as yamlParse } from 'js-yaml'
import { createLogger } from '@/lib/logs/console-logger'
import { getBlock } from '@/blocks'
import { resolveOutputType } from '@/blocks/utils'
const logger = createLogger('WorkflowYamlImporter')
@@ -329,7 +330,7 @@ export function convertYamlToWorkflow(yamlWorkflow: YamlWorkflow): ImportResult
}
/**
* Import workflow from YAML and create blocks/edges using workflow functions
* Import workflow from YAML by creating complete state upfront (no UI simulation)
*/
export async function importWorkflowFromYaml(
yamlContent: string,
@@ -349,7 +350,7 @@ export async function importWorkflowFromYaml(
getExistingBlocks: () => Record<string, any>
}
): Promise<{ success: boolean; errors: string[]; warnings: string[]; summary?: string }> {
logger.info('Starting YAML workflow import')
logger.info('Starting YAML workflow import (complete state creation)')
try {
// Parse YAML
@@ -366,152 +367,279 @@ export async function importWorkflowFromYaml(
return { success: false, errors, warnings }
}
logger.info(`Importing ${blocks.length} blocks and ${edges.length} edges`)
logger.info(`Creating complete workflow state with ${blocks.length} blocks and ${edges.length} edges`)
logger.debug('Blocks to import:', blocks.map(b => `${b.id} (${b.type}): ${b.name}`))
// Check for existing blocks (new workflows already have a starter block)
// Get the existing workflow state (to preserve starter blocks if they exist)
const existingBlocks = workflowActions.getExistingBlocks()
const existingStarterBlocks = Object.values(existingBlocks).filter(
(block: any) => block.type === 'starter'
)
logger.debug(`Found ${existingStarterBlocks.length} existing starter blocks`)
let actualBlocksCreated = 0
// Get stores and current workflow info
const { useWorkflowStore } = require('@/stores/workflows/workflow/store')
const { useSubBlockStore } = require('@/stores/workflows/subblock/store')
const { useWorkflowRegistry } = require('@/stores/workflows/registry/store')
// Get current workflow state
const currentWorkflowState = useWorkflowStore.getState()
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!activeWorkflowId) {
return { success: false, errors: ['No active workflow'], warnings: [] }
}
// Build complete blocks object
const completeBlocks: Record<string, any> = {}
const completeSubBlockValues: Record<string, Record<string, any>> = {}
const yamlIdToActualId = new Map<string, string>()
// Handle starter block
let starterBlockId: string | null = null
// Create blocks, but handle starter blocks specially
for (const block of blocks) {
if (block.type === 'starter') {
if (existingStarterBlocks.length > 0) {
// Use existing starter block
const existingStarter = existingStarterBlocks[0] as any
starterBlockId = existingStarter.id
logger.debug(`Using existing starter block: ${starterBlockId}`)
// Update the starter block's inputs if needed
Object.entries(block.inputs).forEach(([inputKey, inputValue]) => {
if (inputValue !== undefined && inputValue !== null && starterBlockId !== null) {
logger.debug(`Setting starter input: ${starterBlockId}.${inputKey} = ${inputValue}`)
workflowActions.setSubBlockValue(starterBlockId, inputKey, inputValue)
const starterBlock = blocks.find(block => block.type === 'starter')
if (starterBlock) {
if (existingStarterBlocks.length > 0) {
// Use existing starter block
const existingStarter = existingStarterBlocks[0] as any
starterBlockId = existingStarter.id
yamlIdToActualId.set(starterBlock.id, existingStarter.id)
// Keep existing starter but update its inputs
completeBlocks[existingStarter.id] = {
...existingStarter,
// Update name if provided in YAML
name: starterBlock.name !== 'Start' ? starterBlock.name : existingStarter.name,
}
// Set starter block values
completeSubBlockValues[existingStarter.id] = {
...currentWorkflowState.blocks[existingStarter.id]?.subBlocks ?
Object.fromEntries(
Object.entries(currentWorkflowState.blocks[existingStarter.id].subBlocks).map(
([key, subBlock]: [string, any]) => [key, subBlock.value]
)
) : {},
...starterBlock.inputs // Override with YAML values
}
logger.debug(`Using existing starter block: ${existingStarter.id}`)
} else {
// Create new starter block
starterBlockId = crypto.randomUUID()
yamlIdToActualId.set(starterBlock.id, starterBlockId)
// Create complete starter block from block config
const blockConfig = getBlock('starter')
if (blockConfig) {
const subBlocks: Record<string, any> = {}
blockConfig.subBlocks.forEach((subBlock) => {
subBlocks[subBlock.id] = {
id: subBlock.id,
type: subBlock.type,
value: null,
}
})
} else {
// Create new starter block with generated ID (let the system generate it)
const generatedId = crypto.randomUUID()
starterBlockId = generatedId
logger.debug(`Creating new starter block: ${generatedId}`)
workflowActions.addBlock(
generatedId,
block.type,
block.name,
block.position,
block.data,
block.parentId,
block.extent
)
actualBlocksCreated++
completeBlocks[starterBlockId] = {
id: starterBlockId,
type: 'starter',
name: starterBlock.name,
position: starterBlock.position,
subBlocks,
outputs: resolveOutputType(blockConfig.outputs),
enabled: true,
horizontalHandles: true,
isWide: false,
height: 0,
data: starterBlock.data || {},
}
// Set starter block values
completeSubBlockValues[starterBlockId] = { ...starterBlock.inputs }
logger.debug(`Created new starter block: ${starterBlockId}`)
}
} else {
// Create non-starter blocks with generated IDs to avoid conflicts
const generatedId = crypto.randomUUID()
logger.debug(`Creating block: ${generatedId} (${block.type}) originally ${block.id}`)
workflowActions.addBlock(
generatedId,
block.type,
block.name,
block.position,
block.data,
block.parentId,
block.extent
)
actualBlocksCreated++
// Update edges to use the new generated ID
edges.forEach((edge) => {
if (edge.source === block.id) {
edge.source = generatedId
}
if (edge.target === block.id) {
edge.target = generatedId
}
})
// Store mapping for setting inputs later
block.id = generatedId
}
// Update edges to use the starter block ID
if (block.type === 'starter' && starterBlockId !== null) {
const starterId = starterBlockId // TypeScript now knows this is string
edges.forEach((edge) => {
if (edge.source === block.id) {
edge.source = starterId
}
if (edge.target === block.id) {
edge.target = starterId
}
})
}
}
// Small delay to ensure blocks are created before adding edges
await new Promise((resolve) => setTimeout(resolve, 200))
// Create edges
let edgesCreated = 0
for (const edge of edges) {
try {
logger.debug(`Creating edge: ${edge.source} -> ${edge.target}`)
workflowActions.addEdge({
...edge,
id: crypto.randomUUID(), // Generate unique edge ID
})
edgesCreated++
} catch (error) {
logger.warn(`Failed to create edge ${edge.source} -> ${edge.target}:`, error)
warnings.push(`Failed to create connection from ${edge.source} to ${edge.target}`)
}
}
// Small delay before setting input values
await new Promise((resolve) => setTimeout(resolve, 200))
// Set input values for non-starter blocks
// Create all other blocks
let blocksProcessed = 0
for (const block of blocks) {
if (block.type !== 'starter') {
Object.entries(block.inputs).forEach(([inputKey, inputValue]) => {
if (inputValue !== undefined && inputValue !== null) {
try {
logger.debug(`Setting input: ${block.id}.${inputKey} = ${inputValue}`)
workflowActions.setSubBlockValue(block.id, inputKey, inputValue)
} catch (error) {
logger.warn(`Failed to set input ${block.id}.${inputKey}:`, error)
warnings.push(`Failed to set input ${inputKey} for block ${block.id}`)
}
if (block.type === 'starter') {
logger.debug(`Skipping starter block: ${block.id} (already handled)`)
continue // Already handled above
}
const blockId = crypto.randomUUID()
yamlIdToActualId.set(block.id, blockId)
// Create complete block from block config
const blockConfig = getBlock(block.type)
if (!blockConfig && (block.type === 'loop' || block.type === 'parallel')) {
// Handle loop/parallel blocks
completeBlocks[blockId] = {
id: blockId,
type: block.type,
name: block.name,
position: block.position,
subBlocks: {},
outputs: {},
enabled: true,
horizontalHandles: true,
isWide: false,
height: 0,
data: block.data || {},
}
completeSubBlockValues[blockId] = { ...block.inputs }
blocksProcessed++
logger.debug(`Prepared ${block.type} block: ${blockId} -> ${block.name}`)
} else if (blockConfig) {
// Handle regular blocks
const subBlocks: Record<string, any> = {}
blockConfig.subBlocks.forEach((subBlock) => {
subBlocks[subBlock.id] = {
id: subBlock.id,
type: subBlock.type,
value: null,
}
})
completeBlocks[blockId] = {
id: blockId,
type: block.type,
name: block.name,
position: block.position,
subBlocks,
outputs: resolveOutputType(blockConfig.outputs),
enabled: true,
horizontalHandles: true,
isWide: false,
height: 0,
data: block.data || {},
}
// Set block input values
completeSubBlockValues[blockId] = { ...block.inputs }
blocksProcessed++
logger.debug(`Prepared ${block.type} block: ${blockId} -> ${block.name}`)
} else {
logger.warn(`No block config found for type: ${block.type} (block: ${block.id})`)
}
}
logger.info(`Processed ${blocksProcessed} non-starter blocks, total blocks in state: ${Object.keys(completeBlocks).length}`)
// Create complete edges using the ID mapping
const completeEdges: any[] = []
for (const edge of edges) {
const sourceId = yamlIdToActualId.get(edge.source)
const targetId = yamlIdToActualId.get(edge.target)
if (sourceId && targetId) {
completeEdges.push({
...edge,
source: sourceId,
target: targetId,
})
logger.debug(`Prepared edge: ${sourceId} -> ${targetId}`)
} else {
logger.warn(`Skipping edge - missing blocks: ${edge.source} -> ${edge.target}`)
}
}
// Apply auto layout after a delay
setTimeout(() => {
logger.debug('Applying auto layout')
workflowActions.applyAutoLayout()
}, 800)
// Create complete workflow state with values already set in subBlocks
logger.info('Creating complete workflow state with embedded values...')
// Merge subblock values directly into block subBlocks
for (const [blockId, blockData] of Object.entries(completeBlocks)) {
const blockValues = completeSubBlockValues[blockId] || {}
// Update subBlock values in place
for (const [subBlockId, subBlockData] of Object.entries(blockData.subBlocks || {})) {
if (blockValues[subBlockId] !== undefined && blockValues[subBlockId] !== null) {
(subBlockData as any).value = blockValues[subBlockId]
logger.debug(`Embedded value in block: ${blockId}.${subBlockId} = ${blockValues[subBlockId]}`)
}
}
}
const summary = `Successfully imported ${actualBlocksCreated} new blocks and ${edgesCreated} connections`
logger.info(summary)
// Create final workflow state
const completeWorkflowState = {
blocks: completeBlocks,
edges: completeEdges,
loops: {},
parallels: {},
lastSaved: Date.now(),
isDeployed: false,
deployedAt: undefined,
deploymentStatuses: {},
hasActiveSchedule: false,
hasActiveWebhook: false,
}
// Save directly to database via API
logger.info('Saving complete workflow state directly to database...')
const response = await fetch(`/api/workflows/${activeWorkflowId}/state`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(completeWorkflowState),
})
if (!response.ok) {
const errorData = await response.json()
logger.error('Failed to save workflow state:', errorData.error)
return {
success: false,
errors: [`Database save failed: ${errorData.error || 'Unknown error'}`],
warnings,
}
}
const saveResponse = await response.json()
logger.info('Successfully saved to database:', saveResponse)
// Update local state for immediate UI display
logger.info('Updating local state for immediate display...')
useWorkflowStore.setState(completeWorkflowState)
// Set subblock values in local store
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[activeWorkflowId]: completeSubBlockValues,
},
}))
// Brief delay for UI to update
await new Promise((resolve) => setTimeout(resolve, 100))
// Apply auto layout
logger.info('Applying auto layout...')
workflowActions.applyAutoLayout()
const totalBlocksCreated = Object.keys(completeBlocks).length - (existingStarterBlocks.length > 0 ? 1 : 0)
logger.info(`Successfully imported workflow: ${totalBlocksCreated} blocks created, ${completeEdges.length} edges, values set for ${Object.keys(completeSubBlockValues).length} blocks`)
return {
success: true,
errors: [],
warnings,
summary,
summary: `Imported ${totalBlocksCreated} new blocks and ${completeEdges.length} connections. ${
existingStarterBlocks.length > 0 ? 'Updated existing starter block.' : 'Created new starter block.'
}`,
}
} catch (error) {
const errorMessage = `Import failed: ${error instanceof Error ? error.message : 'Unknown error'}`
logger.error(errorMessage, error)
logger.error('YAML import failed:', error)
return {
success: false,
errors: [errorMessage],
errors: [`Import failed: ${error instanceof Error ? error.message : 'Unknown error'}`],
warnings: [],
}
}