feat(memory): added memory block and tool (#372)

* feat(memory): added memory block and service

* feat(memory): ran migrations

* improvement(memory): appending memories; console messages

* feat(memory): added agent raw message history input UI

* feat(agent-messages): added agent message history

* improvement: added tests
This commit is contained in:
Emir Karabeg
2025-05-19 02:54:39 -07:00
committed by Emir Karabeg
parent b29827c4ee
commit 0af7fb2a7a
25 changed files with 4029 additions and 94 deletions

View File

@@ -0,0 +1,329 @@
import { NextRequest, NextResponse } from 'next/server'
import { and, eq, isNull } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { memory } from '@/db/schema'
const logger = createLogger('MemoryByIdAPI')
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
/**
* GET handler for retrieving a specific memory by ID
*/
export async function GET(
request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id } = await params
try {
logger.info(`[${requestId}] Processing memory get request for ID: ${id}`)
// Get workflowId from query parameter (required)
const url = new URL(request.url)
const workflowId = url.searchParams.get('workflowId')
if (!workflowId) {
logger.warn(`[${requestId}] Missing required parameter: workflowId`)
return NextResponse.json(
{
success: false,
error: {
message: 'workflowId parameter is required',
},
},
{ status: 400 }
)
}
// Query the database for the memory
const memories = await db
.select()
.from(memory)
.where(
and(
eq(memory.key, id),
eq(memory.workflowId, workflowId),
isNull(memory.deletedAt)
)
)
.orderBy(memory.createdAt)
.limit(1)
if (memories.length === 0) {
logger.warn(`[${requestId}] Memory not found: ${id} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: false,
error: {
message: 'Memory not found',
},
},
{ status: 404 }
)
}
logger.info(`[${requestId}] Memory retrieved successfully: ${id} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: true,
data: memories[0],
},
{ status: 200 }
)
} catch (error: any) {
return NextResponse.json(
{
success: false,
error: {
message: error.message || 'Failed to retrieve memory',
},
},
{ status: 500 }
)
}
}
/**
* DELETE handler for removing a specific memory
*/
export async function DELETE(
request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id } = await params
try {
logger.info(`[${requestId}] Processing memory delete request for ID: ${id}`)
// Get workflowId from query parameter (required)
const url = new URL(request.url)
const workflowId = url.searchParams.get('workflowId')
if (!workflowId) {
logger.warn(`[${requestId}] Missing required parameter: workflowId`)
return NextResponse.json(
{
success: false,
error: {
message: 'workflowId parameter is required',
},
},
{ status: 400 }
)
}
// Verify memory exists before attempting to delete
const existingMemory = await db
.select({ id: memory.id })
.from(memory)
.where(
and(
eq(memory.key, id),
eq(memory.workflowId, workflowId),
isNull(memory.deletedAt)
)
)
.limit(1)
if (existingMemory.length === 0) {
logger.warn(`[${requestId}] Memory not found: ${id} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: false,
error: {
message: 'Memory not found',
},
},
{ status: 404 }
)
}
// Soft delete by setting deletedAt timestamp
await db
.update(memory)
.set({
deletedAt: new Date(),
updatedAt: new Date()
})
.where(
and(
eq(memory.key, id),
eq(memory.workflowId, workflowId)
)
)
logger.info(`[${requestId}] Memory deleted successfully: ${id} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: true,
data: { message: 'Memory deleted successfully' },
},
{ status: 200 }
)
} catch (error: any) {
return NextResponse.json(
{
success: false,
error: {
message: error.message || 'Failed to delete memory',
},
},
{ status: 500 }
)
}
}
/**
* PUT handler for updating a specific memory
*/
export async function PUT(
request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
const requestId = crypto.randomUUID().slice(0, 8)
const { id } = await params
try {
logger.info(`[${requestId}] Processing memory update request for ID: ${id}`)
// Parse request body
const body = await request.json()
const { data, workflowId } = body
if (!data) {
logger.warn(`[${requestId}] Missing required field: data`)
return NextResponse.json(
{
success: false,
error: {
message: 'Memory data is required',
},
},
{ status: 400 }
)
}
if (!workflowId) {
logger.warn(`[${requestId}] Missing required field: workflowId`)
return NextResponse.json(
{
success: false,
error: {
message: 'workflowId is required',
},
},
{ status: 400 }
)
}
// Verify memory exists before attempting to update
const existingMemories = await db
.select()
.from(memory)
.where(
and(
eq(memory.key, id),
eq(memory.workflowId, workflowId),
isNull(memory.deletedAt)
)
)
.limit(1)
if (existingMemories.length === 0) {
logger.warn(`[${requestId}] Memory not found: ${id} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: false,
error: {
message: 'Memory not found',
},
},
{ status: 404 }
)
}
const existingMemory = existingMemories[0]
// Validate memory data based on the existing memory type
if (existingMemory.type === 'agent') {
if (!data.role || !data.content) {
logger.warn(`[${requestId}] Missing agent memory fields`)
return NextResponse.json(
{
success: false,
error: {
message: 'Agent memory requires role and content',
},
},
{ status: 400 }
)
}
if (!['user', 'assistant', 'system'].includes(data.role)) {
logger.warn(`[${requestId}] Invalid agent role: ${data.role}`)
return NextResponse.json(
{
success: false,
error: {
message: 'Agent role must be user, assistant, or system',
},
},
{ status: 400 }
)
}
}
// Update the memory with new data
await db
.update(memory)
.set({
data,
updatedAt: new Date()
})
.where(
and(
eq(memory.key, id),
eq(memory.workflowId, workflowId)
)
)
// Fetch the updated memory
const updatedMemories = await db
.select()
.from(memory)
.where(
and(
eq(memory.key, id),
eq(memory.workflowId, workflowId)
)
)
.limit(1)
logger.info(`[${requestId}] Memory updated successfully: ${id} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: true,
data: updatedMemories[0],
},
{ status: 200 }
)
} catch (error: any) {
return NextResponse.json(
{
success: false,
error: {
message: error.message || 'Failed to update memory',
},
},
{ status: 500 }
)
}
}

