fix(revert-deployed): correctly revert to deployed state as unit op using separate endpoint (#633)

* fix(revert-deployed): revert deployed functionality with separate endpoint

* fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
This commit is contained in:
Vikhyath Mondreti
2025-07-07 22:16:17 -07:00
committed by GitHub
parent 5167deb75c
commit 3e45d793f1
6 changed files with 277 additions and 9 deletions

View File

@@ -0,0 +1,121 @@
import crypto from 'crypto'
import { eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { createLogger } from '@/lib/logs/console-logger'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
import { db } from '@/db'
import { workflow } from '@/db/schema'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { validateWorkflowAccess } from '../../middleware'
import { createErrorResponse, createSuccessResponse } from '../../utils'
const logger = createLogger('RevertToDeployedAPI')
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
/**
* POST /api/workflows/[id]/revert-to-deployed
* Revert workflow to its deployed state by saving deployed state to normalized tables
*/
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id } = await params
try {
logger.debug(`[${requestId}] Reverting workflow to deployed state: ${id}`)
const validation = await validateWorkflowAccess(request, id, false)
if (validation.error) {
logger.warn(`[${requestId}] Workflow revert failed: ${validation.error.message}`)
return createErrorResponse(validation.error.message, validation.error.status)
}
const workflowData = validation.workflow
// Check if workflow is deployed and has deployed state
if (!workflowData.isDeployed || !workflowData.deployedState) {
logger.warn(`[${requestId}] Cannot revert: workflow is not deployed or has no deployed state`)
return createErrorResponse('Workflow is not deployed or has no deployed state', 400)
}
// Validate deployed state structure
const deployedState = workflowData.deployedState as WorkflowState
if (!deployedState.blocks || !deployedState.edges) {
logger.error(`[${requestId}] Invalid deployed state structure`, { deployedState })
return createErrorResponse('Invalid deployed state structure', 500)
}
logger.debug(`[${requestId}] Saving deployed state to normalized tables`, {
blocksCount: Object.keys(deployedState.blocks).length,
edgesCount: deployedState.edges.length,
loopsCount: Object.keys(deployedState.loops || {}).length,
parallelsCount: Object.keys(deployedState.parallels || {}).length,
})
// Save deployed state to normalized tables
const saveResult = await saveWorkflowToNormalizedTables(id, {
blocks: deployedState.blocks,
edges: deployedState.edges,
loops: deployedState.loops || {},
parallels: deployedState.parallels || {},
lastSaved: Date.now(),
isDeployed: workflowData.isDeployed,
deployedAt: workflowData.deployedAt,
deploymentStatuses: deployedState.deploymentStatuses || {},
hasActiveSchedule: deployedState.hasActiveSchedule || false,
hasActiveWebhook: deployedState.hasActiveWebhook || false,
})
if (!saveResult.success) {
logger.error(`[${requestId}] Failed to save deployed state to normalized tables`, {
error: saveResult.error,
})
return createErrorResponse(
saveResult.error || 'Failed to save deployed state to normalized tables',
500
)
}
// Update workflow's last_synced timestamp to indicate changes
await db
.update(workflow)
.set({
lastSynced: new Date(),
updatedAt: new Date(),
})
.where(eq(workflow.id, id))
// Notify socket server about the revert operation for real-time sync
try {
const socketServerUrl = process.env.SOCKET_SERVER_URL || 'http://localhost:3002'
await fetch(`${socketServerUrl}/api/workflow-reverted`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
workflowId: id,
timestamp: Date.now(),
}),
})
logger.debug(`[${requestId}] Notified socket server about workflow revert: ${id}`)
} catch (socketError) {
// Don't fail the request if socket notification fails
logger.warn(`[${requestId}] Failed to notify socket server about revert:`, socketError)
}
logger.info(`[${requestId}] Successfully reverted workflow to deployed state: ${id}`)
return createSuccessResponse({
message: 'Workflow successfully reverted to deployed state',
lastSaved: Date.now(),
})
} catch (error: any) {
logger.error(`[${requestId}] Error reverting workflow to deployed state: ${id}`, {
error: error.message,
stack: error.stack,
})
return createErrorResponse(error.message || 'Failed to revert workflow to deployed state', 500)
}
}

