improvement(processing): reduce redundant DB queries in execution preprocessing (#3320)

* improvement(processing): reduce redundant DB queries in execution preprocessing

* improvement(processing): add defensive ID check for prefetched workflow record

* improvement(processing): fix type safety in execution error logging

Replace `as any` cast in non-SSE error path with proper `buildTraceSpans()`
transformation, matching the SSE error path. Remove redundant `as any` cast
in preprocessing.ts where the types already align.

* improvement(processing): replace `as any` casts with proper types in logging

- logger.ts: cast JSONB cost column to `WorkflowExecutionLog['cost']` instead
  of `any` in both `completeWorkflowExecution` and `getWorkflowExecution`
- logger.ts: replace `(orgUsageBefore as any)?.toString?.()` with `String()`
  since COALESCE guarantees a non-null SQL aggregate value
- logging-session.ts: cast JSONB cost to `AccumulatedCost` (the local
  interface) instead of `any` in `loadExistingCost`

* improvement(processing): use exported HighestPrioritySubscription type in usage.ts

Replace inline `Awaited<ReturnType<typeof getHighestPrioritySubscription>>`
with the already-exported `HighestPrioritySubscription` type alias.

* improvement(processing): replace remaining `as any` casts with proper types

- preprocessing.ts: use exported `HighestPrioritySubscription` type instead
  of redeclaring via `Awaited<ReturnType<...>>`
- deploy/route.ts, status/route.ts: cast `hasWorkflowChanged` args to
  `WorkflowState` instead of `any` (JSONB + object literal narrowing)
- state/route.ts: type block sanitization and save with `BlockState` and
  `WorkflowState` instead of `any`
- search-suggestions.ts: remove 8 unnecessary `as any` casts on `'date'`
  literal that already satisfies the `Suggestion['category']` union

* fix(processing): prevent double-billing race in LoggingSession completion

When executeWorkflowCore throws, its catch block fire-and-forgets
safeCompleteWithError, then re-throws. The caller's catch block also
fire-and-forgets safeCompleteWithError on the same LoggingSession. Both
check this.completed (still false) before either's async DB write resolves,
so both proceed to completeWorkflowExecution which uses additive SQL for
billing — doubling the charged cost on every failed execution.

Fix: add a synchronous `completing` flag set immediately before the async
work begins. This blocks concurrent callers at the guard check. On failure,
the flag is reset so the safe* fallback path (completeWithCostOnlyLog) can
still attempt recovery.

* fix(processing): unblock error responses and isolate run-count failures

Remove unnecessary `await waitForCompletion()` from non-SSE and SSE error
paths where no `markAsFailed()` follows — these were blocking error responses
on log persistence for no reason. Wrap `updateWorkflowRunCounts` in its own
try/catch so a run-count DB failure cannot prevent session completion, billing,
and trace span persistence.

* improvement(processing): remove dead setupExecutor method

The method body was just a debug log with an `any` parameter — logging
now works entirely through trace spans with no executor integration.

* remove logger.debug

* fix(processing): guard completionPromise as write-once (singleton promise)

Prevent concurrent safeComplete* calls from overwriting completionPromise
with a no-op. The guard now lives at the assignment site — if a completion
is already in-flight, return its promise instead of starting a new one.
This ensures waitForCompletion() always awaits the real work.

* improvement(processing): remove empty else/catch blocks left by debug log cleanup

* fix(processing): enforce waitForCompletion inside markAsFailed to prevent completion races

Move waitForCompletion() into markAsFailed() so every call site is
automatically safe against in-flight fire-and-forget completions.
Remove the now-redundant external waitForCompletion() calls in route.ts.

* fix(processing): reset completing flag on fallback failure, clean up empty catch

- completeWithCostOnlyLog now resets this.completing = false when
  the fallback itself fails, preventing a permanently stuck session
- Use _disconnectError in MCP test-connection to signal intentional ignore

* fix(processing): restore disconnect error logging in MCP test-connection

Revert unrelated debug log removal — this file isn't part of the
processing improvements and the log aids connection leak detection.

* fix(processing): address audit findings across branch

- preprocessing.ts: use undefined (not null) for failed subscription
  fetch so getUserUsageLimit does a fresh lookup instead of silently
  falling back to free-tier limits
- deployed/route.ts: log warning on loadDeployedWorkflowState failure
  instead of silently swallowing the error
- schedule-execution.ts: remove dead successLog parameter and all
  call-site arguments left over from logger.debug cleanup
- mcp/middleware.ts: drop unused error binding in empty catch
- audit/log.ts, wand.ts: promote logger.debug to logger.warn in catch
  blocks where these are the only failure signal

* revert: undo unnecessary subscription null→undefined change

getHighestPrioritySubscription never throws (it catches internally
and returns null), so the catch block in preprocessExecution is dead
code. The null vs undefined distinction doesn't matter and the
coercions added unnecessary complexity.

* improvement(processing): remove dead try/catch around getHighestPrioritySubscription

getHighestPrioritySubscription catches internally and returns null
on error, so the wrapping try/catch was unreachable dead code.

* improvement(processing): remove dead getSnapshotByHash method

No longer called after createSnapshotWithDeduplication was refactored
to use a single upsert instead of select-then-insert.

---------
This commit is contained in:
Waleed
2026-02-24 11:55:59 -08:00
committed by GitHub
parent 9e817bc5b0
commit 9a31c7d8ad
54 changed files with 350 additions and 629 deletions

View File

@@ -33,7 +33,6 @@ export async function POST(req: NextRequest) {
logger.info(`[${requestId}] Update cost request started`)
if (!isBillingEnabled) {
logger.debug(`[${requestId}] Billing is disabled, skipping cost update`)
return NextResponse.json({
success: true,
message: 'Billing disabled, cost update skipped',

View File

@@ -117,8 +117,6 @@ export async function POST(
const requestId = generateRequestId()
try {
logger.debug(`[${requestId}] Processing OTP request for identifier: ${identifier}`)
const body = await request.json()
const { email } = otpRequestSchema.parse(body)
@@ -211,8 +209,6 @@ export async function PUT(
const requestId = generateRequestId()
try {
logger.debug(`[${requestId}] Verifying OTP for identifier: ${identifier}`)
const body = await request.json()
const { email, otp } = otpVerifySchema.parse(body)

View File

@@ -42,8 +42,6 @@ export async function POST(
const requestId = generateRequestId()
try {
logger.debug(`[${requestId}] Processing chat request for identifier: ${identifier}`)
let parsedBody
try {
const rawBody = await request.json()
@@ -294,8 +292,6 @@ export async function GET(
const requestId = generateRequestId()
try {
logger.debug(`[${requestId}] Fetching chat info for identifier: ${identifier}`)
const deploymentResult = await db
.select({
id: chat.id,

View File

@@ -95,11 +95,6 @@ export async function POST(request: NextRequest) {
const body = await request.json()
const data = CreateCreatorProfileSchema.parse(body)
logger.debug(`[${requestId}] Creating creator profile:`, {
referenceType: data.referenceType,
referenceId: data.referenceId,
})
// Validate permissions
if (data.referenceType === 'user') {
if (data.referenceId !== session.user.id) {

View File

@@ -58,8 +58,6 @@ export async function POST(
const requestId = generateRequestId()
try {
logger.debug(`[${requestId}] Processing form submission for identifier: ${identifier}`)
let parsedBody
try {
const rawBody = await request.json()
@@ -300,8 +298,6 @@ export async function GET(
const requestId = generateRequestId()
try {
logger.debug(`[${requestId}] Fetching form info for identifier: ${identifier}`)
const deploymentResult = await db
.select({
id: form.id,

View File

@@ -77,8 +77,6 @@ export async function POST(req: NextRequest) {
}
}
logger.debug(`[${requestId}] Help request includes ${images.length} images`)
const userId = session.user.id
let emailText = `
Type: ${type}

View File

@@ -186,8 +186,6 @@ export async function POST(request: NextRequest) {
valueTo: filter.valueTo,
}
})
logger.debug(`[${requestId}] Processed ${structuredFilters.length} structured filters`)
}
if (accessibleKbIds.length === 0) {
@@ -220,7 +218,6 @@ export async function POST(request: NextRequest) {
if (!hasQuery && hasFilters) {
// Tag-only search without vector similarity
logger.debug(`[${requestId}] Executing tag-only search with filters:`, structuredFilters)
results = await handleTagOnlySearch({
knowledgeBaseIds: accessibleKbIds,
topK: validatedData.topK,
@@ -244,7 +241,6 @@ export async function POST(request: NextRequest) {
})
} else if (hasQuery && !hasFilters) {
// Vector-only search
logger.debug(`[${requestId}] Executing vector-only search`)
const strategy = getQueryStrategy(accessibleKbIds.length, validatedData.topK)
const queryVector = JSON.stringify(await queryEmbeddingPromise)

View File

@@ -1,11 +1,8 @@
import { db } from '@sim/db'
import { document, embedding } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, isNull, sql } from 'drizzle-orm'
import type { StructuredFilter } from '@/lib/knowledge/types'
const logger = createLogger('KnowledgeSearchUtils')
export async function getDocumentNamesByIds(
documentIds: string[]
): Promise<Record<string, string>> {
@@ -140,17 +137,12 @@ function buildFilterCondition(filter: StructuredFilter, embeddingTable: any) {
const { tagSlot, fieldType, operator, value, valueTo } = filter
if (!isTagSlotKey(tagSlot)) {
logger.debug(`[getStructuredTagFilters] Unknown tag slot: ${tagSlot}`)
return null
}
const column = embeddingTable[tagSlot]
if (!column) return null
logger.debug(
`[getStructuredTagFilters] Processing ${tagSlot} (${fieldType}) ${operator} ${value}`
)
// Handle text operators
if (fieldType === 'text') {
const stringValue = String(value)
@@ -208,7 +200,6 @@ function buildFilterCondition(filter: StructuredFilter, embeddingTable: any) {
const dateStr = String(value)
// Validate YYYY-MM-DD format
if (!/^\d{4}-\d{2}-\d{2}$/.test(dateStr)) {
logger.debug(`[getStructuredTagFilters] Invalid date format: ${dateStr}, expected YYYY-MM-DD`)
return null
}
@@ -287,9 +278,6 @@ function getStructuredTagFilters(filters: StructuredFilter[], embeddingTable: an
conditions.push(slotConditions[0])
} else {
// Multiple conditions for same slot - OR them together
logger.debug(
`[getStructuredTagFilters] OR'ing ${slotConditions.length} conditions for ${slot}`
)
conditions.push(sql`(${sql.join(slotConditions, sql` OR `)})`)
}
}
@@ -380,8 +368,6 @@ export async function handleTagOnlySearch(params: SearchParams): Promise<SearchR
throw new Error('Tag filters are required for tag-only search')
}
logger.debug(`[handleTagOnlySearch] Executing tag-only search with filters:`, structuredFilters)
const strategy = getQueryStrategy(knowledgeBaseIds.length, topK)
const tagFilterConditions = getStructuredTagFilters(structuredFilters, embedding)
@@ -431,8 +417,6 @@ export async function handleVectorOnlySearch(params: SearchParams): Promise<Sear
throw new Error('Query vector and distance threshold are required for vector-only search')
}
logger.debug(`[handleVectorOnlySearch] Executing vector-only search`)
const strategy = getQueryStrategy(knowledgeBaseIds.length, topK)
const distanceExpr = sql<number>`${embedding.embedding} <=> ${queryVector}::vector`.as('distance')
@@ -489,23 +473,13 @@ export async function handleTagAndVectorSearch(params: SearchParams): Promise<Se
throw new Error('Query vector and distance threshold are required for tag and vector search')
}
logger.debug(
`[handleTagAndVectorSearch] Executing tag + vector search with filters:`,
structuredFilters
)
// Step 1: Filter by tags first
const tagFilteredIds = await executeTagFilterQuery(knowledgeBaseIds, structuredFilters)
if (tagFilteredIds.length === 0) {
logger.debug(`[handleTagAndVectorSearch] No results found after tag filtering`)
return []
}
logger.debug(
`[handleTagAndVectorSearch] Found ${tagFilteredIds.length} results after tag filtering`
)
// Step 2: Perform vector search only on tag-filtered results
return await executeVectorSearchOnIds(
tagFilteredIds.map((r) => r.id),

View File

@@ -34,10 +34,6 @@ export async function GET(
const authenticatedUserId = authResult.userId
logger.debug(
`[${requestId}] Fetching execution data for: ${executionId} (auth: ${authResult.authType})`
)
const [workflowLog] = await db
.select({
id: workflowExecutionLogs.id,
@@ -125,11 +121,6 @@ export async function GET(
},
}
logger.debug(`[${requestId}] Successfully fetched execution data for: ${executionId}`)
logger.debug(
`[${requestId}] Workflow state contains ${Object.keys((snapshot.stateData as any)?.blocks || {}).length} blocks`
)
return NextResponse.json(response)
} catch (error) {
logger.error(`[${requestId}] Error fetching execution data:`, error)

View File

@@ -83,7 +83,6 @@ export const POST = withMcpAuth('read')(
serverId: serverId,
serverName: 'provided-schema',
} as McpTool
logger.debug(`[${requestId}] Using provided schema for ${toolName}, skipping discovery`)
} else {
const tools = await mcpService.discoverServerTools(userId, serverId, workspaceId)
tool = tools.find((t) => t.name === toolName) ?? null

View File

@@ -26,7 +26,6 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
try {
const { id: scheduleId } = await params
logger.debug(`[${requestId}] Reactivating schedule with ID: ${scheduleId}`)
const session = await getSession()
if (!session?.user?.id) {

View File

@@ -51,7 +51,6 @@ export async function GET(request: NextRequest) {
lastQueuedAt: workflowSchedule.lastQueuedAt,
})
logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`)
logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`)
const jobQueue = await getJobQueue()

View File

@@ -24,8 +24,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
try {
const session = await getSession()
logger.debug(`[${requestId}] Fetching template: ${id}`)
const result = await db
.select({
template: templates,
@@ -74,8 +72,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
views: sql`${templates.views} + 1`,
})
.where(eq(templates.id, id))
logger.debug(`[${requestId}] Incremented view count for template: ${id}`)
} catch (viewError) {
logger.warn(`[${requestId}] Failed to increment view count for template: ${id}`, viewError)
}

View File

@@ -58,8 +58,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
logger.debug(`[${requestId}] Adding star for template: ${id}, user: ${session.user.id}`)
// Verify the template exists
const templateExists = await db
.select({ id: templates.id })
@@ -133,8 +131,6 @@ export async function DELETE(
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
logger.debug(`[${requestId}] Removing star for template: ${id}, user: ${session.user.id}`)
// Check if the star exists
const existingStar = await db
.select({ id: templateStars.id })

View File

@@ -68,8 +68,6 @@ export async function GET(request: NextRequest) {
const { searchParams } = new URL(request.url)
const params = QueryParamsSchema.parse(Object.fromEntries(searchParams.entries()))
logger.debug(`[${requestId}] Fetching templates with params:`, params)
// Check if user is a super user
const { effectiveSuperUser } = await verifyEffectiveSuperUser(session.user.id)
const isSuperUser = effectiveSuperUser
@@ -187,11 +185,6 @@ export async function POST(request: NextRequest) {
const body = await request.json()
const data = CreateTemplateSchema.parse(body)
logger.debug(`[${requestId}] Creating template:`, {
name: data.name,
workflowId: data.workflowId,
})
// Verify the workflow exists and belongs to the user
const workflowExists = await db
.select({ id: workflow.id })

View File

@@ -18,7 +18,6 @@ export async function DELETE(
const { id } = await params
try {
logger.debug(`[${requestId}] Deleting API key: ${id}`)
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })

View File

@@ -103,12 +103,10 @@ async function updateUserStatsForWand(
isBYOK = false
): Promise<void> {
if (!isBillingEnabled) {
logger.debug(`[${requestId}] Billing is disabled, skipping wand usage cost update`)
return
}
if (!usage.total_tokens || usage.total_tokens <= 0) {
logger.debug(`[${requestId}] No tokens to update in user stats`)
return
}
@@ -146,13 +144,6 @@ async function updateUserStatsForWand(
})
.where(eq(userStats.userId, userId))
logger.debug(`[${requestId}] Updated user stats for wand usage`, {
userId,
tokensUsed: totalTokens,
costAdded: costToStore,
isBYOK,
})
await logModelUsage({
userId,
source: 'wand',
@@ -291,23 +282,8 @@ export async function POST(req: NextRequest) {
messages.push({ role: 'user', content: prompt })
logger.debug(
`[${requestId}] Calling ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'} API for wand generation`,
{
stream,
historyLength: history.length,
endpoint: useWandAzure ? azureEndpoint : 'api.openai.com',
model: useWandAzure ? wandModelName : 'gpt-4o',
apiVersion: useWandAzure ? azureApiVersion : 'N/A',
}
)
if (stream) {
try {
logger.debug(
`[${requestId}] Starting streaming request to ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'}`
)
logger.info(
`[${requestId}] About to create stream with model: ${useWandAzure ? wandModelName : 'gpt-4o'}`
)
@@ -327,8 +303,6 @@ export async function POST(req: NextRequest) {
headers.Authorization = `Bearer ${activeOpenAIKey}`
}
logger.debug(`[${requestId}] Making streaming request to: ${apiUrl}`)
const response = await fetch(apiUrl, {
method: 'POST',
headers,
@@ -429,7 +403,6 @@ export async function POST(req: NextRequest) {
try {
parsed = JSON.parse(data)
} catch (parseError) {
logger.debug(`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`)
continue
}

View File

@@ -21,7 +21,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
try {
const { id } = await params
logger.debug(`[${requestId}] Fetching webhook with ID: ${id}`)
const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
@@ -77,7 +76,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
try {
const { id } = await params
logger.debug(`[${requestId}] Updating webhook with ID: ${id}`)
const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
@@ -129,11 +127,6 @@ export async function PATCH(request: NextRequest, { params }: { params: Promise<
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
logger.debug(`[${requestId}] Updating webhook properties`, {
hasActiveUpdate: isActive !== undefined,
hasFailedCountUpdate: failedCount !== undefined,
})
const updatedWebhook = await db
.update(webhook)
.set({
@@ -161,7 +154,6 @@ export async function DELETE(
try {
const { id } = await params
logger.debug(`[${requestId}] Deleting webhook with ID: ${id}`)
const auth = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {

View File

@@ -112,7 +112,6 @@ export async function GET(request: NextRequest) {
return NextResponse.json({ webhooks: [] }, { status: 200 })
}
logger.debug(`[${requestId}] Fetching workspace-accessible webhooks for ${session.user.id}`)
const workspacePermissionRows = await db
.select({ workspaceId: permissions.entityId })
.from(permissions)

View File

@@ -35,8 +35,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
)
}
logger.debug(`[${requestId}] Checking chat deployment status for workflow: ${id}`)
// Find any active chat deployments for this workflow
const deploymentResults = await db
.select({

View File

@@ -22,6 +22,7 @@ import {
} from '@/lib/workflows/schedules'
import { validateWorkflowPermissions } from '@/lib/workflows/utils'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowDeployAPI')
@@ -33,8 +34,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
const { id } = await params
try {
logger.debug(`[${requestId}] Fetching deployment info for workflow: ${id}`)
const { error, workflow: workflowData } = await validateWorkflowPermissions(
id,
requestId,
@@ -86,7 +85,10 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
variables: workflowRecord?.variables || {},
}
const { hasWorkflowChanged } = await import('@/lib/workflows/comparison')
needsRedeployment = hasWorkflowChanged(currentState as any, active.state as any)
needsRedeployment = hasWorkflowChanged(
currentState as WorkflowState,
active.state as WorkflowState
)
}
}
@@ -112,8 +114,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const { id } = await params
try {
logger.debug(`[${requestId}] Deploying workflow: ${id}`)
const {
error,
session,
@@ -355,8 +355,6 @@ export async function DELETE(
const { id } = await params
try {
logger.debug(`[${requestId}] Undeploying workflow: ${id}`)
const {
error,
session,

View File

@@ -21,8 +21,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
const { id } = await params
try {
logger.debug(`[${requestId}] Fetching deployed state for workflow: ${id}`)
const authHeader = request.headers.get('authorization')
let isInternalCall = false
@@ -38,8 +36,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
const response = createErrorResponse(error.message, error.status)
return addNoCacheHeaders(response)
}
} else {
logger.debug(`[${requestId}] Internal API call for deployed workflow: ${id}`)
}
let deployedState = null
@@ -52,7 +48,8 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
parallels: data.parallels,
variables: data.variables,
}
} catch {
} catch (error) {
logger.warn(`[${requestId}] Failed to load deployed state for workflow ${id}`, { error })
deployedState = null
}

View File

@@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import {
cleanupExecutionBase64Cache,
hydrateUserFilesWithBase64,
@@ -409,18 +408,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const shouldUseDraftState = isPublicApiAccess
? false
: (useDraftState ?? auth.authType === 'session')
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId,
action: shouldUseDraftState ? 'write' : 'read',
})
if (!workflowAuthorization.allowed) {
return NextResponse.json(
{ error: workflowAuthorization.message || 'Access denied' },
{ status: workflowAuthorization.status }
)
}
const streamHeader = req.headers.get('X-Stream-Response') === 'true'
const enableSSE = streamHeader || streamParam === true
const executionModeHeader = req.headers.get('X-Execution-Mode')
@@ -455,6 +442,21 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const useAuthenticatedUserAsActor =
isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal')
// Authorization fetches the full workflow record and checks workspace permissions.
// Run it first so we can pass the record to preprocessing (eliminates a duplicate DB query).
const workflowAuthorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId,
action: shouldUseDraftState ? 'write' : 'read',
})
if (!workflowAuthorization.allowed) {
return NextResponse.json(
{ error: workflowAuthorization.message || 'Access denied' },
{ status: workflowAuthorization.status }
)
}
// Pass the pre-fetched workflow record to skip the redundant Step 1 DB query in preprocessing.
const preprocessResult = await preprocessExecution({
workflowId,
userId,
@@ -465,6 +467,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
useDraftState: shouldUseDraftState,
useAuthenticatedUserAsActor,
workflowRecord: workflowAuthorization.workflow ?? undefined,
})
if (!preprocessResult.success) {
@@ -514,7 +517,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
try {
const workflowData = shouldUseDraftState
? await loadWorkflowFromNormalizedTables(workflowId)
: await loadDeployedWorkflowState(workflowId)
: await loadDeployedWorkflowState(workflowId, workspaceId)
if (workflowData) {
const deployedVariables =
@@ -694,12 +697,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
await loggingSession.safeCompleteWithError({
totalDurationMs: executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans: executionResult?.logs as any,
})
return NextResponse.json(
{
success: false,
@@ -718,11 +715,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
} finally {
timeoutController.cleanup()
if (executionId) {
try {
await cleanupExecutionBase64Cache(executionId)
} catch (error) {
void cleanupExecutionBase64Cache(executionId).catch((error) => {
logger.error(`[${requestId}] Failed to cleanup base64 cache`, { error })
}
})
}
}
}
@@ -1123,15 +1118,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`, { isTimeout })
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans, totalDuration } = executionResult
? buildTraceSpans(executionResult)
: { traceSpans: [], totalDuration: 0 }
await loggingSession.safeCompleteWithError({
totalDurationMs: totalDuration || executionResult?.metadata?.duration,
error: { message: errorMessage },
traceSpans,
})
sendEvent({
type: 'execution:error',

View File

@@ -77,18 +77,9 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
}
}
logger.debug(`[${requestId}] Attempting to load workflow ${workflowId} from normalized tables`)
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
if (normalizedData) {
logger.debug(`[${requestId}] Found normalized data for workflow ${workflowId}:`, {
blocksCount: Object.keys(normalizedData.blocks).length,
edgesCount: normalizedData.edges.length,
loopsCount: Object.keys(normalizedData.loops).length,
parallelsCount: Object.keys(normalizedData.parallels).length,
loops: normalizedData.loops,
})
const finalWorkflowData = {
...workflowData,
state: {

View File

@@ -11,7 +11,7 @@ import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/sanitization/validation'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import type { BlockState } from '@/stores/workflows/workflow/types'
import type { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
const logger = createLogger('WorkflowStateAPI')
@@ -153,13 +153,15 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
}
// Sanitize custom tools in agent blocks before saving
const { blocks: sanitizedBlocks, warnings } = sanitizeAgentToolsInBlocks(state.blocks as any)
const { blocks: sanitizedBlocks, warnings } = sanitizeAgentToolsInBlocks(
state.blocks as Record<string, BlockState>
)
// Save to normalized tables
// Ensure all required fields are present for WorkflowState type
// Filter out blocks without type or name before saving
const filteredBlocks = Object.entries(sanitizedBlocks).reduce(
(acc, [blockId, block]: [string, any]) => {
(acc, [blockId, block]: [string, BlockState]) => {
if (block.type && block.name) {
// Ensure all required fields are present
acc[blockId] = {
@@ -191,7 +193,10 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
deployedAt: state.deployedAt,
}
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState as any)
const saveResult = await saveWorkflowToNormalizedTables(
workflowId,
workflowState as WorkflowState
)
if (!saveResult.success) {
logger.error(`[${requestId}] Failed to save workflow ${workflowId} state:`, saveResult.error)

View File

@@ -7,6 +7,7 @@ import { hasWorkflowChanged } from '@/lib/workflows/comparison'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
const logger = createLogger('WorkflowStatusAPI')
@@ -64,7 +65,10 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
.limit(1)
if (active?.state) {
needsRedeployment = hasWorkflowChanged(currentState as any, active.state as any)
needsRedeployment = hasWorkflowChanged(
currentState as WorkflowState,
active.state as WorkflowState
)
}
}

View File

@@ -40,15 +40,10 @@ async function applyScheduleUpdate(
scheduleId: string,
updates: WorkflowScheduleUpdate,
requestId: string,
context: string,
successLog?: string
context: string
) {
try {
await db.update(workflowSchedule).set(updates).where(eq(workflowSchedule.id, scheduleId))
if (successLog) {
logger.debug(`[${requestId}] ${successLog}`)
}
} catch (error) {
logger.error(`[${requestId}] ${context}`, error)
}
@@ -132,8 +127,10 @@ async function runWorkflowExecution({
asyncTimeout?: number
}): Promise<RunWorkflowResult> {
try {
logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`)
const deployedData = await loadDeployedWorkflowState(payload.workflowId)
const deployedData = await loadDeployedWorkflowState(
payload.workflowId,
workflowRecord.workspaceId ?? undefined
)
const blocks = deployedData.blocks
const { deploymentVersionId } = deployedData
@@ -351,8 +348,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after authentication error`,
`Disabled schedule ${payload.scheduleId} due to authentication failure (401)`
`Failed to disable schedule ${payload.scheduleId} after authentication error`
)
return
}
@@ -370,8 +366,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after authorization error`,
`Disabled schedule ${payload.scheduleId} due to authorization failure (403)`
`Failed to disable schedule ${payload.scheduleId} after authorization error`
)
return
}
@@ -386,8 +381,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
status: 'disabled',
},
requestId,
`Failed to disable schedule ${payload.scheduleId} after missing workflow`,
`Disabled schedule ${payload.scheduleId} because the workflow no longer exists`
`Failed to disable schedule ${payload.scheduleId} after missing workflow`
)
return
}
@@ -404,8 +398,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
nextRunAt: nextRetryAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} for rate limit`,
`Updated next retry time for schedule ${payload.scheduleId} due to rate limit`
`Error updating schedule ${payload.scheduleId} for rate limit`
)
return
}
@@ -421,8 +414,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
nextRunAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} after usage limit check`,
`Scheduled next run for ${payload.scheduleId} after usage limit`
`Error updating schedule ${payload.scheduleId} after usage limit check`
)
}
return
@@ -450,8 +442,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
status: shouldDisable ? 'disabled' : 'active',
},
requestId,
`Error updating schedule ${payload.scheduleId} after preprocessing failure`,
`Updated schedule ${payload.scheduleId} after preprocessing failure`
`Error updating schedule ${payload.scheduleId} after preprocessing failure`
)
return
}
@@ -503,8 +494,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
lastQueuedAt: null,
},
requestId,
`Error updating schedule ${payload.scheduleId} after success`,
`Updated next run time for workflow ${payload.workflowId} to ${nextRunAt.toISOString()}`
`Error updating schedule ${payload.scheduleId} after success`
)
return
}
@@ -531,8 +521,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
status: shouldDisable ? 'disabled' : 'active',
},
requestId,
`Error updating schedule ${payload.scheduleId} after failure`,
`Updated schedule ${payload.scheduleId} after failure`
`Error updating schedule ${payload.scheduleId} after failure`
)
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : String(error)
@@ -550,8 +539,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
nextRunAt: nextRetryAt,
},
requestId,
`Error updating schedule ${payload.scheduleId} for service overload`,
`Updated schedule ${payload.scheduleId} retry time due to service overload`
`Error updating schedule ${payload.scheduleId} for service overload`
)
return
}
@@ -578,8 +566,7 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
status: shouldDisable ? 'disabled' : 'active',
},
requestId,
`Error updating schedule ${payload.scheduleId} after execution error`,
`Updated schedule ${payload.scheduleId} after execution error`
`Error updating schedule ${payload.scheduleId} after execution error`
)
}
} catch (error: unknown) {

View File

@@ -440,7 +440,6 @@ async function executeWebhookJobInternal(
const triggerConfig = getTrigger(resolvedTriggerId)
if (triggerConfig.outputs) {
logger.debug(`[${requestId}] Processing trigger ${resolvedTriggerId} file outputs`)
const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, {
workspaceId,
workflowId: payload.workflowId,
@@ -450,8 +449,6 @@ async function executeWebhookJobInternal(
})
safeAssign(input, processedInput as Record<string, unknown>)
}
} else {
logger.debug(`[${requestId}] No valid triggerId found for block ${payload.blockId}`)
}
} catch (error) {
logger.error(`[${requestId}] Error processing trigger file outputs:`, error)
@@ -469,7 +466,6 @@ async function executeWebhookJobInternal(
name: string
type: 'string' | 'number' | 'boolean' | 'object' | 'array' | 'file[]'
}>
logger.debug(`[${requestId}] Processing generic webhook files from inputFormat`)
const fileFields = inputFormat.filter((field) => field.type === 'file[]')

View File

@@ -7,7 +7,6 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { getWorkflowById } from '@/lib/workflows/utils'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
@@ -79,10 +78,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
variables: {},
})
const workflow = await getWorkflowById(workflowId)
if (!workflow) {
throw new Error(`Workflow ${workflowId} not found after preprocessing`)
}
const workflow = preprocessResult.workflowRecord!
const metadata: ExecutionMetadata = {
requestId,

View File

@@ -215,7 +215,7 @@ async function insertAuditLog(params: AuditLogParams): Promise<void> {
actorName = row?.name ?? undefined
actorEmail = row?.email ?? undefined
} catch (error) {
logger.debug('Failed to resolve actor info', { error, actorId: params.actorId })
logger.warn('Failed to resolve actor info', { error, actorId: params.actorId })
}
}

View File

@@ -2,6 +2,7 @@ import { db } from '@sim/db'
import { member, organization, userStats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray } from 'drizzle-orm'
import type { HighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getUserUsageLimit } from '@/lib/billing/core/usage'
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
@@ -21,7 +22,10 @@ interface UsageData {
* Checks a user's cost usage against their subscription plan limit
* and returns usage information including whether they're approaching the limit
*/
export async function checkUsageStatus(userId: string): Promise<UsageData> {
export async function checkUsageStatus(
userId: string,
preloadedSubscription?: HighestPrioritySubscription
): Promise<UsageData> {
try {
// If billing is disabled, always return permissive limits
if (!isBillingEnabled) {
@@ -42,7 +46,7 @@ export async function checkUsageStatus(userId: string): Promise<UsageData> {
}
// Get usage limit from user_stats (per-user cap)
const limit = await getUserUsageLimit(userId)
const limit = await getUserUsageLimit(userId, preloadedSubscription)
logger.info('Using stored usage limit', { userId, limit })
// Get actual usage from the database
@@ -228,7 +232,10 @@ export async function checkAndNotifyUsage(userId: string): Promise<void> {
* @param userId The ID of the user to check
* @returns An object containing the exceeded status and usage details
*/
export async function checkServerSideUsageLimits(userId: string): Promise<{
export async function checkServerSideUsageLimits(
userId: string,
preloadedSubscription?: HighestPrioritySubscription
): Promise<{
isExceeded: boolean
currentUsage: number
limit: number
@@ -314,7 +321,7 @@ export async function checkServerSideUsageLimits(userId: string): Promise<{
}
}
const usageData = await checkUsageStatus(userId)
const usageData = await checkUsageStatus(userId, preloadedSubscription)
return {
isExceeded: usageData.isExceeded,

View File

@@ -6,6 +6,8 @@ import { checkEnterprisePlan, checkProPlan, checkTeamPlan } from '@/lib/billing/
const logger = createLogger('PlanLookup')
export type HighestPrioritySubscription = Awaited<ReturnType<typeof getHighestPrioritySubscription>>
/**
* Get the highest priority active subscription for a user
* Priority: Enterprise > Team > Pro > Free

View File

@@ -7,7 +7,10 @@ import {
renderFreeTierUpgradeEmail,
renderUsageThresholdEmail,
} from '@/components/emails'
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
import {
getHighestPrioritySubscription,
type HighestPrioritySubscription,
} from '@/lib/billing/core/plan'
import {
canEditUsageLimit,
getFreeTierLimit,
@@ -352,8 +355,14 @@ export async function updateUserUsageLimit(
* Free/Pro: Individual user limit from userStats
* Team/Enterprise: Organization limit
*/
export async function getUserUsageLimit(userId: string): Promise<number> {
const subscription = await getHighestPrioritySubscription(userId)
export async function getUserUsageLimit(
userId: string,
preloadedSubscription?: HighestPrioritySubscription
): Promise<number> {
const subscription =
preloadedSubscription !== undefined
? preloadedSubscription
: await getHighestPrioritySubscription(userId)
if (!subscription || subscription.plan === 'free' || subscription.plan === 'pro') {
// Free/Pro: Use individual limit from userStats

View File

@@ -154,7 +154,6 @@ export async function restoreUserProSubscription(userId: string): Promise<Restor
.where(eq(subscriptionTable.id, personalPro.id))
result.restored = true
logger.info('Restored personal Pro subscription', {
userId,
subscriptionId: personalPro.id,

View File

@@ -44,8 +44,6 @@ export async function processExecutionFile(
)
}
logger.debug(`[${requestId}] Uploading file: ${file.name} (${buffer.length} bytes)`)
const userFile = await uploadExecutionFile(
executionContext,
buffer,
@@ -54,7 +52,6 @@ export async function processExecutionFile(
userId
)
logger.debug(`[${requestId}] Successfully uploaded ${file.name}`)
return userFile
}
@@ -69,8 +66,6 @@ export async function processExecutionFile(
)
}
logger.debug(`[${requestId}] Uploading file from URL: ${file.name} (${buffer.length} bytes)`)
const userFile = await uploadExecutionFile(
executionContext,
buffer,
@@ -79,7 +74,6 @@ export async function processExecutionFile(
userId
)
logger.debug(`[${requestId}] Successfully uploaded ${file.name} from URL`)
return userFile
}

View File

@@ -3,6 +3,7 @@ import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { checkServerSideUsageLimits } from '@/lib/billing/calculations/usage-monitor'
import type { HighestPrioritySubscription } from '@/lib/billing/core/plan'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getExecutionTimeout } from '@/lib/core/execution-limits'
import { RateLimiter } from '@/lib/core/rate-limiter/rate-limiter'
@@ -39,6 +40,8 @@ export interface PreprocessExecutionOptions {
useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys)
/** @deprecated No longer used - background/async executions always use deployed state */
useDraftState?: boolean
/** Pre-fetched workflow record to skip the Step 1 DB query. Must be a full workflow table row. */
workflowRecord?: WorkflowRecord
}
/**
@@ -66,7 +69,7 @@ export interface PreprocessExecutionResult {
}
type WorkflowRecord = typeof workflow.$inferSelect
type SubscriptionInfo = Awaited<ReturnType<typeof getHighestPrioritySubscription>>
type SubscriptionInfo = HighestPrioritySubscription
export async function preprocessExecution(
options: PreprocessExecutionOptions
@@ -84,6 +87,7 @@ export async function preprocessExecution(
loggingSession: providedLoggingSession,
isResumeContext: _isResumeContext = false,
useAuthenticatedUserAsActor = false,
workflowRecord: prefetchedWorkflowRecord,
} = options
logger.info(`[${requestId}] Starting execution preprocessing`, {
@@ -94,58 +98,69 @@ export async function preprocessExecution(
})
// ========== STEP 1: Validate Workflow Exists ==========
let workflowRecord: WorkflowRecord | null = null
try {
const records = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (prefetchedWorkflowRecord && prefetchedWorkflowRecord.id !== workflowId) {
logger.error(`[${requestId}] Prefetched workflow record ID mismatch`, {
expected: workflowId,
received: prefetchedWorkflowRecord.id,
})
throw new Error(
`Prefetched workflow record ID mismatch: expected ${workflowId}, got ${prefetchedWorkflowRecord.id}`
)
}
let workflowRecord: WorkflowRecord | null = prefetchedWorkflowRecord ?? null
if (!workflowRecord) {
try {
const records = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (records.length === 0) {
logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
if (records.length === 0) {
logger.warn(`[${requestId}] Workflow not found: ${workflowId}`)
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: 'unknown',
workspaceId: '',
errorMessage:
'Workflow not found. The workflow may have been deleted or is no longer accessible.',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Workflow not found',
statusCode: 404,
logCreated: true,
},
}
}
workflowRecord = records[0]
} catch (error) {
logger.error(`[${requestId}] Error fetching workflow`, { error, workflowId })
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: 'unknown',
workspaceId: '',
errorMessage:
'Workflow not found. The workflow may have been deleted or is no longer accessible.',
userId: userId || 'unknown',
workspaceId: providedWorkspaceId || '',
errorMessage: 'Internal error while fetching workflow',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Workflow not found',
statusCode: 404,
message: 'Internal error while fetching workflow',
statusCode: 500,
logCreated: true,
},
}
}
workflowRecord = records[0]
} catch (error) {
logger.error(`[${requestId}] Error fetching workflow`, { error, workflowId })
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: userId || 'unknown',
workspaceId: providedWorkspaceId || '',
errorMessage: 'Internal error while fetching workflow',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Internal error while fetching workflow',
statusCode: 500,
logCreated: true,
},
}
}
const workspaceId = workflowRecord.workspaceId || providedWorkspaceId || ''
@@ -249,20 +264,77 @@ export async function preprocessExecution(
}
}
// ========== STEP 4: Get User Subscription ==========
let userSubscription: SubscriptionInfo = null
try {
userSubscription = await getHighestPrioritySubscription(actorUserId)
logger.debug(`[${requestId}] User subscription retrieved`, {
actorUserId,
hasSub: !!userSubscription,
plan: userSubscription?.plan,
})
} catch (error) {
logger.error(`[${requestId}] Error fetching subscription`, { error, actorUserId })
// ========== STEP 4: Get Subscription ==========
const userSubscription = await getHighestPrioritySubscription(actorUserId)
// ========== STEP 5: Check Usage Limits ==========
if (!skipUsageLimits) {
try {
const usageCheck = await checkServerSideUsageLimits(actorUserId, userSubscription)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
}
)
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage:
usageCheck.message ||
`Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`,
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
statusCode: 402,
logCreated: true,
},
}
}
} catch (error) {
logger.error(`[${requestId}] Error checking usage limits`, {
error,
actorUserId,
})
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage:
'Unable to determine usage limits. Execution blocked for security. Please contact support.',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Unable to determine usage limits. Execution blocked for security.',
statusCode: 500,
logCreated: true,
},
}
}
}
// ========== STEP 5: Check Rate Limits ==========
// ========== STEP 6: Check Rate Limits ==========
let rateLimitInfo: { allowed: boolean; remaining: number; resetAt: Date } | undefined
if (checkRateLimit) {
@@ -302,10 +374,6 @@ export async function preprocessExecution(
},
}
}
logger.debug(`[${requestId}] Rate limit check passed`, {
remaining: rateLimitInfo.remaining,
})
} catch (error) {
logger.error(`[${requestId}] Error checking rate limits`, { error, actorUserId })
@@ -331,78 +399,6 @@ export async function preprocessExecution(
}
}
// ========== STEP 6: Check Usage Limits (CRITICAL) ==========
if (!skipUsageLimits) {
try {
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${actorUserId} has exceeded usage limits. Blocking execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
workflowId,
triggerType,
}
)
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage:
usageCheck.message ||
`Usage limit exceeded: $${usageCheck.currentUsage?.toFixed(2)} used of $${usageCheck.limit?.toFixed(2)} limit. Please upgrade your plan to continue.`,
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message:
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.',
statusCode: 402,
logCreated: true,
},
}
}
logger.debug(`[${requestId}] Usage limit check passed`, {
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
})
} catch (error) {
logger.error(`[${requestId}] Error checking usage limits`, { error, actorUserId })
await logPreprocessingError({
workflowId,
executionId,
triggerType,
requestId,
userId: actorUserId,
workspaceId,
errorMessage:
'Unable to determine usage limits. Execution blocked for security. Please contact support.',
loggingSession: providedLoggingSession,
})
return {
success: false,
error: {
message: 'Unable to determine usage limits. Execution blocked for security.',
statusCode: 500,
logCreated: true,
},
}
}
} else {
logger.debug(`[${requestId}] Skipping usage limits check (test mode)`)
}
// ========== SUCCESS: All Checks Passed ==========
logger.info(`[${requestId}] All preprocessing checks passed`, {
workflowId,
@@ -461,7 +457,7 @@ async function logPreprocessingError(params: {
try {
const session =
loggingSession || new LoggingSession(workflowId, executionId, triggerType as any, requestId)
loggingSession || new LoggingSession(workflowId, executionId, triggerType, requestId)
await session.safeStart({
userId,
@@ -477,11 +473,6 @@ async function logPreprocessingError(params: {
traceSpans: [],
skipCost: true, // Preprocessing errors should not charge - no execution occurred
})
logger.debug(`[${requestId}] Logged preprocessing error to database`, {
workflowId,
executionId,
})
} catch (error) {
logger.error(`[${requestId}] Failed to log preprocessing error`, {
error,

View File

@@ -54,12 +54,6 @@ async function queryKnowledgeBase(
authHeaders?: { cookie?: string; authorization?: string }
): Promise<string[]> {
try {
logger.info(`[${requestId}] Querying knowledge base`, {
knowledgeBaseId,
query: query.substring(0, 100),
topK,
})
// Call the knowledge base search API directly
const searchUrl = `${getInternalApiBaseUrl()}/api/knowledge/search`
@@ -90,8 +84,6 @@ async function queryKnowledgeBase(
const chunks = results.map((r: any) => r.content || '').filter((c: string) => c.length > 0)
logger.info(`[${requestId}] Retrieved ${chunks.length} chunks from knowledge base`)
return chunks
} catch (error: any) {
logger.error(`[${requestId}] Error querying knowledge base`, {
@@ -194,7 +186,6 @@ Evaluate the consistency and provide your score and reasoning in JSON format.`
}
const content = response.content.trim()
logger.debug(`[${requestId}] LLM response:`, { content })
let jsonContent = content

View File

@@ -359,9 +359,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
.leftJoin(userStats, eq(member.userId, userStats.userId))
.where(eq(member.organizationId, sub.referenceId))
.limit(1)
const orgUsageBeforeNum = Number.parseFloat(
(orgUsageBefore as any)?.toString?.() || '0'
)
const orgUsageBeforeNum = Number.parseFloat(String(orgUsageBefore ?? '0'))
await this.updateUserStats(
updatedLog.workflowId,
@@ -433,7 +431,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
endedAt: updatedLog.endedAt?.toISOString() || endedAt,
totalDurationMs: updatedLog.totalDurationMs || totalDurationMs,
executionData: updatedLog.executionData as WorkflowExecutionLog['executionData'],
cost: updatedLog.cost as any,
cost: updatedLog.cost as WorkflowExecutionLog['cost'],
createdAt: updatedLog.createdAt.toISOString(),
}
@@ -467,7 +465,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
endedAt: workflowLog.endedAt?.toISOString() || workflowLog.startedAt.toISOString(),
totalDurationMs: workflowLog.totalDurationMs || 0,
executionData: workflowLog.executionData as WorkflowExecutionLog['executionData'],
cost: workflowLog.cost as any,
cost: workflowLog.cost as WorkflowExecutionLog['cost'],
createdAt: workflowLog.createdAt.toISOString(),
}
}

View File

@@ -89,6 +89,10 @@ export class LoggingSession {
private workflowState?: WorkflowState
private isResume = false
private completed = false
/** Synchronous flag to prevent concurrent completion attempts (race condition guard) */
private completing = false
/** Tracks the in-flight completion promise so callers can await it */
private completionPromise: Promise<void> | null = null
private accumulatedCost: AccumulatedCost = {
total: BASE_EXECUTION_CHARGE,
input: 0,
@@ -186,7 +190,7 @@ export class LoggingSession {
.limit(1)
if (existing?.cost) {
const cost = existing.cost as any
const cost = existing.cost as AccumulatedCost
this.accumulatedCost = {
total: cost.total || BASE_EXECUTION_CHARGE,
input: cost.input || 0,
@@ -235,18 +239,9 @@ export class LoggingSession {
workflowState: this.workflowState,
deploymentVersionId,
})
if (this.requestId) {
logger.debug(`[${this.requestId}] Started logging for execution ${this.executionId}`)
}
} else {
this.isResume = true
await this.loadExistingCost()
if (this.requestId) {
logger.debug(
`[${this.requestId}] Resuming logging for existing execution ${this.executionId}`
)
}
}
} catch (error) {
if (this.requestId) {
@@ -256,20 +251,11 @@ export class LoggingSession {
}
}
/**
* Set up logging on an executor instance
* Note: Logging now works through trace spans only, no direct executor integration needed
*/
setupExecutor(executor: any): void {
if (this.requestId) {
logger.debug(`[${this.requestId}] Logging session ready for execution ${this.executionId}`)
}
}
async complete(params: SessionCompleteParams = {}): Promise<void> {
if (this.completed) {
if (this.completed || this.completing) {
return
}
this.completing = true
const { endedAt, totalDurationMs, finalOutput, traceSpans, workflowInput, executionState } =
params
@@ -336,11 +322,8 @@ export class LoggingSession {
// Silently fail
}
}
if (this.requestId) {
logger.debug(`[${this.requestId}] Completed logging for execution ${this.executionId}`)
}
} catch (error) {
this.completing = false
logger.error(`Failed to complete logging for execution ${this.executionId}:`, {
requestId: this.requestId,
workflowId: this.workflowId,
@@ -353,9 +336,10 @@ export class LoggingSession {
}
async completeWithError(params: SessionErrorCompleteParams = {}): Promise<void> {
if (this.completed) {
if (this.completed || this.completing) {
return
}
this.completing = true
try {
const { endedAt, totalDurationMs, error, traceSpans, skipCost } = params
@@ -455,6 +439,7 @@ export class LoggingSession {
)
}
} catch (enhancedError) {
this.completing = false
logger.error(`Failed to complete error logging for execution ${this.executionId}:`, {
requestId: this.requestId,
workflowId: this.workflowId,
@@ -467,9 +452,10 @@ export class LoggingSession {
}
async completeWithCancellation(params: SessionCancelledParams = {}): Promise<void> {
if (this.completed) {
if (this.completed || this.completing) {
return
}
this.completing = true
try {
const { endedAt, totalDurationMs, traceSpans } = params
@@ -540,6 +526,7 @@ export class LoggingSession {
)
}
} catch (cancelError) {
this.completing = false
logger.error(`Failed to complete cancelled logging for execution ${this.executionId}:`, {
requestId: this.requestId,
workflowId: this.workflowId,
@@ -686,7 +673,28 @@ export class LoggingSession {
}
}
/**
* Wait for any in-flight fire-and-forget completion to finish.
* Called internally by markAsFailed to ensure completion has settled
* before overwriting execution status.
*/
async waitForCompletion(): Promise<void> {
if (this.completionPromise) {
try {
await this.completionPromise
} catch {
/* already handled by safe* wrapper */
}
}
}
async safeComplete(params: SessionCompleteParams = {}): Promise<void> {
if (this.completionPromise) return this.completionPromise
this.completionPromise = this._safeCompleteImpl(params)
return this.completionPromise
}
private async _safeCompleteImpl(params: SessionCompleteParams = {}): Promise<void> {
try {
await this.complete(params)
} catch (error) {
@@ -706,6 +714,12 @@ export class LoggingSession {
}
async safeCompleteWithError(params?: SessionErrorCompleteParams): Promise<void> {
if (this.completionPromise) return this.completionPromise
this.completionPromise = this._safeCompleteWithErrorImpl(params)
return this.completionPromise
}
private async _safeCompleteWithErrorImpl(params?: SessionErrorCompleteParams): Promise<void> {
try {
await this.completeWithError(params)
} catch (error) {
@@ -727,6 +741,12 @@ export class LoggingSession {
}
async safeCompleteWithCancellation(params?: SessionCancelledParams): Promise<void> {
if (this.completionPromise) return this.completionPromise
this.completionPromise = this._safeCompleteWithCancellationImpl(params)
return this.completionPromise
}
private async _safeCompleteWithCancellationImpl(params?: SessionCancelledParams): Promise<void> {
try {
await this.completeWithCancellation(params)
} catch (error) {
@@ -747,6 +767,12 @@ export class LoggingSession {
}
async safeCompleteWithPause(params?: SessionPausedParams): Promise<void> {
if (this.completionPromise) return this.completionPromise
this.completionPromise = this._safeCompleteWithPauseImpl(params)
return this.completionPromise
}
private async _safeCompleteWithPauseImpl(params?: SessionPausedParams): Promise<void> {
try {
await this.completeWithPause(params)
} catch (error) {
@@ -767,6 +793,7 @@ export class LoggingSession {
}
async markAsFailed(errorMessage?: string): Promise<void> {
await this.waitForCompletion()
await LoggingSession.markExecutionAsFailed(this.executionId, errorMessage, this.requestId)
}
@@ -810,9 +837,10 @@ export class LoggingSession {
isError: boolean
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
}): Promise<void> {
if (this.completed) {
if (this.completed || this.completing) {
return
}
this.completing = true
logger.warn(
`[${this.requestId || 'unknown'}] Logging completion failed for execution ${this.executionId} - attempting cost-only fallback`
@@ -851,6 +879,7 @@ export class LoggingSession {
`[${this.requestId || 'unknown'}] Cost-only fallback succeeded for execution ${this.executionId}`
)
} catch (fallbackError) {
this.completing = false
logger.error(
`[${this.requestId || 'unknown'}] Cost-only fallback also failed for execution ${this.executionId}:`,
{ error: fallbackError instanceof Error ? fallbackError.message : String(fallbackError) }

View File

@@ -82,30 +82,6 @@ export class SnapshotService implements ISnapshotService {
}
}
async getSnapshotByHash(
workflowId: string,
hash: string
): Promise<WorkflowExecutionSnapshot | null> {
const [snapshot] = await db
.select()
.from(workflowExecutionSnapshots)
.where(
and(
eq(workflowExecutionSnapshots.workflowId, workflowId),
eq(workflowExecutionSnapshots.stateHash, hash)
)
)
.limit(1)
if (!snapshot) return null
return {
...snapshot,
stateData: snapshot.stateData as WorkflowState,
createdAt: snapshot.createdAt.toISOString(),
}
}
computeStateHash(state: WorkflowState): string {
const normalizedState = normalizeWorkflowState(state)
const stateString = normalizedStringify(normalizedState)

View File

@@ -433,7 +433,7 @@ export class SearchSuggestions {
value: `date:${partial}`,
label: `${this.formatDateLabel(startDate)} to ${this.formatDateLabel(endDate)}`,
description: 'Custom date range',
category: 'date' as any,
category: 'date',
})
return suggestions
}
@@ -446,7 +446,7 @@ export class SearchSuggestions {
value: `date:${startDate}..`,
label: `${this.formatDateLabel(startDate)} to ...`,
description: 'Type end date (YYYY-MM-DD)',
category: 'date' as any,
category: 'date',
})
return suggestions
}
@@ -458,7 +458,7 @@ export class SearchSuggestions {
value: `date:${partial}`,
label: `Year ${partial}`,
description: 'All logs from this year',
category: 'date' as any,
category: 'date',
})
return suggestions
}
@@ -486,7 +486,7 @@ export class SearchSuggestions {
value: `date:${partial}`,
label: `${monthName} ${year}`,
description: 'All logs from this month',
category: 'date' as any,
category: 'date',
})
return suggestions
}
@@ -500,7 +500,7 @@ export class SearchSuggestions {
value: `date:${partial}`,
label: this.formatDateLabel(partial),
description: 'Single date',
category: 'date' as any,
category: 'date',
})
// Also suggest starting a range
suggestions.push({
@@ -508,7 +508,7 @@ export class SearchSuggestions {
value: `date:${partial}..`,
label: `${this.formatDateLabel(partial)} to ...`,
description: 'Start a date range',
category: 'date' as any,
category: 'date',
})
}
return suggestions
@@ -521,7 +521,7 @@ export class SearchSuggestions {
value: `date:${partial}`,
label: partial,
description: 'Continue typing: YYYY, YYYY-MM, or YYYY-MM-DD',
category: 'date' as any,
category: 'date',
})
}

View File

@@ -337,7 +337,6 @@ export interface BatchInsertResult<T> {
export interface SnapshotService {
createSnapshot(workflowId: string, state: WorkflowState): Promise<WorkflowExecutionSnapshot>
getSnapshot(id: string): Promise<WorkflowExecutionSnapshot | null>
getSnapshotByHash(workflowId: string, hash: string): Promise<WorkflowExecutionSnapshot | null>
computeStateHash(state: WorkflowState): string
cleanupOrphanedSnapshots(olderThanDays: number): Promise<number>
}

View File

@@ -71,9 +71,7 @@ async function validateMcpAuth(
workspaceId = body.workspaceId
;(request as any)._parsedBody = body
}
} catch (error) {
logger.debug(`[${requestId}] Could not parse request body for workspaceId extraction`)
}
} catch {}
}
if (!workspaceId) {

View File

@@ -318,7 +318,6 @@ class McpService {
try {
const cached = await this.cacheAdapter.get(cacheKey)
if (cached) {
logger.debug(`[${requestId}] Using cached tools for user ${userId}`)
return cached.tools
}
} catch (error) {

View File

@@ -68,7 +68,6 @@ export async function syncMcpToolsForWorkflow(options: SyncOptions): Promise<voi
.where(eq(workflowMcpTool.workflowId, workflowId))
if (tools.length === 0) {
logger.debug(`[${requestId}] No MCP tools to sync for workflow: ${workflowId}`)
return
}

View File

@@ -54,7 +54,7 @@ export async function enrichTableSchema(
const label = table.name ? `${table.name} (${tableId})` : tableId
return `Table schema for ${label}:\n${columnLines}\nBuilt-in columns: createdAt, updatedAt`
} catch (error) {
logger.debug('Failed to fetch table schema', { tableId, error })
logger.warn('Failed to fetch table schema', { tableId, error })
return null
}
}

View File

@@ -771,7 +771,6 @@ export async function cleanupWebhooksForWorkflow(
)
if (existingWebhooks.length === 0) {
logger.debug(`[${requestId}] No webhooks to clean up for workflow ${workflowId}`)
return
}
@@ -833,7 +832,6 @@ export async function restorePreviousVersionWebhooks(params: {
.where(eq(webhook.deploymentVersionId, previousVersionId))
if (previousWebhooks.length === 0) {
logger.debug(`[${requestId}] No previous webhooks to restore for version ${previousVersionId}`)
return
}

View File

@@ -474,7 +474,6 @@ function buildGmailSearchQuery(config: {
async function searchEmails(accessToken: string, config: GmailWebhookConfig, requestId: string) {
try {
const baseQuery = buildGmailSearchQuery(config)
logger.debug(`[${requestId}] Gmail search query: ${baseQuery}`)
let timeConstraint = ''
@@ -491,25 +490,19 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req
const timestamp = Math.floor(cutoffTime.getTime() / 1000)
timeConstraint = ` after:${timestamp}`
logger.debug(`[${requestId}] Using timestamp-based query with ${bufferSeconds}s buffer`)
} else if (minutesSinceLastCheck < 24 * 60) {
const hours = Math.ceil(minutesSinceLastCheck / 60) + 1 // Round up and add 1 hour buffer
timeConstraint = ` newer_than:${hours}h`
logger.debug(`[${requestId}] Using hour-based query: newer_than:${hours}h`)
} else {
const days = Math.min(Math.ceil(minutesSinceLastCheck / (24 * 60)), 7) + 1
timeConstraint = ` newer_than:${days}d`
logger.debug(`[${requestId}] Using day-based query: newer_than:${days}d`)
}
} else {
timeConstraint = ' newer_than:1d'
logger.debug(`[${requestId}] No last check time, using default: newer_than:1d`)
}
const query = `${baseQuery}${timeConstraint}`
logger.info(`[${requestId}] Searching for emails with query: ${query}`)
const searchUrl = `https://gmail.googleapis.com/gmail/v1/users/me/messages?q=${encodeURIComponent(query)}&maxResults=${config.maxEmailsPerPoll || 25}`
const searchResponse = await fetch(searchUrl, {
@@ -562,7 +555,6 @@ async function searchEmails(accessToken: string, config: GmailWebhookConfig, req
if (emails.length > 0 && emails[0].historyId) {
latestHistoryId = emails[0].historyId
logger.debug(`[${requestId}] Updated historyId to ${latestHistoryId}`)
}
return { emails, latestHistoryId }
@@ -704,10 +696,6 @@ async function processEmails(
...(config.includeRawEmail ? { rawEmail: email } : {}),
}
logger.debug(
`[${requestId}] Sending ${config.includeRawEmail ? 'simplified + raw' : 'simplified'} email payload for ${email.id}`
)
const webhookUrl = `${getInternalApiBaseUrl()}/api/webhooks/trigger/${webhookData.path}`
const response = await fetch(webhookUrl, {

View File

@@ -285,7 +285,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) {
try {
await client.connect()
logger.debug(`[${requestId}] Connected to IMAP server ${config.host}`)
const maxEmails = config.maxEmailsPerPoll || 25
let totalEmailsCollected = 0
@@ -295,7 +294,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) {
try {
const mailbox = await client.mailboxOpen(mailboxPath)
logger.debug(`[${requestId}] Opened mailbox: ${mailbox.path}, exists: ${mailbox.exists}`)
// Parse search criteria - expects JSON object from UI
let searchCriteria: any = { unseen: true }
@@ -335,14 +333,10 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) {
const searchResult = await client.search(searchCriteria, { uid: true })
messageUids = searchResult === false ? [] : searchResult
} catch (searchError) {
logger.debug(
`[${requestId}] Search returned no messages for ${mailboxPath}: ${searchError}`
)
continue
}
if (messageUids.length === 0) {
logger.debug(`[${requestId}] No messages matching criteria in ${mailboxPath}`)
continue
}
@@ -357,8 +351,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) {
)
}
logger.info(`[${requestId}] Processing ${uidsToProcess.length} emails from ${mailboxPath}`)
for await (const msg of client.fetch(
uidsToProcess,
{
@@ -384,7 +376,6 @@ async function fetchNewEmails(config: ImapWebhookConfig, requestId: string) {
}
await client.logout()
logger.debug(`[${requestId}] Disconnected from IMAP server`)
return { emails, latestUidByMailbox }
} catch (error) {

View File

@@ -107,7 +107,6 @@ export async function parseWebhookBody(
// Allow empty body - some webhooks send empty payloads
if (!rawBody || rawBody.length === 0) {
logger.debug(`[${requestId}] Received request with empty body, treating as empty object`)
return { body: {}, rawBody: '' }
}
} catch (bodyError) {
@@ -127,19 +126,15 @@ export async function parseWebhookBody(
if (payloadString) {
body = JSON.parse(payloadString)
logger.debug(`[${requestId}] Parsed form-encoded GitHub webhook payload`)
} else {
body = Object.fromEntries(formData.entries())
logger.debug(`[${requestId}] Parsed form-encoded webhook data (direct fields)`)
}
} else {
body = JSON.parse(rawBody)
logger.debug(`[${requestId}] Parsed JSON webhook payload`)
}
// Allow empty JSON objects - some webhooks send empty payloads
if (Object.keys(body).length === 0) {
logger.debug(`[${requestId}] Received empty JSON object`)
}
} catch (parseError) {
logger.error(`[${requestId}] Failed to parse webhook body`, {
@@ -499,8 +494,6 @@ export async function verifyProviderAuth(
logger.warn(`[${requestId}] Microsoft Teams HMAC signature verification failed`)
return new NextResponse('Unauthorized - Invalid HMAC signature', { status: 401 })
}
logger.debug(`[${requestId}] Microsoft Teams HMAC signature verified successfully`)
}
}
@@ -578,8 +571,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Twilio signature', { status: 401 })
}
logger.debug(`[${requestId}] Twilio Voice signature verified successfully`)
}
}
@@ -603,8 +594,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Typeform signature', { status: 401 })
}
logger.debug(`[${requestId}] Typeform signature verified successfully`)
}
}
@@ -628,8 +617,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Linear signature', { status: 401 })
}
logger.debug(`[${requestId}] Linear signature verified successfully`)
}
}
@@ -653,8 +640,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Circleback signature', { status: 401 })
}
logger.debug(`[${requestId}] Circleback signature verified successfully`)
}
}
@@ -678,8 +663,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Cal.com signature', { status: 401 })
}
logger.debug(`[${requestId}] Cal.com signature verified successfully`)
}
}
@@ -703,8 +686,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Jira signature', { status: 401 })
}
logger.debug(`[${requestId}] Jira signature verified successfully`)
}
}
@@ -728,8 +709,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Confluence signature', { status: 401 })
}
logger.debug(`[${requestId}] Confluence signature verified successfully`)
}
}
@@ -757,10 +736,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid GitHub signature', { status: 401 })
}
logger.debug(`[${requestId}] GitHub signature verified successfully`, {
usingSha256: !!signature256,
})
}
}
@@ -784,8 +759,6 @@ export async function verifyProviderAuth(
})
return new NextResponse('Unauthorized - Invalid Fireflies signature', { status: 401 })
}
logger.debug(`[${requestId}] Fireflies signature verified successfully`)
}
}
@@ -870,10 +843,6 @@ export async function checkWebhookPreprocessing(
return NextResponse.json({ error: error.message }, { status: error.statusCode })
}
logger.debug(`[${requestId}] Webhook preprocessing passed`, {
provider: foundWebhook.provider,
})
return null
} catch (preprocessError) {
logger.error(`[${requestId}] Error during webhook preprocessing:`, preprocessError)

View File

@@ -263,8 +263,6 @@ async function fetchNewRssItems(
requestId: string
): Promise<{ feed: RssFeed; items: RssItem[] }> {
try {
logger.debug(`[${requestId}] Fetching RSS feed: ${config.feedUrl}`)
const urlValidation = await validateUrlWithDNS(config.feedUrl, 'feedUrl')
if (!urlValidation.isValid) {
logger.error(`[${requestId}] Invalid RSS feed URL: ${urlValidation.error}`)
@@ -288,7 +286,6 @@ async function fetchNewRssItems(
const feed = await parser.parseString(xmlContent)
if (!feed.items || !feed.items.length) {
logger.debug(`[${requestId}] No items in feed`)
return { feed: feed as RssFeed, items: [] }
}

View File

@@ -67,7 +67,6 @@ export async function handleWhatsAppVerification(
const verificationToken = providerConfig.verificationToken
if (!verificationToken) {
logger.debug(`[${requestId}] Webhook ${wh.id} has no verification token, skipping`)
continue
}
@@ -1599,7 +1598,6 @@ export function verifyProviderWebhook(
case 'telegram': {
// Check User-Agent to ensure it's not blocked by middleware
const userAgent = request.headers.get('user-agent') || ''
logger.debug(`[${requestId}] Telegram webhook request received with User-Agent: ${userAgent}`)
if (!userAgent) {
logger.warn(
@@ -1613,8 +1611,6 @@ export function verifyProviderWebhook(
request.headers.get('x-real-ip') ||
'unknown'
logger.debug(`[${requestId}] Telegram webhook request from IP: ${clientIp}`)
break
}
case 'microsoft-teams':
@@ -1774,14 +1770,8 @@ export async function fetchAndProcessAirtablePayloads(
if (storedCursor && typeof storedCursor === 'number') {
currentCursor = storedCursor
logger.debug(
`[${requestId}] Using stored cursor: ${currentCursor} for webhook ${webhookData.id}`
)
} else {
currentCursor = null
logger.debug(
`[${requestId}] No valid stored cursor for webhook ${webhookData.id}, starting from beginning`
)
}
let accessToken: string | null = null
@@ -1797,8 +1787,6 @@ export async function fetchAndProcessAirtablePayloads(
)
throw new Error('Airtable access token not found.')
}
logger.info(`[${requestId}] Successfully obtained Airtable access token`)
} catch (tokenError: any) {
logger.error(
`[${requestId}] Failed to get Airtable OAuth token for credential ${credentialId}`,
@@ -1818,10 +1806,6 @@ export async function fetchAndProcessAirtablePayloads(
apiCallCount++
// Safety break
if (apiCallCount > 10) {
logger.warn(`[${requestId}] Reached maximum polling limit (10 calls)`, {
webhookId: webhookData.id,
consolidatedCount: consolidatedChangesMap.size,
})
mightHaveMore = false
break
}
@@ -1833,11 +1817,6 @@ export async function fetchAndProcessAirtablePayloads(
}
const fullUrl = `${apiUrl}?${queryParams.toString()}`
logger.debug(`[${requestId}] Fetching Airtable payloads (call ${apiCallCount})`, {
url: fullUrl,
webhookId: webhookData.id,
})
try {
const fetchStartTime = Date.now()
const response = await fetch(fullUrl, {
@@ -1848,14 +1827,6 @@ export async function fetchAndProcessAirtablePayloads(
},
})
// DEBUG: Log API response time
logger.debug(`[${requestId}] TRACE: Airtable API response received`, {
status: response.status,
duration: `${Date.now() - fetchStartTime}ms`,
hasBody: true,
apiCall: apiCallCount,
})
const responseBody = await response.json()
if (!response.ok || responseBody.error) {
@@ -1877,9 +1848,6 @@ export async function fetchAndProcessAirtablePayloads(
}
const receivedPayloads = responseBody.payloads || []
logger.debug(
`[${requestId}] Received ${receivedPayloads.length} payloads from Airtable (call ${apiCallCount})`
)
// --- Process and Consolidate Changes ---
if (receivedPayloads.length > 0) {
@@ -1891,13 +1859,6 @@ export async function fetchAndProcessAirtablePayloads(
let changeCount = 0
for (const payload of receivedPayloads) {
if (payload.changedTablesById) {
// DEBUG: Log tables being processed
const tableIds = Object.keys(payload.changedTablesById)
logger.debug(`[${requestId}] TRACE: Processing changes for tables`, {
tables: tableIds,
payloadTimestamp: payload.timestamp,
})
for (const [tableId, tableChangesUntyped] of Object.entries(
payload.changedTablesById
)) {
@@ -1907,10 +1868,6 @@ export async function fetchAndProcessAirtablePayloads(
if (tableChanges.createdRecordsById) {
const createdCount = Object.keys(tableChanges.createdRecordsById).length
changeCount += createdCount
// DEBUG: Log created records count
logger.debug(
`[${requestId}] TRACE: Processing ${createdCount} created records for table ${tableId}`
)
for (const [recordId, recordDataUntyped] of Object.entries(
tableChanges.createdRecordsById
@@ -1940,10 +1897,6 @@ export async function fetchAndProcessAirtablePayloads(
if (tableChanges.changedRecordsById) {
const updatedCount = Object.keys(tableChanges.changedRecordsById).length
changeCount += updatedCount
// DEBUG: Log updated records count
logger.debug(
`[${requestId}] TRACE: Processing ${updatedCount} updated records for table ${tableId}`
)
for (const [recordId, recordDataUntyped] of Object.entries(
tableChanges.changedRecordsById
@@ -1980,21 +1933,12 @@ export async function fetchAndProcessAirtablePayloads(
}
}
}
// DEBUG: Log totals for this batch
logger.debug(
`[${requestId}] TRACE: Processed ${changeCount} changes in API call ${apiCallCount})`,
{
currentMapSize: consolidatedChangesMap.size,
}
)
}
const nextCursor = responseBody.cursor
mightHaveMore = responseBody.mightHaveMore || false
if (nextCursor && typeof nextCursor === 'number' && nextCursor !== currentCursor) {
logger.debug(`[${requestId}] Updating cursor from ${currentCursor} to ${nextCursor}`)
currentCursor = nextCursor
// Follow exactly the old implementation - use awaited update instead of parallel
@@ -2031,7 +1975,6 @@ export async function fetchAndProcessAirtablePayloads(
})
mightHaveMore = false
} else if (nextCursor === currentCursor) {
logger.debug(`[${requestId}] Cursor hasn't changed (${currentCursor}), stopping poll`)
mightHaveMore = false // Explicitly stop if cursor hasn't changed
}
} catch (fetchError: any) {
@@ -2123,14 +2066,6 @@ export async function fetchAndProcessAirtablePayloads(
)
// Error logging handled by logging session
}
// DEBUG: Log function completion
logger.debug(`[${requestId}] TRACE: fetchAndProcessAirtablePayloads completed`, {
totalFetched: payloadsFetched,
totalApiCalls: apiCallCount,
totalChanges: consolidatedChangesMap.size,
timestamp: new Date().toISOString(),
})
}
// Define an interface for AirtableChange

View File

@@ -342,8 +342,6 @@ export async function executeWorkflowCore(
contextExtensions,
})
loggingSession.setupExecutor(executorInstance)
// Convert initial workflow variables to their native types
if (workflowVariables) {
for (const [varId, variable] of Object.entries(workflowVariables)) {
@@ -362,60 +360,52 @@ export async function executeWorkflowCore(
)) as ExecutionResult)
: ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)
// Fire-and-forget: post-execution logging, billing, and cleanup
void (async () => {
try {
const { traceSpans, totalDuration } = buildTraceSpans(result)
// Update workflow run counts
if (result.success && result.status !== 'paused') {
await updateWorkflowRunCounts(workflowId)
}
if (result.success && result.status !== 'paused') {
try {
await updateWorkflowRunCounts(workflowId)
} catch (runCountError) {
logger.error(`[${requestId}] Failed to update run counts`, { error: runCountError })
}
}
if (result.status === 'cancelled') {
await loggingSession.safeCompleteWithCancellation({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
})
if (result.status === 'cancelled') {
await loggingSession.safeCompleteWithCancellation({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
})
} else if (result.status === 'paused') {
await loggingSession.safeCompleteWithPause({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
workflowInput: processedInput,
})
} else {
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: result.output || {},
traceSpans: traceSpans || [],
workflowInput: processedInput,
executionState: result.executionState,
})
}
await clearExecutionCancellation(executionId)
logger.info(`[${requestId}] Workflow execution cancelled`, {
duration: result.metadata?.duration,
})
return result
}
if (result.status === 'paused') {
await loggingSession.safeCompleteWithPause({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
traceSpans: traceSpans || [],
workflowInput: processedInput,
})
await clearExecutionCancellation(executionId)
logger.info(`[${requestId}] Workflow execution paused`, {
duration: result.metadata?.duration,
})
return result
}
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: totalDuration || 0,
finalOutput: result.output || {},
traceSpans: traceSpans || [],
workflowInput: processedInput,
executionState: result.executionState,
})
await clearExecutionCancellation(executionId)
await clearExecutionCancellation(executionId)
} catch (postExecError) {
logger.error(`[${requestId}] Post-execution logging failed`, { error: postExecError })
}
})()
logger.info(`[${requestId}] Workflow execution completed`, {
success: result.success,
status: result.status,
duration: result.metadata?.duration,
})
@@ -423,20 +413,31 @@ export async function executeWorkflowCore(
} catch (error: unknown) {
logger.error(`[${requestId}] Execution failed:`, error)
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
// Fire-and-forget: error logging and cleanup
void (async () => {
try {
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
const { traceSpans } = executionResult
? buildTraceSpans(executionResult)
: { traceSpans: [] }
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
error: {
message: error instanceof Error ? error.message : 'Execution failed',
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
error: {
message: error instanceof Error ? error.message : 'Execution failed',
stackTrace: error instanceof Error ? error.stack : undefined,
},
traceSpans,
})
await clearExecutionCancellation(executionId)
await clearExecutionCancellation(executionId)
} catch (postExecError) {
logger.error(`[${requestId}] Post-execution error logging failed`, {
error: postExecError,
})
}
})()
throw error
}

View File

@@ -76,7 +76,10 @@ export async function blockExistsInDeployment(
}
}
export async function loadDeployedWorkflowState(workflowId: string): Promise<DeployedWorkflowData> {
export async function loadDeployedWorkflowState(
workflowId: string,
providedWorkspaceId?: string
): Promise<DeployedWorkflowData> {
try {
const [active] = await db
.select({
@@ -100,15 +103,19 @@ export async function loadDeployedWorkflowState(workflowId: string): Promise<Dep
const state = active.state as WorkflowState & { variables?: Record<string, unknown> }
const [wfRow] = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
let resolvedWorkspaceId = providedWorkspaceId
if (!resolvedWorkspaceId) {
const [wfRow] = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
resolvedWorkspaceId = wfRow?.workspaceId ?? undefined
}
const resolvedBlocks = state.blocks || {}
const { blocks: migratedBlocks } = wfRow?.workspaceId
? await migrateCredentialIds(resolvedBlocks, wfRow.workspaceId)
const { blocks: migratedBlocks } = resolvedWorkspaceId
? await migrateCredentialIds(resolvedBlocks, resolvedWorkspaceId)
: { blocks: resolvedBlocks }
return {