mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-31 01:37:58 -05:00
fix(terminal): start precision (#3078)
* fix(executor): use performance.now() for precise block timing Replace Date.now() with performance.now() for timing measurements in the executor to provide sub-millisecond precision. This fixes timing discrepancies with fast-executing blocks like the start block where millisecond precision was insufficient. Changes: - block-executor.ts: Use performance.now() for block execution timing - engine.ts: Use performance.now() for overall execution timing Co-authored-by: emir <emir@simstudio.ai> * format ms as whole nums,round secs to 2 decimal places and compute all started/ended times on server and passback to clinet --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com> Co-authored-by: waleed <walif6@gmail.com>
This commit is contained in:
@@ -616,6 +616,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
input: callbackData.input,
|
||||
error: callbackData.output.error,
|
||||
durationMs: callbackData.executionTime || 0,
|
||||
startedAt: callbackData.startedAt,
|
||||
endedAt: callbackData.endedAt,
|
||||
...(iterationContext && {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationTotal: iterationContext.iterationTotal,
|
||||
@@ -641,6 +643,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
|
||||
input: callbackData.input,
|
||||
output: callbackData.output,
|
||||
durationMs: callbackData.executionTime || 0,
|
||||
startedAt: callbackData.startedAt,
|
||||
endedAt: callbackData.endedAt,
|
||||
...(iterationContext && {
|
||||
iterationCurrent: iterationContext.iterationCurrent,
|
||||
iterationTotal: iterationContext.iterationTotal,
|
||||
|
||||
@@ -58,7 +58,9 @@ export function getBlockColor(blockType: string): string {
|
||||
*/
|
||||
export function formatDuration(ms?: number): string {
|
||||
if (ms === undefined || ms === null) return '-'
|
||||
if (ms < 1000) return `${ms}ms`
|
||||
if (ms < 1000) {
|
||||
return `${Math.round(ms)}ms`
|
||||
}
|
||||
return `${(ms / 1000).toFixed(2)}s`
|
||||
}
|
||||
|
||||
|
||||
@@ -964,8 +964,8 @@ export function useWorkflowExecution() {
|
||||
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
|
||||
if (isContainerBlock) return
|
||||
|
||||
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
|
||||
const endedAt = new Date().toISOString()
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
@@ -1013,8 +1013,8 @@ export function useWorkflowExecution() {
|
||||
// Track failed block execution in run path
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
|
||||
const endedAt = new Date().toISOString()
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
// Accumulate block error log for the execution result
|
||||
accumulatedBlockLogs.push({
|
||||
@@ -1603,8 +1603,8 @@ export function useWorkflowExecution() {
|
||||
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
|
||||
if (isContainerBlock) return
|
||||
|
||||
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
|
||||
const endedAt = new Date().toISOString()
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
@@ -1642,8 +1642,8 @@ export function useWorkflowExecution() {
|
||||
|
||||
setBlockRunStatus(data.blockId, 'error')
|
||||
|
||||
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
|
||||
const endedAt = new Date().toISOString()
|
||||
const startedAt = data.startedAt
|
||||
const endedAt = data.endedAt
|
||||
|
||||
accumulatedBlockLogs.push({
|
||||
blockId: data.blockId,
|
||||
|
||||
@@ -71,7 +71,7 @@ export class BlockExecutor {
|
||||
this.callOnBlockStart(ctx, node, block)
|
||||
}
|
||||
|
||||
const startTime = Date.now()
|
||||
const startTime = performance.now()
|
||||
let resolvedInputs: Record<string, any> = {}
|
||||
|
||||
const nodeMetadata = this.buildNodeMetadata(node)
|
||||
@@ -145,7 +145,7 @@ export class BlockExecutor {
|
||||
})) as NormalizedBlockOutput
|
||||
}
|
||||
|
||||
const duration = Date.now() - startTime
|
||||
const duration = performance.now() - startTime
|
||||
|
||||
if (blockLog) {
|
||||
blockLog.endedAt = new Date().toISOString()
|
||||
@@ -169,7 +169,9 @@ export class BlockExecutor {
|
||||
block,
|
||||
this.sanitizeInputsForLog(resolvedInputs),
|
||||
displayOutput,
|
||||
duration
|
||||
duration,
|
||||
blockLog!.startedAt,
|
||||
blockLog!.endedAt
|
||||
)
|
||||
}
|
||||
|
||||
@@ -221,7 +223,7 @@ export class BlockExecutor {
|
||||
isSentinel: boolean,
|
||||
phase: 'input_resolution' | 'execution'
|
||||
): NormalizedBlockOutput {
|
||||
const duration = Date.now() - startTime
|
||||
const duration = performance.now() - startTime
|
||||
const errorMessage = normalizeError(error)
|
||||
const hasResolvedInputs =
|
||||
resolvedInputs && typeof resolvedInputs === 'object' && Object.keys(resolvedInputs).length > 0
|
||||
@@ -274,7 +276,9 @@ export class BlockExecutor {
|
||||
block,
|
||||
this.sanitizeInputsForLog(input),
|
||||
displayOutput,
|
||||
duration
|
||||
duration,
|
||||
blockLog!.startedAt,
|
||||
blockLog!.endedAt
|
||||
)
|
||||
}
|
||||
|
||||
@@ -423,7 +427,9 @@ export class BlockExecutor {
|
||||
block: SerializedBlock,
|
||||
input: Record<string, any>,
|
||||
output: NormalizedBlockOutput,
|
||||
duration: number
|
||||
duration: number,
|
||||
startedAt: string,
|
||||
endedAt: string
|
||||
): void {
|
||||
const blockId = node.id
|
||||
const blockName = block.metadata?.name ?? blockId
|
||||
@@ -440,6 +446,8 @@ export class BlockExecutor {
|
||||
input,
|
||||
output,
|
||||
executionTime: duration,
|
||||
startedAt,
|
||||
endedAt,
|
||||
},
|
||||
iterationContext
|
||||
)
|
||||
|
||||
@@ -101,7 +101,7 @@ export class ExecutionEngine {
|
||||
}
|
||||
|
||||
async run(triggerBlockId?: string): Promise<ExecutionResult> {
|
||||
const startTime = Date.now()
|
||||
const startTime = performance.now()
|
||||
try {
|
||||
this.initializeQueue(triggerBlockId)
|
||||
|
||||
@@ -125,8 +125,8 @@ export class ExecutionEngine {
|
||||
return this.buildPausedResult(startTime)
|
||||
}
|
||||
|
||||
const endTime = Date.now()
|
||||
this.context.metadata.endTime = new Date(endTime).toISOString()
|
||||
const endTime = performance.now()
|
||||
this.context.metadata.endTime = new Date().toISOString()
|
||||
this.context.metadata.duration = endTime - startTime
|
||||
|
||||
if (this.cancelledFlag) {
|
||||
@@ -146,8 +146,8 @@ export class ExecutionEngine {
|
||||
metadata: this.context.metadata,
|
||||
}
|
||||
} catch (error) {
|
||||
const endTime = Date.now()
|
||||
this.context.metadata.endTime = new Date(endTime).toISOString()
|
||||
const endTime = performance.now()
|
||||
this.context.metadata.endTime = new Date().toISOString()
|
||||
this.context.metadata.duration = endTime - startTime
|
||||
|
||||
if (this.cancelledFlag) {
|
||||
@@ -433,8 +433,8 @@ export class ExecutionEngine {
|
||||
}
|
||||
|
||||
private buildPausedResult(startTime: number): ExecutionResult {
|
||||
const endTime = Date.now()
|
||||
this.context.metadata.endTime = new Date(endTime).toISOString()
|
||||
const endTime = performance.now()
|
||||
this.context.metadata.endTime = new Date().toISOString()
|
||||
this.context.metadata.duration = endTime - startTime
|
||||
this.context.metadata.status = 'paused'
|
||||
|
||||
|
||||
@@ -103,7 +103,13 @@ export interface ContextExtensions {
|
||||
blockId: string,
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
|
||||
output: {
|
||||
input?: any
|
||||
output: NormalizedBlockOutput
|
||||
executionTime: number
|
||||
startedAt: string
|
||||
endedAt: string
|
||||
},
|
||||
iterationContext?: IterationContext
|
||||
) => Promise<void>
|
||||
|
||||
|
||||
@@ -281,9 +281,12 @@ export class LoopOrchestrator {
|
||||
|
||||
// Emit onBlockComplete for the loop container so the UI can track it
|
||||
if (this.contextExtensions?.onBlockComplete) {
|
||||
const now = new Date().toISOString()
|
||||
this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
|
||||
output,
|
||||
executionTime: DEFAULTS.EXECUTION_TIME,
|
||||
startedAt: now,
|
||||
endedAt: now,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -265,9 +265,12 @@ export class ParallelOrchestrator {
|
||||
|
||||
// Emit onBlockComplete for the parallel container so the UI can track it
|
||||
if (this.contextExtensions?.onBlockComplete) {
|
||||
const now = new Date().toISOString()
|
||||
this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
|
||||
output,
|
||||
executionTime: 0,
|
||||
startedAt: now,
|
||||
endedAt: now,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -232,6 +232,8 @@ export function addSubflowErrorLog(
|
||||
input: inputData,
|
||||
output: { error: errorMessage },
|
||||
executionTime: 0,
|
||||
startedAt: now,
|
||||
endedAt: now,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,18 @@
|
||||
import { useCallback, useRef } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
|
||||
import type {
|
||||
BlockCompletedData,
|
||||
BlockErrorData,
|
||||
BlockStartedData,
|
||||
ExecutionCancelledData,
|
||||
ExecutionCompletedData,
|
||||
ExecutionErrorData,
|
||||
ExecutionEvent,
|
||||
ExecutionStartedData,
|
||||
StreamChunkData,
|
||||
StreamDoneData,
|
||||
} from '@/lib/workflows/executor/execution-events'
|
||||
import type { SerializableExecutionState } from '@/executor/execution/types'
|
||||
import type { SubflowType } from '@/stores/workflows/workflow/types'
|
||||
|
||||
const logger = createLogger('useExecutionStream')
|
||||
|
||||
@@ -81,48 +91,15 @@ async function processSSEStream(
|
||||
}
|
||||
|
||||
export interface ExecutionStreamCallbacks {
|
||||
onExecutionStarted?: (data: { startTime: string }) => void
|
||||
onExecutionCompleted?: (data: {
|
||||
success: boolean
|
||||
output: any
|
||||
duration: number
|
||||
startTime: string
|
||||
endTime: string
|
||||
}) => void
|
||||
onExecutionError?: (data: { error: string; duration: number }) => void
|
||||
onExecutionCancelled?: (data: { duration: number }) => void
|
||||
onBlockStarted?: (data: {
|
||||
blockId: string
|
||||
blockName: string
|
||||
blockType: string
|
||||
iterationCurrent?: number
|
||||
iterationTotal?: number
|
||||
iterationType?: SubflowType
|
||||
}) => void
|
||||
onBlockCompleted?: (data: {
|
||||
blockId: string
|
||||
blockName: string
|
||||
blockType: string
|
||||
input?: any
|
||||
output: any
|
||||
durationMs: number
|
||||
iterationCurrent?: number
|
||||
iterationTotal?: number
|
||||
iterationType?: SubflowType
|
||||
}) => void
|
||||
onBlockError?: (data: {
|
||||
blockId: string
|
||||
blockName: string
|
||||
blockType: string
|
||||
input?: any
|
||||
error: string
|
||||
durationMs: number
|
||||
iterationCurrent?: number
|
||||
iterationTotal?: number
|
||||
iterationType?: SubflowType
|
||||
}) => void
|
||||
onStreamChunk?: (data: { blockId: string; chunk: string }) => void
|
||||
onStreamDone?: (data: { blockId: string }) => void
|
||||
onExecutionStarted?: (data: ExecutionStartedData) => void
|
||||
onExecutionCompleted?: (data: ExecutionCompletedData) => void
|
||||
onExecutionError?: (data: ExecutionErrorData) => void
|
||||
onExecutionCancelled?: (data: ExecutionCancelledData) => void
|
||||
onBlockStarted?: (data: BlockStartedData) => void
|
||||
onBlockCompleted?: (data: BlockCompletedData) => void
|
||||
onBlockError?: (data: BlockErrorData) => void
|
||||
onStreamChunk?: (data: StreamChunkData) => void
|
||||
onStreamDone?: (data: StreamDoneData) => void
|
||||
}
|
||||
|
||||
export interface ExecuteStreamOptions {
|
||||
|
||||
@@ -283,7 +283,13 @@ export async function executeWorkflowCore(
|
||||
blockId: string,
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
output: { input?: unknown; output: NormalizedBlockOutput; executionTime: number },
|
||||
output: {
|
||||
input?: unknown
|
||||
output: NormalizedBlockOutput
|
||||
executionTime: number
|
||||
startedAt: string
|
||||
endedAt: string
|
||||
},
|
||||
iterationContext?: IterationContext
|
||||
) => {
|
||||
await loggingSession.onBlockComplete(blockId, blockName, blockType, output)
|
||||
|
||||
@@ -103,6 +103,8 @@ export interface BlockCompletedEvent extends BaseExecutionEvent {
|
||||
input?: any
|
||||
output: any
|
||||
durationMs: number
|
||||
startedAt: string
|
||||
endedAt: string
|
||||
// Iteration context for loops and parallels
|
||||
iterationCurrent?: number
|
||||
iterationTotal?: number
|
||||
@@ -123,6 +125,8 @@ export interface BlockErrorEvent extends BaseExecutionEvent {
|
||||
input?: any
|
||||
error: string
|
||||
durationMs: number
|
||||
startedAt: string
|
||||
endedAt: string
|
||||
// Iteration context for loops and parallels
|
||||
iterationCurrent?: number
|
||||
iterationTotal?: number
|
||||
@@ -167,6 +171,19 @@ export type ExecutionEvent =
|
||||
| StreamChunkEvent
|
||||
| StreamDoneEvent
|
||||
|
||||
/**
|
||||
* Extracted data types for use in callbacks
|
||||
*/
|
||||
export type ExecutionStartedData = ExecutionStartedEvent['data']
|
||||
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
|
||||
export type ExecutionErrorData = ExecutionErrorEvent['data']
|
||||
export type ExecutionCancelledData = ExecutionCancelledEvent['data']
|
||||
export type BlockStartedData = BlockStartedEvent['data']
|
||||
export type BlockCompletedData = BlockCompletedEvent['data']
|
||||
export type BlockErrorData = BlockErrorEvent['data']
|
||||
export type StreamChunkData = StreamChunkEvent['data']
|
||||
export type StreamDoneData = StreamDoneEvent['data']
|
||||
|
||||
/**
|
||||
* Helper to create SSE formatted message
|
||||
*/
|
||||
@@ -235,7 +252,13 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
blockId: string,
|
||||
blockName: string,
|
||||
blockType: string,
|
||||
callbackData: { input?: unknown; output: any; executionTime: number },
|
||||
callbackData: {
|
||||
input?: unknown
|
||||
output: any
|
||||
executionTime: number
|
||||
startedAt: string
|
||||
endedAt: string
|
||||
},
|
||||
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
|
||||
) => {
|
||||
const hasError = callbackData.output?.error
|
||||
@@ -260,6 +283,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
input: callbackData.input,
|
||||
error: callbackData.output.error,
|
||||
durationMs: callbackData.executionTime || 0,
|
||||
startedAt: callbackData.startedAt,
|
||||
endedAt: callbackData.endedAt,
|
||||
...iterationData,
|
||||
},
|
||||
})
|
||||
@@ -276,6 +301,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
|
||||
input: callbackData.input,
|
||||
output: callbackData.output,
|
||||
durationMs: callbackData.executionTime || 0,
|
||||
startedAt: callbackData.startedAt,
|
||||
endedAt: callbackData.endedAt,
|
||||
...iterationData,
|
||||
},
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user