mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
fix(knowledge): enqueue docs per-batch instead of post-loop to survive sync timeouts
This commit is contained in:
@@ -401,8 +401,6 @@ export async function executeSync(
|
||||
|
||||
const seenExternalIds = new Set<string>()
|
||||
|
||||
const pendingProcessing: DocumentData[] = []
|
||||
|
||||
const pendingOps: DocOp[] = []
|
||||
for (const extDoc of externalDocs) {
|
||||
seenExternalIds.add(extDoc.externalId)
|
||||
@@ -508,10 +506,11 @@ export async function executeSync(
|
||||
})
|
||||
)
|
||||
|
||||
const batchDocs: DocumentData[] = []
|
||||
for (let j = 0; j < settled.length; j++) {
|
||||
const outcome = settled[j]
|
||||
if (outcome.status === 'fulfilled') {
|
||||
pendingProcessing.push(outcome.value)
|
||||
batchDocs.push(outcome.value)
|
||||
if (batch[j].type === 'add') result.docsAdded++
|
||||
else result.docsUpdated++
|
||||
} else {
|
||||
@@ -524,6 +523,26 @@ export async function executeSync(
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue this batch for processing immediately so that even if the sync
|
||||
// task times out, all documents inserted up to this point are already
|
||||
// queued for processing and won't be orphaned.
|
||||
if (batchDocs.length > 0) {
|
||||
try {
|
||||
await processDocumentsWithQueue(
|
||||
batchDocs,
|
||||
connector.knowledgeBaseId,
|
||||
{},
|
||||
crypto.randomUUID()
|
||||
)
|
||||
} catch (error) {
|
||||
logger.warn('Failed to enqueue batch for processing — will retry on next sync', {
|
||||
connectorId,
|
||||
count: batchDocs.length,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Skip deletion reconciliation during incremental syncs — results only contain changed docs
|
||||
@@ -608,24 +627,6 @@ export async function executeSync(
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue all added/updated documents for processing in a single batch
|
||||
if (pendingProcessing.length > 0) {
|
||||
try {
|
||||
await processDocumentsWithQueue(
|
||||
pendingProcessing,
|
||||
connector.knowledgeBaseId,
|
||||
{},
|
||||
crypto.randomUUID()
|
||||
)
|
||||
} catch (error) {
|
||||
logger.warn('Failed to enqueue documents for processing — will retry on next sync', {
|
||||
connectorId,
|
||||
count: pendingProcessing.length,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
await completeSyncLog(syncLogId, 'completed', result)
|
||||
|
||||
const [{ count: actualDocCount }] = await db
|
||||
|
||||
Reference in New Issue
Block a user