Compare commits

..

31 Commits

Author SHA1 Message Date
Waleed
dff1c9d083 v0.5.64: unsubscribe, search improvements, metrics, additional SSO configuration 2026-01-20 00:34:11 -08:00
Vikhyath Mondreti
b09f683072 v0.5.63: ui and performance improvements, more google tools 2026-01-18 15:22:42 -08:00
Vikhyath Mondreti
a8bb0db660 v0.5.62: webhook bug fixes, seeding default subblock values, block selection fixes 2026-01-16 20:27:06 -08:00
Waleed
af82820a28 v0.5.61: webhook improvements, workflow controls, react query for deployment status, chat fixes, reducto and pulse OCR, linear fixes 2026-01-16 18:06:23 -08:00
Waleed
4372841797 v0.5.60: invitation flow improvements, chat fixes, a2a improvements, additional copilot actions 2026-01-15 00:02:18 -08:00
Waleed
5e8c843241 v0.5.59: a2a support, documentation 2026-01-13 13:21:21 -08:00
Waleed
7bf3d73ee6 v0.5.58: export folders, new tools, permissions groups enhancements 2026-01-13 00:56:59 -08:00
Vikhyath Mondreti
7ffc11a738 v0.5.57: subagents, context menu improvements, bug fixes 2026-01-11 11:38:40 -08:00
Waleed
be578e2ed7 v0.5.56: batch operations, access control and permission groups, billing fixes 2026-01-10 00:31:34 -08:00
Waleed
f415e5edc4 v0.5.55: polling groups, bedrock provider, devcontainer fixes, workflow preview enhancements 2026-01-08 23:36:56 -08:00
Waleed
13a6e6c3fa v0.5.54: seo, model blacklist, helm chart updates, fireflies integration, autoconnect improvements, billing fixes 2026-01-07 16:09:45 -08:00
Waleed
f5ab7f21ae v0.5.53: hotkey improvements, added redis fallback, fixes for workflow tool 2026-01-06 23:34:52 -08:00
Waleed
bfb6fffe38 v0.5.52: new port-based router block, combobox expression and variable support 2026-01-06 16:14:10 -08:00
Waleed
4fbec0a43f v0.5.51: triggers, kb, condition block improvements, supabase and grain integration updates 2026-01-06 14:26:46 -08:00
Waleed
585f5e365b v0.5.50: import improvements, ui upgrades, kb styling and performance improvements 2026-01-05 00:35:55 -08:00
Waleed
3792bdd252 v0.5.49: hitl improvements, new email styles, imap trigger, logs context menu (#2672)
* feat(logs-context-menu): consolidated logs utils and types, added logs record context menu (#2659)

* feat(email): welcome email; improvement(emails): ui/ux (#2658)

* feat(email): welcome email; improvement(emails): ui/ux

* improvement(emails): links, accounts, preview

* refactor(emails): file structure and wrapper components

* added envvar for personal emails sent, added isHosted gate

* fixed failing tests, added env mock

* fix: removed comment

---------

Co-authored-by: waleed <walif6@gmail.com>

* fix(logging): hitl + trigger dev crash protection (#2664)

* hitl gaps

* deal with trigger worker crashes

* cleanup import strcuture

* feat(imap): added support for imap trigger (#2663)

* feat(tools): added support for imap trigger

* feat(imap): added parity, tested

* ack PR comments

* final cleanup

* feat(i18n): update translations (#2665)

Co-authored-by: waleedlatif1 <waleedlatif1@users.noreply.github.com>

* fix(grain): updated grain trigger to auto-establish trigger (#2666)

Co-authored-by: aadamgough <adam@sim.ai>

* feat(admin): routes to manage deployments (#2667)

* feat(admin): routes to manage deployments

* fix naming fo deployed by

* feat(time-picker): added timepicker emcn component, added to playground, added searchable prop for dropdown, added more timezones for schedule, updated license and notice date (#2668)

* feat(time-picker): added timepicker emcn component, added to playground, added searchable prop for dropdown, added more timezones for schedule, updated license and notice date

* removed unused params, cleaned up redundant utils

* improvement(invite): aligned styling (#2669)

* improvement(invite): aligned with rest of app

* fix(invite): error handling

* fix: addressed comments

---------

Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: waleedlatif1 <waleedlatif1@users.noreply.github.com>
Co-authored-by: Adam Gough <77861281+aadamgough@users.noreply.github.com>
Co-authored-by: aadamgough <adam@sim.ai>
2026-01-03 13:19:18 -08:00
Waleed
eb5d1f3e5b v0.5.48: copy-paste workflow blocks, docs updates, mcp tool fixes 2025-12-31 18:00:04 -08:00
Waleed
54ab82c8dd v0.5.47: deploy workflow as mcp, kb chunks tokenizer, UI improvements, jira service management tools 2025-12-30 23:18:58 -08:00
Waleed
f895bf469b v0.5.46: build improvements, greptile, light mode improvements 2025-12-29 02:17:52 -08:00
Waleed
dd3209af06 v0.5.45: light mode fixes, realtime usage indicator, docker build improvements 2025-12-27 19:57:42 -08:00
Waleed
b6ba3b50a7 v0.5.44: keyboard shortcuts, autolayout, light mode, byok, testing improvements 2025-12-26 21:25:19 -08:00
Waleed
b304233062 v0.5.43: export logs, circleback, grain, vertex, code hygiene, schedule improvements 2025-12-23 19:19:18 -08:00
Vikhyath Mondreti
57e4b49bd6 v0.5.42: fix memory migration 2025-12-23 01:24:54 -08:00
Vikhyath Mondreti
e12dd204ed v0.5.41: memory fixes, copilot improvements, knowledgebase improvements, LLM providers standardization 2025-12-23 00:15:18 -08:00
Vikhyath Mondreti
3d9d9cbc54 v0.5.40: supabase ops to allow non-public schemas, jira uuid 2025-12-21 22:28:05 -08:00
Waleed
0f4ec962ad v0.5.39: notion, workflow variables fixes 2025-12-20 20:44:00 -08:00
Waleed
4827866f9a v0.5.38: snap to grid, copilot ux improvements, billing line items 2025-12-20 17:24:38 -08:00
Waleed
3e697d9ed9 v0.5.37: redaction utils consolidation, logs updates, autoconnect improvements, additional kb tag types 2025-12-19 22:31:55 -08:00
Martin Yankov
4431a1a484 fix(helm): add custom egress rules to realtime network policy (#2481)
The realtime service network policy was missing the custom egress rules section
that allows configuration of additional egress rules via values.yaml. This caused
the realtime pods to be unable to connect to external databases (e.g., PostgreSQL
on port 5432) when using external database configurations.

The app network policy already had this section, but the realtime network policy
was missing it, creating an inconsistency and preventing the realtime service
from accessing external databases configured via networkPolicy.egress values.

This fix adds the same custom egress rules template section to the realtime
network policy, matching the app network policy behavior and allowing users to
configure database connectivity via values.yaml.
2025-12-19 18:59:08 -08:00
Waleed
4d1a9a3f22 v0.5.36: hitl improvements, opengraph, slack fixes, one-click unsubscribe, auth checks, new db indexes 2025-12-19 01:27:49 -08:00
Vikhyath Mondreti
eb07a080fb v0.5.35: helm updates, copilot improvements, 404 for docs, salesforce fixes, subflow resize clamping 2025-12-18 16:23:19 -08:00
15 changed files with 159 additions and 1485 deletions

View File

@@ -33,7 +33,6 @@ const BlockDataSchema = z.object({
doWhileCondition: z.string().optional(),
parallelType: z.enum(['collection', 'count']).optional(),
type: z.string().optional(),
canonicalModes: z.record(z.enum(['basic', 'advanced'])).optional(),
})
const SubBlockStateSchema = z.object({

View File

@@ -57,12 +57,6 @@ export const BrowserUseBlock: BlockConfig<BrowserUseResponse> = {
type: 'switch',
placeholder: 'Save browser data',
},
{
id: 'profile_id',
title: 'Profile ID',
type: 'short-input',
placeholder: 'Enter browser profile ID (optional)',
},
{
id: 'apiKey',
title: 'API Key',
@@ -81,7 +75,6 @@ export const BrowserUseBlock: BlockConfig<BrowserUseResponse> = {
variables: { type: 'json', description: 'Task variables' },
model: { type: 'string', description: 'AI model to use' },
save_browser_data: { type: 'boolean', description: 'Save browser data' },
profile_id: { type: 'string', description: 'Browser profile ID for persistent sessions' },
},
outputs: {
id: { type: 'string', description: 'Task execution identifier' },

View File

@@ -1,599 +0,0 @@
/**
* @vitest-environment node
*/
import { loggerMock } from '@sim/testing'
import { afterEach, beforeEach, describe, expect, it, type Mock, vi } from 'vitest'
vi.mock('@sim/logger', () => loggerMock)
vi.mock('@/lib/execution/cancellation', () => ({
isExecutionCancelled: vi.fn(),
isRedisCancellationEnabled: vi.fn(),
}))
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import type { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import type { ExecutionContext } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'
import { ExecutionEngine } from './engine'
function createMockBlock(id: string): SerializedBlock {
return {
id,
metadata: { id: 'test', name: 'Test Block' },
position: { x: 0, y: 0 },
config: { tool: '', params: {} },
inputs: {},
outputs: {},
enabled: true,
}
}
function createMockNode(id: string, blockType = 'test'): DAGNode {
return {
id,
block: {
...createMockBlock(id),
metadata: { id: blockType, name: `Block ${id}` },
},
outgoingEdges: new Map(),
incomingEdges: new Set(),
metadata: {},
}
}
function createMockContext(overrides: Partial<ExecutionContext> = {}): ExecutionContext {
return {
workflowId: 'test-workflow',
workspaceId: 'test-workspace',
executionId: 'test-execution',
userId: 'test-user',
blockStates: new Map(),
executedBlocks: new Set(),
blockLogs: [],
loopExecutions: new Map(),
parallelExecutions: new Map(),
completedLoops: new Set(),
activeExecutionPath: new Set(),
metadata: {
executionId: 'test-execution',
startTime: new Date().toISOString(),
pendingBlocks: [],
},
envVars: {},
...overrides,
}
}
function createMockDAG(nodes: DAGNode[]): DAG {
const nodeMap = new Map<string, DAGNode>()
nodes.forEach((node) => nodeMap.set(node.id, node))
return {
nodes: nodeMap,
loopConfigs: new Map(),
parallelConfigs: new Map(),
}
}
interface MockEdgeManager extends EdgeManager {
processOutgoingEdges: ReturnType<typeof vi.fn>
}
function createMockEdgeManager(
processOutgoingEdgesImpl?: (node: DAGNode) => string[]
): MockEdgeManager {
const mockFn = vi.fn().mockImplementation(processOutgoingEdgesImpl || (() => []))
return {
processOutgoingEdges: mockFn,
isNodeReady: vi.fn().mockReturnValue(true),
deactivateEdgeAndDescendants: vi.fn(),
restoreIncomingEdge: vi.fn(),
clearDeactivatedEdges: vi.fn(),
clearDeactivatedEdgesForNodes: vi.fn(),
} as unknown as MockEdgeManager
}
interface MockNodeOrchestrator extends NodeExecutionOrchestrator {
executionCount: number
}
function createMockNodeOrchestrator(executeDelay = 0): MockNodeOrchestrator {
const mock = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async () => {
mock.executionCount++
if (executeDelay > 0) {
await new Promise((resolve) => setTimeout(resolve, executeDelay))
}
return { nodeId: 'test', output: {}, isFinalOutput: false }
}),
handleNodeCompletion: vi.fn(),
}
return mock as unknown as MockNodeOrchestrator
}
describe('ExecutionEngine', () => {
beforeEach(() => {
vi.clearAllMocks()
;(isExecutionCancelled as Mock).mockResolvedValue(false)
;(isRedisCancellationEnabled as Mock).mockReturnValue(false)
})
afterEach(() => {
vi.useRealTimers()
})
describe('Normal execution', () => {
it('should execute a simple linear workflow', async () => {
const startNode = createMockNode('start', 'starter')
const endNode = createMockNode('end', 'function')
startNode.outgoingEdges.set('edge1', { target: 'end' })
endNode.incomingEdges.add('start')
const dag = createMockDAG([startNode, endNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['end']
return []
})
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.success).toBe(true)
expect(nodeOrchestrator.executionCount).toBe(2)
})
it('should mark execution as successful when completed without cancellation', async () => {
const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.success).toBe(true)
expect(result.status).toBeUndefined()
})
it('should execute all nodes in a multi-node workflow', async () => {
const nodes = [
createMockNode('start', 'starter'),
createMockNode('middle1', 'function'),
createMockNode('middle2', 'function'),
createMockNode('end', 'function'),
]
nodes[0].outgoingEdges.set('e1', { target: 'middle1' })
nodes[1].outgoingEdges.set('e2', { target: 'middle2' })
nodes[2].outgoingEdges.set('e3', { target: 'end' })
const dag = createMockDAG(nodes)
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['middle1']
if (node.id === 'middle1') return ['middle2']
if (node.id === 'middle2') return ['end']
return []
})
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.success).toBe(true)
expect(nodeOrchestrator.executionCount).toBe(4)
})
})
describe('Cancellation via AbortSignal', () => {
it('should stop execution immediately when aborted before start', async () => {
const abortController = new AbortController()
abortController.abort()
const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.status).toBe('cancelled')
expect(nodeOrchestrator.executionCount).toBe(0)
})
it('should stop execution when aborted mid-workflow', async () => {
const abortController = new AbortController()
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
for (let i = 0; i < nodes.length - 1; i++) {
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
}
const dag = createMockDAG(nodes)
const context = createMockContext({ abortSignal: abortController.signal })
let callCount = 0
const edgeManager = createMockEdgeManager((node) => {
callCount++
if (callCount === 2) abortController.abort()
const idx = Number.parseInt(node.id.replace('node', ''))
if (idx < 4) return [`node${idx + 1}`]
return []
})
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('node0')
expect(result.success).toBe(false)
expect(result.status).toBe('cancelled')
expect(nodeOrchestrator.executionCount).toBeLessThan(5)
})
it('should not wait for slow executions when cancelled', async () => {
const abortController = new AbortController()
const startNode = createMockNode('start', 'starter')
const slowNode = createMockNode('slow', 'function')
startNode.outgoingEdges.set('edge1', { target: 'slow' })
const dag = createMockDAG([startNode, slowNode])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['slow']
return []
})
const nodeOrchestrator = createMockNodeOrchestrator(500)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const executionPromise = engine.run('start')
setTimeout(() => abortController.abort(), 50)
const startTime = Date.now()
const result = await executionPromise
const duration = Date.now() - startTime
expect(result.status).toBe('cancelled')
expect(duration).toBeLessThan(400)
})
it('should return cancelled status even if error thrown during cancellation', async () => {
const abortController = new AbortController()
abortController.abort()
const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.status).toBe('cancelled')
expect(result.success).toBe(false)
})
})
describe('Cancellation via Redis', () => {
it('should check Redis for cancellation when enabled', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(false)
const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await engine.run('start')
expect(isExecutionCancelled as Mock).toHaveBeenCalled()
})
it('should stop execution when Redis reports cancellation', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
let checkCount = 0
;(isExecutionCancelled as Mock).mockImplementation(async () => {
checkCount++
return checkCount > 1
})
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
for (let i = 0; i < nodes.length - 1; i++) {
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
}
const dag = createMockDAG(nodes)
const context = createMockContext()
const edgeManager = createMockEdgeManager((node) => {
const idx = Number.parseInt(node.id.replace('node', ''))
if (idx < 4) return [`node${idx + 1}`]
return []
})
const nodeOrchestrator = createMockNodeOrchestrator(150)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('node0')
expect(result.success).toBe(false)
expect(result.status).toBe('cancelled')
})
it('should respect cancellation check interval', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(false)
const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext()
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await engine.run('start')
expect((isExecutionCancelled as Mock).mock.calls.length).toBeGreaterThanOrEqual(1)
})
})
describe('Loop execution with cancellation', () => {
it('should break out of loop when cancelled mid-iteration', async () => {
const abortController = new AbortController()
const loopStartNode = createMockNode('loop-start', 'loop_sentinel')
loopStartNode.metadata = { isSentinel: true, sentinelType: 'start', loopId: 'loop1' }
const loopBodyNode = createMockNode('loop-body', 'function')
loopBodyNode.metadata = { isLoopNode: true, loopId: 'loop1' }
const loopEndNode = createMockNode('loop-end', 'loop_sentinel')
loopEndNode.metadata = { isSentinel: true, sentinelType: 'end', loopId: 'loop1' }
loopStartNode.outgoingEdges.set('edge1', { target: 'loop-body' })
loopBodyNode.outgoingEdges.set('edge2', { target: 'loop-end' })
loopEndNode.outgoingEdges.set('loop_continue', {
target: 'loop-start',
sourceHandle: 'loop_continue',
})
const dag = createMockDAG([loopStartNode, loopBodyNode, loopEndNode])
const context = createMockContext({ abortSignal: abortController.signal })
let iterationCount = 0
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'loop-start') return ['loop-body']
if (node.id === 'loop-body') return ['loop-end']
if (node.id === 'loop-end') {
iterationCount++
if (iterationCount === 3) abortController.abort()
return ['loop-start']
}
return []
})
const nodeOrchestrator = createMockNodeOrchestrator(5)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('loop-start')
expect(result.status).toBe('cancelled')
expect(iterationCount).toBeLessThan(100)
})
})
describe('Parallel execution with cancellation', () => {
it('should stop queueing parallel branches when cancelled', async () => {
const abortController = new AbortController()
const startNode = createMockNode('start', 'starter')
const parallelNodes = Array.from({ length: 10 }, (_, i) =>
createMockNode(`parallel${i}`, 'function')
)
parallelNodes.forEach((_, i) => {
startNode.outgoingEdges.set(`edge${i}`, { target: `parallel${i}` })
})
const dag = createMockDAG([startNode, ...parallelNodes])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') {
return parallelNodes.map((_, i) => `parallel${i}`)
}
return []
})
const nodeOrchestrator = createMockNodeOrchestrator(50)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const executionPromise = engine.run('start')
setTimeout(() => abortController.abort(), 30)
const result = await executionPromise
expect(result.status).toBe('cancelled')
expect(nodeOrchestrator.executionCount).toBeLessThan(11)
})
it('should not wait for all parallel branches when cancelled', async () => {
const abortController = new AbortController()
const startNode = createMockNode('start', 'starter')
const slowNodes = Array.from({ length: 5 }, (_, i) => createMockNode(`slow${i}`, 'function'))
slowNodes.forEach((_, i) => {
startNode.outgoingEdges.set(`edge${i}`, { target: `slow${i}` })
})
const dag = createMockDAG([startNode, ...slowNodes])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return slowNodes.map((_, i) => `slow${i}`)
return []
})
const nodeOrchestrator = createMockNodeOrchestrator(200)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const executionPromise = engine.run('start')
setTimeout(() => abortController.abort(), 50)
const startTime = Date.now()
const result = await executionPromise
const duration = Date.now() - startTime
expect(result.status).toBe('cancelled')
expect(duration).toBeLessThan(500)
})
})
describe('Edge cases', () => {
it('should handle empty DAG gracefully', async () => {
const dag = createMockDAG([])
const context = createMockContext()
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run()
expect(result.success).toBe(true)
expect(nodeOrchestrator.executionCount).toBe(0)
})
it('should preserve partial output when cancelled', async () => {
const abortController = new AbortController()
const startNode = createMockNode('start', 'starter')
const endNode = createMockNode('end', 'function')
endNode.outgoingEdges = new Map()
startNode.outgoingEdges.set('edge1', { target: 'end' })
const dag = createMockDAG([startNode, endNode])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'start') return ['end']
return []
})
const nodeOrchestrator = {
executionCount: 0,
executeNode: vi.fn().mockImplementation(async (_ctx: ExecutionContext, nodeId: string) => {
if (nodeId === 'start') {
return { nodeId: 'start', output: { startData: 'value' }, isFinalOutput: false }
}
abortController.abort()
return { nodeId: 'end', output: { endData: 'value' }, isFinalOutput: true }
}),
handleNodeCompletion: vi.fn(),
} as unknown as MockNodeOrchestrator
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.status).toBe('cancelled')
expect(result.output).toBeDefined()
})
it('should populate metadata on cancellation', async () => {
const abortController = new AbortController()
abortController.abort()
const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.metadata).toBeDefined()
expect(result.metadata.endTime).toBeDefined()
expect(result.metadata.duration).toBeDefined()
})
it('should return logs even when cancelled', async () => {
const abortController = new AbortController()
const startNode = createMockNode('start', 'starter')
const dag = createMockDAG([startNode])
const context = createMockContext({ abortSignal: abortController.signal })
context.blockLogs.push({
blockId: 'test',
blockName: 'Test',
blockType: 'test',
startedAt: '',
endedAt: '',
durationMs: 0,
success: true,
})
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
abortController.abort()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('start')
expect(result.logs).toBeDefined()
expect(result.logs.length).toBeGreaterThan(0)
})
})
describe('Cancellation flag behavior', () => {
it('should set cancelledFlag when abort signal fires', async () => {
const abortController = new AbortController()
const nodes = Array.from({ length: 3 }, (_, i) => createMockNode(`node${i}`, 'function'))
for (let i = 0; i < nodes.length - 1; i++) {
nodes[i].outgoingEdges.set(`e${i}`, { target: `node${i + 1}` })
}
const dag = createMockDAG(nodes)
const context = createMockContext({ abortSignal: abortController.signal })
const edgeManager = createMockEdgeManager((node) => {
if (node.id === 'node0') {
abortController.abort()
return ['node1']
}
return node.id === 'node1' ? ['node2'] : []
})
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
const result = await engine.run('node0')
expect(result.status).toBe('cancelled')
})
it('should cache Redis cancellation result', async () => {
;(isRedisCancellationEnabled as Mock).mockReturnValue(true)
;(isExecutionCancelled as Mock).mockResolvedValue(true)
const nodes = Array.from({ length: 5 }, (_, i) => createMockNode(`node${i}`, 'function'))
const dag = createMockDAG(nodes)
const context = createMockContext()
const edgeManager = createMockEdgeManager()
const nodeOrchestrator = createMockNodeOrchestrator()
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
await engine.run('node0')
expect((isExecutionCancelled as Mock).mock.calls.length).toBeLessThanOrEqual(3)
})
})
})

