mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-23 22:08:09 -05:00
fix(envvars): resolution standardized (#2957)
* fix(envvars): resolution standardized * remove comments * address bugbot * fix highlighting for env vars * remove comments * address greptile * address bugbot
This commit is contained in:
committed by
GitHub
parent
428781ce7d
commit
b913cff46e
@@ -104,17 +104,11 @@ export async function POST(req: NextRequest) {
|
||||
})
|
||||
|
||||
// Build execution params starting with LLM-provided arguments
|
||||
// Resolve all {{ENV_VAR}} references in the arguments
|
||||
// Resolve all {{ENV_VAR}} references in the arguments (deep for nested objects)
|
||||
const executionParams: Record<string, any> = resolveEnvVarReferences(
|
||||
toolArgs,
|
||||
decryptedEnvVars,
|
||||
{
|
||||
resolveExactMatch: true,
|
||||
allowEmbedded: true,
|
||||
trimKeys: true,
|
||||
onMissing: 'keep',
|
||||
deep: true,
|
||||
}
|
||||
{ deep: true }
|
||||
) as Record<string, any>
|
||||
|
||||
logger.info(`[${tracker.requestId}] Resolved env var references in arguments`, {
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { NextRequest } from 'next/server'
|
||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
||||
import { McpClient } from '@/lib/mcp/client'
|
||||
import { getParsedBody, withMcpAuth } from '@/lib/mcp/middleware'
|
||||
import type { McpServerConfig, McpTransport } from '@/lib/mcp/types'
|
||||
import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config'
|
||||
import type { McpTransport } from '@/lib/mcp/types'
|
||||
import { createMcpErrorResponse, createMcpSuccessResponse } from '@/lib/mcp/utils'
|
||||
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
|
||||
|
||||
const logger = createLogger('McpServerTestAPI')
|
||||
|
||||
@@ -19,30 +18,6 @@ function isUrlBasedTransport(transport: McpTransport): boolean {
|
||||
return transport === 'streamable-http'
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve environment variables in strings
|
||||
*/
|
||||
function resolveEnvVars(value: string, envVars: Record<string, string>): string {
|
||||
const missingVars: string[] = []
|
||||
const resolvedValue = resolveEnvVarReferences(value, envVars, {
|
||||
allowEmbedded: true,
|
||||
resolveExactMatch: true,
|
||||
trimKeys: true,
|
||||
onMissing: 'keep',
|
||||
deep: false,
|
||||
missingKeys: missingVars,
|
||||
}) as string
|
||||
|
||||
if (missingVars.length > 0) {
|
||||
const uniqueMissing = Array.from(new Set(missingVars))
|
||||
uniqueMissing.forEach((envKey) => {
|
||||
logger.warn(`Environment variable "${envKey}" not found in MCP server test`)
|
||||
})
|
||||
}
|
||||
|
||||
return resolvedValue
|
||||
}
|
||||
|
||||
interface TestConnectionRequest {
|
||||
name: string
|
||||
transport: McpTransport
|
||||
@@ -96,39 +71,30 @@ export const POST = withMcpAuth('write')(
|
||||
)
|
||||
}
|
||||
|
||||
let resolvedUrl = body.url
|
||||
let resolvedHeaders = body.headers || {}
|
||||
|
||||
try {
|
||||
const envVars = await getEffectiveDecryptedEnv(userId, workspaceId)
|
||||
|
||||
if (resolvedUrl) {
|
||||
resolvedUrl = resolveEnvVars(resolvedUrl, envVars)
|
||||
}
|
||||
|
||||
const resolvedHeadersObj: Record<string, string> = {}
|
||||
for (const [key, value] of Object.entries(resolvedHeaders)) {
|
||||
resolvedHeadersObj[key] = resolveEnvVars(value, envVars)
|
||||
}
|
||||
resolvedHeaders = resolvedHeadersObj
|
||||
} catch (envError) {
|
||||
logger.warn(
|
||||
`[${requestId}] Failed to resolve environment variables, using raw values:`,
|
||||
envError
|
||||
)
|
||||
}
|
||||
|
||||
const testConfig: McpServerConfig = {
|
||||
// Build initial config for resolution
|
||||
const initialConfig = {
|
||||
id: `test-${requestId}`,
|
||||
name: body.name,
|
||||
transport: body.transport,
|
||||
url: resolvedUrl,
|
||||
headers: resolvedHeaders,
|
||||
url: body.url,
|
||||
headers: body.headers || {},
|
||||
timeout: body.timeout || 10000,
|
||||
retries: 1, // Only one retry for tests
|
||||
enabled: true,
|
||||
}
|
||||
|
||||
// Resolve env vars using shared utility (non-strict mode for testing)
|
||||
const { config: testConfig, missingVars } = await resolveMcpConfigEnvVars(
|
||||
initialConfig,
|
||||
userId,
|
||||
workspaceId,
|
||||
{ strict: false }
|
||||
)
|
||||
|
||||
if (missingVars.length > 0) {
|
||||
logger.warn(`[${requestId}] Some environment variables not found:`, { missingVars })
|
||||
}
|
||||
|
||||
const testSecurityPolicy = {
|
||||
requireConsent: false,
|
||||
auditLevel: 'none' as const,
|
||||
|
||||
@@ -7,6 +7,7 @@ import { getSession } from '@/lib/auth'
|
||||
import { validateInteger } from '@/lib/core/security/input-validation'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
|
||||
import {
|
||||
cleanupExternalWebhook,
|
||||
createExternalWebhookSubscription,
|
||||
@@ -112,9 +113,9 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
}
|
||||
}
|
||||
|
||||
const originalProviderConfig = providerConfig
|
||||
let resolvedProviderConfig = providerConfig
|
||||
if (providerConfig) {
|
||||
const { resolveEnvVarsInObject } = await import('@/lib/webhooks/env-resolver')
|
||||
const webhookDataForResolve = await db
|
||||
.select({
|
||||
workspaceId: workflow.workspaceId,
|
||||
@@ -230,19 +231,23 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
|
||||
hasFailedCountUpdate: failedCount !== undefined,
|
||||
})
|
||||
|
||||
// Merge providerConfig to preserve credential-related fields
|
||||
let finalProviderConfig = webhooks[0].webhook.providerConfig
|
||||
if (providerConfig !== undefined) {
|
||||
if (providerConfig !== undefined && originalProviderConfig) {
|
||||
const existingConfig = existingProviderConfig
|
||||
finalProviderConfig = {
|
||||
...nextProviderConfig,
|
||||
...originalProviderConfig,
|
||||
credentialId: existingConfig.credentialId,
|
||||
credentialSetId: existingConfig.credentialSetId,
|
||||
userId: existingConfig.userId,
|
||||
historyId: existingConfig.historyId,
|
||||
lastCheckedTimestamp: existingConfig.lastCheckedTimestamp,
|
||||
setupCompleted: existingConfig.setupCompleted,
|
||||
externalId: nextProviderConfig.externalId ?? existingConfig.externalId,
|
||||
externalId: existingConfig.externalId,
|
||||
}
|
||||
for (const [key, value] of Object.entries(nextProviderConfig)) {
|
||||
if (!(key in originalProviderConfig)) {
|
||||
;(finalProviderConfig as Record<string, unknown>)[key] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { PlatformEvents } from '@/lib/core/telemetry'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { resolveEnvVarsInObject } from '@/lib/webhooks/env-resolver'
|
||||
import { createExternalWebhookSubscription } from '@/lib/webhooks/provider-subscriptions'
|
||||
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
|
||||
|
||||
@@ -298,14 +299,10 @@ export async function POST(request: NextRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
let savedWebhook: any = null // Variable to hold the result of save/update
|
||||
|
||||
// Use the original provider config - Gmail/Outlook configuration functions will inject userId automatically
|
||||
const finalProviderConfig = providerConfig || {}
|
||||
|
||||
const { resolveEnvVarsInObject } = await import('@/lib/webhooks/env-resolver')
|
||||
let savedWebhook: any = null
|
||||
const originalProviderConfig = providerConfig || {}
|
||||
let resolvedProviderConfig = await resolveEnvVarsInObject(
|
||||
finalProviderConfig,
|
||||
originalProviderConfig,
|
||||
userId,
|
||||
workflowRecord.workspaceId || undefined
|
||||
)
|
||||
@@ -469,6 +466,8 @@ export async function POST(request: NextRequest) {
|
||||
providerConfig: providerConfigOverride,
|
||||
})
|
||||
|
||||
const configToSave = { ...originalProviderConfig }
|
||||
|
||||
try {
|
||||
const result = await createExternalWebhookSubscription(
|
||||
request,
|
||||
@@ -477,7 +476,13 @@ export async function POST(request: NextRequest) {
|
||||
userId,
|
||||
requestId
|
||||
)
|
||||
resolvedProviderConfig = result.updatedProviderConfig as Record<string, unknown>
|
||||
const updatedConfig = result.updatedProviderConfig as Record<string, unknown>
|
||||
for (const [key, value] of Object.entries(updatedConfig)) {
|
||||
if (!(key in originalProviderConfig)) {
|
||||
configToSave[key] = value
|
||||
}
|
||||
}
|
||||
resolvedProviderConfig = updatedConfig
|
||||
externalSubscriptionCreated = result.externalSubscriptionCreated
|
||||
} catch (err) {
|
||||
logger.error(`[${requestId}] Error creating external webhook subscription`, err)
|
||||
@@ -490,25 +495,22 @@ export async function POST(request: NextRequest) {
|
||||
)
|
||||
}
|
||||
|
||||
// Now save to database (only if subscription succeeded or provider doesn't need external subscription)
|
||||
try {
|
||||
if (targetWebhookId) {
|
||||
logger.info(`[${requestId}] Updating existing webhook for path: ${finalPath}`, {
|
||||
webhookId: targetWebhookId,
|
||||
provider,
|
||||
hasCredentialId: !!(resolvedProviderConfig as any)?.credentialId,
|
||||
credentialId: (resolvedProviderConfig as any)?.credentialId,
|
||||
hasCredentialId: !!(configToSave as any)?.credentialId,
|
||||
credentialId: (configToSave as any)?.credentialId,
|
||||
})
|
||||
const updatedResult = await db
|
||||
.update(webhook)
|
||||
.set({
|
||||
blockId,
|
||||
provider,
|
||||
providerConfig: resolvedProviderConfig,
|
||||
providerConfig: configToSave,
|
||||
credentialSetId:
|
||||
((resolvedProviderConfig as Record<string, unknown>)?.credentialSetId as
|
||||
| string
|
||||
| null) || null,
|
||||
((configToSave as Record<string, unknown>)?.credentialSetId as string | null) || null,
|
||||
isActive: true,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
@@ -531,11 +533,9 @@ export async function POST(request: NextRequest) {
|
||||
blockId,
|
||||
path: finalPath,
|
||||
provider,
|
||||
providerConfig: resolvedProviderConfig,
|
||||
providerConfig: configToSave,
|
||||
credentialSetId:
|
||||
((resolvedProviderConfig as Record<string, unknown>)?.credentialSetId as
|
||||
| string
|
||||
| null) || null,
|
||||
((configToSave as Record<string, unknown>)?.credentialSetId as string | null) || null,
|
||||
isActive: true,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
@@ -549,7 +549,7 @@ export async function POST(request: NextRequest) {
|
||||
try {
|
||||
const { cleanupExternalWebhook } = await import('@/lib/webhooks/provider-subscriptions')
|
||||
await cleanupExternalWebhook(
|
||||
createTempWebhookData(resolvedProviderConfig),
|
||||
createTempWebhookData(configToSave),
|
||||
workflowRecord,
|
||||
requestId
|
||||
)
|
||||
|
||||
@@ -116,7 +116,6 @@ type AsyncExecutionParams = {
|
||||
userId: string
|
||||
input: any
|
||||
triggerType: CoreTriggerType
|
||||
preflighted?: boolean
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -139,7 +138,6 @@ async function handleAsyncExecution(params: AsyncExecutionParams): Promise<NextR
|
||||
userId,
|
||||
input,
|
||||
triggerType,
|
||||
preflighted: params.preflighted,
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -276,7 +274,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
requestId
|
||||
)
|
||||
|
||||
const shouldPreflightEnvVars = isAsyncMode && isTriggerDevEnabled
|
||||
const preprocessResult = await preprocessExecution({
|
||||
workflowId,
|
||||
userId,
|
||||
@@ -285,9 +282,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
requestId,
|
||||
checkDeployment: !shouldUseDraftState,
|
||||
loggingSession,
|
||||
preflightEnvVars: shouldPreflightEnvVars,
|
||||
useDraftState: shouldUseDraftState,
|
||||
envUserId: isClientSession ? userId : undefined,
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
@@ -319,7 +314,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
userId: actorUserId,
|
||||
input,
|
||||
triggerType: loggingTriggerType,
|
||||
preflighted: shouldPreflightEnvVars,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ import type { GenerationType } from '@/blocks/types'
|
||||
import { normalizeName } from '@/executor/constants'
|
||||
import { createEnvVarPattern, createReferencePattern } from '@/executor/utils/reference-validation'
|
||||
import { useTagSelection } from '@/hooks/kb/use-tag-selection'
|
||||
import { createShouldHighlightEnvVar, useAvailableEnvVarKeys } from '@/hooks/use-available-env-vars'
|
||||
|
||||
const logger = createLogger('Code')
|
||||
|
||||
@@ -88,21 +89,27 @@ interface CodePlaceholder {
|
||||
/**
|
||||
* Creates a syntax highlighter function with custom reference and environment variable highlighting.
|
||||
* @param effectiveLanguage - The language to use for syntax highlighting
|
||||
* @param shouldHighlightReference - Function to determine if a reference should be highlighted
|
||||
* @param shouldHighlightReference - Function to determine if a block reference should be highlighted
|
||||
* @param shouldHighlightEnvVar - Function to determine if an env var should be highlighted
|
||||
* @returns A function that highlights code with syntax and custom highlights
|
||||
*/
|
||||
const createHighlightFunction = (
|
||||
effectiveLanguage: 'javascript' | 'python' | 'json',
|
||||
shouldHighlightReference: (part: string) => boolean
|
||||
shouldHighlightReference: (part: string) => boolean,
|
||||
shouldHighlightEnvVar: (varName: string) => boolean
|
||||
) => {
|
||||
return (codeToHighlight: string): string => {
|
||||
const placeholders: CodePlaceholder[] = []
|
||||
let processedCode = codeToHighlight
|
||||
|
||||
processedCode = processedCode.replace(createEnvVarPattern(), (match) => {
|
||||
const placeholder = `__ENV_VAR_${placeholders.length}__`
|
||||
placeholders.push({ placeholder, original: match, type: 'env' })
|
||||
return placeholder
|
||||
const varName = match.slice(2, -2).trim()
|
||||
if (shouldHighlightEnvVar(varName)) {
|
||||
const placeholder = `__ENV_VAR_${placeholders.length}__`
|
||||
placeholders.push({ placeholder, original: match, type: 'env' })
|
||||
return placeholder
|
||||
}
|
||||
return match
|
||||
})
|
||||
|
||||
processedCode = processedCode.replace(createReferencePattern(), (match) => {
|
||||
@@ -212,6 +219,7 @@ export const Code = memo(function Code({
|
||||
const accessiblePrefixes = useAccessibleReferencePrefixes(blockId)
|
||||
const emitTagSelection = useTagSelection(blockId, subBlockId)
|
||||
const [languageValue] = useSubBlockValue<string>(blockId, 'language')
|
||||
const availableEnvVars = useAvailableEnvVarKeys(workspaceId)
|
||||
|
||||
const effectiveLanguage = (languageValue as 'javascript' | 'python' | 'json') || language
|
||||
|
||||
@@ -603,9 +611,15 @@ export const Code = memo(function Code({
|
||||
[generateCodeStream, isPromptVisible, isAiStreaming]
|
||||
)
|
||||
|
||||
const shouldHighlightEnvVar = useMemo(
|
||||
() => createShouldHighlightEnvVar(availableEnvVars),
|
||||
[availableEnvVars]
|
||||
)
|
||||
|
||||
const highlightCode = useMemo(
|
||||
() => createHighlightFunction(effectiveLanguage, shouldHighlightReference),
|
||||
[effectiveLanguage, shouldHighlightReference]
|
||||
() =>
|
||||
createHighlightFunction(effectiveLanguage, shouldHighlightReference, shouldHighlightEnvVar),
|
||||
[effectiveLanguage, shouldHighlightReference, shouldHighlightEnvVar]
|
||||
)
|
||||
|
||||
const handleValueChange = useCallback(
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { ReactElement } from 'react'
|
||||
import { useEffect, useRef, useState } from 'react'
|
||||
import { useEffect, useMemo, useRef, useState } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { ChevronDown, ChevronsUpDown, ChevronUp, Plus } from 'lucide-react'
|
||||
import { useParams } from 'next/navigation'
|
||||
@@ -35,6 +35,7 @@ import { useAccessibleReferencePrefixes } from '@/app/workspace/[workspaceId]/w/
|
||||
import { normalizeName } from '@/executor/constants'
|
||||
import { createEnvVarPattern, createReferencePattern } from '@/executor/utils/reference-validation'
|
||||
import { useTagSelection } from '@/hooks/kb/use-tag-selection'
|
||||
import { createShouldHighlightEnvVar, useAvailableEnvVarKeys } from '@/hooks/use-available-env-vars'
|
||||
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
|
||||
|
||||
const logger = createLogger('ConditionInput')
|
||||
@@ -123,6 +124,11 @@ export function ConditionInput({
|
||||
|
||||
const emitTagSelection = useTagSelection(blockId, subBlockId)
|
||||
const accessiblePrefixes = useAccessibleReferencePrefixes(blockId)
|
||||
const availableEnvVars = useAvailableEnvVarKeys(workspaceId)
|
||||
const shouldHighlightEnvVar = useMemo(
|
||||
() => createShouldHighlightEnvVar(availableEnvVars),
|
||||
[availableEnvVars]
|
||||
)
|
||||
|
||||
const containerRef = useRef<HTMLDivElement>(null)
|
||||
const inputRefs = useRef<Map<string, HTMLTextAreaElement>>(new Map())
|
||||
@@ -1136,14 +1142,18 @@ export function ConditionInput({
|
||||
let processedCode = codeToHighlight
|
||||
|
||||
processedCode = processedCode.replace(createEnvVarPattern(), (match) => {
|
||||
const placeholder = `__ENV_VAR_${placeholders.length}__`
|
||||
placeholders.push({
|
||||
placeholder,
|
||||
original: match,
|
||||
type: 'env',
|
||||
shouldHighlight: true,
|
||||
})
|
||||
return placeholder
|
||||
const varName = match.slice(2, -2).trim()
|
||||
if (shouldHighlightEnvVar(varName)) {
|
||||
const placeholder = `__ENV_VAR_${placeholders.length}__`
|
||||
placeholders.push({
|
||||
placeholder,
|
||||
original: match,
|
||||
type: 'env',
|
||||
shouldHighlight: true,
|
||||
})
|
||||
return placeholder
|
||||
}
|
||||
return match
|
||||
})
|
||||
|
||||
processedCode = processedCode.replace(
|
||||
|
||||
@@ -7,6 +7,7 @@ import { createCombinedPattern } from '@/executor/utils/reference-validation'
|
||||
|
||||
export interface HighlightContext {
|
||||
accessiblePrefixes?: Set<string>
|
||||
availableEnvVars?: Set<string>
|
||||
highlightAll?: boolean
|
||||
}
|
||||
|
||||
@@ -43,9 +44,17 @@ export function formatDisplayText(text: string, context?: HighlightContext): Rea
|
||||
return false
|
||||
}
|
||||
|
||||
const shouldHighlightEnvVar = (varName: string): boolean => {
|
||||
if (context?.highlightAll) {
|
||||
return true
|
||||
}
|
||||
if (context?.availableEnvVars === undefined) {
|
||||
return true
|
||||
}
|
||||
return context.availableEnvVars.has(varName)
|
||||
}
|
||||
|
||||
const nodes: ReactNode[] = []
|
||||
// Match variable references without allowing nested brackets to prevent matching across references
|
||||
// e.g., "<3. text <real.ref>" should match "<3" and "<real.ref>", not the whole string
|
||||
const regex = createCombinedPattern()
|
||||
let lastIndex = 0
|
||||
let key = 0
|
||||
@@ -65,11 +74,16 @@ export function formatDisplayText(text: string, context?: HighlightContext): Rea
|
||||
}
|
||||
|
||||
if (matchText.startsWith(REFERENCE.ENV_VAR_START)) {
|
||||
nodes.push(
|
||||
<span key={key++} className='text-[var(--brand-secondary)]'>
|
||||
{matchText}
|
||||
</span>
|
||||
)
|
||||
const varName = matchText.slice(2, -2).trim()
|
||||
if (shouldHighlightEnvVar(varName)) {
|
||||
nodes.push(
|
||||
<span key={key++} className='text-[var(--brand-secondary)]'>
|
||||
{matchText}
|
||||
</span>
|
||||
)
|
||||
} else {
|
||||
nodes.push(<span key={key++}>{matchText}</span>)
|
||||
}
|
||||
} else {
|
||||
const split = splitReferenceSegment(matchText)
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { useCallback, useRef, useState } from 'react'
|
||||
import { useCallback, useMemo, useRef, useState } from 'react'
|
||||
import { useParams } from 'next/navigation'
|
||||
import { highlight, languages } from '@/components/emcn'
|
||||
import {
|
||||
isLikelyReferenceSegment,
|
||||
@@ -9,6 +10,7 @@ import { checkTagTrigger } from '@/app/workspace/[workspaceId]/w/[workflowId]/co
|
||||
import { useAccessibleReferencePrefixes } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-accessible-reference-prefixes'
|
||||
import { normalizeName, REFERENCE } from '@/executor/constants'
|
||||
import { createEnvVarPattern, createReferencePattern } from '@/executor/utils/reference-validation'
|
||||
import { createShouldHighlightEnvVar, useAvailableEnvVarKeys } from '@/hooks/use-available-env-vars'
|
||||
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
|
||||
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
@@ -53,6 +55,9 @@ const SUBFLOW_CONFIG = {
|
||||
* @returns Subflow editor state and handlers
|
||||
*/
|
||||
export function useSubflowEditor(currentBlock: BlockState | null, currentBlockId: string | null) {
|
||||
const params = useParams()
|
||||
const workspaceId = params.workspaceId as string
|
||||
|
||||
const textareaRef = useRef<HTMLTextAreaElement | null>(null)
|
||||
const editorContainerRef = useRef<HTMLDivElement>(null)
|
||||
|
||||
@@ -81,6 +86,13 @@ export function useSubflowEditor(currentBlock: BlockState | null, currentBlockId
|
||||
// Get accessible prefixes for tag dropdown
|
||||
const accessiblePrefixes = useAccessibleReferencePrefixes(currentBlockId || '')
|
||||
|
||||
// Get available env vars for highlighting validation
|
||||
const availableEnvVars = useAvailableEnvVarKeys(workspaceId)
|
||||
const shouldHighlightEnvVar = useMemo(
|
||||
() => createShouldHighlightEnvVar(availableEnvVars),
|
||||
[availableEnvVars]
|
||||
)
|
||||
|
||||
// Collaborative actions
|
||||
const {
|
||||
collaborativeUpdateLoopType,
|
||||
@@ -140,9 +152,13 @@ export function useSubflowEditor(currentBlock: BlockState | null, currentBlockId
|
||||
let processedCode = code
|
||||
|
||||
processedCode = processedCode.replace(createEnvVarPattern(), (match) => {
|
||||
const placeholder = `__ENV_VAR_${placeholders.length}__`
|
||||
placeholders.push({ placeholder, original: match, type: 'env' })
|
||||
return placeholder
|
||||
const varName = match.slice(2, -2).trim()
|
||||
if (shouldHighlightEnvVar(varName)) {
|
||||
const placeholder = `__ENV_VAR_${placeholders.length}__`
|
||||
placeholders.push({ placeholder, original: match, type: 'env' })
|
||||
return placeholder
|
||||
}
|
||||
return match
|
||||
})
|
||||
|
||||
// Use [^<>]+ to prevent matching across nested brackets (e.g., "<3 <real.ref>" should match separately)
|
||||
@@ -174,7 +190,7 @@ export function useSubflowEditor(currentBlock: BlockState | null, currentBlockId
|
||||
|
||||
return highlightedCode
|
||||
},
|
||||
[shouldHighlightReference]
|
||||
[shouldHighlightReference, shouldHighlightEnvVar]
|
||||
)
|
||||
|
||||
/**
|
||||
|
||||
@@ -7,7 +7,7 @@ import { useStoreWithEqualityFn } from 'zustand/traditional'
|
||||
import { Badge, Tooltip } from '@/components/emcn'
|
||||
import { cn } from '@/lib/core/utils/cn'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import { createMcpToolId } from '@/lib/mcp/utils'
|
||||
import { createMcpToolId } from '@/lib/mcp/shared'
|
||||
import { getProviderIdFromServiceId } from '@/lib/oauth'
|
||||
import { BLOCK_DIMENSIONS, HANDLE_POSITIONS } from '@/lib/workflows/blocks/block-dimensions'
|
||||
import {
|
||||
|
||||
@@ -39,6 +39,7 @@ import {
|
||||
useRefreshMcpServer,
|
||||
useStoredMcpTools,
|
||||
} from '@/hooks/queries/mcp'
|
||||
import { useAvailableEnvVarKeys } from '@/hooks/use-available-env-vars'
|
||||
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
|
||||
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
|
||||
import { FormField, McpServerSkeleton } from './components'
|
||||
@@ -157,6 +158,7 @@ interface FormattedInputProps {
|
||||
scrollLeft: number
|
||||
showEnvVars: boolean
|
||||
envVarProps: EnvVarDropdownConfig
|
||||
availableEnvVars?: Set<string>
|
||||
className?: string
|
||||
onChange: (e: React.ChangeEvent<HTMLInputElement>) => void
|
||||
onScroll: (scrollLeft: number) => void
|
||||
@@ -169,6 +171,7 @@ function FormattedInput({
|
||||
scrollLeft,
|
||||
showEnvVars,
|
||||
envVarProps,
|
||||
availableEnvVars,
|
||||
className,
|
||||
onChange,
|
||||
onScroll,
|
||||
@@ -190,7 +193,7 @@ function FormattedInput({
|
||||
/>
|
||||
<div className='pointer-events-none absolute inset-0 flex items-center overflow-hidden px-[8px] py-[6px] font-medium font-sans text-sm'>
|
||||
<div className='whitespace-nowrap' style={{ transform: `translateX(-${scrollLeft}px)` }}>
|
||||
{formatDisplayText(value)}
|
||||
{formatDisplayText(value, { availableEnvVars })}
|
||||
</div>
|
||||
</div>
|
||||
{showEnvVars && (
|
||||
@@ -221,6 +224,7 @@ interface HeaderRowProps {
|
||||
envSearchTerm: string
|
||||
cursorPosition: number
|
||||
workspaceId: string
|
||||
availableEnvVars?: Set<string>
|
||||
onInputChange: (field: InputFieldType, value: string, index?: number) => void
|
||||
onHeaderScroll: (key: string, scrollLeft: number) => void
|
||||
onEnvVarSelect: (value: string) => void
|
||||
@@ -238,6 +242,7 @@ function HeaderRow({
|
||||
envSearchTerm,
|
||||
cursorPosition,
|
||||
workspaceId,
|
||||
availableEnvVars,
|
||||
onInputChange,
|
||||
onHeaderScroll,
|
||||
onEnvVarSelect,
|
||||
@@ -265,6 +270,7 @@ function HeaderRow({
|
||||
scrollLeft={headerScrollLeft[`key-${index}`] || 0}
|
||||
showEnvVars={isKeyActive}
|
||||
envVarProps={envVarProps}
|
||||
availableEnvVars={availableEnvVars}
|
||||
className='flex-1'
|
||||
onChange={(e) => onInputChange('header-key', e.target.value, index)}
|
||||
onScroll={(scrollLeft) => onHeaderScroll(`key-${index}`, scrollLeft)}
|
||||
@@ -276,6 +282,7 @@ function HeaderRow({
|
||||
scrollLeft={headerScrollLeft[`value-${index}`] || 0}
|
||||
showEnvVars={isValueActive}
|
||||
envVarProps={envVarProps}
|
||||
availableEnvVars={availableEnvVars}
|
||||
className='flex-1'
|
||||
onChange={(e) => onInputChange('header-value', e.target.value, index)}
|
||||
onScroll={(scrollLeft) => onHeaderScroll(`value-${index}`, scrollLeft)}
|
||||
@@ -371,6 +378,7 @@ export function MCP({ initialServerId }: MCPProps) {
|
||||
const deleteServerMutation = useDeleteMcpServer()
|
||||
const refreshServerMutation = useRefreshMcpServer()
|
||||
const { testResult, isTestingConnection, testConnection, clearTestResult } = useMcpServerTest()
|
||||
const availableEnvVars = useAvailableEnvVarKeys(workspaceId)
|
||||
|
||||
const urlInputRef = useRef<HTMLInputElement>(null)
|
||||
|
||||
@@ -1061,6 +1069,7 @@ export function MCP({ initialServerId }: MCPProps) {
|
||||
onSelect: handleEnvVarSelect,
|
||||
onClose: resetEnvVarState,
|
||||
}}
|
||||
availableEnvVars={availableEnvVars}
|
||||
onChange={(e) => handleInputChange('url', e.target.value)}
|
||||
onScroll={(scrollLeft) => handleUrlScroll(scrollLeft)}
|
||||
/>
|
||||
@@ -1094,6 +1103,7 @@ export function MCP({ initialServerId }: MCPProps) {
|
||||
envSearchTerm={envSearchTerm}
|
||||
cursorPosition={cursorPosition}
|
||||
workspaceId={workspaceId}
|
||||
availableEnvVars={availableEnvVars}
|
||||
onInputChange={handleInputChange}
|
||||
onHeaderScroll={handleHeaderScroll}
|
||||
onEnvVarSelect={handleEnvVarSelect}
|
||||
|
||||
@@ -4,8 +4,6 @@ import { task } from '@trigger.dev/sdk'
|
||||
import { Cron } from 'croner'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { v4 as uuidv4 } from 'uuid'
|
||||
import type { ZodRecord, ZodString } from 'zod'
|
||||
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
|
||||
import { preprocessExecution } from '@/lib/execution/preprocessing'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
|
||||
@@ -122,7 +120,6 @@ async function runWorkflowExecution({
|
||||
loggingSession,
|
||||
requestId,
|
||||
executionId,
|
||||
EnvVarsSchema,
|
||||
}: {
|
||||
payload: ScheduleExecutionPayload
|
||||
workflowRecord: WorkflowRecord
|
||||
@@ -130,7 +127,6 @@ async function runWorkflowExecution({
|
||||
loggingSession: LoggingSession
|
||||
requestId: string
|
||||
executionId: string
|
||||
EnvVarsSchema: ZodRecord<ZodString, ZodString>
|
||||
}): Promise<RunWorkflowResult> {
|
||||
try {
|
||||
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
|
||||
@@ -156,31 +152,12 @@ async function runWorkflowExecution({
|
||||
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
|
||||
}
|
||||
|
||||
const personalEnvUserId = workflowRecord.userId
|
||||
|
||||
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
|
||||
personalEnvUserId,
|
||||
workspaceId
|
||||
)
|
||||
|
||||
const variables = EnvVarsSchema.parse({
|
||||
...personalEncrypted,
|
||||
...workspaceEncrypted,
|
||||
})
|
||||
|
||||
const input = {
|
||||
_context: {
|
||||
workflowId: payload.workflowId,
|
||||
},
|
||||
}
|
||||
|
||||
await loggingSession.safeStart({
|
||||
userId: actorUserId,
|
||||
workspaceId,
|
||||
variables: variables || {},
|
||||
deploymentVersionId,
|
||||
})
|
||||
|
||||
const metadata: ExecutionMetadata = {
|
||||
requestId,
|
||||
executionId,
|
||||
@@ -279,7 +256,6 @@ export type ScheduleExecutionPayload = {
|
||||
failedCount?: number
|
||||
now: string
|
||||
scheduledFor?: string
|
||||
preflighted?: boolean
|
||||
}
|
||||
|
||||
function calculateNextRunTime(
|
||||
@@ -319,9 +295,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
|
||||
executionId,
|
||||
})
|
||||
|
||||
const zod = await import('zod')
|
||||
const EnvVarsSchema = zod.z.record(zod.z.string())
|
||||
|
||||
try {
|
||||
const loggingSession = new LoggingSession(
|
||||
payload.workflowId,
|
||||
@@ -339,7 +312,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
|
||||
checkRateLimit: true,
|
||||
checkDeployment: true,
|
||||
loggingSession,
|
||||
preflightEnvVars: !payload.preflighted,
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
@@ -482,7 +454,6 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
|
||||
loggingSession,
|
||||
requestId,
|
||||
executionId,
|
||||
EnvVarsSchema,
|
||||
})
|
||||
|
||||
if (executionResult.status === 'skip') {
|
||||
|
||||
@@ -20,7 +20,6 @@ export type WorkflowExecutionPayload = {
|
||||
input?: any
|
||||
triggerType?: CoreTriggerType
|
||||
metadata?: Record<string, any>
|
||||
preflighted?: boolean
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -52,7 +51,6 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
|
||||
checkRateLimit: true,
|
||||
checkDeployment: true,
|
||||
loggingSession: loggingSession,
|
||||
preflightEnvVars: !payload.preflighted,
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { McpIcon } from '@/components/icons'
|
||||
import { createMcpToolId } from '@/lib/mcp/utils'
|
||||
import { createMcpToolId } from '@/lib/mcp/shared'
|
||||
import type { BlockConfig } from '@/blocks/types'
|
||||
import type { ToolResponse } from '@/tools/types'
|
||||
|
||||
|
||||
@@ -1,7 +1,4 @@
|
||||
import { db } from '@sim/db'
|
||||
import { mcpServers } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { and, eq, inArray, isNull } from 'drizzle-orm'
|
||||
import { getBaseUrl } from '@/lib/core/utils/urls'
|
||||
import {
|
||||
containsUserFileWithMetadata,
|
||||
@@ -86,10 +83,6 @@ export class BlockExecutor {
|
||||
|
||||
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
|
||||
|
||||
if (block.metadata?.id === BlockType.AGENT && resolvedInputs.tools) {
|
||||
resolvedInputs = await this.filterUnavailableMcpToolsForLog(ctx, resolvedInputs)
|
||||
}
|
||||
|
||||
if (blockLog) {
|
||||
blockLog.input = resolvedInputs
|
||||
}
|
||||
@@ -437,60 +430,6 @@ export class BlockExecutor {
|
||||
return undefined
|
||||
}
|
||||
|
||||
/**
|
||||
* Filters out unavailable MCP tools from agent inputs for logging.
|
||||
* Only includes tools from servers with 'connected' status.
|
||||
*/
|
||||
private async filterUnavailableMcpToolsForLog(
|
||||
ctx: ExecutionContext,
|
||||
inputs: Record<string, any>
|
||||
): Promise<Record<string, any>> {
|
||||
const tools = inputs.tools
|
||||
if (!Array.isArray(tools) || tools.length === 0) return inputs
|
||||
|
||||
const mcpTools = tools.filter((t: any) => t.type === 'mcp')
|
||||
if (mcpTools.length === 0) return inputs
|
||||
|
||||
const serverIds = [
|
||||
...new Set(mcpTools.map((t: any) => t.params?.serverId).filter(Boolean)),
|
||||
] as string[]
|
||||
if (serverIds.length === 0) return inputs
|
||||
|
||||
const availableServerIds = new Set<string>()
|
||||
if (ctx.workspaceId && serverIds.length > 0) {
|
||||
try {
|
||||
const servers = await db
|
||||
.select({ id: mcpServers.id, connectionStatus: mcpServers.connectionStatus })
|
||||
.from(mcpServers)
|
||||
.where(
|
||||
and(
|
||||
eq(mcpServers.workspaceId, ctx.workspaceId),
|
||||
inArray(mcpServers.id, serverIds),
|
||||
isNull(mcpServers.deletedAt)
|
||||
)
|
||||
)
|
||||
|
||||
for (const server of servers) {
|
||||
if (server.connectionStatus === 'connected') {
|
||||
availableServerIds.add(server.id)
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn('Failed to check MCP server availability for logging:', error)
|
||||
return inputs
|
||||
}
|
||||
}
|
||||
|
||||
const filteredTools = tools.filter((tool: any) => {
|
||||
if (tool.type !== 'mcp') return true
|
||||
const serverId = tool.params?.serverId
|
||||
if (!serverId) return false
|
||||
return availableServerIds.has(serverId)
|
||||
})
|
||||
|
||||
return { ...inputs, tools: filteredTools }
|
||||
}
|
||||
|
||||
private preparePauseResumeSelfReference(
|
||||
ctx: ExecutionContext,
|
||||
node: DAGNode,
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { StartBlockPath } from '@/lib/workflows/triggers/triggers'
|
||||
import type { BlockOutput } from '@/blocks/types'
|
||||
import { DAGBuilder } from '@/executor/dag/builder'
|
||||
import { BlockExecutor } from '@/executor/execution/block-executor'
|
||||
import { EdgeManager } from '@/executor/execution/edge-manager'
|
||||
@@ -24,7 +23,6 @@ const logger = createLogger('DAGExecutor')
|
||||
|
||||
export interface DAGExecutorOptions {
|
||||
workflow: SerializedWorkflow
|
||||
currentBlockStates?: Record<string, BlockOutput>
|
||||
envVarValues?: Record<string, string>
|
||||
workflowInput?: WorkflowInput
|
||||
workflowVariables?: Record<string, unknown>
|
||||
|
||||
@@ -28,6 +28,23 @@ export interface EnvVarResolveOptions {
|
||||
missingKeys?: string[]
|
||||
}
|
||||
|
||||
/**
|
||||
* Standard defaults for env var resolution across all contexts.
|
||||
*
|
||||
* - `resolveExactMatch: true` - Resolves `{{VAR}}` when it's the entire value
|
||||
* - `allowEmbedded: true` - Resolves `{{VAR}}` embedded in strings like `https://{{HOST}}/api`
|
||||
* - `trimKeys: true` - `{{ VAR }}` works the same as `{{VAR}}` (whitespace tolerant)
|
||||
* - `onMissing: 'keep'` - Unknown patterns pass through (e.g., Grafana's `{{instance}}`)
|
||||
* - `deep: false` - Only processes strings by default; set `true` for nested objects
|
||||
*/
|
||||
export const ENV_VAR_RESOLVE_DEFAULTS: Required<Omit<EnvVarResolveOptions, 'missingKeys'>> = {
|
||||
resolveExactMatch: true,
|
||||
allowEmbedded: true,
|
||||
trimKeys: true,
|
||||
onMissing: 'keep',
|
||||
deep: false,
|
||||
} as const
|
||||
|
||||
/**
|
||||
* Resolve {{ENV_VAR}} references in values using provided env vars.
|
||||
*/
|
||||
@@ -37,11 +54,11 @@ export function resolveEnvVarReferences(
|
||||
options: EnvVarResolveOptions = {}
|
||||
): unknown {
|
||||
const {
|
||||
allowEmbedded = true,
|
||||
resolveExactMatch = true,
|
||||
trimKeys = false,
|
||||
onMissing = 'keep',
|
||||
deep = true,
|
||||
allowEmbedded = ENV_VAR_RESOLVE_DEFAULTS.allowEmbedded,
|
||||
resolveExactMatch = ENV_VAR_RESOLVE_DEFAULTS.resolveExactMatch,
|
||||
trimKeys = ENV_VAR_RESOLVE_DEFAULTS.trimKeys,
|
||||
onMissing = ENV_VAR_RESOLVE_DEFAULTS.onMissing,
|
||||
deep = ENV_VAR_RESOLVE_DEFAULTS.deep,
|
||||
} = options
|
||||
|
||||
if (typeof value === 'string') {
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
|
||||
import { useCallback, useState } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/shared'
|
||||
import type { McpTransport } from '@/lib/mcp/types'
|
||||
import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/utils'
|
||||
|
||||
const logger = createLogger('useMcpServerTest')
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import { useCallback, useMemo } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { useQueryClient } from '@tanstack/react-query'
|
||||
import { McpIcon } from '@/components/icons'
|
||||
import { createMcpToolId } from '@/lib/mcp/utils'
|
||||
import { createMcpToolId } from '@/lib/mcp/shared'
|
||||
import { mcpKeys, useMcpToolsQuery } from '@/hooks/queries/mcp'
|
||||
|
||||
const logger = createLogger('useMcpTools')
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
|
||||
import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/shared'
|
||||
import type { McpServerStatusConfig, McpTool, StoredMcpTool } from '@/lib/mcp/types'
|
||||
import { sanitizeForHttp, sanitizeHeaders } from '@/lib/mcp/utils'
|
||||
|
||||
const logger = createLogger('McpQueries')
|
||||
|
||||
|
||||
43
apps/sim/hooks/use-available-env-vars.ts
Normal file
43
apps/sim/hooks/use-available-env-vars.ts
Normal file
@@ -0,0 +1,43 @@
|
||||
import { useMemo } from 'react'
|
||||
import { usePersonalEnvironment, useWorkspaceEnvironment } from '@/hooks/queries/environment'
|
||||
|
||||
export function useAvailableEnvVarKeys(workspaceId?: string): Set<string> | undefined {
|
||||
const { data: personalEnv, isLoading: personalLoading } = usePersonalEnvironment()
|
||||
const { data: workspaceEnvData, isLoading: workspaceLoading } = useWorkspaceEnvironment(
|
||||
workspaceId || ''
|
||||
)
|
||||
|
||||
return useMemo(() => {
|
||||
if (personalLoading || (workspaceId && workspaceLoading)) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
const keys = new Set<string>()
|
||||
|
||||
if (personalEnv) {
|
||||
Object.keys(personalEnv).forEach((key) => keys.add(key))
|
||||
}
|
||||
|
||||
if (workspaceId && workspaceEnvData) {
|
||||
if (workspaceEnvData.workspace) {
|
||||
Object.keys(workspaceEnvData.workspace).forEach((key) => keys.add(key))
|
||||
}
|
||||
if (workspaceEnvData.personal) {
|
||||
Object.keys(workspaceEnvData.personal).forEach((key) => keys.add(key))
|
||||
}
|
||||
}
|
||||
|
||||
return keys
|
||||
}, [personalEnv, workspaceEnvData, personalLoading, workspaceLoading, workspaceId])
|
||||
}
|
||||
|
||||
export function createShouldHighlightEnvVar(
|
||||
availableEnvVars: Set<string> | undefined
|
||||
): (varName: string) => boolean {
|
||||
return (varName: string): boolean => {
|
||||
if (availableEnvVars === undefined) {
|
||||
return true
|
||||
}
|
||||
return availableEnvVars.has(varName)
|
||||
}
|
||||
}
|
||||
@@ -3,9 +3,6 @@ import { environment, workspaceEnvironment } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { decryptSecret } from '@/lib/core/security/encryption'
|
||||
import { REFERENCE } from '@/executor/constants'
|
||||
import { createEnvVarPattern } from '@/executor/utils/reference-validation'
|
||||
import type { BlockState } from '@/stores/workflows/workflow/types'
|
||||
|
||||
const logger = createLogger('EnvironmentUtils')
|
||||
|
||||
@@ -54,6 +51,7 @@ export async function getPersonalAndWorkspaceEnv(
|
||||
personalDecrypted: Record<string, string>
|
||||
workspaceDecrypted: Record<string, string>
|
||||
conflicts: string[]
|
||||
decryptionFailures: string[]
|
||||
}> {
|
||||
const [personalRows, workspaceRows] = await Promise.all([
|
||||
db.select().from(environment).where(eq(environment.userId, userId)).limit(1),
|
||||
@@ -69,14 +67,23 @@ export async function getPersonalAndWorkspaceEnv(
|
||||
const personalEncrypted: Record<string, string> = (personalRows[0]?.variables as any) || {}
|
||||
const workspaceEncrypted: Record<string, string> = (workspaceRows[0]?.variables as any) || {}
|
||||
|
||||
const decryptAll = async (src: Record<string, string>) => {
|
||||
const decryptionFailures: string[] = []
|
||||
|
||||
const decryptAll = async (src: Record<string, string>, source: 'personal' | 'workspace') => {
|
||||
const entries = Object.entries(src)
|
||||
const results = await Promise.all(
|
||||
entries.map(async ([k, v]) => {
|
||||
try {
|
||||
const { decrypted } = await decryptSecret(v)
|
||||
return [k, decrypted] as const
|
||||
} catch {
|
||||
} catch (error) {
|
||||
logger.error(`Failed to decrypt ${source} environment variable "${k}"`, {
|
||||
userId,
|
||||
workspaceId,
|
||||
source,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
})
|
||||
decryptionFailures.push(k)
|
||||
return [k, ''] as const
|
||||
}
|
||||
})
|
||||
@@ -85,18 +92,28 @@ export async function getPersonalAndWorkspaceEnv(
|
||||
}
|
||||
|
||||
const [personalDecrypted, workspaceDecrypted] = await Promise.all([
|
||||
decryptAll(personalEncrypted),
|
||||
decryptAll(workspaceEncrypted),
|
||||
decryptAll(personalEncrypted, 'personal'),
|
||||
decryptAll(workspaceEncrypted, 'workspace'),
|
||||
])
|
||||
|
||||
const conflicts = Object.keys(personalEncrypted).filter((k) => k in workspaceEncrypted)
|
||||
|
||||
if (decryptionFailures.length > 0) {
|
||||
logger.warn('Some environment variables failed to decrypt', {
|
||||
userId,
|
||||
workspaceId,
|
||||
failedKeys: decryptionFailures,
|
||||
failedCount: decryptionFailures.length,
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
personalEncrypted,
|
||||
workspaceEncrypted,
|
||||
personalDecrypted,
|
||||
workspaceDecrypted,
|
||||
conflicts,
|
||||
decryptionFailures,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,86 +127,3 @@ export async function getEffectiveDecryptedEnv(
|
||||
)
|
||||
return { ...personalDecrypted, ...workspaceDecrypted }
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure all environment variables can be decrypted.
|
||||
*/
|
||||
export async function ensureEnvVarsDecryptable(
|
||||
variables: Record<string, string>,
|
||||
options: { requestId?: string } = {}
|
||||
): Promise<void> {
|
||||
const requestId = options.requestId
|
||||
for (const [key, encryptedValue] of Object.entries(variables)) {
|
||||
try {
|
||||
await decryptSecret(encryptedValue)
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : 'Unknown error'
|
||||
if (requestId) {
|
||||
logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error)
|
||||
} else {
|
||||
logger.error(`Failed to decrypt environment variable "${key}"`, error)
|
||||
}
|
||||
throw new Error(`Failed to decrypt environment variable "${key}": ${message}`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure all {{ENV_VAR}} references in block subblocks resolve to decryptable values.
|
||||
*/
|
||||
export async function ensureBlockEnvVarsResolvable(
|
||||
blocks: Record<string, BlockState>,
|
||||
variables: Record<string, string>,
|
||||
options: { requestId?: string } = {}
|
||||
): Promise<void> {
|
||||
const requestId = options.requestId
|
||||
const envVarPattern = createEnvVarPattern()
|
||||
await Promise.all(
|
||||
Object.values(blocks).map(async (block) => {
|
||||
const subBlocks = block.subBlocks ?? {}
|
||||
await Promise.all(
|
||||
Object.values(subBlocks).map(async (subBlock) => {
|
||||
const value = subBlock.value
|
||||
if (
|
||||
typeof value !== 'string' ||
|
||||
!value.includes(REFERENCE.ENV_VAR_START) ||
|
||||
!value.includes(REFERENCE.ENV_VAR_END)
|
||||
) {
|
||||
return
|
||||
}
|
||||
|
||||
const matches = value.match(envVarPattern)
|
||||
if (!matches) {
|
||||
return
|
||||
}
|
||||
|
||||
for (const match of matches) {
|
||||
const varName = match.slice(
|
||||
REFERENCE.ENV_VAR_START.length,
|
||||
-REFERENCE.ENV_VAR_END.length
|
||||
)
|
||||
const encryptedValue = variables[varName]
|
||||
if (!encryptedValue) {
|
||||
throw new Error(`Environment variable "${varName}" was not found`)
|
||||
}
|
||||
|
||||
try {
|
||||
await decryptSecret(encryptedValue)
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : 'Unknown error'
|
||||
if (requestId) {
|
||||
logger.error(
|
||||
`[${requestId}] Error decrypting value for variable "${varName}"`,
|
||||
error
|
||||
)
|
||||
} else {
|
||||
logger.error(`Error decrypting value for variable "${varName}"`, error)
|
||||
}
|
||||
throw new Error(`Failed to decrypt environment variable "${varName}": ${message}`)
|
||||
}
|
||||
}
|
||||
})
|
||||
)
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-mon
|
||||
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
|
||||
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
|
||||
import { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
import { preflightWorkflowEnvVars } from '@/lib/workflows/executor/preflight'
|
||||
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
|
||||
import type { CoreTriggerType } from '@/stores/logs/filters/types'
|
||||
|
||||
@@ -118,15 +117,13 @@ export interface PreprocessExecutionOptions {
|
||||
checkRateLimit?: boolean // Default: false for manual/chat, true for others
|
||||
checkDeployment?: boolean // Default: true for non-manual triggers
|
||||
skipUsageLimits?: boolean // Default: false (only use for test mode)
|
||||
preflightEnvVars?: boolean // Default: false
|
||||
|
||||
// Context information
|
||||
workspaceId?: string // If known, used for billing resolution
|
||||
loggingSession?: LoggingSession // If provided, will be used for error logging
|
||||
isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes)
|
||||
/** @deprecated No longer used - preflight always uses deployed state */
|
||||
/** @deprecated No longer used - background/async executions always use deployed state */
|
||||
useDraftState?: boolean
|
||||
envUserId?: string // Optional override for env var resolution user
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -164,11 +161,9 @@ export async function preprocessExecution(
|
||||
checkRateLimit = triggerType !== 'manual' && triggerType !== 'chat',
|
||||
checkDeployment = triggerType !== 'manual',
|
||||
skipUsageLimits = false,
|
||||
preflightEnvVars = false,
|
||||
workspaceId: providedWorkspaceId,
|
||||
loggingSession: providedLoggingSession,
|
||||
isResumeContext = false,
|
||||
envUserId,
|
||||
} = options
|
||||
|
||||
logger.info(`[${requestId}] Starting execution preprocessing`, {
|
||||
@@ -483,44 +478,6 @@ export async function preprocessExecution(
|
||||
}
|
||||
|
||||
// ========== SUCCESS: All Checks Passed ==========
|
||||
if (preflightEnvVars) {
|
||||
try {
|
||||
const resolvedEnvUserId = envUserId || workflowRecord.userId || userId
|
||||
await preflightWorkflowEnvVars({
|
||||
workflowId,
|
||||
workspaceId,
|
||||
envUserId: resolvedEnvUserId,
|
||||
requestId,
|
||||
})
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : 'Env var preflight failed'
|
||||
logger.warn(`[${requestId}] Env var preflight failed`, {
|
||||
workflowId,
|
||||
message,
|
||||
})
|
||||
|
||||
await logPreprocessingError({
|
||||
workflowId,
|
||||
executionId,
|
||||
triggerType,
|
||||
requestId,
|
||||
userId: actorUserId,
|
||||
workspaceId,
|
||||
errorMessage: message,
|
||||
loggingSession: providedLoggingSession,
|
||||
})
|
||||
|
||||
return {
|
||||
success: false,
|
||||
error: {
|
||||
message,
|
||||
statusCode: 400,
|
||||
logCreated: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(`[${requestId}] All preprocessing checks passed`, {
|
||||
workflowId,
|
||||
actorUserId,
|
||||
|
||||
85
apps/sim/lib/mcp/resolve-config.ts
Normal file
85
apps/sim/lib/mcp/resolve-config.ts
Normal file
@@ -0,0 +1,85 @@
|
||||
/**
|
||||
* Server-only MCP config resolution utilities.
|
||||
* This file contains functions that require server-side dependencies (database access).
|
||||
* Do NOT import this file in client components.
|
||||
*/
|
||||
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
||||
import type { McpServerConfig } from '@/lib/mcp/types'
|
||||
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
|
||||
|
||||
const logger = createLogger('McpResolveConfig')
|
||||
|
||||
export interface ResolveMcpConfigOptions {
|
||||
/** If true, throws an error when env vars are missing. Default: true */
|
||||
strict?: boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve environment variables in MCP server config (url, headers).
|
||||
* Shared utility used by both MCP service and test-connection endpoint.
|
||||
*
|
||||
* @param config - MCP server config with potential {{ENV_VAR}} patterns
|
||||
* @param userId - User ID to fetch environment variables for
|
||||
* @param workspaceId - Workspace ID for workspace-specific env vars
|
||||
* @param options - Resolution options (strict mode throws on missing vars)
|
||||
* @returns Resolved config with env vars replaced
|
||||
*/
|
||||
export async function resolveMcpConfigEnvVars(
|
||||
config: McpServerConfig,
|
||||
userId: string,
|
||||
workspaceId?: string,
|
||||
options: ResolveMcpConfigOptions = {}
|
||||
): Promise<{ config: McpServerConfig; missingVars: string[] }> {
|
||||
const { strict = true } = options
|
||||
const allMissingVars: string[] = []
|
||||
|
||||
let envVars: Record<string, string> = {}
|
||||
try {
|
||||
envVars = await getEffectiveDecryptedEnv(userId, workspaceId)
|
||||
} catch (error) {
|
||||
logger.error('Failed to fetch environment variables for MCP config:', error)
|
||||
return { config, missingVars: [] }
|
||||
}
|
||||
|
||||
const resolveValue = (value: string): string => {
|
||||
const missingVars: string[] = []
|
||||
const resolved = resolveEnvVarReferences(value, envVars, {
|
||||
missingKeys: missingVars,
|
||||
}) as string
|
||||
allMissingVars.push(...missingVars)
|
||||
return resolved
|
||||
}
|
||||
|
||||
const resolvedConfig = { ...config }
|
||||
|
||||
if (resolvedConfig.url) {
|
||||
resolvedConfig.url = resolveValue(resolvedConfig.url)
|
||||
}
|
||||
|
||||
if (resolvedConfig.headers) {
|
||||
const resolvedHeaders: Record<string, string> = {}
|
||||
for (const [key, value] of Object.entries(resolvedConfig.headers)) {
|
||||
resolvedHeaders[key] = resolveValue(value)
|
||||
}
|
||||
resolvedConfig.headers = resolvedHeaders
|
||||
}
|
||||
|
||||
// Handle missing vars based on strict mode
|
||||
if (allMissingVars.length > 0) {
|
||||
const uniqueMissing = Array.from(new Set(allMissingVars))
|
||||
|
||||
if (strict) {
|
||||
throw new Error(
|
||||
`Missing required environment variable${uniqueMissing.length > 1 ? 's' : ''}: ${uniqueMissing.join(', ')}. ` +
|
||||
`Please set ${uniqueMissing.length > 1 ? 'these variables' : 'this variable'} in your workspace or personal environment settings.`
|
||||
)
|
||||
}
|
||||
uniqueMissing.forEach((envKey) => {
|
||||
logger.warn(`Environment variable "${envKey}" not found in MCP config`)
|
||||
})
|
||||
}
|
||||
|
||||
return { config: resolvedConfig, missingVars: allMissingVars }
|
||||
}
|
||||
@@ -8,8 +8,8 @@ import { createLogger } from '@sim/logger'
|
||||
import { and, eq, isNull } from 'drizzle-orm'
|
||||
import { isTest } from '@/lib/core/config/feature-flags'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
||||
import { McpClient } from '@/lib/mcp/client'
|
||||
import { resolveMcpConfigEnvVars } from '@/lib/mcp/resolve-config'
|
||||
import {
|
||||
createMcpCacheAdapter,
|
||||
getMcpCacheType,
|
||||
@@ -25,7 +25,6 @@ import type {
|
||||
McpTransport,
|
||||
} from '@/lib/mcp/types'
|
||||
import { MCP_CONSTANTS } from '@/lib/mcp/utils'
|
||||
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
|
||||
|
||||
const logger = createLogger('McpService')
|
||||
|
||||
@@ -47,60 +46,18 @@ class McpService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve environment variables in strings
|
||||
*/
|
||||
private resolveEnvVars(value: string, envVars: Record<string, string>): string {
|
||||
const missingVars: string[] = []
|
||||
const resolvedValue = resolveEnvVarReferences(value, envVars, {
|
||||
allowEmbedded: true,
|
||||
resolveExactMatch: true,
|
||||
trimKeys: true,
|
||||
onMissing: 'keep',
|
||||
deep: false,
|
||||
missingKeys: missingVars,
|
||||
}) as string
|
||||
|
||||
if (missingVars.length > 0) {
|
||||
const uniqueMissing = Array.from(new Set(missingVars))
|
||||
throw new Error(
|
||||
`Missing required environment variable${uniqueMissing.length > 1 ? 's' : ''}: ${uniqueMissing.join(', ')}. ` +
|
||||
`Please set ${uniqueMissing.length > 1 ? 'these variables' : 'this variable'} in your workspace or personal environment settings.`
|
||||
)
|
||||
}
|
||||
|
||||
return resolvedValue
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve environment variables in server config
|
||||
* Resolve environment variables in server config.
|
||||
* Uses shared utility with strict mode (throws on missing vars).
|
||||
*/
|
||||
private async resolveConfigEnvVars(
|
||||
config: McpServerConfig,
|
||||
userId: string,
|
||||
workspaceId?: string
|
||||
): Promise<McpServerConfig> {
|
||||
try {
|
||||
const envVars = await getEffectiveDecryptedEnv(userId, workspaceId)
|
||||
|
||||
const resolvedConfig = { ...config }
|
||||
|
||||
if (resolvedConfig.url) {
|
||||
resolvedConfig.url = this.resolveEnvVars(resolvedConfig.url, envVars)
|
||||
}
|
||||
|
||||
if (resolvedConfig.headers) {
|
||||
const resolvedHeaders: Record<string, string> = {}
|
||||
for (const [key, value] of Object.entries(resolvedConfig.headers)) {
|
||||
resolvedHeaders[key] = this.resolveEnvVars(value, envVars)
|
||||
}
|
||||
resolvedConfig.headers = resolvedHeaders
|
||||
}
|
||||
|
||||
return resolvedConfig
|
||||
} catch (error) {
|
||||
logger.error('Failed to resolve environment variables for MCP server config:', error)
|
||||
return config
|
||||
}
|
||||
const { config: resolvedConfig } = await resolveMcpConfigEnvVars(config, userId, workspaceId, {
|
||||
strict: true,
|
||||
})
|
||||
return resolvedConfig
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
48
apps/sim/lib/mcp/shared.ts
Normal file
48
apps/sim/lib/mcp/shared.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
/**
|
||||
* Shared MCP utilities - safe for both client and server.
|
||||
* No server-side dependencies (database, fs, etc.) should be imported here.
|
||||
*/
|
||||
|
||||
import { isMcpTool, MCP } from '@/executor/constants'
|
||||
|
||||
/**
|
||||
* Sanitizes a string by removing invisible Unicode characters that cause HTTP header errors.
|
||||
* Handles characters like U+2028 (Line Separator) that can be introduced via copy-paste.
|
||||
*/
|
||||
export function sanitizeForHttp(value: string): string {
|
||||
return value
|
||||
.replace(/[\u2028\u2029\u200B-\u200D\uFEFF]/g, '')
|
||||
.replace(/[\x00-\x1F\x7F]/g, '')
|
||||
.trim()
|
||||
}
|
||||
|
||||
/**
|
||||
* Sanitizes all header key-value pairs for HTTP usage.
|
||||
*/
|
||||
export function sanitizeHeaders(
|
||||
headers: Record<string, string> | undefined
|
||||
): Record<string, string> | undefined {
|
||||
if (!headers) return headers
|
||||
return Object.fromEntries(
|
||||
Object.entries(headers)
|
||||
.map(([key, value]) => [sanitizeForHttp(key), sanitizeForHttp(value)])
|
||||
.filter(([key, value]) => key !== '' && value !== '')
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Client-safe MCP constants
|
||||
*/
|
||||
export const MCP_CLIENT_CONSTANTS = {
|
||||
CLIENT_TIMEOUT: 60000,
|
||||
MAX_RETRIES: 3,
|
||||
RECONNECT_DELAY: 1000,
|
||||
} as const
|
||||
|
||||
/**
|
||||
* Create standardized MCP tool ID from server ID and tool name
|
||||
*/
|
||||
export function createMcpToolId(serverId: string, toolName: string): string {
|
||||
const normalizedServerId = isMcpTool(serverId) ? serverId : `${MCP.TOOL_PREFIX}${serverId}`
|
||||
return `${normalizedServerId}-${toolName}`
|
||||
}
|
||||
@@ -1,72 +1,22 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
|
||||
import { extractEnvVarName, isEnvVarReference } from '@/executor/constants'
|
||||
|
||||
const logger = createLogger('EnvResolver')
|
||||
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
|
||||
|
||||
/**
|
||||
* Resolves environment variable references in a string value
|
||||
* Uses the same helper functions as the executor's EnvResolver
|
||||
* Recursively resolves all environment variable references in a configuration object.
|
||||
* Supports both exact matches (`{{VAR_NAME}}`) and embedded patterns (`https://{{HOST}}/path`).
|
||||
*
|
||||
* @param value - The string that may contain env var references
|
||||
* @param envVars - Object containing environment variable key-value pairs
|
||||
* @returns The resolved string with env vars replaced
|
||||
*/
|
||||
function resolveEnvVarInString(value: string, envVars: Record<string, string>): string {
|
||||
if (!isEnvVarReference(value)) {
|
||||
return value
|
||||
}
|
||||
|
||||
const varName = extractEnvVarName(value)
|
||||
const resolvedValue = envVars[varName]
|
||||
|
||||
if (resolvedValue === undefined) {
|
||||
logger.warn(`Environment variable not found: ${varName}`)
|
||||
return value // Return original if not found
|
||||
}
|
||||
|
||||
logger.debug(`Resolved environment variable: ${varName}`)
|
||||
return resolvedValue
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively resolves all environment variable references in a configuration object
|
||||
* Supports the pattern: {{VAR_NAME}}
|
||||
* Uses `deep: true` because webhook configs have nested structures that need full resolution.
|
||||
*
|
||||
* @param config - Configuration object that may contain env var references
|
||||
* @param userId - User ID to fetch environment variables for
|
||||
* @param workspaceId - Optional workspace ID for workspace-specific env vars
|
||||
* @returns A new object with all env var references resolved
|
||||
*/
|
||||
export async function resolveEnvVarsInObject(
|
||||
config: Record<string, any>,
|
||||
export async function resolveEnvVarsInObject<T extends Record<string, unknown>>(
|
||||
config: T,
|
||||
userId: string,
|
||||
workspaceId?: string
|
||||
): Promise<Record<string, any>> {
|
||||
): Promise<T> {
|
||||
const envVars = await getEffectiveDecryptedEnv(userId, workspaceId)
|
||||
|
||||
const resolved = { ...config }
|
||||
|
||||
function resolveValue(value: any): any {
|
||||
if (typeof value === 'string') {
|
||||
return resolveEnvVarInString(value, envVars)
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return value.map(resolveValue)
|
||||
}
|
||||
if (value !== null && typeof value === 'object') {
|
||||
const resolvedObj: Record<string, any> = {}
|
||||
for (const [key, val] of Object.entries(value)) {
|
||||
resolvedObj[key] = resolveValue(val)
|
||||
}
|
||||
return resolvedObj
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
for (const [key, value] of Object.entries(resolved)) {
|
||||
resolved[key] = resolveValue(value)
|
||||
}
|
||||
|
||||
return resolved
|
||||
return resolveEnvVarReferences(config, envVars, { deep: true }) as T
|
||||
}
|
||||
|
||||
@@ -413,13 +413,7 @@ export async function findAllWebhooksForPath(
|
||||
* @returns String with all {{VARIABLE}} references replaced
|
||||
*/
|
||||
function resolveEnvVars(value: string, envVars: Record<string, string>): string {
|
||||
return resolveEnvVarReferences(value, envVars, {
|
||||
allowEmbedded: true,
|
||||
resolveExactMatch: true,
|
||||
trimKeys: true,
|
||||
onMissing: 'keep',
|
||||
deep: false,
|
||||
}) as string
|
||||
return resolveEnvVarReferences(value, envVars) as string
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -804,7 +798,6 @@ export async function checkWebhookPreprocessing(
|
||||
checkRateLimit: true,
|
||||
checkDeployment: true,
|
||||
workspaceId: foundWorkflow.workspaceId,
|
||||
preflightEnvVars: isTriggerDevEnabled,
|
||||
})
|
||||
|
||||
if (!preprocessResult.success) {
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { Edge } from 'reactflow'
|
||||
import { z } from 'zod'
|
||||
import { parseResponseFormatSafely } from '@/lib/core/utils/response-format'
|
||||
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
|
||||
import { clearExecutionCancellation } from '@/lib/execution/cancellation'
|
||||
import type { LoggingSession } from '@/lib/logs/execution/logging-session'
|
||||
@@ -25,7 +24,6 @@ import type {
|
||||
IterationContext,
|
||||
} from '@/executor/execution/types'
|
||||
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
|
||||
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
|
||||
import { Serializer } from '@/serializer'
|
||||
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
||||
|
||||
@@ -202,50 +200,6 @@ export async function executeWorkflowCore(
|
||||
deploymentVersionId,
|
||||
})
|
||||
|
||||
// Process block states with env var substitution using pre-decrypted values
|
||||
const currentBlockStates = Object.entries(mergedStates).reduce(
|
||||
(acc, [id, block]) => {
|
||||
acc[id] = Object.entries(block.subBlocks).reduce(
|
||||
(subAcc, [key, subBlock]) => {
|
||||
let value = subBlock.value
|
||||
|
||||
if (typeof value === 'string') {
|
||||
value = resolveEnvVarReferences(value, decryptedEnvVars, {
|
||||
resolveExactMatch: false,
|
||||
trimKeys: false,
|
||||
onMissing: 'keep',
|
||||
deep: false,
|
||||
}) as string
|
||||
}
|
||||
|
||||
subAcc[key] = value
|
||||
return subAcc
|
||||
},
|
||||
{} as Record<string, any>
|
||||
)
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, Record<string, any>>
|
||||
)
|
||||
|
||||
// Process response format
|
||||
const processedBlockStates = Object.entries(currentBlockStates).reduce(
|
||||
(acc, [blockId, blockState]) => {
|
||||
const responseFormatValue = blockState.responseFormat
|
||||
if (responseFormatValue === undefined || responseFormatValue === null) {
|
||||
acc[blockId] = blockState
|
||||
return acc
|
||||
}
|
||||
|
||||
const responseFormat = parseResponseFormatSafely(responseFormatValue, blockId, {
|
||||
allowReferences: true,
|
||||
})
|
||||
acc[blockId] = { ...blockState, responseFormat: responseFormat ?? undefined }
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, Record<string, any>>
|
||||
)
|
||||
|
||||
// Use edges directly - trigger-to-trigger edges are prevented at creation time
|
||||
const filteredEdges = edges
|
||||
|
||||
@@ -346,7 +300,6 @@ export async function executeWorkflowCore(
|
||||
|
||||
const executorInstance = new Executor({
|
||||
workflow: serializedWorkflow,
|
||||
currentBlockStates: processedBlockStates,
|
||||
envVarValues: decryptedEnvVars,
|
||||
workflowInput: processedInput,
|
||||
workflowVariables,
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import {
|
||||
ensureBlockEnvVarsResolvable,
|
||||
ensureEnvVarsDecryptable,
|
||||
getPersonalAndWorkspaceEnv,
|
||||
} from '@/lib/environment/utils'
|
||||
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
|
||||
import { mergeSubblockState } from '@/stores/workflows/server-utils'
|
||||
|
||||
const logger = createLogger('ExecutionPreflight')
|
||||
|
||||
export interface EnvVarPreflightOptions {
|
||||
workflowId: string
|
||||
workspaceId: string
|
||||
envUserId: string
|
||||
requestId?: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Preflight env var checks to avoid scheduling executions that will fail.
|
||||
* Always uses deployed workflow state since preflight is only done for async
|
||||
* executions which always run on deployed state.
|
||||
*/
|
||||
export async function preflightWorkflowEnvVars({
|
||||
workflowId,
|
||||
workspaceId,
|
||||
envUserId,
|
||||
requestId,
|
||||
}: EnvVarPreflightOptions): Promise<void> {
|
||||
const workflowData = await loadDeployedWorkflowState(workflowId)
|
||||
|
||||
if (!workflowData) {
|
||||
throw new Error('Workflow state not found')
|
||||
}
|
||||
|
||||
const mergedStates = mergeSubblockState(workflowData.blocks)
|
||||
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
|
||||
envUserId,
|
||||
workspaceId
|
||||
)
|
||||
const variables = { ...personalEncrypted, ...workspaceEncrypted }
|
||||
|
||||
await ensureBlockEnvVarsResolvable(mergedStates, variables, { requestId })
|
||||
await ensureEnvVarsDecryptable(variables, { requestId })
|
||||
|
||||
if (requestId) {
|
||||
logger.debug(`[${requestId}] Env var preflight passed`, { workflowId })
|
||||
} else {
|
||||
logger.debug('Env var preflight passed', { workflowId })
|
||||
}
|
||||
}
|
||||
@@ -132,7 +132,6 @@ describe('Serializer', () => {
|
||||
expect(agentBlock?.metadata?.id).toBe('agent')
|
||||
expect(agentBlock?.config.tool).toBe('openai')
|
||||
expect(agentBlock?.config.params.model).toBe('gpt-4o')
|
||||
expect(agentBlock?.outputs.responseFormat).toBeDefined()
|
||||
})
|
||||
|
||||
it.concurrent('should serialize agent block with custom tools correctly', () => {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { Edge } from 'reactflow'
|
||||
import { parseResponseFormatSafely } from '@/lib/core/utils/response-format'
|
||||
import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator'
|
||||
import {
|
||||
buildCanonicalIndex,
|
||||
@@ -275,15 +274,6 @@ export class Serializer {
|
||||
inputs,
|
||||
outputs: {
|
||||
...block.outputs,
|
||||
// Include response format fields if available
|
||||
...(params.responseFormat
|
||||
? {
|
||||
responseFormat:
|
||||
parseResponseFormatSafely(params.responseFormat, block.id, {
|
||||
allowReferences: true,
|
||||
}) ?? undefined,
|
||||
}
|
||||
: {}),
|
||||
},
|
||||
metadata: {
|
||||
id: block.type,
|
||||
|
||||
@@ -257,148 +257,6 @@ describe('Serializer Extended Tests', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('parseResponseFormatSafely edge cases', () => {
|
||||
it('should handle null responseFormat', () => {
|
||||
const serializer = new Serializer()
|
||||
const block: BlockState = {
|
||||
id: 'agent-1',
|
||||
type: 'agent',
|
||||
name: 'Agent',
|
||||
position: { x: 0, y: 0 },
|
||||
subBlocks: {
|
||||
model: { id: 'model', type: 'dropdown', value: 'gpt-4o' },
|
||||
prompt: { id: 'prompt', type: 'long-input', value: 'Test' },
|
||||
responseFormat: { id: 'responseFormat', type: 'code', value: null },
|
||||
},
|
||||
outputs: {},
|
||||
enabled: true,
|
||||
}
|
||||
|
||||
const serialized = serializer.serializeWorkflow({ 'agent-1': block }, [], {})
|
||||
const agentBlock = serialized.blocks.find((b) => b.id === 'agent-1')
|
||||
|
||||
expect(agentBlock?.outputs.responseFormat).toBeUndefined()
|
||||
})
|
||||
|
||||
it('should handle empty string responseFormat', () => {
|
||||
const serializer = new Serializer()
|
||||
const block: BlockState = {
|
||||
id: 'agent-1',
|
||||
type: 'agent',
|
||||
name: 'Agent',
|
||||
position: { x: 0, y: 0 },
|
||||
subBlocks: {
|
||||
model: { id: 'model', type: 'dropdown', value: 'gpt-4o' },
|
||||
prompt: { id: 'prompt', type: 'long-input', value: 'Test' },
|
||||
responseFormat: { id: 'responseFormat', type: 'code', value: ' ' },
|
||||
},
|
||||
outputs: {},
|
||||
enabled: true,
|
||||
}
|
||||
|
||||
const serialized = serializer.serializeWorkflow({ 'agent-1': block }, [], {})
|
||||
const agentBlock = serialized.blocks.find((b) => b.id === 'agent-1')
|
||||
|
||||
expect(agentBlock?.outputs.responseFormat).toBeUndefined()
|
||||
})
|
||||
|
||||
it('should handle variable reference in responseFormat', () => {
|
||||
const serializer = new Serializer()
|
||||
const block: BlockState = {
|
||||
id: 'agent-1',
|
||||
type: 'agent',
|
||||
name: 'Agent',
|
||||
position: { x: 0, y: 0 },
|
||||
subBlocks: {
|
||||
model: { id: 'model', type: 'dropdown', value: 'gpt-4o' },
|
||||
prompt: { id: 'prompt', type: 'long-input', value: 'Test' },
|
||||
responseFormat: { id: 'responseFormat', type: 'code', value: '<start.schema>' },
|
||||
},
|
||||
outputs: {},
|
||||
enabled: true,
|
||||
}
|
||||
|
||||
const serialized = serializer.serializeWorkflow({ 'agent-1': block }, [], {})
|
||||
const agentBlock = serialized.blocks.find((b) => b.id === 'agent-1')
|
||||
|
||||
expect(agentBlock?.outputs.responseFormat).toBe('<start.schema>')
|
||||
})
|
||||
|
||||
it('should handle object responseFormat', () => {
|
||||
const serializer = new Serializer()
|
||||
const schemaObject = { type: 'object', properties: { name: { type: 'string' } } }
|
||||
const block: BlockState = {
|
||||
id: 'agent-1',
|
||||
type: 'agent',
|
||||
name: 'Agent',
|
||||
position: { x: 0, y: 0 },
|
||||
subBlocks: {
|
||||
model: { id: 'model', type: 'dropdown', value: 'gpt-4o' },
|
||||
prompt: { id: 'prompt', type: 'long-input', value: 'Test' },
|
||||
responseFormat: { id: 'responseFormat', type: 'code', value: schemaObject as any },
|
||||
},
|
||||
outputs: {},
|
||||
enabled: true,
|
||||
}
|
||||
|
||||
const serialized = serializer.serializeWorkflow({ 'agent-1': block }, [], {})
|
||||
const agentBlock = serialized.blocks.find((b) => b.id === 'agent-1')
|
||||
|
||||
expect(agentBlock?.outputs.responseFormat).toEqual(schemaObject)
|
||||
})
|
||||
|
||||
it('should handle invalid JSON responseFormat gracefully', () => {
|
||||
const serializer = new Serializer()
|
||||
const block: BlockState = {
|
||||
id: 'agent-1',
|
||||
type: 'agent',
|
||||
name: 'Agent',
|
||||
position: { x: 0, y: 0 },
|
||||
subBlocks: {
|
||||
model: { id: 'model', type: 'dropdown', value: 'gpt-4o' },
|
||||
prompt: { id: 'prompt', type: 'long-input', value: 'Test' },
|
||||
responseFormat: { id: 'responseFormat', type: 'code', value: '{invalid json}' },
|
||||
},
|
||||
outputs: {},
|
||||
enabled: true,
|
||||
}
|
||||
|
||||
const serialized = serializer.serializeWorkflow({ 'agent-1': block }, [], {})
|
||||
const agentBlock = serialized.blocks.find((b) => b.id === 'agent-1')
|
||||
|
||||
expect(agentBlock?.outputs.responseFormat).toBeUndefined()
|
||||
})
|
||||
|
||||
it('should parse valid JSON responseFormat', () => {
|
||||
const serializer = new Serializer()
|
||||
const block: BlockState = {
|
||||
id: 'agent-1',
|
||||
type: 'agent',
|
||||
name: 'Agent',
|
||||
position: { x: 0, y: 0 },
|
||||
subBlocks: {
|
||||
model: { id: 'model', type: 'dropdown', value: 'gpt-4o' },
|
||||
prompt: { id: 'prompt', type: 'long-input', value: 'Test' },
|
||||
responseFormat: {
|
||||
id: 'responseFormat',
|
||||
type: 'code',
|
||||
value: '{"type":"object","properties":{"result":{"type":"string"}}}',
|
||||
},
|
||||
},
|
||||
outputs: {},
|
||||
enabled: true,
|
||||
}
|
||||
|
||||
const serialized = serializer.serializeWorkflow({ 'agent-1': block }, [], {})
|
||||
const agentBlock = serialized.blocks.find((b) => b.id === 'agent-1')
|
||||
|
||||
expect(agentBlock?.outputs.responseFormat).toEqual({
|
||||
type: 'object',
|
||||
properties: { result: { type: 'string' } },
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('subflow block serialization', () => {
|
||||
it('should serialize loop blocks correctly', () => {
|
||||
const serializer = new Serializer()
|
||||
|
||||
Reference in New Issue
Block a user