Compare commits

...

8 Commits

Author SHA1 Message Date
Waleed Latif
66dd56b614 fix(knowledge): infer MIME type from file extension in create/upsert tools
Both create_document and upsert_document forced .txt extension and
text/plain MIME type regardless of the document name. Now the tools
infer the correct MIME type from the file extension (html, md, csv,
json, yaml, xml) and only default to .txt when no extension is given.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-18 03:53:33 -07:00
Waleed Latif
44d9b743ab lint
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 18:13:06 -07:00
Waleed Latif
3ab6e65fea lint 2026-03-17 17:55:01 -07:00
Waleed Latif
f6359ff633 fix(knowledge): rollback on delete failure, deduplicate sub-block IDs
- Add compensating rollback: if deleteDocument throws after create
  succeeds, clean up the new record to prevent orphaned pending docs
- Merge duplicate name/content sub-blocks into single entries with
  array conditions, matching the documentTags pattern

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 17:53:02 -07:00
Waleed Latif
e346f7761f fix(knowledge): prevent documentId fallthrough and use byte-count limit
- Use if/else so filename lookup only runs when no documentId is provided,
  preventing stale IDs from silently replacing unrelated documents
- Check utf8 byte length instead of character count for 1MB size limit,
  correctly handling multi-byte characters (CJK, emoji)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 17:43:40 -07:00
Waleed Latif
1516b748d3 fix(knowledge): guard against empty createDocumentRecords result
Add safety check before accessing firstDocument to prevent TypeError
and data loss if createDocumentRecords unexpectedly returns empty.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 17:33:37 -07:00
Waleed Latif
3e4a6e451a fix(knowledge): address review comments on upsert document
- Reorder create-then-delete to prevent data loss if creation fails
- Move Zod validation before workflow authorization for validated input
- Fix btoa stack overflow for large content using loop-based encoding

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 17:18:03 -07:00
Waleed Latif
8bf63fe667 feat(knowledge): add upsert document operation to Knowledge block
Add a "Create or Update" (upsert) document capability that finds an
existing document by ID or filename, deletes it if found, then creates
a new document and queues re-processing. Includes new tool, API route,
block wiring, and typed interfaces.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 17:08:10 -07:00
7 changed files with 535 additions and 9 deletions

View File

