feat(execution-filesystem): system to pass files between blocks (#866)

* feat(files): pass files between blocks

* presigned URL for downloads

* Remove latest migration before merge

* starter block file upload wasn't getting logged

* checkpoint in human-readable form

* checkpoint files / file type outputs

* file downloads working for block outputs

* checkpoint file download

* fix type issues

* replace FileReference interface with simpler UserFile interface

* show files in the tag dropdown for start block

* more migration to simple url object, reduce presigned time to 5 min

* Remove migration 0065_parallel_nightmare and related files

- Deleted apps/sim/db/migrations/0065_parallel_nightmare.sql
- Deleted apps/sim/db/migrations/meta/0065_snapshot.json
- Removed 0065 entry from apps/sim/db/migrations/meta/_journal.json

Preparing for merge with origin/staging and migration regeneration

* add migration files

* fix tests

* Update apps/sim/lib/uploads/setup.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update apps/sim/lib/workflows/execution-file-storage.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update apps/sim/lib/workflows/execution-file-storage.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* cleanup types

* fix lint

* fix logs typing for file refs

* open download in new tab

* fixed

* Update apps/sim/tools/index.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* fix file block

* cleanup unused code

* fix bugs

* remove hacky file id logic

* fix drag and drop

* fix tests

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This commit is contained in:
Vikhyath Mondreti
2025-08-07 12:51:30 -07:00
committed by GitHub
parent 75963eb851
commit de93e167af
60 changed files with 8487 additions and 455 deletions

View File

@@ -50,7 +50,7 @@ Choose your input method from the dropdown:
<video autoPlay loop muted playsInline className="w-full -mb-2 rounded-lg" src="/chat-input.mp4"></video>
</div>
<p className="text-sm text-gray-600">Chat with your workflow and access both input text and conversation ID for context-aware responses.</p>
<p className="text-sm text-gray-600">Chat with your workflow and access input text, conversation ID, and uploaded files for context-aware responses.</p>
</div>
</Tab>
</Tabs>
@@ -60,13 +60,15 @@ Choose your input method from the dropdown:
In Chat mode, access user input and conversation context through special variables:
```yaml
# Reference the chat input and conversation ID in your workflow
# Reference the chat input, conversation ID, and files in your workflow
user_message: "<start.input>"
conversation_id: "<start.conversationId>"
uploaded_files: "<start.files>"
```
- **`<start.input>`** - Contains the user's message text
- **`<start.conversationId>`** - Unique identifier for the conversation thread
- **`<start.files>`** - Array of files uploaded by the user (if any)
## API Execution

View File

@@ -712,6 +712,7 @@ export function mockFileSystem(
}
return Promise.reject(new Error('File not found'))
}),
mkdir: vi.fn().mockResolvedValue(undefined),
}))
}
@@ -761,14 +762,15 @@ export function createStorageProviderMocks(options: StorageProviderMockOptions =
getStorageProvider: vi.fn().mockReturnValue(provider),
isUsingCloudStorage: vi.fn().mockReturnValue(isCloudEnabled),
uploadFile: vi.fn().mockResolvedValue({
path: '/api/files/serve/test-key',
key: 'test-key',
path: '/api/files/serve/test-key.txt',
key: 'test-key.txt',
name: 'test.txt',
size: 100,
type: 'text/plain',
}),
downloadFile: vi.fn().mockResolvedValue(Buffer.from('test content')),
deleteFile: vi.fn().mockResolvedValue(undefined),
getPresignedUrl: vi.fn().mockResolvedValue(presignedUrl),
}))
if (provider === 's3') {
@@ -1235,14 +1237,15 @@ export function setupFileApiMocks(
getStorageProvider: vi.fn().mockReturnValue('local'),
isUsingCloudStorage: vi.fn().mockReturnValue(cloudEnabled),
uploadFile: vi.fn().mockResolvedValue({
path: '/api/files/serve/test-key',
key: 'test-key',
path: '/api/files/serve/test-key.txt',
key: 'test-key.txt',
name: 'test.txt',
size: 100,
type: 'text/plain',
}),
downloadFile: vi.fn().mockResolvedValue(Buffer.from('test content')),
deleteFile: vi.fn().mockResolvedValue(undefined),
getPresignedUrl: vi.fn().mockResolvedValue('https://example.com/presigned-url'),
}))
}
@@ -1347,8 +1350,8 @@ export function mockUploadUtils(
const {
isCloudStorage = false,
uploadResult = {
path: '/api/files/serve/test-key',
key: 'test-key',
path: '/api/files/serve/test-key.txt',
key: 'test-key.txt',
name: 'test.txt',
size: 100,
type: 'text/plain',

View File

@@ -0,0 +1,99 @@
import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { getPresignedUrl, getPresignedUrlWithConfig, isUsingCloudStorage } from '@/lib/uploads'
import { BLOB_EXECUTION_FILES_CONFIG, S3_EXECUTION_FILES_CONFIG } from '@/lib/uploads/setup'
import { createErrorResponse } from '@/app/api/files/utils'
const logger = createLogger('FileDownload')
export const dynamic = 'force-dynamic'
/**
 * POST /api/files/download
 *
 * Generates a download URL for a stored file. For cloud storage (S3 or Azure
 * Blob) a short-lived presigned URL is returned; for local storage a direct
 * serve path is returned instead (which does not expire).
 *
 * Request body:
 * - key: storage key of the file (required, must be a string)
 * - name: original file name, used as the suggested download filename
 * - storageProvider: optional explicit provider ('s3' | 'blob') for legacy callers
 * - bucketName: optional bucket/container override
 * - isExecutionFile: when true, the execution-files storage config is used
 *
 * Returns JSON: { downloadUrl, expiresIn, fileName } or an error response.
 */
export async function POST(request: NextRequest) {
  // Presigned URLs are deliberately short-lived to limit link sharing.
  const PRESIGNED_EXPIRY_SECONDS = 5 * 60

  try {
    const body = await request.json()
    const { key, name, storageProvider, bucketName, isExecutionFile } = body

    // Reject missing or non-string keys up front so downstream URL building
    // never operates on malformed input.
    if (!key || typeof key !== 'string') {
      return createErrorResponse(new Error('File key is required'), 400)
    }

    logger.info(`Generating download URL for file: ${name || key}`)

    if (isUsingCloudStorage()) {
      // Generate a fresh short-lived presigned URL for cloud storage
      try {
        let downloadUrl: string

        if (isExecutionFile) {
          // Execution files live in a dedicated bucket/config.
          logger.info(`Using execution files storage for file: ${key}`)
          downloadUrl = await getPresignedUrlWithConfig(
            key,
            {
              bucket: S3_EXECUTION_FILES_CONFIG.bucket,
              region: S3_EXECUTION_FILES_CONFIG.region,
            },
            PRESIGNED_EXPIRY_SECONDS
          )
        } else if (storageProvider === 's3' || storageProvider === 'blob') {
          // Legacy callers may pin an explicit storage provider.
          logger.info(`Using specified storage provider '${storageProvider}' for file: ${key}`)

          if (storageProvider === 's3') {
            downloadUrl = await getPresignedUrlWithConfig(
              key,
              {
                bucket: bucketName || S3_EXECUTION_FILES_CONFIG.bucket,
                region: S3_EXECUTION_FILES_CONFIG.region,
              },
              PRESIGNED_EXPIRY_SECONDS
            )
          } else {
            // blob
            downloadUrl = await getPresignedUrlWithConfig(
              key,
              {
                accountName: BLOB_EXECUTION_FILES_CONFIG.accountName,
                accountKey: BLOB_EXECUTION_FILES_CONFIG.accountKey,
                connectionString: BLOB_EXECUTION_FILES_CONFIG.connectionString,
                containerName: bucketName || BLOB_EXECUTION_FILES_CONFIG.containerName,
              },
              PRESIGNED_EXPIRY_SECONDS
            )
          }
        } else {
          // Default storage bucket for regular uploads.
          logger.info(`Using default storage for file: ${key}`)
          downloadUrl = await getPresignedUrl(key, PRESIGNED_EXPIRY_SECONDS)
        }

        return NextResponse.json({
          downloadUrl,
          expiresIn: PRESIGNED_EXPIRY_SECONDS,
          fileName: name || key.split('/').pop() || 'download',
        })
      } catch (error) {
        logger.error(`Failed to generate presigned URL for ${key}:`, error)
        return createErrorResponse(
          error instanceof Error ? error : new Error('Failed to generate download URL'),
          500
        )
      }
    } else {
      // For local storage, return the direct serve path (no expiry).
      const downloadUrl = `${process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000'}/api/files/serve/${key}`

      return NextResponse.json({
        downloadUrl,
        expiresIn: null, // Local URLs don't expire
        fileName: name || key.split('/').pop() || 'download',
      })
    }
  } catch (error) {
    logger.error('Error in file download endpoint:', error)
    return createErrorResponse(
      error instanceof Error ? error : new Error('Internal server error'),
      500
    )
  }
}

View File

@@ -0,0 +1,70 @@
import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { generateExecutionFileDownloadUrl } from '@/lib/workflows/execution-file-storage'
import { getExecutionFiles } from '@/lib/workflows/execution-files-server'
import type { UserFile } from '@/executor/types'
const logger = createLogger('ExecutionFileDownloadAPI')
/**
* Generate a short-lived presigned URL for secure execution file download
* GET /api/files/execution/[executionId]/[fileId]
*/
/**
 * Generate a short-lived presigned URL for secure execution file download.
 * GET /api/files/execution/[executionId]/[fileId]
 *
 * Looks up the file among the execution's recorded files, rejects expired
 * files with 410, and returns a fresh presigned URL with no-cache headers.
 */
export async function GET(
  request: NextRequest,
  { params }: { params: Promise<{ executionId: string; fileId: string }> }
) {
  try {
    const { executionId, fileId } = await params

    if (!executionId || !fileId) {
      return NextResponse.json({ error: 'Execution ID and File ID are required' }, { status: 400 })
    }

    logger.info(`Generating download URL for file ${fileId} in execution ${executionId}`)

    // Fetch every file recorded for this execution, then locate the target.
    const executionFiles = await getExecutionFiles(executionId)
    if (executionFiles.length === 0) {
      return NextResponse.json({ error: 'No files found for this execution' }, { status: 404 })
    }

    const file = executionFiles.find((candidate) => candidate.id === fileId)
    if (!file) {
      return NextResponse.json({ error: 'File not found in this execution' }, { status: 404 })
    }

    // Expired files are gone for good: 410, not 404.
    if (new Date(file.expiresAt).getTime() < Date.now()) {
      return NextResponse.json({ error: 'File has expired' }, { status: 410 })
    }

    // ExecutionFileMetadata is structurally a UserFile, so no mapping is needed.
    const userFile: UserFile = file

    // Fresh short-lived presigned URL (5 minutes).
    const downloadUrl = await generateExecutionFileDownloadUrl(userFile)

    logger.info(`Generated download URL for file ${file.name} (execution: ${executionId})`)

    const response = NextResponse.json({
      downloadUrl,
      fileName: file.name,
      fileSize: file.size,
      fileType: file.type,
      expiresIn: 300, // 5 minutes
    })

    // Download URLs must never be cached anywhere along the way.
    response.headers.set('Cache-Control', 'no-cache, no-store, must-revalidate')
    response.headers.set('Pragma', 'no-cache')
    response.headers.set('Expires', '0')

    return response
  } catch (error) {
    logger.error('Error generating execution file download URL:', error)
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}

View File

@@ -7,7 +7,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { isSupportedFileType, parseFile } from '@/lib/file-parsers'
import { createLogger } from '@/lib/logs/console/logger'
import { downloadFile, isUsingCloudStorage } from '@/lib/uploads'
import { UPLOAD_DIR } from '@/lib/uploads/setup'
import { UPLOAD_DIR_SERVER } from '@/lib/uploads/setup.server'
import '@/lib/uploads/setup.server'
export const dynamic = 'force-dynamic'
@@ -70,7 +70,7 @@ export async function POST(request: NextRequest) {
const requestData = await request.json()
const { filePath, fileType } = requestData
if (!filePath) {
if (!filePath || (typeof filePath === 'string' && filePath.trim() === '')) {
return NextResponse.json({ success: false, error: 'No file path provided' }, { status: 400 })
}
@@ -80,6 +80,16 @@ export async function POST(request: NextRequest) {
if (Array.isArray(filePath)) {
const results = []
for (const path of filePath) {
// Skip empty or invalid paths
if (!path || (typeof path === 'string' && path.trim() === '')) {
results.push({
success: false,
error: 'Empty file path in array',
filePath: path || '',
})
continue
}
const result = await parseFileSingle(path, fileType)
// Add processing time to metadata
if (result.metadata) {
@@ -154,6 +164,15 @@ export async function POST(request: NextRequest) {
async function parseFileSingle(filePath: string, fileType?: string): Promise<ParseResult> {
logger.info('Parsing file:', filePath)
// Validate that filePath is not empty
if (!filePath || filePath.trim() === '') {
return {
success: false,
error: 'Empty file path provided',
filePath: filePath || '',
}
}
// Validate path for security before any processing
const pathValidation = validateFilePath(filePath)
if (!pathValidation.isValid) {
@@ -337,7 +356,7 @@ async function handleLocalFile(filePath: string, fileType?: string): Promise<Par
try {
// Extract filename from path
const filename = filePath.split('/').pop() || filePath
const fullPath = path.join(UPLOAD_DIR, filename)
const fullPath = path.join(UPLOAD_DIR_SERVER, filename)
logger.info('Processing local file:', fullPath)

View File

@@ -4,8 +4,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import { getStorageProvider, isUsingCloudStorage } from '@/lib/uploads'
import { getBlobServiceClient } from '@/lib/uploads/blob/blob-client'
import { getS3Client, sanitizeFilenameForMetadata } from '@/lib/uploads/s3/s3-client'
// Dynamic imports for storage clients to avoid client-side bundling
import {
BLOB_CHAT_CONFIG,
BLOB_CONFIG,
@@ -169,6 +168,7 @@ async function handleS3PresignedUrl(
const uniqueKey = `${prefix}${uuidv4()}-${safeFileName}`
const { sanitizeFilenameForMetadata } = await import('@/lib/uploads/s3/s3-client')
const sanitizedOriginalName = sanitizeFilenameForMetadata(fileName)
const metadata: Record<string, string> = {
@@ -194,6 +194,7 @@ async function handleS3PresignedUrl(
let presignedUrl: string
try {
const { getS3Client } = await import('@/lib/uploads/s3/s3-client')
presignedUrl = await getSignedUrl(getS3Client(), command, { expiresIn: 3600 })
} catch (s3Error) {
logger.error('Failed to generate S3 presigned URL:', s3Error)
@@ -272,6 +273,7 @@ async function handleBlobPresignedUrl(
const uniqueKey = `${prefix}${uuidv4()}-${safeFileName}`
const { getBlobServiceClient } = await import('@/lib/uploads/blob/blob-client')
const blobServiceClient = getBlobServiceClient()
const containerClient = blobServiceClient.getContainerClient(config.containerName)
const blockBlobClient = containerClient.getBlockBlobClient(uniqueKey)

View File

@@ -26,7 +26,9 @@ describe('File Upload API Route', () => {
beforeEach(() => {
vi.resetModules()
vi.doMock('@/lib/uploads/setup.server', () => ({}))
vi.doMock('@/lib/uploads/setup.server', () => ({
UPLOAD_DIR_SERVER: '/tmp/test-uploads',
}))
})
afterEach(() => {
@@ -52,6 +54,12 @@ describe('File Upload API Route', () => {
const response = await POST(req)
const data = await response.json()
// Log error details if test fails
if (response.status !== 200) {
console.error('Upload failed with status:', response.status)
console.error('Error response:', data)
}
expect(response.status).toBe(200)
expect(data).toHaveProperty('path')
expect(data.path).toMatch(/\/api\/files\/serve\/.*\.txt$/)
@@ -59,8 +67,9 @@ describe('File Upload API Route', () => {
expect(data).toHaveProperty('size')
expect(data).toHaveProperty('type', 'text/plain')
const fs = await import('fs/promises')
expect(fs.writeFile).toHaveBeenCalled()
// Verify the upload function was called (we're mocking at the uploadFile level)
const { uploadFile } = await import('@/lib/uploads')
expect(uploadFile).toHaveBeenCalled()
})
it('should upload a file to S3 when in S3 mode', async () => {

View File

@@ -1,10 +1,6 @@
import { writeFile } from 'fs/promises'
import { join } from 'path'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import { isUsingCloudStorage, uploadFile } from '@/lib/uploads'
import { UPLOAD_DIR } from '@/lib/uploads/setup'
import { getPresignedUrl, isUsingCloudStorage, uploadFile } from '@/lib/uploads'
import '@/lib/uploads/setup.server'
import {
createErrorResponse,
@@ -27,10 +23,21 @@ export async function POST(request: NextRequest) {
throw new InvalidRequestError('No files provided')
}
// Get optional scoping parameters for execution-scoped storage
const workflowId = formData.get('workflowId') as string | null
const executionId = formData.get('executionId') as string | null
const workspaceId = formData.get('workspaceId') as string | null
// Log storage mode
const usingCloudStorage = isUsingCloudStorage()
logger.info(`Using storage mode: ${usingCloudStorage ? 'Cloud' : 'Local'} for file upload`)
if (workflowId && executionId) {
logger.info(
`Uploading files for execution-scoped storage: workflow=${workflowId}, execution=${executionId}`
)
}
const uploadResults = []
// Process each file
@@ -39,33 +46,60 @@ export async function POST(request: NextRequest) {
const bytes = await file.arrayBuffer()
const buffer = Buffer.from(bytes)
if (usingCloudStorage) {
// Upload to cloud storage (S3 or Azure Blob)
try {
logger.info(`Uploading file to cloud storage: ${originalName}`)
const result = await uploadFile(buffer, originalName, file.type, file.size)
logger.info(`Successfully uploaded to cloud storage: ${result.key}`)
uploadResults.push(result)
} catch (error) {
logger.error('Error uploading to cloud storage:', error)
throw error
// For execution-scoped files, use the dedicated execution file storage
if (workflowId && executionId) {
// Use the dedicated execution file storage system
const { uploadExecutionFile } = await import('@/lib/workflows/execution-file-storage')
const userFile = await uploadExecutionFile(
{
workspaceId: workspaceId || '',
workflowId,
executionId,
},
buffer,
originalName,
file.type
)
uploadResults.push(userFile)
continue
}
// Upload to cloud or local storage using the standard uploadFile function
try {
logger.info(`Uploading file: ${originalName}`)
const result = await uploadFile(buffer, originalName, file.type, file.size)
// Generate a presigned URL for cloud storage with appropriate expiry
// Regular files get 24 hours (execution files are handled above)
let presignedUrl: string | undefined
if (usingCloudStorage) {
try {
presignedUrl = await getPresignedUrl(result.key, 24 * 60 * 60) // 24 hours
} catch (error) {
logger.warn(`Failed to generate presigned URL for ${originalName}:`, error)
}
}
} else {
// Upload to local file system in development
const extension = originalName.split('.').pop() || ''
const uniqueFilename = `${uuidv4()}.${extension}`
const filePath = join(UPLOAD_DIR, uniqueFilename)
logger.info(`Uploading file to local storage: ${filePath}`)
await writeFile(filePath, buffer)
logger.info(`Successfully wrote file to: ${filePath}`)
// Create the serve path
const servePath = `/api/files/serve/${result.key}`
uploadResults.push({
path: `/api/files/serve/${uniqueFilename}`,
const uploadResult = {
name: originalName,
size: file.size,
type: file.type,
})
key: result.key,
path: servePath,
url: presignedUrl || servePath,
uploadedAt: new Date().toISOString(),
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000).toISOString(), // 24 hours
}
logger.info(`Successfully uploaded: ${result.key}`)
uploadResults.push(uploadResult)
} catch (error) {
logger.error(`Error uploading ${originalName}:`, error)
throw error
}
}

View File

@@ -5,7 +5,8 @@ import { verifyCronAuth } from '@/lib/auth/internal'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { getS3Client } from '@/lib/uploads/s3/s3-client'
import { deleteFile, isUsingCloudStorage } from '@/lib/uploads'
// Dynamic import for S3 client to avoid client-side bundling
import { db } from '@/db'
import { subscription, user, workflow, workflowExecutionLogs } from '@/db/schema'
@@ -69,6 +70,11 @@ export async function GET(request: NextRequest) {
deleted: 0,
deleteFailed: 0,
},
files: {
total: 0,
deleted: 0,
deleteFailed: 0,
},
snapshots: {
cleaned: 0,
cleanupFailed: 0,
@@ -106,6 +112,7 @@ export async function GET(request: NextRequest) {
totalInputCost: workflowExecutionLogs.totalInputCost,
totalOutputCost: workflowExecutionLogs.totalOutputCost,
totalTokens: workflowExecutionLogs.totalTokens,
files: workflowExecutionLogs.files,
metadata: workflowExecutionLogs.metadata,
createdAt: workflowExecutionLogs.createdAt,
})
@@ -132,6 +139,7 @@ export async function GET(request: NextRequest) {
})
try {
const { getS3Client } = await import('@/lib/uploads/s3/s3-client')
await getS3Client().send(
new PutObjectCommand({
Bucket: S3_CONFIG.bucket,
@@ -150,6 +158,23 @@ export async function GET(request: NextRequest) {
results.enhancedLogs.archived++
// Clean up associated files if using cloud storage
if (isUsingCloudStorage() && log.files && Array.isArray(log.files)) {
for (const file of log.files) {
if (file && typeof file === 'object' && file.key) {
results.files.total++
try {
await deleteFile(file.key)
results.files.deleted++
logger.info(`Deleted file: ${file.key}`)
} catch (fileError) {
results.files.deleteFailed++
logger.error(`Failed to delete file ${file.key}:`, { fileError })
}
}
}
}
try {
// Delete enhanced log
const deleteResult = await db
@@ -198,7 +223,7 @@ export async function GET(request: NextRequest) {
const reachedLimit = batchesProcessed >= MAX_BATCHES && hasMoreLogs
return NextResponse.json({
message: `Processed ${batchesProcessed} enhanced log batches (${results.enhancedLogs.total} logs) in ${timeElapsed.toFixed(2)}s${reachedLimit ? ' (batch limit reached)' : ''}`,
message: `Processed ${batchesProcessed} enhanced log batches (${results.enhancedLogs.total} logs, ${results.files.total} files) in ${timeElapsed.toFixed(2)}s${reachedLimit ? ' (batch limit reached)' : ''}`,
results,
complete: !hasMoreLogs,
batchLimitReached: reachedLimit,

View File

@@ -95,6 +95,7 @@ export async function GET(request: NextRequest) {
totalOutputCost: workflowExecutionLogs.totalOutputCost,
totalTokens: workflowExecutionLogs.totalTokens,
metadata: workflowExecutionLogs.metadata,
files: workflowExecutionLogs.files,
createdAt: workflowExecutionLogs.createdAt,
workflowName: workflow.name,
workflowDescription: workflow.description,
@@ -334,6 +335,7 @@ export async function GET(request: NextRequest) {
duration: log.totalDurationMs ? `${log.totalDurationMs}ms` : null,
trigger: log.trigger,
createdAt: log.startedAt.toISOString(),
files: log.files || undefined,
workflow: params.includeWorkflow ? workflow : undefined,
metadata: {
totalDuration: log.totalDurationMs,

View File

@@ -177,7 +177,7 @@ export async function POST(request: Request) {
throw new Error('Invalid JSON in request body')
}
const { toolId, params } = requestBody
const { toolId, params, executionContext } = requestBody
if (!toolId) {
logger.error(`[${requestId}] Missing toolId in request`)
@@ -214,8 +214,21 @@ export async function POST(request: Request) {
})
}
// Check if tool has file outputs - if so, don't skip post-processing
const hasFileOutputs =
tool.outputs &&
Object.values(tool.outputs).some(
(output) => output.type === 'file' || output.type === 'file[]'
)
// Execute tool
const result = await executeTool(toolId, params, true, true)
const result = await executeTool(
toolId,
params,
true, // skipProxy (we're already in the proxy)
!hasFileOutputs, // skipPostProcess (don't skip if tool has file outputs)
executionContext // pass execution context for file processing
)
if (!result.success) {
logger.warn(`[${requestId}] Tool execution failed for ${toolId}`, {

View File

@@ -394,13 +394,17 @@ export async function GET() {
variables: variables || {},
})
const executor = new Executor(
serializedWorkflow,
processedBlockStates,
decryptedEnvVars,
input,
workflowVariables
)
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: workflowRecord.workspaceId || '',
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)

View File

@@ -327,11 +327,14 @@ describe('Workflow Execution API Route', () => {
expect(executeMock).toHaveBeenCalledWith('workflow-id')
expect(Executor).toHaveBeenCalledWith(
expect.anything(), // serializedWorkflow
expect.anything(), // processedBlockStates
expect.anything(), // decryptedEnvVars
requestBody, // processedInput (direct input, not wrapped)
expect.anything() // workflowVariables
expect.objectContaining({
workflow: expect.any(Object), // serializedWorkflow
currentBlockStates: expect.any(Object), // processedBlockStates
envVarValues: expect.any(Object), // decryptedEnvVars
workflowInput: requestBody, // processedInput (direct input, not wrapped)
workflowVariables: expect.any(Object),
contextExtensions: expect.any(Object), // Allow any context extensions object
})
)
})
@@ -363,11 +366,14 @@ describe('Workflow Execution API Route', () => {
const Executor = (await import('@/executor')).Executor
expect(Executor).toHaveBeenCalledWith(
expect.anything(), // serializedWorkflow
expect.anything(), // processedBlockStates
expect.anything(), // decryptedEnvVars
structuredInput, // processedInput (direct input, not wrapped)
expect.anything() // workflowVariables
expect.objectContaining({
workflow: expect.any(Object), // serializedWorkflow
currentBlockStates: expect.any(Object), // processedBlockStates
envVarValues: expect.any(Object), // decryptedEnvVars
workflowInput: structuredInput, // processedInput (direct input, not wrapped)
workflowVariables: expect.any(Object),
contextExtensions: expect.any(Object), // Allow any context extensions object
})
)
})
@@ -391,11 +397,14 @@ describe('Workflow Execution API Route', () => {
const Executor = (await import('@/executor')).Executor
expect(Executor).toHaveBeenCalledWith(
expect.anything(), // serializedWorkflow
expect.anything(), // processedBlockStates
expect.anything(), // decryptedEnvVars
expect.objectContaining({}), // processedInput with empty input
expect.anything() // workflowVariables
expect.objectContaining({
workflow: expect.any(Object), // serializedWorkflow
currentBlockStates: expect.any(Object), // processedBlockStates
envVarValues: expect.any(Object), // decryptedEnvVars
workflowInput: expect.objectContaining({}), // processedInput with empty input
workflowVariables: expect.any(Object),
contextExtensions: expect.any(Object), // Allow any context extensions object
})
)
})
@@ -585,8 +594,13 @@ describe('Workflow Execution API Route', () => {
expect(executorCalls.length).toBeGreaterThan(0)
const lastCall = executorCalls[executorCalls.length - 1]
expect(lastCall.length).toBeGreaterThanOrEqual(5)
expect(lastCall.length).toBeGreaterThanOrEqual(1)
expect(lastCall[4]).toEqual(workflowVariables)
// Check that workflowVariables are passed in the options object
expect(lastCall[0]).toEqual(
expect.objectContaining({
workflowVariables: workflowVariables,
})
)
})
})

View File

@@ -278,13 +278,17 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any): P
true // Enable validation during execution
)
const executor = new Executor(
serializedWorkflow,
processedBlockStates,
decryptedEnvVars,
processedInput,
workflowVariables
)
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: processedInput,
workflowVariables,
contextExtensions: {
executionId,
workspaceId: workflow.workspaceId,
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)

View File

@@ -0,0 +1,87 @@
'use client'
import { useState } from 'react'
import { Download, Loader2 } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('FileDownload')
/** Props for the FileDownload button. */
interface FileDownloadProps {
  /** Metadata for the stored file to download. */
  file: {
    id?: string
    /** Original file name, shown as the download filename. */
    name: string
    /** File size in bytes. */
    size: number
    /** MIME type, e.g. 'text/plain'. */
    type: string
    /** Storage key used to request a fresh download URL. */
    key: string
    url: string
    /** ISO timestamp of the upload. */
    uploadedAt: string
    /** ISO timestamp after which the stored file is no longer available. */
    expiresAt: string
    /** Optional explicit provider hint forwarded to the download API. */
    storageProvider?: 's3' | 'blob' | 'local'
    /** Optional bucket/container override forwarded to the download API. */
    bucketName?: string
  }
  isExecutionFile?: boolean // Flag to indicate this is an execution file
  className?: string
}
/**
 * Download button for a stored file.
 *
 * On click, requests a fresh download URL from the download API (presigned
 * for cloud storage) and opens it in a new tab. Shows a spinner while the
 * request is in flight and ignores repeated clicks.
 */
export function FileDownload({ file, isExecutionFile = false, className }: FileDownloadProps) {
  const [isDownloading, setIsDownloading] = useState(false)

  const handleDownload = async () => {
    // Guard against double-clicks while a request is already running.
    if (isDownloading) return

    setIsDownloading(true)
    try {
      logger.info(`Initiating download for file: ${file.name}`)

      const payload = {
        key: file.key,
        name: file.name,
        storageProvider: file.storageProvider,
        bucketName: file.bucketName,
        isExecutionFile, // Add flag to indicate execution file
      }

      // Ask the server for a fresh download URL.
      const response = await fetch('/api/files/download', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(payload),
      })

      if (!response.ok) {
        const errorData = await response.json().catch(() => ({ error: response.statusText }))
        throw new Error(errorData.error || `Failed to generate download URL: ${response.status}`)
      }

      const { downloadUrl, fileName } = await response.json()

      // Open in a new tab so the current page state is preserved.
      window.open(downloadUrl, '_blank')

      logger.info(`Download initiated for file: ${fileName}`)
    } catch (error) {
      logger.error(`Failed to download file ${file.name}:`, error)
    } finally {
      setIsDownloading(false)
    }
  }

  const buttonIcon = isDownloading ? (
    <Loader2 className='h-3 w-3 animate-spin' />
  ) : (
    <Download className='h-3 w-3' />
  )

  return (
    <Button
      variant='ghost'
      size='sm'
      className={`h-7 px-2 text-xs ${className}`}
      onClick={handleDownload}
      disabled={isDownloading}
    >
      {buttonIcon}
      {isDownloading ? 'Downloading...' : 'Download'}
    </Button>
  )
}

View File

@@ -9,6 +9,7 @@ import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/comp
import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants'
import { redactApiKeys } from '@/lib/utils'
import { FrozenCanvasModal } from '@/app/workspace/[workspaceId]/logs/components/frozen-canvas/frozen-canvas-modal'
import { FileDownload } from '@/app/workspace/[workspaceId]/logs/components/sidebar/components/file-download'
import LogMarkdownRenderer from '@/app/workspace/[workspaceId]/logs/components/sidebar/components/markdown-renderer'
import { ToolCallsDisplay } from '@/app/workspace/[workspaceId]/logs/components/tool-calls/tool-calls-display'
import { TraceSpansDisplay } from '@/app/workspace/[workspaceId]/logs/components/trace-spans/trace-spans-display'
@@ -489,6 +490,36 @@ export function Sidebar({
</div>
)}
{/* Files */}
{log.files && log.files.length > 0 && (
<div>
<h3 className='mb-1 font-medium text-muted-foreground text-xs'>
Files ({log.files.length})
</h3>
<div className='space-y-2'>
{log.files.map((file, index) => (
<div
key={file.id || index}
className='flex items-center justify-between rounded-md border bg-muted/30 p-2'
>
<div className='min-w-0 flex-1'>
<div className='truncate font-medium text-sm' title={file.name}>
{file.name}
</div>
<div className='text-muted-foreground text-xs'>
{file.size ? `${Math.round(file.size / 1024)}KB` : 'Unknown size'}
{file.type && `${file.type.split('/')[0]}`}
</div>
</div>
<div className='ml-2 flex items-center gap-1'>
<FileDownload file={file} isExecutionFile={true} />
</div>
</div>
))}
</div>
</div>
)}
{/* Frozen Canvas Button - only show for workflow execution logs with execution ID */}
{isWorkflowExecutionLog && log.executionId && (
<div>

View File

@@ -19,9 +19,18 @@ import { useExecutionStore } from '@/stores/execution/store'
import { useChatStore } from '@/stores/panel/chat/store'
import { useConsoleStore } from '@/stores/panel/console/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { ChatFileUpload } from './components/chat-file-upload'
const logger = createLogger('ChatPanel')
interface ChatFile {
id: string
name: string
size: number
type: string
file: File
}
interface ChatProps {
panelWidth: number
chatMessage: string
@@ -51,6 +60,11 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
const [promptHistory, setPromptHistory] = useState<string[]>([])
const [historyIndex, setHistoryIndex] = useState(-1)
// File upload state
const [chatFiles, setChatFiles] = useState<ChatFile[]>([])
const [isUploadingFiles, setIsUploadingFiles] = useState(false)
const [dragCounter, setDragCounter] = useState(0)
const isDragOver = dragCounter > 0
// Scroll state
const [isNearBottom, setIsNearBottom] = useState(true)
const [showScrollButton, setShowScrollButton] = useState(false)
@@ -211,13 +225,22 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
// Handle send message
const handleSendMessage = useCallback(async () => {
if (!chatMessage.trim() || !activeWorkflowId || isExecuting) return
if (
(!chatMessage.trim() && chatFiles.length === 0) ||
!activeWorkflowId ||
isExecuting ||
isUploadingFiles
)
return
// Store the message being sent for reference
const sentMessage = chatMessage.trim()
// Add to prompt history if it's not already the most recent
if (promptHistory.length === 0 || promptHistory[promptHistory.length - 1] !== sentMessage) {
if (
sentMessage &&
(promptHistory.length === 0 || promptHistory[promptHistory.length - 1] !== sentMessage)
) {
setPromptHistory((prev) => [...prev, sentMessage])
}
@@ -232,23 +255,46 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
// Get the conversationId for this workflow before adding the message
const conversationId = getConversationId(activeWorkflowId)
let result: any = null
// Add user message
addMessage({
content: sentMessage,
workflowId: activeWorkflowId,
type: 'user',
})
try {
// Add user message
addMessage({
content:
sentMessage || (chatFiles.length > 0 ? `Uploaded ${chatFiles.length} file(s)` : ''),
workflowId: activeWorkflowId,
type: 'user',
})
// Clear input and refocus immediately
setChatMessage('')
focusInput(10)
// Prepare workflow input
const workflowInput: any = {
input: sentMessage,
conversationId: conversationId,
}
// Execute the workflow to generate a response, passing the chat message and conversationId as input
const result = await handleRunWorkflow({
input: sentMessage,
conversationId: conversationId,
})
// Add files if any (pass the File objects directly)
if (chatFiles.length > 0) {
workflowInput.files = chatFiles.map((chatFile) => ({
name: chatFile.name,
size: chatFile.size,
type: chatFile.type,
file: chatFile.file, // Pass the actual File object
}))
}
// Clear input and files, refocus immediately
setChatMessage('')
setChatFiles([])
focusInput(10)
// Execute the workflow to generate a response
result = await handleRunWorkflow(workflowInput)
} catch (error) {
logger.error('Error in handleSendMessage:', error)
setIsUploadingFiles(false)
// You might want to show an error message to the user here
return
}
// Check if we got a streaming response
if (result && 'stream' in result && result.stream instanceof ReadableStream) {
@@ -541,7 +587,57 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
</div>
{/* Input section - Fixed height */}
<div className='-mt-[1px] relative flex-nonept-3 pb-4'>
<div
className='-mt-[1px] relative flex-none pt-3 pb-4'
onDragEnter={(e) => {
e.preventDefault()
e.stopPropagation()
if (!(!activeWorkflowId || isExecuting || isUploadingFiles)) {
setDragCounter((prev) => prev + 1)
}
}}
onDragOver={(e) => {
e.preventDefault()
e.stopPropagation()
if (!(!activeWorkflowId || isExecuting || isUploadingFiles)) {
e.dataTransfer.dropEffect = 'copy'
}
}}
onDragLeave={(e) => {
e.preventDefault()
e.stopPropagation()
setDragCounter((prev) => Math.max(0, prev - 1))
}}
onDrop={(e) => {
e.preventDefault()
e.stopPropagation()
setDragCounter(0)
if (!(!activeWorkflowId || isExecuting || isUploadingFiles)) {
const droppedFiles = Array.from(e.dataTransfer.files)
if (droppedFiles.length > 0) {
const newFiles = droppedFiles.slice(0, 5 - chatFiles.length).map((file) => ({
id: crypto.randomUUID(),
name: file.name,
size: file.size,
type: file.type,
file,
}))
setChatFiles([...chatFiles, ...newFiles])
}
}
}}
>
{/* File upload section */}
<div className='mb-2'>
<ChatFileUpload
files={chatFiles}
onFilesChange={setChatFiles}
maxFiles={5}
maxSize={10}
disabled={!activeWorkflowId || isExecuting || isUploadingFiles}
/>
</div>
<div className='flex gap-2'>
<Input
ref={inputRef}
@@ -551,14 +647,23 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
setHistoryIndex(-1) // Reset history index when typing
}}
onKeyDown={handleKeyPress}
placeholder='Type a message...'
className='h-9 flex-1 rounded-lg border-[#E5E5E5] bg-[#FFFFFF] text-muted-foreground shadow-xs focus-visible:ring-0 focus-visible:ring-offset-0 dark:border-[#414141] dark:bg-[#202020]'
disabled={!activeWorkflowId || isExecuting}
placeholder={isDragOver ? 'Drop files here...' : 'Type a message...'}
className={`h-9 flex-1 rounded-lg border-[#E5E5E5] bg-[#FFFFFF] text-muted-foreground shadow-xs focus-visible:ring-0 focus-visible:ring-offset-0 dark:border-[#414141] dark:bg-[#202020] ${
isDragOver
? 'border-[#802FFF] bg-purple-50/50 dark:border-[#802FFF] dark:bg-purple-950/20'
: ''
}`}
disabled={!activeWorkflowId || isExecuting || isUploadingFiles}
/>
<Button
onClick={handleSendMessage}
size='icon'
disabled={!chatMessage.trim() || !activeWorkflowId || isExecuting}
disabled={
(!chatMessage.trim() && chatFiles.length === 0) ||
!activeWorkflowId ||
isExecuting ||
isUploadingFiles
}
className='h-9 w-9 rounded-lg bg-[#802FFF] text-white shadow-[0_0_0_0_#802FFF] transition-all duration-200 hover:bg-[#7028E6] hover:shadow-[0_0_0_4px_rgba(127,47,255,0.15)]'
>
<ArrowUp className='h-4 w-4' />

View File

@@ -0,0 +1,225 @@
'use client'
import { useRef, useState } from 'react'
import { File, FileText, Image, Paperclip, X } from 'lucide-react'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('ChatFileUpload')
/** A file staged for attachment; wraps the native File with a stable client id. */
interface ChatFile {
  id: string
  name: string
  size: number
  type: string
  file: File
}
interface ChatFileUploadProps {
  files: ChatFile[]
  onFilesChange: (files: ChatFile[]) => void
  maxFiles?: number
  maxSize?: number // in MB
  acceptedTypes?: string[]
  disabled?: boolean
}
/**
 * File-attachment control for the chat panel: a paperclip picker button, a
 * list of staged files with remove buttons, and drag-and-drop support.
 *
 * Validation (count limit, per-file size in MB, MIME type against
 * `acceptedTypes`, and name+size duplicate detection) happens in
 * `handleFileSelect`; rejected files are only logged via `logger.warn`,
 * accepted ones are appended through `onFilesChange`.
 */
export function ChatFileUpload({
  files,
  onFilesChange,
  maxFiles = 5,
  maxSize = 10,
  acceptedTypes = ['*'],
  disabled = false,
}: ChatFileUploadProps) {
  const [isDragOver, setIsDragOver] = useState(false)
  const fileInputRef = useRef<HTMLInputElement>(null)
  /**
   * Validates a picked/dropped FileList and appends the accepted entries.
   * Enforces, in order: total file count, per-file size, MIME acceptance,
   * and duplicate (same name + size) rejection.
   */
  const handleFileSelect = (selectedFiles: FileList | null) => {
    if (!selectedFiles || disabled) return
    const newFiles: ChatFile[] = []
    const errors: string[] = []
    for (let i = 0; i < selectedFiles.length; i++) {
      const file = selectedFiles[i]
      // Check file count limit
      if (files.length + newFiles.length >= maxFiles) {
        errors.push(`Maximum ${maxFiles} files allowed`)
        break
      }
      // Check file size (maxSize is expressed in MB)
      if (file.size > maxSize * 1024 * 1024) {
        errors.push(`${file.name} is too large (max ${maxSize}MB)`)
        continue
      }
      // Check file type if specified; '*' means accept everything
      if (acceptedTypes.length > 0 && !acceptedTypes.includes('*')) {
        const isAccepted = acceptedTypes.some((type) => {
          if (type.endsWith('/*')) {
            // e.g. 'image/*' -> prefix match on 'image/'
            return file.type.startsWith(type.slice(0, -1))
          }
          return file.type === type
        })
        if (!isAccepted) {
          errors.push(`${file.name} type not supported`)
          continue
        }
      }
      // Check for duplicates (same name and byte size as an existing file)
      const isDuplicate = files.some(
        (existingFile) => existingFile.name === file.name && existingFile.size === file.size
      )
      if (isDuplicate) {
        errors.push(`${file.name} already added`)
        continue
      }
      newFiles.push({
        id: crypto.randomUUID(),
        name: file.name,
        size: file.size,
        type: file.type,
        file,
      })
    }
    if (errors.length > 0) {
      // NOTE(review): errors are only logged today — surface them to the user
      // (toast/alert) once the design settles.
      logger.warn('File upload errors:', errors)
    }
    if (newFiles.length > 0) {
      onFilesChange([...files, ...newFiles])
    }
  }
  const handleRemoveFile = (fileId: string) => {
    onFilesChange(files.filter((f) => f.id !== fileId))
  }
  const handleDragOver = (e: React.DragEvent) => {
    e.preventDefault()
    e.stopPropagation()
    if (!disabled) {
      setIsDragOver(true)
      e.dataTransfer.dropEffect = 'copy'
    }
  }
  const handleDragEnter = (e: React.DragEvent) => {
    e.preventDefault()
    e.stopPropagation()
    if (!disabled) {
      setIsDragOver(true)
    }
  }
  const handleDragLeave = (e: React.DragEvent) => {
    e.preventDefault()
    e.stopPropagation()
    // NOTE(review): a plain boolean flickers when dragging over child nodes
    // (dragleave fires on every child boundary); consider the enter/leave
    // counter pattern used by the chat panel if this becomes visible.
    setIsDragOver(false)
  }
  const handleDrop = (e: React.DragEvent) => {
    e.preventDefault()
    e.stopPropagation()
    setIsDragOver(false)
    if (!disabled) {
      handleFileSelect(e.dataTransfer.files)
    }
  }
  /** Picks an icon by MIME type: image, text/json, or generic file. */
  const getFileIcon = (type: string) => {
    if (type.startsWith('image/')) return <Image className='h-4 w-4' />
    if (type.includes('text') || type.includes('json')) return <FileText className='h-4 w-4' />
    return <File className='h-4 w-4' />
  }
  /** Formats a byte count as a human-readable size (B/KB/MB/GB, 1 decimal). */
  const formatFileSize = (bytes: number) => {
    if (bytes === 0) return '0 B'
    const k = 1024
    const sizes = ['B', 'KB', 'MB', 'GB']
    const i = Math.floor(Math.log(bytes) / Math.log(k))
    return `${Number.parseFloat((bytes / k ** i).toFixed(1))} ${sizes[i]}`
  }
  return (
    <div
      className='space-y-2'
      // Fix: drag handlers must live on this always-mounted root. Previously
      // they were attached only to the overlay below, which renders only when
      // isDragOver is already true — so isDragOver could never become true and
      // drag-and-drop onto this component was dead code.
      onDragEnter={handleDragEnter}
      onDragOver={handleDragOver}
      onDragLeave={handleDragLeave}
      onDrop={handleDrop}
    >
      {/* File Upload Button */}
      <div className='flex items-center gap-2'>
        <button
          type='button'
          onClick={() => fileInputRef.current?.click()}
          disabled={disabled || files.length >= maxFiles}
          className='flex items-center gap-1 rounded-md px-2 py-1 text-gray-600 text-sm transition-colors hover:bg-gray-100 hover:text-gray-800 disabled:cursor-not-allowed disabled:opacity-50'
          title={files.length >= maxFiles ? `Maximum ${maxFiles} files` : 'Attach files'}
        >
          <Paperclip className='h-4 w-4' />
          <span className='hidden sm:inline'>Attach</span>
        </button>
        <input
          ref={fileInputRef}
          type='file'
          multiple
          onChange={(e) => handleFileSelect(e.target.files)}
          className='hidden'
          accept={acceptedTypes.join(',')}
          disabled={disabled}
        />
        {files.length > 0 && (
          <span className='text-gray-500 text-xs'>
            {files.length}/{maxFiles} files
          </span>
        )}
      </div>
      {/* File List */}
      {files.length > 0 && (
        <div className='space-y-1'>
          {files.map((file) => (
            <div
              key={file.id}
              className='flex items-center gap-2 rounded-md bg-gray-50 px-2 py-1 text-sm'
            >
              {getFileIcon(file.type)}
              <span className='flex-1 truncate' title={file.name}>
                {file.name}
              </span>
              <span className='text-gray-500 text-xs'>{formatFileSize(file.size)}</span>
              <button
                type='button'
                onClick={() => handleRemoveFile(file.id)}
                className='p-0.5 text-gray-400 transition-colors hover:text-red-500'
                title='Remove file'
              >
                <X className='h-3 w-3' />
              </button>
            </div>
          ))}
        </div>
      )}
      {/* Drag and Drop Area (when dragging) */}
      {isDragOver && (
        <div
          className='fixed inset-0 z-50 flex items-center justify-center border-2 border-blue-500 border-dashed bg-blue-500/10'
          onDragOver={handleDragOver}
          onDragLeave={handleDragLeave}
          onDrop={handleDrop}
        >
          <div className='rounded-lg bg-white p-4 shadow-lg'>
            <p className='font-medium text-blue-600'>Drop files here to attach</p>
          </div>
        </div>
      )}
    </div>
  )
}

View File

@@ -323,12 +323,18 @@ const UserInput = forwardRef<UserInputRef, UserInputProps>(
}
const handleFileSelect = () => {
if (disabled || isLoading) {
return
}
fileInputRef.current?.click()
}
const handleFileChange = async (e: React.ChangeEvent<HTMLInputElement>) => {
const files = e.target.files
if (!files || files.length === 0) return
if (!files || files.length === 0) {
return
}
await processFiles(files)
@@ -554,6 +560,7 @@ const UserInput = forwardRef<UserInputRef, UserInputProps>(
className='hidden'
accept='.pdf,.doc,.docx,.txt,.md,.png,.jpg,.jpeg,.gif,.svg'
multiple
disabled={disabled || isLoading}
/>
</div>
</div>

View File

@@ -196,7 +196,12 @@ export function FileUpload({
}
// Use the file info returned from the presigned URL endpoint
uploadedFiles.push(presignedData.fileInfo)
uploadedFiles.push({
name: presignedData.fileInfo.name,
path: presignedData.fileInfo.path,
size: presignedData.fileInfo.size,
type: presignedData.fileInfo.type,
})
} else {
// Fallback to traditional upload through API route
useDirectUpload = false
@@ -224,7 +229,7 @@ export function FileUpload({
uploadedFiles.push({
name: file.name,
path: data.path,
path: data.url || data.path, // Use url or path from upload response
size: file.size,
type: file.type,
})
@@ -276,12 +281,12 @@ export function FileUpload({
if (multiple) {
// For multiple files: Append to existing files if any
const existingFiles = Array.isArray(value) ? value : value ? [value] : []
// Create a map to identify duplicates by path
// Create a map to identify duplicates by url
const uniqueFiles = new Map()
// Add existing files to the map
existingFiles.forEach((file) => {
uniqueFiles.set(file.path, file)
uniqueFiles.set(file.url || file.path, file) // Use url, fallback to path for backward compatibility
})
// Add new files to the map (will overwrite if same path)
@@ -327,7 +332,7 @@ export function FileUpload({
}
// Mark this file as being deleted
setDeletingFiles((prev) => ({ ...prev, [file.path]: true }))
setDeletingFiles((prev) => ({ ...prev, [file.path || '']: true }))
try {
// Call API to delete the file from server
@@ -366,7 +371,7 @@ export function FileUpload({
// Remove file from the deleting state
setDeletingFiles((prev) => {
const updated = { ...prev }
delete updated[file.path]
delete updated[file.path || '']
return updated
})
}
@@ -382,12 +387,11 @@ export function FileUpload({
if (!value) return
const filesToDelete = Array.isArray(value) ? value : [value]
const _fileCount = filesToDelete.length
// Mark all files as deleting
const deletingStatus: Record<string, boolean> = {}
filesToDelete.forEach((file) => {
deletingStatus[file.path] = true
deletingStatus[file.path || ''] = true
})
setDeletingFiles(deletingStatus)
@@ -448,11 +452,12 @@ export function FileUpload({
// Helper to render a single file item
const renderFileItem = (file: UploadedFile) => {
const isDeleting = deletingFiles[file.path]
const fileKey = file.path || ''
const isDeleting = deletingFiles[fileKey]
return (
<div
key={file.path}
key={fileKey}
className='flex items-center justify-between rounded border border-border bg-background px-3 py-2'
>
<div className='flex-1 truncate pr-2'>

View File

@@ -33,6 +33,7 @@ interface ExecutorOptions {
edges?: Array<{ source: string; target: string }>
onStream?: (streamingExecution: StreamingExecution) => Promise<void>
executionId?: string
workspaceId?: string
}
}
@@ -44,7 +45,7 @@ interface DebugValidationResult {
export function useWorkflowExecution() {
const currentWorkflow = useCurrentWorkflow()
const { activeWorkflowId } = useWorkflowRegistry()
const { activeWorkflowId, workflows } = useWorkflowRegistry()
const { toggleConsole } = useConsoleStore()
const { getAllVariables } = useEnvironmentStore()
const { isDebugModeEnabled } = useGeneralStore()
@@ -246,6 +247,14 @@ export function useWorkflowExecution() {
async (workflowInput?: any, enableDebug = false) => {
if (!activeWorkflowId) return
// Get workspaceId from workflow metadata
const workspaceId = workflows[activeWorkflowId]?.workspaceId
if (!workspaceId) {
logger.error('Cannot execute workflow without workspaceId')
return
}
// Reset execution result and set execution state
setExecutionResult(null)
setIsExecuting(true)
@@ -268,6 +277,78 @@ export function useWorkflowExecution() {
const streamedContent = new Map<string, string>()
const streamReadingPromises: Promise<void>[] = []
// Handle file uploads if present
const uploadedFiles: any[] = []
console.log('Checking for files to upload:', workflowInput.files)
if (workflowInput.files && Array.isArray(workflowInput.files)) {
try {
console.log('Processing files for upload:', workflowInput.files.length)
for (const fileData of workflowInput.files) {
console.log('Uploading file:', fileData.name, fileData.size)
console.log('File data:', fileData)
// Create FormData for upload
const formData = new FormData()
formData.append('file', fileData.file)
formData.append('workflowId', activeWorkflowId)
formData.append('executionId', executionId)
formData.append('workspaceId', workspaceId)
// Upload the file
const response = await fetch('/api/files/upload', {
method: 'POST',
body: formData,
})
if (response.ok) {
const uploadResult = await response.json()
console.log('Upload successful:', uploadResult)
// Convert upload result to clean UserFile format
const processUploadResult = (result: any) => ({
id:
result.id ||
`file_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`,
name: result.name,
url: result.url,
size: result.size,
type: result.type,
key: result.key,
uploadedAt: result.uploadedAt,
expiresAt: result.expiresAt,
})
// The API returns the file directly for single uploads
// or { files: [...] } for multiple uploads
if (uploadResult.files && Array.isArray(uploadResult.files)) {
uploadedFiles.push(...uploadResult.files.map(processUploadResult))
} else if (uploadResult.path || uploadResult.url) {
// Single file upload - the result IS the file object
uploadedFiles.push(processUploadResult(uploadResult))
} else {
console.error('Unexpected upload response format:', uploadResult)
}
} else {
const errorText = await response.text()
console.error(
`Failed to upload file ${fileData.name}:`,
response.status,
errorText
)
}
}
console.log('All files processed. Uploaded files:', uploadedFiles)
// Update workflow input with uploaded files
workflowInput.files = uploadedFiles
} catch (error) {
console.error('Error uploading files:', error)
// Continue execution even if file upload fails
workflowInput.files = []
}
}
const onStream = async (streamingExecution: StreamingExecution) => {
const promise = (async () => {
if (!streamingExecution.stream) return
@@ -558,6 +639,9 @@ export function useWorkflowExecution() {
selectedOutputIds = chatStore.getState().getSelectedWorkflowOutput(activeWorkflowId)
}
// Get workspaceId from workflow metadata
const workspaceId = activeWorkflowId ? workflows[activeWorkflowId]?.workspaceId : undefined
// Create executor options
const executorOptions: ExecutorOptions = {
workflow,
@@ -574,6 +658,7 @@ export function useWorkflowExecution() {
})),
onStream,
executionId,
workspaceId,
},
}

View File

@@ -38,7 +38,7 @@ export const GmailBlock: BlockConfig<GmailToolResponse> = {
requiredScopes: [
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/gmail.modify',
// 'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.labels',
],
placeholder: 'Select Gmail account',
@@ -105,6 +105,13 @@ export const GmailBlock: BlockConfig<GmailToolResponse> = {
layout: 'full',
condition: { field: 'operation', value: 'read_gmail' },
},
{
id: 'includeAttachments',
title: 'Include Attachments',
type: 'switch',
layout: 'full',
condition: { field: 'operation', value: 'read_gmail' },
},
{
id: 'messageId',
title: 'Message ID',
@@ -187,6 +194,7 @@ export const GmailBlock: BlockConfig<GmailToolResponse> = {
manualFolder: { type: 'string', description: 'Manual folder name' },
messageId: { type: 'string', description: 'Message identifier' },
unreadOnly: { type: 'boolean', description: 'Unread messages only' },
includeAttachments: { type: 'boolean', description: 'Include email attachments' },
// Search operation inputs
query: { type: 'string', description: 'Search query' },
maxResults: { type: 'number', description: 'Maximum results' },
@@ -194,5 +202,9 @@ export const GmailBlock: BlockConfig<GmailToolResponse> = {
outputs: {
content: { type: 'string', description: 'Response content' },
metadata: { type: 'json', description: 'Email metadata' },
attachments: {
type: 'json',
description: 'Email attachments (when includeAttachments is enabled)',
},
},
}

View File

@@ -212,8 +212,12 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
.getValue(activeSourceBlockId, 'startWorkflow')
if (startWorkflowValue === 'chat') {
// For chat mode, provide input and conversationId
blockTags = [`${normalizedBlockName}.input`, `${normalizedBlockName}.conversationId`]
// For chat mode, provide input, conversationId, and files
blockTags = [
`${normalizedBlockName}.input`,
`${normalizedBlockName}.conversationId`,
`${normalizedBlockName}.files`,
]
} else {
// Check for custom input format fields (for manual mode)
const inputFormatValue = useSubBlockStore
@@ -487,8 +491,12 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
.getValue(accessibleBlockId, 'startWorkflow')
if (startWorkflowValue === 'chat') {
// For chat mode, provide input and conversationId
blockTags = [`${normalizedBlockName}.input`, `${normalizedBlockName}.conversationId`]
// For chat mode, provide input, conversationId, and files
blockTags = [
`${normalizedBlockName}.input`,
`${normalizedBlockName}.conversationId`,
`${normalizedBlockName}.files`,
]
} else {
// Check for custom input format fields (for manual mode)
const inputFormatValue = useSubBlockStore

View File

@@ -0,0 +1 @@
ALTER TABLE "workflow_execution_logs" ADD COLUMN "files" jsonb;

File diff suppressed because it is too large Load Diff

View File

@@ -470,6 +470,13 @@
"when": 1754424644234,
"tag": "0067_safe_bushwacker",
"breakpoints": true
},
{
"idx": 68,
"version": "7",
"when": 1754446277077,
"tag": "0068_fine_hardball",
"breakpoints": true
}
]
}

View File

@@ -300,6 +300,7 @@ export const workflowExecutionLogs = pgTable(
totalTokens: integer('total_tokens'),
metadata: jsonb('metadata').notNull().default('{}'),
files: jsonb('files'), // File metadata for execution files
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => ({

View File

@@ -364,16 +364,23 @@ describe('AgentBlockHandler', () => {
expect.objectContaining({
code: 'return { result: "auto tool executed", input }',
input: 'test input',
})
}),
false, // skipProxy
false, // skipPostProcess
expect.any(Object) // execution context
)
await forceTool.executeFunction({ input: 'another test' })
expect(mockExecuteTool).toHaveBeenCalledWith(
expect(mockExecuteTool).toHaveBeenNthCalledWith(
2, // Check the 2nd call
'function_execute',
expect.objectContaining({
code: 'return { result: "force tool executed", input }',
input: 'another test',
})
}),
false, // skipProxy
false, // skipPostProcess
expect.any(Object) // execution context
)
const fetchCall = mockFetch.mock.calls[0]

View File

@@ -172,14 +172,20 @@ export class AgentBlockHandler implements BlockHandler {
// Merge user-provided parameters with LLM-generated parameters
const mergedParams = mergeToolParameters(userProvidedParams, callParams)
const result = await executeTool('function_execute', {
code: tool.code,
...mergedParams,
timeout: tool.timeout ?? DEFAULT_FUNCTION_TIMEOUT,
envVars: context.environmentVariables || {},
isCustomTool: true,
_context: { workflowId: context.workflowId },
})
const result = await executeTool(
'function_execute',
{
code: tool.code,
...mergedParams,
timeout: tool.timeout ?? DEFAULT_FUNCTION_TIMEOUT,
envVars: context.environmentVariables || {},
isCustomTool: true,
_context: { workflowId: context.workflowId },
},
false, // skipProxy
false, // skipPostProcess
context // execution context for file processing
)
if (!result.success) {
throw new Error(result.error || 'Function execution failed')

View File

@@ -100,11 +100,17 @@ describe('ApiBlockHandler', () => {
const result = await handler.execute(mockBlock, inputs, mockContext)
expect(mockGetTool).toHaveBeenCalledWith('http_request')
expect(mockExecuteTool).toHaveBeenCalledWith('http_request', {
...inputs,
body: { key: 'value' }, // Expect parsed body
_context: { workflowId: 'test-workflow-id' },
})
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
{
...inputs,
body: { key: 'value' }, // Expect parsed body
_context: { workflowId: 'test-workflow-id' },
},
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
expect(result).toEqual(expectedOutput)
})
@@ -152,7 +158,10 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: expectedParsedBody })
expect.objectContaining({ body: expectedParsedBody }),
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
})
@@ -166,7 +175,10 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: 'This is plain text' })
expect.objectContaining({ body: 'This is plain text' }),
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
})
@@ -180,7 +192,10 @@ describe('ApiBlockHandler', () => {
expect(mockExecuteTool).toHaveBeenCalledWith(
'http_request',
expect.objectContaining({ body: undefined })
expect.objectContaining({ body: undefined }),
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
})

View File

@@ -93,10 +93,16 @@ export class ApiBlockHandler implements BlockHandler {
JSON.stringify(processedInputs.body, null, 2)
)
const result = await executeTool(block.config.tool, {
...processedInputs,
_context: { workflowId: context.workflowId },
})
const result = await executeTool(
block.config.tool,
{
...processedInputs,
_context: { workflowId: context.workflowId },
},
false, // skipProxy
false, // skipPostProcess
context // execution context for file processing
)
if (!result.success) {
const errorDetails = []

View File

@@ -85,7 +85,13 @@ describe('FunctionBlockHandler', () => {
const result = await handler.execute(mockBlock, inputs, mockContext)
expect(mockExecuteTool).toHaveBeenCalledWith('function_execute', expectedToolParams)
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
expect(result).toEqual(expectedOutput)
})
@@ -110,7 +116,13 @@ describe('FunctionBlockHandler', () => {
const result = await handler.execute(mockBlock, inputs, mockContext)
expect(mockExecuteTool).toHaveBeenCalledWith('function_execute', expectedToolParams)
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
expect(result).toEqual(expectedOutput)
})
@@ -127,7 +139,13 @@ describe('FunctionBlockHandler', () => {
await handler.execute(mockBlock, inputs, mockContext)
expect(mockExecuteTool).toHaveBeenCalledWith('function_execute', expectedToolParams)
expect(mockExecuteTool).toHaveBeenCalledWith(
'function_execute',
expectedToolParams,
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
})
it('should handle execution errors from the tool', async () => {

View File

@@ -40,14 +40,20 @@ export class FunctionBlockHandler implements BlockHandler {
}
// Directly use the function_execute tool which calls the API route
const result = await executeTool('function_execute', {
code: codeContent,
timeout: inputs.timeout || 5000,
envVars: context.environmentVariables || {},
blockData: blockData, // Pass block data for variable resolution
blockNameMapping: blockNameMapping, // Pass block name to ID mapping
_context: { workflowId: context.workflowId },
})
const result = await executeTool(
'function_execute',
{
code: codeContent,
timeout: inputs.timeout || 5000,
envVars: context.environmentVariables || {},
blockData: blockData, // Pass block data for variable resolution
blockNameMapping: blockNameMapping, // Pass block name to ID mapping
_context: { workflowId: context.workflowId },
},
false, // skipProxy
false, // skipPostProcess
context // execution context for file processing
)
if (!result.success) {
throw new Error(result.error || 'Function execution failed')

View File

@@ -93,7 +93,13 @@ describe('GenericBlockHandler', () => {
const result = await handler.execute(mockBlock, inputs, mockContext)
expect(mockGetTool).toHaveBeenCalledWith('some_custom_tool')
expect(mockExecuteTool).toHaveBeenCalledWith('some_custom_tool', expectedToolParams)
expect(mockExecuteTool).toHaveBeenCalledWith(
'some_custom_tool',
expectedToolParams,
false, // skipProxy
false, // skipPostProcess
mockContext // execution context
)
expect(result).toEqual(expectedOutput)
})

View File

@@ -29,10 +29,16 @@ export class GenericBlockHandler implements BlockHandler {
}
try {
const result = await executeTool(block.config.tool, {
...inputs,
_context: { workflowId: context.workflowId },
})
const result = await executeTool(
block.config.tool,
{
...inputs,
_context: { workflowId: context.workflowId },
},
false, // skipProxy
false, // skipPostProcess
context // execution context for file processing
)
if (!result.success) {
const errorDetails = []

View File

@@ -974,6 +974,7 @@ describe('Executor', () => {
async () => {
// Create a workflow with two parallel agents
const workflow = {
version: '1.0',
blocks: [
{
id: 'starter',

View File

@@ -87,6 +87,7 @@ export class Executor {
edges?: Array<{ source: string; target: string }>
onStream?: (streamingExecution: StreamingExecution) => Promise<void>
executionId?: string
workspaceId?: string
}
},
private initialBlockStates: Record<string, BlockOutput> = {},
@@ -675,6 +676,8 @@ export class Executor {
): ExecutionContext {
const context: ExecutionContext = {
workflowId,
workspaceId: this.contextExtensions.workspaceId,
executionId: this.contextExtensions.executionId,
blockStates: new Map(),
blockLogs: [],
metadata: {
@@ -797,6 +800,11 @@ export class Executor {
...finalInput, // Add input fields directly at top level
}
// Add files if present (for all trigger types)
if (this.workflowInput?.files && Array.isArray(this.workflowInput.files)) {
blockOutput.files = this.workflowInput.files
}
logger.info(`[Executor] Starting block output:`, JSON.stringify(blockOutput, null, 2))
context.blockStates.set(initBlock.id, {
@@ -804,6 +812,10 @@ export class Executor {
executed: true,
executionTime: 0,
})
// Create a block log for the starter block if it has files
// This ensures files are captured in trace spans and execution logs
this.createStartedBlockWithFilesLog(initBlock, blockOutput, context)
} else {
// Handle structured input (like API calls or chat messages)
if (this.workflowInput && typeof this.workflowInput === 'object') {
@@ -812,17 +824,26 @@ export class Executor {
Object.hasOwn(this.workflowInput, 'input') &&
Object.hasOwn(this.workflowInput, 'conversationId')
) {
// Chat workflow: extract input and conversationId to root level
const starterOutput = {
// Chat workflow: extract input, conversationId, and files to root level
const starterOutput: any = {
input: this.workflowInput.input,
conversationId: this.workflowInput.conversationId,
}
// Add files if present
if (this.workflowInput.files && Array.isArray(this.workflowInput.files)) {
starterOutput.files = this.workflowInput.files
}
context.blockStates.set(initBlock.id, {
output: starterOutput,
executed: true,
executionTime: 0,
})
// Create a block log for the starter block if it has files
// This ensures files are captured in trace spans and execution logs
this.createStartedBlockWithFilesLog(initBlock, starterOutput, context)
} else {
// API workflow: spread the raw data directly (no wrapping)
const starterOutput = { ...this.workflowInput }
@@ -857,11 +878,16 @@ export class Executor {
Object.hasOwn(this.workflowInput, 'input') &&
Object.hasOwn(this.workflowInput, 'conversationId')
) {
// Chat workflow: extract input and conversationId to root level
// Chat workflow: extract input, conversationId, and files to root level
blockOutput = {
input: this.workflowInput.input,
conversationId: this.workflowInput.conversationId,
}
// Add files if present
if (this.workflowInput.files && Array.isArray(this.workflowInput.files)) {
blockOutput.files = this.workflowInput.files
}
} else {
// API workflow: spread the raw data directly (no wrapping)
blockOutput = { ...this.workflowInput }
@@ -883,6 +909,7 @@ export class Executor {
executed: true,
executionTime: 0,
})
this.createStartedBlockWithFilesLog(initBlock, blockOutput, context)
}
// Ensure the starting block is in the active execution path
context.activeExecutionPath.add(initBlock.id)
@@ -1806,4 +1833,29 @@ export class Executor {
// Fallback to string conversion
return String(error)
}
/**
* Creates a block log for the starter block if it contains files.
* This ensures files are captured in trace spans and execution logs.
*/
private createStartedBlockWithFilesLog(
initBlock: SerializedBlock,
blockOutput: any,
context: ExecutionContext
): void {
if (blockOutput.files && Array.isArray(blockOutput.files) && blockOutput.files.length > 0) {
const starterBlockLog: BlockLog = {
blockId: initBlock.id,
blockName: initBlock.metadata?.name || 'Start',
blockType: initBlock.metadata?.id || 'start',
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
success: true,
input: this.workflowInput,
output: blockOutput,
durationMs: 0,
}
context.blockLogs.push(starterBlockLog)
}
}
}

View File

@@ -8,6 +8,13 @@ import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('InputResolver')
/**
 * Helper function to resolve a single property access on an object.
 *
 * @param obj - Object (or array) being read
 * @param property - Property name (or stringified index) to look up
 * @returns The property's value, or undefined when the property is absent
 */
function resolvePropertyAccess(obj: any, property: string): any {
  const value = obj[property]
  return value
}
/**
* Resolves input values for blocks by handling references and variable substitution.
*/
@@ -516,7 +523,32 @@ export class InputResolver {
throw new Error(`Invalid path "${part}" in "${path}" for starter block.`)
}
replacementValue = replacementValue[part]
// Handle array indexing syntax like "files[0]" or "items[1]"
const arrayMatch = part.match(/^([^[]+)\[(\d+)\]$/)
if (arrayMatch) {
const [, arrayName, indexStr] = arrayMatch
const index = Number.parseInt(indexStr, 10)
// First access the array property
const arrayValue = replacementValue[arrayName]
if (!Array.isArray(arrayValue)) {
throw new Error(
`Property "${arrayName}" is not an array in path "${path}" for starter block.`
)
}
// Then access the array element
if (index < 0 || index >= arrayValue.length) {
throw new Error(
`Array index ${index} is out of bounds for "${arrayName}" (length: ${arrayValue.length}) in path "${path}" for starter block.`
)
}
replacementValue = arrayValue[index]
} else {
// Regular property access with FileReference mapping
replacementValue = resolvePropertyAccess(replacementValue, part)
}
if (replacementValue === undefined) {
logger.warn(
@@ -699,7 +731,32 @@ export class InputResolver {
)
}
replacementValue = replacementValue[part]
// Handle array indexing syntax like "files[0]" or "items[1]"
const arrayMatch = part.match(/^([^[]+)\[(\d+)\]$/)
if (arrayMatch) {
const [, arrayName, indexStr] = arrayMatch
const index = Number.parseInt(indexStr, 10)
// First access the array property
const arrayValue = replacementValue[arrayName]
if (!Array.isArray(arrayValue)) {
throw new Error(
`Property "${arrayName}" is not an array in path "${path}" for block "${sourceBlock.metadata?.name || sourceBlock.id}".`
)
}
// Then access the array element
if (index < 0 || index >= arrayValue.length) {
throw new Error(
`Array index ${index} is out of bounds for "${arrayName}" (length: ${arrayValue.length}) in path "${path}" for block "${sourceBlock.metadata?.name || sourceBlock.id}".`
)
}
replacementValue = arrayValue[index]
} else {
// Regular property access with FileReference mapping
replacementValue = resolvePropertyAccess(replacementValue, part)
}
if (replacementValue === undefined) {
throw new Error(

View File

@@ -1,6 +1,20 @@
import type { BlockOutput } from '@/blocks/types'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
/**
 * User-facing file object with simplified interface.
 *
 * Represents a file stored in the execution filesystem and passed between
 * workflow blocks.
 */
export interface UserFile {
  // Unique identifier for this file
  id: string
  // Original (display) file name
  name: string
  // Download URL — presumably a short-lived presigned URL; confirm with the storage layer
  url: string
  // File size in bytes
  size: number
  // MIME type of the file
  type: string
  // Storage key (path within the execution-files bucket/container)
  key: string
  // Upload timestamp (string; assumed ISO 8601 — TODO confirm against uploader)
  uploadedAt: string
  // Expiration timestamp for the file/URL (string; assumed ISO 8601 — TODO confirm)
  expiresAt: string
}
/**
* Standardized block output format that ensures compatibility with the execution engine.
*/
@@ -18,6 +32,8 @@ export interface NormalizedBlockOutput {
list: any[]
count: number
}
// File fields
files?: UserFile[] // Binary files/attachments from this block
// Path selection fields
selectedPath?: {
blockId: string
@@ -81,6 +97,8 @@ export interface BlockState {
*/
export interface ExecutionContext {
workflowId: string // Unique identifier for this workflow execution
workspaceId?: string // Workspace ID for file storage scoping
executionId?: string // Unique execution ID for file storage scoping
blockStates: Map<string, BlockState>
blockLogs: BlockLog[] // Chronological log of block executions
metadata: ExecutionMetadata // Timing metadata for the execution

View File

@@ -0,0 +1,200 @@
import { createLogger } from '@/lib/logs/console/logger'
import { uploadExecutionFile } from '@/lib/workflows/execution-file-storage'
import type { ExecutionContext, UserFile } from '@/executor/types'
import type { ToolConfig, ToolFileData } from '@/tools/types'
const logger = createLogger('FileToolProcessor')
/**
 * Processes tool outputs and converts file-typed outputs to UserFile objects.
 * This enables tools to return file data that gets automatically stored in the
 * execution filesystem and made available as UserFile objects for workflow use.
 */
export class FileToolProcessor {
  /**
   * Process tool outputs and convert file-typed outputs to UserFile objects.
   *
   * Walks the tool's declared outputs and, for each one typed `file` or
   * `file[]`, replaces the raw file data on the result with stored UserFile
   * object(s). Non-file outputs are passed through untouched.
   *
   * @param toolOutput - Raw result object returned by the tool
   * @param toolConfig - Tool configuration whose `outputs` declares output types
   * @param executionContext - Execution context used to scope file storage
   * @returns A shallow copy of `toolOutput` with file outputs converted
   * @throws Error when a declared file output fails to process
   */
  static async processToolOutputs(
    toolOutput: any,
    toolConfig: ToolConfig,
    executionContext: ExecutionContext
  ): Promise<any> {
    // No declared outputs — nothing to convert.
    if (!toolConfig.outputs) {
      return toolOutput
    }

    // Shallow copy so the caller's original result object is not mutated.
    const processedOutput = { ...toolOutput }

    // Process each output that's marked as file or file[]
    for (const [outputKey, outputDef] of Object.entries(toolConfig.outputs)) {
      if (!FileToolProcessor.isFileOutput(outputDef.type)) {
        continue
      }

      const fileData = processedOutput[outputKey]
      if (!fileData) {
        // A declared-but-absent output is tolerated with a warning rather
        // than failing the whole tool call.
        logger.warn(`File-typed output '${outputKey}' is missing from tool result`)
        continue
      }

      try {
        processedOutput[outputKey] = await FileToolProcessor.processFileOutput(
          fileData,
          outputDef.type,
          outputKey,
          executionContext
        )
      } catch (error) {
        // Re-throw with the output key attached so callers can pinpoint
        // which declared output failed.
        logger.error(`Error processing file output '${outputKey}':`, error)
        const errorMessage = error instanceof Error ? error.message : String(error)
        throw new Error(`Failed to process file output '${outputKey}': ${errorMessage}`)
      }
    }

    return processedOutput
  }

  /**
   * Check if an output type is file-related ('file' or 'file[]').
   */
  private static isFileOutput(type: string): boolean {
    return type === 'file' || type === 'file[]'
  }

  /**
   * Process a single file output (either single file or array of files).
   *
   * @param fileData - Raw output value for this key
   * @param outputType - Declared type, either 'file' or 'file[]'
   * @param outputKey - Output key name, used in logs and error messages
   * @param executionContext - Execution context used to scope file storage
   */
  private static async processFileOutput(
    fileData: any,
    outputType: string,
    outputKey: string,
    executionContext: ExecutionContext
  ): Promise<UserFile | UserFile[]> {
    if (outputType === 'file[]') {
      return FileToolProcessor.processFileArray(fileData, outputKey, executionContext)
    }

    return FileToolProcessor.processFileData(fileData, executionContext, outputKey)
  }

  /**
   * Process an array of files; each element is stored concurrently.
   *
   * @throws Error when the value is not actually an array
   */
  private static async processFileArray(
    fileData: any,
    outputKey: string,
    executionContext: ExecutionContext
  ): Promise<UserFile[]> {
    if (!Array.isArray(fileData)) {
      throw new Error(`Output '${outputKey}' is marked as file[] but is not an array`)
    }

    return Promise.all(
      fileData.map((file, index) =>
        // The element index is appended to the key so logs identify which
        // array entry is being processed.
        FileToolProcessor.processFileData(file, executionContext, `${outputKey}[${index}]`)
      )
    )
  }

  /**
   * Convert various file data formats to UserFile by storing in execution filesystem.
   *
   * Accepted shapes for `fileData.data`:
   * - a Node Buffer
   * - a JSON-serialized Buffer ({ type: 'Buffer', data: number[] })
   * - a base64 or base64url string (base64url is normalized to base64 first)
   * Alternatively `fileData.url` may point at a downloadable resource.
   *
   * @throws Error when no usable data/url is present, the URL download fails,
   *   or the resulting buffer is empty
   */
  private static async processFileData(
    fileData: ToolFileData,
    context: ExecutionContext,
    outputKey: string
  ): Promise<UserFile> {
    logger.info(`Processing file data for output '${outputKey}': ${fileData.name}`)

    try {
      // Convert various formats to Buffer
      let buffer: Buffer

      if (Buffer.isBuffer(fileData.data)) {
        buffer = fileData.data
        logger.info(`Using Buffer data for ${fileData.name} (${buffer.length} bytes)`)
      } else if (
        fileData.data &&
        typeof fileData.data === 'object' &&
        'type' in fileData.data &&
        'data' in fileData.data
      ) {
        // Handle serialized Buffer objects (from JSON serialization)
        const serializedBuffer = fileData.data as { type: string; data: number[] }
        if (serializedBuffer.type === 'Buffer' && Array.isArray(serializedBuffer.data)) {
          buffer = Buffer.from(serializedBuffer.data)
        } else {
          throw new Error(`Invalid serialized buffer format for ${fileData.name}`)
        }
        logger.info(
          `Converted serialized Buffer to Buffer for ${fileData.name} (${buffer.length} bytes)`
        )
      } else if (typeof fileData.data === 'string' && fileData.data) {
        // Assume base64 or base64url
        let base64Data = fileData.data

        // Convert base64url to base64 if needed (Gmail API format):
        // '-' and '_' never appear in standard base64, so their presence
        // marks a base64url-encoded payload.
        if (base64Data && (base64Data.includes('-') || base64Data.includes('_'))) {
          base64Data = base64Data.replace(/-/g, '+').replace(/_/g, '/')
        }

        buffer = Buffer.from(base64Data, 'base64')
        logger.info(
          `Converted base64 string to Buffer for ${fileData.name} (${buffer.length} bytes)`
        )
      } else if (fileData.url) {
        // Download from URL
        logger.info(`Downloading file from URL: ${fileData.url}`)
        const response = await fetch(fileData.url)
        if (!response.ok) {
          throw new Error(`Failed to download file from ${fileData.url}: ${response.statusText}`)
        }
        const arrayBuffer = await response.arrayBuffer()
        buffer = Buffer.from(arrayBuffer)
        logger.info(`Downloaded file from URL for ${fileData.name} (${buffer.length} bytes)`)
      } else {
        throw new Error(
          `File data for '${fileData.name}' must have either 'data' (Buffer/base64) or 'url' property`
        )
      }

      // Validate buffer — an empty file almost certainly indicates a
      // decode/download failure upstream.
      if (buffer.length === 0) {
        throw new Error(`File '${fileData.name}' has zero bytes`)
      }

      // Store in execution filesystem; workspaceId/executionId fall back to
      // empty strings when absent from the context.
      const userFile = await uploadExecutionFile(
        {
          workspaceId: context.workspaceId || '',
          workflowId: context.workflowId,
          executionId: context.executionId || '',
        },
        buffer,
        fileData.name,
        fileData.mimeType
      )

      logger.info(
        `Successfully stored file '${fileData.name}' in execution filesystem with key: ${userFile.key}`
      )

      return userFile
    } catch (error) {
      // Log with the file name for context, then propagate to the caller.
      logger.error(`Error processing file data for '${fileData.name}':`, error)
      throw error
    }
  }

  /**
   * Check if a tool has any file-typed outputs (type 'file' or 'file[]').
   */
  static hasFileOutputs(toolConfig: ToolConfig): boolean {
    if (!toolConfig.outputs) {
      return false
    }

    return Object.values(toolConfig.outputs).some(
      (output) => output.type === 'file' || output.type === 'file[]'
    )
  }
}

View File

@@ -95,6 +95,7 @@ export const env = createEnv({
S3_BUCKET_NAME: z.string().optional(), // S3 bucket for general file storage
S3_LOGS_BUCKET_NAME: z.string().optional(), // S3 bucket for storing logs
S3_KB_BUCKET_NAME: z.string().optional(), // S3 bucket for knowledge base files
S3_EXECUTION_FILES_BUCKET_NAME: z.string().optional(), // S3 bucket for workflow execution files
S3_CHAT_BUCKET_NAME: z.string().optional(), // S3 bucket for chat logos
S3_COPILOT_BUCKET_NAME: z.string().optional(), // S3 bucket for copilot files
@@ -104,6 +105,7 @@ export const env = createEnv({
AZURE_CONNECTION_STRING: z.string().optional(), // Azure storage connection string
AZURE_STORAGE_CONTAINER_NAME: z.string().optional(), // Azure container for general files
AZURE_STORAGE_KB_CONTAINER_NAME: z.string().optional(), // Azure container for knowledge base files
AZURE_STORAGE_EXECUTION_FILES_CONTAINER_NAME: z.string().optional(), // Azure container for workflow execution files
AZURE_STORAGE_CHAT_CONTAINER_NAME: z.string().optional(), // Azure container for chat logos
AZURE_STORAGE_COPILOT_CONTAINER_NAME: z.string().optional(), // Azure container for copilot files

View File

@@ -153,6 +153,9 @@ export class ExecutionLogger implements IExecutionLoggerService {
const level = hasErrors ? 'error' : 'info'
const message = hasErrors ? 'Workflow execution failed' : 'Workflow execution completed'
// Extract files from trace spans and final output
const executionFiles = this.extractFilesFromExecution(traceSpans, finalOutput)
const [updatedLog] = await db
.update(workflowExecutionLogs)
.set({
@@ -168,6 +171,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
totalInputCost: costSummary.totalInputCost.toString(),
totalOutputCost: costSummary.totalOutputCost.toString(),
totalTokens: costSummary.totalTokens,
files: executionFiles.length > 0 ? executionFiles : null,
metadata: {
traceSpans,
finalOutput,
@@ -414,6 +418,112 @@ export class ExecutionLogger implements IExecutionLoggerService {
return 'Unknown'
}
}
/**
 * Extract file references from execution trace spans and final output.
 *
 * Recursively scans both structures for `files`/`attachments` arrays and for
 * objects that are themselves file-shaped (name + key + numeric size),
 * deduplicating by file id.
 *
 * @param traceSpans - Trace spans recorded for the execution (optional)
 * @param finalOutput - Final workflow output object (optional)
 * @returns Deduplicated list of file reference records
 */
private extractFilesFromExecution(traceSpans?: any[], finalOutput?: any): any[] {
  const files: any[] = []
  const seenFileIds = new Set<string>()

  // Normalize a file-like object into the stored record shape.
  const toFileRecord = (file: any) => ({
    id: file.id,
    name: file.name,
    size: file.size,
    type: file.type,
    url: file.url,
    key: file.key,
    uploadedAt: file.uploadedAt,
    expiresAt: file.expiresAt,
    storageProvider: file.storageProvider,
    bucketName: file.bucketName,
  })

  // Record a file once, keyed by its id.
  const addFile = (file: any) => {
    if (seenFileIds.has(file.id)) return
    seenFileIds.add(file.id)
    files.push(toFileRecord(file))
  }

  // Helper function to extract files from any object
  const extractFilesFromObject = (obj: any, source: string) => {
    if (!obj || typeof obj !== 'object') return

    // Check 'files' and 'attachments' arrays (attachments are used by
    // Gmail and other tools).
    for (const prop of ['files', 'attachments']) {
      if (Array.isArray(obj[prop])) {
        for (const file of obj[prop]) {
          if (file?.name && file.key && file.id) {
            addFile(file)
          }
        }
      }
    }

    // Check if this object itself is a file reference
    if (obj.name && obj.key && typeof obj.size === 'number') {
      if (!obj.id) {
        // Files without ids cannot be deduplicated reliably; skip this
        // object entirely (no recursion), matching prior behavior.
        logger.warn(`File object missing ID, skipping: ${obj.name}`)
        return
      }
      addFile(obj)
    }

    // Recursively check nested objects and arrays
    if (Array.isArray(obj)) {
      obj.forEach((item, index) => extractFilesFromObject(item, `${source}[${index}]`))
    } else {
      Object.entries(obj).forEach(([key, value]) => {
        extractFilesFromObject(value, `${source}.${key}`)
      })
    }
  }

  // Extract files from trace spans
  if (traceSpans && Array.isArray(traceSpans)) {
    traceSpans.forEach((span, index) => {
      extractFilesFromObject(span, `trace_span_${index}`)
    })
  }

  // Extract files from final output
  if (finalOutput) {
    extractFilesFromObject(finalOutput, 'final_output')
  }

  return files
}
}
export const executionLogger = new ExecutionLogger()

View File

@@ -95,6 +95,18 @@ export interface WorkflowExecutionLog {
totalInputCost: number
totalOutputCost: number
totalTokens: number
files?: Array<{
id: string
name: string
size: number
type: string
url: string
key: string
uploadedAt: string
expiresAt: string
storageProvider?: 's3' | 'blob' | 'local'
bucketName?: string
}>
metadata: {
environment: ExecutionEnvironment
trigger: ExecutionTrigger

View File

@@ -262,9 +262,47 @@ export async function getPresignedUrlWithConfig(
* @param key Blob name
* @returns File buffer
*/
export async function downloadFromBlob(key: string) {
const blobServiceClient = getBlobServiceClient()
const containerClient = blobServiceClient.getContainerClient(BLOB_CONFIG.containerName)
export async function downloadFromBlob(key: string): Promise<Buffer>
/**
* Download a file from Azure Blob Storage with custom configuration
* @param key Blob name
* @param customConfig Custom Blob configuration
* @returns File buffer
*/
export async function downloadFromBlob(key: string, customConfig: CustomBlobConfig): Promise<Buffer>
export async function downloadFromBlob(
key: string,
customConfig?: CustomBlobConfig
): Promise<Buffer> {
let blobServiceClient: BlobServiceClient
let containerName: string
if (customConfig) {
// Use custom configuration
if (customConfig.connectionString) {
blobServiceClient = BlobServiceClient.fromConnectionString(customConfig.connectionString)
} else if (customConfig.accountName && customConfig.accountKey) {
const credential = new StorageSharedKeyCredential(
customConfig.accountName,
customConfig.accountKey
)
blobServiceClient = new BlobServiceClient(
`https://${customConfig.accountName}.blob.core.windows.net`,
credential
)
} else {
throw new Error('Invalid custom blob configuration')
}
containerName = customConfig.containerName
} else {
// Use default configuration
blobServiceClient = getBlobServiceClient()
containerName = BLOB_CONFIG.containerName
}
const containerClient = blobServiceClient.getContainerClient(containerName)
const blockBlobClient = containerClient.getBlockBlobClient(key)
const downloadBlockBlobResponse = await blockBlobClient.download()
@@ -280,9 +318,43 @@ export async function downloadFromBlob(key: string) {
* Delete a file from Azure Blob Storage
* @param key Blob name
*/
export async function deleteFromBlob(key: string) {
const blobServiceClient = getBlobServiceClient()
const containerClient = blobServiceClient.getContainerClient(BLOB_CONFIG.containerName)
export async function deleteFromBlob(key: string): Promise<void>
/**
* Delete a file from Azure Blob Storage with custom configuration
* @param key Blob name
* @param customConfig Custom Blob configuration
*/
export async function deleteFromBlob(key: string, customConfig: CustomBlobConfig): Promise<void>
export async function deleteFromBlob(key: string, customConfig?: CustomBlobConfig): Promise<void> {
let blobServiceClient: BlobServiceClient
let containerName: string
if (customConfig) {
// Use custom configuration
if (customConfig.connectionString) {
blobServiceClient = BlobServiceClient.fromConnectionString(customConfig.connectionString)
} else if (customConfig.accountName && customConfig.accountKey) {
const credential = new StorageSharedKeyCredential(
customConfig.accountName,
customConfig.accountKey
)
blobServiceClient = new BlobServiceClient(
`https://${customConfig.accountName}.blob.core.windows.net`,
credential
)
} else {
throw new Error('Invalid custom blob configuration')
}
containerName = customConfig.containerName
} else {
// Use default configuration
blobServiceClient = getBlobServiceClient()
containerName = BLOB_CONFIG.containerName
}
const containerClient = blobServiceClient.getContainerClient(containerName)
const blockBlobClient = containerClient.getBlockBlobClient(key)
await blockBlobClient.delete()

View File

@@ -1,10 +1,10 @@
export * as BlobClient from '@/lib/uploads/blob/blob-client'
export * as S3Client from '@/lib/uploads/s3/s3-client'
// BlobClient and S3Client are server-only - import from specific files when needed
// export * as BlobClient from '@/lib/uploads/blob/blob-client'
// export * as S3Client from '@/lib/uploads/s3/s3-client'
export {
BLOB_CHAT_CONFIG,
BLOB_CONFIG,
BLOB_KB_CONFIG,
ensureUploadsDirectory,
S3_CHAT_CONFIG,
S3_CONFIG,
S3_KB_CONFIG,

View File

@@ -100,6 +100,7 @@ export async function uploadToS3(
* @param contentType MIME type of the file
* @param customConfig Custom S3 configuration (bucket and region)
* @param size File size in bytes (optional, will use buffer length if not provided)
* @param skipTimestampPrefix Skip adding timestamp prefix to filename (default: false)
* @returns Object with file information
*/
export async function uploadToS3(
@@ -107,7 +108,8 @@ export async function uploadToS3(
fileName: string,
contentType: string,
customConfig: CustomS3Config,
size?: number
size?: number,
skipTimestampPrefix?: boolean
): Promise<FileInfo>
export async function uploadToS3(
@@ -115,32 +117,29 @@ export async function uploadToS3(
fileName: string,
contentType: string,
configOrSize?: CustomS3Config | number,
size?: number
size?: number,
skipTimestampPrefix?: boolean
): Promise<FileInfo> {
// Handle overloaded parameters
let config: CustomS3Config
let fileSize: number
let shouldSkipTimestamp: boolean
if (typeof configOrSize === 'object') {
// Custom config provided
config = configOrSize
fileSize = size ?? file.length
shouldSkipTimestamp = skipTimestampPrefix ?? false
} else {
// Use default config
config = { bucket: S3_CONFIG.bucket, region: S3_CONFIG.region }
fileSize = configOrSize ?? file.length
shouldSkipTimestamp = size === undefined ? false : (skipTimestampPrefix ?? false)
}
// Create a unique filename with timestamp to prevent collisions
// Use a simple timestamp without directory structure
// Create filename - optionally skip timestamp prefix
const safeFileName = fileName.replace(/\s+/g, '-') // Replace spaces with hyphens
const uniqueKey = `${Date.now()}-${safeFileName}`
// Sanitize filename for S3 metadata (only allow ASCII printable characters)
const sanitizedOriginalName = fileName
.replace(/[^\x20-\x7E]/g, '') // Remove non-ASCII characters
.replace(/["\\]/g, '') // Remove quotes and backslashes
.trim()
const uniqueKey = shouldSkipTimestamp ? safeFileName : `${Date.now()}-${safeFileName}`
const s3Client = getS3Client()
@@ -211,9 +210,21 @@ export async function getPresignedUrlWithConfig(
* @param key S3 object key
* @returns File buffer
*/
export async function downloadFromS3(key: string) {
export async function downloadFromS3(key: string): Promise<Buffer>
/**
* Download a file from S3 with custom bucket configuration
* @param key S3 object key
* @param customConfig Custom S3 configuration
* @returns File buffer
*/
export async function downloadFromS3(key: string, customConfig: CustomS3Config): Promise<Buffer>
export async function downloadFromS3(key: string, customConfig?: CustomS3Config): Promise<Buffer> {
const config = customConfig || { bucket: S3_CONFIG.bucket, region: S3_CONFIG.region }
const command = new GetObjectCommand({
Bucket: S3_CONFIG.bucket,
Bucket: config.bucket,
Key: key,
})
@@ -257,10 +268,21 @@ export async function downloadFromS3WithConfig(key: string, customConfig: Custom
* Delete a file from S3
* @param key S3 object key
*/
export async function deleteFromS3(key: string) {
export async function deleteFromS3(key: string): Promise<void>
/**
* Delete a file from S3 with custom bucket configuration
* @param key S3 object key
* @param customConfig Custom S3 configuration
*/
export async function deleteFromS3(key: string, customConfig: CustomS3Config): Promise<void>
export async function deleteFromS3(key: string, customConfig?: CustomS3Config): Promise<void> {
const config = customConfig || { bucket: S3_CONFIG.bucket, region: S3_CONFIG.region }
await getS3Client().send(
new DeleteObjectCommand({
Bucket: S3_CONFIG.bucket,
Bucket: config.bucket,
Key: key,
})
)

View File

@@ -1,14 +1,43 @@
import { existsSync } from 'fs'
import { mkdir } from 'fs/promises'
import path, { join } from 'path'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import {
ensureUploadsDirectory,
getStorageProvider,
USE_BLOB_STORAGE,
USE_S3_STORAGE,
} from '@/lib/uploads/setup'
import { getStorageProvider, USE_BLOB_STORAGE, USE_S3_STORAGE } from '@/lib/uploads/setup'
const logger = createLogger('UploadsSetup')
// Server-only upload directory path
const PROJECT_ROOT = path.resolve(process.cwd())
export const UPLOAD_DIR_SERVER = join(PROJECT_ROOT, 'uploads')
/**
 * Server-only function to ensure uploads directory exists.
 *
 * Cloud providers (S3 / Azure Blob) manage their own storage, so no local
 * directory is created in those modes.
 *
 * @returns true when the directory exists (or is unnecessary), false on failure
 */
export async function ensureUploadsDirectory() {
  if (USE_S3_STORAGE) {
    logger.info('Using S3 storage, skipping local uploads directory creation')
    return true
  }

  if (USE_BLOB_STORAGE) {
    logger.info('Using Azure Blob storage, skipping local uploads directory creation')
    return true
  }

  try {
    if (existsSync(UPLOAD_DIR_SERVER)) {
      logger.info(`Uploads directory already exists at ${UPLOAD_DIR_SERVER}`)
    } else {
      await mkdir(UPLOAD_DIR_SERVER, { recursive: true })
    }
    return true
  } catch (error) {
    logger.error('Failed to create uploads directory:', error)
    return false
  }
}
// Immediately invoke on server startup
if (typeof process !== 'undefined') {
const storageProvider = getStorageProvider()

View File

@@ -1,14 +1,7 @@
import { existsSync } from 'fs'
import { mkdir } from 'fs/promises'
import path, { join } from 'path'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('UploadsSetup')
const PROJECT_ROOT = path.resolve(process.cwd())
export const UPLOAD_DIR = join(PROJECT_ROOT, 'uploads')
// Client-safe configuration - no Node.js modules
export const UPLOAD_DIR = '/uploads'
// Check if S3 is configured (has required credentials)
const hasS3Config = !!(env.S3_BUCKET_NAME && env.AWS_REGION)
@@ -41,6 +34,11 @@ export const S3_KB_CONFIG = {
region: env.AWS_REGION || '',
}
export const S3_EXECUTION_FILES_CONFIG = {
bucket: env.S3_EXECUTION_FILES_BUCKET_NAME || 'sim-execution-files',
region: env.AWS_REGION || '',
}
export const BLOB_KB_CONFIG = {
accountName: env.AZURE_ACCOUNT_NAME || '',
accountKey: env.AZURE_ACCOUNT_KEY || '',
@@ -48,6 +46,13 @@ export const BLOB_KB_CONFIG = {
containerName: env.AZURE_STORAGE_KB_CONTAINER_NAME || '',
}
export const BLOB_EXECUTION_FILES_CONFIG = {
accountName: env.AZURE_ACCOUNT_NAME || '',
accountKey: env.AZURE_ACCOUNT_KEY || '',
connectionString: env.AZURE_CONNECTION_STRING || '',
containerName: env.AZURE_STORAGE_EXECUTION_FILES_CONTAINER_NAME || 'sim-execution-files',
}
export const S3_CHAT_CONFIG = {
bucket: env.S3_CHAT_BUCKET_NAME || '',
region: env.AWS_REGION || '',
@@ -72,30 +77,6 @@ export const BLOB_COPILOT_CONFIG = {
containerName: env.AZURE_STORAGE_COPILOT_CONTAINER_NAME || '',
}
export async function ensureUploadsDirectory() {
if (USE_S3_STORAGE) {
logger.info('Using S3 storage, skipping local uploads directory creation')
return true
}
if (USE_BLOB_STORAGE) {
logger.info('Using Azure Blob storage, skipping local uploads directory creation')
return true
}
try {
if (!existsSync(UPLOAD_DIR)) {
await mkdir(UPLOAD_DIR, { recursive: true })
} else {
logger.info(`Uploads directory already exists at ${UPLOAD_DIR}`)
}
return true
} catch (error) {
logger.error('Failed to create uploads directory:', error)
return false
}
}
/**
* Get the current storage provider as a human-readable string
*/

View File

@@ -1,28 +1,29 @@
import { createLogger } from '@/lib/logs/console/logger'
import {
type FileInfo as BlobFileInfo,
type CustomBlobConfig,
deleteFromBlob,
downloadFromBlob,
getPresignedUrl as getBlobPresignedUrl,
getPresignedUrlWithConfig as getBlobPresignedUrlWithConfig,
uploadToBlob,
} from '@/lib/uploads/blob/blob-client'
import {
type CustomS3Config,
deleteFromS3,
downloadFromS3,
getPresignedUrl as getS3PresignedUrl,
getPresignedUrlWithConfig as getS3PresignedUrlWithConfig,
type FileInfo as S3FileInfo,
uploadToS3,
} from '@/lib/uploads/s3/s3-client'
import type { CustomBlobConfig } from '@/lib/uploads/blob/blob-client'
import type { CustomS3Config } from '@/lib/uploads/s3/s3-client'
import { USE_BLOB_STORAGE, USE_S3_STORAGE } from '@/lib/uploads/setup'
const logger = createLogger('StorageClient')
export type FileInfo = S3FileInfo | BlobFileInfo
export type CustomStorageConfig = CustomS3Config | CustomBlobConfig
// Client-safe type definitions
/**
 * Provider-agnostic description of an uploaded file.
 */
export type FileInfo = {
  // Path to the file — presumably a serving/download path; confirm with upload handlers
  path: string
  // Storage key within the bucket/container
  key: string
  // File name
  name: string
  // File size in bytes
  size: number
  // MIME type
  type: string
}

/**
 * Union of the S3 and Azure Blob custom configuration shapes.
 * Which fields are required depends on the active storage provider.
 */
export type CustomStorageConfig = {
  // S3 config
  bucket?: string
  region?: string
  // Blob config
  containerName?: string
  accountName?: string
  accountKey?: string
  connectionString?: string
}
/**
* Upload a file to the configured storage provider
@@ -65,16 +66,28 @@ export async function uploadFile(
): Promise<FileInfo> {
if (USE_BLOB_STORAGE) {
logger.info(`Uploading file to Azure Blob Storage: ${fileName}`)
const { uploadToBlob } = await import('@/lib/uploads/blob/blob-client')
if (typeof configOrSize === 'object') {
return uploadToBlob(file, fileName, contentType, configOrSize as CustomBlobConfig, size)
const blobConfig: CustomBlobConfig = {
containerName: configOrSize.containerName!,
accountName: configOrSize.accountName!,
accountKey: configOrSize.accountKey,
connectionString: configOrSize.connectionString,
}
return uploadToBlob(file, fileName, contentType, blobConfig, size)
}
return uploadToBlob(file, fileName, contentType, configOrSize)
}
if (USE_S3_STORAGE) {
logger.info(`Uploading file to S3: ${fileName}`)
const { uploadToS3 } = await import('@/lib/uploads/s3/s3-client')
if (typeof configOrSize === 'object') {
return uploadToS3(file, fileName, contentType, configOrSize as CustomS3Config, size)
const s3Config: CustomS3Config = {
bucket: configOrSize.bucket!,
region: configOrSize.region!,
}
return uploadToS3(file, fileName, contentType, s3Config, size)
}
return uploadToS3(file, fileName, contentType, configOrSize)
}
@@ -92,11 +105,13 @@ export async function uploadFile(
export async function downloadFile(key: string): Promise<Buffer> {
if (USE_BLOB_STORAGE) {
logger.info(`Downloading file from Azure Blob Storage: ${key}`)
const { downloadFromBlob } = await import('@/lib/uploads/blob/blob-client')
return downloadFromBlob(key)
}
if (USE_S3_STORAGE) {
logger.info(`Downloading file from S3: ${key}`)
const { downloadFromS3 } = await import('@/lib/uploads/s3/s3-client')
return downloadFromS3(key)
}
@@ -112,11 +127,13 @@ export async function downloadFile(key: string): Promise<Buffer> {
export async function deleteFile(key: string): Promise<void> {
if (USE_BLOB_STORAGE) {
logger.info(`Deleting file from Azure Blob Storage: ${key}`)
const { deleteFromBlob } = await import('@/lib/uploads/blob/blob-client')
return deleteFromBlob(key)
}
if (USE_S3_STORAGE) {
logger.info(`Deleting file from S3: ${key}`)
const { deleteFromS3 } = await import('@/lib/uploads/s3/s3-client')
return deleteFromS3(key)
}
@@ -134,11 +151,13 @@ export async function deleteFile(key: string): Promise<void> {
export async function getPresignedUrl(key: string, expiresIn = 3600): Promise<string> {
if (USE_BLOB_STORAGE) {
logger.info(`Generating presigned URL for Azure Blob Storage: ${key}`)
const { getPresignedUrl: getBlobPresignedUrl } = await import('@/lib/uploads/blob/blob-client')
return getBlobPresignedUrl(key, expiresIn)
}
if (USE_S3_STORAGE) {
logger.info(`Generating presigned URL for S3: ${key}`)
const { getPresignedUrl: getS3PresignedUrl } = await import('@/lib/uploads/s3/s3-client')
return getS3PresignedUrl(key, expiresIn)
}
@@ -161,12 +180,30 @@ export async function getPresignedUrlWithConfig(
): Promise<string> {
if (USE_BLOB_STORAGE) {
logger.info(`Generating presigned URL for Azure Blob Storage with custom config: ${key}`)
return getBlobPresignedUrlWithConfig(key, customConfig as CustomBlobConfig, expiresIn)
const { getPresignedUrlWithConfig: getBlobPresignedUrlWithConfig } = await import(
'@/lib/uploads/blob/blob-client'
)
// Convert CustomStorageConfig to CustomBlobConfig
const blobConfig: CustomBlobConfig = {
containerName: customConfig.containerName!,
accountName: customConfig.accountName!,
accountKey: customConfig.accountKey,
connectionString: customConfig.connectionString,
}
return getBlobPresignedUrlWithConfig(key, blobConfig, expiresIn)
}
if (USE_S3_STORAGE) {
logger.info(`Generating presigned URL for S3 with custom config: ${key}`)
return getS3PresignedUrlWithConfig(key, customConfig as CustomS3Config, expiresIn)
const { getPresignedUrlWithConfig: getS3PresignedUrlWithConfig } = await import(
'@/lib/uploads/s3/s3-client'
)
// Convert CustomStorageConfig to CustomS3Config
const s3Config: CustomS3Config = {
bucket: customConfig.bucket!,
region: customConfig.region!,
}
return getS3PresignedUrlWithConfig(key, s3Config, expiresIn)
}
throw new Error(

View File

@@ -0,0 +1,257 @@
/**
* Specialized storage client for workflow execution files
* Uses dedicated S3 bucket: sim-execution-files
* Directory structure: workspace_id/workflow_id/execution_id/filename
*/
import { createLogger } from '@/lib/logs/console/logger'
import {
deleteFromBlob,
downloadFromBlob,
getPresignedUrlWithConfig as getBlobPresignedUrlWithConfig,
uploadToBlob,
} from '@/lib/uploads/blob/blob-client'
import {
deleteFromS3,
downloadFromS3,
getPresignedUrlWithConfig,
uploadToS3,
} from '@/lib/uploads/s3/s3-client'
import {
BLOB_EXECUTION_FILES_CONFIG,
S3_EXECUTION_FILES_CONFIG,
USE_BLOB_STORAGE,
USE_S3_STORAGE,
} from '@/lib/uploads/setup'
import type { UserFile } from '@/executor/types'
import type { ExecutionContext } from './execution-files'
import { generateExecutionFileKey, generateFileId, getFileExpirationDate } from './execution-files'
const logger = createLogger('ExecutionFileStorage')
/**
 * Upload a file to execution-scoped storage.
 *
 * Writes the buffer to the active cloud backend (S3 first, else Azure Blob)
 * under the key `workspace_id/workflow_id/execution_id/filename`, then
 * attempts to presign a 5-minute download URL for the uploaded object.
 *
 * @param context - workspace/workflow/execution IDs that scope the storage key
 * @param fileBuffer - raw file contents to upload
 * @param fileName - original file name (sanitized inside generateExecutionFileKey)
 * @param contentType - MIME type stored with the object
 * @returns a UserFile describing the uploaded object; `url` is the presigned
 *          URL when presigning succeeded, otherwise a `/api/files/serve/` path
 * @throws Error when no cloud storage is configured or the upload fails
 *         (presigning failures are logged and tolerated, not thrown)
 */
export async function uploadExecutionFile(
  context: ExecutionContext,
  fileBuffer: Buffer,
  fileName: string,
  contentType: string
): Promise<UserFile> {
  logger.info(`Uploading execution file: ${fileName} for execution ${context.executionId}`)
  logger.debug(`File upload context:`, {
    workspaceId: context.workspaceId,
    workflowId: context.workflowId,
    executionId: context.executionId,
    fileName,
    bufferSize: fileBuffer.length,
  })
  // Generate execution-scoped storage key
  const storageKey = generateExecutionFileKey(context, fileName)
  const fileId = generateFileId()
  logger.info(`Generated storage key: "${storageKey}" for file: ${fileName}`)
  try {
    // Shape of fileInfo comes from the upload client; only `.key` is read below.
    let fileInfo: any
    let directUrl: string | undefined
    if (USE_S3_STORAGE) {
      // Upload to S3 execution files bucket with exact key (no timestamp prefix)
      logger.debug(
        `Uploading to S3 with key: ${storageKey}, bucket: ${S3_EXECUTION_FILES_CONFIG.bucket}`
      )
      fileInfo = await uploadToS3(
        fileBuffer,
        storageKey, // Use storageKey as fileName
        contentType,
        {
          bucket: S3_EXECUTION_FILES_CONFIG.bucket,
          region: S3_EXECUTION_FILES_CONFIG.region,
        },
        undefined, // size (will use buffer length)
        true // skipTimestampPrefix = true
      )
      // The client may rewrite the key; log both so mismatches are visible.
      logger.info(`S3 upload returned key: "${fileInfo.key}" for file: ${fileName}`)
      logger.info(`Original storage key was: "${storageKey}"`)
      logger.info(`Keys match: ${fileInfo.key === storageKey}`)
      // Generate presigned URL for execution (5 minutes)
      try {
        logger.info(`Generating presigned URL with key: "${fileInfo.key}"`)
        directUrl = await getPresignedUrlWithConfig(
          fileInfo.key, // Use the actual uploaded key
          {
            bucket: S3_EXECUTION_FILES_CONFIG.bucket,
            region: S3_EXECUTION_FILES_CONFIG.region,
          },
          5 * 60 // 5 minutes
        )
        logger.info(`Generated presigned URL: ${directUrl}`)
      } catch (error) {
        // Non-fatal: fall back to the serve-path URL below.
        logger.warn(`Failed to generate S3 presigned URL for ${fileName}:`, error)
      }
    } else if (USE_BLOB_STORAGE) {
      // Upload to Azure Blob execution files container
      fileInfo = await uploadToBlob(fileBuffer, storageKey, contentType, {
        accountName: BLOB_EXECUTION_FILES_CONFIG.accountName,
        accountKey: BLOB_EXECUTION_FILES_CONFIG.accountKey,
        connectionString: BLOB_EXECUTION_FILES_CONFIG.connectionString,
        containerName: BLOB_EXECUTION_FILES_CONFIG.containerName,
      })
      // Generate presigned URL for execution (5 minutes)
      try {
        directUrl = await getBlobPresignedUrlWithConfig(
          fileInfo.key, // Use the actual uploaded key
          {
            accountName: BLOB_EXECUTION_FILES_CONFIG.accountName,
            accountKey: BLOB_EXECUTION_FILES_CONFIG.accountKey,
            connectionString: BLOB_EXECUTION_FILES_CONFIG.connectionString,
            containerName: BLOB_EXECUTION_FILES_CONFIG.containerName,
          },
          5 * 60 // 5 minutes
        )
      } catch (error) {
        // Non-fatal: fall back to the serve-path URL below.
        logger.warn(`Failed to generate Blob presigned URL for ${fileName}:`, error)
      }
    } else {
      throw new Error('No cloud storage configured for execution files')
    }
    const userFile: UserFile = {
      id: fileId,
      name: fileName,
      size: fileBuffer.length,
      type: contentType,
      url: directUrl || `/api/files/serve/${fileInfo.key}`, // Use 5-minute presigned URL, fallback to serve path
      key: fileInfo.key, // Use the actual uploaded key from S3/Blob
      uploadedAt: new Date().toISOString(),
      expiresAt: getFileExpirationDate(),
    }
    logger.info(`Successfully uploaded execution file: ${fileName} (${fileBuffer.length} bytes)`)
    return userFile
  } catch (error) {
    logger.error(`Failed to upload execution file ${fileName}:`, error)
    throw new Error(
      `Failed to upload file: ${error instanceof Error ? error.message : 'Unknown error'}`
    )
  }
}
/**
 * Download a file from execution-scoped storage.
 *
 * Reads the object identified by `userFile.key` from whichever cloud backend
 * is active (S3 takes precedence over Azure Blob).
 *
 * @param userFile - file metadata produced by uploadExecutionFile; only `key` and `name` are read
 * @returns the raw file contents
 * @throws Error when no cloud storage is configured or the download fails
 */
export async function downloadExecutionFile(userFile: UserFile): Promise<Buffer> {
  logger.info(`Downloading execution file: ${userFile.name}`)
  try {
    // Guard first so the single download expression below is exhaustive.
    if (!USE_S3_STORAGE && !USE_BLOB_STORAGE) {
      throw new Error('No cloud storage configured for execution files')
    }
    const fileBuffer = USE_S3_STORAGE
      ? await downloadFromS3(userFile.key, {
          bucket: S3_EXECUTION_FILES_CONFIG.bucket,
          region: S3_EXECUTION_FILES_CONFIG.region,
        })
      : await downloadFromBlob(userFile.key, {
          accountName: BLOB_EXECUTION_FILES_CONFIG.accountName,
          accountKey: BLOB_EXECUTION_FILES_CONFIG.accountKey,
          connectionString: BLOB_EXECUTION_FILES_CONFIG.connectionString,
          containerName: BLOB_EXECUTION_FILES_CONFIG.containerName,
        })
    logger.info(
      `Successfully downloaded execution file: ${userFile.name} (${fileBuffer.length} bytes)`
    )
    return fileBuffer
  } catch (error) {
    logger.error(`Failed to download execution file ${userFile.name}:`, error)
    throw new Error(
      `Failed to download file: ${error instanceof Error ? error.message : 'Unknown error'}`
    )
  }
}
/**
 * Generate a short-lived presigned URL for file download (5 minutes).
 *
 * @param userFile - file metadata produced by uploadExecutionFile; only `key` and `name` are read
 * @returns a presigned URL valid for 5 minutes
 * @throws Error when no cloud storage is configured or URL generation fails
 */
export async function generateExecutionFileDownloadUrl(userFile: UserFile): Promise<string> {
  logger.info(`Generating download URL for execution file: ${userFile.name}`)
  logger.info(`File key: "${userFile.key}"`)
  try {
    let downloadUrl: string
    if (USE_S3_STORAGE) {
      // Log the S3 target only when S3 is actually the active backend.
      // (Previously this was logged unconditionally, which was misleading
      // when Azure Blob storage was in use.)
      logger.info(`S3 bucket: ${S3_EXECUTION_FILES_CONFIG.bucket}`)
      downloadUrl = await getPresignedUrlWithConfig(
        userFile.key,
        {
          bucket: S3_EXECUTION_FILES_CONFIG.bucket,
          region: S3_EXECUTION_FILES_CONFIG.region,
        },
        5 * 60 // 5 minutes
      )
    } else if (USE_BLOB_STORAGE) {
      logger.info(`Blob container: ${BLOB_EXECUTION_FILES_CONFIG.containerName}`)
      downloadUrl = await getBlobPresignedUrlWithConfig(
        userFile.key,
        {
          accountName: BLOB_EXECUTION_FILES_CONFIG.accountName,
          accountKey: BLOB_EXECUTION_FILES_CONFIG.accountKey,
          connectionString: BLOB_EXECUTION_FILES_CONFIG.connectionString,
          containerName: BLOB_EXECUTION_FILES_CONFIG.containerName,
        },
        5 * 60 // 5 minutes
      )
    } else {
      throw new Error('No cloud storage configured for execution files')
    }
    logger.info(`Generated download URL for execution file: ${userFile.name}`)
    return downloadUrl
  } catch (error) {
    logger.error(`Failed to generate download URL for ${userFile.name}:`, error)
    throw new Error(
      `Failed to generate download URL: ${error instanceof Error ? error.message : 'Unknown error'}`
    )
  }
}
/**
 * Delete a file from execution-scoped storage.
 *
 * Removes the object identified by `userFile.key` from the active cloud
 * backend (S3 takes precedence over Azure Blob).
 *
 * @param userFile - file metadata produced by uploadExecutionFile; only `key` and `name` are read
 * @throws Error when no cloud storage is configured or the deletion fails
 */
export async function deleteExecutionFile(userFile: UserFile): Promise<void> {
  logger.info(`Deleting execution file: ${userFile.name}`)
  try {
    // Guard first so each branch below only handles its own backend.
    if (!USE_S3_STORAGE && !USE_BLOB_STORAGE) {
      throw new Error('No cloud storage configured for execution files')
    }
    if (USE_S3_STORAGE) {
      await deleteFromS3(userFile.key, {
        bucket: S3_EXECUTION_FILES_CONFIG.bucket,
        region: S3_EXECUTION_FILES_CONFIG.region,
      })
    } else {
      await deleteFromBlob(userFile.key, {
        accountName: BLOB_EXECUTION_FILES_CONFIG.accountName,
        accountKey: BLOB_EXECUTION_FILES_CONFIG.accountKey,
        connectionString: BLOB_EXECUTION_FILES_CONFIG.connectionString,
        containerName: BLOB_EXECUTION_FILES_CONFIG.containerName,
      })
    }
    logger.info(`Successfully deleted execution file: ${userFile.name}`)
  } catch (error) {
    logger.error(`Failed to delete execution file ${userFile.name}:`, error)
    throw new Error(
      `Failed to delete file: ${error instanceof Error ? error.message : 'Unknown error'}`
    )
  }
}

View File

@@ -0,0 +1,147 @@
/**
* Server-only execution file metadata management
* This file contains database operations and should only be imported by server-side code
*/
import { eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { db } from '@/db'
import { workflowExecutionLogs } from '@/db/schema'
import type { ExecutionFileMetadata } from './execution-files'
const logger = createLogger('ExecutionFilesServer')
/**
 * Retrieve file metadata from execution logs.
 *
 * @param executionId - execution whose log row holds the file metadata
 * @returns the stored file list, or an empty array when the row is missing,
 *          the files column is empty, or the query fails (errors are logged)
 */
export async function getExecutionFiles(executionId: string): Promise<ExecutionFileMetadata[]> {
  try {
    const rows = await db
      .select()
      .from(workflowExecutionLogs)
      .where(eq(workflowExecutionLogs.executionId, executionId))
      .limit(1)
    const [logRow] = rows
    if (!logRow) {
      return []
    }
    // Files live in the dedicated `files` jsonb column.
    return (logRow.files as ExecutionFileMetadata[]) || []
  } catch (error) {
    logger.error(`Failed to retrieve file metadata for execution ${executionId}:`, error)
    return []
  }
}
/**
 * Store file metadata in execution logs.
 *
 * Overwrites the `files` column of the matching execution log row.
 *
 * @param executionId - execution whose log row should be updated
 * @param files - complete file metadata list to persist
 * @throws rethrows any database error after logging it
 */
export async function storeExecutionFiles(
  executionId: string,
  files: ExecutionFileMetadata[]
): Promise<void> {
  try {
    logger.info(`Storing ${files.length} file metadata entries for execution ${executionId}`)
    const matchesExecution = eq(workflowExecutionLogs.executionId, executionId)
    await db.update(workflowExecutionLogs).set({ files }).where(matchesExecution)
    logger.info(`Successfully stored file metadata for execution ${executionId}`)
  } catch (error) {
    logger.error(`Failed to store file metadata for execution ${executionId}:`, error)
    throw error
  }
}
/**
 * Add file metadata to existing execution logs.
 *
 * Appends one entry to the execution's stored file list via a
 * read-then-write of the whole `files` column.
 *
 * NOTE(review): this read-modify-write is not atomic — two concurrent calls
 * for the same execution could each read the same list and one append could
 * be lost. Presumably calls per execution are serialized by the executor;
 * verify, or use a jsonb append at the SQL level.
 *
 * @param executionId - execution whose log row should gain the entry
 * @param fileMetadata - metadata of the newly uploaded file
 * @throws rethrows any database error after logging it
 */
export async function addExecutionFile(
  executionId: string,
  fileMetadata: ExecutionFileMetadata
): Promise<void> {
  try {
    // Get existing files
    const existingFiles = await getExecutionFiles(executionId)
    // Add new file
    const updatedFiles = [...existingFiles, fileMetadata]
    // Store updated files
    await storeExecutionFiles(executionId, updatedFiles)
    logger.info(`Added file ${fileMetadata.name} to execution ${executionId}`)
  } catch (error) {
    logger.error(`Failed to add file to execution ${executionId}:`, error)
    throw error
  }
}
/**
 * Get all expired files across all executions.
 *
 * Scans every 'info'-level execution log row and filters its file list by
 * expiration timestamp in memory.
 *
 * NOTE(review): this loads all matching rows into memory; for large log
 * tables a SQL-side filter on the jsonb column would scale better — confirm
 * table growth expectations before relying on this in a hot path.
 *
 * @returns the expired file metadata entries, or [] on query failure
 *          (errors are logged, not thrown)
 */
export async function getExpiredFiles(): Promise<ExecutionFileMetadata[]> {
  try {
    const now = new Date().toISOString()
    // Query all execution logs that have files
    const logs = await db
      .select()
      .from(workflowExecutionLogs)
      .where(eq(workflowExecutionLogs.level, 'info')) // Only get successful executions
    const expiredFiles: ExecutionFileMetadata[] = []
    for (const log of logs) {
      const files = log.files as ExecutionFileMetadata[]
      if (files) {
        // Lexicographic comparison is valid because both sides are ISO-8601
        // timestamps in the same (UTC) format.
        const expired = files.filter((file) => file.expiresAt < now)
        expiredFiles.push(...expired)
      }
    }
    return expiredFiles
  } catch (error) {
    logger.error('Failed to get expired files:', error)
    return []
  }
}
/**
 * Remove expired file metadata from execution logs.
 *
 * Rewrites the `files` column of every log row that contains at least one
 * expired entry, keeping only non-expired entries (or null when none remain).
 *
 * NOTE(review): this removes metadata only — it does not delete the
 * underlying storage objects. Presumably object deletion is handled by a
 * separate cleanup job or a bucket lifecycle rule; verify.
 *
 * @returns the number of expired entries removed, or 0 on failure
 *          (errors are logged, not thrown)
 */
export async function cleanupExpiredFileMetadata(): Promise<number> {
  try {
    const now = new Date().toISOString()
    let cleanedCount = 0
    // Get all execution logs
    const logs = await db.select().from(workflowExecutionLogs)
    for (const log of logs) {
      const files = log.files as ExecutionFileMetadata[]
      if (files && files.length > 0) {
        // ISO-8601 strings compare correctly with plain string ordering.
        const nonExpiredFiles = files.filter((file) => file.expiresAt >= now)
        if (nonExpiredFiles.length !== files.length) {
          // Some files expired, update the files column
          await db
            .update(workflowExecutionLogs)
            .set({ files: nonExpiredFiles.length > 0 ? nonExpiredFiles : null })
            .where(eq(workflowExecutionLogs.id, log.id))
          cleanedCount += files.length - nonExpiredFiles.length
        }
      }
    }
    logger.info(`Cleaned up ${cleanedCount} expired file metadata entries`)
    return cleanedCount
  } catch (error) {
    logger.error('Failed to cleanup expired file metadata:', error)
    return 0
  }
}

View File

@@ -0,0 +1,60 @@
/**
* Execution file management system for binary data transfer between blocks
* This handles file storage, retrieval, and cleanup for workflow executions
*/
import type { UserFile } from '@/executor/types'
/**
 * Execution context for file operations.
 *
 * The three IDs identify one workflow execution and together form the
 * storage key prefix `workspaceId/workflowId/executionId/` used by all
 * execution-file storage operations.
 */
export interface ExecutionContext {
  workspaceId: string
  workflowId: string
  executionId: string
}
/**
 * File metadata stored in execution logs — now just uses UserFile directly
 * (no separate metadata shape is maintained).
 */
export type ExecutionFileMetadata = UserFile
/**
 * Generate execution-scoped storage key.
 * Format: workspace_id/workflow_id/execution_id/filename
 *
 * Runs of whitespace in the file name collapse to '-', and any remaining
 * character outside [a-zA-Z0-9.-] is replaced with '_'.
 */
export function generateExecutionFileKey(context: ExecutionContext, fileName: string): string {
  const sanitizedName = fileName.replace(/\s+/g, '-').replace(/[^a-zA-Z0-9.-]/g, '_')
  return [context.workspaceId, context.workflowId, context.executionId, sanitizedName].join('/')
}
/**
 * Generate execution prefix for cleanup operations.
 * Format: workspace_id/workflow_id/execution_id/
 */
export function generateExecutionPrefix(context: ExecutionContext): string {
  // The trailing empty segment produces the trailing slash required for
  // prefix-based object listings.
  return [context.workspaceId, context.workflowId, context.executionId, ''].join('/')
}
/**
 * Generate unique file ID for execution files.
 *
 * IDs look like `file_<epoch-ms>_<random base-36 suffix>`; uniqueness relies
 * on the timestamp plus the random suffix.
 */
export function generateFileId(): string {
  const randomSuffix = Math.random().toString(36).slice(2, 9)
  return ['file', Date.now(), randomSuffix].join('_')
}
/**
 * Check if a user file is expired.
 *
 * An unparsable `expiresAt` yields NaN, and NaN comparisons are false, so a
 * malformed timestamp reads as "not expired" — same as the Date-object
 * comparison it replaces.
 */
export function isFileExpired(userFile: UserFile): boolean {
  return Date.parse(userFile.expiresAt) < Date.now()
}
/**
 * Get file expiration date for execution files (5 minutes from now).
 *
 * @returns an ISO-8601 UTC timestamp 5 minutes in the future
 */
export function getFileExpirationDate(): string {
  const FIVE_MINUTES_MS = 5 * 60 * 1000
  return new Date(Date.now() + FIVE_MINUTES_MS).toISOString()
}

View File

@@ -78,6 +78,18 @@ export interface WorkflowLog {
trigger: string | null
createdAt: string
workflow?: WorkflowData | null
files?: Array<{
id: string
name: string
size: number
type: string
url: string
key: string
uploadedAt: string
expiresAt: string
storageProvider?: 's3' | 'blob' | 'local'
bucketName?: string
}>
metadata?: ToolCallMetadata & {
traceSpans?: TraceSpan[]
totalDuration?: number

View File

@@ -1,4 +1,9 @@
import type { GmailMessage, GmailReadParams, GmailToolResponse } from '@/tools/gmail/types'
import type {
GmailAttachment,
GmailMessage,
GmailReadParams,
GmailToolResponse,
} from '@/tools/gmail/types'
import type { ToolConfig } from '@/tools/types'
const GMAIL_API_BASE = 'https://gmail.googleapis.com/gmail/v1/users/me'
@@ -12,7 +17,16 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
oauth: {
required: true,
provider: 'google-email',
additionalScopes: ['https://www.googleapis.com/auth/gmail.labels'],
additionalScopes: [
'https://www.googleapis.com/auth/gmail.labels',
'https://www.googleapis.com/auth/gmail.readonly',
],
},
outputs: {
content: { type: 'string' },
metadata: { type: 'json' },
attachments: { type: 'file[]' },
},
params: {
@@ -46,6 +60,12 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
visibility: 'user-only',
description: 'Maximum number of messages to retrieve (default: 1, max: 10)',
},
includeAttachments: {
type: 'boolean',
required: false,
visibility: 'user-only',
description: 'Download and include email attachments',
},
},
request: {
@@ -98,43 +118,77 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
},
transformResponse: async (response: Response, params?: GmailReadParams) => {
try {
const data = await response.json()
const data = await response.json()
if (!response.ok) {
throw new Error(data.error?.message || 'Failed to read email')
if (!response.ok) {
throw new Error(data.error?.message || 'Failed to read email')
}
// If we're fetching a single message directly (by ID)
if (params?.messageId) {
return await processMessage(data, params)
}
// If we're listing messages, we need to fetch each message's details
if (data.messages && Array.isArray(data.messages)) {
// Return a message if no emails found
if (data.messages.length === 0) {
return {
success: true,
output: {
content: 'No messages found in the selected folder.',
metadata: {
results: [], // Use SearchMetadata format
},
},
}
}
// If we're fetching a single message directly (by ID)
if (params?.messageId) {
return processMessage(data)
}
// For agentic workflows, we'll fetch the first message by default
// If maxResults > 1, we'll return a summary of messages found
const maxResults = params?.maxResults ? Math.min(params.maxResults, 10) : 1
// If we're listing messages, we need to fetch each message's details
if (data.messages && Array.isArray(data.messages)) {
// Return a message if no emails found
if (data.messages.length === 0) {
if (maxResults === 1) {
try {
// Get the first message details
const messageId = data.messages[0].id
const messageResponse = await fetch(
`${GMAIL_API_BASE}/messages/${messageId}?format=full`,
{
headers: {
Authorization: `Bearer ${params?.accessToken || ''}`,
'Content-Type': 'application/json',
},
}
)
if (!messageResponse.ok) {
const errorData = await messageResponse.json()
throw new Error(errorData.error?.message || 'Failed to fetch message details')
}
const message = await messageResponse.json()
return await processMessage(message, params)
} catch (error: any) {
return {
success: true,
output: {
content: 'No messages found in the selected folder.',
content: `Found messages but couldn't retrieve details: ${error.message || 'Unknown error'}`,
metadata: {
results: [], // Use SearchMetadata format
results: data.messages.map((msg: any) => ({
id: msg.id,
threadId: msg.threadId,
})),
},
},
}
}
// For agentic workflows, we'll fetch the first message by default
// If maxResults > 1, we'll return a summary of messages found
const maxResults = params?.maxResults ? Math.min(params.maxResults, 10) : 1
if (maxResults === 1) {
try {
// Get the first message details
const messageId = data.messages[0].id
} else {
// If maxResults > 1, fetch details for all messages
try {
const messagePromises = data.messages.slice(0, maxResults).map(async (msg: any) => {
const messageResponse = await fetch(
`${GMAIL_API_BASE}/messages/${messageId}?format=full`,
`${GMAIL_API_BASE}/messages/${msg.id}?format=full`,
{
headers: {
Authorization: `Bearer ${params?.accessToken || ''}`,
@@ -144,99 +198,58 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
)
if (!messageResponse.ok) {
const errorData = await messageResponse.json()
throw new Error(errorData.error?.message || 'Failed to fetch message details')
throw new Error(`Failed to fetch details for message ${msg.id}`)
}
const message = await messageResponse.json()
return processMessage(message)
} catch (error: any) {
console.error('Error fetching message details:', error)
return {
success: true,
output: {
content: `Found messages but couldn't retrieve details: ${error.message || 'Unknown error'}`,
metadata: {
results: data.messages.map((msg: any) => ({
id: msg.id,
threadId: msg.threadId,
})),
},
return await messageResponse.json()
})
const messages = await Promise.all(messagePromises)
// Process all messages and create a summary
const processedMessages = messages.map(processMessageForSummary)
return {
success: true,
output: {
content: createMessagesSummary(processedMessages),
metadata: {
results: processedMessages.map((msg) => ({
id: msg.id,
threadId: msg.threadId,
subject: msg.subject,
from: msg.from,
date: msg.date,
})),
},
}
},
}
} else {
// If maxResults > 1, fetch details for all messages
try {
const messagePromises = data.messages.slice(0, maxResults).map(async (msg: any) => {
const messageResponse = await fetch(
`${GMAIL_API_BASE}/messages/${msg.id}?format=full`,
{
headers: {
Authorization: `Bearer ${params?.accessToken || ''}`,
'Content-Type': 'application/json',
},
}
)
if (!messageResponse.ok) {
throw new Error(`Failed to fetch details for message ${msg.id}`)
}
return await messageResponse.json()
})
const messages = await Promise.all(messagePromises)
// Process all messages and create a summary
const processedMessages = messages.map(processMessageForSummary)
return {
success: true,
output: {
content: createMessagesSummary(processedMessages),
metadata: {
results: processedMessages.map((msg) => ({
id: msg.id,
threadId: msg.threadId,
subject: msg.subject,
from: msg.from,
date: msg.date,
})),
},
} catch (error: any) {
return {
success: true,
output: {
content: `Found ${data.messages.length} messages but couldn't retrieve all details: ${error.message || 'Unknown error'}`,
metadata: {
results: data.messages.map((msg: any) => ({
id: msg.id,
threadId: msg.threadId,
})),
},
}
} catch (error: any) {
console.error('Error fetching multiple message details:', error)
return {
success: true,
output: {
content: `Found ${data.messages.length} messages but couldn't retrieve all details: ${error.message || 'Unknown error'}`,
metadata: {
results: data.messages.map((msg: any) => ({
id: msg.id,
threadId: msg.threadId,
})),
},
},
}
},
}
}
}
}
// Fallback for unexpected response format
return {
success: true,
output: {
content: 'Unexpected response format from Gmail API',
metadata: {
results: [],
},
// Fallback for unexpected response format
return {
success: true,
output: {
content: 'Unexpected response format from Gmail API',
metadata: {
results: [],
},
}
} catch (error) {
console.error('Error in transformResponse:', error)
throw error
},
}
},
@@ -255,7 +268,10 @@ export const gmailReadTool: ToolConfig<GmailReadParams, GmailToolResponse> = {
}
// Helper function to process a Gmail message
function processMessage(message: GmailMessage): GmailToolResponse {
async function processMessage(
message: GmailMessage,
params?: GmailReadParams
): Promise<GmailToolResponse> {
// Check if message and payload exist
if (!message || !message.payload) {
return {
@@ -280,7 +296,21 @@ function processMessage(message: GmailMessage): GmailToolResponse {
// Extract the message body
const body = extractMessageBody(message.payload)
return {
// Check for attachments
const attachmentInfo = extractAttachmentInfo(message.payload)
const hasAttachments = attachmentInfo.length > 0
// Download attachments if requested
let attachments: GmailAttachment[] | undefined
if (params?.includeAttachments && hasAttachments && params.accessToken) {
try {
attachments = await downloadAttachments(message.id, attachmentInfo, params.accessToken)
} catch (error) {
// Continue without attachments rather than failing the entire request
}
}
const result: GmailToolResponse = {
success: true,
output: {
content: body || 'No content found in email',
@@ -292,9 +322,15 @@ function processMessage(message: GmailMessage): GmailToolResponse {
to,
subject,
date,
hasAttachments,
attachmentCount: attachmentInfo.length,
},
// Always include attachments array (empty if none downloaded)
attachments: attachments || [],
},
}
return result
}
// Helper function to process a message for summary (without full content)
@@ -382,3 +418,83 @@ function extractMessageBody(payload: any): string {
// If we couldn't find any text content, return empty string
return ''
}
// Walk a Gmail message payload (depth-first, preserving part order) and
// collect metadata for every part that carries an attachment, i.e. has both
// a body.attachmentId and a filename.
function extractAttachmentInfo(
  payload: any
): Array<{ attachmentId: string; filename: string; mimeType: string; size: number }> {
  const collected: Array<{
    attachmentId: string
    filename: string
    mimeType: string
    size: number
  }> = []
  const walk = (part: any): void => {
    if (part.body?.attachmentId && part.filename) {
      collected.push({
        attachmentId: part.body.attachmentId,
        filename: part.filename,
        // Gmail may omit mimeType; fall back to a generic binary type.
        mimeType: part.mimeType || 'application/octet-stream',
        size: part.body.size || 0,
      })
    }
    // Multipart messages nest parts arbitrarily deep; descend in order.
    if (Array.isArray(part.parts)) {
      for (const child of part.parts) {
        walk(child)
      }
    }
  }
  walk(payload)
  return collected
}
// Helper function to download attachments from Gmail API.
//
// Fetches each attachment body by ID and decodes it into a Buffer. Failures
// for individual attachments (non-OK responses or thrown errors) are
// silently skipped so one bad attachment doesn't fail the whole read —
// callers receive only the attachments that downloaded successfully.
//
// NOTE(review): downloads run sequentially; presumably this keeps Gmail API
// request rates low — confirm before parallelizing with Promise.all.
async function downloadAttachments(
  messageId: string,
  attachmentInfo: Array<{ attachmentId: string; filename: string; mimeType: string; size: number }>,
  accessToken: string
): Promise<GmailAttachment[]> {
  const downloadedAttachments: GmailAttachment[] = []
  for (const attachment of attachmentInfo) {
    try {
      // Download attachment from Gmail API
      const attachmentResponse = await fetch(
        `${GMAIL_API_BASE}/messages/${messageId}/attachments/${attachment.attachmentId}`,
        {
          headers: {
            Authorization: `Bearer ${accessToken}`,
            'Content-Type': 'application/json',
          },
        }
      )
      if (!attachmentResponse.ok) {
        // Skip this attachment; others may still succeed.
        continue
      }
      const attachmentData = (await attachmentResponse.json()) as { data: string; size: number }
      // Decode base64url data to buffer
      // Gmail API returns data in base64url format (URL-safe base64)
      const base64Data = attachmentData.data.replace(/-/g, '+').replace(/_/g, '/')
      const buffer = Buffer.from(base64Data, 'base64')
      downloadedAttachments.push({
        name: attachment.filename,
        data: buffer,
        // Size comes from the message metadata, not the decoded buffer.
        mimeType: attachment.mimeType,
        size: attachment.size,
      })
    } catch (error) {
      // Continue with other attachments
    }
  }
  return downloadedAttachments
}

View File

@@ -18,6 +18,7 @@ export interface GmailReadParams extends BaseGmailParams {
folder: string
unreadOnly?: boolean
maxResults?: number
includeAttachments?: boolean
}
// Search operation parameters
@@ -41,6 +42,8 @@ interface EmailMetadata extends BaseGmailMetadata {
to?: string
subject?: string
date?: string
hasAttachments?: boolean
attachmentCount?: number
}
interface SearchMetadata extends BaseGmailMetadata {
@@ -55,6 +58,7 @@ export interface GmailToolResponse extends ToolResponse {
output: {
content: string
metadata: EmailMetadata | SearchMetadata
attachments?: GmailAttachment[]
}
}
@@ -71,12 +75,26 @@ export interface GmailMessage {
}>
body: {
data?: string
attachmentId?: string
size?: number
}
parts?: Array<{
mimeType: string
filename?: string
body: {
data?: string
attachmentId?: string
size?: number
}
parts?: Array<any>
}>
}
}
// Gmail Attachment Interface (for processed attachments)
export interface GmailAttachment {
name: string
data: Buffer
mimeType: string
size: number
}

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@/lib/logs/console/logger'
import { getBaseUrl } from '@/lib/urls/utils'
import type { ExecutionContext } from '@/executor/types'
import type { OAuthTokenPayload, ToolConfig, ToolResponse } from '@/tools/types'
import {
formatRequestParams,
@@ -10,12 +11,63 @@ import {
const logger = createLogger('Tools')
/**
 * Process file outputs for a tool result if execution context is available.
 *
 * Uses a dynamic import of the file processor so the Node-only code is never
 * bundled into (or executed on) the client. Any processing failure is logged
 * and the original result is returned untouched.
 */
async function processFileOutputs(
  result: ToolResponse,
  tool: ToolConfig,
  executionContext?: ExecutionContext
): Promise<ToolResponse> {
  // No context, a failed result, or a browser environment: nothing to do.
  const runningInBrowser = typeof window !== 'undefined'
  if (!executionContext || !result.success || runningInBrowser) {
    return result
  }
  try {
    // Dynamic import keeps Node-only modules out of client bundles.
    const { FileToolProcessor } = await import('@/executor/utils/file-tool-processor')
    if (!FileToolProcessor.hasFileOutputs(tool)) {
      return result
    }
    logger.info(`File processing for tool ${tool.id}: checking outputs`, Object.keys(result.output))
    const processedOutput = await FileToolProcessor.processToolOutputs(
      result.output,
      tool,
      executionContext
    )
    logger.info(
      `File processing for tool ${tool.id}: processed outputs`,
      Object.keys(processedOutput)
    )
    return { ...result, output: processedOutput }
  } catch (error) {
    logger.error(`Error processing file outputs for tool ${tool.id}:`, error)
    // Fall back to the unprocessed result rather than failing the tool call.
    return result
  }
}
// Execute a tool by calling either the proxy for external APIs or directly for internal routes
export async function executeTool(
toolId: string,
params: Record<string, any>,
skipProxy = false,
skipPostProcess = false
skipPostProcess = false,
executionContext?: ExecutionContext
): Promise<ToolResponse> {
// Capture start time for precise timing
const startTime = new Date()
@@ -124,38 +176,23 @@ export async function executeTool(
const duration = endTime.getTime() - startTime.getTime()
// Apply post-processing if available and not skipped
let finalResult = directResult
if (tool.postProcess && directResult.success && !skipPostProcess) {
try {
const postProcessResult = await tool.postProcess(
directResult,
contextParams,
executeTool
)
return {
...postProcessResult,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
duration,
},
}
finalResult = await tool.postProcess(directResult, contextParams, executeTool)
} catch (error) {
logger.error(`[${requestId}] Post-processing error for ${toolId}:`, {
error: error instanceof Error ? error.message : String(error),
})
return {
...directResult,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
duration,
},
}
finalResult = directResult
}
}
// Process file outputs if execution context is available
finalResult = await processFileOutputs(finalResult, tool, executionContext)
return {
...directResult,
...finalResult,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
@@ -177,48 +214,27 @@ export async function executeTool(
const result = await handleInternalRequest(toolId, tool, contextParams)
// Apply post-processing if available and not skipped
let finalResult = result
if (tool.postProcess && result.success && !skipPostProcess) {
try {
const postProcessResult = await tool.postProcess(result, contextParams, executeTool)
// Add timing data to the post-processed result
const endTime = new Date()
const endTimeISO = endTime.toISOString()
const duration = endTime.getTime() - startTime.getTime()
return {
...postProcessResult,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
duration,
},
}
finalResult = await tool.postProcess(result, contextParams, executeTool)
} catch (error) {
logger.error(`[${requestId}] Post-processing error for ${toolId}:`, {
error: error instanceof Error ? error.message : String(error),
})
// Return original result if post-processing fails
// Still include timing data
const endTime = new Date()
const endTimeISO = endTime.toISOString()
const duration = endTime.getTime() - startTime.getTime()
return {
...result,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
duration,
},
}
finalResult = result
}
}
// Process file outputs if execution context is available
finalResult = await processFileOutputs(finalResult, tool, executionContext)
// Add timing data to the result
const endTime = new Date()
const endTimeISO = endTime.toISOString()
const duration = endTime.getTime() - startTime.getTime()
return {
...result,
...finalResult,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
@@ -228,50 +244,30 @@ export async function executeTool(
}
// For external APIs, use the proxy
const result = await handleProxyRequest(toolId, contextParams)
const result = await handleProxyRequest(toolId, contextParams, executionContext)
// Apply post-processing if available and not skipped
let finalResult = result
if (tool.postProcess && result.success && !skipPostProcess) {
try {
const postProcessResult = await tool.postProcess(result, contextParams, executeTool)
// Add timing data to the post-processed result
const endTime = new Date()
const endTimeISO = endTime.toISOString()
const duration = endTime.getTime() - startTime.getTime()
return {
...postProcessResult,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
duration,
},
}
finalResult = await tool.postProcess(result, contextParams, executeTool)
} catch (error) {
logger.error(`[${requestId}] Post-processing error for ${toolId}:`, {
error: error instanceof Error ? error.message : String(error),
})
// Return original result if post-processing fails, but include timing data
const endTime = new Date()
const endTimeISO = endTime.toISOString()
const duration = endTime.getTime() - startTime.getTime()
return {
...result,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
duration,
},
}
finalResult = result
}
}
// Process file outputs if execution context is available
finalResult = await processFileOutputs(finalResult, tool, executionContext)
// Add timing data to the result
const endTime = new Date()
const endTimeISO = endTime.toISOString()
const duration = endTime.getTime() - startTime.getTime()
return {
...result,
...finalResult,
timing: {
startTime: startTimeISO,
endTime: endTimeISO,
@@ -592,7 +588,8 @@ function validateClientSideParams(
*/
async function handleProxyRequest(
toolId: string,
params: Record<string, any>
params: Record<string, any>,
executionContext?: ExecutionContext
): Promise<ToolResponse> {
const requestId = crypto.randomUUID().slice(0, 8)
@@ -603,7 +600,7 @@ async function handleProxyRequest(
const response = await fetch(proxyUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ toolId, params }),
body: JSON.stringify({ toolId, params, executionContext }),
})
if (!response.ok) {

View File

@@ -93,10 +93,10 @@ export const mistralParserTool: ToolConfig<MistralParserInput, MistralParserOutp
if (
typeof params.fileUpload === 'object' &&
params.fileUpload !== null &&
params.fileUpload.path
(params.fileUpload.url || params.fileUpload.path)
) {
// Get the full URL to the file
let uploadedFilePath = params.fileUpload.path
// Get the full URL to the file - prefer url over path for UserFile compatibility
let uploadedFilePath = params.fileUpload.url || params.fileUpload.path
// Make sure the file path is an absolute URL
if (uploadedFilePath.startsWith('/')) {

View File

@@ -44,6 +44,19 @@ export interface ToolConfig<P = any, R = any> {
}
>
// Output schema - what this tool produces
outputs?: Record<
string,
{
type: 'string' | 'number' | 'boolean' | 'json' | 'file' | 'file[]'
description?: string
fileConfig?: {
mimeType?: string // Expected MIME type for file outputs
extension?: string // Expected file extension
}
}
>
// OAuth configuration for this tool (if it requires authentication)
oauth?: OAuthConfig
@@ -83,3 +96,14 @@ export interface OAuthTokenPayload {
credentialId: string
workflowId?: string
}
/**
* File data that tools can return for file-typed outputs
*/
export interface ToolFileData {
name: string
mimeType: string
data?: Buffer | string // Buffer or base64 string
url?: string // URL to download file from
size?: number
}

View File

@@ -204,13 +204,17 @@ export const webhookExecution = task({
}
// Create executor and execute
const executor = new Executor(
serializedWorkflow,
processedBlockStates,
decryptedEnvVars,
input || {},
workflowVariables
)
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: input || {},
workflowVariables,
contextExtensions: {
executionId,
workspaceId: '', // TODO: Get from workflow if needed - see comment on line 103
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)

View File

@@ -126,13 +126,17 @@ export const workflowExecution = task({
)
// Create executor and execute
const executor = new Executor(
serializedWorkflow,
processedBlockStates,
decryptedEnvVars,
payload.input || {},
{} // workflow variables
)
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: payload.input || {},
workflowVariables: {},
contextExtensions: {
executionId,
workspaceId: '', // TODO: Get from workflow if needed - see comment on line 120
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)