mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-15 01:47:59 -05:00
Compare commits
3 Commits
improvemen
...
fix/batch-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a0a36b26d | ||
|
|
59a516062d | ||
|
|
aca3b8b880 |
@@ -172,7 +172,7 @@ async function executeWebhookJobInternal(
|
||||
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
|
||||
|
||||
// Merge subblock states (matching workflow-execution pattern)
|
||||
const mergedStates = mergeSubblockState(blocks, {})
|
||||
const mergedStates = mergeSubblockState(blocks)
|
||||
|
||||
// Create serialized workflow
|
||||
const serializer = new Serializer()
|
||||
|
||||
80
apps/sim/lib/workflows/subblocks.ts
Normal file
80
apps/sim/lib/workflows/subblocks.ts
Normal file
@@ -0,0 +1,80 @@
|
||||
import type { BlockState, SubBlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
export const DEFAULT_SUBBLOCK_TYPE = 'short-input'
|
||||
|
||||
/**
|
||||
* Merges subblock values into the provided subblock structures.
|
||||
* Falls back to a default subblock shape when a value has no structure.
|
||||
* @param subBlocks - Existing subblock definitions from the workflow
|
||||
* @param values - Stored subblock values keyed by subblock id
|
||||
* @returns Merged subblock structures with updated values
|
||||
*/
|
||||
export function mergeSubBlockValues(
|
||||
subBlocks: Record<string, unknown> | undefined,
|
||||
values: Record<string, unknown> | undefined
|
||||
): Record<string, unknown> {
|
||||
const merged = { ...(subBlocks || {}) } as Record<string, any>
|
||||
|
||||
if (!values) return merged
|
||||
|
||||
Object.entries(values).forEach(([subBlockId, value]) => {
|
||||
if (merged[subBlockId] && typeof merged[subBlockId] === 'object') {
|
||||
merged[subBlockId] = {
|
||||
...(merged[subBlockId] as Record<string, unknown>),
|
||||
value,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
merged[subBlockId] = {
|
||||
id: subBlockId,
|
||||
type: DEFAULT_SUBBLOCK_TYPE,
|
||||
value,
|
||||
}
|
||||
})
|
||||
|
||||
return merged
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges workflow block states with explicit subblock values while maintaining block structure.
|
||||
* Values that are null or undefined do not override existing subblock values.
|
||||
* @param blocks - Block configurations from workflow state
|
||||
* @param subBlockValues - Subblock values keyed by blockId -> subBlockId -> value
|
||||
* @param blockId - Optional specific block ID to merge (merges all if not provided)
|
||||
* @returns Merged block states with updated subblocks
|
||||
*/
|
||||
export function mergeSubblockStateWithValues(
|
||||
blocks: Record<string, BlockState>,
|
||||
subBlockValues: Record<string, Record<string, unknown>> = {},
|
||||
blockId?: string
|
||||
): Record<string, BlockState> {
|
||||
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
|
||||
|
||||
return Object.entries(blocksToProcess).reduce(
|
||||
(acc, [id, block]) => {
|
||||
if (!block) {
|
||||
return acc
|
||||
}
|
||||
|
||||
const blockSubBlocks = block.subBlocks || {}
|
||||
const blockValues = subBlockValues[id] || {}
|
||||
const filteredValues = Object.fromEntries(
|
||||
Object.entries(blockValues).filter(([, value]) => value !== null && value !== undefined)
|
||||
)
|
||||
|
||||
const mergedSubBlocks = mergeSubBlockValues(blockSubBlocks, filteredValues) as Record<
|
||||
string,
|
||||
SubBlockState
|
||||
>
|
||||
|
||||
acc[id] = {
|
||||
...block,
|
||||
subBlocks: mergedSubBlocks,
|
||||
}
|
||||
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, BlockState>
|
||||
)
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import postgres from 'postgres'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { cleanupExternalWebhook } from '@/lib/webhooks/provider-subscriptions'
|
||||
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import { mergeSubBlockValues } from '@/lib/workflows/subblocks'
|
||||
import {
|
||||
BLOCK_OPERATIONS,
|
||||
BLOCKS_OPERATIONS,
|
||||
@@ -455,7 +456,7 @@ async function handleBlocksOperationTx(
|
||||
}
|
||||
|
||||
case BLOCKS_OPERATIONS.BATCH_ADD_BLOCKS: {
|
||||
const { blocks, edges, loops, parallels } = payload
|
||||
const { blocks, edges, loops, parallels, subBlockValues } = payload
|
||||
|
||||
logger.info(`Batch adding blocks to workflow ${workflowId}`, {
|
||||
blockCount: blocks?.length || 0,
|
||||
@@ -465,22 +466,30 @@ async function handleBlocksOperationTx(
|
||||
})
|
||||
|
||||
if (blocks && blocks.length > 0) {
|
||||
const blockValues = blocks.map((block: Record<string, unknown>) => ({
|
||||
id: block.id as string,
|
||||
workflowId,
|
||||
type: block.type as string,
|
||||
name: block.name as string,
|
||||
positionX: (block.position as { x: number; y: number }).x,
|
||||
positionY: (block.position as { x: number; y: number }).y,
|
||||
data: (block.data as Record<string, unknown>) || {},
|
||||
subBlocks: (block.subBlocks as Record<string, unknown>) || {},
|
||||
outputs: (block.outputs as Record<string, unknown>) || {},
|
||||
enabled: (block.enabled as boolean) ?? true,
|
||||
horizontalHandles: (block.horizontalHandles as boolean) ?? true,
|
||||
advancedMode: (block.advancedMode as boolean) ?? false,
|
||||
triggerMode: (block.triggerMode as boolean) ?? false,
|
||||
height: (block.height as number) || 0,
|
||||
}))
|
||||
const blockValues = blocks.map((block: Record<string, unknown>) => {
|
||||
const blockId = block.id as string
|
||||
const mergedSubBlocks = mergeSubBlockValues(
|
||||
block.subBlocks as Record<string, unknown>,
|
||||
subBlockValues?.[blockId]
|
||||
)
|
||||
|
||||
return {
|
||||
id: blockId,
|
||||
workflowId,
|
||||
type: block.type as string,
|
||||
name: block.name as string,
|
||||
positionX: (block.position as { x: number; y: number }).x,
|
||||
positionY: (block.position as { x: number; y: number }).y,
|
||||
data: (block.data as Record<string, unknown>) || {},
|
||||
subBlocks: mergedSubBlocks,
|
||||
outputs: (block.outputs as Record<string, unknown>) || {},
|
||||
enabled: (block.enabled as boolean) ?? true,
|
||||
horizontalHandles: (block.horizontalHandles as boolean) ?? true,
|
||||
advancedMode: (block.advancedMode as boolean) ?? false,
|
||||
triggerMode: (block.triggerMode as boolean) ?? false,
|
||||
height: (block.height as number) || 0,
|
||||
}
|
||||
})
|
||||
|
||||
await tx.insert(workflowBlocks).values(blockValues)
|
||||
|
||||
|
||||
@@ -8,7 +8,8 @@
|
||||
* or React hooks, making it safe for use in Next.js API routes.
|
||||
*/
|
||||
|
||||
import type { BlockState, SubBlockState } from '@/stores/workflows/workflow/types'
|
||||
import { mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
/**
|
||||
* Server-safe version of mergeSubblockState for API routes
|
||||
@@ -26,72 +27,7 @@ export function mergeSubblockState(
|
||||
subBlockValues: Record<string, Record<string, any>> = {},
|
||||
blockId?: string
|
||||
): Record<string, BlockState> {
|
||||
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
|
||||
|
||||
return Object.entries(blocksToProcess).reduce(
|
||||
(acc, [id, block]) => {
|
||||
// Skip if block is undefined
|
||||
if (!block) {
|
||||
return acc
|
||||
}
|
||||
|
||||
// Initialize subBlocks if not present
|
||||
const blockSubBlocks = block.subBlocks || {}
|
||||
|
||||
// Get stored values for this block
|
||||
const blockValues = subBlockValues[id] || {}
|
||||
|
||||
// Create a deep copy of the block's subBlocks to maintain structure
|
||||
const mergedSubBlocks = Object.entries(blockSubBlocks).reduce(
|
||||
(subAcc, [subBlockId, subBlock]) => {
|
||||
// Skip if subBlock is undefined
|
||||
if (!subBlock) {
|
||||
return subAcc
|
||||
}
|
||||
|
||||
// Get the stored value for this subblock
|
||||
const storedValue = blockValues[subBlockId]
|
||||
|
||||
// Create a new subblock object with the same structure but updated value
|
||||
subAcc[subBlockId] = {
|
||||
...subBlock,
|
||||
value: storedValue !== undefined && storedValue !== null ? storedValue : subBlock.value,
|
||||
}
|
||||
|
||||
return subAcc
|
||||
},
|
||||
{} as Record<string, SubBlockState>
|
||||
)
|
||||
|
||||
// Return the full block state with updated subBlocks
|
||||
acc[id] = {
|
||||
...block,
|
||||
subBlocks: mergedSubBlocks,
|
||||
}
|
||||
|
||||
// Add any values that exist in the provided values but aren't in the block structure
|
||||
// This handles cases where block config has been updated but values still exist
|
||||
Object.entries(blockValues).forEach(([subBlockId, value]) => {
|
||||
if (!mergedSubBlocks[subBlockId] && value !== null && value !== undefined) {
|
||||
// Create a minimal subblock structure
|
||||
mergedSubBlocks[subBlockId] = {
|
||||
id: subBlockId,
|
||||
type: 'short-input', // Default type that's safe to use
|
||||
value: value,
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
// Update the block with the final merged subBlocks (including orphaned values)
|
||||
acc[id] = {
|
||||
...block,
|
||||
subBlocks: mergedSubBlocks,
|
||||
}
|
||||
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, BlockState>
|
||||
)
|
||||
return mergeSubblockStateWithValues(blocks, subBlockValues, blockId)
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,20 +1,7 @@
|
||||
import type { Edge } from 'reactflow'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
|
||||
export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
|
||||
return edgesToAdd.filter((edge) => {
|
||||
if (edge.source === edge.target) return false
|
||||
return !currentEdges.some(
|
||||
(e) =>
|
||||
e.source === edge.source &&
|
||||
e.sourceHandle === edge.sourceHandle &&
|
||||
e.target === edge.target &&
|
||||
e.targetHandle === edge.targetHandle
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
|
||||
import { mergeSubBlockValues, mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
|
||||
import { getBlock } from '@/blocks'
|
||||
import { normalizeName } from '@/executor/constants'
|
||||
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
|
||||
@@ -32,6 +19,19 @@ const WEBHOOK_SUBBLOCK_FIELDS = ['webhookId', 'triggerPath']
|
||||
|
||||
export { normalizeName }
|
||||
|
||||
export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
|
||||
return edgesToAdd.filter((edge) => {
|
||||
if (edge.source === edge.target) return false
|
||||
return !currentEdges.some(
|
||||
(e) =>
|
||||
e.source === edge.source &&
|
||||
e.sourceHandle === edge.sourceHandle &&
|
||||
e.target === edge.target &&
|
||||
e.targetHandle === edge.targetHandle
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
export interface RegeneratedState {
|
||||
blocks: Record<string, BlockState>
|
||||
edges: Edge[]
|
||||
@@ -201,27 +201,20 @@ export function prepareDuplicateBlockState(options: PrepareDuplicateBlockStateOp
|
||||
Object.entries(subBlockValues).filter(([key]) => !WEBHOOK_SUBBLOCK_FIELDS.includes(key))
|
||||
)
|
||||
|
||||
const mergedSubBlocks: Record<string, SubBlockState> = sourceBlock.subBlocks
|
||||
const baseSubBlocks: Record<string, SubBlockState> = sourceBlock.subBlocks
|
||||
? JSON.parse(JSON.stringify(sourceBlock.subBlocks))
|
||||
: {}
|
||||
|
||||
WEBHOOK_SUBBLOCK_FIELDS.forEach((field) => {
|
||||
if (field in mergedSubBlocks) {
|
||||
delete mergedSubBlocks[field]
|
||||
if (field in baseSubBlocks) {
|
||||
delete baseSubBlocks[field]
|
||||
}
|
||||
})
|
||||
|
||||
Object.entries(filteredSubBlockValues).forEach(([subblockId, value]) => {
|
||||
if (mergedSubBlocks[subblockId]) {
|
||||
mergedSubBlocks[subblockId].value = value as SubBlockState['value']
|
||||
} else {
|
||||
mergedSubBlocks[subblockId] = {
|
||||
id: subblockId,
|
||||
type: 'short-input',
|
||||
value: value as SubBlockState['value'],
|
||||
}
|
||||
}
|
||||
})
|
||||
const mergedSubBlocks = mergeSubBlockValues(baseSubBlocks, filteredSubBlockValues) as Record<
|
||||
string,
|
||||
SubBlockState
|
||||
>
|
||||
|
||||
const block: BlockState = {
|
||||
id: newId,
|
||||
@@ -256,11 +249,16 @@ export function mergeSubblockState(
|
||||
workflowId?: string,
|
||||
blockId?: string
|
||||
): Record<string, BlockState> {
|
||||
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
|
||||
const subBlockStore = useSubBlockStore.getState()
|
||||
|
||||
const workflowSubblockValues = workflowId ? subBlockStore.workflowValues[workflowId] || {} : {}
|
||||
|
||||
if (workflowId) {
|
||||
return mergeSubblockStateWithValues(blocks, workflowSubblockValues, blockId)
|
||||
}
|
||||
|
||||
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
|
||||
|
||||
return Object.entries(blocksToProcess).reduce(
|
||||
(acc, [id, block]) => {
|
||||
if (!block) {
|
||||
@@ -339,9 +337,15 @@ export async function mergeSubblockStateAsync(
|
||||
workflowId?: string,
|
||||
blockId?: string
|
||||
): Promise<Record<string, BlockState>> {
|
||||
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
|
||||
const subBlockStore = useSubBlockStore.getState()
|
||||
|
||||
if (workflowId) {
|
||||
const workflowValues = subBlockStore.workflowValues[workflowId] || {}
|
||||
return mergeSubblockStateWithValues(blocks, workflowValues, blockId)
|
||||
}
|
||||
|
||||
const blocksToProcess = blockId ? { [blockId]: blocks[blockId] } : blocks
|
||||
|
||||
// Process blocks in parallel for better performance
|
||||
const processedBlockEntries = await Promise.all(
|
||||
Object.entries(blocksToProcess).map(async ([id, block]) => {
|
||||
@@ -358,16 +362,7 @@ export async function mergeSubblockStateAsync(
|
||||
return null
|
||||
}
|
||||
|
||||
let storedValue = null
|
||||
|
||||
if (workflowId) {
|
||||
const workflowValues = subBlockStore.workflowValues[workflowId]
|
||||
if (workflowValues?.[id]) {
|
||||
storedValue = workflowValues[id][subBlockId]
|
||||
}
|
||||
} else {
|
||||
storedValue = subBlockStore.getValue(id, subBlockId)
|
||||
}
|
||||
const storedValue = subBlockStore.getValue(id, subBlockId)
|
||||
|
||||
return [
|
||||
subBlockId,
|
||||
@@ -386,23 +381,6 @@ export async function mergeSubblockStateAsync(
|
||||
subBlockEntries.filter((entry): entry is readonly [string, SubBlockState] => entry !== null)
|
||||
) as Record<string, SubBlockState>
|
||||
|
||||
// Add any values that exist in the store but aren't in the block structure
|
||||
// This handles cases where block config has been updated but values still exist
|
||||
// IMPORTANT: This includes runtime subblock IDs like webhookId, triggerPath, etc.
|
||||
if (workflowId) {
|
||||
const workflowValues = subBlockStore.workflowValues[workflowId]
|
||||
const blockValues = workflowValues?.[id] || {}
|
||||
Object.entries(blockValues).forEach(([subBlockId, value]) => {
|
||||
if (!mergedSubBlocks[subBlockId] && value !== null && value !== undefined) {
|
||||
mergedSubBlocks[subBlockId] = {
|
||||
id: subBlockId,
|
||||
type: 'short-input',
|
||||
value: value as SubBlockState['value'],
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Return the full block state with updated subBlocks (including orphaned values)
|
||||
return [
|
||||
id,
|
||||
|
||||
@@ -639,7 +639,8 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
|
||||
const newName = getUniqueBlockName(block.name, get().blocks)
|
||||
|
||||
const mergedBlock = mergeSubblockState(get().blocks, id)[id]
|
||||
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
|
||||
const mergedBlock = mergeSubblockState(get().blocks, activeWorkflowId || undefined, id)[id]
|
||||
|
||||
const newSubBlocks = Object.entries(mergedBlock.subBlocks).reduce(
|
||||
(acc, [subId, subBlock]) => ({
|
||||
@@ -668,7 +669,6 @@ export const useWorkflowStore = create<WorkflowStore>()(
|
||||
parallels: get().generateParallelBlocks(),
|
||||
}
|
||||
|
||||
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
|
||||
if (activeWorkflowId) {
|
||||
const subBlockValues =
|
||||
useSubBlockStore.getState().workflowValues[activeWorkflowId]?.[id] || {}
|
||||
|
||||
Reference in New Issue
Block a user