diff --git a/.github/workflows/docs-embeddings.yml b/.github/workflows/docs-embeddings.yml
index d61d86616..c42b3d96e 100644
--- a/.github/workflows/docs-embeddings.yml
+++ b/.github/workflows/docs-embeddings.yml
@@ -32,4 +32,4 @@ jobs:
env:
DATABASE_URL: ${{ github.ref == 'refs/heads/main' && secrets.DATABASE_URL || secrets.STAGING_DATABASE_URL }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- run: bun run scripts/process-docs-embeddings.ts --clear
+ run: bun run scripts/process-docs.ts --clear
diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/[documentId]/components/edit-chunk-modal/edit-chunk-modal.tsx b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/[documentId]/components/edit-chunk-modal/edit-chunk-modal.tsx
index eb25aa562..6d0723571 100644
--- a/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/[documentId]/components/edit-chunk-modal/edit-chunk-modal.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/knowledge/[id]/[documentId]/components/edit-chunk-modal/edit-chunk-modal.tsx
@@ -312,7 +312,7 @@ export function EditChunkModal({
- Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML (max 100MB
- each)
+ Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML, JSON, YAML,
+ YML (max 100MB each)
diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/components/create-modal/create-modal.tsx b/apps/sim/app/workspace/[workspaceId]/knowledge/components/create-modal/create-modal.tsx
index 805ff335c..bbe08c4a8 100644
--- a/apps/sim/app/workspace/[workspaceId]/knowledge/components/create-modal/create-modal.tsx
+++ b/apps/sim/app/workspace/[workspaceId]/knowledge/components/create-modal/create-modal.tsx
@@ -158,7 +158,7 @@ export function CreateModal({ open, onOpenChange, onKnowledgeBaseCreated }: Crea
// Check file type
if (!ACCEPTED_FILE_TYPES.includes(file.type)) {
setFileError(
- `File ${file.name} has an unsupported format. Please use PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, or HTML.`
+ `File ${file.name} has an unsupported format. Please use PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML, JSON, YAML, or YML.`
)
hasError = true
continue
@@ -501,8 +501,8 @@ export function CreateModal({ open, onOpenChange, onKnowledgeBaseCreated }: Crea
: 'Drop files here or click to browse'}
- Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML (max
- 100MB each)
+ Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML,
+ JSON, YAML, YML (max 100MB each)
diff --git a/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts b/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts
index fd26bdbf2..7e2e33270 100644
--- a/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts
+++ b/apps/sim/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload.ts
@@ -84,16 +84,200 @@ class ProcessingError extends KnowledgeUploadError {
}
const UPLOAD_CONFIG = {
- BATCH_SIZE: 15, // Upload files in parallel - this is fast and not the bottleneck
- MAX_RETRIES: 3, // Standard retry count
- RETRY_DELAY: 2000, // Initial retry delay in ms (2 seconds)
- RETRY_MULTIPLIER: 2, // Standard exponential backoff (2s, 4s, 8s)
- CHUNK_SIZE: 5 * 1024 * 1024,
- DIRECT_UPLOAD_THRESHOLD: 4 * 1024 * 1024, // Files > 4MB must use presigned URLs
- LARGE_FILE_THRESHOLD: 50 * 1024 * 1024, // Files > 50MB need multipart upload
- UPLOAD_TIMEOUT: 60000, // 60 second timeout per upload
+ MAX_PARALLEL_UPLOADS: 3, // Limit simultaneous uploads to avoid saturating the client connection
+ MAX_RETRIES: 3,
+ RETRY_DELAY_MS: 2000,
+ RETRY_BACKOFF: 2,
+ CHUNK_SIZE: 8 * 1024 * 1024, // 8MB keeps us well above the S3 minimum part size while reducing part count
+ DIRECT_UPLOAD_THRESHOLD: 4 * 1024 * 1024,
+ LARGE_FILE_THRESHOLD: 50 * 1024 * 1024,
+ BASE_TIMEOUT_MS: 2 * 60 * 1000, // 2-minute baseline timeout for every transfer
+ TIMEOUT_PER_MB_MS: 1500,
+ MAX_TIMEOUT_MS: 10 * 60 * 1000,
+ MULTIPART_PART_CONCURRENCY: 3,
+ MULTIPART_MAX_RETRIES: 3,
+ BATCH_REQUEST_SIZE: 50,
} as const
+const calculateUploadTimeoutMs = (fileSize: number) => {
+ const sizeInMb = fileSize / (1024 * 1024)
+ const dynamicBudget = UPLOAD_CONFIG.BASE_TIMEOUT_MS + sizeInMb * UPLOAD_CONFIG.TIMEOUT_PER_MB_MS
+ return Math.min(dynamicBudget, UPLOAD_CONFIG.MAX_TIMEOUT_MS)
+}
+
+const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
+
+const getHighResTime = () =>
+ typeof performance !== 'undefined' && typeof performance.now === 'function'
+ ? performance.now()
+ : Date.now()
+
+const formatMegabytes = (bytes: number) => Number((bytes / (1024 * 1024)).toFixed(2))
+
+const calculateThroughputMbps = (bytes: number, durationMs: number) => {
+ if (!bytes || !durationMs) return 0
+ return Number((((bytes * 8) / durationMs) * 0.001).toFixed(2))
+}
+
+const formatDurationSeconds = (durationMs: number) => Number((durationMs / 1000).toFixed(2))
+
+const runWithConcurrency = async <T, R>(
+ items: T[],
+ limit: number,
+ worker: (item: T, index: number) => Promise<R>
+): Promise<Array<PromiseSettledResult<R>>> => {
+ const results: Array<PromiseSettledResult<R>> = Array(items.length)
+
+ if (items.length === 0) {
+ return results
+ }
+
+ const concurrency = Math.max(1, Math.min(limit, items.length))
+ let nextIndex = 0
+
+ const runners = Array.from({ length: concurrency }, async () => {
+ while (true) {
+ const currentIndex = nextIndex++
+ if (currentIndex >= items.length) {
+ break
+ }
+
+ try {
+ const value = await worker(items[currentIndex], currentIndex)
+ results[currentIndex] = { status: 'fulfilled', value }
+ } catch (error) {
+ results[currentIndex] = { status: 'rejected', reason: error }
+ }
+ }
+ })
+
+ await Promise.all(runners)
+ return results
+}
+
+const getErrorName = (error: unknown) =>
+ typeof error === 'object' && error !== null && 'name' in error ? String((error as any).name) : ''
+
+const getErrorMessage = (error: unknown) =>
+ error instanceof Error ? error.message : typeof error === 'string' ? error : 'Unknown error'
+
+const isAbortError = (error: unknown) => getErrorName(error) === 'AbortError'
+
+const isNetworkError = (error: unknown) => {
+ if (!(error instanceof Error)) {
+ return false
+ }
+
+ const message = error.message.toLowerCase()
+ return (
+ message.includes('network') ||
+ message.includes('fetch') ||
+ message.includes('connection') ||
+ message.includes('timeout') ||
+ message.includes('timed out') ||
+ message.includes('econnreset')
+ )
+}
+
+interface PresignedFileInfo {
+ path: string
+ key: string
+ name: string
+ size: number
+ type: string
+}
+
+interface PresignedUploadInfo {
+ fileName: string
+ presignedUrl: string
+ fileInfo: PresignedFileInfo
+ uploadHeaders?: Record<string, string>
+ directUploadSupported: boolean
+ presignedUrls?: any
+}
+
+const normalizePresignedData = (data: any, context: string): PresignedUploadInfo => {
+ const presignedUrl = data?.presignedUrl || data?.uploadUrl
+ const fileInfo = data?.fileInfo
+
+ if (!presignedUrl || !fileInfo?.path) {
+ throw new PresignedUrlError(`Invalid presigned response for ${context}`, data)
+ }
+
+ return {
+ fileName: data.fileName || fileInfo.name || context,
+ presignedUrl,
+ fileInfo: {
+ path: fileInfo.path,
+ key: fileInfo.key,
+ name: fileInfo.name || context,
+ size: fileInfo.size || data.fileSize || 0,
+ type: fileInfo.type || data.contentType || '',
+ },
+ uploadHeaders: data.uploadHeaders || undefined,
+ directUploadSupported: data.directUploadSupported !== false,
+ presignedUrls: data.presignedUrls,
+ }
+}
+
+const getPresignedData = async (
+ file: File,
+ timeoutMs: number,
+ controller?: AbortController
+): Promise<PresignedUploadInfo> => {
+ const localController = controller ?? new AbortController()
+ const timeoutId = setTimeout(() => localController.abort(), timeoutMs)
+ const startTime = getHighResTime()
+
+ try {
+ const presignedResponse = await fetch('/api/files/presigned?type=knowledge-base', {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ body: JSON.stringify({
+ fileName: file.name,
+ contentType: file.type,
+ fileSize: file.size,
+ }),
+ signal: localController.signal,
+ })
+
+ if (!presignedResponse.ok) {
+ let errorDetails: any = null
+ try {
+ errorDetails = await presignedResponse.json()
+ } catch {
+ // Ignore JSON parsing errors
+ }
+
+ logger.error('Presigned URL request failed', {
+ status: presignedResponse.status,
+ fileSize: file.size,
+ })
+
+ throw new PresignedUrlError(
+ `Failed to get presigned URL for ${file.name}: ${presignedResponse.status} ${presignedResponse.statusText}`,
+ errorDetails
+ )
+ }
+
+ const presignedData = await presignedResponse.json()
+ const durationMs = getHighResTime() - startTime
+ logger.info('Fetched presigned URL', {
+ fileName: file.name,
+ sizeMB: formatMegabytes(file.size),
+ durationSeconds: formatDurationSeconds(durationMs),
+ })
+ return normalizePresignedData(presignedData, file.name)
+ } finally {
+ clearTimeout(timeoutId)
+ if (!controller) {
+ localController.abort()
+ }
+ }
+}
+
export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const [isUploading, setIsUploading] = useState(false)
const [uploadProgress, setUploadProgress] = useState({
@@ -153,85 +337,51 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const uploadSingleFileWithRetry = async (
file: File,
retryCount = 0,
- fileIndex?: number
+ fileIndex?: number,
+ presignedOverride?: PresignedUploadInfo
): Promise => {
+ const timeoutMs = calculateUploadTimeoutMs(file.size)
+ let presignedData: PresignedUploadInfo | undefined
+ const attempt = retryCount + 1
+ logger.info('Upload attempt started', {
+ fileName: file.name,
+ attempt,
+ sizeMB: formatMegabytes(file.size),
+ timeoutSeconds: formatDurationSeconds(timeoutMs),
+ })
+
try {
- // Create abort controller for timeout
const controller = new AbortController()
- const timeoutId = setTimeout(() => controller.abort(), UPLOAD_CONFIG.UPLOAD_TIMEOUT)
+ const timeoutId = setTimeout(() => controller.abort(), timeoutMs)
try {
- // Get presigned URL
- const presignedResponse = await fetch('/api/files/presigned?type=knowledge-base', {
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- },
- body: JSON.stringify({
- fileName: file.name,
- contentType: file.type,
- fileSize: file.size,
- }),
- signal: controller.signal,
- })
-
- clearTimeout(timeoutId)
-
- if (!presignedResponse.ok) {
- let errorDetails: any = null
- try {
- errorDetails = await presignedResponse.json()
- } catch {
- // Ignore JSON parsing errors
- }
-
- logger.error('Presigned URL request failed', {
- status: presignedResponse.status,
- fileSize: file.size,
- retryCount,
- })
-
- throw new PresignedUrlError(
- `Failed to get presigned URL for ${file.name}: ${presignedResponse.status} ${presignedResponse.statusText}`,
- errorDetails
- )
- }
-
- const presignedData = await presignedResponse.json()
+ presignedData = presignedOverride ?? (await getPresignedData(file, timeoutMs, controller))
if (presignedData.directUploadSupported) {
- // Use presigned URLs for all uploads when cloud storage is available
- // Check if file needs multipart upload for large files
if (file.size > UPLOAD_CONFIG.LARGE_FILE_THRESHOLD) {
- return await uploadFileInChunks(file, presignedData)
+ return await uploadFileInChunks(file, presignedData, timeoutMs, fileIndex)
}
- return await uploadFileDirectly(file, presignedData, fileIndex)
+ return await uploadFileDirectly(file, presignedData, timeoutMs, controller, fileIndex)
}
- // Fallback to traditional upload through API route
- // This is only used when cloud storage is not configured
- // Must check file size due to Vercel's 4.5MB limit
+
if (file.size > UPLOAD_CONFIG.DIRECT_UPLOAD_THRESHOLD) {
throw new DirectUploadError(
`File ${file.name} is too large (${(file.size / 1024 / 1024).toFixed(2)}MB) for upload. Cloud storage must be configured for files over 4MB.`,
{ fileSize: file.size, limit: UPLOAD_CONFIG.DIRECT_UPLOAD_THRESHOLD }
)
}
+
logger.warn(`Using API upload fallback for ${file.name} - cloud storage not configured`)
- return await uploadFileThroughAPI(file)
+ return await uploadFileThroughAPI(file, timeoutMs)
} finally {
clearTimeout(timeoutId)
}
} catch (error) {
- const isTimeout = error instanceof Error && error.name === 'AbortError'
- const isNetwork =
- error instanceof Error &&
- (error.message.includes('fetch') ||
- error.message.includes('network') ||
- error.message.includes('Failed to fetch'))
+ const isTimeout = isAbortError(error)
+ const isNetwork = isNetworkError(error)
- // Retry logic
if (retryCount < UPLOAD_CONFIG.MAX_RETRIES) {
- const delay = UPLOAD_CONFIG.RETRY_DELAY * UPLOAD_CONFIG.RETRY_MULTIPLIER ** retryCount // More aggressive exponential backoff
+ const delay = UPLOAD_CONFIG.RETRY_DELAY_MS * UPLOAD_CONFIG.RETRY_BACKOFF ** retryCount // Exponential backoff: 2s, 4s, 8s
if (isTimeout || isNetwork) {
logger.warn(
`Upload failed (${isTimeout ? 'timeout' : 'network'}), retrying in ${delay / 1000}s...`,
@@ -243,7 +393,6 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
)
}
- // Reset progress to 0 before retry to indicate restart
if (fileIndex !== undefined) {
setUploadProgress((prev) => ({
...prev,
@@ -253,8 +402,14 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
}))
}
- await new Promise((resolve) => setTimeout(resolve, delay))
- return uploadSingleFileWithRetry(file, retryCount + 1, fileIndex)
+ await sleep(delay)
+ const shouldReusePresigned = (isTimeout || isNetwork) && presignedData
+ return uploadSingleFileWithRetry(
+ file,
+ retryCount + 1,
+ fileIndex,
+ shouldReusePresigned ? presignedData : undefined
+ )
}
logger.error('Upload failed after retries', {
@@ -271,12 +426,15 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
*/
const uploadFileDirectly = async (
file: File,
- presignedData: any,
+ presignedData: PresignedUploadInfo,
+ timeoutMs: number,
+ outerController: AbortController,
fileIndex?: number
): Promise => {
return new Promise((resolve, reject) => {
const xhr = new XMLHttpRequest()
- let isCompleted = false // Track if this upload has completed to prevent duplicate state updates
+ let isCompleted = false
+ const startTime = getHighResTime()
const timeoutId = setTimeout(() => {
if (!isCompleted) {
@@ -284,7 +442,18 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
xhr.abort()
reject(new Error('Upload timeout'))
}
- }, UPLOAD_CONFIG.UPLOAD_TIMEOUT)
+ }, timeoutMs)
+
+ const abortHandler = () => {
+ if (!isCompleted) {
+ isCompleted = true
+ clearTimeout(timeoutId)
+ xhr.abort()
+ reject(new DirectUploadError(`Upload aborted for ${file.name}`, {}))
+ }
+ }
+
+ outerController.signal.addEventListener('abort', abortHandler)
// Track upload progress
xhr.upload.addEventListener('progress', (event) => {
@@ -309,10 +478,19 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
if (!isCompleted) {
isCompleted = true
clearTimeout(timeoutId)
+ outerController.signal.removeEventListener('abort', abortHandler)
+ const durationMs = getHighResTime() - startTime
if (xhr.status >= 200 && xhr.status < 300) {
const fullFileUrl = presignedData.fileInfo.path.startsWith('http')
? presignedData.fileInfo.path
: `${window.location.origin}${presignedData.fileInfo.path}`
+ logger.info('Direct upload completed', {
+ fileName: file.name,
+ sizeMB: formatMegabytes(file.size),
+ durationSeconds: formatDurationSeconds(durationMs),
+ throughputMbps: calculateThroughputMbps(file.size, durationMs),
+ status: xhr.status,
+ })
resolve(createUploadedFile(file.name, fullFileUrl, file.size, file.type, file))
} else {
logger.error('S3 PUT request failed', {
@@ -335,17 +513,18 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
if (!isCompleted) {
isCompleted = true
clearTimeout(timeoutId)
+ outerController.signal.removeEventListener('abort', abortHandler)
+ const durationMs = getHighResTime() - startTime
+ logger.error('Direct upload network error', {
+ fileName: file.name,
+ sizeMB: formatMegabytes(file.size),
+ durationSeconds: formatDurationSeconds(durationMs),
+ })
reject(new DirectUploadError(`Network error uploading ${file.name}`, {}))
}
})
- xhr.addEventListener('abort', () => {
- if (!isCompleted) {
- isCompleted = true
- clearTimeout(timeoutId)
- reject(new DirectUploadError(`Upload aborted for ${file.name}`, {}))
- }
- })
+ xhr.addEventListener('abort', abortHandler)
// Start the upload
xhr.open('PUT', presignedData.presignedUrl)
@@ -365,10 +544,16 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
/**
* Upload large file in chunks (multipart upload)
*/
- const uploadFileInChunks = async (file: File, presignedData: any): Promise => {
+ const uploadFileInChunks = async (
+ file: File,
+ presignedData: PresignedUploadInfo,
+ timeoutMs: number,
+ fileIndex?: number
+ ): Promise => {
logger.info(
`Uploading large file ${file.name} (${(file.size / 1024 / 1024).toFixed(2)}MB) using multipart upload`
)
+ const startTime = getHighResTime()
try {
// Step 1: Initiate multipart upload
@@ -419,37 +604,76 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
// Step 4: Upload parts in parallel (batch them to avoid overwhelming the browser)
const uploadedParts: Array<{ ETag: string; PartNumber: number }> = []
- const PARALLEL_UPLOADS = 3 // Upload 3 parts at a time
- for (let i = 0; i < presignedUrls.length; i += PARALLEL_UPLOADS) {
- const batch = presignedUrls.slice(i, i + PARALLEL_UPLOADS)
+ const controller = new AbortController()
+ const multipartTimeoutId = setTimeout(() => controller.abort(), timeoutMs)
- const batchPromises = batch.map(async ({ partNumber, url }: any) => {
+ try {
+ const uploadPart = async ({ partNumber, url }: any) => {
const start = (partNumber - 1) * chunkSize
const end = Math.min(start + chunkSize, file.size)
const chunk = file.slice(start, end)
- const uploadResponse = await fetch(url, {
- method: 'PUT',
- body: chunk,
- headers: {
- 'Content-Type': file.type,
- },
- })
+ for (let attempt = 0; attempt <= UPLOAD_CONFIG.MULTIPART_MAX_RETRIES; attempt++) {
+ try {
+ const partResponse = await fetch(url, {
+ method: 'PUT',
+ body: chunk,
+ signal: controller.signal,
+ headers: {
+ 'Content-Type': file.type,
+ },
+ })
- if (!uploadResponse.ok) {
- throw new Error(`Failed to upload part ${partNumber}: ${uploadResponse.statusText}`)
+ if (!partResponse.ok) {
+ throw new Error(`Failed to upload part ${partNumber}: ${partResponse.statusText}`)
+ }
+
+ const etag = partResponse.headers.get('ETag') || ''
+ logger.info(`Uploaded part ${partNumber}/${numParts}`)
+
+ if (fileIndex !== undefined) {
+ const partProgress = Math.min(100, Math.round((partNumber / numParts) * 100))
+ setUploadProgress((prev) => ({
+ ...prev,
+ fileStatuses: prev.fileStatuses?.map((fs, idx) =>
+ idx === fileIndex ? { ...fs, progress: partProgress } : fs
+ ),
+ }))
+ }
+
+ return { ETag: etag.replace(/"/g, ''), PartNumber: partNumber }
+ } catch (partError) {
+ if (attempt >= UPLOAD_CONFIG.MULTIPART_MAX_RETRIES) {
+ throw partError
+ }
+
+ const delay = UPLOAD_CONFIG.RETRY_DELAY_MS * UPLOAD_CONFIG.RETRY_BACKOFF ** attempt
+ logger.warn(
+ `Part ${partNumber} failed (attempt ${attempt + 1}), retrying in ${Math.round(delay / 1000)}s`
+ )
+ await sleep(delay)
+ }
}
- // Get ETag from response headers
- const etag = uploadResponse.headers.get('ETag') || ''
- logger.info(`Uploaded part ${partNumber}/${numParts}`)
+ throw new Error(`Retries exhausted for part ${partNumber}`)
+ }
- return { ETag: etag.replace(/"/g, ''), PartNumber: partNumber }
+ const partResults = await runWithConcurrency(
+ presignedUrls,
+ UPLOAD_CONFIG.MULTIPART_PART_CONCURRENCY,
+ uploadPart
+ )
+
+ partResults.forEach((result) => {
+ if (result?.status === 'fulfilled') {
+ uploadedParts.push(result.value)
+ } else if (result?.status === 'rejected') {
+ throw result.reason
+ }
})
-
- const batchResults = await Promise.all(batchPromises)
- uploadedParts.push(...batchResults)
+ } finally {
+ clearTimeout(multipartTimeoutId)
}
// Step 5: Complete multipart upload
@@ -470,23 +694,37 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const { path } = await completeResponse.json()
logger.info(`Completed multipart upload for ${file.name}`)
+ const durationMs = getHighResTime() - startTime
+ logger.info('Multipart upload metrics', {
+ fileName: file.name,
+ sizeMB: formatMegabytes(file.size),
+ parts: uploadedParts.length,
+ durationSeconds: formatDurationSeconds(durationMs),
+ throughputMbps: calculateThroughputMbps(file.size, durationMs),
+ })
+
const fullFileUrl = path.startsWith('http') ? path : `${window.location.origin}${path}`
return createUploadedFile(file.name, fullFileUrl, file.size, file.type, file)
} catch (error) {
logger.error(`Multipart upload failed for ${file.name}:`, error)
+ const durationMs = getHighResTime() - startTime
+ logger.warn('Falling back to direct upload after multipart failure', {
+ fileName: file.name,
+ sizeMB: formatMegabytes(file.size),
+ durationSeconds: formatDurationSeconds(durationMs),
+ })
// Fall back to direct upload if multipart fails
- logger.info('Falling back to direct upload')
- return uploadFileDirectly(file, presignedData)
+ return uploadFileDirectly(file, presignedData, timeoutMs, new AbortController(), fileIndex)
}
}
/**
* Fallback upload through API
*/
- const uploadFileThroughAPI = async (file: File): Promise => {
+ const uploadFileThroughAPI = async (file: File, timeoutMs: number): Promise => {
const controller = new AbortController()
- const timeoutId = setTimeout(() => controller.abort(), UPLOAD_CONFIG.UPLOAD_TIMEOUT)
+ const timeoutId = setTimeout(() => controller.abort(), timeoutMs)
try {
const formData = new FormData()
@@ -559,19 +797,20 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
logger.info(`Starting batch upload of ${files.length} files`)
try {
- const BATCH_SIZE = 100 // Process 100 files at a time
const batches = []
- // Create all batches
- for (let batchStart = 0; batchStart < files.length; batchStart += BATCH_SIZE) {
- const batchFiles = files.slice(batchStart, batchStart + BATCH_SIZE)
+ for (
+ let batchStart = 0;
+ batchStart < files.length;
+ batchStart += UPLOAD_CONFIG.BATCH_REQUEST_SIZE
+ ) {
+ const batchFiles = files.slice(batchStart, batchStart + UPLOAD_CONFIG.BATCH_REQUEST_SIZE)
const batchIndexOffset = batchStart
batches.push({ batchFiles, batchIndexOffset })
}
logger.info(`Starting parallel processing of ${batches.length} batches`)
- // Step 1: Get ALL presigned URLs in parallel
const presignedPromises = batches.map(async ({ batchFiles }, batchIndex) => {
logger.info(
`Getting presigned URLs for batch ${batchIndex + 1}/${batches.length} (${batchFiles.length} files)`
@@ -604,9 +843,8 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const allPresignedData = await Promise.all(presignedPromises)
logger.info(`Got all presigned URLs, starting uploads`)
- // Step 2: Upload all files with global concurrency control
const allUploads = allPresignedData.flatMap(({ batchFiles, presignedData, batchIndex }) => {
- const batchIndexOffset = batchIndex * BATCH_SIZE
+ const batchIndexOffset = batchIndex * UPLOAD_CONFIG.BATCH_REQUEST_SIZE
return batchFiles.map((file, batchFileIndex) => {
const fileIndex = batchIndexOffset + batchFileIndex
@@ -616,16 +854,14 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
})
})
- // Process all uploads with concurrency control
- for (let i = 0; i < allUploads.length; i += UPLOAD_CONFIG.BATCH_SIZE) {
- const concurrentBatch = allUploads.slice(i, i + UPLOAD_CONFIG.BATCH_SIZE)
-
- const uploadPromises = concurrentBatch.map(async ({ file, presigned, fileIndex }) => {
+ const uploadResults = await runWithConcurrency(
+ allUploads,
+ UPLOAD_CONFIG.MAX_PARALLEL_UPLOADS,
+ async ({ file, presigned, fileIndex }) => {
if (!presigned) {
throw new Error(`No presigned data for file ${file.name}`)
}
- // Mark as uploading
setUploadProgress((prev) => ({
...prev,
fileStatuses: prev.fileStatuses?.map((fs, idx) =>
@@ -634,10 +870,8 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
}))
try {
- // Upload directly to storage
- const result = await uploadFileDirectly(file, presigned, fileIndex)
+ const result = await uploadSingleFileWithRetry(file, 0, fileIndex, presigned)
- // Mark as completed
setUploadProgress((prev) => ({
...prev,
filesCompleted: prev.filesCompleted + 1,
@@ -648,7 +882,6 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
return result
} catch (error) {
- // Mark as failed
setUploadProgress((prev) => ({
...prev,
fileStatuses: prev.fileStatuses?.map((fs, idx) =>
@@ -656,30 +889,27 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
? {
...fs,
status: 'failed' as const,
- error: error instanceof Error ? error.message : 'Upload failed',
+ error: getErrorMessage(error),
}
: fs
),
}))
throw error
}
- })
-
- const batchResults = await Promise.allSettled(uploadPromises)
-
- for (let j = 0; j < batchResults.length; j++) {
- const result = batchResults[j]
- if (result.status === 'fulfilled') {
- results.push(result.value)
- } else {
- failedFiles.push({
- file: concurrentBatch[j].file,
- error:
- result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
- })
- }
}
- }
+ )
+
+ uploadResults.forEach((result, idx) => {
+ if (result?.status === 'fulfilled') {
+ results.push(result.value)
+ } else if (result?.status === 'rejected') {
+ failedFiles.push({
+ file: allUploads[idx].file,
+ error:
+ result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
+ })
+ }
+ })
if (failedFiles.length > 0) {
logger.error(`Failed to upload ${failedFiles.length} files`)
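
Note: a rough sketch of how the new dynamic timeout and retry settings interact, using the UPLOAD_CONFIG values above (the 200 MB file size is purely illustrative):

  // Hypothetical 200 MB file, reproducing calculateUploadTimeoutMs by hand
  const sizeInMb = 200
  const timeoutMs = Math.min(
    2 * 60 * 1000 + sizeInMb * 1500, // BASE_TIMEOUT_MS + TIMEOUT_PER_MB_MS per MB
    10 * 60 * 1000 // MAX_TIMEOUT_MS
  )
  // => 120_000 + 300_000 = 420_000 ms (7 minutes), under the 10-minute cap
  // Retry delays follow RETRY_DELAY_MS * RETRY_BACKOFF ** retryCount: 2 s, 4 s, 8 s

On a timeout or network failure the presigned URL from the failed attempt is reused, so only the transfer itself is repeated.
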
diff --git a/apps/sim/lib/knowledge/documents/docs-chunker.ts b/apps/sim/lib/chunkers/docs-chunker.ts
similarity index 65%
rename from apps/sim/lib/knowledge/documents/docs-chunker.ts
rename to apps/sim/lib/chunkers/docs-chunker.ts
index 81bc96280..693c6ca35 100644
--- a/apps/sim/lib/knowledge/documents/docs-chunker.ts
+++ b/apps/sim/lib/chunkers/docs-chunker.ts
@@ -1,10 +1,17 @@
import fs from 'fs/promises'
import path from 'path'
import { generateEmbeddings } from '@/lib/embeddings/utils'
-import { isDev } from '@/lib/environment'
-import { TextChunker } from '@/lib/knowledge/documents/chunker'
-import type { DocChunk, DocsChunkerOptions, HeaderInfo } from '@/lib/knowledge/documents/types'
import { createLogger } from '@/lib/logs/console/logger'
+import { TextChunker } from './text-chunker'
+import type { DocChunk, DocsChunkerOptions } from './types'
+
+interface HeaderInfo {
+ level: number
+ text: string
+ slug?: string
+ anchor?: string
+ position?: number
+}
interface Frontmatter {
title?: string
@@ -29,7 +36,7 @@ export class DocsChunker {
overlap: options.overlap ?? 50,
})
- // Use localhost docs in development, production docs otherwise
- this.baseUrl = options.baseUrl ?? (isDev ? 'http://localhost:3001' : 'https://docs.sim.ai')
+ // Default to the production docs URL unless a baseUrl override is provided
+ this.baseUrl = options.baseUrl ?? 'https://docs.sim.ai'
}
/**
@@ -108,9 +115,7 @@ export class DocsChunker {
metadata: {
startIndex: chunkStart,
endIndex: chunkEnd,
- hasFrontmatter: i === 0 && content.startsWith('---'),
- documentTitle: frontmatter.title,
- documentDescription: frontmatter.description,
+ title: frontmatter.title,
},
}
@@ -200,7 +205,7 @@ export class DocsChunker {
let relevantHeader: HeaderInfo | null = null
for (const header of headers) {
- if (header.position <= position) {
+ if (header.position !== undefined && header.position <= position) {
relevantHeader = header
} else {
break
@@ -285,53 +290,6 @@ export class DocsChunker {
return { data, content: markdownContent }
}
- /**
- * Split content by headers to respect document structure
- */
- private splitByHeaders(
- content: string
- ): Array<{ header: string | null; content: string; level: number }> {
- const lines = content.split('\n')
- const sections: Array<{ header: string | null; content: string; level: number }> = []
-
- let currentHeader: string | null = null
- let currentLevel = 0
- let currentContent: string[] = []
-
- for (const line of lines) {
- const headerMatch = line.match(/^(#{1,3})\s+(.+)$/) // Only split on H1-H3, not H4-H6
-
- if (headerMatch) {
- // Save previous section
- if (currentContent.length > 0) {
- sections.push({
- header: currentHeader,
- content: currentContent.join('\n').trim(),
- level: currentLevel,
- })
- }
-
- // Start new section
- currentHeader = line
- currentLevel = headerMatch[1].length
- currentContent = []
- } else {
- currentContent.push(line)
- }
- }
-
- // Add final section
- if (currentContent.length > 0) {
- sections.push({
- header: currentHeader,
- content: currentContent.join('\n').trim(),
- level: currentLevel,
- })
- }
-
- return sections.filter((section) => section.content.trim().length > 0)
- }
-
/**
* Estimate token count (rough approximation)
*/
@@ -340,175 +298,6 @@ export class DocsChunker {
return Math.ceil(text.length / 4)
}
- /**
- * Merge small adjacent chunks to reach target size
- */
- private mergeSmallChunks(chunks: string[]): string[] {
- const merged: string[] = []
- let currentChunk = ''
-
- for (const chunk of chunks) {
- const currentTokens = this.estimateTokens(currentChunk)
- const chunkTokens = this.estimateTokens(chunk)
-
- // If adding this chunk would exceed target size, save current and start new
- if (currentTokens > 0 && currentTokens + chunkTokens > 500) {
- if (currentChunk.trim()) {
- merged.push(currentChunk.trim())
- }
- currentChunk = chunk
- } else {
- // Merge with current chunk
- currentChunk = currentChunk ? `${currentChunk}\n\n${chunk}` : chunk
- }
- }
-
- // Add final chunk
- if (currentChunk.trim()) {
- merged.push(currentChunk.trim())
- }
-
- return merged
- }
-
- /**
- * Chunk a section while preserving tables and structure
- */
- private async chunkSection(section: {
- header: string | null
- content: string
- level: number
- }): Promise {
- const content = section.content
- const header = section.header
-
- // Check if content contains tables
- const hasTable = this.containsTable(content)
-
- if (hasTable) {
- // Split by tables and handle each part
- return this.splitContentWithTables(content, header)
- }
- // Regular chunking for text-only content
- const chunks = await this.textChunker.chunk(content)
- return chunks.map((chunk, index) => {
- // Add header to first chunk only
- if (index === 0 && header) {
- return `${header}\n\n${chunk.text}`.trim()
- }
- return chunk.text
- })
- }
-
- /**
- * Check if content contains markdown tables
- */
- private containsTable(content: string): boolean {
- const lines = content.split('\n')
- return lines.some((line, index) => {
- if (line.includes('|') && line.split('|').length >= 3) {
- const nextLine = lines[index + 1]
- return nextLine?.includes('|') && nextLine.includes('-')
- }
- return false
- })
- }
-
- /**
- * Split content that contains tables, keeping tables intact
- */
- private splitContentWithTables(content: string, header: string | null): string[] {
- const lines = content.split('\n')
- const chunks: string[] = []
- let currentChunk: string[] = []
- let inTable = false
- let tableLines: string[] = []
-
- for (let i = 0; i < lines.length; i++) {
- const line = lines[i]
-
- // Detect table start
- if (line.includes('|') && line.split('|').length >= 3 && !inTable) {
- const nextLine = lines[i + 1]
- if (nextLine?.includes('|') && nextLine.includes('-')) {
- inTable = true
-
- // Save current chunk if it has content
- if (currentChunk.length > 0 && currentChunk.join('\n').trim().length > 50) {
- const chunkText = currentChunk.join('\n').trim()
- const withHeader =
- chunks.length === 0 && header ? `${header}\n\n${chunkText}` : chunkText
- chunks.push(withHeader)
- currentChunk = []
- }
-
- tableLines = [line]
- continue
- }
- }
-
- if (inTable) {
- tableLines.push(line)
-
- // Detect table end
- if (!line.includes('|') || line.trim() === '') {
- inTable = false
-
- // Save table as its own chunk
- const tableText = tableLines
- .filter((l) => l.trim())
- .join('\n')
- .trim()
- if (tableText.length > 0) {
- const withHeader =
- chunks.length === 0 && header ? `${header}\n\n${tableText}` : tableText
- chunks.push(withHeader)
- }
-
- tableLines = []
-
- // Start new chunk if current line has content
- if (line.trim() !== '') {
- currentChunk = [line]
- }
- }
- } else {
- currentChunk.push(line)
-
- // If chunk is getting large, save it
- if (this.estimateTokens(currentChunk.join('\n')) > 250) {
- const chunkText = currentChunk.join('\n').trim()
- if (chunkText.length > 50) {
- const withHeader =
- chunks.length === 0 && header ? `${header}\n\n${chunkText}` : chunkText
- chunks.push(withHeader)
- }
- currentChunk = []
- }
- }
- }
-
- // Handle remaining content
- if (inTable && tableLines.length > 0) {
- const tableText = tableLines
- .filter((l) => l.trim())
- .join('\n')
- .trim()
- if (tableText.length > 0) {
- const withHeader = chunks.length === 0 && header ? `${header}\n\n${tableText}` : tableText
- chunks.push(withHeader)
- }
- } else if (currentChunk.length > 0) {
- const chunkText = currentChunk.join('\n').trim()
- if (chunkText.length > 50) {
- const withHeader = chunks.length === 0 && header ? `${header}\n\n${chunkText}` : chunkText
- chunks.push(withHeader)
- }
- }
-
- return chunks.filter((chunk) => chunk.trim().length > 50)
- }
-
/**
* Detect table boundaries in markdown content to avoid splitting them
*/
diff --git a/apps/sim/lib/chunkers/index.ts b/apps/sim/lib/chunkers/index.ts
new file mode 100644
index 000000000..403e75a20
--- /dev/null
+++ b/apps/sim/lib/chunkers/index.ts
@@ -0,0 +1,5 @@
+export { DocsChunker } from './docs-chunker'
+export { JsonYamlChunker } from './json-yaml-chunker'
+export { StructuredDataChunker } from './structured-data-chunker'
+export { TextChunker } from './text-chunker'
+export * from './types'
diff --git a/apps/sim/lib/chunkers/json-yaml-chunker.ts b/apps/sim/lib/chunkers/json-yaml-chunker.ts
new file mode 100644
index 000000000..6922a6db4
--- /dev/null
+++ b/apps/sim/lib/chunkers/json-yaml-chunker.ts
@@ -0,0 +1,317 @@
+import { estimateTokenCount } from '@/lib/tokenization/estimators'
+import type { Chunk, ChunkerOptions } from './types'
+
+function getTokenCount(text: string): number {
+ const estimate = estimateTokenCount(text)
+ return estimate.count
+}
+
+/**
+ * Configuration for JSON/YAML chunking
+ */
+const JSON_YAML_CHUNKING_CONFIG = {
+ TARGET_CHUNK_SIZE: 2000, // Target tokens per chunk
+ MIN_CHUNK_SIZE: 100, // Minimum tokens per chunk
+ MAX_CHUNK_SIZE: 3000, // Maximum tokens per chunk
+ MAX_DEPTH_FOR_SPLITTING: 5, // Maximum depth to traverse for splitting
+}
+
+export class JsonYamlChunker {
+ private chunkSize: number
+ private minChunkSize: number
+
+ constructor(options: ChunkerOptions = {}) {
+ this.chunkSize = options.chunkSize || JSON_YAML_CHUNKING_CONFIG.TARGET_CHUNK_SIZE
+ this.minChunkSize = options.minChunkSize || JSON_YAML_CHUNKING_CONFIG.MIN_CHUNK_SIZE
+ }
+
+ /**
+ * Check if content is structured JSON/YAML data
+ */
+ static isStructuredData(content: string): boolean {
+ try {
+ JSON.parse(content)
+ return true
+ } catch {
+ try {
+ const yaml = require('js-yaml')
+ yaml.load(content)
+ return true
+ } catch {
+ return false
+ }
+ }
+ }
+
+ /**
+ * Chunk JSON/YAML content intelligently based on structure
+ */
+ async chunk(content: string): Promise<Chunk[]> {
+ try {
+ const data = JSON.parse(content)
+ return this.chunkStructuredData(data)
+ } catch (error) {
+ return this.chunkAsText(content)
+ }
+ }
+
+ /**
+ * Chunk structured data based on its structure
+ */
+ private chunkStructuredData(data: any, path: string[] = []): Chunk[] {
+ const chunks: Chunk[] = []
+
+ if (Array.isArray(data)) {
+ return this.chunkArray(data, path)
+ }
+
+ if (typeof data === 'object' && data !== null) {
+ return this.chunkObject(data, path)
+ }
+
+ const content = JSON.stringify(data, null, 2)
+ const tokenCount = getTokenCount(content)
+
+ if (tokenCount >= this.minChunkSize) {
+ chunks.push({
+ text: content,
+ tokenCount,
+ metadata: {
+ startIndex: 0,
+ endIndex: content.length,
+ },
+ })
+ }
+
+ return chunks
+ }
+
+ /**
+ * Chunk an array intelligently
+ */
+ private chunkArray(arr: any[], path: string[]): Chunk[] {
+ const chunks: Chunk[] = []
+ let currentBatch: any[] = []
+ let currentTokens = 0
+
+ const contextHeader = path.length > 0 ? `// ${path.join('.')}\n` : ''
+
+ for (let i = 0; i < arr.length; i++) {
+ const item = arr[i]
+ const itemStr = JSON.stringify(item, null, 2)
+ const itemTokens = getTokenCount(itemStr)
+
+ if (itemTokens > this.chunkSize) {
+ // Save current batch if it has items
+ if (currentBatch.length > 0) {
+ const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
+ chunks.push({
+ text: batchContent,
+ tokenCount: getTokenCount(batchContent),
+ metadata: {
+ startIndex: i - currentBatch.length,
+ endIndex: i - 1,
+ },
+ })
+ currentBatch = []
+ currentTokens = 0
+ }
+
+ if (typeof item === 'object' && item !== null) {
+ const subChunks = this.chunkStructuredData(item, [...path, `[${i}]`])
+ chunks.push(...subChunks)
+ } else {
+ chunks.push({
+ text: contextHeader + itemStr,
+ tokenCount: itemTokens,
+ metadata: {
+ startIndex: i,
+ endIndex: i,
+ },
+ })
+ }
+ } else if (currentTokens + itemTokens > this.chunkSize && currentBatch.length > 0) {
+ const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
+ chunks.push({
+ text: batchContent,
+ tokenCount: currentTokens,
+ metadata: {
+ startIndex: i - currentBatch.length,
+ endIndex: i - 1,
+ },
+ })
+ currentBatch = [item]
+ currentTokens = itemTokens
+ } else {
+ currentBatch.push(item)
+ currentTokens += itemTokens
+ }
+ }
+
+ if (currentBatch.length > 0) {
+ const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
+ chunks.push({
+ text: batchContent,
+ tokenCount: currentTokens,
+ metadata: {
+ startIndex: arr.length - currentBatch.length,
+ endIndex: arr.length - 1,
+ },
+ })
+ }
+
+ return chunks
+ }
+
+ /**
+ * Chunk an object intelligently
+ */
+ private chunkObject(obj: Record<string, any>, path: string[]): Chunk[] {
+ const chunks: Chunk[] = []
+ const entries = Object.entries(obj)
+
+ const fullContent = JSON.stringify(obj, null, 2)
+ const fullTokens = getTokenCount(fullContent)
+
+ if (fullTokens <= this.chunkSize) {
+ chunks.push({
+ text: fullContent,
+ tokenCount: fullTokens,
+ metadata: {
+ startIndex: 0,
+ endIndex: fullContent.length,
+ },
+ })
+ return chunks
+ }
+
+ let currentObj: Record<string, any> = {}
+ let currentTokens = 0
+ let currentKeys: string[] = []
+
+ for (const [key, value] of entries) {
+ const valueStr = JSON.stringify({ [key]: value }, null, 2)
+ const valueTokens = getTokenCount(valueStr)
+
+ if (valueTokens > this.chunkSize) {
+ // Save current object if it has properties
+ if (Object.keys(currentObj).length > 0) {
+ const objContent = JSON.stringify(currentObj, null, 2)
+ chunks.push({
+ text: objContent,
+ tokenCount: currentTokens,
+ metadata: {
+ startIndex: 0,
+ endIndex: objContent.length,
+ },
+ })
+ currentObj = {}
+ currentTokens = 0
+ currentKeys = []
+ }
+
+ if (typeof value === 'object' && value !== null) {
+ const subChunks = this.chunkStructuredData(value, [...path, key])
+ chunks.push(...subChunks)
+ } else {
+ chunks.push({
+ text: valueStr,
+ tokenCount: valueTokens,
+ metadata: {
+ startIndex: 0,
+ endIndex: valueStr.length,
+ },
+ })
+ }
+ } else if (
+ currentTokens + valueTokens > this.chunkSize &&
+ Object.keys(currentObj).length > 0
+ ) {
+ const objContent = JSON.stringify(currentObj, null, 2)
+ chunks.push({
+ text: objContent,
+ tokenCount: currentTokens,
+ metadata: {
+ startIndex: 0,
+ endIndex: objContent.length,
+ },
+ })
+ currentObj = { [key]: value }
+ currentTokens = valueTokens
+ currentKeys = [key]
+ } else {
+ currentObj[key] = value
+ currentTokens += valueTokens
+ currentKeys.push(key)
+ }
+ }
+
+ if (Object.keys(currentObj).length > 0) {
+ const objContent = JSON.stringify(currentObj, null, 2)
+ chunks.push({
+ text: objContent,
+ tokenCount: currentTokens,
+ metadata: {
+ startIndex: 0,
+ endIndex: objContent.length,
+ },
+ })
+ }
+
+ return chunks
+ }
+
+ /**
+ * Fall back to text chunking if JSON parsing fails.
+ */
+ private async chunkAsText(content: string): Promise<Chunk[]> {
+ const chunks: Chunk[] = []
+ const lines = content.split('\n')
+ let currentChunk = ''
+ let currentTokens = 0
+ let startIndex = 0
+
+ for (const line of lines) {
+ const lineTokens = getTokenCount(line)
+
+ if (currentTokens + lineTokens > this.chunkSize && currentChunk) {
+ chunks.push({
+ text: currentChunk,
+ tokenCount: currentTokens,
+ metadata: {
+ startIndex,
+ endIndex: startIndex + currentChunk.length,
+ },
+ })
+
+ startIndex += currentChunk.length + 1
+ currentChunk = line
+ currentTokens = lineTokens
+ } else {
+ currentChunk = currentChunk ? `${currentChunk}\n${line}` : line
+ currentTokens += lineTokens
+ }
+ }
+
+ if (currentChunk && currentTokens >= this.minChunkSize) {
+ chunks.push({
+ text: currentChunk,
+ tokenCount: currentTokens,
+ metadata: {
+ startIndex,
+ endIndex: startIndex + currentChunk.length,
+ },
+ })
+ }
+
+ return chunks
+ }
+
+ /**
+ * Static method for chunking JSON/YAML data with default options.
+ */
+ static async chunkJsonYaml(content: string, options: ChunkerOptions = {}): Promise<Chunk[]> {
+ const chunker = new JsonYamlChunker(options)
+ return chunker.chunk(content)
+ }
+}
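
A minimal usage sketch of the new chunker (the payload below is made up; chunk() parses the input as JSON and falls back to line-based text chunking when parsing fails):

  import { JsonYamlChunker } from '@/lib/chunkers'

  const chunker = new JsonYamlChunker({ chunkSize: 2000, minChunkSize: 100 })
  const chunks = await chunker.chunk(JSON.stringify({ users: [{ id: 1 }, { id: 2 }] }))
  // Each chunk carries text, tokenCount, and startIndex/endIndex metadata

Large arrays are batched toward the target token count (with the ancestor path prepended as a comment header), while oversized objects are split key by key.
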
diff --git a/apps/sim/lib/chunkers/structured-data-chunker.ts b/apps/sim/lib/chunkers/structured-data-chunker.ts
new file mode 100644
index 000000000..45bf842b3
--- /dev/null
+++ b/apps/sim/lib/chunkers/structured-data-chunker.ts
@@ -0,0 +1,220 @@
+import type { Chunk, StructuredDataOptions } from './types'
+
+// Configuration for structured data chunking (CSV, XLSX, etc.)
+const STRUCTURED_CHUNKING_CONFIG = {
+ // Target 2000-3000 tokens per chunk for better semantic meaning
+ TARGET_CHUNK_SIZE: 2500,
+ MIN_CHUNK_SIZE: 500,
+ MAX_CHUNK_SIZE: 4000,
+
+ // For spreadsheets, group rows together
+ ROWS_PER_CHUNK: 100, // Start with 100 rows per chunk
+ MIN_ROWS_PER_CHUNK: 20,
+ MAX_ROWS_PER_CHUNK: 500,
+
+ // For better embeddings quality
+ INCLUDE_HEADERS_IN_EACH_CHUNK: true,
+ MAX_HEADER_SIZE: 200, // tokens
+}
+
+/**
+ * Smart chunker for structured data (CSV, XLSX) that preserves semantic meaning
+ */
+export class StructuredDataChunker {
+ /**
+ * Chunk structured data intelligently based on rows and semantic boundaries
+ */
+ static async chunkStructuredData(
+ content: string,
+ options: StructuredDataOptions = {}
+ ): Promise<Chunk[]> {
+ const chunks: Chunk[] = []
+ const lines = content.split('\n').filter((line) => line.trim())
+
+ if (lines.length === 0) {
+ return chunks
+ }
+
+ // Detect headers (first line or provided)
+ const headerLine = options.headers?.join('\t') || lines[0]
+ const dataStartIndex = options.headers ? 0 : 1
+
+ // Calculate optimal rows per chunk based on content
+ const estimatedTokensPerRow = StructuredDataChunker.estimateTokensPerRow(
+ lines.slice(dataStartIndex, Math.min(10, lines.length))
+ )
+ const optimalRowsPerChunk =
+ StructuredDataChunker.calculateOptimalRowsPerChunk(estimatedTokensPerRow)
+
+ console.log(
+ `Structured data chunking: ${lines.length} rows, ~${estimatedTokensPerRow} tokens/row, ${optimalRowsPerChunk} rows/chunk`
+ )
+
+ let currentChunkRows: string[] = []
+ let currentTokenEstimate = 0
+ const headerTokens = StructuredDataChunker.estimateTokens(headerLine)
+ let chunkStartRow = dataStartIndex
+
+ for (let i = dataStartIndex; i < lines.length; i++) {
+ const row = lines[i]
+ const rowTokens = StructuredDataChunker.estimateTokens(row)
+
+ // Check if adding this row would exceed our target
+ const projectedTokens =
+ currentTokenEstimate +
+ rowTokens +
+ (STRUCTURED_CHUNKING_CONFIG.INCLUDE_HEADERS_IN_EACH_CHUNK ? headerTokens : 0)
+
+ const shouldCreateChunk =
+ (projectedTokens > STRUCTURED_CHUNKING_CONFIG.TARGET_CHUNK_SIZE &&
+ currentChunkRows.length >= STRUCTURED_CHUNKING_CONFIG.MIN_ROWS_PER_CHUNK) ||
+ currentChunkRows.length >= optimalRowsPerChunk
+
+ if (shouldCreateChunk && currentChunkRows.length > 0) {
+ // Create chunk with current rows
+ const chunkContent = StructuredDataChunker.formatChunk(
+ headerLine,
+ currentChunkRows,
+ options.sheetName
+ )
+ chunks.push(StructuredDataChunker.createChunk(chunkContent, chunkStartRow, i - 1))
+
+ // Reset for next chunk
+ currentChunkRows = []
+ currentTokenEstimate = 0
+ chunkStartRow = i
+ }
+
+ currentChunkRows.push(row)
+ currentTokenEstimate += rowTokens
+ }
+
+ // Add remaining rows as final chunk
+ if (currentChunkRows.length > 0) {
+ const chunkContent = StructuredDataChunker.formatChunk(
+ headerLine,
+ currentChunkRows,
+ options.sheetName
+ )
+ chunks.push(StructuredDataChunker.createChunk(chunkContent, chunkStartRow, lines.length - 1))
+ }
+
+ console.log(`Created ${chunks.length} chunks from ${lines.length} rows of structured data`)
+
+ return chunks
+ }
+
+ /**
+ * Format a chunk with headers and context
+ */
+ private static formatChunk(headerLine: string, rows: string[], sheetName?: string): string {
+ let content = ''
+
+ // Add sheet name context if available
+ if (sheetName) {
+ content += `=== ${sheetName} ===\n\n`
+ }
+
+ // Add headers for context
+ if (STRUCTURED_CHUNKING_CONFIG.INCLUDE_HEADERS_IN_EACH_CHUNK) {
+ content += `Headers: ${headerLine}\n`
+ content += `${'-'.repeat(Math.min(80, headerLine.length))}\n`
+ }
+
+ // Add data rows
+ content += rows.join('\n')
+
+ // Add row count for context
+ content += `\n\n[${rows.length} rows of data]`
+
+ return content
+ }
+
+ /**
+ * Create a chunk object with actual row indices
+ */
+ private static createChunk(content: string, startRow: number, endRow: number): Chunk {
+ const tokenCount = StructuredDataChunker.estimateTokens(content)
+
+ return {
+ text: content,
+ tokenCount,
+ metadata: {
+ startIndex: startRow,
+ endIndex: endRow,
+ },
+ }
+ }
+
+ /**
+ * Estimate tokens in text (rough approximation)
+ */
+ private static estimateTokens(text: string): number {
+ // Rough estimate: 1 token per 4 characters for English text
+ // For structured data with numbers, it's closer to 1 token per 3 characters
+ return Math.ceil(text.length / 3)
+ }
+
+ /**
+ * Estimate average tokens per row from sample
+ */
+ private static estimateTokensPerRow(sampleRows: string[]): number {
+ if (sampleRows.length === 0) return 50 // default estimate
+
+ const totalTokens = sampleRows.reduce(
+ (sum, row) => sum + StructuredDataChunker.estimateTokens(row),
+ 0
+ )
+ return Math.ceil(totalTokens / sampleRows.length)
+ }
+
+ /**
+ * Calculate optimal rows per chunk based on token estimates
+ */
+ private static calculateOptimalRowsPerChunk(tokensPerRow: number): number {
+ const optimal = Math.floor(STRUCTURED_CHUNKING_CONFIG.TARGET_CHUNK_SIZE / tokensPerRow)
+
+ return Math.min(
+ Math.max(optimal, STRUCTURED_CHUNKING_CONFIG.MIN_ROWS_PER_CHUNK),
+ STRUCTURED_CHUNKING_CONFIG.MAX_ROWS_PER_CHUNK
+ )
+ }
+
+ /**
+ * Check if content appears to be structured data
+ */
+ static isStructuredData(content: string, mimeType?: string): boolean {
+ // Check mime type first
+ if (mimeType) {
+ const structuredMimeTypes = [
+ 'text/csv',
+ 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
+ 'application/vnd.ms-excel',
+ 'text/tab-separated-values',
+ ]
+ if (structuredMimeTypes.includes(mimeType)) {
+ return true
+ }
+ }
+
+ // Check content structure
+ const lines = content.split('\n').slice(0, 10) // Check first 10 lines
+ if (lines.length < 2) return false
+
+ // Check for consistent delimiters (comma, tab, pipe)
+ const delimiters = [',', '\t', '|']
+ for (const delimiter of delimiters) {
+ const counts = lines.map(
+ (line) => (line.match(new RegExp(`\\${delimiter}`, 'g')) || []).length
+ )
+ const avgCount = counts.reduce((a, b) => a + b, 0) / counts.length
+
+ // If most lines have similar delimiter counts, it's likely structured
+ if (avgCount > 2 && counts.every((c) => Math.abs(c - avgCount) <= 2)) {
+ return true
+ }
+ }
+
+ return false
+ }
+}
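
A quick sketch of the intended call pattern (the CSV sample and sheet name are invented):

  import { StructuredDataChunker } from '@/lib/chunkers'

  const csv = ['id,name,amount', '1,Alice,10', '2,Bob,20'].join('\n')
  if (StructuredDataChunker.isStructuredData(csv, 'text/csv')) {
    const chunks = await StructuredDataChunker.chunkStructuredData(csv, { sheetName: 'Invoices' })
    // Each chunk repeats the header line for context and records the covered row range in metadata
  }
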
diff --git a/apps/sim/lib/knowledge/documents/chunker.ts b/apps/sim/lib/chunkers/text-chunker.ts
similarity index 94%
rename from apps/sim/lib/knowledge/documents/chunker.ts
rename to apps/sim/lib/chunkers/text-chunker.ts
index 5b8c7e482..0a13fb0bd 100644
--- a/apps/sim/lib/knowledge/documents/chunker.ts
+++ b/apps/sim/lib/chunkers/text-chunker.ts
@@ -1,28 +1,4 @@
-export interface ChunkMetadata {
- startIndex: number
- endIndex: number
- tokenCount: number
-}
-
-export interface TextChunk {
- text: string
- metadata: ChunkMetadata
-}
-
-export interface ChunkerOptions {
- chunkSize?: number
- minChunkSize?: number
- overlap?: number
-}
-
-export interface Chunk {
- text: string
- tokenCount: number
- metadata: {
- startIndex: number
- endIndex: number
- }
-}
+import type { Chunk, ChunkerOptions } from './types'
/**
* Lightweight text chunker optimized for RAG applications
diff --git a/apps/sim/lib/chunkers/types.ts b/apps/sim/lib/chunkers/types.ts
new file mode 100644
index 000000000..f8a870615
--- /dev/null
+++ b/apps/sim/lib/chunkers/types.ts
@@ -0,0 +1,53 @@
+export interface ChunkMetadata {
+ startIndex: number
+ endIndex: number
+ tokenCount: number
+}
+
+export interface TextChunk {
+ text: string
+ metadata: ChunkMetadata
+}
+
+export interface ChunkerOptions {
+ chunkSize?: number
+ minChunkSize?: number
+ overlap?: number
+}
+
+export interface Chunk {
+ text: string
+ tokenCount: number
+ metadata: {
+ startIndex: number
+ endIndex: number
+ }
+}
+
+export interface StructuredDataOptions {
+ headers?: string[]
+ totalRows?: number
+ sheetName?: string
+}
+
+export interface DocChunk {
+ text: string
+ tokenCount: number
+ sourceDocument: string
+ headerLink: string
+ headerText: string
+ headerLevel: number
+ embedding: number[]
+ embeddingModel: string
+ metadata: {
+ sourceUrl?: string
+ headers?: string[]
+ title?: string
+ startIndex: number
+ endIndex: number
+ }
+}
+
+export interface DocsChunkerOptions extends ChunkerOptions {
+ baseUrl?: string
+}
diff --git a/apps/sim/lib/embeddings/utils.ts b/apps/sim/lib/embeddings/utils.ts
index de1157fe8..72cb760af 100644
--- a/apps/sim/lib/embeddings/utils.ts
+++ b/apps/sim/lib/embeddings/utils.ts
@@ -114,7 +114,8 @@ export async function generateEmbeddings(
logger.info(`Using ${config.useAzure ? 'Azure OpenAI' : 'OpenAI'} for embeddings generation`)
- const batchSize = 100
+ // Reduced from 100 to 50 to prevent API timeouts on large documents
+ const batchSize = 50
const allEmbeddings: number[][] = []
for (let i = 0; i < texts.length; i += batchSize) {
@@ -125,6 +126,11 @@ export async function generateEmbeddings(
logger.info(
`Generated embeddings for batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(texts.length / batchSize)}`
)
+
+ // Add small delay between batches to avoid rate limiting
+ if (i + batchSize < texts.length) {
+ await new Promise((resolve) => setTimeout(resolve, 100))
+ }
}
return allEmbeddings
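
For scale: with the batch size at 50 and a 100 ms pause between batches, a document with 1,000 chunks now makes 20 embedding requests and spends roughly 1.9 s (19 × 100 ms) in deliberate delay.
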
diff --git a/apps/sim/lib/env.ts b/apps/sim/lib/env.ts
index e3b5c05b1..fbf065023 100644
--- a/apps/sim/lib/env.ts
+++ b/apps/sim/lib/env.ts
@@ -17,8 +17,6 @@ export const env = createEnv({
server: {
// Core Database & Authentication
DATABASE_URL: z.string().url(), // Primary database connection string
- DATABASE_SSL: z.enum(['disable', 'prefer', 'require', 'verify-ca', 'verify-full']).optional(), // PostgreSQL SSL mode
- DATABASE_SSL_CA: z.string().optional(), // Base64-encoded CA certificate for SSL verification
BETTER_AUTH_URL: z.string().url(), // Base URL for Better Auth service
BETTER_AUTH_SECRET: z.string().min(32), // Secret key for Better Auth JWT signing
DISABLE_REGISTRATION: z.boolean().optional(), // Flag to disable new user registration
@@ -36,7 +34,6 @@ export const env = createEnv({
AGENT_INDEXER_URL: z.string().url().optional(), // URL for agent training data indexer
AGENT_INDEXER_API_KEY: z.string().min(1).optional(), // API key for agent indexer authentication
-
// Database & Storage
REDIS_URL: z.string().url().optional(), // Redis connection string for caching/sessions
diff --git a/apps/sim/lib/file-parsers/csv-parser.ts b/apps/sim/lib/file-parsers/csv-parser.ts
index 38f6d65ed..edbb9fe7b 100644
--- a/apps/sim/lib/file-parsers/csv-parser.ts
+++ b/apps/sim/lib/file-parsers/csv-parser.ts
@@ -1,108 +1,154 @@
-import { existsSync, readFileSync } from 'fs'
-import * as Papa from 'papaparse'
+import { createReadStream, existsSync } from 'fs'
+import { Readable } from 'stream'
+import { type Options, parse } from 'csv-parse'
import type { FileParseResult, FileParser } from '@/lib/file-parsers/types'
import { sanitizeTextForUTF8 } from '@/lib/file-parsers/utils'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('CsvParser')
-const PARSE_OPTIONS = {
- header: true,
- skipEmptyLines: true,
- transformHeader: (header: string) => sanitizeTextForUTF8(String(header)),
- transform: (value: string) => sanitizeTextForUTF8(String(value || '')),
+const CONFIG = {
+ MAX_PREVIEW_ROWS: 1000, // Only keep first 1000 rows for preview
+ MAX_SAMPLE_ROWS: 100, // Sample for metadata
+ MAX_ERRORS: 100, // Stop after 100 errors
+ STREAM_CHUNK_SIZE: 16384, // 16KB chunks for streaming
}
export class CsvParser implements FileParser {
async parseFile(filePath: string): Promise<FileParseResult> {
- try {
- if (!filePath) {
- throw new Error('No file path provided')
- }
-
- if (!existsSync(filePath)) {
- throw new Error(`File not found: ${filePath}`)
- }
-
- const fileContent = readFileSync(filePath, 'utf8')
-
- const parseResult = Papa.parse(fileContent, PARSE_OPTIONS)
-
- if (parseResult.errors && parseResult.errors.length > 0) {
- const errorMessages = parseResult.errors.map((err) => err.message).join(', ')
- logger.error('CSV parsing errors:', parseResult.errors)
- throw new Error(`Failed to parse CSV file: ${errorMessages}`)
- }
-
- const results = parseResult.data as Record[]
- const headers = parseResult.meta.fields || []
-
- let content = ''
-
- if (headers.length > 0) {
- const cleanHeaders = headers.map((h) => sanitizeTextForUTF8(String(h)))
- content += `${cleanHeaders.join(', ')}\n`
- }
-
- results.forEach((row) => {
- const cleanValues = Object.values(row).map((v) => sanitizeTextForUTF8(String(v || '')))
- content += `${cleanValues.join(', ')}\n`
- })
-
- return {
- content: sanitizeTextForUTF8(content),
- metadata: {
- rowCount: results.length,
- headers: headers,
- rawData: results,
- },
- }
- } catch (error) {
- logger.error('CSV general error:', error)
- throw new Error(`Failed to process CSV file: ${(error as Error).message}`)
+ if (!filePath) {
+ throw new Error('No file path provided')
}
+
+ if (!existsSync(filePath)) {
+ throw new Error(`File not found: ${filePath}`)
+ }
+
+ const stream = createReadStream(filePath, {
+ highWaterMark: CONFIG.STREAM_CHUNK_SIZE,
+ })
+
+ return this.parseStream(stream)
}
async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
- try {
- logger.info('Parsing buffer, size:', buffer.length)
+ const bufferSize = buffer.length
+ logger.info(
+ `Parsing CSV buffer, size: ${bufferSize} bytes (${(bufferSize / 1024 / 1024).toFixed(2)} MB)`
+ )
- const fileContent = buffer.toString('utf8')
+ const stream = Readable.from(buffer, {
+ highWaterMark: CONFIG.STREAM_CHUNK_SIZE,
+ })
- const parseResult = Papa.parse(fileContent, PARSE_OPTIONS)
+ return this.parseStream(stream)
+ }
- if (parseResult.errors && parseResult.errors.length > 0) {
- const errorMessages = parseResult.errors.map((err) => err.message).join(', ')
- logger.error('CSV parsing errors:', parseResult.errors)
- throw new Error(`Failed to parse CSV buffer: ${errorMessages}`)
+ private parseStream(inputStream: NodeJS.ReadableStream): Promise<FileParseResult> {
+ return new Promise((resolve, reject) => {
+ let rowCount = 0
+ let errorCount = 0
+ let headers: string[] = []
+ let processedContent = ''
+ const sampledRows: any[] = []
+ const errors: string[] = []
+ let firstRowProcessed = false
+ let aborted = false
+
+ const parserOptions: Options = {
+ columns: true, // Use first row as headers
+ skip_empty_lines: true, // Skip empty lines
+ trim: true, // Trim whitespace
+ relax_column_count: true, // Allow variable column counts
+ relax_quotes: true, // Be lenient with quotes
+ skip_records_with_error: true, // Skip bad records
+ raw: false,
+ cast: false,
}
+ const parser = parse(parserOptions)
- const results = parseResult.data as Record[]
- const headers = parseResult.meta.fields || []
+ parser.on('readable', () => {
+ let record
+ while ((record = parser.read()) !== null && !aborted) {
+ rowCount++
- let content = ''
+ if (!firstRowProcessed && record) {
+ headers = Object.keys(record).map((h) => sanitizeTextForUTF8(String(h)))
+ processedContent = `${headers.join(', ')}\n`
+ firstRowProcessed = true
+ }
- if (headers.length > 0) {
- const cleanHeaders = headers.map((h) => sanitizeTextForUTF8(String(h)))
- content += `${cleanHeaders.join(', ')}\n`
- }
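+ // Keep only the first MAX_PREVIEW_ROWS rows in the preview text; later rows are still counted but not buffered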
+ if (rowCount <= CONFIG.MAX_PREVIEW_ROWS) {
+ try {
+ const cleanValues = Object.values(record).map((v: any) =>
+ sanitizeTextForUTF8(String(v || ''))
+ )
+ processedContent += `${cleanValues.join(', ')}\n`
- results.forEach((row) => {
- const cleanValues = Object.values(row).map((v) => sanitizeTextForUTF8(String(v || '')))
- content += `${cleanValues.join(', ')}\n`
+ if (rowCount <= CONFIG.MAX_SAMPLE_ROWS) {
+ sampledRows.push(record)
+ }
+ } catch (err) {
+ logger.warn(`Error processing row ${rowCount}:`, err)
+ }
+ }
+
+ if (rowCount % 10000 === 0) {
+ logger.info(`Processed ${rowCount} rows...`)
+ }
+ }
})
- return {
- content: sanitizeTextForUTF8(content),
- metadata: {
- rowCount: results.length,
- headers: headers,
- rawData: results,
- },
- }
- } catch (error) {
- logger.error('CSV buffer parsing error:', error)
- throw new Error(`Failed to process CSV buffer: ${(error as Error).message}`)
- }
+ parser.on('skip', (err: any) => {
+ errorCount++
+
+ if (errorCount <= 5) {
+ const errorMsg = `Row ${err.lines || rowCount}: ${err.message || 'Unknown error'}`
+ errors.push(errorMsg)
+ logger.warn('CSV skip:', errorMsg)
+ }
+
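+ // Stop early once the error budget is exhausted rather than scanning the rest of a likely-corrupt file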
+ if (errorCount >= CONFIG.MAX_ERRORS) {
+ aborted = true
+ parser.destroy()
+ reject(new Error(`Too many errors (${errorCount}). File may be corrupted.`))
+ }
+ })
+
+ parser.on('error', (err: Error) => {
+ logger.error('CSV parser error:', err)
+ reject(new Error(`CSV parsing failed: ${err.message}`))
+ })
+
+ parser.on('end', () => {
+ if (!aborted) {
+ if (rowCount > CONFIG.MAX_PREVIEW_ROWS) {
+ processedContent += `\n[... ${rowCount.toLocaleString()} total rows, showing first ${CONFIG.MAX_PREVIEW_ROWS} ...]\n`
+ }
+
+ logger.info(`CSV parsing complete: ${rowCount} rows, ${errorCount} errors`)
+
+ resolve({
+ content: sanitizeTextForUTF8(processedContent),
+ metadata: {
+ rowCount,
+ headers,
+ errorCount,
+ errors: errors.slice(0, 10),
+ truncated: rowCount > CONFIG.MAX_PREVIEW_ROWS,
+ sampledData: sampledRows,
+ },
+ })
+ }
+ })
+
+ inputStream.on('error', (err) => {
+ logger.error('Input stream error:', err)
+ parser.destroy()
+ reject(new Error(`Stream error: ${err.message}`))
+ })
+
+ inputStream.pipe(parser)
+ })
}
}
diff --git a/apps/sim/lib/file-parsers/index.ts b/apps/sim/lib/file-parsers/index.ts
index 0eb3f3f7c..6d009af7c 100644
--- a/apps/sim/lib/file-parsers/index.ts
+++ b/apps/sim/lib/file-parsers/index.ts
@@ -27,8 +27,9 @@ function getParserInstances(): Record<string, FileParser> {
try {
const { CsvParser } = require('@/lib/file-parsers/csv-parser')
parserInstances.csv = new CsvParser()
+ logger.info('Loaded streaming CSV parser with csv-parse library')
} catch (error) {
- logger.error('Failed to load CSV parser:', error)
+ logger.error('Failed to load streaming CSV parser:', error)
}
try {
@@ -63,6 +64,7 @@ function getParserInstances(): Record<string, FileParser> {
const { XlsxParser } = require('@/lib/file-parsers/xlsx-parser')
parserInstances.xlsx = new XlsxParser()
parserInstances.xls = new XlsxParser()
+ logger.info('Loaded XLSX parser')
} catch (error) {
logger.error('Failed to load XLSX parser:', error)
}
@@ -82,6 +84,32 @@ function getParserInstances(): Record<string, FileParser> {
} catch (error) {
logger.error('Failed to load HTML parser:', error)
}
+
+ try {
+ const { parseJSON, parseJSONBuffer } = require('@/lib/file-parsers/json-parser')
+ parserInstances.json = {
+ parseFile: parseJSON,
+ parseBuffer: parseJSONBuffer,
+ }
+ logger.info('Loaded JSON parser')
+ } catch (error) {
+ logger.error('Failed to load JSON parser:', error)
+ }
+
+ try {
+ const { parseYAML, parseYAMLBuffer } = require('@/lib/file-parsers/yaml-parser')
+ parserInstances.yaml = {
+ parseFile: parseYAML,
+ parseBuffer: parseYAMLBuffer,
+ }
+ parserInstances.yml = {
+ parseFile: parseYAML,
+ parseBuffer: parseYAMLBuffer,
+ }
+ logger.info('Loaded YAML parser')
+ } catch (error) {
+ logger.error('Failed to load YAML parser:', error)
+ }
} catch (error) {
logger.error('Error loading file parsers:', error)
}
diff --git a/apps/sim/lib/file-parsers/json-parser.ts b/apps/sim/lib/file-parsers/json-parser.ts
new file mode 100644
index 000000000..bac191ccf
--- /dev/null
+++ b/apps/sim/lib/file-parsers/json-parser.ts
@@ -0,0 +1,74 @@
+import type { FileParseResult } from './types'
+
+/**
+ * Parse JSON files
+ */
+export async function parseJSON(filePath: string): Promise<FileParseResult> {
+ const fs = await import('fs/promises')
+ const content = await fs.readFile(filePath, 'utf-8')
+
+ try {
+ // Parse to validate JSON
+ const jsonData = JSON.parse(content)
+
+ // Return pretty-printed JSON for better readability
+ const formattedContent = JSON.stringify(jsonData, null, 2)
+
+ // Extract metadata about the JSON structure
+ const metadata = {
+ type: 'json',
+ isArray: Array.isArray(jsonData),
+ keys: Array.isArray(jsonData) ? [] : Object.keys(jsonData),
+ itemCount: Array.isArray(jsonData) ? jsonData.length : undefined,
+ depth: getJsonDepth(jsonData),
+ }
+
+ return {
+ content: formattedContent,
+ metadata,
+ }
+ } catch (error) {
+ throw new Error(`Invalid JSON: ${error instanceof Error ? error.message : 'Unknown error'}`)
+ }
+}
+
+/**
+ * Parse JSON from buffer
+ */
+export async function parseJSONBuffer(buffer: Buffer): Promise<FileParseResult> {
+ const content = buffer.toString('utf-8')
+
+ try {
+ const jsonData = JSON.parse(content)
+ const formattedContent = JSON.stringify(jsonData, null, 2)
+
+ const metadata = {
+ type: 'json',
+ isArray: Array.isArray(jsonData),
+ keys: Array.isArray(jsonData) ? [] : Object.keys(jsonData),
+ itemCount: Array.isArray(jsonData) ? jsonData.length : undefined,
+ depth: getJsonDepth(jsonData),
+ }
+
+ return {
+ content: formattedContent,
+ metadata,
+ }
+ } catch (error) {
+ throw new Error(`Invalid JSON: ${error instanceof Error ? error.message : 'Unknown error'}`)
+ }
+}
+
+/**
+ * Calculate the depth of a JSON object
+ */
+function getJsonDepth(obj: any): number {
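+ // Primitives and null count as depth 0; each level of array/object nesting adds 1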
+ if (obj === null || typeof obj !== 'object') return 0
+
+ if (Array.isArray(obj)) {
+ return obj.length > 0 ? 1 + Math.max(...obj.map(getJsonDepth)) : 1
+ }
+
+ const depths = Object.values(obj).map(getJsonDepth)
+ return depths.length > 0 ? 1 + Math.max(...depths) : 1
+}
diff --git a/apps/sim/lib/file-parsers/xlsx-parser.ts b/apps/sim/lib/file-parsers/xlsx-parser.ts
index 99e4e9f75..9cacacec9 100644
--- a/apps/sim/lib/file-parsers/xlsx-parser.ts
+++ b/apps/sim/lib/file-parsers/xlsx-parser.ts
@@ -6,6 +6,15 @@ import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('XlsxParser')
+// Configuration for handling large XLSX files
+const CONFIG = {
+ MAX_PREVIEW_ROWS: 1000, // Only keep first 1000 rows for preview
+ MAX_SAMPLE_ROWS: 100, // Sample for metadata
+ ROWS_PER_CHUNK: 50, // Aggregate 50 rows per chunk to reduce chunk count
+ MAX_CELL_LENGTH: 1000, // Truncate very long cell values
+ MAX_CONTENT_SIZE: 10 * 1024 * 1024, // 10MB max content size
+}
+
export class XlsxParser implements FileParser {
async parseFile(filePath: string): Promise<FileParseResult> {
try {
@@ -19,7 +28,12 @@ export class XlsxParser implements FileParser {
logger.info(`Parsing XLSX file: ${filePath}`)
- const workbook = XLSX.readFile(filePath)
+ // Read with streaming option for large files
+ const workbook = XLSX.readFile(filePath, {
+ dense: true, // Use dense mode for better memory efficiency
+ sheetStubs: false, // Don't create stub cells
+ })
+
return this.processWorkbook(workbook)
} catch (error) {
logger.error('XLSX file parsing error:', error)
@@ -29,13 +43,21 @@ export class XlsxParser implements FileParser {
async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
try {
- logger.info('Parsing XLSX buffer, size:', buffer.length)
+ const bufferSize = buffer.length
+ logger.info(
+ `Parsing XLSX buffer, size: ${bufferSize} bytes (${(bufferSize / 1024 / 1024).toFixed(2)} MB)`
+ )
if (!buffer || buffer.length === 0) {
throw new Error('Empty buffer provided')
}
- const workbook = XLSX.read(buffer, { type: 'buffer' })
+ const workbook = XLSX.read(buffer, {
+ type: 'buffer',
+ dense: true, // Use dense mode for better memory efficiency
+ sheetStubs: false, // Don't create stub cells
+ })
+
return this.processWorkbook(workbook)
} catch (error) {
logger.error('XLSX buffer parsing error:', error)
@@ -45,44 +67,111 @@ export class XlsxParser implements FileParser {
private processWorkbook(workbook: XLSX.WorkBook): FileParseResult {
const sheetNames = workbook.SheetNames
- const sheets: Record<string, unknown[]> = {}
let content = ''
let totalRows = 0
+ let truncated = false
+ let contentSize = 0
+ const sampledData: any[] = []
for (const sheetName of sheetNames) {
const worksheet = workbook.Sheets[sheetName]
- const sheetData = XLSX.utils.sheet_to_json(worksheet, { header: 1 })
- sheets[sheetName] = sheetData
- totalRows += sheetData.length
+ // Get sheet dimensions
+ const range = XLSX.utils.decode_range(worksheet['!ref'] || 'A1')
+ const rowCount = range.e.r - range.s.r + 1
+ logger.info(`Processing sheet: ${sheetName} with ${rowCount} rows`)
+
+ // Convert to JSON with header row
+ const sheetData = XLSX.utils.sheet_to_json(worksheet, {
+ header: 1,
+ defval: '', // Default value for empty cells
+ blankrows: false, // Skip blank rows
+ })
+
+ const actualRowCount = sheetData.length
+ totalRows += actualRowCount
+
+ // Store limited sample for metadata
+ if (sampledData.length < CONFIG.MAX_SAMPLE_ROWS) {
+ const sampleSize = Math.min(CONFIG.MAX_SAMPLE_ROWS - sampledData.length, actualRowCount)
+ sampledData.push(...sheetData.slice(0, sampleSize))
+ }
+
+ // Only process limited rows for preview
+ const rowsToProcess = Math.min(actualRowCount, CONFIG.MAX_PREVIEW_ROWS)
const cleanSheetName = sanitizeTextForUTF8(sheetName)
- content += `Sheet: ${cleanSheetName}\n`
- content += `=${'='.repeat(cleanSheetName.length + 6)}\n\n`
- if (sheetData.length > 0) {
- sheetData.forEach((row: unknown, rowIndex: number) => {
- if (Array.isArray(row) && row.length > 0) {
- const rowString = row
- .map((cell) => {
- if (cell === null || cell === undefined) {
- return ''
- }
- return sanitizeTextForUTF8(String(cell))
- })
- .join('\t')
+ // Add sheet header
+ const sheetHeader = `\n=== Sheet: ${cleanSheetName} ===\n`
+ content += sheetHeader
+ contentSize += sheetHeader.length
- content += `${rowString}\n`
+ if (actualRowCount > 0) {
+ // Get headers if available
+ const headers = sheetData[0] as any[]
+ if (headers && headers.length > 0) {
+ const headerRow = headers.map((h) => this.truncateCell(h)).join('\t')
+ content += `${headerRow}\n`
+ content += `${'-'.repeat(Math.min(80, headerRow.length))}\n`
+ contentSize += headerRow.length + 82
+ }
+
+ // Process data rows in chunks
+ let chunkContent = ''
+ let chunkRowCount = 0
+
+ for (let i = 1; i < rowsToProcess; i++) {
+ const row = sheetData[i] as any[]
+ if (row && row.length > 0) {
+ const rowString = row.map((cell) => this.truncateCell(cell)).join('\t')
+
+ chunkContent += `${rowString}\n`
+ chunkRowCount++
+
+ // Flush the accumulated rows every ROWS_PER_CHUNK rows and re-check the content size limit
+ if (chunkRowCount >= CONFIG.ROWS_PER_CHUNK) {
+ content += chunkContent
+ contentSize += chunkContent.length
+ chunkContent = ''
+ chunkRowCount = 0
+
+ // Check content size limit
+ if (contentSize > CONFIG.MAX_CONTENT_SIZE) {
+ truncated = true
+ break
+ }
+ }
}
- })
+ }
+
+ // Add remaining chunk content
+ if (chunkContent && contentSize < CONFIG.MAX_CONTENT_SIZE) {
+ content += chunkContent
+ contentSize += chunkContent.length
+ }
+
+ // Add truncation notice if needed
+ if (actualRowCount > rowsToProcess) {
+ const notice = `\n[... ${actualRowCount.toLocaleString()} total rows, showing first ${rowsToProcess.toLocaleString()} ...]\n`
+ content += notice
+ truncated = true
+ }
} else {
content += '[Empty sheet]\n'
}
- content += '\n'
+ // Stop processing if content is too large
+ if (contentSize > CONFIG.MAX_CONTENT_SIZE) {
+ content += '\n[... Content truncated due to size limits ...]\n'
+ truncated = true
+ break
+ }
}
- logger.info(`XLSX parsing completed: ${sheetNames.length} sheets, ${totalRows} total rows`)
+ logger.info(
+ `XLSX parsing completed: ${sheetNames.length} sheets, ${totalRows} total rows, truncated: ${truncated}`
+ )
const cleanContent = sanitizeTextForUTF8(content).trim()
@@ -92,8 +181,25 @@ export class XlsxParser implements FileParser {
sheetCount: sheetNames.length,
sheetNames: sheetNames,
totalRows: totalRows,
- sheets: sheets,
+ truncated: truncated,
+ sampledData: sampledData.slice(0, CONFIG.MAX_SAMPLE_ROWS),
+ contentSize: contentSize,
},
}
}
+
+ private truncateCell(cell: any): string {
+ if (cell === null || cell === undefined) {
+ return ''
+ }
+
+ let cellStr = String(cell)
+
+ // Truncate very long cells
+ if (cellStr.length > CONFIG.MAX_CELL_LENGTH) {
+ cellStr = `${cellStr.substring(0, CONFIG.MAX_CELL_LENGTH)}...`
+ }
+
+ return sanitizeTextForUTF8(cellStr)
+ }
}
diff --git a/apps/sim/lib/file-parsers/yaml-parser.ts b/apps/sim/lib/file-parsers/yaml-parser.ts
new file mode 100644
index 000000000..a636d0329
--- /dev/null
+++ b/apps/sim/lib/file-parsers/yaml-parser.ts
@@ -0,0 +1,75 @@
+import * as yaml from 'js-yaml'
+import type { FileParseResult } from './types'
+
+/**
+ * Parse YAML files
+ */
+export async function parseYAML(filePath: string): Promise<FileParseResult> {
+ const fs = await import('fs/promises')
+ const content = await fs.readFile(filePath, 'utf-8')
+
+ try {
+ // Parse YAML to validate and extract structure
+ const yamlData = yaml.load(content)
+
+ // Convert to JSON for consistent processing
+ const jsonContent = JSON.stringify(yamlData, null, 2)
+
+ // Extract metadata about the YAML structure
+ const metadata = {
+ type: 'yaml',
+ isArray: Array.isArray(yamlData),
+ keys: Array.isArray(yamlData) ? [] : Object.keys(yamlData || {}),
+ itemCount: Array.isArray(yamlData) ? yamlData.length : undefined,
+ depth: getYamlDepth(yamlData),
+ }
+
+ return {
+ content: jsonContent,
+ metadata,
+ }
+ } catch (error) {
+ throw new Error(`Invalid YAML: ${error instanceof Error ? error.message : 'Unknown error'}`)
+ }
+}
+
+/**
+ * Parse YAML from buffer
+ */
+export async function parseYAMLBuffer(buffer: Buffer): Promise<FileParseResult> {
+ const content = buffer.toString('utf-8')
+
+ try {
+ const yamlData = yaml.load(content)
+ const jsonContent = JSON.stringify(yamlData, null, 2)
+
+ const metadata = {
+ type: 'yaml',
+ isArray: Array.isArray(yamlData),
+ keys: Array.isArray(yamlData) ? [] : Object.keys(yamlData || {}),
+ itemCount: Array.isArray(yamlData) ? yamlData.length : undefined,
+ depth: getYamlDepth(yamlData),
+ }
+
+ return {
+ content: jsonContent,
+ metadata,
+ }
+ } catch (error) {
+ throw new Error(`Invalid YAML: ${error instanceof Error ? error.message : 'Unknown error'}`)
+ }
+}
+
+/**
+ * Calculate the depth of a YAML/JSON object
+ */
+function getYamlDepth(obj: any): number {
+ if (obj === null || typeof obj !== 'object') return 0
+
+ if (Array.isArray(obj)) {
+ return obj.length > 0 ? 1 + Math.max(...obj.map(getYamlDepth)) : 1
+ }
+
+ const depths = Object.values(obj).map(getYamlDepth)
+ return depths.length > 0 ? 1 + Math.max(...depths) : 1
+}
diff --git a/apps/sim/lib/knowledge/documents/document-processor.ts b/apps/sim/lib/knowledge/documents/document-processor.ts
index 2fb1f0d3d..5efcb3608 100644
--- a/apps/sim/lib/knowledge/documents/document-processor.ts
+++ b/apps/sim/lib/knowledge/documents/document-processor.ts
@@ -1,6 +1,6 @@
+import { type Chunk, JsonYamlChunker, StructuredDataChunker, TextChunker } from '@/lib/chunkers'
import { env } from '@/lib/env'
import { parseBuffer, parseFile } from '@/lib/file-parsers'
-import { type Chunk, TextChunker } from '@/lib/knowledge/documents/chunker'
import { retryWithExponentialBackoff } from '@/lib/knowledge/documents/utils'
import { createLogger } from '@/lib/logs/console/logger'
import {
@@ -15,8 +15,8 @@ import { mistralParserTool } from '@/tools/mistral/parser'
const logger = createLogger('DocumentProcessor')
const TIMEOUTS = {
- FILE_DOWNLOAD: 60000,
- MISTRAL_OCR_API: 90000,
+ FILE_DOWNLOAD: 180000,
+ MISTRAL_OCR_API: 120000,
} as const
type OCRResult = {
@@ -97,8 +97,32 @@ export async function processDocument(
const { content, processingMethod } = parseResult
const cloudUrl = 'cloudUrl' in parseResult ? parseResult.cloudUrl : undefined
- const chunker = new TextChunker({ chunkSize, overlap: chunkOverlap, minChunkSize })
- const chunks = await chunker.chunk(content)
+ let chunks: Chunk[]
+ const metadata = 'metadata' in parseResult ? parseResult.metadata : {}
+
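+ // Prefer a structure-aware chunker when parser metadata or the MIME type indicates JSON/YAML or tabular content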
+ const isJsonYaml =
+ metadata.type === 'json' ||
+ metadata.type === 'yaml' ||
+ mimeType.includes('json') ||
+ mimeType.includes('yaml')
+
+ if (isJsonYaml && JsonYamlChunker.isStructuredData(content)) {
+ logger.info('Using JSON/YAML chunker for structured data')
+ chunks = await JsonYamlChunker.chunkJsonYaml(content, {
+ chunkSize,
+ minChunkSize,
+ })
+ } else if (StructuredDataChunker.isStructuredData(content, mimeType)) {
+ logger.info('Using structured data chunker for spreadsheet/CSV content')
+ chunks = await StructuredDataChunker.chunkStructuredData(content, {
+ headers: metadata.headers,
+ totalRows: metadata.totalRows || metadata.rowCount,
+ sheetName: metadata.sheetNames?.[0],
+ })
+ } else {
+ const chunker = new TextChunker({ chunkSize, overlap: chunkOverlap, minChunkSize })
+ chunks = await chunker.chunk(content)
+ }
const characterCount = content.length
const tokenCount = chunks.reduce((sum, chunk) => sum + chunk.tokenCount, 0)
@@ -132,22 +156,23 @@ async function parseDocument(
content: string
processingMethod: 'file-parser' | 'mistral-ocr'
cloudUrl?: string
+ metadata?: any
}> {
const isPDF = mimeType === 'application/pdf'
const hasAzureMistralOCR =
env.OCR_AZURE_API_KEY && env.OCR_AZURE_ENDPOINT && env.OCR_AZURE_MODEL_NAME
const hasMistralOCR = env.MISTRAL_API_KEY
- // Check Azure Mistral OCR configuration
+ if (isPDF && (hasAzureMistralOCR || hasMistralOCR)) {
+ if (hasAzureMistralOCR) {
+ logger.info(`Using Azure Mistral OCR: ${filename}`)
+ return parseWithAzureMistralOCR(fileUrl, filename, mimeType)
+ }
- if (isPDF && hasAzureMistralOCR) {
- logger.info(`Using Azure Mistral OCR: ${filename}`)
- return parseWithAzureMistralOCR(fileUrl, filename, mimeType)
- }
-
- if (isPDF && hasMistralOCR) {
- logger.info(`Using Mistral OCR: ${filename}`)
- return parseWithMistralOCR(fileUrl, filename, mimeType)
+ if (hasMistralOCR) {
+ logger.info(`Using Mistral OCR: ${filename}`)
+ return parseWithMistralOCR(fileUrl, filename, mimeType)
+ }
}
logger.info(`Using file parser: ${filename}`)
@@ -200,9 +225,7 @@ async function downloadFileWithTimeout(fileUrl: string): Promise<Buffer> {
}
async function downloadFileForBase64(fileUrl: string): Promise<Buffer> {
- // Handle different URL types for Azure Mistral OCR base64 requirement
if (fileUrl.startsWith('data:')) {
- // Extract base64 data from data URI
const [, base64Data] = fileUrl.split(',')
if (!base64Data) {
throw new Error('Invalid data URI format')
@@ -210,10 +233,8 @@ async function downloadFileForBase64(fileUrl: string): Promise<Buffer> {
return Buffer.from(base64Data, 'base64')
}
if (fileUrl.startsWith('http')) {
- // Download from HTTP(S) URL
return downloadFileWithTimeout(fileUrl)
}
- // Local file - read it
const fs = await import('fs/promises')
return fs.readFile(fileUrl)
}
@@ -315,7 +336,6 @@ async function parseWithAzureMistralOCR(fileUrl: string, filename: string, mimeT
'Azure Mistral OCR'
)
- // Azure Mistral OCR accepts data URIs with base64 content
const fileBuffer = await downloadFileForBase64(fileUrl)
const base64Data = fileBuffer.toString('base64')
const dataUri = `data:${mimeType};base64,${base64Data}`
@@ -409,21 +429,25 @@ async function parseWithMistralOCR(fileUrl: string, filename: string, mimeType:
async function parseWithFileParser(fileUrl: string, filename: string, mimeType: string) {
try {
let content: string
+ let metadata: any = {}
if (fileUrl.startsWith('data:')) {
content = await parseDataURI(fileUrl, filename, mimeType)
} else if (fileUrl.startsWith('http')) {
- content = await parseHttpFile(fileUrl, filename)
+ const result = await parseHttpFile(fileUrl, filename)
+ content = result.content
+ metadata = result.metadata || {}
} else {
const result = await parseFile(fileUrl)
content = result.content
+ metadata = result.metadata || {}
}
if (!content.trim()) {
throw new Error('File parser returned empty content')
}
- return { content, processingMethod: 'file-parser' as const, cloudUrl: undefined }
+ return { content, processingMethod: 'file-parser' as const, cloudUrl: undefined, metadata }
} catch (error) {
logger.error(`File parser failed for ${filename}:`, error)
throw error
@@ -448,7 +472,10 @@ async function parseDataURI(fileUrl: string, filename: string, mimeType: string)
return result.content
}
-async function parseHttpFile(fileUrl: string, filename: string): Promise<string> {
+async function parseHttpFile(
+ fileUrl: string,
+ filename: string
+): Promise<{ content: string; metadata?: any }> {
const buffer = await downloadFileWithTimeout(fileUrl)
const extension = filename.split('.').pop()?.toLowerCase()
@@ -457,5 +484,5 @@ async function parseHttpFile(fileUrl: string, filename: string): Promise<string>
}
const result = await parseBuffer(buffer, extension)
- return result.content
+ return result
}
diff --git a/apps/sim/lib/knowledge/documents/service.ts b/apps/sim/lib/knowledge/documents/service.ts
index f78483b04..af21f2b58 100644
--- a/apps/sim/lib/knowledge/documents/service.ts
+++ b/apps/sim/lib/knowledge/documents/service.ts
@@ -17,10 +17,18 @@ import type { DocumentSortField, SortOrder } from './types'
const logger = createLogger('DocumentService')
const TIMEOUTS = {
- OVERALL_PROCESSING: (env.KB_CONFIG_MAX_DURATION || 300) * 1000,
+ OVERALL_PROCESSING: (env.KB_CONFIG_MAX_DURATION || 600) * 1000, // Increased to 10 minutes to match Trigger's timeout
EMBEDDINGS_API: (env.KB_CONFIG_MAX_TIMEOUT || 10000) * 18,
} as const
+// Configuration for handling large documents
+const LARGE_DOC_CONFIG = {
+ MAX_CHUNKS_PER_BATCH: 500, // Insert embeddings in batches of 500
+ MAX_EMBEDDING_BATCH: 50, // Generate embeddings in batches of 50
+ MAX_FILE_SIZE: 100 * 1024 * 1024, // 100MB max file size
+ MAX_CHUNKS_PER_DOCUMENT: 100000, // Maximum chunks allowed per document
+}
+
/**
* Create a timeout wrapper for async operations
*/
@@ -448,14 +456,38 @@ export async function processDocumentAsync(
processingOptions.minCharactersPerChunk || 1
)
+ if (processed.chunks.length > LARGE_DOC_CONFIG.MAX_CHUNKS_PER_DOCUMENT) {
+ throw new Error(
+ `Document has ${processed.chunks.length.toLocaleString()} chunks, exceeding maximum of ${LARGE_DOC_CONFIG.MAX_CHUNKS_PER_DOCUMENT.toLocaleString()}. ` +
+ `This document is unusually large and may need to be split into multiple files or preprocessed to reduce content.`
+ )
+ }
+
const now = new Date()
logger.info(
`[${documentId}] Document parsed successfully, generating embeddings for ${processed.chunks.length} chunks`
)
+ // Generate embeddings in batches for large documents
const chunkTexts = processed.chunks.map((chunk) => chunk.text)
- const embeddings = chunkTexts.length > 0 ? await generateEmbeddings(chunkTexts) : []
+ const embeddings: number[][] = []
+
+ if (chunkTexts.length > 0) {
+ const batchSize = LARGE_DOC_CONFIG.MAX_EMBEDDING_BATCH
+ const totalBatches = Math.ceil(chunkTexts.length / batchSize)
+
+ logger.info(`[${documentId}] Generating embeddings in ${totalBatches} batches`)
+
+ for (let i = 0; i < chunkTexts.length; i += batchSize) {
+ const batch = chunkTexts.slice(i, i + batchSize)
+ const batchNum = Math.floor(i / batchSize) + 1
+
+ logger.info(`[${documentId}] Processing embedding batch ${batchNum}/${totalBatches}`)
+ const batchEmbeddings = await generateEmbeddings(batch)
+ embeddings.push(...batchEmbeddings)
+ }
+ }
logger.info(`[${documentId}] Embeddings generated, fetching document tags`)
@@ -503,8 +535,24 @@ export async function processDocumentAsync(
}))
await db.transaction(async (tx) => {
+ // Insert embeddings in batches for large documents
if (embeddingRecords.length > 0) {
- await tx.insert(embedding).values(embeddingRecords)
+ const batchSize = LARGE_DOC_CONFIG.MAX_CHUNKS_PER_BATCH
+ const totalBatches = Math.ceil(embeddingRecords.length / batchSize)
+
+ logger.info(
+ `[${documentId}] Inserting ${embeddingRecords.length} embeddings in ${totalBatches} batches`
+ )
+
+ for (let i = 0; i < embeddingRecords.length; i += batchSize) {
+ const batch = embeddingRecords.slice(i, i + batchSize)
+ const batchNum = Math.floor(i / batchSize) + 1
+
+ await tx.insert(embedding).values(batch)
+ logger.info(
+ `[${documentId}] Inserted batch ${batchNum}/${totalBatches} (${batch.length} records)`
+ )
+ }
}
await tx
diff --git a/apps/sim/lib/uploads/validation.ts b/apps/sim/lib/uploads/validation.ts
index 220b885cb..88b97ea23 100644
--- a/apps/sim/lib/uploads/validation.ts
+++ b/apps/sim/lib/uploads/validation.ts
@@ -15,6 +15,9 @@ export const SUPPORTED_DOCUMENT_EXTENSIONS = [
'pptx',
'html',
'htm',
+ 'json',
+ 'yaml',
+ 'yml',
] as const
export type SupportedDocumentExtension = (typeof SUPPORTED_DOCUMENT_EXTENSIONS)[number]
@@ -46,6 +49,9 @@ export const SUPPORTED_MIME_TYPES: Record<SupportedDocumentExtension, string[]>
],
html: ['text/html', 'application/xhtml+xml'],
htm: ['text/html', 'application/xhtml+xml'],
+ json: ['application/json', 'text/json', 'application/x-json'],
+ yaml: ['text/yaml', 'text/x-yaml', 'application/yaml', 'application/x-yaml'],
+ yml: ['text/yaml', 'text/x-yaml', 'application/yaml', 'application/x-yaml'],
}
export const ACCEPTED_FILE_TYPES = Object.values(SUPPORTED_MIME_TYPES).flat()
diff --git a/apps/sim/package.json b/apps/sim/package.json
index 94a78513d..08f9cad01 100644
--- a/apps/sim/package.json
+++ b/apps/sim/package.json
@@ -70,6 +70,7 @@
"clsx": "^2.1.1",
"cmdk": "^1.0.0",
"croner": "^9.0.0",
+ "csv-parse": "6.1.0",
"date-fns": "4.1.0",
"encoding": "0.1.13",
"entities": "6.0.1",
diff --git a/apps/sim/scripts/chunk-docs.ts b/apps/sim/scripts/chunk-docs.ts
deleted file mode 100644
index b86d019fa..000000000
--- a/apps/sim/scripts/chunk-docs.ts
+++ /dev/null
@@ -1,98 +0,0 @@
-#!/usr/bin/env bun
-
-import path from 'path'
-import { DocsChunker } from '@/lib/knowledge/documents/docs-chunker'
-import type { DocChunk } from '@/lib/knowledge/documents/types'
-import { createLogger } from '@/lib/logs/console/logger'
-
-const logger = createLogger('ChunkDocsScript')
-
-/**
- * Script to chunk all .mdx files in the docs directory
- */
-async function main() {
- try {
- // Initialize the docs chunker
- const chunker = new DocsChunker({
- chunkSize: 1024,
- minChunkSize: 100,
- overlap: 200,
- baseUrl: 'https://docs.sim.ai',
- })
-
- // Path to the docs content directory
- const docsPath = path.join(process.cwd(), '../../apps/docs/content/docs')
-
- logger.info(`Processing docs from: ${docsPath}`)
-
- // Process all .mdx files
- const chunks = await chunker.chunkAllDocs(docsPath)
-
- logger.info(`\n=== CHUNKING RESULTS ===`)
- logger.info(`Total chunks: ${chunks.length}`)
-
- // Group chunks by document
- const chunksByDoc = chunks.reduce<Record<string, DocChunk[]>>((acc, chunk) => {
- if (!acc[chunk.sourceDocument]) {
- acc[chunk.sourceDocument] = []
- }
- acc[chunk.sourceDocument].push(chunk)
- return acc
- }, {})
-
- // Display summary
- logger.info(`\n=== DOCUMENT SUMMARY ===`)
- for (const [doc, docChunks] of Object.entries(chunksByDoc)) {
- logger.info(`${doc}: ${docChunks.length} chunks`)
- }
-
- // Display a few sample chunks
- logger.info(`\n=== SAMPLE CHUNKS ===`)
- chunks.slice(0, 3).forEach((chunk, index) => {
- logger.info(`\nChunk ${index + 1}:`)
- logger.info(` Source: ${chunk.sourceDocument}`)
- logger.info(` Header: ${chunk.headerText} (Level ${chunk.headerLevel})`)
- logger.info(` Link: ${chunk.headerLink}`)
- logger.info(` Tokens: ${chunk.tokenCount}`)
- logger.info(` Embedding: ${chunk.embedding.length} dimensions (${chunk.embeddingModel})`)
- logger.info(
- ` Embedding Preview: [${chunk.embedding
- .slice(0, 5)
- .map((n) => n.toFixed(4))
- .join(', ')}...]`
- )
- logger.info(` Text Preview: ${chunk.text.slice(0, 100)}...`)
- })
-
- // Calculate total token count
- const totalTokens = chunks.reduce((sum, chunk) => sum + chunk.tokenCount, 0)
- const chunksWithEmbeddings = chunks.filter((chunk) => chunk.embedding.length > 0).length
-
- logger.info(`\n=== STATISTICS ===`)
- logger.info(`Total tokens: ${totalTokens}`)
- logger.info(`Average tokens per chunk: ${Math.round(totalTokens / chunks.length)}`)
- logger.info(`Chunks with embeddings: ${chunksWithEmbeddings}/${chunks.length}`)
- if (chunks.length > 0 && chunks[0].embedding.length > 0) {
- logger.info(`Embedding model: ${chunks[0].embeddingModel}`)
- logger.info(`Embedding dimensions: ${chunks[0].embedding.length}`)
- }
-
- const headerLevels = chunks.reduce<Record<number, number>>((acc, chunk) => {
- acc[chunk.headerLevel] = (acc[chunk.headerLevel] || 0) + 1
- return acc
- }, {})
-
- logger.info(`Header level distribution:`)
- Object.entries(headerLevels)
- .sort(([a], [b]) => Number(a) - Number(b))
- .forEach(([level, count]) => {
- logger.info(` H${level}: ${count} chunks`)
- })
- } catch (error) {
- logger.error('Error processing docs:', error)
- process.exit(1)
- }
-}
-
-// Run the script
-main().catch(console.error)
diff --git a/apps/sim/scripts/process-docs-embeddings.ts b/apps/sim/scripts/process-docs-embeddings.ts
deleted file mode 100644
index 061672e49..000000000
--- a/apps/sim/scripts/process-docs-embeddings.ts
+++ /dev/null
@@ -1,215 +0,0 @@
-#!/usr/bin/env bun
-
-import path from 'path'
-import { db } from '@sim/db'
-import { docsEmbeddings } from '@sim/db/schema'
-import { sql } from 'drizzle-orm'
-import { isDev } from '@/lib/environment'
-import { DocsChunker } from '@/lib/knowledge/documents/docs-chunker'
-import { createLogger } from '@/lib/logs/console/logger'
-
-const logger = createLogger('ProcessDocsEmbeddings')
-
-interface ProcessingOptions {
- /** Clear existing docs embeddings before processing */
- clearExisting?: boolean
- /** Path to docs directory */
- docsPath?: string
- /** Base URL for generating links */
- baseUrl?: string
- /** Chunk size in tokens */
- chunkSize?: number
- /** Minimum chunk size in tokens */
- minChunkSize?: number
- /** Overlap between chunks in tokens */
- overlap?: number
-}
-
-/**
- * Production script to process documentation and save embeddings to database
- */
-async function processDocsEmbeddings(options: ProcessingOptions = {}) {
- const startTime = Date.now()
- let processedChunks = 0
- let failedChunks = 0
-
- try {
- // Configuration
- const config = {
- clearExisting: options.clearExisting ?? false,
- docsPath: options.docsPath ?? path.join(process.cwd(), '../../apps/docs/content/docs/en'),
- baseUrl: options.baseUrl ?? (isDev ? 'http://localhost:3001' : 'https://docs.sim.ai'),
- chunkSize: options.chunkSize ?? 300, // Max 300 tokens per chunk
- minChunkSize: options.minChunkSize ?? 100,
- overlap: options.overlap ?? 50,
- }
-
- logger.info('🚀 Starting docs embedding processing...')
- logger.info(`Configuration:`, {
- docsPath: config.docsPath,
- baseUrl: config.baseUrl,
- chunkSize: config.chunkSize,
- clearExisting: config.clearExisting,
- })
-
- const chunker = new DocsChunker({
- chunkSize: config.chunkSize,
- minChunkSize: config.minChunkSize,
- overlap: config.overlap,
- baseUrl: config.baseUrl,
- })
-
- logger.info(`📂 Processing docs from: ${config.docsPath}`)
- const chunks = await chunker.chunkAllDocs(config.docsPath)
-
- if (chunks.length === 0) {
- logger.warn('⚠️ No chunks generated from docs')
- return { success: false, processedChunks: 0, failedChunks: 0 }
- }
-
- logger.info(`📊 Generated ${chunks.length} chunks with embeddings`)
-
- if (config.clearExisting) {
- logger.info('🗑️ Clearing existing docs embeddings...')
- try {
- const deleteResult = await db.delete(docsEmbeddings)
- logger.info(`✅ Successfully deleted existing embeddings`)
- } catch (error) {
- logger.error('❌ Failed to delete existing embeddings:', error)
- throw new Error('Failed to clear existing embeddings')
- }
- }
-
- const batchSize = 10
- logger.info(`💾 Saving chunks to database (batch size: ${batchSize})...`)
-
- for (let i = 0; i < chunks.length; i += batchSize) {
- const batch = chunks.slice(i, i + batchSize)
-
- try {
- const batchData = batch.map((chunk) => ({
- chunkText: chunk.text,
- sourceDocument: chunk.sourceDocument,
- sourceLink: chunk.headerLink,
- headerText: chunk.headerText,
- headerLevel: chunk.headerLevel,
- tokenCount: chunk.tokenCount,
- embedding: chunk.embedding,
- embeddingModel: chunk.embeddingModel,
- metadata: chunk.metadata,
- }))
-
- await db.insert(docsEmbeddings).values(batchData)
-
- processedChunks += batch.length
-
- if (i % (batchSize * 5) === 0 || i + batchSize >= chunks.length) {
- logger.info(
- ` 💾 Saved ${Math.min(i + batchSize, chunks.length)}/${chunks.length} chunks`
- )
- }
- } catch (error) {
- logger.error(`❌ Failed to save batch ${Math.floor(i / batchSize) + 1}:`, error)
- failedChunks += batch.length
- }
- }
-
- const savedCount = await db
- .select({ count: sql<number>`count(*)` })
- .from(docsEmbeddings)
- .then((result) => result[0]?.count || 0)
-
- const duration = Date.now() - startTime
-
- logger.info(`✅ Processing complete!`)
- logger.info(`📊 Results:`)
- logger.info(` • Total chunks processed: ${chunks.length}`)
- logger.info(` • Successfully saved: ${processedChunks}`)
- logger.info(` • Failed: ${failedChunks}`)
- logger.info(` • Database total: ${savedCount}`)
- logger.info(` • Duration: ${Math.round(duration / 1000)}s`)
-
- const documentStats = chunks.reduce(
- (acc, chunk) => {
- if (!acc[chunk.sourceDocument]) {
- acc[chunk.sourceDocument] = { chunks: 0, tokens: 0 }
- }
- acc[chunk.sourceDocument].chunks++
- acc[chunk.sourceDocument].tokens += chunk.tokenCount
- return acc
- },
- {} as Record<string, { chunks: number; tokens: number }>
- )
-
- logger.info(`📄 Document breakdown:`)
- Object.entries(documentStats)
- .sort(([, a], [, b]) => b.chunks - a.chunks)
- .slice(0, 10)
- .forEach(([doc, stats]) => {
- logger.info(` • ${doc}: ${stats.chunks} chunks, ${stats.tokens} tokens`)
- })
-
- if (Object.keys(documentStats).length > 10) {
- logger.info(` • ... and ${Object.keys(documentStats).length - 10} more documents`)
- }
-
- return {
- success: failedChunks === 0,
- processedChunks,
- failedChunks,
- totalChunks: chunks.length,
- databaseCount: savedCount,
- duration,
- }
- } catch (error) {
- logger.error('💥 Fatal error during processing:', error)
- return {
- success: false,
- processedChunks,
- failedChunks,
- error: error instanceof Error ? error.message : 'Unknown error',
- }
- }
-}
-
-/**
- * Main function - handle command line arguments
- */
-async function main() {
- const args = process.argv.slice(2)
- const options: ProcessingOptions = {}
-
- if (args.includes('--clear')) {
- options.clearExisting = true
- }
-
- if (args.includes('--help') || args.includes('-h')) {
- console.log(`
-Usage: bun run scripts/process-docs-embeddings.ts [options]
-
-Options:
- --clear Clear existing docs embeddings before processing
- --help, -h Show this help message
-
-Examples:
- bun run scripts/process-docs-embeddings.ts
- bun run scripts/process-docs-embeddings.ts --clear
-`)
- process.exit(0)
- }
-
- const result = await processDocsEmbeddings(options)
-
- if (!result.success) {
- process.exit(1)
- }
-}
-
-if (import.meta.url.includes('process-docs-embeddings.ts')) {
- main().catch((error) => {
- logger.error('Script failed:', error)
- process.exit(1)
- })
-}
-
-export { processDocsEmbeddings }
diff --git a/apps/sim/scripts/process-docs.ts b/apps/sim/scripts/process-docs.ts
new file mode 100644
index 000000000..86e06ffd6
--- /dev/null
+++ b/apps/sim/scripts/process-docs.ts
@@ -0,0 +1,256 @@
+#!/usr/bin/env bun
+
+import path from 'path'
+import { db } from '@sim/db'
+import { docsEmbeddings } from '@sim/db/schema'
+import { sql } from 'drizzle-orm'
+import { type DocChunk, DocsChunker } from '@/lib/chunkers'
+import { isDev } from '@/lib/environment'
+import { createLogger } from '@/lib/logs/console/logger'
+
+const logger = createLogger('ProcessDocs')
+
+interface ProcessingOptions {
+ /** Clear existing docs embeddings before processing */
+ clearExisting?: boolean
+ /** Path to docs directory */
+ docsPath?: string
+ /** Base URL for generating links */
+ baseUrl?: string
+ /** Chunk size in tokens */
+ chunkSize?: number
+ /** Minimum chunk size */
+ minChunkSize?: number
+ /** Overlap between chunks */
+ overlap?: number
+ /** Dry run - only display results, don't save to DB */
+ dryRun?: boolean
+ /** Verbose output */
+ verbose?: boolean
+}
+
+/**
+ * Process documentation files and optionally save embeddings to database
+ */
+async function processDocs(options: ProcessingOptions = {}) {
+ const config = {
+ docsPath: options.docsPath || path.join(process.cwd(), '../../apps/docs/content/docs'),
+ baseUrl: options.baseUrl || (isDev ? 'http://localhost:4000' : 'https://docs.sim.ai'),
+ chunkSize: options.chunkSize || 1024,
+ minChunkSize: options.minChunkSize || 100,
+ overlap: options.overlap || 200,
+ clearExisting: options.clearExisting ?? false,
+ dryRun: options.dryRun ?? false,
+ verbose: options.verbose ?? false,
+ }
+
+ let processedChunks = 0
+ let failedChunks = 0
+
+ try {
+ logger.info('🚀 Starting docs processing with config:', {
+ docsPath: config.docsPath,
+ baseUrl: config.baseUrl,
+ chunkSize: config.chunkSize,
+ clearExisting: config.clearExisting,
+ dryRun: config.dryRun,
+ })
+
+ // Initialize the chunker
+ const chunker = new DocsChunker({
+ chunkSize: config.chunkSize,
+ minChunkSize: config.minChunkSize,
+ overlap: config.overlap,
+ baseUrl: config.baseUrl,
+ })
+
+ // Process all .mdx files
+ logger.info(`📂 Processing docs from: ${config.docsPath}`)
+ const chunks = await chunker.chunkAllDocs(config.docsPath)
+
+ if (chunks.length === 0) {
+ logger.warn('⚠️ No chunks generated from docs')
+ return { success: false, processedChunks: 0, failedChunks: 0 }
+ }
+
+ logger.info(`📊 Generated ${chunks.length} chunks with embeddings`)
+
+ // Group chunks by document for summary
+ const chunksByDoc = chunks.reduce<Record<string, DocChunk[]>>((acc, chunk) => {
+ if (!acc[chunk.sourceDocument]) {
+ acc[chunk.sourceDocument] = []
+ }
+ acc[chunk.sourceDocument].push(chunk)
+ return acc
+ }, {})
+
+ // Display summary
+ logger.info(`\n=== DOCUMENT SUMMARY ===`)
+ for (const [doc, docChunks] of Object.entries(chunksByDoc)) {
+ logger.info(`${doc}: ${docChunks.length} chunks`)
+ }
+
+ // Display sample chunks in verbose or dry-run mode
+ if (config.verbose || config.dryRun) {
+ logger.info(`\n=== SAMPLE CHUNKS ===`)
+ chunks.slice(0, 3).forEach((chunk, index) => {
+ logger.info(`\nChunk ${index + 1}:`)
+ logger.info(` Source: ${chunk.sourceDocument}`)
+ logger.info(` Header: ${chunk.headerText} (Level ${chunk.headerLevel})`)
+ logger.info(` Link: ${chunk.headerLink}`)
+ logger.info(` Tokens: ${chunk.tokenCount}`)
+ logger.info(` Embedding: ${chunk.embedding.length} dimensions (${chunk.embeddingModel})`)
+ if (config.verbose) {
+ logger.info(` Text Preview: ${chunk.text.substring(0, 200)}...`)
+ }
+ })
+ }
+
+ // If dry run, stop here
+ if (config.dryRun) {
+ logger.info('\n✅ Dry run complete - no data saved to database')
+ return { success: true, processedChunks: chunks.length, failedChunks: 0 }
+ }
+
+ // Clear existing embeddings if requested
+ if (config.clearExisting) {
+ logger.info('🗑️ Clearing existing docs embeddings...')
+ try {
+ await db.delete(docsEmbeddings)
+ logger.info(`✅ Successfully deleted existing embeddings`)
+ } catch (error) {
+ logger.error('❌ Failed to delete existing embeddings:', error)
+ throw new Error('Failed to clear existing embeddings')
+ }
+ }
+
+ // Save chunks to database in batches
+ const batchSize = 10
+ logger.info(`💾 Saving chunks to database (batch size: ${batchSize})...`)
+
+ for (let i = 0; i < chunks.length; i += batchSize) {
+ const batch = chunks.slice(i, i + batchSize)
+
+ try {
+ const batchData = batch.map((chunk) => ({
+ chunkText: chunk.text,
+ sourceDocument: chunk.sourceDocument,
+ sourceLink: chunk.headerLink,
+ headerText: chunk.headerText,
+ headerLevel: chunk.headerLevel,
+ tokenCount: chunk.tokenCount,
+ embedding: chunk.embedding,
+ embeddingModel: chunk.embeddingModel,
+ metadata: chunk.metadata,
+ }))
+
+ await db.insert(docsEmbeddings).values(batchData)
+ processedChunks += batch.length
+
+ if (i % (batchSize * 5) === 0 || i + batchSize >= chunks.length) {
+ logger.info(
+ ` 💾 Saved ${Math.min(i + batchSize, chunks.length)}/${chunks.length} chunks`
+ )
+ }
+ } catch (error) {
+ logger.error(`❌ Failed to save batch ${Math.floor(i / batchSize) + 1}:`, error)
+ failedChunks += batch.length
+ }
+ }
+
+ // Verify final count
+ const savedCount = await db
+ .select({ count: sql<number>`count(*)` })
+ .from(docsEmbeddings)
+ .then((res) => res[0]?.count || 0)
+
+ logger.info(
+ `\n✅ Processing complete!\n` +
+ ` 📊 Total chunks: ${chunks.length}\n` +
+ ` ✅ Processed: ${processedChunks}\n` +
+ ` ❌ Failed: ${failedChunks}\n` +
+ ` 💾 Total in DB: ${savedCount}`
+ )
+
+ return { success: failedChunks === 0, processedChunks, failedChunks }
+ } catch (error) {
+ logger.error('❌ Fatal error during processing:', error)
+ return { success: false, processedChunks, failedChunks }
+ }
+}
+
+/**
+ * Main entry point with CLI argument parsing
+ */
+async function main() {
+ const args = process.argv.slice(2)
+
+ const options: ProcessingOptions = {
+ clearExisting: args.includes('--clear'),
+ dryRun: args.includes('--dry-run'),
+ verbose: args.includes('--verbose'),
+ }
+
+ // Parse custom path if provided
+ const pathIndex = args.indexOf('--path')
+ if (pathIndex !== -1 && args[pathIndex + 1]) {
+ options.docsPath = args[pathIndex + 1]
+ }
+
+ // Parse custom base URL if provided
+ const urlIndex = args.indexOf('--url')
+ if (urlIndex !== -1 && args[urlIndex + 1]) {
+ options.baseUrl = args[urlIndex + 1]
+ }
+
+ // Parse chunk size if provided
+ const chunkSizeIndex = args.indexOf('--chunk-size')
+ if (chunkSizeIndex !== -1 && args[chunkSizeIndex + 1]) {
+ options.chunkSize = Number.parseInt(args[chunkSizeIndex + 1], 10)
+ }
+
+ // Show help if requested
+ if (args.includes('--help') || args.includes('-h')) {
+ console.log(`
+📚 Process Documentation Script
+
+Usage: bun run process-docs.ts [options]
+
+Options:
+ --clear Clear existing embeddings before processing
+ --dry-run Process and display results without saving to DB
+ --verbose Show detailed output including text previews
+ --path <path> Custom path to docs directory
+ --url <url> Custom base URL for links
+ --chunk-size <n> Custom chunk size in tokens (default: 1024)
+ --help, -h Show this help message
+
+Examples:
+ # Dry run to test chunking
+ bun run process-docs.ts --dry-run
+
+ # Process and save to database
+ bun run process-docs.ts
+
+ # Clear existing and reprocess
+ bun run process-docs.ts --clear
+
+ # Custom path with verbose output
+ bun run process-docs.ts --path ./my-docs --verbose
+ `)
+ process.exit(0)
+ }
+
+ const result = await processDocs(options)
+ process.exit(result.success ? 0 : 1)
+}
+
+// Run if executed directly
+if (import.meta.url === `file://${process.argv[1]}`) {
+ main().catch((error) => {
+ logger.error('Fatal error:', error)
+ process.exit(1)
+ })
+}
+
+export { processDocs }
diff --git a/apps/sim/tools/mistral/parser.ts b/apps/sim/tools/mistral/parser.ts
index acd17ad37..4572b099a 100644
--- a/apps/sim/tools/mistral/parser.ts
+++ b/apps/sim/tools/mistral/parser.ts
@@ -229,19 +229,6 @@ export const mistralParserTool: ToolConfig