improvement(kb): use trigger.dev for kb tasks (#1166)

This commit is contained in:
Waleed
2025-08-28 12:14:31 -07:00
committed by GitHub
parent bda8ee772a
commit 3f900947ce
5 changed files with 171 additions and 19 deletions

View File

@@ -232,10 +232,9 @@ async function handleS3PresignedUrl(
)
}
// For chat images, use direct S3 URLs since they need to be permanently accessible
// For other files, use serve path for access control
// For chat images and knowledge base files, use direct URLs since they need to be accessible by external services
const finalPath =
uploadType === 'chat'
uploadType === 'chat' || uploadType === 'knowledge-base'
? `https://${config.bucket}.s3.${config.region}.amazonaws.com/${uniqueKey}`
: `/api/files/serve/s3/${encodeURIComponent(uniqueKey)}`

View File

@@ -0,0 +1,60 @@
import { task } from '@trigger.dev/sdk'
import { processDocumentAsync } from '@/lib/knowledge/documents/service'
import { createLogger } from '@/lib/logs/console/logger'
// Module-scoped logger; the tag appears in every log line emitted from this file.
const logger = createLogger('TriggerKnowledgeProcessing')
/** File metadata handed to the document-processing service. */
type DocumentFileData = {
  filename: string
  fileUrl: string
  fileSize: number
  mimeType: string
}

/** Chunking/embedding knobs; all optional — callers fill in defaults. */
type DocumentProcessingOptions = {
  chunkSize?: number
  minCharactersPerChunk?: number
  recipe?: string
  lang?: string
  chunkOverlap?: number
}

/**
 * Payload for the `knowledge-process-document` Trigger.dev task.
 * `requestId` is a correlation id threaded through all log lines.
 */
export type DocumentProcessingPayload = {
  knowledgeBaseId: string
  documentId: string
  docData: DocumentFileData
  processingOptions: DocumentProcessingOptions
  requestId: string
}
/**
 * Trigger.dev task that processes a single knowledge-base document.
 *
 * Thin wrapper around `processDocumentAsync`; Trigger.dev supplies the
 * retry policy (exponential backoff) and a per-queue concurrency cap.
 * Errors are logged and rethrown so the platform's retry policy applies.
 */
export const processDocument = task({
  id: 'knowledge-process-document',
  maxDuration: 300,
  retry: {
    maxAttempts: 3,
    // Exponential backoff between attempts, bounded to [1s, 10s].
    factor: 2,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 10000,
  },
  queue: {
    concurrencyLimit: 20,
    name: 'document-processing-queue',
  },
  run: async (payload: DocumentProcessingPayload) => {
    const { knowledgeBaseId, documentId, docData, processingOptions, requestId } = payload
    const { filename } = docData

    logger.info(`[${requestId}] Starting Trigger.dev processing for document: ${filename}`)
    try {
      await processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions)
    } catch (error) {
      logger.error(`[${requestId}] Failed to process document: ${filename}`, error)
      // Rethrow so Trigger.dev records the failure and schedules a retry.
      throw error
    }

    logger.info(`[${requestId}] Successfully processed document: ${filename}`)
    return {
      success: true,
      documentId,
      filename,
      // NOTE(review): this is a completion timestamp (ms since epoch), not a
      // duration — name kept as-is for output compatibility with consumers.
      processingTime: Date.now(),
    }
  },
})

View File

