mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-28 03:00:29 -04:00
* feat(child-workflows): nested execution snapshots * cleanup typing * address bugbot comments and fix tests * do not cascade delete logs/snapshots * fix few more inconsistencies * fix external logs route * add fallback color
215 lines
7.5 KiB
TypeScript
215 lines
7.5 KiB
TypeScript
import { db } from '@sim/db'
|
|
import { subscription, user, workflowExecutionLogs, workspace } from '@sim/db/schema'
|
|
import { createLogger } from '@sim/logger'
|
|
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
|
|
import { type NextRequest, NextResponse } from 'next/server'
|
|
import { verifyCronAuth } from '@/lib/auth/internal'
|
|
import { env } from '@/lib/core/config/env'
|
|
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
|
|
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
|
|
|
|
// Next.js route segment config: 'force-dynamic' opts this route out of static
// rendering/caching so the handler runs on every request.
export const dynamic = 'force-dynamic'
|
|
|
|
// Logger used by this route; 'LogsCleanupAPI' is the name handed to createLogger.
const logger = createLogger('LogsCleanupAPI')
|
|
|
|
// Maximum number of execution-log rows selected per cleanup batch (used as the
// query LIMIT and to detect whether more rows may remain).
const BATCH_SIZE = 2000
|
|
|
|
/**
 * Cron endpoint that archives and then deletes old workflow execution logs for
 * workspaces billed to users without an active paid subscription.
 *
 * Steps:
 * 1. Returns the response from `verifyCronAuth` when it reports an auth error.
 * 2. Selects users that have no active `pro`/`team`/`enterprise` subscription
 *    and the workspaces whose `billedAccountUserId` is one of those users.
 * 3. In at most `MAX_BATCHES` batches of `BATCH_SIZE` rows, uploads every log
 *    older than the retention cutoff as JSON to `logs/archived/<date>/<id>.json`,
 *    deletes the files referenced by the log (only when cloud storage is in
 *    use), and finally deletes the log row. A log is only deleted after its
 *    archive upload succeeded.
 * 4. Asks `snapshotService` to clean up orphaned snapshots, using a retention
 *    period one day longer than the log retention.
 *
 * @param request - Incoming request, forwarded to `verifyCronAuth`.
 * @returns A JSON summary (`message`, `results`, `complete`,
 *   `batchLimitReached`), an informational JSON message when there is nothing
 *   to clean, the auth error response, or a 500 JSON error on unexpected failure.
 */