View File

@@ -50,6 +50,7 @@ interface SocketContextType {
onUserJoined: (handler: (data: any) => void) => void
onUserLeft: (handler: (data: any) => void) => void
onWorkflowDeleted: (handler: (data: any) => void) => void
onWorkflowReverted: (handler: (data: any) => void) => void
}
const SocketContext = createContext<SocketContextType>({
@@ -71,6 +72,7 @@ const SocketContext = createContext<SocketContextType>({
onUserJoined: () => {},
onUserLeft: () => {},
onWorkflowDeleted: () => {},
onWorkflowReverted: () => {},
})
export const useSocket = () => useContext(SocketContext)
@@ -100,6 +102,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
userJoined?: (data: any) => void
userLeft?: (data: any) => void
workflowDeleted?: (data: any) => void
workflowReverted?: (data: any) => void
}>({})
// Helper function to generate a fresh socket token
@@ -281,6 +284,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowDeleted?.(data)
})
// Workflow revert events
socketInstance.on('workflow-reverted', (data) => {
logger.info(`Workflow ${data.workflowId} has been reverted to deployed state`)
eventHandlers.current.workflowReverted?.(data)
})
// Cursor update events
socketInstance.on('cursor-update', (data) => {
setPresenceUsers((prev) =>
@@ -557,6 +566,10 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowDeleted = handler
}, [])
const onWorkflowReverted = useCallback((handler: (data: any) => void) => {
eventHandlers.current.workflowReverted = handler
}, [])
return (
<SocketContext.Provider
value={{
@@ -578,6 +591,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
}}
>
{children}

View File

@@ -25,6 +25,7 @@ export function useCollaborativeWorkflow() {
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
} = useSocket()
const { activeWorkflowId } = useWorkflowRegistry()
@@ -262,12 +263,80 @@ export function useCollaborativeWorkflow() {
}
}
const handleWorkflowReverted = async (data: any) => {
const { workflowId } = data
logger.info(`Workflow ${workflowId} has been reverted to deployed state`)
// If the reverted workflow is the currently active one, reload the workflow state
if (activeWorkflowId === workflowId) {
logger.info(`Currently active workflow ${workflowId} was reverted, reloading state`)
try {
// Fetch the updated workflow state from the server (which loads from normalized tables)
const response = await fetch(`/api/workflows/${workflowId}`)
if (response.ok) {
const responseData = await response.json()
const workflowData = responseData.data
if (workflowData?.state) {
// Update the workflow store with the reverted state
isApplyingRemoteChange.current = true
try {
// Update the main workflow state using the API response
useWorkflowStore.setState({
blocks: workflowData.state.blocks || {},
edges: workflowData.state.edges || [],
loops: workflowData.state.loops || {},
parallels: workflowData.state.parallels || {},
isDeployed: workflowData.state.isDeployed || false,
deployedAt: workflowData.state.deployedAt,
lastSaved: workflowData.state.lastSaved || Date.now(),
hasActiveSchedule: workflowData.state.hasActiveSchedule || false,
hasActiveWebhook: workflowData.state.hasActiveWebhook || false,
deploymentStatuses: workflowData.state.deploymentStatuses || {},
})
// Update subblock store with reverted values
const subblockValues: Record<string, Record<string, any>> = {}
Object.entries(workflowData.state.blocks || {}).forEach(([blockId, block]) => {
const blockState = block as any
subblockValues[blockId] = {}
Object.entries(blockState.subBlocks || {}).forEach(([subblockId, subblock]) => {
subblockValues[blockId][subblockId] = (subblock as any).value
})
})
// Update subblock store for this workflow
useSubBlockStore.setState((state: any) => ({
workflowValues: {
...state.workflowValues,
[workflowId]: subblockValues,
},
}))
logger.info(`Successfully loaded reverted workflow state for ${workflowId}`)
} finally {
isApplyingRemoteChange.current = false
}
} else {
logger.error('No state found in workflow data after revert', { workflowData })
}
} else {
logger.error(`Failed to fetch workflow data after revert: ${response.statusText}`)
}
} catch (error) {
logger.error('Error reloading workflow state after revert:', error)
}
}
}
// Register event handlers
onWorkflowOperation(handleWorkflowOperation)
onSubblockUpdate(handleSubblockUpdate)
onUserJoined(handleUserJoined)
onUserLeft(handleUserLeft)
onWorkflowDeleted(handleWorkflowDeleted)
onWorkflowReverted(handleWorkflowReverted)
return () => {
// Cleanup handled by socket context
@@ -278,6 +347,7 @@ export function useCollaborativeWorkflow() {
onUserJoined,
onUserLeft,
onWorkflowDeleted,
onWorkflowReverted,
workflowStore,
subBlockStore,
activeWorkflowId,

View File

@@ -115,6 +115,26 @@ export class RoomManager {
)
}
handleWorkflowRevert(workflowId: string, timestamp: number) {
logger.info(`Handling workflow revert notification for ${workflowId}`)
const room = this.workflowRooms.get(workflowId)
if (!room) {
logger.debug(`No active room found for reverted workflow ${workflowId}`)
return
}
this.io.to(workflowId).emit('workflow-reverted', {
workflowId,
message: 'Workflow has been reverted to deployed state',
timestamp,
})
room.lastModified = timestamp
logger.info(`Notified ${room.users.size} users about workflow revert: ${workflowId}`)
}
async validateWorkflowConsistency(
workflowId: string
): Promise<{ valid: boolean; issues: string[] }> {

View File

@@ -50,6 +50,27 @@ export function createHttpHandler(roomManager: RoomManager, logger: Logger) {
return
}
// Handle workflow revert notifications from the main API
if (req.method === 'POST' && req.url === '/api/workflow-reverted') {
let body = ''
req.on('data', (chunk) => {
body += chunk.toString()
})
req.on('end', () => {
try {
const { workflowId, timestamp } = JSON.parse(body)
roomManager.handleWorkflowRevert(workflowId, timestamp)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ success: true }))
} catch (error) {
logger.error('Error handling workflow revert notification:', error)
res.writeHead(500, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Failed to process revert notification' }))
}
})
return
}
res.writeHead(404, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ error: 'Not found' }))
}

