Make migration concurrent

This commit is contained in:
Theodore Li
2026-03-10 00:18:58 -07:00
parent 11f402b031
commit efb23dad5e

View File

@@ -215,6 +215,10 @@ const workspaceEnvironment = pgTable('workspace_environment', {
variables: json('variables').notNull().default('{}'),
})
const WORKSPACE_CONCURRENCY = 100
const WORKSPACE_BATCH_SIZE = 1000
const SLEEP_MS = 30_000
// ---------- DB ----------
const postgresClient = postgres(CONNECTION_STRING, {
prepare: false,
@@ -225,22 +229,6 @@ const postgresClient = postgres(CONNECTION_STRING, {
})
const db = drizzle(postgresClient)
// ---------- Throttle ----------
const WORKSPACE_BATCH_SIZE = 1000
const SLEEP_MS = 30_000
let workspaceCount = 0
/**
 * Records completion of one workspace and, after every
 * WORKSPACE_BATCH_SIZE-th workspace, pauses for SLEEP_MS to ease load
 * on the database. Relies on the module-level `workspaceCount` counter.
 *
 * @param workspaceId id of the workspace that just finished (log only)
 * @param total       total number of workspaces in this run (log only)
 */
async function throttleBetweenWorkspaces(workspaceId: string, total: number) {
  workspaceCount += 1
  console.log(` [${workspaceCount}/${total}] Done with workspace ${workspaceId}`)
  // Only sleep on batch boundaries; otherwise return immediately.
  if (workspaceCount % WORKSPACE_BATCH_SIZE !== 0) {
    return
  }
  console.log(
    ` [THROTTLE] ${workspaceCount}/${total} workspaces — sleeping ${SLEEP_MS / 1000}s`
  )
  await new Promise<void>((resolve) => setTimeout(resolve, SLEEP_MS))
}
// ---------- Helpers ----------
const TOOL_INPUT_SUBBLOCK_IDS: Record<string, string> = {
agent: 'tools',
@@ -300,6 +288,47 @@ interface ResolveKeyContext {
workspaceOwnerId: string | null
}
/**
 * Counters accumulated over a migration run; per-workspace instances are
 * combined into the run total with mergeStats().
 */
type MigrationStats = {
  // Workspaces that contained at least one API key and were processed.
  workspacesProcessed: number
  // Workspaces with no API keys in any matching block.
  workspacesSkipped: number
  // Providers for which multiple distinct key values were resolved in one workspace.
  conflicts: number
  // BYOK rows successfully inserted.
  inserted: number
  // Inserts skipped because a BYOK row already existed (conflict target hit).
  skippedExisting: number
  // Workspaces or inserts that threw an error.
  errors: number
  // Env-var references that failed to resolve to a key value.
  envVarFailures: number
}
/** Outcome of processing a single workspace. */
type WorkspaceResult = {
  // Counters for this workspace only; the caller merges them into run totals.
  stats: MigrationStats
  // True when this workspace id should be appended to the dry-run id file.
  shouldWriteWorkspaceId: boolean
}
/** Builds a MigrationStats record with every counter initialised to zero. */
function createEmptyStats(): MigrationStats {
  const zeroed: MigrationStats = {
    conflicts: 0,
    envVarFailures: 0,
    errors: 0,
    inserted: 0,
    skippedExisting: 0,
    workspacesProcessed: 0,
    workspacesSkipped: 0,
  }
  return zeroed
}
/**
 * Adds every counter from `source` onto `target`, mutating `target` in place.
 */
function mergeStats(target: MigrationStats, source: MigrationStats) {
  const counters: (keyof MigrationStats)[] = [
    'workspacesProcessed',
    'workspacesSkipped',
    'conflicts',
    'inserted',
    'skippedExisting',
    'errors',
    'envVarFailures',
  ]
  for (const counter of counters) {
    target[counter] += source[counter]
  }
}
/** Promise-based delay: resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(() => {
      resolve()
    }, ms)
  })
}
async function resolveKey(
ref: RawKeyRef,
env: EnvLookup,
@@ -341,6 +370,221 @@ async function resolveKey(
}
}
/**
 * Migrates all provider API keys referenced by one workspace's workflow
 * blocks into workspace-level BYOK rows.
 *
 * Pipeline: load matching blocks and the workspace owner, group raw key
 * references by provider, resolve each reference via resolveKey(), then
 * insert the highest-priority key per provider (unless DRY_RUN).
 *
 * @param workspaceId   workspace to migrate
 * @param allBlockTypes block types to scan for API keys
 * @param userFilter    extra SQL predicate appended to the block query
 * @param total         total workspace count (used only in log prefixes)
 * @param index         1-based position of this workspace (log prefixes)
 * @returns per-workspace stats plus whether this id should be recorded
 *          in the dry-run workspace-id file
 */
async function processWorkspace(
  workspaceId: string,
  allBlockTypes: string[],
  userFilter: ReturnType<typeof sql>,
  total: number,
  index: number
): Promise<WorkspaceResult> {
  const stats = createEmptyStats()
  try {
    // Fetch candidate blocks and the workspace owner row concurrently.
    const [blocks, wsRows] = await Promise.all([
      db
        .select({
          blockId: workflowBlocks.id,
          blockName: workflowBlocks.name,
          blockType: workflowBlocks.type,
          subBlocks: workflowBlocks.subBlocks,
          workflowId: workflow.id,
          workflowName: workflow.name,
          userId: workflow.userId,
        })
        .from(workflowBlocks)
        .innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id))
        .where(
          sql`${workflow.workspaceId} = ${workspaceId} AND ${workflowBlocks.type} IN (${sql.join(
            allBlockTypes.map((t) => sql`${t}`),
            sql`, `
          )})${userFilter}`
        ),
      db
        .select({ ownerId: workspaceTable.ownerId })
        .from(workspaceTable)
        .where(eq(workspaceTable.id, workspaceId))
        .limit(1),
    ])
    const workspaceOwnerId = wsRows[0]?.ownerId ?? null
    console.log(
      `[${index}/${total}] [Workspace ${workspaceId}] ${blocks.length} blocks, owner=${workspaceOwnerId ?? 'unknown'}`
    )
    // Collect raw key references grouped by provider id.
    const providerKeys = new Map<string, RawKeyRef[]>()
    for (const block of blocks) {
      const subBlocks = block.subBlocks as Record<string, { value?: any }>
      // Case 1: a direct apiKey sub-block on a provider-mapped block type.
      const providerId = BLOCK_TYPE_TO_PROVIDER[block.blockType]
      if (providerId) {
        const val = subBlocks?.apiKey?.value
        if (typeof val === 'string' && val.trim()) {
          const refs = providerKeys.get(providerId) ?? []
          refs.push({
            rawValue: val,
            blockName: block.blockName,
            workflowId: block.workflowId,
            workflowName: block.workflowName,
            userId: block.userId,
          })
          providerKeys.set(providerId, refs)
        }
      }
      // Case 2: keys nested inside a tool-input sub-block (e.g. agent tools).
      const toolInputId = TOOL_INPUT_SUBBLOCK_IDS[block.blockType]
      if (toolInputId) {
        const tools = parseToolInputValue(subBlocks?.[toolInputId]?.value)
        for (const tool of tools) {
          const toolType = tool?.type as string | undefined
          const toolApiKey = tool?.params?.apiKey as string | undefined
          if (!toolType || !toolApiKey || !toolApiKey.trim()) continue
          const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType]
          if (!toolProviderId) continue
          const refs = providerKeys.get(toolProviderId) ?? []
          refs.push({
            rawValue: toolApiKey,
            blockName: `${block.blockName} > tool "${tool.title || toolType}"`,
            workflowId: block.workflowId,
            workflowName: block.workflowName,
            userId: block.userId,
          })
          providerKeys.set(toolProviderId, refs)
        }
      }
    }
    if (providerKeys.size === 0) {
      console.log(` [${index}/${total}] No API keys found, skipping\n`)
      stats.workspacesSkipped++
      return { stats, shouldWriteWorkspaceId: false }
    }
    // Load env vars only when at least one reference is an env-var reference
    // (as decided by isEnvVarReference).
    const needsEnvVars = [...providerKeys.values()]
      .flat()
      .some((ref) => isEnvVarReference(ref.rawValue))
    let wsEnvVars: Record<string, string> = {}
    const personalEnvCache = new Map<string, Record<string, string>>()
    if (needsEnvVars) {
      const wsEnvRows = await db
        .select()
        .from(workspaceEnvironment)
        .where(sql`${workspaceEnvironment.workspaceId} = ${workspaceId}`)
        .limit(1)
      if (wsEnvRows[0]) {
        wsEnvVars = (wsEnvRows[0].variables as Record<string, string>) || {}
      }
      // Fetch personal env rows for every user owning a referencing workflow.
      const userIds = [...new Set([...providerKeys.values()].flat().map((r) => r.userId))]
      if (userIds.length > 0) {
        const personalRows = await db
          .select()
          .from(environment)
          .where(
            sql`${environment.userId} IN (${sql.join(
              userIds.map((id) => sql`${id}`),
              sql`, `
            )})`
          )
        for (const row of personalRows) {
          personalEnvCache.set(row.userId, (row.variables as Record<string, string>) || {})
        }
      }
    }
    const envLookup: EnvLookup = { wsEnvVars, personalEnvCache }
    stats.workspacesProcessed++
    // Resolve and insert at most one key per provider.
    for (const [providerId, refs] of providerKeys) {
      const resolved: { ref: RawKeyRef; key: string; source: KeySource }[] = []
      const resolveCtx: ResolveKeyContext = { workspaceId, workspaceOwnerId }
      for (const ref of refs) {
        const { key, source, envVarFailed } = await resolveKey(ref, envLookup, resolveCtx)
        if (envVarFailed) stats.envVarFailures++
        if (!key?.trim()) continue
        // Personal env keys are only trusted when they belong to the workspace owner.
        if (source === 'personal' && ref.userId !== workspaceOwnerId) {
          console.log(
            ` [SKIP-PERSONAL] Ignoring non-owner personal key from user=${ref.userId} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}"`
          )
          continue
        }
        resolved.push({ ref, key: key.trim(), source })
      }
      if (resolved.length === 0) continue
      // Order by source priority so resolved[0] is the key to keep.
      resolved.sort((a, b) => KEY_SOURCE_PRIORITY[a.source] - KEY_SOURCE_PRIORITY[b.source])
      // Report (but do not block on) multiple distinct key values for one provider.
      const distinctKeys = new Set(resolved.map((r) => r.key))
      if (distinctKeys.size > 1) {
        stats.conflicts++
        console.log(` [CONFLICT] provider "${providerId}": ${distinctKeys.size} distinct keys`)
        for (const { ref, key, source } of resolved) {
          const isOwner = ref.userId === workspaceOwnerId ? ' (owner)' : ''
          const display = isEnvVarReference(ref.rawValue)
            ? `${ref.rawValue} -> ${maskKey(key)}`
            : maskKey(ref.rawValue)
          console.log(
            ` [${source}] user=${ref.userId}${isOwner} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}": ${display}`
          )
        }
        const chosenIsOwner = resolved[0].ref.userId === workspaceOwnerId ? ', owner' : ''
        console.log(
          ` Using highest-priority key (${resolved[0].source}${chosenIsOwner}, user=${resolved[0].ref.userId})`
        )
      }
      const chosen = resolved[0]
      if (DRY_RUN) {
        console.log(` [DRY RUN] Would insert BYOK for provider "${providerId}": ${maskKey(chosen.key)}`)
        continue
      }
      try {
        const encrypted = await encryptSecret(chosen.key)
        // onConflictDoNothing + returning: an empty result means the
        // (workspaceId, providerId) row already existed.
        const result = await db
          .insert(workspaceBYOKKeys)
          .values({
            id: uuidv4(),
            workspaceId,
            providerId,
            encryptedApiKey: encrypted,
            createdBy: chosen.ref.userId,
          })
          .onConflictDoNothing({
            target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId],
          })
          .returning({ id: workspaceBYOKKeys.id })
        if (result.length === 0) {
          console.log(` [SKIP] BYOK already exists for provider "${providerId}"`)
          stats.skippedExisting++
        } else {
          console.log(` [INSERT] BYOK for provider "${providerId}": ${maskKey(chosen.key)}`)
          stats.inserted++
        }
      } catch (error) {
        console.error(` [ERROR] Failed to insert BYOK for provider "${providerId}":`, error)
        stats.errors++
      }
    }
    console.log(` [${index}/${total}] Done with workspace ${workspaceId}\n`)
    // In DRY_RUN mode the id is recorded so a later live run can replay it
    // from the workspace-id file.
    return { stats, shouldWriteWorkspaceId: DRY_RUN }
  } catch (error) {
    // Any failure counts as one error for this workspace; its id is not recorded.
    console.error(` [ERROR] Failed workspace ${workspaceId}:`, error)
    stats.errors++
    return { stats, shouldWriteWorkspaceId: false }
  }
}
// ---------- Main ----------
async function run() {
console.log(`Mode: ${DRY_RUN ? 'DRY RUN (audit + preview)' : 'LIVE'}`)
@@ -353,15 +597,7 @@ async function run() {
if (FROM_FILE) console.log(`From file: ${FROM_FILE}`)
console.log('---\n')
const stats = {
workspacesProcessed: 0,
workspacesSkipped: 0,
conflicts: 0,
inserted: 0,
skippedExisting: 0,
errors: 0,
envVarFailures: 0,
}
const stats = createEmptyStats()
try {
// 1. Get distinct workspace IDs that have matching blocks
@@ -409,219 +645,55 @@ async function run() {
console.log(`Loaded ${workspaceIds.length} workspace IDs from ${FROM_FILE}\n`)
}
// 2. Process one workspace at a time
for (const workspaceId of workspaceIds) {
const blocks = await db
.select({
blockId: workflowBlocks.id,
blockName: workflowBlocks.name,
blockType: workflowBlocks.type,
subBlocks: workflowBlocks.subBlocks,
workflowId: workflow.id,
workflowName: workflow.name,
userId: workflow.userId,
})
.from(workflowBlocks)
.innerJoin(workflow, eq(workflowBlocks.workflowId, workflow.id))
.where(
sql`${workflow.workspaceId} = ${workspaceId} AND ${workflowBlocks.type} IN (${sql.join(
allBlockTypes.map((t) => sql`${t}`),
sql`, `
)})${userFilter}`
)
const wsRows = await db
.select({ ownerId: workspaceTable.ownerId })
.from(workspaceTable)
.where(eq(workspaceTable.id, workspaceId))
.limit(1)
const workspaceOwnerId = wsRows[0]?.ownerId ?? null
// 2. Process workspaces in concurrent chunks of WORKSPACE_CONCURRENCY, sleeping SLEEP_MS between batches of WORKSPACE_BATCH_SIZE
for (let batchStart = 0; batchStart < workspaceIds.length; batchStart += WORKSPACE_BATCH_SIZE) {
const batch = workspaceIds.slice(batchStart, batchStart + WORKSPACE_BATCH_SIZE)
console.log(
`[Workspace ${workspaceId}] ${blocks.length} blocks, owner=${workspaceOwnerId ?? 'unknown'}`
`[BATCH] Processing workspaces ${batchStart + 1}-${batchStart + batch.length} of ${workspaceIds.length}`
)
// 2a. Extract all raw key references grouped by provider
const providerKeys = new Map<string, RawKeyRef[]>()
for (const block of blocks) {
const subBlocks = block.subBlocks as Record<string, { value?: any }>
const providerId = BLOCK_TYPE_TO_PROVIDER[block.blockType]
if (providerId) {
const val = subBlocks?.apiKey?.value
if (typeof val === 'string' && val.trim()) {
const refs = providerKeys.get(providerId) ?? []
refs.push({
rawValue: val,
blockName: block.blockName,
workflowId: block.workflowId,
workflowName: block.workflowName,
userId: block.userId,
})
providerKeys.set(providerId, refs)
}
}
const toolInputId = TOOL_INPUT_SUBBLOCK_IDS[block.blockType]
if (toolInputId) {
const tools = parseToolInputValue(subBlocks?.[toolInputId]?.value)
for (const tool of tools) {
const toolType = tool?.type as string | undefined
const toolApiKey = tool?.params?.apiKey as string | undefined
if (!toolType || !toolApiKey || !toolApiKey.trim()) continue
const toolProviderId = BLOCK_TYPE_TO_PROVIDER[toolType]
if (!toolProviderId) continue
const refs = providerKeys.get(toolProviderId) ?? []
refs.push({
rawValue: toolApiKey,
blockName: `${block.blockName} > tool "${tool.title || toolType}"`,
workflowId: block.workflowId,
workflowName: block.workflowName,
userId: block.userId,
})
providerKeys.set(toolProviderId, refs)
}
}
}
if (providerKeys.size === 0) {
console.log(' No API keys found, skipping\n')
stats.workspacesSkipped++
continue
}
if (DRY_RUN) {
appendFileSync(resolve('migrate-byok-workspace-ids.txt'), `${workspaceId}\n`)
}
// 2b. Load env vars only if this workspace has env var references
const needsEnvVars = [...providerKeys.values()]
.flat()
.some((ref) => isEnvVarReference(ref.rawValue))
let wsEnvVars: Record<string, string> = {}
const personalEnvCache = new Map<string, Record<string, string>>()
if (needsEnvVars) {
const wsEnvRows = await db
.select()
.from(workspaceEnvironment)
.where(sql`${workspaceEnvironment.workspaceId} = ${workspaceId}`)
.limit(1)
if (wsEnvRows[0]) {
wsEnvVars = (wsEnvRows[0].variables as Record<string, string>) || {}
}
const userIds = [...new Set([...providerKeys.values()].flat().map((r) => r.userId))]
if (userIds.length > 0) {
const personalRows = await db
.select()
.from(environment)
.where(
sql`${environment.userId} IN (${sql.join(
userIds.map((id) => sql`${id}`),
sql`, `
)})`
for (
let concurrencyStart = 0;
concurrencyStart < batch.length;
concurrencyStart += WORKSPACE_CONCURRENCY
) {
const workspaceChunk = batch.slice(
concurrencyStart,
concurrencyStart + WORKSPACE_CONCURRENCY
)
const results = await Promise.all(
workspaceChunk.map((workspaceId, chunkIndex) =>
processWorkspace(
workspaceId,
allBlockTypes,
userFilter,
workspaceIds.length,
batchStart + concurrencyStart + chunkIndex + 1
)
for (const row of personalRows) {
personalEnvCache.set(row.userId, (row.variables as Record<string, string>) || {})
}
}
}
const envLookup: EnvLookup = { wsEnvVars, personalEnvCache }
// 2c. For each provider, detect conflicts then resolve and insert
stats.workspacesProcessed++
for (const [providerId, refs] of providerKeys) {
// Resolve all keys for this provider to check for conflicts
const resolved: { ref: RawKeyRef; key: string; source: KeySource }[] = []
const resolveCtx: ResolveKeyContext = { workspaceId, workspaceOwnerId }
for (const ref of refs) {
const { key, source, envVarFailed } = await resolveKey(ref, envLookup, resolveCtx)
if (envVarFailed) stats.envVarFailures++
if (!key?.trim()) continue
// For personal env vars, only use the workspace owner's — never another user's
if (source === 'personal' && ref.userId !== workspaceOwnerId) {
console.log(
` [SKIP-PERSONAL] Ignoring non-owner personal key from user=${ref.userId} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}"`
)
continue
}
resolved.push({ ref, key: key.trim(), source })
}
if (resolved.length === 0) continue
// Sort by priority: plaintext > workspace > personal
resolved.sort((a, b) => KEY_SOURCE_PRIORITY[a.source] - KEY_SOURCE_PRIORITY[b.source])
// Detect conflicting values
const distinctKeys = new Set(resolved.map((r) => r.key))
if (distinctKeys.size > 1) {
stats.conflicts++
console.log(` [CONFLICT] provider "${providerId}": ${distinctKeys.size} distinct keys`)
for (const { ref, key, source } of resolved) {
const isOwner = ref.userId === workspaceOwnerId ? ' (owner)' : ''
const display = isEnvVarReference(ref.rawValue)
? `${ref.rawValue} -> ${maskKey(key)}`
: maskKey(ref.rawValue)
console.log(
` [${source}] user=${ref.userId}${isOwner} workflow=${ref.workflowId} "${ref.blockName}" in "${ref.workflowName}": ${display}`
)
}
const chosenIsOwner = resolved[0].ref.userId === workspaceOwnerId ? ', owner' : ''
console.log(
` Using highest-priority key (${resolved[0].source}${chosenIsOwner}, user=${resolved[0].ref.userId})`
)
}
// Use the highest-priority resolved key
const chosen = resolved[0]
)
if (DRY_RUN) {
console.log(
` [DRY RUN] Would insert BYOK for provider "${providerId}": ${maskKey(chosen.key)}`
)
continue
const workspaceIdsWithKeys = results
.map((result, resultIndex) => (result.shouldWriteWorkspaceId ? workspaceChunk[resultIndex] : null))
.filter((id): id is string => id !== null)
if (workspaceIdsWithKeys.length > 0) {
appendFileSync(resolve('migrate-byok-workspace-ids.txt'), `${workspaceIdsWithKeys.join('\n')}\n`)
}
}
// Insert into BYOK
try {
const encrypted = await encryptSecret(chosen.key)
const result = await db
.insert(workspaceBYOKKeys)
.values({
id: uuidv4(),
workspaceId,
providerId,
encryptedApiKey: encrypted,
createdBy: chosen.ref.userId,
})
.onConflictDoNothing({
target: [workspaceBYOKKeys.workspaceId, workspaceBYOKKeys.providerId],
})
.returning({ id: workspaceBYOKKeys.id })
if (result.length === 0) {
console.log(` [SKIP] BYOK already exists for provider "${providerId}"`)
stats.skippedExisting++
} else {
console.log(` [INSERT] BYOK for provider "${providerId}": ${maskKey(chosen.key)}`)
stats.inserted++
}
} catch (error) {
console.error(` [ERROR] Failed to insert BYOK for provider "${providerId}":`, error)
stats.errors++
for (const result of results) {
mergeStats(stats, result.stats)
}
}
console.log()
await throttleBetweenWorkspaces(workspaceId, workspaceIds.length)
if (batchStart + batch.length < workspaceIds.length) {
console.log(
` [THROTTLE] ${batchStart + batch.length}/${workspaceIds.length} workspaces — sleeping ${SLEEP_MS / 1000}s`
)
await sleep(SLEEP_MS)
}
}
// 3. Summary