From c720f23d9b45cb13cb79f4e8e0da97086d16ecb6 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 27 Aug 2025 17:11:39 -0700 Subject: [PATCH] fix(sockets): useCollabWorkflow cleanup, variables store logic simplification (#1154) * fix(sockets): useCollabWorkflow cleanup, variables store logic simplification * remove unecessary check --- .../panel/components/variables/variables.tsx | 11 +- .../[workspaceId]/w/[workflowId]/workflow.tsx | 16 +- apps/sim/components/ui/tag-dropdown.tsx | 7 - apps/sim/hooks/use-collaborative-workflow.ts | 97 +++++-- apps/sim/stores/index.ts | 2 +- apps/sim/stores/panel/variables/store.ts | 237 ------------------ apps/sim/stores/panel/variables/types.ts | 7 - apps/sim/stores/workflows/registry/store.ts | 20 +- 8 files changed, 93 insertions(+), 304 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/variables/variables.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/variables/variables.tsx index bec115663f..3540ad99ab 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/variables/variables.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/variables/variables.tsx @@ -38,15 +38,8 @@ import { useWorkflowRegistry } from '@/stores/workflows/registry/store' const logger = createLogger('Variables') export function Variables() { - const { activeWorkflowId, workflows } = useWorkflowRegistry() - const { - variables: storeVariables, - addVariable, - updateVariable, - deleteVariable, - duplicateVariable, - getVariablesByWorkflowId, - } = useVariablesStore() + const { activeWorkflowId } = useWorkflowRegistry() + const { getVariablesByWorkflowId } = useVariablesStore() const { collaborativeUpdateVariable, collaborativeAddVariable, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index ef978fbc3e..de6a14878c 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -36,7 +36,6 @@ import { useStreamCleanup } from '@/hooks/use-stream-cleanup' import { useWorkspacePermissions } from '@/hooks/use-workspace-permissions' import { useCopilotStore } from '@/stores/copilot/store' import { useExecutionStore } from '@/stores/execution/store' -import { useVariablesStore } from '@/stores/panel/variables/store' import { useGeneralStore } from '@/stores/settings/general/store' import { useWorkflowDiffStore } from '@/stores/workflow-diff/store' import { hasWorkflowsInitiallyLoaded, useWorkflowRegistry } from '@/stores/workflows/registry/store' @@ -178,8 +177,6 @@ const WorkflowContent = React.memo(() => { collaborativeSetSubblockValue, } = useCollaborativeWorkflow() - const { resetLoaded: resetVariablesLoaded } = useVariablesStore() - // Execution and debug mode state const { activeBlockIds, pendingBlocks } = useExecutionStore() const { isDebugModeEnabled } = useGeneralStore() @@ -960,9 +957,6 @@ const WorkflowContent = React.memo(() => { const { activeWorkflowId } = useWorkflowRegistry.getState() if (activeWorkflowId !== currentId) { - // Only reset variables when actually switching workflows - resetVariablesLoaded() - // Clear workflow diff store when switching workflows const { clearDiff } = useWorkflowDiffStore.getState() clearDiff() @@ -975,15 +969,7 @@ const WorkflowContent = React.memo(() => { } validateAndNavigate() - }, [ - params.workflowId, - workflows, - isLoading, - setActiveWorkflow, - createWorkflow, - router, - resetVariablesLoaded, - ]) + }, [params.workflowId, workflows, isLoading, setActiveWorkflow, createWorkflow, router]) // Transform blocks and loops into ReactFlow nodes const nodes = useMemo(() => { diff --git a/apps/sim/components/ui/tag-dropdown.tsx b/apps/sim/components/ui/tag-dropdown.tsx index c1e43801a1..1ee1067d01 100644 --- a/apps/sim/components/ui/tag-dropdown.tsx +++ b/apps/sim/components/ui/tag-dropdown.tsx @@ -298,16 +298,9 @@ export const TagDropdown: React.FC = ({ const workflowId = useWorkflowRegistry((state) => state.activeWorkflowId) const getVariablesByWorkflowId = useVariablesStore((state) => state.getVariablesByWorkflowId) - const loadVariables = useVariablesStore((state) => state.loadVariables) const variables = useVariablesStore((state) => state.variables) const workflowVariables = workflowId ? getVariablesByWorkflowId(workflowId) : [] - useEffect(() => { - if (workflowId) { - loadVariables(workflowId) - } - }, [workflowId, loadVariables]) - const searchTerm = useMemo(() => { const textBeforeCursor = inputValue.slice(0, cursorPosition) const match = textBeforeCursor.match(/<([^>]*)$/) diff --git a/apps/sim/hooks/use-collaborative-workflow.ts b/apps/sim/hooks/use-collaborative-workflow.ts index 8fc59b6acf..c681f2d131 100644 --- a/apps/sim/hooks/use-collaborative-workflow.ts +++ b/apps/sim/hooks/use-collaborative-workflow.ts @@ -60,6 +60,10 @@ export function useCollaborativeWorkflow() { cancelOperationsForVariable, } = useOperationQueue() + const isInActiveRoom = useCallback(() => { + return !!currentWorkflowId && activeWorkflowId === currentWorkflowId + }, [currentWorkflowId, activeWorkflowId]) + // Clear position timestamps when switching workflows // Note: Workflow joining is now handled automatically by socket connect event based on URL useEffect(() => { @@ -470,6 +474,16 @@ export function useCollaborativeWorkflow() { return } + if (!isInActiveRoom()) { + logger.debug('Skipping operation - not in active workflow', { + currentWorkflowId, + activeWorkflowId, + operation, + target, + }) + return + } + const operationId = crypto.randomUUID() addToQueue({ @@ -485,7 +499,14 @@ export function useCollaborativeWorkflow() { localAction() }, - [addToQueue, session?.user?.id, isShowingDiff] + [ + addToQueue, + session?.user?.id, + isShowingDiff, + activeWorkflowId, + isInActiveRoom, + currentWorkflowId, + ] ) const executeQueuedDebouncedOperation = useCallback( @@ -497,11 +518,21 @@ export function useCollaborativeWorkflow() { return } + if (!isInActiveRoom()) { + logger.debug('Skipping debounced operation - not in active workflow', { + currentWorkflowId, + activeWorkflowId, + operation, + target, + }) + return + } + localAction() emitWorkflowOperation(operation, target, payload) }, - [emitWorkflowOperation, isShowingDiff] + [emitWorkflowOperation, isShowingDiff, isInActiveRoom, currentWorkflowId, activeWorkflowId] ) const collaborativeAddBlock = useCallback( @@ -521,6 +552,14 @@ export function useCollaborativeWorkflow() { return } + if (!isInActiveRoom()) { + logger.debug('Skipping collaborative add block - not in active workflow', { + currentWorkflowId, + activeWorkflowId, + }) + return + } + const blockConfig = getBlock(type) // Handle loop/parallel blocks that don't use BlockConfig @@ -647,7 +686,15 @@ export function useCollaborativeWorkflow() { workflowStore.addEdge(autoConnectEdge) } }, - [workflowStore, emitWorkflowOperation, addToQueue, session?.user?.id, isShowingDiff] + [ + workflowStore, + activeWorkflowId, + addToQueue, + session?.user?.id, + isShowingDiff, + isInActiveRoom, + currentWorkflowId, + ] ) const collaborativeRemoveBlock = useCallback( @@ -674,14 +721,7 @@ export function useCollaborativeWorkflow() { workflowStore.updateBlockName(id, name) }) }, - [ - executeQueuedOperation, - workflowStore, - addToQueue, - subBlockStore, - activeWorkflowId, - session?.user?.id, - ] + [executeQueuedOperation, workflowStore] ) const collaborativeToggleBlockEnabled = useCallback( @@ -795,7 +835,7 @@ export function useCollaborativeWorkflow() { return } - if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) { + if (!isInActiveRoom()) { logger.debug('Skipping subblock update - not in active workflow', { currentWorkflowId, activeWorkflowId, @@ -847,12 +887,12 @@ export function useCollaborativeWorkflow() { }, [ subBlockStore, - emitSubblockUpdate, currentWorkflowId, activeWorkflowId, addToQueue, session?.user?.id, isShowingDiff, + isInActiveRoom, ] ) @@ -861,7 +901,7 @@ export function useCollaborativeWorkflow() { (blockId: string, subblockId: string, value: any) => { if (isApplyingRemoteChange.current) return - if (!currentWorkflowId || activeWorkflowId !== currentWorkflowId) { + if (!isInActiveRoom()) { logger.debug('Skipping tag selection - not in active workflow', { currentWorkflowId, activeWorkflowId, @@ -884,16 +924,32 @@ export function useCollaborativeWorkflow() { target: 'subblock', payload: { blockId, subblockId, value }, }, - workflowId: activeWorkflowId, + workflowId: activeWorkflowId || '', userId: session?.user?.id || 'unknown', immediate: true, }) }, - [subBlockStore, addToQueue, currentWorkflowId, activeWorkflowId, session?.user?.id] + [ + subBlockStore, + addToQueue, + currentWorkflowId, + activeWorkflowId, + session?.user?.id, + isInActiveRoom, + ] ) const collaborativeDuplicateBlock = useCallback( (sourceId: string) => { + if (!isInActiveRoom()) { + logger.debug('Skipping duplicate block - not in active workflow', { + currentWorkflowId, + activeWorkflowId, + sourceId, + }) + return + } + const sourceBlock = workflowStore.blocks[sourceId] if (!sourceBlock) return @@ -995,7 +1051,14 @@ export function useCollaborativeWorkflow() { } }) }, - [executeQueuedOperation, workflowStore, subBlockStore, activeWorkflowId] + [ + executeQueuedOperation, + workflowStore, + subBlockStore, + activeWorkflowId, + isInActiveRoom, + currentWorkflowId, + ] ) const collaborativeUpdateLoopType = useCallback( diff --git a/apps/sim/stores/index.ts b/apps/sim/stores/index.ts index a96cc6be83..4e44ab5143 100644 --- a/apps/sim/stores/index.ts +++ b/apps/sim/stores/index.ts @@ -231,7 +231,7 @@ export const resetAllStores = () => { useConsoleStore.setState({ entries: [], isOpen: false }) useCopilotStore.setState({ messages: [], isSendingMessage: false, error: null }) useCustomToolsStore.setState({ tools: {} }) - useVariablesStore.getState().resetLoaded() // Reset variables store tracking + // Variables store has no tracking to reset; registry hydrates useSubscriptionStore.getState().reset() // Reset subscription store } diff --git a/apps/sim/stores/panel/variables/store.ts b/apps/sim/stores/panel/variables/store.ts index 5fdbfd82de..ceb4d35bd7 100644 --- a/apps/sim/stores/panel/variables/store.ts +++ b/apps/sim/stores/panel/variables/store.ts @@ -1,30 +1,11 @@ import { create } from 'zustand' import { devtools } from 'zustand/middleware' import { createLogger } from '@/lib/logs/console/logger' -import { API_ENDPOINTS } from '@/stores/constants' import type { Variable, VariablesStore } from '@/stores/panel/variables/types' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useSubBlockStore } from '@/stores/workflows/subblock/store' const logger = createLogger('VariablesStore') -// Track which workflows have already been loaded -const loadedWorkflows = new Set() -// Track recently added variable IDs with timestamps -const recentlyAddedVariables = new Map() -// Time window in ms to consider a variable as "recently added" (3 seconds) -const RECENT_VARIABLE_WINDOW = 3000 - -// Clear a workspace from the loaded tracking when switching workspaces -export function clearWorkflowVariablesTracking() { - loadedWorkflows.clear() - // Also clear any old entries from recentlyAddedVariables - const now = Date.now() - recentlyAddedVariables.forEach((timestamp, id) => { - if (now - timestamp > RECENT_VARIABLE_WINDOW * 2) { - recentlyAddedVariables.delete(id) - } - }) -} /** * Check if variable format is valid according to type without modifying it @@ -169,9 +150,6 @@ export const useVariablesStore = create()( newVariable.validationError = validationError } - // Mark this variable as recently added with current timestamp - recentlyAddedVariables.set(id, Date.now()) - set((state) => ({ variables: { ...state.variables, @@ -335,9 +313,6 @@ export const useVariablesStore = create()( nameIndex++ } - // Mark this duplicated variable as recently added - recentlyAddedVariables.set(newId, Date.now()) - set((state) => ({ variables: { ...state.variables, @@ -354,220 +329,8 @@ export const useVariablesStore = create()( return newId }, - loadVariables: async (workflowId) => { - // Skip if already loaded to prevent redundant API calls, but ensure - // we check for the special case of recently added variables first - if (loadedWorkflows.has(workflowId)) { - // Even if workflow is loaded, check if we have recent variables to protect - const workflowVariables = Object.values(get().variables).filter( - (v) => v.workflowId === workflowId - ) - - const now = Date.now() - const hasRecentVariables = workflowVariables.some( - (v) => - recentlyAddedVariables.has(v.id) && - now - (recentlyAddedVariables.get(v.id) || 0) < RECENT_VARIABLE_WINDOW - ) - - // No force reload needed if no recent variables and we've already loaded - if (!hasRecentVariables) { - return - } - - // Otherwise continue and do a full load+merge to protect recent variables - } - - try { - set({ isLoading: true, error: null }) - - const response = await fetch(`${API_ENDPOINTS.WORKFLOWS}/${workflowId}/variables`) - - // Capture current variables for this workflow before we modify anything - const currentWorkflowVariables = Object.values(get().variables) - .filter((v) => v.workflowId === workflowId) - .reduce( - (acc, v) => { - acc[v.id] = v - return acc - }, - {} as Record - ) - - // Check which variables were recently added (within the last few seconds) - const now = Date.now() - const protectedVariableIds = new Set() - - // Identify variables that should be protected from being overwritten - Object.keys(currentWorkflowVariables).forEach((id) => { - // Protect recently added variables - if ( - recentlyAddedVariables.has(id) && - now - (recentlyAddedVariables.get(id) || 0) < RECENT_VARIABLE_WINDOW - ) { - protectedVariableIds.add(id) - } - }) - - // Handle 404 workflow not found gracefully - if (response.status === 404) { - logger.info(`No variables found for workflow ${workflowId}, initializing empty set`) - set((state) => { - // Keep variables from other workflows - const otherVariables = Object.values(state.variables).reduce( - (acc, variable) => { - if (variable.workflowId !== workflowId) { - acc[variable.id] = variable - } - return acc - }, - {} as Record - ) - - // Add back protected variables that should not be removed - Object.keys(currentWorkflowVariables).forEach((id) => { - if (protectedVariableIds.has(id)) { - otherVariables[id] = currentWorkflowVariables[id] - } - }) - - // Mark this workflow as loaded to prevent further attempts - loadedWorkflows.add(workflowId) - - return { - variables: otherVariables, - isLoading: false, - } - }) - return - } - - // Handle unauthorized (401) or forbidden (403) gracefully - if (response.status === 401 || response.status === 403) { - logger.warn(`No permission to access variables for workflow ${workflowId}`) - set((state) => { - // Keep variables from other workflows - const otherVariables = Object.values(state.variables).reduce( - (acc, variable) => { - if (variable.workflowId !== workflowId) { - acc[variable.id] = variable - } - return acc - }, - {} as Record - ) - - // Mark this workflow as loaded but with access issues - loadedWorkflows.add(workflowId) - - return { - variables: otherVariables, - isLoading: false, - error: 'You do not have permission to access these variables', - } - }) - return - } - - if (!response.ok) { - throw new Error(`Failed to load workflow variables: ${response.statusText}`) - } - - const { data } = await response.json() - - if (data && typeof data === 'object') { - set((state) => { - // Migrate any 'string' type variables to 'plain' - const migratedData: Record = {} - for (const [id, variable] of Object.entries(data)) { - migratedData[id] = migrateStringToPlain(variable as Variable) - } - - // Merge with existing variables from other workflows - const otherVariables = Object.values(state.variables).reduce( - (acc, variable) => { - if (variable.workflowId !== workflowId) { - acc[variable.id] = variable - } - return acc - }, - {} as Record - ) - - // Create the final variables object, prioritizing protected variables - const finalVariables = { ...otherVariables, ...migratedData } - - // Restore any protected variables that shouldn't be overwritten - Object.keys(currentWorkflowVariables).forEach((id) => { - if (protectedVariableIds.has(id)) { - finalVariables[id] = currentWorkflowVariables[id] - } - }) - - // Mark this workflow as loaded - loadedWorkflows.add(workflowId) - - return { - variables: finalVariables, - isLoading: false, - } - }) - } else { - set((state) => { - // Keep variables from other workflows - const otherVariables = Object.values(state.variables).reduce( - (acc, variable) => { - if (variable.workflowId !== workflowId) { - acc[variable.id] = variable - } - return acc - }, - {} as Record - ) - - // Add back protected variables that should not be removed - Object.keys(currentWorkflowVariables).forEach((id) => { - if (protectedVariableIds.has(id)) { - otherVariables[id] = currentWorkflowVariables[id] - } - }) - - // Mark this workflow as loaded - loadedWorkflows.add(workflowId) - - return { - variables: otherVariables, - isLoading: false, - } - }) - } - } catch (error) { - logger.error('Error loading workflow variables:', { - error, - workflowId, - }) - set({ - error: error instanceof Error ? error.message : 'Unknown error', - isLoading: false, - }) - } - }, - getVariablesByWorkflowId: (workflowId) => { return Object.values(get().variables).filter((variable) => variable.workflowId === workflowId) }, - - // Reset the loaded workflow tracking - resetLoaded: () => { - loadedWorkflows.clear() - - // Clean up stale entries from recentlyAddedVariables - const now = Date.now() - recentlyAddedVariables.forEach((timestamp, id) => { - if (now - timestamp > RECENT_VARIABLE_WINDOW * 2) { - recentlyAddedVariables.delete(id) - } - }) - }, })) ) diff --git a/apps/sim/stores/panel/variables/types.ts b/apps/sim/stores/panel/variables/types.ts index 7ab220efd9..a27138feb5 100644 --- a/apps/sim/stores/panel/variables/types.ts +++ b/apps/sim/stores/panel/variables/types.ts @@ -44,15 +44,8 @@ export interface VariablesStore { */ duplicateVariable: (id: string, providedId?: string) => string - loadVariables: (workflowId: string) => Promise - /** * Returns all variables for a specific workflow */ getVariablesByWorkflowId: (workflowId: string) => Variable[] - - /** - * Resets tracking of loaded workflows - */ - resetLoaded: () => void } diff --git a/apps/sim/stores/workflows/registry/store.ts b/apps/sim/stores/workflows/registry/store.ts index cbb352c730..465e6bc742 100644 --- a/apps/sim/stores/workflows/registry/store.ts +++ b/apps/sim/stores/workflows/registry/store.ts @@ -3,7 +3,7 @@ import { devtools } from 'zustand/middleware' import { createLogger } from '@/lib/logs/console/logger' import { generateCreativeWorkflowName } from '@/lib/naming' import { API_ENDPOINTS } from '@/stores/constants' -import { clearWorkflowVariablesTracking, useVariablesStore } from '@/stores/panel/variables/store' +import { useVariablesStore } from '@/stores/panel/variables/store' import type { DeploymentStatus, WorkflowMetadata, @@ -145,14 +145,15 @@ async function fetchWorkflowsFromDB(workspaceId?: string): Promise { }, })) - // Update variables store with workflow variables (if any) if (variables && typeof variables === 'object') { - useVariablesStore.setState((state) => ({ - variables: { - ...state.variables, - ...variables, - }, - })) + useVariablesStore.setState((state) => { + const withoutWorkflow = Object.fromEntries( + Object.entries(state.variables).filter(([, v]: any) => v.workflowId !== id) + ) + return { + variables: { ...withoutWorkflow, ...variables }, + } + }) } }) @@ -201,9 +202,6 @@ const TRANSITION_TIMEOUT = 5000 // 5 seconds maximum for workspace transitions // Resets workflow and subblock stores to prevent data leakage between workspaces function resetWorkflowStores() { - // Reset variable tracking to prevent stale API calls - clearWorkflowVariablesTracking() - // Reset the workflow store to prevent data leakage between workspaces useWorkflowStore.setState({ blocks: {},