@@ -0,0 +1,248 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { document } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import {
createDocumentRecords,
deleteDocument,
getProcessingConfig,
processDocumentsWithQueue,
} from '@/lib/knowledge/documents/service'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import { checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils'
// Route-scoped logger; a short per-request ID is prepended to every message below.
const logger = createLogger('DocumentUpsertAPI')

/**
 * Request-body schema for the upsert-document endpoint.
 * Parsed with `.parse()` in POST; a ZodError is mapped to a 400 response there.
 */
const UpsertDocumentSchema = z.object({
  // Optional ID of an existing document to replace. When absent, the route
  // matches an existing document by exact filename instead.
  documentId: z.string().optional(),
  filename: z.string().min(1, 'Filename is required'),
  // Location of the document content (the upsert tool sends a data: URI;
  // other URL forms are presumably accepted too — not enforced here).
  fileUrl: z.string().min(1, 'File URL is required'),
  fileSize: z.number().min(1, 'File size must be greater than 0'),
  mimeType: z.string().min(1, 'MIME type is required'),
  // Pre-serialized tag payload produced by the client-side tag helpers.
  documentTagsData: z.string().optional(),
  // Chunking parameters forwarded to the background processing queue.
  processingOptions: z.object({
    chunkSize: z.number().min(100).max(4000),
    minCharactersPerChunk: z.number().min(1).max(2000),
    recipe: z.string(),
    lang: z.string(),
    chunkOverlap: z.number().min(0).max(500),
  }),
  // When present, write permission on this workflow is enforced in POST.
  workflowId: z.string().optional(),
})
/**
 * POST /api/knowledge/[id]/documents/upsert
 *
 * Creates a document in the knowledge base, or replaces an existing one.
 * An existing document is matched by explicit `documentId` when provided,
 * otherwise by exact `filename` — deliberately if/else, so a supplied ID
 * never falls through to a filename match against an unrelated document.
 *
 * Replacement is create-new-first, then delete-old, with a compensating
 * delete of the new record if removing the old one fails: creation failure
 * loses no data, and deletion failure leaves no orphaned pending document.
 *
 * Responses:
 * - 200: created-document info; chunking/embedding continues in background
 * - 400: invalid body (Zod) · 401: unauthenticated or unauthorized
 * - 404: knowledge base not found · 413: storage limit · 500: otherwise
 */
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  // Short correlation ID tying together all log lines for this request.
  const requestId = randomUUID().slice(0, 8)
  const { id: knowledgeBaseId } = await params
  try {
    const body = await req.json()
    logger.info(`[${requestId}] Knowledge base document upsert request`, {
      knowledgeBaseId,
      hasDocumentId: !!body.documentId,
      filename: body.filename,
    })
    // Accepts either a user session or internal (workflow-originated) auth.
    const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false })
    if (!auth.success || !auth.userId) {
      logger.warn(`[${requestId}] Authentication failed: ${auth.error || 'Unauthorized'}`)
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }
    const userId = auth.userId
    // Validate before any authorization work so permission checks only ever
    // see well-formed input; ZodError is caught and mapped to 400 below.
    const validatedData = UpsertDocumentSchema.parse(body)
    // When the call originates from a workflow, require write permission on it.
    if (validatedData.workflowId) {
      const authorization = await authorizeWorkflowByWorkspacePermission({
        workflowId: validatedData.workflowId,
        userId,
        action: 'write',
      })
      if (!authorization.allowed) {
        return NextResponse.json(
          { error: authorization.message || 'Access denied' },
          { status: authorization.status }
        )
      }
    }
    // The caller must also hold write access on the target knowledge base.
    const accessCheck = await checkKnowledgeBaseWriteAccess(knowledgeBaseId, userId)
    if (!accessCheck.hasAccess) {
      if ('notFound' in accessCheck && accessCheck.notFound) {
        logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`)
        return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 })
      }
      logger.warn(
        `[${requestId}] User ${userId} attempted to upsert document in unauthorized knowledge base ${knowledgeBaseId}`
      )
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }
    // Resolve the document to replace. Both lookups are scoped to this
    // knowledge base and exclude soft-deleted rows (deletedAt IS NULL).
    let existingDocumentId: string | null = null
    let isUpdate = false
    if (validatedData.documentId) {
      const existingDoc = await db
        .select({ id: document.id })
        .from(document)
        .where(
          and(
            eq(document.id, validatedData.documentId),
            eq(document.knowledgeBaseId, knowledgeBaseId),
            isNull(document.deletedAt)
          )
        )
        .limit(1)
      if (existingDoc.length > 0) {
        existingDocumentId = existingDoc[0].id
      }
    } else {
      // No ID supplied: fall back to an exact filename match.
      const docsByFilename = await db
        .select({ id: document.id })
        .from(document)
        .where(
          and(
            eq(document.filename, validatedData.filename),
            eq(document.knowledgeBaseId, knowledgeBaseId),
            isNull(document.deletedAt)
          )
        )
        .limit(1)
      if (docsByFilename.length > 0) {
        existingDocumentId = docsByFilename[0].id
      }
    }
    if (existingDocumentId) {
      isUpdate = true
      logger.info(
        `[${requestId}] Found existing document ${existingDocumentId}, creating replacement before deleting old`
      )
    }
    // Create the replacement record FIRST so a failure here loses no data;
    // the old document (if any) is deleted only after this succeeds.
    const createdDocuments = await createDocumentRecords(
      [
        {
          filename: validatedData.filename,
          fileUrl: validatedData.fileUrl,
          fileSize: validatedData.fileSize,
          mimeType: validatedData.mimeType,
          ...(validatedData.documentTagsData && {
            documentTagsData: validatedData.documentTagsData,
          }),
        },
      ],
      knowledgeBaseId,
      requestId
    )
    const firstDocument = createdDocuments[0]
    if (!firstDocument) {
      // Defensive guard: createDocumentRecords is expected to return one
      // record per input; avoid a TypeError on the accesses below.
      logger.error(`[${requestId}] createDocumentRecords returned empty array unexpectedly`)
      return NextResponse.json({ error: 'Failed to create document record' }, { status: 500 })
    }
    if (existingDocumentId) {
      try {
        await deleteDocument(existingDocumentId, requestId)
      } catch (deleteError) {
        logger.error(
          `[${requestId}] Failed to delete old document ${existingDocumentId}, rolling back new record`,
          deleteError
        )
        // Compensating rollback: remove the just-created record so no orphaned
        // pending document remains. Best-effort — its own failure is swallowed.
        await deleteDocument(firstDocument.documentId, requestId).catch(() => {})
        return NextResponse.json({ error: 'Failed to replace existing document' }, { status: 500 })
      }
    }
    // Fire-and-forget: chunking/embedding runs in the background queue; only
    // log on failure so the HTTP response is not held up.
    processDocumentsWithQueue(
      createdDocuments,
      knowledgeBaseId,
      validatedData.processingOptions,
      requestId
    ).catch((error: unknown) => {
      logger.error(`[${requestId}] Critical error in document processing pipeline:`, error)
    })
    // Best-effort telemetry (dynamic import); must never fail the request.
    try {
      const { PlatformEvents } = await import('@/lib/core/telemetry')
      PlatformEvents.knowledgeBaseDocumentsUploaded({
        knowledgeBaseId,
        documentsCount: 1,
        uploadType: 'single',
        chunkSize: validatedData.processingOptions.chunkSize,
        recipe: validatedData.processingOptions.recipe,
      })
    } catch (_e) {
      // Silently fail
    }
    // Audit entry. Not awaited — presumably recordAudit is fire-and-forget by
    // design; confirm against its other call sites.
    recordAudit({
      workspaceId: accessCheck.knowledgeBase?.workspaceId ?? null,
      actorId: userId,
      actorName: auth.userName,
      actorEmail: auth.userEmail,
      action: isUpdate ? AuditAction.DOCUMENT_UPDATED : AuditAction.DOCUMENT_UPLOADED,
      resourceType: AuditResourceType.DOCUMENT,
      resourceId: knowledgeBaseId,
      resourceName: validatedData.filename,
      description: isUpdate
        ? `Upserted (replaced) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"`
        : `Upserted (created) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"`,
      metadata: {
        fileName: validatedData.filename,
        previousDocumentId: existingDocumentId,
        isUpdate,
      },
      request: req,
    })
    return NextResponse.json({
      success: true,
      data: {
        documentsCreated: [
          {
            documentId: firstDocument.documentId,
            filename: firstDocument.filename,
            status: 'pending',
          },
        ],
        isUpdate,
        previousDocumentId: existingDocumentId,
        processingMethod: 'background',
        processingConfig: {
          maxConcurrentDocuments: getProcessingConfig().maxConcurrentDocuments,
          batchSize: getProcessingConfig().batchSize,
        },
      },
    })
  } catch (error) {
    // Validation failures get a structured 400 with field-level details.
    if (error instanceof z.ZodError) {
      logger.warn(`[${requestId}] Invalid upsert request data`, { errors: error.errors })
      return NextResponse.json(
        { error: 'Invalid request data', details: error.errors },
        { status: 400 }
      )
    }
    logger.error(`[${requestId}] Error upserting document`, error)
    const errorMessage = error instanceof Error ? error.message : 'Failed to upsert document'
    // Map known service-layer error messages (string matching — fragile but
    // mirrors what the service throws) onto specific HTTP status codes.
    const isStorageLimitError =
      errorMessage.includes('Storage limit exceeded') || errorMessage.includes('storage limit')
    const isMissingKnowledgeBase = errorMessage === 'Knowledge base not found'
    return NextResponse.json(
      { error: errorMessage },
      { status: isMissingKnowledgeBase ? 404 : isStorageLimitError ? 413 : 500 }
    )
  }
}

