Mirror of https://github.com/simstudioai/sim.git, synced 2026-04-28 03:00:29 -04:00
v0.6.60: copilot security improvements, slack canvas ops, retention jobs fixes
@@ -8,21 +8,61 @@ import {
  isUsingCloudStorage,
  type StorageContext,
} from '@/lib/uploads'
import {
  signUploadToken,
  type UploadTokenPayload,
  verifyUploadToken,
} from '@/lib/uploads/core/upload-token'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('MultipartUploadAPI')

const ALLOWED_UPLOAD_CONTEXTS = new Set<StorageContext>([
  'knowledge-base',
  'chat',
  'copilot',
  'mothership',
  'execution',
  'workspace',
  'profile-pictures',
  'og-images',
  'logs',
  'workspace-logos',
])

interface InitiateMultipartRequest {
  fileName: string
  contentType: string
  fileSize: number
  workspaceId: string
  context?: StorageContext
}

interface GetPartUrlsRequest {
  uploadId: string
  key: string
interface TokenBoundRequest {
  uploadToken: string
}

interface GetPartUrlsRequest extends TokenBoundRequest {
  partNumbers: number[]
  context?: StorageContext
}

interface CompleteSingleRequest extends TokenBoundRequest {
  parts: unknown
}

interface CompleteBatchRequest {
  uploads: Array<TokenBoundRequest & { parts: unknown }>
}

const verifyTokenForUser = (token: string | undefined, userId: string) => {
  if (!token || typeof token !== 'string') {
    return null
  }
  const result = verifyUploadToken(token)
  if (!result.valid || result.payload.userId !== userId) {
    return null
  }
  return result.payload
}
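The upload-token module referenced above ('@/lib/uploads/core/upload-token') is not shown in this commit. As a rough sketch only — assuming an HMAC-SHA256 signature over a base64url JSON payload with an expiry, a common shape for this kind of token but not necessarily the repo's actual implementation:

import { createHmac, timingSafeEqual } from 'node:crypto'

// All names below are illustrative; the real payload/secret handling lives in the module above.
interface SketchPayload {
  uploadId: string
  key: string
  userId: string
  workspaceId: string
  context: string
  exp: number // unix seconds
}

const SECRET = process.env.UPLOAD_TOKEN_SECRET ?? 'dev-only-secret' // assumed env var

function signSketch(payload: Omit<SketchPayload, 'exp'>, ttlSeconds = 3600): string {
  const body = Buffer.from(
    JSON.stringify({ ...payload, exp: Math.floor(Date.now() / 1000) + ttlSeconds })
  ).toString('base64url')
  const mac = createHmac('sha256', SECRET).update(body).digest('base64url')
  return `${body}.${mac}`
}

function verifySketch(token: string): { valid: true; payload: SketchPayload } | { valid: false } {
  const [body, mac] = token.split('.')
  if (!body || !mac) return { valid: false }
  const expected = createHmac('sha256', SECRET).update(body).digest('base64url')
  const given = Buffer.from(mac)
  const want = Buffer.from(expected)
  if (given.length !== want.length || !timingSafeEqual(given, want)) return { valid: false }
  const payload = JSON.parse(Buffer.from(body, 'base64url').toString()) as SketchPayload
  if (payload.exp < Math.floor(Date.now() / 1000)) return { valid: false }
  return { valid: true, payload }
}

Whatever the real implementation, the security point is the same: uploadId, key, and context come out of the verified payload, so a client can no longer substitute its own key or context after initiate.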

export const POST = withRouteHandler(async (request: NextRequest) => {
@@ -31,6 +71,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }
  const userId = session.user.id

  const action = request.nextUrl.searchParams.get('action')

@@ -45,32 +86,34 @@ export const POST = withRouteHandler(async (request: NextRequest) => {

  switch (action) {
    case 'initiate': {
      const data: InitiateMultipartRequest = await request.json()
      const { fileName, contentType, fileSize, context = 'knowledge-base' } = data
      const data = (await request.json()) as InitiateMultipartRequest
      const { fileName, contentType, fileSize, workspaceId, context = 'knowledge-base' } = data

      if (!workspaceId || typeof workspaceId !== 'string') {
        return NextResponse.json({ error: 'workspaceId is required' }, { status: 400 })
      }

      if (!ALLOWED_UPLOAD_CONTEXTS.has(context)) {
        return NextResponse.json({ error: 'Invalid storage context' }, { status: 400 })
      }

      const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
      if (permission !== 'write' && permission !== 'admin') {
        return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
      }

      const config = getStorageConfig(context)

      let uploadId: string
      let key: string

      if (storageProvider === 's3') {
        const { initiateS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')

        const result = await initiateS3MultipartUpload({
          fileName,
          contentType,
          fileSize,
        })

        logger.info(
          `Initiated S3 multipart upload for ${fileName} (context: ${context}): ${result.uploadId}`
        )

        return NextResponse.json({
          uploadId: result.uploadId,
          key: result.key,
        })
      }
      if (storageProvider === 'blob') {
        const result = await initiateS3MultipartUpload({ fileName, contentType, fileSize })
        uploadId = result.uploadId
        key = result.key
      } else if (storageProvider === 'blob') {
        const { initiateMultipartUpload } = await import('@/lib/uploads/providers/blob/client')

        const result = await initiateMultipartUpload({
          fileName,
          contentType,
@@ -82,46 +125,55 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
            connectionString: config.connectionString,
          },
        })

        logger.info(
          `Initiated Azure multipart upload for ${fileName} (context: ${context}): ${result.uploadId}`
        uploadId = result.uploadId
        key = result.key
      } else {
        return NextResponse.json(
          { error: `Unsupported storage provider: ${storageProvider}` },
          { status: 400 }
        )

        return NextResponse.json({
          uploadId: result.uploadId,
          key: result.key,
        })
      }

      return NextResponse.json(
        { error: `Unsupported storage provider: ${storageProvider}` },
        { status: 400 }
      const uploadToken = signUploadToken({
        uploadId,
        key,
        userId,
        workspaceId,
        context,
      })

      logger.info(
        `Initiated ${storageProvider} multipart upload for ${fileName} (context: ${context}, workspace: ${workspaceId}): ${uploadId}`
      )

      return NextResponse.json({ uploadId, key, uploadToken })
    }

    case 'get-part-urls': {
      const data: GetPartUrlsRequest = await request.json()
      const { uploadId, key, partNumbers, context = 'knowledge-base' } = data
      const data = (await request.json()) as GetPartUrlsRequest
      const { partNumbers } = data

      const tokenPayload = verifyTokenForUser(data.uploadToken, userId)
      if (!tokenPayload) {
        return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
      }

      const { uploadId, key, context } = tokenPayload
      const config = getStorageConfig(context)

      if (storageProvider === 's3') {
        const { getS3MultipartPartUrls } = await import('@/lib/uploads/providers/s3/client')

        const presignedUrls = await getS3MultipartPartUrls(key, uploadId, partNumbers)

        return NextResponse.json({ presignedUrls })
      }
      if (storageProvider === 'blob') {
        const { getMultipartPartUrls } = await import('@/lib/uploads/providers/blob/client')

        const presignedUrls = await getMultipartPartUrls(key, partNumbers, {
          containerName: config.containerName!,
          accountName: config.accountName!,
          accountKey: config.accountKey,
          connectionString: config.connectionString,
        })

        return NextResponse.json({ presignedUrls })
      }

@@ -132,24 +184,32 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
    }

    case 'complete': {
      const data = await request.json()
      const context: StorageContext = data.context || 'knowledge-base'
      const data = (await request.json()) as CompleteSingleRequest | CompleteBatchRequest

      const config = getStorageConfig(context)
      if ('uploads' in data && Array.isArray(data.uploads)) {
        const verified = data.uploads.map((upload) => {
          const payload = verifyTokenForUser(upload.uploadToken, userId)
          return payload ? { payload, parts: upload.parts } : null
        })

        if (verified.some((entry) => entry === null)) {
          return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
        }

        const verifiedEntries = verified.filter(
          (entry): entry is { payload: UploadTokenPayload; parts: unknown } => entry !== null
        )

      if ('uploads' in data) {
        const results = await Promise.all(
          data.uploads.map(async (upload: any) => {
            const { uploadId, key } = upload
          verifiedEntries.map(async ({ payload, parts }) => {
            const { uploadId, key, context } = payload
            const config = getStorageConfig(context)

            if (storageProvider === 's3') {
              const { completeS3MultipartUpload } = await import(
                '@/lib/uploads/providers/s3/client'
              )
              const parts = upload.parts // S3 format: { ETag, PartNumber }

              const result = await completeS3MultipartUpload(key, uploadId, parts)

              const result = await completeS3MultipartUpload(key, uploadId, parts as any)
              return {
                success: true,
                location: result.location,
@@ -161,15 +221,12 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
              const { completeMultipartUpload } = await import(
                '@/lib/uploads/providers/blob/client'
              )
              const parts = upload.parts // Azure format: { blockId, partNumber }

              const result = await completeMultipartUpload(key, parts, {
              const result = await completeMultipartUpload(key, parts as any, {
                containerName: config.containerName!,
                accountName: config.accountName!,
                accountKey: config.accountKey,
                connectionString: config.connectionString,
              })

              return {
                success: true,
                location: result.location,
@@ -182,19 +239,23 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
          })
        )

        logger.info(`Completed ${data.uploads.length} multipart uploads (context: ${context})`)
        logger.info(`Completed ${verifiedEntries.length} multipart uploads`)
        return NextResponse.json({ results })
      }

      const { uploadId, key, parts } = data
      const single = data as CompleteSingleRequest
      const tokenPayload = verifyTokenForUser(single.uploadToken, userId)
      if (!tokenPayload) {
        return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
      }

      const { uploadId, key, context } = tokenPayload
      const config = getStorageConfig(context)

      if (storageProvider === 's3') {
        const { completeS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')

        const result = await completeS3MultipartUpload(key, uploadId, parts)

        const result = await completeS3MultipartUpload(key, uploadId, single.parts as any)
        logger.info(`Completed S3 multipart upload for key ${key} (context: ${context})`)

        return NextResponse.json({
          success: true,
          location: result.location,
@@ -204,16 +265,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
      }
      if (storageProvider === 'blob') {
        const { completeMultipartUpload } = await import('@/lib/uploads/providers/blob/client')

        const result = await completeMultipartUpload(key, parts, {
        const result = await completeMultipartUpload(key, single.parts as any, {
          containerName: config.containerName!,
          accountName: config.accountName!,
          accountKey: config.accountKey,
          connectionString: config.connectionString,
        })

        logger.info(`Completed Azure multipart upload for key ${key} (context: ${context})`)

        return NextResponse.json({
          success: true,
          location: result.location,
@@ -229,27 +287,27 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
      }

    case 'abort': {
      const data = await request.json()
      const { uploadId, key, context = 'knowledge-base' } = data
      const data = (await request.json()) as TokenBoundRequest
      const tokenPayload = verifyTokenForUser(data.uploadToken, userId)
      if (!tokenPayload) {
        return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
      }

      const config = getStorageConfig(context as StorageContext)
      const { uploadId, key, context } = tokenPayload
      const config = getStorageConfig(context)

      if (storageProvider === 's3') {
        const { abortS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')

        await abortS3MultipartUpload(key, uploadId)

        logger.info(`Aborted S3 multipart upload for key ${key} (context: ${context})`)
      } else if (storageProvider === 'blob') {
        const { abortMultipartUpload } = await import('@/lib/uploads/providers/blob/client')

        await abortMultipartUpload(key, {
          containerName: config.containerName!,
          accountName: config.accountName!,
          accountKey: config.accountKey,
          connectionString: config.connectionString,
        })

        logger.info(`Aborted Azure multipart upload for key ${key} (context: ${context})`)
      } else {
        return NextResponse.json(
@@ -604,6 +604,10 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
      const startTime = getHighResTime()

      try {
        if (!options.workspaceId) {
          throw new Error('workspaceId is required for multipart upload')
        }

        const initiateResponse = await fetch('/api/files/multipart?action=initiate', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
@@ -611,6 +615,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
            fileName: file.name,
            contentType: getFileContentType(file),
            fileSize: file.size,
            workspaceId: options.workspaceId,
          }),
        })

@@ -618,7 +623,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
          throw new Error(`Failed to initiate multipart upload: ${initiateResponse.statusText}`)
        }

        const { uploadId, key } = await initiateResponse.json()
        const { uploadId, key, uploadToken } = await initiateResponse.json()
        logger.info(`Initiated multipart upload with ID: ${uploadId}`)

        const chunkSize = UPLOAD_CONFIG.CHUNK_SIZE
@@ -629,8 +634,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            uploadId,
            key,
            uploadToken,
            partNumbers,
          }),
        })
@@ -639,7 +643,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
          await fetch('/api/files/multipart?action=abort', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ uploadId, key }),
            body: JSON.stringify({ uploadToken }),
          })
          throw new Error(`Failed to get part URLs: ${partUrlsResponse.statusText}`)
        }
@@ -723,8 +727,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({
            uploadId,
            key,
            uploadToken,
            parts: uploadedParts,
          }),
        })
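Taken together with the route above, the client protocol is: initiate (server checks workspace write access and mints uploadToken) → get-part-urls / complete / abort, all authenticated by the token rather than by client-supplied uploadId/key. A condensed sketch of that flow under the endpoint shapes shown in this diff — uploadChunk is a hypothetical stand-in for the real per-part PUT logic, and presignedUrls is assumed to be an array of URL strings:

async function uploadChunk(url: string, partNumber: number, blob: Blob) {
  // Stand-in: the real hook PUTs the slice and records the returned ETag.
  const res = await fetch(url, { method: 'PUT', body: blob })
  return { ETag: res.headers.get('ETag'), PartNumber: partNumber }
}

async function multipartUpload(file: File, workspaceId: string, chunkSize: number) {
  const init = await fetch('/api/files/multipart?action=initiate', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ fileName: file.name, contentType: file.type, fileSize: file.size, workspaceId }),
  })
  const { uploadId, key, uploadToken } = await init.json()

  const partNumbers = Array.from({ length: Math.ceil(file.size / chunkSize) }, (_, i) => i + 1)
  const urlsRes = await fetch('/api/files/multipart?action=get-part-urls', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ uploadId, key, uploadToken, partNumbers }),
  })
  const { presignedUrls } = await urlsRes.json()

  try {
    const parts = await Promise.all(
      presignedUrls.map((url: string, i: number) =>
        uploadChunk(url, i + 1, file.slice(i * chunkSize, (i + 1) * chunkSize))
      )
    )
    await fetch('/api/files/multipart?action=complete', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ uploadId, key, uploadToken, parts }),
    })
  } catch {
    // abort needs only the token: uploadId/key are recovered server-side from its payload
    await fetch('/api/files/multipart?action=abort', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ uploadToken }),
    })
  }
}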

@@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { and, inArray, lt } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
  batchDeleteByWorkspaceAndTimestamp,
  chunkedBatchDelete,
  type TableCleanupResult,
} from '@/lib/cleanup/batch-delete'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
import { deleteFileMetadata } from '@/lib/uploads/server/metadata'

const logger = createLogger('CleanupLogs')

const BATCH_SIZE = 2000
const MAX_BATCHES_PER_TIER = 10

interface TierResults {
  total: number
  deleted: number
  deleteFailed: number
interface FileDeleteStats {
  filesTotal: number
  filesDeleted: number
  filesDeleteFailed: number
}

function emptyTierResults(): TierResults {
  return {
    total: 0,
    deleted: 0,
    deleteFailed: 0,
    filesTotal: 0,
    filesDeleted: 0,
    filesDeleteFailed: 0,
  }
}

async function deleteExecutionFiles(files: unknown, results: TierResults): Promise<void> {
async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
  if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return

  const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string)
  results.filesTotal += keys.length
  stats.filesTotal += keys.length

  await Promise.all(
    keys.map(async (key) => {
      try {
        await StorageService.deleteFile({ key, context: 'execution' })
        await deleteFileMetadata(key)
        results.filesDeleted++
        stats.filesDeleted++
      } catch (fileError) {
        results.filesDeleteFailed++
        stats.filesDeleteFailed++
        logger.error(`Failed to delete file ${key}:`, { fileError })
      }
    })
  )
}

async function cleanupTier(
async function cleanupWorkflowExecutionLogs(
  workspaceIds: string[],
  retentionDate: Date,
  label: string
): Promise<TierResults> {
  const results = emptyTierResults()
  if (workspaceIds.length === 0) return results
): Promise<TableCleanupResult & FileDeleteStats> {
  const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }

  let batchesProcessed = 0
  let hasMore = true

  while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
    const batch = await db
      .select({
        id: workflowExecutionLogs.id,
        files: workflowExecutionLogs.files,
      })
      .from(workflowExecutionLogs)
      .where(
        and(
          inArray(workflowExecutionLogs.workspaceId, workspaceIds),
          lt(workflowExecutionLogs.startedAt, retentionDate)
  const dbStats = await chunkedBatchDelete({
    tableDef: workflowExecutionLogs,
    workspaceIds,
    tableName: `${label}/workflow_execution_logs`,
    selectChunk: (chunkIds, limit) =>
      db
        .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
        .from(workflowExecutionLogs)
        .where(
          and(
            inArray(workflowExecutionLogs.workspaceId, chunkIds),
            lt(workflowExecutionLogs.startedAt, retentionDate)
          )
        )
      )
      .limit(BATCH_SIZE)
        .limit(limit),
    onBatch: async (rows) => {
      for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
    },
  })

    results.total += batch.length

    if (batch.length === 0) {
      hasMore = false
      break
    }

    for (const log of batch) {
      await deleteExecutionFiles(log.files, results)
    }

    const logIds = batch.map((log) => log.id)
    try {
      const deleted = await db
        .delete(workflowExecutionLogs)
        .where(inArray(workflowExecutionLogs.id, logIds))
        .returning({ id: workflowExecutionLogs.id })

      results.deleted += deleted.length
    } catch (deleteError) {
      results.deleteFailed += logIds.length
      logger.error(`Batch delete failed for ${label}:`, { deleteError })
    }

    batchesProcessed++
    hasMore = batch.length === BATCH_SIZE

    logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`)
  }

  return results
}

interface JobLogCleanupResults {
  deleted: number
  deleteFailed: number
}

async function cleanupJobExecutionLogsTier(
  workspaceIds: string[],
  retentionDate: Date,
  label: string
): Promise<JobLogCleanupResults> {
  const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 }
  if (workspaceIds.length === 0) return results

  let batchesProcessed = 0
  let hasMore = true

  while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
    const batch = await db
      .select({ id: jobExecutionLogs.id })
      .from(jobExecutionLogs)
      .where(
        and(
          inArray(jobExecutionLogs.workspaceId, workspaceIds),
          lt(jobExecutionLogs.startedAt, retentionDate)
        )
      )
      .limit(BATCH_SIZE)

    if (batch.length === 0) {
      hasMore = false
      break
    }

    const logIds = batch.map((log) => log.id)
    try {
      const deleted = await db
        .delete(jobExecutionLogs)
        .where(inArray(jobExecutionLogs.id, logIds))
        .returning({ id: jobExecutionLogs.id })

      results.deleted += deleted.length
    } catch (deleteError) {
      results.deleteFailed += logIds.length
      logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError })
    }

    batchesProcessed++
    hasMore = batch.length === BATCH_SIZE

    logger.info(
      `[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed`
    )
  }

  return results
  return { ...dbStats, ...fileStats }
}

export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
@@ -190,15 +92,19 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
    `[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
  )

  const results = await cleanupTier(workspaceIds, retentionDate, label)
  const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
  logger.info(
    `[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates`
    `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
  )

  const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label)
  logger.info(
    `[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed`
  )
  await batchDeleteByWorkspaceAndTimestamp({
    tableDef: jobExecutionLogs,
    workspaceIdCol: jobExecutionLogs.workspaceId,
    timestampCol: jobExecutionLogs.startedAt,
    workspaceIds,
    retentionDate,
    tableName: `${label}/job_execution_logs`,
  })

  // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
  if (payload.plan === 'free') {

@@ -18,9 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
  batchDeleteByWorkspaceAndTimestamp,
  DEFAULT_BATCH_SIZE,
  DEFAULT_MAX_BATCHES_PER_TABLE,
  deleteRowsById,
  selectRowsByIdChunks,
} from '@/lib/cleanup/batch-delete'
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
import type { StorageContext } from '@/lib/uploads'
@@ -44,35 +43,37 @@ async function selectExpiredWorkspaceFiles(
  workspaceIds: string[],
  retentionDate: Date
): Promise<WorkspaceFileScope> {
  const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE

  const [legacyRows, multiContextRows] = await Promise.all([
    db
      .select({ id: workspaceFile.id, key: workspaceFile.key })
      .from(workspaceFile)
      .where(
        and(
          inArray(workspaceFile.workspaceId, workspaceIds),
          isNotNull(workspaceFile.deletedAt),
          lt(workspaceFile.deletedAt, retentionDate)
    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
      db
        .select({ id: workspaceFile.id, key: workspaceFile.key })
        .from(workspaceFile)
        .where(
          and(
            inArray(workspaceFile.workspaceId, chunkIds),
            isNotNull(workspaceFile.deletedAt),
            lt(workspaceFile.deletedAt, retentionDate)
          )
        )
      )
      .limit(limit),
    db
      .select({
        id: workspaceFiles.id,
        key: workspaceFiles.key,
        context: workspaceFiles.context,
      })
      .from(workspaceFiles)
      .where(
        and(
          inArray(workspaceFiles.workspaceId, workspaceIds),
          isNotNull(workspaceFiles.deletedAt),
          lt(workspaceFiles.deletedAt, retentionDate)
        .limit(chunkLimit)
    ),
    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
      db
        .select({
          id: workspaceFiles.id,
          key: workspaceFiles.key,
          context: workspaceFiles.context,
        })
        .from(workspaceFiles)
        .where(
          and(
            inArray(workspaceFiles.workspaceId, chunkIds),
            isNotNull(workspaceFiles.deletedAt),
            lt(workspaceFiles.deletedAt, retentionDate)
          )
        )
      )
      .limit(limit),
        .limit(chunkLimit)
    ),
  ])

  return {
@@ -182,17 +183,19 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
  // (chats + S3) AND the DB deletes below — selecting twice could return
  // different subsets above the LIMIT cap and orphan or prematurely purge data.
  const [doomedWorkflows, fileScope] = await Promise.all([
    db
      .select({ id: workflow.id })
      .from(workflow)
      .where(
        and(
          inArray(workflow.workspaceId, workspaceIds),
          isNotNull(workflow.archivedAt),
          lt(workflow.archivedAt, retentionDate)
    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
      db
        .select({ id: workflow.id })
        .from(workflow)
        .where(
          and(
            inArray(workflow.workspaceId, chunkIds),
            isNotNull(workflow.archivedAt),
            lt(workflow.archivedAt, retentionDate)
          )
        )
      )
      .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE),
        .limit(chunkLimit)
    ),
    selectExpiredWorkspaceFiles(workspaceIds, retentionDate),
  ])

@@ -200,11 +203,13 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
  let chatCleanup: { execute: () => Promise<void> } | null = null

  if (doomedWorkflowIds.length > 0) {
    const doomedChats = await db
      .select({ id: copilotChats.id })
      .from(copilotChats)
      .where(inArray(copilotChats.workflowId, doomedWorkflowIds))
      .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
    const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) =>
      db
        .select({ id: copilotChats.id })
        .from(copilotChats)
        .where(inArray(copilotChats.workflowId, chunkIds))
        .limit(chunkLimit)
    )

    const doomedChatIds = doomedChats.map((c) => c.id)
    if (doomedChatIds.length > 0) {

@@ -13,9 +13,8 @@ import { and, inArray, lt, sql } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
  batchDeleteByWorkspaceAndTimestamp,
  DEFAULT_BATCH_SIZE,
  DEFAULT_MAX_BATCHES_PER_TABLE,
  deleteRowsById,
  selectRowsByIdChunks,
  type TableCleanupResult,
} from '@/lib/cleanup/batch-delete'
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
@@ -67,13 +66,15 @@ async function cleanupRunChildren(
): Promise<TableCleanupResult[]> {
  if (workspaceIds.length === 0) return []

  const runIds = await db
    .select({ id: copilotRuns.id })
    .from(copilotRuns)
    .where(
      and(inArray(copilotRuns.workspaceId, workspaceIds), lt(copilotRuns.updatedAt, retentionDate))
    )
    .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
  const runIds = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
    db
      .select({ id: copilotRuns.id })
      .from(copilotRuns)
      .where(
        and(inArray(copilotRuns.workspaceId, chunkIds), lt(copilotRuns.updatedAt, retentionDate))
      )
      .limit(chunkLimit)
  )

  if (runIds.length === 0) {
    return RUN_CHILD_TABLES.map((t) => ({ table: `${label}/${t.name}`, deleted: 0, failed: 0 }))
@@ -107,17 +108,15 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
    `[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
  )

  // Collect chat IDs before deleting so we can clean up the copilot backend after
  const doomedChats = await db
    .select({ id: copilotChats.id })
    .from(copilotChats)
    .where(
      and(
        inArray(copilotChats.workspaceId, workspaceIds),
        lt(copilotChats.updatedAt, retentionDate)
  const doomedChats = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
    db
      .select({ id: copilotChats.id })
      .from(copilotChats)
      .where(
        and(inArray(copilotChats.workspaceId, chunkIds), lt(copilotChats.updatedAt, retentionDate))
      )
    )
    .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
      .limit(chunkLimit)
  )

  const doomedChatIds = doomedChats.map((c) => c.id)

@@ -46,6 +46,10 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
        { label: 'Get User Presence', id: 'get_user_presence' },
        { label: 'Edit Canvas', id: 'edit_canvas' },
        { label: 'Create Channel Canvas', id: 'create_channel_canvas' },
        { label: 'Get Canvas Info', id: 'get_canvas' },
        { label: 'List Canvases', id: 'list_canvases' },
        { label: 'Lookup Canvas Sections', id: 'lookup_canvas_sections' },
        { label: 'Delete Canvas', id: 'delete_canvas' },
        { label: 'Create Conversation', id: 'create_conversation' },
        { label: 'Invite to Conversation', id: 'invite_to_conversation' },
        { label: 'Open View', id: 'open_view' },
@@ -146,6 +150,9 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
            'get_user',
            'get_user_presence',
            'edit_canvas',
            'get_canvas',
            'lookup_canvas_sections',
            'delete_canvas',
            'create_conversation',
            'open_view',
            'update_view',
@@ -160,7 +167,11 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
          },
        }
      },
      required: true,
      required: {
        field: 'operation',
        value: 'list_canvases',
        not: true,
      },
    },
    {
      id: 'manualChannel',
@@ -182,6 +193,9 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
            'get_user',
            'get_user_presence',
            'edit_canvas',
            'get_canvas',
            'lookup_canvas_sections',
            'delete_canvas',
            'create_conversation',
            'open_view',
            'update_view',
@@ -196,7 +210,11 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
          },
        }
      },
      required: true,
      required: {
        field: 'operation',
        value: 'list_canvases',
        not: true,
      },
    },
    {
      id: 'dmUserId',
@@ -820,6 +838,121 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
        value: 'create_channel_canvas',
      },
    },
    // Get Canvas specific fields
    {
      id: 'getCanvasId',
      title: 'Canvas ID',
      type: 'short-input',
      placeholder: 'Enter canvas ID (e.g., F1234ABCD)',
      condition: {
        field: 'operation',
        value: 'get_canvas',
      },
      required: true,
    },
    // List Canvases specific fields
    {
      id: 'canvasListCount',
      title: 'Canvas Limit',
      type: 'short-input',
      placeholder: '100',
      condition: {
        field: 'operation',
        value: 'list_canvases',
      },
      mode: 'advanced',
    },
    {
      id: 'canvasListPage',
      title: 'Page',
      type: 'short-input',
      placeholder: '1',
      condition: {
        field: 'operation',
        value: 'list_canvases',
      },
      mode: 'advanced',
    },
    {
      id: 'canvasListUser',
      title: 'User ID',
      type: 'short-input',
      placeholder: 'Optional creator filter (e.g., U1234567890)',
      condition: {
        field: 'operation',
        value: 'list_canvases',
      },
      mode: 'advanced',
    },
    {
      id: 'canvasListTsFrom',
      title: 'Created After',
      type: 'short-input',
      placeholder: 'Unix timestamp (e.g., 123456789)',
      condition: {
        field: 'operation',
        value: 'list_canvases',
      },
      mode: 'advanced',
    },
    {
      id: 'canvasListTsTo',
      title: 'Created Before',
      type: 'short-input',
      placeholder: 'Unix timestamp (e.g., 123456789)',
      condition: {
        field: 'operation',
        value: 'list_canvases',
      },
      mode: 'advanced',
    },
    {
      id: 'canvasListTeamId',
      title: 'Team ID',
      type: 'short-input',
      placeholder: 'Encoded team ID (org tokens only)',
      condition: {
        field: 'operation',
        value: 'list_canvases',
      },
      mode: 'advanced',
    },
    // Lookup Canvas Sections specific fields
    {
      id: 'lookupCanvasId',
      title: 'Canvas ID',
      type: 'short-input',
      placeholder: 'Enter canvas ID (e.g., F1234ABCD)',
      condition: {
        field: 'operation',
        value: 'lookup_canvas_sections',
      },
      required: true,
    },
    {
      id: 'sectionCriteria',
      title: 'Section Criteria',
      type: 'code',
      language: 'json',
      placeholder: '{"section_types":["h1"],"contains_text":"Roadmap"}',
      condition: {
        field: 'operation',
        value: 'lookup_canvas_sections',
      },
      required: true,
    },
    // Delete Canvas specific fields
    {
      id: 'deleteCanvasId',
      title: 'Canvas ID',
      type: 'short-input',
      placeholder: 'Enter canvas ID (e.g., F1234ABCD)',
      condition: {
        field: 'operation',
        value: 'delete_canvas',
      },
      required: true,
    },
    // Create Conversation specific fields
    {
      id: 'conversationName',
@@ -1058,6 +1191,10 @@ Do not include any explanations, markdown formatting, or other text outside the
      'slack_get_user_presence',
      'slack_edit_canvas',
      'slack_create_channel_canvas',
      'slack_get_canvas',
      'slack_list_canvases',
      'slack_lookup_canvas_sections',
      'slack_delete_canvas',
      'slack_create_conversation',
      'slack_invite_to_conversation',
      'slack_open_view',
@@ -1106,6 +1243,14 @@ Do not include any explanations, markdown formatting, or other text outside the
        return 'slack_edit_canvas'
      case 'create_channel_canvas':
        return 'slack_create_channel_canvas'
      case 'get_canvas':
        return 'slack_get_canvas'
      case 'list_canvases':
        return 'slack_list_canvases'
      case 'lookup_canvas_sections':
        return 'slack_lookup_canvas_sections'
      case 'delete_canvas':
        return 'slack_delete_canvas'
      case 'create_conversation':
        return 'slack_create_conversation'
      case 'invite_to_conversation':
@@ -1164,6 +1309,16 @@ Do not include any explanations, markdown formatting, or other text outside the
      canvasTitle,
      channelCanvasTitle,
      channelCanvasContent,
      getCanvasId,
      canvasListCount,
      canvasListPage,
      canvasListUser,
      canvasListTsFrom,
      canvasListTsTo,
      canvasListTeamId,
      lookupCanvasId,
      sectionCriteria,
      deleteCanvasId,
      conversationName,
      isPrivate,
      teamId,
@@ -1343,6 +1498,46 @@ Do not include any explanations, markdown formatting, or other text outside the
        }
        break

      case 'get_canvas':
        baseParams.canvasId = getCanvasId
        break

      case 'list_canvases':
        if (canvasListCount) {
          const parsedCount = Number.parseInt(canvasListCount, 10)
          if (!Number.isNaN(parsedCount) && parsedCount > 0) {
            baseParams.count = parsedCount
          }
        }
        if (canvasListPage) {
          const parsedPage = Number.parseInt(canvasListPage, 10)
          if (!Number.isNaN(parsedPage) && parsedPage > 0) {
            baseParams.page = parsedPage
          }
        }
        if (canvasListUser) {
          baseParams.user = String(canvasListUser).trim()
        }
        if (canvasListTsFrom) {
          baseParams.tsFrom = String(canvasListTsFrom).trim()
        }
        if (canvasListTsTo) {
          baseParams.tsTo = String(canvasListTsTo).trim()
        }
        if (canvasListTeamId) {
          baseParams.teamId = String(canvasListTeamId).trim()
        }
        break

      case 'lookup_canvas_sections':
        baseParams.canvasId = lookupCanvasId
        baseParams.criteria = sectionCriteria
        break

      case 'delete_canvas':
        baseParams.canvasId = deleteCanvasId
        break

      case 'create_conversation':
        baseParams.name = conversationName
        baseParams.isPrivate = isPrivate === 'true'
@@ -1461,6 +1656,23 @@ Do not include any explanations, markdown formatting, or other text outside the
    // Create Channel Canvas inputs
    channelCanvasTitle: { type: 'string', description: 'Title for channel canvas' },
    channelCanvasContent: { type: 'string', description: 'Content for channel canvas' },
    // Canvas management inputs
    getCanvasId: { type: 'string', description: 'Canvas ID to retrieve' },
    canvasListCount: { type: 'string', description: 'Maximum number of canvases to return' },
    canvasListPage: { type: 'string', description: 'Canvas list page number' },
    canvasListUser: { type: 'string', description: 'Optional canvas creator user filter' },
    canvasListTsFrom: {
      type: 'string',
      description: 'Filter canvases created after this timestamp',
    },
    canvasListTsTo: {
      type: 'string',
      description: 'Filter canvases created before this timestamp',
    },
    canvasListTeamId: { type: 'string', description: 'Encoded team ID for org tokens' },
    lookupCanvasId: { type: 'string', description: 'Canvas ID to search for sections' },
    sectionCriteria: { type: 'json', description: 'Canvas section lookup criteria' },
    deleteCanvasId: { type: 'string', description: 'Canvas ID to delete' },
    // Create Conversation inputs
    conversationName: { type: 'string', description: 'Name for the new channel' },
    isPrivate: { type: 'string', description: 'Create as private channel (true/false)' },
@@ -1511,6 +1723,26 @@ Do not include any explanations, markdown formatting, or other text outside the
    // slack_canvas outputs
    canvas_id: { type: 'string', description: 'Canvas identifier for created canvases' },
    title: { type: 'string', description: 'Canvas title' },
    canvas: {
      type: 'json',
      description: 'Canvas file metadata returned by Slack',
    },
    canvases: {
      type: 'json',
      description: 'Array of canvas file objects returned by Slack',
    },
    paging: {
      type: 'json',
      description: 'Pagination information for listed canvases',
    },
    sections: {
      type: 'json',
      description: 'Canvas section IDs returned by Slack section lookup',
    },
    ok: {
      type: 'boolean',
      description: 'Whether Slack completed the canvas operation successfully',
    },

    // slack_message_reader outputs (read operation)
    messages: {

@@ -34,6 +34,7 @@ import {
  type ExecutionContext,
  getNextExecutionOrder,
  type NormalizedBlockOutput,
  type StreamingExecution,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
@@ -140,7 +141,7 @@ export class BlockExecutor {

    let normalizedOutput: NormalizedBlockOutput
    if (isStreamingExecution) {
      const streamingExec = output as { stream: ReadableStream; execution: any }
      const streamingExec = output as StreamingExecution

      if (ctx.onStream) {
        await this.handleStreamingExecution(
@@ -602,7 +603,7 @@
    ctx: ExecutionContext,
    node: DAGNode,
    block: SerializedBlock,
    streamingExec: { stream: ReadableStream; execution: any },
    streamingExec: StreamingExecution,
    resolvedInputs: Record<string, any>,
    selectedOutputs: string[]
  ): Promise<void> {
@@ -613,56 +614,39 @@
      (block.config?.params as Record<string, any> | undefined)?.responseFormat ??
      (block.config as Record<string, any> | undefined)?.responseFormat

    const stream = streamingExec.stream
    if (typeof stream.tee !== 'function') {
      await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
      return
    }
    const sourceReader = streamingExec.stream.getReader()
    const decoder = new TextDecoder()
    const accumulated: string[] = []
    let drainError: unknown
    let sourceFullyDrained = false

    const [clientStream, executorStream] = stream.tee()
    const clientSource = new ReadableStream<Uint8Array>({
      async pull(controller) {
        try {
          const { done, value } = await sourceReader.read()
          if (done) {
            const tail = decoder.decode()
            if (tail) accumulated.push(tail)
            sourceFullyDrained = true
            controller.close()
            return
          }
          accumulated.push(decoder.decode(value, { stream: true }))
          controller.enqueue(value)
        } catch (error) {
          drainError = error
          controller.error(error)
        }
      },
      async cancel(reason) {
        try {
          await sourceReader.cancel(reason)
        } catch {}
      },
    })

    const processedClientStream = streamingResponseFormatProcessor.processStream(
      clientStream,
      blockId,
      selectedOutputs,
      responseFormat
    )

    const clientStreamingExec = {
      ...streamingExec,
      stream: processedClientStream,
    }

    const executorConsumption = this.consumeExecutorStream(
      executorStream,
      streamingExec,
      blockId,
      responseFormat
    )

    const clientConsumption = (async () => {
      try {
        await ctx.onStream?.(clientStreamingExec)
      } catch (error) {
        this.execLogger.error('Error in onStream callback', { blockId, error })
        // Cancel the client stream to release the tee'd buffer
        await processedClientStream.cancel().catch(() => {})
      }
    })()

    await Promise.all([clientConsumption, executorConsumption])
  }

  private async forwardStream(
    ctx: ExecutionContext,
    blockId: string,
    streamingExec: { stream: ReadableStream; execution: any },
    stream: ReadableStream,
    responseFormat: any,
    selectedOutputs: string[]
  ): Promise<void> {
    const processedStream = streamingResponseFormatProcessor.processStream(
      stream,
      clientSource,
      blockId,
      selectedOutputs,
      responseFormat
@@ -670,72 +654,75 @@

    try {
      await ctx.onStream?.({
        ...streamingExec,
        stream: processedStream,
        stream: processedClientStream,
        execution: streamingExec.execution,
      })
    } catch (error) {
      this.execLogger.error('Error in onStream callback', { blockId, error })
      await processedStream.cancel().catch(() => {})
    }
  }

  private async consumeExecutorStream(
    stream: ReadableStream,
    streamingExec: { execution: any },
    blockId: string,
    responseFormat: any
  ): Promise<void> {
    const reader = stream.getReader()
    const decoder = new TextDecoder()
    const chunks: string[] = []

    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        chunks.push(decoder.decode(value, { stream: true }))
      }
      const tail = decoder.decode()
      if (tail) chunks.push(tail)
    } catch (error) {
      this.execLogger.error('Error reading executor stream for block', { blockId, error })
      await processedClientStream.cancel().catch(() => {})
    } finally {
      try {
        await reader.cancel().catch(() => {})
        sourceReader.releaseLock()
      } catch {}
    }

    const fullContent = chunks.join('')
    if (drainError) {
      this.execLogger.error('Error reading stream for block', { blockId, error: drainError })
      return
    }

    // If the onStream consumer exited before the source drained (e.g. it caught
    // an internal error and returned normally), `accumulated` holds a truncated
    // response. Persisting that to memory or setting it as the block output
    // would corrupt downstream state — skip and log instead.
    if (!sourceFullyDrained) {
      this.execLogger.warn(
        'Stream consumer exited before source drained; skipping content persistence',
        {
          blockId,
        }
      )
      return
    }

    const fullContent = accumulated.join('')
    if (!fullContent) {
      return
    }

    const executionOutput = streamingExec.execution?.output
    if (!executionOutput || typeof executionOutput !== 'object') {
      return
    }

    if (responseFormat) {
      try {
        const parsed = JSON.parse(fullContent.trim())

        streamingExec.execution.output = {
          ...parsed,
          tokens: executionOutput.tokens,
          toolCalls: executionOutput.toolCalls,
          providerTiming: executionOutput.providerTiming,
          cost: executionOutput.cost,
          model: executionOutput.model,
    if (executionOutput && typeof executionOutput === 'object') {
      let parsedForFormat = false
      if (responseFormat) {
        try {
          const parsed = JSON.parse(fullContent.trim())
          streamingExec.execution.output = {
            ...parsed,
            tokens: executionOutput.tokens,
            toolCalls: executionOutput.toolCalls,
            providerTiming: executionOutput.providerTiming,
            cost: executionOutput.cost,
            model: executionOutput.model,
          }
          parsedForFormat = true
        } catch (error) {
          this.execLogger.warn('Failed to parse streamed content for response format', {
            blockId,
            error,
          })
        }
        return
      } catch (error) {
        this.execLogger.warn('Failed to parse streamed content for response format', {
          blockId,
          error,
        })
      }
      if (!parsedForFormat) {
        executionOutput.content = fullContent
      }
    }

    executionOutput.content = fullContent
    if (streamingExec.onFullContent) {
      try {
        await streamingExec.onFullContent(fullContent)
      } catch (error) {
        this.execLogger.error('onFullContent callback failed', { blockId, error })
      }
    }
  }
}
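The heart of this change is dropping ReadableStream.tee() — whose slower branch buffers the entire response when one consumer lags — in favor of a single pull-based passthrough: one reader drains the source, text accumulates as a side effect, and the bytes are re-exposed to the lone downstream consumer. Isolated as a minimal sketch (assumptions: byte stream in, exactly one consumer out):

function accumulatingPassthrough(source: ReadableStream<Uint8Array>) {
  const reader = source.getReader()
  const decoder = new TextDecoder()
  const accumulated: string[] = []
  let fullyDrained = false

  const out = new ReadableStream<Uint8Array>({
    async pull(controller) {
      const { done, value } = await reader.read()
      if (done) {
        const tail = decoder.decode() // flush any buffered multi-byte sequence
        if (tail) accumulated.push(tail)
        fullyDrained = true
        controller.close()
        return
      }
      accumulated.push(decoder.decode(value, { stream: true }))
      controller.enqueue(value) // bytes flow through untouched
    },
    cancel(reason) {
      return reader.cancel(reason)
    },
  })

  // The joined text is only complete once the consumer drained `out`;
  // the executor code above guards this with `sourceFullyDrained`.
  return { stream: out, fullText: () => (fullyDrained ? accumulated.join('') : null) }
}

Because chunks are only pulled when the consumer asks for them, nothing is buffered beyond the chunk in flight — unlike tee(), where the executor-side branch could hold the whole response whenever the client read faster.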

@@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler {
    streamingExec: StreamingExecution
  ): StreamingExecution {
    return {
      stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs),
      stream: streamingExec.stream,
      execution: streamingExec.execution,
      onFullContent: async (content: string) => {
        if (!content.trim()) return
        try {
          await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content })
        } catch (error) {
          logger.error('Failed to persist streaming response:', error)
        }
      },
    }
  }

@@ -111,35 +111,6 @@ export class Memory {
    })
  }

  wrapStreamForPersistence(
    stream: ReadableStream<Uint8Array>,
    ctx: ExecutionContext,
    inputs: AgentInputs
  ): ReadableStream<Uint8Array> {
    const chunks: string[] = []
    const decoder = new TextDecoder()

    const transformStream = new TransformStream<Uint8Array, Uint8Array>({
      transform: (chunk, controller) => {
        controller.enqueue(chunk)
        const decoded = decoder.decode(chunk, { stream: true })
        chunks.push(decoded)
      },

      flush: () => {
        const content = chunks.join('')
        if (content.trim()) {
          this.appendToMemory(ctx, inputs, {
            role: 'assistant',
            content,
          }).catch((error) => logger.error('Failed to persist streaming response:', error))
        }
      },
    })

    return stream.pipeThrough(transformStream)
  }

  private requireWorkspaceId(ctx: ExecutionContext): string {
    if (!ctx.workspaceId) {
      throw new Error('workspaceId is required for memory operations')

@@ -359,6 +359,12 @@ export interface ExecutionResult {
export interface StreamingExecution {
  stream: ReadableStream
  execution: ExecutionResult & { isStreaming?: boolean }
  /**
   * Invoked with the assembled response text after the stream drains. Lets agent
   * blocks persist the full response without interposing a TransformStream on a
   * fetch-backed source — that pattern amplifies memory on Bun via #28035.
   */
  onFullContent?: (content: string) => void | Promise<void>
}
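A minimal sketch of a producer using the new hook — persistToMemory is a hypothetical stand-in for the agent handler's memoryService.appendToMemory call shown earlier:

declare function persistToMemory(content: string): Promise<void> // hypothetical stand-in

function withPersistence(exec: StreamingExecution): StreamingExecution {
  return {
    stream: exec.stream, // forwarded as-is; no TransformStream interposed on the source
    execution: exec.execution,
    onFullContent: async (content) => {
      if (!content.trim()) return
      await persistToMemory(content) // runs once, after the executor drains the stream
    },
  }
}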

export interface BlockExecutor {

@@ -7,6 +7,55 @@ const logger = createLogger('BatchDelete')

export const DEFAULT_BATCH_SIZE = 2000
export const DEFAULT_MAX_BATCHES_PER_TABLE = 10
/**
 * Split workspaceIds into this-sized groups before running SELECT/DELETE. Large
 * IN lists combined with `started_at < X` force Postgres to probe every
 * workspace range in the composite index, which blows the 90s statement timeout
 * at the scale of the full free tier.
 */
export const DEFAULT_WORKSPACE_CHUNK_SIZE = 50

export function chunkArray<T>(arr: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size))
  return out
}

export interface SelectByIdChunksOptions {
  /** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */
  overallLimit?: number
  chunkSize?: number
}

/**
 * Run a SELECT query once per ID chunk and concatenate results up to
 * `overallLimit`. Each chunk's query is passed the remaining row budget so the
 * total never exceeds the cap. Use this when you need the selected row set
 * (e.g. to drive S3 or copilot-backend cleanup alongside the DB delete).
 *
 * Works for any large ID set — workspace IDs, workflow IDs, etc. Avoids
 * sending one massive `IN (...)` list that would blow Postgres's statement
 * timeout.
 */
export async function selectRowsByIdChunks<T>(
  ids: string[],
  query: (chunkIds: string[], chunkLimit: number) => Promise<T[]>,
  {
    overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
    chunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
  }: SelectByIdChunksOptions = {}
): Promise<T[]> {
  if (ids.length === 0) return []

  const rows: T[] = []
  for (const chunkIds of chunkArray(ids, chunkSize)) {
    if (rows.length >= overallLimit) break
    const remaining = overallLimit - rows.length
    const chunkRows = await query(chunkIds, remaining)
    rows.push(...chunkRows)
  }
  return rows
}
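Call sites pass a per-chunk query and let the helper own chunking plus the overall cap — the same shape as the cleanup-job calls above. With a hypothetical drizzle table `events` carrying workspaceId/createdAt columns:

const expired = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
  db
    .select({ id: events.id })
    .from(events)
    .where(and(inArray(events.workspaceId, chunkIds), lt(events.createdAt, retentionDate)))
    .limit(chunkLimit)
)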

export interface TableCleanupResult {
  table: string
@@ -14,6 +63,111 @@ export interface TableCleanupResult {
  failed: number
}

export interface ChunkedBatchDeleteOptions<TRow extends { id: string }> {
  tableDef: PgTable
  workspaceIds: string[]
  tableName: string
  /** SELECT eligible rows for one workspace chunk. The result must include `id`. */
  selectChunk: (chunkIds: string[], limit: number) => Promise<TRow[]>
  /** Runs between SELECT and DELETE; receives the just-selected rows. */
  onBatch?: (rows: TRow[]) => Promise<void>
  batchSize?: number
  /** Max batches per workspace chunk. */
  maxBatches?: number
  /**
   * Hard cap on rows processed (deleted + failed) across all chunks per call.
   * Defaults to `DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE`. Cron
   * runs frequently enough to catch up the backlog over multiple invocations.
   */
  totalRowLimit?: number
  workspaceChunkSize?: number
}

/**
 * Inner loop primitive for cleanup jobs.
 *
 * For each workspace chunk: SELECT a batch of eligible rows → run optional
 * `onBatch` hook (e.g. to delete S3 files) → DELETE those rows by ID. Repeats
 * until exhausted or `maxBatches` is hit, then moves to the next chunk. Stops
 * the whole call once `totalRowLimit` rows have been processed.
 *
 * Workspace IDs are chunked before the SELECT — see
 * `DEFAULT_WORKSPACE_CHUNK_SIZE` for why.
 */
export async function chunkedBatchDelete<TRow extends { id: string }>({
  tableDef,
  workspaceIds,
  tableName,
  selectChunk,
  onBatch,
  batchSize = DEFAULT_BATCH_SIZE,
  maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
  totalRowLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
  workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
}: ChunkedBatchDeleteOptions<TRow>): Promise<TableCleanupResult> {
  const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }

  if (workspaceIds.length === 0) {
    logger.info(`[${tableName}] Skipped — no workspaces in scope`)
    return result
  }

  const chunks = chunkArray(workspaceIds, workspaceChunkSize)
  let stoppedEarly = false

  for (const [chunkIdx, chunkIds] of chunks.entries()) {
    if (result.deleted + result.failed >= totalRowLimit) {
      stoppedEarly = true
      break
    }

    let batchesProcessed = 0
    let hasMore = true

    while (
      hasMore &&
      batchesProcessed < maxBatches &&
      result.deleted + result.failed < totalRowLimit
    ) {
      let rows: TRow[] = []
      try {
        rows = await selectChunk(chunkIds, batchSize)

        if (rows.length === 0) {
          hasMore = false
          break
        }

        if (onBatch) await onBatch(rows)

        const ids = rows.map((r) => r.id)
        const deleted = await db
          .delete(tableDef)
          .where(inArray(sql`id`, ids))
          .returning({ id: sql`id` })

        result.deleted += deleted.length
        hasMore = rows.length === batchSize
        batchesProcessed++
      } catch (error) {
        // Count rows we tried to delete; SELECT-stage errors leave rows=[].
        result.failed += rows.length
        logger.error(
          `[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}, ${rows.length} rows):`,
          { error }
        )
        hasMore = false
      }
    }
  }

  logger.info(
    `[${tableName}] Complete: ${result.deleted} deleted, ${result.failed} failed across ${chunks.length} chunks${stoppedEarly ? ' (row-limit reached, remaining chunks deferred to next run)' : ''}`
  )

  return result
}
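For comparison with the workflow_execution_logs call in cleanup-logs above, this is the general shape of a chunkedBatchDelete call that carries a per-batch side effect — the table and columns here are hypothetical:

const stats = await chunkedBatchDelete({
  tableDef: attachments, // hypothetical table with id/key/workspaceId/createdAt
  workspaceIds,
  tableName: 'free/attachments',
  selectChunk: (chunkIds, limit) =>
    db
      .select({ id: attachments.id, key: attachments.key })
      .from(attachments)
      .where(and(inArray(attachments.workspaceId, chunkIds), lt(attachments.createdAt, retentionDate)))
      .limit(limit),
  // runs between SELECT and DELETE, so blobs are gone before their rows are
  onBatch: async (rows) => {
    await Promise.all(rows.map((r) => StorageService.deleteFile({ key: r.key, context: 'execution' })))
  },
})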

export interface BatchDeleteOptions {
  tableDef: PgTable
  workspaceIdCol: PgColumn
@@ -25,13 +179,13 @@ export interface BatchDeleteOptions {
  requireTimestampNotNull?: boolean
  batchSize?: number
  maxBatches?: number
  workspaceChunkSize?: number
}

/**
 * Iteratively delete rows in a table matching a workspace + time-based predicate.
 *
 * Uses a SELECT-with-LIMIT → DELETE-by-ID pattern to keep each round bounded in
 * memory and I/O (PostgreSQL DELETE does not support LIMIT directly).
 * Convenience wrapper around `chunkedBatchDelete` for the common case: delete
 * rows where `workspaceId IN (...) AND timestamp < retentionDate`. Use this
 * when there's no per-row side effect (e.g. no S3 files to clean up alongside).
 */
export async function batchDeleteByWorkspaceAndTimestamp({
  tableDef,
@@ -41,56 +195,23 @@ export async function batchDeleteByWorkspaceAndTimestamp({
  retentionDate,
  tableName,
  requireTimestampNotNull = false,
  batchSize = DEFAULT_BATCH_SIZE,
  maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
  ...rest
}: BatchDeleteOptions): Promise<TableCleanupResult> {
  const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }

  if (workspaceIds.length === 0) {
    logger.info(`[${tableName}] Skipped — no workspaces in scope`)
    return result
  }

  const predicates = [inArray(workspaceIdCol, workspaceIds), lt(timestampCol, retentionDate)]
  if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
  const whereClause = and(...predicates)

  let batchesProcessed = 0
  let hasMore = true

  while (hasMore && batchesProcessed < maxBatches) {
    try {
      const batch = await db
  return chunkedBatchDelete({
    tableDef,
    workspaceIds,
    tableName,
    selectChunk: (chunkIds, limit) => {
      const predicates = [inArray(workspaceIdCol, chunkIds), lt(timestampCol, retentionDate)]
      if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
      return db
        .select({ id: sql<string>`id` })
        .from(tableDef)
        .where(whereClause)
        .limit(batchSize)

      if (batch.length === 0) {
        logger.info(`[${tableName}] No expired rows found`)
        hasMore = false
        break
      }

      const ids = batch.map((r) => r.id)
      const deleted = await db
        .delete(tableDef)
        .where(inArray(sql`id`, ids))
        .returning({ id: sql`id` })

      result.deleted += deleted.length
      hasMore = batch.length === batchSize
      batchesProcessed++

      logger.info(`[${tableName}] Batch ${batchesProcessed}: deleted ${deleted.length} rows`)
    } catch (error) {
      result.failed++
      logger.error(`[${tableName}] Batch delete failed:`, { error })
      hasMore = false
    }
  }

  return result
        .where(and(...predicates))
        .limit(limit)
    },
    ...rest,
  })
}
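After this refactor the common retention case is one call. A minimal sketch, assuming an illustrative `chatMessage` Drizzle table and a 30-day window (neither is taken from this commit; the option names come from the destructuring above):

// Hypothetical retention pass: drop chat messages older than 30 days.
const retentionDate = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)

const chatCleanup = await batchDeleteByWorkspaceAndTimestamp({
  tableDef: chatMessage, // assumed Drizzle table
  workspaceIdCol: chatMessage.workspaceId,
  timestampCol: chatMessage.createdAt,
  workspaceIds: expiredWorkspaceIds,
  retentionDate,
  tableName: 'chat_message',
})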

/**

@@ -3,10 +3,11 @@ import { credential } from '@sim/db/schema'
import { toError } from '@sim/utils/errors'
import { eq } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types'
import { getCredentialActorContext } from '@/lib/credentials/access'

export function executeManageCredential(
  rawParams: Record<string, unknown>,
  _context: ExecutionContext
  context: ExecutionContext
): Promise<ToolCallResult> {
  const params = rawParams as {
    operation: string
@@ -17,26 +18,30 @@ export function executeManageCredential(
  const { operation, displayName } = params
  return (async () => {
    try {
      if (!context?.userId) {
        return { success: false, error: 'Authentication required' }
      }

      switch (operation) {
        case 'rename': {
          const credentialId = params.credentialId
          if (!credentialId) return { success: false, error: 'credentialId is required for rename' }
          if (!displayName) return { success: false, error: 'displayName is required for rename' }
          const [row] = await db
            .select({
              id: credential.id,
              type: credential.type,
              displayName: credential.displayName,
            })
            .from(credential)
            .where(eq(credential.id, credentialId))
            .limit(1)
          if (!row) return { success: false, error: 'Credential not found' }
          if (row.type !== 'oauth')

          const actor = await getCredentialActorContext(credentialId, context.userId)
          if (!actor.credential || !actor.hasWorkspaceAccess) {
            return { success: false, error: 'Credential not found' }
          }
          if (actor.credential.type !== 'oauth') {
            return {
              success: false,
              error: 'Only OAuth credentials can be managed with this tool.',
            }
          }
          if (!actor.canWriteWorkspace && !actor.isAdmin) {
            return { success: false, error: 'Write access required to rename this credential' }
          }

          await db
            .update(credential)
            .set({ displayName, updatedAt: new Date() })
@@ -53,12 +58,16 @@ export function executeManageCredential(
          const failed: string[] = []

          for (const id of ids) {
            const [row] = await db
              .select({ id: credential.id, type: credential.type })
              .from(credential)
              .where(eq(credential.id, id))
              .limit(1)
            if (!row || row.type !== 'oauth') {
            const actor = await getCredentialActorContext(id, context.userId)
            if (
              !actor.credential ||
              !actor.hasWorkspaceAccess ||
              actor.credential.type !== 'oauth'
            ) {
              failed.push(id)
              continue
            }
            if (!actor.canWriteWorkspace && !actor.isAdmin) {
              failed.push(id)
              continue
            }

@@ -1,6 +1,9 @@
import { db } from '@sim/db'
import { knowledgeBase } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { generateId } from '@sim/utils/id'
import { eq } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types'
import { restoreKnowledgeBase } from '@/lib/knowledge/service'
import { getTableById, restoreTable } from '@/lib/table/service'
@@ -10,6 +13,8 @@ import {
} from '@/lib/uploads/contexts/workspace/workspace-file-manager'
import { restoreWorkflow } from '@/lib/workflows/lifecycle'
import { performRestoreFolder } from '@/lib/workflows/orchestration/folder-lifecycle'
import { getWorkflowById } from '@/lib/workflows/utils'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'

const logger = createLogger('RestoreResource')

@@ -33,10 +38,25 @@ export async function executeRestoreResource(
  }

  const requestId = generateId().slice(0, 8)
  const callerWorkspaceId = context.workspaceId

  const hasWriteAccess = async (resourceWorkspaceId: string | null | undefined) => {
    if (!resourceWorkspaceId || resourceWorkspaceId !== callerWorkspaceId) return false
    const permission = await getUserEntityPermissions(
      context.userId,
      'workspace',
      resourceWorkspaceId
    )
    return permission === 'write' || permission === 'admin'
  }

  try {
    switch (type) {
      case 'workflow': {
        const existing = await getWorkflowById(id, { includeArchived: true })
        if (!existing || !(await hasWriteAccess(existing.workspaceId))) {
          return { success: false, error: 'Workflow not found' }
        }
        const result = await restoreWorkflow(id, { requestId })
        if (!result.restored) {
          return { success: false, error: 'Workflow not found or not archived' }
@@ -50,9 +70,13 @@ export async function executeRestoreResource(
      }

      case 'table': {
        const existing = await getTableById(id, { includeArchived: true })
        if (!existing || !(await hasWriteAccess(existing.workspaceId))) {
          return { success: false, error: 'Table not found' }
        }
        await restoreTable(id, requestId)
        const table = await getTableById(id)
        const tableName = table?.name || id
        const tableName = table?.name || existing.name
        logger.info('Table restored via copilot', { tableId: id, name: tableName })
        return {
          success: true,
@@ -62,6 +86,9 @@ export async function executeRestoreResource(
      }

      case 'file': {
        if (!(await hasWriteAccess(context.workspaceId))) {
          return { success: false, error: 'File not found' }
        }
        await restoreWorkspaceFile(context.workspaceId, id)
        const fileRecord = await getWorkspaceFile(context.workspaceId, id)
        const fileName = fileRecord?.name || id
@@ -74,6 +101,14 @@ export async function executeRestoreResource(
      }

      case 'knowledgebase': {
        const [existing] = await db
          .select({ workspaceId: knowledgeBase.workspaceId })
          .from(knowledgeBase)
          .where(eq(knowledgeBase.id, id))
          .limit(1)
        if (!existing || !(await hasWriteAccess(existing.workspaceId))) {
          return { success: false, error: 'Knowledge base not found' }
        }
        await restoreKnowledgeBase(id, requestId)
        logger.info('Knowledge base restored via copilot', { knowledgeBaseId: id })
        return {
@@ -83,6 +118,9 @@ export async function executeRestoreResource(
      }

      case 'folder': {
        if (!(await hasWriteAccess(context.workspaceId))) {
          return { success: false, error: 'Folder not found' }
        }
        const result = await performRestoreFolder({
          folderId: id,
          workspaceId: context.workspaceId,

@@ -28,6 +28,7 @@ import {
  setWorkflowVariables,
  updateFolderRecord,
  updateWorkflowRecord,
  verifyFolderWorkspace,
} from '@/lib/workflows/utils'
import { hasExecutionResult } from '@/executor/utils/errors'
import type { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
@@ -522,7 +523,13 @@ export async function executeMoveWorkflow(

  for (const workflowId of workflowIds) {
    try {
      await ensureWorkflowAccess(workflowId, context.userId, 'write')
      const { workspaceId } = await ensureWorkflowAccess(workflowId, context.userId, 'write')
      if (folderId) {
        if (!workspaceId || !(await verifyFolderWorkspace(folderId, workspaceId))) {
          failed.push(workflowId)
          continue
        }
      }
      assertWorkflowMutationNotAborted(context)
      await updateWorkflowRecord(workflowId, { folderId })
      moved.push(workflowId)
@@ -562,6 +569,14 @@ export async function executeMoveFolder(

  const workspaceId = context.workspaceId || (await getDefaultWorkspaceId(context.userId))
  await ensureWorkspaceAccess(workspaceId, context.userId, 'write')

  if (!(await verifyFolderWorkspace(folderId, workspaceId))) {
    return { success: false, error: 'Folder not found' }
  }
  if (parentId && !(await verifyFolderWorkspace(parentId, workspaceId))) {
    return { success: false, error: 'Parent folder not found' }
  }

  assertWorkflowMutationNotAborted(context)
  await updateFolderRecord(folderId, { parentId })

@@ -1007,6 +1022,11 @@ export async function executeRenameFolder(

  const workspaceId = context.workspaceId || (await getDefaultWorkspaceId(context.userId))
  await ensureWorkspaceAccess(workspaceId, context.userId, 'write')

  if (!(await verifyFolderWorkspace(folderId, workspaceId))) {
    return { success: false, error: 'Folder not found' }
  }

  assertWorkflowMutationNotAborted(context)
  await updateFolderRecord(folderId, { name })

@@ -105,11 +105,12 @@ export const getJobLogsServerTool: BaseServerTool<GetJobLogsArgs, JobLogEntry[]>
  }

  const wsId = workspaceId || context.workspaceId
  if (wsId) {
    const access = await checkWorkspaceAccess(wsId, context.userId)
    if (!access.hasAccess) {
      throw new Error('Unauthorized workspace access')
    }
  if (!wsId) {
    throw new Error('Workspace context required')
  }
  const access = await checkWorkspaceAccess(wsId, context.userId)
  if (!access.hasAccess) {
    throw new Error('Unauthorized workspace access')
  }

  const clampedLimit = Math.min(Math.max(1, limit), 5)
@@ -121,7 +122,10 @@ export const getJobLogsServerTool: BaseServerTool<GetJobLogsArgs, JobLogEntry[]>
    includeDetails,
  })

  const conditions = [eq(jobExecutionLogs.scheduleId, jobId)]
  const conditions = [
    eq(jobExecutionLogs.scheduleId, jobId),
    eq(jobExecutionLogs.workspaceId, wsId),
  ]
  if (executionId) {
    conditions.push(eq(jobExecutionLogs.executionId, executionId))
  }

@@ -37,6 +37,11 @@ import {
import { StorageService } from '@/lib/uploads'
import { resolveWorkspaceFileReference } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
import { getQueryStrategy, handleVectorOnlySearch } from '@/app/api/knowledge/search/utils'
import {
  checkDocumentWriteAccess,
  checkKnowledgeBaseAccess,
  checkKnowledgeBaseWriteAccess,
} from '@/app/api/knowledge/utils'

const logger = createLogger('KnowledgeBaseServerTool')

@@ -141,6 +146,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
      if (!access.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const knowledgeBase = await getKnowledgeBaseById(args.knowledgeBaseId)
      if (!knowledgeBase) {
        return {
@@ -187,6 +200,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
      if (!access.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const kb = await getKnowledgeBaseById(args.knowledgeBaseId)
      if (!kb) {
        return {
@@ -257,6 +278,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(
        args.knowledgeBaseId,
        context.userId
      )
      if (!writeAccess.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const targetKb = await getKnowledgeBaseById(args.knowledgeBaseId)
      if (!targetKb || !targetKb.workspaceId) {
        return {
@@ -363,6 +395,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(
        args.knowledgeBaseId,
        context.userId
      )
      if (!writeAccess.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const requestId = generateId().slice(0, 8)
      assertNotAborted()
      const updatedKb = await updateKnowledgeBase(args.knowledgeBaseId, updates, requestId)
@@ -400,6 +443,12 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
      const notFound: string[] = []

      for (const kbId of kbIds) {
        const writeAccess = await checkKnowledgeBaseWriteAccess(kbId, context.userId)
        if (!writeAccess.hasAccess) {
          notFound.push(kbId)
          continue
        }

        const kbToDelete = await getKnowledgeBaseById(kbId)
        if (!kbToDelete) {
          notFound.push(kbId)
@@ -444,8 +493,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
      const failed: string[] = []

      for (const docId of docIds) {
        const requestId = generateId().slice(0, 8)
        assertNotAborted()
        const docAccess = await checkDocumentWriteAccess(
          args.knowledgeBaseId,
          docId,
          context.userId
        )
        if (!docAccess.hasAccess) {
          failed.push(docId)
          continue
        }
        const requestId = generateId().slice(0, 8)
        const result = await deleteDocument(docId, requestId)
        if (result.success) {
          deleted.push(docId)
@@ -481,6 +539,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
          message: 'At least one of filename or enabled is required for update_document',
        }
      }
      const docAccess = await checkDocumentWriteAccess(
        args.knowledgeBaseId,
        args.documentId,
        context.userId
      )
      if (!docAccess.hasAccess) {
        return {
          success: false,
          message: `Document with ID "${args.documentId}" not found`,
        }
      }
      const requestId = generateId().slice(0, 8)
      assertNotAborted()
      await updateDocument(args.documentId, updateData, requestId)
@@ -503,6 +572,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
      if (!access.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const tagDefinitions = await getDocumentTagDefinitions(args.knowledgeBaseId)

      logger.info('Tag definitions listed via copilot', {
@@ -537,6 +614,18 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
          message: 'tagDisplayName is required for create_tag operation',
        }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(
        args.knowledgeBaseId,
        context.userId
      )
      if (!writeAccess.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const fieldType = args.tagFieldType || 'text'

      const tagSlot = await getNextAvailableSlot(args.knowledgeBaseId, fieldType)
@@ -606,6 +695,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(
        existingTag.knowledgeBaseId,
        context.userId
      )
      if (!writeAccess.hasAccess) {
        return {
          success: false,
          message: `Tag definition with ID "${args.tagDefinitionId}" not found`,
        }
      }

      const requestId = generateId().slice(0, 8)
      assertNotAborted()
      const updatedTag = await updateTagDefinition(args.tagDefinitionId, updateData, requestId)
@@ -643,6 +743,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(
        args.knowledgeBaseId,
        context.userId
      )
      if (!writeAccess.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const requestId = generateId().slice(0, 8)
      assertNotAborted()
      const deleted = await deleteTagDefinition(
@@ -677,6 +788,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
      if (!access.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const requestId = generateId().slice(0, 8)
      const stats = await getTagUsageStats(args.knowledgeBaseId, requestId)

@@ -702,6 +821,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(
        args.knowledgeBaseId,
        context.userId
      )
      if (!writeAccess.hasAccess) {
        return {
          success: false,
          message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
        }
      }

      const createBody: Record<string, unknown> = {
        connectorType: args.connectorType,
        sourceConfig: args.sourceConfig ?? {},
@@ -762,6 +892,11 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        return { success: false, message: `Connector "${args.connectorId}" not found` }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(kbId, context.userId)
      if (!writeAccess.hasAccess) {
        return { success: false, message: `Connector "${args.connectorId}" not found` }
      }

      const updateBody: Record<string, unknown> = {}
      if (args.sourceConfig !== undefined) updateBody.sourceConfig = args.sourceConfig
      if (args.syncIntervalMinutes !== undefined)
@@ -810,6 +945,11 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        return { success: false, message: `Connector "${args.connectorId}" not found` }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(deleteKbId, context.userId)
      if (!writeAccess.hasAccess) {
        return { success: false, message: `Connector "${args.connectorId}" not found` }
      }

      assertNotAborted()
      const deleteRes = await connectorApiCall(
        context.userId,
@@ -843,6 +983,11 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
        return { success: false, message: `Connector "${args.connectorId}" not found` }
      }

      const writeAccess = await checkKnowledgeBaseWriteAccess(syncKbId, context.userId)
      if (!writeAccess.hasAccess) {
        return { success: false, message: `Connector "${args.connectorId}" not found` }
      }

      assertNotAborted()
      const syncRes = await connectorApiCall(
        context.userId,

@@ -223,9 +223,12 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
      if (!args.tableId) {
        return { success: false, message: 'Table ID is required' }
      }
      if (!workspaceId) {
        return { success: false, message: 'Workspace ID is required' }
      }

      const table = await getTableById(args.tableId)
      if (!table) {
      if (!table || table.workspaceId !== workspaceId) {
        return { success: false, message: `Table not found: ${args.tableId}` }
      }

@@ -240,9 +243,12 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
      if (!args.tableId) {
        return { success: false, message: 'Table ID is required' }
      }
      if (!workspaceId) {
        return { success: false, message: 'Workspace ID is required' }
      }

      const table = await getTableById(args.tableId)
      if (!table) {
      if (!table || table.workspaceId !== workspaceId) {
        return { success: false, message: `Table not found: ${args.tableId}` }
      }

@@ -816,6 +822,9 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
      if (!args.tableId) {
        return { success: false, message: 'Table ID is required' }
      }
      if (!workspaceId) {
        return { success: false, message: 'Workspace ID is required' }
      }
      const col = (args as Record<string, unknown>).column as
        | {
            name: string
@@ -830,6 +839,10 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
          message: 'column with name and type is required for add_column',
        }
      }
      const tableForAdd = await getTableById(args.tableId)
      if (!tableForAdd || tableForAdd.workspaceId !== workspaceId) {
        return { success: false, message: `Table not found: ${args.tableId}` }
      }
      const requestId = generateId().slice(0, 8)
      assertNotAborted()
      const updated = await addTableColumn(args.tableId, col, requestId)
@@ -844,11 +857,18 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
      if (!args.tableId) {
        return { success: false, message: 'Table ID is required' }
      }
      if (!workspaceId) {
        return { success: false, message: 'Workspace ID is required' }
      }
      const colName = (args as Record<string, unknown>).columnName as string | undefined
      const newColName = (args as Record<string, unknown>).newName as string | undefined
      if (!colName || !newColName) {
        return { success: false, message: 'columnName and newName are required' }
      }
      const tableForRename = await getTableById(args.tableId)
      if (!tableForRename || tableForRename.workspaceId !== workspaceId) {
        return { success: false, message: `Table not found: ${args.tableId}` }
      }
      const requestId = generateId().slice(0, 8)
      assertNotAborted()
      const updated = await renameColumn(
@@ -866,12 +886,19 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
      if (!args.tableId) {
        return { success: false, message: 'Table ID is required' }
      }
      if (!workspaceId) {
        return { success: false, message: 'Workspace ID is required' }
      }
      const colName = (args as Record<string, unknown>).columnName as string | undefined
      const colNames = (args as Record<string, unknown>).columnNames as string[] | undefined
      const names = colNames ?? (colName ? [colName] : null)
      if (!names || names.length === 0) {
        return { success: false, message: 'columnName or columnNames is required' }
      }
      const tableForDelete = await getTableById(args.tableId)
      if (!tableForDelete || tableForDelete.workspaceId !== workspaceId) {
        return { success: false, message: `Table not found: ${args.tableId}` }
      }
      const requestId = generateId().slice(0, 8)
      if (names.length === 1) {
        assertNotAborted()
@@ -901,6 +928,9 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
      if (!args.tableId) {
        return { success: false, message: 'Table ID is required' }
      }
      if (!workspaceId) {
        return { success: false, message: 'Workspace ID is required' }
      }
      const colName = (args as Record<string, unknown>).columnName as string | undefined
      if (!colName) {
        return { success: false, message: 'columnName is required' }
@@ -913,6 +943,10 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
          message: 'At least one of newType or unique must be provided',
        }
      }
      const tableForUpdate = await getTableById(args.tableId)
      if (!tableForUpdate || tableForUpdate.workspaceId !== workspaceId) {
        return { success: false, message: `Table not found: ${args.tableId}` }
      }
      const requestId = generateId().slice(0, 8)
      let result: TableDefinition | undefined
      if (newType !== undefined) {

@@ -704,7 +704,7 @@ export const OAUTH_PROVIDERS: Record<string, OAuthProviderConfig> = {
  services: {
    slack: {
      name: 'Slack',
      description: 'Send messages using a bot for Slack.',
      description: 'Use Slack messaging, files, reactions, views, and canvases.',
      providerId: 'slack',
      icon: SlackIcon,
      baseProviderIcon: SlackIcon,
@@ -722,6 +722,7 @@ export const OAUTH_PROVIDERS: Record<string, OAuthProviderConfig> = {
        // TODO: Add 'users:read.email' once Slack app review is approved
        'files:write',
        'files:read',
        'canvases:read',
        'canvases:write',
        'reactions:write',
      ],

@@ -278,7 +278,8 @@ export const SCOPE_DESCRIPTIONS: Record<string, string> = {
  'users:read.email': 'View user email addresses',
  'files:write': 'Upload files',
  'files:read': 'Download and read files',
  'canvases:write': 'Create canvas documents',
  'canvases:read': 'Read canvas sections',
  'canvases:write': 'Create, edit, and delete canvas documents',
  'reactions:write': 'Add emoji reactions to messages',

  // Webflow scopes

apps/sim/lib/uploads/core/upload-token.ts (new file, 87 lines)
@@ -0,0 +1,87 @@
import { safeCompare } from '@sim/security/compare'
import { hmacSha256Base64 } from '@sim/security/hmac'
import { env } from '@/lib/core/config/env'
import type { StorageContext } from '@/lib/uploads/shared/types'

export interface UploadTokenPayload {
  uploadId: string
  key: string
  userId: string
  workspaceId: string
  context: StorageContext
}

interface SignedPayload extends UploadTokenPayload {
  exp: number
  v: 1
}

const toBase64Url = (input: string): string => Buffer.from(input, 'utf8').toString('base64url')

const fromBase64Url = (input: string): string => Buffer.from(input, 'base64url').toString('utf8')

const sign = (payload: string): string => hmacSha256Base64(payload, env.INTERNAL_API_SECRET)

/**
 * Sign an upload session token binding (uploadId, key, userId, workspaceId, context).
 * Used to prevent IDOR on multipart upload follow-up calls (get-part-urls, complete, abort).
 */
export function signUploadToken(payload: UploadTokenPayload, expiresInSeconds = 60 * 60): string {
  const signed: SignedPayload = {
    ...payload,
    exp: Math.floor(Date.now() / 1000) + expiresInSeconds,
    v: 1,
  }
  const encoded = toBase64Url(JSON.stringify(signed))
  return `${encoded}.${sign(encoded)}`
}

export type UploadTokenVerification =
  | { valid: true; payload: UploadTokenPayload }
  | { valid: false }

export function verifyUploadToken(token: string): UploadTokenVerification {
  if (typeof token !== 'string') {
    return { valid: false }
  }
  const parts = token.split('.')
  if (parts.length !== 2) return { valid: false }
  const [encoded, signature] = parts
  if (!encoded || !signature) return { valid: false }

  const expected = sign(encoded)
  if (!safeCompare(signature, expected)) {
    return { valid: false }
  }

  let parsed: SignedPayload
  try {
    parsed = JSON.parse(fromBase64Url(encoded)) as SignedPayload
  } catch {
    return { valid: false }
  }

  if (
    parsed.v !== 1 ||
    typeof parsed.exp !== 'number' ||
    parsed.exp < Math.floor(Date.now() / 1000) ||
    typeof parsed.uploadId !== 'string' ||
    typeof parsed.key !== 'string' ||
    typeof parsed.userId !== 'string' ||
    typeof parsed.workspaceId !== 'string' ||
    typeof parsed.context !== 'string'
  ) {
    return { valid: false }
  }

  return {
    valid: true,
    payload: {
      uploadId: parsed.uploadId,
      key: parsed.key,
      userId: parsed.userId,
      workspaceId: parsed.workspaceId,
      context: parsed.context as StorageContext,
    },
  }
}
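A quick sanity sketch of the token lifecycle: sign at initiate time, verify on each follow-up call, and compare the embedded user against the session before honoring the request. Payload values here are made up for the example.

// Illustrative roundtrip using the two functions above.
const token = signUploadToken({
  uploadId: 'mpu-abc123', // placeholder values
  key: 'workspace/ws_1/report.pdf',
  userId: 'user_1',
  workspaceId: 'ws_1',
  context: 'workspace',
})

const verified = verifyUploadToken(token)
if (verified.valid && verified.payload.userId === 'user_1') {
  // Follow-up multipart calls may proceed with verified.payload.uploadId/key.
  // A tampered key, another user's token, or an expired exp all yield
  // { valid: false }, and safeCompare keeps the signature check constant-time.
}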

@@ -564,6 +564,18 @@ export async function updateFolderRecord(
  await db.update(workflowFolder).set(setData).where(eq(workflowFolder.id, folderId))
}

export async function verifyFolderWorkspace(
  folderId: string,
  workspaceId: string
): Promise<boolean> {
  const [row] = await db
    .select({ id: workflowFolder.id })
    .from(workflowFolder)
    .where(and(eq(workflowFolder.id, folderId), eq(workflowFolder.workspaceId, workspaceId)))
    .limit(1)
  return Boolean(row)
}

export async function deleteFolderRecord(folderId: string): Promise<boolean> {
  const [folder] = await db
    .select({ parentId: workflowFolder.parentId })

@@ -2364,19 +2364,23 @@ import {
  slackCanvasTool,
  slackCreateChannelCanvasTool,
  slackCreateConversationTool,
  slackDeleteCanvasTool,
  slackDeleteMessageTool,
  slackDownloadTool,
  slackEditCanvasTool,
  slackEphemeralMessageTool,
  slackGetCanvasTool,
  slackGetChannelInfoTool,
  slackGetMessageTool,
  slackGetThreadTool,
  slackGetUserPresenceTool,
  slackGetUserTool,
  slackInviteToConversationTool,
  slackListCanvasesTool,
  slackListChannelsTool,
  slackListMembersTool,
  slackListUsersTool,
  slackLookupCanvasSectionsTool,
  slackMessageReaderTool,
  slackMessageTool,
  slackOpenViewTool,
@@ -3360,6 +3364,10 @@ export const tools: Record<string, ToolConfig> = {
  slack_publish_view: slackPublishViewTool,
  slack_edit_canvas: slackEditCanvasTool,
  slack_create_channel_canvas: slackCreateChannelCanvasTool,
  slack_get_canvas: slackGetCanvasTool,
  slack_list_canvases: slackListCanvasesTool,
  slack_lookup_canvas_sections: slackLookupCanvasSectionsTool,
  slack_delete_canvas: slackDeleteCanvasTool,
  slack_create_conversation: slackCreateConversationTool,
  slack_invite_to_conversation: slackInviteToConversationTool,
  github_repo_info: githubRepoInfoTool,

apps/sim/tools/slack/delete_canvas.ts (new file, 79 lines)
@@ -0,0 +1,79 @@
import type { SlackDeleteCanvasParams, SlackDeleteCanvasResponse } from '@/tools/slack/types'
import type { ToolConfig } from '@/tools/types'

export const slackDeleteCanvasTool: ToolConfig<SlackDeleteCanvasParams, SlackDeleteCanvasResponse> =
  {
    id: 'slack_delete_canvas',
    name: 'Slack Delete Canvas',
    description: 'Delete a Slack canvas by its canvas ID',
    version: '1.0.0',

    oauth: {
      required: true,
      provider: 'slack',
    },

    params: {
      authMethod: {
        type: 'string',
        required: false,
        visibility: 'user-only',
        description: 'Authentication method: oauth or bot_token',
      },
      botToken: {
        type: 'string',
        required: false,
        visibility: 'user-only',
        description: 'Bot token for Custom Bot',
      },
      accessToken: {
        type: 'string',
        required: false,
        visibility: 'hidden',
        description: 'OAuth access token or bot token for Slack API',
      },
      canvasId: {
        type: 'string',
        required: true,
        visibility: 'user-or-llm',
        description: 'Canvas ID to delete (e.g., F1234ABCD)',
      },
    },

    request: {
      url: 'https://slack.com/api/canvases.delete',
      method: 'POST',
      headers: (params: SlackDeleteCanvasParams) => ({
        'Content-Type': 'application/json',
        Authorization: `Bearer ${params.accessToken || params.botToken}`,
      }),
      body: (params: SlackDeleteCanvasParams) => ({
        canvas_id: params.canvasId.trim(),
      }),
    },

    transformResponse: async (response: Response) => {
      const data = await response.json()

      if (!data.ok) {
        if (data.error === 'canvas_not_found') {
          throw new Error('Canvas not found or not visible to the authenticated Slack user or bot.')
        }
        if (data.error === 'canvas_deleting_disabled') {
          throw new Error('Canvas deletion is disabled for this workspace.')
        }
        throw new Error(data.error || 'Failed to delete canvas')
      }

      return {
        success: true,
        output: {
          ok: data.ok,
        },
      }
    },

    outputs: {
      ok: { type: 'boolean', description: 'Whether Slack deleted the canvas successfully' },
    },
  }
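The canvas tools all share this declarative ToolConfig shape, so a hand-rolled runner is enough to exercise one outside the platform. This is a hedged sketch of how such a config could be driven directly, not the repository's actual executor; `token` is a placeholder credential.

// Illustrative runner for a ToolConfig whose url is a plain string.
async function runDeleteCanvas(canvasId: string, token: string) {
  const params = { canvasId, accessToken: token }
  const req = slackDeleteCanvasTool.request
  const response = await fetch(req.url as string, {
    method: req.method,
    headers: req.headers(params),
    body: JSON.stringify(req.body(params)),
  })
  // transformResponse throws when Slack reports data.ok === false.
  return slackDeleteCanvasTool.transformResponse(response)
}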

apps/sim/tools/slack/get_canvas.ts (new file, 85 lines)
@@ -0,0 +1,85 @@
import type { SlackGetCanvasParams, SlackGetCanvasResponse } from '@/tools/slack/types'
import { CANVAS_FILE_OUTPUT_PROPERTIES } from '@/tools/slack/types'
import { mapCanvasFile } from '@/tools/slack/utils'
import type { ToolConfig } from '@/tools/types'

export const slackGetCanvasTool: ToolConfig<SlackGetCanvasParams, SlackGetCanvasResponse> = {
  id: 'slack_get_canvas',
  name: 'Slack Get Canvas Info',
  description: 'Get Slack canvas file metadata by canvas ID',
  version: '1.0.0',

  oauth: {
    required: true,
    provider: 'slack',
  },

  params: {
    authMethod: {
      type: 'string',
      required: false,
      visibility: 'user-only',
      description: 'Authentication method: oauth or bot_token',
    },
    botToken: {
      type: 'string',
      required: false,
      visibility: 'user-only',
      description: 'Bot token for Custom Bot',
    },
    accessToken: {
      type: 'string',
      required: false,
      visibility: 'hidden',
      description: 'OAuth access token or bot token for Slack API',
    },
    canvasId: {
      type: 'string',
      required: true,
      visibility: 'user-or-llm',
      description: 'Canvas file ID to retrieve (e.g., F1234ABCD)',
    },
  },

  request: {
    url: (params: SlackGetCanvasParams) => {
      const url = new URL('https://slack.com/api/files.info')
      url.searchParams.append('file', params.canvasId.trim())
      return url.toString()
    },
    method: 'GET',
    headers: (params: SlackGetCanvasParams) => ({
      'Content-Type': 'application/json',
      Authorization: `Bearer ${params.accessToken || params.botToken}`,
    }),
  },

  transformResponse: async (response: Response) => {
    const data = await response.json()

    if (!data.ok) {
      if (data.error === 'file_not_found') {
        throw new Error('Canvas not found. Please check the canvas ID and try again.')
      }
      if (data.error === 'not_visible') {
        throw new Error('Canvas is not visible to the authenticated Slack user or bot.')
      }
      throw new Error(data.error || 'Failed to get canvas from Slack')
    }

    return {
      success: true,
      output: {
        canvas: mapCanvasFile(data.file),
      },
    }
  },

  outputs: {
    canvas: {
      type: 'object',
      description: 'Canvas file information returned by Slack',
      properties: CANVAS_FILE_OUTPUT_PROPERTIES,
    },
  },
}

@@ -2,19 +2,23 @@ import { slackAddReactionTool } from '@/tools/slack/add_reaction'
import { slackCanvasTool } from '@/tools/slack/canvas'
import { slackCreateChannelCanvasTool } from '@/tools/slack/create_channel_canvas'
import { slackCreateConversationTool } from '@/tools/slack/create_conversation'
import { slackDeleteCanvasTool } from '@/tools/slack/delete_canvas'
import { slackDeleteMessageTool } from '@/tools/slack/delete_message'
import { slackDownloadTool } from '@/tools/slack/download'
import { slackEditCanvasTool } from '@/tools/slack/edit_canvas'
import { slackEphemeralMessageTool } from '@/tools/slack/ephemeral_message'
import { slackGetCanvasTool } from '@/tools/slack/get_canvas'
import { slackGetChannelInfoTool } from '@/tools/slack/get_channel_info'
import { slackGetMessageTool } from '@/tools/slack/get_message'
import { slackGetThreadTool } from '@/tools/slack/get_thread'
import { slackGetUserTool } from '@/tools/slack/get_user'
import { slackGetUserPresenceTool } from '@/tools/slack/get_user_presence'
import { slackInviteToConversationTool } from '@/tools/slack/invite_to_conversation'
import { slackListCanvasesTool } from '@/tools/slack/list_canvases'
import { slackListChannelsTool } from '@/tools/slack/list_channels'
import { slackListMembersTool } from '@/tools/slack/list_members'
import { slackListUsersTool } from '@/tools/slack/list_users'
import { slackLookupCanvasSectionsTool } from '@/tools/slack/lookup_canvas_sections'
import { slackMessageTool } from '@/tools/slack/message'
import { slackMessageReaderTool } from '@/tools/slack/message_reader'
import { slackOpenViewTool } from '@/tools/slack/open_view'
@@ -29,6 +33,10 @@ export {
  slackCanvasTool,
  slackCreateConversationTool,
  slackCreateChannelCanvasTool,
  slackGetCanvasTool,
  slackListCanvasesTool,
  slackLookupCanvasSectionsTool,
  slackDeleteCanvasTool,
  slackMessageReaderTool,
  slackDownloadTool,
  slackEditCanvasTool,
@@ -51,3 +59,5 @@ export {
  slackGetThreadTool,
  slackInviteToConversationTool,
}

export * from './types'

apps/sim/tools/slack/list_canvases.ts (new file, 142 lines)
@@ -0,0 +1,142 @@
import type { SlackListCanvasesParams, SlackListCanvasesResponse } from '@/tools/slack/types'
import { CANVAS_FILE_OUTPUT_PROPERTIES, CANVAS_PAGING_OUTPUT_PROPERTIES } from '@/tools/slack/types'
import { mapCanvasFile } from '@/tools/slack/utils'
import type { ToolConfig } from '@/tools/types'

export const slackListCanvasesTool: ToolConfig<SlackListCanvasesParams, SlackListCanvasesResponse> =
  {
    id: 'slack_list_canvases',
    name: 'Slack List Canvases',
    description: 'List Slack canvases available to the authenticated user or bot',
    version: '1.0.0',

    oauth: {
      required: true,
      provider: 'slack',
    },

    params: {
      authMethod: {
        type: 'string',
        required: false,
        visibility: 'user-only',
        description: 'Authentication method: oauth or bot_token',
      },
      botToken: {
        type: 'string',
        required: false,
        visibility: 'user-only',
        description: 'Bot token for Custom Bot',
      },
      accessToken: {
        type: 'string',
        required: false,
        visibility: 'hidden',
        description: 'OAuth access token or bot token for Slack API',
      },
      channel: {
        type: 'string',
        required: false,
        visibility: 'user-or-llm',
        description: 'Filter canvases appearing in a specific channel ID',
      },
      count: {
        type: 'number',
        required: false,
        visibility: 'user-or-llm',
        description: 'Number of canvases to return per page',
      },
      page: {
        type: 'number',
        required: false,
        visibility: 'user-or-llm',
        description: 'Page number to return',
      },
      user: {
        type: 'string',
        required: false,
        visibility: 'user-or-llm',
        description: 'Filter canvases created by a single user ID',
      },
      tsFrom: {
        type: 'string',
        required: false,
        visibility: 'user-or-llm',
        description: 'Filter canvases created after this Unix timestamp',
      },
      tsTo: {
        type: 'string',
        required: false,
        visibility: 'user-or-llm',
        description: 'Filter canvases created before this Unix timestamp',
      },
      teamId: {
        type: 'string',
        required: false,
        visibility: 'user-or-llm',
        description: 'Encoded team ID, required when using an org-level token',
      },
    },

    request: {
      url: (params: SlackListCanvasesParams) => {
        const url = new URL('https://slack.com/api/files.list')
        url.searchParams.append('types', 'canvas')

        if (params.channel) url.searchParams.append('channel', params.channel.trim())
        if (params.count) url.searchParams.append('count', String(params.count))
        if (params.page) url.searchParams.append('page', String(params.page))
        if (params.user) url.searchParams.append('user', params.user.trim())
        if (params.tsFrom) url.searchParams.append('ts_from', params.tsFrom.trim())
        if (params.tsTo) url.searchParams.append('ts_to', params.tsTo.trim())
        if (params.teamId) url.searchParams.append('team_id', params.teamId.trim())

        return url.toString()
      },
      method: 'GET',
      headers: (params: SlackListCanvasesParams) => ({
        'Content-Type': 'application/json',
        Authorization: `Bearer ${params.accessToken || params.botToken}`,
      }),
    },

    transformResponse: async (response: Response) => {
      const data = await response.json()

      if (!data.ok) {
        if (data.error === 'unknown_type') {
          throw new Error('Slack did not recognize the canvas file type filter.')
        }
        throw new Error(data.error || 'Failed to list canvases from Slack')
      }

      return {
        success: true,
        output: {
          canvases: (data.files ?? []).map(mapCanvasFile),
          paging: {
            count: data.paging?.count ?? 0,
            total: data.paging?.total ?? 0,
            page: data.paging?.page ?? 0,
            pages: data.paging?.pages ?? 0,
          },
        },
      }
    },

    outputs: {
      canvases: {
        type: 'array',
        description: 'Canvas file objects returned by Slack',
        items: {
          type: 'object',
          properties: CANVAS_FILE_OUTPUT_PROPERTIES,
        },
      },
      paging: {
        type: 'object',
        description: 'Pagination information from Slack',
        properties: CANVAS_PAGING_OUTPUT_PROPERTIES,
      },
    },
  }

apps/sim/tools/slack/lookup_canvas_sections.ts (new file, 114 lines)
@@ -0,0 +1,114 @@
import type {
  SlackLookupCanvasSectionsParams,
  SlackLookupCanvasSectionsResponse,
} from '@/tools/slack/types'
import { CANVAS_SECTION_OUTPUT_PROPERTIES } from '@/tools/slack/types'
import type { ToolConfig } from '@/tools/types'

const parseCriteria = (criteria: SlackLookupCanvasSectionsParams['criteria']) => {
  if (typeof criteria !== 'string') {
    return criteria
  }

  try {
    return JSON.parse(criteria)
  } catch {
    throw new Error('Canvas section criteria must be a valid JSON object')
  }
}

export const slackLookupCanvasSectionsTool: ToolConfig<
  SlackLookupCanvasSectionsParams,
  SlackLookupCanvasSectionsResponse
> = {
  id: 'slack_lookup_canvas_sections',
  name: 'Slack Lookup Canvas Sections',
  description: 'Find Slack canvas section IDs matching criteria for later edits',
  version: '1.0.0',

  oauth: {
    required: true,
    provider: 'slack',
  },

  params: {
    authMethod: {
      type: 'string',
      required: false,
      visibility: 'user-only',
      description: 'Authentication method: oauth or bot_token',
    },
    botToken: {
      type: 'string',
      required: false,
      visibility: 'user-only',
      description: 'Bot token for Custom Bot',
    },
    accessToken: {
      type: 'string',
      required: false,
      visibility: 'hidden',
      description: 'OAuth access token or bot token for Slack API',
    },
    canvasId: {
      type: 'string',
      required: true,
      visibility: 'user-or-llm',
      description: 'Canvas ID to search (e.g., F1234ABCD)',
    },
    criteria: {
      type: 'json',
      required: true,
      visibility: 'user-or-llm',
      description:
        'Section lookup criteria, such as {"section_types":["h1"],"contains_text":"Roadmap"}',
    },
  },

  request: {
    url: 'https://slack.com/api/canvases.sections.lookup',
    method: 'POST',
    headers: (params: SlackLookupCanvasSectionsParams) => ({
      'Content-Type': 'application/json',
      Authorization: `Bearer ${params.accessToken || params.botToken}`,
    }),
    body: (params: SlackLookupCanvasSectionsParams) => ({
      canvas_id: params.canvasId.trim(),
      criteria: parseCriteria(params.criteria),
    }),
  },

  transformResponse: async (response: Response) => {
    const data = await response.json()

    if (!data.ok) {
      if (data.error === 'canvas_not_found') {
        throw new Error('Canvas not found or not visible to the authenticated Slack user or bot.')
      }
      if (data.error === 'missing_scope') {
        throw new Error(
          'Missing required permissions. Please reconnect your Slack account with the canvases:read scope.'
        )
      }
      throw new Error(data.error || 'Failed to look up canvas sections')
    }

    return {
      success: true,
      output: {
        sections: data.sections ?? [],
      },
    }
  },

  outputs: {
    sections: {
      type: 'array',
      description: 'Canvas sections matching the lookup criteria',
      items: {
        type: 'object',
        properties: CANVAS_SECTION_OUTPUT_PROPERTIES,
      },
    },
  },
}
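Because `criteria` accepts either an object or a JSON string (LLM-produced tool arguments often arrive as strings), both forms below should produce the same request body after `parseCriteria`. The canvas ID is a placeholder.

// Object form, e.g. from workflow configuration:
const criteriaObj = { section_types: ['h1'], contains_text: 'Roadmap' }
// String form, e.g. from an LLM tool call; parseCriteria JSON.parses it:
const criteriaStr = '{"section_types":["h1"],"contains_text":"Roadmap"}'

const body = slackLookupCanvasSectionsTool.request.body?.({
  canvasId: 'F1234ABCD', // placeholder canvas ID
  criteria: criteriaStr,
})
// body.criteria is now the parsed object; malformed JSON throws a clear error
// instead of sending a quoted string to the Slack API.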

@@ -488,6 +488,91 @@ export const CANVAS_OUTPUT_PROPERTIES = {
  title: { type: 'string', description: 'Canvas title' },
} as const satisfies Record<string, OutputProperty>

/**
 * Canvas file object output properties.
 * Based on Slack file objects returned by files.info and files.list for canvases.
 */
export const CANVAS_FILE_OUTPUT_PROPERTIES = {
  id: { type: 'string', description: 'Unique canvas file identifier' },
  created: { type: 'number', description: 'Unix timestamp when the canvas was created' },
  timestamp: { type: 'number', description: 'Unix timestamp associated with the canvas' },
  name: { type: 'string', description: 'Canvas file name', optional: true },
  title: { type: 'string', description: 'Canvas title', optional: true },
  mimetype: { type: 'string', description: 'MIME type of the canvas file', optional: true },
  filetype: { type: 'string', description: 'Slack file type for the canvas', optional: true },
  pretty_type: { type: 'string', description: 'Human-readable file type', optional: true },
  user: { type: 'string', description: 'User ID of the canvas creator', optional: true },
  editable: { type: 'boolean', description: 'Whether the canvas file is editable', optional: true },
  size: { type: 'number', description: 'Canvas file size in bytes', optional: true },
  mode: { type: 'string', description: 'File mode', optional: true },
  is_external: {
    type: 'boolean',
    description: 'Whether the canvas is externally hosted',
    optional: true,
  },
  is_public: { type: 'boolean', description: 'Whether the canvas is public', optional: true },
  url_private: {
    type: 'string',
    description: 'Private URL for the canvas file',
    optional: true,
  },
  url_private_download: {
    type: 'string',
    description: 'Private download URL for the canvas file',
    optional: true,
  },
  permalink: { type: 'string', description: 'Permanent URL for the canvas', optional: true },
  channels: {
    type: 'array',
    description: 'Public channel IDs where the canvas appears',
    items: { type: 'string', description: 'Channel ID' },
    optional: true,
  },
  groups: {
    type: 'array',
    description: 'Private channel IDs where the canvas appears',
    items: { type: 'string', description: 'Channel ID' },
    optional: true,
  },
  ims: {
    type: 'array',
    description: 'Direct message IDs where the canvas appears',
    items: { type: 'string', description: 'Conversation ID' },
    optional: true,
  },
  canvas_readtime: {
    type: 'number',
    description: 'Approximate read time for canvas content',
    optional: true,
  },
  is_channel_space: {
    type: 'boolean',
    description: 'Whether this canvas is linked to a channel',
    optional: true,
  },
  linked_channel_id: {
    type: 'string',
    description: 'Channel ID linked to this canvas',
    optional: true,
  },
  canvas_creator_id: {
    type: 'string',
    description: 'User ID of the canvas creator',
    optional: true,
  },
} as const satisfies Record<string, OutputProperty>

export const CANVAS_PAGING_OUTPUT_PROPERTIES = {
  count: { type: 'number', description: 'Number of items requested per page' },
  total: { type: 'number', description: 'Total number of matching files' },
  page: { type: 'number', description: 'Current page number' },
  pages: { type: 'number', description: 'Total number of pages' },
} as const satisfies Record<string, OutputProperty>

export const CANVAS_SECTION_OUTPUT_PROPERTIES = {
  id: { type: 'string', description: 'Canvas section identifier' },
} as const satisfies Record<string, OutputProperty>

/**
 * Output definition for modal view objects
 * Based on Slack views.open response structure
@@ -735,6 +820,29 @@ export interface SlackCreateChannelCanvasParams extends SlackBaseParams {
  content?: string
}

export interface SlackGetCanvasParams extends SlackBaseParams {
  canvasId: string
}

export interface SlackListCanvasesParams extends SlackBaseParams {
  channel?: string
  count?: number
  page?: number
  user?: string
  tsFrom?: string
  tsTo?: string
  teamId?: string
}

export interface SlackLookupCanvasSectionsParams extends SlackBaseParams {
  canvasId: string
  criteria: Record<string, unknown> | string
}

export interface SlackDeleteCanvasParams extends SlackBaseParams {
  canvasId: string
}

export interface SlackOpenViewParams extends SlackBaseParams {
  triggerId: string
  interactivityPointer?: string
@@ -1078,6 +1186,69 @@ export interface SlackCreateChannelCanvasResponse extends ToolResponse {
  }
}

export interface SlackCanvasFile {
  id: string
  created: number | null
  timestamp: number | null
  name?: string | null
  title?: string | null
  mimetype?: string | null
  filetype?: string | null
  pretty_type?: string | null
  user?: string | null
  editable?: boolean | null
  size?: number | null
  mode?: string | null
  is_external?: boolean | null
  is_public?: boolean | null
  url_private?: string | null
  url_private_download?: string | null
  permalink?: string | null
  channels?: string[]
  groups?: string[]
  ims?: string[]
  canvas_readtime?: number | null
  is_channel_space?: boolean | null
  linked_channel_id?: string | null
  canvas_creator_id?: string | null
}

export interface SlackCanvasPaging {
  count: number
  total: number
  page: number
  pages: number
}

export interface SlackCanvasSection {
  id: string
}

export interface SlackGetCanvasResponse extends ToolResponse {
  output: {
    canvas: SlackCanvasFile
  }
}

export interface SlackListCanvasesResponse extends ToolResponse {
  output: {
    canvases: SlackCanvasFile[]
    paging: SlackCanvasPaging
  }
}

export interface SlackLookupCanvasSectionsResponse extends ToolResponse {
  output: {
    sections: SlackCanvasSection[]
  }
}

export interface SlackDeleteCanvasResponse extends ToolResponse {
  output: {
    ok: boolean
  }
}

export interface SlackView {
  id: string
  team_id?: string | null
@@ -1143,6 +1314,10 @@ export type SlackResponse =
  | SlackGetUserPresenceResponse
  | SlackEditCanvasResponse
  | SlackCreateChannelCanvasResponse
  | SlackGetCanvasResponse
  | SlackListCanvasesResponse
  | SlackLookupCanvasSectionsResponse
  | SlackDeleteCanvasResponse
  | SlackCreateConversationResponse
  | SlackInviteToConversationResponse
  | SlackOpenViewResponse

apps/sim/tools/slack/utils.ts (new file, 28 lines)
@@ -0,0 +1,28 @@
import type { SlackCanvasFile } from '@/tools/slack/types'

export const mapCanvasFile = (file: SlackCanvasFile): SlackCanvasFile => ({
  id: file.id,
  created: file.created ?? null,
  timestamp: file.timestamp ?? null,
  name: file.name ?? null,
  title: file.title ?? null,
  mimetype: file.mimetype ?? null,
  filetype: file.filetype ?? null,
  pretty_type: file.pretty_type ?? null,
  user: file.user ?? null,
  editable: file.editable ?? null,
  size: file.size ?? null,
  mode: file.mode ?? null,
  is_external: file.is_external ?? null,
  is_public: file.is_public ?? null,
  url_private: file.url_private ?? null,
  url_private_download: file.url_private_download ?? null,
  permalink: file.permalink ?? null,
  channels: file.channels ?? [],
  groups: file.groups ?? [],
  ims: file.ims ?? [],
  canvas_readtime: file.canvas_readtime ?? null,
  is_channel_space: file.is_channel_space ?? null,
  linked_channel_id: file.linked_channel_id ?? null,
  canvas_creator_id: file.canvas_creator_id ?? null,
})
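`mapCanvasFile` normalizes Slack's sparse file objects so every field is explicitly present (`null` or `[]` instead of `undefined`), which keeps the tools' JSON outputs stable across canvases that omit different fields. A quick illustration with a made-up minimal payload:

// Slack often omits fields entirely; after mapping they are explicit.
const sparse = { id: 'F1234ABCD', created: 1700000000 } as SlackCanvasFile // placeholder
const normalized = mapCanvasFile(sparse)
// normalized.title === null, normalized.channels is [], and so on.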