fix(retention-job): add chunking strategy for cleanup (#4305)

* fix(retention-job): add chunking strategy for cleanup

* change stats to be per-job, not per-chunk
This commit is contained in:
Theodore Li
2026-04-27 12:13:00 -07:00
committed by GitHub
parent 79ff5d80b3
commit 65e17de065
4 changed files with 280 additions and 249 deletions

View File

@@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { and, inArray, lt } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
chunkedBatchDelete,
type TableCleanupResult,
} from '@/lib/cleanup/batch-delete'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
const logger = createLogger('CleanupLogs')
const BATCH_SIZE = 2000
const MAX_BATCHES_PER_TIER = 10
interface TierResults {
total: number
deleted: number
deleteFailed: number
interface FileDeleteStats {
filesTotal: number
filesDeleted: number
filesDeleteFailed: number
}
function emptyTierResults(): TierResults {
return {
total: 0,
deleted: 0,
deleteFailed: 0,
filesTotal: 0,
filesDeleted: 0,
filesDeleteFailed: 0,
}
}
async function deleteExecutionFiles(files: unknown, results: TierResults): Promise<void> {
async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return
const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string)
results.filesTotal += keys.length
stats.filesTotal += keys.length
await Promise.all(
keys.map(async (key) => {
try {
await StorageService.deleteFile({ key, context: 'execution' })
await deleteFileMetadata(key)
results.filesDeleted++
stats.filesDeleted++
} catch (fileError) {
results.filesDeleteFailed++
stats.filesDeleteFailed++
logger.error(`Failed to delete file ${key}:`, { fileError })
}
})
)
}
async function cleanupTier(
async function cleanupWorkflowExecutionLogs(
workspaceIds: string[],
retentionDate: Date,
label: string
): Promise<TierResults> {
const results = emptyTierResults()
if (workspaceIds.length === 0) return results
): Promise<TableCleanupResult & FileDeleteStats> {
const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }
let batchesProcessed = 0
let hasMore = true
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
const batch = await db
.select({
id: workflowExecutionLogs.id,
files: workflowExecutionLogs.files,
})
.from(workflowExecutionLogs)
.where(
and(
inArray(workflowExecutionLogs.workspaceId, workspaceIds),
lt(workflowExecutionLogs.startedAt, retentionDate)
const dbStats = await chunkedBatchDelete({
tableDef: workflowExecutionLogs,
workspaceIds,
tableName: `${label}/workflow_execution_logs`,
selectChunk: (chunkIds, limit) =>
db
.select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
.from(workflowExecutionLogs)
.where(
and(
inArray(workflowExecutionLogs.workspaceId, chunkIds),
lt(workflowExecutionLogs.startedAt, retentionDate)
)
)
)
.limit(BATCH_SIZE)
.limit(limit),
onBatch: async (rows) => {
for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
},
})
results.total += batch.length
if (batch.length === 0) {
hasMore = false
break
}
for (const log of batch) {
await deleteExecutionFiles(log.files, results)
}
const logIds = batch.map((log) => log.id)
try {
const deleted = await db
.delete(workflowExecutionLogs)
.where(inArray(workflowExecutionLogs.id, logIds))
.returning({ id: workflowExecutionLogs.id })
results.deleted += deleted.length
} catch (deleteError) {
results.deleteFailed += logIds.length
logger.error(`Batch delete failed for ${label}:`, { deleteError })
}
batchesProcessed++
hasMore = batch.length === BATCH_SIZE
logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`)
}
return results
}
interface JobLogCleanupResults {
deleted: number
deleteFailed: number
}
async function cleanupJobExecutionLogsTier(
workspaceIds: string[],
retentionDate: Date,
label: string
): Promise<JobLogCleanupResults> {
const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 }
if (workspaceIds.length === 0) return results
let batchesProcessed = 0
let hasMore = true
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
const batch = await db
.select({ id: jobExecutionLogs.id })
.from(jobExecutionLogs)
.where(
and(
inArray(jobExecutionLogs.workspaceId, workspaceIds),
lt(jobExecutionLogs.startedAt, retentionDate)
)
)
.limit(BATCH_SIZE)
if (batch.length === 0) {
hasMore = false
break
}
const logIds = batch.map((log) => log.id)
try {
const deleted = await db
.delete(jobExecutionLogs)
.where(inArray(jobExecutionLogs.id, logIds))
.returning({ id: jobExecutionLogs.id })
results.deleted += deleted.length
} catch (deleteError) {
results.deleteFailed += logIds.length
logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError })
}
batchesProcessed++
hasMore = batch.length === BATCH_SIZE
logger.info(
`[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed`
)
}
return results
return { ...dbStats, ...fileStats }
}
export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
@@ -190,15 +92,19 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
`[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
)
const results = await cleanupTier(workspaceIds, retentionDate, label)
const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
logger.info(
`[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates`
`[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
)
const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label)
logger.info(
`[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed`
)
await batchDeleteByWorkspaceAndTimestamp({
tableDef: jobExecutionLogs,
workspaceIdCol: jobExecutionLogs.workspaceId,
timestampCol: jobExecutionLogs.startedAt,
workspaceIds,
retentionDate,
tableName: `${label}/job_execution_logs`,
})
// Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
if (payload.plan === 'free') {

View File

@@ -18,9 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
DEFAULT_BATCH_SIZE,
DEFAULT_MAX_BATCHES_PER_TABLE,
deleteRowsById,
selectRowsByIdChunks,
} from '@/lib/cleanup/batch-delete'
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
import type { StorageContext } from '@/lib/uploads'
@@ -44,35 +43,37 @@ async function selectExpiredWorkspaceFiles(
workspaceIds: string[],
retentionDate: Date
): Promise<WorkspaceFileScope> {
const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE
const [legacyRows, multiContextRows] = await Promise.all([
db
.select({ id: workspaceFile.id, key: workspaceFile.key })
.from(workspaceFile)
.where(
and(
inArray(workspaceFile.workspaceId, workspaceIds),
isNotNull(workspaceFile.deletedAt),
lt(workspaceFile.deletedAt, retentionDate)
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: workspaceFile.id, key: workspaceFile.key })
.from(workspaceFile)
.where(
and(
inArray(workspaceFile.workspaceId, chunkIds),
isNotNull(workspaceFile.deletedAt),
lt(workspaceFile.deletedAt, retentionDate)
)
)
)
.limit(limit),
db
.select({
id: workspaceFiles.id,
key: workspaceFiles.key,
context: workspaceFiles.context,
})
.from(workspaceFiles)
.where(
and(
inArray(workspaceFiles.workspaceId, workspaceIds),
isNotNull(workspaceFiles.deletedAt),
lt(workspaceFiles.deletedAt, retentionDate)
.limit(chunkLimit)
),
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({
id: workspaceFiles.id,
key: workspaceFiles.key,
context: workspaceFiles.context,
})
.from(workspaceFiles)
.where(
and(
inArray(workspaceFiles.workspaceId, chunkIds),
isNotNull(workspaceFiles.deletedAt),
lt(workspaceFiles.deletedAt, retentionDate)
)
)
)
.limit(limit),
.limit(chunkLimit)
),
])
return {
@@ -182,17 +183,19 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
// (chats + S3) AND the DB deletes below — selecting twice could return
// different subsets above the LIMIT cap and orphan or prematurely purge data.
const [doomedWorkflows, fileScope] = await Promise.all([
db
.select({ id: workflow.id })
.from(workflow)
.where(
and(
inArray(workflow.workspaceId, workspaceIds),
isNotNull(workflow.archivedAt),
lt(workflow.archivedAt, retentionDate)
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: workflow.id })
.from(workflow)
.where(
and(
inArray(workflow.workspaceId, chunkIds),
isNotNull(workflow.archivedAt),
lt(workflow.archivedAt, retentionDate)
)
)
)
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE),
.limit(chunkLimit)
),
selectExpiredWorkspaceFiles(workspaceIds, retentionDate),
])
@@ -200,11 +203,13 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
let chatCleanup: { execute: () => Promise<void> } | null = null
if (doomedWorkflowIds.length > 0) {
const doomedChats = await db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(inArray(copilotChats.workflowId, doomedWorkflowIds))
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) =>
db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(inArray(copilotChats.workflowId, chunkIds))
.limit(chunkLimit)
)
const doomedChatIds = doomedChats.map((c) => c.id)
if (doomedChatIds.length > 0) {

View File

@@ -13,9 +13,8 @@ import { and, inArray, lt, sql } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
DEFAULT_BATCH_SIZE,
DEFAULT_MAX_BATCHES_PER_TABLE,
deleteRowsById,
selectRowsByIdChunks,
type TableCleanupResult,
} from '@/lib/cleanup/batch-delete'
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
@@ -67,13 +66,15 @@ async function cleanupRunChildren(
): Promise<TableCleanupResult[]> {
if (workspaceIds.length === 0) return []
const runIds = await db
.select({ id: copilotRuns.id })
.from(copilotRuns)
.where(
and(inArray(copilotRuns.workspaceId, workspaceIds), lt(copilotRuns.updatedAt, retentionDate))
)
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
const runIds = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: copilotRuns.id })
.from(copilotRuns)
.where(
and(inArray(copilotRuns.workspaceId, chunkIds), lt(copilotRuns.updatedAt, retentionDate))
)
.limit(chunkLimit)
)
if (runIds.length === 0) {
return RUN_CHILD_TABLES.map((t) => ({ table: `${label}/${t.name}`, deleted: 0, failed: 0 }))
@@ -107,17 +108,15 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
`[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
)
// Collect chat IDs before deleting so we can clean up the copilot backend after
const doomedChats = await db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(
and(
inArray(copilotChats.workspaceId, workspaceIds),
lt(copilotChats.updatedAt, retentionDate)
const doomedChats = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(
and(inArray(copilotChats.workspaceId, chunkIds), lt(copilotChats.updatedAt, retentionDate))
)
)
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
.limit(chunkLimit)
)
const doomedChatIds = doomedChats.map((c) => c.id)

View File

@@ -7,6 +7,55 @@ const logger = createLogger('BatchDelete')
export const DEFAULT_BATCH_SIZE = 2000
export const DEFAULT_MAX_BATCHES_PER_TABLE = 10
/**
* Split workspaceIds into this-sized groups before running SELECT/DELETE. Large
* IN lists combined with `started_at < X` force Postgres to probe every
* workspace range in the composite index, which blows the 90s statement timeout
* at the scale of the full free tier.
*/
export const DEFAULT_WORKSPACE_CHUNK_SIZE = 50
/** Partition `arr` into consecutive groups of at most `size` elements (last group may be shorter). */
export function chunkArray<T>(arr: T[], size: number): T[][] {
  const chunks: T[][] = []
  let start = 0
  while (start < arr.length) {
    chunks.push(arr.slice(start, start + size))
    start += size
  }
  return chunks
}
/** Tuning knobs for `selectRowsByIdChunks`. */
export interface SelectByIdChunksOptions {
  /** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */
  overallLimit?: number
  /** Number of IDs passed to each chunked query. Defaults to `DEFAULT_WORKSPACE_CHUNK_SIZE`. */
  chunkSize?: number
}
/**
 * Run a SELECT query once per ID chunk and concatenate results up to
 * `overallLimit`. Each chunk's query is passed the remaining row budget, and
 * the combined result is additionally truncated to that budget, so the total
 * never exceeds the cap even if a query ignores its `chunkLimit`. Use this
 * when you need the selected row set (e.g. to drive S3 or copilot-backend
 * cleanup alongside the DB delete).
 *
 * Works for any large ID set — workspace IDs, workflow IDs, etc. Avoids
 * sending one massive `IN (...)` list that would blow Postgres's statement
 * timeout.
 *
 * @param ids - Full ID list; split into `chunkSize` groups before querying.
 * @param query - Executes the SELECT for one chunk; should honor `chunkLimit`.
 * @returns Concatenated rows across chunks, capped at `overallLimit`.
 */
export async function selectRowsByIdChunks<T>(
  ids: string[],
  query: (chunkIds: string[], chunkLimit: number) => Promise<T[]>,
  {
    overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
    chunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
  }: SelectByIdChunksOptions = {}
): Promise<T[]> {
  if (ids.length === 0) return []
  const rows: T[] = []
  for (const chunkIds of chunkArray(ids, chunkSize)) {
    const remaining = overallLimit - rows.length
    if (remaining <= 0) break
    const chunkRows = await query(chunkIds, remaining)
    // Enforce the overall cap even if the query returned more than `remaining`,
    // and push one row at a time: spreading a multi-thousand-element array as
    // arguments (`rows.push(...chunkRows)`) risks exceeding the engine's
    // argument-count limit at full batch sizes.
    for (const row of chunkRows.slice(0, remaining)) {
      rows.push(row)
    }
  }
  return rows
}
export interface TableCleanupResult {
table: string
@@ -14,6 +63,111 @@ export interface TableCleanupResult {
failed: number
}
/** Configuration for `chunkedBatchDelete`. */
export interface ChunkedBatchDeleteOptions<TRow extends { id: string }> {
  /** Table the DELETEs run against. */
  tableDef: PgTable
  /** Workspaces in scope; chunked before querying (see `DEFAULT_WORKSPACE_CHUNK_SIZE`). */
  workspaceIds: string[]
  /** Label used in log lines and in the returned `TableCleanupResult.table`. */
  tableName: string
  /** SELECT eligible rows for one workspace chunk. The result must include `id`. */
  selectChunk: (chunkIds: string[], limit: number) => Promise<TRow[]>
  /** Runs between SELECT and DELETE; receives the just-selected rows. */
  onBatch?: (rows: TRow[]) => Promise<void>
  /** Rows per SELECT/DELETE round. Defaults to `DEFAULT_BATCH_SIZE`. */
  batchSize?: number
  /** Max batches per workspace chunk. */
  maxBatches?: number
  /**
   * Hard cap on rows processed (deleted + failed) across all chunks per call.
   * Defaults to `DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE`. Cron
   * runs frequently enough to catch up the backlog over multiple invocations.
   */
  totalRowLimit?: number
  /** Number of workspace IDs per chunk. Defaults to `DEFAULT_WORKSPACE_CHUNK_SIZE`. */
  workspaceChunkSize?: number
}
/**
 * Inner loop primitive for cleanup jobs.
 *
 * Workspace IDs are first split into chunks (see `DEFAULT_WORKSPACE_CHUNK_SIZE`
 * for why). Within each chunk the loop repeats: SELECT up to `batchSize`
 * eligible rows → run the optional `onBatch` hook (e.g. to delete S3 files) →
 * DELETE those rows by ID. A chunk is abandoned once its SELECT comes back
 * empty or `maxBatches` rounds have run, and the whole call stops after
 * `totalRowLimit` rows (deleted + failed) have been processed.
 */
export async function chunkedBatchDelete<TRow extends { id: string }>({
  tableDef,
  workspaceIds,
  tableName,
  selectChunk,
  onBatch,
  batchSize = DEFAULT_BATCH_SIZE,
  maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
  totalRowLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
  workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
}: ChunkedBatchDeleteOptions<TRow>): Promise<TableCleanupResult> {
  const summary: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
  if (workspaceIds.length === 0) {
    logger.info(`[${tableName}] Skipped — no workspaces in scope`)
    return summary
  }
  const idChunks = chunkArray(workspaceIds, workspaceChunkSize)
  // Rows accounted for so far in this call, successful or not.
  const processed = () => summary.deleted + summary.failed
  let deferredRemaining = false
  for (let chunkIdx = 0; chunkIdx < idChunks.length; chunkIdx++) {
    if (processed() >= totalRowLimit) {
      deferredRemaining = true
      break
    }
    const chunkIds = idChunks[chunkIdx]
    for (let batchNo = 0; batchNo < maxBatches && processed() < totalRowLimit; batchNo++) {
      let rows: TRow[] = []
      try {
        rows = await selectChunk(chunkIds, batchSize)
        if (rows.length === 0) break // chunk exhausted
        if (onBatch) await onBatch(rows)
        const deleted = await db
          .delete(tableDef)
          .where(inArray(sql`id`, rows.map((r) => r.id)))
          .returning({ id: sql`id` })
        summary.deleted += deleted.length
        if (rows.length < batchSize) break // short batch ⇒ nothing left in this chunk
      } catch (error) {
        // Count rows we tried to delete; SELECT-stage errors leave rows=[].
        summary.failed += rows.length
        logger.error(
          `[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${idChunks.length}, ${rows.length} rows):`,
          { error }
        )
        break // give up on this chunk and move on to the next
      }
    }
  }
  logger.info(
    `[${tableName}] Complete: ${summary.deleted} deleted, ${summary.failed} failed across ${idChunks.length} chunks${deferredRemaining ? ' (row-limit reached, remaining chunks deferred to next run)' : ''}`
  )
  return summary
}
export interface BatchDeleteOptions {
tableDef: PgTable
workspaceIdCol: PgColumn
@@ -25,13 +179,13 @@ export interface BatchDeleteOptions {
requireTimestampNotNull?: boolean
batchSize?: number
maxBatches?: number
workspaceChunkSize?: number
}
/**
* Iteratively delete rows in a table matching a workspace + time-based predicate.
*
* Uses a SELECT-with-LIMIT → DELETE-by-ID pattern to keep each round bounded in
* memory and I/O (PostgreSQL DELETE does not support LIMIT directly).
* Convenience wrapper around `chunkedBatchDelete` for the common case: delete
* rows where `workspaceId IN (...) AND timestamp < retentionDate`. Use this
* when there's no per-row side effect (e.g. no S3 files to clean up alongside).
*/
export async function batchDeleteByWorkspaceAndTimestamp({
tableDef,
@@ -41,56 +195,23 @@ export async function batchDeleteByWorkspaceAndTimestamp({
retentionDate,
tableName,
requireTimestampNotNull = false,
batchSize = DEFAULT_BATCH_SIZE,
maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
...rest
}: BatchDeleteOptions): Promise<TableCleanupResult> {
const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
if (workspaceIds.length === 0) {
logger.info(`[${tableName}] Skipped — no workspaces in scope`)
return result
}
const predicates = [inArray(workspaceIdCol, workspaceIds), lt(timestampCol, retentionDate)]
if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
const whereClause = and(...predicates)
let batchesProcessed = 0
let hasMore = true
while (hasMore && batchesProcessed < maxBatches) {
try {
const batch = await db
return chunkedBatchDelete({
tableDef,
workspaceIds,
tableName,
selectChunk: (chunkIds, limit) => {
const predicates = [inArray(workspaceIdCol, chunkIds), lt(timestampCol, retentionDate)]
if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
return db
.select({ id: sql<string>`id` })
.from(tableDef)
.where(whereClause)
.limit(batchSize)
if (batch.length === 0) {
logger.info(`[${tableName}] No expired rows found`)
hasMore = false
break
}
const ids = batch.map((r) => r.id)
const deleted = await db
.delete(tableDef)
.where(inArray(sql`id`, ids))
.returning({ id: sql`id` })
result.deleted += deleted.length
hasMore = batch.length === batchSize
batchesProcessed++
logger.info(`[${tableName}] Batch ${batchesProcessed}: deleted ${deleted.length} rows`)
} catch (error) {
result.failed++
logger.error(`[${tableName}] Batch delete failed:`, { error })
hasMore = false
}
}
return result
.where(and(...predicates))
.limit(limit)
},
...rest,
})
}
/**