@@ -1,11 +1,14 @@
import crypto, { randomUUID } from 'crypto'
import { tasks } from '@trigger.dev/sdk'
import { and, asc, desc, eq, inArray, isNull, sql } from 'drizzle-orm'
import { getSlotsForFieldType, type TAG_SLOT_CONFIG } from '@/lib/constants/knowledge'
import { generateEmbeddings } from '@/lib/embeddings/utils'
import { env } from '@/lib/env'
import { processDocument } from '@/lib/knowledge/documents/document-processor'
import { getNextAvailableSlot } from '@/lib/knowledge/tags/service'
import { createLogger } from '@/lib/logs/console/logger'
import { getRedisClient } from '@/lib/redis'
import type { DocumentProcessingPayload } from '@/background/knowledge-processing'
import { db } from '@/db'
import { document, embedding, knowledgeBaseTagDefinitions } from '@/db/schema'
import { DocumentProcessingQueue } from './queue'
@@ -181,7 +184,7 @@ export async function processDocumentTags(
}
/**
* Process documents with Redis queue when available, fallback to concurrency control
* Process documents with best available method: Trigger.dev > Redis queue > in-memory concurrency control
*/
export async function processDocumentsWithQueue(
createdDocuments: DocumentData[],
@@ -189,6 +192,47 @@ export async function processDocumentsWithQueue(
processingOptions: ProcessingOptions,
requestId: string
): Promise<void> {
// Priority 1: Trigger.dev
if (isTriggerAvailable()) {
try {
logger.info(
`[${requestId}] Using Trigger.dev background processing for ${createdDocuments.length} documents`
)
const triggerPayloads = createdDocuments.map((doc) => ({
knowledgeBaseId,
documentId: doc.documentId,
docData: {
filename: doc.filename,
fileUrl: doc.fileUrl,
fileSize: doc.fileSize,
mimeType: doc.mimeType,
},
processingOptions: {
chunkSize: processingOptions.chunkSize || 1024,
minCharactersPerChunk: processingOptions.minCharactersPerChunk || 1,
recipe: processingOptions.recipe || 'default',
lang: processingOptions.lang || 'en',
chunkOverlap: processingOptions.chunkOverlap || 200,
},
requestId,
}))
const result = await processDocumentsWithTrigger(triggerPayloads, requestId)
if (result.success) {
logger.info(
`[${requestId}] Successfully triggered background processing: ${result.message}`
)
return
}
logger.warn(`[${requestId}] Trigger.dev failed: ${result.message}, falling back to Redis`)
} catch (error) {
logger.warn(`[${requestId}] Trigger.dev processing failed, falling back to Redis:`, error)
}
}
// Priority 2: Redis queue
const queue = getDocumentQueue()
const redisClient = getRedisClient()
@@ -213,6 +257,7 @@ export async function processDocumentsWithQueue(
await Promise.all(jobPromises)
// Start Redis background processing
queue
.processJobs(async (job) => {
const data = job.data as DocumentJobData
@@ -221,7 +266,6 @@ export async function processDocumentsWithQueue(
})
.catch((error) => {
logger.error(`[${requestId}] Error in Redis queue processing:`, error)
// Don't throw here - let the processing continue with fallback if needed
})
logger.info(`[${requestId}] All documents queued for Redis processing`)
@@ -231,7 +275,10 @@ export async function processDocumentsWithQueue(
}
}
logger.info(`[${requestId}] Using fallback in-memory processing (Redis not available or failed)`)
// Priority 3: In-memory processing
logger.info(
`[${requestId}] Using fallback in-memory processing (neither Trigger.dev nor Redis available)`
)
await processDocumentsWithConcurrencyControl(
createdDocuments,
knowledgeBaseId,
@@ -500,6 +547,51 @@ export async function processDocumentAsync(
}
}
/**
* Check if Trigger.dev is available and configured
*/
/**
 * Whether Trigger.dev background processing can be used: a secret key is
 * configured and the feature has not been explicitly disabled.
 */
export function isTriggerAvailable(): boolean {
  const secretConfigured = Boolean(env.TRIGGER_SECRET_KEY)
  // NOTE(review): this only disables when the value is the boolean `false` —
  // assumes the env schema parses TRIGGER_DEV_ENABLED to a boolean, not the
  // raw string "false"; confirm against the env definition.
  const explicitlyDisabled = env.TRIGGER_DEV_ENABLED === false
  return secretConfigured && !explicitlyDisabled
}
/**
* Process documents using Trigger.dev
*/
/**
 * Fan out document processing to Trigger.dev.
 *
 * Triggers one `knowledge-process-document` task per payload (all triggers
 * fire in parallel via `Promise.all`) and collects the resulting job ids.
 * Trigger failures are reported through the return value rather than thrown,
 * so `processDocumentsWithQueue` can fall back to the Redis queue.
 *
 * @param documents - One fully-populated payload per document to process.
 * @param requestId - Correlation id included in every log line.
 * @returns `success` flag, human-readable `message`, and `jobIds` on success.
 * @throws Error when Trigger.dev is not configured (guards direct callers;
 *         `processDocumentsWithQueue` checks `isTriggerAvailable()` first).
 */
export async function processDocumentsWithTrigger(
  documents: DocumentProcessingPayload[],
  requestId: string
): Promise<{ success: boolean; message: string; jobIds?: string[] }> {
  if (!isTriggerAvailable()) {
    throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing')
  }
  try {
    logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`)
    // Fix: callback parameter renamed from `document` to `payload` — the old
    // name shadowed the `document` table imported from '@/db/schema'.
    const jobPromises = documents.map(async (payload) => {
      const job = await tasks.trigger('knowledge-process-document', payload)
      return job.id
    })
    const jobIds = await Promise.all(jobPromises)
    logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`)
    return {
      success: true,
      message: `${documents.length} document processing jobs triggered`,
      jobIds,
    }
  } catch (error) {
    logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error)
    return {
      success: false,
      message: error instanceof Error ? error.message : 'Failed to trigger background jobs',
    }
  }
}
/**
* Create document records in database with tags
*/
@@ -644,8 +736,8 @@ export async function getDocuments(
search,
limit = 50,
offset = 0,
sortBy = 'uploadedAt',
sortOrder = 'desc',
sortBy = 'filename',
sortOrder = 'asc',
} = options
// Build where conditions
@@ -696,7 +788,10 @@ export async function getDocuments(
}
}
const orderByClause = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn())
// Use stable secondary sort to prevent shifting when primary values are identical
const primaryOrderBy = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn())
const secondaryOrderBy =
sortBy === 'filename' ? desc(document.uploadedAt) : asc(document.filename)
const documents = await db
.select({
@@ -725,7 +820,7 @@ export async function getDocuments(
})
.from(document)
.where(and(...whereConditions))
.orderBy(orderByClause)
.orderBy(primaryOrderBy, secondaryOrderBy)
.limit(limit)
.offset(offset)

View File

@@ -68,7 +68,7 @@
"@radix-ui/react-tooltip": "^1.1.6",
"@react-email/components": "^0.0.34",
"@sentry/nextjs": "^9.15.0",
"@trigger.dev/sdk": "4.0.0",
"@trigger.dev/sdk": "4.0.1",
"@types/pg": "8.15.5",
"@types/three": "0.177.0",
"@vercel/og": "^0.6.5",
@@ -134,7 +134,7 @@
"@testing-library/jest-dom": "^6.6.3",
"@testing-library/react": "^16.3.0",
"@testing-library/user-event": "^14.6.1",
"@trigger.dev/build": "4.0.0",
"@trigger.dev/build": "4.0.1",
"@types/html-to-text": "^9.0.4",
"@types/js-yaml": "4.0.9",
"@types/jsdom": "21.1.7",

View File

@@ -98,7 +98,7 @@
"@radix-ui/react-tooltip": "^1.1.6",
"@react-email/components": "^0.0.34",
"@sentry/nextjs": "^9.15.0",
"@trigger.dev/sdk": "4.0.0",
"@trigger.dev/sdk": "4.0.1",
"@types/pg": "8.15.5",
"@types/three": "0.177.0",
"@vercel/og": "^0.6.5",
@@ -164,7 +164,7 @@
"@testing-library/jest-dom": "^6.6.3",
"@testing-library/react": "^16.3.0",
"@testing-library/user-event": "^14.6.1",
"@trigger.dev/build": "4.0.0",
"@trigger.dev/build": "4.0.1",
"@types/html-to-text": "^9.0.4",
"@types/js-yaml": "4.0.9",
"@types/jsdom": "21.1.7",
@@ -1276,11 +1276,11 @@
"@testing-library/user-event": ["@testing-library/user-event@14.6.1", "", { "peerDependencies": { "@testing-library/dom": ">=7.21.4" } }, "sha512-vq7fv0rnt+QTXgPxr5Hjc210p6YKq2kmdziLgnsZGgLJ9e6VAShx1pACLuRjd/AS/sr7phAR58OIIpf0LlmQNw=="],
"@trigger.dev/build": ["@trigger.dev/build@4.0.0", "", { "dependencies": { "@trigger.dev/core": "4.0.0", "pkg-types": "^1.1.3", "tinyglobby": "^0.2.2", "tsconfck": "3.1.3" } }, "sha512-OXTTS+pV6ZuqcCtWhiDoW/zB6lrnG1YtkGgYT+QRt+HYeYdOoVBfYfv0y8x3U4Yfiw9kznwQC/sDB1b6DiHtBA=="],
"@trigger.dev/build": ["@trigger.dev/build@4.0.1", "", { "dependencies": { "@trigger.dev/core": "4.0.1", "pkg-types": "^1.1.3", "tinyglobby": "^0.2.2", "tsconfck": "3.1.3" } }, "sha512-PGOnCPjVSKkj72xmJb6mdRbzDSP3Ti/C5/tfaBFdSZ7qcoVctSzDfS5iwEGsSoSWSIv+MVy12c4v7Ji/r7MO1A=="],
"@trigger.dev/core": ["@trigger.dev/core@4.0.0", "", { "dependencies": { "@bugsnag/cuid": "^3.1.1", "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", "execa": "^8.0.1", "humanize-duration": "^3.27.3", "jose": "^5.4.0", "lodash.get": "^4.4.2", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", "socket.io-client": "4.7.5", "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", "uncrypto": "^0.1.3", "zod": "3.25.76", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" } }, "sha512-VlRMN6RPeqU66e/j0fGmWTn97DY1b+ChsMDDBm62jZ3N9XtiOlDkrWNtggPoxPtyXsHuShllo/3gpiZDvhtKww=="],
"@trigger.dev/core": ["@trigger.dev/core@4.0.1", "", { "dependencies": { "@bugsnag/cuid": "^3.1.1", "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", "execa": "^8.0.1", "humanize-duration": "^3.27.3", "jose": "^5.4.0", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", "socket.io-client": "4.7.5", "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", "uncrypto": "^0.1.3", "zod": "3.25.76", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" } }, "sha512-NTffiVPy/zFopujdptGGoy3lj3/CKV16JA8CobCfsEpDfu+K+wEys+9p8PFY8j5I0UI86aqlFpJu9/VRqUQ/yQ=="],
"@trigger.dev/sdk": ["@trigger.dev/sdk@4.0.0", "", { "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", "@trigger.dev/core": "4.0.0", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", "evt": "^2.4.13", "slug": "^6.0.0", "ulid": "^2.3.0", "uncrypto": "^0.1.3", "uuid": "^9.0.0", "ws": "^8.11.0" }, "peerDependencies": { "ai": "^4.2.0 || ^5.0.0", "zod": "^3.0.0 || ^4.0.0" }, "optionalPeers": ["ai"] }, "sha512-rq7XvY4jxCmWr6libN1egw8w0Bq0TWbbnAxCCXDScgWEszLauYmXy8WaVlJyxbwslVMHsvXP36JBFa3J3ay2yg=="],
"@trigger.dev/sdk": ["@trigger.dev/sdk@4.0.1", "", { "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", "@trigger.dev/core": "4.0.1", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", "evt": "^2.4.13", "slug": "^6.0.0", "ulid": "^2.3.0", "uncrypto": "^0.1.3", "uuid": "^9.0.0", "ws": "^8.11.0" }, "peerDependencies": { "ai": "^4.2.0 || ^5.0.0", "zod": "^3.0.0 || ^4.0.0" }, "optionalPeers": ["ai"] }, "sha512-cdEgrwIl2Kg2jd85dA4tdePPPe+iMjAGX0Q8QrO2CNo/iBcjl7jB7uzvmSjDKYmJoC+8a30fCWviYy6ljOs1oQ=="],
"@tweenjs/tween.js": ["@tweenjs/tween.js@23.1.3", "", {}, "sha512-vJmvvwFxYuGnF2axRtPYocag6Clbb5YS7kLL+SO/TeVFzHqDIWrNKYtcsPMibjDx9O+bu+psAy9NKfWklassUA=="],
@@ -2304,8 +2304,6 @@
"lodash.defaults": ["lodash.defaults@4.2.0", "", {}, "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ=="],
"lodash.get": ["lodash.get@4.4.2", "", {}, "sha512-z+Uw/vLuy6gQe8cfaFWD7p0wVv8fJl3mbzXh33RS+0oW2wvUqiRXiQ69gLWSLpgB5/6sU+r6BlQR0MBILadqTQ=="],
"lodash.isarguments": ["lodash.isarguments@3.1.0", "", {}, "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg=="],
"lodash.merge": ["lodash.merge@4.6.2", "", {}, "sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ=="],