improvement(idempotency): added atomic claims to prevent duplicate processing for long-running workflows (#1366)

* improvement(idempotency): added atomic claims to prevent duplicate processing for long-running workflows

* ack PR comments
This commit is contained in:
Waleed
2025-09-17 17:17:55 -07:00
committed by GitHub
parent 6312df3a07
commit 658cf11299

View File

@@ -19,12 +19,6 @@ export interface IdempotencyConfig {
* Default: 'default'
*/
namespace?: string
/**
* Enable database fallback when Redis is not available
* Default: true
*/
enableDatabaseFallback?: boolean
}
export interface IdempotencyResult {
@@ -44,29 +38,30 @@ export interface IdempotencyResult {
previousResult?: any
/**
* Storage method used ('redis', 'database', 'memory')
* Storage method used ('redis', 'database')
*/
storageMethod: 'redis' | 'database' | 'memory'
storageMethod: 'redis' | 'database'
}
/**
 * Outcome record stored under an idempotency key.
 *
 * The same shape is used for the placeholder written when a key is claimed
 * (`status: 'in-progress'`) and for the final record written once the operation
 * has finished (`status: 'completed'` or `status: 'failed'`).
 */
export interface ProcessingResult {
/** Whether the operation succeeded. The in-progress placeholder is written with `false`. */
success: boolean
/** Value returned by the operation when it succeeded. */
result?: any
/** Error message captured when the operation threw. */
error?: string
/**
 * Lifecycle state of the operation. Optional; readers in this file also handle
 * records that carry no status.
 */
status?: 'in-progress' | 'completed' | 'failed'
/** `Date.now()` value (epoch milliseconds) captured when the key was claimed. */
startedAt?: number
}
/**
 * Result of attempting to atomically claim an idempotency key.
 */
export interface AtomicClaimResult {
/** `true` when this caller created the key and should run the operation. */
claimed: boolean
/**
 * Record already stored under the key when the claim was not obtained.
 * May be absent if the record could not be read back.
 */
existingResult?: ProcessingResult
/** Normalized key the claim was attempted on. */
normalizedKey: string
/** Backend that answered the claim attempt. */
storageMethod: 'redis' | 'database'
}
// Default lifetime of an idempotency record, in seconds (used as the `ttlSeconds` default).
const DEFAULT_TTL = 60 * 60 * 24 * 7 // 7 days
// Prefix prepended to the normalized key to form the Redis key.
const REDIS_KEY_PREFIX = 'idempotency:'
// Entry count above which the in-memory cache is pruned.
const MEMORY_CACHE_SIZE = 1000
// Process-local cache keyed by normalized key.
// `timestamp` is a Date.now() value in milliseconds; `ttl` is in seconds.
const memoryCache = new Map<
string,
{
result: any
timestamp: number
ttl: number
}
>()
const MAX_WAIT_TIME_MS = 300000 // 5 minutes max wait for in-progress operations
const POLL_INTERVAL_MS = 1000 // Check every 1 second for completion
/**
* Universal idempotency service for webhooks, triggers, and any other operations
@@ -79,7 +74,6 @@ export class IdempotencyService {
this.config = {
ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL,
namespace: config.namespace ?? 'default',
enableDatabaseFallback: config.enableDatabaseFallback ?? true,
}
}
@@ -139,70 +133,202 @@ export class IdempotencyService {
logger.warn(`Redis idempotency check failed for ${normalizedKey}:`, error)
}
if (this.config.enableDatabaseFallback) {
try {
const existing = await db
.select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
// Always fall back to the database when Redis is not available
try {
const existing = await db
.select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
.limit(1)
)
.limit(1)
if (existing.length > 0) {
const item = existing[0]
const isExpired = Date.now() - item.createdAt.getTime() > this.config.ttlSeconds * 1000
if (existing.length > 0) {
const item = existing[0]
const isExpired = Date.now() - item.createdAt.getTime() > this.config.ttlSeconds * 1000
if (!isExpired) {
logger.debug(`Idempotency hit in database: ${normalizedKey}`)
return {
isFirstTime: false,
normalizedKey,
previousResult: item.result,
storageMethod: 'database',
}
if (!isExpired) {
logger.debug(`Idempotency hit in database: ${normalizedKey}`)
return {
isFirstTime: false,
normalizedKey,
previousResult: item.result,
storageMethod: 'database',
}
await db
.delete(idempotencyKey)
.where(eq(idempotencyKey.key, normalizedKey))
.catch((err) => logger.warn(`Failed to clean up expired key ${normalizedKey}:`, err))
}
await db
.delete(idempotencyKey)
.where(eq(idempotencyKey.key, normalizedKey))
.catch((err) => logger.warn(`Failed to clean up expired key ${normalizedKey}:`, err))
}
logger.debug(`Idempotency miss in database: ${normalizedKey}`)
logger.debug(`Idempotency miss in database: ${normalizedKey}`)
return {
isFirstTime: true,
normalizedKey,
storageMethod: 'database',
}
} catch (error) {
logger.error(`Database idempotency check failed for ${normalizedKey}:`, error)
throw new Error(`Failed to check idempotency: database unavailable`)
}
}
/**
 * Atomically claim an idempotency key for processing.
 *
 * Redis is tried first using `SET key value EX ttl NX`, so only one caller can create
 * the key. If no Redis client is available, or the Redis call throws, the claim falls
 * back to the database using `INSERT ... ON CONFLICT DO NOTHING`.
 *
 * In the database path, a row older than the configured TTL is treated as expired
 * (the same rule `checkIdempotency` applies) and is taken over with a conditional
 * update, so an expired record cannot block new claims indefinitely.
 *
 * @param provider - Provider name used to build the normalized key
 * @param identifier - Event identifier used to build the normalized key
 * @param additionalContext - Optional extra context used to build the normalized key
 * @returns `claimed: true` when this caller now owns the key; otherwise `claimed: false`
 *   together with the record currently stored under the key (when it could be read)
 * @throws Error when the database fallback itself fails
 */
async atomicallyClaim(
  provider: string,
  identifier: string,
  additionalContext?: Record<string, any>
): Promise<AtomicClaimResult> {
  const normalizedKey = this.normalizeKey(provider, identifier, additionalContext)
  const redisKey = `${REDIS_KEY_PREFIX}${normalizedKey}`

  // Placeholder stored while the operation runs; waiters poll until it is replaced.
  const inProgressResult: ProcessingResult = {
    success: false,
    status: 'in-progress',
    startedAt: Date.now(),
  }

  try {
    const redis = getRedisClient()
    if (redis) {
      const claimed = await redis.set(
        redisKey,
        JSON.stringify(inProgressResult),
        'EX',
        this.config.ttlSeconds,
        'NX'
      )

      if (claimed === 'OK') {
        logger.debug(`Atomically claimed idempotency key in Redis: ${normalizedKey}`)
        return {
          claimed: true,
          normalizedKey,
          storageMethod: 'redis',
        }
      }

      // The key may have expired between SET NX and GET; report "no record" as undefined.
      const existingData = await redis.get(redisKey)
      const existingResult = existingData
        ? (JSON.parse(existingData) as ProcessingResult)
        : undefined

      logger.debug(`Idempotency key already claimed in Redis: ${normalizedKey}`)
      return {
        claimed: false,
        existingResult,
        normalizedKey,
        storageMethod: 'redis',
      }
    }
  } catch (error) {
    // NOTE(review): a Redis error raised after SET NX was declined also lands here and
    // falls through to a database claim; confirm that is acceptable for callers.
    logger.warn(`Redis atomic claim failed for ${normalizedKey}:`, error)
  }

  // Always fall back to the database when Redis is not available
  try {
    const matchesKey = () =>
      and(
        eq(idempotencyKey.key, normalizedKey),
        eq(idempotencyKey.namespace, this.config.namespace)
      )

    const readExisting = () =>
      db
        .select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
        .from(idempotencyKey)
        .where(matchesKey())
        .limit(1)

    const insertResult = await db
      .insert(idempotencyKey)
      .values({
        key: normalizedKey,
        namespace: this.config.namespace,
        result: inProgressResult,
        createdAt: new Date(),
      })
      .onConflictDoNothing()
      .returning({ key: idempotencyKey.key })

    if (insertResult.length > 0) {
      logger.debug(`Atomically claimed idempotency key in database: ${normalizedKey}`)
      return {
        claimed: true,
        normalizedKey,
        storageMethod: 'database',
      }
    }

    let existing = await readExisting()
    const current = existing[0]
    const isExpired =
      current !== undefined &&
      Date.now() - current.createdAt.getTime() > this.config.ttlSeconds * 1000

    if (current !== undefined && isExpired) {
      // Compare-and-swap on createdAt: only one caller can take over the expired row.
      const takeover = await db
        .update(idempotencyKey)
        .set({ result: inProgressResult, createdAt: new Date() })
        .where(and(matchesKey(), eq(idempotencyKey.createdAt, current.createdAt)))
        .returning({ key: idempotencyKey.key })

      if (takeover.length > 0) {
        logger.debug(`Reclaimed expired idempotency key in database: ${normalizedKey}`)
        return {
          claimed: true,
          normalizedKey,
          storageMethod: 'database',
        }
      }

      // Another caller won the takeover; report whatever they stored.
      existing = await readExisting()
    }

    const existingResult =
      existing.length > 0 ? (existing[0]?.result as ProcessingResult) : undefined

    logger.debug(`Idempotency key already claimed in database: ${normalizedKey}`)
    return {
      claimed: false,
      existingResult,
      normalizedKey,
      storageMethod: 'database',
    }
  } catch (error) {
    logger.error(`Database atomic claim failed for ${normalizedKey}:`, error)
    throw new Error(`Failed to claim idempotency key: database unavailable`)
  }
}
/**
 * Wait for an in-progress operation to complete and return its result.
 *
 * Polls the given backend every `POLL_INTERVAL_MS` until the stored record reaches a
 * terminal status or `MAX_WAIT_TIME_MS` elapses. Errors raised while reading the
 * record are logged and retried on the next poll. A stored failure is always
 * rethrown to the caller, whatever its error message is.
 *
 * @param normalizedKey - Normalized idempotency key to watch
 * @param storageMethod - Backend that holds the record for this key
 * @returns The `result` stored by the operation that owned the key
 * @throws Error with the stored error message (or 'Previous operation failed') when the
 *   recorded operation failed
 * @throws Error when no terminal status is observed within `MAX_WAIT_TIME_MS`
 */
async waitForResult<T>(normalizedKey: string, storageMethod: 'redis' | 'database'): Promise<T> {
  const startTime = Date.now()
  const redisKey = `${REDIS_KEY_PREFIX}${normalizedKey}`

  // Reads the current record from the selected backend; null when nothing is stored.
  const readCurrent = async (): Promise<ProcessingResult | null> => {
    if (storageMethod === 'redis') {
      const redis = getRedisClient()
      if (!redis) return null
      const data = await redis.get(redisKey)
      return data ? (JSON.parse(data) as ProcessingResult) : null
    }

    const existing = await db
      .select({ result: idempotencyKey.result })
      .from(idempotencyKey)
      .where(
        and(
          eq(idempotencyKey.key, normalizedKey),
          eq(idempotencyKey.namespace, this.config.namespace)
        )
      )
      .limit(1)
    return existing.length > 0 ? (existing[0]?.result as ProcessingResult) : null
  }

  while (Date.now() - startTime < MAX_WAIT_TIME_MS) {
    let currentResult: ProcessingResult | null = null
    try {
      currentResult = await readCurrent()
    } catch (error) {
      // Only read errors are retried; terminal outcomes are handled outside this try.
      logger.warn(`Error while waiting for result ${normalizedKey}:`, error)
    }

    if (currentResult?.status === 'completed') {
      logger.debug(`Operation completed, returning result: ${normalizedKey}`)
      if (currentResult.success === false) {
        throw new Error(currentResult.error || 'Previous operation failed')
      }
      return currentResult.result as T
    }

    if (currentResult?.status === 'failed') {
      logger.debug(`Operation failed, throwing error: ${normalizedKey}`)
      throw new Error(currentResult.error || 'Previous operation failed')
    }

    await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
  }

  throw new Error(`Timeout waiting for idempotency operation to complete: ${normalizedKey}`)
}
/**
@@ -211,7 +337,7 @@ export class IdempotencyService {
async storeResult(
normalizedKey: string,
result: ProcessingResult,
storageMethod: 'redis' | 'database' | 'memory'
storageMethod: 'redis' | 'database'
): Promise<void> {
const serializedResult = JSON.stringify(result)
@@ -232,62 +358,34 @@ export class IdempotencyService {
logger.warn(`Failed to store result in Redis for ${normalizedKey}:`, error)
}
if (this.config.enableDatabaseFallback && storageMethod !== 'memory') {
try {
await db
.insert(idempotencyKey)
.values({
key: normalizedKey,
namespace: this.config.namespace,
// Always fall back to the database when Redis is not available
try {
await db
.insert(idempotencyKey)
.values({
key: normalizedKey,
namespace: this.config.namespace,
result: result,
createdAt: new Date(),
})
.onConflictDoUpdate({
target: [idempotencyKey.key, idempotencyKey.namespace],
set: {
result: result,
createdAt: new Date(),
})
.onConflictDoUpdate({
target: [idempotencyKey.key, idempotencyKey.namespace],
set: {
result: result,
createdAt: new Date(),
},
})
},
})
logger.debug(`Stored idempotency result in database: ${normalizedKey}`)
return
} catch (error) {
logger.warn(`Failed to store result in database for ${normalizedKey}:`, error)
}
logger.debug(`Stored idempotency result in database: ${normalizedKey}`)
} catch (error) {
logger.error(`Failed to store result in database for ${normalizedKey}:`, error)
throw new Error(`Failed to store idempotency result: database unavailable`)
}
memoryCache.set(normalizedKey, {
result,
timestamp: Date.now(),
ttl: this.config.ttlSeconds,
})
if (memoryCache.size > MEMORY_CACHE_SIZE) {
const entries = Array.from(memoryCache.entries())
const now = Date.now()
entries.forEach(([key, entry]) => {
if (now - entry.timestamp > entry.ttl * 1000) {
memoryCache.delete(key)
}
})
if (memoryCache.size > MEMORY_CACHE_SIZE) {
const sortedEntries = entries
.filter(([key]) => memoryCache.has(key))
.sort((a, b) => a[1].timestamp - b[1].timestamp)
const toRemove = sortedEntries.slice(0, memoryCache.size - MEMORY_CACHE_SIZE)
toRemove.forEach(([key]) => memoryCache.delete(key))
}
}
logger.debug(`Stored idempotency result in memory: ${normalizedKey}`)
}
/**
* Execute an operation with idempotency protection
* Execute an operation with idempotency protection using atomic claims
* Eliminates race conditions by claiming the key before execution
*/
async executeWithIdempotency<T>(
provider: string,
@@ -295,68 +393,105 @@ export class IdempotencyService {
operation: () => Promise<T>,
additionalContext?: Record<string, any>
): Promise<T> {
const idempotencyCheck = await this.checkIdempotency(provider, identifier, additionalContext)
const claimResult = await this.atomicallyClaim(provider, identifier, additionalContext)
if (!idempotencyCheck.isFirstTime) {
logger.info(`Skipping duplicate operation: ${idempotencyCheck.normalizedKey}`)
if (!claimResult.claimed) {
const existingResult = claimResult.existingResult
if (idempotencyCheck.previousResult?.success === false) {
throw new Error(idempotencyCheck.previousResult?.error || 'Previous operation failed')
if (existingResult?.status === 'completed') {
logger.info(`Returning cached result for: ${claimResult.normalizedKey}`)
if (existingResult.success === false) {
throw new Error(existingResult.error || 'Previous operation failed')
}
return existingResult.result as T
}
return idempotencyCheck.previousResult?.result as T
if (existingResult?.status === 'failed') {
logger.info(`Previous operation failed for: ${claimResult.normalizedKey}`)
throw new Error(existingResult.error || 'Previous operation failed')
}
if (existingResult?.status === 'in-progress') {
logger.info(`Waiting for in-progress operation: ${claimResult.normalizedKey}`)
return await this.waitForResult<T>(claimResult.normalizedKey, claimResult.storageMethod)
}
if (existingResult) {
return existingResult.result as T
}
throw new Error(`Unexpected state: key claimed but no existing result found`)
}
try {
logger.debug(`Executing new operation: ${idempotencyCheck.normalizedKey}`)
logger.info(`Executing new operation: ${claimResult.normalizedKey}`)
const result = await operation()
await this.storeResult(
idempotencyCheck.normalizedKey,
{ success: true, result },
idempotencyCheck.storageMethod
claimResult.normalizedKey,
{ success: true, result, status: 'completed' },
claimResult.storageMethod
)
logger.debug(`Successfully completed operation: ${claimResult.normalizedKey}`)
return result
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
await this.storeResult(
idempotencyCheck.normalizedKey,
{ success: false, error: errorMessage },
idempotencyCheck.storageMethod
claimResult.normalizedKey,
{ success: false, error: errorMessage, status: 'failed' },
claimResult.storageMethod
)
logger.warn(`Operation failed: ${claimResult.normalizedKey} - ${errorMessage}`)
throw error
}
}
/**
* Create an idempotency key from a webhook payload
* Create an idempotency key from a webhook payload following RFC best practices
* Priority order:
* 1. Standard webhook headers (webhook-id, x-webhook-id, etc.)
* 2. Event/message IDs from payload
* 3. Deterministic hash of stable payload fields (excluding timestamps)
*/
static createWebhookIdempotencyKey(
webhookId: string,
payload: any,
headers?: Record<string, string>
): string {
// 1. Check for standard webhook headers (RFC compliant)
const webhookIdHeader =
headers?.['x-webhook-id'] ||
headers?.['webhook-id'] || // Standard Webhooks spec
headers?.['x-webhook-id'] || // Legacy standard
headers?.['x-shopify-webhook-id'] ||
headers?.['x-github-delivery'] ||
headers?.['stripe-signature']?.split(',')[0]
headers?.['x-event-id'] // Generic event ID header
if (webhookIdHeader) {
return `${webhookId}:${webhookIdHeader}`
}
const payloadId = payload?.id || payload?.event_id || payload?.message?.id || payload?.data?.id
// 2. Extract event/message IDs from payload (most reliable)
const payloadId =
payload?.id ||
payload?.event_id ||
payload?.eventId ||
payload?.message?.id ||
payload?.data?.id ||
payload?.object?.id ||
payload?.event?.id
if (payloadId) {
return `${webhookId}:${payloadId}`
}
// 3. Create deterministic hash from stable payload fields (excluding timestamps)
const stablePayload = IdempotencyService.createStablePayloadForHashing(payload)
const payloadHash = crypto
.createHash('sha256')
.update(JSON.stringify(payload))
.update(JSON.stringify(stablePayload))
.digest('hex')
.substring(0, 16)
@@ -364,29 +499,62 @@ export class IdempotencyService {
}
/**
* Create an idempotency key for Gmail polling
* Create a stable representation of the payload for hashing by removing
* timestamp and other volatile fields that change between requests
*/
static createGmailIdempotencyKey(webhookId: string, emailId: string): string {
return `${webhookId}:${emailId}`
}
/**
* Create an idempotency key for generic triggers
*/
static createTriggerIdempotencyKey(
triggerId: string,
eventId: string,
additionalContext?: Record<string, string>
): string {
const base = `${triggerId}:${eventId}`
if (additionalContext && Object.keys(additionalContext).length > 0) {
const contextStr = Object.keys(additionalContext)
.sort()
.map((key) => `${key}=${additionalContext[key]}`)
.join('&')
return `${base}:${contextStr}`
/**
 * Build a copy of the payload with volatile values stripped, for use as hash input.
 *
 * A property is dropped when:
 * - its lower-cased key contains one of the volatile field names below,
 * - its value is a string starting with an ISO-8601 date-time (`YYYY-MM-DDTHH:MM:SS`), or
 * - its value is a number strictly between 1000000000 and 9999999999.
 *
 * Nested objects and arrays are processed recursively. Non-object payloads
 * (including null/undefined) are returned unchanged.
 *
 * NOTE(review): the numeric rule presumably targets epoch-second timestamps, but it
 * also drops any other 10-digit number (e.g. a numeric ID) — confirm this is acceptable.
 *
 * @param payload - Webhook payload of any shape
 * @returns The payload with volatile properties removed
 */
private static createStablePayloadForHashing(payload: any): any {
  if (!payload || typeof payload !== 'object') {
    return payload
  }

  const volatileFields = [
    'timestamp',
    'created_at',
    'updated_at',
    'sent_at',
    'received_at',
    'processed_at',
    'delivered_at',
    'attempt',
    'retry_count',
    'request_id',
    'trace_id',
    'span_id',
    'delivery_id',
    'webhook_timestamp',
  ]
  // Compiled once instead of on every property visited.
  const isoDateTimePrefix = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}/

  const cleanPayload = { ...payload }

  const removeVolatileFields = (obj: any): any => {
    if (!obj || typeof obj !== 'object') return obj

    if (Array.isArray(obj)) {
      return obj.map(removeVolatileFields)
    }

    const cleaned: any = {}
    for (const [key, value] of Object.entries(obj)) {
      const lowerKey = key.toLowerCase()

      // Substring match, so e.g. "event_timestamp" is also dropped.
      if (volatileFields.some((field) => lowerKey.includes(field))) {
        continue
      }

      if (typeof value === 'string' && isoDateTimePrefix.test(value)) {
        continue
      }

      if (typeof value === 'number' && value > 1000000000 && value < 9999999999) {
        continue
      }

      cleaned[key] = removeVolatileFields(value)
    }

    return cleaned
  }

  return removeVolatileFields(cleanPayload)
}
}