View File

@@ -0,0 +1,335 @@
import { NextRequest, NextResponse } from 'next/server'
import { and, eq, like, isNull } from 'drizzle-orm'
import { db } from '@/db'
import { memory } from '@/db/schema'
import { createLogger } from '@/lib/logs/console-logger'
const logger = createLogger('MemoryAPI')
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
/**
* GET handler for searching and retrieving memories
* Supports query parameters:
* - query: Search string for memory keys
* - type: Filter by memory type
* - limit: Maximum number of results (default: 50)
* - workflowId: Filter by workflow ID (required)
*/
export async function GET(request: NextRequest) {
const requestId = crypto.randomUUID().slice(0, 8)
try {
logger.info(`[${requestId}] Processing memory search request`)
// Extract workflowId from query parameters
const url = new URL(request.url)
const workflowId = url.searchParams.get('workflowId')
const searchQuery = url.searchParams.get('query')
const type = url.searchParams.get('type')
const limit = parseInt(url.searchParams.get('limit') || '50')
// Require workflowId for security
if (!workflowId) {
logger.warn(`[${requestId}] Missing required parameter: workflowId`)
return NextResponse.json(
{
success: false,
error: {
message: 'workflowId parameter is required',
},
},
{ status: 400 }
)
}
// Build query conditions
const conditions = []
// Only include non-deleted memories
conditions.push(isNull(memory.deletedAt))
// Filter by workflow ID (required)
conditions.push(eq(memory.workflowId, workflowId))
// Add type filter if provided
if (type) {
conditions.push(eq(memory.type, type))
}
// Add search query if provided (leverages index on key field)
if (searchQuery) {
conditions.push(like(memory.key, `%${searchQuery}%`))
}
// Execute the query
const memories = await db
.select()
.from(memory)
.where(and(...conditions))
.orderBy(memory.createdAt)
.limit(limit)
logger.info(`[${requestId}] Found ${memories.length} memories for workflow: ${workflowId}`)
return NextResponse.json(
{
success: true,
data: { memories }
},
{ status: 200 }
)
} catch (error: any) {
return NextResponse.json(
{
success: false,
error: {
message: error.message || 'Failed to search memories',
},
},
{ status: 500 }
)
}
}
/**
* POST handler for creating new memories
* Requires:
* - key: Unique identifier for the memory (within workflow scope)
* - type: Memory type ('agent' or 'raw')
* - data: Memory content (varies by type)
* - workflowId: ID of the workflow this memory belongs to
*/
export async function POST(request: NextRequest) {
const requestId = crypto.randomUUID().slice(0, 8)
try {
logger.info(`[${requestId}] Processing memory creation request`)
// Parse request body
const body = await request.json()
const { key, type, data, workflowId } = body
// Validate required fields
if (!key) {
logger.warn(`[${requestId}] Missing required field: key`)
return NextResponse.json(
{
success: false,
error: {
message: 'Memory key is required',
},
},
{ status: 400 }
)
}
if (!type || !['agent', 'raw'].includes(type)) {
logger.warn(`[${requestId}] Invalid memory type: ${type}`)
return NextResponse.json(
{
success: false,
error: {
message: 'Valid memory type (agent or raw) is required',
},
},
{ status: 400 }
)
}
if (!data) {
logger.warn(`[${requestId}] Missing required field: data`)
return NextResponse.json(
{
success: false,
error: {
message: 'Memory data is required',
},
},
{ status: 400 }
)
}
if (!workflowId) {
logger.warn(`[${requestId}] Missing required field: workflowId`)
return NextResponse.json(
{
success: false,
error: {
message: 'workflowId is required',
},
},
{ status: 400 }
)
}
// Additional validation for agent type
if (type === 'agent') {
if (!data.role || !data.content) {
logger.warn(`[${requestId}] Missing agent memory fields`)
return NextResponse.json(
{
success: false,
error: {
message: 'Agent memory requires role and content',
},
},
{ status: 400 }
)
}
if (!['user', 'assistant', 'system'].includes(data.role)) {
logger.warn(`[${requestId}] Invalid agent role: ${data.role}`)
return NextResponse.json(
{
success: false,
error: {
message: 'Agent role must be user, assistant, or system',
},
},
{ status: 400 }
)
}
}
// Check if memory with the same key already exists for this workflow
const existingMemory = await db
.select()
.from(memory)
.where(
and(
eq(memory.key, key),
eq(memory.workflowId, workflowId),
isNull(memory.deletedAt)
)
)
.limit(1)
if (existingMemory.length > 0) {
logger.info(`[${requestId}] Memory with key ${key} exists, checking if we can append`)
// Check if types match
if (existingMemory[0].type !== type) {
logger.warn(`[${requestId}] Memory type mismatch: existing=${existingMemory[0].type}, new=${type}`)
return NextResponse.json(
{
success: false,
error: {
message: `Cannot append memory of type '${type}' to existing memory of type '${existingMemory[0].type}'`,
},
},
{ status: 400 }
)
}
// Handle appending based on memory type
let updatedData;
if (type === 'agent') {
// For agent type
const newMessage = data;
const existingData = existingMemory[0].data;
// If existing data is an array, append to it
if (Array.isArray(existingData)) {
updatedData = [...existingData, newMessage];
}
// If existing data is a single message object, convert to array
else {
updatedData = [existingData, newMessage];
}
} else {
// For raw type
// Merge objects if they're objects, otherwise use the new data
if (typeof existingMemory[0].data === 'object' && typeof data === 'object') {
updatedData = { ...existingMemory[0].data, ...data };
} else {
updatedData = data;
}
}
// Update the existing memory with appended data
await db
.update(memory)
.set({
data: updatedData,
updatedAt: new Date()
})
.where(
and(
eq(memory.key, key),
eq(memory.workflowId, workflowId)
)
)
// Fetch the updated memory
const updatedMemory = await db
.select()
.from(memory)
.where(
and(
eq(memory.key, key),
eq(memory.workflowId, workflowId)
)
)
.limit(1)
logger.info(`[${requestId}] Memory appended successfully: ${key} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: true,
data: updatedMemory[0]
},
{ status: 200 }
)
}
// Insert the new memory
const newMemory = {
id: `mem_${crypto.randomUUID().replace(/-/g, '')}`,
workflowId,
key,
type,
data: type === 'agent' ? Array.isArray(data) ? data : [data] : data,
createdAt: new Date(),
updatedAt: new Date()
}
await db.insert(memory).values(newMemory)
logger.info(`[${requestId}] Memory created successfully: ${key} for workflow: ${workflowId}`)
return NextResponse.json(
{
success: true,
data: newMemory
},
{ status: 201 }
)
} catch (error: any) {
// Handle unique constraint violation
if (error.code === '23505') {
logger.warn(`[${requestId}] Duplicate key violation`)
return NextResponse.json(
{
success: false,
error: {
message: 'Memory with this key already exists',
},
},
{ status: 409 }
)
}
return NextResponse.json(
{
success: false,
error: {
message: error.message || 'Failed to create memory',
},
},
{ status: 500 }
)
}
}