fix(logs-page): optimize logs retrieval queries, consolidate useEffects to prevent dup calls (#845)

* fix(logs-page): optimize loading times by improving query, removing unused index, adding new index

* add migration files

* remove fake min loading time
Vikhyath Mondreti
2025-08-01 16:26:13 -07:00
committed by GitHub
parent 3bd7a6c402
commit 545ec791df
6 changed files with 5796 additions and 153 deletions

View File

@@ -75,16 +75,40 @@ export async function GET(request: NextRequest) {
const { searchParams } = new URL(request.url)
const params = QueryParamsSchema.parse(Object.fromEntries(searchParams.entries()))
const workflowConditions = and(
eq(workflow.workspaceId, params.workspaceId),
eq(permissions.userId, userId),
eq(permissions.entityType, 'workspace')
)
const userWorkflows = await db
.select({ id: workflow.id, folderId: workflow.folderId })
.from(workflow)
.leftJoin(
const baseQuery = db
.select({
id: workflowExecutionLogs.id,
workflowId: workflowExecutionLogs.workflowId,
executionId: workflowExecutionLogs.executionId,
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
level: workflowExecutionLogs.level,
message: workflowExecutionLogs.message,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
blockCount: workflowExecutionLogs.blockCount,
successCount: workflowExecutionLogs.successCount,
errorCount: workflowExecutionLogs.errorCount,
skippedCount: workflowExecutionLogs.skippedCount,
totalCost: workflowExecutionLogs.totalCost,
totalInputCost: workflowExecutionLogs.totalInputCost,
totalOutputCost: workflowExecutionLogs.totalOutputCost,
totalTokens: workflowExecutionLogs.totalTokens,
metadata: workflowExecutionLogs.metadata,
createdAt: workflowExecutionLogs.createdAt,
workflowName: workflow.name,
workflowDescription: workflow.description,
workflowColor: workflow.color,
workflowFolderId: workflow.folderId,
workflowUserId: workflow.userId,
workflowWorkspaceId: workflow.workspaceId,
workflowCreatedAt: workflow.createdAt,
workflowUpdatedAt: workflow.updatedAt,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
@@ -92,16 +116,9 @@ export async function GET(request: NextRequest) {
eq(permissions.userId, userId)
)
)
.where(workflowConditions)
const userWorkflowIds = userWorkflows.map((w) => w.id)
if (userWorkflowIds.length === 0) {
return NextResponse.json({ data: [], total: 0 }, { status: 200 })
}
// Build conditions for logs
let conditions: SQL | undefined = inArray(workflowExecutionLogs.workflowId, userWorkflowIds)
// Build conditions for the joined query
let conditions: SQL | undefined = eq(workflow.workspaceId, params.workspaceId)
// Filter by level
if (params.level && params.level !== 'all') {
@@ -111,27 +128,16 @@ export async function GET(request: NextRequest) {
// Filter by specific workflow IDs
if (params.workflowIds) {
const workflowIds = params.workflowIds.split(',').filter(Boolean)
const filteredWorkflowIds = workflowIds.filter((id) => userWorkflowIds.includes(id))
if (filteredWorkflowIds.length > 0) {
conditions = and(
conditions,
inArray(workflowExecutionLogs.workflowId, filteredWorkflowIds)
)
if (workflowIds.length > 0) {
conditions = and(conditions, inArray(workflow.id, workflowIds))
}
}
// Filter by folder IDs
if (params.folderIds) {
const folderIds = params.folderIds.split(',').filter(Boolean)
const workflowsInFolders = userWorkflows
.filter((w) => w.folderId && folderIds.includes(w.folderId))
.map((w) => w.id)
if (workflowsInFolders.length > 0) {
conditions = and(
conditions,
inArray(workflowExecutionLogs.workflowId, workflowsInFolders)
)
if (folderIds.length > 0) {
conditions = and(conditions, inArray(workflow.folderId, folderIds))
}
}
@@ -166,21 +172,30 @@ export async function GET(request: NextRequest) {
)
}
// Execute the query
const logs = await db
.select()
.from(workflowExecutionLogs)
// Execute the query using the optimized join
const logs = await baseQuery
.where(conditions)
.orderBy(desc(workflowExecutionLogs.startedAt))
.limit(params.limit)
.offset(params.offset)
// Get total count for pagination
const countResult = await db
// Get total count for pagination using the same join structure
const countQuery = db
.select({ count: sql<number>`count(*)` })
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(conditions)
const countResult = await countQuery
const count = countResult[0]?.count || 0
// Block executions are now extracted from trace spans instead of separate table
@@ -271,7 +286,7 @@ export async function GET(request: NextRequest) {
}
}
// Transform to clean log format
// Transform to clean log format with workflow data included
const enhancedLogs = logs.map((log) => {
const blockExecutions = blockExecutionsByExecution[log.executionId] || []
@@ -298,6 +313,19 @@ export async function GET(request: NextRequest) {
models: (log.metadata as any)?.models || {},
}
// Build workflow object from joined data
const workflow = {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
return {
id: log.id,
workflowId: log.workflowId,
@@ -307,6 +335,7 @@ export async function GET(request: NextRequest) {
duration: log.totalDurationMs ? `${log.totalDurationMs}ms` : null,
trigger: log.trigger,
createdAt: log.startedAt.toISOString(),
workflow: params.includeWorkflow ? workflow : undefined,
metadata: {
totalDuration: log.totalDurationMs,
cost: costSummary,
@@ -323,30 +352,6 @@ export async function GET(request: NextRequest) {
}
})
if (params.includeWorkflow) {
const workflowIds = [...new Set(logs.map((log) => log.workflowId))]
const workflowConditions = inArray(workflow.id, workflowIds)
const workflowData = await db.select().from(workflow).where(workflowConditions)
const workflowMap = new Map(workflowData.map((w) => [w.id, w]))
const logsWithWorkflow = enhancedLogs.map((log) => ({
...log,
workflow: workflowMap.get(log.workflowId) || null,
}))
return NextResponse.json(
{
data: logsWithWorkflow,
total: Number(count),
page: Math.floor(params.offset / params.limit) + 1,
pageSize: params.limit,
totalPages: Math.ceil(Number(count) / params.limit),
},
{ status: 200 }
)
}
// Include block execution data if requested
if (params.includeBlocks) {
// Block executions are now extracted from stored trace spans in metadata
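
The shape of the rewrite is easiest to see outside the diff: the old code ran one query to collect the user's permitted workflow IDs and then a second query filtered with inArray, while the new code pushes the permission check into joins so a single round trip returns each log row together with its workflow columns. A minimal sketch of the pattern, assuming the db instance and the workflowExecutionLogs, workflow, and permissions tables exported by this repo's schema (import paths are guesses):

import { and, desc, eq } from 'drizzle-orm'
// Assumed imports; adjust paths to this repo's actual modules.
import { db } from '@/db'
import { permissions, workflow, workflowExecutionLogs } from '@/db/schema'

// Sketch: one join-based query replaces the former two-step
// "fetch permitted workflow IDs, then inArray(...)" pattern.
function permittedLogsQuery(userId: string, workspaceId: string) {
  return db
    .select({
      id: workflowExecutionLogs.id,
      startedAt: workflowExecutionLogs.startedAt,
      // Workflow columns ride along in the same row, so no follow-up
      // query is needed when includeWorkflow is requested.
      workflowName: workflow.name,
    })
    .from(workflowExecutionLogs)
    .innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
    .innerJoin(
      permissions,
      and(
        eq(permissions.entityType, 'workspace'),
        eq(permissions.entityId, workflow.workspaceId),
        eq(permissions.userId, userId)
      )
    )
    .where(eq(workflow.workspaceId, workspaceId))
    .orderBy(desc(workflowExecutionLogs.startedAt))
}

The count query mirrors the same joins, so the pagination total is computed against exactly the rows the page query can return.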

View File

@@ -67,7 +67,6 @@ export default function Logs() {
setHasMore,
isFetchingMore,
setIsFetchingMore,
buildQueryParams,
initializeFromURL,
timeRange,
level,
@@ -81,7 +80,7 @@ export default function Logs() {
// Set workspace ID in store when component mounts or workspaceId changes
useEffect(() => {
setWorkspaceId(workspaceId)
}, [workspaceId, setWorkspaceId])
}, [workspaceId])
const [selectedLog, setSelectedLog] = useState<WorkflowLog | null>(null)
const [selectedLogIndex, setSelectedLogIndex] = useState<number>(-1)
@@ -107,10 +106,10 @@ export default function Logs() {
// Update store when debounced search query changes
useEffect(() => {
if (debouncedSearchQuery !== storeSearchQuery) {
if (isInitialized.current && debouncedSearchQuery !== storeSearchQuery) {
setStoreSearchQuery(debouncedSearchQuery)
}
}, [debouncedSearchQuery, storeSearchQuery, setStoreSearchQuery])
}, [debouncedSearchQuery, storeSearchQuery])
const handleLogClick = (log: WorkflowLog) => {
setSelectedLog(log)
@@ -119,21 +118,21 @@ export default function Logs() {
setIsSidebarOpen(true)
}
const handleNavigateNext = () => {
const handleNavigateNext = useCallback(() => {
if (selectedLogIndex < logs.length - 1) {
const nextIndex = selectedLogIndex + 1
setSelectedLogIndex(nextIndex)
setSelectedLog(logs[nextIndex])
}
}
}, [selectedLogIndex, logs])
const handleNavigatePrev = () => {
const handleNavigatePrev = useCallback(() => {
if (selectedLogIndex > 0) {
const prevIndex = selectedLogIndex - 1
setSelectedLogIndex(prevIndex)
setSelectedLog(logs[prevIndex])
}
}
}, [selectedLogIndex, logs])
const handleCloseSidebar = () => {
setIsSidebarOpen(false)
@@ -150,56 +149,51 @@ export default function Logs() {
}
}, [selectedLogIndex])
const fetchLogs = useCallback(
async (pageNum: number, append = false) => {
try {
if (pageNum === 1) {
setLoading(true)
} else {
setIsFetchingMore(true)
}
const queryParams = buildQueryParams(pageNum, LOGS_PER_PAGE)
const response = await fetch(`/api/logs?${queryParams}`)
if (!response.ok) {
throw new Error(`Error fetching logs: ${response.statusText}`)
}
const data: LogsResponse = await response.json()
setHasMore(data.data.length === LOGS_PER_PAGE && data.page < data.totalPages)
setLogs(data.data, append)
setError(null)
} catch (err) {
logger.error('Failed to fetch logs:', { err })
setError(err instanceof Error ? err.message : 'An unknown error occurred')
} finally {
if (pageNum === 1) {
setLoading(false)
} else {
setIsFetchingMore(false)
}
const fetchLogs = useCallback(async (pageNum: number, append = false) => {
try {
if (pageNum === 1) {
setLoading(true)
} else {
setIsFetchingMore(true)
}
},
[setLogs, setLoading, setError, setHasMore, setIsFetchingMore, buildQueryParams]
)
// Get fresh query params by calling buildQueryParams from store
const { buildQueryParams: getCurrentQueryParams } = useFilterStore.getState()
const queryParams = getCurrentQueryParams(pageNum, LOGS_PER_PAGE)
const response = await fetch(`/api/logs?${queryParams}`)
if (!response.ok) {
throw new Error(`Error fetching logs: ${response.statusText}`)
}
const data: LogsResponse = await response.json()
setHasMore(data.data.length === LOGS_PER_PAGE && data.page < data.totalPages)
setLogs(data.data, append)
setError(null)
} catch (err) {
logger.error('Failed to fetch logs:', { err })
setError(err instanceof Error ? err.message : 'An unknown error occurred')
} finally {
if (pageNum === 1) {
setLoading(false)
} else {
setIsFetchingMore(false)
}
}
}, [])
const handleRefresh = async () => {
if (isRefreshing) return
setIsRefreshing(true)
const minLoadingTime = new Promise((resolve) => setTimeout(resolve, 1000))
try {
const logsResponse = await fetchLogs(1)
await minLoadingTime
await fetchLogs(1)
setError(null)
} catch (err) {
await minLoadingTime
setError(err instanceof Error ? err.message : 'An unknown error occurred')
} finally {
setIsRefreshing(false)
@@ -250,30 +244,57 @@ export default function Logs() {
return () => window.removeEventListener('popstate', handlePopState)
}, [initializeFromURL])
// Single useEffect to handle both initial load and filter changes
useEffect(() => {
// Only fetch logs after initialization
if (isInitialized.current) {
fetchLogs(1)
}
}, [fetchLogs])
// Refetch when filters change (but not on initial load)
useEffect(() => {
// Only fetch when initialized and filters change
if (!isInitialized.current) {
return
}
// Reset pagination and fetch from beginning when filters change
// Reset pagination and fetch from beginning
setPage(1)
setHasMore(true)
// Fetch logs with new filters
const fetchWithNewFilters = async () => {
// Inline fetch logic to avoid circular dependency
const fetchWithFilters = async () => {
try {
setLoading(true)
const queryParams = buildQueryParams(1, LOGS_PER_PAGE)
const response = await fetch(`/api/logs?${queryParams}`)
// Build query params inline to avoid dependency issues
const params = new URLSearchParams()
params.set('includeWorkflow', 'true')
params.set('limit', LOGS_PER_PAGE.toString())
params.set('offset', '0') // Always start from page 1
params.set('workspaceId', workspaceId)
// Add filters
if (level !== 'all') params.set('level', level)
if (triggers.length > 0) params.set('triggers', triggers.join(','))
if (workflowIds.length > 0) params.set('workflowIds', workflowIds.join(','))
if (folderIds.length > 0) params.set('folderIds', folderIds.join(','))
if (searchQuery.trim()) params.set('search', searchQuery.trim())
// Add time range filter
if (timeRange !== 'All time') {
const now = new Date()
let startDate: Date
switch (timeRange) {
case 'Past 30 minutes':
startDate = new Date(now.getTime() - 30 * 60 * 1000)
break
case 'Past hour':
startDate = new Date(now.getTime() - 60 * 60 * 1000)
break
case 'Past 24 hours':
startDate = new Date(now.getTime() - 24 * 60 * 60 * 1000)
break
default:
startDate = new Date(0)
}
params.set('startDate', startDate.toISOString())
}
const response = await fetch(`/api/logs?${params.toString()}`)
if (!response.ok) {
throw new Error(`Error fetching logs: ${response.statusText}`)
@@ -291,21 +312,8 @@ export default function Logs() {
}
}
fetchWithNewFilters()
}, [
timeRange,
level,
workflowIds,
folderIds,
searchQuery,
triggers,
setPage,
setHasMore,
setLoading,
setLogs,
setError,
buildQueryParams,
])
fetchWithFilters()
}, [workspaceId, timeRange, level, workflowIds, folderIds, searchQuery, triggers])
const loadMoreLogs = useCallback(() => {
if (!isFetchingMore && hasMore) {
@@ -316,7 +324,7 @@ export default function Logs() {
fetchLogs(nextPage, true)
}, 50)
}
}, [fetchLogs, isFetchingMore, hasMore, page, setPage, setIsFetchingMore])
}, [isFetchingMore, hasMore, page])
useEffect(() => {
if (loading || !hasMore) return
@@ -398,15 +406,7 @@ export default function Logs() {
window.addEventListener('keydown', handleKeyDown)
return () => window.removeEventListener('keydown', handleKeyDown)
}, [
logs,
selectedLogIndex,
isSidebarOpen,
selectedLog,
handleNavigateNext,
handleNavigatePrev,
setIsSidebarOpen,
])
}, [logs, selectedLogIndex, isSidebarOpen, selectedLog, handleNavigateNext, handleNavigatePrev])
return (
<div className='flex h-[100vh] min-w-0 flex-col pl-64'>
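
Two things make the consolidated effects safe from duplicate fetches. First, fetchLogs now has an empty dependency array, so its identity never changes; instead of capturing filter values in its closure, it pulls the latest buildQueryParams through the store's getState() at call time. Second, the isInitialized ref gates the search and filter effects so the initial URL hydration doesn't trigger a second request. A minimal sketch of the stable-callback half, with a simplified, hypothetical store shape:

import { useCallback } from 'react'
import { create } from 'zustand'

// Hypothetical minimal store; the real useFilterStore carries many more fields.
interface FilterState {
  level: string
  buildQueryParams: (page: number, limit: number) => string
}

const useFilterStore = create<FilterState>((_set, get) => ({
  level: 'all',
  buildQueryParams: (page, limit) =>
    new URLSearchParams({
      level: get().level,
      limit: String(limit),
      offset: String((page - 1) * limit),
    }).toString(),
}))

function useFetchLogs() {
  // Empty deps: the callback identity is stable across renders, so effects
  // listing it as a dependency run once rather than on every filter change.
  return useCallback(async (pageNum: number) => {
    // Read fresh state at call time instead of closing over stale values.
    const { buildQueryParams } = useFilterStore.getState()
    const response = await fetch(`/api/logs?${buildQueryParams(pageNum, 50)}`)
    if (!response.ok) throw new Error(`Error fetching logs: ${response.statusText}`)
    return response.json()
  }, [])
}

The trade-off is that the empty dependency array is intentional, so the state reads must stay inside getState() for the callback to see current filters.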

View File

@@ -0,0 +1,3 @@
DROP INDEX "workflow_execution_logs_cost_idx";--> statement-breakpoint
DROP INDEX "workflow_execution_logs_duration_idx";--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_workflow_started_at_idx" ON "workflow_execution_logs" USING btree ("workflow_id","started_at");
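
The composite index matches the access path of the rewritten query: equality on workflow_id (the join key) combined with ordering on started_at lets Postgres walk a single btree and stop at the LIMIT instead of sorting every matching row, and since btrees scan in either direction, the ascending index also serves ORDER BY started_at DESC. A sketch of the query shape it serves, using the same assumed Drizzle imports as above:

import { desc, eq } from 'drizzle-orm'
import { db } from '@/db' // assumed import paths
import { workflowExecutionLogs } from '@/db/schema'

// Placeholder; in the route the workflow IDs come from the join.
const someWorkflowId = '...'

// Filter on the leading index column, order by the second: the index scan
// can return the newest 50 rows directly, with no separate sort step.
const recentRuns = await db
  .select()
  .from(workflowExecutionLogs)
  .where(eq(workflowExecutionLogs.workflowId, someWorkflowId))
  .orderBy(desc(workflowExecutionLogs.startedAt))
  .limit(50)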

File diff suppressed because it is too large

View File

@@ -442,6 +442,13 @@
"when": 1753558819517,
"tag": "0063_lame_sandman",
"breakpoints": true
},
{
"idx": 64,
"version": "7",
"when": 1754088313157,
"tag": "0064_elite_hedge_knight",
"breakpoints": true
}
]
}

View File

@@ -307,11 +307,14 @@ export const workflowExecutionLogs = pgTable(
triggerIdx: index('workflow_execution_logs_trigger_idx').on(table.trigger),
levelIdx: index('workflow_execution_logs_level_idx').on(table.level),
startedAtIdx: index('workflow_execution_logs_started_at_idx').on(table.startedAt),
costIdx: index('workflow_execution_logs_cost_idx').on(table.totalCost),
durationIdx: index('workflow_execution_logs_duration_idx').on(table.totalDurationMs),
executionIdUnique: uniqueIndex('workflow_execution_logs_execution_id_unique').on(
table.executionId
),
// Composite index for the new join-based query pattern
workflowStartedAtIdx: index('workflow_execution_logs_workflow_started_at_idx').on(
table.workflowId,
table.startedAt
),
})
)
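
The dropped costIdx and durationIdx correspond to the "removing unused index" part of the commit message: indexes that no query filters or sorts on only add write overhead. The single-column startedAtIdx presumably remains for time-ordered listings that span all workflows, while the new composite covers the per-workflow path driven by the join.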