View File

@@ -29,6 +29,7 @@ export const KnowledgeBlock: BlockConfig = {
{ label: 'List Documents', id: 'list_documents' },
{ label: 'Get Document', id: 'get_document' },
{ label: 'Create Document', id: 'create_document' },
{ label: 'Upsert Document', id: 'upsert_document' },
{ label: 'Delete Document', id: 'delete_document' },
{ label: 'List Chunks', id: 'list_chunks' },
{ label: 'Upload Chunk', id: 'upload_chunk' },
@@ -175,14 +176,14 @@ export const KnowledgeBlock: BlockConfig = {
condition: { field: 'operation', value: 'upload_chunk' },
},
// --- Create Document ---
// --- Create Document / Upsert Document ---
{
id: 'name',
title: 'Document Name',
type: 'short-input',
placeholder: 'Enter document name',
required: true,
condition: { field: 'operation', value: 'create_document' },
condition: { field: 'operation', value: ['create_document', 'upsert_document'] },
},
{
id: 'content',
@@ -191,14 +192,21 @@ export const KnowledgeBlock: BlockConfig = {
placeholder: 'Enter the document content',
rows: 6,
required: true,
condition: { field: 'operation', value: 'create_document' },
condition: { field: 'operation', value: ['create_document', 'upsert_document'] },
},
{
id: 'upsertDocumentId',
title: 'Document ID (Optional)',
type: 'short-input',
placeholder: 'Enter existing document ID to update (or leave empty to match by name)',
condition: { field: 'operation', value: 'upsert_document' },
},
{
id: 'documentTags',
title: 'Document Tags',
type: 'document-tag-entry',
dependsOn: ['knowledgeBaseSelector'],
condition: { field: 'operation', value: 'create_document' },
condition: { field: 'operation', value: ['create_document', 'upsert_document'] },
},
// --- Update Chunk / Delete Chunk ---
@@ -264,6 +272,7 @@ export const KnowledgeBlock: BlockConfig = {
'knowledge_search',
'knowledge_upload_chunk',
'knowledge_create_document',
'knowledge_upsert_document',
'knowledge_list_tags',
'knowledge_list_documents',
'knowledge_get_document',
@@ -284,6 +293,8 @@ export const KnowledgeBlock: BlockConfig = {
return 'knowledge_upload_chunk'
case 'create_document':
return 'knowledge_create_document'
case 'upsert_document':
return 'knowledge_upsert_document'
case 'list_tags':
return 'knowledge_list_tags'
case 'list_documents':
@@ -355,6 +366,11 @@ export const KnowledgeBlock: BlockConfig = {
if (params.chunkEnabledFilter) params.enabled = params.chunkEnabledFilter
}
// Map upsert sub-block field to tool param
if (params.operation === 'upsert_document' && params.upsertDocumentId) {
params.documentId = String(params.upsertDocumentId).trim()
}
// Convert enabled dropdown string to boolean for update_chunk
if (params.operation === 'update_chunk' && typeof params.enabled === 'string') {
params.enabled = params.enabled === 'true'
@@ -382,6 +398,7 @@ export const KnowledgeBlock: BlockConfig = {
documentTags: { type: 'string', description: 'Document tags' },
chunkSearch: { type: 'string', description: 'Search filter for chunks' },
chunkEnabledFilter: { type: 'string', description: 'Filter chunks by enabled status' },
upsertDocumentId: { type: 'string', description: 'Document ID for upsert operation' },
connectorId: { type: 'string', description: 'Connector identifier' },
},
outputs: {

View File

@@ -1,4 +1,7 @@
import type { KnowledgeCreateDocumentResponse } from '@/tools/knowledge/types'
import {
inferDocumentFileInfo,
type KnowledgeCreateDocumentResponse,
} from '@/tools/knowledge/types'
import { enrichKBTagsSchema } from '@/tools/schema-enrichers'
import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags'
import type { ToolConfig } from '@/tools/types'
@@ -75,18 +78,18 @@ export const knowledgeCreateDocumentTool: ToolConfig<any, KnowledgeCreateDocumen
? Buffer.from(textContent, 'utf8').toString('base64')
: btoa(String.fromCharCode(...utf8Bytes))
const dataUri = `data:text/plain;base64,${base64Content}`
const { filename, mimeType } = inferDocumentFileInfo(documentName)
const dataUri = `data:${mimeType};base64,${base64Content}`
// Parse document tags from various formats (object, array, JSON string)
const parsedTags = parseDocumentTags(params.documentTags)
const tagData = formatDocumentTagsForAPI(parsedTags)
const documents = [
{
filename: documentName.endsWith('.txt') ? documentName : `${documentName}.txt`,
filename,
fileUrl: dataUri,
fileSize: contentBytes,
mimeType: 'text/plain',
mimeType,
...tagData,
},
]

View File

@@ -11,6 +11,7 @@ import { knowledgeSearchTool } from '@/tools/knowledge/search'
import { knowledgeTriggerSyncTool } from '@/tools/knowledge/trigger_sync'
import { knowledgeUpdateChunkTool } from '@/tools/knowledge/update_chunk'
import { knowledgeUploadChunkTool } from '@/tools/knowledge/upload_chunk'
import { knowledgeUpsertDocumentTool } from '@/tools/knowledge/upsert_document'
export {
knowledgeSearchTool,
@@ -26,4 +27,5 @@ export {
knowledgeListConnectorsTool,
knowledgeGetConnectorTool,
knowledgeTriggerSyncTool,
knowledgeUpsertDocumentTool,
}

View File

@@ -1,3 +1,38 @@
/**
 * Maps a lowercase file extension to its MIME type for the text-based
 * document formats supported by the knowledge tools.
 *
 * Note: the previous `as const` assertion was a no-op here — the explicit
 * `Record<string, string>` annotation already widened the literal, so the
 * assertion added nothing. `Readonly<...>` keeps the immutability intent.
 */
const EXTENSION_MIME_MAP: Readonly<Record<string, string>> = {
  html: 'text/html',
  htm: 'text/html',
  md: 'text/markdown',
  csv: 'text/csv',
  json: 'application/json',
  yaml: 'application/x-yaml',
  yml: 'application/x-yaml',
  xml: 'application/xml',
  txt: 'text/plain',
}

/**
 * Infers a MIME type from a file extension (case-insensitive).
 * Returns `text/plain` for unknown or empty extensions.
 */
export function getMimeTypeFromExtension(ext: string): string {
  return EXTENSION_MIME_MAP[ext.toLowerCase()] ?? 'text/plain'
}

/**
 * Extracts the extension from a document name and returns the normalized
 * filename and inferred MIME type.
 *
 * - "report.md" -> { filename: "report.md",  mimeType: "text/markdown" }
 * - "report"    -> { filename: "report.txt", mimeType: "text/plain" }
 * - ".env"      -> a leading dot is not an extension, so ".env.txt" / text/plain
 */
export function inferDocumentFileInfo(documentName: string): {
  filename: string
  mimeType: string
} {
  const dotIndex = documentName.lastIndexOf('.')
  // `> 0` (not `>= 0`) so dotfiles are treated as extension-less.
  if (dotIndex > 0) {
    // getMimeTypeFromExtension lowercases internally; no need to do it twice.
    const ext = documentName.slice(dotIndex + 1)
    return { filename: documentName, mimeType: getMimeTypeFromExtension(ext) }
  }
  return { filename: `${documentName}.txt`, mimeType: 'text/plain' }
}
export interface KnowledgeSearchResult {
documentId: string
documentName: string
@@ -286,3 +321,33 @@ export interface KnowledgeTriggerSyncResponse {
}
error?: string
}
/** Input parameters for the knowledge_upsert_document tool. */
export interface KnowledgeUpsertDocumentParams {
  // Target knowledge base containing (or to contain) the document.
  knowledgeBaseId: string
  // Document name; the tool derives the filename and MIME type from it.
  name: string
  // Raw text content of the document.
  content: string
  // Optional ID of an existing document to replace; when omitted the server
  // matches an existing document by filename instead.
  documentId?: string
  // Tags in any of the accepted formats (object/array/JSON string).
  documentTags?: Record<string, unknown>
  // Execution context injected by the runtime; carries the active workflow ID.
  _context?: { workflowId?: string }
}

/** Normalized result describing the upserted document. */
export interface KnowledgeUpsertDocumentResult {
  documentId: string
  documentName: string
  type: string
  enabled: boolean
  // True when an existing document was replaced rather than newly created.
  isUpdate: boolean
  // ID of the replaced document, or null when this was a fresh create.
  previousDocumentId: string | null
  createdAt: string
  updatedAt: string
}

/** Tool response envelope for knowledge_upsert_document. */
export interface KnowledgeUpsertDocumentResponse {
  success: boolean
  output: {
    data: KnowledgeUpsertDocumentResult
    message: string
    documentId: string
  }
  error?: string
}

View File

@@ -0,0 +1,189 @@
import {
inferDocumentFileInfo,
type KnowledgeUpsertDocumentParams,
type KnowledgeUpsertDocumentResponse,
} from '@/tools/knowledge/types'
import { enrichKBTagsSchema } from '@/tools/schema-enrichers'
import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags'
import type { ToolConfig } from '@/tools/types'
/**
 * Tool: knowledge_upsert_document
 *
 * Creates or replaces a document in a knowledge base. The document content is
 * sent to the upsert API route as a base64 data: URI; the route matches an
 * existing document by `documentId` (when given) or by the derived filename.
 */
export const knowledgeUpsertDocumentTool: ToolConfig<
  KnowledgeUpsertDocumentParams,
  KnowledgeUpsertDocumentResponse
> = {
  id: 'knowledge_upsert_document',
  name: 'Knowledge Upsert Document',
  description:
    'Create or update a document in a knowledge base. If a document with the given ID or filename already exists, it will be replaced with the new content.',
  version: '1.0.0',
  params: {
    knowledgeBaseId: {
      type: 'string',
      required: true,
      visibility: 'user-or-llm',
      description: 'ID of the knowledge base containing the document',
    },
    documentId: {
      type: 'string',
      required: false,
      visibility: 'user-or-llm',
      description:
        'Optional ID of an existing document to update. If not provided, lookup is done by filename.',
    },
    name: {
      type: 'string',
      required: true,
      visibility: 'user-or-llm',
      description: 'Name of the document',
    },
    content: {
      type: 'string',
      required: true,
      visibility: 'user-or-llm',
      description: 'Content of the document',
    },
    documentTags: {
      type: 'json',
      required: false,
      visibility: 'user-or-llm',
      description: 'Document tags',
    },
  },
  // Enriches the documentTags schema with the KB's actual tag definitions.
  schemaEnrichment: {
    documentTags: {
      dependsOn: 'knowledgeBaseId',
      enrichSchema: enrichKBTagsSchema,
    },
  },
  request: {
    url: (params) => `/api/knowledge/${params.knowledgeBaseId}/documents/upsert`,
    method: 'POST',
    headers: () => ({
      'Content-Type': 'application/json',
    }),
    // Builds the upsert request body; throws on invalid input so the error
    // surfaces before any network call is made.
    body: (params) => {
      const workflowId = params._context?.workflowId
      const textContent = params.content?.trim()
      const documentName = params.name?.trim()
      if (!documentName || documentName.length === 0) {
        throw new Error('Document name is required')
      }
      // NOTE(review): this limit applies to the raw name; inferDocumentFileInfo
      // may append ".txt" below, so the final filename can slightly exceed 255
      // characters — confirm the server/storage tolerates that.
      if (documentName.length > 255) {
        throw new Error('Document name must be 255 characters or less')
      }
      if (!textContent || textContent.length < 1) {
        throw new Error('Document content cannot be empty')
      }
      // 1MB limit measured in UTF-8 bytes, not characters, so multi-byte
      // content (CJK, emoji) is counted correctly.
      const utf8Bytes = new TextEncoder().encode(textContent)
      const contentBytes = utf8Bytes.length
      if (contentBytes > 1_000_000) {
        throw new Error('Document content exceeds maximum size of 1MB')
      }
      // Base64-encode the content. Node path uses Buffer directly; the browser
      // fallback builds the binary string in a loop because spreading utf8Bytes
      // into String.fromCharCode would overflow the stack for large inputs.
      let base64Content: string
      if (typeof Buffer !== 'undefined') {
        base64Content = Buffer.from(textContent, 'utf8').toString('base64')
      } else {
        let binary = ''
        for (let i = 0; i < utf8Bytes.length; i++) {
          binary += String.fromCharCode(utf8Bytes[i])
        }
        base64Content = btoa(binary)
      }
      // Derive filename + MIME type from the document name's extension
      // (defaults to .txt / text/plain) and embed the content as a data URI.
      const { filename, mimeType } = inferDocumentFileInfo(documentName)
      const dataUri = `data:${mimeType};base64,${base64Content}`
      // Normalize tags from any accepted shape into the API wire format.
      const parsedTags = parseDocumentTags(params.documentTags)
      const tagData = formatDocumentTagsForAPI(parsedTags)
      const requestBody: Record<string, unknown> = {
        filename,
        fileUrl: dataUri,
        fileSize: contentBytes,
        mimeType,
        ...tagData,
        // Fixed default chunking configuration for tool-initiated upserts.
        processingOptions: {
          chunkSize: 1024,
          minCharactersPerChunk: 1,
          chunkOverlap: 200,
          recipe: 'default',
          lang: 'en',
        },
        ...(workflowId && { workflowId }),
      }
      // Only forward a documentId when it is non-empty after trimming, so the
      // server's filename-based lookup still applies for blank inputs.
      if (params.documentId && String(params.documentId).trim().length > 0) {
        requestBody.documentId = String(params.documentId).trim()
      }
      return requestBody
    },
  },
  // Normalizes the API response into the tool's output shape.
  // NOTE(review): success is reported unconditionally and response.ok is not
  // checked — presumably non-OK responses are handled by the tool framework
  // before transformResponse runs; confirm against other tools.
  transformResponse: async (response): Promise<KnowledgeUpsertDocumentResponse> => {
    const result = await response.json()
    // The route nests its payload under `data`; fall back to the root for safety.
    const data = result.data ?? result
    const documentsCreated = data.documentsCreated ?? []
    const firstDocument = documentsCreated[0]
    const isUpdate = data.isUpdate ?? false
    const previousDocumentId = data.previousDocumentId ?? null
    const documentId = firstDocument?.documentId ?? firstDocument?.id ?? ''
    return {
      success: true,
      output: {
        message: isUpdate
          ? 'Successfully updated document in knowledge base'
          : 'Successfully created document in knowledge base',
        documentId,
        data: {
          documentId,
          documentName: firstDocument?.filename ?? 'Unknown',
          type: 'document',
          enabled: true,
          isUpdate,
          previousDocumentId,
          // NOTE(review): timestamps are generated client-side at transform
          // time, not taken from the server record.
          createdAt: new Date().toISOString(),
          updatedAt: new Date().toISOString(),
        },
      },
    }
  },
  outputs: {
    data: {
      type: 'object',
      description: 'Information about the upserted document',
      properties: {
        documentId: { type: 'string', description: 'Document ID' },
        documentName: { type: 'string', description: 'Document name' },
        type: { type: 'string', description: 'Document type' },
        enabled: { type: 'boolean', description: 'Whether the document is enabled' },
        isUpdate: {
          type: 'boolean',
          description: 'Whether an existing document was replaced',
        },
        previousDocumentId: {
          type: 'string',
          description: 'ID of the document that was replaced, if any',
          optional: true,
        },
        createdAt: { type: 'string', description: 'Creation timestamp' },
        updatedAt: { type: 'string', description: 'Last update timestamp' },
      },
    },
    message: {
      type: 'string',
      description: 'Success or error message describing the operation result',
    },
    documentId: {
      type: 'string',
      description: 'ID of the upserted document',
    },
  },
}

View File

@@ -1195,6 +1195,7 @@ import {
knowledgeTriggerSyncTool,
knowledgeUpdateChunkTool,
knowledgeUploadChunkTool,
knowledgeUpsertDocumentTool,
} from '@/tools/knowledge'
import { langsmithCreateRunsBatchTool, langsmithCreateRunTool } from '@/tools/langsmith'
import { lemlistGetActivitiesTool, lemlistGetLeadTool, lemlistSendEmailTool } from '@/tools/lemlist'
@@ -3703,6 +3704,7 @@ export const tools: Record<string, ToolConfig> = {
knowledge_list_connectors: knowledgeListConnectorsTool,
knowledge_get_connector: knowledgeGetConnectorTool,
knowledge_trigger_sync: knowledgeTriggerSyncTool,
knowledge_upsert_document: knowledgeUpsertDocumentTool,
search_tool: searchTool,
elevenlabs_tts: elevenLabsTtsTool,
fathom_list_meetings: fathomListMeetingsTool,