fix(sockets): useCollabWorkflow cleanup, variables store logic simplification (#1154)

* fix(sockets): useCollabWorkflow cleanup, variables store logic simplification

* remove unecessary check
This commit is contained in:
Vikhyath Mondreti
2025-08-27 17:11:39 -07:00
committed by GitHub
parent 89f7d2b943
commit c720f23d9b
8 changed files with 93 additions and 304 deletions

View File

@@ -38,15 +38,8 @@ import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
const logger = createLogger('Variables')
export function Variables() {
const { activeWorkflowId, workflows } = useWorkflowRegistry()
const {
variables: storeVariables,
addVariable,
updateVariable,
deleteVariable,
duplicateVariable,
getVariablesByWorkflowId,
} = useVariablesStore()
const { activeWorkflowId } = useWorkflowRegistry()
const { getVariablesByWorkflowId } = useVariablesStore()
const {
collaborativeUpdateVariable,
collaborativeAddVariable,

View File

@@ -36,7 +36,6 @@ import { useStreamCleanup } from '@/hooks/use-stream-cleanup'
import { useWorkspacePermissions } from '@/hooks/use-workspace-permissions'
import { useCopilotStore } from '@/stores/copilot/store'
import { useExecutionStore } from '@/stores/execution/store'
import { useVariablesStore } from '@/stores/panel/variables/store'
import { useGeneralStore } from '@/stores/settings/general/store'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import { hasWorkflowsInitiallyLoaded, useWorkflowRegistry } from '@/stores/workflows/registry/store'
@@ -178,8 +177,6 @@ const WorkflowContent = React.memo(() => {
collaborativeSetSubblockValue,
} = useCollaborativeWorkflow()
const { resetLoaded: resetVariablesLoaded } = useVariablesStore()
// Execution and debug mode state
const { activeBlockIds, pendingBlocks } = useExecutionStore()
const { isDebugModeEnabled } = useGeneralStore()
@@ -960,9 +957,6 @@ const WorkflowContent = React.memo(() => {
const { activeWorkflowId } = useWorkflowRegistry.getState()
if (activeWorkflowId !== currentId) {
// Only reset variables when actually switching workflows
resetVariablesLoaded()
// Clear workflow diff store when switching workflows
const { clearDiff } = useWorkflowDiffStore.getState()
clearDiff()
@@ -975,15 +969,7 @@ const WorkflowContent = React.memo(() => {
}
validateAndNavigate()
}, [
params.workflowId,
workflows,
isLoading,
setActiveWorkflow,
createWorkflow,
router,
resetVariablesLoaded,
])
}, [params.workflowId, workflows, isLoading, setActiveWorkflow, createWorkflow, router])
// Transform blocks and loops into ReactFlow nodes
const nodes = useMemo(() => {

View File

@@ -298,16 +298,9 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
const workflowId = useWorkflowRegistry((state) => state.activeWorkflowId)
const getVariablesByWorkflowId = useVariablesStore((state) => state.getVariablesByWorkflowId)
const loadVariables = useVariablesStore((state) => state.loadVariables)
const variables = useVariablesStore((state) => state.variables)
const workflowVariables = workflowId ? getVariablesByWorkflowId(workflowId) : []
useEffect(() => {
if (workflowId) {
loadVariables(workflowId)
}
}, [workflowId, loadVariables])
const searchTerm = useMemo(() => {
const textBeforeCursor = inputValue.slice(0, cursorPosition)
const match = textBeforeCursor.match(/<([^>]*)$/)

View File

@@ -60,6 +60,10 @@ export function useCollaborativeWorkflow() {
cancelOperationsForVariable,
} = useOperationQueue()
const isInActiveRoom = useCallback(() => {
return !!currentWorkflowId && activeWorkflowId === currentWorkflowId
}, [currentWorkflowId, activeWorkflowId])
// Clear position timestamps when switching workflows
// Note: Workflow joining is now handled automatically by socket connect event based on URL
useEffect(() => {
@@ -470,6 +474,16 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping operation - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
operation,
target,
})
return
}
const operationId = crypto.randomUUID()
addToQueue({
@@ -485,7 +499,14 @@ export function useCollaborativeWorkflow() {
localAction()
},
[addToQueue, session?.user?.id, isShowingDiff]
[
addToQueue,
session?.user?.id,
isShowingDiff,
activeWorkflowId,
isInActiveRoom,
currentWorkflowId,
]
)
const executeQueuedDebouncedOperation = useCallback(
@@ -497,11 +518,21 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping debounced operation - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
operation,
target,
})
return
}
localAction()
emitWorkflowOperation(operation, target, payload)
},
[emitWorkflowOperation, isShowingDiff]
[emitWorkflowOperation, isShowingDiff, isInActiveRoom, currentWorkflowId, activeWorkflowId]
)
const collaborativeAddBlock = useCallback(
@@ -521,6 +552,14 @@ export function useCollaborativeWorkflow() {
return
}
if (!isInActiveRoom()) {
logger.debug('Skipping collaborative add block - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
})
return
}
const blockConfig = getBlock(type)
// Handle loop/parallel blocks that don't use BlockConfig
@@ -647,7 +686,15 @@ export function useCollaborativeWorkflow() {
workflowStore.addEdge(autoConnectEdge)
}
},
[workflowStore, emitWorkflowOperation, addToQueue, session?.user?.id, isShowingDiff]
[
workflowStore,
activeWorkflowId,
addToQueue,
session?.user?.id,
isShowingDiff,
isInActiveRoom,
currentWorkflowId,
]
)
const collaborativeRemoveBlock = useCallback(
@@ -674,14 +721,7 @@ export function useCollaborativeWorkflow() {
workflowStore.updateBlockName(id, name)
})
},
[
executeQueuedOperation,
workflowStore,
addToQueue,
subBlockStore,
activeWorkflowId,
session?.user?.id,
]
[executeQueuedOperation, workflowStore]
)
const collaborativeToggleBlockEnabled = useCallback(
@@ -795,7 +835,7 @@ export function useCollaborativeWorkflow() {
return
}
if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
if (!isInActiveRoom()) {
logger.debug('Skipping subblock update - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
@@ -847,12 +887,12 @@ export function useCollaborativeWorkflow() {
},
[
subBlockStore,
emitSubblockUpdate,
currentWorkflowId,
activeWorkflowId,
addToQueue,
session?.user?.id,
isShowingDiff,
isInActiveRoom,
]
)
@@ -861,7 +901,7 @@ export function useCollaborativeWorkflow() {
(blockId: string, subblockId: string, value: any) => {
if (isApplyingRemoteChange.current) return
if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) {
if (!isInActiveRoom()) {
logger.debug('Skipping tag selection - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
@@ -884,16 +924,32 @@ export function useCollaborativeWorkflow() {
target: 'subblock',
payload: { blockId, subblockId, value },
},
workflowId: activeWorkflowId,
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
immediate: true,
})
},
[subBlockStore, addToQueue, currentWorkflowId, activeWorkflowId, session?.user?.id]
[
subBlockStore,
addToQueue,
currentWorkflowId,
activeWorkflowId,
session?.user?.id,
isInActiveRoom,
]
)
const collaborativeDuplicateBlock = useCallback(
(sourceId: string) => {
if (!isInActiveRoom()) {
logger.debug('Skipping duplicate block - not in active workflow', {
currentWorkflowId,
activeWorkflowId,
sourceId,
})
return
}
const sourceBlock = workflowStore.blocks[sourceId]
if (!sourceBlock) return
@@ -995,7 +1051,14 @@ export function useCollaborativeWorkflow() {
}
})
},
[executeQueuedOperation, workflowStore, subBlockStore, activeWorkflowId]
[
executeQueuedOperation,
workflowStore,
subBlockStore,
activeWorkflowId,
isInActiveRoom,
currentWorkflowId,
]
)
const collaborativeUpdateLoopType = useCallback(

View File

@@ -231,7 +231,7 @@ export const resetAllStores = () => {
useConsoleStore.setState({ entries: [], isOpen: false })
useCopilotStore.setState({ messages: [], isSendingMessage: false, error: null })
useCustomToolsStore.setState({ tools: {} })
useVariablesStore.getState().resetLoaded() // Reset variables store tracking
// Variables store has no tracking to reset; registry hydrates
useSubscriptionStore.getState().reset() // Reset subscription store
}

View File

@@ -1,30 +1,11 @@
import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { createLogger } from '@/lib/logs/console/logger'
import { API_ENDPOINTS } from '@/stores/constants'
import type { Variable, VariablesStore } from '@/stores/panel/variables/types'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
const logger = createLogger('VariablesStore')
// Track which workflows have already been loaded
const loadedWorkflows = new Set<string>()
// Track recently added variable IDs with timestamps
const recentlyAddedVariables = new Map<string, number>()
// Time window in ms to consider a variable as "recently added" (3 seconds)
const RECENT_VARIABLE_WINDOW = 3000
// Clear a workspace from the loaded tracking when switching workspaces
export function clearWorkflowVariablesTracking() {
loadedWorkflows.clear()
// Also clear any old entries from recentlyAddedVariables
const now = Date.now()
recentlyAddedVariables.forEach((timestamp, id) => {
if (now - timestamp > RECENT_VARIABLE_WINDOW * 2) {
recentlyAddedVariables.delete(id)
}
})
}
/**
* Check if variable format is valid according to type without modifying it
@@ -169,9 +150,6 @@ export const useVariablesStore = create<VariablesStore>()(
newVariable.validationError = validationError
}
// Mark this variable as recently added with current timestamp
recentlyAddedVariables.set(id, Date.now())
set((state) => ({
variables: {
...state.variables,
@@ -335,9 +313,6 @@ export const useVariablesStore = create<VariablesStore>()(
nameIndex++
}
// Mark this duplicated variable as recently added
recentlyAddedVariables.set(newId, Date.now())
set((state) => ({
variables: {
...state.variables,
@@ -354,220 +329,8 @@ export const useVariablesStore = create<VariablesStore>()(
return newId
},
loadVariables: async (workflowId) => {
// Skip if already loaded to prevent redundant API calls, but ensure
// we check for the special case of recently added variables first
if (loadedWorkflows.has(workflowId)) {
// Even if workflow is loaded, check if we have recent variables to protect
const workflowVariables = Object.values(get().variables).filter(
(v) => v.workflowId === workflowId
)
const now = Date.now()
const hasRecentVariables = workflowVariables.some(
(v) =>
recentlyAddedVariables.has(v.id) &&
now - (recentlyAddedVariables.get(v.id) || 0) < RECENT_VARIABLE_WINDOW
)
// No force reload needed if no recent variables and we've already loaded
if (!hasRecentVariables) {
return
}
// Otherwise continue and do a full load+merge to protect recent variables
}
try {
set({ isLoading: true, error: null })
const response = await fetch(`${API_ENDPOINTS.WORKFLOWS}/${workflowId}/variables`)
// Capture current variables for this workflow before we modify anything
const currentWorkflowVariables = Object.values(get().variables)
.filter((v) => v.workflowId === workflowId)
.reduce(
(acc, v) => {
acc[v.id] = v
return acc
},
{} as Record<string, Variable>
)
// Check which variables were recently added (within the last few seconds)
const now = Date.now()
const protectedVariableIds = new Set<string>()
// Identify variables that should be protected from being overwritten
Object.keys(currentWorkflowVariables).forEach((id) => {
// Protect recently added variables
if (
recentlyAddedVariables.has(id) &&
now - (recentlyAddedVariables.get(id) || 0) < RECENT_VARIABLE_WINDOW
) {
protectedVariableIds.add(id)
}
})
// Handle 404 workflow not found gracefully
if (response.status === 404) {
logger.info(`No variables found for workflow ${workflowId}, initializing empty set`)
set((state) => {
// Keep variables from other workflows
const otherVariables = Object.values(state.variables).reduce(
(acc, variable) => {
if (variable.workflowId !== workflowId) {
acc[variable.id] = variable
}
return acc
},
{} as Record<string, Variable>
)
// Add back protected variables that should not be removed
Object.keys(currentWorkflowVariables).forEach((id) => {
if (protectedVariableIds.has(id)) {
otherVariables[id] = currentWorkflowVariables[id]
}
})
// Mark this workflow as loaded to prevent further attempts
loadedWorkflows.add(workflowId)
return {
variables: otherVariables,
isLoading: false,
}
})
return
}
// Handle unauthorized (401) or forbidden (403) gracefully
if (response.status === 401 || response.status === 403) {
logger.warn(`No permission to access variables for workflow ${workflowId}`)
set((state) => {
// Keep variables from other workflows
const otherVariables = Object.values(state.variables).reduce(
(acc, variable) => {
if (variable.workflowId !== workflowId) {
acc[variable.id] = variable
}
return acc
},
{} as Record<string, Variable>
)
// Mark this workflow as loaded but with access issues
loadedWorkflows.add(workflowId)
return {
variables: otherVariables,
isLoading: false,
error: 'You do not have permission to access these variables',
}
})
return
}
if (!response.ok) {
throw new Error(`Failed to load workflow variables: ${response.statusText}`)
}
const { data } = await response.json()
if (data && typeof data === 'object') {
set((state) => {
// Migrate any 'string' type variables to 'plain'
const migratedData: Record<string, Variable> = {}
for (const [id, variable] of Object.entries(data)) {
migratedData[id] = migrateStringToPlain(variable as Variable)
}
// Merge with existing variables from other workflows
const otherVariables = Object.values(state.variables).reduce(
(acc, variable) => {
if (variable.workflowId !== workflowId) {
acc[variable.id] = variable
}
return acc
},
{} as Record<string, Variable>
)
// Create the final variables object, prioritizing protected variables
const finalVariables = { ...otherVariables, ...migratedData }
// Restore any protected variables that shouldn't be overwritten
Object.keys(currentWorkflowVariables).forEach((id) => {
if (protectedVariableIds.has(id)) {
finalVariables[id] = currentWorkflowVariables[id]
}
})
// Mark this workflow as loaded
loadedWorkflows.add(workflowId)
return {
variables: finalVariables,
isLoading: false,
}
})
} else {
set((state) => {
// Keep variables from other workflows
const otherVariables = Object.values(state.variables).reduce(
(acc, variable) => {
if (variable.workflowId !== workflowId) {
acc[variable.id] = variable
}
return acc
},
{} as Record<string, Variable>
)
// Add back protected variables that should not be removed
Object.keys(currentWorkflowVariables).forEach((id) => {
if (protectedVariableIds.has(id)) {
otherVariables[id] = currentWorkflowVariables[id]
}
})
// Mark this workflow as loaded
loadedWorkflows.add(workflowId)
return {
variables: otherVariables,
isLoading: false,
}
})
}
} catch (error) {
logger.error('Error loading workflow variables:', {
error,
workflowId,
})
set({
error: error instanceof Error ? error.message : 'Unknown error',
isLoading: false,
})
}
},
getVariablesByWorkflowId: (workflowId) => {
return Object.values(get().variables).filter((variable) => variable.workflowId === workflowId)
},
// Reset the loaded workflow tracking
resetLoaded: () => {
loadedWorkflows.clear()
// Clean up stale entries from recentlyAddedVariables
const now = Date.now()
recentlyAddedVariables.forEach((timestamp, id) => {
if (now - timestamp > RECENT_VARIABLE_WINDOW * 2) {
recentlyAddedVariables.delete(id)
}
})
},
}))
)

View File

@@ -44,15 +44,8 @@ export interface VariablesStore {
*/
duplicateVariable: (id: string, providedId?: string) => string
loadVariables: (workflowId: string) => Promise<void>
/**
* Returns all variables for a specific workflow
*/
getVariablesByWorkflowId: (workflowId: string) => Variable[]
/**
* Resets tracking of loaded workflows
*/
resetLoaded: () => void
}

