Compare commits

...

5 Commits

2 changed files with 133 additions and 74 deletions

View File

@@ -6,13 +6,14 @@ import {
knowledgeConnectorSyncLog,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull, lt, ne, sql } from 'drizzle-orm'
import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm'
import { decryptApiKey } from '@/lib/api-key/crypto'
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import type { DocumentData } from '@/lib/knowledge/documents/service'
import {
hardDeleteDocuments,
isTriggerAvailable,
processDocumentAsync,
processDocumentsWithQueue,
} from '@/lib/knowledge/documents/service'
import { StorageService } from '@/lib/uploads'
import { deleteFile } from '@/lib/uploads/core/storage-service'
@@ -39,6 +40,8 @@ class ConnectorDeletedException extends Error {
// Presumably the number of external documents fetched/inserted per sync batch — verify against the executeSync batching loop.
const SYNC_BATCH_SIZE = 5
// Presumably caps pagination when listing documents from the external source — confirm at the fetch call site.
const MAX_PAGES = 500
// Max title length used when building sanitized storage keys (see sanitizeStorageTitle).
const MAX_SAFE_TITLE_LENGTH = 200
// Documents stuck in 'processing' longer than this are treated as abandoned
// (e.g. the task process exited mid-run) and become eligible for retry.
const STALE_PROCESSING_MINUTES = 45
// Documents uploaded more than this many days ago are excluded from retry.
const RETRY_WINDOW_DAYS = 7
/** Sanitizes a document title for use in S3 storage keys. */
function sanitizeStorageTitle(title: string): string {
@@ -147,11 +150,14 @@ export async function dispatchSync(
const requestId = options?.requestId ?? crypto.randomUUID()
if (isTriggerAvailable()) {
await knowledgeConnectorSync.trigger({
connectorId,
fullSync: options?.fullSync,
requestId,
})
await knowledgeConnectorSync.trigger(
{
connectorId,
fullSync: options?.fullSync,
requestId,
},
{ tags: [`connector:${connectorId}`] }
)
logger.info(`Dispatched connector sync to Trigger.dev`, { connectorId, requestId })
} else {
executeSync(connectorId, { fullSync: options?.fullSync }).catch((error) => {
@@ -500,9 +506,11 @@ export async function executeSync(
})
)
const batchDocs: DocumentData[] = []
for (let j = 0; j < settled.length; j++) {
const outcome = settled[j]
if (outcome.status === 'fulfilled') {
batchDocs.push(outcome.value)
if (batch[j].type === 'add') result.docsAdded++
else result.docsUpdated++
} else {
@@ -515,6 +523,26 @@ export async function executeSync(
})
}
}
// Enqueue this batch for processing immediately so that even if the sync
// task times out, all documents inserted up to this point are already
// queued for processing and won't be orphaned.
if (batchDocs.length > 0) {
try {
await processDocumentsWithQueue(
batchDocs,
connector.knowledgeBaseId,
{},
crypto.randomUUID()
)
} catch (error) {
logger.warn('Failed to enqueue batch for processing — will retry on next sync', {
connectorId,
count: batchDocs.length,
error: error instanceof Error ? error.message : String(error),
})
}
}
}
// Skip deletion reconciliation during incremental syncs — results only contain changed docs
@@ -537,9 +565,14 @@ export async function executeSync(
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
}
// Retry stuck documents that failed or never completed processing.
// Retry stuck documents that failed, never started, or were abandoned mid-processing.
// Only retry docs uploaded BEFORE this sync — docs added in the current sync
// are still processing asynchronously and would cause a duplicate processing race.
// Documents stuck in 'processing' beyond STALE_PROCESSING_MINUTES are considered
// abandoned (e.g. the Trigger.dev task process exited before processing completed).
// Documents uploaded more than RETRY_WINDOW_DAYS ago are not retried.
const staleProcessingCutoff = new Date(Date.now() - STALE_PROCESSING_MINUTES * 60 * 1000)
const retryCutoff = new Date(Date.now() - RETRY_WINDOW_DAYS * 24 * 60 * 60 * 1000)
const stuckDocs = await db
.select({
id: document.id,
@@ -552,8 +585,18 @@ export async function executeSync(
.where(
and(
eq(document.connectorId, connectorId),
inArray(document.processingStatus, ['pending', 'failed']),
or(
inArray(document.processingStatus, ['pending', 'failed']),
and(
eq(document.processingStatus, 'processing'),
or(
isNull(document.processingStartedAt),
lt(document.processingStartedAt, staleProcessingCutoff)
)
)
),
lt(document.uploadedAt, syncStartedAt),
gt(document.uploadedAt, retryCutoff),
eq(document.userExcluded, false),
isNull(document.archivedAt),
isNull(document.deletedAt)
@@ -562,28 +605,42 @@ export async function executeSync(
if (stuckDocs.length > 0) {
logger.info(`Retrying ${stuckDocs.length} stuck documents`, { connectorId })
for (const doc of stuckDocs) {
processDocumentAsync(
connector.knowledgeBaseId,
doc.id,
{
try {
await processDocumentsWithQueue(
stuckDocs.map((doc) => ({
documentId: doc.id,
filename: doc.filename ?? 'document.txt',
fileUrl: doc.fileUrl ?? '',
fileSize: doc.fileSize ?? 0,
mimeType: doc.mimeType ?? 'text/plain',
},
{}
).catch((error) => {
logger.warn('Failed to retry stuck document', {
documentId: doc.id,
error: error instanceof Error ? error.message : String(error),
})
})),
connector.knowledgeBaseId,
{},
crypto.randomUUID()
)
} catch (error) {
logger.warn('Failed to enqueue stuck documents for reprocessing', {
connectorId,
count: stuckDocs.length,
error: error instanceof Error ? error.message : String(error),
})
}
}
await completeSyncLog(syncLogId, 'completed', result)
const [{ count: actualDocCount }] = await db
.select({ count: sql<number>`count(*)::int` })
.from(document)
.where(
and(
eq(document.connectorId, connectorId),
eq(document.userExcluded, false),
isNull(document.archivedAt),
isNull(document.deletedAt)
)
)
const now = new Date()
await db
.update(knowledgeConnector)
@@ -591,7 +648,7 @@ export async function executeSync(
status: 'active',
lastSyncAt: now,
lastSyncError: null,
lastSyncDocCount: externalDocs.length,
lastSyncDocCount: actualDocCount,
nextSyncAt: calculateNextSyncTime(connector.syncIntervalMinutes),
consecutiveFailures: 0,
updatedAt: now,
@@ -711,7 +768,7 @@ async function addDocument(
connectorType: string,
extDoc: ExternalDocument,
sourceConfig?: Record<string, unknown>
): Promise<void> {
): Promise<DocumentData> {
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
}
@@ -773,23 +830,13 @@ async function addDocument(
throw error
}
processDocumentAsync(
knowledgeBaseId,
return {
documentId,
{
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
},
{}
).catch((error) => {
logger.error('Failed to process connector document', {
documentId,
connectorId,
error: error instanceof Error ? error.message : String(error),
})
})
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
}
}
/**
@@ -803,7 +850,7 @@ async function updateDocument(
connectorType: string,
extDoc: ExternalDocument,
sourceConfig?: Record<string, unknown>
): Promise<void> {
): Promise<DocumentData> {
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
}
@@ -894,21 +941,11 @@ async function updateDocument(
}
}
processDocumentAsync(
knowledgeBaseId,
existingDocId,
{
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
},
{}
).catch((error) => {
logger.error('Failed to re-process updated connector document', {
documentId: existingDocId,
connectorId,
error: error instanceof Error ? error.message : String(error),
})
})
return {
documentId: existingDocId,
filename: processingFilename,
fileUrl,
fileSize: contentBuffer.length,
mimeType: 'text/plain',
}
}

View File

@@ -114,11 +114,11 @@ export interface DocumentData {
}
export interface ProcessingOptions {
chunkSize: number
minCharactersPerChunk: number
recipe: string
lang: string
chunkOverlap: number
chunkSize?: number
minCharactersPerChunk?: number
recipe?: string
lang?: string
chunkOverlap?: number
}
export interface DocumentJobData {
@@ -668,7 +668,7 @@ export function isTriggerAvailable(): boolean {
export async function processDocumentsWithTrigger(
documents: DocumentProcessingPayload[],
requestId: string
): Promise<{ success: boolean; message: string; jobIds?: string[] }> {
): Promise<{ success: boolean; message: string; batchIds?: string[] }> {
if (!isTriggerAvailable()) {
throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
}
@@ -676,19 +676,32 @@ export async function processDocumentsWithTrigger(
try {
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
const jobPromises = documents.map(async (document) => {
const job = await tasks.trigger('knowledge-process-document', document)
return job.id
})
const MAX_BATCH_SIZE = 1000
const batchIds: string[] = []
const jobIds = await Promise.all(jobPromises)
for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) {
const chunk = documents.slice(i, i + MAX_BATCH_SIZE)
const batchResult = await tasks.batchTrigger(
'knowledge-process-document',
chunk.map((doc) => ({
payload: doc,
options: {
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
tags: [`kb:${doc.knowledgeBaseId}`, `doc:${doc.documentId}`],
},
}))
)
batchIds.push(batchResult.batchId)
}
logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)
logger.info(
`[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)`
)
return {
success: true,
message: `${documents.length} document processing jobs triggered`,
jobIds,
batchIds,
}
} catch (error) {
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
@@ -1590,10 +1603,19 @@ export async function retryDocumentProcessing(
chunkOverlap: kbConfig.overlap,
}
processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions).catch(
(error: unknown) => {
logger.error(`[${requestId}] Background retry processing error:`, error)
}
await processDocumentsWithQueue(
[
{
documentId,
filename: docData.filename,
fileUrl: docData.fileUrl,
fileSize: docData.fileSize,
mimeType: docData.mimeType,
},
],
knowledgeBaseId,
processingOptions,
requestId
)
logger.info(`[${requestId}] Document retry initiated: ${documentId}`)