improvement(kb): add configurable concurrency to chunk processing, sped up 22x for large docs (#2681)

Author: Waleed
Date: 2026-01-05 00:29:31 -08:00
Committed by: GitHub
Parent: ed6b9c0c4a
Commit: 0977ed228f
3 changed files with 53 additions and 31 deletions


@@ -174,9 +174,9 @@ export const env = createEnv({
   KB_CONFIG_RETRY_FACTOR: z.number().optional().default(2), // Retry backoff factor
   KB_CONFIG_MIN_TIMEOUT: z.number().optional().default(1000), // Min timeout in ms
   KB_CONFIG_MAX_TIMEOUT: z.number().optional().default(10000), // Max timeout in ms
-  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(20), // Queue concurrency limit
-  KB_CONFIG_BATCH_SIZE: z.number().optional().default(20), // Processing batch size
-  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(100), // Delay between batches in ms
+  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(50), // Concurrent embedding API calls
+  KB_CONFIG_BATCH_SIZE: z.number().optional().default(2000), // Chunks to process per embedding batch
+  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(0), // Delay between batches in ms (0 for max speed)
   KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms

   // Real-time Communication


@@ -29,10 +29,10 @@ const TIMEOUTS = {

 // Configuration for handling large documents
 const LARGE_DOC_CONFIG = {
-  MAX_CHUNKS_PER_BATCH: 500, // Insert embeddings in batches of 500
-  MAX_EMBEDDING_BATCH: 500, // Generate embeddings in batches of 500
-  MAX_FILE_SIZE: 100 * 1024 * 1024, // 100MB max file size
-  MAX_CHUNKS_PER_DOCUMENT: 100000, // Maximum chunks allowed per document
+  MAX_CHUNKS_PER_BATCH: 500,
+  MAX_EMBEDDING_BATCH: env.KB_CONFIG_BATCH_SIZE || 2000,
+  MAX_FILE_SIZE: 100 * 1024 * 1024,
+  MAX_CHUNKS_PER_DOCUMENT: 100000,
 }

 /**

@@ -7,6 +7,7 @@ import { batchByTokenLimit, getTotalTokenCount } from '@/lib/tokenization'
 const logger = createLogger('EmbeddingUtils')

 const MAX_TOKENS_PER_REQUEST = 8000
+const MAX_CONCURRENT_BATCHES = env.KB_CONFIG_CONCURRENCY_LIMIT || 50

 export class EmbeddingAPIError extends Error {
   public status: number
@@ -121,8 +122,29 @@ async function callEmbeddingAPI(inputs: string[], config: EmbeddingConfig): Prom
 }

 /**
- * Generate embeddings for multiple texts with token-aware batching
- * Uses tiktoken for token counting
+ * Process batches with controlled concurrency
  */
+async function processWithConcurrency<T, R>(
+  items: T[],
+  concurrency: number,
+  processor: (item: T, index: number) => Promise<R>
+): Promise<R[]> {
+  const results: R[] = new Array(items.length)
+  let currentIndex = 0
+
+  const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
+    while (currentIndex < items.length) {
+      const index = currentIndex++
+      results[index] = await processor(items[index], index)
+    }
+  })
+
+  await Promise.all(workers)
+  return results
+}
+
+/**
+ * Generate embeddings for multiple texts with token-aware batching and parallel processing
+ */
 export async function generateEmbeddings(
   texts: string[],
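A usage sketch for the new helper (it assumes processWithConcurrency from the hunk above is in scope; the numbers and delay are made up). Note that the shared currentIndex needs no lock: each worker reads and increments it synchronously with no await in between, and JavaScript runs these continuations one at a time.

```ts
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function demo() {
  // At most 3 of the 10 tasks run at once; results keep input order
  // because each worker writes into the slot it claimed.
  const results = await processWithConcurrency(
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    3,
    async (n) => {
      await delay(10) // stand-in for a real call like callEmbeddingAPI
      return n * n
    }
  )
  console.log(results) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
}

demo()
```

One design note: if any processor call rejects, Promise.all rejects immediately while workers already mid-task finish draining in the background; that matches the fail-fast rethrow in generateEmbeddings below.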
@@ -138,35 +160,35 @@ export async function generateEmbeddings(
   const batches = batchByTokenLimit(texts, MAX_TOKENS_PER_REQUEST, embeddingModel)

   logger.info(
-    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch)`
+    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch, ${MAX_CONCURRENT_BATCHES} concurrent)`
   )

-  const allEmbeddings: number[][] = []
-
-  for (let i = 0; i < batches.length; i++) {
-    const batch = batches[i]
-    const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
-
-    logger.info(
-      `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
-    )
-
-    try {
-      const batchEmbeddings = await callEmbeddingAPI(batch, config)
-      allEmbeddings.push(...batchEmbeddings)
-
-      logger.info(
-        `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
-      )
-    } catch (error) {
-      logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
-      throw error
-    }
-
-    if (i + 1 < batches.length) {
-      await new Promise((resolve) => setTimeout(resolve, 100))
-    }
-  }
+  const batchResults = await processWithConcurrency(
+    batches,
+    MAX_CONCURRENT_BATCHES,
+    async (batch, i) => {
+      const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
+
+      logger.info(
+        `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
+      )
+
+      try {
+        const batchEmbeddings = await callEmbeddingAPI(batch, config)
+
+        logger.info(
+          `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
+        )
+
+        return batchEmbeddings
+      } catch (error) {
+        logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
+        throw error
+      }
+    }
+  )
+
+  const allEmbeddings = batchResults.flat()

   logger.info(`Successfully generated ${allEmbeddings.length} embeddings total`)
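As a rough back-of-envelope on the headline speedup (assumed latencies, not the commit's benchmark): the old path paid per-batch latency plus a 100 ms sleep on every batch, serially, while the new path pays roughly one latency per wave of MAX_CONCURRENT_BATCHES batches:

```ts
// Toy model with assumed numbers; real gains depend on API throughput,
// rate limits, and batch sizes.
const batchCount = 200
const batchLatencyMs = 500 // assumed per-call embedding latency

const oldMs = batchCount * (batchLatencyMs + 100) // serial, plus 100 ms delay each
const newMs = Math.ceil(batchCount / 50) * batchLatencyMs // waves of 50 concurrent calls

console.log(`${oldMs} ms -> ${newMs} ms (${oldMs / newMs}x)`) // 120000 ms -> 2000 ms (60x)
```

The observed 22x presumably lands below the idealized ratio because real calls contend for provider rate limits and bandwidth.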