feat(kb): add adjustable concurrency and batching to uploads and embeddings (#1198)

This commit is contained in:
Waleed
2025-08-29 18:37:23 -07:00
committed by GitHub
parent 4310dd6c15
commit a3838302e0
3 changed files with 30 additions and 18 deletions

View File

@@ -1,4 +1,5 @@
import { task } from '@trigger.dev/sdk'
import { env } from '@/lib/env'
import { processDocumentAsync } from '@/lib/knowledge/documents/service'
import { createLogger } from '@/lib/logs/console/logger'
@@ -25,15 +26,15 @@ export type DocumentProcessingPayload = {
export const processDocument = task({
id: 'knowledge-process-document',
maxDuration: 300,
maxDuration: env.KB_CONFIG_MAX_DURATION,
retry: {
maxAttempts: 3,
factor: 2,
minTimeoutInMs: 1000,
maxTimeoutInMs: 10000,
maxAttempts: env.KB_CONFIG_MAX_ATTEMPTS,
factor: env.KB_CONFIG_RETRY_FACTOR,
minTimeoutInMs: env.KB_CONFIG_MIN_TIMEOUT,
maxTimeoutInMs: env.KB_CONFIG_MAX_TIMEOUT,
},
queue: {
concurrencyLimit: 20,
concurrencyLimit: env.KB_CONFIG_CONCURRENCY_LIMIT,
name: 'document-processing-queue',
},
run: async (payload: DocumentProcessingPayload) => {

View File

@@ -139,6 +139,17 @@ export const env = createEnv({
RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('150'), // Enterprise tier sync API executions per minute
RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('1000'), // Enterprise tier async API executions per minute
// Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(300), // Max processing duration in s
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
KB_CONFIG_RETRY_FACTOR: z.number().optional().default(2), // Retry backoff factor
KB_CONFIG_MIN_TIMEOUT: z.number().optional().default(1000), // Min timeout in ms
KB_CONFIG_MAX_TIMEOUT: z.number().optional().default(10000), // Max timeout in ms
KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(20), // Queue concurrency limit
KB_CONFIG_BATCH_SIZE: z.number().optional().default(20), // Processing batch size
KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(100), // Delay between batches in ms
KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms
// Real-time Communication
SOCKET_SERVER_URL: z.string().url().optional(), // WebSocket server URL for real-time features
SOCKET_PORT: z.number().optional(), // Port for WebSocket server

View File

@@ -17,8 +17,8 @@ import type { DocumentSortField, SortOrder } from './types'
const logger = createLogger('DocumentService')
const TIMEOUTS = {
OVERALL_PROCESSING: 600000,
EMBEDDINGS_API: 180000,
OVERALL_PROCESSING: env.KB_CONFIG_MAX_DURATION * 1000,
EMBEDDINGS_API: env.KB_CONFIG_MAX_TIMEOUT * 18,
} as const
/**
@@ -38,17 +38,17 @@ function withTimeout<T>(
}
const PROCESSING_CONFIG = {
maxConcurrentDocuments: 4,
batchSize: 10,
delayBetweenBatches: 200,
delayBetweenDocuments: 100,
maxConcurrentDocuments: Math.max(1, Math.floor(env.KB_CONFIG_CONCURRENCY_LIMIT / 5)) || 4,
batchSize: Math.max(1, Math.floor(env.KB_CONFIG_BATCH_SIZE / 2)) || 10,
delayBetweenBatches: env.KB_CONFIG_DELAY_BETWEEN_BATCHES * 2,
delayBetweenDocuments: env.KB_CONFIG_DELAY_BETWEEN_DOCUMENTS * 2,
}
const REDIS_PROCESSING_CONFIG = {
maxConcurrentDocuments: 12,
batchSize: 20,
delayBetweenBatches: 100,
delayBetweenDocuments: 50,
maxConcurrentDocuments: env.KB_CONFIG_CONCURRENCY_LIMIT,
batchSize: env.KB_CONFIG_BATCH_SIZE,
delayBetweenBatches: env.KB_CONFIG_DELAY_BETWEEN_BATCHES,
delayBetweenDocuments: env.KB_CONFIG_DELAY_BETWEEN_DOCUMENTS,
}
let documentQueue: DocumentProcessingQueue | null = null
@@ -59,8 +59,8 @@ export function getDocumentQueue(): DocumentProcessingQueue {
const config = redisClient ? REDIS_PROCESSING_CONFIG : PROCESSING_CONFIG
documentQueue = new DocumentProcessingQueue({
maxConcurrent: config.maxConcurrentDocuments,
retryDelay: 2000,
maxRetries: 5,
retryDelay: env.KB_CONFIG_MIN_TIMEOUT,
maxRetries: env.KB_CONFIG_MAX_ATTEMPTS,
})
}
return documentQueue