improvement(performance): remove writes to workflow updated_at on position updates for blocks, edges, & subflows (#1531)

* improvement(performance): remove writes to workflow updated_at on position updates for blocks, edges, & subflows

* update query pattern for logs routes
This commit is contained in:
Waleed
2025-10-02 11:53:50 -07:00
committed by GitHub
parent ace83ebcae
commit 15138629cb
3 changed files with 35 additions and 24 deletions

View File

@@ -94,10 +94,18 @@ export async function GET(request: NextRequest) {
workflowUpdatedAt: workflow.updatedAt,
}
// Optimized query: Start by filtering workflows in the workspace with user permissions
// This ensures we scan only relevant logs instead of the entire table
const baseQuery = db
.select(selectColumns)
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId) // Filter workspace during join!
)
)
.innerJoin(
permissions,
and(
@@ -107,8 +115,8 @@ export async function GET(request: NextRequest) {
)
)
// Build conditions for the joined query
let conditions: SQL | undefined = eq(workflow.workspaceId, params.workspaceId)
// Build additional conditions for the query
let conditions: SQL | undefined
// Filter by level
if (params.level && params.level !== 'all') {
@@ -176,11 +184,17 @@ export async function GET(request: NextRequest) {
.limit(params.limit)
.offset(params.offset)
// Get total count for pagination using the same join structure
// Get total count for pagination using the same optimized join structure
const countQuery = db
.select({ count: sql<number>`count(*)` })
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId) // Same optimization
)
)
.innerJoin(
permissions,
and(

View File

@@ -106,7 +106,7 @@ export async function GET(request: NextRequest) {
const conditions = buildLogFilters(filters)
const orderBy = getOrderBy(params.order)
// Build and execute query
// Build and execute query - optimized to filter workspace during join
const baseQuery = db
.select({
id: workflowExecutionLogs.id,
@@ -124,7 +124,13 @@ export async function GET(request: NextRequest) {
workflowDescription: workflow.description,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
workflow,
and(
eq(workflowExecutionLogs.workflowId, workflow.id),
eq(workflow.workspaceId, params.workspaceId) // Filter workspace during join!
)
)
.innerJoin(
permissions,
and(

View File

@@ -168,24 +168,8 @@ export async function persistWorkflowOperation(workflowId: string, operation: an
try {
const { operation: op, target, payload, timestamp, userId } = operation
// Log high-frequency operations for monitoring
if (op === 'update-position' && Math.random() < 0.01) {
// Log 1% of position updates
logger.debug('Socket DB operation sample:', {
operation: op,
target,
workflowId: `${workflowId.substring(0, 8)}...`,
})
}
await db.transaction(async (tx) => {
// Update the workflow's last modified timestamp first
await tx
.update(workflow)
.set({ updatedAt: new Date(timestamp) })
.where(eq(workflow.id, workflowId))
// Handle different operation types within the transaction
// Handle different operation types within the transaction first
switch (target) {
case 'block':
await handleBlockOperationTx(tx, workflowId, op, payload, userId)
@@ -202,6 +186,13 @@ export async function persistWorkflowOperation(workflowId: string, operation: an
default:
throw new Error(`Unknown operation target: ${target}`)
}
if (op !== 'update-position') {
await tx
.update(workflow)
.set({ updatedAt: new Date(timestamp) })
.where(eq(workflow.id, workflowId))
}
})
// Log slow operations for monitoring