improvement(mothership): workflow edits via sockets (#3927)

* improvement(mothership): workflow edits via sockets

* make embedded view join room

* fix cursor positioning bug
This commit is contained in:
Vikhyath Mondreti
2026-04-03 18:44:14 -07:00
committed by GitHub
parent 98fe4cd40b
commit a0796f088b
4 changed files with 126 additions and 73 deletions

View File

@@ -265,7 +265,7 @@ const WorkflowContent = React.memo(
const { fitViewToBounds, getViewportCenter } = useCanvasViewport(reactFlowInstance, {
embedded,
})
const { emitCursorUpdate } = useSocket()
const { emitCursorUpdate, joinWorkflow, leaveWorkflow } = useSocket()
useDynamicHandleRefresh()
const workspaceId = propWorkspaceId || (params.workspaceId as string)
@@ -273,6 +273,14 @@ const WorkflowContent = React.memo(
const addNotification = useNotificationStore((state) => state.addNotification)
useEffect(() => {
if (!embedded || !workflowIdParam) return
joinWorkflow(workflowIdParam)
return () => {
leaveWorkflow()
}
}, [embedded, workflowIdParam, joinWorkflow, leaveWorkflow])
useOAuthReturnForWorkflow(workflowIdParam)
const {
@@ -2144,12 +2152,9 @@ const WorkflowContent = React.memo(
const handleCanvasPointerMove = useCallback(
(event: React.PointerEvent<Element>) => {
const target = event.currentTarget as HTMLElement
const bounds = target.getBoundingClientRect()
const position = screenToFlowPosition({
x: event.clientX - bounds.left,
y: event.clientY - bounds.top,
x: event.clientX,
y: event.clientY,
})
emitCursorUpdate(position)

View File

@@ -90,6 +90,7 @@ interface SocketContextType {
onSelectionUpdate: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
onWorkflowUpdated: (handler: (data: any) => void) => void
onOperationConfirmed: (handler: (data: any) => void) => void
onOperationFailed: (handler: (data: any) => void) => void
}
@@ -118,6 +119,7 @@ const SocketContext = createContext<SocketContextType>({
onSelectionUpdate: () => {},
onWorkflowDeleted: () => {},
onWorkflowReverted: () => {},
onWorkflowUpdated: () => {},
onOperationConfirmed: () => {},
onOperationFailed: () => {},
})
@@ -155,6 +157,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
selectionUpdate?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
workflowUpdated?: (data: any) => void
operationConfirmed?: (data: any) => void
operationFailed?: (data: any) => void
}>({})
@@ -334,7 +337,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
isRejoiningRef.current = false
// Ignore stale success responses from previous navigation
if (workflowId !== urlWorkflowIdRef.current) {
if (urlWorkflowIdRef.current && workflowId !== urlWorkflowIdRef.current) {
logger.debug(`Ignoring stale join-workflow-success for ${workflowId}`)
return
}
@@ -382,6 +385,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowReverted?.(data)
})
socketInstance.on('workflow-updated', (data) => {
logger.info(`Workflow ${data.workflowId} has been updated externally`)
eventHandlers.current.workflowUpdated?.(data)
})
const rehydrateWorkflowStores = async (workflowId: string, workflowState: any) => {
const [
{ useOperationQueueStore },
@@ -803,6 +811,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowReverted = handler
}, [])
const onWorkflowUpdated = useCallback((handler: (data: any) => void) => {
eventHandlers.current.workflowUpdated = handler
}, [])
const onOperationConfirmed = useCallback((handler: (data: any) => void) => {
eventHandlers.current.operationConfirmed = handler
}, [])
@@ -836,6 +848,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onSelectionUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
}),
@@ -863,6 +876,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onSelectionUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
]

View File

@@ -123,6 +123,7 @@ export function useCollaborativeWorkflow() {
onVariableUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
} = useSocket()
@@ -537,81 +538,99 @@ export function useCollaborativeWorkflow() {
}
}
const reloadWorkflowFromApi = async (workflowId: string, reason: string): Promise<boolean> => {
const response = await fetch(`/api/workflows/${workflowId}`)
if (!response.ok) {
logger.error(`Failed to fetch workflow data after ${reason}: ${response.statusText}`)
return false
}
const responseData = await response.json()
const workflowData = responseData.data
if (!workflowData?.state) {
logger.error(`No state found in workflow data after ${reason}`, { workflowData })
return false
}
isApplyingRemoteChange.current = true
try {
useWorkflowStore.getState().replaceWorkflowState({
blocks: workflowData.state.blocks || {},
edges: workflowData.state.edges || [],
loops: workflowData.state.loops || {},
parallels: workflowData.state.parallels || {},
lastSaved: workflowData.state.lastSaved || Date.now(),
})
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowId]: subblockValues,
},
}))
const graph = {
blocksById: workflowData.state.blocks || {},
edgesById: Object.fromEntries(
(workflowData.state.edges || []).map((e: any) => [e.id, e])
),
}
const undoRedoStore = useUndoRedoStore.getState()
const stackKeys = Object.keys(undoRedoStore.stacks)
stackKeys.forEach((key) => {
const [wfId, userId] = key.split(':')
if (wfId === workflowId) {
undoRedoStore.pruneInvalidEntries(wfId, userId, graph)
}
})
logger.info(`Successfully reloaded workflow state after ${reason}`, { workflowId })
return true
} finally {
isApplyingRemoteChange.current = false
}
}
const handleWorkflowReverted = async (data: any) => {
const { workflowId } = data
logger.info(`Workflow ${workflowId} has been reverted to deployed state`)
// If the reverted workflow is the currently active one, reload the workflow state
if (activeWorkflowId === workflowId) {
logger.info(`Currently active workflow ${workflowId} was reverted, reloading state`)
if (activeWorkflowId !== workflowId) return
try {
// Fetch the updated workflow state from the server (which loads from normalized tables)
const response = await fetch(`/api/workflows/${workflowId}`)
if (response.ok) {
const responseData = await response.json()
const workflowData = responseData.data
try {
await reloadWorkflowFromApi(workflowId, 'revert')
} catch (error) {
logger.error('Error reloading workflow state after revert:', error)
}
}
if (workflowData?.state) {
// Update the workflow store with the reverted state
isApplyingRemoteChange.current = true
try {
// Update the main workflow state using the API response
useWorkflowStore.getState().replaceWorkflowState({
blocks: workflowData.state.blocks || {},
edges: workflowData.state.edges || [],
loops: workflowData.state.loops || {},
parallels: workflowData.state.parallels || {},
lastSaved: workflowData.state.lastSaved || Date.now(),
})
const handleWorkflowUpdated = async (data: any) => {
const { workflowId } = data
logger.info(`Workflow ${workflowId} has been updated externally`)
// Update subblock store with reverted values
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})
if (activeWorkflowId !== workflowId) return
// Update subblock store for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowId]: subblockValues,
},
}))
const { hasActiveDiff } = useWorkflowDiffStore.getState()
if (hasActiveDiff) {
logger.info('Skipping workflow-updated: active diff in progress', { workflowId })
return
}
logger.info(`Successfully loaded reverted workflow state for ${workflowId}`)
const graph = {
blocksById: workflowData.state.blocks || {},
edgesById: Object.fromEntries(
(workflowData.state.edges || []).map((e: any) => [e.id, e])
),
}
const undoRedoStore = useUndoRedoStore.getState()
const stackKeys = Object.keys(undoRedoStore.stacks)
stackKeys.forEach((key) => {
const [wfId, userId] = key.split(':')
if (wfId === workflowId) {
undoRedoStore.pruneInvalidEntries(wfId, userId, graph)
}
})
} finally {
isApplyingRemoteChange.current = false
}
} else {
logger.error('No state found in workflow data after revert', { workflowData })
}
} else {
logger.error(`Failed to fetch workflow data after revert: ${response.statusText}`)
}
} catch (error) {
logger.error('Error reloading workflow state after revert:', error)
}
try {
await reloadWorkflowFromApi(workflowId, 'external update')
} catch (error) {
logger.error('Error reloading workflow state after external update:', error)
}
}
@@ -633,6 +652,7 @@ export function useCollaborativeWorkflow() {
onVariableUpdate(handleVariableUpdate)
onWorkflowDeleted(handleWorkflowDeleted)
onWorkflowReverted(handleWorkflowReverted)
onWorkflowUpdated(handleWorkflowUpdated)
onOperationConfirmed(handleOperationConfirmed)
onOperationFailed(handleOperationFailed)
}, [
@@ -641,6 +661,7 @@ export function useCollaborativeWorkflow() {
onVariableUpdate,
onWorkflowDeleted,
onWorkflowReverted,
onWorkflowUpdated,
onOperationConfirmed,
onOperationFailed,
activeWorkflowId,

View File

@@ -7,6 +7,7 @@ import {
type BaseServerTool,
type ServerToolContext,
} from '@/lib/copilot/tools/server/base-tool'
import { env } from '@/lib/core/config/env'
import { applyTargetedLayout, getTargetedLayoutImpact } from '@/lib/workflows/autolayout'
import {
DEFAULT_HORIZONTAL_SPACING,
@@ -287,6 +288,18 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, unknown>
logger.info('Workflow state persisted to database', { workflowId })
const socketUrl = env.SOCKET_SERVER_URL || 'http://localhost:3002'
fetch(`${socketUrl}/api/workflow-updated`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': env.INTERNAL_API_SECRET,
},
body: JSON.stringify({ workflowId }),
}).catch((error) => {
logger.warn('Failed to notify socket server of workflow update', { workflowId, error })
})
const sanitizationWarnings = validation.warnings.length > 0 ? validation.warnings : undefined
return {