From ef5e2b699c252ad08073aabe7e83e01ae5817c14 Mon Sep 17 00:00:00 2001
From: Waleed
Date: Fri, 29 Aug 2025 18:37:23 -0700
Subject: [PATCH] feat(kb): add adjustable concurrency and batching to uploads and embeddings (#1198)

---
 apps/sim/background/knowledge-processing.ts | 13 +++++------
 apps/sim/lib/env.ts                         | 11 ++++++++++
 apps/sim/lib/knowledge/documents/service.ts | 24 ++++++++++-----------
 3 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/apps/sim/background/knowledge-processing.ts b/apps/sim/background/knowledge-processing.ts
index 90468bda28..0260ed0962 100644
--- a/apps/sim/background/knowledge-processing.ts
+++ b/apps/sim/background/knowledge-processing.ts
@@ -1,4 +1,5 @@
 import { task } from '@trigger.dev/sdk'
+import { env } from '@/lib/env'
 import { processDocumentAsync } from '@/lib/knowledge/documents/service'
 import { createLogger } from '@/lib/logs/console/logger'
 
@@ -25,15 +26,15 @@ export type DocumentProcessingPayload = {
 
 export const processDocument = task({
   id: 'knowledge-process-document',
-  maxDuration: 300,
+  maxDuration: env.KB_CONFIG_MAX_DURATION, // seconds; schema default is 300, the previous hard-coded value
   retry: {
-    maxAttempts: 3,
-    factor: 2,
-    minTimeoutInMs: 1000,
-    maxTimeoutInMs: 10000,
+    maxAttempts: env.KB_CONFIG_MAX_ATTEMPTS, // schema default 3 (unchanged)
+    factor: env.KB_CONFIG_RETRY_FACTOR, // schema default 2 (unchanged)
+    minTimeoutInMs: env.KB_CONFIG_MIN_TIMEOUT, // schema default 1000 (unchanged)
+    maxTimeoutInMs: env.KB_CONFIG_MAX_TIMEOUT, // schema default 10000 (unchanged)
   },
   queue: {
-    concurrencyLimit: 20,
+    concurrencyLimit: env.KB_CONFIG_CONCURRENCY_LIMIT, // schema default 20 (unchanged)
     name: 'document-processing-queue',
   },
   run: async (payload: DocumentProcessingPayload) => {
diff --git a/apps/sim/lib/env.ts b/apps/sim/lib/env.ts
index 67336a6448..d03cc674c2 100644
--- a/apps/sim/lib/env.ts
+++ b/apps/sim/lib/env.ts
@@ -139,6 +139,17 @@ export const env = createEnv({
     RATE_LIMIT_ENTERPRISE_SYNC: z.string().optional().default('150'), // Enterprise tier sync API executions per minute
     RATE_LIMIT_ENTERPRISE_ASYNC: z.string().optional().default('1000'), // Enterprise tier async API executions per minute
 
+    // Knowledge Base Processing Configuration - Shared by the background task and the document service. NOTE(review): z.number() does not coerce strings - confirm createEnv supplies numbers when these are set via the environment
+    KB_CONFIG_MAX_DURATION: z.number().optional().default(300), // Max processing duration in seconds (task maxDuration; x1000 for the service's overall processing timeout)
+    KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts (task retry.maxAttempts and document queue maxRetries)
+    KB_CONFIG_RETRY_FACTOR: z.number().optional().default(2), // Retry backoff factor (task retry.factor)
+    KB_CONFIG_MIN_TIMEOUT: z.number().optional().default(1000), // Min retry timeout in ms (also used as the document queue retryDelay)
+    KB_CONFIG_MAX_TIMEOUT: z.number().optional().default(10000), // Max retry timeout in ms (the service derives its embeddings API timeout as 18x this)
+    KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(20), // Queue concurrency limit (used as-is for the Redis config, divided by 5 otherwise)
+    KB_CONFIG_BATCH_SIZE: z.number().optional().default(20), // Processing batch size (used as-is for the Redis config, halved otherwise)
+    KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(100), // Delay between batches in ms (doubled for the non-Redis config)
+    KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms (doubled for the non-Redis config)
+
     // Real-time Communication
     SOCKET_SERVER_URL: z.string().url().optional(), // WebSocket server URL for real-time features
     SOCKET_PORT: z.number().optional(), // Port for WebSocket server
diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts
index 3a4c1ad69d..3654e541e8 100644
--- a/apps/sim/lib/knowledge/documents/service.ts
+++ b/apps/sim/lib/knowledge/documents/service.ts
@@ -17,8 +17,8 @@ import type { DocumentSortField, SortOrder } from './types'
 const logger = createLogger('DocumentService')
 
 const TIMEOUTS = {
-  OVERALL_PROCESSING: 600000,
-  EMBEDDINGS_API: 180000,
+  OVERALL_PROCESSING: env.KB_CONFIG_MAX_DURATION * 1000, // seconds -> ms. NOTE(review): default becomes 300000, was 600000 - confirm intended
+  EMBEDDINGS_API: env.KB_CONFIG_MAX_TIMEOUT * 18, // 18x the retry max timeout; equals the previous 180000 ms only at the default of 10000
 } as const
 
 /**
@@ -38,17 +38,17 @@ function withTimeout(
 }
 
 const PROCESSING_CONFIG = {
-  maxConcurrentDocuments: 4,
-  batchSize: 10,
-  delayBetweenBatches: 200,
-  delayBetweenDocuments: 100,
+  maxConcurrentDocuments: Math.max(1, Math.floor(env.KB_CONFIG_CONCURRENCY_LIMIT / 5)) || 4, // default 20 / 5 = 4 (previous value); `|| 4` only applies when the expression is NaN
+  batchSize: Math.max(1, Math.floor(env.KB_CONFIG_BATCH_SIZE / 2)) || 10, // default 20 / 2 = 10 (previous value); `|| 10` only applies when the expression is NaN
+  delayBetweenBatches: env.KB_CONFIG_DELAY_BETWEEN_BATCHES * 2, // default 100 * 2 = 200 ms (previous value)
+  delayBetweenDocuments: env.KB_CONFIG_DELAY_BETWEEN_DOCUMENTS * 2, // default 50 * 2 = 100 ms (previous value)
 }
 
 const REDIS_PROCESSING_CONFIG = {
-  maxConcurrentDocuments: 12,
-  batchSize: 20,
-  delayBetweenBatches: 100,
-  delayBetweenDocuments: 50,
+  maxConcurrentDocuments: env.KB_CONFIG_CONCURRENCY_LIMIT, // NOTE(review): default becomes 20, was 12 - confirm intended
+  batchSize: env.KB_CONFIG_BATCH_SIZE, // default 20 (previous value)
+  delayBetweenBatches: env.KB_CONFIG_DELAY_BETWEEN_BATCHES, // default 100 ms (previous value)
+  delayBetweenDocuments: env.KB_CONFIG_DELAY_BETWEEN_DOCUMENTS, // default 50 ms (previous value)
 }
 
 let documentQueue: DocumentProcessingQueue | null = null
@@ -59,8 +59,8 @@ export function getDocumentQueue(): DocumentProcessingQueue {
     const config = redisClient ? REDIS_PROCESSING_CONFIG : PROCESSING_CONFIG
     documentQueue = new DocumentProcessingQueue({
       maxConcurrent: config.maxConcurrentDocuments,
-      retryDelay: 2000,
-      maxRetries: 5,
+      retryDelay: env.KB_CONFIG_MIN_TIMEOUT, // NOTE(review): default becomes 1000 ms, was 2000 - confirm intended
+      maxRetries: env.KB_CONFIG_MAX_ATTEMPTS, // NOTE(review): default becomes 3, was 5 - confirm intended
     })
   }
   return documentQueue