feat(variables): multiplayer variables through sockets, persist server side (#933)

* feat(variables): multiplayer variables through sockets, persist server side

* remove extraneous comments

* breakout variables handler in sockets
This commit is contained in:
Waleed Latif
2025-08-11 16:32:21 -07:00
committed by GitHub
parent 2a333c7cf7
commit bc455d5bf4
12 changed files with 1157 additions and 636 deletions

View File

@@ -19,6 +19,7 @@ import { ScrollArea } from '@/components/ui/scroll-area'
import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip'
import { createLogger } from '@/lib/logs/console/logger'
import { validateName } from '@/lib/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useVariablesStore } from '@/stores/panel/variables/store'
import type { Variable, VariableType } from '@/stores/panel/variables/types'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
@@ -34,19 +35,17 @@ export function Variables() {
deleteVariable,
duplicateVariable,
getVariablesByWorkflowId,
loadVariables,
} = useVariablesStore()
const {
collaborativeUpdateVariable,
collaborativeAddVariable,
collaborativeDeleteVariable,
collaborativeDuplicateVariable,
} = useCollaborativeWorkflow()
// Get variables for the current workflow
const workflowVariables = activeWorkflowId ? getVariablesByWorkflowId(activeWorkflowId) : []
// Load variables when active workflow changes
useEffect(() => {
if (activeWorkflowId) {
loadVariables(activeWorkflowId)
}
}, [activeWorkflowId, loadVariables])
// Track editor references
const editorRefs = useRef<Record<string, HTMLDivElement | null>>({})
@@ -56,16 +55,14 @@ export function Variables() {
// Handle variable name change with validation
const handleVariableNameChange = (variableId: string, newName: string) => {
const validatedName = validateName(newName)
updateVariable(variableId, { name: validatedName })
collaborativeUpdateVariable(variableId, 'name', validatedName)
}
// Auto-save when variables are added/edited
const handleAddVariable = () => {
if (!activeWorkflowId) return
// Create a default variable - naming is handled in the store
const id = addVariable({
name: '', // Store will generate an appropriate name
const id = collaborativeAddVariable({
name: '',
type: 'string',
value: '',
workflowId: activeWorkflowId,
@@ -125,17 +122,10 @@ export function Variables() {
}
}
// Handle editor value changes - store exactly what user types
const handleEditorChange = (variable: Variable, newValue: string) => {
// Store the raw value directly, no parsing or formatting
updateVariable(variable.id, {
value: newValue,
// Clear any previous validation errors so they'll be recalculated
validationError: undefined,
})
collaborativeUpdateVariable(variable.id, 'value', newValue)
}
// Only track focus state for UI purposes
const handleEditorBlur = (variableId: string) => {
setActiveEditors((prev) => ({
...prev,
@@ -143,7 +133,6 @@ export function Variables() {
}))
}
// Track when editor becomes active
const handleEditorFocus = (variableId: string) => {
setActiveEditors((prev) => ({
...prev,
@@ -151,20 +140,14 @@ export function Variables() {
}))
}
// Always return raw value without any formatting
const formatValue = (variable: Variable) => {
if (variable.value === '') return ''
// Always return raw value exactly as typed
return typeof variable.value === 'string' ? variable.value : JSON.stringify(variable.value)
}
// Get validation status based on type and value
const getValidationStatus = (variable: Variable): string | undefined => {
// Empty values don't need validation
if (variable.value === '') return undefined
// Otherwise validate based on type
switch (variable.type) {
case 'number':
return Number.isNaN(Number(variable.value)) ? 'Not a valid number' : undefined
@@ -174,49 +157,38 @@ export function Variables() {
: undefined
case 'object':
try {
// Handle both JavaScript and JSON syntax
const valueToEvaluate = String(variable.value).trim()
// Basic security check to prevent arbitrary code execution
if (!valueToEvaluate.startsWith('{') || !valueToEvaluate.endsWith('}')) {
return 'Not a valid object format'
}
// Use Function constructor to safely evaluate the object expression
// This is safer than eval() and handles all JS object literal syntax
const parsed = new Function(`return ${valueToEvaluate}`)()
// Verify it's actually an object (not array or null)
if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
return 'Not a valid object'
}
return undefined // Valid object
return undefined
} catch (e) {
logger.info('Object parsing error:', e)
return 'Invalid object syntax'
}
case 'array':
try {
// Use actual JavaScript evaluation instead of trying to convert to JSON
// This properly handles all valid JS array syntax including mixed types
const valueToEvaluate = String(variable.value).trim()
// Basic security check to prevent arbitrary code execution
if (!valueToEvaluate.startsWith('[') || !valueToEvaluate.endsWith(']')) {
return 'Not a valid array format'
}
// Use Function constructor to safely evaluate the array expression
// This is safer than eval() and handles all JS array syntax correctly
const parsed = new Function(`return ${valueToEvaluate}`)()
// Verify it's actually an array
if (!Array.isArray(parsed)) {
return 'Not a valid array'
}
return undefined // Valid array
return undefined
} catch (e) {
logger.info('Array parsing error:', e)
return 'Invalid array syntax'
@@ -226,9 +198,7 @@ export function Variables() {
}
}
// Clear editor refs when variables change
useEffect(() => {
// Clean up any references to deleted variables
Object.keys(editorRefs.current).forEach((id) => {
if (!workflowVariables.some((v) => v.id === id)) {
delete editorRefs.current[id]
@@ -276,35 +246,35 @@ export function Variables() {
className='min-w-32 rounded-lg border-[#E5E5E5] bg-[#FFFFFF] shadow-xs dark:border-[#414141] dark:bg-[#202020]'
>
<DropdownMenuItem
onClick={() => updateVariable(variable.id, { type: 'plain' })}
onClick={() => collaborativeUpdateVariable(variable.id, 'type', 'plain')}
className='flex cursor-pointer items-center rounded-md px-3 py-2 font-[380] text-card-foreground text-sm hover:bg-secondary/50 focus:bg-secondary/50'
>
<div className='mr-2 w-5 text-center font-[380] text-sm'>Abc</div>
<span className='font-[380]'>Plain</span>
</DropdownMenuItem>
<DropdownMenuItem
onClick={() => updateVariable(variable.id, { type: 'number' })}
onClick={() => collaborativeUpdateVariable(variable.id, 'type', 'number')}
className='flex cursor-pointer items-center rounded-md px-3 py-2 font-[380] text-card-foreground text-sm hover:bg-secondary/50 focus:bg-secondary/50'
>
<div className='mr-2 w-5 text-center font-[380] text-sm'>123</div>
<span className='font-[380]'>Number</span>
</DropdownMenuItem>
<DropdownMenuItem
onClick={() => updateVariable(variable.id, { type: 'boolean' })}
onClick={() => collaborativeUpdateVariable(variable.id, 'type', 'boolean')}
className='flex cursor-pointer items-center rounded-md px-3 py-2 font-[380] text-card-foreground text-sm hover:bg-secondary/50 focus:bg-secondary/50'
>
<div className='mr-2 w-5 text-center font-[380] text-sm'>0/1</div>
<span className='font-[380]'>Boolean</span>
</DropdownMenuItem>
<DropdownMenuItem
onClick={() => updateVariable(variable.id, { type: 'object' })}
onClick={() => collaborativeUpdateVariable(variable.id, 'type', 'object')}
className='flex cursor-pointer items-center rounded-md px-3 py-2 font-[380] text-card-foreground text-sm hover:bg-secondary/50 focus:bg-secondary/50'
>
<div className='mr-2 w-5 text-center font-[380] text-sm'>{'{}'}</div>
<span className='font-[380]'>Object</span>
</DropdownMenuItem>
<DropdownMenuItem
onClick={() => updateVariable(variable.id, { type: 'array' })}
onClick={() => collaborativeUpdateVariable(variable.id, 'type', 'array')}
className='flex cursor-pointer items-center rounded-md px-3 py-2 font-[380] text-card-foreground text-sm hover:bg-secondary/50 focus:bg-secondary/50'
>
<div className='mr-2 w-5 text-center font-[380] text-sm'>[]</div>
@@ -329,14 +299,14 @@ export function Variables() {
className='min-w-32 rounded-lg border-[#E5E5E5] bg-[#FFFFFF] shadow-xs dark:border-[#414141] dark:bg-[#202020]'
>
<DropdownMenuItem
onClick={() => duplicateVariable(variable.id)}
onClick={() => collaborativeDuplicateVariable(variable.id)}
className='cursor-pointer rounded-md px-3 py-2 font-[380] text-card-foreground text-sm hover:bg-secondary/50 focus:bg-secondary/50'
>
<Copy className='mr-2 h-4 w-4 text-muted-foreground' />
Duplicate
</DropdownMenuItem>
<DropdownMenuItem
onClick={() => deleteVariable(variable.id)}
onClick={() => collaborativeDeleteVariable(variable.id)}
className='cursor-pointer rounded-md px-3 py-2 font-[380] text-destructive text-sm hover:bg-destructive/10 focus:bg-destructive/10 focus:text-destructive'
>
<Trash className='mr-2 h-4 w-4' />

View File

@@ -50,12 +50,14 @@ interface SocketContextType {
value: any,
operationId?: string
) => void
emitVariableUpdate: (variableId: string, field: string, value: any, operationId?: string) => void
emitCursorUpdate: (cursor: { x: number; y: number }) => void
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
// Event handlers for receiving real-time updates
onWorkflowOperation: (handler: (data: any) => void) => void
onSubblockUpdate: (handler: (data: any) => void) => void
onVariableUpdate: (handler: (data: any) => void) => void
onCursorUpdate: (handler: (data: any) => void) => void
onSelectionUpdate: (handler: (data: any) => void) => void
@@ -77,10 +79,12 @@ const SocketContext = createContext<SocketContextType>({
leaveWorkflow: () => {},
emitWorkflowOperation: () => {},
emitSubblockUpdate: () => {},
emitVariableUpdate: () => {},
emitCursorUpdate: () => {},
emitSelectionUpdate: () => {},
onWorkflowOperation: () => {},
onSubblockUpdate: () => {},
onVariableUpdate: () => {},
onCursorUpdate: () => {},
onSelectionUpdate: () => {},
onUserJoined: () => {},
@@ -113,6 +117,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const eventHandlers = useRef<{
workflowOperation?: (data: any) => void
subblockUpdate?: (data: any) => void
variableUpdate?: (data: any) => void
cursorUpdate?: (data: any) => void
selectionUpdate?: (data: any) => void
@@ -292,6 +297,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.subblockUpdate?.(data)
})
// Variable update events
socketInstance.on('variable-update', (data) => {
eventHandlers.current.variableUpdate?.(data)
})
// Workflow deletion events
socketInstance.on('workflow-deleted', (data) => {
logger.warn(`Workflow ${data.workflowId} has been deleted`)
@@ -697,6 +707,30 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
[socket, currentWorkflowId]
)
// Emit variable value updates
const emitVariableUpdate = useCallback(
(variableId: string, field: string, value: any, operationId?: string) => {
// Only emit if socket is connected and we're in a valid workflow room
if (socket && currentWorkflowId) {
socket.emit('variable-update', {
variableId,
field,
value,
timestamp: Date.now(),
operationId, // Include operation ID for queue tracking
})
} else {
logger.warn('Cannot emit variable update: no socket connection or workflow room', {
hasSocket: !!socket,
currentWorkflowId,
variableId,
field,
})
}
},
[socket, currentWorkflowId]
)
// Cursor throttling optimized for database connection health
const lastCursorEmit = useRef(0)
const emitCursorUpdate = useCallback(
@@ -732,6 +766,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.subblockUpdate = handler
}, [])
const onVariableUpdate = useCallback((handler: (data: any) => void) => {
eventHandlers.current.variableUpdate = handler
}, [])
const onCursorUpdate = useCallback((handler: (data: any) => void) => {
eventHandlers.current.cursorUpdate = handler
}, [])
@@ -776,11 +814,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
leaveWorkflow,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
emitCursorUpdate,
emitSelectionUpdate,
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
onCursorUpdate,
onSelectionUpdate,

View File

@@ -6,6 +6,7 @@ import { getBlock } from '@/blocks'
import { resolveOutputType } from '@/blocks/utils'
import { useSocket } from '@/contexts/socket-context'
import { registerEmitFunctions, useOperationQueue } from '@/stores/operation-queue/store'
import { useVariablesStore } from '@/stores/panel/variables/store'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
@@ -23,8 +24,10 @@ export function useCollaborativeWorkflow() {
leaveWorkflow,
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
@@ -36,6 +39,7 @@ export function useCollaborativeWorkflow() {
const { activeWorkflowId } = useWorkflowRegistry()
const workflowStore = useWorkflowStore()
const subBlockStore = useSubBlockStore()
const variablesStore = useVariablesStore()
const { data: session } = useSession()
const { isShowingDiff } = useWorkflowDiffStore()
@@ -53,6 +57,7 @@ export function useCollaborativeWorkflow() {
confirmOperation,
failOperation,
cancelOperationsForBlock,
cancelOperationsForVariable,
} = useOperationQueue()
// Clear position timestamps when switching workflows
@@ -73,8 +78,13 @@ export function useCollaborativeWorkflow() {
// Register emit functions with operation queue store
useEffect(() => {
registerEmitFunctions(emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId)
}, [emitWorkflowOperation, emitSubblockUpdate, currentWorkflowId])
registerEmitFunctions(
emitWorkflowOperation,
emitSubblockUpdate,
emitVariableUpdate,
currentWorkflowId
)
}, [emitWorkflowOperation, emitSubblockUpdate, emitVariableUpdate, currentWorkflowId])
useEffect(() => {
const handleWorkflowOperation = (data: any) => {
@@ -232,6 +242,26 @@ export function useCollaborativeWorkflow() {
}
break
}
} else if (target === 'variable') {
switch (operation) {
case 'add':
variablesStore.addVariable(
{
workflowId: payload.workflowId,
name: payload.name,
type: payload.type,
value: payload.value,
},
payload.id
)
break
case 'remove':
variablesStore.deleteVariable(payload.variableId)
break
case 'duplicate':
variablesStore.duplicateVariable(payload.sourceVariableId, payload.id)
break
}
}
} catch (error) {
logger.error('Error applying remote operation:', error)
@@ -259,6 +289,30 @@ export function useCollaborativeWorkflow() {
}
}
const handleVariableUpdate = (data: any) => {
const { variableId, field, value, userId } = data
if (isApplyingRemoteChange.current) return
logger.info(`Received variable update from user ${userId}: ${variableId}.${field}`)
isApplyingRemoteChange.current = true
try {
if (field === 'name') {
variablesStore.updateVariable(variableId, { name: value })
} else if (field === 'value') {
variablesStore.updateVariable(variableId, { value })
} else if (field === 'type') {
variablesStore.updateVariable(variableId, { type: value })
}
} catch (error) {
logger.error('Error applying remote variable update:', error)
} finally {
isApplyingRemoteChange.current = false
}
}
const handleUserJoined = (data: any) => {
logger.info(`User joined: ${data.userName}`)
}
@@ -364,6 +418,7 @@ export function useCollaborativeWorkflow() {
// Register event handlers
onWorkflowOperation(handleWorkflowOperation)
onSubblockUpdate(handleSubblockUpdate)
onVariableUpdate(handleVariableUpdate)
onUserJoined(handleUserJoined)
onUserLeft(handleUserLeft)
onWorkflowDeleted(handleWorkflowDeleted)
@@ -377,6 +432,7 @@ export function useCollaborativeWorkflow() {
}, [
onWorkflowOperation,
onSubblockUpdate,
onVariableUpdate,
onUserJoined,
onUserLeft,
onWorkflowDeleted,
@@ -385,6 +441,7 @@ export function useCollaborativeWorkflow() {
onOperationFailed,
workflowStore,
subBlockStore,
variablesStore,
activeWorkflowId,
confirmOperation,
failOperation,
@@ -1094,6 +1151,72 @@ export function useCollaborativeWorkflow() {
[executeQueuedOperation, workflowStore]
)
const collaborativeUpdateVariable = useCallback(
(variableId: string, field: 'name' | 'value' | 'type', value: any) => {
executeQueuedOperation('variable-update', 'variable', { variableId, field, value }, () => {
if (field === 'name') {
variablesStore.updateVariable(variableId, { name: value })
} else if (field === 'value') {
variablesStore.updateVariable(variableId, { value })
} else if (field === 'type') {
variablesStore.updateVariable(variableId, { type: value })
}
})
},
[executeQueuedOperation, variablesStore]
)
const collaborativeAddVariable = useCallback(
(variableData: { name: string; type: any; value: any; workflowId: string }) => {
const id = crypto.randomUUID()
variablesStore.addVariable(variableData, id)
const processedVariable = useVariablesStore.getState().variables[id]
if (processedVariable) {
const payloadWithProcessedName = {
...variableData,
id,
name: processedVariable.name,
}
executeQueuedOperation('add', 'variable', payloadWithProcessedName, () => {})
}
return id
},
[executeQueuedOperation, variablesStore]
)
const collaborativeDeleteVariable = useCallback(
(variableId: string) => {
cancelOperationsForVariable(variableId)
executeQueuedOperation('remove', 'variable', { variableId }, () => {
variablesStore.deleteVariable(variableId)
})
},
[executeQueuedOperation, variablesStore, cancelOperationsForVariable]
)
const collaborativeDuplicateVariable = useCallback(
(variableId: string) => {
const newId = crypto.randomUUID()
const sourceVariable = useVariablesStore.getState().variables[variableId]
if (!sourceVariable) return null
executeQueuedOperation(
'duplicate',
'variable',
{ sourceVariableId: variableId, id: newId },
() => {
variablesStore.duplicateVariable(variableId, newId)
}
)
return newId
},
[executeQueuedOperation, variablesStore]
)
return {
// Connection status
isConnected,
@@ -1122,6 +1245,12 @@ export function useCollaborativeWorkflow() {
collaborativeSetSubblockValue,
collaborativeSetTagSelection,
// Collaborative variable operations
collaborativeUpdateVariable,
collaborativeAddVariable,
collaborativeDeleteVariable,
collaborativeDuplicateVariable,
// Collaborative loop/parallel operations
collaborativeUpdateLoopType,
collaborativeUpdateParallelType,

View File

@@ -194,6 +194,9 @@ export async function persistWorkflowOperation(workflowId: string, operation: an
case 'subflow':
await handleSubflowOperationTx(tx, workflowId, op, payload, userId)
break
case 'variable':
await handleVariableOperationTx(tx, workflowId, op, payload, userId)
break
default:
throw new Error(`Unknown operation target: ${target}`)
}
@@ -855,3 +858,127 @@ async function handleSubflowOperationTx(
throw new Error(`Unsupported subflow operation: ${operation}`)
}
}
// Variable operations - updates workflow.variables JSON field
async function handleVariableOperationTx(
tx: any,
workflowId: string,
operation: string,
payload: any,
userId: string
) {
// Get current workflow variables
const workflowData = await tx
.select({ variables: workflow.variables })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (workflowData.length === 0) {
throw new Error(`Workflow ${workflowId} not found`)
}
const currentVariables = (workflowData[0].variables as Record<string, any>) || {}
switch (operation) {
case 'add': {
if (!payload.id || !payload.name || payload.type === undefined) {
throw new Error('Missing required fields for add variable operation')
}
// Add the new variable
const updatedVariables = {
...currentVariables,
[payload.id]: {
id: payload.id,
workflowId: payload.workflowId,
name: payload.name,
type: payload.type,
value: payload.value || '',
},
}
await tx
.update(workflow)
.set({
variables: updatedVariables,
updatedAt: new Date(),
})
.where(eq(workflow.id, workflowId))
logger.debug(`Added variable ${payload.id} (${payload.name}) to workflow ${workflowId}`)
break
}
case 'remove': {
if (!payload.variableId) {
throw new Error('Missing variable ID for remove operation')
}
// Remove the variable
const { [payload.variableId]: _, ...updatedVariables } = currentVariables
await tx
.update(workflow)
.set({
variables: updatedVariables,
updatedAt: new Date(),
})
.where(eq(workflow.id, workflowId))
logger.debug(`Removed variable ${payload.variableId} from workflow ${workflowId}`)
break
}
case 'duplicate': {
if (!payload.sourceVariableId || !payload.id) {
throw new Error('Missing required fields for duplicate variable operation')
}
const sourceVariable = currentVariables[payload.sourceVariableId]
if (!sourceVariable) {
throw new Error(`Source variable ${payload.sourceVariableId} not found`)
}
// Create duplicated variable with unique name
const baseName = `${sourceVariable.name} (copy)`
let uniqueName = baseName
let nameIndex = 1
// Ensure name uniqueness
const existingNames = Object.values(currentVariables).map((v: any) => v.name)
while (existingNames.includes(uniqueName)) {
uniqueName = `${baseName} (${nameIndex})`
nameIndex++
}
const duplicatedVariable = {
...sourceVariable,
id: payload.id,
name: uniqueName,
}
const updatedVariables = {
...currentVariables,
[payload.id]: duplicatedVariable,
}
await tx
.update(workflow)
.set({
variables: updatedVariables,
updatedAt: new Date(),
})
.where(eq(workflow.id, workflowId))
logger.debug(
`Duplicated variable ${payload.sourceVariableId} -> ${payload.id} (${uniqueName}) in workflow ${workflowId}`
)
break
}
default:
logger.warn(`Unknown variable operation: ${operation}`)
throw new Error(`Unsupported variable operation: ${operation}`)
}
}

View File

@@ -2,6 +2,7 @@ import { setupConnectionHandlers } from '@/socket-server/handlers/connection'
import { setupOperationsHandlers } from '@/socket-server/handlers/operations'
import { setupPresenceHandlers } from '@/socket-server/handlers/presence'
import { setupSubblocksHandlers } from '@/socket-server/handlers/subblocks'
import { setupVariablesHandlers } from '@/socket-server/handlers/variables'
import { setupWorkflowHandlers } from '@/socket-server/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket-server/middleware/auth'
import type { RoomManager, UserPresence, WorkflowRoom } from '@/socket-server/rooms/manager'
@@ -17,6 +18,7 @@ export function setupAllHandlers(socket: AuthenticatedSocket, roomManager: RoomM
setupWorkflowHandlers(socket, roomManager)
setupOperationsHandlers(socket, roomManager)
setupSubblocksHandlers(socket, roomManager)
setupVariablesHandlers(socket, roomManager)
setupPresenceHandlers(socket, roomManager)
setupConnectionHandlers(socket, roomManager)
}
@@ -25,6 +27,7 @@ export {
setupWorkflowHandlers,
setupOperationsHandlers,
setupSubblocksHandlers,
setupVariablesHandlers,
setupPresenceHandlers,
setupConnectionHandlers,
}

View File

@@ -126,6 +126,44 @@ export function setupOperationsHandlers(
return // Early return for position updates
}
if (target === 'variable' && ['add', 'remove', 'duplicate'].includes(operation)) {
// Persist first, then broadcast
await persistWorkflowOperation(workflowId, {
operation,
target,
payload,
timestamp: operationTimestamp,
userId: session.userId,
})
room.lastModified = Date.now()
const broadcastData = {
operation,
target,
payload,
timestamp: operationTimestamp,
senderId: socket.id,
userId: session.userId,
userName: session.userName,
metadata: {
workflowId,
operationId: crypto.randomUUID(),
},
}
socket.to(workflowId).emit('workflow-operation', broadcastData)
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
return
}
// For non-position operations, persist first then broadcast
await persistWorkflowOperation(workflowId, {
operation,

View File

@@ -0,0 +1,152 @@
import { eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { db } from '@/db'
import { workflow } from '@/db/schema'
import type { HandlerDependencies } from '@/socket-server/handlers/workflow'
import type { AuthenticatedSocket } from '@/socket-server/middleware/auth'
import type { RoomManager } from '@/socket-server/rooms/manager'
const logger = createLogger('VariablesHandlers')
export function setupVariablesHandlers(
socket: AuthenticatedSocket,
deps: HandlerDependencies | RoomManager
) {
const roomManager =
deps instanceof Object && 'roomManager' in deps ? deps.roomManager : (deps as RoomManager)
socket.on('variable-update', async (data) => {
const workflowId = roomManager.getWorkflowIdForSocket(socket.id)
const session = roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { variableId, field, value, timestamp, operationId } = data
const room = roomManager.getWorkflowRoom(workflowId)
if (!room) {
logger.debug(`Ignoring variable update: workflow room not found`, {
socketId: socket.id,
workflowId,
variableId,
field,
})
return
}
try {
const userPresence = room.users.get(socket.id)
if (userPresence) {
userPresence.lastActivity = Date.now()
}
const workflowExists = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (workflowExists.length === 0) {
logger.warn(`Ignoring variable update: workflow ${workflowId} no longer exists`, {
socketId: socket.id,
variableId,
field,
})
roomManager.cleanupUserFromRoom(socket.id, workflowId)
return
}
let updateSuccessful = false
await db.transaction(async (tx) => {
const [workflowRecord] = await tx
.select({ variables: workflow.variables })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowRecord) {
logger.debug(
`Ignoring variable update for deleted workflow: ${workflowId}/${variableId}.${field}`
)
return
}
const variables = (workflowRecord.variables as any) || {}
if (!variables[variableId]) {
logger.debug(
`Ignoring variable update for deleted variable: ${workflowId}/${variableId}.${field}`
)
return
}
variables[variableId] = {
...variables[variableId],
[field]: value,
}
await tx
.update(workflow)
.set({
variables: variables,
updatedAt: new Date(),
})
.where(eq(workflow.id, workflowId))
updateSuccessful = true
})
if (updateSuccessful) {
socket.to(workflowId).emit('variable-update', {
variableId,
field,
value,
timestamp,
senderId: socket.id,
userId: session.userId,
})
if (operationId) {
socket.emit('operation-confirmed', {
operationId,
serverTimestamp: Date.now(),
})
}
logger.debug(`Variable update in workflow ${workflowId}: ${variableId}.${field}`)
} else if (operationId) {
socket.emit('operation-failed', {
operationId,
error: 'Variable no longer exists',
retryable: false,
})
}
} catch (error) {
logger.error('Error handling variable update:', error)
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
if (operationId) {
socket.emit('operation-failed', {
operationId,
error: errorMessage,
retryable: true,
})
}
socket.emit('operation-error', {
type: 'VARIABLE_UPDATE_FAILED',
message: `Failed to update variable ${variableId}.${field}: ${errorMessage}`,
operation: 'variable-update',
target: 'variable',
})
}
})
}

View File

@@ -79,10 +79,46 @@ export const SubflowOperationSchema = z.object({
operationId: z.string().optional(),
})
export const VariableOperationSchema = z.union([
z.object({
operation: z.literal('add'),
target: z.literal('variable'),
payload: z.object({
id: z.string(),
name: z.string(),
type: z.any(),
value: z.any(),
workflowId: z.string(),
}),
timestamp: z.number(),
operationId: z.string().optional(),
}),
z.object({
operation: z.literal('remove'),
target: z.literal('variable'),
payload: z.object({
variableId: z.string(),
}),
timestamp: z.number(),
operationId: z.string().optional(),
}),
z.object({
operation: z.literal('duplicate'),
target: z.literal('variable'),
payload: z.object({
sourceVariableId: z.string(),
id: z.string(),
}),
timestamp: z.number(),
operationId: z.string().optional(),
}),
])
export const WorkflowOperationSchema = z.union([
BlockOperationSchema,
EdgeOperationSchema,
SubflowOperationSchema,
VariableOperationSchema,
])
export { PositionSchema, AutoConnectEdgeSchema }

View File

@@ -29,6 +29,7 @@ interface OperationQueueState {
handleOperationTimeout: (operationId: string) => void
processNextOperation: () => void
cancelOperationsForBlock: (blockId: string) => void
cancelOperationsForVariable: (variableId: string) => void
triggerOfflineMode: () => void
clearError: () => void
@@ -37,6 +38,7 @@ interface OperationQueueState {
const retryTimeouts = new Map<string, NodeJS.Timeout>()
const operationTimeouts = new Map<string, NodeJS.Timeout>()
const subblockDebounceTimeouts = new Map<string, NodeJS.Timeout>()
const variableDebounceTimeouts = new Map<string, NodeJS.Timeout>()
let emitWorkflowOperation:
| ((operation: string, target: string, payload: any, operationId?: string) => void)
@@ -44,14 +46,19 @@ let emitWorkflowOperation:
let emitSubblockUpdate:
| ((blockId: string, subblockId: string, value: any, operationId?: string) => void)
| null = null
let emitVariableUpdate:
| ((variableId: string, field: string, value: any, operationId?: string) => void)
| null = null
export function registerEmitFunctions(
workflowEmit: (operation: string, target: string, payload: any, operationId?: string) => void,
subblockEmit: (blockId: string, subblockId: string, value: any, operationId?: string) => void,
variableEmit: (variableId: string, field: string, value: any, operationId?: string) => void,
workflowId: string | null
) {
emitWorkflowOperation = workflowEmit
emitSubblockUpdate = subblockEmit
emitVariableUpdate = variableEmit
}
export const useOperationQueueStore = create<OperationQueueState>((set, get) => ({
@@ -108,6 +115,54 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
return
}
// Handle debouncing for variable operations
if (
operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable' &&
!operation.immediate
) {
const { variableId, field } = operation.operation.payload
const debounceKey = `${variableId}-${field}`
const existingTimeout = variableDebounceTimeouts.get(debounceKey)
if (existingTimeout) {
clearTimeout(existingTimeout)
}
set((state) => ({
operations: state.operations.filter(
(op) =>
!(
op.status === 'pending' &&
op.operation.operation === 'variable-update' &&
op.operation.target === 'variable' &&
op.operation.payload?.variableId === variableId &&
op.operation.payload?.field === field
)
),
}))
const timeoutId = setTimeout(() => {
variableDebounceTimeouts.delete(debounceKey)
const queuedOp: QueuedOperation = {
...operation,
timestamp: Date.now(),
retryCount: 0,
status: 'pending',
}
set((state) => ({
operations: [...state.operations, queuedOp],
}))
get().processNextOperation()
}, 25) // 25ms debounce for variable operations - same as subblocks
variableDebounceTimeouts.set(debounceKey, timeoutId)
return
}
// Handle non-subblock operations (existing logic)
const state = get()
@@ -201,6 +256,20 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
}
}
// Clean up any debounce timeouts for variable operations
if (
operation?.operation.operation === 'variable-update' &&
operation.operation.target === 'variable'
) {
const { variableId, field } = operation.operation.payload
const debounceKey = `${variableId}-${field}`
const debounceTimeout = variableDebounceTimeouts.get(debounceKey)
if (debounceTimeout) {
clearTimeout(debounceTimeout)
variableDebounceTimeouts.delete(debounceKey)
}
}
logger.debug('Removing operation from queue', {
operationId,
remainingOps: newOperations.length,
@@ -240,6 +309,20 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
}
}
// Clean up any debounce timeouts for variable operations
if (
operation.operation.operation === 'variable-update' &&
operation.operation.target === 'variable'
) {
const { variableId, field } = operation.operation.payload
const debounceKey = `${variableId}-${field}`
const debounceTimeout = variableDebounceTimeouts.get(debounceKey)
if (debounceTimeout) {
clearTimeout(debounceTimeout)
variableDebounceTimeouts.delete(debounceKey)
}
}
if (!retryable) {
logger.debug('Operation marked as non-retryable, removing from queue', { operationId })
@@ -333,6 +416,10 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
if (emitSubblockUpdate) {
emitSubblockUpdate(payload.blockId, payload.subblockId, payload.value, nextOperation.id)
}
} else if (op === 'variable-update' && target === 'variable') {
if (emitVariableUpdate) {
emitVariableUpdate(payload.variableId, payload.field, payload.value, nextOperation.id)
}
} else {
if (emitWorkflowOperation) {
emitWorkflowOperation(op, target, payload, nextOperation.id)
@@ -411,6 +498,68 @@ export const useOperationQueueStore = create<OperationQueueState>((set, get) =>
get().processNextOperation()
},
cancelOperationsForVariable: (variableId: string) => {
logger.debug('Canceling all operations for variable', { variableId })
// Cancel all debounce timeouts for this variable
const keysToDelete: string[] = []
for (const [key, timeout] of variableDebounceTimeouts.entries()) {
if (key.startsWith(`${variableId}-`)) {
clearTimeout(timeout)
keysToDelete.push(key)
}
}
keysToDelete.forEach((key) => variableDebounceTimeouts.delete(key))
// Find and cancel operation timeouts for operations related to this variable
const state = get()
const operationsToCancel = state.operations.filter(
(op) =>
(op.operation.target === 'variable' && op.operation.payload?.variableId === variableId) ||
(op.operation.target === 'variable' &&
op.operation.payload?.sourceVariableId === variableId)
)
// Cancel timeouts for these operations
operationsToCancel.forEach((op) => {
const operationTimeout = operationTimeouts.get(op.id)
if (operationTimeout) {
clearTimeout(operationTimeout)
operationTimeouts.delete(op.id)
}
const retryTimeout = retryTimeouts.get(op.id)
if (retryTimeout) {
clearTimeout(retryTimeout)
retryTimeouts.delete(op.id)
}
})
// Remove all operations for this variable (both pending and processing)
const newOperations = state.operations.filter(
(op) =>
!(
(op.operation.target === 'variable' && op.operation.payload?.variableId === variableId) ||
(op.operation.target === 'variable' &&
op.operation.payload?.sourceVariableId === variableId)
)
)
set({
operations: newOperations,
isProcessing: false, // Reset processing state in case we removed the current operation
})
logger.debug('Cancelled operations for variable', {
variableId,
cancelledDebounceTimeouts: keysToDelete.length,
cancelledOperations: operationsToCancel.length,
})
// Process next operation if there are any remaining
get().processNextOperation()
},
triggerOfflineMode: () => {
logger.error('Operation failed after retries - triggering offline mode')
@@ -443,6 +592,7 @@ export function useOperationQueue() {
failOperation: store.failOperation,
processNextOperation: store.processNextOperation,
cancelOperationsForBlock: store.cancelOperationsForBlock,
cancelOperationsForVariable: store.cancelOperationsForVariable,
triggerOfflineMode: store.triggerOfflineMode,
clearError: store.clearError,
}

File diff suppressed because it is too large Load Diff

View File

@@ -26,8 +26,9 @@ export interface VariablesStore {
/**
* Adds a new variable with automatic name uniqueness validation
* If a variable with the same name exists, it will be suffixed with a number
* Optionally accepts a predetermined ID for collaborative operations
*/
addVariable: (variable: Omit<Variable, 'id'>) => string
addVariable: (variable: Omit<Variable, 'id'>, providedId?: string) => string
/**
* Updates a variable, ensuring name remains unique within the workflow
@@ -39,11 +40,11 @@ export interface VariablesStore {
/**
* Duplicates a variable with a "(copy)" suffix, ensuring name uniqueness
* Optionally accepts a predetermined ID for collaborative operations
*/
duplicateVariable: (id: string) => string
duplicateVariable: (id: string, providedId?: string) => string
loadVariables: (workflowId: string) => Promise<void>
saveVariables: (workflowId: string) => Promise<void>
/**
* Returns all variables for a specific workflow

View File

@@ -3,7 +3,7 @@ import { devtools } from 'zustand/middleware'
import { createLogger } from '@/lib/logs/console/logger'
import { generateCreativeWorkflowName } from '@/lib/naming'
import { API_ENDPOINTS } from '@/stores/constants'
import { clearWorkflowVariablesTracking } from '@/stores/panel/variables/store'
import { clearWorkflowVariablesTracking, useVariablesStore } from '@/stores/panel/variables/store'
import type {
DeploymentStatus,
WorkflowMetadata,
@@ -92,6 +92,7 @@ async function fetchWorkflowsFromDB(workspaceId?: string): Promise<void> {
description,
color,
state,
variables,
createdAt,
marketplaceData,
workspaceId,
@@ -143,6 +144,16 @@ async function fetchWorkflowsFromDB(workspaceId?: string): Promise<void> {
[id]: subblockValues,
},
}))
// Update variables store with workflow variables (if any)
if (variables && typeof variables === 'object') {
useVariablesStore.setState((state) => ({
variables: {
...state.variables,
...variables,
},
}))
}
})
// Update registry with loaded workflows and deployment statuses