improvement(serializer): canonical subblock, serialization cleanups, schedules/webhooks are deployment version friendly (#2848)

* hide form deployment tab from docs

* progress

* fix resolution

* cleanup code

* fix positioning

* cleanup dead sockets adv mode ops

* address greptile comments

* fix tests plus more simplification

* fix cleanup

* bring back advanced mode with specific definition

* revert feature flags

* improvement(subblock): ui

* resolver change to make all var references optional chaining

* fix(webhooks/schedules): deployment version friendly

* fix tests

* fix credential sets with new lifecycle

* prep merge

* add back migration

* fix display check for adv fields

* fix trigger vs block scoping

---------

Co-authored-by: Emir Karabeg <emirkarabeg@berkeley.edu>
This commit is contained in:
Vikhyath Mondreti
2026-01-16 15:23:43 -08:00
committed by GitHub
parent ce3ddb6ba0
commit 78e4ca9d45
70 changed files with 12806 additions and 1011 deletions

View File

@@ -5,7 +5,6 @@ import { Cron } from 'croner'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import type { ZodRecord, ZodString } from 'zod'
import { decryptSecret } from '@/lib/core/security/encryption'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
@@ -22,12 +21,9 @@ import {
getScheduleTimeValues,
getSubBlockValue,
} from '@/lib/workflows/schedules/utils'
import { REFERENCE } from '@/executor/constants'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { createEnvVarPattern } from '@/executor/utils/reference-validation'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
const logger = createLogger('TriggerScheduleExecution')
@@ -119,68 +115,6 @@ async function determineNextRunAfterError(
return new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
async function ensureBlockVariablesResolvable(
blocks: Record<string, BlockState>,
variables: Record<string, string>,
requestId: string
) {
await Promise.all(
Object.values(blocks).map(async (block) => {
const subBlocks = block.subBlocks ?? {}
await Promise.all(
Object.values(subBlocks).map(async (subBlock) => {
const value = subBlock.value
if (
typeof value !== 'string' ||
!value.includes(REFERENCE.ENV_VAR_START) ||
!value.includes(REFERENCE.ENV_VAR_END)
) {
return
}
const envVarPattern = createEnvVarPattern()
const matches = value.match(envVarPattern)
if (!matches) {
return
}
for (const match of matches) {
const varName = match.slice(
REFERENCE.ENV_VAR_START.length,
-REFERENCE.ENV_VAR_END.length
)
const encryptedValue = variables[varName]
if (!encryptedValue) {
throw new Error(`Environment variable "${varName}" was not found`)
}
try {
await decryptSecret(encryptedValue)
} catch (error) {
logger.error(`[${requestId}] Error decrypting value for variable "${varName}"`, error)
const message = error instanceof Error ? error.message : 'Unknown error'
throw new Error(`Failed to decrypt environment variable "${varName}": ${message}`)
}
}
})
)
})
)
}
async function ensureEnvVarsDecryptable(variables: Record<string, string>, requestId: string) {
for (const [key, encryptedValue] of Object.entries(variables)) {
try {
await decryptSecret(encryptedValue)
} catch (error) {
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error)
const message = error instanceof Error ? error.message : 'Unknown error'
throw new Error(`Failed to decrypt environment variable "${key}": ${message}`)
}
}
}
async function runWorkflowExecution({
payload,
workflowRecord,
@@ -217,8 +151,6 @@ async function runWorkflowExecution({
}
}
const mergedStates = mergeSubblockState(blocks)
const workspaceId = workflowRecord.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
@@ -236,9 +168,6 @@ async function runWorkflowExecution({
...workspaceEncrypted,
})
await ensureBlockVariablesResolvable(mergedStates, variables, requestId)
await ensureEnvVarsDecryptable(variables, requestId)
const input = {
_context: {
workflowId: payload.workflowId,
@@ -348,6 +277,7 @@ export type ScheduleExecutionPayload = {
failedCount?: number
now: string
scheduledFor?: string
preflighted?: boolean
}
function calculateNextRunTime(
@@ -407,6 +337,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
checkRateLimit: true,
checkDeployment: true,
loggingSession,
preflightEnvVars: !payload.preflighted,
})
if (!preprocessResult.success) {

View File

@@ -12,16 +12,11 @@ import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils.server'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import {
loadDeployedWorkflowState,
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/persistence/utils'
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import { getTrigger, isTriggerValid } from '@/triggers'
const logger = createLogger('TriggerWebhookExecution')
@@ -92,7 +87,6 @@ export type WebhookExecutionPayload = {
headers: Record<string, string>
path: string
blockId?: string
executionTarget?: 'deployed' | 'live'
credentialId?: string
credentialAccountUserId?: string
}
@@ -143,20 +137,16 @@ async function executeWebhookJobInternal(
let deploymentVersionId: string | undefined
try {
const useDraftState = payload.executionTarget === 'live'
const workflowData = useDraftState
? await loadWorkflowFromNormalizedTables(payload.workflowId)
: await loadDeployedWorkflowState(payload.workflowId)
const workflowData = await loadDeployedWorkflowState(payload.workflowId)
if (!workflowData) {
throw new Error(
`Workflow state not found. The workflow may not be ${useDraftState ? 'saved' : 'deployed'} or the deployment data may be corrupted.`
'Workflow state not found. The workflow may not be deployed or the deployment data may be corrupted.'
)
}
const { blocks, edges, loops, parallels } = workflowData
// Only deployed executions have a deployment version ID
deploymentVersionId =
!useDraftState && 'deploymentVersionId' in workflowData
'deploymentVersionId' in workflowData
? (workflowData.deploymentVersionId as string)
: undefined
@@ -171,19 +161,6 @@ async function executeWebhookJobInternal(
}
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
// Merge subblock states (matching workflow-execution pattern)
const mergedStates = mergeSubblockState(blocks)
// Create serialized workflow
const serializer = new Serializer()
const serializedWorkflow = serializer.serializeWorkflow(
mergedStates,
edges,
loops || {},
parallels || {},
true // Enable validation during execution
)
// Handle special Airtable case
if (payload.provider === 'airtable') {
logger.info(`[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads`)
@@ -318,7 +295,6 @@ async function executeWebhookJobInternal(
variables: {},
triggerData: {
isTest: false,
executionTarget: payload.executionTarget || 'deployed',
},
deploymentVersionId,
})
@@ -376,7 +352,6 @@ async function executeWebhookJobInternal(
variables: {},
triggerData: {
isTest: false,
executionTarget: payload.executionTarget || 'deployed',
},
deploymentVersionId,
})
@@ -595,7 +570,6 @@ async function executeWebhookJobInternal(
variables: {},
triggerData: {
isTest: false,
executionTarget: payload.executionTarget || 'deployed',
},
deploymentVersionId,
})

View File

@@ -20,6 +20,7 @@ export type WorkflowExecutionPayload = {
input?: any
triggerType?: CoreTriggerType
metadata?: Record<string, any>
preflighted?: boolean
}
/**
@@ -51,6 +52,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
checkRateLimit: true,
checkDeployment: true,
loggingSession: loggingSession,
preflightEnvVars: !payload.preflighted,
})
if (!preprocessResult.success) {