progress
@@ -6,9 +6,10 @@ import { createLogger } from '@sim/logger'
import binaryExtensionsList from 'binary-extensions'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { createPinnedUrl, validateUrlWithDNS } from '@/lib/core/security/input-validation'
import { validateUrlWithDNS } from '@/lib/core/security/input-validation'
import { isSupportedFileType, parseFile } from '@/lib/file-parsers'
import { isUsingCloudStorage, type StorageContext, StorageService } from '@/lib/uploads'
import { uploadExecutionFile } from '@/lib/uploads/contexts/execution'
import { UPLOAD_DIR_SERVER } from '@/lib/uploads/core/setup.server'
import { getFileMetadataByKey } from '@/lib/uploads/server/metadata'
import {
@@ -21,6 +22,7 @@ import {
} from '@/lib/uploads/utils/file-utils'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { verifyFileAccess } from '@/app/api/files/authorization'
import type { UserFile } from '@/executor/types'
import '@/lib/uploads/core/setup.server'

export const dynamic = 'force-dynamic'
@@ -30,6 +32,12 @@ const logger = createLogger('FilesParseAPI')
|
||||
const MAX_DOWNLOAD_SIZE_BYTES = 100 * 1024 * 1024 // 100 MB
|
||||
const DOWNLOAD_TIMEOUT_MS = 30000 // 30 seconds
|
||||
|
||||
interface ExecutionContext {
|
||||
workspaceId: string
|
||||
workflowId: string
|
||||
executionId: string
|
||||
}
|
||||
|
||||
interface ParseResult {
|
||||
success: boolean
|
||||
content?: string
|
||||
@@ -37,6 +45,7 @@ interface ParseResult {
|
||||
filePath: string
|
||||
originalName?: string // Original filename from database (for workspace files)
|
||||
viewerUrl?: string | null // Viewer URL for the file if available
|
||||
userFile?: UserFile // UserFile object for the raw file
|
||||
metadata?: {
|
||||
fileType: string
|
||||
size: number
|
||||
@@ -70,27 +79,45 @@ export async function POST(request: NextRequest) {
|
||||
|
||||
const userId = authResult.userId
|
||||
const requestData = await request.json()
|
||||
const { filePath, fileType, workspaceId } = requestData
|
||||
const { filePath, fileType, workspaceId, workflowId, executionId } = requestData
|
||||
|
||||
if (!filePath || (typeof filePath === 'string' && filePath.trim() === '')) {
|
||||
return NextResponse.json({ success: false, error: 'No file path provided' }, { status: 400 })
|
||||
}
|
||||
|
||||
logger.info('File parse request received:', { filePath, fileType, workspaceId, userId })
|
||||
// Build execution context if all required fields are present
|
||||
const executionContext: ExecutionContext | undefined =
|
||||
workspaceId && workflowId && executionId
|
||||
? { workspaceId, workflowId, executionId }
|
||||
: undefined
|
||||
|
||||
logger.info('File parse request received:', {
|
||||
filePath,
|
||||
fileType,
|
||||
workspaceId,
|
||||
userId,
|
||||
hasExecutionContext: !!executionContext,
|
||||
})
|
||||
|
||||
if (Array.isArray(filePath)) {
|
||||
const results = []
|
||||
for (const path of filePath) {
|
||||
if (!path || (typeof path === 'string' && path.trim() === '')) {
|
||||
for (const singlePath of filePath) {
|
||||
if (!singlePath || (typeof singlePath === 'string' && singlePath.trim() === '')) {
|
||||
results.push({
|
||||
success: false,
|
||||
error: 'Empty file path in array',
|
||||
filePath: path || '',
|
||||
filePath: singlePath || '',
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
const result = await parseFileSingle(path, fileType, workspaceId, userId)
|
||||
const result = await parseFileSingle(
|
||||
singlePath,
|
||||
fileType,
|
||||
workspaceId,
|
||||
userId,
|
||||
executionContext
|
||||
)
|
||||
if (result.metadata) {
|
||||
result.metadata.processingTime = Date.now() - startTime
|
||||
}
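For orientation, a minimal caller sketch for the handler above, exercising the new execution fields. The route path and auth header are assumptions; the body fields (`filePath`, `fileType`, `workspaceId`, `workflowId`, `executionId`) are the ones the handler destructures.

```ts
// Hypothetical caller: the '/api/files/parse' path and bearer auth are assumptions.
async function parseWithExecutionContext(baseUrl: string, token: string) {
  const res = await fetch(`${baseUrl}/api/files/parse`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
    body: JSON.stringify({
      filePath: ['/api/files/serve/ws-1/report.pdf', 'https://example.com/data.csv'],
      workspaceId: 'ws-1',
      // With both of these present alongside workspaceId, the route builds an
      // ExecutionContext and stores parsed files in execution storage as UserFile objects.
      workflowId: 'wf-1',
      executionId: 'exec-1',
    }),
  })
  return res.json()
}
```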
|
||||
@@ -106,6 +133,7 @@ export async function POST(request: NextRequest) {
|
||||
fileType: result.metadata?.fileType || 'application/octet-stream',
|
||||
size: result.metadata?.size || 0,
|
||||
binary: false,
|
||||
file: result.userFile,
|
||||
},
|
||||
filePath: result.filePath,
|
||||
viewerUrl: result.viewerUrl,
|
||||
@@ -121,7 +149,7 @@ export async function POST(request: NextRequest) {
|
||||
})
|
||||
}
|
||||
|
||||
const result = await parseFileSingle(filePath, fileType, workspaceId, userId)
|
||||
const result = await parseFileSingle(filePath, fileType, workspaceId, userId, executionContext)
|
||||
|
||||
if (result.metadata) {
|
||||
result.metadata.processingTime = Date.now() - startTime
|
||||
@@ -137,6 +165,7 @@ export async function POST(request: NextRequest) {
|
||||
fileType: result.metadata?.fileType || 'application/octet-stream',
|
||||
size: result.metadata?.size || 0,
|
||||
binary: false,
|
||||
file: result.userFile,
|
||||
},
|
||||
filePath: result.filePath,
|
||||
viewerUrl: result.viewerUrl,
|
||||
@@ -164,7 +193,8 @@ async function parseFileSingle(
|
||||
filePath: string,
|
||||
fileType: string,
|
||||
workspaceId: string,
|
||||
userId: string
|
||||
userId: string,
|
||||
executionContext?: ExecutionContext
|
||||
): Promise<ParseResult> {
|
||||
logger.info('Parsing file:', filePath)
|
||||
|
||||
@@ -186,18 +216,18 @@ async function parseFileSingle(
|
||||
}
|
||||
|
||||
if (filePath.includes('/api/files/serve/')) {
|
||||
return handleCloudFile(filePath, fileType, undefined, userId)
|
||||
return handleCloudFile(filePath, fileType, undefined, userId, executionContext)
|
||||
}
|
||||
|
||||
if (filePath.startsWith('http://') || filePath.startsWith('https://')) {
|
||||
return handleExternalUrl(filePath, fileType, workspaceId, userId)
|
||||
return handleExternalUrl(filePath, fileType, workspaceId, userId, executionContext)
|
||||
}
|
||||
|
||||
if (isUsingCloudStorage()) {
|
||||
return handleCloudFile(filePath, fileType, undefined, userId)
|
||||
return handleCloudFile(filePath, fileType, undefined, userId, executionContext)
|
||||
}
|
||||
|
||||
return handleLocalFile(filePath, fileType, userId)
|
||||
return handleLocalFile(filePath, fileType, userId, executionContext)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -230,12 +260,14 @@ function validateFilePath(filePath: string): { isValid: boolean; error?: string
|
||||
/**
|
||||
* Handle external URL
|
||||
* If workspaceId is provided, checks if file already exists and saves to workspace if not
|
||||
* If executionContext is provided, also stores the file in execution storage and returns UserFile
|
||||
*/
|
||||
async function handleExternalUrl(
|
||||
url: string,
|
||||
fileType: string,
|
||||
workspaceId: string,
|
||||
userId: string
|
||||
userId: string,
|
||||
executionContext?: ExecutionContext
|
||||
): Promise<ParseResult> {
|
||||
try {
|
||||
logger.info('Fetching external URL:', url)
|
||||
@@ -312,17 +344,16 @@ async function handleExternalUrl(
|
||||
|
||||
if (existingFile) {
|
||||
const storageFilePath = `/api/files/serve/${existingFile.key}`
|
||||
return handleCloudFile(storageFilePath, fileType, 'workspace', userId)
|
||||
return handleCloudFile(storageFilePath, fileType, 'workspace', userId, executionContext)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const pinnedUrl = createPinnedUrl(url, urlValidation.resolvedIP!)
|
||||
const response = await fetch(pinnedUrl, {
|
||||
// Use the original URL after DNS validation passes.
|
||||
// DNS pinning (connecting to IP directly) breaks TLS SNI for HTTPS.
|
||||
// Since we've validated the IP is not private/reserved, using the original URL is safe.
|
||||
const response = await fetch(url, {
|
||||
signal: AbortSignal.timeout(DOWNLOAD_TIMEOUT_MS),
|
||||
headers: {
|
||||
Host: urlValidation.originalHostname!,
|
||||
},
|
||||
})
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to fetch URL: ${response.status} ${response.statusText}`)
|
||||
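The edit above replaces the pinned-IP fetch with a fetch of the original URL once DNS validation has passed. A minimal sketch of that pattern, assuming the validation result exposes `error`, `resolvedIP`, and `originalHostname` as the surrounding calls suggest:

```ts
import { validateUrlWithDNS } from '@/lib/core/security/input-validation'

// Sketch only: validate the hostname's resolved IP, then fetch the original URL so
// TLS SNI and certificate checks still see the hostname. Connecting to the pinned IP
// directly is what breaks HTTPS, which is why the pinned URL is no longer used here.
async function fetchValidatedUrl(url: string, timeoutMs: number): Promise<Response> {
  const validation = await validateUrlWithDNS(url)
  if (validation.error || !validation.resolvedIP) {
    throw new Error(`URL failed DNS validation: ${validation.error ?? 'no resolved IP'}`)
  }
  return fetch(url, {
    signal: AbortSignal.timeout(timeoutMs),
    headers: { Host: validation.originalHostname! },
  })
}
```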
@@ -341,6 +372,20 @@ async function handleExternalUrl(
|
||||
|
||||
logger.info(`Downloaded file from URL: ${url}, size: ${buffer.length} bytes`)
|
||||
|
||||
// Store file in execution storage if execution context is provided
|
||||
let userFile: UserFile | undefined
|
||||
const mimeType = response.headers.get('content-type') || getMimeTypeFromExtension(extension)
|
||||
|
||||
if (executionContext) {
|
||||
try {
|
||||
userFile = await uploadExecutionFile(executionContext, buffer, filename, mimeType, userId)
|
||||
logger.info(`Stored file in execution storage: ${filename}`, { key: userFile.key })
|
||||
} catch (uploadError) {
|
||||
logger.warn(`Failed to store file in execution storage:`, uploadError)
|
||||
// Continue without userFile - parsing can still work
|
||||
}
|
||||
}
|
||||
|
||||
if (shouldCheckWorkspace) {
|
||||
try {
|
||||
const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
|
||||
@@ -353,8 +398,6 @@ async function handleExternalUrl(
|
||||
})
|
||||
} else {
|
||||
const { uploadWorkspaceFile } = await import('@/lib/uploads/contexts/workspace')
|
||||
const mimeType =
|
||||
response.headers.get('content-type') || getMimeTypeFromExtension(extension)
|
||||
await uploadWorkspaceFile(workspaceId, userId, buffer, filename, mimeType)
|
||||
logger.info(`Saved URL file to workspace storage: ${filename}`)
|
||||
}
|
||||
@@ -363,17 +406,23 @@ async function handleExternalUrl(
|
||||
}
|
||||
}
|
||||
|
||||
let parseResult: ParseResult
|
||||
if (extension === 'pdf') {
|
||||
return await handlePdfBuffer(buffer, filename, fileType, url)
|
||||
}
|
||||
if (extension === 'csv') {
|
||||
return await handleCsvBuffer(buffer, filename, fileType, url)
|
||||
}
|
||||
if (isSupportedFileType(extension)) {
|
||||
return await handleGenericTextBuffer(buffer, filename, extension, fileType, url)
|
||||
parseResult = await handlePdfBuffer(buffer, filename, fileType, url)
|
||||
} else if (extension === 'csv') {
|
||||
parseResult = await handleCsvBuffer(buffer, filename, fileType, url)
|
||||
} else if (isSupportedFileType(extension)) {
|
||||
parseResult = await handleGenericTextBuffer(buffer, filename, extension, fileType, url)
|
||||
} else {
|
||||
parseResult = handleGenericBuffer(buffer, filename, extension, fileType)
|
||||
}
|
||||
|
||||
return handleGenericBuffer(buffer, filename, extension, fileType)
|
||||
// Attach userFile to the result
|
||||
if (userFile) {
|
||||
parseResult.userFile = userFile
|
||||
}
|
||||
|
||||
return parseResult
|
||||
} catch (error) {
|
||||
logger.error(`Error handling external URL ${url}:`, error)
|
||||
return {
|
||||
@@ -386,12 +435,15 @@ async function handleExternalUrl(
|
||||
|
||||
/**
|
||||
* Handle file stored in cloud storage
|
||||
* If executionContext is provided and file is not already from execution storage,
|
||||
* copies the file to execution storage and returns UserFile
|
||||
*/
|
||||
async function handleCloudFile(
|
||||
filePath: string,
|
||||
fileType: string,
|
||||
explicitContext: string | undefined,
|
||||
userId: string
|
||||
userId: string,
|
||||
executionContext?: ExecutionContext
|
||||
): Promise<ParseResult> {
|
||||
try {
|
||||
const cloudKey = extractStorageKey(filePath)
|
||||
@@ -438,6 +490,7 @@ async function handleCloudFile(
|
||||
|
||||
const filename = originalFilename || cloudKey.split('/').pop() || cloudKey
|
||||
const extension = path.extname(filename).toLowerCase().substring(1)
|
||||
const mimeType = getMimeTypeFromExtension(extension)
|
||||
|
||||
const normalizedFilePath = `/api/files/serve/${encodeURIComponent(cloudKey)}?context=${context}`
|
||||
let workspaceIdFromKey: string | undefined
|
||||
@@ -453,6 +506,39 @@ async function handleCloudFile(
|
||||
|
||||
const viewerUrl = getViewerUrl(cloudKey, workspaceIdFromKey)
|
||||
|
||||
// Store file in execution storage if executionContext is provided
|
||||
let userFile: UserFile | undefined
|
||||
|
||||
if (executionContext) {
|
||||
// If file is already from execution context, create UserFile reference without re-uploading
|
||||
if (context === 'execution') {
|
||||
userFile = {
|
||||
id: `file_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`,
|
||||
name: filename,
|
||||
url: normalizedFilePath,
|
||||
size: fileBuffer.length,
|
||||
type: mimeType,
|
||||
key: cloudKey,
|
||||
context: 'execution',
|
||||
}
|
||||
logger.info(`Created UserFile reference for existing execution file: ${filename}`)
|
||||
} else {
|
||||
// Copy from workspace/other storage to execution storage
|
||||
try {
|
||||
userFile = await uploadExecutionFile(
|
||||
executionContext,
|
||||
fileBuffer,
|
||||
filename,
|
||||
mimeType,
|
||||
userId
|
||||
)
|
||||
logger.info(`Copied file to execution storage: ${filename}`, { key: userFile.key })
|
||||
} catch (uploadError) {
|
||||
logger.warn(`Failed to copy file to execution storage:`, uploadError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let parseResult: ParseResult
|
||||
if (extension === 'pdf') {
|
||||
parseResult = await handlePdfBuffer(fileBuffer, filename, fileType, normalizedFilePath)
|
||||
@@ -477,6 +563,11 @@ async function handleCloudFile(
|
||||
|
||||
parseResult.viewerUrl = viewerUrl
|
||||
|
||||
// Attach userFile to the result
|
||||
if (userFile) {
|
||||
parseResult.userFile = userFile
|
||||
}
|
||||
|
||||
return parseResult
|
||||
} catch (error) {
|
||||
logger.error(`Error handling cloud file ${filePath}:`, error)
|
||||
@@ -500,7 +591,8 @@ async function handleCloudFile(
|
||||
async function handleLocalFile(
|
||||
filePath: string,
|
||||
fileType: string,
|
||||
userId: string
|
||||
userId: string,
|
||||
executionContext?: ExecutionContext
|
||||
): Promise<ParseResult> {
|
||||
try {
|
||||
const filename = filePath.split('/').pop() || filePath
|
||||
@@ -540,13 +632,32 @@ async function handleLocalFile(
|
||||
const hash = createHash('md5').update(fileBuffer).digest('hex')
|
||||
|
||||
const extension = path.extname(filename).toLowerCase().substring(1)
|
||||
const mimeType = fileType || getMimeTypeFromExtension(extension)
|
||||
|
||||
// Store file in execution storage if executionContext is provided
|
||||
let userFile: UserFile | undefined
|
||||
if (executionContext) {
|
||||
try {
|
||||
userFile = await uploadExecutionFile(
|
||||
executionContext,
|
||||
fileBuffer,
|
||||
filename,
|
||||
mimeType,
|
||||
userId
|
||||
)
|
||||
logger.info(`Stored local file in execution storage: ${filename}`, { key: userFile.key })
|
||||
} catch (uploadError) {
|
||||
logger.warn(`Failed to store local file in execution storage:`, uploadError)
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
content: result.content,
|
||||
filePath,
|
||||
userFile,
|
||||
metadata: {
|
||||
fileType: fileType || getMimeTypeFromExtension(extension),
|
||||
fileType: mimeType,
|
||||
size: stats.size,
|
||||
hash,
|
||||
processingTime: 0,
|
||||
|
||||
@@ -5,7 +5,7 @@ import { z } from 'zod'
|
||||
import { checkHybridAuth } from '@/lib/auth/hybrid'
|
||||
import { generateInternalToken } from '@/lib/auth/internal'
|
||||
import { isDev } from '@/lib/core/config/feature-flags'
|
||||
import { createPinnedUrl, validateUrlWithDNS } from '@/lib/core/security/input-validation'
|
||||
import { validateUrlWithDNS } from '@/lib/core/security/input-validation'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { executeTool } from '@/tools'
|
||||
@@ -211,13 +211,13 @@ export async function GET(request: Request) {
|
||||
logger.info(`[${requestId}] Proxying ${method} request to: ${targetUrl}`)
|
||||
|
||||
try {
|
||||
const pinnedUrl = createPinnedUrl(targetUrl, urlValidation.resolvedIP!)
|
||||
const response = await fetch(pinnedUrl, {
|
||||
// Use the original URL after DNS validation passes.
|
||||
// DNS pinning breaks TLS SNI for HTTPS; validation already ensures IP is safe.
|
||||
const response = await fetch(targetUrl, {
|
||||
method: method,
|
||||
headers: {
|
||||
...getProxyHeaders(),
|
||||
...customHeaders,
|
||||
Host: urlValidation.originalHostname!,
|
||||
},
|
||||
body: body || undefined,
|
||||
})
|
||||
|
||||
@@ -12,6 +12,11 @@ import { markExecutionCancelled } from '@/lib/execution/cancellation'
|
||||
import { processInputFileFields } from '@/lib/execution/files'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import {
|
||||
cleanupExecutionBase64Cache,
|
||||
containsUserFileWithMetadata,
|
||||
hydrateUserFilesWithBase64,
|
||||
} from '@/lib/uploads/utils/user-file-base64.server'
|
||||
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
|
||||
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
|
||||
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
|
||||
@@ -25,7 +30,7 @@ import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
|
||||
import { normalizeName } from '@/executor/constants'
|
||||
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
|
||||
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
|
||||
import type { StreamingExecution } from '@/executor/types'
|
||||
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
@@ -38,6 +43,8 @@ const ExecuteWorkflowSchema = z.object({
|
||||
useDraftState: z.boolean().optional(),
|
||||
input: z.any().optional(),
|
||||
isClientSession: z.boolean().optional(),
|
||||
includeFileBase64: z.boolean().optional().default(true),
|
||||
base64MaxBytes: z.number().int().positive().optional(),
|
||||
workflowStateOverride: z
|
||||
.object({
|
||||
blocks: z.record(z.any()),
|
||||
@@ -214,6 +221,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
useDraftState,
|
||||
input: validatedInput,
|
||||
isClientSession = false,
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
workflowStateOverride,
|
||||
} = validation.data
|
||||
|
||||
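A sketch of how an API caller might use the two new options destructured above; the endpoint path and API-key header are assumptions, while `includeFileBase64` (default `true`) and `base64MaxBytes` come from `ExecuteWorkflowSchema`.

```ts
// Hypothetical request: keep file outputs lean by capping inline base64 at 5 MB.
async function executeWorkflow(workflowId: string, apiKey: string) {
  return fetch(`/api/workflows/${workflowId}/execute`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', 'X-API-Key': apiKey },
    body: JSON.stringify({
      input: { message: 'run' },
      stream: false,
      includeFileBase64: true,         // set false to skip base64 hydration entirely
      base64MaxBytes: 5 * 1024 * 1024, // files larger than this are returned without base64
    }),
  })
}
```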
@@ -227,6 +236,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
triggerType,
|
||||
stream,
|
||||
useDraftState,
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
workflowStateOverride,
|
||||
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
|
||||
...rest
|
||||
@@ -429,14 +440,27 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
loggingSession,
|
||||
})
|
||||
|
||||
const hasResponseBlock = workflowHasResponseBlock(result)
|
||||
const outputWithBase64 = includeFileBase64
|
||||
? ((await hydrateUserFilesWithBase64(result.output, {
|
||||
requestId,
|
||||
executionId,
|
||||
maxBytes: base64MaxBytes,
|
||||
})) as NormalizedBlockOutput)
|
||||
: result.output
|
||||
|
||||
const resultWithBase64 = { ...result, output: outputWithBase64 }
|
||||
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
|
||||
const hasResponseBlock = workflowHasResponseBlock(resultWithBase64)
|
||||
if (hasResponseBlock) {
|
||||
return createHttpResponseFromBlock(result)
|
||||
return createHttpResponseFromBlock(resultWithBase64)
|
||||
}
|
||||
|
||||
const filteredResult = {
|
||||
success: result.success,
|
||||
output: result.output,
|
||||
output: outputWithBase64,
|
||||
error: result.error,
|
||||
metadata: result.metadata
|
||||
? {
|
||||
@@ -498,6 +522,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
selectedOutputs: resolvedSelectedOutputs,
|
||||
isSecureMode: false,
|
||||
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
|
||||
includeFileBase64,
|
||||
base64MaxBytes,
|
||||
},
|
||||
executionId,
|
||||
})
|
||||
@@ -570,6 +596,15 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
iterationContext?: IterationContext
|
||||
) => {
|
||||
const hasError = callbackData.output?.error
|
||||
const shouldHydrate =
|
||||
includeFileBase64 && !hasError && containsUserFileWithMetadata(callbackData.output)
|
||||
const outputWithBase64 = shouldHydrate
|
||||
? await hydrateUserFilesWithBase64(callbackData.output, {
|
||||
requestId,
|
||||
executionId,
|
||||
maxBytes: base64MaxBytes,
|
||||
})
|
||||
: callbackData.output
|
||||
|
||||
if (hasError) {
|
||||
logger.info(`[${requestId}] ✗ onBlockComplete (error) called:`, {
|
||||
@@ -613,7 +648,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
blockName,
|
||||
blockType,
|
||||
input: callbackData.input,
|
||||
output: callbackData.output,
|
||||
output: outputWithBase64,
|
||||
durationMs: callbackData.executionTime || 0,
|
||||
...(iterationContext && {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
@@ -750,12 +785,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
workflowId,
|
||||
data: {
|
||||
success: result.success,
|
||||
output: result.output,
|
||||
output: includeFileBase64
|
||||
? await hydrateUserFilesWithBase64(result.output, {
|
||||
requestId,
|
||||
executionId,
|
||||
maxBytes: base64MaxBytes,
|
||||
})
|
||||
: result.output,
|
||||
duration: result.metadata?.duration || 0,
|
||||
startTime: result.metadata?.startTime || startTime.toISOString(),
|
||||
endTime: result.metadata?.endTime || new Date().toISOString(),
|
||||
},
|
||||
})
|
||||
|
||||
// Cleanup base64 cache for this execution
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
} catch (error: any) {
|
||||
const errorMessage = error.message || 'Unknown error'
|
||||
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import { useRef, useState } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { isUserFile } from '@/lib/core/utils/display-filters'
|
||||
import { isUserFile } from '@/lib/core/utils/user-file'
|
||||
import type { ChatFile, ChatMessage } from '@/app/chat/components/message/message'
|
||||
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
|
||||
|
||||
|
||||
@@ -121,5 +121,9 @@ export const FileBlock: BlockConfig<FileParserOutput> = {
|
||||
type: 'string',
|
||||
description: 'All file contents merged into a single text string',
|
||||
},
|
||||
processedFiles: {
|
||||
type: 'files',
|
||||
description: 'Array of UserFile objects for downstream use (attachments, uploads, etc.)',
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ export interface UserFile {
|
||||
type: string
|
||||
key: string
|
||||
context?: string
|
||||
base64?: string
|
||||
}
|
||||
|
||||
export interface ParallelPauseScope {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { isUserFile } from '@/lib/core/utils/display-filters'
|
||||
import { isUserFile } from '@/lib/core/utils/user-file'
|
||||
import {
|
||||
classifyStartBlockType,
|
||||
getLegacyStarterMode,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { USER_FILE_ACCESSIBLE_PROPERTIES } from '@/lib/workflows/types'
|
||||
import {
|
||||
isReference,
|
||||
normalizeName,
|
||||
@@ -24,7 +25,19 @@ function isPathInOutputSchema(
|
||||
for (let i = 0; i < pathParts.length; i++) {
|
||||
const part = pathParts[i]
|
||||
|
||||
// Handle array index access (e.g., [0])
|
||||
if (/^\d+$/.test(part)) {
|
||||
// If current is file[] type, next part should be a file property
|
||||
if (current?.type === 'file[]') {
|
||||
// Check if next part is a valid file property
|
||||
if (i + 1 < pathParts.length) {
|
||||
const nextPart = pathParts[i + 1]
|
||||
return USER_FILE_ACCESSIBLE_PROPERTIES.includes(nextPart as any)
|
||||
}
|
||||
// If no next part, array index access is valid
|
||||
return true
|
||||
}
|
||||
// For other array types, continue to next iteration
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -32,8 +45,19 @@ function isPathInOutputSchema(
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if part exists in current
|
||||
if (part in current) {
|
||||
current = current[part]
|
||||
const nextCurrent = current[part]
|
||||
// If next field is file[] type and we have more parts, check if next part is numeric
|
||||
if (nextCurrent?.type === 'file[]' && i + 1 < pathParts.length) {
|
||||
const nextPart = pathParts[i + 1]
|
||||
// If next part is numeric (array index), allow it and check the part after
|
||||
if (/^\d+$/.test(nextPart) && i + 2 < pathParts.length) {
|
||||
const propertyPart = pathParts[i + 2]
|
||||
return USER_FILE_ACCESSIBLE_PROPERTIES.includes(propertyPart as any)
|
||||
}
|
||||
}
|
||||
current = nextCurrent
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -53,6 +77,12 @@ function isPathInOutputSchema(
|
||||
}
|
||||
}
|
||||
|
||||
// Handle file[] type - allow access to file properties after array index
|
||||
if (current?.type === 'file[]' && USER_FILE_ACCESSIBLE_PROPERTIES.includes(part as any)) {
|
||||
// Valid file property access
|
||||
return true
|
||||
}
|
||||
|
||||
if ('type' in current && typeof current.type === 'string') {
|
||||
if (!current.properties && !current.items) {
|
||||
return false
|
||||
|
||||
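Net effect of the three path-schema changes above: indexing into a `file[]` output is allowed, and the property after the index must be in `USER_FILE_ACCESSIBLE_PROPERTIES`. A small illustration (block and output names are made up):

```ts
// Hypothetical reference paths resolved against an output field of type 'file[]':
const examples: Array<[path: string, valid: boolean]> = [
  ['fileparser.processedFiles[0].name', true],   // accessible property
  ['fileparser.processedFiles[0].base64', true], // newly allowed by this change
  ['fileparser.processedFiles[0]', true],        // bare index access is accepted
  ['fileparser.processedFiles[0].key', false],   // 'key' stays internal
]
```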
@@ -893,7 +893,9 @@ export async function validateUrlWithDNS(
export function createPinnedUrl(originalUrl: string, resolvedIP: string): string {
  const parsed = new URL(originalUrl)
  const port = parsed.port ? `:${parsed.port}` : ''
  return `${parsed.protocol}//${resolvedIP}${port}${parsed.pathname}${parsed.search}`
  // IPv6 addresses must be wrapped in brackets for URLs
  const host = resolvedIP.includes(':') ? `[${resolvedIP}]` : resolvedIP
  return `${parsed.protocol}//${host}${port}${parsed.pathname}${parsed.search}`
}
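A quick sketch of what the bracket fix changes; the IPv4 case is unchanged, while the IPv6 case previously produced an unparseable host (addresses are illustrative):

```ts
import { createPinnedUrl } from '@/lib/core/security/input-validation'

createPinnedUrl('https://example.com/a?b=1', '93.184.216.34')
// -> 'https://93.184.216.34/a?b=1'
createPinnedUrl('https://example.com/a?b=1', '2606:2800:220:1:248:1893:25c8:1946')
// -> 'https://[2606:2800:220:1:248:1893:25c8:1946]/a?b=1'
```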
|
||||
/**
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import {
|
||||
isLargeDataKey,
|
||||
isSensitiveKey,
|
||||
REDACTED_MARKER,
|
||||
redactApiKeys,
|
||||
redactSensitiveValues,
|
||||
sanitizeEventData,
|
||||
sanitizeForLogging,
|
||||
TRUNCATED_MARKER,
|
||||
} from './redaction'
|
||||
|
||||
/**
|
||||
@@ -18,6 +20,24 @@ describe('REDACTED_MARKER', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('TRUNCATED_MARKER', () => {
|
||||
it.concurrent('should be the standard marker', () => {
|
||||
expect(TRUNCATED_MARKER).toBe('[TRUNCATED]')
|
||||
})
|
||||
})
|
||||
|
||||
describe('isLargeDataKey', () => {
|
||||
it.concurrent('should identify base64 as large data key', () => {
|
||||
expect(isLargeDataKey('base64')).toBe(true)
|
||||
})
|
||||
|
||||
it.concurrent('should not identify other keys as large data', () => {
|
||||
expect(isLargeDataKey('content')).toBe(false)
|
||||
expect(isLargeDataKey('data')).toBe(false)
|
||||
expect(isLargeDataKey('base')).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('isSensitiveKey', () => {
|
||||
describe('exact matches', () => {
|
||||
it.concurrent('should match apiKey variations', () => {
|
||||
@@ -234,6 +254,80 @@ describe('redactApiKeys', () => {
|
||||
expect(result.config.database.password).toBe('[REDACTED]')
|
||||
expect(result.config.database.host).toBe('localhost')
|
||||
})
|
||||
|
||||
it.concurrent('should truncate base64 fields', () => {
|
||||
const obj = {
|
||||
id: 'file-123',
|
||||
name: 'document.pdf',
|
||||
base64: 'VGhpcyBpcyBhIHZlcnkgbG9uZyBiYXNlNjQgc3RyaW5n...',
|
||||
size: 12345,
|
||||
}
|
||||
|
||||
const result = redactApiKeys(obj)
|
||||
|
||||
expect(result.id).toBe('file-123')
|
||||
expect(result.name).toBe('document.pdf')
|
||||
expect(result.base64).toBe('[TRUNCATED]')
|
||||
expect(result.size).toBe(12345)
|
||||
})
|
||||
|
||||
it.concurrent('should truncate base64 in nested UserFile objects', () => {
|
||||
const obj = {
|
||||
files: [
|
||||
{
|
||||
id: 'file-1',
|
||||
name: 'doc1.pdf',
|
||||
url: 'http://example.com/file1',
|
||||
size: 1000,
|
||||
base64: 'base64content1...',
|
||||
},
|
||||
{
|
||||
id: 'file-2',
|
||||
name: 'doc2.pdf',
|
||||
url: 'http://example.com/file2',
|
||||
size: 2000,
|
||||
base64: 'base64content2...',
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
const result = redactApiKeys(obj)
|
||||
|
||||
expect(result.files[0].id).toBe('file-1')
|
||||
expect(result.files[0].base64).toBe('[TRUNCATED]')
|
||||
expect(result.files[1].base64).toBe('[TRUNCATED]')
|
||||
})
|
||||
|
||||
it.concurrent('should filter UserFile objects to only expose allowed fields', () => {
|
||||
const obj = {
|
||||
processedFiles: [
|
||||
{
|
||||
id: 'file-123',
|
||||
name: 'document.pdf',
|
||||
url: 'http://localhost/api/files/serve/...',
|
||||
size: 12345,
|
||||
type: 'application/pdf',
|
||||
key: 'execution/workspace/workflow/file.pdf',
|
||||
context: 'execution',
|
||||
base64: 'VGhpcyBpcyBhIGJhc2U2NCBzdHJpbmc=',
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
const result = redactApiKeys(obj)
|
||||
|
||||
// Exposed fields should be present
|
||||
expect(result.processedFiles[0].id).toBe('file-123')
|
||||
expect(result.processedFiles[0].name).toBe('document.pdf')
|
||||
expect(result.processedFiles[0].url).toBe('http://localhost/api/files/serve/...')
|
||||
expect(result.processedFiles[0].size).toBe(12345)
|
||||
expect(result.processedFiles[0].type).toBe('application/pdf')
|
||||
expect(result.processedFiles[0].base64).toBe('[TRUNCATED]')
|
||||
|
||||
// Internal fields should be filtered out
|
||||
expect(result.processedFiles[0]).not.toHaveProperty('key')
|
||||
expect(result.processedFiles[0]).not.toHaveProperty('context')
|
||||
})
|
||||
})
|
||||
|
||||
describe('primitive handling', () => {
|
||||
|
||||
@@ -2,10 +2,16 @@
|
||||
* Centralized redaction utilities for sensitive data
|
||||
*/
|
||||
|
||||
import { filterUserFileForDisplay, isUserFile } from '@/lib/core/utils/user-file'
|
||||
|
||||
export const REDACTED_MARKER = '[REDACTED]'
|
||||
export const TRUNCATED_MARKER = '[TRUNCATED]'
|
||||
|
||||
const BYPASS_REDACTION_KEYS = new Set(['nextPageToken'])
|
||||
|
||||
/** Keys that contain large binary/encoded data that should be truncated in logs */
|
||||
const LARGE_DATA_KEYS = new Set(['base64'])
|
||||
|
||||
const SENSITIVE_KEY_PATTERNS: RegExp[] = [
|
||||
/^api[_-]?key$/i,
|
||||
/^access[_-]?token$/i,
|
||||
@@ -88,6 +94,10 @@ export function redactSensitiveValues(value: string): string {
|
||||
return result
|
||||
}
|
||||
|
||||
export function isLargeDataKey(key: string): boolean {
|
||||
return LARGE_DATA_KEYS.has(key)
|
||||
}
|
||||
|
||||
export function redactApiKeys(obj: any): any {
|
||||
if (obj === null || obj === undefined) {
|
||||
return obj
|
||||
@@ -101,11 +111,26 @@ export function redactApiKeys(obj: any): any {
|
||||
return obj.map((item) => redactApiKeys(item))
|
||||
}
|
||||
|
||||
if (isUserFile(obj)) {
|
||||
const filtered = filterUserFileForDisplay(obj)
|
||||
const result: Record<string, any> = {}
|
||||
for (const [key, value] of Object.entries(filtered)) {
|
||||
if (isLargeDataKey(key) && typeof value === 'string') {
|
||||
result[key] = TRUNCATED_MARKER
|
||||
} else {
|
||||
result[key] = value
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
const result: Record<string, any> = {}
|
||||
|
||||
for (const [key, value] of Object.entries(obj)) {
|
||||
if (isSensitiveKey(key)) {
|
||||
result[key] = REDACTED_MARKER
|
||||
} else if (isLargeDataKey(key) && typeof value === 'string') {
|
||||
result[key] = TRUNCATED_MARKER
|
||||
} else if (typeof value === 'object' && value !== null) {
|
||||
result[key] = redactApiKeys(value)
|
||||
} else {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import { filterUserFileForDisplay, isUserFile } from '@/lib/core/utils/user-file'
|
||||
|
||||
const MAX_STRING_LENGTH = 15000
|
||||
const MAX_DEPTH = 50
|
||||
|
||||
@@ -8,32 +10,9 @@ function truncateString(value: string, maxLength = MAX_STRING_LENGTH): string {
|
||||
return `${value.substring(0, maxLength)}... [truncated ${value.length - maxLength} chars]`
|
||||
}
|
||||
|
||||
export function isUserFile(candidate: unknown): candidate is {
|
||||
id: string
|
||||
name: string
|
||||
url: string
|
||||
key: string
|
||||
size: number
|
||||
type: string
|
||||
context?: string
|
||||
} {
|
||||
if (!candidate || typeof candidate !== 'object') {
|
||||
return false
|
||||
}
|
||||
|
||||
const value = candidate as Record<string, unknown>
|
||||
return (
|
||||
typeof value.id === 'string' &&
|
||||
typeof value.key === 'string' &&
|
||||
typeof value.url === 'string' &&
|
||||
typeof value.name === 'string'
|
||||
)
|
||||
}
|
||||
|
||||
function filterUserFile(data: any): any {
|
||||
if (isUserFile(data)) {
|
||||
const { id, name, url, size, type } = data
|
||||
return { id, name, url, size, type }
|
||||
return filterUserFileForDisplay(data)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
apps/sim/lib/core/utils/user-file.ts (new file, 57 lines)
@@ -0,0 +1,57 @@
import type { UserFile } from '@/executor/types'

export type UserFileLike = Pick<UserFile, 'id' | 'name' | 'url' | 'key'> &
  Partial<Pick<UserFile, 'size' | 'type' | 'context' | 'base64'>>

/**
 * Fields exposed for UserFile objects in UI (tag dropdown) and logs.
 * Internal fields like 'key' and 'context' are not exposed.
 */
export const USER_FILE_DISPLAY_FIELDS = ['id', 'name', 'url', 'size', 'type', 'base64'] as const

export type UserFileDisplayField = (typeof USER_FILE_DISPLAY_FIELDS)[number]

/**
 * Checks if a value matches the minimal UserFile shape.
 */
export function isUserFile(value: unknown): value is UserFileLike {
  if (!value || typeof value !== 'object') {
    return false
  }

  const candidate = value as Record<string, unknown>

  return (
    typeof candidate.id === 'string' &&
    typeof candidate.key === 'string' &&
    typeof candidate.url === 'string' &&
    typeof candidate.name === 'string'
  )
}

/**
 * Checks if a value matches the full UserFile metadata shape.
 */
export function isUserFileWithMetadata(value: unknown): value is UserFile {
  if (!isUserFile(value)) {
    return false
  }

  const candidate = value as Record<string, unknown>

  return typeof candidate.size === 'number' && typeof candidate.type === 'string'
}

/**
 * Filters a UserFile object to only include display fields.
 * Used for both UI display and log sanitization.
 */
export function filterUserFileForDisplay(data: Record<string, unknown>): Record<string, unknown> {
  const filtered: Record<string, unknown> = {}
  for (const field of USER_FILE_DISPLAY_FIELDS) {
    if (field in data) {
      filtered[field] = data[field]
    }
  }
  return filtered
}
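A short usage sketch for the helpers above (the sample object is illustrative):

```ts
import {
  filterUserFileForDisplay,
  isUserFile,
  isUserFileWithMetadata,
} from '@/lib/core/utils/user-file'

const candidate = {
  id: 'file-123',
  name: 'report.pdf',
  url: '/api/files/serve/execution/ws-1/wf-1/report.pdf',
  key: 'execution/ws-1/wf-1/report.pdf',
  size: 2048,
  type: 'application/pdf',
  context: 'execution',
}

isUserFile(candidate)             // true: id, key, url, and name are all strings
isUserFileWithMetadata(candidate) // true: size and type are present as well
filterUserFileForDisplay(candidate)
// -> { id, name, url, size, type }: 'key' and 'context' are dropped,
//    and 'base64' would be kept if it were set
```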
@@ -1,5 +1,5 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { isUserFile } from '@/lib/core/utils/display-filters'
|
||||
import { isUserFile } from '@/lib/core/utils/user-file'
|
||||
import type { ExecutionContext } from '@/lib/uploads/contexts/execution/utils'
|
||||
import { generateExecutionFileKey, generateFileId } from '@/lib/uploads/contexts/execution/utils'
|
||||
import type { UserFile } from '@/executor/types'
|
||||
|
||||
apps/sim/lib/uploads/utils/user-file-base64.server.ts (new file, 318 lines)
@@ -0,0 +1,318 @@
|
||||
import type { Logger } from '@sim/logger'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { getRedisClient } from '@/lib/core/config/redis'
|
||||
import { isUserFileWithMetadata } from '@/lib/core/utils/user-file'
|
||||
import { bufferToBase64 } from '@/lib/uploads/utils/file-utils'
|
||||
import { downloadFileFromStorage, downloadFileFromUrl } from '@/lib/uploads/utils/file-utils.server'
|
||||
import type { UserFile } from '@/executor/types'
|
||||
|
||||
const DEFAULT_MAX_BASE64_BYTES = 10 * 1024 * 1024
|
||||
const DEFAULT_TIMEOUT_MS = 180000
|
||||
const DEFAULT_CACHE_TTL_SECONDS = 300
|
||||
const REDIS_KEY_PREFIX = 'user-file:base64:'
|
||||
|
||||
interface Base64Cache {
|
||||
get(file: UserFile): Promise<string | null>
|
||||
set(file: UserFile, value: string, ttlSeconds: number): Promise<void>
|
||||
}
|
||||
|
||||
interface HydrationState {
|
||||
seen: WeakSet<object>
|
||||
cache: Base64Cache
|
||||
cacheTtlSeconds: number
|
||||
}
|
||||
|
||||
export interface Base64HydrationOptions {
|
||||
requestId: string
|
||||
executionId?: string
|
||||
logger?: Logger
|
||||
maxBytes?: number
|
||||
allowUnknownSize?: boolean
|
||||
timeoutMs?: number
|
||||
cacheTtlSeconds?: number
|
||||
}
|
||||
|
||||
class InMemoryBase64Cache implements Base64Cache {
|
||||
private entries = new Map<string, { value: string; expiresAt: number }>()
|
||||
|
||||
async get(file: UserFile): Promise<string | null> {
|
||||
const key = getFileCacheKey(file)
|
||||
const entry = this.entries.get(key)
|
||||
if (!entry) {
|
||||
return null
|
||||
}
|
||||
if (entry.expiresAt <= Date.now()) {
|
||||
this.entries.delete(key)
|
||||
return null
|
||||
}
|
||||
return entry.value
|
||||
}
|
||||
|
||||
async set(file: UserFile, value: string, ttlSeconds: number): Promise<void> {
|
||||
const key = getFileCacheKey(file)
|
||||
const expiresAt = Date.now() + ttlSeconds * 1000
|
||||
this.entries.set(key, { value, expiresAt })
|
||||
}
|
||||
}
|
||||
|
||||
function createBase64Cache(options: Base64HydrationOptions, logger: Logger): Base64Cache {
|
||||
const redis = getRedisClient()
|
||||
const { executionId } = options
|
||||
|
||||
if (!redis) {
|
||||
logger.warn(
|
||||
`[${options.requestId}] Redis unavailable for base64 cache, using in-memory fallback`
|
||||
)
|
||||
return new InMemoryBase64Cache()
|
||||
}
|
||||
|
||||
return {
|
||||
async get(file: UserFile) {
|
||||
try {
|
||||
const key = getFullCacheKey(executionId, file)
|
||||
return await redis.get(key)
|
||||
} catch (error) {
|
||||
logger.warn(`[${options.requestId}] Redis get failed, skipping cache`, error)
|
||||
return null
|
||||
}
|
||||
},
|
||||
async set(file: UserFile, value: string, ttlSeconds: number) {
|
||||
try {
|
||||
const key = getFullCacheKey(executionId, file)
|
||||
await redis.set(key, value, 'EX', ttlSeconds)
|
||||
} catch (error) {
|
||||
logger.warn(`[${options.requestId}] Redis set failed, skipping cache`, error)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function createHydrationState(options: Base64HydrationOptions, logger: Logger): HydrationState {
|
||||
return {
|
||||
seen: new WeakSet<object>(),
|
||||
cache: createBase64Cache(options, logger),
|
||||
cacheTtlSeconds: options.cacheTtlSeconds ?? DEFAULT_CACHE_TTL_SECONDS,
|
||||
}
|
||||
}
|
||||
|
||||
function getHydrationLogger(options: Base64HydrationOptions): Logger {
|
||||
return options.logger ?? createLogger('UserFileBase64')
|
||||
}
|
||||
|
||||
function getFileCacheKey(file: UserFile): string {
|
||||
if (file.key) {
|
||||
return `key:${file.key}`
|
||||
}
|
||||
if (file.url) {
|
||||
return `url:${file.url}`
|
||||
}
|
||||
return `id:${file.id}`
|
||||
}
|
||||
|
||||
function getFullCacheKey(executionId: string | undefined, file: UserFile): string {
|
||||
const fileKey = getFileCacheKey(file)
|
||||
if (executionId) {
|
||||
return `${REDIS_KEY_PREFIX}exec:${executionId}:${fileKey}`
|
||||
}
|
||||
return `${REDIS_KEY_PREFIX}${fileKey}`
|
||||
}
|
||||
|
||||
async function resolveBase64(
|
||||
file: UserFile,
|
||||
options: Base64HydrationOptions,
|
||||
logger: Logger
|
||||
): Promise<string | null> {
|
||||
if (file.base64) {
|
||||
return file.base64
|
||||
}
|
||||
|
||||
const maxBytes = options.maxBytes ?? DEFAULT_MAX_BASE64_BYTES
|
||||
const allowUnknownSize = options.allowUnknownSize ?? false
|
||||
const timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS
|
||||
const hasStableStorageKey = Boolean(file.key)
|
||||
|
||||
if (Number.isFinite(file.size) && file.size > maxBytes) {
|
||||
logger.warn(
|
||||
`[${options.requestId}] Skipping base64 for ${file.name} (size ${file.size} exceeds ${maxBytes})`
|
||||
)
|
||||
return null
|
||||
}
|
||||
|
||||
if (
|
||||
(!Number.isFinite(file.size) || file.size <= 0) &&
|
||||
!allowUnknownSize &&
|
||||
!hasStableStorageKey
|
||||
) {
|
||||
logger.warn(`[${options.requestId}] Skipping base64 for ${file.name} (unknown file size)`)
|
||||
return null
|
||||
}
|
||||
|
||||
let buffer: Buffer | null = null
|
||||
|
||||
if (file.key) {
|
||||
try {
|
||||
buffer = await downloadFileFromStorage(file, options.requestId, logger)
|
||||
} catch (error) {
|
||||
logger.warn(
|
||||
`[${options.requestId}] Failed to download ${file.name} from storage, trying URL fallback`,
|
||||
error
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if (!buffer && file.url) {
|
||||
try {
|
||||
buffer = await downloadFileFromUrl(file.url, timeoutMs)
|
||||
} catch (error) {
|
||||
logger.warn(`[${options.requestId}] Failed to download ${file.name} from URL`, error)
|
||||
}
|
||||
}
|
||||
|
||||
if (!buffer) {
|
||||
return null
|
||||
}
|
||||
|
||||
if (buffer.length > maxBytes) {
|
||||
logger.warn(
|
||||
`[${options.requestId}] Skipping base64 for ${file.name} (downloaded ${buffer.length} exceeds ${maxBytes})`
|
||||
)
|
||||
return null
|
||||
}
|
||||
|
||||
return bufferToBase64(buffer)
|
||||
}
|
||||
|
||||
async function hydrateUserFile(
|
||||
file: UserFile,
|
||||
options: Base64HydrationOptions,
|
||||
state: HydrationState,
|
||||
logger: Logger
|
||||
): Promise<UserFile> {
|
||||
const cached = await state.cache.get(file)
|
||||
if (cached) {
|
||||
return { ...file, base64: cached }
|
||||
}
|
||||
|
||||
const base64 = await resolveBase64(file, options, logger)
|
||||
if (!base64) {
|
||||
return file
|
||||
}
|
||||
|
||||
await state.cache.set(file, base64, state.cacheTtlSeconds)
|
||||
return { ...file, base64 }
|
||||
}
|
||||
|
||||
async function hydrateValue(
|
||||
value: unknown,
|
||||
options: Base64HydrationOptions,
|
||||
state: HydrationState,
|
||||
logger: Logger
|
||||
): Promise<unknown> {
|
||||
if (!value || typeof value !== 'object') {
|
||||
return value
|
||||
}
|
||||
|
||||
if (isUserFileWithMetadata(value)) {
|
||||
return hydrateUserFile(value, options, state, logger)
|
||||
}
|
||||
|
||||
if (state.seen.has(value)) {
|
||||
return value
|
||||
}
|
||||
state.seen.add(value)
|
||||
|
||||
if (Array.isArray(value)) {
|
||||
const hydratedItems = await Promise.all(
|
||||
value.map((item) => hydrateValue(item, options, state, logger))
|
||||
)
|
||||
return hydratedItems
|
||||
}
|
||||
|
||||
const entries = await Promise.all(
|
||||
Object.entries(value).map(async ([key, entryValue]) => {
|
||||
const hydratedEntry = await hydrateValue(entryValue, options, state, logger)
|
||||
return [key, hydratedEntry] as const
|
||||
})
|
||||
)
|
||||
|
||||
return Object.fromEntries(entries)
|
||||
}
|
||||
|
||||
/**
|
||||
* Hydrates UserFile objects within a value to include base64 content.
|
||||
* Returns the original structure with UserFile.base64 set where available.
|
||||
*/
|
||||
export async function hydrateUserFilesWithBase64(
|
||||
value: unknown,
|
||||
options: Base64HydrationOptions
|
||||
): Promise<unknown> {
|
||||
const logger = getHydrationLogger(options)
|
||||
const state = createHydrationState(options, logger)
|
||||
return hydrateValue(value, options, state, logger)
|
||||
}
|
||||
|
||||
function isPlainObject(value: unknown): value is Record<string, unknown> {
|
||||
if (!value || typeof value !== 'object') {
|
||||
return false
|
||||
}
|
||||
const proto = Object.getPrototypeOf(value)
|
||||
return proto === Object.prototype || proto === null
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a value contains any UserFile objects with metadata.
|
||||
*/
|
||||
export function containsUserFileWithMetadata(value: unknown): boolean {
|
||||
if (!value || typeof value !== 'object') {
|
||||
return false
|
||||
}
|
||||
|
||||
if (isUserFileWithMetadata(value)) {
|
||||
return true
|
||||
}
|
||||
|
||||
if (Array.isArray(value)) {
|
||||
return value.some((item) => containsUserFileWithMetadata(item))
|
||||
}
|
||||
|
||||
if (!isPlainObject(value)) {
|
||||
return false
|
||||
}
|
||||
|
||||
return Object.values(value).some((entry) => containsUserFileWithMetadata(entry))
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up base64 cache entries for a specific execution.
|
||||
* Should be called at the end of workflow execution.
|
||||
*/
|
||||
export async function cleanupExecutionBase64Cache(executionId: string): Promise<void> {
|
||||
const redis = getRedisClient()
|
||||
if (!redis) {
|
||||
return
|
||||
}
|
||||
|
||||
const pattern = `${REDIS_KEY_PREFIX}exec:${executionId}:*`
|
||||
const logger = createLogger('UserFileBase64')
|
||||
|
||||
try {
|
||||
let cursor = '0'
|
||||
let deletedCount = 0
|
||||
|
||||
do {
|
||||
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
|
||||
cursor = nextCursor
|
||||
|
||||
if (keys.length > 0) {
|
||||
await redis.del(...keys)
|
||||
deletedCount += keys.length
|
||||
}
|
||||
} while (cursor !== '0')
|
||||
|
||||
if (deletedCount > 0) {
|
||||
logger.info(`Cleaned up ${deletedCount} base64 cache entries for execution ${executionId}`)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn(`Failed to cleanup base64 cache for execution ${executionId}`, error)
|
||||
}
|
||||
}
|
||||
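For context, a minimal sketch of how the execute route in this change consumes these helpers: hydrate the final output once, then clear the per-execution cache (identifiers are illustrative).

```ts
import {
  cleanupExecutionBase64Cache,
  hydrateUserFilesWithBase64,
} from '@/lib/uploads/utils/user-file-base64.server'

async function finalizeOutput(output: unknown, requestId: string, executionId: string) {
  // Every UserFile found in the output tree gains a base64 field, subject to maxBytes.
  const hydrated = await hydrateUserFilesWithBase64(output, {
    requestId,
    executionId,
    maxBytes: 10 * 1024 * 1024,
  })
  // Drop the Redis entries keyed under this execution once the response is assembled.
  await cleanupExecutionBase64Cache(executionId)
  return hydrated
}
```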
@@ -5,7 +5,7 @@ import { and, eq, isNull, or, sql } from 'drizzle-orm'
|
||||
import { nanoid } from 'nanoid'
|
||||
import Parser from 'rss-parser'
|
||||
import { pollingIdempotency } from '@/lib/core/idempotency/service'
|
||||
import { createPinnedUrl, validateUrlWithDNS } from '@/lib/core/security/input-validation'
|
||||
import { validateUrlWithDNS } from '@/lib/core/security/input-validation'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
|
||||
|
||||
@@ -265,11 +265,10 @@ async function fetchNewRssItems(
|
||||
throw new Error(`Invalid RSS feed URL: ${urlValidation.error}`)
|
||||
}
|
||||
|
||||
const pinnedUrl = createPinnedUrl(config.feedUrl, urlValidation.resolvedIP!)
|
||||
|
||||
const response = await fetch(pinnedUrl, {
|
||||
// Use the original URL after DNS validation passes.
|
||||
// DNS pinning breaks TLS SNI for HTTPS; validation already ensures IP is safe.
|
||||
const response = await fetch(config.feedUrl, {
|
||||
headers: {
|
||||
Host: urlValidation.originalHostname!,
|
||||
'User-Agent': 'Sim/1.0 RSS Poller',
|
||||
Accept: 'application/rss+xml, application/xml, text/xml, */*',
|
||||
},
|
||||
|
||||
@@ -3,7 +3,7 @@ import { account, webhook } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, isNull, or } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { createPinnedUrl, validateUrlWithDNS } from '@/lib/core/security/input-validation'
|
||||
import { validateUrlWithDNS } from '@/lib/core/security/input-validation'
|
||||
import type { DbOrTx } from '@/lib/db/types'
|
||||
import { refreshAccessTokenIfNeeded } from '@/app/api/auth/oauth/utils'
|
||||
|
||||
@@ -108,17 +108,15 @@ async function fetchWithDNSPinning(
|
||||
return null
|
||||
}
|
||||
|
||||
const pinnedUrl = createPinnedUrl(url, urlValidation.resolvedIP!)
|
||||
|
||||
const headers: Record<string, string> = {
|
||||
Host: urlValidation.originalHostname!,
|
||||
}
|
||||
// Use the original URL after DNS validation passes.
|
||||
// DNS pinning breaks TLS SNI for HTTPS; validation already ensures IP is safe.
|
||||
const headers: Record<string, string> = {}
|
||||
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`
|
||||
}
|
||||
|
||||
const response = await fetch(pinnedUrl, {
|
||||
const response = await fetch(url, {
|
||||
headers,
|
||||
redirect: 'follow',
|
||||
})
|
||||
|
||||
@@ -7,6 +7,10 @@ import {
|
||||
import { encodeSSE } from '@/lib/core/utils/sse'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
import { processStreamingBlockLogs } from '@/lib/tokenization'
|
||||
import {
|
||||
cleanupExecutionBase64Cache,
|
||||
hydrateUserFilesWithBase64,
|
||||
} from '@/lib/uploads/utils/user-file-base64.server'
|
||||
import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow'
|
||||
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
|
||||
|
||||
@@ -26,6 +30,8 @@ export interface StreamingConfig {
|
||||
selectedOutputs?: string[]
|
||||
isSecureMode?: boolean
|
||||
workflowTriggerType?: 'api' | 'chat'
|
||||
includeFileBase64?: boolean
|
||||
base64MaxBytes?: number
|
||||
}
|
||||
|
||||
export interface StreamingResponseOptions {
|
||||
@@ -57,12 +63,14 @@ function isDangerousKey(key: string): boolean {
|
||||
return DANGEROUS_KEYS.includes(key)
|
||||
}
|
||||
|
||||
function buildMinimalResult(
|
||||
async function buildMinimalResult(
|
||||
result: ExecutionResult,
|
||||
selectedOutputs: string[] | undefined,
|
||||
streamedContent: Map<string, string>,
|
||||
requestId: string
|
||||
): { success: boolean; error?: string; output: Record<string, unknown> } {
|
||||
requestId: string,
|
||||
includeFileBase64: boolean,
|
||||
base64MaxBytes: number | undefined
|
||||
): Promise<{ success: boolean; error?: string; output: Record<string, unknown> }> {
|
||||
const minimalResult = {
|
||||
success: result.success,
|
||||
error: result.error,
|
||||
@@ -223,6 +231,9 @@ export async function createStreamingResponse(
|
||||
}
|
||||
}
|
||||
|
||||
const includeFileBase64 = streamConfig.includeFileBase64 ?? true
|
||||
const base64MaxBytes = streamConfig.base64MaxBytes
|
||||
|
||||
const onBlockCompleteCallback = async (blockId: string, output: unknown) => {
|
||||
if (!streamConfig.selectedOutputs?.length) {
|
||||
return
|
||||
@@ -241,8 +252,17 @@ export async function createStreamingResponse(
|
||||
const outputValue = extractOutputValue(output, path)
|
||||
|
||||
if (outputValue !== undefined) {
|
||||
const hydratedOutput = includeFileBase64
|
||||
? await hydrateUserFilesWithBase64(outputValue, {
|
||||
requestId,
|
||||
executionId,
|
||||
maxBytes: base64MaxBytes,
|
||||
})
|
||||
: outputValue
|
||||
const formattedOutput =
|
||||
typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue, null, 2)
|
||||
typeof hydratedOutput === 'string'
|
||||
? hydratedOutput
|
||||
: JSON.stringify(hydratedOutput, null, 2)
|
||||
sendChunk(blockId, formattedOutput)
|
||||
}
|
||||
}
|
||||
@@ -273,21 +293,33 @@ export async function createStreamingResponse(
|
||||
|
||||
await completeLoggingSession(result)
|
||||
|
||||
const minimalResult = buildMinimalResult(
|
||||
const minimalResult = await buildMinimalResult(
|
||||
result,
|
||||
streamConfig.selectedOutputs,
|
||||
state.streamedContent,
|
||||
requestId
|
||||
requestId,
|
||||
streamConfig.includeFileBase64 ?? true,
|
||||
streamConfig.base64MaxBytes
|
||||
)
|
||||
|
||||
controller.enqueue(encodeSSE({ event: 'final', data: minimalResult }))
|
||||
controller.enqueue(encodeSSE('[DONE]'))
|
||||
|
||||
if (executionId) {
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
}
|
||||
|
||||
controller.close()
|
||||
} catch (error: any) {
|
||||
logger.error(`[${requestId}] Stream error:`, error)
|
||||
controller.enqueue(
|
||||
encodeSSE({ event: 'error', error: error.message || 'Stream processing error' })
|
||||
)
|
||||
|
||||
if (executionId) {
|
||||
await cleanupExecutionBase64Cache(executionId)
|
||||
}
|
||||
|
||||
controller.close()
|
||||
}
|
||||
},
|
||||
|
||||
@@ -5,7 +5,14 @@ export interface InputFormatField {
|
||||
value?: unknown
|
||||
}
|
||||
|
||||
export const USER_FILE_ACCESSIBLE_PROPERTIES = ['id', 'name', 'url', 'size', 'type'] as const
|
||||
export const USER_FILE_ACCESSIBLE_PROPERTIES = [
|
||||
'id',
|
||||
'name',
|
||||
'url',
|
||||
'size',
|
||||
'type',
|
||||
'base64',
|
||||
] as const
|
||||
|
||||
export type UserFileAccessibleProperty = (typeof USER_FILE_ACCESSIBLE_PROPERTIES)[number]
|
||||
|
||||
@@ -15,6 +22,7 @@ export const USER_FILE_PROPERTY_TYPES: Record<UserFileAccessibleProperty, string
|
||||
url: 'string',
|
||||
size: 'number',
|
||||
type: 'string',
|
||||
base64: 'string',
|
||||
} as const
|
||||
|
||||
export const START_BLOCK_RESERVED_FIELDS = ['input', 'conversationId', 'files'] as const
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { UserFile } from '@/executor/types'
|
||||
import type {
|
||||
FileParseApiMultiResponse,
|
||||
FileParseApiResponse,
|
||||
FileParseResult,
|
||||
FileParserInput,
|
||||
FileParserOutput,
|
||||
@@ -9,6 +12,23 @@ import type { ToolConfig } from '@/tools/types'
|
||||
|
||||
const logger = createLogger('FileParserTool')
|
||||
|
||||
interface FileUploadObject {
|
||||
path: string
|
||||
name?: string
|
||||
size?: number
|
||||
type?: string
|
||||
}
|
||||
|
||||
interface ToolBodyParams extends Partial<FileParserInput> {
|
||||
file?: FileUploadObject | FileUploadObject[]
|
||||
files?: FileUploadObject[]
|
||||
_context?: {
|
||||
workspaceId?: string
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
}
|
||||
}
|
||||
|
||||
export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
id: 'file_parser',
|
||||
name: 'File Parser',
|
||||
@@ -36,7 +56,7 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
headers: () => ({
|
||||
'Content-Type': 'application/json',
|
||||
}),
|
||||
body: (params: any) => {
|
||||
body: (params: ToolBodyParams) => {
|
||||
logger.info('Request parameters received by tool body:', params)
|
||||
|
||||
if (!params) {
|
||||
@@ -57,11 +77,10 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
// 2. Check for file upload (array)
|
||||
else if (params.file && Array.isArray(params.file) && params.file.length > 0) {
|
||||
logger.info('Tool body processing file array upload')
|
||||
const filePaths = params.file.map((file: any) => file.path)
|
||||
determinedFilePath = filePaths // Always send as array
|
||||
determinedFilePath = params.file.map((file) => file.path)
|
||||
}
|
||||
// 3. Check for file upload (single object)
|
||||
else if (params.file?.path) {
|
||||
else if (params.file && !Array.isArray(params.file) && params.file.path) {
|
||||
logger.info('Tool body processing single file object upload')
|
||||
determinedFilePath = params.file.path
|
||||
}
|
||||
@@ -69,7 +88,7 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
else if (params.files && Array.isArray(params.files)) {
|
||||
logger.info('Tool body processing legacy files array:', params.files.length)
|
||||
if (params.files.length > 0) {
|
||||
determinedFilePath = params.files.map((file: any) => file.path)
|
||||
determinedFilePath = params.files.map((file) => file.path)
|
||||
} else {
|
||||
logger.warn('Legacy files array provided but is empty')
|
||||
}
|
||||
@@ -86,6 +105,8 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
filePath: determinedFilePath,
|
||||
fileType: determinedFileType,
|
||||
workspaceId: params.workspaceId || params._context?.workspaceId,
|
||||
workflowId: params._context?.workflowId,
|
||||
executionId: params._context?.executionId,
|
||||
}
|
||||
},
|
||||
},
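For reference, a sketch of the mapping the body function above performs when the agent passes an uploaded file plus runtime context (values are illustrative):

```ts
// Hypothetical params handed to fileParserTool.request.body():
const params: ToolBodyParams = {
  file: [{ path: '/api/files/serve/ws-1/report.pdf', name: 'report.pdf' }],
  _context: { workspaceId: 'ws-1', workflowId: 'wf-1', executionId: 'exec-1' },
}

// Resulting request body posted to the parse API:
// {
//   filePath: ['/api/files/serve/ws-1/report.pdf'],
//   fileType: undefined,
//   workspaceId: 'ws-1',
//   workflowId: 'wf-1',
//   executionId: 'exec-1',
// }
```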
|
||||
@@ -93,21 +114,26 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
transformResponse: async (response: Response): Promise<FileParserOutput> => {
|
||||
logger.info('Received response status:', response.status)
|
||||
|
||||
const result = await response.json()
|
||||
const result = (await response.json()) as FileParseApiResponse | FileParseApiMultiResponse
|
||||
logger.info('Response parsed successfully')
|
||||
|
||||
// Handle multiple files response
|
||||
if (result.results) {
|
||||
if ('results' in result) {
|
||||
logger.info('Processing multiple files response')
|
||||
|
||||
// Extract individual file results
|
||||
const fileResults = result.results.map((fileResult: any) => {
|
||||
return fileResult.output || fileResult
|
||||
const fileResults: FileParseResult[] = result.results.map((fileResult) => {
|
||||
return fileResult.output || (fileResult as unknown as FileParseResult)
|
||||
})
|
||||
|
||||
// Collect UserFile objects from results
|
||||
const processedFiles: UserFile[] = fileResults
|
||||
.filter((file): file is FileParseResult & { file: UserFile } => Boolean(file.file))
|
||||
.map((file) => file.file)
|
||||
|
||||
// Combine all file contents with clear dividers
|
||||
const combinedContent = fileResults
|
||||
.map((file: FileParseResult, index: number) => {
|
||||
.map((file, index) => {
|
||||
const divider = `\n${'='.repeat(80)}\n`
|
||||
|
||||
return file.content + (index < fileResults.length - 1 ? divider : '')
|
||||
@@ -118,6 +144,7 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
const output: FileParserOutputData = {
|
||||
files: fileResults,
|
||||
combinedContent,
|
||||
...(processedFiles.length > 0 && { processedFiles }),
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -129,10 +156,13 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
// Handle single file response
|
||||
logger.info('Successfully parsed file:', result.output?.name || 'unknown')
|
||||
|
||||
const fileOutput: FileParseResult = result.output || (result as unknown as FileParseResult)
|
||||
|
||||
// For a single file, create the output with just array format
|
||||
const output: FileParserOutputData = {
|
||||
files: [result.output || result],
|
||||
combinedContent: result.output?.content || result.content || '',
|
||||
files: [fileOutput],
|
||||
combinedContent: fileOutput?.content || result.content || '',
|
||||
...(fileOutput?.file && { processedFiles: [fileOutput.file] }),
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -142,7 +172,8 @@ export const fileParserTool: ToolConfig<FileParserInput, FileParserOutput> = {
|
||||
},
|
||||
|
||||
outputs: {
|
||||
files: { type: 'array', description: 'Array of parsed files' },
|
||||
files: { type: 'array', description: 'Array of parsed files with content and metadata' },
|
||||
combinedContent: { type: 'string', description: 'Combined content of all parsed files' },
|
||||
processedFiles: { type: 'file[]', description: 'Array of UserFile objects for downstream use' },
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import type { UserFile } from '@/executor/types'
|
||||
import type { ToolResponse } from '@/tools/types'
|
||||
|
||||
export interface FileParserInput {
|
||||
filePath: string | string[]
|
||||
fileType?: string
|
||||
workspaceId?: string
|
||||
workflowId?: string
|
||||
executionId?: string
|
||||
}
|
||||
|
||||
export interface FileParseResult {
|
||||
@@ -11,15 +15,43 @@ export interface FileParseResult {
|
||||
size: number
|
||||
name: string
|
||||
binary: boolean
|
||||
metadata?: Record<string, any>
|
||||
metadata?: Record<string, unknown>
|
||||
/** UserFile object for the raw file (stored in execution storage) */
|
||||
file?: UserFile
|
||||
}
|
||||
|
||||
export interface FileParserOutputData {
|
||||
/** Array of parsed file results with content and optional UserFile */
|
||||
files: FileParseResult[]
|
||||
/** Combined text content from all files */
|
||||
combinedContent: string
|
||||
[key: string]: any
|
||||
/** Array of UserFile objects for downstream use (attachments, uploads, etc.) */
|
||||
processedFiles?: UserFile[]
|
||||
[key: string]: unknown
|
||||
}
|
||||
|
||||
export interface FileParserOutput extends ToolResponse {
|
||||
output: FileParserOutputData
|
||||
}
|
||||
|
||||
/** API response structure for single file parse */
|
||||
export interface FileParseApiResponse {
|
||||
success: boolean
|
||||
output?: FileParseResult
|
||||
content?: string
|
||||
filePath?: string
|
||||
viewerUrl?: string | null
|
||||
error?: string
|
||||
}
|
||||
|
||||
/** API response structure for multiple file parse */
|
||||
export interface FileParseApiMultiResponse {
|
||||
success: boolean
|
||||
results: Array<{
|
||||
success: boolean
|
||||
output?: FileParseResult
|
||||
filePath?: string
|
||||
viewerUrl?: string | null
|
||||
error?: string
|
||||
}>
|
||||
}
|
||||
|
||||