mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
Checkpoint
This commit is contained in:
@@ -264,6 +264,7 @@ export async function POST(req: NextRequest) {
|
||||
{
|
||||
message,
|
||||
workflowId,
|
||||
workflowName: resolved.workflowName,
|
||||
userId: authenticatedUserId,
|
||||
userMessageId: userMessageIdToUse,
|
||||
mode,
|
||||
|
||||
@@ -8,7 +8,10 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
|
||||
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
|
||||
import {
|
||||
loadWorkflowFromNormalizedTables,
|
||||
saveWorkflowToNormalizedTables,
|
||||
} from '@/lib/workflows/persistence/utils'
|
||||
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/sanitization/validation'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
@@ -108,6 +111,49 @@ const WorkflowStateSchema = z.object({
|
||||
variables: z.any().optional(), // Workflow variables
|
||||
})
|
||||
|
||||
/**
|
||||
* GET /api/workflows/[id]/state
|
||||
* Fetch the current workflow state from normalized tables.
|
||||
* Used by the client after server-side edits (edit_workflow) to stay in sync.
|
||||
*/
|
||||
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
|
||||
const { id: workflowId } = await params
|
||||
|
||||
try {
|
||||
const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
|
||||
if (!auth.success || !auth.userId) {
|
||||
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
|
||||
}
|
||||
|
||||
const authorization = await authorizeWorkflowByWorkspacePermission({
|
||||
workflowId,
|
||||
userId: auth.userId,
|
||||
action: 'read',
|
||||
})
|
||||
if (!authorization.allowed) {
|
||||
return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
|
||||
}
|
||||
|
||||
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
|
||||
if (!normalized) {
|
||||
return NextResponse.json({ error: 'Workflow state not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
return NextResponse.json({
|
||||
blocks: normalized.blocks,
|
||||
edges: normalized.edges,
|
||||
loops: normalized.loops || {},
|
||||
parallels: normalized.parallels || {},
|
||||
})
|
||||
} catch (error) {
|
||||
logger.error('Failed to fetch workflow state', {
|
||||
workflowId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* PUT /api/workflows/[id]/state
|
||||
* Save complete workflow state to normalized database tables
|
||||
|
||||
@@ -10,6 +10,7 @@ const logger = createLogger('CopilotChatPayload')
|
||||
export interface BuildPayloadParams {
|
||||
message: string
|
||||
workflowId: string
|
||||
workflowName?: string
|
||||
userId: string
|
||||
userMessageId: string
|
||||
mode: string
|
||||
@@ -152,6 +153,7 @@ export async function buildCopilotRequestPayload(
|
||||
return {
|
||||
message,
|
||||
workflowId,
|
||||
...(params.workflowName ? { workflowName: params.workflowName } : {}),
|
||||
userId,
|
||||
model: selectedModel,
|
||||
...(provider ? { provider } : {}),
|
||||
|
||||
@@ -317,22 +317,45 @@ export const sseHandlers: Record<string, SSEHandler> = {
|
||||
const resultPayload = asRecord(
|
||||
data?.result || eventData.result || eventData.data || data?.data
|
||||
)
|
||||
const workflowState = asRecord(resultPayload?.workflowState)
|
||||
const hasWorkflowState = !!resultPayload?.workflowState
|
||||
logger.info('[SSE] edit_workflow result received', {
|
||||
hasWorkflowState,
|
||||
blockCount: hasWorkflowState ? Object.keys(workflowState.blocks ?? {}).length : 0,
|
||||
edgeCount: Array.isArray(workflowState.edges) ? workflowState.edges.length : 0,
|
||||
})
|
||||
if (hasWorkflowState) {
|
||||
const diffStore = useWorkflowDiffStore.getState()
|
||||
diffStore
|
||||
.setProposedChanges(resultPayload.workflowState as WorkflowState)
|
||||
.catch((err) => {
|
||||
logger.error('[SSE] Failed to apply edit_workflow diff', {
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
const input = asRecord(current.params || current.input)
|
||||
const workflowId =
|
||||
(input?.workflowId as string) ||
|
||||
useWorkflowRegistry.getState().activeWorkflowId
|
||||
|
||||
if (!workflowId) {
|
||||
logger.warn('[SSE] edit_workflow result has no workflowId, skipping diff')
|
||||
} else {
|
||||
// Re-fetch the state the server just wrote to DB.
|
||||
// Never use the response's workflowState directly — that would
|
||||
// mean client and server independently track state, creating
|
||||
// race conditions when the build agent makes sequential calls.
|
||||
logger.info('[SSE] edit_workflow success, fetching state from DB', { workflowId })
|
||||
fetch(`/api/workflows/${workflowId}/state`)
|
||||
.then((res) => {
|
||||
if (!res.ok) throw new Error(`State fetch failed: ${res.status}`)
|
||||
return res.json()
|
||||
})
|
||||
.then((freshState) => {
|
||||
const diffStore = useWorkflowDiffStore.getState()
|
||||
return diffStore.setProposedChanges(freshState as WorkflowState, undefined, {
|
||||
skipPersist: true,
|
||||
})
|
||||
})
|
||||
.catch((err) => {
|
||||
logger.error('[SSE] Failed to fetch/apply edit_workflow state', {
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
workflowId,
|
||||
})
|
||||
// Fallback: use the response's workflowState if DB fetch failed
|
||||
if (resultPayload?.workflowState) {
|
||||
const diffStore = useWorkflowDiffStore.getState()
|
||||
diffStore
|
||||
.setProposedChanges(resultPayload.workflowState as WorkflowState, undefined, {
|
||||
skipPersist: true,
|
||||
})
|
||||
.catch(() => {})
|
||||
}
|
||||
})
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error('[SSE] edit_workflow result handling failed', {
|
||||
|
||||
@@ -43,18 +43,6 @@ import type { DeploymentData } from '@/lib/copilot/vfs/serializers'
|
||||
|
||||
const logger = createLogger('WorkspaceVFS')
|
||||
|
||||
/** Cache entry for a materialized VFS */
|
||||
interface VFSCacheEntry {
|
||||
vfs: WorkspaceVFS
|
||||
expiresAt: number
|
||||
}
|
||||
|
||||
/** Module-level VFS cache keyed by workspaceId */
|
||||
const vfsCache = new Map<string, VFSCacheEntry>()
|
||||
|
||||
/** Cache TTL in milliseconds (30 seconds) */
|
||||
const VFS_CACHE_TTL_MS = 30_000
|
||||
|
||||
/** Static component files, computed once and shared across all VFS instances */
|
||||
let staticComponentFiles: Map<string, string> | null = null
|
||||
|
||||
@@ -73,15 +61,19 @@ function getStaticComponentFiles(): Map<string, string> {
|
||||
const files = new Map<string, string>()
|
||||
|
||||
const allBlocks = getAllBlocks()
|
||||
for (const block of allBlocks) {
|
||||
const visibleBlocks = allBlocks.filter((b) => !b.hideFromToolbar)
|
||||
|
||||
let blocksFiltered = 0
|
||||
for (const block of visibleBlocks) {
|
||||
const path = `components/blocks/${block.type}.json`
|
||||
files.set(path, serializeBlockSchema(block))
|
||||
}
|
||||
blocksFiltered = allBlocks.length - visibleBlocks.length
|
||||
|
||||
// Build a reverse index: tool ID → service name from block registry.
|
||||
// The block type (stripped of version suffix) is used as the service directory.
|
||||
const toolToService = new Map<string, string>()
|
||||
for (const block of allBlocks) {
|
||||
for (const block of visibleBlocks) {
|
||||
if (!block.tools?.access) continue
|
||||
const service = stripVersionSuffix(block.type)
|
||||
for (const toolId of block.tools.access) {
|
||||
@@ -110,8 +102,37 @@ function getStaticComponentFiles(): Map<string, string> {
|
||||
integrationCount++
|
||||
}
|
||||
|
||||
// Add synthetic component files for subflow containers (not in block registry)
|
||||
files.set('components/blocks/loop.json', JSON.stringify({
|
||||
type: 'loop',
|
||||
name: 'Loop',
|
||||
description: 'Iterate over a collection or repeat a fixed number of times. Blocks inside the loop run once per iteration.',
|
||||
inputs: {
|
||||
loopType: { type: 'string', enum: ['for', 'forEach', 'while', 'doWhile'], description: 'Loop strategy' },
|
||||
iterations: { type: 'number', description: 'Number of iterations (for loopType "for")' },
|
||||
collection: { type: 'string', description: 'Collection expression to iterate (for loopType "forEach")' },
|
||||
condition: { type: 'string', description: 'Condition expression (for loopType "while" or "doWhile")' },
|
||||
},
|
||||
sourceHandles: ['loop-start-source', 'source'],
|
||||
notes: 'Use "loop-start-source" to connect to blocks inside the loop. Use "source" for the edge that runs after the loop completes. Blocks inside the loop must have parentId set to the loop block ID.',
|
||||
}, null, 2))
|
||||
|
||||
files.set('components/blocks/parallel.json', JSON.stringify({
|
||||
type: 'parallel',
|
||||
name: 'Parallel',
|
||||
description: 'Run blocks in parallel branches. All branches execute concurrently.',
|
||||
inputs: {
|
||||
parallelType: { type: 'string', enum: ['count', 'collection'], description: 'Parallel strategy' },
|
||||
count: { type: 'number', description: 'Number of parallel branches (for parallelType "count")' },
|
||||
collection: { type: 'string', description: 'Collection to distribute (for parallelType "collection")' },
|
||||
},
|
||||
sourceHandles: ['parallel-start-source', 'source'],
|
||||
notes: 'Use "parallel-start-source" to connect to blocks inside the parallel container. Use "source" for the edge after all branches complete. Blocks inside must have parentId set to the parallel block ID.',
|
||||
}, null, 2))
|
||||
|
||||
logger.info('Static component files built', {
|
||||
blocks: allBlocks.length,
|
||||
blocks: visibleBlocks.length,
|
||||
blocksFiltered,
|
||||
integrations: integrationCount,
|
||||
})
|
||||
|
||||
@@ -557,41 +578,25 @@ export class WorkspaceVFS {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a cached VFS for a workspace.
|
||||
* Re-materializes if the cache is expired.
|
||||
* Create a fresh VFS for a workspace.
|
||||
* Dynamic data (workflows, KBs, env) is always fetched fresh.
|
||||
* Static component files (blocks, integrations) are cached per-process.
|
||||
*/
|
||||
export async function getOrMaterializeVFS(
|
||||
workspaceId: string,
|
||||
userId: string
|
||||
): Promise<WorkspaceVFS> {
|
||||
const now = Date.now()
|
||||
const cached = vfsCache.get(workspaceId)
|
||||
|
||||
if (cached && cached.expiresAt > now) {
|
||||
return cached.vfs
|
||||
}
|
||||
|
||||
const vfs = new WorkspaceVFS()
|
||||
await vfs.materialize(workspaceId, userId)
|
||||
|
||||
vfsCache.set(workspaceId, {
|
||||
vfs,
|
||||
expiresAt: now + VFS_CACHE_TTL_MS,
|
||||
})
|
||||
|
||||
return vfs
|
||||
}
|
||||
|
||||
/**
|
||||
* Sanitize a name for use as a VFS path segment.
|
||||
* Converts to lowercase, replaces spaces/special chars with hyphens.
|
||||
* Uses the raw name as-is — only trims whitespace and replaces forward
|
||||
* slashes (which would break path hierarchy).
|
||||
*/
|
||||
function sanitizeName(name: string): string {
|
||||
return name
|
||||
.trim()
|
||||
.toLowerCase()
|
||||
.replace(/[^a-z0-9]+/g, '-')
|
||||
.replace(/^-|-$/g, '')
|
||||
.slice(0, 64)
|
||||
return name.trim().replace(/\//g, '-')
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,8 @@ export async function resolveWorkflowIdForUser(
|
||||
if (!authorization.allowed) {
|
||||
return null
|
||||
}
|
||||
return { workflowId }
|
||||
const wf = await getWorkflowById(workflowId)
|
||||
return { workflowId, workflowName: wf?.name || undefined }
|
||||
}
|
||||
|
||||
const workspaceIds = await db
|
||||
|
||||
@@ -178,33 +178,34 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
|
||||
edges: candidateState.edges?.length || 0,
|
||||
})
|
||||
|
||||
// BACKGROUND: Broadcast and persist without blocking
|
||||
// These operations happen after the UI has already updated
|
||||
const cleanState = stripWorkflowDiffMarkers(cloneWorkflowState(candidateState))
|
||||
// When skipPersist is set, the server tool (edit_workflow) already
|
||||
// saved to DB. Both the Socket.IO broadcast and HTTP persist would
|
||||
// race with subsequent edit_workflow calls and overwrite newer state,
|
||||
// causing block IDs to thrash.
|
||||
if (!options?.skipPersist) {
|
||||
const cleanState = stripWorkflowDiffMarkers(cloneWorkflowState(candidateState))
|
||||
|
||||
// Fire and forget: broadcast to other users (don't await)
|
||||
enqueueReplaceWorkflowState({
|
||||
workflowId: activeWorkflowId,
|
||||
state: cleanState,
|
||||
}).catch((error) => {
|
||||
logger.warn('Failed to broadcast workflow state (non-blocking)', { error })
|
||||
})
|
||||
enqueueReplaceWorkflowState({
|
||||
workflowId: activeWorkflowId,
|
||||
state: cleanState,
|
||||
}).catch((error) => {
|
||||
logger.warn('Failed to broadcast workflow state (non-blocking)', { error })
|
||||
})
|
||||
|
||||
// Fire and forget: persist to database (don't await)
|
||||
persistWorkflowStateToServer(activeWorkflowId, candidateState)
|
||||
.then((persisted) => {
|
||||
if (!persisted) {
|
||||
logger.warn('Failed to persist copilot edits (state already applied locally)')
|
||||
// Don't revert - user can retry or state will sync on next save
|
||||
} else {
|
||||
logger.info('Workflow diff persisted to database', {
|
||||
workflowId: activeWorkflowId,
|
||||
})
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.warn('Failed to persist workflow state (non-blocking)', { error })
|
||||
})
|
||||
persistWorkflowStateToServer(activeWorkflowId, candidateState)
|
||||
.then((persisted) => {
|
||||
if (!persisted) {
|
||||
logger.warn('Failed to persist copilot edits (state already applied locally)')
|
||||
} else {
|
||||
logger.info('Workflow diff persisted to database', {
|
||||
workflowId: activeWorkflowId,
|
||||
})
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.warn('Failed to persist workflow state (non-blocking)', { error })
|
||||
})
|
||||
}
|
||||
|
||||
// Emit event for undo/redo recording
|
||||
if (!options?.skipRecording) {
|
||||
|
||||
@@ -16,6 +16,8 @@ export interface WorkflowDiffState {
|
||||
export interface DiffActionOptions {
|
||||
/** Skip recording this operation for undo/redo. Used during undo/redo replay. */
|
||||
skipRecording?: boolean
|
||||
/** Skip persisting to DB. Use when the server tool already saved (e.g. edit_workflow). */
|
||||
skipPersist?: boolean
|
||||
}
|
||||
|
||||
export interface WorkflowDiffActions {
|
||||
|
||||
Reference in New Issue
Block a user