feat(kb): added json/yaml parser+chunker, added dedicated csv chunker (#1539)

* feat(kb): added json/yaml parser+chunker, added dedicated csv chunker

* ack PR comments

* improved kb upload
Waleed
2025-10-04 14:59:21 -07:00
committed by GitHub
parent 0e838940f1
commit 86ed32ea10
27 changed files with 1794 additions and 857 deletions

View File

@@ -32,4 +32,4 @@ jobs:
env:
DATABASE_URL: ${{ github.ref == 'refs/heads/main' && secrets.DATABASE_URL || secrets.STAGING_DATABASE_URL }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: bun run scripts/process-docs-embeddings.ts --clear
run: bun run scripts/process-docs.ts --clear

View File

@@ -312,7 +312,7 @@ export function EditChunkModal({
<Button
onClick={handleSaveContent}
disabled={!isFormValid || isSaving || !hasUnsavedChanges || isNavigating}
className='bg-[var(--brand-primary-hex)] font-[480] text-muted-foreground shadow-[0_0_0_0_var(--brand-primary-hex)] transition-all duration-200 hover:bg-[var(--brand-primary-hover-hex)] hover:shadow-[0_0_0_4px_rgba(127,47,255,0.15)]'
className='bg-[var(--brand-primary-hex)] font-[480] text-white shadow-[0_0_0_0_var(--brand-primary-hex)] transition-all duration-200 hover:bg-[var(--brand-primary-hover-hex)] hover:shadow-[0_0_0_4px_rgba(127,47,255,0.15)]'
>
{isSaving ? (
<>

View File

@@ -64,7 +64,7 @@ export function UploadModal({
return `File "${file.name}" is too large. Maximum size is 100MB.`
}
if (!ACCEPTED_FILE_TYPES.includes(file.type)) {
return `File "${file.name}" has an unsupported format. Please use PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, or HTML files.`
return `File "${file.name}" has an unsupported format. Please use PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML, JSON, YAML, or YML files.`
}
return null
}
@@ -193,8 +193,8 @@ export function UploadModal({
{isDragging ? 'Drop files here!' : 'Drop files here or click to browse'}
</p>
<p className='text-muted-foreground text-xs'>
Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML (max 100MB
each)
Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML, JSON, YAML,
YML (max 100MB each)
</p>
</div>
</div>

View File

@@ -158,7 +158,7 @@ export function CreateModal({ open, onOpenChange, onKnowledgeBaseCreated }: Crea
// Check file type
if (!ACCEPTED_FILE_TYPES.includes(file.type)) {
setFileError(
`File ${file.name} has an unsupported format. Please use PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, or HTML.`
`File ${file.name} has an unsupported format. Please use PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML, JSON, YAML, or YML.`
)
hasError = true
continue
@@ -501,8 +501,8 @@ export function CreateModal({ open, onOpenChange, onKnowledgeBaseCreated }: Crea
: 'Drop files here or click to browse'}
</p>
<p className='text-muted-foreground text-xs'>
Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML (max
100MB each)
Supports PDF, DOC, DOCX, TXT, CSV, XLS, XLSX, MD, PPT, PPTX, HTML,
JSON, YAML, YML (max 100MB each)
</p>
</div>
</div>

View File

@@ -84,16 +84,200 @@ class ProcessingError extends KnowledgeUploadError {
}
const UPLOAD_CONFIG = {
BATCH_SIZE: 15, // Upload files in parallel - this is fast and not the bottleneck
MAX_RETRIES: 3, // Standard retry count
RETRY_DELAY: 2000, // Initial retry delay in ms (2 seconds)
RETRY_MULTIPLIER: 2, // Standard exponential backoff (2s, 4s, 8s)
CHUNK_SIZE: 5 * 1024 * 1024,
DIRECT_UPLOAD_THRESHOLD: 4 * 1024 * 1024, // Files > 4MB must use presigned URLs
LARGE_FILE_THRESHOLD: 50 * 1024 * 1024, // Files > 50MB need multipart upload
UPLOAD_TIMEOUT: 60000, // 60 second timeout per upload
MAX_PARALLEL_UPLOADS: 3, // Limit simultaneous transfers to avoid saturating the client connection
MAX_RETRIES: 3,
RETRY_DELAY_MS: 2000,
RETRY_BACKOFF: 2,
CHUNK_SIZE: 8 * 1024 * 1024, // 8MB stays well above the S3 minimum part size while keeping the part count low
DIRECT_UPLOAD_THRESHOLD: 4 * 1024 * 1024,
LARGE_FILE_THRESHOLD: 50 * 1024 * 1024,
BASE_TIMEOUT_MS: 2 * 60 * 1000, // Baseline timeout per transfer; extended per MB for larger files
TIMEOUT_PER_MB_MS: 1500,
MAX_TIMEOUT_MS: 10 * 60 * 1000,
MULTIPART_PART_CONCURRENCY: 3,
MULTIPART_MAX_RETRIES: 3,
BATCH_REQUEST_SIZE: 50,
} as const
const calculateUploadTimeoutMs = (fileSize: number) => {
const sizeInMb = fileSize / (1024 * 1024)
const dynamicBudget = UPLOAD_CONFIG.BASE_TIMEOUT_MS + sizeInMb * UPLOAD_CONFIG.TIMEOUT_PER_MB_MS
return Math.min(dynamicBudget, UPLOAD_CONFIG.MAX_TIMEOUT_MS)
}
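For reference, a rough worked example of the dynamic timeout above (the file sizes are hypothetical):
// 10 MB file:  120_000 + 10 * 1500  = 135_000 ms (2 min 15 s)
// 200 MB file: 120_000 + 200 * 1500 = 420_000 ms (7 min)
// 600 MB file: would be 1_020_000 ms, capped at MAX_TIMEOUT_MS = 600_000 ms (10 min)
const exampleTimeoutMs = calculateUploadTimeoutMs(10 * 1024 * 1024) // 135000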
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
const getHighResTime = () =>
typeof performance !== 'undefined' && typeof performance.now === 'function'
? performance.now()
: Date.now()
const formatMegabytes = (bytes: number) => Number((bytes / (1024 * 1024)).toFixed(2))
const calculateThroughputMbps = (bytes: number, durationMs: number) => {
if (!bytes || !durationMs) return 0
return Number((((bytes * 8) / durationMs) * 0.001).toFixed(2))
}
const formatDurationSeconds = (durationMs: number) => Number((durationMs / 1000).toFixed(2))
const runWithConcurrency = async <T, R>(
items: T[],
limit: number,
worker: (item: T, index: number) => Promise<R>
): Promise<Array<PromiseSettledResult<R>>> => {
const results: Array<PromiseSettledResult<R>> = Array(items.length)
if (items.length === 0) {
return results
}
const concurrency = Math.max(1, Math.min(limit, items.length))
let nextIndex = 0
const runners = Array.from({ length: concurrency }, async () => {
while (true) {
const currentIndex = nextIndex++
if (currentIndex >= items.length) {
break
}
try {
const value = await worker(items[currentIndex], currentIndex)
results[currentIndex] = { status: 'fulfilled', value }
} catch (error) {
results[currentIndex] = { status: 'rejected', reason: error }
}
}
})
await Promise.all(runners)
return results
}
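A minimal usage sketch for the concurrency helper above (the URLs and worker are hypothetical):
const partUrls = ['https://example.com/part/1', 'https://example.com/part/2', 'https://example.com/part/3']
const settled = await runWithConcurrency(partUrls, 2, async (url) => fetch(url))
// Results keep the input order: settled[i] is { status: 'fulfilled', value } or
// { status: 'rejected', reason } for partUrls[i]; at most 2 requests run at a time.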
const getErrorName = (error: unknown) =>
typeof error === 'object' && error !== null && 'name' in error ? String((error as any).name) : ''
const getErrorMessage = (error: unknown) =>
error instanceof Error ? error.message : typeof error === 'string' ? error : 'Unknown error'
const isAbortError = (error: unknown) => getErrorName(error) === 'AbortError'
const isNetworkError = (error: unknown) => {
if (!(error instanceof Error)) {
return false
}
const message = error.message.toLowerCase()
return (
message.includes('network') ||
message.includes('fetch') ||
message.includes('connection') ||
message.includes('timeout') ||
message.includes('timed out') ||
message.includes('econnreset')
)
}
interface PresignedFileInfo {
path: string
key: string
name: string
size: number
type: string
}
interface PresignedUploadInfo {
fileName: string
presignedUrl: string
fileInfo: PresignedFileInfo
uploadHeaders?: Record<string, string>
directUploadSupported: boolean
presignedUrls?: any
}
const normalizePresignedData = (data: any, context: string): PresignedUploadInfo => {
const presignedUrl = data?.presignedUrl || data?.uploadUrl
const fileInfo = data?.fileInfo
if (!presignedUrl || !fileInfo?.path) {
throw new PresignedUrlError(`Invalid presigned response for ${context}`, data)
}
return {
fileName: data.fileName || fileInfo.name || context,
presignedUrl,
fileInfo: {
path: fileInfo.path,
key: fileInfo.key,
name: fileInfo.name || context,
size: fileInfo.size || data.fileSize || 0,
type: fileInfo.type || data.contentType || '',
},
uploadHeaders: data.uploadHeaders || undefined,
directUploadSupported: data.directUploadSupported !== false,
presignedUrls: data.presignedUrls,
}
}
const getPresignedData = async (
file: File,
timeoutMs: number,
controller?: AbortController
): Promise<PresignedUploadInfo> => {
const localController = controller ?? new AbortController()
const timeoutId = setTimeout(() => localController.abort(), timeoutMs)
const startTime = getHighResTime()
try {
const presignedResponse = await fetch('/api/files/presigned?type=knowledge-base', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
fileName: file.name,
contentType: file.type,
fileSize: file.size,
}),
signal: localController.signal,
})
if (!presignedResponse.ok) {
let errorDetails: any = null
try {
errorDetails = await presignedResponse.json()
} catch {
// Ignore JSON parsing errors
}
logger.error('Presigned URL request failed', {
status: presignedResponse.status,
fileSize: file.size,
})
throw new PresignedUrlError(
`Failed to get presigned URL for ${file.name}: ${presignedResponse.status} ${presignedResponse.statusText}`,
errorDetails
)
}
const presignedData = await presignedResponse.json()
const durationMs = getHighResTime() - startTime
logger.info('Fetched presigned URL', {
fileName: file.name,
sizeMB: formatMegabytes(file.size),
durationMs: formatDurationSeconds(durationMs),
})
return normalizePresignedData(presignedData, file.name)
} finally {
clearTimeout(timeoutId)
if (!controller) {
localController.abort()
}
}
}
export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const [isUploading, setIsUploading] = useState(false)
const [uploadProgress, setUploadProgress] = useState<UploadProgress>({
@@ -153,85 +337,51 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const uploadSingleFileWithRetry = async (
file: File,
retryCount = 0,
fileIndex?: number
fileIndex?: number,
presignedOverride?: PresignedUploadInfo
): Promise<UploadedFile> => {
const timeoutMs = calculateUploadTimeoutMs(file.size)
let presignedData: PresignedUploadInfo | undefined
const attempt = retryCount + 1
logger.info('Upload attempt started', {
fileName: file.name,
attempt,
sizeMB: formatMegabytes(file.size),
timeoutMs: formatDurationSeconds(timeoutMs),
})
try {
// Create abort controller for timeout
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), UPLOAD_CONFIG.UPLOAD_TIMEOUT)
const timeoutId = setTimeout(() => controller.abort(), timeoutMs)
try {
// Get presigned URL
const presignedResponse = await fetch('/api/files/presigned?type=knowledge-base', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
fileName: file.name,
contentType: file.type,
fileSize: file.size,
}),
signal: controller.signal,
})
clearTimeout(timeoutId)
if (!presignedResponse.ok) {
let errorDetails: any = null
try {
errorDetails = await presignedResponse.json()
} catch {
// Ignore JSON parsing errors
}
logger.error('Presigned URL request failed', {
status: presignedResponse.status,
fileSize: file.size,
retryCount,
})
throw new PresignedUrlError(
`Failed to get presigned URL for ${file.name}: ${presignedResponse.status} ${presignedResponse.statusText}`,
errorDetails
)
}
const presignedData = await presignedResponse.json()
presignedData = presignedOverride ?? (await getPresignedData(file, timeoutMs, controller))
if (presignedData.directUploadSupported) {
// Use presigned URLs for all uploads when cloud storage is available
// Check if file needs multipart upload for large files
if (file.size > UPLOAD_CONFIG.LARGE_FILE_THRESHOLD) {
return await uploadFileInChunks(file, presignedData)
return await uploadFileInChunks(file, presignedData, timeoutMs, fileIndex)
}
return await uploadFileDirectly(file, presignedData, fileIndex)
return await uploadFileDirectly(file, presignedData, timeoutMs, controller, fileIndex)
}
// Fallback to traditional upload through API route
// This is only used when cloud storage is not configured
// Must check file size due to Vercel's 4.5MB limit
if (file.size > UPLOAD_CONFIG.DIRECT_UPLOAD_THRESHOLD) {
throw new DirectUploadError(
`File ${file.name} is too large (${(file.size / 1024 / 1024).toFixed(2)}MB) for upload. Cloud storage must be configured for files over 4MB.`,
{ fileSize: file.size, limit: UPLOAD_CONFIG.DIRECT_UPLOAD_THRESHOLD }
)
}
logger.warn(`Using API upload fallback for ${file.name} - cloud storage not configured`)
return await uploadFileThroughAPI(file)
return await uploadFileThroughAPI(file, timeoutMs)
} finally {
clearTimeout(timeoutId)
}
} catch (error) {
const isTimeout = error instanceof Error && error.name === 'AbortError'
const isNetwork =
error instanceof Error &&
(error.message.includes('fetch') ||
error.message.includes('network') ||
error.message.includes('Failed to fetch'))
const isTimeout = isAbortError(error)
const isNetwork = isNetworkError(error)
// Retry logic
if (retryCount < UPLOAD_CONFIG.MAX_RETRIES) {
const delay = UPLOAD_CONFIG.RETRY_DELAY * UPLOAD_CONFIG.RETRY_MULTIPLIER ** retryCount // More aggressive exponential backoff
const delay = UPLOAD_CONFIG.RETRY_DELAY_MS * UPLOAD_CONFIG.RETRY_BACKOFF ** retryCount // Exponential backoff between retries (2s, 4s, 8s)
if (isTimeout || isNetwork) {
logger.warn(
`Upload failed (${isTimeout ? 'timeout' : 'network'}), retrying in ${delay / 1000}s...`,
@@ -243,7 +393,6 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
)
}
// Reset progress to 0 before retry to indicate restart
if (fileIndex !== undefined) {
setUploadProgress((prev) => ({
...prev,
@@ -253,8 +402,14 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
}))
}
await new Promise((resolve) => setTimeout(resolve, delay))
return uploadSingleFileWithRetry(file, retryCount + 1, fileIndex)
await sleep(delay)
const shouldReusePresigned = (isTimeout || isNetwork) && presignedData
return uploadSingleFileWithRetry(
file,
retryCount + 1,
fileIndex,
shouldReusePresigned ? presignedData : undefined
)
}
logger.error('Upload failed after retries', {
@@ -271,12 +426,15 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
*/
const uploadFileDirectly = async (
file: File,
presignedData: any,
presignedData: PresignedUploadInfo,
timeoutMs: number,
outerController: AbortController,
fileIndex?: number
): Promise<UploadedFile> => {
return new Promise((resolve, reject) => {
const xhr = new XMLHttpRequest()
let isCompleted = false // Track if this upload has completed to prevent duplicate state updates
let isCompleted = false
const startTime = getHighResTime()
const timeoutId = setTimeout(() => {
if (!isCompleted) {
@@ -284,7 +442,18 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
xhr.abort()
reject(new Error('Upload timeout'))
}
}, UPLOAD_CONFIG.UPLOAD_TIMEOUT)
}, timeoutMs)
const abortHandler = () => {
if (!isCompleted) {
isCompleted = true
clearTimeout(timeoutId)
xhr.abort()
reject(new DirectUploadError(`Upload aborted for ${file.name}`, {}))
}
}
outerController.signal.addEventListener('abort', abortHandler)
// Track upload progress
xhr.upload.addEventListener('progress', (event) => {
@@ -309,10 +478,19 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
if (!isCompleted) {
isCompleted = true
clearTimeout(timeoutId)
outerController.signal.removeEventListener('abort', abortHandler)
const durationMs = getHighResTime() - startTime
if (xhr.status >= 200 && xhr.status < 300) {
const fullFileUrl = presignedData.fileInfo.path.startsWith('http')
? presignedData.fileInfo.path
: `${window.location.origin}${presignedData.fileInfo.path}`
logger.info('Direct upload completed', {
fileName: file.name,
sizeMB: formatMegabytes(file.size),
durationMs: formatDurationSeconds(durationMs),
throughputMbps: calculateThroughputMbps(file.size, durationMs),
status: xhr.status,
})
resolve(createUploadedFile(file.name, fullFileUrl, file.size, file.type, file))
} else {
logger.error('S3 PUT request failed', {
@@ -335,17 +513,18 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
if (!isCompleted) {
isCompleted = true
clearTimeout(timeoutId)
outerController.signal.removeEventListener('abort', abortHandler)
const durationMs = getHighResTime() - startTime
logger.error('Direct upload network error', {
fileName: file.name,
sizeMB: formatMegabytes(file.size),
durationMs: formatDurationSeconds(durationMs),
})
reject(new DirectUploadError(`Network error uploading ${file.name}`, {}))
}
})
xhr.addEventListener('abort', () => {
if (!isCompleted) {
isCompleted = true
clearTimeout(timeoutId)
reject(new DirectUploadError(`Upload aborted for ${file.name}`, {}))
}
})
xhr.addEventListener('abort', abortHandler)
// Start the upload
xhr.open('PUT', presignedData.presignedUrl)
@@ -365,10 +544,16 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
/**
* Upload large file in chunks (multipart upload)
*/
const uploadFileInChunks = async (file: File, presignedData: any): Promise<UploadedFile> => {
const uploadFileInChunks = async (
file: File,
presignedData: PresignedUploadInfo,
timeoutMs: number,
fileIndex?: number
): Promise<UploadedFile> => {
logger.info(
`Uploading large file ${file.name} (${(file.size / 1024 / 1024).toFixed(2)}MB) using multipart upload`
)
const startTime = getHighResTime()
try {
// Step 1: Initiate multipart upload
@@ -419,37 +604,76 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
// Step 4: Upload parts in parallel (batch them to avoid overwhelming the browser)
const uploadedParts: Array<{ ETag: string; PartNumber: number }> = []
const PARALLEL_UPLOADS = 3 // Upload 3 parts at a time
for (let i = 0; i < presignedUrls.length; i += PARALLEL_UPLOADS) {
const batch = presignedUrls.slice(i, i + PARALLEL_UPLOADS)
const controller = new AbortController()
const multipartTimeoutId = setTimeout(() => controller.abort(), timeoutMs)
const batchPromises = batch.map(async ({ partNumber, url }: any) => {
try {
const uploadPart = async ({ partNumber, url }: any) => {
const start = (partNumber - 1) * chunkSize
const end = Math.min(start + chunkSize, file.size)
const chunk = file.slice(start, end)
const uploadResponse = await fetch(url, {
method: 'PUT',
body: chunk,
headers: {
'Content-Type': file.type,
},
})
for (let attempt = 0; attempt <= UPLOAD_CONFIG.MULTIPART_MAX_RETRIES; attempt++) {
try {
const partResponse = await fetch(url, {
method: 'PUT',
body: chunk,
signal: controller.signal,
headers: {
'Content-Type': file.type,
},
})
if (!uploadResponse.ok) {
throw new Error(`Failed to upload part ${partNumber}: ${uploadResponse.statusText}`)
if (!partResponse.ok) {
throw new Error(`Failed to upload part ${partNumber}: ${partResponse.statusText}`)
}
const etag = partResponse.headers.get('ETag') || ''
logger.info(`Uploaded part ${partNumber}/${numParts}`)
if (fileIndex !== undefined) {
const partProgress = Math.min(100, Math.round((partNumber / numParts) * 100))
setUploadProgress((prev) => ({
...prev,
fileStatuses: prev.fileStatuses?.map((fs, idx) =>
idx === fileIndex ? { ...fs, progress: partProgress } : fs
),
}))
}
return { ETag: etag.replace(/"/g, ''), PartNumber: partNumber }
} catch (partError) {
if (attempt >= UPLOAD_CONFIG.MULTIPART_MAX_RETRIES) {
throw partError
}
const delay = UPLOAD_CONFIG.RETRY_DELAY_MS * UPLOAD_CONFIG.RETRY_BACKOFF ** attempt
logger.warn(
`Part ${partNumber} failed (attempt ${attempt + 1}), retrying in ${Math.round(delay / 1000)}s`
)
await sleep(delay)
}
}
// Get ETag from response headers
const etag = uploadResponse.headers.get('ETag') || ''
logger.info(`Uploaded part ${partNumber}/${numParts}`)
throw new Error(`Retries exhausted for part ${partNumber}`)
}
return { ETag: etag.replace(/"/g, ''), PartNumber: partNumber }
const partResults = await runWithConcurrency(
presignedUrls,
UPLOAD_CONFIG.MULTIPART_PART_CONCURRENCY,
uploadPart
)
partResults.forEach((result) => {
if (result?.status === 'fulfilled') {
uploadedParts.push(result.value)
} else if (result?.status === 'rejected') {
throw result.reason
}
})
const batchResults = await Promise.all(batchPromises)
uploadedParts.push(...batchResults)
} finally {
clearTimeout(multipartTimeoutId)
}
// Step 5: Complete multipart upload
@@ -470,23 +694,37 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const { path } = await completeResponse.json()
logger.info(`Completed multipart upload for ${file.name}`)
const durationMs = getHighResTime() - startTime
logger.info('Multipart upload metrics', {
fileName: file.name,
sizeMB: formatMegabytes(file.size),
parts: uploadedParts.length,
durationMs: formatDurationSeconds(durationMs),
throughputMbps: calculateThroughputMbps(file.size, durationMs),
})
const fullFileUrl = path.startsWith('http') ? path : `${window.location.origin}${path}`
return createUploadedFile(file.name, fullFileUrl, file.size, file.type, file)
} catch (error) {
logger.error(`Multipart upload failed for ${file.name}:`, error)
const durationMs = getHighResTime() - startTime
logger.warn('Falling back to direct upload after multipart failure', {
fileName: file.name,
sizeMB: formatMegabytes(file.size),
durationMs: formatDurationSeconds(durationMs),
})
// Fall back to direct upload if multipart fails
logger.info('Falling back to direct upload')
return uploadFileDirectly(file, presignedData)
return uploadFileDirectly(file, presignedData, timeoutMs, new AbortController(), fileIndex)
}
}
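As a sanity check on the multipart math above, a small sketch assuming the 8 MB part size from UPLOAD_CONFIG.CHUNK_SIZE (the actual chunkSize comes back from the initiate step, which is elided here):
const partSize = 8 * 1024 * 1024
const fileSize = 120 * 1024 * 1024 // hypothetical 120 MB file
const parts = Math.ceil(fileSize / partSize) // 15 parts
// Part 1 covers bytes [0, 8 MB), part 15 covers bytes [112 MB, 120 MB); each part is retried
// up to MULTIPART_MAX_RETRIES times with 2s/4s/8s backoff before the whole upload
// falls back to a direct PUT.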
/**
* Fallback upload through API
*/
const uploadFileThroughAPI = async (file: File): Promise<UploadedFile> => {
const uploadFileThroughAPI = async (file: File, timeoutMs: number): Promise<UploadedFile> => {
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), UPLOAD_CONFIG.UPLOAD_TIMEOUT)
const timeoutId = setTimeout(() => controller.abort(), timeoutMs)
try {
const formData = new FormData()
@@ -559,19 +797,20 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
logger.info(`Starting batch upload of ${files.length} files`)
try {
const BATCH_SIZE = 100 // Process 100 files at a time
const batches = []
// Create all batches
for (let batchStart = 0; batchStart < files.length; batchStart += BATCH_SIZE) {
const batchFiles = files.slice(batchStart, batchStart + BATCH_SIZE)
for (
let batchStart = 0;
batchStart < files.length;
batchStart += UPLOAD_CONFIG.BATCH_REQUEST_SIZE
) {
const batchFiles = files.slice(batchStart, batchStart + UPLOAD_CONFIG.BATCH_REQUEST_SIZE)
const batchIndexOffset = batchStart
batches.push({ batchFiles, batchIndexOffset })
}
logger.info(`Starting parallel processing of ${batches.length} batches`)
// Step 1: Get ALL presigned URLs in parallel
const presignedPromises = batches.map(async ({ batchFiles }, batchIndex) => {
logger.info(
`Getting presigned URLs for batch ${batchIndex + 1}/${batches.length} (${batchFiles.length} files)`
@@ -604,9 +843,8 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const allPresignedData = await Promise.all(presignedPromises)
logger.info(`Got all presigned URLs, starting uploads`)
// Step 2: Upload all files with global concurrency control
const allUploads = allPresignedData.flatMap(({ batchFiles, presignedData, batchIndex }) => {
const batchIndexOffset = batchIndex * BATCH_SIZE
const batchIndexOffset = batchIndex * UPLOAD_CONFIG.BATCH_REQUEST_SIZE
return batchFiles.map((file, batchFileIndex) => {
const fileIndex = batchIndexOffset + batchFileIndex
@@ -616,16 +854,14 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
})
})
// Process all uploads with concurrency control
for (let i = 0; i < allUploads.length; i += UPLOAD_CONFIG.BATCH_SIZE) {
const concurrentBatch = allUploads.slice(i, i + UPLOAD_CONFIG.BATCH_SIZE)
const uploadPromises = concurrentBatch.map(async ({ file, presigned, fileIndex }) => {
const uploadResults = await runWithConcurrency(
allUploads,
UPLOAD_CONFIG.MAX_PARALLEL_UPLOADS,
async ({ file, presigned, fileIndex }) => {
if (!presigned) {
throw new Error(`No presigned data for file ${file.name}`)
}
// Mark as uploading
setUploadProgress((prev) => ({
...prev,
fileStatuses: prev.fileStatuses?.map((fs, idx) =>
@@ -634,10 +870,8 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
}))
try {
// Upload directly to storage
const result = await uploadFileDirectly(file, presigned, fileIndex)
const result = await uploadSingleFileWithRetry(file, 0, fileIndex, presigned)
// Mark as completed
setUploadProgress((prev) => ({
...prev,
filesCompleted: prev.filesCompleted + 1,
@@ -648,7 +882,6 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
return result
} catch (error) {
// Mark as failed
setUploadProgress((prev) => ({
...prev,
fileStatuses: prev.fileStatuses?.map((fs, idx) =>
@@ -656,30 +889,27 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
? {
...fs,
status: 'failed' as const,
error: error instanceof Error ? error.message : 'Upload failed',
error: getErrorMessage(error),
}
: fs
),
}))
throw error
}
})
const batchResults = await Promise.allSettled(uploadPromises)
for (let j = 0; j < batchResults.length; j++) {
const result = batchResults[j]
if (result.status === 'fulfilled') {
results.push(result.value)
} else {
failedFiles.push({
file: concurrentBatch[j].file,
error:
result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
})
}
}
}
)
uploadResults.forEach((result, idx) => {
if (result?.status === 'fulfilled') {
results.push(result.value)
} else if (result?.status === 'rejected') {
failedFiles.push({
file: allUploads[idx].file,
error:
result.reason instanceof Error ? result.reason : new Error(String(result.reason)),
})
}
})
if (failedFiles.length > 0) {
logger.error(`Failed to upload ${failedFiles.length} files`)
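Putting the retry knobs together, a small sketch of the per-file schedule implied by MAX_RETRIES, RETRY_DELAY_MS, and RETRY_BACKOFF:
const retryDelaysMs = Array.from({ length: UPLOAD_CONFIG.MAX_RETRIES }, (_, attempt) =>
  UPLOAD_CONFIG.RETRY_DELAY_MS * UPLOAD_CONFIG.RETRY_BACKOFF ** attempt
)
// -> [2000, 4000, 8000]; timeout and network failures reuse the presigned URL on retry,
// while other errors request a fresh one.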

View File

@@ -1,10 +1,17 @@
import fs from 'fs/promises'
import path from 'path'
import { generateEmbeddings } from '@/lib/embeddings/utils'
import { isDev } from '@/lib/environment'
import { TextChunker } from '@/lib/knowledge/documents/chunker'
import type { DocChunk, DocsChunkerOptions, HeaderInfo } from '@/lib/knowledge/documents/types'
import { createLogger } from '@/lib/logs/console/logger'
import { TextChunker } from './text-chunker'
import type { DocChunk, DocsChunkerOptions } from './types'
interface HeaderInfo {
level: number
text: string
slug?: string
anchor?: string
position?: number
}
interface Frontmatter {
title?: string
@@ -29,7 +36,7 @@ export class DocsChunker {
overlap: options.overlap ?? 50,
})
// Use localhost docs in development, production docs otherwise
this.baseUrl = options.baseUrl ?? (isDev ? 'http://localhost:3001' : 'https://docs.sim.ai')
this.baseUrl = options.baseUrl ?? 'https://docs.sim.ai'
}
/**
@@ -108,9 +115,7 @@ export class DocsChunker {
metadata: {
startIndex: chunkStart,
endIndex: chunkEnd,
hasFrontmatter: i === 0 && content.startsWith('---'),
documentTitle: frontmatter.title,
documentDescription: frontmatter.description,
title: frontmatter.title,
},
}
@@ -200,7 +205,7 @@ export class DocsChunker {
let relevantHeader: HeaderInfo | null = null
for (const header of headers) {
if (header.position <= position) {
if (header.position !== undefined && header.position <= position) {
relevantHeader = header
} else {
break
@@ -285,53 +290,6 @@ export class DocsChunker {
return { data, content: markdownContent }
}
/**
* Split content by headers to respect document structure
*/
private splitByHeaders(
content: string
): Array<{ header: string | null; content: string; level: number }> {
const lines = content.split('\n')
const sections: Array<{ header: string | null; content: string; level: number }> = []
let currentHeader: string | null = null
let currentLevel = 0
let currentContent: string[] = []
for (const line of lines) {
const headerMatch = line.match(/^(#{1,3})\s+(.+)$/) // Only split on H1-H3, not H4-H6
if (headerMatch) {
// Save previous section
if (currentContent.length > 0) {
sections.push({
header: currentHeader,
content: currentContent.join('\n').trim(),
level: currentLevel,
})
}
// Start new section
currentHeader = line
currentLevel = headerMatch[1].length
currentContent = []
} else {
currentContent.push(line)
}
}
// Add final section
if (currentContent.length > 0) {
sections.push({
header: currentHeader,
content: currentContent.join('\n').trim(),
level: currentLevel,
})
}
return sections.filter((section) => section.content.trim().length > 0)
}
/**
* Estimate token count (rough approximation)
*/
@@ -340,175 +298,6 @@ export class DocsChunker {
return Math.ceil(text.length / 4)
}
/**
* Merge small adjacent chunks to reach target size
*/
private mergeSmallChunks(chunks: string[]): string[] {
const merged: string[] = []
let currentChunk = ''
for (const chunk of chunks) {
const currentTokens = this.estimateTokens(currentChunk)
const chunkTokens = this.estimateTokens(chunk)
// If adding this chunk would exceed target size, save current and start new
if (currentTokens > 0 && currentTokens + chunkTokens > 500) {
if (currentChunk.trim()) {
merged.push(currentChunk.trim())
}
currentChunk = chunk
} else {
// Merge with current chunk
currentChunk = currentChunk ? `${currentChunk}\n\n${chunk}` : chunk
}
}
// Add final chunk
if (currentChunk.trim()) {
merged.push(currentChunk.trim())
}
return merged
}
/**
* Chunk a section while preserving tables and structure
*/
private async chunkSection(section: {
header: string | null
content: string
level: number
}): Promise<string[]> {
const content = section.content
const header = section.header
// Check if content contains tables
const hasTable = this.containsTable(content)
if (hasTable) {
// Split by tables and handle each part
return this.splitContentWithTables(content, header)
}
// Regular chunking for text-only content
const chunks = await this.textChunker.chunk(content)
return chunks.map((chunk, index) => {
// Add header to first chunk only
if (index === 0 && header) {
return `${header}\n\n${chunk.text}`.trim()
}
return chunk.text
})
}
/**
* Check if content contains markdown tables
*/
private containsTable(content: string): boolean {
const lines = content.split('\n')
return lines.some((line, index) => {
if (line.includes('|') && line.split('|').length >= 3) {
const nextLine = lines[index + 1]
return nextLine?.includes('|') && nextLine.includes('-')
}
return false
})
}
/**
* Split content that contains tables, keeping tables intact
*/
private splitContentWithTables(content: string, header: string | null): string[] {
const lines = content.split('\n')
const chunks: string[] = []
let currentChunk: string[] = []
let inTable = false
let tableLines: string[] = []
for (let i = 0; i < lines.length; i++) {
const line = lines[i]
// Detect table start
if (line.includes('|') && line.split('|').length >= 3 && !inTable) {
const nextLine = lines[i + 1]
if (nextLine?.includes('|') && nextLine.includes('-')) {
inTable = true
// Save current chunk if it has content
if (currentChunk.length > 0 && currentChunk.join('\n').trim().length > 50) {
const chunkText = currentChunk.join('\n').trim()
const withHeader =
chunks.length === 0 && header ? `${header}\n\n${chunkText}` : chunkText
chunks.push(withHeader)
currentChunk = []
}
tableLines = [line]
continue
}
}
if (inTable) {
tableLines.push(line)
// Detect table end
if (!line.includes('|') || line.trim() === '') {
inTable = false
// Save table as its own chunk
const tableText = tableLines
.filter((l) => l.trim())
.join('\n')
.trim()
if (tableText.length > 0) {
const withHeader =
chunks.length === 0 && header ? `${header}\n\n${tableText}` : tableText
chunks.push(withHeader)
}
tableLines = []
// Start new chunk if current line has content
if (line.trim() !== '') {
currentChunk = [line]
}
}
} else {
currentChunk.push(line)
// If chunk is getting large, save it
if (this.estimateTokens(currentChunk.join('\n')) > 250) {
const chunkText = currentChunk.join('\n').trim()
if (chunkText.length > 50) {
const withHeader =
chunks.length === 0 && header ? `${header}\n\n${chunkText}` : chunkText
chunks.push(withHeader)
}
currentChunk = []
}
}
}
// Handle remaining content
if (inTable && tableLines.length > 0) {
const tableText = tableLines
.filter((l) => l.trim())
.join('\n')
.trim()
if (tableText.length > 0) {
const withHeader = chunks.length === 0 && header ? `${header}\n\n${tableText}` : tableText
chunks.push(withHeader)
}
} else if (currentChunk.length > 0) {
const chunkText = currentChunk.join('\n').trim()
if (chunkText.length > 50) {
const withHeader = chunks.length === 0 && header ? `${header}\n\n${chunkText}` : chunkText
chunks.push(withHeader)
}
}
return chunks.filter((chunk) => chunk.trim().length > 50)
}
/**
* Detect table boundaries in markdown content to avoid splitting them
*/

View File

@@ -0,0 +1,5 @@
export { DocsChunker } from './docs-chunker'
export { JsonYamlChunker } from './json-yaml-chunker'
export { StructuredDataChunker } from './structured-data-chunker'
export { TextChunker } from './text-chunker'
export * from './types'

View File

@@ -0,0 +1,317 @@
import { estimateTokenCount } from '@/lib/tokenization/estimators'
import type { Chunk, ChunkerOptions } from './types'
function getTokenCount(text: string): number {
const estimate = estimateTokenCount(text)
return estimate.count
}
/**
* Configuration for JSON/YAML chunking
*/
const JSON_YAML_CHUNKING_CONFIG = {
TARGET_CHUNK_SIZE: 2000, // Target tokens per chunk
MIN_CHUNK_SIZE: 100, // Minimum tokens per chunk
MAX_CHUNK_SIZE: 3000, // Maximum tokens per chunk
MAX_DEPTH_FOR_SPLITTING: 5, // Maximum depth to traverse for splitting
}
export class JsonYamlChunker {
private chunkSize: number
private minChunkSize: number
constructor(options: ChunkerOptions = {}) {
this.chunkSize = options.chunkSize || JSON_YAML_CHUNKING_CONFIG.TARGET_CHUNK_SIZE
this.minChunkSize = options.minChunkSize || JSON_YAML_CHUNKING_CONFIG.MIN_CHUNK_SIZE
}
/**
* Check if content is structured JSON/YAML data
*/
static isStructuredData(content: string): boolean {
try {
JSON.parse(content)
return true
} catch {
try {
const yaml = require('js-yaml')
yaml.load(content)
return true
} catch {
return false
}
}
}
/**
* Chunk JSON/YAML content intelligently based on structure
*/
async chunk(content: string): Promise<Chunk[]> {
try {
const data = JSON.parse(content)
return this.chunkStructuredData(data)
} catch (error) {
return this.chunkAsText(content)
}
}
/**
* Chunk structured data based on its structure
*/
private chunkStructuredData(data: any, path: string[] = []): Chunk[] {
const chunks: Chunk[] = []
if (Array.isArray(data)) {
return this.chunkArray(data, path)
}
if (typeof data === 'object' && data !== null) {
return this.chunkObject(data, path)
}
const content = JSON.stringify(data, null, 2)
const tokenCount = getTokenCount(content)
if (tokenCount >= this.minChunkSize) {
chunks.push({
text: content,
tokenCount,
metadata: {
startIndex: 0,
endIndex: content.length,
},
})
}
return chunks
}
/**
* Chunk an array intelligently
*/
private chunkArray(arr: any[], path: string[]): Chunk[] {
const chunks: Chunk[] = []
let currentBatch: any[] = []
let currentTokens = 0
const contextHeader = path.length > 0 ? `// ${path.join('.')}\n` : ''
for (let i = 0; i < arr.length; i++) {
const item = arr[i]
const itemStr = JSON.stringify(item, null, 2)
const itemTokens = getTokenCount(itemStr)
if (itemTokens > this.chunkSize) {
// Save current batch if it has items
if (currentBatch.length > 0) {
const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
chunks.push({
text: batchContent,
tokenCount: getTokenCount(batchContent),
metadata: {
startIndex: i - currentBatch.length,
endIndex: i - 1,
},
})
currentBatch = []
currentTokens = 0
}
if (typeof item === 'object' && item !== null) {
const subChunks = this.chunkStructuredData(item, [...path, `[${i}]`])
chunks.push(...subChunks)
} else {
chunks.push({
text: contextHeader + itemStr,
tokenCount: itemTokens,
metadata: {
startIndex: i,
endIndex: i,
},
})
}
} else if (currentTokens + itemTokens > this.chunkSize && currentBatch.length > 0) {
const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
chunks.push({
text: batchContent,
tokenCount: currentTokens,
metadata: {
startIndex: i - currentBatch.length,
endIndex: i - 1,
},
})
currentBatch = [item]
currentTokens = itemTokens
} else {
currentBatch.push(item)
currentTokens += itemTokens
}
}
if (currentBatch.length > 0) {
const batchContent = contextHeader + JSON.stringify(currentBatch, null, 2)
chunks.push({
text: batchContent,
tokenCount: currentTokens,
metadata: {
startIndex: arr.length - currentBatch.length,
endIndex: arr.length - 1,
},
})
}
return chunks
}
/**
* Chunk an object intelligently
*/
private chunkObject(obj: Record<string, any>, path: string[]): Chunk[] {
const chunks: Chunk[] = []
const entries = Object.entries(obj)
const fullContent = JSON.stringify(obj, null, 2)
const fullTokens = getTokenCount(fullContent)
if (fullTokens <= this.chunkSize) {
chunks.push({
text: fullContent,
tokenCount: fullTokens,
metadata: {
startIndex: 0,
endIndex: fullContent.length,
},
})
return chunks
}
let currentObj: Record<string, any> = {}
let currentTokens = 0
let currentKeys: string[] = []
for (const [key, value] of entries) {
const valueStr = JSON.stringify({ [key]: value }, null, 2)
const valueTokens = getTokenCount(valueStr)
if (valueTokens > this.chunkSize) {
// Save current object if it has properties
if (Object.keys(currentObj).length > 0) {
const objContent = JSON.stringify(currentObj, null, 2)
chunks.push({
text: objContent,
tokenCount: currentTokens,
metadata: {
startIndex: 0,
endIndex: objContent.length,
},
})
currentObj = {}
currentTokens = 0
currentKeys = []
}
if (typeof value === 'object' && value !== null) {
const subChunks = this.chunkStructuredData(value, [...path, key])
chunks.push(...subChunks)
} else {
chunks.push({
text: valueStr,
tokenCount: valueTokens,
metadata: {
startIndex: 0,
endIndex: valueStr.length,
},
})
}
} else if (
currentTokens + valueTokens > this.chunkSize &&
Object.keys(currentObj).length > 0
) {
const objContent = JSON.stringify(currentObj, null, 2)
chunks.push({
text: objContent,
tokenCount: currentTokens,
metadata: {
startIndex: 0,
endIndex: objContent.length,
},
})
currentObj = { [key]: value }
currentTokens = valueTokens
currentKeys = [key]
} else {
currentObj[key] = value
currentTokens += valueTokens
currentKeys.push(key)
}
}
if (Object.keys(currentObj).length > 0) {
const objContent = JSON.stringify(currentObj, null, 2)
chunks.push({
text: objContent,
tokenCount: currentTokens,
metadata: {
startIndex: 0,
endIndex: objContent.length,
},
})
}
return chunks
}
/**
* Fall back to text chunking if JSON parsing fails.
*/
private async chunkAsText(content: string): Promise<Chunk[]> {
const chunks: Chunk[] = []
const lines = content.split('\n')
let currentChunk = ''
let currentTokens = 0
let startIndex = 0
for (const line of lines) {
const lineTokens = getTokenCount(line)
if (currentTokens + lineTokens > this.chunkSize && currentChunk) {
chunks.push({
text: currentChunk,
tokenCount: currentTokens,
metadata: {
startIndex,
endIndex: startIndex + currentChunk.length,
},
})
startIndex += currentChunk.length + 1
currentChunk = line
currentTokens = lineTokens
} else {
currentChunk = currentChunk ? `${currentChunk}\n${line}` : line
currentTokens += lineTokens
}
}
if (currentChunk && currentTokens >= this.minChunkSize) {
chunks.push({
text: currentChunk,
tokenCount: currentTokens,
metadata: {
startIndex,
endIndex: startIndex + currentChunk.length,
},
})
}
return chunks
}
/**
* Static method for chunking JSON/YAML data with default options.
*/
static async chunkJsonYaml(content: string, options: ChunkerOptions = {}): Promise<Chunk[]> {
const chunker = new JsonYamlChunker(options)
return chunker.chunk(content)
}
}
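A minimal usage sketch for the JSON/YAML chunker (the payload is hypothetical, and the import assumes the chunker index above resolves to '@/lib/knowledge/documents/chunker' as in the docs-chunker import):
import { JsonYamlChunker } from '@/lib/knowledge/documents/chunker'

const payload = JSON.stringify({ users: [{ id: 1, name: 'Ada' }, { id: 2, name: 'Lin' }] })
const chunks = await new JsonYamlChunker({ chunkSize: 2000 }).chunk(payload)
// Small objects come back as a single chunk; large arrays are batched toward ~2000 tokens,
// and content that fails JSON.parse falls back to line-based text chunking.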

View File

@@ -0,0 +1,220 @@
import type { Chunk, StructuredDataOptions } from './types'
// Configuration for structured data chunking (CSV, XLSX, etc.)
const STRUCTURED_CHUNKING_CONFIG = {
// Target 2000-3000 tokens per chunk for better semantic meaning
TARGET_CHUNK_SIZE: 2500,
MIN_CHUNK_SIZE: 500,
MAX_CHUNK_SIZE: 4000,
// For spreadsheets, group rows together
ROWS_PER_CHUNK: 100, // Start with 100 rows per chunk
MIN_ROWS_PER_CHUNK: 20,
MAX_ROWS_PER_CHUNK: 500,
// For better embeddings quality
INCLUDE_HEADERS_IN_EACH_CHUNK: true,
MAX_HEADER_SIZE: 200, // tokens
}
/**
* Smart chunker for structured data (CSV, XLSX) that preserves semantic meaning
*/
export class StructuredDataChunker {
/**
* Chunk structured data intelligently based on rows and semantic boundaries
*/
static async chunkStructuredData(
content: string,
options: StructuredDataOptions = {}
): Promise<Chunk[]> {
const chunks: Chunk[] = []
const lines = content.split('\n').filter((line) => line.trim())
if (lines.length === 0) {
return chunks
}
// Detect headers (first line or provided)
const headerLine = options.headers?.join('\t') || lines[0]
const dataStartIndex = options.headers ? 0 : 1
// Calculate optimal rows per chunk based on content
const estimatedTokensPerRow = StructuredDataChunker.estimateTokensPerRow(
lines.slice(dataStartIndex, Math.min(10, lines.length))
)
const optimalRowsPerChunk =
StructuredDataChunker.calculateOptimalRowsPerChunk(estimatedTokensPerRow)
console.log(
`Structured data chunking: ${lines.length} rows, ~${estimatedTokensPerRow} tokens/row, ${optimalRowsPerChunk} rows/chunk`
)
let currentChunkRows: string[] = []
let currentTokenEstimate = 0
const headerTokens = StructuredDataChunker.estimateTokens(headerLine)
let chunkStartRow = dataStartIndex
for (let i = dataStartIndex; i < lines.length; i++) {
const row = lines[i]
const rowTokens = StructuredDataChunker.estimateTokens(row)
// Check if adding this row would exceed our target
const projectedTokens =
currentTokenEstimate +
rowTokens +
(STRUCTURED_CHUNKING_CONFIG.INCLUDE_HEADERS_IN_EACH_CHUNK ? headerTokens : 0)
const shouldCreateChunk =
(projectedTokens > STRUCTURED_CHUNKING_CONFIG.TARGET_CHUNK_SIZE &&
currentChunkRows.length >= STRUCTURED_CHUNKING_CONFIG.MIN_ROWS_PER_CHUNK) ||
currentChunkRows.length >= optimalRowsPerChunk
if (shouldCreateChunk && currentChunkRows.length > 0) {
// Create chunk with current rows
const chunkContent = StructuredDataChunker.formatChunk(
headerLine,
currentChunkRows,
options.sheetName
)
chunks.push(StructuredDataChunker.createChunk(chunkContent, chunkStartRow, i - 1))
// Reset for next chunk
currentChunkRows = []
currentTokenEstimate = 0
chunkStartRow = i
}
currentChunkRows.push(row)
currentTokenEstimate += rowTokens
}
// Add remaining rows as final chunk
if (currentChunkRows.length > 0) {
const chunkContent = StructuredDataChunker.formatChunk(
headerLine,
currentChunkRows,
options.sheetName
)
chunks.push(StructuredDataChunker.createChunk(chunkContent, chunkStartRow, lines.length - 1))
}
console.log(`Created ${chunks.length} chunks from ${lines.length} rows of structured data`)
return chunks
}
/**
* Format a chunk with headers and context
*/
private static formatChunk(headerLine: string, rows: string[], sheetName?: string): string {
let content = ''
// Add sheet name context if available
if (sheetName) {
content += `=== ${sheetName} ===\n\n`
}
// Add headers for context
if (STRUCTURED_CHUNKING_CONFIG.INCLUDE_HEADERS_IN_EACH_CHUNK) {
content += `Headers: ${headerLine}\n`
content += `${'-'.repeat(Math.min(80, headerLine.length))}\n`
}
// Add data rows
content += rows.join('\n')
// Add row count for context
content += `\n\n[${rows.length} rows of data]`
return content
}
/**
* Create a chunk object with actual row indices
*/
private static createChunk(content: string, startRow: number, endRow: number): Chunk {
const tokenCount = StructuredDataChunker.estimateTokens(content)
return {
text: content,
tokenCount,
metadata: {
startIndex: startRow,
endIndex: endRow,
},
}
}
/**
* Estimate tokens in text (rough approximation)
*/
private static estimateTokens(text: string): number {
// Rough estimate: 1 token per 4 characters for English text
// For structured data with numbers, it's closer to 1 token per 3 characters
return Math.ceil(text.length / 3)
}
/**
* Estimate average tokens per row from sample
*/
private static estimateTokensPerRow(sampleRows: string[]): number {
if (sampleRows.length === 0) return 50 // default estimate
const totalTokens = sampleRows.reduce(
(sum, row) => sum + StructuredDataChunker.estimateTokens(row),
0
)
return Math.ceil(totalTokens / sampleRows.length)
}
/**
* Calculate optimal rows per chunk based on token estimates
*/
private static calculateOptimalRowsPerChunk(tokensPerRow: number): number {
const optimal = Math.floor(STRUCTURED_CHUNKING_CONFIG.TARGET_CHUNK_SIZE / tokensPerRow)
return Math.min(
Math.max(optimal, STRUCTURED_CHUNKING_CONFIG.MIN_ROWS_PER_CHUNK),
STRUCTURED_CHUNKING_CONFIG.MAX_ROWS_PER_CHUNK
)
}
/**
* Check if content appears to be structured data
*/
static isStructuredData(content: string, mimeType?: string): boolean {
// Check mime type first
if (mimeType) {
const structuredMimeTypes = [
'text/csv',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'application/vnd.ms-excel',
'text/tab-separated-values',
]
if (structuredMimeTypes.includes(mimeType)) {
return true
}
}
// Check content structure
const lines = content.split('\n').slice(0, 10) // Check first 10 lines
if (lines.length < 2) return false
// Check for consistent delimiters (comma, tab, pipe)
const delimiters = [',', '\t', '|']
for (const delimiter of delimiters) {
const counts = lines.map(
(line) => (line.match(new RegExp(`\\${delimiter}`, 'g')) || []).length
)
const avgCount = counts.reduce((a, b) => a + b, 0) / counts.length
// If most lines have similar delimiter counts, it's likely structured
if (avgCount > 2 && counts.every((c) => Math.abs(c - avgCount) <= 2)) {
return true
}
}
return false
}
}
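A rough usage sketch for the structured-data chunker (the CSV sample is hypothetical):
const csv = ['id,name,amount', '1,Alice,10.00', '2,Bob,24.50'].join('\n')
const chunks = await StructuredDataChunker.chunkStructuredData(csv, { sheetName: 'Orders' })
// Each chunk repeats the header line for context. At roughly 15 tokens per row the optimal
// batch would be floor(2500 / 15) = 166 rows, then clamped into the [20, 500] row range.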

View File

@@ -1,28 +1,4 @@
export interface ChunkMetadata {
startIndex: number
endIndex: number
tokenCount: number
}
export interface TextChunk {
text: string
metadata: ChunkMetadata
}
export interface ChunkerOptions {
chunkSize?: number
minChunkSize?: number
overlap?: number
}
export interface Chunk {
text: string
tokenCount: number
metadata: {
startIndex: number
endIndex: number
}
}
import type { Chunk, ChunkerOptions } from './types'
/**
* Lightweight text chunker optimized for RAG applications

View File

@@ -0,0 +1,53 @@
export interface ChunkMetadata {
startIndex: number
endIndex: number
tokenCount: number
}
export interface TextChunk {
text: string
metadata: ChunkMetadata
}
export interface ChunkerOptions {
chunkSize?: number
minChunkSize?: number
overlap?: number
}
export interface Chunk {
text: string
tokenCount: number
metadata: {
startIndex: number
endIndex: number
}
}
export interface StructuredDataOptions {
headers?: string[]
totalRows?: number
sheetName?: string
}
export interface DocChunk {
text: string
tokenCount: number
sourceDocument: string
headerLink: string
headerText: string
headerLevel: number
embedding: number[]
embeddingModel: string
metadata: {
sourceUrl?: string
headers?: string[]
title?: string
startIndex: number
endIndex: number
}
}
export interface DocsChunkerOptions extends ChunkerOptions {
baseUrl?: string
}

View File

@@ -114,7 +114,8 @@ export async function generateEmbeddings(
logger.info(`Using ${config.useAzure ? 'Azure OpenAI' : 'OpenAI'} for embeddings generation`)
const batchSize = 100
// Reduced batch size to prevent API timeouts and improve reliability
const batchSize = 50 // Reduced from 100 to prevent issues with large documents
const allEmbeddings: number[][] = []
for (let i = 0; i < texts.length; i += batchSize) {
@@ -125,6 +126,11 @@ export async function generateEmbeddings(
logger.info(
`Generated embeddings for batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(texts.length / batchSize)}`
)
// Add small delay between batches to avoid rate limiting
if (i + batchSize < texts.length) {
await new Promise((resolve) => setTimeout(resolve, 100))
}
}
return allEmbeddings
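For example, under the reduced batch size and inter-batch delay above (the text count is hypothetical):
const textCount = 230
const batchCount = Math.ceil(textCount / 50) // 5 batches: 50 + 50 + 50 + 50 + 30
const addedDelayMs = (batchCount - 1) * 100 // 400 ms of pauses spread across the run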

View File

@@ -17,8 +17,6 @@ export const env = createEnv({
server: {
// Core Database & Authentication
DATABASE_URL: z.string().url(), // Primary database connection string
DATABASE_SSL: z.enum(['disable', 'prefer', 'require', 'verify-ca', 'verify-full']).optional(), // PostgreSQL SSL mode
DATABASE_SSL_CA: z.string().optional(), // Base64-encoded CA certificate for SSL verification
BETTER_AUTH_URL: z.string().url(), // Base URL for Better Auth service
BETTER_AUTH_SECRET: z.string().min(32), // Secret key for Better Auth JWT signing
DISABLE_REGISTRATION: z.boolean().optional(), // Flag to disable new user registration
@@ -36,7 +34,6 @@ export const env = createEnv({
AGENT_INDEXER_URL: z.string().url().optional(), // URL for agent training data indexer
AGENT_INDEXER_API_KEY: z.string().min(1).optional(), // API key for agent indexer authentication
// Database & Storage
REDIS_URL: z.string().url().optional(), // Redis connection string for caching/sessions

View File

@@ -1,108 +1,154 @@
import { existsSync, readFileSync } from 'fs'
import * as Papa from 'papaparse'
import { createReadStream, existsSync } from 'fs'
import { Readable } from 'stream'
import { type Options, parse } from 'csv-parse'
import type { FileParseResult, FileParser } from '@/lib/file-parsers/types'
import { sanitizeTextForUTF8 } from '@/lib/file-parsers/utils'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('CsvParser')
const PARSE_OPTIONS = {
header: true,
skipEmptyLines: true,
transformHeader: (header: string) => sanitizeTextForUTF8(String(header)),
transform: (value: string) => sanitizeTextForUTF8(String(value || '')),
const CONFIG = {
MAX_PREVIEW_ROWS: 1000, // Only keep first 1000 rows for preview
MAX_SAMPLE_ROWS: 100, // Sample for metadata
MAX_ERRORS: 100, // Stop after 100 errors
STREAM_CHUNK_SIZE: 16384, // 16KB chunks for streaming
}
export class CsvParser implements FileParser {
async parseFile(filePath: string): Promise<FileParseResult> {
try {
if (!filePath) {
throw new Error('No file path provided')
}
if (!existsSync(filePath)) {
throw new Error(`File not found: ${filePath}`)
}
const fileContent = readFileSync(filePath, 'utf8')
const parseResult = Papa.parse(fileContent, PARSE_OPTIONS)
if (parseResult.errors && parseResult.errors.length > 0) {
const errorMessages = parseResult.errors.map((err) => err.message).join(', ')
logger.error('CSV parsing errors:', parseResult.errors)
throw new Error(`Failed to parse CSV file: ${errorMessages}`)
}
const results = parseResult.data as Record<string, any>[]
const headers = parseResult.meta.fields || []
let content = ''
if (headers.length > 0) {
const cleanHeaders = headers.map((h) => sanitizeTextForUTF8(String(h)))
content += `${cleanHeaders.join(', ')}\n`
}
results.forEach((row) => {
const cleanValues = Object.values(row).map((v) => sanitizeTextForUTF8(String(v || '')))
content += `${cleanValues.join(', ')}\n`
})
return {
content: sanitizeTextForUTF8(content),
metadata: {
rowCount: results.length,
headers: headers,
rawData: results,
},
}
} catch (error) {
logger.error('CSV general error:', error)
throw new Error(`Failed to process CSV file: ${(error as Error).message}`)
if (!filePath) {
throw new Error('No file path provided')
}
if (!existsSync(filePath)) {
throw new Error(`File not found: ${filePath}`)
}
const stream = createReadStream(filePath, {
highWaterMark: CONFIG.STREAM_CHUNK_SIZE,
})
return this.parseStream(stream)
}
async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
try {
logger.info('Parsing buffer, size:', buffer.length)
const bufferSize = buffer.length
logger.info(
`Parsing CSV buffer, size: ${bufferSize} bytes (${(bufferSize / 1024 / 1024).toFixed(2)} MB)`
)
const fileContent = buffer.toString('utf8')
const stream = Readable.from(buffer, {
highWaterMark: CONFIG.STREAM_CHUNK_SIZE,
})
const parseResult = Papa.parse(fileContent, PARSE_OPTIONS)
return this.parseStream(stream)
}
if (parseResult.errors && parseResult.errors.length > 0) {
const errorMessages = parseResult.errors.map((err) => err.message).join(', ')
logger.error('CSV parsing errors:', parseResult.errors)
throw new Error(`Failed to parse CSV buffer: ${errorMessages}`)
private parseStream(inputStream: NodeJS.ReadableStream): Promise<FileParseResult> {
return new Promise((resolve, reject) => {
let rowCount = 0
let errorCount = 0
let headers: string[] = []
let processedContent = ''
const sampledRows: any[] = []
const errors: string[] = []
let firstRowProcessed = false
let aborted = false
const parserOptions: Options = {
columns: true, // Use first row as headers
skip_empty_lines: true, // Skip empty lines
trim: true, // Trim whitespace
relax_column_count: true, // Allow variable column counts
relax_quotes: true, // Be lenient with quotes
skip_records_with_error: true, // Skip bad records
raw: false,
cast: false,
}
const parser = parse(parserOptions)
const results = parseResult.data as Record<string, any>[]
const headers = parseResult.meta.fields || []
parser.on('readable', () => {
let record
while ((record = parser.read()) !== null && !aborted) {
rowCount++
let content = ''
if (!firstRowProcessed && record) {
headers = Object.keys(record).map((h) => sanitizeTextForUTF8(String(h)))
processedContent = `${headers.join(', ')}\n`
firstRowProcessed = true
}
if (headers.length > 0) {
const cleanHeaders = headers.map((h) => sanitizeTextForUTF8(String(h)))
content += `${cleanHeaders.join(', ')}\n`
}
if (rowCount <= CONFIG.MAX_PREVIEW_ROWS) {
try {
const cleanValues = Object.values(record).map((v: any) =>
sanitizeTextForUTF8(String(v || ''))
)
processedContent += `${cleanValues.join(', ')}\n`
results.forEach((row) => {
const cleanValues = Object.values(row).map((v) => sanitizeTextForUTF8(String(v || '')))
content += `${cleanValues.join(', ')}\n`
if (rowCount <= CONFIG.MAX_SAMPLE_ROWS) {
sampledRows.push(record)
}
} catch (err) {
logger.warn(`Error processing row ${rowCount}:`, err)
}
}
if (rowCount % 10000 === 0) {
logger.info(`Processed ${rowCount} rows...`)
}
}
})
return {
content: sanitizeTextForUTF8(content),
metadata: {
rowCount: results.length,
headers: headers,
rawData: results,
},
}
} catch (error) {
logger.error('CSV buffer parsing error:', error)
throw new Error(`Failed to process CSV buffer: ${(error as Error).message}`)
}
parser.on('skip', (err: any) => {
errorCount++
if (errorCount <= 5) {
const errorMsg = `Row ${err.lines || rowCount}: ${err.message || 'Unknown error'}`
errors.push(errorMsg)
logger.warn('CSV skip:', errorMsg)
}
if (errorCount >= CONFIG.MAX_ERRORS) {
aborted = true
parser.destroy()
reject(new Error(`Too many errors (${errorCount}). File may be corrupted.`))
}
})
parser.on('error', (err: Error) => {
logger.error('CSV parser error:', err)
reject(new Error(`CSV parsing failed: ${err.message}`))
})
parser.on('end', () => {
if (!aborted) {
if (rowCount > CONFIG.MAX_PREVIEW_ROWS) {
processedContent += `\n[... ${rowCount.toLocaleString()} total rows, showing first ${CONFIG.MAX_PREVIEW_ROWS} ...]\n`
}
logger.info(`CSV parsing complete: ${rowCount} rows, ${errorCount} errors`)
resolve({
content: sanitizeTextForUTF8(processedContent),
metadata: {
rowCount,
headers,
errorCount,
errors: errors.slice(0, 10),
truncated: rowCount > CONFIG.MAX_PREVIEW_ROWS,
sampledData: sampledRows,
},
})
}
})
inputStream.on('error', (err) => {
logger.error('Input stream error:', err)
parser.destroy()
reject(new Error(`Stream error: ${err.message}`))
})
inputStream.pipe(parser)
})
}
}
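A minimal sketch of using the streaming CSV parser (the file path is hypothetical):
import { CsvParser } from '@/lib/file-parsers/csv-parser'

const result = await new CsvParser().parseFile('/tmp/orders.csv')
console.log(result.metadata.rowCount, result.metadata.truncated)
// result.content carries the header row plus at most the first 1,000 data rows;
// metadata.sampledData keeps the first 100 parsed records and metadata.errors the first few skip reasons.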

View File

@@ -27,8 +27,9 @@ function getParserInstances(): Record<string, FileParser> {
try {
const { CsvParser } = require('@/lib/file-parsers/csv-parser')
parserInstances.csv = new CsvParser()
logger.info('Loaded streaming CSV parser with csv-parse library')
} catch (error) {
logger.error('Failed to load CSV parser:', error)
logger.error('Failed to load streaming CSV parser:', error)
}
try {
@@ -63,6 +64,7 @@ function getParserInstances(): Record<string, FileParser> {
const { XlsxParser } = require('@/lib/file-parsers/xlsx-parser')
parserInstances.xlsx = new XlsxParser()
parserInstances.xls = new XlsxParser()
logger.info('Loaded XLSX parser')
} catch (error) {
logger.error('Failed to load XLSX parser:', error)
}
@@ -82,6 +84,32 @@ function getParserInstances(): Record<string, FileParser> {
} catch (error) {
logger.error('Failed to load HTML parser:', error)
}
try {
const { parseJSON, parseJSONBuffer } = require('@/lib/file-parsers/json-parser')
parserInstances.json = {
parseFile: parseJSON,
parseBuffer: parseJSONBuffer,
}
logger.info('Loaded JSON parser')
} catch (error) {
logger.error('Failed to load JSON parser:', error)
}
try {
const { parseYAML, parseYAMLBuffer } = require('@/lib/file-parsers/yaml-parser')
parserInstances.yaml = {
parseFile: parseYAML,
parseBuffer: parseYAMLBuffer,
}
parserInstances.yml = {
parseFile: parseYAML,
parseBuffer: parseYAMLBuffer,
}
logger.info('Loaded YAML parser')
} catch (error) {
logger.error('Failed to load YAML parser:', error)
}
} catch (error) {
logger.error('Error loading file parsers:', error)
}

View File

@@ -0,0 +1,74 @@
import type { FileParseResult } from './types'
/**
* Parse JSON files
*/
export async function parseJSON(filePath: string): Promise<FileParseResult> {
const fs = await import('fs/promises')
const content = await fs.readFile(filePath, 'utf-8')
try {
// Parse to validate JSON
const jsonData = JSON.parse(content)
// Return pretty-printed JSON for better readability
const formattedContent = JSON.stringify(jsonData, null, 2)
// Extract metadata about the JSON structure
const metadata = {
type: 'json',
isArray: Array.isArray(jsonData),
keys: Array.isArray(jsonData) ? [] : Object.keys(jsonData || {}),
itemCount: Array.isArray(jsonData) ? jsonData.length : undefined,
depth: getJsonDepth(jsonData),
}
return {
content: formattedContent,
metadata,
}
} catch (error) {
throw new Error(`Invalid JSON: ${error instanceof Error ? error.message : 'Unknown error'}`)
}
}
/**
* Parse JSON from buffer
*/
export async function parseJSONBuffer(buffer: Buffer): Promise<FileParseResult> {
const content = buffer.toString('utf-8')
try {
const jsonData = JSON.parse(content)
const formattedContent = JSON.stringify(jsonData, null, 2)
const metadata = {
type: 'json',
isArray: Array.isArray(jsonData),
keys: Array.isArray(jsonData) ? [] : Object.keys(jsonData || {}),
itemCount: Array.isArray(jsonData) ? jsonData.length : undefined,
depth: getJsonDepth(jsonData),
}
return {
content: formattedContent,
metadata,
}
} catch (error) {
throw new Error(`Invalid JSON: ${error instanceof Error ? error.message : 'Unknown error'}`)
}
}
/**
* Calculate the depth of a JSON object
*/
function getJsonDepth(obj: any): number {
if (obj === null || typeof obj !== 'object') return 0
if (Array.isArray(obj)) {
return obj.length > 0 ? 1 + Math.max(...obj.map(getJsonDepth)) : 1
}
const depths = Object.values(obj).map(getJsonDepth)
return depths.length > 0 ? 1 + Math.max(...depths) : 1
}
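
A minimal usage sketch for the JSON parser above (values are illustrative; getJsonDepth treats a flat object as depth 1, so the nested object below reports depth 2):

// Sketch, not part of the diff: parse a small JSON buffer and inspect the metadata.
import { parseJSONBuffer } from '@/lib/file-parsers/json-parser'

const sample = Buffer.from(JSON.stringify({ config: { retries: 3 } }))
const result = await parseJSONBuffer(sample)
// result.content is the pretty-printed JSON; result.metadata is
// { type: 'json', isArray: false, keys: ['config'], itemCount: undefined, depth: 2 }
console.log(result.metadata.depth)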

View File

@@ -6,6 +6,15 @@ import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('XlsxParser')
// Configuration for handling large XLSX files
const CONFIG = {
MAX_PREVIEW_ROWS: 1000, // Only keep first 1000 rows for preview
MAX_SAMPLE_ROWS: 100, // Sample for metadata
ROWS_PER_CHUNK: 50, // Aggregate 50 rows per chunk to reduce chunk count
MAX_CELL_LENGTH: 1000, // Truncate very long cell values
MAX_CONTENT_SIZE: 10 * 1024 * 1024, // 10MB max content size
}
export class XlsxParser implements FileParser {
async parseFile(filePath: string): Promise<FileParseResult> {
try {
@@ -19,7 +28,12 @@ export class XlsxParser implements FileParser {
logger.info(`Parsing XLSX file: ${filePath}`)
const workbook = XLSX.readFile(filePath)
// Read with streaming option for large files
const workbook = XLSX.readFile(filePath, {
dense: true, // Use dense mode for better memory efficiency
sheetStubs: false, // Don't create stub cells
})
return this.processWorkbook(workbook)
} catch (error) {
logger.error('XLSX file parsing error:', error)
@@ -29,13 +43,21 @@ export class XlsxParser implements FileParser {
async parseBuffer(buffer: Buffer): Promise<FileParseResult> {
try {
logger.info('Parsing XLSX buffer, size:', buffer.length)
const bufferSize = buffer.length
logger.info(
`Parsing XLSX buffer, size: ${bufferSize} bytes (${(bufferSize / 1024 / 1024).toFixed(2)} MB)`
)
if (!buffer || buffer.length === 0) {
throw new Error('Empty buffer provided')
}
const workbook = XLSX.read(buffer, { type: 'buffer' })
const workbook = XLSX.read(buffer, {
type: 'buffer',
dense: true, // Use dense mode for better memory efficiency
sheetStubs: false, // Don't create stub cells
})
return this.processWorkbook(workbook)
} catch (error) {
logger.error('XLSX buffer parsing error:', error)
@@ -45,44 +67,111 @@ export class XlsxParser implements FileParser {
private processWorkbook(workbook: XLSX.WorkBook): FileParseResult {
const sheetNames = workbook.SheetNames
const sheets: Record<string, any[]> = {}
let content = ''
let totalRows = 0
let truncated = false
let contentSize = 0
const sampledData: any[] = []
for (const sheetName of sheetNames) {
const worksheet = workbook.Sheets[sheetName]
const sheetData = XLSX.utils.sheet_to_json(worksheet, { header: 1 })
sheets[sheetName] = sheetData
totalRows += sheetData.length
// Get sheet dimensions
const range = XLSX.utils.decode_range(worksheet['!ref'] || 'A1')
const rowCount = range.e.r - range.s.r + 1
logger.info(`Processing sheet: ${sheetName} with ${rowCount} rows`)
// Convert to JSON with header row
const sheetData = XLSX.utils.sheet_to_json(worksheet, {
header: 1,
defval: '', // Default value for empty cells
blankrows: false, // Skip blank rows
})
const actualRowCount = sheetData.length
totalRows += actualRowCount
// Store limited sample for metadata
if (sampledData.length < CONFIG.MAX_SAMPLE_ROWS) {
const sampleSize = Math.min(CONFIG.MAX_SAMPLE_ROWS - sampledData.length, actualRowCount)
sampledData.push(...sheetData.slice(0, sampleSize))
}
// Only process limited rows for preview
const rowsToProcess = Math.min(actualRowCount, CONFIG.MAX_PREVIEW_ROWS)
const cleanSheetName = sanitizeTextForUTF8(sheetName)
content += `Sheet: ${cleanSheetName}\n`
content += `=${'='.repeat(cleanSheetName.length + 6)}\n\n`
if (sheetData.length > 0) {
sheetData.forEach((row: unknown, rowIndex: number) => {
if (Array.isArray(row) && row.length > 0) {
const rowString = row
.map((cell) => {
if (cell === null || cell === undefined) {
return ''
}
return sanitizeTextForUTF8(String(cell))
})
.join('\t')
// Add sheet header
const sheetHeader = `\n=== Sheet: ${cleanSheetName} ===\n`
content += sheetHeader
contentSize += sheetHeader.length
content += `${rowString}\n`
if (actualRowCount > 0) {
// Get headers if available
const headers = sheetData[0] as any[]
if (headers && headers.length > 0) {
const headerRow = headers.map((h) => this.truncateCell(h)).join('\t')
content += `${headerRow}\n`
content += `${'-'.repeat(Math.min(80, headerRow.length))}\n`
contentSize += headerRow.length + 82
}
// Process data rows in chunks
let chunkContent = ''
let chunkRowCount = 0
for (let i = 1; i < rowsToProcess; i++) {
const row = sheetData[i] as any[]
if (row && row.length > 0) {
const rowString = row.map((cell) => this.truncateCell(cell)).join('\t')
chunkContent += `${rowString}\n`
chunkRowCount++
// Flush the accumulated rows into content every N rows and enforce the size limit
if (chunkRowCount >= CONFIG.ROWS_PER_CHUNK) {
content += chunkContent
contentSize += chunkContent.length
chunkContent = ''
chunkRowCount = 0
// Check content size limit
if (contentSize > CONFIG.MAX_CONTENT_SIZE) {
truncated = true
break
}
}
}
})
}
// Add remaining chunk content
if (chunkContent && contentSize < CONFIG.MAX_CONTENT_SIZE) {
content += chunkContent
contentSize += chunkContent.length
}
// Add truncation notice if needed
if (actualRowCount > rowsToProcess) {
const notice = `\n[... ${actualRowCount.toLocaleString()} total rows, showing first ${rowsToProcess.toLocaleString()} ...]\n`
content += notice
truncated = true
}
} else {
content += '[Empty sheet]\n'
}
content += '\n'
// Stop processing if content is too large
if (contentSize > CONFIG.MAX_CONTENT_SIZE) {
content += '\n[... Content truncated due to size limits ...]\n'
truncated = true
break
}
}
logger.info(`XLSX parsing completed: ${sheetNames.length} sheets, ${totalRows} total rows`)
logger.info(
`XLSX parsing completed: ${sheetNames.length} sheets, ${totalRows} total rows, truncated: ${truncated}`
)
const cleanContent = sanitizeTextForUTF8(content).trim()
@@ -92,8 +181,25 @@ export class XlsxParser implements FileParser {
sheetCount: sheetNames.length,
sheetNames: sheetNames,
totalRows: totalRows,
sheets: sheets,
truncated: truncated,
sampledData: sampledData.slice(0, CONFIG.MAX_SAMPLE_ROWS),
contentSize: contentSize,
},
}
}
private truncateCell(cell: any): string {
if (cell === null || cell === undefined) {
return ''
}
let cellStr = String(cell)
// Truncate very long cells
if (cellStr.length > CONFIG.MAX_CELL_LENGTH) {
cellStr = `${cellStr.substring(0, CONFIG.MAX_CELL_LENGTH)}...`
}
return sanitizeTextForUTF8(cellStr)
}
}
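
A minimal usage sketch for the parser above, building a tiny workbook with SheetJS so the example stays self-contained (sheet and column names are illustrative):

// Sketch, not part of the diff: round-trip a one-sheet workbook through XlsxParser.
import * as XLSX from 'xlsx'
import { XlsxParser } from '@/lib/file-parsers/xlsx-parser'

const ws = XLSX.utils.aoa_to_sheet([
  ['name', 'score'],
  ['Ada', 95],
])
const wb = XLSX.utils.book_new()
XLSX.utils.book_append_sheet(wb, ws, 'Results')
const buffer = XLSX.write(wb, { type: 'buffer', bookType: 'xlsx' }) as Buffer

const parsed = await new XlsxParser().parseBuffer(buffer)
// parsed.content begins with the "=== Sheet: Results ===" header; parsed.metadata
// reports sheetCount: 1, totalRows: 2 and truncated: false for a file this small.
console.log(parsed.metadata)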

View File

@@ -0,0 +1,75 @@
import * as yaml from 'js-yaml'
import type { FileParseResult } from './types'
/**
* Parse YAML files
*/
export async function parseYAML(filePath: string): Promise<FileParseResult> {
const fs = await import('fs/promises')
const content = await fs.readFile(filePath, 'utf-8')
try {
// Parse YAML to validate and extract structure
const yamlData = yaml.load(content)
// Convert to JSON for consistent processing
const jsonContent = JSON.stringify(yamlData, null, 2)
// Extract metadata about the YAML structure
const metadata = {
type: 'yaml',
isArray: Array.isArray(yamlData),
keys: Array.isArray(yamlData) ? [] : Object.keys(yamlData || {}),
itemCount: Array.isArray(yamlData) ? yamlData.length : undefined,
depth: getYamlDepth(yamlData),
}
return {
content: jsonContent,
metadata,
}
} catch (error) {
throw new Error(`Invalid YAML: ${error instanceof Error ? error.message : 'Unknown error'}`)
}
}
/**
* Parse YAML from buffer
*/
export async function parseYAMLBuffer(buffer: Buffer): Promise<FileParseResult> {
const content = buffer.toString('utf-8')
try {
const yamlData = yaml.load(content)
const jsonContent = JSON.stringify(yamlData, null, 2)
const metadata = {
type: 'yaml',
isArray: Array.isArray(yamlData),
keys: Array.isArray(yamlData) ? [] : Object.keys(yamlData || {}),
itemCount: Array.isArray(yamlData) ? yamlData.length : undefined,
depth: getYamlDepth(yamlData),
}
return {
content: jsonContent,
metadata,
}
} catch (error) {
throw new Error(`Invalid YAML: ${error instanceof Error ? error.message : 'Unknown error'}`)
}
}
/**
* Calculate the depth of a YAML/JSON object
*/
function getYamlDepth(obj: any): number {
if (obj === null || typeof obj !== 'object') return 0
if (Array.isArray(obj)) {
return obj.length > 0 ? 1 + Math.max(...obj.map(getYamlDepth)) : 1
}
const depths = Object.values(obj).map(getYamlDepth)
return depths.length > 0 ? 1 + Math.max(...depths) : 1
}
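
The same sketch for the YAML parser (note that the returned content is the JSON-serialized form of the parsed document, which keeps downstream chunking consistent):

// Sketch, not part of the diff: parse a small YAML buffer and inspect the metadata.
import { parseYAMLBuffer } from '@/lib/file-parsers/yaml-parser'

const sample = Buffer.from('services:\n  web:\n    port: 8080\n')
const result = await parseYAMLBuffer(sample)
// result.metadata is { type: 'yaml', isArray: false, keys: ['services'], itemCount: undefined, depth: 3 }
console.log(result.content) // pretty-printed JSON, not the original YAML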

View File

@@ -1,6 +1,6 @@
import { type Chunk, JsonYamlChunker, StructuredDataChunker, TextChunker } from '@/lib/chunkers'
import { env } from '@/lib/env'
import { parseBuffer, parseFile } from '@/lib/file-parsers'
import { type Chunk, TextChunker } from '@/lib/knowledge/documents/chunker'
import { retryWithExponentialBackoff } from '@/lib/knowledge/documents/utils'
import { createLogger } from '@/lib/logs/console/logger'
import {
@@ -15,8 +15,8 @@ import { mistralParserTool } from '@/tools/mistral/parser'
const logger = createLogger('DocumentProcessor')
const TIMEOUTS = {
FILE_DOWNLOAD: 60000,
MISTRAL_OCR_API: 90000,
FILE_DOWNLOAD: 180000,
MISTRAL_OCR_API: 120000,
} as const
type OCRResult = {
@@ -97,8 +97,32 @@ export async function processDocument(
const { content, processingMethod } = parseResult
const cloudUrl = 'cloudUrl' in parseResult ? parseResult.cloudUrl : undefined
const chunker = new TextChunker({ chunkSize, overlap: chunkOverlap, minChunkSize })
const chunks = await chunker.chunk(content)
let chunks: Chunk[]
const metadata = 'metadata' in parseResult ? parseResult.metadata : {}
const isJsonYaml =
metadata.type === 'json' ||
metadata.type === 'yaml' ||
mimeType.includes('json') ||
mimeType.includes('yaml')
if (isJsonYaml && JsonYamlChunker.isStructuredData(content)) {
logger.info('Using JSON/YAML chunker for structured data')
chunks = await JsonYamlChunker.chunkJsonYaml(content, {
chunkSize,
minChunkSize,
})
} else if (StructuredDataChunker.isStructuredData(content, mimeType)) {
logger.info('Using structured data chunker for spreadsheet/CSV content')
chunks = await StructuredDataChunker.chunkStructuredData(content, {
headers: metadata.headers,
totalRows: metadata.totalRows || metadata.rowCount,
sheetName: metadata.sheetNames?.[0],
})
} else {
const chunker = new TextChunker({ chunkSize, overlap: chunkOverlap, minChunkSize })
chunks = await chunker.chunk(content)
}
const characterCount = content.length
const tokenCount = chunks.reduce((sum, chunk) => sum + chunk.tokenCount, 0)
@@ -132,22 +156,23 @@ async function parseDocument(
content: string
processingMethod: 'file-parser' | 'mistral-ocr'
cloudUrl?: string
metadata?: any
}> {
const isPDF = mimeType === 'application/pdf'
const hasAzureMistralOCR =
env.OCR_AZURE_API_KEY && env.OCR_AZURE_ENDPOINT && env.OCR_AZURE_MODEL_NAME
const hasMistralOCR = env.MISTRAL_API_KEY
// Check Azure Mistral OCR configuration
if (isPDF && (hasAzureMistralOCR || hasMistralOCR)) {
if (hasAzureMistralOCR) {
logger.info(`Using Azure Mistral OCR: ${filename}`)
return parseWithAzureMistralOCR(fileUrl, filename, mimeType)
}
if (isPDF && hasAzureMistralOCR) {
logger.info(`Using Azure Mistral OCR: ${filename}`)
return parseWithAzureMistralOCR(fileUrl, filename, mimeType)
}
if (isPDF && hasMistralOCR) {
logger.info(`Using Mistral OCR: ${filename}`)
return parseWithMistralOCR(fileUrl, filename, mimeType)
if (hasMistralOCR) {
logger.info(`Using Mistral OCR: ${filename}`)
return parseWithMistralOCR(fileUrl, filename, mimeType)
}
}
logger.info(`Using file parser: ${filename}`)
@@ -200,9 +225,7 @@ async function downloadFileWithTimeout(fileUrl: string): Promise<Buffer> {
}
async function downloadFileForBase64(fileUrl: string): Promise<Buffer> {
// Handle different URL types for Azure Mistral OCR base64 requirement
if (fileUrl.startsWith('data:')) {
// Extract base64 data from data URI
const [, base64Data] = fileUrl.split(',')
if (!base64Data) {
throw new Error('Invalid data URI format')
@@ -210,10 +233,8 @@ async function downloadFileForBase64(fileUrl: string): Promise<Buffer> {
return Buffer.from(base64Data, 'base64')
}
if (fileUrl.startsWith('http')) {
// Download from HTTP(S) URL
return downloadFileWithTimeout(fileUrl)
}
// Local file - read it
const fs = await import('fs/promises')
return fs.readFile(fileUrl)
}
@@ -315,7 +336,6 @@ async function parseWithAzureMistralOCR(fileUrl: string, filename: string, mimeT
'Azure Mistral OCR'
)
// Azure Mistral OCR accepts data URIs with base64 content
const fileBuffer = await downloadFileForBase64(fileUrl)
const base64Data = fileBuffer.toString('base64')
const dataUri = `data:${mimeType};base64,${base64Data}`
@@ -409,21 +429,25 @@ async function parseWithMistralOCR(fileUrl: string, filename: string, mimeType:
async function parseWithFileParser(fileUrl: string, filename: string, mimeType: string) {
try {
let content: string
let metadata: any = {}
if (fileUrl.startsWith('data:')) {
content = await parseDataURI(fileUrl, filename, mimeType)
} else if (fileUrl.startsWith('http')) {
content = await parseHttpFile(fileUrl, filename)
const result = await parseHttpFile(fileUrl, filename)
content = result.content
metadata = result.metadata || {}
} else {
const result = await parseFile(fileUrl)
content = result.content
metadata = result.metadata || {}
}
if (!content.trim()) {
throw new Error('File parser returned empty content')
}
return { content, processingMethod: 'file-parser' as const, cloudUrl: undefined }
return { content, processingMethod: 'file-parser' as const, cloudUrl: undefined, metadata }
} catch (error) {
logger.error(`File parser failed for ${filename}:`, error)
throw error
@@ -448,7 +472,10 @@ async function parseDataURI(fileUrl: string, filename: string, mimeType: string)
return result.content
}
async function parseHttpFile(fileUrl: string, filename: string): Promise<string> {
async function parseHttpFile(
fileUrl: string,
filename: string
): Promise<{ content: string; metadata?: any }> {
const buffer = await downloadFileWithTimeout(fileUrl)
const extension = filename.split('.').pop()?.toLowerCase()
@@ -457,5 +484,5 @@ async function parseHttpFile(fileUrl: string, filename: string): Promise<string>
}
const result = await parseBuffer(buffer, extension)
return result.content
return result
}

View File

@@ -17,10 +17,18 @@ import type { DocumentSortField, SortOrder } from './types'
const logger = createLogger('DocumentService')
const TIMEOUTS = {
OVERALL_PROCESSING: (env.KB_CONFIG_MAX_DURATION || 300) * 1000,
OVERALL_PROCESSING: (env.KB_CONFIG_MAX_DURATION || 600) * 1000, // Increased to 10 minutes to match Trigger's timeout
EMBEDDINGS_API: (env.KB_CONFIG_MAX_TIMEOUT || 10000) * 18,
} as const
// Configuration for handling large documents
const LARGE_DOC_CONFIG = {
MAX_CHUNKS_PER_BATCH: 500, // Insert embeddings in batches of 500
MAX_EMBEDDING_BATCH: 50, // Generate embeddings in batches of 50
MAX_FILE_SIZE: 100 * 1024 * 1024, // 100MB max file size
MAX_CHUNKS_PER_DOCUMENT: 100000, // Maximum chunks allowed per document
}
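// At these limits, a document at the 100,000-chunk ceiling is embedded in 2,000 batches of 50 and inserted in 200 batches of 500.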
/**
* Create a timeout wrapper for async operations
*/
@@ -448,14 +456,38 @@ export async function processDocumentAsync(
processingOptions.minCharactersPerChunk || 1
)
if (processed.chunks.length > LARGE_DOC_CONFIG.MAX_CHUNKS_PER_DOCUMENT) {
throw new Error(
`Document has ${processed.chunks.length.toLocaleString()} chunks, exceeding maximum of ${LARGE_DOC_CONFIG.MAX_CHUNKS_PER_DOCUMENT.toLocaleString()}. ` +
`This document is unusually large and may need to be split into multiple files or preprocessed to reduce content.`
)
}
const now = new Date()
logger.info(
`[${documentId}] Document parsed successfully, generating embeddings for ${processed.chunks.length} chunks`
)
// Generate embeddings in batches for large documents
const chunkTexts = processed.chunks.map((chunk) => chunk.text)
const embeddings = chunkTexts.length > 0 ? await generateEmbeddings(chunkTexts) : []
const embeddings: number[][] = []
if (chunkTexts.length > 0) {
const batchSize = LARGE_DOC_CONFIG.MAX_EMBEDDING_BATCH
const totalBatches = Math.ceil(chunkTexts.length / batchSize)
logger.info(`[${documentId}] Generating embeddings in ${totalBatches} batches`)
for (let i = 0; i < chunkTexts.length; i += batchSize) {
const batch = chunkTexts.slice(i, i + batchSize)
const batchNum = Math.floor(i / batchSize) + 1
logger.info(`[${documentId}] Processing embedding batch ${batchNum}/${totalBatches}`)
const batchEmbeddings = await generateEmbeddings(batch)
embeddings.push(...batchEmbeddings)
}
}
logger.info(`[${documentId}] Embeddings generated, fetching document tags`)
@@ -503,8 +535,24 @@ export async function processDocumentAsync(
}))
await db.transaction(async (tx) => {
// Insert embeddings in batches for large documents
if (embeddingRecords.length > 0) {
await tx.insert(embedding).values(embeddingRecords)
const batchSize = LARGE_DOC_CONFIG.MAX_CHUNKS_PER_BATCH
const totalBatches = Math.ceil(embeddingRecords.length / batchSize)
logger.info(
`[${documentId}] Inserting ${embeddingRecords.length} embeddings in ${totalBatches} batches`
)
for (let i = 0; i < embeddingRecords.length; i += batchSize) {
const batch = embeddingRecords.slice(i, i + batchSize)
const batchNum = Math.floor(i / batchSize) + 1
await tx.insert(embedding).values(batch)
logger.info(
`[${documentId}] Inserted batch ${batchNum}/${totalBatches} (${batch.length} records)`
)
}
}
await tx

View File

@@ -15,6 +15,9 @@ export const SUPPORTED_DOCUMENT_EXTENSIONS = [
'pptx',
'html',
'htm',
'json',
'yaml',
'yml',
] as const
export type SupportedDocumentExtension = (typeof SUPPORTED_DOCUMENT_EXTENSIONS)[number]
@@ -46,6 +49,9 @@ export const SUPPORTED_MIME_TYPES: Record<SupportedDocumentExtension, string[]>
],
html: ['text/html', 'application/xhtml+xml'],
htm: ['text/html', 'application/xhtml+xml'],
json: ['application/json', 'text/json', 'application/x-json'],
yaml: ['text/yaml', 'text/x-yaml', 'application/yaml', 'application/x-yaml'],
yml: ['text/yaml', 'text/x-yaml', 'application/yaml', 'application/x-yaml'],
}
export const ACCEPTED_FILE_TYPES = Object.values(SUPPORTED_MIME_TYPES).flat()
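
A brief sketch of how the two exports above fit together during upload validation (the helper name is hypothetical, not part of the diff):

// Sketch: accept a file only when both its extension and reported MIME type are supported.
function isSupportedUpload(extension: string, mimeType: string): boolean {
  const ext = extension.toLowerCase()
  if (!(SUPPORTED_DOCUMENT_EXTENSIONS as readonly string[]).includes(ext)) {
    return false
  }
  return SUPPORTED_MIME_TYPES[ext as SupportedDocumentExtension].includes(mimeType)
}
// isSupportedUpload('yaml', 'application/x-yaml') -> true
// isSupportedUpload('yaml', 'application/pdf')    -> false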

View File

@@ -70,6 +70,7 @@
"clsx": "^2.1.1",
"cmdk": "^1.0.0",
"croner": "^9.0.0",
"csv-parse": "6.1.0",
"date-fns": "4.1.0",
"encoding": "0.1.13",
"entities": "6.0.1",

View File

@@ -1,98 +0,0 @@
#!/usr/bin/env bun
import path from 'path'
import { DocsChunker } from '@/lib/knowledge/documents/docs-chunker'
import type { DocChunk } from '@/lib/knowledge/documents/types'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('ChunkDocsScript')
/**
* Script to chunk all .mdx files in the docs directory
*/
async function main() {
try {
// Initialize the docs chunker
const chunker = new DocsChunker({
chunkSize: 1024,
minChunkSize: 100,
overlap: 200,
baseUrl: 'https://docs.sim.ai',
})
// Path to the docs content directory
const docsPath = path.join(process.cwd(), '../../apps/docs/content/docs')
logger.info(`Processing docs from: ${docsPath}`)
// Process all .mdx files
const chunks = await chunker.chunkAllDocs(docsPath)
logger.info(`\n=== CHUNKING RESULTS ===`)
logger.info(`Total chunks: ${chunks.length}`)
// Group chunks by document
const chunksByDoc = chunks.reduce<Record<string, DocChunk[]>>((acc, chunk) => {
if (!acc[chunk.sourceDocument]) {
acc[chunk.sourceDocument] = []
}
acc[chunk.sourceDocument].push(chunk)
return acc
}, {})
// Display summary
logger.info(`\n=== DOCUMENT SUMMARY ===`)
for (const [doc, docChunks] of Object.entries(chunksByDoc)) {
logger.info(`${doc}: ${docChunks.length} chunks`)
}
// Display a few sample chunks
logger.info(`\n=== SAMPLE CHUNKS ===`)
chunks.slice(0, 3).forEach((chunk, index) => {
logger.info(`\nChunk ${index + 1}:`)
logger.info(` Source: ${chunk.sourceDocument}`)
logger.info(` Header: ${chunk.headerText} (Level ${chunk.headerLevel})`)
logger.info(` Link: ${chunk.headerLink}`)
logger.info(` Tokens: ${chunk.tokenCount}`)
logger.info(` Embedding: ${chunk.embedding.length} dimensions (${chunk.embeddingModel})`)
logger.info(
` Embedding Preview: [${chunk.embedding
.slice(0, 5)
.map((n) => n.toFixed(4))
.join(', ')}...]`
)
logger.info(` Text Preview: ${chunk.text.slice(0, 100)}...`)
})
// Calculate total token count
const totalTokens = chunks.reduce((sum, chunk) => sum + chunk.tokenCount, 0)
const chunksWithEmbeddings = chunks.filter((chunk) => chunk.embedding.length > 0).length
logger.info(`\n=== STATISTICS ===`)
logger.info(`Total tokens: ${totalTokens}`)
logger.info(`Average tokens per chunk: ${Math.round(totalTokens / chunks.length)}`)
logger.info(`Chunks with embeddings: ${chunksWithEmbeddings}/${chunks.length}`)
if (chunks.length > 0 && chunks[0].embedding.length > 0) {
logger.info(`Embedding model: ${chunks[0].embeddingModel}`)
logger.info(`Embedding dimensions: ${chunks[0].embedding.length}`)
}
const headerLevels = chunks.reduce<Record<number, number>>((acc, chunk) => {
acc[chunk.headerLevel] = (acc[chunk.headerLevel] || 0) + 1
return acc
}, {})
logger.info(`Header level distribution:`)
Object.entries(headerLevels)
.sort(([a], [b]) => Number(a) - Number(b))
.forEach(([level, count]) => {
logger.info(` H${level}: ${count} chunks`)
})
} catch (error) {
logger.error('Error processing docs:', error)
process.exit(1)
}
}
// Run the script
main().catch(console.error)

View File

@@ -1,215 +0,0 @@
#!/usr/bin/env bun
import path from 'path'
import { db } from '@sim/db'
import { docsEmbeddings } from '@sim/db/schema'
import { sql } from 'drizzle-orm'
import { isDev } from '@/lib/environment'
import { DocsChunker } from '@/lib/knowledge/documents/docs-chunker'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('ProcessDocsEmbeddings')
interface ProcessingOptions {
/** Clear existing docs embeddings before processing */
clearExisting?: boolean
/** Path to docs directory */
docsPath?: string
/** Base URL for generating links */
baseUrl?: string
/** Chunk size in tokens */
chunkSize?: number
/** Minimum chunk size in tokens */
minChunkSize?: number
/** Overlap between chunks in tokens */
overlap?: number
}
/**
* Production script to process documentation and save embeddings to database
*/
async function processDocsEmbeddings(options: ProcessingOptions = {}) {
const startTime = Date.now()
let processedChunks = 0
let failedChunks = 0
try {
// Configuration
const config = {
clearExisting: options.clearExisting ?? false,
docsPath: options.docsPath ?? path.join(process.cwd(), '../../apps/docs/content/docs/en'),
baseUrl: options.baseUrl ?? (isDev ? 'http://localhost:3001' : 'https://docs.sim.ai'),
chunkSize: options.chunkSize ?? 300, // Max 300 tokens per chunk
minChunkSize: options.minChunkSize ?? 100,
overlap: options.overlap ?? 50,
}
logger.info('🚀 Starting docs embedding processing...')
logger.info(`Configuration:`, {
docsPath: config.docsPath,
baseUrl: config.baseUrl,
chunkSize: config.chunkSize,
clearExisting: config.clearExisting,
})
const chunker = new DocsChunker({
chunkSize: config.chunkSize,
minChunkSize: config.minChunkSize,
overlap: config.overlap,
baseUrl: config.baseUrl,
})
logger.info(`📚 Processing docs from: ${config.docsPath}`)
const chunks = await chunker.chunkAllDocs(config.docsPath)
if (chunks.length === 0) {
logger.warn('⚠️ No chunks generated from docs')
return { success: false, processedChunks: 0, failedChunks: 0 }
}
logger.info(`📊 Generated ${chunks.length} chunks with embeddings`)
if (config.clearExisting) {
logger.info('🗑️ Clearing existing docs embeddings...')
try {
const deleteResult = await db.delete(docsEmbeddings)
logger.info(`✅ Successfully deleted existing embeddings`)
} catch (error) {
logger.error('❌ Failed to delete existing embeddings:', error)
throw new Error('Failed to clear existing embeddings')
}
}
const batchSize = 10
logger.info(`💾 Saving chunks to database (batch size: ${batchSize})...`)
for (let i = 0; i < chunks.length; i += batchSize) {
const batch = chunks.slice(i, i + batchSize)
try {
const batchData = batch.map((chunk) => ({
chunkText: chunk.text,
sourceDocument: chunk.sourceDocument,
sourceLink: chunk.headerLink,
headerText: chunk.headerText,
headerLevel: chunk.headerLevel,
tokenCount: chunk.tokenCount,
embedding: chunk.embedding,
embeddingModel: chunk.embeddingModel,
metadata: chunk.metadata,
}))
await db.insert(docsEmbeddings).values(batchData)
processedChunks += batch.length
if (i % (batchSize * 5) === 0 || i + batchSize >= chunks.length) {
logger.info(
` 💾 Saved ${Math.min(i + batchSize, chunks.length)}/${chunks.length} chunks`
)
}
} catch (error) {
logger.error(`❌ Failed to save batch ${Math.floor(i / batchSize) + 1}:`, error)
failedChunks += batch.length
}
}
const savedCount = await db
.select({ count: sql<number>`count(*)` })
.from(docsEmbeddings)
.then((result) => result[0]?.count || 0)
const duration = Date.now() - startTime
logger.info(`✅ Processing complete!`)
logger.info(`📊 Results:`)
logger.info(` • Total chunks processed: ${chunks.length}`)
logger.info(` • Successfully saved: ${processedChunks}`)
logger.info(` • Failed: ${failedChunks}`)
logger.info(` • Database total: ${savedCount}`)
logger.info(` • Duration: ${Math.round(duration / 1000)}s`)
const documentStats = chunks.reduce(
(acc, chunk) => {
if (!acc[chunk.sourceDocument]) {
acc[chunk.sourceDocument] = { chunks: 0, tokens: 0 }
}
acc[chunk.sourceDocument].chunks++
acc[chunk.sourceDocument].tokens += chunk.tokenCount
return acc
},
{} as Record<string, { chunks: number; tokens: number }>
)
logger.info(`📋 Document breakdown:`)
Object.entries(documentStats)
.sort(([, a], [, b]) => b.chunks - a.chunks)
.slice(0, 10)
.forEach(([doc, stats]) => {
logger.info(`${doc}: ${stats.chunks} chunks, ${stats.tokens} tokens`)
})
if (Object.keys(documentStats).length > 10) {
logger.info(` • ... and ${Object.keys(documentStats).length - 10} more documents`)
}
return {
success: failedChunks === 0,
processedChunks,
failedChunks,
totalChunks: chunks.length,
databaseCount: savedCount,
duration,
}
} catch (error) {
logger.error('💥 Fatal error during processing:', error)
return {
success: false,
processedChunks,
failedChunks,
error: error instanceof Error ? error.message : 'Unknown error',
}
}
}
/**
* Main function - handle command line arguments
*/
async function main() {
const args = process.argv.slice(2)
const options: ProcessingOptions = {}
if (args.includes('--clear')) {
options.clearExisting = true
}
if (args.includes('--help') || args.includes('-h')) {
console.log(`
Usage: bun run scripts/process-docs-embeddings.ts [options]
Options:
--clear Clear existing docs embeddings before processing
--help, -h Show this help message
Examples:
bun run scripts/process-docs-embeddings.ts
bun run scripts/process-docs-embeddings.ts --clear
`)
process.exit(0)
}
const result = await processDocsEmbeddings(options)
if (!result.success) {
process.exit(1)
}
}
if (import.meta.url.includes('process-docs-embeddings.ts')) {
main().catch((error) => {
logger.error('Script failed:', error)
process.exit(1)
})
}
export { processDocsEmbeddings }

View File

@@ -0,0 +1,256 @@
#!/usr/bin/env bun
import path from 'path'
import { db } from '@sim/db'
import { docsEmbeddings } from '@sim/db/schema'
import { sql } from 'drizzle-orm'
import { type DocChunk, DocsChunker } from '@/lib/chunkers'
import { isDev } from '@/lib/environment'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('ProcessDocs')
interface ProcessingOptions {
/** Clear existing docs embeddings before processing */
clearExisting?: boolean
/** Path to docs directory */
docsPath?: string
/** Base URL for generating links */
baseUrl?: string
/** Chunk size in tokens */
chunkSize?: number
/** Minimum chunk size */
minChunkSize?: number
/** Overlap between chunks */
overlap?: number
/** Dry run - only display results, don't save to DB */
dryRun?: boolean
/** Verbose output */
verbose?: boolean
}
/**
* Process documentation files and optionally save embeddings to database
*/
async function processDocs(options: ProcessingOptions = {}) {
const config = {
docsPath: options.docsPath || path.join(process.cwd(), '../../apps/docs/content/docs'),
baseUrl: options.baseUrl || (isDev ? 'http://localhost:4000' : 'https://docs.sim.ai'),
chunkSize: options.chunkSize || 1024,
minChunkSize: options.minChunkSize || 100,
overlap: options.overlap || 200,
clearExisting: options.clearExisting ?? false,
dryRun: options.dryRun ?? false,
verbose: options.verbose ?? false,
}
let processedChunks = 0
let failedChunks = 0
try {
logger.info('🚀 Starting docs processing with config:', {
docsPath: config.docsPath,
baseUrl: config.baseUrl,
chunkSize: config.chunkSize,
clearExisting: config.clearExisting,
dryRun: config.dryRun,
})
// Initialize the chunker
const chunker = new DocsChunker({
chunkSize: config.chunkSize,
minChunkSize: config.minChunkSize,
overlap: config.overlap,
baseUrl: config.baseUrl,
})
// Process all .mdx files
logger.info(`📚 Processing docs from: ${config.docsPath}`)
const chunks = await chunker.chunkAllDocs(config.docsPath)
if (chunks.length === 0) {
logger.warn('⚠️ No chunks generated from docs')
return { success: false, processedChunks: 0, failedChunks: 0 }
}
logger.info(`📊 Generated ${chunks.length} chunks with embeddings`)
// Group chunks by document for summary
const chunksByDoc = chunks.reduce<Record<string, DocChunk[]>>((acc, chunk) => {
if (!acc[chunk.sourceDocument]) {
acc[chunk.sourceDocument] = []
}
acc[chunk.sourceDocument].push(chunk)
return acc
}, {})
// Display summary
logger.info(`\n=== DOCUMENT SUMMARY ===`)
for (const [doc, docChunks] of Object.entries(chunksByDoc)) {
logger.info(`${doc}: ${docChunks.length} chunks`)
}
// Display sample chunks in verbose or dry-run mode
if (config.verbose || config.dryRun) {
logger.info(`\n=== SAMPLE CHUNKS ===`)
chunks.slice(0, 3).forEach((chunk, index) => {
logger.info(`\nChunk ${index + 1}:`)
logger.info(` Source: ${chunk.sourceDocument}`)
logger.info(` Header: ${chunk.headerText} (Level ${chunk.headerLevel})`)
logger.info(` Link: ${chunk.headerLink}`)
logger.info(` Tokens: ${chunk.tokenCount}`)
logger.info(` Embedding: ${chunk.embedding.length} dimensions (${chunk.embeddingModel})`)
if (config.verbose) {
logger.info(` Text Preview: ${chunk.text.substring(0, 200)}...`)
}
})
}
// If dry run, stop here
if (config.dryRun) {
logger.info('\n✅ Dry run complete - no data saved to database')
return { success: true, processedChunks: chunks.length, failedChunks: 0 }
}
// Clear existing embeddings if requested
if (config.clearExisting) {
logger.info('🗑️ Clearing existing docs embeddings...')
try {
await db.delete(docsEmbeddings)
logger.info(`✅ Successfully deleted existing embeddings`)
} catch (error) {
logger.error('❌ Failed to delete existing embeddings:', error)
throw new Error('Failed to clear existing embeddings')
}
}
// Save chunks to database in batches
const batchSize = 10
logger.info(`💾 Saving chunks to database (batch size: ${batchSize})...`)
for (let i = 0; i < chunks.length; i += batchSize) {
const batch = chunks.slice(i, i + batchSize)
try {
const batchData = batch.map((chunk) => ({
chunkText: chunk.text,
sourceDocument: chunk.sourceDocument,
sourceLink: chunk.headerLink,
headerText: chunk.headerText,
headerLevel: chunk.headerLevel,
tokenCount: chunk.tokenCount,
embedding: chunk.embedding,
embeddingModel: chunk.embeddingModel,
metadata: chunk.metadata,
}))
await db.insert(docsEmbeddings).values(batchData)
processedChunks += batch.length
if (i % (batchSize * 5) === 0 || i + batchSize >= chunks.length) {
logger.info(
` 💾 Saved ${Math.min(i + batchSize, chunks.length)}/${chunks.length} chunks`
)
}
} catch (error) {
logger.error(`❌ Failed to save batch ${Math.floor(i / batchSize) + 1}:`, error)
failedChunks += batch.length
}
}
// Verify final count
const savedCount = await db
.select({ count: sql<number>`count(*)` })
.from(docsEmbeddings)
.then((res) => res[0]?.count || 0)
logger.info(
`\n✅ Processing complete!\n` +
` 📊 Total chunks: ${chunks.length}\n` +
` ✅ Processed: ${processedChunks}\n` +
` ❌ Failed: ${failedChunks}\n` +
` 💾 Total in DB: ${savedCount}`
)
return { success: failedChunks === 0, processedChunks, failedChunks }
} catch (error) {
logger.error('❌ Fatal error during processing:', error)
return { success: false, processedChunks, failedChunks }
}
}
/**
* Main entry point with CLI argument parsing
*/
async function main() {
const args = process.argv.slice(2)
const options: ProcessingOptions = {
clearExisting: args.includes('--clear'),
dryRun: args.includes('--dry-run'),
verbose: args.includes('--verbose'),
}
// Parse custom path if provided
const pathIndex = args.indexOf('--path')
if (pathIndex !== -1 && args[pathIndex + 1]) {
options.docsPath = args[pathIndex + 1]
}
// Parse custom base URL if provided
const urlIndex = args.indexOf('--url')
if (urlIndex !== -1 && args[urlIndex + 1]) {
options.baseUrl = args[urlIndex + 1]
}
// Parse chunk size if provided
const chunkSizeIndex = args.indexOf('--chunk-size')
if (chunkSizeIndex !== -1 && args[chunkSizeIndex + 1]) {
options.chunkSize = Number.parseInt(args[chunkSizeIndex + 1], 10)
}
// Show help if requested
if (args.includes('--help') || args.includes('-h')) {
console.log(`
📚 Process Documentation Script
Usage: bun run process-docs.ts [options]
Options:
--clear Clear existing embeddings before processing
--dry-run Process and display results without saving to DB
--verbose Show detailed output including text previews
--path <path> Custom path to docs directory
--url <url> Custom base URL for links
--chunk-size <n> Custom chunk size in tokens (default: 1024)
--help, -h Show this help message
Examples:
# Dry run to test chunking
bun run process-docs.ts --dry-run
# Process and save to database
bun run process-docs.ts
# Clear existing and reprocess
bun run process-docs.ts --clear
# Custom path with verbose output
bun run process-docs.ts --path ./my-docs --verbose
`)
process.exit(0)
}
const result = await processDocs(options)
process.exit(result.success ? 0 : 1)
}
// Run if executed directly
if (import.meta.url === `file://${process.argv[1]}`) {
main().catch((error) => {
logger.error('Fatal error:', error)
process.exit(1)
})
}
export { processDocs }

View File

@@ -229,19 +229,6 @@ export const mistralParserTool: ToolConfig<MistralParserInput, MistralParserOutp
}
}
// Log the request (with sensitive data redacted)
logger.info('Mistral OCR request:', {
url: url.toString(),
hasApiKey: !!params.apiKey,
model: requestBody.model,
options: {
includesImages: requestBody.include_image_base64 ?? 'not specified',
pages: requestBody.pages ?? 'all pages',
imageLimit: requestBody.image_limit ?? 'no limit',
imageMinSize: requestBody.image_min_size ?? 'no minimum',
},
})
return requestBody
},
},

View File

@@ -104,6 +104,7 @@
"clsx": "^2.1.1",
"cmdk": "^1.0.0",
"croner": "^9.0.0",
"csv-parse": "6.1.0",
"date-fns": "4.1.0",
"encoding": "0.1.13",
"entities": "6.0.1",
@@ -1664,6 +1665,8 @@
"csstype": ["csstype@3.1.3", "", {}, "sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw=="],
"csv-parse": ["csv-parse@6.1.0", "", {}, "sha512-CEE+jwpgLn+MmtCpVcPtiCZpVtB6Z2OKPTr34pycYYoL7sxdOkXDdQ4lRiw6ioC0q6BLqhc6cKweCVvral8yhw=="],
"d3-color": ["d3-color@3.1.0", "", {}, "sha512-zg/chbXyeBtMQ1LbD/WSoW2DpC3I0mpmPdW+ynRTj/x2DAWYrIY7qeZIHidozwV24m4iavr15lNwIwLxRmOxhA=="],
"d3-dispatch": ["d3-dispatch@3.0.1", "", {}, "sha512-rzUyPU/S7rwUflMyLc1ETDeBj0NRuHKKAcvukozwhshr6g6c5d8zh4c2gQjY2bZ0dXeGLWc1PF174P2tVvKhfg=="],