v0.6.3: hubspot integration, kb block improvements

This commit is contained in:
Waleed
2026-03-18 11:19:55 -07:00
committed by GitHub
8 changed files with 356 additions and 235 deletions

View File

@@ -16,8 +16,12 @@ import {
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import {
addHttpErrorConsoleEntry,
type BlockEventHandlerConfig,
createBlockEventHandlers,
addExecutionErrorConsoleEntry as sharedAddExecutionErrorConsoleEntry,
handleExecutionCancelledConsole as sharedHandleExecutionCancelledConsole,
handleExecutionErrorConsole as sharedHandleExecutionErrorConsole,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
@@ -159,99 +163,6 @@ export function useWorkflowExecution() {
setActiveBlocks,
])
/**
* Builds timing fields for execution-level console entries.
*/
const buildExecutionTiming = useCallback((durationMs?: number) => {
const normalizedDuration = durationMs || 0
return {
durationMs: normalizedDuration,
startedAt: new Date(Date.now() - normalizedDuration).toISOString(),
endedAt: new Date().toISOString(),
}
}, [])
/**
* Adds an execution-level error entry to the console when appropriate.
*/
const addExecutionErrorConsoleEntry = useCallback(
(params: {
workflowId?: string
executionId?: string
error?: string
durationMs?: number
blockLogs: BlockLog[]
isPreExecutionError?: boolean
}) => {
if (!params.workflowId) return
const hasBlockError = params.blockLogs.some((log) => log.error)
const isPreExecutionError = params.isPreExecutionError ?? false
if (!isPreExecutionError && hasBlockError) {
return
}
const errorMessage = params.error || 'Execution failed'
const isTimeout = errorMessage.toLowerCase().includes('timed out')
const timing = buildExecutionTiming(params.durationMs)
addConsole({
input: {},
output: {},
success: false,
error: errorMessage,
durationMs: timing.durationMs,
startedAt: timing.startedAt,
executionOrder: isPreExecutionError ? 0 : Number.MAX_SAFE_INTEGER,
endedAt: timing.endedAt,
workflowId: params.workflowId,
blockId: isPreExecutionError
? 'validation'
: isTimeout
? 'timeout-error'
: 'execution-error',
executionId: params.executionId,
blockName: isPreExecutionError
? 'Workflow Validation'
: isTimeout
? 'Timeout Error'
: 'Execution Error',
blockType: isPreExecutionError ? 'validation' : 'error',
})
},
[addConsole, buildExecutionTiming]
)
/**
* Adds an execution-level cancellation entry to the console.
*/
const addExecutionCancelledConsoleEntry = useCallback(
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
if (!params.workflowId) return
const timing = buildExecutionTiming(params.durationMs)
addConsole({
input: {},
output: {},
success: false,
error: 'Execution was cancelled',
durationMs: timing.durationMs,
startedAt: timing.startedAt,
executionOrder: Number.MAX_SAFE_INTEGER,
endedAt: timing.endedAt,
workflowId: params.workflowId,
blockId: 'cancelled',
executionId: params.executionId,
blockName: 'Execution Cancelled',
blockType: 'cancelled',
})
},
[addConsole, buildExecutionTiming]
)
/**
* Handles workflow-level execution errors for console output.
*/
const handleExecutionErrorConsole = useCallback(
(params: {
workflowId?: string
@@ -261,25 +172,24 @@ export function useWorkflowExecution() {
blockLogs: BlockLog[]
isPreExecutionError?: boolean
}) => {
if (params.workflowId) {
cancelRunningEntries(params.workflowId)
}
addExecutionErrorConsoleEntry(params)
if (!params.workflowId) return
sharedHandleExecutionErrorConsole(addConsole, cancelRunningEntries, {
...params,
workflowId: params.workflowId,
})
},
[addExecutionErrorConsoleEntry, cancelRunningEntries]
[addConsole, cancelRunningEntries]
)
/**
* Handles workflow-level execution cancellations for console output.
*/
const handleExecutionCancelledConsole = useCallback(
(params: { workflowId?: string; executionId?: string; durationMs?: number }) => {
if (params.workflowId) {
cancelRunningEntries(params.workflowId)
}
addExecutionCancelledConsoleEntry(params)
if (!params.workflowId) return
sharedHandleExecutionCancelledConsole(addConsole, cancelRunningEntries, {
...params,
workflowId: params.workflowId,
})
},
[addExecutionCancelledConsoleEntry, cancelRunningEntries]
[addConsole, cancelRunningEntries]
)
const buildBlockEventHandlers = useCallback(
@@ -1319,31 +1229,42 @@ export function useWorkflowExecution() {
} else {
if (!executor) {
try {
let blockId = 'serialization'
let blockName = 'Workflow'
let blockType = 'serializer'
if (error instanceof WorkflowValidationError) {
blockId = error.blockId || blockId
blockName = error.blockName || blockName
blockType = error.blockType || blockType
}
const httpStatus =
isRecord(error) && typeof error.httpStatus === 'number' ? error.httpStatus : undefined
const storeAddConsole = useTerminalConsoleStore.getState().addConsole
// Use MAX_SAFE_INTEGER so execution errors appear at the end of the log
useTerminalConsoleStore.getState().addConsole({
input: {},
output: {},
success: false,
error: normalizedMessage,
durationMs: 0,
startedAt: new Date().toISOString(),
executionOrder: Number.MAX_SAFE_INTEGER,
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId || '',
blockId,
executionId: options?.executionId,
blockName,
blockType,
})
if (httpStatus && activeWorkflowId) {
addHttpErrorConsoleEntry(storeAddConsole, {
workflowId: activeWorkflowId,
executionId: options?.executionId,
error: normalizedMessage,
httpStatus,
})
} else if (error instanceof WorkflowValidationError) {
storeAddConsole({
input: {},
output: {},
success: false,
error: normalizedMessage,
durationMs: 0,
startedAt: new Date().toISOString(),
executionOrder: Number.MAX_SAFE_INTEGER,
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId || '',
blockId: error.blockId || 'serialization',
executionId: options?.executionId,
blockName: error.blockName || 'Workflow',
blockType: error.blockType || 'serializer',
})
} else {
sharedAddExecutionErrorConsoleEntry(storeAddConsole, {
workflowId: activeWorkflowId || '',
executionId: options?.executionId,
error: normalizedMessage,
blockLogs: [],
isPreExecutionError: true,
})
}
} catch {}
}
@@ -1681,8 +1602,8 @@ export function useWorkflowExecution() {
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
consoleMode: 'add',
includeStartConsoleEntry: false,
consoleMode: 'update',
includeStartConsoleEntry: true,
})
await executionStream.executeFromBlock({

View File

@@ -13,6 +13,7 @@ import type {
StreamingExecution,
} from '@/executor/types'
import { stripCloneSuffixes } from '@/executor/utils/subflow-utils'
import { processSSEStream } from '@/hooks/use-execution-stream'
const logger = createLogger('workflow-execution-utils')
@@ -406,6 +407,161 @@ export function createBlockEventHandlers(
return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
}
type AddConsoleFn = (entry: Omit<ConsoleEntry, 'id' | 'timestamp'>) => ConsoleEntry
type CancelRunningEntriesFn = (workflowId: string) => void

export interface ExecutionTimingFields {
  durationMs: number
  startedAt: string
  endedAt: string
}

/**
 * Builds timing fields for an execution-level console entry.
 *
 * Reads the clock exactly once so that `endedAt - startedAt` always equals
 * `durationMs`; the previous implementation called `Date.now()` twice, which
 * could make the derived interval disagree with the reported duration when
 * the calls straddled a millisecond boundary.
 *
 * @param durationMs - Execution duration in milliseconds; falsy values
 *   (undefined, 0, NaN) normalize to 0.
 * @returns Duration plus ISO-8601 `startedAt`/`endedAt` timestamps, where
 *   `startedAt` is back-dated from now by the duration.
 */
export function buildExecutionTiming(durationMs?: number): ExecutionTimingFields {
  const normalizedDuration = durationMs || 0
  const endMs = Date.now()
  return {
    durationMs: normalizedDuration,
    startedAt: new Date(endMs - normalizedDuration).toISOString(),
    endedAt: new Date(endMs).toISOString(),
  }
}
export interface ExecutionErrorConsoleParams {
  workflowId: string
  executionId?: string
  error?: string
  durationMs?: number
  blockLogs: BlockLog[]
  isPreExecutionError?: boolean
}

/**
 * Adds an execution-level error entry to the console when no block-level error already covers it.
 * Shared between direct user execution and mothership-initiated execution.
 */
export function addExecutionErrorConsoleEntry(
  addConsole: AddConsoleFn,
  params: ExecutionErrorConsoleParams
): void {
  const preExecution = params.isPreExecutionError === true
  // A block-level error entry already tells the story; only add an
  // execution-level one when the failure happened before any block errored.
  if (!preExecution && params.blockLogs.some((log) => log.error)) return

  const message = params.error || 'Execution failed'
  const timedOut = message.toLowerCase().includes('timed out')
  const timing = buildExecutionTiming(params.durationMs)

  let blockId = 'execution-error'
  let blockName = 'Execution Error'
  const blockType = preExecution ? 'validation' : 'error'
  if (preExecution) {
    blockId = 'validation'
    blockName = 'Workflow Validation'
  } else if (timedOut) {
    blockId = 'timeout-error'
    blockName = 'Timeout Error'
  }

  addConsole({
    input: {},
    output: {},
    success: false,
    error: message,
    durationMs: timing.durationMs,
    startedAt: timing.startedAt,
    // Pre-execution failures sort to the top; runtime failures sort last.
    executionOrder: preExecution ? 0 : Number.MAX_SAFE_INTEGER,
    endedAt: timing.endedAt,
    workflowId: params.workflowId,
    blockId,
    executionId: params.executionId,
    blockName,
    blockType,
  })
}
/**
 * Marks any still-running console entries for the workflow as cancelled,
 * then records an execution-level error entry for the failure.
 */
export function handleExecutionErrorConsole(
  addConsole: AddConsoleFn,
  cancelRunningEntries: CancelRunningEntriesFn,
  params: ExecutionErrorConsoleParams
): void {
  const { workflowId } = params
  cancelRunningEntries(workflowId)
  addExecutionErrorConsoleEntry(addConsole, params)
}
export interface HttpErrorConsoleParams {
  workflowId: string
  executionId?: string
  error: string
  httpStatus: number
}

/**
 * Adds a console entry for HTTP-level execution errors (non-OK response before SSE streaming).
 * 4xx statuses are presented as workflow-validation failures; everything else
 * is presented as a generic execution error.
 */
export function addHttpErrorConsoleEntry(
  addConsole: AddConsoleFn,
  params: HttpErrorConsoleParams
): void {
  const { workflowId, executionId, error, httpStatus } = params
  const clientError = httpStatus >= 400 && httpStatus < 500
  const timestamp = new Date().toISOString()
  addConsole({
    input: {},
    output: {},
    success: false,
    error,
    durationMs: 0,
    startedAt: timestamp,
    executionOrder: 0,
    endedAt: timestamp,
    workflowId,
    blockId: clientError ? 'validation' : 'execution-error',
    executionId,
    blockName: clientError ? 'Workflow Validation' : 'Execution Error',
    blockType: clientError ? 'validation' : 'error',
  })
}
export interface CancelledConsoleParams {
  workflowId: string
  executionId?: string
  durationMs?: number
}

/**
 * Adds a console entry recording that the execution was cancelled.
 */
export function addCancelledConsoleEntry(
  addConsole: AddConsoleFn,
  params: CancelledConsoleParams
): void {
  const { workflowId, executionId, durationMs } = params
  const { durationMs: elapsed, startedAt, endedAt } = buildExecutionTiming(durationMs)
  addConsole({
    input: {},
    output: {},
    success: false,
    error: 'Execution was cancelled',
    durationMs: elapsed,
    startedAt,
    // Cancellation entries always sort to the end of the console log.
    executionOrder: Number.MAX_SAFE_INTEGER,
    endedAt,
    workflowId,
    blockId: 'cancelled',
    executionId,
    blockName: 'Execution Cancelled',
    blockType: 'cancelled',
  })
}
/**
 * Marks any still-running console entries for the workflow as cancelled,
 * then records the cancellation itself as a console entry.
 */
export function handleExecutionCancelledConsole(
  addConsole: AddConsoleFn,
  cancelRunningEntries: CancelRunningEntriesFn,
  params: CancelledConsoleParams
): void {
  const { workflowId } = params
  cancelRunningEntries(workflowId)
  addCancelledConsoleEntry(addConsole, params)
}
export interface WorkflowExecutionOptions {
workflowId?: string
workflowInput?: any
@@ -436,7 +592,7 @@ export async function executeWorkflowWithFullLogging(
}
const executionId = options.executionId || uuidv4()
const { addConsole, updateConsole } = useTerminalConsoleStore.getState()
const { addConsole, updateConsole, cancelRunningEntries } = useTerminalConsoleStore.getState()
const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, setCurrentExecutionId } =
useExecutionStore.getState()
const wfId = targetWorkflowId
@@ -445,6 +601,7 @@ export async function executeWorkflowWithFullLogging(
const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const executionIdRef = { current: executionId }
const accumulatedBlockLogs: BlockLog[] = []
const blockHandlers = createBlockEventHandlers(
{
@@ -453,7 +610,7 @@ export async function executeWorkflowWithFullLogging(
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs: [],
accumulatedBlockLogs,
accumulatedBlockStates: new Map(),
executedBlockIds: new Set(),
consoleMode: 'update',
@@ -490,16 +647,26 @@ export async function executeWorkflowWithFullLogging(
if (!response.ok) {
const error = await response.json()
throw new Error(error.error || 'Workflow execution failed')
const errorMessage = error.error || 'Workflow execution failed'
addHttpErrorConsoleEntry(addConsole, {
workflowId: wfId,
executionId,
error: errorMessage,
httpStatus: response.status,
})
throw new Error(errorMessage)
}
if (!response.body) {
throw new Error('No response body')
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
const serverExecutionId = response.headers.get('X-Execution-Id')
if (serverExecutionId) {
executionIdRef.current = serverExecutionId
setCurrentExecutionId(wfId, serverExecutionId)
}
let executionResult: ExecutionResult = {
success: false,
output: {},
@@ -507,89 +674,67 @@ export async function executeWorkflowWithFullLogging(
}
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
await processSSEStream(
response.body.getReader(),
{
onExecutionStarted: (data) => {
logger.info('Execution started', { startTime: data.startTime })
},
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
onBlockStarted: blockHandlers.onBlockStarted,
onBlockCompleted: blockHandlers.onBlockCompleted,
onBlockError: blockHandlers.onBlockError,
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
onExecutionCompleted: (data) => {
setCurrentExecutionId(wfId, null)
executionResult = {
success: data.success,
output: data.output,
logs: accumulatedBlockLogs,
metadata: {
duration: data.duration,
startTime: data.startTime,
endTime: data.endTime,
},
}
},
const data = line.substring(6).trim()
if (data === '[DONE]') continue
onExecutionCancelled: () => {
setCurrentExecutionId(wfId, null)
executionResult = {
success: false,
output: {},
error: 'Execution was cancelled',
logs: accumulatedBlockLogs,
}
},
let event: any
try {
event = JSON.parse(data)
} catch {
continue
}
switch (event.type) {
case 'execution:started': {
setCurrentExecutionId(wfId, event.executionId)
executionIdRef.current = event.executionId || executionId
break
onExecutionError: (data) => {
setCurrentExecutionId(wfId, null)
const errorMessage = data.error || 'Execution failed'
executionResult = {
success: false,
output: {},
error: errorMessage,
logs: accumulatedBlockLogs,
metadata: { duration: data.duration },
}
case 'block:started':
blockHandlers.onBlockStarted(event.data)
break
case 'block:completed':
blockHandlers.onBlockCompleted(event.data)
break
case 'block:error':
blockHandlers.onBlockError(event.data)
break
case 'block:childWorkflowStarted':
blockHandlers.onBlockChildWorkflowStarted(event.data)
break
case 'execution:completed':
setCurrentExecutionId(wfId, null)
executionResult = {
success: event.data.success,
output: event.data.output,
logs: [],
metadata: {
duration: event.data.duration,
startTime: event.data.startTime,
endTime: event.data.endTime,
},
}
break
case 'execution:cancelled':
setCurrentExecutionId(wfId, null)
executionResult = {
success: false,
output: {},
error: 'Execution was cancelled',
logs: [],
}
break
case 'execution:error':
setCurrentExecutionId(wfId, null)
executionResult = {
success: false,
output: {},
error: event.data.error || 'Execution failed',
logs: [],
}
break
}
}
}
handleExecutionErrorConsole(addConsole, cancelRunningEntries, {
workflowId: wfId,
executionId: executionIdRef.current,
error: errorMessage,
durationMs: data.duration || 0,
blockLogs: accumulatedBlockLogs,
isPreExecutionError: accumulatedBlockLogs.length === 0,
})
},
},
'CopilotExecution'
)
} finally {
setCurrentExecutionId(wfId, null)
reader.releaseLock()
setActiveBlocks(wfId, new Set())
}

View File

@@ -31,8 +31,9 @@ function isClientDisconnectError(error: any): boolean {
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
* Exported for use by standalone (non-hook) execution paths like executeWorkflowWithFullLogging.
*/
async function processSSEStream(
export async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
@@ -198,6 +199,7 @@ export function useExecutionStream() {
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
Object.assign(error, { httpStatus: response.status })
throw error
}
@@ -267,12 +269,15 @@ export function useExecutionStream() {
try {
errorResponse = await response.json()
} catch {
throw new Error(`Server error (${response.status}): ${response.statusText}`)
const error = new Error(`Server error (${response.status}): ${response.statusText}`)
Object.assign(error, { httpStatus: response.status })
throw error
}
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
Object.assign(error, { httpStatus: response.status })
throw error
}

View File

@@ -836,6 +836,9 @@ export const OAUTH_PROVIDERS: Record<string, OAuthProviderConfig> = {
'crm.lists.read',
'crm.lists.write',
'crm.objects.tickets.read',
'crm.objects.tickets.write',
'tickets',
'oauth',
],
},
},

View File

@@ -308,6 +308,9 @@ export const SCOPE_DESCRIPTIONS: Record<string, string> = {
'crm.lists.read': 'Read HubSpot lists',
'crm.lists.write': 'Create and update HubSpot lists',
'crm.objects.tickets.read': 'Read HubSpot tickets',
'crm.objects.tickets.write': 'Create and update HubSpot tickets',
tickets: 'Access HubSpot tickets',
oauth: 'Authenticate with HubSpot OAuth',
// Salesforce scopes
api: 'Access Salesforce API',

View File

@@ -1,4 +1,7 @@
import type { KnowledgeCreateDocumentResponse } from '@/tools/knowledge/types'
import {
inferDocumentFileInfo,
type KnowledgeCreateDocumentResponse,
} from '@/tools/knowledge/types'
import { enrichKBTagsSchema } from '@/tools/schema-enrichers'
import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags'
import type { ToolConfig } from '@/tools/types'
@@ -63,30 +66,36 @@ export const knowledgeCreateDocumentTool: ToolConfig<any, KnowledgeCreateDocumen
if (!textContent || textContent.length < 1) {
throw new Error('Document content cannot be empty')
}
if (textContent.length > 1000000) {
const utf8Bytes = new TextEncoder().encode(textContent)
const contentBytes = utf8Bytes.length
if (contentBytes > 1_000_000) {
throw new Error('Document content exceeds maximum size of 1MB')
}
const contentBytes = new TextEncoder().encode(textContent).length
let base64Content: string
if (typeof Buffer !== 'undefined') {
base64Content = Buffer.from(textContent, 'utf8').toString('base64')
} else {
let binary = ''
for (let i = 0; i < utf8Bytes.length; i++) {
binary += String.fromCharCode(utf8Bytes[i])
}
base64Content = btoa(binary)
}
const utf8Bytes = new TextEncoder().encode(textContent)
const base64Content =
typeof Buffer !== 'undefined'
? Buffer.from(textContent, 'utf8').toString('base64')
: btoa(String.fromCharCode(...utf8Bytes))
const { filename, mimeType } = inferDocumentFileInfo(documentName)
const dataUri = `data:${mimeType};base64,${base64Content}`
const dataUri = `data:text/plain;base64,${base64Content}`
// Parse document tags from various formats (object, array, JSON string)
const parsedTags = parseDocumentTags(params.documentTags)
const tagData = formatDocumentTagsForAPI(parsedTags)
const documents = [
{
filename: documentName.endsWith('.txt') ? documentName : `${documentName}.txt`,
filename,
fileUrl: dataUri,
fileSize: contentBytes,
mimeType: 'text/plain',
mimeType,
...tagData,
},
]

View File

@@ -1,3 +1,38 @@
import {
getFileExtension,
getMimeTypeFromExtension as getUploadMimeType,
} from '@/lib/uploads/utils/file-utils'
const TEXT_COMPATIBLE_MIME_TYPES = new Set([
  'text/plain',
  'text/html',
  'text/markdown',
  'text/csv',
  'application/json',
  'application/xml',
  'application/x-yaml',
])

/**
 * Extracts extension from a filename and returns the normalized filename and MIME type.
 * A filename whose extension maps to a recognized text-compatible MIME type is
 * kept as-is with that MIME type; anything else is normalized to a `.txt`
 * filename with `text/plain`.
 */
export function inferDocumentFileInfo(documentName: string): {
  filename: string
  mimeType: string
} {
  const extension = getFileExtension(documentName)
  if (extension) {
    const candidateMime = getUploadMimeType(extension)
    if (TEXT_COMPATIBLE_MIME_TYPES.has(candidateMime)) {
      return { filename: documentName, mimeType: candidateMime }
    }
  }
  // Strip the unrecognized extension (if any) and fall back to plain text.
  // NOTE(review): assumes getFileExtension returns '' for extensionless names — confirm.
  const stem = extension
    ? documentName.slice(0, documentName.lastIndexOf('.'))
    : documentName
  return { filename: `${stem || documentName}.txt`, mimeType: 'text/plain' }
}
export interface KnowledgeSearchResult {
documentId: string
documentName: string

View File

@@ -1,6 +1,7 @@
import type {
KnowledgeUpsertDocumentParams,
KnowledgeUpsertDocumentResponse,
import {
inferDocumentFileInfo,
type KnowledgeUpsertDocumentParams,
type KnowledgeUpsertDocumentResponse,
} from '@/tools/knowledge/types'
import { enrichKBTagsSchema } from '@/tools/schema-enrichers'
import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags'
@@ -94,18 +95,17 @@ export const knowledgeUpsertDocumentTool: ToolConfig<
base64Content = btoa(binary)
}
const dataUri = `data:text/plain;base64,${base64Content}`
const { filename, mimeType } = inferDocumentFileInfo(documentName)
const dataUri = `data:${mimeType};base64,${base64Content}`
const parsedTags = parseDocumentTags(params.documentTags)
const tagData = formatDocumentTagsForAPI(parsedTags)
const filename = documentName.endsWith('.txt') ? documentName : `${documentName}.txt`
const requestBody: Record<string, unknown> = {
filename,
fileUrl: dataUri,
fileSize: contentBytes,
mimeType: 'text/plain',
mimeType,
...tagData,
processingOptions: {
chunkSize: 1024,