View File

@@ -3,7 +3,7 @@ import { devtools } from 'zustand/middleware'
import { createLogger } from '@/lib/logs/console/logger'
import { generateCreativeWorkflowName } from '@/lib/naming'
import { API_ENDPOINTS } from '@/stores/constants'
import { clearWorkflowVariablesTracking, useVariablesStore } from '@/stores/panel/variables/store'
import { useVariablesStore } from '@/stores/panel/variables/store'
import type {
DeploymentStatus,
WorkflowMetadata,
@@ -145,14 +145,15 @@ async function fetchWorkflowsFromDB(workspaceId?: string): Promise<void> {
},
}))
// Update variables store with workflow variables (if any)
if (variables && typeof variables === 'object') {
useVariablesStore.setState((state) => ({
variables: {
...state.variables,
...variables,
},
}))
useVariablesStore.setState((state) => {
const withoutWorkflow = Object.fromEntries(
Object.entries(state.variables).filter(([, v]: any) => v.workflowId !== id)
)
return {
variables: { ...withoutWorkflow, ...variables },
}
})
}
})
@@ -201,9 +202,6 @@ const TRANSITION_TIMEOUT = 5000 // 5 seconds maximum for workspace transitions
// Resets workflow and subblock stores to prevent data leakage between workspaces
function resetWorkflowStores() {
// Reset variable tracking to prevent stale API calls
clearWorkflowVariablesTracking()
// Reset the workflow store to prevent data leakage between workspaces
useWorkflowStore.setState({
blocks: {},