diff --git a/apps/docs/content/docs/tools/knowledge.mdx b/apps/docs/content/docs/tools/knowledge.mdx
new file mode 100644
index 0000000000..5794464c59
--- /dev/null
+++ b/apps/docs/content/docs/tools/knowledge.mdx
@@ -0,0 +1,108 @@
+---
+title: Knowledge
+description: Search knowledge
+---
+
+import { BlockInfoCard } from "@/components/ui/block-info-card"
+
+
+
+
+
+
+
+
+
+  `}
+/>
+
+{/* MANUAL-CONTENT-START:intro */}
+Sim Studio's Knowledge Base is a native feature for creating, managing, and querying custom knowledge bases directly within the platform. Using AI embeddings and vector search, the Knowledge Base block adds intelligent search to your workflows, making it easy to find and use relevant information across your organization.
+
+The Knowledge Base system manages organizational knowledge through a flexible, scalable architecture. Its built-in vector search lets teams run semantic searches that understand meaning and context, going beyond traditional keyword matching.
+
+Key features of the Knowledge Base include:
+
+- Semantic Search: AI-powered search that understands meaning and context, not just keywords
+- Vector Embeddings: Automatic conversion of text into high-dimensional vectors for similarity matching
+- Custom Knowledge Bases: Create and manage multiple knowledge bases for different purposes or departments
+- Flexible Content Types: Support for various document formats and content types
+- Real-time Updates: Immediate indexing of new content for instant searchability
+
+In Sim Studio, the Knowledge Base block lets your agents run semantic searches across your custom knowledge bases, enabling automated information retrieval, content recommendations, and knowledge discovery within your AI workflows. Because agents can search and retrieve relevant information programmatically, routine knowledge-management tasks can be automated and important information stays easy to reach, improving team efficiency and keeping access to organizational knowledge consistent.
+{/* MANUAL-CONTENT-END */}
+
+## Usage Instructions
+
+Perform semantic vector search across your knowledge base to find the most relevant content. Uses AI embeddings to understand meaning and context, returning the documents most similar to your search query.
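+
+As a rough sketch, a script or workflow step might call the search endpoint added under `apps/sim/app/api/knowledge/search` like the TypeScript below. The helper name is hypothetical, the request fields follow the documented input (`knowledgeBaseId`, `query`, `topK`), authentication is assumed to come from the session cookie, and the exact response envelope may differ in your deployment:
+
+```typescript
+// Hypothetical client helper for the vector search endpoint.
+interface KnowledgeSearchResponse {
+  results: unknown[] // matching chunks, ordered by similarity
+  query: string
+  knowledgeBaseId: string
+  topK: number
+  totalResults: number
+}
+
+async function searchKnowledgeBase(
+  knowledgeBaseId: string,
+  query: string,
+  topK = 10
+): Promise<KnowledgeSearchResponse> {
+  const res = await fetch('/api/knowledge/search', {
+    method: 'POST',
+    headers: { 'Content-Type': 'application/json' },
+    body: JSON.stringify({ knowledgeBaseId, query, topK }),
+  })
+  if (!res.ok) throw new Error(`Search failed: ${res.status}`)
+  const body = await res.json()
+  return body.data ?? body // assumption: some routes wrap payloads in { success, data }
+}
+```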
+
+
+## Tools
+
+### `knowledge_search`
+
+Search for similar content in a knowledge base using vector similarity
+
+#### Input
+
+| Parameter | Type | Required | Description |
+| --------- | ---- | -------- | ----------- |
+| `knowledgeBaseId` | string | Yes | ID of the knowledge base to search in |
+| `query` | string | Yes | Search query text |
+| `topK` | number | No | Number of most similar results to return \(1-100\) |
+
+#### Output
+
+| Parameter | Type |
+| --------- | ---- |
+| `results` | json |
+| `query` | string |
+| `knowledgeBaseId` | string |
+| `topK` | number |
+| `totalResults` | number |
+| `message` | string |
+
+
+
+## Block Configuration
+
+### Input
+
+| Parameter | Type | Required | Description |
+| --------- | ---- | -------- | ----------- |
+| `knowledgeBaseId` | string | Yes | Knowledge Base - Select knowledge base |
+
+
+
+### Outputs
+
+| Output | Type | Description |
+| ------ | ---- | ----------- |
+| `response` | object | Output from response |
+| ↳ `results` | json | results of the response |
+| ↳ `query` | string | query of the response |
+| ↳ `knowledgeBaseId` | string | knowledgeBaseId of the response |
+| ↳ `topK` | number | topK of the response |
+| ↳ `totalResults` | number | totalResults of the response |
+| ↳ `message` | string | message of the response |
+
+
+## Notes
+
+- Category: `blocks`
+- Type: `knowledge`
diff --git a/apps/docs/content/docs/tools/meta.json b/apps/docs/content/docs/tools/meta.json
index 4569ab2165..edada9adba 100644
--- a/apps/docs/content/docs/tools/meta.json
+++ b/apps/docs/content/docs/tools/meta.json
@@ -21,6 +21,7 @@
     "image_generator",
     "jina",
     "jira",
+    "knowledge",
     "linear",
     "linkup",
     "mem0",
diff --git a/apps/docs/content/docs/tools/microsoft_teams.mdx b/apps/docs/content/docs/tools/microsoft_teams.mdx
index 3e15fd5a51..d5845ed836 100644
--- a/apps/docs/content/docs/tools/microsoft_teams.mdx
+++ b/apps/docs/content/docs/tools/microsoft_teams.mdx
@@ -64,6 +64,9 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
+
+
+
@@ -464,8 +464,8 @@ export default function ContributorsPage() {
             )
           }}
-          height={50}
-          className='sm:!h-[60px] text-neutral-400'
+          height={60}
+          className='text-neutral-400'
         />
       }
@@ -95,14 +37,13 @@ export async function GET(
       session.user.id
     )
 
-    if (accessCheck.notFound) {
-      logger.warn(
-        `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}, Chunk=${chunkId}`
-      )
-      return NextResponse.json({ error: accessCheck.reason }, { status: 404 })
-    }
-
     if (!accessCheck.hasAccess) {
+      if (accessCheck.notFound) {
+        logger.warn(
+          `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}, Chunk=${chunkId}`
+        )
+        return NextResponse.json({ error: accessCheck.reason }, { status: 404 })
+      }
       logger.warn(
         `[${requestId}] User ${session.user.id} attempted unauthorized chunk access: ${accessCheck.reason}`
       )
@@ -144,14 +85,13 @@ export async function PUT(
       session.user.id
     )
 
-    if (accessCheck.notFound) {
-      logger.warn(
-        `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}, Chunk=${chunkId}`
-      )
-      return NextResponse.json({ error: accessCheck.reason }, { status: 404 })
-    }
-
     if (!accessCheck.hasAccess) {
+      if (accessCheck.notFound) {
+        logger.warn(
+          `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}, Chunk=${chunkId}`
+        )
+        return NextResponse.json({ error: accessCheck.reason }, { status: 404 })
+      }
       logger.warn(
         `[${requestId}] User ${session.user.id} attempted unauthorized chunk update: 
${accessCheck.reason}` ) @@ -235,14 +175,13 @@ export async function DELETE( session.user.id ) - if (accessCheck.notFound) { - logger.warn( - `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}, Chunk=${chunkId}` - ) - return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if (accessCheck.notFound) { + logger.warn( + `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}, Chunk=${chunkId}` + ) + return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) + } logger.warn( `[${requestId}] User ${session.user.id} attempted unauthorized chunk deletion: ${accessCheck.reason}` ) diff --git a/apps/sim/app/api/knowledge/[id]/documents/[documentId]/chunks/route.ts b/apps/sim/app/api/knowledge/[id]/documents/[documentId]/chunks/route.ts index 0bf7ebc40c..465bb9c925 100644 --- a/apps/sim/app/api/knowledge/[id]/documents/[documentId]/chunks/route.ts +++ b/apps/sim/app/api/knowledge/[id]/documents/[documentId]/chunks/route.ts @@ -1,10 +1,11 @@ -import { and, asc, eq, ilike, isNull, sql } from 'drizzle-orm' +import { and, asc, eq, ilike, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' import { createLogger } from '@/lib/logs/console-logger' import { db } from '@/db' -import { document, embedding, knowledgeBase } from '@/db/schema' +import { embedding } from '@/db/schema' +import { checkDocumentAccess } from '../../../../utils' const logger = createLogger('DocumentChunksAPI') @@ -16,48 +17,6 @@ const GetChunksQuerySchema = z.object({ offset: z.coerce.number().min(0).optional().default(0), }) -async function checkDocumentAccess(knowledgeBaseId: string, documentId: string, userId: string) { - // First check knowledge base access - const kb = await db - .select({ - id: knowledgeBase.id, - userId: knowledgeBase.userId, - }) - .from(knowledgeBase) - .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) - .limit(1) - - if (kb.length === 0) { - return { hasAccess: false, notFound: true, reason: 'Knowledge base not found' } - } - - const kbData = kb[0] - - // Check if user owns the knowledge base - if (kbData.userId !== userId) { - return { hasAccess: false, reason: 'Unauthorized knowledge base access' } - } - - // Now check if document exists and belongs to the knowledge base - const doc = await db - .select() - .from(document) - .where( - and( - eq(document.id, documentId), - eq(document.knowledgeBaseId, knowledgeBaseId), - isNull(document.deletedAt) - ) - ) - .limit(1) - - if (doc.length === 0) { - return { hasAccess: false, notFound: true, reason: 'Document not found' } - } - - return { hasAccess: true, document: doc[0], knowledgeBase: kbData } -} - export async function GET( req: NextRequest, { params }: { params: Promise<{ id: string; documentId: string }> } @@ -74,18 +33,42 @@ export async function GET( const accessCheck = await checkDocumentAccess(knowledgeBaseId, documentId, session.user.id) - if (accessCheck.notFound) { - logger.warn(`[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}`) - return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if (accessCheck.notFound) { + logger.warn( + `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}` + ) + return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) + } logger.warn( 
`[${requestId}] User ${session.user.id} attempted unauthorized chunks access: ${accessCheck.reason}` ) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) } + // Check if document processing is completed + const doc = accessCheck.document + if (!doc) { + logger.warn( + `[${requestId}] Document data not available: KB=${knowledgeBaseId}, Doc=${documentId}` + ) + return NextResponse.json({ error: 'Document not found' }, { status: 404 }) + } + + if (doc.processingStatus !== 'completed') { + logger.warn( + `[${requestId}] Document ${documentId} is not ready for chunk access (status: ${doc.processingStatus})` + ) + return NextResponse.json( + { + error: 'Document is not ready for access', + details: `Document status: ${doc.processingStatus}`, + retryAfter: doc.processingStatus === 'processing' ? 5 : null, + }, + { status: 400 } + ) + } + // Parse query parameters const { searchParams } = new URL(req.url) const queryParams = GetChunksQuerySchema.parse({ diff --git a/apps/sim/app/api/knowledge/[id]/documents/[documentId]/retry/route.ts b/apps/sim/app/api/knowledge/[id]/documents/[documentId]/retry/route.ts new file mode 100644 index 0000000000..df113acb26 --- /dev/null +++ b/apps/sim/app/api/knowledge/[id]/documents/[documentId]/retry/route.ts @@ -0,0 +1,101 @@ +import { eq } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { getSession } from '@/lib/auth' +import { createLogger } from '@/lib/logs/console-logger' +import { db } from '@/db' +import { document, embedding } from '@/db/schema' +import { checkDocumentAccess, processDocumentAsync } from '../../../../utils' + +const logger = createLogger('DocumentRetryAPI') + +export async function POST( + req: NextRequest, + { params }: { params: Promise<{ id: string; documentId: string }> } +) { + const requestId = crypto.randomUUID().slice(0, 8) + const { id: knowledgeBaseId, documentId } = await params + + try { + const session = await getSession() + if (!session?.user?.id) { + logger.warn(`[${requestId}] Unauthorized document retry attempt`) + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const accessCheck = await checkDocumentAccess(knowledgeBaseId, documentId, session.user.id) + + if (!accessCheck.hasAccess) { + if (accessCheck.notFound) { + logger.warn( + `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}` + ) + return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) + } + logger.warn( + `[${requestId}] User ${session.user.id} attempted unauthorized document retry: ${accessCheck.reason}` + ) + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const doc = accessCheck.document + + if (doc.processingStatus !== 'failed') { + logger.warn( + `[${requestId}] Document ${documentId} is not in failed state (current: ${doc.processingStatus})` + ) + return NextResponse.json({ error: 'Document is not in failed state' }, { status: 400 }) + } + + await db.transaction(async (tx) => { + await tx.delete(embedding).where(eq(embedding.documentId, documentId)) + + await tx + .update(document) + .set({ + processingStatus: 'pending', + processingStartedAt: null, + processingCompletedAt: null, + processingError: null, + chunkCount: 0, + tokenCount: 0, + characterCount: 0, + }) + .where(eq(document.id, documentId)) + }) + + const processingOptions = { + chunkSize: 1024, + minCharactersPerChunk: 24, + recipe: 'default', + lang: 'en', + } + + const docData = { + filename: doc.filename, + fileUrl: doc.fileUrl, + 
fileSize: doc.fileSize, + mimeType: doc.mimeType, + fileHash: doc.fileHash, + } + + processDocumentAsync(knowledgeBaseId, documentId, docData, processingOptions).catch( + (error: unknown) => { + logger.error(`[${requestId}] Background retry processing error:`, error) + } + ) + + logger.info(`[${requestId}] Document retry initiated: ${documentId}`) + + return NextResponse.json({ + success: true, + data: { + documentId, + status: 'pending', + message: 'Document retry processing started', + }, + }) + } catch (error) { + logger.error(`[${requestId}] Error retrying document processing`, error) + return NextResponse.json({ error: 'Failed to retry document processing' }, { status: 500 }) + } +} diff --git a/apps/sim/app/api/knowledge/[id]/documents/[documentId]/route.ts b/apps/sim/app/api/knowledge/[id]/documents/[documentId]/route.ts index 7367e891e0..a709edaafa 100644 --- a/apps/sim/app/api/knowledge/[id]/documents/[documentId]/route.ts +++ b/apps/sim/app/api/knowledge/[id]/documents/[documentId]/route.ts @@ -1,14 +1,14 @@ -import { and, eq, isNull } from 'drizzle-orm' +import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' import { createLogger } from '@/lib/logs/console-logger' import { db } from '@/db' -import { document, knowledgeBase } from '@/db/schema' +import { document } from '@/db/schema' +import { checkDocumentAccess } from '../../../utils' const logger = createLogger('DocumentByIdAPI') -// Schema for document updates const UpdateDocumentSchema = z.object({ filename: z.string().min(1, 'Filename is required').optional(), enabled: z.boolean().optional(), @@ -17,48 +17,6 @@ const UpdateDocumentSchema = z.object({ characterCount: z.number().min(0).optional(), }) -async function checkDocumentAccess(knowledgeBaseId: string, documentId: string, userId: string) { - // First check knowledge base access - const kb = await db - .select({ - id: knowledgeBase.id, - userId: knowledgeBase.userId, - }) - .from(knowledgeBase) - .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) - .limit(1) - - if (kb.length === 0) { - return { hasAccess: false, notFound: true, reason: 'Knowledge base not found' } - } - - const kbData = kb[0] - - // Check if user owns the knowledge base - if (kbData.userId !== userId) { - return { hasAccess: false, reason: 'Unauthorized knowledge base access' } - } - - // Now check if document exists and belongs to the knowledge base - const doc = await db - .select() - .from(document) - .where( - and( - eq(document.id, documentId), - eq(document.knowledgeBaseId, knowledgeBaseId), - isNull(document.deletedAt) - ) - ) - .limit(1) - - if (doc.length === 0) { - return { hasAccess: false, notFound: true, reason: 'Document not found' } - } - - return { hasAccess: true, document: doc[0], knowledgeBase: kbData } -} - export async function GET( req: NextRequest, { params }: { params: Promise<{ id: string; documentId: string }> } @@ -75,12 +33,13 @@ export async function GET( const accessCheck = await checkDocumentAccess(knowledgeBaseId, documentId, session.user.id) - if (accessCheck.notFound) { - logger.warn(`[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}`) - return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if (accessCheck.notFound) { + logger.warn( + `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}` + ) + return NextResponse.json({ 
error: accessCheck.reason }, { status: 404 }) + } logger.warn( `[${requestId}] User ${session.user.id} attempted unauthorized document access: ${accessCheck.reason}` ) @@ -117,12 +76,13 @@ export async function PUT( const accessCheck = await checkDocumentAccess(knowledgeBaseId, documentId, session.user.id) - if (accessCheck.notFound) { - logger.warn(`[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}`) - return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if (accessCheck.notFound) { + logger.warn( + `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}` + ) + return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) + } logger.warn( `[${requestId}] User ${session.user.id} attempted unauthorized document update: ${accessCheck.reason}` ) @@ -164,6 +124,7 @@ export async function PUT( if (validationError instanceof z.ZodError) { logger.warn(`[${requestId}] Invalid document update data`, { errors: validationError.errors, + documentId, }) return NextResponse.json( { error: 'Invalid request data', details: validationError.errors }, @@ -173,7 +134,7 @@ export async function PUT( throw validationError } } catch (error) { - logger.error(`[${requestId}] Error updating document`, error) + logger.error(`[${requestId}] Error updating document ${documentId}`, error) return NextResponse.json({ error: 'Failed to update document' }, { status: 500 }) } } @@ -194,12 +155,13 @@ export async function DELETE( const accessCheck = await checkDocumentAccess(knowledgeBaseId, documentId, session.user.id) - if (accessCheck.notFound) { - logger.warn(`[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}`) - return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if (accessCheck.notFound) { + logger.warn( + `[${requestId}] ${accessCheck.reason}: KB=${knowledgeBaseId}, Doc=${documentId}` + ) + return NextResponse.json({ error: accessCheck.reason }, { status: 404 }) + } logger.warn( `[${requestId}] User ${session.user.id} attempted unauthorized document deletion: ${accessCheck.reason}` ) diff --git a/apps/sim/app/api/knowledge/[id]/documents/route.ts b/apps/sim/app/api/knowledge/[id]/documents/route.ts index d1453cd7b1..75eee293a7 100644 --- a/apps/sim/app/api/knowledge/[id]/documents/route.ts +++ b/apps/sim/app/api/knowledge/[id]/documents/route.ts @@ -4,11 +4,11 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { createLogger } from '@/lib/logs/console-logger' import { db } from '@/db' -import { document, knowledgeBase } from '@/db/schema' +import { document } from '@/db/schema' +import { checkKnowledgeBaseAccess } from '../../utils' const logger = createLogger('DocumentsAPI') -// Schema for document creation const CreateDocumentSchema = z.object({ filename: z.string().min(1, 'Filename is required'), fileUrl: z.string().url('File URL must be valid'), @@ -17,30 +17,6 @@ const CreateDocumentSchema = z.object({ fileHash: z.string().optional(), }) -async function checkKnowledgeBaseAccess(knowledgeBaseId: string, userId: string) { - const kb = await db - .select({ - id: knowledgeBase.id, - userId: knowledgeBase.userId, - }) - .from(knowledgeBase) - .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) - .limit(1) - - if (kb.length === 0) { - return { hasAccess: false, notFound: true } - } - - const kbData = kb[0] - - // Check if user owns the knowledge base - if 
(kbData.userId === userId) { - return { hasAccess: true, knowledgeBase: kbData } - } - - return { hasAccess: false, knowledgeBase: kbData } -} - export async function GET(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { const requestId = crypto.randomUUID().slice(0, 8) const { id: knowledgeBaseId } = await params @@ -54,12 +30,11 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id: const accessCheck = await checkKnowledgeBaseAccess(knowledgeBaseId, session.user.id) - if (accessCheck.notFound) { - logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) - return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if ('notFound' in accessCheck && accessCheck.notFound) { + logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) + return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) + } logger.warn( `[${requestId}] User ${session.user.id} attempted to access unauthorized knowledge base documents ${knowledgeBaseId}` ) @@ -92,6 +67,10 @@ export async function GET(req: NextRequest, { params }: { params: Promise<{ id: chunkCount: document.chunkCount, tokenCount: document.tokenCount, characterCount: document.characterCount, + processingStatus: document.processingStatus, + processingStartedAt: document.processingStartedAt, + processingCompletedAt: document.processingCompletedAt, + processingError: document.processingError, enabled: document.enabled, uploadedAt: document.uploadedAt, }) @@ -126,12 +105,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const accessCheck = await checkKnowledgeBaseAccess(knowledgeBaseId, session.user.id) - if (accessCheck.notFound) { - logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) - return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if ('notFound' in accessCheck && accessCheck.notFound) { + logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) + return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) + } logger.warn( `[${requestId}] User ${session.user.id} attempted to create document in unauthorized knowledge base ${knowledgeBaseId}` ) diff --git a/apps/sim/app/api/knowledge/[id]/process-documents/route.ts b/apps/sim/app/api/knowledge/[id]/process-documents/route.ts index 73521a7d47..d766720b6e 100644 --- a/apps/sim/app/api/knowledge/[id]/process-documents/route.ts +++ b/apps/sim/app/api/knowledge/[id]/process-documents/route.ts @@ -1,291 +1,173 @@ -import { and, eq, isNull } from 'drizzle-orm' +import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' -import { type ProcessedDocument, processDocuments } from '@/lib/document-processor' -import { env } from '@/lib/env' import { createLogger } from '@/lib/logs/console-logger' import { db } from '@/db' -import { document, embedding, knowledgeBase } from '@/db/schema' +import { document } from '@/db/schema' +import { checkKnowledgeBaseAccess, processDocumentAsync } from '../../utils' const logger = createLogger('ProcessDocumentsAPI') -// Schema for document processing request const ProcessDocumentsSchema = z.object({ - documents: z - .array( - z.object({ - filename: z.string().min(1, 'Filename is required'), - fileUrl: z.string().url('File URL must be valid'), - fileSize: 
z.number().min(1, 'File size must be greater than 0'), - mimeType: z.string().min(1, 'MIME type is required'), - fileHash: z.string().optional(), - }) - ) - .min(1, 'At least one document is required'), - processingOptions: z - .object({ - chunkSize: z.number().min(100).max(2048).default(512), - minCharactersPerChunk: z.number().min(10).max(1000).default(24), - recipe: z.string().default('default'), - lang: z.string().default('en'), + documents: z.array( + z.object({ + filename: z.string().min(1, 'Filename is required'), + fileUrl: z.string().url('File URL must be valid'), + fileSize: z.number().min(1, 'File size must be greater than 0'), + mimeType: z.string().min(1, 'MIME type is required'), + fileHash: z.string().optional(), }) - .optional(), + ), + processingOptions: z.object({ + chunkSize: z.number(), + minCharactersPerChunk: z.number(), + recipe: z.string(), + lang: z.string(), + }), }) -async function checkKnowledgeBaseAccess(knowledgeBaseId: string, userId: string) { - const kb = await db - .select({ - id: knowledgeBase.id, - userId: knowledgeBase.userId, - chunkingConfig: knowledgeBase.chunkingConfig, - }) - .from(knowledgeBase) - .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) - .limit(1) - - if (kb.length === 0) { - return { hasAccess: false, notFound: true } - } - - const kbData = kb[0] - - // Check if user owns the knowledge base - if (kbData.userId === userId) { - return { hasAccess: true, knowledgeBase: kbData } - } - - return { hasAccess: false, knowledgeBase: kbData } +const PROCESSING_CONFIG = { + maxConcurrentDocuments: 3, // Limit concurrent processing to prevent resource exhaustion + batchSize: 5, // Process documents in batches + delayBetweenBatches: 1000, // 1 second delay between batches + delayBetweenDocuments: 500, // 500ms delay between individual documents in a batch } -async function generateEmbeddings( - texts: string[], - embeddingModel = 'text-embedding-3-small' -): Promise { - const openaiApiKey = env.OPENAI_API_KEY - if (!openaiApiKey) { - throw new Error('OPENAI_API_KEY not configured') - } - - try { - // Batch process embeddings for efficiency - const batchSize = 100 // OpenAI allows up to 2048 inputs per request - const allEmbeddings: number[][] = [] - - for (let i = 0; i < texts.length; i += batchSize) { - const batch = texts.slice(i, i + batchSize) - - logger.info( - `Generating embeddings for batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(texts.length / batchSize)} (${batch.length} texts)` - ) - - // Make direct API call to OpenAI embeddings - const response = await fetch('https://api.openai.com/v1/embeddings', { - method: 'POST', - headers: { - Authorization: `Bearer ${openaiApiKey}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - input: batch, - model: embeddingModel, - encoding_format: 'float', - }), - }) - - if (!response.ok) { - const errorText = await response.text() - throw new Error( - `OpenAI API error: ${response.status} ${response.statusText} - ${errorText}` - ) - } - - const data = await response.json() - - if (!data.data || !Array.isArray(data.data)) { - throw new Error('Invalid response format from OpenAI embeddings API') - } - - // Extract embeddings from response - const batchEmbeddings = data.data.map((item: any) => item.embedding) - allEmbeddings.push(...batchEmbeddings) - } - - logger.info(`Successfully generated ${allEmbeddings.length} embeddings`) - return allEmbeddings - } catch (error) { - logger.error('Failed to generate embeddings:', error) - throw new Error( - 
`Embedding generation failed: ${error instanceof Error ? error.message : 'Unknown error'}`
-    )
-  }
-}
-
-async function saveProcessedDocuments(
-  knowledgeBaseId: string,
-  processedDocuments: ProcessedDocument[],
-  requestedDocuments: Array<{
+/**
+ * Process documents with concurrency control and batching
+ */
+async function processDocumentsWithConcurrencyControl(
+  createdDocuments: Array<{
+    documentId: string
     filename: string
     fileUrl: string
     fileSize: number
     mimeType: string
     fileHash?: string
-  }>
-) {
-  const now = new Date()
-  const results: Array<{
+  }>,
+  knowledgeBaseId: string,
+  processingOptions: any,
+  requestId: string
+): Promise<void> {
+  const totalDocuments = createdDocuments.length
+  const batches = []
+
+  // Create batches
+  for (let i = 0; i < totalDocuments; i += PROCESSING_CONFIG.batchSize) {
+    batches.push(createdDocuments.slice(i, i + PROCESSING_CONFIG.batchSize))
+  }
+
+  logger.info(`[${requestId}] Processing ${totalDocuments} documents in ${batches.length} batches`)
+
+  for (const [batchIndex, batch] of batches.entries()) {
+    logger.info(
+      `[${requestId}] Starting batch ${batchIndex + 1}/${batches.length} with ${batch.length} documents`
+    )
+
+    // Process batch with limited concurrency
+    await processBatchWithConcurrency(batch, knowledgeBaseId, processingOptions, requestId)
+
+    // Add delay between batches (except for the last batch)
+    if (batchIndex < batches.length - 1) {
+      await new Promise((resolve) => setTimeout(resolve, PROCESSING_CONFIG.delayBetweenBatches))
+    }
+  }
+
+  logger.info(`[${requestId}] Completed processing initiation for all ${totalDocuments} documents`)
+}
+
+/**
+ * Process a batch of documents with controlled concurrency
+ */
+async function processBatchWithConcurrency(
+  batch: Array<{
     documentId: string
-    chunkCount: number
-    success: boolean
-    error?: string
-  }> = []
+    filename: string
+    fileUrl: string
+    fileSize: number
+    mimeType: string
+    fileHash?: string
+  }>,
+  knowledgeBaseId: string,
+  processingOptions: any,
+  requestId: string
+): Promise<void> {
+  const semaphore = new Array(PROCESSING_CONFIG.maxConcurrentDocuments).fill(0)
+  const processingPromises = batch.map(async (doc, index) => {
+    // Add staggered delay to prevent overwhelming the system
+    if (index > 0) {
+      await new Promise((resolve) =>
+        setTimeout(resolve, index * PROCESSING_CONFIG.delayBetweenDocuments)
+      )
+    }
 
-  // Collect all chunk texts for batch embedding generation
-  const allChunkTexts: string[] = []
-  const chunkMapping: Array<{ docIndex: number; chunkIndex: number }> = []
-
-  processedDocuments.forEach((processed, docIndex) => {
-    processed.chunks.forEach((chunk, chunkIndex) => {
-      allChunkTexts.push(chunk.text)
-      chunkMapping.push({ docIndex, chunkIndex })
+
+    // Wait for available slot (typed Promise<void> so the bare resolve() call type-checks)
+    await new Promise<void>((resolve) => {
+      const checkSlot = () => {
+        const availableIndex = semaphore.findIndex((slot) => slot === 0)
+        if (availableIndex !== -1) {
+          semaphore[availableIndex] = 1
+          resolve()
+        } else {
+          setTimeout(checkSlot, 100)
+        }
+      }
+      checkSlot()
     })
+
+    try {
+      logger.info(`[${requestId}] Starting processing for document: ${doc.filename}`)
+
+      await processDocumentAsync(
+        knowledgeBaseId,
+        doc.documentId,
+        {
+          filename: doc.filename,
+          fileUrl: doc.fileUrl,
+          fileSize: doc.fileSize,
+          mimeType: doc.mimeType,
+          fileHash: doc.fileHash,
+        },
+        processingOptions
+      )
+
+      logger.info(`[${requestId}] Successfully initiated processing for document: ${doc.filename}`)
+    } catch (error: unknown) {
+      logger.error(`[${requestId}] Failed to process document: 
${doc.filename}`, { + documentId: doc.documentId, + filename: doc.filename, + fileSize: doc.fileSize, + mimeType: doc.mimeType, + error: error instanceof Error ? error.message : 'Unknown error', + stack: error instanceof Error ? error.stack : undefined, + }) + + try { + await db + .update(document) + .set({ + processingStatus: 'failed', + processingError: + error instanceof Error ? error.message : 'Failed to initiate processing', + processingCompletedAt: new Date(), + }) + .where(eq(document.id, doc.documentId)) + } catch (dbError: unknown) { + logger.error( + `[${requestId}] Failed to update document status for failed document: ${doc.documentId}`, + dbError + ) + } + } finally { + const slotIndex = semaphore.findIndex((slot) => slot === 1) + if (slotIndex !== -1) { + semaphore[slotIndex] = 0 + } + } }) - // Generate embeddings for all chunks at once - let allEmbeddings: number[][] = [] - if (allChunkTexts.length > 0) { - try { - logger.info( - `Generating embeddings for ${allChunkTexts.length} chunks across ${processedDocuments.length} documents` - ) - allEmbeddings = await generateEmbeddings(allChunkTexts, 'text-embedding-3-small') - logger.info(`Successfully generated ${allEmbeddings.length} embeddings`) - } catch (error) { - logger.error('Failed to generate embeddings for chunks:', error) - // Continue without embeddings rather than failing completely - allEmbeddings = [] - } - } - - for (let i = 0; i < processedDocuments.length; i++) { - const processed = processedDocuments[i] - const original = requestedDocuments.find((doc) => doc.filename === processed.metadata.filename) - - if (!original) { - results.push({ - documentId: '', - chunkCount: 0, - success: false, - error: `Original document data not found for ${processed.metadata.filename}`, - }) - continue - } - - try { - // Check for duplicate file hash if provided - if (original.fileHash) { - const existingDocument = await db - .select({ id: document.id }) - .from(document) - .where( - and( - eq(document.knowledgeBaseId, knowledgeBaseId), - eq(document.fileHash, original.fileHash), - isNull(document.deletedAt) - ) - ) - .limit(1) - - if (existingDocument.length > 0) { - results.push({ - documentId: existingDocument[0].id, - chunkCount: 0, - success: false, - error: 'Document with this file hash already exists', - }) - continue - } - } - - // Insert document record - const documentId = crypto.randomUUID() - const newDocument = { - id: documentId, - knowledgeBaseId, - filename: original.filename, - fileUrl: processed.metadata.s3Url || original.fileUrl, - fileSize: original.fileSize, - mimeType: original.mimeType, - fileHash: original.fileHash || null, - chunkCount: processed.metadata.chunkCount, - tokenCount: processed.metadata.tokenCount, - characterCount: processed.metadata.characterCount, - enabled: true, - uploadedAt: now, - } - - await db.insert(document).values(newDocument) - - // Insert embedding records for chunks with generated embeddings - const embeddingRecords = processed.chunks.map((chunk, chunkIndex) => { - // Find the corresponding embedding for this chunk - const globalChunkIndex = chunkMapping.findIndex( - (mapping) => mapping.docIndex === i && mapping.chunkIndex === chunkIndex - ) - const embedding = - globalChunkIndex >= 0 && globalChunkIndex < allEmbeddings.length - ? 
allEmbeddings[globalChunkIndex] - : null - - return { - id: crypto.randomUUID(), - knowledgeBaseId, - documentId, - chunkIndex: chunkIndex, - chunkHash: crypto.randomUUID(), // Generate a hash for the chunk - content: chunk.text, - contentLength: chunk.text.length, - tokenCount: Math.ceil(chunk.text.length / 4), // Rough token estimation - embedding: embedding, // Store the generated OpenAI embedding - embeddingModel: 'text-embedding-3-small', - startOffset: chunk.startIndex || 0, - endOffset: chunk.endIndex || chunk.text.length, - overlapTokens: 0, - metadata: {}, - searchRank: '1.0', - accessCount: 0, - lastAccessedAt: null, - qualityScore: null, - createdAt: now, - updatedAt: now, - } - }) - - if (embeddingRecords.length > 0) { - await db.insert(embedding).values(embeddingRecords) - } - - results.push({ - documentId, - chunkCount: processed.metadata.chunkCount, - success: true, - }) - - logger.info( - `Document processed and saved: ${documentId} with ${processed.metadata.chunkCount} chunks and ${embeddingRecords.filter((r) => r.embedding).length} embeddings` - ) - } catch (error) { - logger.error(`Failed to save processed document ${processed.metadata.filename}:`, error) - results.push({ - documentId: '', - chunkCount: 0, - success: false, - error: error instanceof Error ? error.message : 'Unknown error during save', - }) - } - } - - return results + await Promise.allSettled(processingPromises) } export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { @@ -301,12 +183,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const accessCheck = await checkKnowledgeBaseAccess(knowledgeBaseId, session.user.id) - if (accessCheck.notFound) { - logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) - return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) - } - if (!accessCheck.hasAccess) { + if ('notFound' in accessCheck && accessCheck.notFound) { + logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) + return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) + } logger.warn( `[${requestId}] User ${session.user.id} attempted to process documents in unauthorized knowledge base ${knowledgeBaseId}` ) @@ -318,58 +199,67 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: try { const validatedData = ProcessDocumentsSchema.parse(body) - logger.info( - `[${requestId}] Starting processing of ${validatedData.documents.length} documents` - ) + const createdDocuments = await db.transaction(async (tx) => { + const documentPromises = validatedData.documents.map(async (docData) => { + const documentId = crypto.randomUUID() + const now = new Date() - // Get chunking config from knowledge base or use defaults - const kbChunkingConfig = accessCheck.knowledgeBase?.chunkingConfig as any - const processingOptions = { - knowledgeBaseId, - chunkSize: validatedData.processingOptions?.chunkSize || kbChunkingConfig?.maxSize || 512, - minCharactersPerChunk: - validatedData.processingOptions?.minCharactersPerChunk || kbChunkingConfig?.minSize || 24, - recipe: validatedData.processingOptions?.recipe || 'default', - lang: validatedData.processingOptions?.lang || 'en', - } + const newDocument = { + id: documentId, + knowledgeBaseId, + filename: docData.filename, + fileUrl: docData.fileUrl, + fileSize: docData.fileSize, + mimeType: docData.mimeType, + fileHash: docData.fileHash || null, + chunkCount: 0, + tokenCount: 0, + 
characterCount: 0, + processingStatus: 'pending' as const, + enabled: true, + uploadedAt: now, + } - // Process documents (parsing + chunking) - const processedDocuments = await processDocuments( - validatedData.documents.map((doc) => ({ - fileUrl: doc.fileUrl, - filename: doc.filename, - mimeType: doc.mimeType, - fileSize: doc.fileSize, - })), - processingOptions - ) + await tx.insert(document).values(newDocument) + return { documentId, ...docData } + }) - // Save processed documents and chunks to database - const saveResults = await saveProcessedDocuments( - knowledgeBaseId, - processedDocuments, - validatedData.documents - ) - - const successfulCount = saveResults.filter((r) => r.success).length - const totalChunks = saveResults.reduce((sum, r) => sum + r.chunkCount, 0) + return await Promise.all(documentPromises) + }) logger.info( - `[${requestId}] Document processing completed: ${successfulCount}/${validatedData.documents.length} documents, ${totalChunks} total chunks` + `[${requestId}] Starting controlled async processing of ${createdDocuments.length} documents` ) + processDocumentsWithConcurrencyControl( + createdDocuments, + knowledgeBaseId, + validatedData.processingOptions, + requestId + ).catch((error: unknown) => { + logger.error(`[${requestId}] Critical error in document processing pipeline:`, error) + }) + return NextResponse.json({ success: true, data: { - processed: successfulCount, - total: validatedData.documents.length, - totalChunks, - results: saveResults, + total: createdDocuments.length, + documentsCreated: createdDocuments.map((doc) => ({ + documentId: doc.documentId, + filename: doc.filename, + status: 'pending', + })), + processingMethod: 'background', + processingConfig: { + maxConcurrentDocuments: PROCESSING_CONFIG.maxConcurrentDocuments, + batchSize: PROCESSING_CONFIG.batchSize, + totalBatches: Math.ceil(createdDocuments.length / PROCESSING_CONFIG.batchSize), + }, }, }) } catch (validationError) { if (validationError instanceof z.ZodError) { - logger.warn(`[${requestId}] Invalid document processing data`, { + logger.warn(`[${requestId}] Invalid processing request data`, { errors: validationError.errors, }) return NextResponse.json( @@ -381,12 +271,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: } } catch (error) { logger.error(`[${requestId}] Error processing documents`, error) - return NextResponse.json( - { - error: 'Failed to process documents', - details: error instanceof Error ? 
error.message : 'Unknown error', - }, - { status: 500 } - ) + return NextResponse.json({ error: 'Failed to process documents' }, { status: 500 }) } } diff --git a/apps/sim/app/api/knowledge/route.ts b/apps/sim/app/api/knowledge/route.ts index 3094040cd5..3c6188963f 100644 --- a/apps/sim/app/api/knowledge/route.ts +++ b/apps/sim/app/api/knowledge/route.ts @@ -64,6 +64,15 @@ export async function GET(req: NextRequest) { .groupBy(knowledgeBase.id) .orderBy(knowledgeBase.createdAt) + // Debug logging + logger.info(`[${requestId}] Knowledge bases with counts:`, { + data: knowledgeBasesWithCounts.map((kb) => ({ + id: kb.id, + name: kb.name, + docCount: kb.docCount, + })), + }) + logger.info( `[${requestId}] Retrieved ${knowledgeBasesWithCounts.length} knowledge bases for user ${session.user.id}` ) @@ -106,6 +115,7 @@ export async function POST(req: NextRequest) { embeddingModel: validatedData.embeddingModel, embeddingDimension: validatedData.embeddingDimension, chunkingConfig: validatedData.chunkingConfig, + docCount: 0, createdAt: now, updatedAt: now, } diff --git a/apps/sim/app/api/knowledge/search/route.ts b/apps/sim/app/api/knowledge/search/route.ts index 4176d85980..c5b90aa3e7 100644 --- a/apps/sim/app/api/knowledge/search/route.ts +++ b/apps/sim/app/api/knowledge/search/route.ts @@ -2,6 +2,7 @@ import { and, eq, isNull, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { getSession } from '@/lib/auth' +import { retryWithExponentialBackoff } from '@/lib/documents/utils' import { env } from '@/lib/env' import { createLogger } from '@/lib/logs/console-logger' import { db } from '@/db' @@ -9,6 +10,16 @@ import { embedding, knowledgeBase } from '@/db/schema' const logger = createLogger('VectorSearchAPI') +class APIError extends Error { + public status: number + + constructor(message: string, status: number) { + super(message) + this.name = 'APIError' + this.status = status + } +} + // Schema for vector search request const VectorSearchSchema = z.object({ knowledgeBaseId: z.string().min(1, 'Knowledge base ID is required'), @@ -23,31 +34,45 @@ async function generateSearchEmbedding(query: string): Promise { } try { - const response = await fetch('https://api.openai.com/v1/embeddings', { - method: 'POST', - headers: { - Authorization: `Bearer ${openaiApiKey}`, - 'Content-Type': 'application/json', + return await retryWithExponentialBackoff( + async () => { + const response = await fetch('https://api.openai.com/v1/embeddings', { + method: 'POST', + headers: { + Authorization: `Bearer ${openaiApiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + input: query, + model: 'text-embedding-3-small', + encoding_format: 'float', + }), + }) + + if (!response.ok) { + const errorText = await response.text() + const error = new APIError( + `OpenAI API error: ${response.status} ${response.statusText} - ${errorText}`, + response.status + ) + throw error + } + + const data = await response.json() + + if (!data.data || !Array.isArray(data.data) || data.data.length === 0) { + throw new Error('Invalid response format from OpenAI embeddings API') + } + + return data.data[0].embedding }, - body: JSON.stringify({ - input: query, - model: 'text-embedding-3-small', - encoding_format: 'float', - }), - }) - - if (!response.ok) { - const errorText = await response.text() - throw new Error(`OpenAI API error: ${response.status} ${response.statusText} - ${errorText}`) - } - - const data = await response.json() - - if (!data.data || 
!Array.isArray(data.data) || data.data.length === 0) { - throw new Error('Invalid response format from OpenAI embeddings API') - } - - return data.data[0].embedding + { + maxRetries: 5, + initialDelayMs: 1000, + maxDelayMs: 30000, // Max 30 seconds delay for search queries + backoffMultiplier: 2, + } + ) } catch (error) { logger.error('Failed to generate search embedding:', error) throw new Error( diff --git a/apps/sim/app/api/knowledge/utils.ts b/apps/sim/app/api/knowledge/utils.ts new file mode 100644 index 0000000000..404cc6f8e2 --- /dev/null +++ b/apps/sim/app/api/knowledge/utils.ts @@ -0,0 +1,487 @@ +import crypto from 'crypto' +import { and, eq, isNull, sql } from 'drizzle-orm' +import { processDocuments } from '@/lib/documents/document-processor' +import { retryWithExponentialBackoff } from '@/lib/documents/utils' +import { env } from '@/lib/env' +import { createLogger } from '@/lib/logs/console-logger' +import { db } from '@/db' +import { document, embedding, knowledgeBase } from '@/db/schema' + +const logger = createLogger('KnowledgeUtils') + +class APIError extends Error { + public status: number + + constructor(message: string, status: number) { + super(message) + this.name = 'APIError' + this.status = status + } +} + +export interface KnowledgeBaseData { + id: string + userId: string + workspaceId?: string | null + name: string + description?: string | null + tokenCount: number + embeddingModel: string + embeddingDimension: number + chunkingConfig: unknown + deletedAt?: Date | null + createdAt: Date + updatedAt: Date +} + +export interface DocumentData { + id: string + knowledgeBaseId: string + filename: string + fileUrl: string + fileSize: number + mimeType: string + fileHash?: string | null + chunkCount: number + tokenCount: number + characterCount: number + processingStatus: string + processingStartedAt?: Date | null + processingCompletedAt?: Date | null + processingError?: string | null + enabled: boolean + deletedAt?: Date | null + uploadedAt: Date +} + +export interface EmbeddingData { + id: string + knowledgeBaseId: string + documentId: string + chunkIndex: number + chunkHash: string + content: string + contentLength: number + tokenCount: number + embedding?: number[] | null + embeddingModel: string + startOffset: number + endOffset: number + overlapTokens: number + metadata: unknown + searchRank?: string | null + accessCount: number + lastAccessedAt?: Date | null + qualityScore?: string | null + enabled: boolean + createdAt: Date + updatedAt: Date +} + +interface OpenAIEmbeddingResponse { + data: Array<{ + embedding: number[] + index: number + }> + model: string + usage: { + prompt_tokens: number + total_tokens: number + } +} + +export interface KnowledgeBaseAccessResult { + hasAccess: true + knowledgeBase: Pick +} + +export interface KnowledgeBaseAccessDenied { + hasAccess: false + notFound?: boolean + reason?: string +} + +export type KnowledgeBaseAccessCheck = KnowledgeBaseAccessResult | KnowledgeBaseAccessDenied + +export interface DocumentAccessResult { + hasAccess: true + document: DocumentData + knowledgeBase: Pick +} + +export interface DocumentAccessDenied { + hasAccess: false + notFound?: boolean + reason: string +} + +export type DocumentAccessCheck = DocumentAccessResult | DocumentAccessDenied + +export interface ChunkAccessResult { + hasAccess: true + chunk: EmbeddingData + document: DocumentData + knowledgeBase: Pick +} + +export interface ChunkAccessDenied { + hasAccess: false + notFound?: boolean + reason: string +} + +export type ChunkAccessCheck = 
ChunkAccessResult | ChunkAccessDenied + +/** + * Check if a user has access to a knowledge base + */ +export async function checkKnowledgeBaseAccess( + knowledgeBaseId: string, + userId: string +): Promise { + const kb = await db + .select({ + id: knowledgeBase.id, + userId: knowledgeBase.userId, + }) + .from(knowledgeBase) + .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) + .limit(1) + + if (kb.length === 0) { + return { hasAccess: false, notFound: true } + } + + const kbData = kb[0] + + if (kbData.userId === userId) { + return { hasAccess: true, knowledgeBase: kbData } + } + + return { hasAccess: false } +} + +/** + * Check if a user has access to a document within a knowledge base + */ +export async function checkDocumentAccess( + knowledgeBaseId: string, + documentId: string, + userId: string +): Promise { + const kb = await db + .select({ + id: knowledgeBase.id, + userId: knowledgeBase.userId, + }) + .from(knowledgeBase) + .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) + .limit(1) + + if (kb.length === 0) { + return { hasAccess: false, notFound: true, reason: 'Knowledge base not found' } + } + + const kbData = kb[0] + + if (kbData.userId !== userId) { + return { hasAccess: false, reason: 'Unauthorized knowledge base access' } + } + + const doc = await db + .select() + .from(document) + .where( + and( + eq(document.id, documentId), + eq(document.knowledgeBaseId, knowledgeBaseId), + isNull(document.deletedAt) + ) + ) + .limit(1) + + if (doc.length === 0) { + return { hasAccess: false, notFound: true, reason: 'Document not found' } + } + + return { hasAccess: true, document: doc[0] as DocumentData, knowledgeBase: kbData } +} + +/** + * Check if a user has access to a chunk within a document and knowledge base + */ +export async function checkChunkAccess( + knowledgeBaseId: string, + documentId: string, + chunkId: string, + userId: string +): Promise { + const kb = await db + .select({ + id: knowledgeBase.id, + userId: knowledgeBase.userId, + }) + .from(knowledgeBase) + .where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt))) + .limit(1) + + if (kb.length === 0) { + return { hasAccess: false, notFound: true, reason: 'Knowledge base not found' } + } + + const kbData = kb[0] + + if (kbData.userId !== userId) { + return { hasAccess: false, reason: 'Unauthorized knowledge base access' } + } + + const doc = await db + .select() + .from(document) + .where( + and( + eq(document.id, documentId), + eq(document.knowledgeBaseId, knowledgeBaseId), + isNull(document.deletedAt) + ) + ) + .limit(1) + + if (doc.length === 0) { + return { hasAccess: false, notFound: true, reason: 'Document not found' } + } + + const docData = doc[0] as DocumentData + + // Check if document processing is completed + if (docData.processingStatus !== 'completed') { + return { + hasAccess: false, + reason: `Document is not ready for access (status: ${docData.processingStatus})`, + } + } + + const chunk = await db + .select() + .from(embedding) + .where(and(eq(embedding.id, chunkId), eq(embedding.documentId, documentId))) + .limit(1) + + if (chunk.length === 0) { + return { hasAccess: false, notFound: true, reason: 'Chunk not found' } + } + + return { + hasAccess: true, + chunk: chunk[0] as EmbeddingData, + document: docData, + knowledgeBase: kbData, + } +} + +/** + * Generate embeddings using OpenAI API with retry logic for rate limiting + */ +export async function generateEmbeddings( + texts: string[], + embeddingModel = 
'text-embedding-3-small' +): Promise { + const openaiApiKey = env.OPENAI_API_KEY + if (!openaiApiKey) { + throw new Error('OPENAI_API_KEY not configured') + } + + try { + const batchSize = 100 + const allEmbeddings: number[][] = [] + + for (let i = 0; i < texts.length; i += batchSize) { + const batch = texts.slice(i, i + batchSize) + + logger.info( + `Generating embeddings for batch ${Math.floor(i / batchSize) + 1} (${batch.length} texts)` + ) + + const batchEmbeddings = await retryWithExponentialBackoff( + async () => { + const response = await fetch('https://api.openai.com/v1/embeddings', { + method: 'POST', + headers: { + Authorization: `Bearer ${openaiApiKey}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + input: batch, + model: embeddingModel, + encoding_format: 'float', + }), + }) + + if (!response.ok) { + const errorText = await response.text() + const error = new APIError( + `OpenAI API error: ${response.status} ${response.statusText} - ${errorText}`, + response.status + ) + throw error + } + + const data: OpenAIEmbeddingResponse = await response.json() + return data.data.map((item) => item.embedding) + }, + { + maxRetries: 5, + initialDelayMs: 1000, + maxDelayMs: 60000, // Max 1 minute delay for embeddings + backoffMultiplier: 2, + } + ) + + allEmbeddings.push(...batchEmbeddings) + } + + return allEmbeddings + } catch (error) { + logger.error('Failed to generate embeddings:', error) + throw error + } +} + +/** + * Process a document asynchronously with full error handling + */ +export async function processDocumentAsync( + knowledgeBaseId: string, + documentId: string, + docData: { + filename: string + fileUrl: string + fileSize: number + mimeType: string + fileHash?: string | null + }, + processingOptions: { + chunkSize?: number + minCharactersPerChunk?: number + recipe?: string + lang?: string + } +): Promise { + const startTime = Date.now() + try { + logger.info(`[${documentId}] Starting document processing: ${docData.filename}`) + + // Set status to processing + await db + .update(document) + .set({ + processingStatus: 'processing', + processingStartedAt: new Date(), + processingError: null, // Clear any previous error + }) + .where(eq(document.id, documentId)) + + logger.info(`[${documentId}] Status updated to 'processing', starting document processor`) + + const processedDocuments = await processDocuments( + [ + { + fileUrl: docData.fileUrl, + filename: docData.filename, + mimeType: docData.mimeType, + fileSize: docData.fileSize, + }, + ], + { + knowledgeBaseId, + ...processingOptions, + } + ) + + if (processedDocuments.length === 0) { + throw new Error('No document was processed') + } + + const processed = processedDocuments[0] + const now = new Date() + + logger.info( + `[${documentId}] Document parsed successfully, generating embeddings for ${processed.chunks.length} chunks` + ) + + const chunkTexts = processed.chunks.map((chunk) => chunk.text) + const embeddings = chunkTexts.length > 0 ? 
await generateEmbeddings(chunkTexts) : [] + + logger.info(`[${documentId}] Embeddings generated, updating document record`) + + await db + .update(document) + .set({ + chunkCount: processed.metadata.chunkCount, + tokenCount: processed.metadata.tokenCount, + characterCount: processed.metadata.characterCount, + processingStatus: 'completed', + processingCompletedAt: now, + processingError: null, + }) + .where(eq(document.id, documentId)) + + const embeddingRecords = processed.chunks.map((chunk, chunkIndex) => ({ + id: crypto.randomUUID(), + knowledgeBaseId, + documentId, + chunkIndex, + chunkHash: crypto.createHash('sha256').update(chunk.text).digest('hex'), + content: chunk.text, + contentLength: chunk.text.length, + tokenCount: Math.ceil(chunk.text.length / 4), + embedding: embeddings[chunkIndex] || null, + embeddingModel: 'text-embedding-3-small', + startOffset: chunk.startIndex || 0, + endOffset: chunk.endIndex || chunk.text.length, + overlapTokens: 0, + metadata: {}, + searchRank: '1.0', + accessCount: 0, + lastAccessedAt: null, + qualityScore: null, + createdAt: now, + updatedAt: now, + })) + + if (embeddingRecords.length > 0) { + await db.insert(embedding).values(embeddingRecords) + } + + await db + .update(knowledgeBase) + .set({ + tokenCount: sql`${knowledgeBase.tokenCount} + ${processed.metadata.tokenCount}`, + updatedAt: now, + }) + .where(eq(knowledgeBase.id, knowledgeBaseId)) + + const processingTime = Date.now() - startTime + logger.info( + `[${documentId}] Successfully processed document with ${processed.metadata.chunkCount} chunks in ${processingTime}ms` + ) + } catch (error) { + const processingTime = Date.now() - startTime + logger.error(`[${documentId}] Failed to process document after ${processingTime}ms:`, { + error: error instanceof Error ? error.message : 'Unknown error', + stack: error instanceof Error ? error.stack : undefined, + filename: docData.filename, + fileUrl: docData.fileUrl, + mimeType: docData.mimeType, + }) + + await db + .update(document) + .set({ + processingStatus: 'failed', + processingError: error instanceof Error ? error.message : 'Unknown error', + processingCompletedAt: new Date(), + }) + .where(eq(document.id, documentId)) + } +} diff --git a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx index 437a2aa60b..e5f72fa82f 100644 --- a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx +++ b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx @@ -44,7 +44,6 @@ export function SlackChannelSelector({ const fetchChannels = useCallback(async () => { if (!credential) return - const controller = new AbortController() setLoading(true) setError(null) @@ -53,7 +52,6 @@ export function SlackChannelSelector({ method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ credential }), - signal: controller.signal, }) if (!res.ok) throw new Error(`HTTP error! 
diff --git a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx
index 437a2aa60b..e5f72fa82f 100644
--- a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx
+++ b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/channel-selector/components/slack-channel-selector.tsx
@@ -44,7 +44,6 @@ export function SlackChannelSelector({
   const fetchChannels = useCallback(async () => {
     if (!credential) return
 
-    const controller = new AbortController()
     setLoading(true)
     setError(null)
 
@@ -53,7 +52,6 @@
       method: 'POST',
       headers: { 'Content-Type': 'application/json' },
       body: JSON.stringify({ credential }),
-      signal: controller.signal,
     })
 
     if (!res.ok) throw new Error(`HTTP error! status: ${res.status}`)
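The `AbortController` removed here was effectively dead code: a new controller was created on every call and its `signal` was passed to `fetch`, but nothing ever invoked `abort()`, so no request could actually be cancelled. If cancellation on unmount or credential change is wanted later, the conventional shape is an effect-scoped controller with cleanup, roughly like the fragment below (it assumes the component's existing `credential`, `setChannels`, and `setError`; `CHANNELS_ENDPOINT` is a stand-in for whatever route `fetchChannels` actually calls):

```tsx
useEffect(() => {
  if (!credential) return
  const controller = new AbortController()

  fetch(CHANNELS_ENDPOINT, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ credential }),
    signal: controller.signal,
  })
    .then((res) => {
      if (!res.ok) throw new Error(`HTTP error! status: ${res.status}`)
      return res.json()
    })
    .then((data) => setChannels(data.channels ?? []))
    .catch((err) => {
      // Aborts are expected during cleanup; only surface real failures.
      if ((err as Error).name !== 'AbortError') setError((err as Error).message)
    })

  // Running the cleanup is what actually cancels the in-flight request.
  return () => controller.abort()
}, [credential])
```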
diff --git a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/knowledge-base-selector/components/knowledge-base-selector.tsx b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/knowledge-base-selector/components/knowledge-base-selector.tsx
new file mode 100644
index 0000000000..22ff776f6b
--- /dev/null
+++ b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/knowledge-base-selector/components/knowledge-base-selector.tsx
@@ -0,0 +1,204 @@
+import { useCallback, useEffect, useState } from 'react'
+import { Check, ChevronDown, RefreshCw } from 'lucide-react'
+import { PackageSearchIcon } from '@/components/icons'
+import { Button } from '@/components/ui/button'
+import {
+  Command,
+  CommandEmpty,
+  CommandGroup,
+  CommandInput,
+  CommandItem,
+  CommandList,
+} from '@/components/ui/command'
+import { Popover, PopoverContent, PopoverTrigger } from '@/components/ui/popover'
+import { type KnowledgeBaseData, useKnowledgeStore } from '@/stores/knowledge/knowledge'
+
+interface KnowledgeBaseSelectorProps {
+  value: string
+  onChange: (knowledgeBaseId: string, knowledgeBaseInfo?: KnowledgeBaseData) => void
+  label?: string
+  disabled?: boolean
+  isPreview?: boolean
+  previewValue?: string | null
+}
+
+export function KnowledgeBaseSelector({
+  value: propValue,
+  onChange,
+  label = 'Select knowledge base',
+  disabled = false,
+  isPreview = false,
+  previewValue,
+}: KnowledgeBaseSelectorProps) {
+  const { getKnowledgeBasesList, knowledgeBasesList, loadingKnowledgeBasesList } =
+    useKnowledgeStore()
+  const [knowledgeBases, setKnowledgeBases] = useState<KnowledgeBaseData[]>([])
+  const [loading, setLoading] = useState(false)
+  const [error, setError] = useState<string | null>(null)
+  const [open, setOpen] = useState(false)
+  const [selectedKnowledgeBase, setSelectedKnowledgeBase] = useState<KnowledgeBaseData | null>(null)
+  const [initialFetchDone, setInitialFetchDone] = useState(false)
+
+  // Use preview value when in preview mode, otherwise use prop value
+  const value = isPreview ? previewValue : propValue
+
+  // Fetch knowledge bases
+  const fetchKnowledgeBases = useCallback(async () => {
+    setLoading(true)
+    setError(null)
+
+    try {
+      const data = await getKnowledgeBasesList()
+      setKnowledgeBases(data)
+      setInitialFetchDone(true)
+    } catch (err) {
+      if ((err as Error).name === 'AbortError') return
+      setError((err as Error).message)
+      setKnowledgeBases([])
+    } finally {
+      setLoading(false)
+    }
+  }, [getKnowledgeBasesList])
+
+  // Handle dropdown open/close - fetch knowledge bases when opening
+  const handleOpenChange = (isOpen: boolean) => {
+    if (isPreview) return
+
+    setOpen(isOpen)
+
+    // Only fetch knowledge bases when opening the dropdown if we haven't fetched yet
+    if (isOpen && (!initialFetchDone || knowledgeBasesList.length === 0)) {
+      fetchKnowledgeBases()
+    }
+  }
+
+  // Sync selected knowledge base with value prop
+  useEffect(() => {
+    if (value && knowledgeBases.length > 0) {
+      const kbInfo = knowledgeBases.find((kb) => kb.id === value)
+      if (kbInfo) {
+        setSelectedKnowledgeBase(kbInfo)
+      } else {
+        setSelectedKnowledgeBase(null)
+      }
+    } else if (!value) {
+      setSelectedKnowledgeBase(null)
+    }
+  }, [value, knowledgeBases])
+
+  // Use cached data if available
+  useEffect(() => {
+    if (knowledgeBasesList.length > 0 && !initialFetchDone) {
+      setKnowledgeBases(knowledgeBasesList)
+      setInitialFetchDone(true)
+    }
+  }, [knowledgeBasesList, initialFetchDone])
+
+  // If we have a value but no knowledge base info and haven't fetched yet, fetch
+  useEffect(() => {
+    if (value && !selectedKnowledgeBase && !loading && !initialFetchDone && !isPreview) {
+      fetchKnowledgeBases()
+    }
+  }, [value, selectedKnowledgeBase, loading, initialFetchDone, fetchKnowledgeBases, isPreview])
+
+  const handleSelectKnowledgeBase = (knowledgeBase: KnowledgeBaseData) => {
+    if (isPreview) return
+
+    setSelectedKnowledgeBase(knowledgeBase)
+    onChange(knowledgeBase.id, knowledgeBase)
+    setOpen(false)
+  }
+
+  const formatKnowledgeBaseName = (knowledgeBase: KnowledgeBaseData) => {
+    return knowledgeBase.name
+  }
+
+  const getKnowledgeBaseDescription = (knowledgeBase: KnowledgeBaseData) => {
+    const docCount = (knowledgeBase as any).docCount
+    if (docCount !== undefined) {
+      return `${docCount} document${docCount !== 1 ? 's' : ''}`
+    }
+    return knowledgeBase.description || 'No description'
+  }
+
+  return (
+
+
+
+
+
+
+
+
+      {loading || loadingKnowledgeBasesList ? (
+
+ + Loading knowledge bases... +
+ ) : error ? ( +
+

{error}

+
+ ) : ( +
+

No knowledge bases found

+

+ Create a knowledge base to get started. +

+
+ )} +
+ + {knowledgeBases.length > 0 && ( + +
+ Knowledge Bases +
+ {knowledgeBases.map((knowledgeBase) => ( + handleSelectKnowledgeBase(knowledgeBase)} + className='cursor-pointer' + > +
+ +
+
+ {formatKnowledgeBaseName(knowledgeBase)} +
+
+ {getKnowledgeBaseDescription(knowledgeBase)} +
+
+
+ {knowledgeBase.id === value && } +
+ ))} +
+ )} +
+
+
+
+  )
+}
diff --git a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/knowledge-base-selector/knowledge-base-selector-input.tsx b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/knowledge-base-selector/knowledge-base-selector-input.tsx
new file mode 100644
index 0000000000..c300b6a037
--- /dev/null
+++ b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/knowledge-base-selector/knowledge-base-selector-input.tsx
@@ -0,0 +1,65 @@
+'use client'
+
+import { useState } from 'react'
+import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'
+import type { SubBlockConfig } from '@/blocks/types'
+import type { KnowledgeBaseData } from '@/stores/knowledge/knowledge'
+import { useSubBlockStore } from '@/stores/workflows/subblock/store'
+import { KnowledgeBaseSelector } from './components/knowledge-base-selector'
+
+interface KnowledgeBaseSelectorInputProps {
+  blockId: string
+  subBlock: SubBlockConfig
+  disabled?: boolean
+  onKnowledgeBaseSelect?: (knowledgeBaseId: string) => void
+  isPreview?: boolean
+  previewValue?: string | null
+}
+
+export function KnowledgeBaseSelectorInput({
+  blockId,
+  subBlock,
+  disabled = false,
+  onKnowledgeBaseSelect,
+  isPreview = false,
+  previewValue,
+}: KnowledgeBaseSelectorInputProps) {
+  const { getValue, setValue } = useSubBlockStore()
+  const [knowledgeBaseInfo, setKnowledgeBaseInfo] = useState<KnowledgeBaseData | null>(null)
+
+  // Get the current value from the store
+  const storeValue = getValue(blockId, subBlock.id)
+
+  // Handle knowledge base selection
+  const handleKnowledgeBaseChange = (knowledgeBaseId: string, info?: KnowledgeBaseData) => {
+    setKnowledgeBaseInfo(info || null)
+    if (!isPreview) {
+      setValue(blockId, subBlock.id, knowledgeBaseId)
+    }
+    onKnowledgeBaseSelect?.(knowledgeBaseId)
+  }
+
+  return (
+
+
+
+ { + handleKnowledgeBaseChange(knowledgeBaseId, knowledgeBaseInfo) + }} + label={subBlock.placeholder || 'Select knowledge base'} + disabled={disabled} + isPreview={isPreview} + previewValue={previewValue} + /> +
+
+ +

Select a knowledge base to search

+
+
+
+ ) +} diff --git a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/sub-block.tsx b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/sub-block.tsx index 23f4c0dda2..7396ff4ac2 100644 --- a/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/sub-block.tsx +++ b/apps/sim/app/w/[id]/components/workflow-block/components/sub-block/sub-block.tsx @@ -4,6 +4,7 @@ import { Tooltip, TooltipContent, TooltipTrigger } from '@/components/ui/tooltip import { getBlock } from '@/blocks/index' import type { SubBlockConfig } from '@/blocks/types' import { useWorkflowStore } from '@/stores/workflows/workflow/store' +import { ChannelSelectorInput } from './components/channel-selector/channel-selector-input' import { CheckboxList } from './components/checkbox-list' import { Code } from './components/code' import { ConditionInput } from './components/condition-input' @@ -14,6 +15,7 @@ import { EvalInput } from './components/eval-input' import { FileSelectorInput } from './components/file-selector/file-selector-input' import { FileUpload } from './components/file-upload' import { FolderSelectorInput } from './components/folder-selector/components/folder-selector-input' +import { KnowledgeBaseSelectorInput } from './components/knowledge-base-selector/knowledge-base-selector-input' import { LongInput } from './components/long-input' import { ProjectSelectorInput } from './components/project-selector/project-selector-input' import { ScheduleConfig } from './components/schedule/schedule-config' @@ -309,6 +311,16 @@ export function SubBlock({ previewValue={previewValue} /> ) + case 'knowledge-base-selector': + return ( + + ) case 'input-format': return ( ) + case 'channel-selector': + return ( + + ) default: return
Unknown input type: {config.type}
} diff --git a/apps/sim/app/w/knowledge/[id]/[documentId]/components/document-loading.tsx b/apps/sim/app/w/knowledge/[id]/[documentId]/components/document-loading.tsx index 0f53bde1aa..265c9dc8c8 100644 --- a/apps/sim/app/w/knowledge/[id]/[documentId]/components/document-loading.tsx +++ b/apps/sim/app/w/knowledge/[id]/[documentId]/components/document-loading.tsx @@ -1,8 +1,8 @@ 'use client' -import { LibraryBig, Search } from 'lucide-react' -import Link from 'next/link' +import { Search } from 'lucide-react' import { useSidebarStore } from '@/stores/sidebar/store' +import { KnowledgeHeader } from '../../../components/knowledge-header/knowledge-header' import { ChunkTableSkeleton } from '../../../components/skeletons/table-skeleton' interface DocumentLoadingProps { @@ -20,30 +20,29 @@ export function DocumentLoading({ const isSidebarCollapsed = mode === 'expanded' ? !isExpanded : mode === 'collapsed' || mode === 'hover' + const breadcrumbs = [ + { + id: 'knowledge-root', + label: 'Knowledge', + href: '/w/knowledge', + }, + { + id: `knowledge-base-${knowledgeBaseId}`, + label: knowledgeBaseName, + href: `/w/knowledge/${knowledgeBaseId}`, + }, + { + id: `document-${knowledgeBaseId}-${documentName}`, + label: documentName, + }, + ] + return (
- {/* Fixed Header with Breadcrumbs */} -
- - - Knowledge - - / - - {knowledgeBaseName} - - / - {documentName} -
+ {/* Header with Breadcrumbs */} +
diff --git a/apps/sim/app/w/knowledge/[id]/[documentId]/components/edit-chunk-modal.tsx b/apps/sim/app/w/knowledge/[id]/[documentId]/components/edit-chunk-modal.tsx index 865e6cf96d..c7e60b92b3 100644 --- a/apps/sim/app/w/knowledge/[id]/[documentId]/components/edit-chunk-modal.tsx +++ b/apps/sim/app/w/knowledge/[id]/[documentId]/components/edit-chunk-modal.tsx @@ -2,10 +2,24 @@ import { useEffect, useState } from 'react' import { X } from 'lucide-react' +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogFooter, + AlertDialogHeader, + AlertDialogTitle, +} from '@/components/ui/alert-dialog' import { Button } from '@/components/ui/button' import { Dialog, DialogContent, DialogHeader, DialogTitle } from '@/components/ui/dialog' import { Label } from '@/components/ui/label' import { Textarea } from '@/components/ui/textarea' +import { createLogger } from '@/lib/logs/console-logger' +import type { DocumentData } from '@/stores/knowledge/knowledge' + +const logger = createLogger('EditChunkModal') interface ChunkData { id: string @@ -24,21 +38,6 @@ interface ChunkData { updatedAt: string } -interface DocumentData { - id: string - knowledgeBaseId: string - filename: string - fileUrl: string - fileSize: number - mimeType: string - fileHash?: string - chunkCount: number - tokenCount: number - characterCount: number - enabled: boolean - uploadedAt: string -} - interface EditChunkModalProps { chunk: ChunkData | null document: DocumentData | null @@ -56,8 +55,12 @@ export function EditChunkModal({ onClose, onChunkUpdate, }: EditChunkModalProps) { - const [editedContent, setEditedContent] = useState('') + const [editedContent, setEditedContent] = useState(chunk?.content || '') const [isSaving, setIsSaving] = useState(false) + const [showUnsavedChangesAlert, setShowUnsavedChangesAlert] = useState(false) + + // Check if there are unsaved changes + const hasUnsavedChanges = editedContent !== (chunk?.content || '') // Update edited content when chunk changes useEffect(() => { @@ -93,75 +96,160 @@ export function EditChunkModal({ if (result.success && onChunkUpdate) { onChunkUpdate(result.data) - onClose() + handleCloseModal() } } catch (error) { - console.error('Error updating chunk:', error) + logger.error('Error updating chunk:', error) } finally { setIsSaving(false) } } - const handleCancel = () => { - setEditedContent(chunk?.content || '') + const handleCloseModal = () => { onClose() + setEditedContent('') + } + + const handleCloseAttempt = () => { + if (hasUnsavedChanges && !isSaving) { + setShowUnsavedChangesAlert(true) + } else { + handleCloseModal() + } + } + + const handleCancel = () => { + if (hasUnsavedChanges) { + setShowUnsavedChangesAlert(true) + } else { + handleCloseModal() + } + } + + const handleConfirmDiscard = () => { + setShowUnsavedChangesAlert(false) + handleCloseModal() + } + + const handleKeepEditing = () => { + setShowUnsavedChangesAlert(false) } if (!chunk || !document) return null return ( - - - -
- Edit Chunk Content - -
-
+ <> + + + +
+
+ + Edit Chunk Content + +

+ Modify the content of this knowledge chunk +

+
+ +
+
-
-
- {/* Scrollable Content */} -
-
-
- -