Revert "fix(sockets-server-disconnection): on reconnect force sync store to d…" (#640)

This reverts commit 6dc8b17bed.
This commit is contained in:
Waleed Latif
2025-07-08 18:32:29 -07:00
committed by GitHub
parent 2c9a4f4c3e
commit d9046042af
5 changed files with 9 additions and 365 deletions

View File

@@ -1,175 +0,0 @@
import crypto from 'crypto'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { getUserEntityPermissions } from '@/lib/permissions/utils'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
import { workflowStateApiSchema } from '@/lib/workflows/validation'
import { db } from '@/db'
import { workflow } from '@/db/schema'
const logger = createLogger('ForceSync')
/**
* POST /api/workflows/[id]/force-sync
* Force sync local workflow state to database immediately
* Used during socket reconnection to ensure local changes are persisted
*/
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id: workflowId } = await params
try {
logger.info(`[${requestId}] Force sync request for workflow ${workflowId}`)
// Get session
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized force sync attempt`)
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const userId = session.user.id
// Get workflow and verify access (inline implementation)
const workflowData = await db
.select({
userId: workflow.userId,
workspaceId: workflow.workspaceId,
name: workflow.name,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowData.length) {
logger.warn(`[${requestId}] Workflow ${workflowId} not found`)
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const workflowRecord = workflowData[0]
let hasAccess = false
// Check if user owns the workflow
if (workflowRecord.userId === userId) {
hasAccess = true
}
// Check workspace membership if workflow belongs to a workspace
if (!hasAccess && workflowRecord.workspaceId) {
const userPermission = await getUserEntityPermissions(
userId,
'workspace',
workflowRecord.workspaceId
)
hasAccess = userPermission !== null
}
if (!hasAccess) {
logger.warn(`[${requestId}] Access denied for user ${userId} to workflow ${workflowId}`)
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
// Parse and validate the workflow state from request body
const body = await request.json()
const { workflowState } = body
if (!workflowState) {
return NextResponse.json({ error: 'Missing workflowState in request body' }, { status: 400 })
}
// Validate the workflow state structure
const validationResult = workflowStateApiSchema.safeParse(workflowState)
if (!validationResult.success) {
logger.error(`[${requestId}] Invalid workflow state structure:`, {
error: validationResult.error,
receivedData: JSON.stringify(workflowState, null, 2),
})
return NextResponse.json(
{
error: 'Invalid workflow state structure',
details: validationResult.error.issues,
receivedKeys: Object.keys(workflowState || {}),
},
{ status: 400 }
)
}
const validatedState = validationResult.data
// Save to normalized tables
logger.info(`[${requestId}] Saving workflow state to normalized tables`)
// Convert deployedAt to Date if it's a string
let deployedAt: Date | undefined
if (validatedState.deployedAt) {
if (typeof validatedState.deployedAt === 'string') {
deployedAt = new Date(validatedState.deployedAt)
} else if (validatedState.deployedAt instanceof Date) {
deployedAt = validatedState.deployedAt
}
}
const saveResult = await saveWorkflowToNormalizedTables(workflowId, {
blocks: validatedState.blocks,
edges: validatedState.edges,
loops: validatedState.loops || {},
parallels: validatedState.parallels || {},
lastSaved: Date.now(),
isDeployed: validatedState.isDeployed,
deployedAt,
deploymentStatuses: validatedState.deploymentStatuses || {},
hasActiveSchedule: validatedState.hasActiveSchedule || false,
hasActiveWebhook: validatedState.hasActiveWebhook || false,
})
if (!saveResult.success) {
logger.error(`[${requestId}] Failed to save workflow state:`, saveResult.error)
return NextResponse.json(
{ error: saveResult.error || 'Failed to save workflow state' },
{ status: 500 }
)
}
// Update workflow's last_synced timestamp
await db
.update(workflow)
.set({
lastSynced: new Date(),
updatedAt: new Date(),
})
.where(eq(workflow.id, workflowId))
logger.info(`[${requestId}] Successfully force synced workflow ${workflowId}`)
// Notify socket server about the sync for real-time updates
try {
const socketServerUrl = process.env.SOCKET_SERVER_URL || 'http://localhost:3002'
await fetch(`${socketServerUrl}/api/workflow-synced`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
workflowId,
timestamp: Date.now(),
userId,
}),
})
logger.debug(`[${requestId}] Notified socket server about force sync`)
} catch (socketError) {
// Don't fail the request if socket notification fails
logger.warn(`[${requestId}] Failed to notify socket server about sync:`, socketError)
}
return NextResponse.json({
success: true,
message: 'Workflow state synced successfully',
timestamp: Date.now(),
})
} catch (error: any) {
logger.error(`[${requestId}] Force sync error:`, error)
return NextResponse.json({ error: error.message || 'Internal server error' }, { status: 500 })
}
}

View File

@@ -4,23 +4,22 @@ import { useEffect, useState } from 'react'
interface ConnectionStatusProps {
isConnected: boolean
isSyncing: boolean
}
export function ConnectionStatus({ isConnected, isSyncing }: ConnectionStatusProps) {
export function ConnectionStatus({ isConnected }: ConnectionStatusProps) {
const [showOfflineNotice, setShowOfflineNotice] = useState(false)
const [syncCompleted, setSyncCompleted] = useState(false)
useEffect(() => {
let timeoutId: NodeJS.Timeout
if (!isConnected) {
// Show offline notice after 6 seconds of being disconnected
timeoutId = setTimeout(() => {
setShowOfflineNotice(true)
}, 6000) // 6 seconds
} else if (isConnected && showOfflineNotice && !isSyncing && syncCompleted) {
} else {
// Hide notice immediately when reconnected
setShowOfflineNotice(false)
setSyncCompleted(false)
}
return () => {
@@ -28,40 +27,13 @@ export function ConnectionStatus({ isConnected, isSyncing }: ConnectionStatusPro
clearTimeout(timeoutId)
}
}
}, [isConnected, isSyncing, showOfflineNotice, syncCompleted])
// Track when sync completes
useEffect(() => {
if (!isSyncing && showOfflineNotice && isConnected) {
setSyncCompleted(true)
}
}, [isSyncing, showOfflineNotice, isConnected])
}, [isConnected])
// Don't render anything if connected or if we haven't been disconnected long enough
if (!showOfflineNotice) {
return null
}
// Show different states based on connection and sync status
if (isConnected && isSyncing) {
return (
<div className='flex items-center gap-1.5'>
<div className='flex items-center gap-1.5 text-yellow-600'>
<div className='relative flex items-center justify-center'>
<div className='absolute h-3 w-3 animate-ping rounded-full bg-yellow-500/20' />
<div className='relative h-2 w-2 rounded-full bg-yellow-500' />
</div>
<div className='flex flex-col'>
<span className='font-medium text-xs leading-tight'>Syncing changes</span>
<span className='text-xs leading-tight opacity-90'>
Saving local changes to database...
</span>
</div>
</div>
</div>
)
}
return (
<div className='flex items-center gap-1.5'>
<div className='flex items-center gap-1.5 text-red-600'>

View File

@@ -1,7 +1,6 @@
'use client'
import { useMemo } from 'react'
import { useSocket } from '@/contexts/socket-context'
import { usePresence } from '../../../../hooks/use-presence'
import { ConnectionStatus } from './components/connection-status/connection-status'
import { UserAvatar } from './components/user-avatar/user-avatar'
@@ -28,7 +27,6 @@ export function UserAvatarStack({
}: UserAvatarStackProps) {
// Use presence data if no users are provided via props
const { users: presenceUsers, isConnected } = usePresence()
const { isSyncing } = useSocket()
const users = propUsers || presenceUsers
// Memoize the processed users to avoid unnecessary re-renders
@@ -47,10 +45,8 @@ export function UserAvatarStack({
}, [users, maxVisible])
// Show connection status component regardless of user count
// This will handle the offline notice when disconnected for 6 seconds
const connectionStatusElement = (
<ConnectionStatus isConnected={isConnected} isSyncing={isSyncing} />
)
// This will handle the offline notice when disconnected for 15 seconds
const connectionStatusElement = <ConnectionStatus isConnected={isConnected} />
// Only show presence when there are multiple users (>1)
// But always show connection status

View File

@@ -13,9 +13,6 @@ import { useParams } from 'next/navigation'
import { io, type Socket } from 'socket.io-client'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console-logger'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
const logger = createLogger('SocketContext')
@@ -37,7 +34,6 @@ interface SocketContextType {
socket: Socket | null
isConnected: boolean
isConnecting: boolean
isSyncing: boolean
currentWorkflowId: string | null
presenceUsers: PresenceUser[]
joinWorkflow: (workflowId: string) => void
@@ -46,7 +42,6 @@ interface SocketContextType {
emitSubblockUpdate: (blockId: string, subblockId: string, value: any) => void
emitCursorUpdate: (cursor: { x: number; y: number }) => void
emitSelectionUpdate: (selection: { type: 'block' | 'edge' | 'none'; id?: string }) => void
requestForceSync: () => Promise<boolean>
// Event handlers for receiving real-time updates
onWorkflowOperation: (handler: (data: any) => void) => void
onSubblockUpdate: (handler: (data: any) => void) => void
@@ -62,7 +57,6 @@ const SocketContext = createContext<SocketContextType>({
socket: null,
isConnected: false,
isConnecting: false,
isSyncing: false,
currentWorkflowId: null,
presenceUsers: [],
joinWorkflow: () => {},
@@ -71,7 +65,6 @@ const SocketContext = createContext<SocketContextType>({
emitSubblockUpdate: () => {},
emitCursorUpdate: () => {},
emitSelectionUpdate: () => {},
requestForceSync: async () => false,
onWorkflowOperation: () => {},
onSubblockUpdate: () => {},
onCursorUpdate: () => {},
@@ -93,7 +86,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [socket, setSocket] = useState<Socket | null>(null)
const [isConnected, setIsConnected] = useState(false)
const [isConnecting, setIsConnecting] = useState(false)
const [isSyncing, setIsSyncing] = useState(false)
const [currentWorkflowId, setCurrentWorkflowId] = useState<string | null>(null)
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
@@ -101,11 +93,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
// Access stores for force sync
const workflowStore = useWorkflowStore()
const subBlockStore = useSubBlockStore()
const { activeWorkflowId } = useWorkflowRegistry()
// Use refs to store event handlers to avoid stale closures
const eventHandlers = useRef<{
workflowOperation?: (data: any) => void
@@ -163,9 +150,9 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const socketInstance = io(socketUrl, {
transports: ['websocket', 'polling'], // Keep polling fallback for reliability
withCredentials: true,
reconnectionAttempts: Number.POSITIVE_INFINITY, // Socket.IO handles base reconnection
reconnectionAttempts: 5, // Socket.IO handles base reconnection
reconnectionDelay: 1000, // Start with 1 second delay
reconnectionDelayMax: 30000, // Max 30 second delay
reconnectionDelayMax: 5000, // Max 5 second delay
timeout: 10000, // Back to original timeout
auth: (cb) => {
// Generate a fresh token for each connection attempt (including reconnections)
@@ -583,102 +570,12 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
eventHandlers.current.workflowReverted = handler
}, [])
const requestForceSync = useCallback(async (): Promise<boolean> => {
if (!activeWorkflowId) {
logger.warn('Cannot force sync: no active workflow')
return false
}
setIsSyncing(true)
try {
logger.info(`Starting force sync for workflow ${activeWorkflowId}`)
const currentState = workflowStore
const subblockValues = subBlockStore.workflowValues[activeWorkflowId] || {}
const blocksWithSubblocks = { ...currentState.blocks }
Object.entries(subblockValues).forEach(([blockId, blockSubblocks]) => {
if (blocksWithSubblocks[blockId]) {
Object.entries(blockSubblocks as Record<string, any>).forEach(([subblockId, value]) => {
if (blocksWithSubblocks[blockId].subBlocks[subblockId]) {
blocksWithSubblocks[blockId].subBlocks[subblockId].value = value
}
})
}
})
const workflowState = {
blocks: blocksWithSubblocks,
edges: currentState.edges,
loops: currentState.loops,
parallels: currentState.parallels,
lastSaved: Date.now(),
isDeployed: currentState.isDeployed,
deployedAt: currentState.deployedAt,
deploymentStatuses: currentState.deploymentStatuses,
hasActiveSchedule: currentState.hasActiveSchedule,
hasActiveWebhook: currentState.hasActiveWebhook,
}
const response = await fetch(`/api/workflows/${activeWorkflowId}/force-sync`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ workflowState }),
})
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
logger.error('Force sync API error response:', {
status: response.status,
statusText: response.statusText,
errorData,
workflowState: JSON.stringify(workflowState, null, 2),
})
throw new Error(errorData.error || `Force sync failed: ${response.statusText}`)
}
logger.info(`Force sync completed successfully for workflow ${activeWorkflowId}`)
return true
} catch (error) {
logger.error('Force sync failed:', error)
return false
} finally {
setIsSyncing(false)
}
}, [activeWorkflowId, workflowStore, subBlockStore, setIsSyncing])
const hasBeenDisconnected = useRef(false)
useEffect(() => {
if (!isConnected) {
hasBeenDisconnected.current = true
} else if (hasBeenDisconnected.current && isConnected && activeWorkflowId) {
logger.info('Connection restored, triggering force sync to persist local changes')
const syncTimeout = setTimeout(async () => {
const syncSuccess = await requestForceSync()
if (syncSuccess) {
logger.info('Force sync completed successfully after reconnection')
} else {
logger.error('Force sync failed after reconnection')
}
hasBeenDisconnected.current = false
}, 1000)
return () => clearTimeout(syncTimeout)
}
}, [isConnected, activeWorkflowId, requestForceSync])
return (
<SocketContext.Provider
value={{
socket,
isConnected,
isConnecting,
isSyncing,
currentWorkflowId,
presenceUsers,
joinWorkflow,
@@ -687,7 +584,6 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
emitSubblockUpdate,
emitCursorUpdate,
emitSelectionUpdate,
requestForceSync,
onWorkflowOperation,
onSubblockUpdate,
onCursorUpdate,

View File

@@ -1,45 +0,0 @@
import { z } from 'zod'
/**
* Shared Zod schema for validating WorkflowState objects
* This schema is used across API routes and other validation points
* to ensure consistent workflow state structure validation.
*/
export const workflowStateSchema = z.object({
// Core workflow structure
blocks: z.record(z.any()),
edges: z.array(z.any()),
loops: z.record(z.any()).optional().default({}),
parallels: z.record(z.any()).optional().default({}),
// Timestamps
lastSaved: z.number().optional(),
lastUpdate: z.number().optional(),
// Deployment fields
isDeployed: z.boolean().optional(),
// deployedAt can be Date, string, or undefined depending on serialization
deployedAt: z.union([z.date(), z.string(), z.undefined()]).optional(),
deploymentStatuses: z.record(z.any()).optional().default({}),
needsRedeployment: z.boolean().optional(),
// Feature flags
hasActiveSchedule: z.boolean().optional().default(false),
hasActiveWebhook: z.boolean().optional().default(false),
})
/**
* Schema for validating workflow state in API requests
* This is a more lenient version that handles serialized data
*/
export const workflowStateApiSchema = workflowStateSchema
.extend({
// Allow additional fields that might be present in API requests
// but aren't part of the core WorkflowState interface
})
.passthrough()
/**
* Type inference from the schema
*/
export type ValidatedWorkflowState = z.infer<typeof workflowStateSchema>