export async function GET(request: NextRequest) {
  try {
    const authError = verifyCronAuth(request, 'logs cleanup')
    if (authError) {
      return authError
    }

    // Validate the retention setting up front: a non-numeric value would produce
    // an Invalid Date cutoff, and a negative value would push the cutoff into the
    // future so that every log matches. Throwing routes into the 500 handler below.
    const retentionDays = Number(env.FREE_PLAN_LOG_RETENTION_DAYS || '7')
    if (!Number.isFinite(retentionDays) || retentionDays < 0) {
      throw new Error(
        `Invalid FREE_PLAN_LOG_RETENTION_DAYS value: ${String(env.FREE_PLAN_LOG_RETENTION_DAYS)}`
      )
    }

    const retentionDate = new Date()
    retentionDate.setDate(retentionDate.getDate() - retentionDays)

    // Users with no matching paid subscription row: the join condition only
    // matches active paid plans, so a NULL subscription id means "free".
    const freeUsers = await db
      .select({ userId: user.id })
      .from(user)
      .leftJoin(
        subscription,
        sql`${user.id} = ${subscription.referenceId} AND ${subscription.status} = 'active' AND ${subscription.plan} IN ('pro', 'team', 'enterprise')`
      )
      .where(sql`${subscription.id} IS NULL`)

    if (freeUsers.length === 0) {
      logger.info('No free users found for log cleanup')
      return NextResponse.json({ message: 'No free users found for cleanup' })
    }

    const freeUserIds = freeUsers.map((u) => u.userId)

    const workspacesQuery = await db
      .select({ id: workspace.id })
      .from(workspace)
      .where(inArray(workspace.billedAccountUserId, freeUserIds))

    if (workspacesQuery.length === 0) {
      logger.info('No workspaces found for free users')
      return NextResponse.json({ message: 'No workspaces found for cleanup' })
    }

    const workspaceIds = workspacesQuery.map((w) => w.id)

    const results = {
      enhancedLogs: {
        total: 0,
        archived: 0,
        archiveFailed: 0,
        deleted: 0,
        deleteFailed: 0,
      },
      files: {
        total: 0,
        deleted: 0,
        deleteFailed: 0,
      },
      snapshots: {
        cleaned: 0,
        cleanupFailed: 0,
      },
    }

    const startTime = Date.now()
    const MAX_BATCHES = 10

    let batchesProcessed = 0
    let hasMoreLogs = true

    logger.info(`Starting enhanced logs cleanup for ${workspaceIds.length} workspaces`)

    while (hasMoreLogs && batchesProcessed < MAX_BATCHES) {
      const oldEnhancedLogs = await db
        .select({
          id: workflowExecutionLogs.id,
          workflowId: workflowExecutionLogs.workflowId,
          executionId: workflowExecutionLogs.executionId,
          stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
          level: workflowExecutionLogs.level,
          trigger: workflowExecutionLogs.trigger,
          startedAt: workflowExecutionLogs.startedAt,
          endedAt: workflowExecutionLogs.endedAt,
          totalDurationMs: workflowExecutionLogs.totalDurationMs,
          executionData: workflowExecutionLogs.executionData,
          cost: workflowExecutionLogs.cost,
          files: workflowExecutionLogs.files,
          createdAt: workflowExecutionLogs.createdAt,
        })
        .from(workflowExecutionLogs)
        .where(
          and(
            inArray(workflowExecutionLogs.workspaceId, workspaceIds),
            lt(workflowExecutionLogs.createdAt, retentionDate)
          )
        )
        .limit(BATCH_SIZE)

      results.enhancedLogs.total += oldEnhancedLogs.length

      // Remember how many rows were deleted before this batch so we can tell
      // afterwards whether the batch made any progress.
      const deletedBeforeBatch = results.enhancedLogs.deleted

      for (const log of oldEnhancedLogs) {
        // Take one timestamp per log so the key's date folder, the archived
        // body and the upload metadata all agree.
        const archivedAt = new Date().toISOString()
        const today = archivedAt.split('T')[0]

        const enhancedLogKey = `logs/archived/${today}/${log.id}.json`
        const enhancedLogData = JSON.stringify({
          ...log,
          archivedAt,
          logType: 'enhanced',
        })

        try {
          await StorageService.uploadFile({
            file: Buffer.from(enhancedLogData),
            fileName: enhancedLogKey,
            contentType: 'application/json',
            context: 'logs',
            preserveKey: true,
            customKey: enhancedLogKey,
            metadata: {
              logId: String(log.id),
              workflowId: String(log.workflowId ?? ''),
              executionId: String(log.executionId),
              logType: 'enhanced',
              archivedAt,
            },
          })

          results.enhancedLogs.archived++

          if (isUsingCloudStorage() && log.files && Array.isArray(log.files)) {
            for (const file of log.files) {
              if (file && typeof file === 'object' && file.key) {
                results.files.total++
                try {
                  await StorageService.deleteFile({
                    key: file.key,
                    context: 'execution',
                  })
                  results.files.deleted++

                  // Also delete from workspace_files table
                  const { deleteFileMetadata } = await import('@/lib/uploads/server/metadata')
                  await deleteFileMetadata(file.key)

                  logger.info(`Deleted execution file: ${file.key}`)
                } catch (fileError) {
                  // File cleanup is best-effort: record the failure and keep going.
                  results.files.deleteFailed++
                  logger.error(`Failed to delete file ${file.key}:`, { fileError })
                }
              }
            }
          }

          try {
            const deleteResult = await db
              .delete(workflowExecutionLogs)
              .where(eq(workflowExecutionLogs.id, log.id))
              .returning({ id: workflowExecutionLogs.id })

            if (deleteResult.length > 0) {
              results.enhancedLogs.deleted++
            } else {
              results.enhancedLogs.deleteFailed++
              logger.warn(`Failed to delete log ${log.id} after archiving: No rows deleted`)
            }
          } catch (deleteError) {
            results.enhancedLogs.deleteFailed++
            logger.error(`Error deleting log ${log.id} after archiving:`, { deleteError })
          }
        } catch (archiveError) {
          // Archiving failed, so the row is intentionally left in place.
          results.enhancedLogs.archiveFailed++
          logger.error(`Failed to archive log ${log.id}:`, { archiveError })
        }
      }

      batchesProcessed++
      hasMoreLogs = oldEnhancedLogs.length === BATCH_SIZE

      logger.info(`Processed logs batch ${batchesProcessed}: ${oldEnhancedLogs.length} logs`)

      // The query has no offset, so if a full batch deleted nothing the next
      // SELECT would match the very same rows again. Stop instead of
      // re-attempting them for every remaining batch; `hasMoreLogs` stays true
      // so the response reports the cleanup as incomplete.
      if (hasMoreLogs && results.enhancedLogs.deleted === deletedBeforeBatch) {
        logger.warn(
          `No logs were deleted in batch ${batchesProcessed}; stopping to avoid reprocessing the same logs`
        )
        break
      }
    }

    try {
      const snapshotRetentionDays = retentionDays + 1 // Keep snapshots 1 day longer
      const cleanedSnapshots = await snapshotService.cleanupOrphanedSnapshots(snapshotRetentionDays)
      results.snapshots.cleaned = cleanedSnapshots
      logger.info(`Cleaned up ${cleanedSnapshots} orphaned snapshots`)
    } catch (snapshotError) {
      // Snapshot cleanup failure does not fail the whole run; it is only flagged.
      results.snapshots.cleanupFailed = 1
      logger.error('Error cleaning up orphaned snapshots:', { snapshotError })
    }

    const timeElapsed = (Date.now() - startTime) / 1000
    const reachedLimit = batchesProcessed >= MAX_BATCHES && hasMoreLogs

    return NextResponse.json({
      message: `Processed ${batchesProcessed} enhanced log batches (${results.enhancedLogs.total} logs, ${results.files.total} files) in ${timeElapsed.toFixed(2)}s${reachedLimit ? ' (batch limit reached)' : ''}`,
      results,
      complete: !hasMoreLogs,
      batchLimitReached: reachedLimit,
    })
  } catch (error) {
    logger.error('Error in log cleanup process:', { error })
    return NextResponse.json({ error: 'Failed to process log cleanup' }, { status: 500 })
  }
}
|