This commit is contained in:
Siddharth Ganesan
2025-08-30 13:43:10 -07:00
parent a1acbc9616
commit 396c9db204
8 changed files with 182 additions and 9 deletions

View File

@@ -4,6 +4,7 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import { loadWorkflowStateForExecution } from '@/lib/logs/execution/logging-factory'
const logger = createLogger('WorkflowLogAPI')
@@ -30,6 +31,22 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
success: result.success,
})
// Log current normalized state before starting logging session (what snapshot will save)
try {
const normalizedState = await loadWorkflowStateForExecution(id)
logger.info(`[${requestId}] 🔍 Normalized workflow state at persistence time:`, {
blocks: Object.entries(normalizedState.blocks || {}).map(([bid, b]: [string, any]) => ({
id: bid,
type: (b as any).type,
triggerMode: (b as any).triggerMode,
enabled: (b as any).enabled,
})),
edgesCount: (normalizedState.edges || []).length,
})
} catch (e) {
logger.warn(`[${requestId}] Failed to load normalized state for logging snapshot context`)
}
// Check if this execution is from chat using only the explicit source flag
const isChatExecution = result.metadata?.source === 'chat'

View File

@@ -344,7 +344,13 @@ export function TriggerConfig({
// Check if the trigger is connected
// Both webhook and credential-based triggers now have webhook database entries
const isTriggerConnected = Boolean(triggerId && actualTriggerId)
// In preview, consider it configured if the snapshot contains any trigger fields
const isConfiguredInPreview = isPreview && Boolean(
(propValue?.triggerPath && propValue.triggerPath.length > 0) ||
(propValue?.triggerConfig && Object.keys(propValue.triggerConfig).length > 0) ||
propValue?.triggerId
)
const isTriggerConnected = isConfiguredInPreview || Boolean(triggerId && actualTriggerId)
// Debug logging to help with troubleshooting
useEffect(() => {

View File

@@ -437,8 +437,12 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
stateToUse = mergedState?.subBlocks || {}
}
const isAdvancedMode = useWorkflowStore.getState().blocks[blockId]?.advancedMode ?? false
const isTriggerMode = useWorkflowStore.getState().blocks[blockId]?.triggerMode ?? false
const isAdvancedMode = data.isPreview
? ((data.blockState as any)?.advancedMode ?? false)
: useWorkflowStore.getState().blocks[blockId]?.advancedMode ?? false
const isTriggerMode = data.isPreview
? ((data.blockState as any)?.triggerMode ?? false)
: useWorkflowStore.getState().blocks[blockId]?.triggerMode ?? false
const effectiveAdvanced = currentWorkflow.isDiffMode ? displayAdvancedMode : isAdvancedMode
const effectiveTrigger = currentWorkflow.isDiffMode ? displayTriggerMode : isTriggerMode

View File

@@ -131,6 +131,17 @@ export async function executeWorkflowWithLogging(
// Merge subblock states from the appropriate store
const mergedStates = mergeSubblockState(validBlocks)
// Log the current workflow state before filtering
logger.info('🔍 Current workflow state before filtering:', {
totalBlocks: Object.keys(mergedStates).length,
blocks: Object.entries(mergedStates).map(([id, block]) => ({
id,
type: block.type,
triggerMode: block.triggerMode,
category: block.type ? getBlock(block.type)?.category : undefined,
})),
})
// Filter out trigger blocks for manual execution
const filteredStates = Object.entries(mergedStates).reduce(
(acc, [id, block]) => {
@@ -142,16 +153,29 @@ export async function executeWorkflowWithLogging(
const blockConfig = getBlock(block.type)
const isTriggerBlock = blockConfig?.category === 'triggers'
const isInTriggerMode = block.triggerMode === true
// Skip trigger blocks during manual execution
if (!isTriggerBlock) {
// Skip trigger blocks AND blocks in trigger mode during manual execution
if (!isTriggerBlock && !isInTriggerMode) {
acc[id] = block
} else {
logger.info(`🚫 Filtering out block ${id} - trigger category: ${isTriggerBlock}, trigger mode: ${isInTriggerMode}`)
}
return acc
},
{} as typeof mergedStates
)
// Log the filtered state that will be used for execution (not snapshots)
logger.info('📦 Filtered workflow state for execution:', {
totalBlocks: Object.keys(filteredStates).length,
blocks: Object.entries(filteredStates).map(([id, block]) => ({
id,
type: block.type,
triggerMode: block.triggerMode,
})),
})
const currentBlockStates = Object.entries(filteredStates).reduce(
(acc, [id, block]) => {
acc[id] = Object.entries(block.subBlocks).reduce(

View File

@@ -782,6 +782,16 @@ export function useCollaborativeWorkflow() {
const newTriggerMode = !currentBlock.triggerMode
// If enabling trigger mode, proactively remove incoming edges for consistency across clients
if (newTriggerMode) {
const incomingEdges = Object.values(workflowStore.edges).filter((e) => e.target === id)
for (const edge of incomingEdges) {
executeQueuedOperation('remove', 'edge', { id: edge.id }, () =>
workflowStore.removeEdge(edge.id)
)
}
}
executeQueuedOperation(
'update-trigger-mode',
'block',

View File

@@ -11,6 +11,7 @@ import type {
} from '@/lib/logs/types'
import { db } from '@/db'
import { workflowExecutionSnapshots } from '@/db/schema'
import { filterEdgesForTriggers } from '@/lib/workflows/trigger-rules'
const logger = createLogger('SnapshotService')
@@ -27,8 +28,27 @@ export class SnapshotService implements ISnapshotService {
workflowId: string,
state: WorkflowState
): Promise<SnapshotCreationResult> {
// Ensure consistency: apply the same trigger-edge filtering used by the editor/execution
const filteredState = filterEdgesForTriggers(state)
// Hash the position-less state for deduplication (functional equivalence)
const stateHash = this.computeStateHash(state)
const stateHash = this.computeStateHash(filteredState)
// Log a concise preview of the state being considered for snapshot
try {
logger.info('📸 Preparing workflow snapshot', {
workflowId,
stateHash,
blocks: Object.entries(filteredState.blocks || {}).map(([id, b]: [string, any]) => ({
id,
type: (b as any)?.type,
name: (b as any)?.name,
triggerMode: (b as any)?.triggerMode === true,
enabled: (b as any)?.enabled !== false,
})),
edgesCount: (filteredState.edges || []).length,
})
} catch {}
const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash)
if (existingSnapshot) {
@@ -45,7 +65,7 @@ export class SnapshotService implements ISnapshotService {
id: uuidv4(),
workflowId,
stateHash,
stateData: state, // Full state with positions, subblock values, etc.
stateData: filteredState, // Full state with positions, subblock values, etc., after consistent filtering
}
const [newSnapshot] = await db
@@ -53,8 +73,24 @@ export class SnapshotService implements ISnapshotService {
.values(snapshotData)
.returning()
logger.info('✅ Saved workflow snapshot', {
workflowId,
snapshotId: newSnapshot.id,
stateHash,
blocksCount: Object.keys(filteredState.blocks || {}).length,
edgesCount: (filteredState.edges || []).length,
})
// Emit the exact state saved (debug level to avoid log noise); redact sensitive values if needed
try {
// Lazy import to avoid cycles
const utils = await import('@/lib/utils')
const redactedState = utils.redactApiKeys(newSnapshot.stateData as any)
logger.debug('🧩 Snapshot state data (exact):', redactedState)
} catch {}
logger.debug(`Created new snapshot for workflow ${workflowId} with hash ${stateHash}`)
logger.debug(`Stored full state with ${Object.keys(state.blocks || {}).length} blocks`)
logger.debug(`Stored full state with ${Object.keys(filteredState.blocks || {}).length} blocks`)
return {
snapshot: {
...newSnapshot,

View File

@@ -0,0 +1,43 @@
import { getBlock } from '@/blocks'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
/**
* Decide whether incoming edges should be blocked for a target block.
* - Block if the block is a pure trigger category (webhook, etc.)
* - Block if the block is currently in triggerMode
* - Block if the block is the starter block
*/
export function shouldBlockIncomingEdgesForTarget(blockType: string, triggerMode: boolean | undefined): boolean {
// Starter blocks should never have incoming edges
if (blockType === 'starter') return true
// Runtime toggle
if (triggerMode === true) return true
// Pure trigger categories
try {
const config = getBlock(blockType)
if (config?.category === 'triggers') return true
} catch {}
return false
}
/**
* Return a copy of state with edges to trigger-like targets removed.
*/
export function filterEdgesForTriggers(state: WorkflowState): WorkflowState {
const blocks = state.blocks || {}
const edges = state.edges || []
const filteredEdges = edges.filter((edge) => {
const target = blocks[edge.target]
if (!target) return false // Drop dangling edges defensively
return !shouldBlockIncomingEdgesForTarget(target.type, target.triggerMode)
})
return {
...state,
edges: filteredEdges,
}
}

View File

@@ -6,6 +6,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import * as schema from '@/db/schema'
import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
import { shouldBlockIncomingEdgesForTarget } from '@/lib/workflows/trigger-rules'
const logger = createLogger('SocketDatabase')
@@ -597,7 +598,21 @@ async function handleBlockOperationTx(
throw new Error(`Block ${payload.id} not found in workflow ${workflowId}`)
}
logger.debug(`Updated block trigger mode: ${payload.id} -> ${payload.triggerMode}`)
// When enabling trigger mode, remove all incoming edges to this block at the database level
if (payload.triggerMode === true) {
const removed = await tx
.delete(workflowEdges)
.where(
and(eq(workflowEdges.workflowId, workflowId), eq(workflowEdges.targetBlockId, payload.id))
)
.returning({ id: workflowEdges.id })
logger.debug(
`Updated block trigger mode: ${payload.id} -> ${payload.triggerMode}. Removed ${removed.length} incoming edges for trigger mode.`
)
} else {
logger.debug(`Updated block trigger mode: ${payload.id} -> ${payload.triggerMode}`)
}
break
}
@@ -743,6 +758,24 @@ async function handleEdgeOperationTx(
throw new Error('Missing required fields for add edge operation')
}
// Guard: do not allow incoming edges to trigger-like targets
const [targetBlock] = await tx
.select({ id: workflowBlocks.id, type: workflowBlocks.type, triggerMode: workflowBlocks.triggerMode })
.from(workflowBlocks)
.where(and(eq(workflowBlocks.workflowId, workflowId), eq(workflowBlocks.id, payload.target)))
.limit(1)
if (!targetBlock) {
throw new Error(`Target block ${payload.target} not found in workflow ${workflowId}`)
}
if (shouldBlockIncomingEdgesForTarget(targetBlock.type as string, targetBlock.triggerMode as boolean)) {
logger.debug(
`Rejected edge add ${payload.id}: incoming edges not allowed to ${payload.target} (type=${targetBlock.type}, triggerMode=${Boolean(targetBlock.triggerMode)})`
)
return
}
await tx.insert(workflowEdges).values({
id: payload.id,
workflowId,