Mirror of https://github.com/simstudioai/sim.git (synced 2026-01-22 21:38:05 -05:00)
Compare commits: main...fix/idempo (6 commits)
| Author | SHA1 | Date |
|---|---|---|
| | e09e525ea2 | |
| | ae113f76fd | |
| | a160c8e145 | |
| | 03bb2d2763 | |
| | e0d301aca7 | |
| | aa99db6fdd | |
@@ -1312,15 +1312,16 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
    if (currentLoop && isLoopBlock) {
      containingLoopBlockId = blockId
      const loopType = currentLoop.loopType || 'for'
      const contextualTags: string[] = ['index']
      if (loopType === 'forEach') {
        contextualTags.push('currentItem')
        contextualTags.push('items')
      }

      const loopBlock = blocks[blockId]
      if (loopBlock) {
        const loopBlockName = loopBlock.name || loopBlock.type
        const normalizedLoopName = normalizeName(loopBlockName)
        const contextualTags: string[] = [`${normalizedLoopName}.index`]
        if (loopType === 'forEach') {
          contextualTags.push(`${normalizedLoopName}.currentItem`)
          contextualTags.push(`${normalizedLoopName}.items`)
        }

        loopBlockGroup = {
          blockName: loopBlockName,
@@ -1328,21 +1329,23 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
          blockType: 'loop',
          tags: contextualTags,
          distance: 0,
          isContextual: true,
        }
      }
    } else if (containingLoop) {
      const [loopId, loop] = containingLoop
      containingLoopBlockId = loopId
      const loopType = loop.loopType || 'for'
      const contextualTags: string[] = ['index']
      if (loopType === 'forEach') {
        contextualTags.push('currentItem')
        contextualTags.push('items')
      }

      const containingLoopBlock = blocks[loopId]
      if (containingLoopBlock) {
        const loopBlockName = containingLoopBlock.name || containingLoopBlock.type
        const normalizedLoopName = normalizeName(loopBlockName)
        const contextualTags: string[] = [`${normalizedLoopName}.index`]
        if (loopType === 'forEach') {
          contextualTags.push(`${normalizedLoopName}.currentItem`)
          contextualTags.push(`${normalizedLoopName}.items`)
        }

        loopBlockGroup = {
          blockName: loopBlockName,
@@ -1350,6 +1353,7 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
          blockType: 'loop',
          tags: contextualTags,
          distance: 0,
          isContextual: true,
        }
      }
    }
@@ -1363,15 +1367,16 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
      const [parallelId, parallel] = containingParallel
      containingParallelBlockId = parallelId
      const parallelType = parallel.parallelType || 'count'
      const contextualTags: string[] = ['index']
      if (parallelType === 'collection') {
        contextualTags.push('currentItem')
        contextualTags.push('items')
      }

      const containingParallelBlock = blocks[parallelId]
      if (containingParallelBlock) {
        const parallelBlockName = containingParallelBlock.name || containingParallelBlock.type
        const normalizedParallelName = normalizeName(parallelBlockName)
        const contextualTags: string[] = [`${normalizedParallelName}.index`]
        if (parallelType === 'collection') {
          contextualTags.push(`${normalizedParallelName}.currentItem`)
          contextualTags.push(`${normalizedParallelName}.items`)
        }

        parallelBlockGroup = {
          blockName: parallelBlockName,
@@ -1379,6 +1384,7 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
          blockType: 'parallel',
          tags: contextualTags,
          distance: 0,
          isContextual: true,
        }
      }
    }
@@ -1645,38 +1651,29 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
  const nestedBlockTagGroups: NestedBlockTagGroup[] = useMemo(() => {
    return filteredBlockTagGroups.map((group: BlockTagGroup) => {
      const normalizedBlockName = normalizeName(group.blockName)

      // Handle loop/parallel contextual tags (index, currentItem, items)
      const directTags: NestedTag[] = []
      const tagsForTree: string[] = []

      group.tags.forEach((tag: string) => {
        const tagParts = tag.split('.')

        // Loop/parallel contextual tags without block prefix
        if (
          (group.blockType === 'loop' || group.blockType === 'parallel') &&
          tagParts.length === 1
        ) {
        if (tagParts.length === 1) {
          directTags.push({
            key: tag,
            display: tag,
            fullTag: tag,
          })
        } else if (tagParts.length === 2) {
          // Direct property like blockname.property
          directTags.push({
            key: tagParts[1],
            display: tagParts[1],
            fullTag: tag,
          })
        } else {
          // Nested property - add to tree builder
          tagsForTree.push(tag)
        }
      })

      // Build recursive tree from nested tags
      const nestedTags = [...directTags, ...buildNestedTagTree(tagsForTree, normalizedBlockName)]

      return {
@@ -1800,13 +1797,19 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
        processedTag = tag
      }
    } else if (
      blockGroup &&
      blockGroup?.isContextual &&
      (blockGroup.blockType === 'loop' || blockGroup.blockType === 'parallel')
    ) {
      if (!tag.includes('.') && ['index', 'currentItem', 'items'].includes(tag)) {
        processedTag = `${blockGroup.blockType}.${tag}`
      const tagParts = tag.split('.')
      if (tagParts.length === 1) {
        processedTag = blockGroup.blockType
      } else {
        processedTag = tag
        const lastPart = tagParts[tagParts.length - 1]
        if (['index', 'currentItem', 'items'].includes(lastPart)) {
          processedTag = `${blockGroup.blockType}.${lastPart}`
        } else {
          processedTag = tag
        }
      }
    }
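The TagDropdown hunks above surface loop/parallel iteration tags under the subflow block's own normalized name, then remap a picked contextual tag back onto the generic loop/parallel prefix. A minimal sketch of that mapping, assuming a hypothetical block named "My Loop" and a simplified stand-in for normalizeName (the real helper lives in @/executor/constants); the wrapping into a <...> reference happens elsewhere in the component:

```ts
// Hedged sketch, not part of the diff: block name and normalizeName stand-in are assumptions.
const blockName = 'My Loop'
const normalized = blockName.toLowerCase().replace(/\s+/g, '') // stand-in for normalizeName()

// What the dropdown displays for a forEach loop block:
const displayedTags = [`${normalized}.index`, `${normalized}.currentItem`, `${normalized}.items`]

// What the component resolves when a contextual tag is picked:
// the last path segment is remapped onto the generic blockType prefix.
const picked = `${normalized}.currentItem`
const lastPart = picked.split('.').pop()!
const processedTag = `loop.${lastPart}` // later wrapped as "<loop.currentItem>"

console.log(displayedTags, processedTag)
```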
@@ -7,6 +7,8 @@ export interface BlockTagGroup {
  blockType: string
  tags: string[]
  distance: number
  /** True if this is a contextual group (loop/parallel iteration context available inside the subflow) */
  isContextual?: boolean
}

/**

@@ -120,6 +120,12 @@ export const SPECIAL_REFERENCE_PREFIXES = [
  REFERENCE.PREFIX.VARIABLE,
] as const

export const RESERVED_BLOCK_NAMES = [
  REFERENCE.PREFIX.LOOP,
  REFERENCE.PREFIX.PARALLEL,
  REFERENCE.PREFIX.VARIABLE,
] as const

export const LOOP_REFERENCE = {
  ITERATION: 'iteration',
  INDEX: 'index',
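RESERVED_BLOCK_NAMES backs the rename/add guards added further down (workflow store, collaborative hook, copilot operations). A minimal sketch of such a guard, assuming the reference prefixes resolve to 'loop', 'parallel', and 'variable' and using a simplified stand-in for normalizeName; the message text is illustrative:

```ts
// Hedged sketch of a reserved-name guard in the spirit of this change.
// Assumed values for the REFERENCE.PREFIX.* constants; not copied from the repo.
const RESERVED_BLOCK_NAMES = ['loop', 'parallel', 'variable'] as const

function normalizeName(name: string): string {
  // simplified stand-in for the executor's normalizeName helper
  return name.trim().toLowerCase().replace(/\s+/g, '')
}

function validateBlockName(name: string): { success: boolean; error?: string } {
  const normalized = normalizeName(name)
  if (!normalized) return { success: false, error: 'Block name cannot be empty' }
  if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(normalized)) {
    return { success: false, error: `"${name.trim()}" is a reserved name` }
  }
  return { success: true }
}

console.log(validateBlockName('Loop'))     // { success: false, error: '"Loop" is a reserved name' }
console.log(validateBlockName('My Agent')) // { success: true }
```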
@@ -1,6 +1,7 @@
import { loggerMock } from '@sim/testing'
import { describe, expect, it, vi } from 'vitest'
import type { LoopScope } from '@/executor/execution/state'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { LoopResolver } from './loop'
import type { ResolutionContext } from './reference'

@@ -62,7 +63,12 @@ function createTestContext(

describe('LoopResolver', () => {
  describe('canResolve', () => {
    it.concurrent('should return true for loop references', () => {
    it.concurrent('should return true for bare loop reference', () => {
      const resolver = new LoopResolver(createTestWorkflow())
      expect(resolver.canResolve('<loop>')).toBe(true)
    })

    it.concurrent('should return true for known loop properties', () => {
      const resolver = new LoopResolver(createTestWorkflow())
      expect(resolver.canResolve('<loop.index>')).toBe(true)
      expect(resolver.canResolve('<loop.iteration>')).toBe(true)
@@ -78,6 +84,13 @@ describe('LoopResolver', () => {
      expect(resolver.canResolve('<loop.items.0>')).toBe(true)
    })

    it.concurrent('should return true for unknown loop properties (validates in resolve)', () => {
      const resolver = new LoopResolver(createTestWorkflow())
      expect(resolver.canResolve('<loop.results>')).toBe(true)
      expect(resolver.canResolve('<loop.output>')).toBe(true)
      expect(resolver.canResolve('<loop.unknownProperty>')).toBe(true)
    })

    it.concurrent('should return false for non-loop references', () => {
      const resolver = new LoopResolver(createTestWorkflow())
      expect(resolver.canResolve('<block.output>')).toBe(false)
@@ -181,20 +194,34 @@ describe('LoopResolver', () => {
    })

  describe('edge cases', () => {
    it.concurrent('should return undefined for invalid loop reference (missing property)', () => {
    it.concurrent('should return context object for bare loop reference', () => {
      const resolver = new LoopResolver(createTestWorkflow())
      const loopScope = createLoopScope({ iteration: 0 })
      const loopScope = createLoopScope({ iteration: 2, item: 'test', items: ['a', 'b', 'c'] })
      const ctx = createTestContext('block-1', loopScope)

      expect(resolver.resolve('<loop>', ctx)).toBeUndefined()
      expect(resolver.resolve('<loop>', ctx)).toEqual({
        index: 2,
        currentItem: 'test',
        items: ['a', 'b', 'c'],
      })
    })

    it.concurrent('should return undefined for unknown loop property', () => {
    it.concurrent('should return minimal context object for for-loop (no items)', () => {
      const resolver = new LoopResolver(createTestWorkflow())
      const loopScope = createLoopScope({ iteration: 5 })
      const ctx = createTestContext('block-1', loopScope)

      expect(resolver.resolve('<loop>', ctx)).toEqual({
        index: 5,
      })
    })

    it.concurrent('should throw InvalidFieldError for unknown loop property', () => {
      const resolver = new LoopResolver(createTestWorkflow())
      const loopScope = createLoopScope({ iteration: 0 })
      const ctx = createTestContext('block-1', loopScope)

      expect(resolver.resolve('<loop.unknownProperty>', ctx)).toBeUndefined()
      expect(() => resolver.resolve('<loop.unknownProperty>', ctx)).toThrow(InvalidFieldError)
    })

    it.concurrent('should handle iteration index 0 correctly', () => {

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { isReference, parseReferencePath, REFERENCE } from '@/executor/constants'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { extractBaseBlockId } from '@/executor/utils/subflow-utils'
import {
  navigatePath,
@@ -13,6 +14,8 @@ const logger = createLogger('LoopResolver')
export class LoopResolver implements Resolver {
  constructor(private workflow: SerializedWorkflow) {}

  private static KNOWN_PROPERTIES = ['iteration', 'index', 'item', 'currentItem', 'items']

  canResolve(reference: string): boolean {
    if (!isReference(reference)) {
      return false
@@ -27,16 +30,15 @@

  resolve(reference: string, context: ResolutionContext): any {
    const parts = parseReferencePath(reference)
    if (parts.length < 2) {
      logger.warn('Invalid loop reference - missing property', { reference })
    if (parts.length === 0) {
      logger.warn('Invalid loop reference', { reference })
      return undefined
    }

    const [_, property, ...pathParts] = parts
    const loopId = this.findLoopForBlock(context.currentNodeId)
    let loopScope = context.loopScope

    if (!loopScope) {
      const loopId = this.findLoopForBlock(context.currentNodeId)
      if (!loopId) {
        return undefined
      }
@@ -48,6 +50,27 @@
        return undefined
      }

    const isForEach = loopId ? this.isForEachLoop(loopId) : loopScope.items !== undefined

    if (parts.length === 1) {
      const result: Record<string, any> = {
        index: loopScope.iteration,
      }
      if (loopScope.item !== undefined) {
        result.currentItem = loopScope.item
      }
      if (loopScope.items !== undefined) {
        result.items = loopScope.items
      }
      return result
    }

    const [_, property, ...pathParts] = parts
    if (!LoopResolver.KNOWN_PROPERTIES.includes(property)) {
      const availableFields = isForEach ? ['index', 'currentItem', 'items'] : ['index']
      throw new InvalidFieldError('loop', property, availableFields)
    }

    let value: any
    switch (property) {
      case 'iteration':
@@ -61,12 +84,8 @@
      case 'items':
        value = loopScope.items
        break
      default:
        logger.warn('Unknown loop property', { property })
        return undefined
    }

    // If there are additional path parts, navigate deeper
    if (pathParts.length > 0) {
      return navigatePath(value, pathParts)
    }
@@ -85,4 +104,9 @@

    return undefined
  }

  private isForEachLoop(loopId: string): boolean {
    const loopConfig = this.workflow.loops?.[loopId]
    return loopConfig?.loopType === 'forEach'
  }
}
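With this change a bare <loop> reference resolves to a small context object instead of undefined, and unknown properties now fail loudly with InvalidFieldError. A hedged sketch of the bare-reference shape, using a simplified LoopScope and made-up values:

```ts
// Hedged sketch of what the reworked resolver returns, per the tests above.
// The LoopScope shape here is a simplification of '@/executor/execution/state'.
interface LoopScope {
  iteration: number
  item?: unknown
  items?: unknown[]
}

// Mirrors the bare-reference branch of LoopResolver.resolve('<loop>', ctx):
function resolveBareLoop(scope: LoopScope): Record<string, unknown> {
  const result: Record<string, unknown> = { index: scope.iteration }
  if (scope.item !== undefined) result.currentItem = scope.item
  if (scope.items !== undefined) result.items = scope.items
  return result
}

// forEach loop: full context; plain for-loop: just the index.
console.log(resolveBareLoop({ iteration: 2, item: 'b', items: ['a', 'b', 'c'] }))
// -> { index: 2, currentItem: 'b', items: ['a', 'b', 'c'] }
console.log(resolveBareLoop({ iteration: 5 }))
// -> { index: 5 }
```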
@@ -1,5 +1,6 @@
import { loggerMock } from '@sim/testing'
import { describe, expect, it, vi } from 'vitest'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { ParallelResolver } from './parallel'
import type { ResolutionContext } from './reference'

@@ -81,7 +82,12 @@ function createTestContext(

describe('ParallelResolver', () => {
  describe('canResolve', () => {
    it.concurrent('should return true for parallel references', () => {
    it.concurrent('should return true for bare parallel reference', () => {
      const resolver = new ParallelResolver(createTestWorkflow())
      expect(resolver.canResolve('<parallel>')).toBe(true)
    })

    it.concurrent('should return true for known parallel properties', () => {
      const resolver = new ParallelResolver(createTestWorkflow())
      expect(resolver.canResolve('<parallel.index>')).toBe(true)
      expect(resolver.canResolve('<parallel.currentItem>')).toBe(true)
@@ -94,6 +100,16 @@ describe('ParallelResolver', () => {
      expect(resolver.canResolve('<parallel.items.0>')).toBe(true)
    })

    it.concurrent(
      'should return true for unknown parallel properties (validates in resolve)',
      () => {
        const resolver = new ParallelResolver(createTestWorkflow())
        expect(resolver.canResolve('<parallel.results>')).toBe(true)
        expect(resolver.canResolve('<parallel.output>')).toBe(true)
        expect(resolver.canResolve('<parallel.unknownProperty>')).toBe(true)
      }
    )

    it.concurrent('should return false for non-parallel references', () => {
      const resolver = new ParallelResolver(createTestWorkflow())
      expect(resolver.canResolve('<block.output>')).toBe(false)
@@ -254,24 +270,40 @@ describe('ParallelResolver', () => {
    })

  describe('edge cases', () => {
    it.concurrent(
      'should return undefined for invalid parallel reference (missing property)',
      () => {
        const resolver = new ParallelResolver(createTestWorkflow())
        const ctx = createTestContext('block-1₍0₎')
    it.concurrent('should return context object for bare parallel reference', () => {
      const workflow = createTestWorkflow({
        'parallel-1': { nodes: ['block-1'], distribution: ['a', 'b', 'c'] },
      })
      const resolver = new ParallelResolver(workflow)
      const ctx = createTestContext('block-1₍1₎')

        expect(resolver.resolve('<parallel>', ctx)).toBeUndefined()
      }
    )
      expect(resolver.resolve('<parallel>', ctx)).toEqual({
        index: 1,
        currentItem: 'b',
        items: ['a', 'b', 'c'],
      })
    })

    it.concurrent('should return undefined for unknown parallel property', () => {
    it.concurrent('should return minimal context object when no distribution', () => {
      const workflow = createTestWorkflow({
        'parallel-1': { nodes: ['block-1'] },
      })
      const resolver = new ParallelResolver(workflow)
      const ctx = createTestContext('block-1₍0₎')

      const result = resolver.resolve('<parallel>', ctx)
      expect(result).toHaveProperty('index', 0)
      expect(result).toHaveProperty('items')
    })

    it.concurrent('should throw InvalidFieldError for unknown parallel property', () => {
      const workflow = createTestWorkflow({
        'parallel-1': { nodes: ['block-1'], distribution: ['a'] },
      })
      const resolver = new ParallelResolver(workflow)
      const ctx = createTestContext('block-1₍0₎')

      expect(resolver.resolve('<parallel.unknownProperty>', ctx)).toBeUndefined()
      expect(() => resolver.resolve('<parallel.unknownProperty>', ctx)).toThrow(InvalidFieldError)
    })

    it.concurrent('should return undefined when block is not in any parallel', () => {

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { isReference, parseReferencePath, REFERENCE } from '@/executor/constants'
import { InvalidFieldError } from '@/executor/utils/block-reference'
import { extractBaseBlockId, extractBranchIndex } from '@/executor/utils/subflow-utils'
import {
  navigatePath,
@@ -13,6 +14,8 @@ const logger = createLogger('ParallelResolver')
export class ParallelResolver implements Resolver {
  constructor(private workflow: SerializedWorkflow) {}

  private static KNOWN_PROPERTIES = ['index', 'currentItem', 'items']

  canResolve(reference: string): boolean {
    if (!isReference(reference)) {
      return false
@@ -27,12 +30,11 @@

  resolve(reference: string, context: ResolutionContext): any {
    const parts = parseReferencePath(reference)
    if (parts.length < 2) {
      logger.warn('Invalid parallel reference - missing property', { reference })
    if (parts.length === 0) {
      logger.warn('Invalid parallel reference', { reference })
      return undefined
    }

    const [_, property, ...pathParts] = parts
    const parallelId = this.findParallelForBlock(context.currentNodeId)
    if (!parallelId) {
      return undefined
@@ -49,11 +51,33 @@
      return undefined
    }

    // First try to get items from the parallel scope (resolved at runtime)
    // This is the same pattern as LoopResolver reading from loopScope.items
    const parallelScope = context.executionContext.parallelExecutions?.get(parallelId)
    const distributionItems = parallelScope?.items ?? this.getDistributionItems(parallelConfig)

    if (parts.length === 1) {
      const result: Record<string, any> = {
        index: branchIndex,
      }
      if (distributionItems !== undefined) {
        result.items = distributionItems
        if (Array.isArray(distributionItems)) {
          result.currentItem = distributionItems[branchIndex]
        } else if (typeof distributionItems === 'object' && distributionItems !== null) {
          const keys = Object.keys(distributionItems)
          const key = keys[branchIndex]
          result.currentItem = key !== undefined ? distributionItems[key] : undefined
        }
      }
      return result
    }

    const [_, property, ...pathParts] = parts
    if (!ParallelResolver.KNOWN_PROPERTIES.includes(property)) {
      const isCollection = parallelConfig.parallelType === 'collection'
      const availableFields = isCollection ? ['index', 'currentItem', 'items'] : ['index']
      throw new InvalidFieldError('parallel', property, availableFields)
    }

    let value: any
    switch (property) {
      case 'index':
@@ -73,12 +97,8 @@
      case 'items':
        value = distributionItems
        break
      default:
        logger.warn('Unknown parallel property', { property })
        return undefined
    }

    // If there are additional path parts, navigate deeper
    if (pathParts.length > 0) {
      return navigatePath(value, pathParts)
    }
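A bare <parallel> reference now builds the same kind of context object, pulling currentItem out of the distribution by branch index for both array and object distributions. A standalone sketch of that selection logic, simplified from the resolver above:

```ts
// Hedged sketch of the branch-item selection added in ParallelResolver.resolve:
// array distributions index directly, object distributions index by key order.
function currentItemFor(distribution: unknown, branchIndex: number): unknown {
  if (Array.isArray(distribution)) {
    return distribution[branchIndex]
  }
  if (typeof distribution === 'object' && distribution !== null) {
    const record = distribution as Record<string, unknown>
    const key = Object.keys(record)[branchIndex]
    return key !== undefined ? record[key] : undefined
  }
  return undefined
}

console.log(currentItemFor(['a', 'b', 'c'], 1)) // 'b'
console.log(currentItemFor({ x: 1, y: 2 }, 1))  // 2
console.log(currentItemFor(undefined, 0))       // undefined (count-based parallel)
```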
@@ -5,7 +5,7 @@ import { useShallow } from 'zustand/react/shallow'
import { useSession } from '@/lib/auth/auth-client'
import { useSocket } from '@/app/workspace/providers/socket-provider'
import { getBlock } from '@/blocks'
import { normalizeName } from '@/executor/constants'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { useUndoRedo } from '@/hooks/use-undo-redo'
import {
  BLOCK_OPERATIONS,
@@ -740,6 +740,16 @@ export function useCollaborativeWorkflow() {
      return { success: false, error: 'Block name cannot be empty' }
    }

    if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(normalizedNewName)) {
      logger.error(`Cannot rename block to reserved name: "${trimmedName}"`)
      useNotificationStore.getState().addNotification({
        level: 'error',
        message: `"${trimmedName}" is a reserved name and cannot be used`,
        workflowId: activeWorkflowId || undefined,
      })
      return { success: false, error: `"${trimmedName}" is a reserved name` }
    }

    const currentBlocks = useWorkflowStore.getState().blocks
    const conflictingBlock = Object.entries(currentBlocks).find(
      ([blockId, block]) => blockId !== id && normalizeName(block.name) === normalizedNewName

@@ -14,7 +14,7 @@ import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getAllBlocks, getBlock } from '@/blocks/registry'
import type { SubBlockConfig } from '@/blocks/types'
import { EDGE, normalizeName } from '@/executor/constants'
import { EDGE, normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { getUserPermissionConfig } from '@/executor/utils/permission-check'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
import { TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
@@ -63,6 +63,7 @@ type SkippedItemType =
  | 'invalid_subflow_parent'
  | 'nested_subflow_not_allowed'
  | 'duplicate_block_name'
  | 'reserved_block_name'
  | 'duplicate_trigger'
  | 'duplicate_single_instance_block'

@@ -1683,7 +1684,8 @@ function applyOperationsToWorkflowState(
        }
      }
      if (params?.name !== undefined) {
        if (!normalizeName(params.name)) {
        const normalizedName = normalizeName(params.name)
        if (!normalizedName) {
          logSkippedItem(skippedItems, {
            type: 'missing_required_params',
            operationType: 'edit',
@@ -1691,6 +1693,14 @@
            reason: `Cannot rename to empty name`,
            details: { requestedName: params.name },
          })
        } else if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(normalizedName)) {
          logSkippedItem(skippedItems, {
            type: 'reserved_block_name',
            operationType: 'edit',
            blockId: block_id,
            reason: `Cannot rename to "${params.name}" - this is a reserved name`,
            details: { requestedName: params.name },
          })
        } else {
          const conflictingBlock = findBlockWithDuplicateNormalizedName(
            modifiedState.blocks,
@@ -1911,7 +1921,8 @@
      }

      case 'add': {
        if (!params?.type || !params?.name || !normalizeName(params.name)) {
        const addNormalizedName = params?.name ? normalizeName(params.name) : ''
        if (!params?.type || !params?.name || !addNormalizedName) {
          logSkippedItem(skippedItems, {
            type: 'missing_required_params',
            operationType: 'add',
@@ -1922,6 +1933,17 @@
          break
        }

        if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(addNormalizedName)) {
          logSkippedItem(skippedItems, {
            type: 'reserved_block_name',
            operationType: 'add',
            blockId: block_id,
            reason: `Block name "${params.name}" is a reserved name and cannot be used`,
            details: { requestedName: params.name },
          })
          break
        }

        const conflictingBlock = findBlockWithDuplicateNormalizedName(
          modifiedState.blocks,
          params.name,
@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt } from 'drizzle-orm'
import { and, count, inArray, like, lt, max, min, sql } from 'drizzle-orm'

const logger = createLogger('IdempotencyCleanup')

@@ -19,7 +19,8 @@ export interface CleanupOptions {
  batchSize?: number

  /**
   * Specific namespace to clean up, or undefined to clean all namespaces
   * Specific namespace prefix to clean up (e.g., 'webhook', 'polling')
   * Keys are prefixed with namespace, so this filters by key prefix
   */
  namespace?: string
}
@@ -53,13 +54,17 @@ export async function cleanupExpiredIdempotencyKeys(

  while (hasMore) {
    try {
      // Build where condition - filter by cutoff date and optionally by namespace prefix
      const whereCondition = namespace
        ? and(lt(idempotencyKey.createdAt, cutoffDate), eq(idempotencyKey.namespace, namespace))
        ? and(
            lt(idempotencyKey.createdAt, cutoffDate),
            like(idempotencyKey.key, `${namespace}:%`)
          )
        : lt(idempotencyKey.createdAt, cutoffDate)

      // First, find IDs to delete with limit
      // Find keys to delete with limit
      const toDelete = await db
        .select({ key: idempotencyKey.key, namespace: idempotencyKey.namespace })
        .select({ key: idempotencyKey.key })
        .from(idempotencyKey)
        .where(whereCondition)
        .limit(batchSize)
@@ -68,14 +73,13 @@
        break
      }

      // Delete the found records
      // Delete the found records by key
      const deleteResult = await db
        .delete(idempotencyKey)
        .where(
          and(
            ...toDelete.map((item) =>
              and(eq(idempotencyKey.key, item.key), eq(idempotencyKey.namespace, item.namespace))
            )
          inArray(
            idempotencyKey.key,
            toDelete.map((item) => item.key)
          )
        )
        .returning({ key: idempotencyKey.key })
@@ -126,6 +130,7 @@

/**
 * Get statistics about idempotency key usage
 * Uses SQL aggregations to avoid loading all keys into memory
 */
export async function getIdempotencyKeyStats(): Promise<{
  totalKeys: number
@@ -134,34 +139,35 @@
  newestKey: Date | null
}> {
  try {
    const allKeys = await db
    // Get total count and date range in a single query
    const [statsResult] = await db
      .select({
        namespace: idempotencyKey.namespace,
        createdAt: idempotencyKey.createdAt,
        totalKeys: count(),
        oldestKey: min(idempotencyKey.createdAt),
        newestKey: max(idempotencyKey.createdAt),
      })
      .from(idempotencyKey)

    const totalKeys = allKeys.length
    // Get counts by namespace prefix using SQL substring
    // Extracts everything before the first ':' as the namespace
    const namespaceStats = await db
      .select({
        namespace: sql<string>`split_part(${idempotencyKey.key}, ':', 1)`.as('namespace'),
        count: count(),
      })
      .from(idempotencyKey)
      .groupBy(sql`split_part(${idempotencyKey.key}, ':', 1)`)

    const keysByNamespace: Record<string, number> = {}
    let oldestKey: Date | null = null
    let newestKey: Date | null = null

    for (const key of allKeys) {
      keysByNamespace[key.namespace] = (keysByNamespace[key.namespace] || 0) + 1

      if (!oldestKey || key.createdAt < oldestKey) {
        oldestKey = key.createdAt
      }
      if (!newestKey || key.createdAt > newestKey) {
        newestKey = key.createdAt
      }
    for (const row of namespaceStats) {
      keysByNamespace[row.namespace || 'unknown'] = row.count
    }

    return {
      totalKeys,
      totalKeys: statsResult?.totalKeys ?? 0,
      keysByNamespace,
      oldestKey,
      newestKey,
      oldestKey: statsResult?.oldestKey ?? null,
      newestKey: statsResult?.newestKey ?? null,
    }
  } catch (error) {
    logger.error('Failed to get idempotency key stats:', error)
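With the namespace folded into the key itself (see the schema change below), cleanup filters by key prefix and the stats query groups on the substring before the first ':'. A plain TypeScript sketch of the equivalent string logic, with made-up keys:

```ts
// Hedged sketch: prefix filtering and grouping done on the key string alone,
// mirroring like(idempotencyKey.key, `${namespace}:%`) and split_part(key, ':', 1).
const keys = ['webhook:abc', 'webhook:def', 'polling:123']

// Cleanup filter for a single namespace prefix:
const namespace = 'webhook'
const toClean = keys.filter((k) => k.startsWith(`${namespace}:`))

// Stats grouping by the part before the first ':':
const keysByNamespace: Record<string, number> = {}
for (const k of keys) {
  const ns = k.split(':', 1)[0] || 'unknown'
  keysByNamespace[ns] = (keysByNamespace[ns] || 0) + 1
}

console.log(toClean)          // ['webhook:abc', 'webhook:def']
console.log(keysByNamespace)  // { webhook: 2, polling: 1 }
```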
@@ -2,7 +2,7 @@ import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { eq } from 'drizzle-orm'
import { getRedisClient } from '@/lib/core/config/redis'
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'
@@ -124,12 +124,7 @@ export class IdempotencyService {
    const existing = await db
      .select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
      .from(idempotencyKey)
      .where(
        and(
          eq(idempotencyKey.key, normalizedKey),
          eq(idempotencyKey.namespace, this.config.namespace)
        )
      )
      .where(eq(idempotencyKey.key, normalizedKey))
      .limit(1)

    if (existing.length > 0) {
@@ -224,11 +219,12 @@
      .insert(idempotencyKey)
      .values({
        key: normalizedKey,
        namespace: this.config.namespace,
        result: inProgressResult,
        createdAt: new Date(),
      })
      .onConflictDoNothing()
      .onConflictDoNothing({
        target: [idempotencyKey.key],
      })
      .returning({ key: idempotencyKey.key })

    if (insertResult.length > 0) {
@@ -243,12 +239,7 @@
    const existing = await db
      .select({ result: idempotencyKey.result })
      .from(idempotencyKey)
      .where(
        and(
          eq(idempotencyKey.key, normalizedKey),
          eq(idempotencyKey.namespace, this.config.namespace)
        )
      )
      .where(eq(idempotencyKey.key, normalizedKey))
      .limit(1)

    const existingResult =
@@ -280,12 +271,7 @@
    const existing = await db
      .select({ result: idempotencyKey.result })
      .from(idempotencyKey)
      .where(
        and(
          eq(idempotencyKey.key, normalizedKey),
          eq(idempotencyKey.namespace, this.config.namespace)
        )
      )
      .where(eq(idempotencyKey.key, normalizedKey))
      .limit(1)
    currentResult = existing.length > 0 ? (existing[0].result as ProcessingResult) : null
  }
@@ -339,12 +325,11 @@
      .insert(idempotencyKey)
      .values({
        key: normalizedKey,
        namespace: this.config.namespace,
        result: result,
        createdAt: new Date(),
      })
      .onConflictDoUpdate({
        target: [idempotencyKey.key, idempotencyKey.namespace],
        target: [idempotencyKey.key],
        set: {
          result: result,
          createdAt: new Date(),
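These queries now match on the key column alone, which only stays collision-free if the service folds the namespace into normalizedKey before touching the table. A hedged sketch of that key construction; the helper name and exact format are illustrative, not taken from the service:

```ts
// Hedged sketch (illustrative, not the service's exact code): the namespace is folded
// into the stored key so a primary key on "key" alone still separates namespaces.
function buildNormalizedKey(namespace: string, rawKey: string): string {
  return `${namespace}:${rawKey}`
}

console.log(buildNormalizedKey('webhook', 'evt_12345')) // "webhook:evt_12345"
```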
@@ -7,7 +7,7 @@ import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import type { SubBlockConfig } from '@/blocks/types'
import { normalizeName } from '@/executor/constants'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { filterNewEdges, getUniqueBlockName, mergeSubblockState } from '@/stores/workflows/utils'
@@ -726,6 +726,11 @@ export const useWorkflowStore = create<WorkflowStore>()(
          return { success: false, changedSubblocks: [] }
        }

        if ((RESERVED_BLOCK_NAMES as readonly string[]).includes(normalizedNewName)) {
          logger.error(`Cannot rename block to reserved name: "${name}"`)
          return { success: false, changedSubblocks: [] }
        }

        const newState = {
          blocks: {
            ...get().blocks,
packages/db/migrations/0147_rare_firebrand.sql (new file, 4 lines)

@@ -0,0 +1,4 @@
DROP INDEX "idempotency_key_namespace_unique";--> statement-breakpoint
DROP INDEX "idempotency_key_namespace_idx";--> statement-breakpoint
ALTER TABLE "idempotency_key" ADD PRIMARY KEY ("key");--> statement-breakpoint
ALTER TABLE "idempotency_key" DROP COLUMN "namespace";
packages/db/migrations/meta/0147_snapshot.json (new file, 10341 lines)

File diff suppressed because it is too large.
@@ -1023,6 +1023,13 @@
      "when": 1768867605608,
      "tag": "0146_cultured_ikaris",
      "breakpoints": true
    },
    {
      "idx": 147,
      "version": "7",
      "when": 1769134350805,
      "tag": "0147_rare_firebrand",
      "breakpoints": true
    }
  ]
}
@@ -1656,20 +1656,13 @@ export const workflowDeploymentVersion = pgTable(
export const idempotencyKey = pgTable(
  'idempotency_key',
  {
    key: text('key').notNull(),
    namespace: text('namespace').notNull().default('default'),
    key: text('key').primaryKey(),
    result: json('result').notNull(),
    createdAt: timestamp('created_at').notNull().defaultNow(),
  },
  (table) => ({
    // Primary key is combination of key and namespace
    keyNamespacePk: uniqueIndex('idempotency_key_namespace_unique').on(table.key, table.namespace),

    // Index for cleanup operations by creation time
    createdAtIdx: index('idempotency_key_created_at_idx').on(table.createdAt),

    // Index for namespace-based queries
    namespaceIdx: index('idempotency_key_namespace_idx').on(table.namespace),
  })
)