mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
Compare commits
5 Commits
dev
...
waleedlati
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e8c20f45c9 | ||
|
|
b22c2869f6 | ||
|
|
85d9fcb216 | ||
|
|
74ff2b5804 | ||
|
|
0e8a2d7e7c |
@@ -6,13 +6,14 @@ import {
|
||||
knowledgeConnectorSyncLog,
|
||||
} from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, inArray, isNull, lt, ne, sql } from 'drizzle-orm'
|
||||
import { and, eq, gt, inArray, isNull, lt, ne, or, sql } from 'drizzle-orm'
|
||||
import { decryptApiKey } from '@/lib/api-key/crypto'
|
||||
import { getInternalApiBaseUrl } from '@/lib/core/utils/urls'
|
||||
import type { DocumentData } from '@/lib/knowledge/documents/service'
|
||||
import {
|
||||
hardDeleteDocuments,
|
||||
isTriggerAvailable,
|
||||
processDocumentAsync,
|
||||
processDocumentsWithQueue,
|
||||
} from '@/lib/knowledge/documents/service'
|
||||
import { StorageService } from '@/lib/uploads'
|
||||
import { deleteFile } from '@/lib/uploads/core/storage-service'
|
||||
@@ -39,6 +40,8 @@ class ConnectorDeletedException extends Error {
|
||||
const SYNC_BATCH_SIZE = 5
|
||||
const MAX_PAGES = 500
|
||||
const MAX_SAFE_TITLE_LENGTH = 200
|
||||
const STALE_PROCESSING_MINUTES = 45
|
||||
const RETRY_WINDOW_DAYS = 7
|
||||
|
||||
/** Sanitizes a document title for use in S3 storage keys. */
|
||||
function sanitizeStorageTitle(title: string): string {
|
||||
@@ -147,11 +150,14 @@ export async function dispatchSync(
|
||||
const requestId = options?.requestId ?? crypto.randomUUID()
|
||||
|
||||
if (isTriggerAvailable()) {
|
||||
await knowledgeConnectorSync.trigger({
|
||||
connectorId,
|
||||
fullSync: options?.fullSync,
|
||||
requestId,
|
||||
})
|
||||
await knowledgeConnectorSync.trigger(
|
||||
{
|
||||
connectorId,
|
||||
fullSync: options?.fullSync,
|
||||
requestId,
|
||||
},
|
||||
{ tags: [`connector:${connectorId}`] }
|
||||
)
|
||||
logger.info(`Dispatched connector sync to Trigger.dev`, { connectorId, requestId })
|
||||
} else {
|
||||
executeSync(connectorId, { fullSync: options?.fullSync }).catch((error) => {
|
||||
@@ -500,9 +506,11 @@ export async function executeSync(
|
||||
})
|
||||
)
|
||||
|
||||
const batchDocs: DocumentData[] = []
|
||||
for (let j = 0; j < settled.length; j++) {
|
||||
const outcome = settled[j]
|
||||
if (outcome.status === 'fulfilled') {
|
||||
batchDocs.push(outcome.value)
|
||||
if (batch[j].type === 'add') result.docsAdded++
|
||||
else result.docsUpdated++
|
||||
} else {
|
||||
@@ -515,6 +523,26 @@ export async function executeSync(
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue this batch for processing immediately so that even if the sync
|
||||
// task times out, all documents inserted up to this point are already
|
||||
// queued for processing and won't be orphaned.
|
||||
if (batchDocs.length > 0) {
|
||||
try {
|
||||
await processDocumentsWithQueue(
|
||||
batchDocs,
|
||||
connector.knowledgeBaseId,
|
||||
{},
|
||||
crypto.randomUUID()
|
||||
)
|
||||
} catch (error) {
|
||||
logger.warn('Failed to enqueue batch for processing — will retry on next sync', {
|
||||
connectorId,
|
||||
count: batchDocs.length,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Skip deletion reconciliation during incremental syncs — results only contain changed docs
|
||||
@@ -537,9 +565,14 @@ export async function executeSync(
|
||||
throw new Error(`Knowledge base ${connector.knowledgeBaseId} was deleted during sync`)
|
||||
}
|
||||
|
||||
// Retry stuck documents that failed or never completed processing.
|
||||
// Retry stuck documents that failed, never started, or were abandoned mid-processing.
|
||||
// Only retry docs uploaded BEFORE this sync — docs added in the current sync
|
||||
// are still processing asynchronously and would cause a duplicate processing race.
|
||||
// Documents stuck in 'processing' beyond STALE_PROCESSING_MINUTES are considered
|
||||
// abandoned (e.g. the Trigger.dev task process exited before processing completed).
|
||||
// Documents uploaded more than RETRY_WINDOW_DAYS ago are not retried.
|
||||
const staleProcessingCutoff = new Date(Date.now() - STALE_PROCESSING_MINUTES * 60 * 1000)
|
||||
const retryCutoff = new Date(Date.now() - RETRY_WINDOW_DAYS * 24 * 60 * 60 * 1000)
|
||||
const stuckDocs = await db
|
||||
.select({
|
||||
id: document.id,
|
||||
@@ -552,8 +585,18 @@ export async function executeSync(
|
||||
.where(
|
||||
and(
|
||||
eq(document.connectorId, connectorId),
|
||||
inArray(document.processingStatus, ['pending', 'failed']),
|
||||
or(
|
||||
inArray(document.processingStatus, ['pending', 'failed']),
|
||||
and(
|
||||
eq(document.processingStatus, 'processing'),
|
||||
or(
|
||||
isNull(document.processingStartedAt),
|
||||
lt(document.processingStartedAt, staleProcessingCutoff)
|
||||
)
|
||||
)
|
||||
),
|
||||
lt(document.uploadedAt, syncStartedAt),
|
||||
gt(document.uploadedAt, retryCutoff),
|
||||
eq(document.userExcluded, false),
|
||||
isNull(document.archivedAt),
|
||||
isNull(document.deletedAt)
|
||||
@@ -562,28 +605,42 @@ export async function executeSync(
|
||||
|
||||
if (stuckDocs.length > 0) {
|
||||
logger.info(`Retrying ${stuckDocs.length} stuck documents`, { connectorId })
|
||||
for (const doc of stuckDocs) {
|
||||
processDocumentAsync(
|
||||
connector.knowledgeBaseId,
|
||||
doc.id,
|
||||
{
|
||||
try {
|
||||
await processDocumentsWithQueue(
|
||||
stuckDocs.map((doc) => ({
|
||||
documentId: doc.id,
|
||||
filename: doc.filename ?? 'document.txt',
|
||||
fileUrl: doc.fileUrl ?? '',
|
||||
fileSize: doc.fileSize ?? 0,
|
||||
mimeType: doc.mimeType ?? 'text/plain',
|
||||
},
|
||||
{}
|
||||
).catch((error) => {
|
||||
logger.warn('Failed to retry stuck document', {
|
||||
documentId: doc.id,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
})),
|
||||
connector.knowledgeBaseId,
|
||||
{},
|
||||
crypto.randomUUID()
|
||||
)
|
||||
} catch (error) {
|
||||
logger.warn('Failed to enqueue stuck documents for reprocessing', {
|
||||
connectorId,
|
||||
count: stuckDocs.length,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
await completeSyncLog(syncLogId, 'completed', result)
|
||||
|
||||
const [{ count: actualDocCount }] = await db
|
||||
.select({ count: sql<number>`count(*)::int` })
|
||||
.from(document)
|
||||
.where(
|
||||
and(
|
||||
eq(document.connectorId, connectorId),
|
||||
eq(document.userExcluded, false),
|
||||
isNull(document.archivedAt),
|
||||
isNull(document.deletedAt)
|
||||
)
|
||||
)
|
||||
|
||||
const now = new Date()
|
||||
await db
|
||||
.update(knowledgeConnector)
|
||||
@@ -591,7 +648,7 @@ export async function executeSync(
|
||||
status: 'active',
|
||||
lastSyncAt: now,
|
||||
lastSyncError: null,
|
||||
lastSyncDocCount: externalDocs.length,
|
||||
lastSyncDocCount: actualDocCount,
|
||||
nextSyncAt: calculateNextSyncTime(connector.syncIntervalMinutes),
|
||||
consecutiveFailures: 0,
|
||||
updatedAt: now,
|
||||
@@ -711,7 +768,7 @@ async function addDocument(
|
||||
connectorType: string,
|
||||
extDoc: ExternalDocument,
|
||||
sourceConfig?: Record<string, unknown>
|
||||
): Promise<void> {
|
||||
): Promise<DocumentData> {
|
||||
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
|
||||
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
|
||||
}
|
||||
@@ -773,23 +830,13 @@ async function addDocument(
|
||||
throw error
|
||||
}
|
||||
|
||||
processDocumentAsync(
|
||||
knowledgeBaseId,
|
||||
return {
|
||||
documentId,
|
||||
{
|
||||
filename: processingFilename,
|
||||
fileUrl,
|
||||
fileSize: contentBuffer.length,
|
||||
mimeType: 'text/plain',
|
||||
},
|
||||
{}
|
||||
).catch((error) => {
|
||||
logger.error('Failed to process connector document', {
|
||||
documentId,
|
||||
connectorId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
})
|
||||
filename: processingFilename,
|
||||
fileUrl,
|
||||
fileSize: contentBuffer.length,
|
||||
mimeType: 'text/plain',
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -803,7 +850,7 @@ async function updateDocument(
|
||||
connectorType: string,
|
||||
extDoc: ExternalDocument,
|
||||
sourceConfig?: Record<string, unknown>
|
||||
): Promise<void> {
|
||||
): Promise<DocumentData> {
|
||||
if (await isKnowledgeBaseDeleted(knowledgeBaseId)) {
|
||||
throw new Error(`Knowledge base ${knowledgeBaseId} is deleted`)
|
||||
}
|
||||
@@ -894,21 +941,11 @@ async function updateDocument(
|
||||
}
|
||||
}
|
||||
|
||||
processDocumentAsync(
|
||||
knowledgeBaseId,
|
||||
existingDocId,
|
||||
{
|
||||
filename: processingFilename,
|
||||
fileUrl,
|
||||
fileSize: contentBuffer.length,
|
||||
mimeType: 'text/plain',
|
||||
},
|
||||
{}
|
||||
).catch((error) => {
|
||||
logger.error('Failed to re-process updated connector document', {
|
||||
documentId: existingDocId,
|
||||
connectorId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
})
|
||||
return {
|
||||
documentId: existingDocId,
|
||||
filename: processingFilename,
|
||||
fileUrl,
|
||||
fileSize: contentBuffer.length,
|
||||
mimeType: 'text/plain',
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,11 +114,11 @@ export interface DocumentData {
|
||||
}
|
||||
|
||||
export interface ProcessingOptions {
|
||||
chunkSize: number
|
||||
minCharactersPerChunk: number
|
||||
recipe: string
|
||||
lang: string
|
||||
chunkOverlap: number
|
||||
chunkSize?: number
|
||||
minCharactersPerChunk?: number
|
||||
recipe?: string
|
||||
lang?: string
|
||||
chunkOverlap?: number
|
||||
}
|
||||
|
||||
export interface DocumentJobData {
|
||||
@@ -668,7 +668,7 @@ export function isTriggerAvailable(): boolean {
|
||||
export async function processDocumentsWithTrigger(
|
||||
documents: DocumentProcessingPayload[],
|
||||
requestId: string
|
||||
): Promise<{ success: boolean; message: string; jobIds?: string[] }> {
|
||||
): Promise<{ success: boolean; message: string; batchIds?: string[] }> {
|
||||
if (!isTriggerAvailable()) {
|
||||
throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
|
||||
}
|
||||
@@ -676,19 +676,32 @@ export async function processDocumentsWithTrigger(
|
||||
try {
|
||||
logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
|
||||
|
||||
const jobPromises = documents.map(async (document) => {
|
||||
const job = await tasks.trigger('knowledge-process-document', document)
|
||||
return job.id
|
||||
})
|
||||
const MAX_BATCH_SIZE = 1000
|
||||
const batchIds: string[] = []
|
||||
|
||||
const jobIds = await Promise.all(jobPromises)
|
||||
for (let i = 0; i < documents.length; i += MAX_BATCH_SIZE) {
|
||||
const chunk = documents.slice(i, i + MAX_BATCH_SIZE)
|
||||
const batchResult = await tasks.batchTrigger(
|
||||
'knowledge-process-document',
|
||||
chunk.map((doc) => ({
|
||||
payload: doc,
|
||||
options: {
|
||||
idempotencyKey: `doc-process-${doc.documentId}-${requestId}`,
|
||||
tags: [`kb:${doc.knowledgeBaseId}`, `doc:${doc.documentId}`],
|
||||
},
|
||||
}))
|
||||
)
|
||||
batchIds.push(batchResult.batchId)
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)
|
||||
logger.info(
|
||||
`[${requestId}] Triggered ${documents.length} document processing jobs in ${batchIds.length} batch(es)`
|
||||
)
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: `${documents.length} document processing jobs triggered`,
|
||||
jobIds,
|
||||
batchIds,
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
|
||||
@@ -1590,10 +1603,19 @@ export async function retryDocumentProcessing(
|
||||
chunkOverlap: kbConfig.overlap,
|
||||
}
|
||||
|
||||
processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions).catch(
|
||||
(error: unknown) => {
|
||||
logger.error(`[${requestId}] Background retry processing error:`, error)
|
||||
}
|
||||
await processDocumentsWithQueue(
|
||||
[
|
||||
{
|
||||
documentId,
|
||||
filename: docData.filename,
|
||||
fileUrl: docData.fileUrl,
|
||||
fileSize: docData.fileSize,
|
||||
mimeType: docData.mimeType,
|
||||
},
|
||||
],
|
||||
knowledgeBaseId,
|
||||
processingOptions,
|
||||
requestId
|
||||
)
|
||||
|
||||
logger.info(`[${requestId}] Document retry initiated: ${documentId}`)
|
||||
|
||||
Reference in New Issue
Block a user