improvement(queries): add workspaceId to execution logs, added missing indexes based on query insights (#2471)

* improvement(queries): added missing indexes

* add workspaceId to execution logs

* remove migration to prep merge

* regen migration

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
This commit is contained in:
Waleed
2025-12-20 13:33:10 -08:00
committed by GitHub
parent 6385d82b85
commit 6247f421bc
22 changed files with 8427 additions and 71 deletions

View File

@@ -1,6 +1,6 @@
import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { chat } from '@sim/db/schema'
import { chat, workflow } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
@@ -94,6 +94,21 @@ export async function POST(
if (!deployment.isActive) {
logger.warn(`[${requestId}] Chat is not active: ${identifier}`)
const [workflowRecord] = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, deployment.workflowId))
.limit(1)
const workspaceId = workflowRecord?.workspaceId
if (!workspaceId) {
logger.warn(`[${requestId}] Cannot log: workflow ${deployment.workflowId} has no workspace`)
return addCorsHeaders(
createErrorResponse('This chat is currently unavailable', 403),
request
)
}
const executionId = randomUUID()
const loggingSession = new LoggingSession(
deployment.workflowId,
@@ -104,7 +119,7 @@ export async function POST(
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: '', // Will be resolved if needed
workspaceId,
variables: {},
})
@@ -169,7 +184,14 @@ export async function POST(
const { actorUserId, workflowRecord } = preprocessResult
const workspaceOwnerId = actorUserId!
const workspaceId = workflowRecord?.workspaceId || ''
const workspaceId = workflowRecord?.workspaceId
if (!workspaceId) {
logger.error(`[${requestId}] Workflow ${deployment.workflowId} has no workspaceId`)
return addCorsHeaders(
createErrorResponse('Workflow has no associated workspace', 500),
request
)
}
try {
const selectedOutputs: string[] = []

View File

@@ -57,7 +57,7 @@ export async function GET(request: NextRequest) {
workflowName: workflow.name,
}
let conditions: SQL | undefined = eq(workflow.workspaceId, params.workspaceId)
let conditions: SQL | undefined = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
if (params.level && params.level !== 'all') {
const levels = params.level.split(',').filter(Boolean)
@@ -134,7 +134,7 @@ export async function GET(request: NextRequest) {
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)

View File

@@ -130,6 +130,8 @@ export async function GET(request: NextRequest) {
deploymentVersionName: sql<null>`NULL`,
}
const workspaceFilter = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
const baseQuery = db
.select(selectColumns)
.from(workflowExecutionLogs)
@@ -141,18 +143,12 @@ export async function GET(request: NextRequest) {
workflowDeploymentVersion,
eq(workflowDeploymentVersion.id, workflowExecutionLogs.deploymentVersionId)
)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
@@ -300,7 +296,7 @@ export async function GET(request: NextRequest) {
}
const logs = await baseQuery
.where(conditions)
.where(and(workspaceFilter, conditions))
.orderBy(desc(workflowExecutionLogs.startedAt))
.limit(params.limit)
.offset(params.offset)
@@ -312,22 +308,16 @@ export async function GET(request: NextRequest) {
pausedExecutions,
eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)
)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
.where(conditions)
.where(and(eq(workflowExecutionLogs.workspaceId, params.workspaceId), conditions))
const countResult = await countQuery

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db'
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
import { permissions, workflowExecutionLogs } from '@sim/db/schema'
import { and, eq, isNotNull, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
@@ -42,23 +42,17 @@ export async function GET(request: NextRequest) {
trigger: workflowExecutionLogs.trigger,
})
.from(workflowExecutionLogs)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
.where(
and(
eq(workflowExecutionLogs.workspaceId, params.workspaceId),
isNotNull(workflowExecutionLogs.trigger),
sql`${workflowExecutionLogs.trigger} NOT IN ('api', 'manual', 'webhook', 'chat', 'schedule')`
)

View File

@@ -25,8 +25,7 @@ export interface LogFilters {
export function buildLogFilters(filters: LogFilters): SQL<unknown> {
const conditions: SQL<unknown>[] = []
// Required: workspace and permissions check
conditions.push(eq(workflow.workspaceId, filters.workspaceId))
conditions.push(eq(workflowExecutionLogs.workspaceId, filters.workspaceId))
// Cursor-based pagination
if (filters.cursor) {

View File

@@ -105,7 +105,6 @@ export async function GET(request: NextRequest) {
const conditions = buildLogFilters(filters)
const orderBy = getOrderBy(params.order)
// Build and execute query
const baseQuery = db
.select({
id: workflowExecutionLogs.id,
@@ -124,13 +123,7 @@ export async function GET(request: NextRequest) {
workflowDescription: workflow.description,
})
.from(workflowExecutionLogs)
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId)
)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -197,11 +190,8 @@ export async function GET(request: NextRequest) {
return result
})
// Get user's workflow execution limits and usage
const limits = await getUserLimits(userId)
// Create response with limits information
// The rateLimit object from checkRateLimit is for THIS API endpoint's rate limits
const response = createApiResponse(
{
data: formattedLogs,

View File

@@ -409,10 +409,16 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const actorUserId = preprocessResult.actorUserId!
const workflow = preprocessResult.workflowRecord!
if (!workflow.workspaceId) {
logger.error(`[${requestId}] Workflow ${workflowId} has no workspaceId`)
return NextResponse.json({ error: 'Workflow has no associated workspace' }, { status: 500 })
}
const workspaceId = workflow.workspaceId
logger.info(`[${requestId}] Preprocessing passed`, {
workflowId,
actorUserId,
workspaceId: workflow.workspaceId,
workspaceId,
})
if (isAsyncMode) {
@@ -460,7 +466,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
)
const executionContext = {
workspaceId: workflow.workspaceId || '',
workspaceId,
workflowId,
executionId,
}
@@ -478,7 +484,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflow.workspaceId || '',
workspaceId,
variables: {},
})
@@ -507,7 +513,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId,
executionId,
workflowId,
workspaceId: workflow.workspaceId ?? undefined,
workspaceId,
userId: actorUserId,
sessionUserId: isClientSession ? userId : undefined,
workflowUserId: workflow.userId,
@@ -589,7 +595,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
workflow: {
id: workflow.id,
userId: actorUserId,
workspaceId: workflow.workspaceId,
workspaceId,
isDeployed: workflow.isDeployed,
variables: (workflow as any).variables,
},
@@ -775,7 +781,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId,
executionId,
workflowId,
workspaceId: workflow.workspaceId ?? undefined,
workspaceId,
userId: actorUserId,
sessionUserId: isClientSession ? userId : undefined,
workflowUserId: workflow.userId,

View File

@@ -70,7 +70,11 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const loggingSession = new LoggingSession(id, executionId, triggerType, requestId)
const userId = accessValidation.workflow.userId
const workspaceId = accessValidation.workflow.workspaceId || ''
const workspaceId = accessValidation.workflow.workspaceId
if (!workspaceId) {
logger.error(`[${requestId}] Workflow ${id} has no workspaceId`)
return createErrorResponse('Workflow has no associated workspace', 500)
}
await loggingSession.safeStart({
userId,

View File

@@ -209,11 +209,16 @@ async function runWorkflowExecution({
const mergedStates = mergeSubblockState(blocks)
const workspaceId = workflowRecord.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}
const personalEnvUserId = workflowRecord.userId
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
personalEnvUserId,
workflowRecord.workspaceId || undefined
workspaceId
)
const variables = EnvVarsSchema.parse({
@@ -232,7 +237,7 @@ async function runWorkflowExecution({
await loggingSession.safeStart({
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
workspaceId,
variables: variables || {},
deploymentVersionId,
})
@@ -241,7 +246,7 @@ async function runWorkflowExecution({
requestId,
executionId,
workflowId: payload.workflowId,
workspaceId: workflowRecord.workspaceId || '',
workspaceId,
userId: actorUserId,
sessionUserId: undefined,
workflowUserId: workflowRecord.userId,

View File

@@ -164,7 +164,10 @@ async function executeWebhookJobInternal(
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const workspaceId = wfRows[0]?.workspaceId || undefined
const workspaceId = wfRows[0]?.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${payload.workflowId} has no associated workspace`)
}
const workflowVariables = (wfRows[0]?.variables as Record<string, any>) || {}
// Merge subblock states (matching workflow-execution pattern)
@@ -298,7 +301,7 @@ async function executeWebhookJobInternal(
// Start logging session so the complete call has a log entry to update
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
workspaceId,
variables: {},
triggerData: {
isTest: payload.testMode === true,
@@ -356,7 +359,7 @@ async function executeWebhookJobInternal(
// Start logging session so the complete call has a log entry to update
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: workspaceId || '',
workspaceId,
variables: {},
triggerData: {
isTest: payload.testMode === true,
@@ -398,7 +401,7 @@ async function executeWebhookJobInternal(
if (triggerConfig.outputs) {
logger.debug(`[${requestId}] Processing trigger ${resolvedTriggerId} file outputs`)
const processedInput = await processTriggerFileOutputs(input, triggerConfig.outputs, {
workspaceId: workspaceId || '',
workspaceId,
workflowId: payload.workflowId,
executionId,
requestId,
@@ -431,7 +434,7 @@ async function executeWebhookJobInternal(
if (fileFields.length > 0 && typeof input === 'object' && input !== null) {
const executionContext = {
workspaceId: workspaceId || '',
workspaceId,
workflowId: payload.workflowId,
executionId,
}
@@ -542,9 +545,23 @@ async function executeWebhookJobInternal(
})
try {
const wfRow = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)
.where(eq(workflowTable.id, payload.workflowId))
.limit(1)
const errorWorkspaceId = wfRow[0]?.workspaceId
if (!errorWorkspaceId) {
logger.warn(
`[${requestId}] Cannot log error: workflow ${payload.workflowId} has no workspace`
)
throw error
}
await loggingSession.safeStart({
userId: payload.userId,
workspaceId: '', // May not be available for early errors
workspaceId: errorWorkspaceId,
variables: {},
triggerData: {
isTest: payload.testMode === true,

View File

@@ -59,7 +59,10 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
}
const actorUserId = preprocessResult.actorUserId!
const workspaceId = preprocessResult.workflowRecord?.workspaceId || undefined
const workspaceId = preprocessResult.workflowRecord?.workspaceId
if (!workspaceId) {
throw new Error(`Workflow ${workflowId} has no associated workspace`)
}
logger.info(`[${requestId}] Preprocessing passed. Using actor: ${actorUserId}`)

View File

@@ -94,12 +94,19 @@ export function serializePauseSnapshot(
dagIncomingEdges,
}
const workspaceId = metadataFromContext?.workspaceId ?? context.workspaceId
if (!workspaceId) {
throw new Error(
`Cannot serialize pause snapshot: missing workspaceId for workflow ${context.workflowId}`
)
}
const executionMetadata: ExecutionMetadata = {
requestId:
metadataFromContext?.requestId ?? context.executionId ?? context.workflowId ?? 'unknown',
executionId: context.executionId ?? 'unknown',
workflowId: context.workflowId,
workspaceId: context.workspaceId,
workspaceId,
userId: metadataFromContext?.userId ?? '',
sessionUserId: metadataFromContext?.sessionUserId,
workflowUserId: metadataFromContext?.workflowUserId,

View File

@@ -5,7 +5,7 @@ export interface ExecutionMetadata {
requestId: string
executionId: string
workflowId: string
workspaceId?: string
workspaceId: string
userId: string
sessionUserId?: string
workflowUserId?: string

View File

@@ -516,6 +516,15 @@ async function logPreprocessingError(params: {
loggingSession,
} = params
if (!workspaceId) {
logger.warn(`[${requestId}] Cannot log preprocessing error: no workspaceId available`, {
workflowId,
executionId,
errorMessage,
})
return
}
try {
const session =
loggingSession || new LoggingSession(workflowId, executionId, triggerType as any, requestId)

View File

@@ -109,6 +109,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
async startWorkflowExecution(params: {
workflowId: string
workspaceId: string
executionId: string
trigger: ExecutionTrigger
environment: ExecutionEnvironment
@@ -118,8 +119,15 @@ export class ExecutionLogger implements IExecutionLoggerService {
workflowLog: WorkflowExecutionLog
snapshot: WorkflowExecutionSnapshot
}> {
const { workflowId, executionId, trigger, environment, workflowState, deploymentVersionId } =
params
const {
workflowId,
workspaceId,
executionId,
trigger,
environment,
workflowState,
deploymentVersionId,
} = params
logger.debug(`Starting workflow execution ${executionId} for workflow ${workflowId}`)
@@ -168,6 +176,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
.values({
id: uuidv4(),
workflowId,
workspaceId,
executionId,
stateSnapshotId: snapshotResult.snapshot.id,
deploymentVersionId: deploymentVersionId ?? null,

View File

@@ -18,7 +18,7 @@ const logger = createLogger('LoggingSession')
export interface SessionStartParams {
userId?: string
workspaceId?: string
workspaceId: string
variables?: Record<string, string>
triggerData?: Record<string, unknown>
skipLogCreation?: boolean // For resume executions - reuse existing log entry
@@ -65,7 +65,7 @@ export class LoggingSession {
this.requestId = requestId
}
async start(params: SessionStartParams = {}): Promise<void> {
async start(params: SessionStartParams): Promise<void> {
const { userId, workspaceId, variables, triggerData, skipLogCreation, deploymentVersionId } =
params
@@ -84,6 +84,7 @@ export class LoggingSession {
if (!skipLogCreation) {
await executionLogger.startWorkflowExecution({
workflowId: this.workflowId,
workspaceId,
executionId: this.executionId,
trigger: this.trigger,
environment: this.environment,
@@ -115,7 +116,6 @@ export class LoggingSession {
* Note: Logging now works through trace spans only, no direct executor integration needed
*/
setupExecutor(executor: any): void {
// No longer setting logger on executor - trace spans handle everything
if (this.requestId) {
logger.debug(`[${this.requestId}] Logging session ready for execution ${this.executionId}`)
}
@@ -272,7 +272,7 @@ export class LoggingSession {
}
}
async safeStart(params: SessionStartParams = {}): Promise<boolean> {
async safeStart(params: SessionStartParams): Promise<boolean> {
try {
await this.start(params)
return true
@@ -305,6 +305,7 @@ export class LoggingSession {
await executionLogger.startWorkflowExecution({
workflowId: this.workflowId,
workspaceId,
executionId: this.executionId,
trigger: this.trigger,
environment: this.environment,

View File

@@ -334,6 +334,7 @@ export interface SnapshotCreationResult {
export interface ExecutionLoggerService {
startWorkflowExecution(params: {
workflowId: string
workspaceId: string
executionId: string
trigger: ExecutionTrigger
environment: ExecutionEnvironment

View File

@@ -103,6 +103,9 @@ export async function executeWorkflowCore(
const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks
const providedWorkspaceId = metadata.workspaceId
if (!providedWorkspaceId) {
throw new Error(`Execution metadata missing workspaceId for workflow ${workflowId}`)
}
let processedInput = input || {}

View File

@@ -0,0 +1,26 @@
-- Step 1: Add column as NULLABLE first (instant, no lock)
ALTER TABLE "workflow_execution_logs" ADD COLUMN "workspace_id" text;--> statement-breakpoint
-- Step 2: Backfill workspace_id from workflow table
UPDATE "workflow_execution_logs" wel
SET "workspace_id" = w."workspace_id"
FROM "workflow" w
WHERE wel."workflow_id" = w."id"
AND w."workspace_id" IS NOT NULL;--> statement-breakpoint
-- Step 3: Delete orphaned execution logs (from workflows without workspaces)
DELETE FROM "workflow_execution_logs"
WHERE "workspace_id" IS NULL;--> statement-breakpoint
-- Step 4: Add NOT NULL constraint (safe now - all remaining rows have values)
ALTER TABLE "workflow_execution_logs" ALTER COLUMN "workspace_id" SET NOT NULL;--> statement-breakpoint
-- Step 5: Add foreign key constraint
ALTER TABLE "workflow_execution_logs" ADD CONSTRAINT "workflow_execution_logs_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
-- Step 6: Create indexes
CREATE INDEX "workflow_execution_logs_workspace_started_at_idx" ON "workflow_execution_logs" USING btree ("workspace_id","started_at");--> statement-breakpoint
CREATE INDEX "api_key_workspace_type_idx" ON "api_key" USING btree ("workspace_id","type");--> statement-breakpoint
CREATE INDEX "api_key_user_type_idx" ON "api_key" USING btree ("user_id","type");--> statement-breakpoint
CREATE INDEX "verification_expires_at_idx" ON "verification" USING btree ("expires_at");--> statement-breakpoint
CREATE INDEX "workflow_blocks_type_idx" ON "workflow_blocks" USING btree ("type");

File diff suppressed because it is too large Load Diff

View File

@@ -883,6 +883,13 @@
"when": 1766203036010,
"tag": "0126_dapper_midnight",
"breakpoints": true
},
{
"idx": 127,
"version": "7",
"when": 1766209394504,
"tag": "0127_flimsy_sister_grimm",
"breakpoints": true
}
]
}

View File

@@ -292,6 +292,9 @@ export const workflowExecutionLogs = pgTable(
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
workspaceId: text('workspace_id')
.notNull()
.references(() => workspace.id, { onDelete: 'cascade' }),
executionId: text('execution_id').notNull(),
stateSnapshotId: text('state_snapshot_id')
.notNull()
@@ -327,11 +330,14 @@ export const workflowExecutionLogs = pgTable(
executionIdUnique: uniqueIndex('workflow_execution_logs_execution_id_unique').on(
table.executionId
),
// Composite index for the new join-based query pattern
workflowStartedAtIdx: index('workflow_execution_logs_workflow_started_at_idx').on(
table.workflowId,
table.startedAt
),
workspaceStartedAtIdx: index('workflow_execution_logs_workspace_started_at_idx').on(
table.workspaceId,
table.startedAt
),
})
)