fix bugbot comments

This commit is contained in:
Vikhyath Mondreti
2026-01-29 17:30:19 -08:00
parent e5d9b98909
commit 1fbe3029f4
4 changed files with 95 additions and 51 deletions

View File

@@ -127,6 +127,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
const [authFailed, setAuthFailed] = useState(false)
const initializedRef = useRef(false)
const socketRef = useRef<Socket | null>(null)
const params = useParams()
const urlWorkflowId = params?.workflowId as string | undefined
@@ -146,6 +147,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}>({})
const positionUpdateTimeouts = useRef<Map<string, number>>(new Map())
const isRejoiningRef = useRef<boolean>(false)
const pendingPositionUpdates = useRef<Map<string, any>>(new Map())
const generateSocketToken = async (): Promise<string> => {
@@ -315,6 +317,7 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
// Handle join workflow success - confirms room membership with presence list
socketInstance.on('join-workflow-success', ({ workflowId, presenceUsers }) => {
isRejoiningRef.current = false
setCurrentWorkflowId(workflowId)
setPresenceUsers(presenceUsers || [])
logger.info(`Successfully joined workflow room: ${workflowId}`, {
@@ -322,6 +325,11 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
})
socketInstance.on('join-workflow-error', ({ error }) => {
isRejoiningRef.current = false
logger.error('Failed to join workflow:', error)
})
socketInstance.on('workflow-operation', (data) => {
eventHandlers.current.workflowOperation?.(data)
})
@@ -485,6 +493,19 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
socketInstance.on('operation-forbidden', (error) => {
logger.warn('Operation forbidden:', error)
if (error?.type === 'SESSION_ERROR') {
const workflowId = urlWorkflowIdRef.current
if (workflowId && !isRejoiningRef.current) {
isRejoiningRef.current = true
logger.info(`Session expired, rejoining workflow: ${workflowId}`)
socketInstance.emit('join-workflow', {
workflowId,
tabSessionId: getTabSessionId(),
})
}
}
})
socketInstance.on('workflow-state', async (workflowData) => {
@@ -495,11 +516,8 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
}
})
socketRef.current = socketInstance
setSocket(socketInstance)
return () => {
socketInstance.close()
}
} catch (error) {
logger.error('Failed to initialize socket with token:', error)
setIsConnecting(false)
@@ -514,6 +532,13 @@ export function SocketProvider({ children, user }: SocketProviderProps) {
})
positionUpdateTimeouts.current.clear()
pendingPositionUpdates.current.clear()
// Close socket on unmount
if (socketRef.current) {
logger.info('Closing socket connection on unmount')
socketRef.current.close()
socketRef.current = null
}
}
}, [user?.id, authFailed])

View File

@@ -36,32 +36,36 @@ export function cleanupPendingSubblocksForSocket(socketId: string): void {
export function setupSubblocksHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('subblock-update', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { blockId, subblockId, value, timestamp, operationId } = data
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
socketId: socket.id,
workflowId,
blockId,
subblockId,
})
return
}
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring subblock update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring subblock update: workflow room not found`, {
socketId: socket.id,
workflowId,
blockId,
subblockId,
})
return
}
// Update user activity
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })

View File

@@ -32,32 +32,36 @@ export function cleanupPendingVariablesForSocket(socketId: string): void {
export function setupVariablesHandlers(socket: AuthenticatedSocket, roomManager: IRoomManager) {
socket.on('variable-update', async (data) => {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
return
}
const { variableId, field, value, timestamp, operationId } = data
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring variable update: workflow room not found`, {
socketId: socket.id,
workflowId,
variableId,
field,
})
return
}
try {
const workflowId = await roomManager.getWorkflowIdForSocket(socket.id)
const session = await roomManager.getUserSession(socket.id)
if (!workflowId || !session) {
logger.debug(`Ignoring variable update: socket not connected to any workflow room`, {
socketId: socket.id,
hasWorkflowId: !!workflowId,
hasSession: !!session,
})
socket.emit('operation-forbidden', {
type: 'SESSION_ERROR',
message: 'Session expired, please rejoin workflow',
})
return
}
const hasRoom = await roomManager.hasWorkflowRoom(workflowId)
if (!hasRoom) {
logger.debug(`Ignoring variable update: workflow room not found`, {
socketId: socket.id,
workflowId,
variableId,
field,
})
return
}
// Update user activity
await roomManager.updateUserActivity(workflowId, socket.id, { lastActivity: Date.now() })

View File

@@ -48,13 +48,17 @@ return workflowId
/**
* Lua script for atomic user activity update.
* Performs read-modify-write atomically to prevent lost updates.
* Also refreshes TTL on socket keys to prevent expiry during long sessions.
*/
const UPDATE_ACTIVITY_SCRIPT = `
local workflowUsersKey = KEYS[1]
local socketWorkflowKey = KEYS[2]
local socketSessionKey = KEYS[3]
local socketId = ARGV[1]
local cursorJson = ARGV[2]
local selectionJson = ARGV[3]
local lastActivity = ARGV[4]
local ttl = tonumber(ARGV[5])
local existingJson = redis.call('HGET', workflowUsersKey, socketId)
if not existingJson then
@@ -72,6 +76,8 @@ end
existing.lastActivity = tonumber(lastActivity)
redis.call('HSET', workflowUsersKey, socketId, cjson.encode(existing))
redis.call('EXPIRE', socketWorkflowKey, ttl)
redis.call('EXPIRE', socketSessionKey, ttl)
return 1
`
@@ -269,12 +275,17 @@ export class RedisRoomManager implements IRoomManager {
try {
await this.redis.evalSha(this.updateActivityScriptSha, {
keys: [KEYS.workflowUsers(workflowId)],
keys: [
KEYS.workflowUsers(workflowId),
KEYS.socketWorkflow(socketId),
KEYS.socketSession(socketId),
],
arguments: [
socketId,
updates.cursor ? JSON.stringify(updates.cursor) : '',
updates.selection ? JSON.stringify(updates.selection) : '',
(updates.lastActivity ?? Date.now()).toString(),
SOCKET_KEY_TTL.toString(),
],
})
} catch (error) {