View File

@@ -822,13 +822,18 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
}
},
revertToDeployedState: (deployedState: WorkflowState) => {
revertToDeployedState: async (deployedState: WorkflowState) => {
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!activeWorkflowId) {
console.error('Cannot revert: no active workflow ID')
return
}
// Preserving the workflow-specific deployment status if it exists
const deploymentStatus = activeWorkflowId
? useWorkflowRegistry.getState().getWorkflowDeploymentStatus(activeWorkflowId)
: null
const deploymentStatus = useWorkflowRegistry
.getState()
.getWorkflowDeploymentStatus(activeWorkflowId)
const newState = {
blocks: deployedState.blocks,
@@ -841,7 +846,7 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
// Keep existing deployment statuses and update for the active workflow if needed
deploymentStatuses: {
...get().deploymentStatuses,
...(activeWorkflowId && deploymentStatus
...(deploymentStatus
? {
[activeWorkflowId]: deploymentStatus,
}
@@ -852,9 +857,6 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
// Update the main workflow state
set(newState)
// Get the active workflow ID
if (!activeWorkflowId) return
// Initialize subblock store with values from deployed state
const subBlockStore = useSubBlockStore.getState()
const values: Record<string, Record<string, any>> = {}
@@ -885,7 +887,27 @@ export const useWorkflowStore = create<WorkflowStoreWithHistory>()(
pushHistory(set, get, newState, 'Reverted to deployed state')
get().updateLastSaved()
// Note: Socket.IO handles real-time sync automatically
// Call API to persist the revert to normalized tables
try {
const response = await fetch(`/api/workflows/${activeWorkflowId}/revert-to-deployed`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
})
if (!response.ok) {
const errorData = await response.json()
console.error('Failed to persist revert to deployed state:', errorData.error)
// Don't throw error to avoid breaking the UI, but log it
} else {
console.log('Successfully persisted revert to deployed state')
}
} catch (error) {
console.error('Error calling revert to deployed API:', error)
// Don't throw error to avoid breaking the UI
}
},
toggleBlockAdvancedMode: (id: string) => {