fix(deployed-state): use deployed state for API sync and async execs, deployed state modal visual for enabled/disabled (#885)

* fix(deployments): use deployed state for API sync and async execs

* fix deployed workflow modal visualization for enabled

* fix tests
Vikhyath Mondreti
2025-08-05 15:26:57 -07:00
committed by GitHub
parent 746b87743a
commit 062e2a2c40
5 changed files with 65 additions and 34 deletions
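This commit switches the API execution paths (sync route and async trigger task) from loading the live, normalized workflow tables to loading the workflow's stored deployed state. A minimal sketch of the resulting call-site shape, assuming only the helper name and return shape shown in the diffs below; the wrapper function itself is illustrative and not part of the commit:

    import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'

    // Illustrative wrapper: loadDeployedWorkflowState throws if the workflow
    // is missing, not deployed, or has no deployed state, so callers no longer
    // need a null check before destructuring.
    async function loadForApiExecution(workflowId: string) {
      const deployedData = await loadDeployedWorkflowState(workflowId)
      const { blocks, edges, loops, parallels } = deployedData
      return { blocks, edges, loops, parallels }
    }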

View File

@@ -87,7 +87,7 @@ describe('Workflow Execution API Route', () => {
}))
vi.doMock('@/lib/workflows/db-helpers', () => ({
-loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
+loadDeployedWorkflowState: vi.fn().mockResolvedValue({
blocks: {
'starter-id': {
id: 'starter-id',
@@ -121,7 +121,7 @@ describe('Workflow Execution API Route', () => {
],
loops: {},
parallels: {},
-isFromNormalizedTables: true,
+isFromNormalizedTables: false, // Changed to false since it's from deployed state
}),
}))
@@ -516,7 +516,7 @@ describe('Workflow Execution API Route', () => {
}))
vi.doMock('@/lib/workflows/db-helpers', () => ({
-loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
+loadDeployedWorkflowState: vi.fn().mockResolvedValue({
blocks: {
'starter-id': {
id: 'starter-id',
@@ -550,7 +550,7 @@ describe('Workflow Execution API Route', () => {
],
loops: {},
parallels: {},
-isFromNormalizedTables: true,
+isFromNormalizedTables: false, // Changed to false since it's from deployed state
}),
}))

View File

@@ -9,7 +9,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
-import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
+import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import {
createHttpResponseFromBlock,
updateWorkflowRunCounts,
@@ -111,20 +111,13 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any): P
runningExecutions.add(executionKey)
logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`)
-// Load workflow data from normalized tables
-logger.debug(`[${requestId}] Loading workflow ${workflowId} from normalized tables`)
-const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
+// Load workflow data from deployed state for API executions
+const deployedData = await loadDeployedWorkflowState(workflowId)
-if (!normalizedData) {
-  throw new Error(
-    `Workflow ${workflowId} has no normalized data available. Ensure the workflow is properly saved to normalized tables.`
-  )
-}
-// Use normalized data as primary source
-const { blocks, edges, loops, parallels } = normalizedData
-logger.info(`[${requestId}] Using normalized tables for workflow execution: ${workflowId}`)
-logger.debug(`[${requestId}] Normalized data loaded:`, {
+// Use deployed data as primary source for API executions
+const { blocks, edges, loops, parallels } = deployedData
+logger.info(`[${requestId}] Using deployed state for workflow execution: ${workflowId}`)
+logger.debug(`[${requestId}] Deployed data loaded:`, {
blocksCount: Object.keys(blocks || {}).length,
edgesCount: (edges || []).length,
loopsCount: Object.keys(loops || {}).length,

View File

@@ -70,7 +70,10 @@ export function WorkflowBlock({ id, data }: NodeProps<WorkflowBlockProps>) {
const currentWorkflow = useCurrentWorkflow()
const currentBlock = currentWorkflow.getBlockById(id)
-const isEnabled = currentBlock?.enabled ?? true
+// In preview mode, use the blockState provided; otherwise use current workflow state
+const isEnabled = data.isPreview
+  ? (data.blockState?.enabled ?? true)
+  : (currentBlock?.enabled ?? true)
// Get diff status from the block itself (set by diff engine)
const diffStatus =
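The component change above only affects how the enabled flag is resolved: in preview mode (e.g. the deployed-state modal) it now reads from the blockState passed in with the node data rather than from the live workflow store. A small sketch of that selection logic; the standalone function is illustrative only, the names mirror the diff:

    // Mirrors the ternary added above: in preview mode the deployed snapshot's
    // block state wins, otherwise the current workflow's block state is used.
    function resolveIsEnabled(
      isPreview: boolean,
      previewBlockState: { enabled?: boolean } | undefined,
      currentBlock: { enabled?: boolean } | undefined
    ): boolean {
      return isPreview
        ? (previewBlockState?.enabled ?? true)
        : (currentBlock?.enabled ?? true)
    }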

View File

@@ -1,8 +1,8 @@
import { eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { db } from '@/db'
-import { workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
-import type { LoopConfig, WorkflowState } from '@/stores/workflows/workflow/types'
+import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
+import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { SUBFLOW_TYPES } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowDBHelpers')
@@ -12,7 +12,49 @@ export interface NormalizedWorkflowData {
edges: any[]
loops: Record<string, any>
parallels: Record<string, any>
-isFromNormalizedTables: true // Flag to indicate this came from new tables
+isFromNormalizedTables: boolean // Flag to indicate source (true = normalized tables, false = deployed state)
}
+/**
+ * Load deployed workflow state for execution
+ * Returns deployed state if available, otherwise throws error
+ */
+export async function loadDeployedWorkflowState(
+  workflowId: string
+): Promise<NormalizedWorkflowData> {
+  try {
+    // First check if workflow is deployed and get deployed state
+    const [workflowResult] = await db
+      .select({
+        isDeployed: workflow.isDeployed,
+        deployedState: workflow.deployedState,
+      })
+      .from(workflow)
+      .where(eq(workflow.id, workflowId))
+      .limit(1)
+    if (!workflowResult) {
+      throw new Error(`Workflow ${workflowId} not found`)
+    }
+    if (!workflowResult.isDeployed || !workflowResult.deployedState) {
+      throw new Error(`Workflow ${workflowId} is not deployed or has no deployed state`)
+    }
+    const deployedState = workflowResult.deployedState as any
+    // Convert deployed state to normalized format
+    return {
+      blocks: deployedState.blocks || {},
+      edges: deployedState.edges || [],
+      loops: deployedState.loops || {},
+      parallels: deployedState.parallels || {},
+      isFromNormalizedTables: false, // Flag to indicate this came from deployed state
+    }
+  } catch (error) {
+    logger.error(`Error loading deployed workflow state ${workflowId}:`, error)
+    throw error
+  }
+}
/**
@@ -88,7 +130,6 @@ export async function loadWorkflowFromNormalizedTables(
const config = subflow.config || {}
if (subflow.type === SUBFLOW_TYPES.LOOP) {
-const loopConfig = config as LoopConfig
loops[subflow.id] = {
id: subflow.id,
...config,
@@ -126,7 +167,7 @@ export async function saveWorkflowToNormalizedTables(
): Promise<{ success: boolean; jsonBlob?: any; error?: string }> {
try {
// Start a transaction
-const result = await db.transaction(async (tx) => {
+await db.transaction(async (tx) => {
// Clear existing data for this workflow
await Promise.all([
tx.delete(workflowBlocks).where(eq(workflowBlocks.workflowId, workflowId)),
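Since the loadDeployedWorkflowState helper introduced above throws rather than returning null, callers that want a soft failure need to catch. A hedged usage sketch, assuming only the helper shown in the diff; the wrapper and its warning message are illustrative:

    import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'

    // Returns null instead of throwing when the workflow is missing, not
    // deployed, or has no stored deployed state.
    async function tryLoadDeployedState(workflowId: string) {
      try {
        return await loadDeployedWorkflowState(workflowId)
      } catch (error) {
        console.warn(`Could not load deployed state for ${workflowId}`, error)
        return null
      }
    }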

View File

@@ -6,7 +6,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret } from '@/lib/utils'
-import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
+import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { db } from '@/db'
import { environment as environmentTable, userStats } from '@/db/schema'
@@ -60,16 +60,10 @@ export const workflowExecution = task({
)
}
-// Load workflow data from normalized tables
-const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
-if (!normalizedData) {
-  logger.error(`[${requestId}] Workflow not found in normalized tables: ${workflowId}`)
-  throw new Error(`Workflow ${workflowId} data not found in normalized tables`)
-}
+// Load workflow data from deployed state (this task is only used for API executions right now)
+const workflowData = await loadDeployedWorkflowState(workflowId)
logger.info(`[${requestId}] Workflow loaded successfully: ${workflowId}`)
-const { blocks, edges, loops, parallels } = normalizedData
+const { blocks, edges, loops, parallels } = workflowData
// Merge subblock states (server-safe version doesn't need workflowId)
const mergedStates = mergeSubblockState(blocks, {})