feat(ui): improved logging for runGraph

This commit is contained in:
psychedelicious
2025-06-29 12:34:49 +10:00
parent eb406aa07e
commit 6dafa67286

View File

@@ -9,6 +9,7 @@ import type { Graph } from 'features/nodes/util/graph/generation/Graph';
import type { S } from 'services/api/types';
import type { Equals } from 'tsafe';
import { assert } from 'tsafe';
import type { JsonObject } from 'type-fest';
import { enqueueMutationFixedCacheKeyOptions, queueApi } from './endpoints/queue';
import type { EnqueueBatchArg } from './types';
@@ -170,6 +171,15 @@ const _runGraph = async (
): Promise<void> => {
const { graph, outputNodeId, dependencies, destination, prepend, timeout, signal } = arg;
const loggingCtx: JsonObject = {
graphId: graph.id,
outputNodeId,
destination: destination ?? 'not provided',
prepend: prepend ?? false,
timeout: timeout ?? 'not provided',
signal: signal !== undefined ? 'provided' : 'not provided',
};
/**
* We will use the origin to filter out socket events unrelated to this graph.
*
@@ -200,7 +210,7 @@ const _runGraph = async (
try {
func();
} catch (error) {
log.warn({ error: parseify(error) }, 'Error during cleanup');
log.warn({ ...loggingCtx, error: parseify(error) }, 'Error during cleanup');
}
}
cleanupFunctions.clear();
@@ -263,9 +273,8 @@ const _runGraph = async (
if (timeout !== undefined) {
const timeoutId = setTimeout(async () => {
await settle(async () => {
log.trace('Graph canceled by timeout');
let cancellationFailed = false;
let cancellationError: Error | undefined;
let cancellationError: Error | null = null;
const cancelResult = await withResultAsync(async () => {
if (queueItemId !== null) {
@@ -276,9 +285,11 @@ const _runGraph = async (
if (cancelResult.isErr()) {
cancellationFailed = true;
cancellationError = cancelResult.error;
log.warn({ error: parseify(cancelResult.error) }, 'Failed to cancel queue item during timeout');
}
log.debug(
{ ...loggingCtx, queueItemId, cancellationFailed, cancellationError: parseify(cancellationError) },
'Run timed out'
);
return ErrResult(new SessionTimeoutError(queueItemId, cancellationFailed, cancellationError));
});
}, timeout);
@@ -292,9 +303,8 @@ const _runGraph = async (
if (signal !== undefined) {
const abortHandler = async () => {
await settle(async () => {
log.trace('Graph canceled by signal');
let cancellationFailed = false;
let cancellationError: Error | undefined;
let cancellationError: Error | null = null;
const cancelResult = await withResultAsync(async () => {
if (queueItemId !== null) {
@@ -305,9 +315,11 @@ const _runGraph = async (
if (cancelResult.isErr()) {
cancellationFailed = true;
cancellationError = cancelResult.error;
log.warn({ error: parseify(cancelResult.error) }, 'Failed to cancel queue item during abort');
}
log.debug(
{ ...loggingCtx, queueItemId, cancellationFailed, cancellationError: parseify(cancellationError) },
'Run aborted by signal'
);
return ErrResult(new SessionAbortedError(queueItemId, cancellationFailed, cancellationError));
});
};
@@ -334,6 +346,10 @@ const _runGraph = async (
// We need to handle any errors, including retrieving the queue item
const queueItemResult = await withResultAsync(() => dependencies.executor.getQueueItem(event.item_id));
if (queueItemResult.isErr()) {
log.debug(
{ ...loggingCtx, queueItemId, error: parseify(queueItemResult.error) },
'Failed to retrieve queue item'
);
return ErrResult(queueItemResult.error);
}
@@ -347,17 +363,24 @@ const _runGraph = async (
if (status === 'completed') {
const getOutputResult = withResult(() => getOutputFromSession(queueItemId, session, outputNodeId));
if (getOutputResult.isErr()) {
log.debug(
{ ...loggingCtx, queueItemId, status, error: parseify(getOutputResult.error) },
'Failed to retrieve output result'
);
return ErrResult(getOutputResult.error);
}
const output = getOutputResult.value;
log.debug({ ...loggingCtx, queueItemId, status, output: parseify(output) }, 'Run completed successfully');
return OkResult({ session, output });
}
if (status === 'failed') {
log.debug({ ...loggingCtx, queueItemId, status, error_type, error_message, error_traceback }, 'Session failed');
return ErrResult(new SessionFailedError(queueItemId, session, error_type, error_message, error_traceback));
}
if (status === 'canceled') {
log.debug({ ...loggingCtx, queueItemId, status }, 'Session canceled');
return ErrResult(new SessionCanceledError(queueItemId, session));
}
@@ -384,6 +407,7 @@ const _runGraph = async (
});
if (enqueueResult.isErr()) {
// The enqueue operation itself failed - we cannot proceed.
log.debug({ ...loggingCtx }, 'Enqueue failed');
await settle(() => ErrResult(enqueueResult.error));
return;
}
@@ -518,9 +542,9 @@ export class SessionCanceledError extends BaseSessionError {
export class SessionAbortedError extends BaseQueueItemError {
public readonly cancellationFailed: boolean;
public readonly cancellationError?: Error;
public readonly cancellationError: Error | null;
constructor(queueItemId: number | null, cancellationFailed = false, cancellationError?: Error) {
constructor(queueItemId: number | null, cancellationFailed = false, cancellationError: Error | null) {
const message = cancellationFailed
? 'Session execution was aborted via signal and cancellation failed'
: 'Session execution was aborted via signal';
@@ -533,9 +557,9 @@ export class SessionAbortedError extends BaseQueueItemError {
export class SessionTimeoutError extends BaseQueueItemError {
public readonly cancellationFailed: boolean;
public readonly cancellationError?: Error;
public readonly cancellationError: Error | null;
constructor(queueItemId: number | null, cancellationFailed = false, cancellationError?: Error) {
constructor(queueItemId: number | null, cancellationFailed = false, cancellationError: Error | null) {
const message = cancellationFailed
? 'Session execution timed out and cancellation failed'
: 'Session execution timed out';