mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-09 15:07:55 -05:00
feat(locks): add no-op for locking without redis to allow deployments without redis (#2703)
* feat(locks): add no-op for locking without redis to allow deployments without redis * ack PR comments, fixed worklfow block color
This commit is contained in:
@@ -14,10 +14,6 @@ import {
|
||||
} from '@/app/api/__test-utils__/utils'
|
||||
|
||||
const {
|
||||
hasProcessedMessageMock,
|
||||
markMessageAsProcessedMock,
|
||||
closeRedisConnectionMock,
|
||||
acquireLockMock,
|
||||
generateRequestHashMock,
|
||||
validateSlackSignatureMock,
|
||||
handleWhatsAppVerificationMock,
|
||||
@@ -28,10 +24,6 @@ const {
|
||||
processWebhookMock,
|
||||
executeMock,
|
||||
} = vi.hoisted(() => ({
|
||||
hasProcessedMessageMock: vi.fn().mockResolvedValue(false),
|
||||
markMessageAsProcessedMock: vi.fn().mockResolvedValue(true),
|
||||
closeRedisConnectionMock: vi.fn().mockResolvedValue(undefined),
|
||||
acquireLockMock: vi.fn().mockResolvedValue(true),
|
||||
generateRequestHashMock: vi.fn().mockResolvedValue('test-hash-123'),
|
||||
validateSlackSignatureMock: vi.fn().mockResolvedValue(true),
|
||||
handleWhatsAppVerificationMock: vi.fn().mockResolvedValue(null),
|
||||
@@ -73,13 +65,6 @@ vi.mock('@/background/logs-webhook-delivery', () => ({
|
||||
logsWebhookDelivery: {},
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/redis', () => ({
|
||||
hasProcessedMessage: hasProcessedMessageMock,
|
||||
markMessageAsProcessed: markMessageAsProcessedMock,
|
||||
closeRedisConnection: closeRedisConnectionMock,
|
||||
acquireLock: acquireLockMock,
|
||||
}))
|
||||
|
||||
vi.mock('@/lib/webhooks/utils', () => ({
|
||||
handleWhatsAppVerification: handleWhatsAppVerificationMock,
|
||||
handleSlackChallenge: handleSlackChallengeMock,
|
||||
@@ -201,9 +186,6 @@ describe('Webhook Trigger API Route', () => {
|
||||
workspaceId: 'test-workspace-id',
|
||||
})
|
||||
|
||||
hasProcessedMessageMock.mockResolvedValue(false)
|
||||
markMessageAsProcessedMock.mockResolvedValue(true)
|
||||
acquireLockMock.mockResolvedValue(true)
|
||||
handleWhatsAppVerificationMock.mockResolvedValue(null)
|
||||
processGenericDeduplicationMock.mockResolvedValue(null)
|
||||
processWebhookMock.mockResolvedValue(new Response('Webhook processed', { status: 200 }))
|
||||
|
||||
@@ -164,7 +164,7 @@ function getBlockIconAndColor(
|
||||
return { icon: ParallelTool.icon, bgColor: ParallelTool.bgColor }
|
||||
}
|
||||
if (lowerType === 'workflow') {
|
||||
return { icon: WorkflowIcon, bgColor: '#705335' }
|
||||
return { icon: WorkflowIcon, bgColor: '#6366F1' }
|
||||
}
|
||||
|
||||
// Look up from block registry (model maps to agent)
|
||||
|
||||
@@ -32,7 +32,7 @@ export const WorkflowBlock: BlockConfig = {
|
||||
description:
|
||||
'This is a core workflow block. Execute another workflow as a block in your workflow. Enter the input variable to pass to the child workflow.',
|
||||
category: 'blocks',
|
||||
bgColor: '#705335',
|
||||
bgColor: '#6366F1',
|
||||
icon: WorkflowIcon,
|
||||
subBlocks: [
|
||||
{
|
||||
|
||||
@@ -61,54 +61,6 @@ export function getRedisClient(): Redis | null {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if Redis is ready for commands.
|
||||
* Use for health checks only - commands should be sent regardless (ioredis queues them).
|
||||
*/
|
||||
export function isRedisConnected(): boolean {
|
||||
return globalRedisClient?.status === 'ready'
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Redis connection status for diagnostics.
|
||||
*/
|
||||
export function getRedisStatus(): string {
|
||||
return globalRedisClient?.status ?? 'not initialized'
|
||||
}
|
||||
|
||||
const MESSAGE_ID_PREFIX = 'processed:'
|
||||
const MESSAGE_ID_EXPIRY = 60 * 60 * 24 * 7
|
||||
|
||||
/**
|
||||
* Check if a message has been processed (for idempotency).
|
||||
* Requires Redis - throws if Redis is not available.
|
||||
*/
|
||||
export async function hasProcessedMessage(key: string): Promise<boolean> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
throw new Error('Redis not available for message deduplication')
|
||||
}
|
||||
|
||||
const result = await redis.exists(`${MESSAGE_ID_PREFIX}${key}`)
|
||||
return result === 1
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a message as processed (for idempotency).
|
||||
* Requires Redis - throws if Redis is not available.
|
||||
*/
|
||||
export async function markMessageAsProcessed(
|
||||
key: string,
|
||||
expirySeconds: number = MESSAGE_ID_EXPIRY
|
||||
): Promise<void> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
throw new Error('Redis not available for message deduplication')
|
||||
}
|
||||
|
||||
await redis.set(`${MESSAGE_ID_PREFIX}${key}`, '1', 'EX', expirySeconds)
|
||||
}
|
||||
|
||||
/**
|
||||
* Lua script for safe lock release.
|
||||
* Only deletes the key if the value matches (ownership verification).
|
||||
@@ -125,7 +77,10 @@ end
|
||||
/**
|
||||
* Acquire a distributed lock using Redis SET NX.
|
||||
* Returns true if lock acquired, false if already held.
|
||||
* Requires Redis - throws if Redis is not available.
|
||||
*
|
||||
* When Redis is not available, returns true (lock "acquired") to allow
|
||||
* single-replica deployments to function without Redis. In multi-replica
|
||||
* deployments without Redis, the idempotency layer prevents duplicate processing.
|
||||
*/
|
||||
export async function acquireLock(
|
||||
lockKey: string,
|
||||
@@ -134,36 +89,24 @@ export async function acquireLock(
|
||||
): Promise<boolean> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
throw new Error('Redis not available for distributed locking')
|
||||
return true // No-op when Redis unavailable; idempotency layer handles duplicates
|
||||
}
|
||||
|
||||
const result = await redis.set(lockKey, value, 'EX', expirySeconds, 'NX')
|
||||
return result === 'OK'
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the value of a lock key.
|
||||
* Requires Redis - throws if Redis is not available.
|
||||
*/
|
||||
export async function getLockValue(key: string): Promise<string | null> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
throw new Error('Redis not available')
|
||||
}
|
||||
|
||||
return redis.get(key)
|
||||
}
|
||||
|
||||
/**
|
||||
* Release a distributed lock safely.
|
||||
* Only releases if the caller owns the lock (value matches).
|
||||
* Returns true if lock was released, false if not owned or already expired.
|
||||
* Requires Redis - throws if Redis is not available.
|
||||
*
|
||||
* When Redis is not available, returns true (no-op) since no lock was held.
|
||||
*/
|
||||
export async function releaseLock(lockKey: string, value: string): Promise<boolean> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
throw new Error('Redis not available for distributed locking')
|
||||
return true // No-op when Redis unavailable; no lock was actually held
|
||||
}
|
||||
|
||||
const result = await redis.eval(RELEASE_LOCK_SCRIPT, 1, lockKey, value)
|
||||
|
||||
Reference in New Issue
Block a user