feat(ui): runGraph settlement callbacks can simply return or throw

This commit is contained in:
psychedelicious
2025-06-29 13:01:54 +10:00
parent 4558a292b6
commit 30fffae637

View File

@@ -1,8 +1,7 @@
import { logger } from 'app/logging/logger';
import type { AppStore } from 'app/store/store';
import { Mutex } from 'async-mutex';
import type { Result } from 'common/util/result';
import { ErrResult, OkResult, withResult, withResultAsync } from 'common/util/result';
import { withResult, withResultAsync } from 'common/util/result';
import { parseify } from 'common/util/serialize';
import { getPrefixedId } from 'features/controlLayers/konva/util';
import type { Graph } from 'features/nodes/util/graph/generation/Graph';
@@ -240,7 +239,7 @@ const _runGraph = async (
* Once the graph execution is finished, all remaining logic should be wrapped in this function to avoid race
* conditions or multiple resolutions/rejections of the promise.
*/
const settle = async (settlement: () => Promise<Result<RunGraphReturn, Error>> | Result<RunGraphReturn, Error>) => {
const settle = async (settlement: () => Promise<RunGraphReturn> | RunGraphReturn) => {
await settlementMutex.runExclusive(async () => {
// If we are already settling, ignore this call to avoid multiple resolutions or rejections.
// We don't want to _cancel_ pending locks as this would raise.
@@ -253,7 +252,7 @@ const _runGraph = async (
cleanup();
// Normalize the settlement function to always return a promise.
const result = await Promise.resolve(settlement());
const result = await withResultAsync(() => Promise.resolve(settlement()));
if (result.isOk()) {
_resolve(result.value);
@@ -265,14 +264,14 @@ const _runGraph = async (
if (!graph.hasNode(outputNodeId)) {
await settle(() => {
return ErrResult(new OutputNodeNotFoundInGraphError(outputNodeId, graph));
throw new OutputNodeNotFoundInGraphError(outputNodeId, graph);
});
return;
}
if (graph.getNodes().some((node) => node.type === 'iterate')) {
await settle(() => {
return ErrResult(new IterateNodeFoundInGraphError(graph));
throw new IterateNodeFoundInGraphError(graph);
});
return;
}
@@ -298,7 +297,7 @@ const _runGraph = async (
{ ...loggingCtx, queueItemId, cancellationFailed, cancellationError: parseify(cancellationError) },
'Run timed out'
);
return ErrResult(new SessionTimeoutError(queueItemId, cancellationFailed, cancellationError));
throw new SessionTimeoutError(queueItemId, cancellationFailed, cancellationError);
});
}, timeout);
@@ -328,7 +327,7 @@ const _runGraph = async (
{ ...loggingCtx, queueItemId, cancellationFailed, cancellationError: parseify(cancellationError) },
'Run aborted by signal'
);
return ErrResult(new SessionAbortedError(queueItemId, cancellationFailed, cancellationError));
throw new SessionAbortedError(queueItemId, cancellationFailed, cancellationError);
});
};
@@ -358,7 +357,7 @@ const _runGraph = async (
{ ...loggingCtx, queueItemId, error: parseify(queueItemResult.error) },
'Failed to retrieve queue item'
);
return ErrResult(queueItemResult.error);
throw queueItemResult.error;
}
const queueItem = queueItemResult.value;
@@ -375,21 +374,21 @@ const _runGraph = async (
{ ...loggingCtx, queueItemId, status, error: parseify(getOutputResult.error) },
'Failed to retrieve output result'
);
return ErrResult(getOutputResult.error);
throw getOutputResult.error;
}
const output = getOutputResult.value;
log.debug({ ...loggingCtx, queueItemId, status, output: parseify(output) }, 'Run completed successfully');
return OkResult({ session, output });
return { session, output };
}
if (status === 'failed') {
log.debug({ ...loggingCtx, queueItemId, status, error_type, error_message, error_traceback }, 'Session failed');
return ErrResult(new SessionFailedError(queueItemId, session, error_type, error_message, error_traceback));
throw new SessionFailedError(queueItemId, session, error_type, error_message, error_traceback);
}
if (status === 'canceled') {
log.debug({ ...loggingCtx, queueItemId, status }, 'Session canceled');
return ErrResult(new SessionCanceledError(queueItemId, session));
throw new SessionCanceledError(queueItemId, session);
}
assert<Equals<never, typeof status>>(false);
@@ -416,7 +415,9 @@ const _runGraph = async (
if (enqueueResult.isErr()) {
// The enqueue operation itself failed - we cannot proceed.
log.debug({ ...loggingCtx }, 'Enqueue failed');
await settle(() => ErrResult(enqueueResult.error));
await settle(() => {
throw enqueueResult.error;
});
return;
}