View File

@@ -28,8 +28,6 @@ export class ExecutionEngine {
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
private abortPromise: Promise<void> | null = null
private abortResolve: (() => void) | null = null
constructor(
private context: ExecutionContext,
@@ -39,34 +37,6 @@ export class ExecutionEngine {
) {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
this.initializeAbortHandler()
}
/**
* Sets up a single abort promise that can be reused throughout execution.
* This avoids creating multiple event listeners and potential memory leaks.
*/
private initializeAbortHandler(): void {
if (!this.context.abortSignal) return
if (this.context.abortSignal.aborted) {
this.cancelledFlag = true
this.abortPromise = Promise.resolve()
return
}
this.abortPromise = new Promise<void>((resolve) => {
this.abortResolve = resolve
})
this.context.abortSignal.addEventListener(
'abort',
() => {
this.cancelledFlag = true
this.abortResolve?.()
},
{ once: true }
)
}
private async checkCancellation(): Promise<boolean> {
@@ -103,15 +73,12 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if (await this.checkCancellation()) {
if ((await this.checkCancellation()) && this.executing.size === 0) {
break
}
await this.processQueue()
}
if (!this.cancelledFlag) {
await this.waitForAllExecutions()
}
await this.waitForAllExecutions()
if (this.pausedBlocks.size > 0) {
return this.buildPausedResult(startTime)
@@ -197,7 +164,11 @@ export class ExecutionEngine {
private trackExecution(promise: Promise<void>): void {
this.executing.add(promise)
promise.catch(() => {})
// Attach error handler to prevent unhandled rejection warnings
// The actual error handling happens in waitForAllExecutions/waitForAnyExecution
promise.catch(() => {
// Error will be properly handled by Promise.all/Promise.race in wait methods
})
promise.finally(() => {
this.executing.delete(promise)
})
@@ -205,30 +176,12 @@ export class ExecutionEngine {
private async waitForAnyExecution(): Promise<void> {
if (this.executing.size > 0) {
const abortPromise = this.getAbortPromise()
if (abortPromise) {
await Promise.race([...this.executing, abortPromise])
} else {
await Promise.race(this.executing)
}
await Promise.race(this.executing)
}
}
private async waitForAllExecutions(): Promise<void> {
const abortPromise = this.getAbortPromise()
if (abortPromise) {
await Promise.race([Promise.all(this.executing), abortPromise])
} else {
await Promise.all(this.executing)
}
}
/**
* Returns the cached abort promise. This is safe to call multiple times
* as it reuses the same promise instance created during initialization.
*/
private getAbortPromise(): Promise<void> | null {
return this.abortPromise
await Promise.all(Array.from(this.executing))
}
private async withQueueLock<T>(fn: () => Promise<T> | T): Promise<T> {
@@ -324,7 +277,7 @@ export class ExecutionEngine {
this.trackExecution(promise)
}
if (this.executing.size > 0 && !this.cancelledFlag) {
if (this.executing.size > 0) {
await this.waitForAnyExecution()
}
}
@@ -383,6 +336,7 @@ export class ExecutionEngine {
this.addMultipleToQueue(readyNodes)
// Check for dynamically added nodes (e.g., from parallel expansion)
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
const dynamicNodes = this.context.pendingDynamicNodes
this.context.pendingDynamicNodes = []

View File

@@ -377,7 +377,10 @@ function buildManualTriggerOutput(
return mergeFilesIntoOutput(output, workflowInput)
}
function buildIntegrationTriggerOutput(workflowInput: unknown): NormalizedBlockOutput {
function buildIntegrationTriggerOutput(
_finalInput: unknown,
workflowInput: unknown
): NormalizedBlockOutput {
return isPlainObject(workflowInput) ? (workflowInput as NormalizedBlockOutput) : {}
}
@@ -427,7 +430,7 @@ export function buildStartBlockOutput(options: StartBlockOutputOptions): Normali
return buildManualTriggerOutput(finalInput, workflowInput)
case StartBlockPath.EXTERNAL_TRIGGER:
return buildIntegrationTriggerOutput(workflowInput)
return buildIntegrationTriggerOutput(finalInput, workflowInput)
case StartBlockPath.LEGACY_STARTER:
return buildLegacyStarterOutput(

View File

@@ -897,17 +897,6 @@ export function useCollaborativeWorkflow() {
// Collect all edge IDs to remove
const edgeIdsToRemove = updates.flatMap((u) => u.affectedEdges.map((e) => e.id))
if (edgeIdsToRemove.length > 0) {
const edgeOperationId = crypto.randomUUID()
addToQueue({
id: edgeOperationId,
operation: {
operation: EDGES_OPERATIONS.BATCH_REMOVE_EDGES,
target: OPERATION_TARGETS.EDGES,
payload: { ids: edgeIdsToRemove },
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})
useWorkflowStore.getState().batchRemoveEdges(edgeIdsToRemove)
}

View File

@@ -54,17 +54,6 @@ export interface SimplifiedImapEmail {
}
export interface ImapWebhookPayload {
messageId: string
subject: string
from: string
to: string
cc: string
date: string | null
bodyText: string
bodyHtml: string
mailbox: string
hasAttachments: boolean
attachments: ImapAttachment[]
email: SimplifiedImapEmail
timestamp: string
}
@@ -624,17 +613,6 @@ async function processEmails(
}
const payload: ImapWebhookPayload = {
messageId: simplifiedEmail.messageId,
subject: simplifiedEmail.subject,
from: simplifiedEmail.from,
to: simplifiedEmail.to,
cc: simplifiedEmail.cc,
date: simplifiedEmail.date,
bodyText: simplifiedEmail.bodyText,
bodyHtml: simplifiedEmail.bodyHtml,
mailbox: simplifiedEmail.mailbox,
hasAttachments: simplifiedEmail.hasAttachments,
attachments: simplifiedEmail.attachments,
email: simplifiedEmail,
timestamp: new Date().toISOString(),
}

View File

@@ -48,9 +48,6 @@ interface RssFeed {
}
export interface RssWebhookPayload {
title?: string
link?: string
pubDate?: string
item: RssItem
feed: {
title?: string
@@ -352,9 +349,6 @@ async function processRssItems(
`${webhookData.id}:${itemGuid}`,
async () => {
const payload: RssWebhookPayload = {
title: item.title,
link: item.link,
pubDate: item.pubDate,
item: {
title: item.title,
link: item.link,

View File

@@ -686,9 +686,6 @@ export async function formatWebhookInput(
if (foundWebhook.provider === 'rss') {
if (body && typeof body === 'object' && 'item' in body) {
return {
title: body.title,
link: body.link,
pubDate: body.pubDate,
item: body.item,
feed: body.feed,
timestamp: body.timestamp,
@@ -700,17 +697,6 @@ export async function formatWebhookInput(
if (foundWebhook.provider === 'imap') {
if (body && typeof body === 'object' && 'email' in body) {
return {
messageId: body.messageId,
subject: body.subject,
from: body.from,
to: body.to,
cc: body.cc,
date: body.date,
bodyText: body.bodyText,
bodyHtml: body.bodyHtml,
mailbox: body.mailbox,
hasAttachments: body.hasAttachments,
attachments: body.attachments,
email: body.email,
timestamp: body.timestamp,
}

View File

@@ -19,7 +19,6 @@ import {
convertToGeminiFormat,
convertUsageMetadata,
createReadableStreamFromGeminiStream,
ensureStructResponse,
extractFunctionCallPart,
extractTextContent,
mapToThinkingLevel,
@@ -105,7 +104,7 @@ async function executeToolCall(
const duration = toolCallEndTime - toolCallStartTime
const resultContent: Record<string, unknown> = result.success
? ensureStructResponse(result.output)
? (result.output as Record<string, unknown>)
: { error: true, message: result.error || 'Tool execution failed', tool: toolName }
const toolCall: FunctionCallResponse = {

View File

@@ -1,453 +0,0 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { convertToGeminiFormat, ensureStructResponse } from '@/providers/google/utils'
import type { ProviderRequest } from '@/providers/types'
describe('ensureStructResponse', () => {
describe('should return objects unchanged', () => {
it('should return plain object unchanged', () => {
const input = { key: 'value', nested: { a: 1 } }
const result = ensureStructResponse(input)
expect(result).toBe(input) // Same reference
expect(result).toEqual({ key: 'value', nested: { a: 1 } })
})
it('should return empty object unchanged', () => {
const input = {}
const result = ensureStructResponse(input)
expect(result).toBe(input)
expect(result).toEqual({})
})
})
describe('should wrap primitive values in { value: ... }', () => {
it('should wrap boolean true', () => {
const result = ensureStructResponse(true)
expect(result).toEqual({ value: true })
expect(typeof result).toBe('object')
})
it('should wrap boolean false', () => {
const result = ensureStructResponse(false)
expect(result).toEqual({ value: false })
expect(typeof result).toBe('object')
})
it('should wrap string', () => {
const result = ensureStructResponse('success')
expect(result).toEqual({ value: 'success' })
expect(typeof result).toBe('object')
})
it('should wrap empty string', () => {
const result = ensureStructResponse('')
expect(result).toEqual({ value: '' })
expect(typeof result).toBe('object')
})
it('should wrap number', () => {
const result = ensureStructResponse(42)
expect(result).toEqual({ value: 42 })
expect(typeof result).toBe('object')
})
it('should wrap zero', () => {
const result = ensureStructResponse(0)
expect(result).toEqual({ value: 0 })
expect(typeof result).toBe('object')
})
it('should wrap null', () => {
const result = ensureStructResponse(null)
expect(result).toEqual({ value: null })
expect(typeof result).toBe('object')
})
it('should wrap undefined', () => {
const result = ensureStructResponse(undefined)
expect(result).toEqual({ value: undefined })
expect(typeof result).toBe('object')
})
})
describe('should wrap arrays in { value: ... }', () => {
it('should wrap array of strings', () => {
const result = ensureStructResponse(['a', 'b', 'c'])
expect(result).toEqual({ value: ['a', 'b', 'c'] })
expect(typeof result).toBe('object')
expect(Array.isArray(result)).toBe(false)
})
it('should wrap array of objects', () => {
const result = ensureStructResponse([{ id: 1 }, { id: 2 }])
expect(result).toEqual({ value: [{ id: 1 }, { id: 2 }] })
expect(typeof result).toBe('object')
expect(Array.isArray(result)).toBe(false)
})
it('should wrap empty array', () => {
const result = ensureStructResponse([])
expect(result).toEqual({ value: [] })
expect(typeof result).toBe('object')
expect(Array.isArray(result)).toBe(false)
})
})
describe('edge cases', () => {
it('should handle nested objects correctly', () => {
const input = { a: { b: { c: 1 } }, d: [1, 2, 3] }
const result = ensureStructResponse(input)
expect(result).toBe(input) // Same reference, unchanged
})
it('should handle object with array property correctly', () => {
const input = { items: ['a', 'b'], count: 2 }
const result = ensureStructResponse(input)
expect(result).toBe(input) // Same reference, unchanged
})
})
})
describe('convertToGeminiFormat', () => {
describe('tool message handling', () => {
it('should convert tool message with object response correctly', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Hello' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_123',
type: 'function',
function: { name: 'get_weather', arguments: '{"city": "London"}' },
},
],
},
{
role: 'tool',
name: 'get_weather',
tool_call_id: 'call_123',
content: '{"temperature": 20, "condition": "sunny"}',
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
expect(toolResponseContent).toBeDefined()
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(functionResponse?.response).toEqual({ temperature: 20, condition: 'sunny' })
expect(typeof functionResponse?.response).toBe('object')
})
it('should wrap boolean true response in an object for Gemini compatibility', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Check if user exists' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_456',
type: 'function',
function: { name: 'user_exists', arguments: '{"userId": "123"}' },
},
],
},
{
role: 'tool',
name: 'user_exists',
tool_call_id: 'call_456',
content: 'true', // Boolean true as JSON string
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
expect(toolResponseContent).toBeDefined()
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
expect(functionResponse?.response).not.toBe(true)
expect(functionResponse?.response).toEqual({ value: true })
})
it('should wrap boolean false response in an object for Gemini compatibility', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Check if user exists' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_789',
type: 'function',
function: { name: 'user_exists', arguments: '{"userId": "999"}' },
},
],
},
{
role: 'tool',
name: 'user_exists',
tool_call_id: 'call_789',
content: 'false', // Boolean false as JSON string
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
expect(functionResponse?.response).toEqual({ value: false })
})
it('should wrap string response in an object for Gemini compatibility', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Get status' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_str',
type: 'function',
function: { name: 'get_status', arguments: '{}' },
},
],
},
{
role: 'tool',
name: 'get_status',
tool_call_id: 'call_str',
content: '"success"', // String as JSON
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
expect(functionResponse?.response).toEqual({ value: 'success' })
})
it('should wrap number response in an object for Gemini compatibility', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Get count' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_num',
type: 'function',
function: { name: 'get_count', arguments: '{}' },
},
],
},
{
role: 'tool',
name: 'get_count',
tool_call_id: 'call_num',
content: '42', // Number as JSON
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
expect(functionResponse?.response).toEqual({ value: 42 })
})
it('should wrap null response in an object for Gemini compatibility', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Get data' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_null',
type: 'function',
function: { name: 'get_data', arguments: '{}' },
},
],
},
{
role: 'tool',
name: 'get_data',
tool_call_id: 'call_null',
content: 'null', // null as JSON
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
expect(functionResponse?.response).toEqual({ value: null })
})
it('should keep array response as-is since arrays are valid Struct values', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Get items' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_arr',
type: 'function',
function: { name: 'get_items', arguments: '{}' },
},
],
},
{
role: 'tool',
name: 'get_items',
tool_call_id: 'call_arr',
content: '["item1", "item2"]', // Array as JSON
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
expect(functionResponse?.response).toEqual({ value: ['item1', 'item2'] })
})
it('should handle invalid JSON by wrapping in output object', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Get data' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_invalid',
type: 'function',
function: { name: 'get_data', arguments: '{}' },
},
],
},
{
role: 'tool',
name: 'get_data',
tool_call_id: 'call_invalid',
content: 'not valid json {',
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
expect(functionResponse?.response).toEqual({ output: 'not valid json {' })
})
it('should handle empty content by wrapping in output object', () => {
const request: ProviderRequest = {
model: 'gemini-2.5-flash',
messages: [
{ role: 'user', content: 'Do something' },
{
role: 'assistant',
content: '',
tool_calls: [
{
id: 'call_empty',
type: 'function',
function: { name: 'do_action', arguments: '{}' },
},
],
},
{
role: 'tool',
name: 'do_action',
tool_call_id: 'call_empty',
content: '', // Empty content - falls back to default '{}'
},
],
}
const result = convertToGeminiFormat(request)
const toolResponseContent = result.contents.find(
(c) => c.parts?.[0] && 'functionResponse' in c.parts[0]
)
const functionResponse = (toolResponseContent?.parts?.[0] as { functionResponse?: unknown })
?.functionResponse as { response?: unknown }
expect(typeof functionResponse?.response).toBe('object')
// Empty string is not valid JSON, so it falls back to { output: "" }
expect(functionResponse?.response).toEqual({ output: '' })
})
})
})

View File

@@ -18,22 +18,6 @@ import { trackForcedToolUsage } from '@/providers/utils'
const logger = createLogger('GoogleUtils')
/**
* Ensures a value is a valid object for Gemini's functionResponse.response field.
* Gemini's API requires functionResponse.response to be a google.protobuf.Struct,
* which must be an object with string keys. Primitive values (boolean, string,
* number, null) and arrays are wrapped in { value: ... }.
*
* @param value - The value to ensure is a Struct-compatible object
* @returns A Record<string, unknown> suitable for functionResponse.response
*/
export function ensureStructResponse(value: unknown): Record<string, unknown> {
if (typeof value === 'object' && value !== null && !Array.isArray(value)) {
return value as Record<string, unknown>
}
return { value }
}
/**
* Usage metadata for Google Gemini responses
*/
@@ -196,8 +180,7 @@ export function convertToGeminiFormat(request: ProviderRequest): {
}
let responseData: Record<string, unknown>
try {
const parsed = JSON.parse(message.content ?? '{}')
responseData = ensureStructResponse(parsed)
responseData = JSON.parse(message.content ?? '{}')
} catch {
responseData = { output: message.content }
}

View File

@@ -337,11 +337,10 @@ async function handleBlockOperationTx(
const currentData = currentBlock?.data || {}
// Update data with parentId and extent
const { parentId: _removedParentId, extent: _removedExtent, ...restData } = currentData
const updatedData = isRemovingFromParent
? restData
? {} // Clear data entirely when removing from parent
: {
...restData,
...currentData,
...(payload.parentId ? { parentId: payload.parentId } : {}),
...(payload.extent ? { extent: payload.extent } : {}),
}
@@ -829,11 +828,10 @@ async function handleBlocksOperationTx(
const currentData = currentBlock?.data || {}
const { parentId: _removedParentId, extent: _removedExtent, ...restData } = currentData
const updatedData = isRemovingFromParent
? restData
? {}
: {
...restData,
...currentData,
...(parentId ? { parentId, extent: 'parent' } : {}),
}

View File

@@ -1,214 +1,11 @@
import { createLogger } from '@sim/logger'
import type { BrowserUseRunTaskParams, BrowserUseRunTaskResponse } from '@/tools/browser_use/types'
import type { ToolConfig, ToolResponse } from '@/tools/types'
import type { ToolConfig } from '@/tools/types'
const logger = createLogger('BrowserUseTool')
const POLL_INTERVAL_MS = 5000
const MAX_POLL_TIME_MS = 180000
const MAX_CONSECUTIVE_ERRORS = 3
async function createSessionWithProfile(
profileId: string,
apiKey: string
): Promise<{ sessionId: string } | { error: string }> {
try {
const response = await fetch('https://api.browser-use.com/api/v2/sessions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Browser-Use-API-Key': apiKey,
},
body: JSON.stringify({
profileId: profileId.trim(),
}),
})
if (!response.ok) {
const errorText = await response.text()
logger.error(`Failed to create session with profile: ${errorText}`)
return { error: `Failed to create session with profile: ${response.statusText}` }
}
const data = (await response.json()) as { id: string }
logger.info(`Created session ${data.id} with profile ${profileId}`)
return { sessionId: data.id }
} catch (error: any) {
logger.error('Error creating session with profile:', error)
return { error: `Error creating session: ${error.message}` }
}
}
async function stopSession(sessionId: string, apiKey: string): Promise<void> {
try {
const response = await fetch(`https://api.browser-use.com/api/v2/sessions/${sessionId}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'X-Browser-Use-API-Key': apiKey,
},
body: JSON.stringify({ action: 'stop' }),
})
if (response.ok) {
logger.info(`Stopped session ${sessionId}`)
} else {
logger.warn(`Failed to stop session ${sessionId}: ${response.statusText}`)
}
} catch (error: any) {
logger.warn(`Error stopping session ${sessionId}:`, error)
}
}
function buildRequestBody(
params: BrowserUseRunTaskParams,
sessionId?: string
): Record<string, any> {
const requestBody: Record<string, any> = {
task: params.task,
}
if (sessionId) {
requestBody.sessionId = sessionId
logger.info(`Using session ${sessionId} for task`)
}
if (params.variables) {
let secrets: Record<string, string> = {}
if (Array.isArray(params.variables)) {
logger.info('Converting variables array to dictionary format')
params.variables.forEach((row: any) => {
if (row.cells?.Key && row.cells.Value !== undefined) {
secrets[row.cells.Key] = row.cells.Value
logger.info(`Added secret for key: ${row.cells.Key}`)
} else if (row.Key && row.Value !== undefined) {
secrets[row.Key] = row.Value
logger.info(`Added secret for key: ${row.Key}`)
}
})
} else if (typeof params.variables === 'object' && params.variables !== null) {
logger.info('Using variables object directly')
secrets = params.variables
}
if (Object.keys(secrets).length > 0) {
logger.info(`Found ${Object.keys(secrets).length} secrets to include`)
requestBody.secrets = secrets
} else {
logger.warn('No usable secrets found in variables')
}
}
if (params.model) {
requestBody.llm_model = params.model
}
if (params.save_browser_data) {
requestBody.save_browser_data = params.save_browser_data
}
requestBody.use_adblock = true
requestBody.highlight_elements = true
return requestBody
}
async function fetchTaskStatus(
taskId: string,
apiKey: string
): Promise<{ ok: true; data: any } | { ok: false; error: string }> {
try {
const response = await fetch(`https://api.browser-use.com/api/v2/tasks/${taskId}`, {
method: 'GET',
headers: {
'X-Browser-Use-API-Key': apiKey,
},
})
if (!response.ok) {
return { ok: false, error: `HTTP ${response.status}: ${response.statusText}` }
}
const data = await response.json()
return { ok: true, data }
} catch (error: any) {
return { ok: false, error: error.message || 'Network error' }
}
}
async function pollForCompletion(
taskId: string,
apiKey: string
): Promise<{ success: boolean; output: any; steps: any[]; error?: string }> {
let liveUrlLogged = false
let consecutiveErrors = 0
const startTime = Date.now()
while (Date.now() - startTime < MAX_POLL_TIME_MS) {
const result = await fetchTaskStatus(taskId, apiKey)
if (!result.ok) {
consecutiveErrors++
logger.warn(
`Error polling task ${taskId} (attempt ${consecutiveErrors}/${MAX_CONSECUTIVE_ERRORS}): ${result.error}`
)
if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
logger.error(`Max consecutive errors reached for task ${taskId}`)
return {
success: false,
output: null,
steps: [],
error: `Failed to poll task status after ${MAX_CONSECUTIVE_ERRORS} attempts: ${result.error}`,
}
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
continue
}
consecutiveErrors = 0
const taskData = result.data
const status = taskData.status
logger.info(`BrowserUse task ${taskId} status: ${status}`)
if (['finished', 'failed', 'stopped'].includes(status)) {
return {
success: status === 'finished',
output: taskData.output ?? null,
steps: taskData.steps || [],
}
}
if (!liveUrlLogged && taskData.live_url) {
logger.info(`BrowserUse task ${taskId} live URL: ${taskData.live_url}`)
liveUrlLogged = true
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
}
const finalResult = await fetchTaskStatus(taskId, apiKey)
if (finalResult.ok && ['finished', 'failed', 'stopped'].includes(finalResult.data.status)) {
return {
success: finalResult.data.status === 'finished',
output: finalResult.data.output ?? null,
steps: finalResult.data.steps || [],
}
}
logger.warn(
`Task ${taskId} did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`
)
return {
success: false,
output: null,
steps: [],
error: `Task did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`,
}
}
const POLL_INTERVAL_MS = 5000 // 5 seconds between polls
const MAX_POLL_TIME_MS = 180000 // 3 minutes maximum polling time
export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskResponse> = {
id: 'browser_use_run_task',
@@ -247,14 +44,7 @@ export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskR
visibility: 'user-only',
description: 'API key for BrowserUse API',
},
profile_id: {
type: 'string',
required: false,
visibility: 'user-only',
description: 'Browser profile ID for persistent sessions (cookies, login state)',
},
},
request: {
url: 'https://api.browser-use.com/api/v2/tasks',
method: 'POST',
@@ -262,94 +52,155 @@ export const runTaskTool: ToolConfig<BrowserUseRunTaskParams, BrowserUseRunTaskR
'Content-Type': 'application/json',
'X-Browser-Use-API-Key': params.apiKey,
}),
body: (params) => {
const requestBody: Record<string, any> = {
task: params.task,
}
if (params.variables) {
let secrets: Record<string, string> = {}
if (Array.isArray(params.variables)) {
logger.info('Converting variables array to dictionary format')
params.variables.forEach((row) => {
if (row.cells?.Key && row.cells.Value !== undefined) {
secrets[row.cells.Key] = row.cells.Value
logger.info(`Added secret for key: ${row.cells.Key}`)
} else if (row.Key && row.Value !== undefined) {
secrets[row.Key] = row.Value
logger.info(`Added secret for key: ${row.Key}`)
}
})
} else if (typeof params.variables === 'object' && params.variables !== null) {
logger.info('Using variables object directly')
secrets = params.variables
}
if (Object.keys(secrets).length > 0) {
logger.info(`Found ${Object.keys(secrets).length} secrets to include`)
requestBody.secrets = secrets
} else {
logger.warn('No usable secrets found in variables')
}
}
if (params.model) {
requestBody.llm_model = params.model
}
if (params.save_browser_data) {
requestBody.save_browser_data = params.save_browser_data
}
requestBody.use_adblock = true
requestBody.highlight_elements = true
return requestBody
},
},
directExecution: async (params: BrowserUseRunTaskParams): Promise<ToolResponse> => {
let sessionId: string | undefined
transformResponse: async (response: Response) => {
const data = (await response.json()) as { id: string }
return {
success: true,
output: {
id: data.id,
success: true,
output: null,
steps: [],
},
}
},
if (params.profile_id) {
logger.info(`Creating session with profile ID: ${params.profile_id}`)
const sessionResult = await createSessionWithProfile(params.profile_id, params.apiKey)
if ('error' in sessionResult) {
return {
success: false,
output: {
id: null,
success: false,
output: null,
steps: [],
},
error: sessionResult.error,
}
}
sessionId = sessionResult.sessionId
postProcess: async (result, params) => {
if (!result.success) {
return result
}
const requestBody = buildRequestBody(params, sessionId)
logger.info('Creating BrowserUse task', { hasSession: !!sessionId })
const taskId = result.output.id
let liveUrlLogged = false
try {
const response = await fetch('https://api.browser-use.com/api/v2/tasks', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Browser-Use-API-Key': params.apiKey,
},
body: JSON.stringify(requestBody),
})
if (!response.ok) {
const errorText = await response.text()
logger.error(`Failed to create task: ${errorText}`)
return {
success: false,
output: {
id: null,
success: false,
output: null,
steps: [],
const initialTaskResponse = await fetch(
`https://api.browser-use.com/api/v2/tasks/${taskId}`,
{
method: 'GET',
headers: {
'X-Browser-Use-API-Key': params.apiKey,
},
error: `Failed to create task: ${response.statusText}`,
}
)
if (initialTaskResponse.ok) {
const initialTaskData = await initialTaskResponse.json()
if (initialTaskData.live_url) {
logger.info(
`BrowserUse task ${taskId} launched with live URL: ${initialTaskData.live_url}`
)
liveUrlLogged = true
}
}
} catch (error) {
logger.warn(`Failed to get initial task details for ${taskId}:`, error)
}
const data = (await response.json()) as { id: string }
const taskId = data.id
logger.info(`Created BrowserUse task: ${taskId}`)
let elapsedTime = 0
const result = await pollForCompletion(taskId, params.apiKey)
while (elapsedTime < MAX_POLL_TIME_MS) {
try {
const statusResponse = await fetch(`https://api.browser-use.com/api/v2/tasks/${taskId}`, {
method: 'GET',
headers: {
'X-Browser-Use-API-Key': params.apiKey,
},
})
if (sessionId) {
await stopSession(sessionId, params.apiKey)
if (!statusResponse.ok) {
throw new Error(`Failed to get task status: ${statusResponse.statusText}`)
}
const taskData = await statusResponse.json()
const status = taskData.status
logger.info(`BrowserUse task ${taskId} status: ${status}`)
if (['finished', 'failed', 'stopped'].includes(status)) {
result.output = {
id: taskId,
success: status === 'finished',
output: taskData.output ?? null,
steps: taskData.steps || [],
}
return result
}
if (!liveUrlLogged && status === 'running' && taskData.live_url) {
logger.info(`BrowserUse task ${taskId} running with live URL: ${taskData.live_url}`)
liveUrlLogged = true
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
elapsedTime += POLL_INTERVAL_MS
} catch (error: any) {
logger.error('Error polling for task status:', {
message: error.message || 'Unknown error',
taskId,
})
return {
...result,
error: `Error polling for task status: ${error.message || 'Unknown error'}`,
}
}
}
return {
success: result.success && !result.error,
output: {
id: taskId,
success: result.success,
output: result.output,
steps: result.steps,
},
error: result.error,
}
} catch (error: any) {
logger.error('Error creating BrowserUse task:', error)
if (sessionId) {
await stopSession(sessionId, params.apiKey)
}
return {
success: false,
output: {
id: null,
success: false,
output: null,
steps: [],
},
error: `Error creating task: ${error.message}`,
}
logger.warn(
`Task ${taskId} did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`
)
return {
...result,
error: `Task did not complete within the maximum polling time (${MAX_POLL_TIME_MS / 1000}s)`,
}
},

View File

@@ -6,7 +6,6 @@ export interface BrowserUseRunTaskParams {
variables?: Record<string, string>
model?: string
save_browser_data?: boolean
profile_id?: string
}
export interface BrowserUseTaskStep {