mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-23 05:47:59 -05:00
fix cleanup
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { db } from '@sim/db'
|
||||
import { idempotencyKey } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, lt } from 'drizzle-orm'
|
||||
import { and, count, inArray, like, lt, max, min, sql } from 'drizzle-orm'
|
||||
|
||||
const logger = createLogger('IdempotencyCleanup')
|
||||
|
||||
@@ -19,7 +19,8 @@ export interface CleanupOptions {
|
||||
batchSize?: number
|
||||
|
||||
/**
|
||||
* Specific namespace to clean up, or undefined to clean all namespaces
|
||||
* Specific namespace prefix to clean up (e.g., 'webhook', 'polling')
|
||||
* Keys are prefixed with namespace, so this filters by key prefix
|
||||
*/
|
||||
namespace?: string
|
||||
}
|
||||
@@ -53,13 +54,17 @@ export async function cleanupExpiredIdempotencyKeys(
|
||||
|
||||
while (hasMore) {
|
||||
try {
|
||||
// Build where condition - filter by cutoff date and optionally by namespace prefix
|
||||
const whereCondition = namespace
|
||||
? and(lt(idempotencyKey.createdAt, cutoffDate), eq(idempotencyKey.namespace, namespace))
|
||||
? and(
|
||||
lt(idempotencyKey.createdAt, cutoffDate),
|
||||
like(idempotencyKey.key, `${namespace}:%`)
|
||||
)
|
||||
: lt(idempotencyKey.createdAt, cutoffDate)
|
||||
|
||||
// First, find IDs to delete with limit
|
||||
// Find keys to delete with limit
|
||||
const toDelete = await db
|
||||
.select({ key: idempotencyKey.key, namespace: idempotencyKey.namespace })
|
||||
.select({ key: idempotencyKey.key })
|
||||
.from(idempotencyKey)
|
||||
.where(whereCondition)
|
||||
.limit(batchSize)
|
||||
@@ -68,14 +73,13 @@ export async function cleanupExpiredIdempotencyKeys(
|
||||
break
|
||||
}
|
||||
|
||||
// Delete the found records
|
||||
// Delete the found records by key
|
||||
const deleteResult = await db
|
||||
.delete(idempotencyKey)
|
||||
.where(
|
||||
and(
|
||||
...toDelete.map((item) =>
|
||||
and(eq(idempotencyKey.key, item.key), eq(idempotencyKey.namespace, item.namespace))
|
||||
)
|
||||
inArray(
|
||||
idempotencyKey.key,
|
||||
toDelete.map((item) => item.key)
|
||||
)
|
||||
)
|
||||
.returning({ key: idempotencyKey.key })
|
||||
@@ -126,6 +130,7 @@ export async function cleanupExpiredIdempotencyKeys(
|
||||
|
||||
/**
|
||||
* Get statistics about idempotency key usage
|
||||
* Uses SQL aggregations to avoid loading all keys into memory
|
||||
*/
|
||||
export async function getIdempotencyKeyStats(): Promise<{
|
||||
totalKeys: number
|
||||
@@ -134,34 +139,35 @@ export async function getIdempotencyKeyStats(): Promise<{
|
||||
newestKey: Date | null
|
||||
}> {
|
||||
try {
|
||||
const allKeys = await db
|
||||
// Get total count and date range in a single query
|
||||
const [statsResult] = await db
|
||||
.select({
|
||||
namespace: idempotencyKey.namespace,
|
||||
createdAt: idempotencyKey.createdAt,
|
||||
totalKeys: count(),
|
||||
oldestKey: min(idempotencyKey.createdAt),
|
||||
newestKey: max(idempotencyKey.createdAt),
|
||||
})
|
||||
.from(idempotencyKey)
|
||||
|
||||
const totalKeys = allKeys.length
|
||||
// Get counts by namespace prefix using SQL substring
|
||||
// Extracts everything before the first ':' as the namespace
|
||||
const namespaceStats = await db
|
||||
.select({
|
||||
namespace: sql<string>`split_part(${idempotencyKey.key}, ':', 1)`.as('namespace'),
|
||||
count: count(),
|
||||
})
|
||||
.from(idempotencyKey)
|
||||
.groupBy(sql`split_part(${idempotencyKey.key}, ':', 1)`)
|
||||
|
||||
const keysByNamespace: Record<string, number> = {}
|
||||
let oldestKey: Date | null = null
|
||||
let newestKey: Date | null = null
|
||||
|
||||
for (const key of allKeys) {
|
||||
keysByNamespace[key.namespace] = (keysByNamespace[key.namespace] || 0) + 1
|
||||
|
||||
if (!oldestKey || key.createdAt < oldestKey) {
|
||||
oldestKey = key.createdAt
|
||||
}
|
||||
if (!newestKey || key.createdAt > newestKey) {
|
||||
newestKey = key.createdAt
|
||||
}
|
||||
for (const row of namespaceStats) {
|
||||
keysByNamespace[row.namespace || 'unknown'] = row.count
|
||||
}
|
||||
|
||||
return {
|
||||
totalKeys,
|
||||
totalKeys: statsResult?.totalKeys ?? 0,
|
||||
keysByNamespace,
|
||||
oldestKey,
|
||||
newestKey,
|
||||
oldestKey: statsResult?.oldestKey ?? null,
|
||||
newestKey: statsResult?.newestKey ?? null,
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to get idempotency key stats:', error)
|
||||
|
||||
Reference in New Issue
Block a user