feat(agent): messages array, memory (#2023)

* feat(agent): messages array, memory options

* feat(messages-input): re-order messages

* backend for the new memory setup; backwards compatibility in loadWorkflowsFromNormalizedTable to migrate the old agent block format to the new one

* added memory modes (full conversation, sliding message window, sliding token window); standardized modals

* lint

* fix build

* reorder popover for the chat output selector

* add internal auth, finish memories

* fix rebase

* fix failing test

---------

Co-authored-by: waleed <walif6@gmail.com>
Authored by Emir Karabeg on 2025-11-18 15:58:10 -08:00; committed by GitHub
parent a8a693f1ff
commit 02d9fedf0c
53 changed files with 11580 additions and 1430 deletions

View File

@@ -1145,11 +1145,13 @@ describe('AgentBlockHandler', () => {
expect(systemMessages[0].content).toBe('You are a helpful assistant.')
})
it('should prioritize explicit systemPrompt over system messages in memories', async () => {
it('should prioritize messages array system message over system messages in memories', async () => {
const inputs = {
model: 'gpt-4o',
systemPrompt: 'You are a helpful assistant.',
userPrompt: 'What should I do?',
messages: [
{ role: 'system' as const, content: 'You are a helpful assistant.' },
{ role: 'user' as const, content: 'What should I do?' },
],
memories: [
{ role: 'system', content: 'Old system message from memories.' },
{ role: 'user', content: 'Hello!' },
@@ -1167,31 +1169,30 @@ describe('AgentBlockHandler', () => {
// Verify messages were built correctly
expect(requestBody.messages).toBeDefined()
expect(requestBody.messages.length).toBe(4) // explicit system + 2 non-system memories + user prompt
expect(requestBody.messages.length).toBe(5) // memory system + 2 non-system memories + 2 from messages array
// Check only one system message exists and it's the explicit one
const systemMessages = requestBody.messages.filter((msg: any) => msg.role === 'system')
expect(systemMessages.length).toBe(1)
expect(systemMessages[0].content).toBe('You are a helpful assistant.')
// Verify the explicit system prompt is first
// All messages should be present (memories first, then messages array)
// Memory messages come first
expect(requestBody.messages[0].role).toBe('system')
expect(requestBody.messages[0].content).toBe('You are a helpful assistant.')
// Verify conversation order is preserved
expect(requestBody.messages[0].content).toBe('Old system message from memories.')
expect(requestBody.messages[1].role).toBe('user')
expect(requestBody.messages[1].content).toBe('Hello!')
expect(requestBody.messages[2].role).toBe('assistant')
expect(requestBody.messages[2].content).toBe('Hi there!')
expect(requestBody.messages[3].role).toBe('user')
expect(requestBody.messages[3].content).toBe('What should I do?')
// Then messages array
expect(requestBody.messages[3].role).toBe('system')
expect(requestBody.messages[3].content).toBe('You are a helpful assistant.')
expect(requestBody.messages[4].role).toBe('user')
expect(requestBody.messages[4].content).toBe('What should I do?')
})
it('should handle multiple system messages in memories with explicit systemPrompt', async () => {
it('should handle multiple system messages in memories with messages array', async () => {
const inputs = {
model: 'gpt-4o',
systemPrompt: 'You are a helpful assistant.',
userPrompt: 'Continue our conversation.',
messages: [
{ role: 'system' as const, content: 'You are a helpful assistant.' },
{ role: 'user' as const, content: 'Continue our conversation.' },
],
memories: [
{ role: 'system', content: 'First system message.' },
{ role: 'user', content: 'Hello!' },
@@ -1211,22 +1212,23 @@ describe('AgentBlockHandler', () => {
// Verify messages were built correctly
expect(requestBody.messages).toBeDefined()
expect(requestBody.messages.length).toBe(4) // explicit system + 2 non-system memories + user prompt
expect(requestBody.messages.length).toBe(7) // 5 memory messages (3 system + 2 conversation) + 2 from messages array
// Check only one system message exists and message order is preserved
const systemMessages = requestBody.messages.filter((msg: any) => msg.role === 'system')
expect(systemMessages.length).toBe(1)
expect(systemMessages[0].content).toBe('You are a helpful assistant.')
// Verify conversation flow is preserved
// All messages should be present in order
expect(requestBody.messages[0].role).toBe('system')
expect(requestBody.messages[0].content).toBe('You are a helpful assistant.')
expect(requestBody.messages[0].content).toBe('First system message.')
expect(requestBody.messages[1].role).toBe('user')
expect(requestBody.messages[1].content).toBe('Hello!')
expect(requestBody.messages[2].role).toBe('assistant')
expect(requestBody.messages[2].content).toBe('Hi there!')
expect(requestBody.messages[3].role).toBe('user')
expect(requestBody.messages[3].content).toBe('Continue our conversation.')
expect(requestBody.messages[2].role).toBe('system')
expect(requestBody.messages[2].content).toBe('Second system message.')
expect(requestBody.messages[3].role).toBe('assistant')
expect(requestBody.messages[3].content).toBe('Hi there!')
expect(requestBody.messages[4].role).toBe('system')
expect(requestBody.messages[4].content).toBe('Third system message.')
expect(requestBody.messages[5].role).toBe('system')
expect(requestBody.messages[5].content).toBe('You are a helpful assistant.')
expect(requestBody.messages[6].role).toBe('user')
expect(requestBody.messages[6].content).toBe('Continue our conversation.')
})
it('should preserve multiple system messages when no explicit systemPrompt is provided', async () => {

View File

@@ -3,6 +3,7 @@ import { createMcpToolId } from '@/lib/mcp/utils'
import { getAllBlocks } from '@/blocks'
import type { BlockOutput } from '@/blocks/types'
import { AGENT, BlockType, DEFAULTS, HTTP } from '@/executor/consts'
import { memoryService } from '@/executor/handlers/agent/memory'
import type {
AgentInputs,
Message,
@@ -39,7 +40,7 @@ export class AgentBlockHandler implements BlockHandler {
const providerId = getProviderFromModel(model)
const formattedTools = await this.formatTools(ctx, inputs.tools || [])
const streamingConfig = this.getStreamingConfig(ctx, block)
const messages = this.buildMessages(inputs)
const messages = await this.buildMessages(ctx, inputs, block.id)
const providerRequest = this.buildProviderRequest({
ctx,
@@ -52,7 +53,18 @@ export class AgentBlockHandler implements BlockHandler {
streaming: streamingConfig.shouldUseStreaming ?? false,
})
return this.executeProviderRequest(ctx, providerRequest, block, responseFormat)
const result = await this.executeProviderRequest(
ctx,
providerRequest,
block,
responseFormat,
inputs
)
// Auto-persist response to memory if configured
await this.persistResponseToMemory(ctx, inputs, result, block.id)
return result
}
private parseResponseFormat(responseFormat?: string | object): any {
@@ -324,25 +336,80 @@ export class AgentBlockHandler implements BlockHandler {
return { shouldUseStreaming, isBlockSelectedForOutput, hasOutgoingConnections }
}
private buildMessages(inputs: AgentInputs): Message[] | undefined {
if (!inputs.memories && !(inputs.systemPrompt && inputs.userPrompt)) {
return undefined
}
private async buildMessages(
ctx: ExecutionContext,
inputs: AgentInputs,
blockId: string
): Promise<Message[] | undefined> {
const messages: Message[] = []
// 1. Fetch memory history if configured (industry standard: chronological order)
if (inputs.memoryType && inputs.memoryType !== 'none') {
const memoryMessages = await memoryService.fetchMemoryMessages(ctx, inputs, blockId)
messages.push(...memoryMessages)
}
// 2. Process legacy memories (backward compatibility - from Memory block)
if (inputs.memories) {
messages.push(...this.processMemories(inputs.memories))
}
if (inputs.systemPrompt) {
// 3. Add messages array (new approach - from messages-input subblock)
if (inputs.messages && Array.isArray(inputs.messages)) {
const validMessages = inputs.messages.filter(
(msg) =>
msg &&
typeof msg === 'object' &&
'role' in msg &&
'content' in msg &&
['system', 'user', 'assistant'].includes(msg.role)
)
messages.push(...validMessages)
}
// Warn if using both new and legacy input formats
if (
inputs.messages &&
inputs.messages.length > 0 &&
(inputs.systemPrompt || inputs.userPrompt)
) {
logger.warn('Agent block using both messages array and legacy prompts', {
hasMessages: true,
hasSystemPrompt: !!inputs.systemPrompt,
hasUserPrompt: !!inputs.userPrompt,
})
}
// 4. Handle legacy systemPrompt (backward compatibility)
// Only add if no system message exists yet
if (inputs.systemPrompt && !messages.some((m) => m.role === 'system')) {
this.addSystemPrompt(messages, inputs.systemPrompt)
}
// 5. Handle legacy userPrompt (backward compatibility)
if (inputs.userPrompt) {
this.addUserPrompt(messages, inputs.userPrompt)
}
// 6. Persist user message(s) to memory if configured
// This ensures conversation history is complete before agent execution
if (inputs.memoryType && inputs.memoryType !== 'none' && messages.length > 0) {
// Find new user messages that need to be persisted
// (messages added via messages array or userPrompt)
const userMessages = messages.filter((m) => m.role === 'user')
const lastUserMessage = userMessages[userMessages.length - 1]
// Only persist if there's a user message AND it's from userPrompt or messages input
// (not from memory history which was already persisted)
if (
lastUserMessage &&
(inputs.userPrompt || (inputs.messages && inputs.messages.length > 0))
) {
await memoryService.persistUserMessage(ctx, inputs, lastUserMessage, blockId)
}
}
// Return messages or undefined if empty (maintains API compatibility)
return messages.length > 0 ? messages : undefined
}
@@ -382,6 +449,10 @@ export class AgentBlockHandler implements BlockHandler {
return messages
}
/**
* Ensures system message is at position 0 (industry standard)
* Preserves existing system message if already at position 0, otherwise adds/moves it
*/
private addSystemPrompt(messages: Message[], systemPrompt: any) {
let content: string
@@ -395,17 +466,31 @@ export class AgentBlockHandler implements BlockHandler {
}
}
const systemMessages = messages.filter((msg) => msg.role === 'system')
// Find first system message
const firstSystemIndex = messages.findIndex((msg) => msg.role === 'system')
if (systemMessages.length > 0) {
messages.splice(0, 0, { role: 'system', content })
for (let i = messages.length - 1; i >= 1; i--) {
if (messages[i].role === 'system') {
messages.splice(i, 1)
}
}
if (firstSystemIndex === -1) {
// No system message exists - add at position 0
messages.unshift({ role: 'system', content })
} else if (firstSystemIndex === 0) {
// System message already at position 0 - replace it
// Explicit systemPrompt parameter takes precedence over memory/messages
messages[0] = { role: 'system', content }
} else {
messages.splice(0, 0, { role: 'system', content })
// System message exists but not at position 0 - move it to position 0
// and update with new content
messages.splice(firstSystemIndex, 1)
messages.unshift({ role: 'system', content })
}
// Remove any additional system messages (keep only the first one)
for (let i = messages.length - 1; i >= 1; i--) {
if (messages[i].role === 'system') {
messages.splice(i, 1)
logger.warn('Removed duplicate system message from conversation history', {
position: i,
})
}
}
}
@@ -484,7 +569,8 @@ export class AgentBlockHandler implements BlockHandler {
ctx: ExecutionContext,
providerRequest: any,
block: SerializedBlock,
responseFormat: any
responseFormat: any,
inputs: AgentInputs
): Promise<BlockOutput | StreamingExecution> {
const providerId = providerRequest.provider
const model = providerRequest.model
@@ -504,7 +590,14 @@ export class AgentBlockHandler implements BlockHandler {
providerStartTime
)
}
return this.executeBrowserSide(ctx, providerRequest, block, responseFormat, providerStartTime)
return this.executeBrowserSide(
ctx,
providerRequest,
block,
responseFormat,
providerStartTime,
inputs
)
} catch (error) {
this.handleExecutionError(error, providerStartTime, providerId, model, ctx, block)
throw error
@@ -554,7 +647,8 @@ export class AgentBlockHandler implements BlockHandler {
providerRequest: any,
block: SerializedBlock,
responseFormat: any,
providerStartTime: number
providerStartTime: number,
inputs: AgentInputs
) {
const url = buildAPIUrl('/api/providers')
const response = await fetch(url.toString(), {
@@ -580,7 +674,7 @@ export class AgentBlockHandler implements BlockHandler {
const contentType = response.headers.get('Content-Type')
if (contentType?.includes(HTTP.CONTENT_TYPE.EVENT_STREAM)) {
return this.handleStreamingResponse(response, block)
return this.handleStreamingResponse(response, block, ctx, inputs)
}
const result = await response.json()
@@ -589,7 +683,9 @@ export class AgentBlockHandler implements BlockHandler {
private async handleStreamingResponse(
response: Response,
block: SerializedBlock
block: SerializedBlock,
ctx?: ExecutionContext,
inputs?: AgentInputs
): Promise<StreamingExecution> {
const executionDataHeader = response.headers.get('X-Execution-Data')
@@ -597,6 +693,20 @@ export class AgentBlockHandler implements BlockHandler {
try {
const executionData = JSON.parse(executionDataHeader)
// If execution data contains full content, persist to memory
if (ctx && inputs && executionData.output?.content) {
const assistantMessage: Message = {
role: 'assistant',
content: executionData.output.content,
}
// Fire and forget - don't await
memoryService
.persistMemoryMessage(ctx, inputs, assistantMessage, block.id)
.catch((error) =>
logger.error('Failed to persist streaming response to memory:', error)
)
}
return {
stream: response.body!,
execution: {
@@ -696,6 +806,49 @@ export class AgentBlockHandler implements BlockHandler {
}
}
private async persistResponseToMemory(
ctx: ExecutionContext,
inputs: AgentInputs,
result: BlockOutput | StreamingExecution,
blockId: string
): Promise<void> {
// Only persist if memoryType is configured
if (!inputs.memoryType || inputs.memoryType === 'none') {
return
}
try {
// Don't persist streaming responses here - they're handled separately
if (this.isStreamingExecution(result)) {
return
}
// Extract content from regular response
const blockOutput = result as any
const content = blockOutput?.content
if (!content || typeof content !== 'string') {
return
}
const assistantMessage: Message = {
role: 'assistant',
content,
}
await memoryService.persistMemoryMessage(ctx, inputs, assistantMessage, blockId)
logger.debug('Persisted assistant response to memory', {
workflowId: ctx.workflowId,
memoryType: inputs.memoryType,
conversationId: inputs.conversationId,
})
} catch (error) {
logger.error('Failed to persist response to memory:', error)
// Don't throw - memory persistence failure shouldn't break workflow execution
}
}
private processProviderResponse(
response: any,
block: SerializedBlock,

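Taken together, the handler changes above assemble the provider messages in a fixed order: memory-table history (when memoryType is set), legacy memories, the new messages array, then the legacy systemPrompt/userPrompt fallbacks only if still missing. A minimal sketch of the resulting ordering, using the same hypothetical values as the updated tests (not the handler itself):

// Hedged sketch, not the handler: with legacy memories plus a messages array
// (and no memoryType), the request body ends up memory-first, messages-array
// last, matching the updated test expectations above.
import type { Message } from '@/executor/handlers/agent/types'

const memories: Message[] = [
  { role: 'system', content: 'Old system message from memories.' },
  { role: 'user', content: 'Hello!' },
  { role: 'assistant', content: 'Hi there!' },
]
const messagesInput: Message[] = [
  { role: 'system', content: 'You are a helpful assistant.' },
  { role: 'user', content: 'What should I do?' },
]
// Rough equivalent of steps 2-3 in buildMessages:
const requestMessages: Message[] = [...memories, ...messagesInput]
// requestMessages.length === 5; requestMessages[0] is the memory system message,
// and the messages-array entries follow the memory conversation.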
View File

@@ -0,0 +1,277 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { Memory } from '@/executor/handlers/agent/memory'
import type { AgentInputs, Message } from '@/executor/handlers/agent/types'
import type { ExecutionContext } from '@/executor/types'
vi.mock('@/lib/logs/console/logger', () => ({
createLogger: () => ({
warn: vi.fn(),
error: vi.fn(),
info: vi.fn(),
debug: vi.fn(),
}),
}))
vi.mock('@/lib/tokenization/estimators', () => ({
getAccurateTokenCount: vi.fn((text: string) => {
return Math.ceil(text.length / 4)
}),
}))
describe('Memory', () => {
let memoryService: Memory
let mockContext: ExecutionContext
beforeEach(() => {
memoryService = new Memory()
mockContext = {
workflowId: 'test-workflow-id',
executionId: 'test-execution-id',
workspaceId: 'test-workspace-id',
} as ExecutionContext
})
describe('applySlidingWindow (message-based)', () => {
it('should keep last N conversation messages', () => {
const messages: Message[] = [
{ role: 'system', content: 'System prompt' },
{ role: 'user', content: 'Message 1' },
{ role: 'assistant', content: 'Response 1' },
{ role: 'user', content: 'Message 2' },
{ role: 'assistant', content: 'Response 2' },
{ role: 'user', content: 'Message 3' },
{ role: 'assistant', content: 'Response 3' },
]
const result = (memoryService as any).applySlidingWindow(messages, '4')
expect(result.length).toBe(5)
expect(result[0].role).toBe('system')
expect(result[0].content).toBe('System prompt')
expect(result[1].content).toBe('Message 2')
expect(result[4].content).toBe('Response 3')
})
it('should preserve only first system message', () => {
const messages: Message[] = [
{ role: 'system', content: 'First system' },
{ role: 'user', content: 'User message' },
{ role: 'system', content: 'Second system' },
{ role: 'assistant', content: 'Assistant message' },
]
const result = (memoryService as any).applySlidingWindow(messages, '10')
const systemMessages = result.filter((m: Message) => m.role === 'system')
expect(systemMessages.length).toBe(1)
expect(systemMessages[0].content).toBe('First system')
})
it('should handle invalid window size', () => {
const messages: Message[] = [{ role: 'user', content: 'Test' }]
const result = (memoryService as any).applySlidingWindow(messages, 'invalid')
expect(result).toEqual(messages)
})
})
describe('applySlidingWindowByTokens (token-based)', () => {
it('should keep messages within token limit', () => {
const messages: Message[] = [
{ role: 'system', content: 'This is a system message' }, // ~6 tokens
{ role: 'user', content: 'Short' }, // ~2 tokens
{ role: 'assistant', content: 'This is a longer response message' }, // ~8 tokens
{ role: 'user', content: 'Another user message here' }, // ~6 tokens
{ role: 'assistant', content: 'Final response' }, // ~3 tokens
]
// Set limit to ~15 tokens - should include last 2-3 messages
const result = (memoryService as any).applySlidingWindowByTokens(messages, '15', 'gpt-4o')
expect(result.length).toBeGreaterThan(0)
expect(result.length).toBeLessThan(messages.length)
// Should include newest messages
expect(result[result.length - 1].content).toBe('Final response')
})
it('should include at least 1 message even if it exceeds limit', () => {
const messages: Message[] = [
{
role: 'user',
content:
'This is a very long message that definitely exceeds our small token limit of just 5 tokens',
},
]
const result = (memoryService as any).applySlidingWindowByTokens(messages, '5', 'gpt-4o')
expect(result.length).toBe(1)
expect(result[0].content).toBe(messages[0].content)
})
it('should preserve first system message and exclude it from token count', () => {
const messages: Message[] = [
{ role: 'system', content: 'A' }, // System message - always preserved
{ role: 'user', content: 'B' }, // ~1 token
{ role: 'assistant', content: 'C' }, // ~1 token
{ role: 'user', content: 'D' }, // ~1 token
]
// Limit to 2 tokens - should fit system message + last 2 conversation messages (D, C)
const result = (memoryService as any).applySlidingWindowByTokens(messages, '2', 'gpt-4o')
// Should have: system message + 2 conversation messages = 3 total
expect(result.length).toBe(3)
expect(result[0].role).toBe('system') // First system message preserved
expect(result[1].content).toBe('C') // Second most recent conversation message
expect(result[2].content).toBe('D') // Most recent conversation message
})
it('should process messages from newest to oldest', () => {
const messages: Message[] = [
{ role: 'user', content: 'Old message' },
{ role: 'assistant', content: 'Old response' },
{ role: 'user', content: 'New message' },
{ role: 'assistant', content: 'New response' },
]
const result = (memoryService as any).applySlidingWindowByTokens(messages, '10', 'gpt-4o')
// Should prioritize newer messages
expect(result[result.length - 1].content).toBe('New response')
})
it('should handle invalid token limit', () => {
const messages: Message[] = [{ role: 'user', content: 'Test' }]
const result = (memoryService as any).applySlidingWindowByTokens(
messages,
'invalid',
'gpt-4o'
)
expect(result).toEqual(messages) // Should return all messages
})
it('should handle zero or negative token limit', () => {
const messages: Message[] = [{ role: 'user', content: 'Test' }]
const result1 = (memoryService as any).applySlidingWindowByTokens(messages, '0', 'gpt-4o')
expect(result1).toEqual(messages)
const result2 = (memoryService as any).applySlidingWindowByTokens(messages, '-5', 'gpt-4o')
expect(result2).toEqual(messages)
})
it('should work with different model names', () => {
const messages: Message[] = [{ role: 'user', content: 'Test message' }]
const result1 = (memoryService as any).applySlidingWindowByTokens(messages, '100', 'gpt-4o')
expect(result1.length).toBe(1)
const result2 = (memoryService as any).applySlidingWindowByTokens(
messages,
'100',
'claude-3-5-sonnet-20241022'
)
expect(result2.length).toBe(1)
const result3 = (memoryService as any).applySlidingWindowByTokens(messages, '100', undefined)
expect(result3.length).toBe(1)
})
it('should handle empty messages array', () => {
const messages: Message[] = []
const result = (memoryService as any).applySlidingWindowByTokens(messages, '100', 'gpt-4o')
expect(result).toEqual([])
})
})
describe('buildMemoryKey', () => {
it('should build correct key with conversationId:blockId format', () => {
const inputs: AgentInputs = {
memoryType: 'conversation',
conversationId: 'emir',
}
const key = (memoryService as any).buildMemoryKey(mockContext, inputs, 'test-block-id')
expect(key).toBe('emir:test-block-id')
})
it('should use same key format regardless of memory type', () => {
const conversationId = 'user-123'
const blockId = 'block-abc'
const conversationKey = (memoryService as any).buildMemoryKey(
mockContext,
{ memoryType: 'conversation', conversationId },
blockId
)
const slidingWindowKey = (memoryService as any).buildMemoryKey(
mockContext,
{ memoryType: 'sliding_window', conversationId },
blockId
)
const slidingTokensKey = (memoryService as any).buildMemoryKey(
mockContext,
{ memoryType: 'sliding_window_tokens', conversationId },
blockId
)
// All should produce the same key - memory type only affects processing
expect(conversationKey).toBe('user-123:block-abc')
expect(slidingWindowKey).toBe('user-123:block-abc')
expect(slidingTokensKey).toBe('user-123:block-abc')
})
it('should throw error for missing conversationId', () => {
const inputs: AgentInputs = {
memoryType: 'conversation',
// conversationId missing
}
expect(() => {
;(memoryService as any).buildMemoryKey(mockContext, inputs, 'test-block-id')
}).toThrow('Conversation ID is required for all memory types')
})
it('should throw error for empty conversationId', () => {
const inputs: AgentInputs = {
memoryType: 'conversation',
conversationId: ' ', // Only whitespace
}
expect(() => {
;(memoryService as any).buildMemoryKey(mockContext, inputs, 'test-block-id')
}).toThrow('Conversation ID is required for all memory types')
})
})
describe('Token-based vs Message-based comparison', () => {
it('should produce different results for same message count limit', () => {
const messages: Message[] = [
{ role: 'user', content: 'A' }, // Short message (~1 token)
{
role: 'assistant',
content: 'This is a much longer response that takes many more tokens',
}, // Long message (~15 tokens)
{ role: 'user', content: 'B' }, // Short message (~1 token)
]
// Message-based: last 2 messages
const messageResult = (memoryService as any).applySlidingWindow(messages, '2')
expect(messageResult.length).toBe(2)
// Token-based: with limit of 10 tokens, might fit all 3 messages or just last 2
const tokenResult = (memoryService as any).applySlidingWindowByTokens(
messages,
'10',
'gpt-4o'
)
// The long message should affect what fits
expect(tokenResult.length).toBeGreaterThanOrEqual(1)
})
})
})

View File

@@ -0,0 +1,663 @@
import { createLogger } from '@/lib/logs/console/logger'
import { getAccurateTokenCount } from '@/lib/tokenization/estimators'
import type { AgentInputs, Message } from '@/executor/handlers/agent/types'
import type { ExecutionContext } from '@/executor/types'
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
import { stringifyJSON } from '@/executor/utils/json'
import { PROVIDER_DEFINITIONS } from '@/providers/models'
const logger = createLogger('Memory')
/**
* Class for managing agent conversation memory
* Handles fetching and persisting messages to the memory table
*/
export class Memory {
/**
* Fetch messages from memory based on memoryType configuration
*/
async fetchMemoryMessages(
ctx: ExecutionContext,
inputs: AgentInputs,
blockId: string
): Promise<Message[]> {
if (!inputs.memoryType || inputs.memoryType === 'none') {
return []
}
if (!ctx.workflowId) {
logger.warn('Cannot fetch memory without workflowId')
return []
}
try {
this.validateInputs(inputs.conversationId)
const memoryKey = this.buildMemoryKey(ctx, inputs, blockId)
let messages = await this.fetchFromMemoryAPI(ctx.workflowId, memoryKey)
switch (inputs.memoryType) {
case 'conversation':
messages = this.applyContextWindowLimit(messages, inputs.model)
break
case 'sliding_window': {
// Default to 10 messages if not specified (matches agent block default)
const windowSize = inputs.slidingWindowSize || '10'
messages = this.applySlidingWindow(messages, windowSize)
break
}
case 'sliding_window_tokens': {
// Default to 4000 tokens if not specified (matches agent block default)
const maxTokens = inputs.slidingWindowTokens || '4000'
messages = this.applySlidingWindowByTokens(messages, maxTokens, inputs.model)
break
}
}
return messages
} catch (error) {
logger.error('Failed to fetch memory messages:', error)
return []
}
}
/**
* Persist assistant response to memory
* Uses atomic append operations to prevent race conditions
*/
async persistMemoryMessage(
ctx: ExecutionContext,
inputs: AgentInputs,
assistantMessage: Message,
blockId: string
): Promise<void> {
if (!inputs.memoryType || inputs.memoryType === 'none') {
return
}
if (!ctx.workflowId) {
logger.warn('Cannot persist memory without workflowId')
return
}
try {
this.validateInputs(inputs.conversationId, assistantMessage.content)
const memoryKey = this.buildMemoryKey(ctx, inputs, blockId)
if (inputs.memoryType === 'sliding_window') {
// Default to 10 messages if not specified (matches agent block default)
const windowSize = inputs.slidingWindowSize || '10'
const existingMessages = await this.fetchFromMemoryAPI(ctx.workflowId, memoryKey)
const updatedMessages = [...existingMessages, assistantMessage]
const messagesToPersist = this.applySlidingWindow(updatedMessages, windowSize)
await this.persistToMemoryAPI(ctx.workflowId, memoryKey, messagesToPersist)
} else if (inputs.memoryType === 'sliding_window_tokens') {
// Default to 4000 tokens if not specified (matches agent block default)
const maxTokens = inputs.slidingWindowTokens || '4000'
const existingMessages = await this.fetchFromMemoryAPI(ctx.workflowId, memoryKey)
const updatedMessages = [...existingMessages, assistantMessage]
const messagesToPersist = this.applySlidingWindowByTokens(
updatedMessages,
maxTokens,
inputs.model
)
await this.persistToMemoryAPI(ctx.workflowId, memoryKey, messagesToPersist)
} else {
// Conversation mode: use atomic append for better concurrency
await this.atomicAppendToMemory(ctx.workflowId, memoryKey, assistantMessage)
}
logger.debug('Successfully persisted memory message', {
workflowId: ctx.workflowId,
key: memoryKey,
})
} catch (error) {
logger.error('Failed to persist memory message:', error)
}
}
/**
* Persist user message to memory before agent execution
*/
async persistUserMessage(
ctx: ExecutionContext,
inputs: AgentInputs,
userMessage: Message,
blockId: string
): Promise<void> {
if (!inputs.memoryType || inputs.memoryType === 'none') {
return
}
if (!ctx.workflowId) {
logger.warn('Cannot persist user message without workflowId')
return
}
try {
const memoryKey = this.buildMemoryKey(ctx, inputs, blockId)
if (inputs.slidingWindowSize && inputs.memoryType === 'sliding_window') {
const existingMessages = await this.fetchFromMemoryAPI(ctx.workflowId, memoryKey)
const updatedMessages = [...existingMessages, userMessage]
const messagesToPersist = this.applySlidingWindow(updatedMessages, inputs.slidingWindowSize)
await this.persistToMemoryAPI(ctx.workflowId, memoryKey, messagesToPersist)
} else if (inputs.slidingWindowTokens && inputs.memoryType === 'sliding_window_tokens') {
const existingMessages = await this.fetchFromMemoryAPI(ctx.workflowId, memoryKey)
const updatedMessages = [...existingMessages, userMessage]
const messagesToPersist = this.applySlidingWindowByTokens(
updatedMessages,
inputs.slidingWindowTokens,
inputs.model
)
await this.persistToMemoryAPI(ctx.workflowId, memoryKey, messagesToPersist)
} else {
await this.atomicAppendToMemory(ctx.workflowId, memoryKey, userMessage)
}
} catch (error) {
logger.error('Failed to persist user message:', error)
}
}
/**
* Build memory key based on conversationId and blockId
* BlockId provides block-level memory isolation
*/
private buildMemoryKey(_ctx: ExecutionContext, inputs: AgentInputs, blockId: string): string {
const { conversationId } = inputs
if (!conversationId || conversationId.trim() === '') {
throw new Error(
'Conversation ID is required for all memory types. ' +
'Please provide a unique identifier (e.g., user-123, session-abc, customer-456).'
)
}
return `${conversationId}:${blockId}`
}
/**
* Apply sliding window to limit number of conversation messages
*
* System message handling:
* - System messages are excluded from the sliding window count
* - Only the first system message is preserved and placed at the start
* - This ensures system prompts remain available while limiting conversation history
*/
private applySlidingWindow(messages: Message[], windowSize: string): Message[] {
const limit = Number.parseInt(windowSize, 10)
if (Number.isNaN(limit) || limit <= 0) {
logger.warn('Invalid sliding window size, returning all messages', { windowSize })
return messages
}
const systemMessages = messages.filter((msg) => msg.role === 'system')
const conversationMessages = messages.filter((msg) => msg.role !== 'system')
const recentMessages = conversationMessages.slice(-limit)
const firstSystemMessage = systemMessages.length > 0 ? [systemMessages[0]] : []
return [...firstSystemMessage, ...recentMessages]
}
/**
* Apply token-based sliding window to limit conversation by token count
*
* System message handling:
* - For consistency with message-based sliding window, the first system message is preserved
* - System messages are excluded from the token count
* - This ensures system prompts are always available while limiting conversation history
*/
private applySlidingWindowByTokens(
messages: Message[],
maxTokens: string,
model?: string
): Message[] {
const tokenLimit = Number.parseInt(maxTokens, 10)
if (Number.isNaN(tokenLimit) || tokenLimit <= 0) {
logger.warn('Invalid token limit, returning all messages', { maxTokens })
return messages
}
// Separate system messages from conversation messages for consistent handling
const systemMessages = messages.filter((msg) => msg.role === 'system')
const conversationMessages = messages.filter((msg) => msg.role !== 'system')
const result: Message[] = []
let currentTokenCount = 0
// Add conversation messages from most recent backwards
for (let i = conversationMessages.length - 1; i >= 0; i--) {
const message = conversationMessages[i]
const messageTokens = getAccurateTokenCount(message.content, model)
if (currentTokenCount + messageTokens <= tokenLimit) {
result.unshift(message)
currentTokenCount += messageTokens
} else if (result.length === 0) {
logger.warn('Single message exceeds token limit, including anyway', {
messageTokens,
tokenLimit,
messageRole: message.role,
})
result.unshift(message)
currentTokenCount += messageTokens
break
} else {
// Token limit reached, stop processing
break
}
}
logger.debug('Applied token-based sliding window', {
totalMessages: messages.length,
conversationMessages: conversationMessages.length,
includedMessages: result.length,
totalTokens: currentTokenCount,
tokenLimit,
})
// Preserve first system message and prepend to results (consistent with message-based window)
const firstSystemMessage = systemMessages.length > 0 ? [systemMessages[0]] : []
return [...firstSystemMessage, ...result]
}
/**
* Apply context window limit based on model's maximum context window
* Auto-trims oldest conversation messages when approaching the model's context limit
* Uses 90% of context window (10% buffer for response)
* Only applies if model has contextWindow defined and contextInformationAvailable !== false
*/
private applyContextWindowLimit(messages: Message[], model?: string): Message[] {
if (!model) {
return messages
}
let contextWindow: number | undefined
for (const provider of Object.values(PROVIDER_DEFINITIONS)) {
if (provider.contextInformationAvailable === false) {
continue
}
const matchesPattern = provider.modelPatterns?.some((pattern) => pattern.test(model))
const matchesModel = provider.models.some((m) => m.id === model)
if (matchesPattern || matchesModel) {
const modelDef = provider.models.find((m) => m.id === model)
if (modelDef?.contextWindow) {
contextWindow = modelDef.contextWindow
break
}
}
}
if (!contextWindow) {
logger.debug('No context window information available for model, skipping auto-trim', {
model,
})
return messages
}
const maxTokens = Math.floor(contextWindow * 0.9)
logger.debug('Applying context window limit', {
model,
contextWindow,
maxTokens,
totalMessages: messages.length,
})
const systemMessages = messages.filter((msg) => msg.role === 'system')
const conversationMessages = messages.filter((msg) => msg.role !== 'system')
// Count tokens used by system messages first
let systemTokenCount = 0
for (const msg of systemMessages) {
systemTokenCount += getAccurateTokenCount(msg.content, model)
}
// Calculate remaining tokens available for conversation messages
const remainingTokens = Math.max(0, maxTokens - systemTokenCount)
if (systemTokenCount >= maxTokens) {
logger.warn('System messages exceed context window limit, including anyway', {
systemTokenCount,
maxTokens,
systemMessageCount: systemMessages.length,
})
return systemMessages
}
const result: Message[] = []
let currentTokenCount = 0
for (let i = conversationMessages.length - 1; i >= 0; i--) {
const message = conversationMessages[i]
const messageTokens = getAccurateTokenCount(message.content, model)
if (currentTokenCount + messageTokens <= remainingTokens) {
result.unshift(message)
currentTokenCount += messageTokens
} else if (result.length === 0) {
logger.warn('Single message exceeds remaining context window, including anyway', {
messageTokens,
remainingTokens,
systemTokenCount,
messageRole: message.role,
})
result.unshift(message)
currentTokenCount += messageTokens
break
} else {
logger.info('Auto-trimmed conversation history to fit context window', {
originalMessages: conversationMessages.length,
trimmedMessages: result.length,
conversationTokens: currentTokenCount,
systemTokens: systemTokenCount,
totalTokens: currentTokenCount + systemTokenCount,
maxTokens,
})
break
}
}
return [...systemMessages, ...result]
}
/**
* Fetch messages from memory API
*/
private async fetchFromMemoryAPI(workflowId: string, key: string): Promise<Message[]> {
try {
const isBrowser = typeof window !== 'undefined'
if (!isBrowser) {
return await this.fetchFromMemoryDirect(workflowId, key)
}
const headers = await buildAuthHeaders()
const url = buildAPIUrl(`/api/memory/${encodeURIComponent(key)}`, { workflowId })
const response = await fetch(url.toString(), {
method: 'GET',
headers,
})
if (!response.ok) {
if (response.status === 404) {
return []
}
throw new Error(`Failed to fetch memory: ${response.status} ${response.statusText}`)
}
const result = await response.json()
if (!result.success) {
throw new Error(result.error || 'Failed to fetch memory')
}
const memoryData = result.data?.data || result.data
if (Array.isArray(memoryData)) {
return memoryData.filter(
(msg) => msg && typeof msg === 'object' && 'role' in msg && 'content' in msg
)
}
return []
} catch (error) {
logger.error('Error fetching from memory API:', error)
return []
}
}
/**
* Direct database access
*/
private async fetchFromMemoryDirect(workflowId: string, key: string): Promise<Message[]> {
try {
const { db } = await import('@sim/db')
const { memory } = await import('@sim/db/schema')
const { and, eq } = await import('drizzle-orm')
const result = await db
.select({
data: memory.data,
})
.from(memory)
.where(and(eq(memory.workflowId, workflowId), eq(memory.key, key)))
.limit(1)
if (result.length === 0) {
return []
}
const memoryData = result[0].data as any
if (Array.isArray(memoryData)) {
return memoryData.filter(
(msg) => msg && typeof msg === 'object' && 'role' in msg && 'content' in msg
)
}
return []
} catch (error) {
logger.error('Error fetching from memory database:', error)
return []
}
}
/**
* Persist messages to memory API
*/
private async persistToMemoryAPI(
workflowId: string,
key: string,
messages: Message[]
): Promise<void> {
try {
const isBrowser = typeof window !== 'undefined'
if (!isBrowser) {
await this.persistToMemoryDirect(workflowId, key, messages)
return
}
const headers = await buildAuthHeaders()
const url = buildAPIUrl('/api/memory')
const response = await fetch(url.toString(), {
method: 'POST',
headers: {
...headers,
'Content-Type': 'application/json',
},
body: stringifyJSON({
workflowId,
key,
data: messages,
}),
})
if (!response.ok) {
throw new Error(`Failed to persist memory: ${response.status} ${response.statusText}`)
}
const result = await response.json()
if (!result.success) {
throw new Error(result.error || 'Failed to persist memory')
}
} catch (error) {
logger.error('Error persisting to memory API:', error)
throw error
}
}
/**
* Atomically append a message to memory
*/
private async atomicAppendToMemory(
workflowId: string,
key: string,
message: Message
): Promise<void> {
try {
const isBrowser = typeof window !== 'undefined'
if (!isBrowser) {
await this.atomicAppendToMemoryDirect(workflowId, key, message)
} else {
const headers = await buildAuthHeaders()
const url = buildAPIUrl('/api/memory')
const response = await fetch(url.toString(), {
method: 'POST',
headers: {
...headers,
'Content-Type': 'application/json',
},
body: stringifyJSON({
workflowId,
key,
data: message,
}),
})
if (!response.ok) {
throw new Error(`Failed to append memory: ${response.status} ${response.statusText}`)
}
const result = await response.json()
if (!result.success) {
throw new Error(result.error || 'Failed to append memory')
}
}
} catch (error) {
logger.error('Error appending to memory:', error)
throw error
}
}
/**
* Direct database atomic append for server-side
* Uses PostgreSQL JSONB concatenation operator for atomic operations
*/
private async atomicAppendToMemoryDirect(
workflowId: string,
key: string,
message: Message
): Promise<void> {
try {
const { db } = await import('@sim/db')
const { memory } = await import('@sim/db/schema')
const { sql } = await import('drizzle-orm')
const { randomUUID } = await import('node:crypto')
const now = new Date()
const id = randomUUID()
await db
.insert(memory)
.values({
id,
workflowId,
key,
data: [message],
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: [memory.workflowId, memory.key],
set: {
data: sql`${memory.data} || ${JSON.stringify([message])}::jsonb`,
updatedAt: now,
},
})
logger.debug('Atomically appended message to memory', {
workflowId,
key,
})
} catch (error) {
logger.error('Error in atomic append to memory database:', error)
throw error
}
}
/**
* Direct database access for server-side persistence
* Uses UPSERT to handle race conditions atomically
*/
private async persistToMemoryDirect(
workflowId: string,
key: string,
messages: Message[]
): Promise<void> {
try {
const { db } = await import('@sim/db')
const { memory } = await import('@sim/db/schema')
const { randomUUID } = await import('node:crypto')
const now = new Date()
const id = randomUUID()
await db
.insert(memory)
.values({
id,
workflowId,
key,
data: messages,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: [memory.workflowId, memory.key],
set: {
data: messages,
updatedAt: now,
},
})
} catch (error) {
logger.error('Error persisting to memory database:', error)
throw error
}
}
/**
* Validate inputs to prevent malicious data or performance issues
*/
private validateInputs(conversationId?: string, content?: string): void {
if (conversationId) {
if (conversationId.length > 255) {
throw new Error('Conversation ID too long (max 255 characters)')
}
if (!/^[a-zA-Z0-9_\-:.@]+$/.test(conversationId)) {
logger.warn('Conversation ID contains special characters', { conversationId })
}
}
if (content) {
const contentSize = Buffer.byteLength(content, 'utf8')
const MAX_CONTENT_SIZE = 100 * 1024 // 100KB
if (contentSize > MAX_CONTENT_SIZE) {
throw new Error(`Message content too large (${contentSize} bytes, max ${MAX_CONTENT_SIZE})`)
}
}
}
}
export const memoryService = new Memory()
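
A hedged usage sketch of the exported memoryService (illustrative only; the agent handler above is the real caller). Memory keys resolve to `${conversationId}:${blockId}`, e.g. 'user-123:block-abc'; the concrete input values here are made up.

// Sketch of the fetch -> persist round trip around one agent turn.
import { memoryService } from '@/executor/handlers/agent/memory'
import type { AgentInputs, Message } from '@/executor/handlers/agent/types'
import type { ExecutionContext } from '@/executor/types'

async function sketchMemoryRoundTrip(ctx: ExecutionContext, blockId: string) {
  const inputs: AgentInputs = {
    model: 'gpt-4o',
    memoryType: 'sliding_window',
    conversationId: 'user-123', // required for every non-'none' memory type
    slidingWindowSize: '10',    // message-based window (default used by fetchMemoryMessages)
  }
  // 1. Pull trimmed history for this conversation/block pair
  const history: Message[] = await memoryService.fetchMemoryMessages(ctx, inputs, blockId)
  // 2. Persist the incoming user turn before execution
  await memoryService.persistUserMessage(ctx, inputs, { role: 'user', content: 'Hello!' }, blockId)
  // 3. Persist the assistant reply after execution
  await memoryService.persistMemoryMessage(
    ctx,
    inputs,
    { role: 'assistant', content: 'Hi there!' },
    blockId
  )
  return history
}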

View File

@@ -2,9 +2,18 @@ export interface AgentInputs {
model?: string
responseFormat?: string | object
tools?: ToolInput[]
// Legacy inputs (backward compatible)
systemPrompt?: string
userPrompt?: string | object
memories?: any
memories?: any // Legacy memory block output
// New message array input (from messages-input subblock)
messages?: Message[]
// Memory configuration
memoryType?: 'none' | 'conversation' | 'sliding_window' | 'sliding_window_tokens'
conversationId?: string // Required for all non-none memory types
slidingWindowSize?: string // For message-based sliding window
slidingWindowTokens?: string // For token-based sliding window
// LLM parameters
temperature?: number
maxTokens?: number
apiKey?: string
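
For illustration, a hedged example of the two input shapes AgentInputs now accepts: the legacy prompt fields and the new messages array plus memory configuration. Field names come from the type above; the concrete values are made up.

// Legacy shape (still supported for backward compatibility)
const legacyInputs: AgentInputs = {
  model: 'gpt-4o',
  systemPrompt: 'You are a helpful assistant.',
  userPrompt: 'What should I do?',
}

// New shape: messages array plus token-based sliding window memory
const newInputs: AgentInputs = {
  model: 'gpt-4o',
  messages: [
    { role: 'system', content: 'You are a helpful assistant.' },
    { role: 'user', content: 'What should I do?' },
  ],
  memoryType: 'sliding_window_tokens',
  conversationId: 'session-abc',
  slidingWindowTokens: '4000',
}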