Refactor to iterate per workspace to avoid overconsuming memory

This commit is contained in:
Theodore Li
2026-03-07 16:32:21 -08:00
parent 01b21ed004
commit 6e7bdaedda

View File

@@ -213,6 +213,37 @@ type RawKeyRef = {
userId: string
}
type EnvLookup = {
wsEnvVars: Record<string, string>
personalEnvCache: Map<string, Record<string, string>>
}
async function resolveKey(
ref: RawKeyRef,
context: string,
env: EnvLookup
): Promise<{ key: string | null; envVarFailed: boolean }> {
if (!isEnvVarReference(ref.rawValue)) return { key: ref.rawValue, envVarFailed: false }
const varName = extractEnvVarName(ref.rawValue)
if (!varName) return { key: null, envVarFailed: true }
const personalVars = env.personalEnvCache.get(ref.userId)
const encryptedValue = env.wsEnvVars[varName] ?? personalVars?.[varName]
if (!encryptedValue) {
console.warn(` [WARN] Env var "${varName}" not found (${context})`)
return { key: null, envVarFailed: true }
}
try {
const decrypted = await decryptSecret(encryptedValue)
return { key: decrypted, envVarFailed: false }
} catch (error) {
console.warn(` [WARN] Failed to decrypt env var "${varName}" (${context}): ${error}`)
return { key: null, envVarFailed: true }
}
}
// ---------- Main ----------
async function run() {
console.log(`Mode: ${DRY_RUN ? 'DRY RUN (audit + preview)' : 'LIVE'}`)
@@ -232,59 +263,54 @@ async function run() {
}
try {
// 1. Find all blocks that match our mapped types or contain nested tools
// 1. Get distinct workspace IDs that have matching blocks
const mappedBlockTypes = Object.keys(BLOCK_TYPE_TO_PROVIDER)
const agentTypes = Object.keys(TOOL_INPUT_SUBBLOCK_IDS)
const allBlockTypes = [...new Set([...mappedBlockTypes, ...agentTypes])]
const rows = await db
.select({
blockId: workflowBlocks.id,
blockName: workflowBlocks.name,
blockType: workflowBlocks.type,
subBlocks: workflowBlocks.subBlocks,
workflowId: workflow.id,
workflowName: workflow.name,
userId: workflow.userId,
workspaceId: workflow.workspaceId,
})
const workspaceIdRows = await db
.selectDistinct({ workspaceId: workflow.workspaceId })
.from(workflowBlocks)
.innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id))
.where(
sql`${workflowBlocks.type} IN (${sql.join(
sql`${workflow.workspaceId} IS NOT NULL AND ${workflowBlocks.type} IN (${sql.join(
allBlockTypes.map((t) => sql`${t}`),
sql`, `
)})`
)
// Group rows by workspace
const workspaceRows = new Map<string, typeof rows>()
let skippedNoWorkspace = 0
for (const row of rows) {
if (!row.workspaceId) {
skippedNoWorkspace++
continue
}
if (!workspaceRows.has(row.workspaceId)) workspaceRows.set(row.workspaceId, [])
workspaceRows.get(row.workspaceId)!.push(row)
}
const workspaceIds = workspaceIdRows
.map((r) => r.workspaceId)
.filter((id): id is string => id !== null)
console.log(`Found ${rows.length} candidate blocks across ${workspaceRows.size} workspaces`)
if (skippedNoWorkspace > 0) console.log(`Skipped ${skippedNoWorkspace} blocks with no workspace`)
console.log()
console.log(`Found ${workspaceIds.length} workspaces with candidate blocks\n`)
// 2. Process one workspace at a time
for (const workspaceId of workspaceIds) {
const blocks = await db
.select({
blockId: workflowBlocks.id,
blockName: workflowBlocks.name,
blockType: workflowBlocks.type,
subBlocks: workflowBlocks.subBlocks,
workflowId: workflow.id,
workflowName: workflow.name,
userId: workflow.userId,
})
.from(workflowBlocks)
.innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id))
.where(
sql`${workflow.workspaceId} = ${workspaceId} AND ${workflowBlocks.type} IN (${sql.join(
allBlockTypes.map((t) => sql`${t}`),
sql`, `
)})`
)
// 2. Iterate per workspace
for (const [workspaceId, blocks] of workspaceRows) {
console.log(`[Workspace ${workspaceId}] ${blocks.length} blocks`)
// 2a. Extract all raw key references grouped by provider
const providerKeys = new Map<string, RawKeyRef[]>()
function addRef(providerId: string, ref: RawKeyRef) {
if (!providerKeys.has(providerId)) providerKeys.set(providerId, [])
providerKeys.get(providerId)!.push(ref)
}
for (const block of blocks) {
const subBlocks = block.subBlocks as Record<string, { value?: any }>
@@ -292,12 +318,14 @@ async function run() {
if (providerId) {
const val = subBlocks?.apiKey?.value
if (typeof val === 'string' && val.trim()) {
addRef(providerId, {
const refs = providerKeys.get(providerId) ?? []
refs.push({
rawValue: val,
blockName: block.blockName,
workflowName: block.workflowName,
userId: block.userId,
})
providerKeys.set(providerId, refs)
}
}
@@ -310,12 +338,14 @@ async function run() {
if (!toolType || !toolApiKey || !toolApiKey.trim()) continue
const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType]
if (!toolProviderId) continue
addRef(toolProviderId, {
const refs = providerKeys.get(toolProviderId) ?? []
refs.push({
rawValue: toolApiKey,
blockName: `${block.blockName} > tool "${tool.title || toolType}"`,
workflowName: block.workflowName,
userId: block.userId,
})
providerKeys.set(toolProviderId, refs)
}
}
}
@@ -361,34 +391,7 @@ async function run() {
}
}
async function resolveKey(
ref: RawKeyRef,
context: string
): Promise<string | null> {
if (!isEnvVarReference(ref.rawValue)) return ref.rawValue
const varName = extractEnvVarName(ref.rawValue)
if (!varName) {
stats.envVarFailures++
return null
}
const personalVars = personalEnvCache.get(ref.userId)
const encryptedValue = wsEnvVars[varName] ?? personalVars?.[varName]
if (!encryptedValue) {
console.warn(` [WARN] Env var "${varName}" not found (${context})`)
stats.envVarFailures++
return null
}
try {
return await decryptSecret(encryptedValue)
} catch (error) {
console.warn(` [WARN] Failed to decrypt env var "${varName}" (${context}): ${error}`)
stats.envVarFailures++
return null
}
}
const envLookup: EnvLookup = { wsEnvVars, personalEnvCache }
// 2c. For each provider, detect conflicts then resolve and insert
stats.workspacesProcessed++
@@ -397,7 +400,9 @@ async function run() {
// Resolve all keys for this provider to check for conflicts
const resolved: { ref: RawKeyRef; key: string }[] = []
for (const ref of refs) {
const key = await resolveKey(ref, `"${ref.blockName}" in "${ref.workflowName}"`)
const context = `"${ref.blockName}" in "${ref.workflowName}"`
const { key, envVarFailed } = await resolveKey(ref, context, envLookup)
if (envVarFailed) stats.envVarFailures++
if (key?.trim()) resolved.push({ ref, key })
}