mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-09 15:07:55 -05:00
feat(sessions): remove redis as priority option for session data (#1600)
This commit is contained in:
@@ -6,7 +6,7 @@ import { z } from 'zod'
|
||||
import { renderOTPEmail } from '@/components/emails/render-email'
|
||||
import { sendEmail } from '@/lib/email/mailer'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { getRedisClient } from '@/lib/redis'
|
||||
import { getRedisClient, markMessageAsProcessed, releaseLock } from '@/lib/redis'
|
||||
import { generateRequestId } from '@/lib/utils'
|
||||
import { addCorsHeaders, setChatAuthCookie } from '@/app/api/chat/utils'
|
||||
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
|
||||
@@ -21,52 +21,83 @@ function generateOTP() {
|
||||
// We use 15 minutes (900 seconds) expiry for OTPs
|
||||
const OTP_EXPIRY = 15 * 60
|
||||
|
||||
async function storeOTP(email: string, chatId: string, otp: string): Promise<boolean> {
|
||||
// Store OTP in Redis
|
||||
async function storeOTP(email: string, chatId: string, otp: string): Promise<void> {
|
||||
const key = `otp:${email}:${chatId}`
|
||||
const redis = getRedisClient()
|
||||
|
||||
if (!redis) {
|
||||
logger.warn('Redis not available, OTP functionality requires Redis')
|
||||
return false
|
||||
}
|
||||
|
||||
try {
|
||||
if (redis) {
|
||||
// Use Redis if available
|
||||
await redis.set(key, otp, 'EX', OTP_EXPIRY)
|
||||
return true
|
||||
} catch (error) {
|
||||
logger.error('Error storing OTP in Redis:', error)
|
||||
return false
|
||||
} else {
|
||||
// Use the existing function as fallback to mark that an OTP exists
|
||||
await markMessageAsProcessed(key, OTP_EXPIRY)
|
||||
|
||||
// For the fallback case, we need to handle storing the OTP value separately
|
||||
// since markMessageAsProcessed only stores "1"
|
||||
const valueKey = `${key}:value`
|
||||
try {
|
||||
// Access the in-memory cache directly - hacky but works for fallback
|
||||
const inMemoryCache = (global as any).inMemoryCache
|
||||
if (inMemoryCache) {
|
||||
const fullKey = `processed:${valueKey}`
|
||||
const expiry = OTP_EXPIRY ? Date.now() + OTP_EXPIRY * 1000 : null
|
||||
inMemoryCache.set(fullKey, { value: otp, expiry })
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Error storing OTP in fallback cache:', error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get OTP from Redis
|
||||
async function getOTP(email: string, chatId: string): Promise<string | null> {
|
||||
const key = `otp:${email}:${chatId}`
|
||||
const redis = getRedisClient()
|
||||
|
||||
if (!redis) {
|
||||
return null
|
||||
}
|
||||
|
||||
try {
|
||||
if (redis) {
|
||||
// Use Redis if available
|
||||
return await redis.get(key)
|
||||
} catch (error) {
|
||||
logger.error('Error getting OTP from Redis:', error)
|
||||
}
|
||||
// Use the existing function as fallback - check if it exists
|
||||
const exists = await new Promise((resolve) => {
|
||||
try {
|
||||
// Check the in-memory cache directly - hacky but works for fallback
|
||||
const inMemoryCache = (global as any).inMemoryCache
|
||||
const fullKey = `processed:${key}`
|
||||
const cacheEntry = inMemoryCache?.get(fullKey)
|
||||
resolve(!!cacheEntry)
|
||||
} catch {
|
||||
resolve(false)
|
||||
}
|
||||
})
|
||||
|
||||
if (!exists) return null
|
||||
|
||||
// Try to get the value key
|
||||
const valueKey = `${key}:value`
|
||||
try {
|
||||
const inMemoryCache = (global as any).inMemoryCache
|
||||
const fullKey = `processed:${valueKey}`
|
||||
const cacheEntry = inMemoryCache?.get(fullKey)
|
||||
return cacheEntry?.value || null
|
||||
} catch {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
// Delete OTP from Redis
|
||||
async function deleteOTP(email: string, chatId: string): Promise<void> {
|
||||
const key = `otp:${email}:${chatId}`
|
||||
const redis = getRedisClient()
|
||||
|
||||
if (!redis) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
if (redis) {
|
||||
// Use Redis if available
|
||||
await redis.del(key)
|
||||
} catch (error) {
|
||||
logger.error('Error deleting OTP from Redis:', error)
|
||||
} else {
|
||||
// Use the existing function as fallback
|
||||
await releaseLock(`processed:${key}`)
|
||||
await releaseLock(`processed:${key}:value`)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,17 +177,7 @@ export async function POST(
|
||||
|
||||
const otp = generateOTP()
|
||||
|
||||
const stored = await storeOTP(email, deployment.id, otp)
|
||||
if (!stored) {
|
||||
logger.error(`[${requestId}] Failed to store OTP - Redis unavailable`)
|
||||
return addCorsHeaders(
|
||||
createErrorResponse(
|
||||
'Email verification temporarily unavailable, please try again later',
|
||||
503
|
||||
),
|
||||
request
|
||||
)
|
||||
}
|
||||
await storeOTP(email, deployment.id, otp)
|
||||
|
||||
const emailHtml = await renderOTPEmail(
|
||||
otp,
|
||||
|
||||
@@ -44,7 +44,6 @@ import { quickValidateEmail } from '@/lib/email/validation'
|
||||
import { env, isTruthy } from '@/lib/env'
|
||||
import { isBillingEnabled, isEmailVerificationEnabled } from '@/lib/environment'
|
||||
import { createLogger } from '@/lib/logs/console/logger'
|
||||
import { getRedisClient } from '@/lib/redis'
|
||||
import { SSO_TRUSTED_PROVIDERS } from './sso/consts'
|
||||
|
||||
const logger = createLogger('Auth')
|
||||
@@ -60,40 +59,6 @@ if (validStripeKey) {
|
||||
})
|
||||
}
|
||||
|
||||
// Configure Redis secondary storage for session data (optional)
|
||||
const redis = getRedisClient()
|
||||
const redisSecondaryStorage = redis
|
||||
? {
|
||||
get: async (key: string) => {
|
||||
try {
|
||||
const value = await redis.get(key)
|
||||
return value
|
||||
} catch (error) {
|
||||
logger.error('Redis get error in secondaryStorage', { key, error })
|
||||
return null
|
||||
}
|
||||
},
|
||||
set: async (key: string, value: string, ttl?: number) => {
|
||||
try {
|
||||
if (ttl) {
|
||||
await redis.set(key, value, 'EX', ttl)
|
||||
} else {
|
||||
await redis.set(key, value)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Redis set error in secondaryStorage', { key, ttl, error })
|
||||
}
|
||||
},
|
||||
delete: async (key: string) => {
|
||||
try {
|
||||
await redis.del(key)
|
||||
} catch (error) {
|
||||
logger.error('Redis delete error in secondaryStorage', { key, error })
|
||||
}
|
||||
},
|
||||
}
|
||||
: undefined
|
||||
|
||||
export const auth = betterAuth({
|
||||
baseURL: getBaseURL(),
|
||||
trustedOrigins: [
|
||||
@@ -104,8 +69,6 @@ export const auth = betterAuth({
|
||||
provider: 'pg',
|
||||
schema,
|
||||
}),
|
||||
// Conditionally add secondaryStorage only if Redis is available
|
||||
...(redisSecondaryStorage ? { secondaryStorage: redisSecondaryStorage } : {}),
|
||||
session: {
|
||||
cookieCache: {
|
||||
enabled: true,
|
||||
|
||||
@@ -4,19 +4,25 @@ import { createLogger } from '@/lib/logs/console/logger'
|
||||
|
||||
const logger = createLogger('Redis')
|
||||
|
||||
// Only use Redis if explicitly configured
|
||||
const redisUrl = env.REDIS_URL
|
||||
|
||||
// Global Redis client for connection pooling
|
||||
let globalRedisClient: Redis | null = null
|
||||
|
||||
const MESSAGE_ID_PREFIX = 'processed:'
|
||||
const MESSAGE_ID_EXPIRY = 60 * 60 * 24 * 7
|
||||
// Fallback in-memory cache for when Redis is not available
|
||||
const inMemoryCache = new Map<string, { value: string; expiry: number | null }>()
|
||||
const MAX_CACHE_SIZE = 1000
|
||||
|
||||
/**
|
||||
* Get a Redis client instance
|
||||
* Uses connection pooling to avoid creating a new connection for each request
|
||||
*/
|
||||
export function getRedisClient(): Redis | null {
|
||||
// For server-side only
|
||||
if (typeof window !== 'undefined') return null
|
||||
|
||||
// Return null immediately if no Redis URL is configured
|
||||
if (!redisUrl) {
|
||||
return null
|
||||
}
|
||||
@@ -24,19 +30,25 @@ export function getRedisClient(): Redis | null {
|
||||
if (globalRedisClient) return globalRedisClient
|
||||
|
||||
try {
|
||||
// Create a new Redis client with optimized settings for serverless
|
||||
globalRedisClient = new Redis(redisUrl, {
|
||||
// Keep alive is critical for serverless to reuse connections
|
||||
keepAlive: 1000,
|
||||
// Faster connection timeout for serverless
|
||||
connectTimeout: 5000,
|
||||
// Disable reconnection attempts in serverless
|
||||
maxRetriesPerRequest: 3,
|
||||
// Retry strategy with exponential backoff
|
||||
retryStrategy: (times) => {
|
||||
if (times > 5) {
|
||||
logger.warn('Redis connection failed after 5 attempts')
|
||||
return null
|
||||
logger.warn('Redis connection failed after 5 attempts, using fallback')
|
||||
return null // Stop retrying
|
||||
}
|
||||
return Math.min(times * 200, 2000)
|
||||
return Math.min(times * 200, 2000) // Exponential backoff
|
||||
},
|
||||
})
|
||||
|
||||
// Handle connection events
|
||||
globalRedisClient.on('error', (err: any) => {
|
||||
logger.error('Redis connection error:', { err })
|
||||
if (err.code === 'ECONNREFUSED' || err.code === 'ETIMEDOUT') {
|
||||
@@ -53,90 +65,144 @@ export function getRedisClient(): Redis | null {
|
||||
}
|
||||
}
|
||||
|
||||
// Message ID cache functions
|
||||
const MESSAGE_ID_PREFIX = 'processed:' // Generic prefix
|
||||
const MESSAGE_ID_EXPIRY = 60 * 60 * 24 * 7 // 7 days in seconds
|
||||
|
||||
/**
|
||||
* Check if a key exists in Redis
|
||||
* @param key The key to check
|
||||
* @returns True if the key exists, false otherwise
|
||||
* Check if a key exists in Redis or fallback cache.
|
||||
* @param key The key to check (e.g., messageId, lockKey).
|
||||
* @returns True if the key exists and hasn't expired, false otherwise.
|
||||
*/
|
||||
export async function hasProcessedMessage(key: string): Promise<boolean> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
return false
|
||||
}
|
||||
|
||||
try {
|
||||
const fullKey = `${MESSAGE_ID_PREFIX}${key}`
|
||||
const result = await redis.exists(fullKey)
|
||||
return result === 1
|
||||
const redis = getRedisClient()
|
||||
const fullKey = `${MESSAGE_ID_PREFIX}${key}` // Use generic prefix
|
||||
|
||||
if (redis) {
|
||||
// Use Redis if available
|
||||
const result = await redis.exists(fullKey)
|
||||
return result === 1
|
||||
}
|
||||
// Fallback to in-memory cache
|
||||
const cacheEntry = inMemoryCache.get(fullKey)
|
||||
if (!cacheEntry) return false
|
||||
|
||||
// Check if the entry has expired
|
||||
if (cacheEntry.expiry && cacheEntry.expiry < Date.now()) {
|
||||
inMemoryCache.delete(fullKey)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
} catch (error) {
|
||||
logger.error(`Error checking key ${key}:`, { error })
|
||||
return false
|
||||
// Fallback to in-memory cache on error
|
||||
const fullKey = `${MESSAGE_ID_PREFIX}${key}`
|
||||
const cacheEntry = inMemoryCache.get(fullKey)
|
||||
return !!cacheEntry && (!cacheEntry.expiry || cacheEntry.expiry > Date.now())
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a key as processed in Redis
|
||||
* @param key The key to mark
|
||||
* @param expirySeconds Optional expiry time in seconds (defaults to 7 days)
|
||||
* Mark a key as processed/present in Redis or fallback cache.
|
||||
* @param key The key to mark (e.g., messageId, lockKey).
|
||||
* @param expirySeconds Optional expiry time in seconds (defaults to 7 days).
|
||||
*/
|
||||
export async function markMessageAsProcessed(
|
||||
key: string,
|
||||
expirySeconds: number = MESSAGE_ID_EXPIRY
|
||||
): Promise<void> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
logger.warn(`Cannot mark message as processed - Redis unavailable: ${key}`)
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const fullKey = `${MESSAGE_ID_PREFIX}${key}`
|
||||
await redis.set(fullKey, '1', 'EX', expirySeconds)
|
||||
const redis = getRedisClient()
|
||||
const fullKey = `${MESSAGE_ID_PREFIX}${key}` // Use generic prefix
|
||||
|
||||
if (redis) {
|
||||
// Use Redis if available - use pipelining for efficiency
|
||||
await redis.set(fullKey, '1', 'EX', expirySeconds)
|
||||
} else {
|
||||
// Fallback to in-memory cache
|
||||
const expiry = expirySeconds ? Date.now() + expirySeconds * 1000 : null
|
||||
inMemoryCache.set(fullKey, { value: '1', expiry })
|
||||
|
||||
// Clean up old message IDs if cache gets too large
|
||||
if (inMemoryCache.size > MAX_CACHE_SIZE) {
|
||||
const now = Date.now()
|
||||
|
||||
// First try to remove expired entries
|
||||
for (const [cacheKey, entry] of inMemoryCache.entries()) {
|
||||
if (entry.expiry && entry.expiry < now) {
|
||||
inMemoryCache.delete(cacheKey)
|
||||
}
|
||||
}
|
||||
|
||||
// If still too large, remove oldest entries (FIFO based on insertion order)
|
||||
if (inMemoryCache.size > MAX_CACHE_SIZE) {
|
||||
const keysToDelete = Array.from(inMemoryCache.keys()).slice(
|
||||
0,
|
||||
inMemoryCache.size - MAX_CACHE_SIZE
|
||||
)
|
||||
|
||||
for (const keyToDelete of keysToDelete) {
|
||||
inMemoryCache.delete(keyToDelete)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Error marking key ${key} as processed:`, { error })
|
||||
// Fallback to in-memory cache on error
|
||||
const fullKey = `${MESSAGE_ID_PREFIX}${key}`
|
||||
const expiry = expirySeconds ? Date.now() + expirySeconds * 1000 : null
|
||||
inMemoryCache.set(fullKey, { value: '1', expiry })
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to acquire a distributed lock using Redis SET NX command
|
||||
* @param lockKey The key to use for the lock
|
||||
* @param value The value to set (e.g., a unique identifier for the process holding the lock)
|
||||
* @param expirySeconds The lock's time-to-live in seconds
|
||||
* @returns True if the lock was acquired successfully, false otherwise
|
||||
* Attempts to acquire a lock using Redis SET NX command.
|
||||
* @param lockKey The key to use for the lock.
|
||||
* @param value The value to set (e.g., a unique identifier for the process holding the lock).
|
||||
* @param expirySeconds The lock's time-to-live in seconds.
|
||||
* @returns True if the lock was acquired successfully, false otherwise.
|
||||
*/
|
||||
export async function acquireLock(
|
||||
lockKey: string,
|
||||
value: string,
|
||||
expirySeconds: number
|
||||
): Promise<boolean> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
logger.warn('Redis client not available, cannot acquire lock.')
|
||||
return false
|
||||
}
|
||||
|
||||
try {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
logger.warn('Redis client not available, cannot acquire lock.')
|
||||
// Fallback behavior: maybe allow processing but log a warning?
|
||||
// Or treat as lock acquired if no Redis? Depends on desired behavior.
|
||||
return true // Or false, depending on safety requirements
|
||||
}
|
||||
|
||||
// Use SET key value EX expirySeconds NX
|
||||
// Returns "OK" if successful, null if key already exists (lock held)
|
||||
const result = await redis.set(lockKey, value, 'EX', expirySeconds, 'NX')
|
||||
|
||||
return result === 'OK'
|
||||
} catch (error) {
|
||||
logger.error(`Error acquiring lock for key ${lockKey}:`, { error })
|
||||
// Treat errors as failure to acquire lock for safety
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the value of a key from Redis
|
||||
* @param key The key to retrieve
|
||||
* @returns The value of the key, or null if the key doesn't exist or an error occurs
|
||||
* Retrieves the value of a key from Redis.
|
||||
* @param key The key to retrieve.
|
||||
* @returns The value of the key, or null if the key doesn't exist or an error occurs.
|
||||
*/
|
||||
export async function getLockValue(key: string): Promise<string | null> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
logger.warn('Redis client not available, cannot get lock value.')
|
||||
return null
|
||||
}
|
||||
|
||||
try {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
logger.warn('Redis client not available, cannot get lock value.')
|
||||
return null // Cannot determine lock value
|
||||
}
|
||||
return await redis.get(key)
|
||||
} catch (error) {
|
||||
logger.error(`Error getting value for key ${key}:`, { error })
|
||||
@@ -145,18 +211,20 @@ export async function getLockValue(key: string): Promise<string | null> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Release a lock by deleting the key
|
||||
* @param lockKey The key of the lock to release
|
||||
* Releases a lock by deleting the key.
|
||||
* Ideally, use Lua script for safe release (check value before deleting),
|
||||
* but simple DEL is often sufficient if lock expiry is handled well.
|
||||
* @param lockKey The key of the lock to release.
|
||||
*/
|
||||
export async function releaseLock(lockKey: string): Promise<void> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
logger.warn('Redis client not available, cannot release lock.')
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
await redis.del(lockKey)
|
||||
const redis = getRedisClient()
|
||||
if (redis) {
|
||||
await redis.del(lockKey)
|
||||
} else {
|
||||
logger.warn('Redis client not available, cannot release lock.')
|
||||
// No fallback needed for releasing if using in-memory cache for locking wasn't implemented
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Error releasing lock for key ${lockKey}:`, { error })
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user