mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-10 15:38:00 -05:00
improvement(uploads): add multipart upload + batching + retries (#938)
* File upload retries + multipart uploads
* Lint
* File uploads
* File uploads 2
* Lint
* Fix file uploads
* Add auth to file upload routes
* Lint
committed by GitHub
parent b159d63fbb
commit 70fa628a2a
164 apps/sim/app/api/files/multipart/route.ts (new file)
@@ -0,0 +1,164 @@
import {
  AbortMultipartUploadCommand,
  CompleteMultipartUploadCommand,
  CreateMultipartUploadCommand,
  UploadPartCommand,
} from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import { getStorageProvider, isUsingCloudStorage } from '@/lib/uploads'
import { S3_KB_CONFIG } from '@/lib/uploads/setup'

const logger = createLogger('MultipartUploadAPI')

interface InitiateMultipartRequest {
  fileName: string
  contentType: string
  fileSize: number
}

interface GetPartUrlsRequest {
  uploadId: string
  key: string
  partNumbers: number[]
}

interface CompleteMultipartRequest {
  uploadId: string
  key: string
  parts: Array<{
    ETag: string
    PartNumber: number
  }>
}

export async function POST(request: NextRequest) {
  try {
    const session = await getSession()
    if (!session?.user?.id) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const action = request.nextUrl.searchParams.get('action')

    if (!isUsingCloudStorage() || getStorageProvider() !== 's3') {
      return NextResponse.json(
        { error: 'Multipart upload is only available with S3 storage' },
        { status: 400 }
      )
    }

    const { getS3Client } = await import('@/lib/uploads/s3/s3-client')
    const s3Client = getS3Client()

    switch (action) {
      case 'initiate': {
        const data: InitiateMultipartRequest = await request.json()
        const { fileName, contentType } = data

        const safeFileName = fileName.replace(/\s+/g, '-').replace(/[^a-zA-Z0-9.-]/g, '_')
        const uniqueKey = `kb/${uuidv4()}-${safeFileName}`

        const command = new CreateMultipartUploadCommand({
          Bucket: S3_KB_CONFIG.bucket,
          Key: uniqueKey,
          ContentType: contentType,
          Metadata: {
            originalName: fileName,
            uploadedAt: new Date().toISOString(),
            purpose: 'knowledge-base',
          },
        })

        const response = await s3Client.send(command)

        logger.info(`Initiated multipart upload for ${fileName}: ${response.UploadId}`)

        return NextResponse.json({
          uploadId: response.UploadId,
          key: uniqueKey,
        })
      }

      case 'get-part-urls': {
        const data: GetPartUrlsRequest = await request.json()
        const { uploadId, key, partNumbers } = data

        const presignedUrls = await Promise.all(
          partNumbers.map(async (partNumber) => {
            const command = new UploadPartCommand({
              Bucket: S3_KB_CONFIG.bucket,
              Key: key,
              PartNumber: partNumber,
              UploadId: uploadId,
            })

            const url = await getSignedUrl(s3Client, command, { expiresIn: 3600 })
            return { partNumber, url }
          })
        )

        return NextResponse.json({ presignedUrls })
      }

      case 'complete': {
        const data: CompleteMultipartRequest = await request.json()
        const { uploadId, key, parts } = data

        const command = new CompleteMultipartUploadCommand({
          Bucket: S3_KB_CONFIG.bucket,
          Key: key,
          UploadId: uploadId,
          MultipartUpload: {
            Parts: parts.sort((a, b) => a.PartNumber - b.PartNumber),
          },
        })

        const response = await s3Client.send(command)

        logger.info(`Completed multipart upload for key ${key}`)

        const finalPath = `/api/files/serve/s3/${encodeURIComponent(key)}`

        return NextResponse.json({
          success: true,
          location: response.Location,
          path: finalPath,
          key,
        })
      }

      case 'abort': {
        const data = await request.json()
        const { uploadId, key } = data

        const command = new AbortMultipartUploadCommand({
          Bucket: S3_KB_CONFIG.bucket,
          Key: key,
          UploadId: uploadId,
        })

        await s3Client.send(command)

        logger.info(`Aborted multipart upload for key ${key}`)

        return NextResponse.json({ success: true })
      }

      default:
        return NextResponse.json(
          { error: 'Invalid action. Use: initiate, get-part-urls, complete, or abort' },
          { status: 400 }
        )
    }
  } catch (error) {
    logger.error('Multipart upload error:', error)
    return NextResponse.json(
      { error: error instanceof Error ? error.message : 'Multipart upload failed' },
      { status: 500 }
    )
  }
}
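For reference, the client-side sequence this route expects is: initiate, then get-part-urls, then one PUT per part directly to S3, then complete (or abort on any failure). A minimal sketch of that contract, with retries and abort-on-error omitted; the production version lives in the use-knowledge-upload hook further down, and the helper name here is illustrative:

// Illustrative sketch of the client flow against the route above.
// Error handling, retries, and the abort path are omitted.
async function multipartUploadSketch(file: File) {
  // 1. initiate: returns the S3 uploadId and the object key
  const { uploadId, key } = await (
    await fetch('/api/files/multipart?action=initiate', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ fileName: file.name, contentType: file.type, fileSize: file.size }),
    })
  ).json()

  // 2. get-part-urls: one presigned URL per 5MB part
  const chunkSize = 5 * 1024 * 1024
  const numParts = Math.ceil(file.size / chunkSize)
  const { presignedUrls } = await (
    await fetch('/api/files/multipart?action=get-part-urls', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        uploadId,
        key,
        partNumbers: Array.from({ length: numParts }, (_, i) => i + 1),
      }),
    })
  ).json()

  // 3. PUT each part straight to S3; collect each part's ETag for completion
  const parts: { ETag: string; PartNumber: number }[] = []
  for (const { partNumber, url } of presignedUrls) {
    const chunk = file.slice((partNumber - 1) * chunkSize, partNumber * chunkSize)
    const res = await fetch(url, { method: 'PUT', body: chunk })
    parts.push({ ETag: (res.headers.get('ETag') || '').replace(/"/g, ''), PartNumber: partNumber })
  }

  // 4. complete: S3 assembles the parts into the final object
  return (
    await fetch('/api/files/multipart?action=complete', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ uploadId, key, parts }),
    })
  ).json()
}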
@@ -2,6 +2,7 @@ import { PutObjectCommand } from '@aws-sdk/client-s3'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
+import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import { getStorageProvider, isUsingCloudStorage } from '@/lib/uploads'
// Dynamic imports for storage clients to avoid client-side bundling
@@ -54,6 +55,11 @@ class ValidationError extends PresignedUrlError {
|
||||
|
||||
export async function POST(request: NextRequest) {
|
||||
try {
|
||||
const session = await getSession()
|
||||
if (!session?.user?.id) {
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
let data: PresignedUrlRequest
|
||||
try {
|
||||
data = await request.json()
|
||||
@@ -61,7 +67,7 @@ export async function POST(request: NextRequest) {
      throw new ValidationError('Invalid JSON in request body')
    }

-    const { fileName, contentType, fileSize, userId, chatId } = data
+    const { fileName, contentType, fileSize } = data

    if (!fileName?.trim()) {
      throw new ValidationError('fileName is required and cannot be empty')
@@ -90,10 +96,13 @@ export async function POST(request: NextRequest) {
        ? 'copilot'
        : 'general'

-    // Validate copilot-specific requirements
+    // Evaluate user id from session for copilot uploads
+    const sessionUserId = session.user.id
+
+    // Validate copilot-specific requirements (use session user)
    if (uploadType === 'copilot') {
-      if (!userId?.trim()) {
-        throw new ValidationError('userId is required for copilot uploads')
+      if (!sessionUserId?.trim()) {
+        throw new ValidationError('Authenticated user session is required for copilot uploads')
      }
    }

@@ -108,9 +117,21 @@ export async function POST(request: NextRequest) {

    switch (storageProvider) {
      case 's3':
-        return await handleS3PresignedUrl(fileName, contentType, fileSize, uploadType, userId)
+        return await handleS3PresignedUrl(
+          fileName,
+          contentType,
+          fileSize,
+          uploadType,
+          sessionUserId
+        )
      case 'blob':
-        return await handleBlobPresignedUrl(fileName, contentType, fileSize, uploadType, userId)
+        return await handleBlobPresignedUrl(
+          fileName,
+          contentType,
+          fileSize,
+          uploadType,
+          sessionUserId
+        )
      default:
        throw new StorageConfigError(`Unknown storage provider: ${storageProvider}`)
    }

@@ -2,6 +2,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { getPresignedUrl, isUsingCloudStorage, uploadFile } from '@/lib/uploads'
import '@/lib/uploads/setup.server'
+import { getSession } from '@/lib/auth'
import {
  createErrorResponse,
  createOptionsResponse,
@@ -14,6 +15,11 @@ const logger = createLogger('FilesUploadAPI')

export async function POST(request: NextRequest) {
  try {
+    const session = await getSession()
+    if (!session?.user?.id) {
+      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
+    }
+
    const formData = await request.formData()

    // Check if multiple files are being uploaded or a single file
@@ -1,10 +1,11 @@
'use client'

import { useRef, useState } from 'react'
-import { X } from 'lucide-react'
+import { Check, Loader2, X } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { Dialog, DialogContent, DialogHeader, DialogTitle } from '@/components/ui/dialog'
import { Label } from '@/components/ui/label'
+import { Progress } from '@/components/ui/progress'
import { createLogger } from '@/lib/logs/console/logger'
import { useKnowledgeUpload } from '@/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload'

@@ -151,9 +152,15 @@ export function UploadModal({
    }
  }

+  // Calculate progress percentage
+  const progressPercentage =
+    uploadProgress.totalFiles > 0
+      ? Math.round((uploadProgress.filesCompleted / uploadProgress.totalFiles) * 100)
+      : 0
+
  return (
    <Dialog open={open} onOpenChange={handleClose}>
-      <DialogContent className='flex max-h-[90vh] max-w-2xl flex-col overflow-hidden'>
+      <DialogContent className='flex max-h-[95vh] max-w-2xl flex-col overflow-hidden'>
        <DialogHeader>
          <DialogTitle>Upload Documents</DialogTitle>
        </DialogHeader>
@@ -218,30 +225,55 @@ export function UploadModal({
              </p>
            </div>

-            <div className='max-h-40 space-y-2 overflow-auto'>
-              {files.map((file, index) => (
-                <div
-                  key={index}
-                  className='flex items-center justify-between rounded-md border p-3'
-                >
-                  <div className='min-w-0 flex-1'>
-                    <p className='truncate font-medium text-sm'>{file.name}</p>
-                    <p className='text-muted-foreground text-xs'>
-                      {(file.size / 1024 / 1024).toFixed(2)} MB
-                    </p>
-                  </div>
-                  <Button
-                    type='button'
-                    variant='ghost'
-                    size='sm'
-                    onClick={() => removeFile(index)}
-                    disabled={isUploading}
-                    className='h-8 w-8 p-0'
-                  >
-                    <X className='h-4 w-4' />
-                  </Button>
-                </div>
-              ))}
+            <div className='max-h-60 space-y-1.5 overflow-auto'>
+              {files.map((file, index) => {
+                const fileStatus = uploadProgress.fileStatuses?.[index]
+                const isCurrentlyUploading = fileStatus?.status === 'uploading'
+                const isCompleted = fileStatus?.status === 'completed'
+                const isFailed = fileStatus?.status === 'failed'
+
+                return (
+                  <div key={index} className='space-y-1.5 rounded-md border p-2'>
+                    <div className='flex items-center justify-between'>
+                      <div className='min-w-0 flex-1'>
+                        <div className='flex items-center gap-2'>
+                          {isCurrentlyUploading && (
+                            <Loader2 className='h-4 w-4 animate-spin text-blue-500' />
+                          )}
+                          {isCompleted && <Check className='h-4 w-4 text-green-500' />}
+                          {isFailed && <X className='h-4 w-4 text-red-500' />}
+                          {!isCurrentlyUploading && !isCompleted && !isFailed && (
+                            <div className='h-4 w-4' />
+                          )}
+                          <p className='truncate text-sm'>
+                            <span className='font-medium'>{file.name}</span>
+                            <span className='text-muted-foreground'>
+                              {' '}
+                              • {(file.size / 1024 / 1024).toFixed(2)} MB
+                            </span>
+                          </p>
+                        </div>
+                      </div>
+                      <Button
+                        type='button'
+                        variant='ghost'
+                        size='sm'
+                        onClick={() => removeFile(index)}
+                        disabled={isUploading}
+                        className='h-8 w-8 p-0'
+                      >
+                        <X className='h-4 w-4' />
+                      </Button>
+                    </div>
+                    {isCurrentlyUploading && (
+                      <Progress value={fileStatus?.progress || 0} className='h-1' />
+                    )}
+                    {isFailed && fileStatus?.error && (
+                      <p className='text-red-500 text-xs'>{fileStatus.error}</p>
+                    )}
+                  </div>
+                )
+              })}
            </div>
          </div>
        )}

@@ -18,11 +18,21 @@ export interface UploadedFile {
  tag7?: string
}

+export interface FileUploadStatus {
+  fileName: string
+  fileSize: number
+  status: 'pending' | 'uploading' | 'completed' | 'failed'
+  progress?: number // 0-100 percentage
+  error?: string
+}
+
export interface UploadProgress {
  stage: 'idle' | 'uploading' | 'processing' | 'completing'
  filesCompleted: number
  totalFiles: number
  currentFile?: string
+  currentFileProgress?: number // 0-100 percentage for current file
+  fileStatuses?: FileUploadStatus[] // Track each file's status
}

export interface UploadError {
@@ -73,6 +83,19 @@ class ProcessingError extends KnowledgeUploadError {
  }
}

+// Upload configuration constants
+// Vercel has a 4.5MB body size limit for API routes
+const UPLOAD_CONFIG = {
+  BATCH_SIZE: 5, // Upload 5 files in parallel
+  MAX_RETRIES: 3, // Retry failed uploads up to 3 times
+  RETRY_DELAY: 1000, // Initial retry delay in ms
+  CHUNK_SIZE: 5 * 1024 * 1024,
+  VERCEL_MAX_BODY_SIZE: 4.5 * 1024 * 1024, // Vercel's 4.5MB limit
+  DIRECT_UPLOAD_THRESHOLD: 4 * 1024 * 1024, // Files > 4MB must use presigned URLs
+  LARGE_FILE_THRESHOLD: 50 * 1024 * 1024, // Files > 50MB need multipart upload
+  UPLOAD_TIMEOUT: 60000, // 60 second timeout per upload
+} as const
+
export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
  const [isUploading, setIsUploading] = useState(false)
  const [uploadProgress, setUploadProgress] = useState<UploadProgress>({
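With RETRY_DELAY at 1000 ms and MAX_RETRIES at 3, the exponential backoff used below retries after 1s, 2s, and 4s; the size thresholds route each file to one of three paths. A minimal sketch of the schedule and routing these constants imply (values inlined, chooseRoute is a hypothetical name):

// Sketch only: backoff schedule and size-based routing implied by UPLOAD_CONFIG.
const delays = [0, 1, 2].map((retryCount) => 1000 * 2 ** retryCount) // [1000, 2000, 4000] ms

function chooseRoute(fileSize: number, directUploadSupported: boolean): string {
  if (directUploadSupported) {
    // Cloud storage configured: multipart for >50MB, otherwise a single presigned PUT
    return fileSize > 50 * 1024 * 1024 ? 'multipart' : 'direct'
  }
  // No cloud storage: API route fallback, capped by Vercel's 4.5MB body limit
  return fileSize > 4 * 1024 * 1024 ? 'reject' : 'api-fallback'
}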
@@ -126,6 +149,523 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
    }
  }

  /**
   * Upload a single file with retry logic
   */
  const uploadSingleFileWithRetry = async (
    file: File,
    retryCount = 0,
    fileIndex?: number
  ): Promise<UploadedFile> => {
    try {
      // Create abort controller for timeout
      const controller = new AbortController()
      const timeoutId = setTimeout(() => controller.abort(), UPLOAD_CONFIG.UPLOAD_TIMEOUT)

      try {
        // Get presigned URL
        const presignedResponse = await fetch('/api/files/presigned?type=knowledge-base', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          body: JSON.stringify({
            fileName: file.name,
            contentType: file.type,
            fileSize: file.size,
          }),
          signal: controller.signal,
        })

        clearTimeout(timeoutId)

        if (!presignedResponse.ok) {
          let errorDetails: any = null
          try {
            errorDetails = await presignedResponse.json()
          } catch {
            // Ignore JSON parsing errors
          }

          logger.error('Presigned URL request failed', {
            status: presignedResponse.status,
            fileSize: file.size,
            retryCount,
          })

          throw new PresignedUrlError(
            `Failed to get presigned URL for ${file.name}: ${presignedResponse.status} ${presignedResponse.statusText}`,
            errorDetails
          )
        }

        const presignedData = await presignedResponse.json()

        if (presignedData.directUploadSupported) {
          // Use presigned URLs for all uploads when cloud storage is available
          // Check if file needs multipart upload for large files
          if (file.size > UPLOAD_CONFIG.LARGE_FILE_THRESHOLD) {
            return await uploadFileInChunks(file, presignedData, fileIndex)
          }
          return await uploadFileDirectly(file, presignedData, fileIndex)
        }
        // Fallback to traditional upload through API route
        // This is only used when cloud storage is not configured
        // Must check file size due to Vercel's 4.5MB limit
        if (file.size > UPLOAD_CONFIG.DIRECT_UPLOAD_THRESHOLD) {
          throw new DirectUploadError(
            `File ${file.name} is too large (${(file.size / 1024 / 1024).toFixed(2)}MB) for upload. Cloud storage must be configured for files over 4MB.`,
            { fileSize: file.size, limit: UPLOAD_CONFIG.DIRECT_UPLOAD_THRESHOLD }
          )
        }
        logger.warn(`Using API upload fallback for ${file.name} - cloud storage not configured`)
        return await uploadFileThroughAPI(file)
      } finally {
        clearTimeout(timeoutId)
      }
    } catch (error) {
      const isTimeout = error instanceof Error && error.name === 'AbortError'
      const isNetwork =
        error instanceof Error &&
        (error.message.includes('fetch') ||
          error.message.includes('network') ||
          error.message.includes('Failed to fetch'))

      // Retry logic
      if (retryCount < UPLOAD_CONFIG.MAX_RETRIES) {
        const delay = UPLOAD_CONFIG.RETRY_DELAY * 2 ** retryCount // Exponential backoff
        // Only log essential info for debugging
        if (isTimeout || isNetwork) {
          logger.warn(`Upload failed (${isTimeout ? 'timeout' : 'network'}), retrying...`, {
            attempt: retryCount + 1,
            fileSize: file.size,
          })
        }

        // Reset progress to 0 before retry to indicate restart
        if (fileIndex !== undefined) {
          setUploadProgress((prev) => ({
            ...prev,
            fileStatuses: prev.fileStatuses?.map((fs, idx) =>
              idx === fileIndex ? { ...fs, progress: 0, status: 'uploading' as const } : fs
            ),
          }))
        }

        await new Promise((resolve) => setTimeout(resolve, delay))
        return uploadSingleFileWithRetry(file, retryCount + 1, fileIndex)
      }

      logger.error('Upload failed after retries', {
        fileSize: file.size,
        errorType: isTimeout ? 'timeout' : isNetwork ? 'network' : 'unknown',
        attempts: UPLOAD_CONFIG.MAX_RETRIES + 1,
      })
      throw error
    }
  }

  /**
   * Upload file directly with timeout and progress tracking
   */
  const uploadFileDirectly = async (
    file: File,
    presignedData: any,
    fileIndex?: number
  ): Promise<UploadedFile> => {
    return new Promise((resolve, reject) => {
      const xhr = new XMLHttpRequest()
      let isCompleted = false // Track if this upload has completed to prevent duplicate state updates

      const timeoutId = setTimeout(() => {
        if (!isCompleted) {
          isCompleted = true
          xhr.abort()
          reject(new Error('Upload timeout'))
        }
      }, UPLOAD_CONFIG.UPLOAD_TIMEOUT)

      // Track upload progress
      xhr.upload.addEventListener('progress', (event) => {
        if (event.lengthComputable && fileIndex !== undefined && !isCompleted) {
          const percentComplete = Math.round((event.loaded / event.total) * 100)
          setUploadProgress((prev) => {
            // Only update if this file is still uploading
            if (prev.fileStatuses?.[fileIndex]?.status === 'uploading') {
              return {
                ...prev,
                fileStatuses: prev.fileStatuses?.map((fs, idx) =>
                  idx === fileIndex ? { ...fs, progress: percentComplete } : fs
                ),
              }
            }
            return prev
          })
        }
      })

      xhr.addEventListener('load', () => {
        if (!isCompleted) {
          isCompleted = true
          clearTimeout(timeoutId)
          if (xhr.status >= 200 && xhr.status < 300) {
            const fullFileUrl = presignedData.fileInfo.path.startsWith('http')
              ? presignedData.fileInfo.path
              : `${window.location.origin}${presignedData.fileInfo.path}`
            resolve(createUploadedFile(file.name, fullFileUrl, file.size, file.type, file))
          } else {
            logger.error('S3 PUT request failed', {
              status: xhr.status,
              fileSize: file.size,
            })
            reject(
              new DirectUploadError(
                `Direct upload failed for ${file.name}: ${xhr.status} ${xhr.statusText}`,
                { uploadResponse: xhr.statusText }
              )
            )
          }
        }
      })

      xhr.addEventListener('error', () => {
        if (!isCompleted) {
          isCompleted = true
          clearTimeout(timeoutId)
          reject(new DirectUploadError(`Network error uploading ${file.name}`, {}))
        }
      })

      xhr.addEventListener('abort', () => {
        if (!isCompleted) {
          isCompleted = true
          clearTimeout(timeoutId)
          reject(new DirectUploadError(`Upload aborted for ${file.name}`, {}))
        }
      })

      // Start the upload
      xhr.open('PUT', presignedData.presignedUrl)

      // Set headers
      xhr.setRequestHeader('Content-Type', file.type)
      if (presignedData.uploadHeaders) {
        Object.entries(presignedData.uploadHeaders).forEach(([key, value]) => {
          xhr.setRequestHeader(key, value as string)
        })
      }

      xhr.send(file)
    })
  }

  /**
   * Upload large file in chunks (multipart upload)
   */
  const uploadFileInChunks = async (
    file: File,
    presignedData: any,
    fileIndex?: number
  ): Promise<UploadedFile> => {
    logger.info(
      `Uploading large file ${file.name} (${(file.size / 1024 / 1024).toFixed(2)}MB) using multipart upload`
    )

    try {
      // Step 1: Initiate multipart upload
      const initiateResponse = await fetch('/api/files/multipart?action=initiate', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          fileName: file.name,
          contentType: file.type,
          fileSize: file.size,
        }),
      })

      if (!initiateResponse.ok) {
        throw new Error(`Failed to initiate multipart upload: ${initiateResponse.statusText}`)
      }

      const { uploadId, key } = await initiateResponse.json()
      logger.info(`Initiated multipart upload with ID: ${uploadId}`)

      // Step 2: Calculate parts
      const chunkSize = UPLOAD_CONFIG.CHUNK_SIZE
      const numParts = Math.ceil(file.size / chunkSize)
      const partNumbers = Array.from({ length: numParts }, (_, i) => i + 1)

      // Step 3: Get presigned URLs for all parts
      const partUrlsResponse = await fetch('/api/files/multipart?action=get-part-urls', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          uploadId,
          key,
          partNumbers,
        }),
      })

      if (!partUrlsResponse.ok) {
        // Abort the multipart upload if we can't get URLs
        await fetch('/api/files/multipart?action=abort', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ uploadId, key }),
        })
        throw new Error(`Failed to get part URLs: ${partUrlsResponse.statusText}`)
      }

      const { presignedUrls } = await partUrlsResponse.json()

      // Step 4: Upload parts in parallel (batch them to avoid overwhelming the browser)
      const uploadedParts: Array<{ ETag: string; PartNumber: number }> = []
      const PARALLEL_UPLOADS = 3 // Upload 3 parts at a time

      for (let i = 0; i < presignedUrls.length; i += PARALLEL_UPLOADS) {
        const batch = presignedUrls.slice(i, i + PARALLEL_UPLOADS)

        const batchPromises = batch.map(async ({ partNumber, url }: any) => {
          const start = (partNumber - 1) * chunkSize
          const end = Math.min(start + chunkSize, file.size)
          const chunk = file.slice(start, end)

          const uploadResponse = await fetch(url, {
            method: 'PUT',
            body: chunk,
            headers: {
              'Content-Type': file.type,
            },
          })

          if (!uploadResponse.ok) {
            throw new Error(`Failed to upload part ${partNumber}: ${uploadResponse.statusText}`)
          }

          // Get ETag from response headers
          const etag = uploadResponse.headers.get('ETag') || ''
          logger.info(`Uploaded part ${partNumber}/${numParts}`)

          return { ETag: etag.replace(/"/g, ''), PartNumber: partNumber }
        })

        const batchResults = await Promise.all(batchPromises)
        uploadedParts.push(...batchResults)
      }

      // Step 5: Complete multipart upload
      const completeResponse = await fetch('/api/files/multipart?action=complete', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          uploadId,
          key,
          parts: uploadedParts,
        }),
      })

      if (!completeResponse.ok) {
        throw new Error(`Failed to complete multipart upload: ${completeResponse.statusText}`)
      }

      const { path } = await completeResponse.json()
      logger.info(`Completed multipart upload for ${file.name}`)

      const fullFileUrl = path.startsWith('http') ? path : `${window.location.origin}${path}`

      return createUploadedFile(file.name, fullFileUrl, file.size, file.type, file)
    } catch (error) {
      logger.error(`Multipart upload failed for ${file.name}:`, error)
      // Fall back to direct upload if multipart fails
      logger.info('Falling back to direct upload')
      return uploadFileDirectly(file, presignedData)
    }
  }

  /**
   * Fallback upload through API
   */
  const uploadFileThroughAPI = async (file: File): Promise<UploadedFile> => {
    const controller = new AbortController()
    const timeoutId = setTimeout(() => controller.abort(), UPLOAD_CONFIG.UPLOAD_TIMEOUT)

    try {
      const formData = new FormData()
      formData.append('file', file)

      const uploadResponse = await fetch('/api/files/upload', {
        method: 'POST',
        body: formData,
        signal: controller.signal,
      })

      if (!uploadResponse.ok) {
        let errorData: any = null
        try {
          errorData = await uploadResponse.json()
        } catch {
          // Ignore JSON parsing errors
        }

        throw new DirectUploadError(
          `Failed to upload ${file.name}: ${errorData?.error || 'Unknown error'}`,
          errorData
        )
      }

      const uploadResult = await uploadResponse.json()

      // Validate upload result structure
      if (!uploadResult.path) {
        throw new DirectUploadError(
          `Invalid upload response for ${file.name}: missing file path`,
          uploadResult
        )
      }

      return createUploadedFile(
        file.name,
        uploadResult.path.startsWith('http')
          ? uploadResult.path
          : `${window.location.origin}${uploadResult.path}`,
        file.size,
        file.type,
        file
      )
    } finally {
      clearTimeout(timeoutId)
    }
  }

  /**
   * Upload files with a constant pool of concurrent uploads
   */
  const uploadFilesInBatches = async (files: File[]): Promise<UploadedFile[]> => {
    const uploadedFiles: UploadedFile[] = []
    const failedFiles: Array<{ file: File; error: Error }> = []

    // Initialize file statuses
    const fileStatuses: FileUploadStatus[] = files.map((file) => ({
      fileName: file.name,
      fileSize: file.size,
      status: 'pending' as const,
      progress: 0,
    }))

    setUploadProgress((prev) => ({
      ...prev,
      fileStatuses,
    }))

    // Create a queue of files to upload
    const fileQueue = files.map((file, index) => ({ file, index }))
    const activeUploads = new Map<number, Promise<any>>()

    logger.info(
      `Starting upload of ${files.length} files with concurrency ${UPLOAD_CONFIG.BATCH_SIZE}`
    )

    // Function to start an upload for a file
    const startUpload = async (file: File, fileIndex: number) => {
      // Mark file as uploading (only if not already processing)
      setUploadProgress((prev) => {
        const currentStatus = prev.fileStatuses?.[fileIndex]?.status
        // Don't re-upload files that are already completed or currently uploading
        if (currentStatus === 'completed' || currentStatus === 'uploading') {
          return prev
        }
        return {
          ...prev,
          fileStatuses: prev.fileStatuses?.map((fs, idx) =>
            idx === fileIndex ? { ...fs, status: 'uploading' as const, progress: 0 } : fs
          ),
        }
      })

      try {
        const result = await uploadSingleFileWithRetry(file, 0, fileIndex)

        // Mark file as completed (with atomic update)
        setUploadProgress((prev) => {
          // Only mark as completed if still uploading (prevent race conditions)
          if (prev.fileStatuses?.[fileIndex]?.status === 'uploading') {
            return {
              ...prev,
              filesCompleted: prev.filesCompleted + 1,
              fileStatuses: prev.fileStatuses?.map((fs, idx) =>
                idx === fileIndex ? { ...fs, status: 'completed' as const, progress: 100 } : fs
              ),
            }
          }
          return prev
        })

        uploadedFiles.push(result)
        return { success: true, file, result }
      } catch (error) {
        // Mark file as failed (with atomic update)
        setUploadProgress((prev) => {
          // Only mark as failed if still uploading
          if (prev.fileStatuses?.[fileIndex]?.status === 'uploading') {
            return {
              ...prev,
              fileStatuses: prev.fileStatuses?.map((fs, idx) =>
                idx === fileIndex
                  ? {
                      ...fs,
                      status: 'failed' as const,
                      error: error instanceof Error ? error.message : 'Upload failed',
                    }
                  : fs
              ),
            }
          }
          return prev
        })

        failedFiles.push({
          file,
          error: error instanceof Error ? error : new Error(String(error)),
        })

        return {
          success: false,
          file,
          error: error instanceof Error ? error : new Error(String(error)),
        }
      }
    }

    // Process files with constant concurrency pool
    while (fileQueue.length > 0 || activeUploads.size > 0) {
      // Start new uploads up to the batch size limit
      while (fileQueue.length > 0 && activeUploads.size < UPLOAD_CONFIG.BATCH_SIZE) {
        const { file, index } = fileQueue.shift()!
        const uploadPromise = startUpload(file, index).finally(() => {
          activeUploads.delete(index)
        })
        activeUploads.set(index, uploadPromise)
      }

      // Wait for at least one upload to complete if we're at capacity or done with queue
      if (activeUploads.size > 0) {
        await Promise.race(Array.from(activeUploads.values()))
      }
    }

    // Report failed files
    if (failedFiles.length > 0) {
      logger.error(`Failed to upload ${failedFiles.length} files:`, failedFiles)
      const errorMessage = `Failed to upload ${failedFiles.length} file(s): ${failedFiles.map((f) => f.file.name).join(', ')}`
      throw new KnowledgeUploadError(errorMessage, 'PARTIAL_UPLOAD_FAILURE', {
        failedFiles,
        uploadedFiles,
      })
    }

    return uploadedFiles
  }

  const uploadFiles = async (
    files: File[],
    knowledgeBaseId: string,
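The uploadFilesInBatches loop above is a fixed-size concurrency pool: it keeps up to BATCH_SIZE uploads in flight and uses Promise.race to free a slot as soon as any upload settles. The same pattern in isolation, as a sketch; runPool and worker are hypothetical names, and worker is assumed to handle its own errors, as startUpload does:

// Generic form of the pool used above; `worker` stands in for startUpload.
// Assumes worker never rejects (it catches internally), so Promise.race
// only signals that a slot has opened up.
async function runPool<T>(items: T[], limit: number, worker: (item: T) => Promise<void>) {
  const queue = items.map((item, index) => ({ item, index }))
  const active = new Map<number, Promise<void>>()

  while (queue.length > 0 || active.size > 0) {
    // Fill free slots up to the limit
    while (queue.length > 0 && active.size < limit) {
      const { item, index } = queue.shift()!
      active.set(index, worker(item).finally(() => active.delete(index)))
    }
    // Block until at least one in-flight task settles
    if (active.size > 0) {
      await Promise.race(Array.from(active.values()))
    }
  }
}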
@@ -144,129 +684,8 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
      setUploadError(null)
      setUploadProgress({ stage: 'uploading', filesCompleted: 0, totalFiles: files.length })

-      const uploadedFiles: UploadedFile[] = []
-
-      // Upload all files using presigned URLs
-      for (const [index, file] of files.entries()) {
-        setUploadProgress((prev) => ({
-          ...prev,
-          currentFile: file.name,
-          filesCompleted: index,
-        }))
-
-        try {
-          // Get presigned URL
-          const presignedResponse = await fetch('/api/files/presigned?type=knowledge-base', {
-            method: 'POST',
-            headers: {
-              'Content-Type': 'application/json',
-            },
-            body: JSON.stringify({
-              fileName: file.name,
-              contentType: file.type,
-              fileSize: file.size,
-            }),
-          })
-
-          if (!presignedResponse.ok) {
-            let errorDetails: any = null
-            try {
-              errorDetails = await presignedResponse.json()
-            } catch {
-              // Ignore JSON parsing errors
-            }
-
-            throw new PresignedUrlError(
-              `Failed to get presigned URL for ${file.name}: ${presignedResponse.status} ${presignedResponse.statusText}`,
-              errorDetails
-            )
-          }
-
-          const presignedData = await presignedResponse.json()
-
-          if (presignedData.directUploadSupported) {
-            // Use presigned URL for direct upload
-            const uploadHeaders: Record<string, string> = {
-              'Content-Type': file.type,
-            }
-
-            // Add Azure-specific headers if provided
-            if (presignedData.uploadHeaders) {
-              Object.assign(uploadHeaders, presignedData.uploadHeaders)
-            }
-
-            const uploadResponse = await fetch(presignedData.presignedUrl, {
-              method: 'PUT',
-              headers: uploadHeaders,
-              body: file,
-            })
-
-            if (!uploadResponse.ok) {
-              throw new DirectUploadError(
-                `Direct upload failed for ${file.name}: ${uploadResponse.status} ${uploadResponse.statusText}`,
-                { uploadResponse: uploadResponse.statusText }
-              )
-            }
-
-            // Convert relative path to full URL for schema validation
-            const fullFileUrl = presignedData.fileInfo.path.startsWith('http')
-              ? presignedData.fileInfo.path
-              : `${window.location.origin}${presignedData.fileInfo.path}`
-
-            uploadedFiles.push(
-              createUploadedFile(file.name, fullFileUrl, file.size, file.type, file)
-            )
-          } else {
-            // Fallback to traditional upload through API route
-            const formData = new FormData()
-            formData.append('file', file)
-
-            const uploadResponse = await fetch('/api/files/upload', {
-              method: 'POST',
-              body: formData,
-            })
-
-            if (!uploadResponse.ok) {
-              let errorData: any = null
-              try {
-                errorData = await uploadResponse.json()
-              } catch {
-                // Ignore JSON parsing errors
-              }
-
-              throw new DirectUploadError(
-                `Failed to upload ${file.name}: ${errorData?.error || 'Unknown error'}`,
-                errorData
-              )
-            }
-
-            const uploadResult = await uploadResponse.json()
-
-            // Validate upload result structure
-            if (!uploadResult.path) {
-              throw new DirectUploadError(
-                `Invalid upload response for ${file.name}: missing file path`,
-                uploadResult
-              )
-            }
-
-            uploadedFiles.push(
-              createUploadedFile(
-                file.name,
-                uploadResult.path.startsWith('http')
-                  ? uploadResult.path
-                  : `${window.location.origin}${uploadResult.path}`,
-                file.size,
-                file.type,
-                file
-              )
-            )
-          }
-        } catch (fileError) {
-          logger.error(`Error uploading file ${file.name}:`, fileError)
-          throw fileError // Re-throw to be caught by outer try-catch
-        }
-      }
+      // Upload files in batches with retry logic
+      const uploadedFiles = await uploadFilesInBatches(files)

      setUploadProgress((prev) => ({ ...prev, stage: 'processing' }))
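From a component's perspective the hook's entry point is unchanged; batching, retries, and multipart handling are internal. A minimal usage sketch, assuming the hook returns the uploadFiles, isUploading, and uploadProgress values whose state is shown in this diff (UploadButton and its props are hypothetical):

// Hypothetical consumer; the destructured fields are assumed from the
// hook state visible in this diff, not confirmed exports.
function UploadButton({ knowledgeBaseId, files }: { knowledgeBaseId: string; files: File[] }) {
  const { uploadFiles, isUploading, uploadProgress } = useKnowledgeUpload()

  const onUpload = async () => {
    // Failures surface as KnowledgeUploadError with PARTIAL_UPLOAD_FAILURE details
    await uploadFiles(files, knowledgeBaseId)
  }

  return (
    <button onClick={onUpload} disabled={isUploading}>
      {isUploading
        ? `Uploading ${uploadProgress.filesCompleted}/${uploadProgress.totalFiles}`
        : 'Upload'}
    </button>
  )
}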