From ae113f76fd26799a7fb1edd1c54a0f347f792a68 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 22 Jan 2026 18:26:25 -0800 Subject: [PATCH] fix cleanup --- apps/sim/lib/core/idempotency/cleanup.ts | 64 +++++++++++++----------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/apps/sim/lib/core/idempotency/cleanup.ts b/apps/sim/lib/core/idempotency/cleanup.ts index 7dd1e2077..1569bfd14 100644 --- a/apps/sim/lib/core/idempotency/cleanup.ts +++ b/apps/sim/lib/core/idempotency/cleanup.ts @@ -1,7 +1,7 @@ import { db } from '@sim/db' import { idempotencyKey } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { and, eq, lt } from 'drizzle-orm' +import { and, count, inArray, like, lt, max, min, sql } from 'drizzle-orm' const logger = createLogger('IdempotencyCleanup') @@ -19,7 +19,8 @@ export interface CleanupOptions { batchSize?: number /** - * Specific namespace to clean up, or undefined to clean all namespaces + * Specific namespace prefix to clean up (e.g., 'webhook', 'polling') + * Keys are prefixed with namespace, so this filters by key prefix */ namespace?: string } @@ -53,13 +54,17 @@ export async function cleanupExpiredIdempotencyKeys( while (hasMore) { try { + // Build where condition - filter by cutoff date and optionally by namespace prefix const whereCondition = namespace - ? and(lt(idempotencyKey.createdAt, cutoffDate), eq(idempotencyKey.namespace, namespace)) + ? and( + lt(idempotencyKey.createdAt, cutoffDate), + like(idempotencyKey.key, `${namespace}:%`) + ) : lt(idempotencyKey.createdAt, cutoffDate) - // First, find IDs to delete with limit + // Find keys to delete with limit const toDelete = await db - .select({ key: idempotencyKey.key, namespace: idempotencyKey.namespace }) + .select({ key: idempotencyKey.key }) .from(idempotencyKey) .where(whereCondition) .limit(batchSize) @@ -68,14 +73,13 @@ export async function cleanupExpiredIdempotencyKeys( break } - // Delete the found records + // Delete the found records by key const deleteResult = await db .delete(idempotencyKey) .where( - and( - ...toDelete.map((item) => - and(eq(idempotencyKey.key, item.key), eq(idempotencyKey.namespace, item.namespace)) - ) + inArray( + idempotencyKey.key, + toDelete.map((item) => item.key) ) ) .returning({ key: idempotencyKey.key }) @@ -126,6 +130,7 @@ export async function cleanupExpiredIdempotencyKeys( /** * Get statistics about idempotency key usage + * Uses SQL aggregations to avoid loading all keys into memory */ export async function getIdempotencyKeyStats(): Promise<{ totalKeys: number @@ -134,34 +139,35 @@ export async function getIdempotencyKeyStats(): Promise<{ newestKey: Date | null }> { try { - const allKeys = await db + // Get total count and date range in a single query + const [statsResult] = await db .select({ - namespace: idempotencyKey.namespace, - createdAt: idempotencyKey.createdAt, + totalKeys: count(), + oldestKey: min(idempotencyKey.createdAt), + newestKey: max(idempotencyKey.createdAt), }) .from(idempotencyKey) - const totalKeys = allKeys.length + // Get counts by namespace prefix using SQL substring + // Extracts everything before the first ':' as the namespace + const namespaceStats = await db + .select({ + namespace: sql`split_part(${idempotencyKey.key}, ':', 1)`.as('namespace'), + count: count(), + }) + .from(idempotencyKey) + .groupBy(sql`split_part(${idempotencyKey.key}, ':', 1)`) + const keysByNamespace: Record = {} - let oldestKey: Date | null = null - let newestKey: Date | null = null - - for (const key of allKeys) { - keysByNamespace[key.namespace] = (keysByNamespace[key.namespace] || 0) + 1 - - if (!oldestKey || key.createdAt < oldestKey) { - oldestKey = key.createdAt - } - if (!newestKey || key.createdAt > newestKey) { - newestKey = key.createdAt - } + for (const row of namespaceStats) { + keysByNamespace[row.namespace || 'unknown'] = row.count } return { - totalKeys, + totalKeys: statsResult?.totalKeys ?? 0, keysByNamespace, - oldestKey, - newestKey, + oldestKey: statsResult?.oldestKey ?? null, + newestKey: statsResult?.newestKey ?? null, } } catch (error) { logger.error('Failed to get idempotency key stats:', error)