Compare commits

...

10 Commits

Author SHA1 Message Date
Vikhyath Mondreti
489f2d3bd0 v0.6.60: copilot security improvements, slack canvas ops, retention job fixes 2026-04-27 12:37:28 -07:00
Theodore Li
65e17de065 fix(retention-job): add chunking strategy for cleanup (#4305)
* fix(retention-job): add chunking strategy for cleanup

* change stats to be per job, not per chunk
2026-04-27 15:13:00 -04:00
Vikhyath Mondreti
79ff5d80b3 improvement(slack): channel selector for list canvases (#4307) 2026-04-27 12:11:14 -07:00
Vikhyath Mondreti
2a52141d2f feat(slack): canvas related operations (#4306)
* feat(slack): canvas related operations

* extract shared code
2026-04-27 12:00:40 -07:00
Theodore Li
76ad59fd7d fix(stream): Avoid bun memory leak bug from TransformStream (#4255)
* Avoid bun memory leak bug from TransformStream

* fix(executor): skip content persistence when stream consumer exits early

Previously, if the onStream consumer caught an internal error without
re-throwing, the block-executor would treat the shortened accumulator
as the complete response, persist a truncated string to memory via
appendToMemory, and set it as executionOutput.content.

Track whether the source ReadableStream actually closed (done=true) in
the pull handler. If onStream returns before the source drains, skip
content persistence and log a warning — the old tee()-based flow was
immune to this because the executor branch drained independently of
the client branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix lint

* fix(executor): early-return when no streamed content to make onFullContent symmetric

Previously, executionOutput.content was guarded by `if (fullContent)`
but `onFullContent` fired regardless. The agent-handler implementor
defensively bails on empty/whitespace content, but that's a callee
contract, not enforced at the call site — future implementors could
spuriously persist empty assistant turns to memory.

Hoist the `!fullContent` check to a single early return, so both the
output write and the callback share the same precondition.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 14:23:36 -04:00
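
The persistence guard this commit message describes isn't visible in the diffs below; here is a minimal TypeScript sketch of the pattern, assuming a pull-based re-exposed stream and hypothetical `onStream`/`persistContent` hooks (none of these names are the repo's actual executor API):

```ts
// Sketch: only persist the accumulated content if the source stream
// actually drained (done=true), observed from inside the pull handler.
async function runWithStream(
  source: ReadableStream<string>,
  onStream: (stream: ReadableStream<string>) => Promise<void>,
  persistContent: (content: string) => Promise<void>
): Promise<void> {
  let accumulated = ''
  let sourceClosed = false
  const reader = source.getReader()

  // Re-expose the source through a pull-based stream so we can observe
  // done=true, instead of tee()-ing (the tee/TransformStream path is where
  // the Bun memory leak bit).
  const observed = new ReadableStream<string>({
    async pull(controller) {
      const { done, value } = await reader.read()
      if (done) {
        sourceClosed = true // the source genuinely drained
        controller.close()
        return
      }
      if (value !== undefined) {
        accumulated += value
        controller.enqueue(value)
      }
    },
  })

  await onStream(observed)

  if (!sourceClosed) {
    // Consumer returned early (e.g. swallowed an error): the accumulator is
    // truncated, so skip persistence rather than store a partial response.
    console.warn('stream consumer exited before source drained; skipping persistence')
    return
  }
  if (!accumulated) return // early return keeps the output write and callback symmetric
  await persistContent(accumulated)
}
```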
Waleed
c32c1cb917 fix(security): patch copilot tool & multipart upload IDORs (#4304)
* fix(security): patch copilot tool & multipart upload IDORs

- multipart upload: bind upload session to (userId, workspaceId, key)
  via short-lived HMAC-signed token; require workspace write access at
  initiate; source key/uploadId/context from verified token (never
  client) at get-part-urls/complete/abort
- copilot knowledge-base tools: gate all 11 read/write/tag/connector
  ops with checkKnowledgeBaseAccess / checkKnowledgeBaseWriteAccess
- copilot user-table tools: add workspace-id check to get, get_schema,
  add/rename/delete/update_column to match existing op pattern
- copilot manage-credential: add full ownership/write-permission auth
  via getCredentialActorContext (previously had no auth)
- copilot restore-resource: verify workspace ownership and write
  permission for workflow, table, knowledgebase, file, and folder
  restores
- copilot folder rename/move: verify both folderId and parentId belong
  to the caller's workspace via new verifyFolderWorkspace helper
- copilot get-job-logs: verify schedule belongs to caller's workspace

* fix(security): address PR review — document IDOR, log count, token split

- knowledge-base delete_document/update_document: verify each document
  belongs to the claimed knowledgeBaseId via checkDocumentWriteAccess
  (was: trusted args.knowledgeBaseId without binding it to the document)
- multipart batch complete: log verifiedEntries.length instead of raw
  client-supplied data.uploads.length
- upload-token: reject tokens with !=2 dot-delimited segments

* fix(security): close folder workspace bypass when workspaceId is falsy
2026-04-27 11:05:22 -07:00
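
The `signUploadToken`/`verifyUploadToken` pair this commit leans on appears only as an import in the multipart diff further down. A minimal sketch of a token along those lines, assuming HMAC-SHA256 over a base64url JSON payload in a two-segment `payload.signature` format (matching the review fix that rejects tokens with anything other than two dot-delimited segments); the TTL, encoding, and secret sourcing are guesses:

```ts
import { createHmac, timingSafeEqual } from 'crypto'

// Payload fields mirror what the route signs at initiate; exp/TTL are assumed.
export interface UploadTokenPayload {
  uploadId: string
  key: string
  userId: string
  workspaceId: string
  context: string
  exp: number // unix ms expiry
}

const SECRET = process.env.UPLOAD_TOKEN_SECRET ?? 'dev-secret' // assumption
const TTL_MS = 15 * 60 * 1000 // "short-lived"; the real TTL is not in this diff

const sign = (data: string) => createHmac('sha256', SECRET).update(data).digest('base64url')

export function signUploadToken(payload: Omit<UploadTokenPayload, 'exp'>): string {
  const body = Buffer.from(JSON.stringify({ ...payload, exp: Date.now() + TTL_MS })).toString(
    'base64url'
  )
  return `${body}.${sign(body)}`
}

export function verifyUploadToken(
  token: string
): { valid: true; payload: UploadTokenPayload } | { valid: false } {
  const segments = token.split('.')
  if (segments.length !== 2) return { valid: false } // per the review fix
  const [body, sig] = segments
  const a = Buffer.from(sig)
  const b = Buffer.from(sign(body))
  if (a.length !== b.length || !timingSafeEqual(a, b)) return { valid: false }
  const payload = JSON.parse(Buffer.from(body, 'base64url').toString()) as UploadTokenPayload
  if (Date.now() > payload.exp) return { valid: false }
  return { valid: true, payload }
}
```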
Vikhyath Mondreti
58a3ae2aa4 v0.6.59: gpt 5.5, security hardening, parallel subagents rendering 2026-04-27 10:02:06 -07:00
Vikhyath Mondreti
50e74f75ef fix(mothership): parallel subagent rendering, exec stream re-attach (#4299)
* fix(mothership): parallel subagent rendering

* fix rendering logic

* address comments

* cleanup dead code

* address mothership legacy key

* prevent id collision

* rollout stream edge case

* address bugbot comments"

* remove unused fallbacks

* fix execution stream attach, cleanup comments

* remove debug logs

* cleanup failed reconnect
2026-04-27 00:40:39 -07:00
Waleed
60652e621c fix(security): credential-set invite email check + shopify authorize XSS (#4302) 2026-04-26 20:52:42 -07:00
Waleed
8863f1132a feat(models): add gpt-5.5 models (#4300)
* feat(models): add gpt-5.5 models

* fix(models): address gpt-5.5 review feedback

* fix(models): align gpt-5.5 pro controls
2026-04-25 19:13:29 -07:00
44 changed files with 2336 additions and 741 deletions

View File

@@ -32,7 +32,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
const returnUrl = request.nextUrl.searchParams.get('returnUrl')
if (!shopDomain) {
const returnUrlParam = returnUrl ? encodeURIComponent(returnUrl) : ''
const safeReturnUrl =
returnUrl && isSameOrigin(returnUrl) ? encodeURIComponent(returnUrl) : ''
const returnUrlJsLiteral = JSON.stringify(safeReturnUrl)
return new NextResponse(
`<!DOCTYPE html>
<html>
@@ -120,7 +122,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
</div>
<script>
const returnUrl = '${returnUrlParam}';
const returnUrl = ${returnUrlJsLiteral};
function handleSubmit(e) {
e.preventDefault();
let shop = document.getElementById('shop').value.trim().toLowerCase();
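
`isSameOrigin` isn't shown in this hunk; a plausible sketch, assuming it resolves the candidate URL against the request origin (the real helper is called with only the URL, so the origin parameter here is purely illustrative). Switching the inline script to `JSON.stringify(...)` is a second, independent fix: it emits a properly escaped JavaScript string literal, so a `returnUrl` like `'; alert(1); '` can no longer break out of the single-quoted literal.

```ts
// Sketch only: accept a user-supplied return URL only if it stays on our origin.
function isSameOrigin(rawUrl: string, requestOrigin = 'https://app.example.com'): boolean {
  try {
    // Relative URLs resolve against the request origin and therefore pass.
    return new URL(rawUrl, requestOrigin).origin === requestOrigin
  } catch {
    return false // unparseable input is rejected outright
  }
}
```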

View File

@@ -12,6 +12,7 @@ import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { normalizeEmail } from '@/lib/invitations/core'
import { syncAllWebhooksForCredentialSet } from '@/lib/webhooks/utils.server'
const logger = createLogger('CredentialSetInviteToken')
@@ -111,6 +112,21 @@ export const POST = withRouteHandler(
return NextResponse.json({ error: 'Invitation has expired' }, { status: 410 })
}
if (invitation.email) {
const sessionEmail = session.user.email
if (!sessionEmail || normalizeEmail(sessionEmail) !== normalizeEmail(invitation.email)) {
logger.warn('Rejected credential set invitation accept due to email mismatch', {
invitationId: invitation.id,
credentialSetId: invitation.credentialSetId,
userId: session.user.id,
})
return NextResponse.json(
{ error: 'This invitation was sent to a different email address' },
{ status: 403 }
)
}
}
const existingMember = await db
.select()
.from(credentialSetMember)
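
`normalizeEmail` comes from `@/lib/invitations/core` and isn't part of this compare; a conservative guess at its behavior, for illustration only:

```ts
// Assumed shape: case- and whitespace-insensitive comparison of addresses.
// The real helper may also apply unicode normalization or provider-specific rules.
export function normalizeEmail(email: string): string {
  return email.trim().toLowerCase()
}
```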

View File

@@ -8,21 +8,61 @@ import {
isUsingCloudStorage,
type StorageContext,
} from '@/lib/uploads'
import {
signUploadToken,
type UploadTokenPayload,
verifyUploadToken,
} from '@/lib/uploads/core/upload-token'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('MultipartUploadAPI')
const ALLOWED_UPLOAD_CONTEXTS = new Set<StorageContext>([
'knowledge-base',
'chat',
'copilot',
'mothership',
'execution',
'workspace',
'profile-pictures',
'og-images',
'logs',
'workspace-logos',
])
interface InitiateMultipartRequest {
fileName: string
contentType: string
fileSize: number
workspaceId: string
context?: StorageContext
}
interface GetPartUrlsRequest {
uploadId: string
key: string
interface TokenBoundRequest {
uploadToken: string
}
interface GetPartUrlsRequest extends TokenBoundRequest {
partNumbers: number[]
context?: StorageContext
}
interface CompleteSingleRequest extends TokenBoundRequest {
parts: unknown
}
interface CompleteBatchRequest {
uploads: Array<TokenBoundRequest & { parts: unknown }>
}
const verifyTokenForUser = (token: string | undefined, userId: string) => {
if (!token || typeof token !== 'string') {
return null
}
const result = verifyUploadToken(token)
if (!result.valid || result.payload.userId !== userId) {
return null
}
return result.payload
}
export const POST = withRouteHandler(async (request: NextRequest) => {
@@ -31,6 +71,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const userId = session.user.id
const action = request.nextUrl.searchParams.get('action')
@@ -45,32 +86,34 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
switch (action) {
case 'initiate': {
const data: InitiateMultipartRequest = await request.json()
const { fileName, contentType, fileSize, context = 'knowledge-base' } = data
const data = (await request.json()) as InitiateMultipartRequest
const { fileName, contentType, fileSize, workspaceId, context = 'knowledge-base' } = data
if (!workspaceId || typeof workspaceId !== 'string') {
return NextResponse.json({ error: 'workspaceId is required' }, { status: 400 })
}
if (!ALLOWED_UPLOAD_CONTEXTS.has(context)) {
return NextResponse.json({ error: 'Invalid storage context' }, { status: 400 })
}
const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
if (permission !== 'write' && permission !== 'admin') {
return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
}
const config = getStorageConfig(context)
let uploadId: string
let key: string
if (storageProvider === 's3') {
const { initiateS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
const result = await initiateS3MultipartUpload({
fileName,
contentType,
fileSize,
})
logger.info(
`Initiated S3 multipart upload for ${fileName} (context: ${context}): ${result.uploadId}`
)
return NextResponse.json({
uploadId: result.uploadId,
key: result.key,
})
}
if (storageProvider === 'blob') {
const result = await initiateS3MultipartUpload({ fileName, contentType, fileSize })
uploadId = result.uploadId
key = result.key
} else if (storageProvider === 'blob') {
const { initiateMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
const result = await initiateMultipartUpload({
fileName,
contentType,
@@ -82,46 +125,55 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
connectionString: config.connectionString,
},
})
logger.info(
`Initiated Azure multipart upload for ${fileName} (context: ${context}): ${result.uploadId}`
uploadId = result.uploadId
key = result.key
} else {
return NextResponse.json(
{ error: `Unsupported storage provider: ${storageProvider}` },
{ status: 400 }
)
return NextResponse.json({
uploadId: result.uploadId,
key: result.key,
})
}
return NextResponse.json(
{ error: `Unsupported storage provider: ${storageProvider}` },
{ status: 400 }
const uploadToken = signUploadToken({
uploadId,
key,
userId,
workspaceId,
context,
})
logger.info(
`Initiated ${storageProvider} multipart upload for ${fileName} (context: ${context}, workspace: ${workspaceId}): ${uploadId}`
)
return NextResponse.json({ uploadId, key, uploadToken })
}
case 'get-part-urls': {
const data: GetPartUrlsRequest = await request.json()
const { uploadId, key, partNumbers, context = 'knowledge-base' } = data
const data = (await request.json()) as GetPartUrlsRequest
const { partNumbers } = data
const tokenPayload = verifyTokenForUser(data.uploadToken, userId)
if (!tokenPayload) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}
const { uploadId, key, context } = tokenPayload
const config = getStorageConfig(context)
if (storageProvider === 's3') {
const { getS3MultipartPartUrls } = await import('@/lib/uploads/providers/s3/client')
const presignedUrls = await getS3MultipartPartUrls(key, uploadId, partNumbers)
return NextResponse.json({ presignedUrls })
}
if (storageProvider === 'blob') {
const { getMultipartPartUrls } = await import('@/lib/uploads/providers/blob/client')
const presignedUrls = await getMultipartPartUrls(key, partNumbers, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})
return NextResponse.json({ presignedUrls })
}
@@ -132,24 +184,32 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
}
case 'complete': {
const data = await request.json()
const context: StorageContext = data.context || 'knowledge-base'
const data = (await request.json()) as CompleteSingleRequest | CompleteBatchRequest
const config = getStorageConfig(context)
if ('uploads' in data && Array.isArray(data.uploads)) {
const verified = data.uploads.map((upload) => {
const payload = verifyTokenForUser(upload.uploadToken, userId)
return payload ? { payload, parts: upload.parts } : null
})
if (verified.some((entry) => entry === null)) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}
const verifiedEntries = verified.filter(
(entry): entry is { payload: UploadTokenPayload; parts: unknown } => entry !== null
)
if ('uploads' in data) {
const results = await Promise.all(
data.uploads.map(async (upload: any) => {
const { uploadId, key } = upload
verifiedEntries.map(async ({ payload, parts }) => {
const { uploadId, key, context } = payload
const config = getStorageConfig(context)
if (storageProvider === 's3') {
const { completeS3MultipartUpload } = await import(
'@/lib/uploads/providers/s3/client'
)
const parts = upload.parts // S3 format: { ETag, PartNumber }
const result = await completeS3MultipartUpload(key, uploadId, parts)
const result = await completeS3MultipartUpload(key, uploadId, parts as any)
return {
success: true,
location: result.location,
@@ -161,15 +221,12 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
const { completeMultipartUpload } = await import(
'@/lib/uploads/providers/blob/client'
)
const parts = upload.parts // Azure format: { blockId, partNumber }
const result = await completeMultipartUpload(key, parts, {
const result = await completeMultipartUpload(key, parts as any, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})
return {
success: true,
location: result.location,
@@ -182,19 +239,23 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
})
)
logger.info(`Completed ${data.uploads.length} multipart uploads (context: ${context})`)
logger.info(`Completed ${verifiedEntries.length} multipart uploads`)
return NextResponse.json({ results })
}
const { uploadId, key, parts } = data
const single = data as CompleteSingleRequest
const tokenPayload = verifyTokenForUser(single.uploadToken, userId)
if (!tokenPayload) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}
const { uploadId, key, context } = tokenPayload
const config = getStorageConfig(context)
if (storageProvider === 's3') {
const { completeS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
const result = await completeS3MultipartUpload(key, uploadId, parts)
const result = await completeS3MultipartUpload(key, uploadId, single.parts as any)
logger.info(`Completed S3 multipart upload for key ${key} (context: ${context})`)
return NextResponse.json({
success: true,
location: result.location,
@@ -204,16 +265,13 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
}
if (storageProvider === 'blob') {
const { completeMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
const result = await completeMultipartUpload(key, parts, {
const result = await completeMultipartUpload(key, single.parts as any, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})
logger.info(`Completed Azure multipart upload for key ${key} (context: ${context})`)
return NextResponse.json({
success: true,
location: result.location,
@@ -229,27 +287,27 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
}
case 'abort': {
const data = await request.json()
const { uploadId, key, context = 'knowledge-base' } = data
const data = (await request.json()) as TokenBoundRequest
const tokenPayload = verifyTokenForUser(data.uploadToken, userId)
if (!tokenPayload) {
return NextResponse.json({ error: 'Invalid or expired upload token' }, { status: 403 })
}
const config = getStorageConfig(context as StorageContext)
const { uploadId, key, context } = tokenPayload
const config = getStorageConfig(context)
if (storageProvider === 's3') {
const { abortS3MultipartUpload } = await import('@/lib/uploads/providers/s3/client')
await abortS3MultipartUpload(key, uploadId)
logger.info(`Aborted S3 multipart upload for key ${key} (context: ${context})`)
} else if (storageProvider === 'blob') {
const { abortMultipartUpload } = await import('@/lib/uploads/providers/blob/client')
await abortMultipartUpload(key, {
containerName: config.containerName!,
accountName: config.accountName!,
accountKey: config.accountKey,
connectionString: config.connectionString,
})
logger.info(`Aborted Azure multipart upload for key ${key} (context: ${context})`)
} else {
return NextResponse.json(

View File

@@ -156,32 +156,86 @@ function toToolData(tc: NonNullable<ContentBlock['toolCall']>): ToolCallData {
*/
function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
const segments: MessageSegment[] = []
let group: AgentGroupSegment | null = null
const pushGroup = (nextGroup: AgentGroupSegment, isOpen = false) => {
segments.push({ ...nextGroup, isOpen })
const groupsByKey = new Map<string, AgentGroupSegment>()
let activeGroupKey: string | null = null
const groupKey = (name: string, parentToolCallId: string | undefined) =>
parentToolCallId ? `${name}:${parentToolCallId}` : `${name}:legacy`
const resolveGroupKey = (name: string, parentToolCallId: string | undefined) => {
if (parentToolCallId) return groupKey(name, parentToolCallId)
if (activeGroupKey && groupsByKey.get(activeGroupKey)?.agentName === name) {
return activeGroupKey
}
for (const [key, g] of groupsByKey) {
if (g.agentName === name && g.isOpen) return key
}
return groupKey(name, undefined)
}
const ensureGroup = (
name: string,
parentToolCallId: string | undefined
): { group: AgentGroupSegment; created: boolean } => {
const key = resolveGroupKey(name, parentToolCallId)
const existing = groupsByKey.get(key)
if (existing) return { group: existing, created: false }
const group: AgentGroupSegment = {
type: 'agent_group',
id: `agent-${key}-${segments.length}`,
agentName: name,
agentLabel: resolveAgentLabel(name),
items: [],
isDelegating: false,
isOpen: false,
}
segments.push(group)
groupsByKey.set(key, group)
return { group, created: true }
}
const findGroupForSubagentChunk = (
parentToolCallId: string | undefined
): AgentGroupSegment | undefined => {
if (parentToolCallId) {
for (const [key, g] of groupsByKey) {
if (key.endsWith(`:${parentToolCallId}`)) return g
}
return undefined
}
if (activeGroupKey) return groupsByKey.get(activeGroupKey)
return undefined
}
const flushLanes = () => {
for (const g of groupsByKey.values()) {
g.isOpen = false
g.isDelegating = false
}
groupsByKey.clear()
activeGroupKey = null
}
for (let i = 0; i < blocks.length; i++) {
const block = blocks[i]
if (block.type === 'subagent_text' || block.type === 'subagent_thinking') {
if (!block.content || !group) continue
group.isDelegating = false
const lastItem = group.items[group.items.length - 1]
if (!block.content) continue
const g = findGroupForSubagentChunk(block.parentToolCallId)
if (!g) continue
g.isDelegating = false
const lastItem = g.items[g.items.length - 1]
if (lastItem?.type === 'text') {
lastItem.content += block.content
} else {
group.items.push({ type: 'text', content: block.content })
g.items.push({ type: 'text', content: block.content })
}
continue
}
if (block.type === 'thinking') {
if (!block.content?.trim()) continue
if (group) {
pushGroup(group)
group = null
}
flushLanes()
const last = segments[segments.length - 1]
if (last?.type === 'thinking' && last.endedAt === undefined) {
last.content += block.content
@@ -201,21 +255,19 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
if (block.type === 'text') {
if (!block.content) continue
if (block.subagent) {
if (group && group.agentName === block.subagent) {
group.isDelegating = false
const lastItem = group.items[group.items.length - 1]
const g = groupsByKey.get(resolveGroupKey(block.subagent, block.parentToolCallId))
if (g) {
g.isDelegating = false
const lastItem = g.items[g.items.length - 1]
if (lastItem?.type === 'text') {
lastItem.content += block.content
} else {
group.items.push({ type: 'text', content: block.content })
g.items.push({ type: 'text', content: block.content })
}
continue
}
}
if (group) {
pushGroup(group)
group = null
}
flushLanes()
const last = segments[segments.length - 1]
if (last?.type === 'text') {
last.content += block.content
@@ -228,34 +280,23 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
if (block.type === 'subagent') {
if (!block.content) continue
const key = block.content
if (group && group.agentName === key) continue
const dispatchToolName = SUBAGENT_DISPATCH_TOOLS[key]
let inheritedDelegation = false
if (group && dispatchToolName) {
const last: AgentGroupItem | undefined = group.items[group.items.length - 1]
if (last?.type === 'tool' && last.data.toolName === dispatchToolName) {
inheritedDelegation = !isToolDone(last.data.status) && Boolean(last.data.streamingArgs)
group.items.pop()
const dispatchToolName = SUBAGENT_DISPATCH_TOOLS[key]
if (dispatchToolName) {
const mship = groupsByKey.get(groupKey('mothership', undefined))
if (mship) {
const last = mship.items[mship.items.length - 1]
if (last?.type === 'tool' && last.data.toolName === dispatchToolName) {
inheritedDelegation = !isToolDone(last.data.status) && Boolean(last.data.streamingArgs)
mship.items.pop()
}
}
if (group.items.length > 0) {
pushGroup(group)
}
group = null
} else if (group) {
pushGroup(group)
group = null
}
group = {
type: 'agent_group',
id: `agent-${key}-${i}`,
agentName: key,
agentLabel: resolveAgentLabel(key),
items: [],
isDelegating: inheritedDelegation,
isOpen: false,
}
groupsByKey.delete(groupKey('mothership', undefined))
const { group: g } = ensureGroup(key, block.parentToolCallId)
if (inheritedDelegation) g.isDelegating = true
g.isOpen = true
activeGroupKey = resolveGroupKey(key, block.parentToolCallId)
continue
}
@@ -267,95 +308,75 @@ function parseBlocks(blocks: ContentBlock[]): MessageSegment[] {
const isDispatch = SUBAGENT_KEYS.has(tc.name) && !tc.calledBy
if (isDispatch) {
if (!group || group.agentName !== tc.name) {
if (group) {
pushGroup(group)
group = null
}
group = {
type: 'agent_group',
id: `agent-${tc.name}-${i}`,
agentName: tc.name,
agentLabel: resolveAgentLabel(tc.name),
items: [],
isDelegating: false,
isOpen: false,
}
}
group.isDelegating = isDelegatingTool(tc)
groupsByKey.delete(groupKey('mothership', undefined))
const { group: g } = ensureGroup(tc.name, tc.id)
g.isDelegating = isDelegatingTool(tc)
g.isOpen = g.isDelegating
continue
}
const tool = toToolData(tc)
if (tc.calledBy && group && group.agentName === tc.calledBy) {
group.isDelegating = false
group.items.push({ type: 'tool', data: tool })
} else if (tc.calledBy) {
if (group) {
pushGroup(group)
group = null
}
group = {
type: 'agent_group',
id: `agent-${tc.calledBy}-${i}`,
agentName: tc.calledBy,
agentLabel: resolveAgentLabel(tc.calledBy),
items: [{ type: 'tool', data: tool }],
isDelegating: false,
isOpen: false,
}
if (tc.calledBy) {
const { group: g, created } = ensureGroup(tc.calledBy, block.parentToolCallId)
g.isDelegating = false
if (created && block.parentToolCallId) g.isOpen = true
g.items.push({ type: 'tool', data: tool })
activeGroupKey = resolveGroupKey(tc.calledBy, block.parentToolCallId)
} else {
if (group && group.agentName === 'mothership') {
group.items.push({ type: 'tool', data: tool })
} else {
if (group) {
pushGroup(group)
group = null
}
group = {
type: 'agent_group',
id: `agent-mothership-${i}`,
agentName: 'mothership',
agentLabel: 'Mothership',
items: [{ type: 'tool', data: tool }],
isDelegating: false,
isOpen: false,
}
}
const { group: g } = ensureGroup('mothership', undefined)
g.items.push({ type: 'tool', data: tool })
}
continue
}
if (block.type === 'options') {
if (!block.options?.length) continue
if (group) {
pushGroup(group)
group = null
}
flushLanes()
segments.push({ type: 'options', items: block.options })
continue
}
if (block.type === 'subagent_end') {
if (group) {
pushGroup(group)
group = null
if (block.parentToolCallId) {
for (const [key, g] of groupsByKey) {
if (key.endsWith(`:${block.parentToolCallId}`)) {
g.isOpen = false
g.isDelegating = false
}
}
if (activeGroupKey?.endsWith(`:${block.parentToolCallId}`)) {
activeGroupKey = null
}
} else {
for (const [key, g] of groupsByKey) {
if (key.endsWith(':legacy') && g.agentName !== 'mothership') {
g.isOpen = false
g.isDelegating = false
}
}
if (activeGroupKey?.endsWith(':legacy')) {
activeGroupKey = null
}
}
continue
}
if (block.type === 'stopped') {
if (group) {
pushGroup(group)
group = null
}
flushLanes()
segments.push({ type: 'stopped' })
}
}
if (group) pushGroup(group, true)
return segments
const visibleSegments = segments.filter(
(segment) =>
segment.type !== 'agent_group' ||
segment.items.length > 0 ||
segment.isDelegating ||
segment.isOpen
)
return visibleSegments
}
/**
@@ -428,12 +449,6 @@ export function MessageContent({
isStreaming &&
!hasTrailingContent &&
(lastSegment.type === 'thinking' || hasSubagentEnded || allLastGroupToolsDone)
const lastOpenSubagentGroupId = [...segments]
.reverse()
.find(
(segment): segment is AgentGroupSegment =>
segment.type === 'agent_group' && segment.agentName !== 'mothership' && segment.isOpen
)?.id
return (
<div className='space-y-[10px]'>
@@ -488,8 +503,8 @@ export function MessageContent({
items={segment.items}
isDelegating={segment.isDelegating}
isStreaming={isStreaming}
autoCollapse={allToolsDone && hasFollowingText}
defaultExpanded={segment.id === lastOpenSubagentGroupId}
autoCollapse={!segment.isOpen && allToolsDone && hasFollowingText}
defaultExpanded={segment.isOpen}
/>
</div>
)
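
The effect of keying lanes by `name:parentToolCallId` is easiest to see with two parallel dispatches of the same subagent; a toy illustration of the key scheme from the hunk above, with the surrounding types stripped away:

```ts
const groupKey = (name: string, parentToolCallId?: string) =>
  parentToolCallId ? `${name}:${parentToolCallId}` : `${name}:legacy`

// Parallel dispatches of the same agent land in distinct lanes...
groupKey('builder', 'tc_1') // 'builder:tc_1'
groupKey('builder', 'tc_2') // 'builder:tc_2'
// ...while legacy streams without parent ids collapse into one lane,
// preserving the pre-parallel rendering behavior.
groupKey('builder') // 'builder:legacy'
```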

View File

@@ -131,6 +131,7 @@ import type {
MothershipResource,
MothershipResourceType,
QueuedMessage,
ToolCallInfo,
} from '../types'
import { ToolCallStatus } from '../types'
@@ -701,7 +702,9 @@ function parseStreamBatchResponse(value: unknown): StreamBatchResponse {
function toRawPersistedContentBlock(block: ContentBlock): Record<string, unknown> | null {
const persisted = toRawPersistedContentBlockBody(block)
return persisted ? withBlockTiming(persisted, block) : null
if (!persisted) return null
if (block.parentToolCallId) persisted.parentToolCallId = block.parentToolCallId
return withBlockTiming(persisted, block)
}
function toRawPersistedContentBlockBody(block: ContentBlock): Record<string, unknown> | null {
@@ -1215,7 +1218,7 @@ export function useChat(
reader: ReadableStreamDefaultReader<Uint8Array>,
assistantId: string,
expectedGen?: number,
options?: { preserveExistingState?: boolean }
options?: { preserveExistingState?: boolean; suppressWorkflowToolStarts?: boolean }
) => Promise<{ sawStreamError: boolean; sawComplete: boolean }>
>(async () => ({ sawStreamError: false, sawComplete: false }))
const attachToExistingStreamRef = useRef<
@@ -1457,6 +1460,9 @@ export function useChat(
if (handledClientWorkflowToolIdsRef.current.has(toolCallId)) {
return
}
if (recoveringClientWorkflowToolIdsRef.current.has(toolCallId)) {
return
}
handledClientWorkflowToolIdsRef.current.add(toolCallId)
ensureWorkflowToolResource(toolArgs)
@@ -1467,41 +1473,41 @@ export function useChat(
const recoverPendingClientWorkflowTools = useCallback(
async (nextMessages: ChatMessage[]) => {
const pending: ToolCallInfo[] = []
for (const message of nextMessages) {
for (const block of message.contentBlocks ?? []) {
const toolCall = block.toolCall
if (!toolCall || !isWorkflowToolName(toolCall.name)) {
continue
}
if (toolCall.status !== 'executing') {
continue
}
if (!toolCall || !isWorkflowToolName(toolCall.name)) continue
if (toolCall.status !== 'executing') continue
if (
handledClientWorkflowToolIdsRef.current.has(toolCall.id) ||
recoveringClientWorkflowToolIdsRef.current.has(toolCall.id)
) {
continue
}
recoveringClientWorkflowToolIdsRef.current.add(toolCall.id)
pending.push(toolCall)
}
}
try {
const toolArgs = toolCall.params ?? {}
const targetWorkflowId = ensureWorkflowToolResource(toolArgs)
for (const toolCall of pending) {
try {
const toolArgs = toolCall.params ?? {}
const targetWorkflowId = ensureWorkflowToolResource(toolArgs)
if (targetWorkflowId) {
const rebound = await bindRunToolToExecution(toolCall.id, targetWorkflowId)
if (rebound) {
handledClientWorkflowToolIdsRef.current.add(toolCall.id)
continue
}
if (targetWorkflowId) {
const rebound = await bindRunToolToExecution(toolCall.id, targetWorkflowId)
if (rebound) {
handledClientWorkflowToolIdsRef.current.add(toolCall.id)
continue
}
startClientWorkflowTool(toolCall.id, toolCall.name, toolArgs)
} finally {
recoveringClientWorkflowToolIdsRef.current.delete(toolCall.id)
}
recoveringClientWorkflowToolIdsRef.current.delete(toolCall.id)
startClientWorkflowTool(toolCall.id, toolCall.name, toolArgs)
} finally {
recoveringClientWorkflowToolIdsRef.current.delete(toolCall.id)
}
}
},
@@ -1701,7 +1707,7 @@ export function useChat(
reader: ReadableStreamDefaultReader<Uint8Array>,
assistantId: string,
expectedGen?: number,
options?: { preserveExistingState?: boolean }
options?: { preserveExistingState?: boolean; suppressWorkflowToolStarts?: boolean }
) => {
const decoder = new TextDecoder()
streamReaderRef.current = reader
@@ -1731,6 +1737,7 @@ export function useChat(
for (let i = blocks.length - 1; i >= 0; i--) {
if (blocks[i].type === 'subagent' && blocks[i].content) {
activeSubagent = blocks[i].content
activeSubagentParentToolCallId = blocks[i].parentToolCallId
break
}
if (blocks[i].type === 'subagent_end') {
@@ -1760,23 +1767,45 @@ export function useChat(
if (block && block.endedAt === undefined) block.endedAt = toEventMs(ts)
}
const ensureTextBlock = (subagentName: string | undefined, ts?: string): ContentBlock => {
const ensureTextBlock = (
subagentName: string | undefined,
parentToolCallId: string | undefined,
ts?: string
): ContentBlock => {
const last = blocks[blocks.length - 1]
if (last?.type === 'text' && last.subagent === subagentName) return last
if (
last?.type === 'text' &&
last.subagent === subagentName &&
last.parentToolCallId === parentToolCallId
) {
return last
}
stampBlockEnd(last, ts)
const b: ContentBlock = { type: 'text', content: '', timestamp: toEventMs(ts) }
if (subagentName) b.subagent = subagentName
if (parentToolCallId) b.parentToolCallId = parentToolCallId
blocks.push(b)
return b
}
const ensureThinkingBlock = (subagentName: string | undefined, ts?: string): ContentBlock => {
const ensureThinkingBlock = (
subagentName: string | undefined,
parentToolCallId: string | undefined,
ts?: string
): ContentBlock => {
const targetType = subagentName ? 'subagent_thinking' : 'thinking'
const last = blocks[blocks.length - 1]
if (last?.type === targetType && last.subagent === subagentName) return last
if (
last?.type === targetType &&
last.subagent === subagentName &&
last.parentToolCallId === parentToolCallId
) {
return last
}
stampBlockEnd(last, ts)
const b: ContentBlock = { type: targetType, content: '', timestamp: toEventMs(ts) }
if (subagentName) b.subagent = subagentName
if (parentToolCallId) b.parentToolCallId = parentToolCallId
blocks.push(b)
return b
}
@@ -1793,9 +1822,27 @@ export function useChat(
return activeSubagent
}
const appendInlineErrorTag = (tag: string, subagentName?: string, ts?: string) => {
const resolveParentForSubagentBlock = (
subagent: string | undefined,
scopedParent: string | undefined
): string | undefined => {
if (!subagent) return undefined
if (scopedParent) return scopedParent
if (activeSubagent === subagent) return activeSubagentParentToolCallId
for (const [parent, name] of subagentByParentToolCallId) {
if (name === subagent) return parent
}
return undefined
}
const appendInlineErrorTag = (
tag: string,
subagentName?: string,
parentToolCallId?: string,
ts?: string
) => {
if (runningText.includes(tag)) return
const tb = ensureTextBlock(subagentName, ts)
const tb = ensureTextBlock(subagentName, parentToolCallId, ts)
const prefix = runningText.length > 0 && !runningText.endsWith('\n') ? '\n' : ''
tb.content = `${tb.content ?? ''}${prefix}${tag}`
runningText += `${prefix}${tag}`
@@ -2008,7 +2055,11 @@ export function useChat(
if (chunk) {
const eventTs = typeof parsed.ts === 'string' ? parsed.ts : undefined
if (parsed.payload.channel === MothershipStreamV1TextChannel.thinking) {
const tb = ensureThinkingBlock(scopedSubagent, eventTs)
const scopedParentForBlock = resolveParentForSubagentBlock(
scopedSubagent,
scopedParentToolCallId
)
const tb = ensureThinkingBlock(scopedSubagent, scopedParentForBlock, eventTs)
tb.content = (tb.content ?? '') + chunk
flushText()
break
@@ -2019,7 +2070,11 @@ export function useChat(
lastContentSource !== contentSource &&
runningText.length > 0 &&
!runningText.endsWith('\n')
const tb = ensureTextBlock(scopedSubagent, eventTs)
const scopedParentForBlock = resolveParentForSubagentBlock(
scopedSubagent,
scopedParentToolCallId
)
const tb = ensureTextBlock(scopedSubagent, scopedParentForBlock, eventTs)
const normalizedChunk = needsBoundaryNewline ? `\n${chunk}` : chunk
tb.content = (tb.content ?? '') + normalizedChunk
runningText += normalizedChunk
@@ -2355,9 +2410,17 @@ export function useChat(
}
}
if (!toolMap.has(id)) {
const existingToolCall = toolMap.has(id)
? blocks[toolMap.get(id)!]?.toolCall
: undefined
const isNewToolCall = !existingToolCall
if (isNewToolCall) {
stampBlockEnd(blocks[blocks.length - 1])
toolMap.set(id, blocks.length)
const parentToolCallIdForBlock = resolveParentForSubagentBlock(
scopedSubagent,
scopedParentToolCallId
)
blocks.push({
type: 'tool_call',
toolCall: {
@@ -2368,6 +2431,9 @@ export function useChat(
params: args,
calledBy: scopedSubagent,
},
...(parentToolCallIdForBlock
? { parentToolCallId: parentToolCallIdForBlock }
: {}),
timestamp: Date.now(),
})
if (name === ReadTool.id || isResourceToolName(name)) {
@@ -2385,7 +2451,14 @@ export function useChat(
flush()
if (isWorkflowToolName(name) && !isPartial) {
startClientWorkflowTool(id, name, args ?? {})
const shouldStartWorkflowTool =
!options?.suppressWorkflowToolStarts &&
(isNewToolCall ||
(existingToolCall?.status === ToolCallStatus.executing &&
!existingToolCall.result))
if (shouldStartWorkflowTool) {
startClientWorkflowTool(id, name, args ?? {})
}
}
break
}
@@ -2488,9 +2561,13 @@ export function useChat(
break
}
const spanData = asPayloadRecord(payload.data)
const parentToolCallId =
scopedParentToolCallId ??
(typeof spanData?.tool_call_id === 'string' ? spanData.tool_call_id : undefined)
const parentToolCallIdFromData =
typeof spanData?.tool_call_id === 'string'
? spanData.tool_call_id
: typeof spanData?.toolCallId === 'string'
? spanData.toolCallId
: undefined
const parentToolCallId = scopedParentToolCallId ?? parentToolCallIdFromData
const isPendingPause = spanData?.pending === true
const name = typeof payload.agent === 'string' ? payload.agent : scopedAgentId
if (payload.event === MothershipStreamV1SpanLifecycleEvent.start && name) {
@@ -2505,7 +2582,12 @@ export function useChat(
activeSubagentParentToolCallId = parentToolCallId
if (!isSameActiveSubagent) {
stampBlockEnd(blocks[blocks.length - 1])
blocks.push({ type: 'subagent', content: name, timestamp: Date.now() })
blocks.push({
type: 'subagent',
content: name,
...(parentToolCallId ? { parentToolCallId } : {}),
timestamp: Date.now(),
})
}
if (name === FILE_SUBAGENT_ID && !isSameActiveSubagent) {
applyPreviewSessionUpdate({
@@ -2549,14 +2631,23 @@ export function useChat(
if (name) {
for (let i = blocks.length - 1; i >= 0; i--) {
const b = blocks[i]
if (b.type === 'subagent' && b.content === name && b.endedAt === undefined) {
if (
b.type === 'subagent' &&
b.content === name &&
b.endedAt === undefined &&
(!parentToolCallId || b.parentToolCallId === parentToolCallId)
) {
b.endedAt = endNow
break
}
}
}
stampBlockEnd(blocks[blocks.length - 1])
blocks.push({ type: 'subagent_end', timestamp: endNow })
blocks.push({
type: 'subagent_end',
...(parentToolCallId ? { parentToolCallId } : {}),
timestamp: endNow,
})
flush()
}
break
@@ -2567,6 +2658,7 @@ export function useChat(
appendInlineErrorTag(
buildInlineErrorTag(parsed.payload),
scopedSubagent,
resolveParentForSubagentBlock(scopedSubagent, scopedParentToolCallId),
typeof parsed.ts === 'string' ? parsed.ts : undefined
)
break
@@ -2671,6 +2763,7 @@ export function useChat(
let latestCursor = afterCursor
let seedEvents = opts.initialBatch?.events ?? []
let streamStatus = opts.initialBatch?.status ?? 'unknown'
let suppressSeedWorkflowStarts = seedEvents.length > 0
const isStaleReconnect = () =>
streamGenRef.current !== expectedGen || abortControllerRef.current?.signal.aborted === true
@@ -2689,11 +2782,15 @@ export function useChat(
buildReplayStream(seedEvents).getReader(),
assistantId,
expectedGen,
{ preserveExistingState: true }
{
preserveExistingState: true,
suppressWorkflowToolStarts: suppressSeedWorkflowStarts,
}
)
latestCursor = String(seedEvents[seedEvents.length - 1]?.eventId ?? latestCursor)
lastCursorRef.current = latestCursor
seedEvents = []
suppressSeedWorkflowStarts = false
if (replayResult.sawStreamError) {
return { error: true, aborted: false }
@@ -2998,6 +3095,7 @@ export function useChat(
...(display ? { display } : {}),
calledBy: block.toolCall.calledBy,
},
...(block.parentToolCallId ? { parentToolCallId: block.parentToolCallId } : {}),
...timing,
}
}
@@ -3005,6 +3103,7 @@ export function useChat(
type: block.type,
content: block.content,
...(block.subagent ? { lane: 'subagent' } : {}),
...(block.parentToolCallId ? { parentToolCallId: block.parentToolCallId } : {}),
...timing,
}
})

View File

@@ -133,6 +133,7 @@ export interface ContentBlock {
options?: OptionItem[]
timestamp?: number
endedAt?: number
parentToolCallId?: string
}
export interface ChatMessageAttachment {

View File

@@ -604,6 +604,10 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
const startTime = getHighResTime()
try {
if (!options.workspaceId) {
throw new Error('workspaceId is required for multipart upload')
}
const initiateResponse = await fetch('/api/files/multipart?action=initiate', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
@@ -611,6 +615,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
fileName: file.name,
contentType: getFileContentType(file),
fileSize: file.size,
workspaceId: options.workspaceId,
}),
})
@@ -618,7 +623,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
throw new Error(`Failed to initiate multipart upload: ${initiateResponse.statusText}`)
}
const { uploadId, key } = await initiateResponse.json()
const { uploadId, key, uploadToken } = await initiateResponse.json()
logger.info(`Initiated multipart upload with ID: ${uploadId}`)
const chunkSize = UPLOAD_CONFIG.CHUNK_SIZE
@@ -629,8 +634,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
uploadId,
key,
uploadToken,
partNumbers,
}),
})
@@ -639,7 +643,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
await fetch('/api/files/multipart?action=abort', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ uploadId, key }),
body: JSON.stringify({ uploadToken }),
})
throw new Error(`Failed to get part URLs: ${partUrlsResponse.statusText}`)
}
@@ -723,8 +727,7 @@ export function useKnowledgeUpload(options: UseKnowledgeUploadOptions = {}) {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
uploadId,
key,
uploadToken,
parts: uploadedParts,
}),
})

View File

@@ -34,7 +34,7 @@ import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { getWorkflows } from '@/hooks/queries/utils/workflow-cache'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { isExecutionStreamHttpError, useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
import { useCurrentWorkflowExecution, useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
@@ -60,6 +60,13 @@ const logger = createLogger('useWorkflowExecution')
*/
const activeReconnections = new Set<string>()
function isReconnectTerminal(error: unknown): boolean {
return (
isExecutionStreamHttpError(error) &&
(error.httpStatus === 404 || error.httpStatus === 403 || error.httpStatus === 401)
)
}
interface DebugValidationResult {
isValid: boolean
error?: string
@@ -1283,8 +1290,7 @@ export function useWorkflowExecution() {
} else {
if (!executor) {
try {
const httpStatus =
isRecord(error) && typeof error.httpStatus === 'number' ? error.httpStatus : undefined
const httpStatus = isExecutionStreamHttpError(error) ? error.httpStatus : undefined
const storeAddConsole = useTerminalConsoleStore.getState().addConsole
if (httpStatus && activeWorkflowId) {
@@ -1867,8 +1873,6 @@ export function useWorkflowExecution() {
activeReconnections.add(reconnectWorkflowId)
executionStream.cancel(reconnectWorkflowId)
setCurrentExecutionId(reconnectWorkflowId, executionId)
setIsExecuting(reconnectWorkflowId, true)
const workflowEdges = useWorkflowStore.getState().edges
const activeBlocksSet = new Set<string>()
@@ -1891,13 +1895,47 @@ export function useWorkflowExecution() {
includeStartConsoleEntry: true,
})
clearExecutionEntries(executionId)
const capturedExecutionId = executionId
const MAX_ATTEMPTS = 5
const BASE_DELAY_MS = 1000
const MAX_DELAY_MS = 15000
let activated = false
const ensureActivated = () => {
if (activated || cleanupRan) return
activated = true
setCurrentExecutionId(reconnectWorkflowId, capturedExecutionId)
setIsExecuting(reconnectWorkflowId, true)
clearExecutionEntries(capturedExecutionId)
}
const wrapHandler =
<T>(handler: (data: T) => void) =>
(data: T) => {
ensureActivated()
handler(data)
}
const cleanupFailedReconnect = () => {
const currentId = useExecutionStore.getState().getCurrentExecutionId(reconnectWorkflowId)
if (currentId && currentId !== capturedExecutionId) return
const hasRunningEntry = useTerminalConsoleStore
.getState()
.getWorkflowEntries(reconnectWorkflowId)
.some((entry) => entry.isRunning && entry.executionId === capturedExecutionId)
if (activated || hasRunningEntry) {
cancelRunningEntries(reconnectWorkflowId)
}
if (currentId === capturedExecutionId) {
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
}
}
const attemptReconnect = async (attempt: number): Promise<void> => {
if (cleanupRan || reconnectionComplete) return
@@ -1914,38 +1952,39 @@ export function useWorkflowExecution() {
fromEventId,
callbacks: {
onEventId: (eid) => {
ensureActivated()
fromEventId = eid
},
onBlockStarted: handlers.onBlockStarted,
onBlockCompleted: handlers.onBlockCompleted,
onBlockError: handlers.onBlockError,
onBlockChildWorkflowStarted: handlers.onBlockChildWorkflowStarted,
onBlockStarted: wrapHandler(handlers.onBlockStarted),
onBlockCompleted: wrapHandler(handlers.onBlockCompleted),
onBlockError: wrapHandler(handlers.onBlockError),
onBlockChildWorkflowStarted: wrapHandler(handlers.onBlockChildWorkflowStarted),
onExecutionCompleted: () => {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
if (!activated) {
clearExecutionPointer(reconnectWorkflowId)
return
}
const currentId = useExecutionStore
.getState()
.getCurrentExecutionId(reconnectWorkflowId)
if (currentId !== capturedExecutionId) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
return
}
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
if (currentId !== capturedExecutionId) return
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
},
onExecutionError: (data) => {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
if (!activated) {
clearExecutionPointer(reconnectWorkflowId)
return
}
const currentId = useExecutionStore
.getState()
.getCurrentExecutionId(reconnectWorkflowId)
if (currentId !== capturedExecutionId) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
return
}
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
if (currentId !== capturedExecutionId) return
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
@@ -1957,16 +1996,16 @@ export function useWorkflowExecution() {
})
},
onExecutionCancelled: () => {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
if (!activated) {
clearExecutionPointer(reconnectWorkflowId)
return
}
const currentId = useExecutionStore
.getState()
.getCurrentExecutionId(reconnectWorkflowId)
if (currentId !== capturedExecutionId) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
return
}
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
if (currentId !== capturedExecutionId) return
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
@@ -1978,6 +2017,17 @@ export function useWorkflowExecution() {
},
})
} catch (error) {
if (isReconnectTerminal(error)) {
logger.info('Reconnection skipped; run buffer no longer exists', {
executionId: capturedExecutionId,
})
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
clearExecutionPointer(reconnectWorkflowId)
cleanupFailedReconnect()
return
}
logger.warn('Execution reconnection attempt failed', {
executionId: capturedExecutionId,
attempt,
@@ -1986,17 +2036,27 @@ export function useWorkflowExecution() {
if (!cleanupRan && !reconnectionComplete && attempt < MAX_ATTEMPTS) {
return attemptReconnect(attempt + 1)
}
if (!cleanupRan && !reconnectionComplete) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
cleanupFailedReconnect()
return
}
}
if (!reconnectionComplete && !cleanupRan) {
reconnectionComplete = true
activeReconnections.delete(reconnectWorkflowId)
const currentId = useExecutionStore.getState().getCurrentExecutionId(reconnectWorkflowId)
if (currentId === capturedExecutionId) {
cancelRunningEntries(reconnectWorkflowId)
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
if (activated) {
const currentId = useExecutionStore
.getState()
.getCurrentExecutionId(reconnectWorkflowId)
if (currentId === capturedExecutionId) {
cancelRunningEntries(reconnectWorkflowId)
setCurrentExecutionId(reconnectWorkflowId, null)
setIsExecuting(reconnectWorkflowId, false)
setActiveBlocks(reconnectWorkflowId, new Set())
}
}
}
}
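
The delay between `attemptReconnect` retries isn't visible in these hunks; given the `BASE_DELAY_MS` and `MAX_DELAY_MS` constants above, a capped exponential backoff is the natural reading (the exact formula, and whether jitter is applied, are assumptions):

```ts
// Capped exponential backoff: 1s, 2s, 4s, 8s, then 15s for attempts 1..5.
const BASE_DELAY_MS = 1000
const MAX_DELAY_MS = 15000

const reconnectDelayMs = (attempt: number) =>
  Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS)

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Hypothetical usage inside the retry loop:
//   await sleep(reconnectDelayMs(attempt))
//   return attemptReconnect(attempt + 1)
```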

View File

@@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { and, inArray, lt } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
chunkedBatchDelete,
type TableCleanupResult,
} from '@/lib/cleanup/batch-delete'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
const logger = createLogger('CleanupLogs')
const BATCH_SIZE = 2000
const MAX_BATCHES_PER_TIER = 10
interface TierResults {
total: number
deleted: number
deleteFailed: number
interface FileDeleteStats {
filesTotal: number
filesDeleted: number
filesDeleteFailed: number
}
function emptyTierResults(): TierResults {
return {
total: 0,
deleted: 0,
deleteFailed: 0,
filesTotal: 0,
filesDeleted: 0,
filesDeleteFailed: 0,
}
}
async function deleteExecutionFiles(files: unknown, results: TierResults): Promise<void> {
async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return
const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string)
results.filesTotal += keys.length
stats.filesTotal += keys.length
await Promise.all(
keys.map(async (key) => {
try {
await StorageService.deleteFile({ key, context: 'execution' })
await deleteFileMetadata(key)
results.filesDeleted++
stats.filesDeleted++
} catch (fileError) {
results.filesDeleteFailed++
stats.filesDeleteFailed++
logger.error(`Failed to delete file ${key}:`, { fileError })
}
})
)
}
async function cleanupTier(
async function cleanupWorkflowExecutionLogs(
workspaceIds: string[],
retentionDate: Date,
label: string
): Promise<TierResults> {
const results = emptyTierResults()
if (workspaceIds.length === 0) return results
): Promise<TableCleanupResult & FileDeleteStats> {
const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }
let batchesProcessed = 0
let hasMore = true
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
const batch = await db
.select({
id: workflowExecutionLogs.id,
files: workflowExecutionLogs.files,
})
.from(workflowExecutionLogs)
.where(
and(
inArray(workflowExecutionLogs.workspaceId, workspaceIds),
lt(workflowExecutionLogs.startedAt, retentionDate)
const dbStats = await chunkedBatchDelete({
tableDef: workflowExecutionLogs,
workspaceIds,
tableName: `${label}/workflow_execution_logs`,
selectChunk: (chunkIds, limit) =>
db
.select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
.from(workflowExecutionLogs)
.where(
and(
inArray(workflowExecutionLogs.workspaceId, chunkIds),
lt(workflowExecutionLogs.startedAt, retentionDate)
)
)
)
.limit(BATCH_SIZE)
.limit(limit),
onBatch: async (rows) => {
for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
},
})
results.total += batch.length
if (batch.length === 0) {
hasMore = false
break
}
for (const log of batch) {
await deleteExecutionFiles(log.files, results)
}
const logIds = batch.map((log) => log.id)
try {
const deleted = await db
.delete(workflowExecutionLogs)
.where(inArray(workflowExecutionLogs.id, logIds))
.returning({ id: workflowExecutionLogs.id })
results.deleted += deleted.length
} catch (deleteError) {
results.deleteFailed += logIds.length
logger.error(`Batch delete failed for ${label}:`, { deleteError })
}
batchesProcessed++
hasMore = batch.length === BATCH_SIZE
logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`)
}
return results
}
interface JobLogCleanupResults {
deleted: number
deleteFailed: number
}
async function cleanupJobExecutionLogsTier(
workspaceIds: string[],
retentionDate: Date,
label: string
): Promise<JobLogCleanupResults> {
const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 }
if (workspaceIds.length === 0) return results
let batchesProcessed = 0
let hasMore = true
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
const batch = await db
.select({ id: jobExecutionLogs.id })
.from(jobExecutionLogs)
.where(
and(
inArray(jobExecutionLogs.workspaceId, workspaceIds),
lt(jobExecutionLogs.startedAt, retentionDate)
)
)
.limit(BATCH_SIZE)
if (batch.length === 0) {
hasMore = false
break
}
const logIds = batch.map((log) => log.id)
try {
const deleted = await db
.delete(jobExecutionLogs)
.where(inArray(jobExecutionLogs.id, logIds))
.returning({ id: jobExecutionLogs.id })
results.deleted += deleted.length
} catch (deleteError) {
results.deleteFailed += logIds.length
logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError })
}
batchesProcessed++
hasMore = batch.length === BATCH_SIZE
logger.info(
`[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed`
)
}
return results
return { ...dbStats, ...fileStats }
}
export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
@@ -190,15 +92,19 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
`[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
)
const results = await cleanupTier(workspaceIds, retentionDate, label)
const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
logger.info(
`[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates`
`[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
)
const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label)
logger.info(
`[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed`
)
await batchDeleteByWorkspaceAndTimestamp({
tableDef: jobExecutionLogs,
workspaceIdCol: jobExecutionLogs.workspaceId,
timestampCol: jobExecutionLogs.startedAt,
workspaceIds,
retentionDate,
tableName: `${label}/job_execution_logs`,
})
// Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
if (payload.plan === 'free') {
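
The shared `@/lib/cleanup/batch-delete` helpers (`chunkedBatchDelete`, `selectRowsByIdChunks`, `batchDeleteByWorkspaceAndTimestamp`) are not part of this compare. A sketch of the chunking idea behind `selectRowsByIdChunks`, assuming it splits the workspace-id list into fixed-size chunks so the SQL `IN (...)` clause stays bounded while a global row cap is enforced across chunks (the chunk size and exact caps are guesses). Per the commit message, stats such as deletes and failures would likewise be accumulated per job across chunks rather than per chunk:

```ts
// Sketch: page through select callbacks per id-chunk, honoring a global row cap.
const CHUNK_SIZE = 100 // assumed; the real value lives in batch-delete.ts
const BATCH_SIZE = 2000
const MAX_BATCHES = 10

export async function selectRowsByIdChunks<T>(
  ids: string[],
  selectChunk: (chunkIds: string[], limit: number) => Promise<T[]>
): Promise<T[]> {
  const rows: T[] = []
  const limit = BATCH_SIZE * MAX_BATCHES
  for (let i = 0; i < ids.length && rows.length < limit; i += CHUNK_SIZE) {
    const chunk = ids.slice(i, i + CHUNK_SIZE)
    rows.push(...(await selectChunk(chunk, limit - rows.length)))
  }
  return rows
}
```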

View File

@@ -18,9 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
DEFAULT_BATCH_SIZE,
DEFAULT_MAX_BATCHES_PER_TABLE,
deleteRowsById,
selectRowsByIdChunks,
} from '@/lib/cleanup/batch-delete'
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
import type { StorageContext } from '@/lib/uploads'
@@ -44,35 +43,37 @@ async function selectExpiredWorkspaceFiles(
workspaceIds: string[],
retentionDate: Date
): Promise<WorkspaceFileScope> {
const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE
const [legacyRows, multiContextRows] = await Promise.all([
db
.select({ id: workspaceFile.id, key: workspaceFile.key })
.from(workspaceFile)
.where(
and(
inArray(workspaceFile.workspaceId, workspaceIds),
isNotNull(workspaceFile.deletedAt),
lt(workspaceFile.deletedAt, retentionDate)
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: workspaceFile.id, key: workspaceFile.key })
.from(workspaceFile)
.where(
and(
inArray(workspaceFile.workspaceId, chunkIds),
isNotNull(workspaceFile.deletedAt),
lt(workspaceFile.deletedAt, retentionDate)
)
)
)
.limit(limit),
db
.select({
id: workspaceFiles.id,
key: workspaceFiles.key,
context: workspaceFiles.context,
})
.from(workspaceFiles)
.where(
and(
inArray(workspaceFiles.workspaceId, workspaceIds),
isNotNull(workspaceFiles.deletedAt),
lt(workspaceFiles.deletedAt, retentionDate)
.limit(chunkLimit)
),
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({
id: workspaceFiles.id,
key: workspaceFiles.key,
context: workspaceFiles.context,
})
.from(workspaceFiles)
.where(
and(
inArray(workspaceFiles.workspaceId, chunkIds),
isNotNull(workspaceFiles.deletedAt),
lt(workspaceFiles.deletedAt, retentionDate)
)
)
)
.limit(limit),
.limit(chunkLimit)
),
])
return {
@@ -182,17 +183,19 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
// (chats + S3) AND the DB deletes below — selecting twice could return
// different subsets above the LIMIT cap and orphan or prematurely purge data.
const [doomedWorkflows, fileScope] = await Promise.all([
db
.select({ id: workflow.id })
.from(workflow)
.where(
and(
inArray(workflow.workspaceId, workspaceIds),
isNotNull(workflow.archivedAt),
lt(workflow.archivedAt, retentionDate)
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: workflow.id })
.from(workflow)
.where(
and(
inArray(workflow.workspaceId, chunkIds),
isNotNull(workflow.archivedAt),
lt(workflow.archivedAt, retentionDate)
)
)
)
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE),
.limit(chunkLimit)
),
selectExpiredWorkspaceFiles(workspaceIds, retentionDate),
])
@@ -200,11 +203,13 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
let chatCleanup: { execute: () => Promise<void> } | null = null
if (doomedWorkflowIds.length > 0) {
const doomedChats = await db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(inArray(copilotChats.workflowId, doomedWorkflowIds))
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) =>
db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(inArray(copilotChats.workflowId, chunkIds))
.limit(chunkLimit)
)
const doomedChatIds = doomedChats.map((c) => c.id)
if (doomedChatIds.length > 0) {

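Because the hunks above interleave the removed and added lines, here is the new shape of the workflow select in one piece: a minimal sketch assuming the same `db` handle, `workflow` schema, and drizzle-orm operators already imported in this file; only the function name is illustrative.

async function selectDoomedWorkflowIds(
  workspaceIds: string[],
  retentionDate: Date
): Promise<string[]> {
  // selectRowsByIdChunks splits workspaceIds into small IN-list chunks and
  // passes each callback the remaining row budget, so the concatenated
  // result never exceeds DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE.
  const rows = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
    db
      .select({ id: workflow.id })
      .from(workflow)
      .where(
        and(
          inArray(workflow.workspaceId, chunkIds),
          isNotNull(workflow.archivedAt),
          lt(workflow.archivedAt, retentionDate)
        )
      )
      .limit(chunkLimit)
  )
  return rows.map((r) => r.id)
}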
View File

@@ -13,9 +13,8 @@ import { and, inArray, lt, sql } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
DEFAULT_BATCH_SIZE,
DEFAULT_MAX_BATCHES_PER_TABLE,
deleteRowsById,
selectRowsByIdChunks,
type TableCleanupResult,
} from '@/lib/cleanup/batch-delete'
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
@@ -67,13 +66,15 @@ async function cleanupRunChildren(
): Promise<TableCleanupResult[]> {
if (workspaceIds.length === 0) return []
const runIds = await db
.select({ id: copilotRuns.id })
.from(copilotRuns)
.where(
and(inArray(copilotRuns.workspaceId, workspaceIds), lt(copilotRuns.updatedAt, retentionDate))
)
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
const runIds = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: copilotRuns.id })
.from(copilotRuns)
.where(
and(inArray(copilotRuns.workspaceId, chunkIds), lt(copilotRuns.updatedAt, retentionDate))
)
.limit(chunkLimit)
)
if (runIds.length === 0) {
return RUN_CHILD_TABLES.map((t) => ({ table: `${label}/${t.name}`, deleted: 0, failed: 0 }))
@@ -107,17 +108,15 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
`[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
)
// Collect chat IDs before deleting so we can clean up the copilot backend after
const doomedChats = await db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(
and(
inArray(copilotChats.workspaceId, workspaceIds),
lt(copilotChats.updatedAt, retentionDate)
const doomedChats = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: copilotChats.id })
.from(copilotChats)
.where(
and(inArray(copilotChats.workspaceId, chunkIds), lt(copilotChats.updatedAt, retentionDate))
)
)
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
.limit(chunkLimit)
)
const doomedChatIds = doomedChats.map((c) => c.id)

View File

@@ -46,6 +46,10 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
{ label: 'Get User Presence', id: 'get_user_presence' },
{ label: 'Edit Canvas', id: 'edit_canvas' },
{ label: 'Create Channel Canvas', id: 'create_channel_canvas' },
{ label: 'Get Canvas Info', id: 'get_canvas' },
{ label: 'List Canvases', id: 'list_canvases' },
{ label: 'Lookup Canvas Sections', id: 'lookup_canvas_sections' },
{ label: 'Delete Canvas', id: 'delete_canvas' },
{ label: 'Create Conversation', id: 'create_conversation' },
{ label: 'Invite to Conversation', id: 'invite_to_conversation' },
{ label: 'Open View', id: 'open_view' },
@@ -146,6 +150,9 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
'get_user',
'get_user_presence',
'edit_canvas',
'get_canvas',
'lookup_canvas_sections',
'delete_canvas',
'create_conversation',
'open_view',
'update_view',
@@ -160,7 +167,11 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
},
}
},
required: true,
required: {
field: 'operation',
value: 'list_canvases',
not: true,
},
},
{
id: 'manualChannel',
@@ -182,6 +193,9 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
'get_user',
'get_user_presence',
'edit_canvas',
'get_canvas',
'lookup_canvas_sections',
'delete_canvas',
'create_conversation',
'open_view',
'update_view',
@@ -196,7 +210,11 @@ export const SlackBlock: BlockConfig<SlackResponse> = {
},
}
},
required: true,
required: {
field: 'operation',
value: 'list_canvases',
not: true,
},
},
{
id: 'dmUserId',
@@ -820,6 +838,121 @@ Return ONLY the timestamp string - no explanations, no quotes, no extra text.`,
value: 'create_channel_canvas',
},
},
// Get Canvas specific fields
{
id: 'getCanvasId',
title: 'Canvas ID',
type: 'short-input',
placeholder: 'Enter canvas ID (e.g., F1234ABCD)',
condition: {
field: 'operation',
value: 'get_canvas',
},
required: true,
},
// List Canvases specific fields
{
id: 'canvasListCount',
title: 'Canvas Limit',
type: 'short-input',
placeholder: '100',
condition: {
field: 'operation',
value: 'list_canvases',
},
mode: 'advanced',
},
{
id: 'canvasListPage',
title: 'Page',
type: 'short-input',
placeholder: '1',
condition: {
field: 'operation',
value: 'list_canvases',
},
mode: 'advanced',
},
{
id: 'canvasListUser',
title: 'User ID',
type: 'short-input',
placeholder: 'Optional creator filter (e.g., U1234567890)',
condition: {
field: 'operation',
value: 'list_canvases',
},
mode: 'advanced',
},
{
id: 'canvasListTsFrom',
title: 'Created After',
type: 'short-input',
placeholder: 'Unix timestamp (e.g., 123456789)',
condition: {
field: 'operation',
value: 'list_canvases',
},
mode: 'advanced',
},
{
id: 'canvasListTsTo',
title: 'Created Before',
type: 'short-input',
placeholder: 'Unix timestamp (e.g., 123456789)',
condition: {
field: 'operation',
value: 'list_canvases',
},
mode: 'advanced',
},
{
id: 'canvasListTeamId',
title: 'Team ID',
type: 'short-input',
placeholder: 'Encoded team ID (org tokens only)',
condition: {
field: 'operation',
value: 'list_canvases',
},
mode: 'advanced',
},
// Lookup Canvas Sections specific fields
{
id: 'lookupCanvasId',
title: 'Canvas ID',
type: 'short-input',
placeholder: 'Enter canvas ID (e.g., F1234ABCD)',
condition: {
field: 'operation',
value: 'lookup_canvas_sections',
},
required: true,
},
{
id: 'sectionCriteria',
title: 'Section Criteria',
type: 'code',
language: 'json',
placeholder: '{"section_types":["h1"],"contains_text":"Roadmap"}',
condition: {
field: 'operation',
value: 'lookup_canvas_sections',
},
required: true,
},
// Delete Canvas specific fields
{
id: 'deleteCanvasId',
title: 'Canvas ID',
type: 'short-input',
placeholder: 'Enter canvas ID (e.g., F1234ABCD)',
condition: {
field: 'operation',
value: 'delete_canvas',
},
required: true,
},
// Create Conversation specific fields
{
id: 'conversationName',
@@ -1058,6 +1191,10 @@ Do not include any explanations, markdown formatting, or other text outside the
'slack_get_user_presence',
'slack_edit_canvas',
'slack_create_channel_canvas',
'slack_get_canvas',
'slack_list_canvases',
'slack_lookup_canvas_sections',
'slack_delete_canvas',
'slack_create_conversation',
'slack_invite_to_conversation',
'slack_open_view',
@@ -1106,6 +1243,14 @@ Do not include any explanations, markdown formatting, or other text outside the
return 'slack_edit_canvas'
case 'create_channel_canvas':
return 'slack_create_channel_canvas'
case 'get_canvas':
return 'slack_get_canvas'
case 'list_canvases':
return 'slack_list_canvases'
case 'lookup_canvas_sections':
return 'slack_lookup_canvas_sections'
case 'delete_canvas':
return 'slack_delete_canvas'
case 'create_conversation':
return 'slack_create_conversation'
case 'invite_to_conversation':
@@ -1164,6 +1309,16 @@ Do not include any explanations, markdown formatting, or other text outside the
canvasTitle,
channelCanvasTitle,
channelCanvasContent,
getCanvasId,
canvasListCount,
canvasListPage,
canvasListUser,
canvasListTsFrom,
canvasListTsTo,
canvasListTeamId,
lookupCanvasId,
sectionCriteria,
deleteCanvasId,
conversationName,
isPrivate,
teamId,
@@ -1343,6 +1498,46 @@ Do not include any explanations, markdown formatting, or other text outside the
}
break
case 'get_canvas':
baseParams.canvasId = getCanvasId
break
case 'list_canvases':
if (canvasListCount) {
const parsedCount = Number.parseInt(canvasListCount, 10)
if (!Number.isNaN(parsedCount) && parsedCount > 0) {
baseParams.count = parsedCount
}
}
if (canvasListPage) {
const parsedPage = Number.parseInt(canvasListPage, 10)
if (!Number.isNaN(parsedPage) && parsedPage > 0) {
baseParams.page = parsedPage
}
}
if (canvasListUser) {
baseParams.user = String(canvasListUser).trim()
}
if (canvasListTsFrom) {
baseParams.tsFrom = String(canvasListTsFrom).trim()
}
if (canvasListTsTo) {
baseParams.tsTo = String(canvasListTsTo).trim()
}
if (canvasListTeamId) {
baseParams.teamId = String(canvasListTeamId).trim()
}
break
case 'lookup_canvas_sections':
baseParams.canvasId = lookupCanvasId
baseParams.criteria = sectionCriteria
break
case 'delete_canvas':
baseParams.canvasId = deleteCanvasId
break
case 'create_conversation':
baseParams.name = conversationName
baseParams.isPrivate = isPrivate === 'true'
@@ -1461,6 +1656,23 @@ Do not include any explanations, markdown formatting, or other text outside the
// Create Channel Canvas inputs
channelCanvasTitle: { type: 'string', description: 'Title for channel canvas' },
channelCanvasContent: { type: 'string', description: 'Content for channel canvas' },
// Canvas management inputs
getCanvasId: { type: 'string', description: 'Canvas ID to retrieve' },
canvasListCount: { type: 'string', description: 'Maximum number of canvases to return' },
canvasListPage: { type: 'string', description: 'Canvas list page number' },
canvasListUser: { type: 'string', description: 'Optional canvas creator user filter' },
canvasListTsFrom: {
type: 'string',
description: 'Filter canvases created after this timestamp',
},
canvasListTsTo: {
type: 'string',
description: 'Filter canvases created before this timestamp',
},
canvasListTeamId: { type: 'string', description: 'Encoded team ID for org tokens' },
lookupCanvasId: { type: 'string', description: 'Canvas ID to search for sections' },
sectionCriteria: { type: 'json', description: 'Canvas section lookup criteria' },
deleteCanvasId: { type: 'string', description: 'Canvas ID to delete' },
// Create Conversation inputs
conversationName: { type: 'string', description: 'Name for the new channel' },
isPrivate: { type: 'string', description: 'Create as private channel (true/false)' },
@@ -1511,6 +1723,26 @@ Do not include any explanations, markdown formatting, or other text outside the
// slack_canvas outputs
canvas_id: { type: 'string', description: 'Canvas identifier for created canvases' },
title: { type: 'string', description: 'Canvas title' },
canvas: {
type: 'json',
description: 'Canvas file metadata returned by Slack',
},
canvases: {
type: 'json',
description: 'Array of canvas file objects returned by Slack',
},
paging: {
type: 'json',
description: 'Pagination information for listed canvases',
},
sections: {
type: 'json',
description: 'Canvas section IDs returned by Slack section lookup',
},
ok: {
type: 'boolean',
description: 'Whether Slack completed the canvas operation successfully',
},
// slack_message_reader outputs (read operation)
messages: {

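The `required` entries above move from a plain boolean to a conditional object so the channel selectors stay optional only for `list_canvases`. The runtime that evaluates this shape is not part of this compare; the sketch below is a hedged guess at its semantics, inferred solely from the `{ field, value, not }` shape in the config.

type ConditionalRequired =
  | boolean
  | { field: string; value: string; not?: boolean }

// Evaluate a `required` entry against the block's current params.
function isFieldRequired(
  required: ConditionalRequired,
  params: Record<string, unknown>
): boolean {
  if (typeof required === 'boolean') return required
  const matches = params[required.field] === required.value
  // `not: true` inverts the match: the field is required for every
  // operation except the named one.
  return required.not ? !matches : matches
}

// With the config above, channel is required unless listing canvases:
isFieldRequired(
  { field: 'operation', value: 'list_canvases', not: true },
  { operation: 'get_canvas' }
) // => true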
View File

@@ -34,6 +34,7 @@ import {
type ExecutionContext,
getNextExecutionOrder,
type NormalizedBlockOutput,
type StreamingExecution,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
@@ -140,7 +141,7 @@ export class BlockExecutor {
let normalizedOutput: NormalizedBlockOutput
if (isStreamingExecution) {
const streamingExec = output as { stream: ReadableStream; execution: any }
const streamingExec = output as StreamingExecution
if (ctx.onStream) {
await this.handleStreamingExecution(
@@ -602,7 +603,7 @@ export class BlockExecutor {
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
streamingExec: { stream: ReadableStream; execution: any },
streamingExec: StreamingExecution,
resolvedInputs: Record<string, any>,
selectedOutputs: string[]
): Promise<void> {
@@ -613,56 +614,39 @@ export class BlockExecutor {
(block.config?.params as Record<string, any> | undefined)?.responseFormat ??
(block.config as Record<string, any> | undefined)?.responseFormat
const stream = streamingExec.stream
if (typeof stream.tee !== 'function') {
await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
return
}
const sourceReader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
const accumulated: string[] = []
let drainError: unknown
let sourceFullyDrained = false
const [clientStream, executorStream] = stream.tee()
const clientSource = new ReadableStream<Uint8Array>({
async pull(controller) {
try {
const { done, value } = await sourceReader.read()
if (done) {
const tail = decoder.decode()
if (tail) accumulated.push(tail)
sourceFullyDrained = true
controller.close()
return
}
accumulated.push(decoder.decode(value, { stream: true }))
controller.enqueue(value)
} catch (error) {
drainError = error
controller.error(error)
}
},
async cancel(reason) {
try {
await sourceReader.cancel(reason)
} catch {}
},
})
const processedClientStream = streamingResponseFormatProcessor.processStream(
clientStream,
blockId,
selectedOutputs,
responseFormat
)
const clientStreamingExec = {
...streamingExec,
stream: processedClientStream,
}
const executorConsumption = this.consumeExecutorStream(
executorStream,
streamingExec,
blockId,
responseFormat
)
const clientConsumption = (async () => {
try {
await ctx.onStream?.(clientStreamingExec)
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
// Cancel the client stream to release the tee'd buffer
await processedClientStream.cancel().catch(() => {})
}
})()
await Promise.all([clientConsumption, executorConsumption])
}
private async forwardStream(
ctx: ExecutionContext,
blockId: string,
streamingExec: { stream: ReadableStream; execution: any },
stream: ReadableStream,
responseFormat: any,
selectedOutputs: string[]
): Promise<void> {
const processedStream = streamingResponseFormatProcessor.processStream(
stream,
clientSource,
blockId,
selectedOutputs,
responseFormat
@@ -670,72 +654,75 @@ export class BlockExecutor {
try {
await ctx.onStream?.({
...streamingExec,
stream: processedStream,
stream: processedClientStream,
execution: streamingExec.execution,
})
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
await processedStream.cancel().catch(() => {})
}
}
private async consumeExecutorStream(
stream: ReadableStream,
streamingExec: { execution: any },
blockId: string,
responseFormat: any
): Promise<void> {
const reader = stream.getReader()
const decoder = new TextDecoder()
const chunks: string[] = []
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
chunks.push(decoder.decode(value, { stream: true }))
}
const tail = decoder.decode()
if (tail) chunks.push(tail)
} catch (error) {
this.execLogger.error('Error reading executor stream for block', { blockId, error })
await processedClientStream.cancel().catch(() => {})
} finally {
try {
await reader.cancel().catch(() => {})
sourceReader.releaseLock()
} catch {}
}
const fullContent = chunks.join('')
if (drainError) {
this.execLogger.error('Error reading stream for block', { blockId, error: drainError })
return
}
// If the onStream consumer exited before the source drained (e.g. it caught
// an internal error and returned normally), `accumulated` holds a truncated
// response. Persisting that to memory or setting it as the block output
// would corrupt downstream state — skip and log instead.
if (!sourceFullyDrained) {
this.execLogger.warn(
'Stream consumer exited before source drained; skipping content persistence',
{
blockId,
}
)
return
}
const fullContent = accumulated.join('')
if (!fullContent) {
return
}
const executionOutput = streamingExec.execution?.output
if (!executionOutput || typeof executionOutput !== 'object') {
return
}
if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())
streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
if (executionOutput && typeof executionOutput === 'object') {
let parsedForFormat = false
if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())
streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
}
parsedForFormat = true
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
}
return
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
}
if (!parsedForFormat) {
executionOutput.content = fullContent
}
}
executionOutput.content = fullContent
if (streamingExec.onFullContent) {
try {
await streamingExec.onFullContent(fullContent)
} catch (error) {
this.execLogger.error('onFullContent callback failed', { blockId, error })
}
}
}
}

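The rewrite above drops `stream.tee()` in favor of a single reader that feeds both the client and the executor's accumulator. Distilled to its essentials, this is the pattern (a minimal sketch using only the WHATWG streams API, with no Sim-specific types):

function mirrorStream(source: ReadableStream<Uint8Array>) {
  const reader = source.getReader()
  const decoder = new TextDecoder()
  const accumulated: string[] = []
  let sourceFullyDrained = false

  const clientStream = new ReadableStream<Uint8Array>({
    async pull(controller) {
      const { done, value } = await reader.read()
      if (done) {
        const tail = decoder.decode() // flush any buffered multi-byte sequence
        if (tail) accumulated.push(tail)
        sourceFullyDrained = true
        controller.close()
        return
      }
      accumulated.push(decoder.decode(value, { stream: true }))
      controller.enqueue(value)
    },
    cancel(reason) {
      return reader.cancel(reason)
    },
  })

  return {
    clientStream,
    // undefined signals that the consumer bailed before the source drained;
    // callers should skip persistence rather than store a truncated response.
    fullContent: () => (sourceFullyDrained ? accumulated.join('') : undefined),
  }
}

Because pull only runs when the client asks for the next chunk, nothing is buffered beyond the text accumulator, which the tee() approach could not guarantee when one branch stalled.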
View File

@@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler {
streamingExec: StreamingExecution
): StreamingExecution {
return {
stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs),
stream: streamingExec.stream,
execution: streamingExec.execution,
onFullContent: async (content: string) => {
if (!content.trim()) return
try {
await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content })
} catch (error) {
logger.error('Failed to persist streaming response:', error)
}
},
}
}

View File

@@ -111,35 +111,6 @@ export class Memory {
})
}
wrapStreamForPersistence(
stream: ReadableStream<Uint8Array>,
ctx: ExecutionContext,
inputs: AgentInputs
): ReadableStream<Uint8Array> {
const chunks: string[] = []
const decoder = new TextDecoder()
const transformStream = new TransformStream<Uint8Array, Uint8Array>({
transform: (chunk, controller) => {
controller.enqueue(chunk)
const decoded = decoder.decode(chunk, { stream: true })
chunks.push(decoded)
},
flush: () => {
const content = chunks.join('')
if (content.trim()) {
this.appendToMemory(ctx, inputs, {
role: 'assistant',
content,
}).catch((error) => logger.error('Failed to persist streaming response:', error))
}
},
})
return stream.pipeThrough(transformStream)
}
private requireWorkspaceId(ctx: ExecutionContext): string {
if (!ctx.workspaceId) {
throw new Error('workspaceId is required for memory operations')

View File

@@ -359,6 +359,12 @@ export interface ExecutionResult {
export interface StreamingExecution {
stream: ReadableStream
execution: ExecutionResult & { isStreaming?: boolean }
/**
* Invoked with the assembled response text after the stream drains. Lets agent
* blocks persist the full response without interposing a TransformStream on a
* fetch-backed source — that pattern amplifies memory on Bun (see bug #28035).
*/
onFullContent?: (content: string) => void | Promise<void>
}
export interface BlockExecutor {

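A hedged sketch of the producer side of this contract, mirroring the agent-handler change earlier in this compare; `persist` stands in for whatever storage call the block needs (the real handler uses `memoryService.appendToMemory`).

function withFullContentPersistence(
  streamingExec: StreamingExecution,
  persist: (content: string) => Promise<void>
): StreamingExecution {
  return {
    ...streamingExec, // the stream itself passes through untouched
    onFullContent: async (content) => {
      if (!content.trim()) return // never persist empty or whitespace-only turns
      try {
        await persist(content)
      } catch (error) {
        // persistence failures must not fail the block execution
        console.error('Failed to persist streaming response:', error)
      }
    },
  }
}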
View File

@@ -18,6 +18,20 @@ import type { SerializableExecutionState } from '@/executor/execution/types'
const logger = createLogger('useExecutionStream')
export class ExecutionStreamHttpError extends Error {
constructor(
message: string,
public readonly httpStatus: number
) {
super(message)
this.name = 'ExecutionStreamHttpError'
}
}
export function isExecutionStreamHttpError(error: unknown): error is ExecutionStreamHttpError {
return error instanceof ExecutionStreamHttpError
}
/**
* Detects errors caused by the browser killing a fetch (page refresh, navigation, tab close).
* These should be treated as clean disconnects, not execution errors.
@@ -205,11 +219,13 @@ export function useExecutionStream() {
if (!response.ok) {
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
const error = new ExecutionStreamHttpError(
errorResponse.error || 'Failed to start execution',
response.status
)
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
Object.assign(error, { httpStatus: response.status })
throw error
}
@@ -279,15 +295,18 @@ export function useExecutionStream() {
try {
errorResponse = await response.json()
} catch {
const error = new Error(`Server error (${response.status}): ${response.statusText}`)
Object.assign(error, { httpStatus: response.status })
throw error
throw new ExecutionStreamHttpError(
`Server error (${response.status}): ${response.statusText}`,
response.status
)
}
const error = new Error(errorResponse.error || 'Failed to start execution')
const error = new ExecutionStreamHttpError(
errorResponse.error || 'Failed to start execution',
response.status
)
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
Object.assign(error, { httpStatus: response.status })
throw error
}
@@ -335,7 +354,9 @@ export function useExecutionStream() {
`/api/workflows/${workflowId}/executions/${executionId}/stream?from=${fromEventId}`,
{ signal: abortController.signal }
)
if (!response.ok) throw new Error(`Reconnect failed (${response.status})`)
if (!response.ok) {
throw new ExecutionStreamHttpError(`Reconnect failed (${response.status})`, response.status)
}
if (!response.body) throw new Error('No response body')
await processSSEStream(response.body.getReader(), callbacks, 'Reconnect')

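A hedged usage sketch for the new typed error: callers that previously probed the ad-hoc `httpStatus` property set via `Object.assign` can now narrow with the exported guard (`startExecution` is an illustrative stand-in for the hook's fetch path).

async function runWithTypedErrors(startExecution: () => Promise<void>) {
  try {
    await startExecution()
  } catch (error) {
    if (isExecutionStreamHttpError(error)) {
      // e.g. 401 could surface a re-auth prompt, 429 a backoff-and-retry
      console.error(`Execution failed (${error.httpStatus}): ${error.message}`)
      return
    }
    throw error // aborts, parse failures, etc. propagate unchanged
  }
}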
View File

@@ -7,6 +7,55 @@ const logger = createLogger('BatchDelete')
export const DEFAULT_BATCH_SIZE = 2000
export const DEFAULT_MAX_BATCHES_PER_TABLE = 10
/**
* Split workspaceIds into groups of this size before running SELECT/DELETE. Large
* IN lists combined with `started_at < X` force Postgres to probe every
* workspace range in the composite index, which blows the 90s statement timeout
* at the scale of the full free tier.
*/
export const DEFAULT_WORKSPACE_CHUNK_SIZE = 50
export function chunkArray<T>(arr: T[], size: number): T[][] {
const out: T[][] = []
for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size))
return out
}
export interface SelectByIdChunksOptions {
/** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */
overallLimit?: number
chunkSize?: number
}
/**
* Run a SELECT query once per ID chunk and concatenate results up to
* `overallLimit`. Each chunk's query is passed the remaining row budget so the
* total never exceeds the cap. Use this when you need the selected row set
* (e.g. to drive S3 or copilot-backend cleanup alongside the DB delete).
*
* Works for any large ID set — workspace IDs, workflow IDs, etc. Avoids
* sending one massive `IN (...)` list that would blow Postgres's statement
* timeout.
*/
export async function selectRowsByIdChunks<T>(
ids: string[],
query: (chunkIds: string[], chunkLimit: number) => Promise<T[]>,
{
overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
chunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
}: SelectByIdChunksOptions = {}
): Promise<T[]> {
if (ids.length === 0) return []
const rows: T[] = []
for (const chunkIds of chunkArray(ids, chunkSize)) {
if (rows.length >= overallLimit) break
const remaining = overallLimit - rows.length
const chunkRows = await query(chunkIds, remaining)
rows.push(...chunkRows)
}
return rows
}
export interface TableCleanupResult {
table: string
@@ -14,6 +63,111 @@ export interface TableCleanupResult {
failed: number
}
export interface ChunkedBatchDeleteOptions<TRow extends { id: string }> {
tableDef: PgTable
workspaceIds: string[]
tableName: string
/** SELECT eligible rows for one workspace chunk. The result must include `id`. */
selectChunk: (chunkIds: string[], limit: number) => Promise<TRow[]>
/** Runs between SELECT and DELETE; receives the just-selected rows. */
onBatch?: (rows: TRow[]) => Promise<void>
batchSize?: number
/** Max batches per workspace chunk. */
maxBatches?: number
/**
* Hard cap on rows processed (deleted + failed) across all chunks per call.
* Defaults to `DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE`. Cron
* runs frequently enough to catch up the backlog over multiple invocations.
*/
totalRowLimit?: number
workspaceChunkSize?: number
}
/**
* Inner loop primitive for cleanup jobs.
*
* For each workspace chunk: SELECT a batch of eligible rows → run optional
* `onBatch` hook (e.g. to delete S3 files) → DELETE those rows by ID. Repeats
* until exhausted or `maxBatches` is hit, then moves to the next chunk. Stops
* the whole call once `totalRowLimit` rows have been processed.
*
* Workspace IDs are chunked before the SELECT — see
* `DEFAULT_WORKSPACE_CHUNK_SIZE` for why.
*/
export async function chunkedBatchDelete<TRow extends { id: string }>({
tableDef,
workspaceIds,
tableName,
selectChunk,
onBatch,
batchSize = DEFAULT_BATCH_SIZE,
maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
totalRowLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
}: ChunkedBatchDeleteOptions<TRow>): Promise<TableCleanupResult> {
const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
if (workspaceIds.length === 0) {
logger.info(`[${tableName}] Skipped — no workspaces in scope`)
return result
}
const chunks = chunkArray(workspaceIds, workspaceChunkSize)
let stoppedEarly = false
for (const [chunkIdx, chunkIds] of chunks.entries()) {
if (result.deleted + result.failed >= totalRowLimit) {
stoppedEarly = true
break
}
let batchesProcessed = 0
let hasMore = true
while (
hasMore &&
batchesProcessed < maxBatches &&
result.deleted + result.failed < totalRowLimit
) {
let rows: TRow[] = []
try {
rows = await selectChunk(chunkIds, batchSize)
if (rows.length === 0) {
hasMore = false
break
}
if (onBatch) await onBatch(rows)
const ids = rows.map((r) => r.id)
const deleted = await db
.delete(tableDef)
.where(inArray(sql`id`, ids))
.returning({ id: sql`id` })
result.deleted += deleted.length
hasMore = rows.length === batchSize
batchesProcessed++
} catch (error) {
// Count rows we tried to delete; SELECT-stage errors leave rows=[].
result.failed += rows.length
logger.error(
`[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}, ${rows.length} rows):`,
{ error }
)
hasMore = false
}
}
}
logger.info(
`[${tableName}] Complete: ${result.deleted} deleted, ${result.failed} failed across ${chunks.length} chunks${stoppedEarly ? ' (row-limit reached, remaining chunks deferred to next run)' : ''}`
)
return result
}
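No call site in this compare passes `onBatch` directly (they all go through the timestamp wrapper below), so here is a hedged sketch of the side-effect case the hook exists for, modeled on the workspace-file cleanup; `deleteStoredObjects` is an illustrative stand-in for the real storage call.

const result = await chunkedBatchDelete({
  tableDef: workspaceFile,
  workspaceIds,
  tableName: 'free/workspace_files',
  selectChunk: (chunkIds, limit) =>
    db
      .select({ id: workspaceFile.id, key: workspaceFile.key })
      .from(workspaceFile)
      .where(
        and(
          inArray(workspaceFile.workspaceId, chunkIds),
          isNotNull(workspaceFile.deletedAt),
          lt(workspaceFile.deletedAt, retentionDate)
        )
      )
      .limit(limit),
  // Runs between the SELECT and the DELETE, so stored objects are removed
  // before the rows that reference them; if the process dies in between, the
  // next run re-selects the same rows and retries.
  onBatch: (rows) => deleteStoredObjects(rows.map((r) => r.key)),
})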
export interface BatchDeleteOptions {
tableDef: PgTable
workspaceIdCol: PgColumn
@@ -25,13 +179,13 @@ export interface BatchDeleteOptions {
requireTimestampNotNull?: boolean
batchSize?: number
maxBatches?: number
workspaceChunkSize?: number
}
/**
* Iteratively delete rows in a table matching a workspace + time-based predicate.
*
* Uses a SELECT-with-LIMIT → DELETE-by-ID pattern to keep each round bounded in
* memory and I/O (PostgreSQL DELETE does not support LIMIT directly).
* Convenience wrapper around `chunkedBatchDelete` for the common case: delete
* rows where `workspaceId IN (...) AND timestamp < retentionDate`. Use this
* when there's no per-row side effect (e.g. no S3 files to clean up alongside).
*/
export async function batchDeleteByWorkspaceAndTimestamp({
tableDef,
@@ -41,56 +195,23 @@ export async function batchDeleteByWorkspaceAndTimestamp({
retentionDate,
tableName,
requireTimestampNotNull = false,
batchSize = DEFAULT_BATCH_SIZE,
maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
...rest
}: BatchDeleteOptions): Promise<TableCleanupResult> {
const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
if (workspaceIds.length === 0) {
logger.info(`[${tableName}] Skipped — no workspaces in scope`)
return result
}
const predicates = [inArray(workspaceIdCol, workspaceIds), lt(timestampCol, retentionDate)]
if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
const whereClause = and(...predicates)
let batchesProcessed = 0
let hasMore = true
while (hasMore && batchesProcessed < maxBatches) {
try {
const batch = await db
return chunkedBatchDelete({
tableDef,
workspaceIds,
tableName,
selectChunk: (chunkIds, limit) => {
const predicates = [inArray(workspaceIdCol, chunkIds), lt(timestampCol, retentionDate)]
if (requireTimestampNotNull) predicates.push(isNotNull(timestampCol))
return db
.select({ id: sql<string>`id` })
.from(tableDef)
.where(whereClause)
.limit(batchSize)
if (batch.length === 0) {
logger.info(`[${tableName}] No expired rows found`)
hasMore = false
break
}
const ids = batch.map((r) => r.id)
const deleted = await db
.delete(tableDef)
.where(inArray(sql`id`, ids))
.returning({ id: sql`id` })
result.deleted += deleted.length
hasMore = batch.length === batchSize
batchesProcessed++
logger.info(`[${tableName}] Batch ${batchesProcessed}: deleted ${deleted.length} rows`)
} catch (error) {
result.failed++
logger.error(`[${tableName}] Batch delete failed:`, { error })
hasMore = false
}
}
return result
.where(and(...predicates))
.limit(limit)
},
...rest,
})
}
/**

View File

@@ -46,7 +46,11 @@ function toToolCallInfo(block: PersistedContentBlock): ToolCallInfo | undefined
function toDisplayBlock(block: PersistedContentBlock): ContentBlock | undefined {
const displayed = toDisplayBlockBody(block)
return displayed ? withBlockTiming(displayed, block) : undefined
if (!displayed) return undefined
if (block.parentToolCallId && displayed.parentToolCallId === undefined) {
displayed.parentToolCallId = block.parentToolCallId
}
return withBlockTiming(displayed, block)
}
function toDisplayBlockBody(block: PersistedContentBlock): ContentBlock | undefined {

View File

@@ -77,11 +77,16 @@ function appendTextBlock(
content: string,
options: {
lane?: 'subagent'
parentToolCallId?: string
}
): void {
if (!content) return
const last = blocks[blocks.length - 1]
if (last?.type === MothershipStreamV1EventType.text && last.lane === options.lane) {
if (
last?.type === MothershipStreamV1EventType.text &&
last.lane === options.lane &&
last.parentToolCallId === options.parentToolCallId
) {
last.content = `${typeof last.content === 'string' ? last.content : ''}${content}`
return
}
@@ -89,6 +94,7 @@ function appendTextBlock(
blocks.push({
type: MothershipStreamV1EventType.text,
...(options.lane ? { lane: options.lane } : {}),
...(options.parentToolCallId ? { parentToolCallId: options.parentToolCallId } : {}),
content,
})
}
@@ -122,10 +128,24 @@ function buildLiveAssistantMessage(params: {
return activeSubagent
}
const resolveParentForSubagentBlock = (
subagent: string | undefined,
scopedParent: string | undefined
): string | undefined => {
if (!subagent) return undefined
if (scopedParent) return scopedParent
if (activeSubagent === subagent) return activeSubagentParentToolCallId
for (const [parent, name] of subagentByParentToolCallId) {
if (name === subagent) return parent
}
return undefined
}
const ensureToolBlock = (input: {
toolCallId: string
toolName: string
calledBy?: string
parentToolCallId?: string
displayTitle?: string
params?: Record<string, unknown>
result?: { success: boolean; output?: unknown; error?: string }
@@ -155,6 +175,7 @@ function buildLiveAssistantMessage(params: {
? { display: existingToolCall.display }
: {}),
}
if (input.parentToolCallId) existing.parentToolCallId = input.parentToolCallId
return existing
}
@@ -176,6 +197,7 @@ function buildLiveAssistantMessage(params: {
}
: {}),
},
...(input.parentToolCallId ? { parentToolCallId: input.parentToolCallId } : {}),
}
toolIndexById.set(input.toolCallId, blocks.length)
blocks.push(nextBlock)
@@ -219,8 +241,10 @@ function buildLiveAssistantMessage(params: {
runningText.length > 0 &&
!runningText.endsWith('\n')
const normalizedChunk = needsBoundaryNewline ? `\n${chunk}` : chunk
const parentForBlock = resolveParentForSubagentBlock(scopedSubagent, scopedParentToolCallId)
appendTextBlock(blocks, normalizedChunk, {
...(scopedSubagent ? { lane: 'subagent' as const } : {}),
...(parentForBlock ? { parentToolCallId: parentForBlock } : {}),
})
runningText += normalizedChunk
lastContentSource = contentSource
@@ -239,11 +263,14 @@ function buildLiveAssistantMessage(params: {
continue
}
const parentForBlock = resolveParentForSubagentBlock(scopedSubagent, scopedParentToolCallId)
if (payload.phase === MothershipStreamV1ToolPhase.result) {
ensureToolBlock({
toolCallId,
toolName: payload.toolName,
calledBy: scopedSubagent,
...(parentForBlock ? { parentToolCallId: parentForBlock } : {}),
state: resolveStreamToolOutcome(payload),
result: {
success: payload.success,
@@ -258,6 +285,7 @@ function buildLiveAssistantMessage(params: {
toolCallId,
toolName: payload.toolName,
calledBy: scopedSubagent,
...(parentForBlock ? { parentToolCallId: parentForBlock } : {}),
displayTitle,
params: isRecord(payload.arguments) ? payload.arguments : undefined,
state: typeof payload.status === 'string' ? payload.status : 'executing',
@@ -270,9 +298,13 @@ function buildLiveAssistantMessage(params: {
}
const spanData = asPayloadRecord(parsed.payload.data)
const parentToolCallId =
scopedParentToolCallId ??
(typeof spanData?.tool_call_id === 'string' ? spanData.tool_call_id : undefined)
const parentToolCallIdFromData =
typeof spanData?.tool_call_id === 'string'
? spanData.tool_call_id
: typeof spanData?.toolCallId === 'string'
? spanData.toolCallId
: undefined
const parentToolCallId = scopedParentToolCallId ?? parentToolCallIdFromData
const name = typeof parsed.payload.agent === 'string' ? parsed.payload.agent : scopedAgentId
if (parsed.payload.event === MothershipStreamV1SpanLifecycleEvent.start && name) {
if (parentToolCallId) {
@@ -285,6 +317,7 @@ function buildLiveAssistantMessage(params: {
kind: MothershipStreamV1SpanPayloadKind.subagent,
lifecycle: MothershipStreamV1SpanLifecycleEvent.start,
content: name,
...(parentToolCallId ? { parentToolCallId } : {}),
})
continue
}
@@ -308,6 +341,7 @@ function buildLiveAssistantMessage(params: {
type: MothershipStreamV1EventType.span,
kind: MothershipStreamV1SpanPayloadKind.subagent,
lifecycle: MothershipStreamV1SpanLifecycleEvent.end,
...(parentToolCallId ? { parentToolCallId } : {}),
})
}
continue
@@ -343,8 +377,10 @@ function buildLiveAssistantMessage(params: {
}
const prefix = runningText.length > 0 && !runningText.endsWith('\n') ? '\n' : ''
const content = `${prefix}${tag}`
const errorParent = resolveParentForSubagentBlock(scopedSubagent, scopedParentToolCallId)
appendTextBlock(blocks, content, {
...(scopedSubagent ? { lane: 'subagent' as const } : {}),
...(errorParent ? { parentToolCallId: errorParent } : {}),
})
runningText += content
continue

View File

@@ -41,6 +41,7 @@ export interface PersistedContentBlock {
toolCall?: PersistedToolCall
timestamp?: number
endedAt?: number
parentToolCallId?: string
}
export interface PersistedFileAttachment {
@@ -101,9 +102,16 @@ export function withBlockTiming<T>(target: T, src: { timestamp?: number; endedAt
return target
}
function withBlockParent<T>(target: T, src: { parentToolCallId?: string }): T {
if (src.parentToolCallId) {
;(target as { parentToolCallId?: string }).parentToolCallId = src.parentToolCallId
}
return target
}
function mapContentBlock(block: ContentBlock): PersistedContentBlock {
const persisted = mapContentBlockBody(block)
return withBlockTiming(persisted, block)
return withBlockParent(withBlockTiming(persisted, block), block)
}
function mapContentBlockBody(block: ContentBlock): PersistedContentBlock {
@@ -265,6 +273,7 @@ interface RawBlock {
status?: string
timestamp?: number
endedAt?: number
parentToolCallId?: string
toolCall?: {
id?: string
name?: string
@@ -321,6 +330,7 @@ function normalizeCanonicalBlock(block: RawBlock): PersistedContentBlock {
if (block.kind) result.kind = block.kind as MothershipStreamV1SpanPayloadKind
if (block.lifecycle) result.lifecycle = block.lifecycle as MothershipStreamV1SpanLifecycleEvent
if (block.status) result.status = block.status as MothershipStreamV1CompletionStatus
if (block.parentToolCallId) result.parentToolCallId = block.parentToolCallId
if (block.toolCall) {
result.toolCall = {
id: block.toolCall.id ?? '',
@@ -438,6 +448,9 @@ function normalizeBlock(block: RawBlock): PersistedContentBlock {
if (typeof block.endedAt === 'number' && result.endedAt === undefined) {
result.endedAt = block.endedAt
}
if (block.parentToolCallId && result.parentToolCallId === undefined) {
result.parentToolCallId = block.parentToolCallId
}
return result
}

View File

@@ -361,28 +361,29 @@ export async function runStreamLoop(
flushSubagentThinkingBlock(context)
flushThinkingBlock(context)
if (spanEvt === MothershipStreamV1SpanLifecycleEvent.start) {
const lastParent = context.subAgentParentStack[context.subAgentParentStack.length - 1]
const lastBlock = context.contentBlocks[context.contentBlocks.length - 1]
if (toolCallId) {
if (lastParent !== toolCallId) {
if (!context.subAgentParentStack.includes(toolCallId)) {
context.subAgentParentStack.push(toolCallId)
}
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] ??= ''
context.subAgentToolCalls[toolCallId] ??= []
}
if (
subagentName &&
!(
lastParent === toolCallId &&
lastBlock?.type === 'subagent' &&
lastBlock.content === subagentName
)
) {
context.contentBlocks.push({
type: 'subagent',
content: subagentName,
timestamp: Date.now(),
if (toolCallId && subagentName) {
const openParents = (context.openSubagentParents ??= new Set<string>())
if (!openParents.has(toolCallId)) {
openParents.add(toolCallId)
context.contentBlocks.push({
type: 'subagent',
content: subagentName,
parentToolCallId: toolCallId,
timestamp: Date.now(),
})
}
} else {
logger.warn('subagent start missing toolCallId or agent name', {
hasToolCallId: Boolean(toolCallId),
hasSubagentName: Boolean(subagentName),
})
}
return
@@ -391,27 +392,33 @@ export async function runStreamLoop(
if (isPendingPause) {
return
}
if (context.subAgentParentStack.length > 0) {
context.subAgentParentStack.pop()
if (toolCallId) {
const idx = context.subAgentParentStack.lastIndexOf(toolCallId)
if (idx >= 0) {
context.subAgentParentStack.splice(idx, 1)
} else {
logger.warn('subagent end without matching start', { toolCallId })
}
} else {
logger.warn('subagent end without matching start')
logger.warn('subagent end missing toolCallId')
}
context.subAgentParentToolCallId =
context.subAgentParentStack.length > 0
? context.subAgentParentStack[context.subAgentParentStack.length - 1]
: undefined
if (subagentName) {
if (toolCallId) {
for (let i = context.contentBlocks.length - 1; i >= 0; i--) {
const b = context.contentBlocks[i]
if (
b.type === 'subagent' &&
b.content === subagentName &&
b.endedAt === undefined
b.endedAt === undefined &&
b.parentToolCallId === toolCallId
) {
b.endedAt = Date.now()
break
}
}
context.openSubagentParents?.delete(toolCallId)
}
return
}

View File

@@ -22,10 +22,17 @@ export function handleTextEvent(scope: ToolScope): StreamHandler {
const parentToolCallId = getScopedParentToolCallId(event, context)
if (!parentToolCallId) return
if (event.payload.channel === MothershipStreamV1TextChannel.thinking) {
if (
context.currentSubagentThinkingBlock &&
context.currentSubagentThinkingBlock.parentToolCallId !== parentToolCallId
) {
flushSubagentThinkingBlock(context)
}
if (!context.currentSubagentThinkingBlock) {
context.currentSubagentThinkingBlock = {
type: 'subagent_thinking',
content: '',
parentToolCallId,
timestamp: Date.now(),
}
}
@@ -40,7 +47,7 @@ export function handleTextEvent(scope: ToolScope): StreamHandler {
}
context.subAgentContent[parentToolCallId] =
(context.subAgentContent[parentToolCallId] || '') + chunk
addContentBlock(context, { type: 'subagent_text', content: chunk })
addContentBlock(context, { type: 'subagent_text', content: chunk, parentToolCallId })
return
}

View File

@@ -340,6 +340,7 @@ function registerSubagentToolCall(
type: 'tool_call',
toolCall,
calledBy: parentToolCall?.name,
parentToolCallId,
})
}
}

View File

@@ -56,6 +56,7 @@ export interface ContentBlock {
calledBy?: string
timestamp: number
endedAt?: number
parentToolCallId?: string
}
export interface StreamingContext {
@@ -86,6 +87,7 @@ export interface StreamingContext {
subAgentParentStack: string[]
subAgentContent: Record<string, string>
subAgentToolCalls: Record<string, ToolCallState[]>
openSubagentParents?: Set<string>
pendingContent: string
streamComplete: boolean
wasAborted: boolean
@@ -136,31 +138,12 @@ export interface OrchestratorOptions {
onComplete?: (result: OrchestratorResult) => void | Promise<void>
onError?: (error: Error) => void | Promise<void>
abortSignal?: AbortSignal
/**
* Invoked when the orchestrator infers that the run was aborted via
* an out-of-band signal (currently: a Redis abort marker observed
* at SSE body close). Callers wire this to fire their local
* `AbortController` so `signal.reason` is set and `recordCancelled`
* classifies as `explicit_stop` rather than `unknown`.
*/
onAbortObserved?: (reason: string) => void
interactive?: boolean
}
export interface OrchestratorResult {
success: boolean
/**
* True iff the non-success outcome was a user-initiated cancel
* (abort signal fired or client disconnected). Lets callers treat
* cancels differently from actual errors — notably, `buildOnComplete`
* must NOT finalize the chat row on cancel, because the browser's
* `/api/copilot/chat/stop` POST owns writing the partial assistant
* content and clearing `conversationId` in one UPDATE. Finalizing
* here would race and clear `conversationId` first, making the stop
* UPDATE match zero rows and the partial content vanish on refetch.
*
* Always false when `success=true`.
*/
cancelled?: boolean
content: string
contentBlocks: ContentBlock[]

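A hedged sketch of the consumer-side contract the `cancelled` comment above spells out; `finalize` is an illustrative stand-in for the completion write that `buildOnComplete` performs.

function makeOnComplete(
  finalize: (content: string, blocks: ContentBlock[]) => Promise<void>
) {
  return async (result: OrchestratorResult) => {
    if (result.cancelled) {
      // User-initiated stop: /api/copilot/chat/stop owns writing the partial
      // assistant content, so finalizing here would race it and lose data.
      return
    }
    await finalize(result.content, result.contentBlocks)
  }
}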
View File

@@ -3,10 +3,11 @@ import { credential } from '@sim/db/schema'
import { toError } from '@sim/utils/errors'
import { eq } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types'
import { getCredentialActorContext } from '@/lib/credentials/access'
export function executeManageCredential(
rawParams: Record<string, unknown>,
_context: ExecutionContext
context: ExecutionContext
): Promise<ToolCallResult> {
const params = rawParams as {
operation: string
@@ -17,26 +18,30 @@ export function executeManageCredential(
const { operation, displayName } = params
return (async () => {
try {
if (!context?.userId) {
return { success: false, error: 'Authentication required' }
}
switch (operation) {
case 'rename': {
const credentialId = params.credentialId
if (!credentialId) return { success: false, error: 'credentialId is required for rename' }
if (!displayName) return { success: false, error: 'displayName is required for rename' }
const [row] = await db
.select({
id: credential.id,
type: credential.type,
displayName: credential.displayName,
})
.from(credential)
.where(eq(credential.id, credentialId))
.limit(1)
if (!row) return { success: false, error: 'Credential not found' }
if (row.type !== 'oauth')
const actor = await getCredentialActorContext(credentialId, context.userId)
if (!actor.credential || !actor.hasWorkspaceAccess) {
return { success: false, error: 'Credential not found' }
}
if (actor.credential.type !== 'oauth') {
return {
success: false,
error: 'Only OAuth credentials can be managed with this tool.',
}
}
if (!actor.canWriteWorkspace && !actor.isAdmin) {
return { success: false, error: 'Write access required to rename this credential' }
}
await db
.update(credential)
.set({ displayName, updatedAt: new Date() })
@@ -53,12 +58,16 @@ export function executeManageCredential(
const failed: string[] = []
for (const id of ids) {
const [row] = await db
.select({ id: credential.id, type: credential.type })
.from(credential)
.where(eq(credential.id, id))
.limit(1)
if (!row || row.type !== 'oauth') {
const actor = await getCredentialActorContext(id, context.userId)
if (
!actor.credential ||
!actor.hasWorkspaceAccess ||
actor.credential.type !== 'oauth'
) {
failed.push(id)
continue
}
if (!actor.canWriteWorkspace && !actor.isAdmin) {
failed.push(id)
continue
}

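The rename and bulk-delete arms above repeat the same gate. A hedged consolidation sketch, collapsing the distinct error messages into a boolean for brevity; `getCredentialActorContext`'s return shape is inferred from exactly the fields the diff reads.

async function canManageOAuthCredential(
  credentialId: string,
  userId: string
): Promise<boolean> {
  const actor = await getCredentialActorContext(credentialId, userId)
  // Existence and workspace visibility first, so callers can keep reporting
  // 'Credential not found' without leaking which IDs exist.
  if (!actor.credential || !actor.hasWorkspaceAccess) return false
  if (actor.credential.type !== 'oauth') return false
  return actor.canWriteWorkspace || actor.isAdmin
}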
View File

@@ -1,6 +1,9 @@
import { db } from '@sim/db'
import { knowledgeBase } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { generateId } from '@sim/utils/id'
import { eq } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types'
import { restoreKnowledgeBase } from '@/lib/knowledge/service'
import { getTableById, restoreTable } from '@/lib/table/service'
@@ -10,6 +13,8 @@ import {
} from '@/lib/uploads/contexts/workspace/workspace-file-manager'
import { restoreWorkflow } from '@/lib/workflows/lifecycle'
import { performRestoreFolder } from '@/lib/workflows/orchestration/folder-lifecycle'
import { getWorkflowById } from '@/lib/workflows/utils'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('RestoreResource')
@@ -33,10 +38,25 @@ export async function executeRestoreResource(
}
const requestId = generateId().slice(0, 8)
const callerWorkspaceId = context.workspaceId
const hasWriteAccess = async (resourceWorkspaceId: string | null | undefined) => {
if (!resourceWorkspaceId || resourceWorkspaceId !== callerWorkspaceId) return false
const permission = await getUserEntityPermissions(
context.userId,
'workspace',
resourceWorkspaceId
)
return permission === 'write' || permission === 'admin'
}
try {
switch (type) {
case 'workflow': {
const existing = await getWorkflowById(id, { includeArchived: true })
if (!existing || !(await hasWriteAccess(existing.workspaceId))) {
return { success: false, error: 'Workflow not found' }
}
const result = await restoreWorkflow(id, { requestId })
if (!result.restored) {
return { success: false, error: 'Workflow not found or not archived' }
@@ -50,9 +70,13 @@ export async function executeRestoreResource(
}
case 'table': {
const existing = await getTableById(id, { includeArchived: true })
if (!existing || !(await hasWriteAccess(existing.workspaceId))) {
return { success: false, error: 'Table not found' }
}
await restoreTable(id, requestId)
const table = await getTableById(id)
const tableName = table?.name || id
const tableName = table?.name || existing.name
logger.info('Table restored via copilot', { tableId: id, name: tableName })
return {
success: true,
@@ -62,6 +86,9 @@ export async function executeRestoreResource(
}
case 'file': {
if (!(await hasWriteAccess(context.workspaceId))) {
return { success: false, error: 'File not found' }
}
await restoreWorkspaceFile(context.workspaceId, id)
const fileRecord = await getWorkspaceFile(context.workspaceId, id)
const fileName = fileRecord?.name || id
@@ -74,6 +101,14 @@ export async function executeRestoreResource(
}
case 'knowledgebase': {
const [existing] = await db
.select({ workspaceId: knowledgeBase.workspaceId })
.from(knowledgeBase)
.where(eq(knowledgeBase.id, id))
.limit(1)
if (!existing || !(await hasWriteAccess(existing.workspaceId))) {
return { success: false, error: 'Knowledge base not found' }
}
await restoreKnowledgeBase(id, requestId)
logger.info('Knowledge base restored via copilot', { knowledgeBaseId: id })
return {
@@ -83,6 +118,9 @@ export async function executeRestoreResource(
}
case 'folder': {
if (!(await hasWriteAccess(context.workspaceId))) {
return { success: false, error: 'Folder not found' }
}
const result = await performRestoreFolder({
folderId: id,
workspaceId: context.workspaceId,

View File

@@ -28,6 +28,7 @@ import {
setWorkflowVariables,
updateFolderRecord,
updateWorkflowRecord,
verifyFolderWorkspace,
} from '@/lib/workflows/utils'
import { hasExecutionResult } from '@/executor/utils/errors'
import type { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
@@ -522,7 +523,13 @@ export async function executeMoveWorkflow(
for (const workflowId of workflowIds) {
try {
await ensureWorkflowAccess(workflowId, context.userId, 'write')
const { workspaceId } = await ensureWorkflowAccess(workflowId, context.userId, 'write')
if (folderId) {
if (!workspaceId || !(await verifyFolderWorkspace(folderId, workspaceId))) {
failed.push(workflowId)
continue
}
}
assertWorkflowMutationNotAborted(context)
await updateWorkflowRecord(workflowId, { folderId })
moved.push(workflowId)
@@ -562,6 +569,14 @@ export async function executeMoveFolder(
const workspaceId = context.workspaceId || (await getDefaultWorkspaceId(context.userId))
await ensureWorkspaceAccess(workspaceId, context.userId, 'write')
if (!(await verifyFolderWorkspace(folderId, workspaceId))) {
return { success: false, error: 'Folder not found' }
}
if (parentId && !(await verifyFolderWorkspace(parentId, workspaceId))) {
return { success: false, error: 'Parent folder not found' }
}
assertWorkflowMutationNotAborted(context)
await updateFolderRecord(folderId, { parentId })
@@ -1007,6 +1022,11 @@ export async function executeRenameFolder(
const workspaceId = context.workspaceId || (await getDefaultWorkspaceId(context.userId))
await ensureWorkspaceAccess(workspaceId, context.userId, 'write')
if (!(await verifyFolderWorkspace(folderId, workspaceId))) {
return { success: false, error: 'Folder not found' }
}
assertWorkflowMutationNotAborted(context)
await updateFolderRecord(folderId, { name })

View File

@@ -105,11 +105,12 @@ export const getJobLogsServerTool: BaseServerTool<GetJobLogsArgs, JobLogEntry[]>
}
const wsId = workspaceId || context.workspaceId
if (wsId) {
const access = await checkWorkspaceAccess(wsId, context.userId)
if (!access.hasAccess) {
throw new Error('Unauthorized workspace access')
}
if (!wsId) {
throw new Error('Workspace context required')
}
const access = await checkWorkspaceAccess(wsId, context.userId)
if (!access.hasAccess) {
throw new Error('Unauthorized workspace access')
}
const clampedLimit = Math.min(Math.max(1, limit), 5)
@@ -121,7 +122,10 @@ export const getJobLogsServerTool: BaseServerTool<GetJobLogsArgs, JobLogEntry[]>
includeDetails,
})
const conditions = [eq(jobExecutionLogs.scheduleId, jobId)]
const conditions = [
eq(jobExecutionLogs.scheduleId, jobId),
eq(jobExecutionLogs.workspaceId, wsId),
]
if (executionId) {
conditions.push(eq(jobExecutionLogs.executionId, executionId))
}

View File

@@ -37,6 +37,11 @@ import {
import { StorageService } from '@/lib/uploads'
import { resolveWorkspaceFileReference } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
import { getQueryStrategy, handleVectorOnlySearch } from '@/app/api/knowledge/search/utils'
import {
checkDocumentWriteAccess,
checkKnowledgeBaseAccess,
checkKnowledgeBaseWriteAccess,
} from '@/app/api/knowledge/utils'
const logger = createLogger('KnowledgeBaseServerTool')
@@ -141,6 +146,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
if (!access.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const knowledgeBase = await getKnowledgeBaseById(args.knowledgeBaseId)
if (!knowledgeBase) {
return {
@@ -187,6 +200,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
if (!access.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const kb = await getKnowledgeBaseById(args.knowledgeBaseId)
if (!kb) {
return {
@@ -257,6 +278,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const writeAccess = await checkKnowledgeBaseWriteAccess(
args.knowledgeBaseId,
context.userId
)
if (!writeAccess.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const targetKb = await getKnowledgeBaseById(args.knowledgeBaseId)
if (!targetKb || !targetKb.workspaceId) {
return {
@@ -363,6 +395,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const writeAccess = await checkKnowledgeBaseWriteAccess(
args.knowledgeBaseId,
context.userId
)
if (!writeAccess.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const requestId = generateId().slice(0, 8)
assertNotAborted()
const updatedKb = await updateKnowledgeBase(args.knowledgeBaseId, updates, requestId)
@@ -400,6 +443,12 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
const notFound: string[] = []
for (const kbId of kbIds) {
const writeAccess = await checkKnowledgeBaseWriteAccess(kbId, context.userId)
if (!writeAccess.hasAccess) {
notFound.push(kbId)
continue
}
const kbToDelete = await getKnowledgeBaseById(kbId)
if (!kbToDelete) {
notFound.push(kbId)
@@ -444,8 +493,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
const failed: string[] = []
for (const docId of docIds) {
const requestId = generateId().slice(0, 8)
assertNotAborted()
const docAccess = await checkDocumentWriteAccess(
args.knowledgeBaseId,
docId,
context.userId
)
if (!docAccess.hasAccess) {
failed.push(docId)
continue
}
const requestId = generateId().slice(0, 8)
const result = await deleteDocument(docId, requestId)
if (result.success) {
deleted.push(docId)
@@ -481,6 +539,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
message: 'At least one of filename or enabled is required for update_document',
}
}
const docAccess = await checkDocumentWriteAccess(
args.knowledgeBaseId,
args.documentId,
context.userId
)
if (!docAccess.hasAccess) {
return {
success: false,
message: `Document with ID "${args.documentId}" not found`,
}
}
const requestId = generateId().slice(0, 8)
assertNotAborted()
await updateDocument(args.documentId, updateData, requestId)
@@ -503,6 +572,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
if (!access.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const tagDefinitions = await getDocumentTagDefinitions(args.knowledgeBaseId)
logger.info('Tag definitions listed via copilot', {
@@ -537,6 +614,18 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
message: 'tagDisplayName is required for create_tag operation',
}
}
const writeAccess = await checkKnowledgeBaseWriteAccess(
args.knowledgeBaseId,
context.userId
)
if (!writeAccess.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const fieldType = args.tagFieldType || 'text'
const tagSlot = await getNextAvailableSlot(args.knowledgeBaseId, fieldType)
@@ -606,6 +695,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const writeAccess = await checkKnowledgeBaseWriteAccess(
existingTag.knowledgeBaseId,
context.userId
)
if (!writeAccess.hasAccess) {
return {
success: false,
message: `Tag definition with ID "${args.tagDefinitionId}" not found`,
}
}
const requestId = generateId().slice(0, 8)
assertNotAborted()
const updatedTag = await updateTagDefinition(args.tagDefinitionId, updateData, requestId)
@@ -643,6 +743,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const writeAccess = await checkKnowledgeBaseWriteAccess(
args.knowledgeBaseId,
context.userId
)
if (!writeAccess.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const requestId = generateId().slice(0, 8)
assertNotAborted()
const deleted = await deleteTagDefinition(
@@ -677,6 +788,14 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const access = await checkKnowledgeBaseAccess(args.knowledgeBaseId, context.userId)
if (!access.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const requestId = generateId().slice(0, 8)
const stats = await getTagUsageStats(args.knowledgeBaseId, requestId)
@@ -702,6 +821,17 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
const writeAccess = await checkKnowledgeBaseWriteAccess(
args.knowledgeBaseId,
context.userId
)
if (!writeAccess.hasAccess) {
return {
success: false,
message: `Knowledge base with ID "${args.knowledgeBaseId}" not found`,
}
}
const createBody: Record<string, unknown> = {
connectorType: args.connectorType,
sourceConfig: args.sourceConfig ?? {},
@@ -762,6 +892,11 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
const writeAccess = await checkKnowledgeBaseWriteAccess(kbId, context.userId)
if (!writeAccess.hasAccess) {
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
const updateBody: Record<string, unknown> = {}
if (args.sourceConfig !== undefined) updateBody.sourceConfig = args.sourceConfig
if (args.syncIntervalMinutes !== undefined)
@@ -810,6 +945,11 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
const writeAccess = await checkKnowledgeBaseWriteAccess(deleteKbId, context.userId)
if (!writeAccess.hasAccess) {
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
assertNotAborted()
const deleteRes = await connectorApiCall(
context.userId,
@@ -843,6 +983,11 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
const writeAccess = await checkKnowledgeBaseWriteAccess(syncKbId, context.userId)
if (!writeAccess.hasAccess) {
return { success: false, message: `Connector "${args.connectorId}" not found` }
}
assertNotAborted()
const syncRes = await connectorApiCall(
context.userId,

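Every operation in these hunks repeats the same guard. A minimal consolidation sketch, assuming only the `{ hasAccess }` result shape visible above and the same checkKnowledgeBaseWriteAccess import the tool file already uses (the helper name and ToolFailure type are hypothetical). Note the guard deliberately answers "not found" rather than "forbidden", so a probing caller cannot learn whether an inaccessible knowledge base exists:

type ToolFailure = { success: false; message: string }

// Hypothetical wrapper around the checkKnowledgeBaseWriteAccess call used above.
async function requireKbWriteAccess(
  knowledgeBaseId: string,
  userId: string
): Promise<ToolFailure | null> {
  const writeAccess = await checkKnowledgeBaseWriteAccess(knowledgeBaseId, userId)
  if (!writeAccess.hasAccess) {
    // Same message for "missing" and "no access": avoids existence leaks.
    return { success: false, message: `Knowledge base with ID "${knowledgeBaseId}" not found` }
  }
  return null // caller proceeds with the operation
}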
View File

@@ -223,9 +223,12 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
if (!args.tableId) {
return { success: false, message: 'Table ID is required' }
}
if (!workspaceId) {
return { success: false, message: 'Workspace ID is required' }
}
const table = await getTableById(args.tableId)
if (!table) {
if (!table || table.workspaceId !== workspaceId) {
return { success: false, message: `Table not found: ${args.tableId}` }
}
@@ -240,9 +243,12 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
if (!args.tableId) {
return { success: false, message: 'Table ID is required' }
}
if (!workspaceId) {
return { success: false, message: 'Workspace ID is required' }
}
const table = await getTableById(args.tableId)
if (!table) {
if (!table || table.workspaceId !== workspaceId) {
return { success: false, message: `Table not found: ${args.tableId}` }
}
@@ -816,6 +822,9 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
if (!args.tableId) {
return { success: false, message: 'Table ID is required' }
}
if (!workspaceId) {
return { success: false, message: 'Workspace ID is required' }
}
const col = (args as Record<string, unknown>).column as
| {
name: string
@@ -830,6 +839,10 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
message: 'column with name and type is required for add_column',
}
}
const tableForAdd = await getTableById(args.tableId)
if (!tableForAdd || tableForAdd.workspaceId !== workspaceId) {
return { success: false, message: `Table not found: ${args.tableId}` }
}
const requestId = generateId().slice(0, 8)
assertNotAborted()
const updated = await addTableColumn(args.tableId, col, requestId)
@@ -844,11 +857,18 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
if (!args.tableId) {
return { success: false, message: 'Table ID is required' }
}
if (!workspaceId) {
return { success: false, message: 'Workspace ID is required' }
}
const colName = (args as Record<string, unknown>).columnName as string | undefined
const newColName = (args as Record<string, unknown>).newName as string | undefined
if (!colName || !newColName) {
return { success: false, message: 'columnName and newName are required' }
}
const tableForRename = await getTableById(args.tableId)
if (!tableForRename || tableForRename.workspaceId !== workspaceId) {
return { success: false, message: `Table not found: ${args.tableId}` }
}
const requestId = generateId().slice(0, 8)
assertNotAborted()
const updated = await renameColumn(
@@ -866,12 +886,19 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
if (!args.tableId) {
return { success: false, message: 'Table ID is required' }
}
if (!workspaceId) {
return { success: false, message: 'Workspace ID is required' }
}
const colName = (args as Record<string, unknown>).columnName as string | undefined
const colNames = (args as Record<string, unknown>).columnNames as string[] | undefined
const names = colNames ?? (colName ? [colName] : null)
if (!names || names.length === 0) {
return { success: false, message: 'columnName or columnNames is required' }
}
const tableForDelete = await getTableById(args.tableId)
if (!tableForDelete || tableForDelete.workspaceId !== workspaceId) {
return { success: false, message: `Table not found: ${args.tableId}` }
}
const requestId = generateId().slice(0, 8)
if (names.length === 1) {
assertNotAborted()
@@ -901,6 +928,9 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
if (!args.tableId) {
return { success: false, message: 'Table ID is required' }
}
if (!workspaceId) {
return { success: false, message: 'Workspace ID is required' }
}
const colName = (args as Record<string, unknown>).columnName as string | undefined
if (!colName) {
return { success: false, message: 'columnName is required' }
@@ -913,6 +943,10 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
message: 'At least one of newType or unique must be provided',
}
}
const tableForUpdate = await getTableById(args.tableId)
if (!tableForUpdate || tableForUpdate.workspaceId !== workspaceId) {
return { success: false, message: `Table not found: ${args.tableId}` }
}
const requestId = generateId().slice(0, 8)
let result: TableDefinition | undefined
if (newType !== undefined) {

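The column operations above all repeat an identical fetch-then-compare. A sketch of a shared workspace-scoped lookup, assuming the getTableById import this file already uses (the helper name is hypothetical):

// Hypothetical helper: fetch a table and confirm it belongs to the caller's
// workspace. A cross-workspace table is treated exactly like a missing one,
// so table IDs from other workspaces are never confirmed to exist.
async function getTableInWorkspace(tableId: string, workspaceId: string) {
  const table = await getTableById(tableId)
  if (!table || table.workspaceId !== workspaceId) return null
  return table
}

// Usage inside an operation:
// const table = await getTableInWorkspace(args.tableId, workspaceId)
// if (!table) return { success: false, message: `Table not found: ${args.tableId}` }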
View File

@@ -704,7 +704,7 @@ export const OAUTH_PROVIDERS: Record<string, OAuthProviderConfig> = {
services: {
slack: {
name: 'Slack',
description: 'Send messages using a bot for Slack.',
description: 'Use Slack messaging, files, reactions, views, and canvases.',
providerId: 'slack',
icon: SlackIcon,
baseProviderIcon: SlackIcon,
@@ -722,6 +722,7 @@ export const OAUTH_PROVIDERS: Record<string, OAuthProviderConfig> = {
// TODO: Add 'users:read.email' once Slack app review is approved
'files:write',
'files:read',
'canvases:read',
'canvases:write',
'reactions:write',
],

View File

@@ -278,7 +278,8 @@ export const SCOPE_DESCRIPTIONS: Record<string, string> = {
'users:read.email': 'View user email addresses',
'files:write': 'Upload files',
'files:read': 'Download and read files',
'canvases:write': 'Create canvas documents',
'canvases:read': 'Read canvas sections',
'canvases:write': 'Create, edit, and delete canvas documents',
'reactions:write': 'Add emoji reactions to messages',
// Webflow scopes

View File

@@ -0,0 +1,87 @@
import { safeCompare } from '@sim/security/compare'
import { hmacSha256Base64 } from '@sim/security/hmac'
import { env } from '@/lib/core/config/env'
import type { StorageContext } from '@/lib/uploads/shared/types'
export interface UploadTokenPayload {
uploadId: string
key: string
userId: string
workspaceId: string
context: StorageContext
}
interface SignedPayload extends UploadTokenPayload {
exp: number
v: 1
}
const toBase64Url = (input: string): string => Buffer.from(input, 'utf8').toString('base64url')
const fromBase64Url = (input: string): string => Buffer.from(input, 'base64url').toString('utf8')
const sign = (payload: string): string => hmacSha256Base64(payload, env.INTERNAL_API_SECRET)
/**
* Sign an upload session token binding (uploadId, key, userId, workspaceId, context).
* Used to prevent IDOR on multipart upload follow-up calls (get-part-urls, complete, abort).
*/
export function signUploadToken(payload: UploadTokenPayload, expiresInSeconds = 60 * 60): string {
const signed: SignedPayload = {
...payload,
exp: Math.floor(Date.now() / 1000) + expiresInSeconds,
v: 1,
}
const encoded = toBase64Url(JSON.stringify(signed))
return `${encoded}.${sign(encoded)}`
}
export type UploadTokenVerification =
| { valid: true; payload: UploadTokenPayload }
| { valid: false }
export function verifyUploadToken(token: string): UploadTokenVerification {
if (typeof token !== 'string') {
return { valid: false }
}
const parts = token.split('.')
if (parts.length !== 2) return { valid: false }
const [encoded, signature] = parts
if (!encoded || !signature) return { valid: false }
const expected = sign(encoded)
if (!safeCompare(signature, expected)) {
return { valid: false }
}
let parsed: SignedPayload
try {
parsed = JSON.parse(fromBase64Url(encoded)) as SignedPayload
} catch {
return { valid: false }
}
if (
parsed.v !== 1 ||
typeof parsed.exp !== 'number' ||
parsed.exp < Math.floor(Date.now() / 1000) ||
typeof parsed.uploadId !== 'string' ||
typeof parsed.key !== 'string' ||
typeof parsed.userId !== 'string' ||
typeof parsed.workspaceId !== 'string' ||
typeof parsed.context !== 'string'
) {
return { valid: false }
}
return {
valid: true,
payload: {
uploadId: parsed.uploadId,
key: parsed.key,
userId: parsed.userId,
workspaceId: parsed.workspaceId,
context: parsed.context as StorageContext,
},
}
}
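A round trip of the token, as a minimal sketch: sign at initiate, hand the opaque string to the client, and on follow-up calls take every sensitive field from the verified payload rather than the request body. The literal values and the StorageContext member are illustrative, not the actual route code:

const token = signUploadToken({
  uploadId: 'upload-abc123', // from the storage provider's initiate response
  key: 'ws-1/uploads/report.pdf',
  userId: 'user-1',
  workspaceId: 'ws-1',
  context: 'workspace' as StorageContext, // illustrative context value
})

// Later, at get-part-urls / complete / abort:
const result = verifyUploadToken(token)
if (!result.valid) {
  throw new Error('Upload token is tampered, expired, or malformed')
}
// Use result.payload.key / uploadId / context, never the client-sent values.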

View File

@@ -564,6 +564,18 @@ export async function updateFolderRecord(
await db.update(workflowFolder).set(setData).where(eq(workflowFolder.id, folderId))
}
export async function verifyFolderWorkspace(
folderId: string,
workspaceId: string
): Promise<boolean> {
const [row] = await db
.select({ id: workflowFolder.id })
.from(workflowFolder)
.where(and(eq(workflowFolder.id, folderId), eq(workflowFolder.workspaceId, workspaceId)))
.limit(1)
return Boolean(row)
}
export async function deleteFolderRecord(folderId: string): Promise<boolean> {
const [folder] = await db
.select({ parentId: workflowFolder.parentId })

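A sketch of the intended call pattern: guard any folder mutation by confirming the target belongs to the caller's workspace before touching it (the wrapper is hypothetical; an operation involving a destination folder would run the same check on that ID too):

// Hypothetical guard built on verifyFolderWorkspace.
async function guardFolder(folderId: string, workspaceId: string) {
  if (!(await verifyFolderWorkspace(folderId, workspaceId))) {
    // Same "not found" answer whether the folder is missing or belongs to
    // another workspace, so foreign folder IDs are never confirmed to exist.
    return { success: false, message: `Folder not found: ${folderId}` }
  }
  return null
}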
View File

@@ -183,6 +183,49 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
contextWindow: 1047576,
releaseDate: '2025-04-14',
},
// GPT-5.5 family
{
id: 'gpt-5.5-pro',
pricing: {
input: 30.0,
output: 180.0,
updatedAt: '2026-04-23',
},
capabilities: {
nativeStructuredOutputs: true,
reasoningEffort: {
values: ['none', 'low', 'medium', 'high', 'xhigh'],
},
verbosity: {
values: ['low', 'medium', 'high'],
},
maxOutputTokens: 128000,
},
contextWindow: 1050000,
releaseDate: '2026-04-23',
},
{
id: 'gpt-5.5',
pricing: {
input: 5.0,
cachedInput: 0.5,
output: 30.0,
updatedAt: '2026-04-23',
},
capabilities: {
nativeStructuredOutputs: true,
reasoningEffort: {
values: ['none', 'low', 'medium', 'high', 'xhigh'],
},
verbosity: {
values: ['low', 'medium', 'high'],
},
maxOutputTokens: 128000,
},
contextWindow: 1050000,
releaseDate: '2026-04-23',
recommended: true,
},
// GPT-5.4 family
{
id: 'gpt-5.4-pro',
@@ -219,7 +262,6 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
},
contextWindow: 1050000,
releaseDate: '2026-03-05',
recommended: true,
},
{
id: 'gpt-5.4-mini',

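A quick worked cost check against the gpt-5.5 entry above, assuming the pricing fields are USD per million tokens and that cached input tokens are a discounted subset of the input tokens (neither convention is stated in this hunk):

// Assumption: prices are USD per 1M tokens; cachedInput discounts a subset
// of the input tokens rather than being billed on top of them.
const pricing = { input: 5.0, cachedInput: 0.5, output: 30.0 }
const usage = { inputTokens: 200_000, cachedInputTokens: 50_000, outputTokens: 10_000 }

const costUsd =
  ((usage.inputTokens - usage.cachedInputTokens) / 1_000_000) * pricing.input +
  (usage.cachedInputTokens / 1_000_000) * pricing.cachedInput +
  (usage.outputTokens / 1_000_000) * pricing.output
// 0.75 + 0.025 + 0.30 = 1.075 USD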
View File

@@ -2364,19 +2364,23 @@ import {
slackCanvasTool,
slackCreateChannelCanvasTool,
slackCreateConversationTool,
slackDeleteCanvasTool,
slackDeleteMessageTool,
slackDownloadTool,
slackEditCanvasTool,
slackEphemeralMessageTool,
slackGetCanvasTool,
slackGetChannelInfoTool,
slackGetMessageTool,
slackGetThreadTool,
slackGetUserPresenceTool,
slackGetUserTool,
slackInviteToConversationTool,
slackListCanvasesTool,
slackListChannelsTool,
slackListMembersTool,
slackListUsersTool,
slackLookupCanvasSectionsTool,
slackMessageReaderTool,
slackMessageTool,
slackOpenViewTool,
@@ -3360,6 +3364,10 @@ export const tools: Record<string, ToolConfig> = {
slack_publish_view: slackPublishViewTool,
slack_edit_canvas: slackEditCanvasTool,
slack_create_channel_canvas: slackCreateChannelCanvasTool,
slack_get_canvas: slackGetCanvasTool,
slack_list_canvases: slackListCanvasesTool,
slack_lookup_canvas_sections: slackLookupCanvasSectionsTool,
slack_delete_canvas: slackDeleteCanvasTool,
slack_create_conversation: slackCreateConversationTool,
slack_invite_to_conversation: slackInviteToConversationTool,
github_repo_info: githubRepoInfoTool,

View File

@@ -0,0 +1,79 @@
import type { SlackDeleteCanvasParams, SlackDeleteCanvasResponse } from '@/tools/slack/types'
import type { ToolConfig } from '@/tools/types'
export const slackDeleteCanvasTool: ToolConfig<SlackDeleteCanvasParams, SlackDeleteCanvasResponse> =
{
id: 'slack_delete_canvas',
name: 'Slack Delete Canvas',
description: 'Delete a Slack canvas by its canvas ID',
version: '1.0.0',
oauth: {
required: true,
provider: 'slack',
},
params: {
authMethod: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Authentication method: oauth or bot_token',
},
botToken: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Bot token for Custom Bot',
},
accessToken: {
type: 'string',
required: false,
visibility: 'hidden',
description: 'OAuth access token or bot token for Slack API',
},
canvasId: {
type: 'string',
required: true,
visibility: 'user-or-llm',
description: 'Canvas ID to delete (e.g., F1234ABCD)',
},
},
request: {
url: 'https://slack.com/api/canvases.delete',
method: 'POST',
headers: (params: SlackDeleteCanvasParams) => ({
'Content-Type': 'application/json',
Authorization: `Bearer ${params.accessToken || params.botToken}`,
}),
body: (params: SlackDeleteCanvasParams) => ({
canvas_id: params.canvasId.trim(),
}),
},
transformResponse: async (response: Response) => {
const data = await response.json()
if (!data.ok) {
if (data.error === 'canvas_not_found') {
throw new Error('Canvas not found or not visible to the authenticated Slack user or bot.')
}
if (data.error === 'canvas_deleting_disabled') {
throw new Error('Canvas deletion is disabled for this workspace.')
}
throw new Error(data.error || 'Failed to delete canvas')
}
return {
success: true,
output: {
ok: data.ok,
},
}
},
outputs: {
ok: { type: 'boolean', description: 'Whether Slack deleted the canvas successfully' },
},
}
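For readers unfamiliar with the ToolConfig shape, the request/transformResponse pair above boils down to this raw call, shown as a sketch (the token is a placeholder for either credential, and the canvas ID is illustrative):

declare const token: string // placeholder: OAuth access token or bot token

const res = await fetch('https://slack.com/api/canvases.delete', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${token}`,
  },
  body: JSON.stringify({ canvas_id: 'F1234ABCD' }),
})
const data = await res.json()
if (!data.ok) throw new Error(data.error || 'Failed to delete canvas')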

View File

@@ -0,0 +1,85 @@
import type { SlackGetCanvasParams, SlackGetCanvasResponse } from '@/tools/slack/types'
import { CANVAS_FILE_OUTPUT_PROPERTIES } from '@/tools/slack/types'
import { mapCanvasFile } from '@/tools/slack/utils'
import type { ToolConfig } from '@/tools/types'
export const slackGetCanvasTool: ToolConfig<SlackGetCanvasParams, SlackGetCanvasResponse> = {
id: 'slack_get_canvas',
name: 'Slack Get Canvas Info',
description: 'Get Slack canvas file metadata by canvas ID',
version: '1.0.0',
oauth: {
required: true,
provider: 'slack',
},
params: {
authMethod: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Authentication method: oauth or bot_token',
},
botToken: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Bot token for Custom Bot',
},
accessToken: {
type: 'string',
required: false,
visibility: 'hidden',
description: 'OAuth access token or bot token for Slack API',
},
canvasId: {
type: 'string',
required: true,
visibility: 'user-or-llm',
description: 'Canvas file ID to retrieve (e.g., F1234ABCD)',
},
},
request: {
url: (params: SlackGetCanvasParams) => {
const url = new URL('https://slack.com/api/files.info')
url.searchParams.append('file', params.canvasId.trim())
return url.toString()
},
method: 'GET',
headers: (params: SlackGetCanvasParams) => ({
'Content-Type': 'application/json',
Authorization: `Bearer ${params.accessToken || params.botToken}`,
}),
},
transformResponse: async (response: Response) => {
const data = await response.json()
if (!data.ok) {
if (data.error === 'file_not_found') {
throw new Error('Canvas not found. Please check the canvas ID and try again.')
}
if (data.error === 'not_visible') {
throw new Error('Canvas is not visible to the authenticated Slack user or bot.')
}
throw new Error(data.error || 'Failed to get canvas from Slack')
}
return {
success: true,
output: {
canvas: mapCanvasFile(data.file),
},
}
},
outputs: {
canvas: {
type: 'object',
description: 'Canvas file information returned by Slack',
properties: CANVAS_FILE_OUTPUT_PROPERTIES,
},
},
}

View File

@@ -2,19 +2,23 @@ import { slackAddReactionTool } from '@/tools/slack/add_reaction'
import { slackCanvasTool } from '@/tools/slack/canvas'
import { slackCreateChannelCanvasTool } from '@/tools/slack/create_channel_canvas'
import { slackCreateConversationTool } from '@/tools/slack/create_conversation'
import { slackDeleteCanvasTool } from '@/tools/slack/delete_canvas'
import { slackDeleteMessageTool } from '@/tools/slack/delete_message'
import { slackDownloadTool } from '@/tools/slack/download'
import { slackEditCanvasTool } from '@/tools/slack/edit_canvas'
import { slackEphemeralMessageTool } from '@/tools/slack/ephemeral_message'
import { slackGetCanvasTool } from '@/tools/slack/get_canvas'
import { slackGetChannelInfoTool } from '@/tools/slack/get_channel_info'
import { slackGetMessageTool } from '@/tools/slack/get_message'
import { slackGetThreadTool } from '@/tools/slack/get_thread'
import { slackGetUserTool } from '@/tools/slack/get_user'
import { slackGetUserPresenceTool } from '@/tools/slack/get_user_presence'
import { slackInviteToConversationTool } from '@/tools/slack/invite_to_conversation'
import { slackListCanvasesTool } from '@/tools/slack/list_canvases'
import { slackListChannelsTool } from '@/tools/slack/list_channels'
import { slackListMembersTool } from '@/tools/slack/list_members'
import { slackListUsersTool } from '@/tools/slack/list_users'
import { slackLookupCanvasSectionsTool } from '@/tools/slack/lookup_canvas_sections'
import { slackMessageTool } from '@/tools/slack/message'
import { slackMessageReaderTool } from '@/tools/slack/message_reader'
import { slackOpenViewTool } from '@/tools/slack/open_view'
@@ -29,6 +33,10 @@ export {
slackCanvasTool,
slackCreateConversationTool,
slackCreateChannelCanvasTool,
slackGetCanvasTool,
slackListCanvasesTool,
slackLookupCanvasSectionsTool,
slackDeleteCanvasTool,
slackMessageReaderTool,
slackDownloadTool,
slackEditCanvasTool,
@@ -51,3 +59,5 @@ export {
slackGetThreadTool,
slackInviteToConversationTool,
}
export * from './types'

View File

@@ -0,0 +1,142 @@
import type { SlackListCanvasesParams, SlackListCanvasesResponse } from '@/tools/slack/types'
import { CANVAS_FILE_OUTPUT_PROPERTIES, CANVAS_PAGING_OUTPUT_PROPERTIES } from '@/tools/slack/types'
import { mapCanvasFile } from '@/tools/slack/utils'
import type { ToolConfig } from '@/tools/types'
export const slackListCanvasesTool: ToolConfig<SlackListCanvasesParams, SlackListCanvasesResponse> =
{
id: 'slack_list_canvases',
name: 'Slack List Canvases',
description: 'List Slack canvases available to the authenticated user or bot',
version: '1.0.0',
oauth: {
required: true,
provider: 'slack',
},
params: {
authMethod: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Authentication method: oauth or bot_token',
},
botToken: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Bot token for Custom Bot',
},
accessToken: {
type: 'string',
required: false,
visibility: 'hidden',
description: 'OAuth access token or bot token for Slack API',
},
channel: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Filter to canvases that appear in a specific channel, given by channel ID',
},
count: {
type: 'number',
required: false,
visibility: 'user-or-llm',
description: 'Number of canvases to return per page',
},
page: {
type: 'number',
required: false,
visibility: 'user-or-llm',
description: 'Page number to return',
},
user: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Filter to canvases created by a specific user, given by user ID',
},
tsFrom: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Filter canvases created after this Unix timestamp',
},
tsTo: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Filter canvases created before this Unix timestamp',
},
teamId: {
type: 'string',
required: false,
visibility: 'user-or-llm',
description: 'Encoded team ID, required when using an org-level token',
},
},
request: {
url: (params: SlackListCanvasesParams) => {
const url = new URL('https://slack.com/api/files.list')
url.searchParams.append('types', 'canvas')
if (params.channel) url.searchParams.append('channel', params.channel.trim())
if (params.count) url.searchParams.append('count', String(params.count))
if (params.page) url.searchParams.append('page', String(params.page))
if (params.user) url.searchParams.append('user', params.user.trim())
if (params.tsFrom) url.searchParams.append('ts_from', params.tsFrom.trim())
if (params.tsTo) url.searchParams.append('ts_to', params.tsTo.trim())
if (params.teamId) url.searchParams.append('team_id', params.teamId.trim())
return url.toString()
},
method: 'GET',
headers: (params: SlackListCanvasesParams) => ({
'Content-Type': 'application/json',
Authorization: `Bearer ${params.accessToken || params.botToken}`,
}),
},
transformResponse: async (response: Response) => {
const data = await response.json()
if (!data.ok) {
if (data.error === 'unknown_type') {
throw new Error('Slack did not recognize the canvas file type filter.')
}
throw new Error(data.error || 'Failed to list canvases from Slack')
}
return {
success: true,
output: {
canvases: (data.files ?? []).map(mapCanvasFile),
paging: {
count: data.paging?.count ?? 0,
total: data.paging?.total ?? 0,
page: data.paging?.page ?? 0,
pages: data.paging?.pages ?? 0,
},
},
}
},
outputs: {
canvases: {
type: 'array',
description: 'Canvas file objects returned by Slack',
items: {
type: 'object',
properties: CANVAS_FILE_OUTPUT_PROPERTIES,
},
},
paging: {
type: 'object',
description: 'Pagination information from Slack',
properties: CANVAS_PAGING_OUTPUT_PROPERTIES,
},
},
}
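files.list uses classic count/page pagination, so draining every canvas page means looping until paging.pages is reached. A sketch with the same query the config builds, using a raw fetch rather than the tool executor (the token is a placeholder):

declare const token: string // placeholder Slack token

async function listAllCanvases(channel?: string): Promise<unknown[]> {
  const all: unknown[] = []
  for (let page = 1; ; page++) {
    const url = new URL('https://slack.com/api/files.list')
    url.searchParams.append('types', 'canvas')
    url.searchParams.append('count', '100')
    url.searchParams.append('page', String(page))
    if (channel) url.searchParams.append('channel', channel)
    const data = await (
      await fetch(url, { headers: { Authorization: `Bearer ${token}` } })
    ).json()
    if (!data.ok) throw new Error(data.error || 'Failed to list canvases from Slack')
    all.push(...(data.files ?? []))
    if (page >= (data.paging?.pages ?? 0)) break // stop after the last page
  }
  return all
}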

View File

@@ -0,0 +1,114 @@
import type {
SlackLookupCanvasSectionsParams,
SlackLookupCanvasSectionsResponse,
} from '@/tools/slack/types'
import { CANVAS_SECTION_OUTPUT_PROPERTIES } from '@/tools/slack/types'
import type { ToolConfig } from '@/tools/types'
const parseCriteria = (criteria: SlackLookupCanvasSectionsParams['criteria']) => {
if (typeof criteria !== 'string') {
return criteria
}
try {
return JSON.parse(criteria)
} catch {
throw new Error('Canvas section criteria must be a valid JSON object')
}
}
export const slackLookupCanvasSectionsTool: ToolConfig<
SlackLookupCanvasSectionsParams,
SlackLookupCanvasSectionsResponse
> = {
id: 'slack_lookup_canvas_sections',
name: 'Slack Lookup Canvas Sections',
description: 'Find Slack canvas section IDs matching criteria for later edits',
version: '1.0.0',
oauth: {
required: true,
provider: 'slack',
},
params: {
authMethod: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Authentication method: oauth or bot_token',
},
botToken: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Bot token for Custom Bot',
},
accessToken: {
type: 'string',
required: false,
visibility: 'hidden',
description: 'OAuth access token or bot token for Slack API',
},
canvasId: {
type: 'string',
required: true,
visibility: 'user-or-llm',
description: 'Canvas ID to search (e.g., F1234ABCD)',
},
criteria: {
type: 'json',
required: true,
visibility: 'user-or-llm',
description:
'Section lookup criteria, such as {"section_types":["h1"],"contains_text":"Roadmap"}',
},
},
request: {
url: 'https://slack.com/api/canvases.sections.lookup',
method: 'POST',
headers: (params: SlackLookupCanvasSectionsParams) => ({
'Content-Type': 'application/json',
Authorization: `Bearer ${params.accessToken || params.botToken}`,
}),
body: (params: SlackLookupCanvasSectionsParams) => ({
canvas_id: params.canvasId.trim(),
criteria: parseCriteria(params.criteria),
}),
},
transformResponse: async (response: Response) => {
const data = await response.json()
if (!data.ok) {
if (data.error === 'canvas_not_found') {
throw new Error('Canvas not found or not visible to the authenticated Slack user or bot.')
}
if (data.error === 'missing_scope') {
throw new Error(
'Missing required permissions. Please reconnect your Slack account with the canvases:read scope.'
)
}
throw new Error(data.error || 'Failed to look up canvas sections')
}
return {
success: true,
output: {
sections: data.sections ?? [],
},
}
},
outputs: {
sections: {
type: 'array',
description: 'Canvas sections matching the lookup criteria',
items: {
type: 'object',
properties: CANVAS_SECTION_OUTPUT_PROPERTIES,
},
},
},
}
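Because parseCriteria accepts either form, a structured object and its JSON-string equivalent produce the same canvases.sections.lookup body (values are illustrative):

// From another block's structured output:
const asObject = { section_types: ['h1'], contains_text: 'Roadmap' }
// From an LLM filling a text parameter:
const asString = '{"section_types":["h1"],"contains_text":"Roadmap"}'

// parseCriteria(asObject) returns the object unchanged;
// parseCriteria(asString) JSON-parses it, and malformed input throws
// 'Canvas section criteria must be a valid JSON object' instead of
// sending a broken body to Slack.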

View File

@@ -488,6 +488,91 @@ export const CANVAS_OUTPUT_PROPERTIES = {
title: { type: 'string', description: 'Canvas title' },
} as const satisfies Record<string, OutputProperty>
/**
* Canvas file object output properties.
* Based on Slack file objects returned by files.info and files.list for canvases.
*/
export const CANVAS_FILE_OUTPUT_PROPERTIES = {
id: { type: 'string', description: 'Unique canvas file identifier' },
created: { type: 'number', description: 'Unix timestamp when the canvas was created' },
timestamp: { type: 'number', description: 'Unix timestamp associated with the canvas' },
name: { type: 'string', description: 'Canvas file name', optional: true },
title: { type: 'string', description: 'Canvas title', optional: true },
mimetype: { type: 'string', description: 'MIME type of the canvas file', optional: true },
filetype: { type: 'string', description: 'Slack file type for the canvas', optional: true },
pretty_type: { type: 'string', description: 'Human-readable file type', optional: true },
user: { type: 'string', description: 'User ID of the canvas creator', optional: true },
editable: { type: 'boolean', description: 'Whether the canvas file is editable', optional: true },
size: { type: 'number', description: 'Canvas file size in bytes', optional: true },
mode: { type: 'string', description: 'File mode', optional: true },
is_external: {
type: 'boolean',
description: 'Whether the canvas is externally hosted',
optional: true,
},
is_public: { type: 'boolean', description: 'Whether the canvas is public', optional: true },
url_private: {
type: 'string',
description: 'Private URL for the canvas file',
optional: true,
},
url_private_download: {
type: 'string',
description: 'Private download URL for the canvas file',
optional: true,
},
permalink: { type: 'string', description: 'Permanent URL for the canvas', optional: true },
channels: {
type: 'array',
description: 'Public channel IDs where the canvas appears',
items: { type: 'string', description: 'Channel ID' },
optional: true,
},
groups: {
type: 'array',
description: 'Private channel IDs where the canvas appears',
items: { type: 'string', description: 'Channel ID' },
optional: true,
},
ims: {
type: 'array',
description: 'Direct message IDs where the canvas appears',
items: { type: 'string', description: 'Conversation ID' },
optional: true,
},
canvas_readtime: {
type: 'number',
description: 'Approximate read time for canvas content',
optional: true,
},
is_channel_space: {
type: 'boolean',
description: 'Whether this canvas is linked to a channel',
optional: true,
},
linked_channel_id: {
type: 'string',
description: 'Channel ID linked to this canvas',
optional: true,
},
canvas_creator_id: {
type: 'string',
description: 'User ID of the canvas creator',
optional: true,
},
} as const satisfies Record<string, OutputProperty>
export const CANVAS_PAGING_OUTPUT_PROPERTIES = {
count: { type: 'number', description: 'Number of items requested per page' },
total: { type: 'number', description: 'Total number of matching files' },
page: { type: 'number', description: 'Current page number' },
pages: { type: 'number', description: 'Total number of pages' },
} as const satisfies Record<string, OutputProperty>
export const CANVAS_SECTION_OUTPUT_PROPERTIES = {
id: { type: 'string', description: 'Canvas section identifier' },
} as const satisfies Record<string, OutputProperty>
/**
* Output definition for modal view objects
* Based on Slack views.open response structure
@@ -735,6 +820,29 @@ export interface SlackCreateChannelCanvasParams extends SlackBaseParams {
content?: string
}
export interface SlackGetCanvasParams extends SlackBaseParams {
canvasId: string
}
export interface SlackListCanvasesParams extends SlackBaseParams {
channel?: string
count?: number
page?: number
user?: string
tsFrom?: string
tsTo?: string
teamId?: string
}
export interface SlackLookupCanvasSectionsParams extends SlackBaseParams {
canvasId: string
criteria: Record<string, unknown> | string
}
export interface SlackDeleteCanvasParams extends SlackBaseParams {
canvasId: string
}
export interface SlackOpenViewParams extends SlackBaseParams {
triggerId: string
interactivityPointer?: string
@@ -1078,6 +1186,69 @@ export interface SlackCreateChannelCanvasResponse extends ToolResponse {
}
}
export interface SlackCanvasFile {
id: string
created: number | null
timestamp: number | null
name?: string | null
title?: string | null
mimetype?: string | null
filetype?: string | null
pretty_type?: string | null
user?: string | null
editable?: boolean | null
size?: number | null
mode?: string | null
is_external?: boolean | null
is_public?: boolean | null
url_private?: string | null
url_private_download?: string | null
permalink?: string | null
channels?: string[]
groups?: string[]
ims?: string[]
canvas_readtime?: number | null
is_channel_space?: boolean | null
linked_channel_id?: string | null
canvas_creator_id?: string | null
}
export interface SlackCanvasPaging {
count: number
total: number
page: number
pages: number
}
export interface SlackCanvasSection {
id: string
}
export interface SlackGetCanvasResponse extends ToolResponse {
output: {
canvas: SlackCanvasFile
}
}
export interface SlackListCanvasesResponse extends ToolResponse {
output: {
canvases: SlackCanvasFile[]
paging: SlackCanvasPaging
}
}
export interface SlackLookupCanvasSectionsResponse extends ToolResponse {
output: {
sections: SlackCanvasSection[]
}
}
export interface SlackDeleteCanvasResponse extends ToolResponse {
output: {
ok: boolean
}
}
export interface SlackView {
id: string
team_id?: string | null
@@ -1143,6 +1314,10 @@ export type SlackResponse =
| SlackGetUserPresenceResponse
| SlackEditCanvasResponse
| SlackCreateChannelCanvasResponse
| SlackGetCanvasResponse
| SlackListCanvasesResponse
| SlackLookupCanvasSectionsResponse
| SlackDeleteCanvasResponse
| SlackCreateConversationResponse
| SlackInviteToConversationResponse
| SlackOpenViewResponse

View File

@@ -0,0 +1,28 @@
import type { SlackCanvasFile } from '@/tools/slack/types'
export const mapCanvasFile = (file: SlackCanvasFile): SlackCanvasFile => ({
id: file.id,
created: file.created ?? null,
timestamp: file.timestamp ?? null,
name: file.name ?? null,
title: file.title ?? null,
mimetype: file.mimetype ?? null,
filetype: file.filetype ?? null,
pretty_type: file.pretty_type ?? null,
user: file.user ?? null,
editable: file.editable ?? null,
size: file.size ?? null,
mode: file.mode ?? null,
is_external: file.is_external ?? null,
is_public: file.is_public ?? null,
url_private: file.url_private ?? null,
url_private_download: file.url_private_download ?? null,
permalink: file.permalink ?? null,
channels: file.channels ?? [],
groups: file.groups ?? [],
ims: file.ims ?? [],
canvas_readtime: file.canvas_readtime ?? null,
is_channel_space: file.is_channel_space ?? null,
linked_channel_id: file.linked_channel_id ?? null,
canvas_creator_id: file.canvas_creator_id ?? null,
})
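mapCanvasFile pins every optional field to an explicit null (or an empty array), so consumers of the canvas outputs never branch on undefined. A quick check with a sparse payload (the cast stands in for Slack's untyped JSON):

const sparse = { id: 'F1234ABCD', created: 1714000000 } as unknown as SlackCanvasFile
const mapped = mapCanvasFile(sparse)
// mapped.timestamp === null, mapped.title === null, mapped.channels is []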