fix(redis): tighten stale TCP connection detection and add fast lease deadline (#3311)

* fix(redis): tighten stale TCP connection detection and add fast lease deadline

* revert(redis): restore original retryStrategy logging

* fix(redis): clear deadline timer after Promise.race to prevent memory leak

* fix(redis): downgrade lease fallback log to warn — unavailable is expected fallback
Author: Waleed (committed by GitHub)
Date: 2026-02-23 13:22:29 -08:00
Commit: 132fef06a1 (parent: 2ae814549a)
3 changed files with 47 additions and 37 deletions
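Net effect on stale-socket detection: the old health check needed three failed PINGs at 30-second intervals, so a dead TCP connection could go unnoticed for up to roughly 90 seconds, while the new check needs two failures at 15-second intervals, or roughly 30 seconds. The 200 ms lease deadline is a separate change: it bounds a single Redis round trip on the lease-acquisition path so a hung socket cannot stall request handling there either.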


@@ -29,9 +29,8 @@ describe('redis config', () => {
     getRedisClient()
     mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     expect(listener).toHaveBeenCalledTimes(1)
   })
@@ -44,9 +43,9 @@ describe('redis config', () => {
     getRedisClient()
     mockRedisInstance.ping.mockResolvedValue('PONG')
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     expect(listener).not.toHaveBeenCalled()
   })
@@ -58,34 +57,29 @@ describe('redis config', () => {
     getRedisClient()
-    // 2 failures then a success — should reset counter
+    // 1 failure then a success — should reset counter
     mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
-    mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     mockRedisInstance.ping.mockResolvedValueOnce('PONG')
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
-    // 2 more failures — should NOT trigger reconnect (counter was reset)
+    // 1 more failure — should NOT trigger reconnect (counter was reset)
     mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
-    mockRedisInstance.ping.mockRejectedValueOnce(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     expect(listener).not.toHaveBeenCalled()
   })
 
-  it('should call disconnect(true) after 3 consecutive PING failures', async () => {
+  it('should call disconnect(true) after 2 consecutive PING failures', async () => {
     const { getRedisClient } = await import('./redis')
     getRedisClient()
     mockRedisInstance.ping.mockRejectedValue(new Error('ETIMEDOUT'))
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     expect(mockRedisInstance.disconnect).toHaveBeenCalledWith(true)
   })
@@ -100,9 +94,8 @@ describe('redis config', () => {
     getRedisClient()
     mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
-    await vi.advanceTimersByTimeAsync(30_000)
+    await vi.advanceTimersByTimeAsync(15_000)
+    await vi.advanceTimersByTimeAsync(15_000)
     expect(badListener).toHaveBeenCalledTimes(1)
     expect(goodListener).toHaveBeenCalledTimes(1)
@@ -119,7 +112,7 @@ describe('redis config', () => {
     // After closing, PING failures should not trigger disconnect
     mockRedisInstance.ping.mockRejectedValue(new Error('timeout'))
-    await vi.advanceTimersByTimeAsync(30_000 * 5)
+    await vi.advanceTimersByTimeAsync(15_000 * 5)
     expect(mockRedisInstance.disconnect).not.toHaveBeenCalled()
   })
 })
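A detail worth knowing when reading these tests: each `await vi.advanceTimersByTimeAsync(15_000)` fires exactly one interval tick and lets its async callback settle, so two advances now model the two consecutive PING failures that trip a reconnect. A minimal self-contained illustration of that mechanic, using only vitest APIs (the interval body is a stand-in, not the real health check):

import { afterEach, beforeEach, expect, it, vi } from 'vitest'

beforeEach(() => {
  vi.useFakeTimers()
})

afterEach(() => {
  vi.useRealTimers()
})

it('fires one tick per 15s advance', async () => {
  const tick = vi.fn()
  setInterval(tick, 15_000)
  // Each advance moves the fake clock exactly one interval and flushes
  // any async work the tick scheduled.
  await vi.advanceTimersByTimeAsync(15_000)
  await vi.advanceTimersByTimeAsync(15_000)
  expect(tick).toHaveBeenCalledTimes(2)
})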


@@ -11,8 +11,8 @@
 let pingFailures = 0
 let pingInterval: NodeJS.Timeout | null = null
 let pingInFlight = false
-const PING_INTERVAL_MS = 30_000
-const MAX_PING_FAILURES = 3
+const PING_INTERVAL_MS = 15_000
+const MAX_PING_FAILURES = 2
 
 /** Callbacks invoked when the PING health check forces a reconnect. */
 const reconnectListeners: Array<() => void> = []
@@ -42,7 +42,7 @@ function startPingHealthCheck(redis: Redis): void {
       })
       if (pingFailures >= MAX_PING_FAILURES) {
-        logger.error('Redis PING failed 3 consecutive times — forcing reconnect', {
+        logger.error(`Redis PING failed ${MAX_PING_FAILURES} consecutive times — forcing reconnect`, {
           consecutiveFailures: pingFailures,
         })
         pingFailures = 0
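For orientation, the loop these constants drive plausibly looks like the sketch below. The identifiers (`startPingHealthCheck`, `pingFailures`, `pingInFlight`, `disconnect(true)`) all appear in the diff or its tests; the exact control flow is an assumption, not the file's real code:

import type Redis from 'ioredis'

const PING_INTERVAL_MS = 15_000
const MAX_PING_FAILURES = 2

let pingFailures = 0
let pingInFlight = false

function startPingHealthCheck(redis: Redis): void {
  setInterval(async () => {
    if (pingInFlight) return // don't stack PINGs if the last one is still pending
    pingInFlight = true
    try {
      await redis.ping()
      pingFailures = 0 // any success resets the consecutive-failure counter
    } catch {
      pingFailures += 1
      if (pingFailures >= MAX_PING_FAILURES) {
        pingFailures = 0
        // Drop the (likely stale) TCP socket; ioredis then reconnects
        // according to its retryStrategy.
        redis.disconnect(true)
      }
    } finally {
      pingInFlight = false
    }
  }, PING_INTERVAL_MS)
}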


@@ -71,6 +71,7 @@
 const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER =
   MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER
 const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000
 const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner'
+const LEASE_REDIS_DEADLINE_MS = 200
 const QUEUE_RETRY_DELAY_MS = 1000
 const DISTRIBUTED_LEASE_GRACE_MS = 30000
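The 200 ms ceiling sits far below the 1-second queue retry delay and the 30-second lease grace period around it. The apparent intent, consistent with the commit message and the warn log below, is that if Redis cannot answer within a fraction of the normal budget, the caller should give up quickly and fall back to local admission rather than stall the request path.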
@@ -292,21 +293,37 @@ async function tryAcquireDistributedLease(
     return 1
   `
-  try {
-    const result = await redis.eval(
-      script,
-      1,
-      key,
-      now.toString(),
-      DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
-      expiresAt.toString(),
-      leaseId,
-      leaseTtlMs.toString()
-    )
+  let deadlineTimer: NodeJS.Timeout | undefined
+  const deadline = new Promise<never>((_, reject) => {
+    deadlineTimer = setTimeout(
+      () => reject(new Error(`Redis lease timed out after ${LEASE_REDIS_DEADLINE_MS}ms`)),
+      LEASE_REDIS_DEADLINE_MS
+    )
+  })
+  try {
+    const result = await Promise.race([
+      redis.eval(
+        script,
+        1,
+        key,
+        now.toString(),
+        DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(),
+        expiresAt.toString(),
+        leaseId,
+        leaseTtlMs.toString()
+      ),
+      deadline,
+    ])
     return Number(result) === 1 ? 'acquired' : 'limit_exceeded'
   } catch (error) {
-    logger.error('Failed to acquire distributed owner lease', { ownerKey, error })
+    logger.warn('Failed to acquire distributed owner lease — falling back to local execution', {
+      ownerKey,
+      error,
+    })
     return 'unavailable'
+  } finally {
+    clearTimeout(deadlineTimer)
   }
 }
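The `finally { clearTimeout(deadlineTimer) }` is the leak fix named in the commit message: without it, every lease acquisition would leave a live 200 ms timer and its captured closures behind even after the eval had already returned, and under load those pending timers pile up. Note also that the deadline only stops the caller from waiting; a Lua script that has reached Redis still runs to completion there. Extracted into a reusable helper, the same pattern might look like this (hypothetical sketch, not code from this commit):

// Hypothetical generic helper implementing the race-with-deadline pattern,
// with the timer reclaimed in finally.
async function withDeadline<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: NodeJS.Timeout | undefined
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms)
  })
  try {
    // Whichever promise settles first wins; the loser is ignored but stays
    // handled, since Promise.race subscribes to both.
    return await Promise.race([promise, deadline])
  } finally {
    clearTimeout(timer) // runs on success, timeout, and failure alike
  }
}

With such a helper the call site would reduce to `const result = await withDeadline(redis.eval(...), LEASE_REDIS_DEADLINE_MS, 'Redis lease')`.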