diff --git a/apps/sim/app/api/files/presigned/route.ts b/apps/sim/app/api/files/presigned/route.ts index 2775f96a6..496e7ab6f 100644 --- a/apps/sim/app/api/files/presigned/route.ts +++ b/apps/sim/app/api/files/presigned/route.ts @@ -232,10 +232,9 @@ async function handleS3PresignedUrl( ) } - // For chat images, use direct S3 URLs since they need to be permanently accessible - // For other files, use serve path for access control + // For chat images and knowledge base files, use direct URLs since they need to be accessible by external services const finalPath = - uploadType === 'chat' + uploadType === 'chat' || uploadType === 'knowledge-base' ? `https://${config.bucket}.s3.${config.region}.amazonaws.com/${uniqueKey}` : `/api/files/serve/s3/${encodeURIComponent(uniqueKey)}` diff --git a/apps/sim/background/knowledge-processing.ts b/apps/sim/background/knowledge-processing.ts new file mode 100644 index 000000000..90468bda2 --- /dev/null +++ b/apps/sim/background/knowledge-processing.ts @@ -0,0 +1,60 @@ +import { task } from '@trigger.dev/sdk' +import { processDocumentAsync } from '@/lib/knowledge/documents/service' +import { createLogger } from '@/lib/logs/console/logger' + +const logger = createLogger('TriggerKnowledgeProcessing') + +export type DocumentProcessingPayload = { + knowledgeBaseId: string + documentId: string + docData: { + filename: string + fileUrl: string + fileSize: number + mimeType: string + } + processingOptions: { + chunkSize?: number + minCharactersPerChunk?: number + recipe?: string + lang?: string + chunkOverlap?: number + } + requestId: string +} + +export const processDocument = task({ + id: 'knowledge-process-document', + maxDuration: 300, + retry: { + maxAttempts: 3, + factor: 2, + minTimeoutInMs: 1000, + maxTimeoutInMs: 10000, + }, + queue: { + concurrencyLimit: 20, + name: 'document-processing-queue', + }, + run: async (payload: DocumentProcessingPayload) => { + const { knowledgeBaseId, documentId, docData, processingOptions, requestId } 
= payload + + logger.info(`[${requestId}] Starting Trigger.dev processing for document: ${docData.filename}`) + + try { + await processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions) + + logger.info(`[${requestId}] Successfully processed document: ${docData.filename}`) + + return { + success: true, + documentId, + filename: docData.filename, + processingTime: Date.now(), + } + } catch (error) { + logger.error(`[${requestId}] Failed to process document: ${docData.filename}`, error) + throw error + } + }, +}) diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts index ca780baef..3a4c1ad69 100644 --- a/apps/sim/lib/knowledge/documents/service.ts +++ b/apps/sim/lib/knowledge/documents/service.ts @@ -1,11 +1,14 @@ import crypto, { randomUUID } from 'crypto' +import { tasks } from '@trigger.dev/sdk' import { and, asc, desc, eq, inArray, isNull, sql } from 'drizzle-orm' import { getSlotsForFieldType, type TAG_SLOT_CONFIG } from '@/lib/constants/knowledge' import { generateEmbeddings } from '@/lib/embeddings/utils' +import { env } from '@/lib/env' import { processDocument } from '@/lib/knowledge/documents/document-processor' import { getNextAvailableSlot } from '@/lib/knowledge/tags/service' import { createLogger } from '@/lib/logs/console/logger' import { getRedisClient } from '@/lib/redis' +import type { DocumentProcessingPayload } from '@/background/knowledge-processing' import { db } from '@/db' import { document, embedding, knowledgeBaseTagDefinitions } from '@/db/schema' import { DocumentProcessingQueue } from './queue' @@ -181,7 +184,7 @@ export async function processDocumentTags( } /** - * Process documents with Redis queue when available, fallback to concurrency control + * Process documents with best available method: Trigger.dev > Redis queue > in-memory concurrency control */ export async function processDocumentsWithQueue( createdDocuments: DocumentData[], @@ -189,6 +192,47 @@ export async 
function processDocumentsWithQueue( processingOptions: ProcessingOptions, requestId: string ): Promise<void> { + // Priority 1: Trigger.dev + if (isTriggerAvailable()) { + try { + logger.info( + `[${requestId}] Using Trigger.dev background processing for ${createdDocuments.length} documents` + ) + + const triggerPayloads = createdDocuments.map((doc) => ({ + knowledgeBaseId, + documentId: doc.documentId, + docData: { + filename: doc.filename, + fileUrl: doc.fileUrl, + fileSize: doc.fileSize, + mimeType: doc.mimeType, + }, + processingOptions: { + chunkSize: processingOptions.chunkSize || 1024, + minCharactersPerChunk: processingOptions.minCharactersPerChunk || 1, + recipe: processingOptions.recipe || 'default', + lang: processingOptions.lang || 'en', + chunkOverlap: processingOptions.chunkOverlap || 200, + }, + requestId, + })) + + const result = await processDocumentsWithTrigger(triggerPayloads, requestId) + + if (result.success) { + logger.info( + `[${requestId}] Successfully triggered background processing: ${result.message}` + ) + return + } + logger.warn(`[${requestId}] Trigger.dev failed: ${result.message}, falling back to Redis`) + } catch (error) { + logger.warn(`[${requestId}] Trigger.dev processing failed, falling back to Redis:`, error) + } + } + + // Priority 2: Redis queue const queue = getDocumentQueue() const redisClient = getRedisClient() @@ -213,6 +257,7 @@ export async function processDocumentsWithQueue( await Promise.all(jobPromises) + // Start Redis background processing queue .processJobs(async (job) => { const data = job.data as DocumentJobData @@ -221,7 +266,6 @@ export async function processDocumentsWithQueue( }) .catch((error) => { logger.error(`[${requestId}] Error in Redis queue processing:`, error) - // Don't throw here - let the processing continue with fallback if needed }) logger.info(`[${requestId}] All documents queued for Redis processing`) @@ -231,7 +275,10 @@ export async function processDocumentsWithQueue( } } -
logger.info(`[${requestId}] Using fallback in-memory processing (Redis not available or failed)`) + // Priority 3: In-memory processing + logger.info( + `[${requestId}] Using fallback in-memory processing (neither Trigger.dev nor Redis available)` + ) await processDocumentsWithConcurrencyControl( createdDocuments, knowledgeBaseId, @@ -500,6 +547,51 @@ export async function processDocumentAsync( } } +/** + * Check if Trigger.dev is available and configured + */ +export function isTriggerAvailable(): boolean { + return !!(env.TRIGGER_SECRET_KEY && env.TRIGGER_DEV_ENABLED !== false) +} + +/** + * Process documents using Trigger.dev + */ +export async function processDocumentsWithTrigger( + documents: DocumentProcessingPayload[], + requestId: string +): Promise<{ success: boolean; message: string; jobIds?: string[] }> { + if (!isTriggerAvailable()) { + throw new Error('Trigger.dev is not configured - TRIGGER_SECRET_KEY missing') + } + + try { + logger.info(`[${requestId}] Triggering background processing for ${documents.length} documents`) + + const jobPromises = documents.map(async (document) => { + const job = await tasks.trigger('knowledge-process-document', document) + return job.id + }) + + const jobIds = await Promise.all(jobPromises) + + logger.info(`[${requestId}] Triggered ${jobIds.length} document processing jobs`) + + return { + success: true, + message: `${documents.length} document processing jobs triggered`, + jobIds, + } + } catch (error) { + logger.error(`[${requestId}] Failed to trigger document processing jobs:`, error) + + return { + success: false, + message: error instanceof Error ? 
error.message : 'Failed to trigger background jobs', + } + } +} + /** * Create document records in database with tags */ @@ -644,8 +736,8 @@ export async function getDocuments( search, limit = 50, offset = 0, - sortBy = 'uploadedAt', - sortOrder = 'desc', + sortBy = 'filename', + sortOrder = 'asc', } = options // Build where conditions @@ -696,7 +788,10 @@ export async function getDocuments( } } - const orderByClause = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn()) + // Use stable secondary sort to prevent shifting when primary values are identical + const primaryOrderBy = sortOrder === 'asc' ? asc(getOrderByColumn()) : desc(getOrderByColumn()) + const secondaryOrderBy = + sortBy === 'filename' ? desc(document.uploadedAt) : asc(document.filename) const documents = await db .select({ @@ -725,7 +820,7 @@ export async function getDocuments( }) .from(document) .where(and(...whereConditions)) - .orderBy(orderByClause) + .orderBy(primaryOrderBy, secondaryOrderBy) .limit(limit) .offset(offset) diff --git a/apps/sim/package.json b/apps/sim/package.json index d576d4f62..2a04904d3 100644 --- a/apps/sim/package.json +++ b/apps/sim/package.json @@ -68,7 +68,7 @@ "@radix-ui/react-tooltip": "^1.1.6", "@react-email/components": "^0.0.34", "@sentry/nextjs": "^9.15.0", - "@trigger.dev/sdk": "4.0.0", + "@trigger.dev/sdk": "4.0.1", "@types/pg": "8.15.5", "@types/three": "0.177.0", "@vercel/og": "^0.6.5", @@ -134,7 +134,7 @@ "@testing-library/jest-dom": "^6.6.3", "@testing-library/react": "^16.3.0", "@testing-library/user-event": "^14.6.1", - "@trigger.dev/build": "4.0.0", + "@trigger.dev/build": "4.0.1", "@types/html-to-text": "^9.0.4", "@types/js-yaml": "4.0.9", "@types/jsdom": "21.1.7", diff --git a/bun.lock b/bun.lock index 8b6df4eac..79d801d91 100644 --- a/bun.lock +++ b/bun.lock @@ -98,7 +98,7 @@ "@radix-ui/react-tooltip": "^1.1.6", "@react-email/components": "^0.0.34", "@sentry/nextjs": "^9.15.0", - "@trigger.dev/sdk": "4.0.0", + "@trigger.dev/sdk": 
"4.0.1", "@types/pg": "8.15.5", "@types/three": "0.177.0", "@vercel/og": "^0.6.5", @@ -164,7 +164,7 @@ "@testing-library/jest-dom": "^6.6.3", "@testing-library/react": "^16.3.0", "@testing-library/user-event": "^14.6.1", - "@trigger.dev/build": "4.0.0", + "@trigger.dev/build": "4.0.1", "@types/html-to-text": "^9.0.4", "@types/js-yaml": "4.0.9", "@types/jsdom": "21.1.7", @@ -1276,11 +1276,11 @@ "@testing-library/user-event": ["@testing-library/user-event@14.6.1", "", { "peerDependencies": { "@testing-library/dom": ">=7.21.4" } }, "sha512-vq7fv0rnt+QTXgPxr5Hjc210p6YKq2kmdziLgnsZGgLJ9e6VAShx1pACLuRjd/AS/sr7phAR58OIIpf0LlmQNw=="], - "@trigger.dev/build": ["@trigger.dev/build@4.0.0", "", { "dependencies": { "@trigger.dev/core": "4.0.0", "pkg-types": "^1.1.3", "tinyglobby": "^0.2.2", "tsconfck": "3.1.3" } }, "sha512-OXTTS+pV6ZuqcCtWhiDoW/zB6lrnG1YtkGgYT+QRt+HYeYdOoVBfYfv0y8x3U4Yfiw9kznwQC/sDB1b6DiHtBA=="], + "@trigger.dev/build": ["@trigger.dev/build@4.0.1", "", { "dependencies": { "@trigger.dev/core": "4.0.1", "pkg-types": "^1.1.3", "tinyglobby": "^0.2.2", "tsconfck": "3.1.3" } }, "sha512-PGOnCPjVSKkj72xmJb6mdRbzDSP3Ti/C5/tfaBFdSZ7qcoVctSzDfS5iwEGsSoSWSIv+MVy12c4v7Ji/r7MO1A=="], - "@trigger.dev/core": ["@trigger.dev/core@4.0.0", "", { "dependencies": { "@bugsnag/cuid": "^3.1.1", "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", "execa": "^8.0.1", "humanize-duration": "^3.27.3", 
"jose": "^5.4.0", "lodash.get": "^4.4.2", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", "socket.io-client": "4.7.5", "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", "uncrypto": "^0.1.3", "zod": "3.25.76", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" } }, "sha512-VlRMN6RPeqU66e/j0fGmWTn97DY1b+ChsMDDBm62jZ3N9XtiOlDkrWNtggPoxPtyXsHuShllo/3gpiZDvhtKww=="], + "@trigger.dev/core": ["@trigger.dev/core@4.0.1", "", { "dependencies": { "@bugsnag/cuid": "^3.1.1", "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.203.0", "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-trace-otlp-http": "0.203.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", "execa": "^8.0.1", "humanize-duration": "^3.27.3", "jose": "^5.4.0", "nanoid": "3.3.8", "prom-client": "^15.1.0", "socket.io": "4.7.4", "socket.io-client": "4.7.5", "std-env": "^3.8.1", "superjson": "^2.2.1", "tinyexec": "^0.3.2", "uncrypto": "^0.1.3", "zod": "3.25.76", "zod-error": "1.5.0", "zod-validation-error": "^1.5.0" } }, "sha512-NTffiVPy/zFopujdptGGoy3lj3/CKV16JA8CobCfsEpDfu+K+wEys+9p8PFY8j5I0UI86aqlFpJu9/VRqUQ/yQ=="], - "@trigger.dev/sdk": ["@trigger.dev/sdk@4.0.0", "", { "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", "@trigger.dev/core": "4.0.0", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", "evt": "^2.4.13", "slug": "^6.0.0", "ulid": "^2.3.0", "uncrypto": "^0.1.3", "uuid": "^9.0.0", "ws": "^8.11.0" }, "peerDependencies": { "ai": "^4.2.0 || ^5.0.0", "zod": 
"^3.0.0 || ^4.0.0" }, "optionalPeers": ["ai"] }, "sha512-rq7XvY4jxCmWr6libN1egw8w0Bq0TWbbnAxCCXDScgWEszLauYmXy8WaVlJyxbwslVMHsvXP36JBFa3J3ay2yg=="], + "@trigger.dev/sdk": ["@trigger.dev/sdk@4.0.1", "", { "dependencies": { "@opentelemetry/api": "1.9.0", "@opentelemetry/semantic-conventions": "1.36.0", "@trigger.dev/core": "4.0.1", "chalk": "^5.2.0", "cronstrue": "^2.21.0", "debug": "^4.3.4", "evt": "^2.4.13", "slug": "^6.0.0", "ulid": "^2.3.0", "uncrypto": "^0.1.3", "uuid": "^9.0.0", "ws": "^8.11.0" }, "peerDependencies": { "ai": "^4.2.0 || ^5.0.0", "zod": "^3.0.0 || ^4.0.0" }, "optionalPeers": ["ai"] }, "sha512-cdEgrwIl2Kg2jd85dA4tdePPPe+iMjAGX0Q8QrO2CNo/iBcjl7jB7uzvmSjDKYmJoC+8a30fCWviYy6ljOs1oQ=="], "@tweenjs/tween.js": ["@tweenjs/tween.js@23.1.3", "", {}, "sha512-vJmvvwFxYuGnF2axRtPYocag6Clbb5YS7kLL+SO/TeVFzHqDIWrNKYtcsPMibjDx9O+bu+psAy9NKfWklassUA=="], @@ -2304,8 +2304,6 @@ "lodash.defaults": ["lodash.defaults@4.2.0", "", {}, "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ=="], - "lodash.get": ["lodash.get@4.4.2", "", {}, "sha512-z+Uw/vLuy6gQe8cfaFWD7p0wVv8fJl3mbzXh33RS+0oW2wvUqiRXiQ69gLWSLpgB5/6sU+r6BlQR0MBILadqTQ=="], - "lodash.isarguments": ["lodash.isarguments@3.1.0", "", {}, "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg=="], "lodash.merge": ["lodash.merge@4.6.2", "", {}, "sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ=="],