feat(ui): ensure promise always marked as settled, better comments

This commit is contained in:
psychedelicious
2025-06-28 20:47:11 +10:00
parent 8dc6d0b5ae
commit 051876dcff

View File

@@ -88,7 +88,7 @@ export const buildRunGraphDependencies = (
* @param arg.destination The destination to assign to the batch. If omitted, the destination is not set.
* @param arg.prepend Whether to prepend the graph to the front of the queue. If omitted, the graph is appended to the end of the queue.
* @param arg.timeout The timeout for the batch. If omitted, there is no timeout.
* @param arg.signal An optional signal to cancel the operation. If omitted, the operation cannot be canceled!
* @param arg.signal An optional signal to cancel the operation. If omitted, the operation cannot be canceled.
*
* @returns A promise that resolves to the output and completed session, or rejects with an error if the graph fails or is canceled.
*
@@ -128,12 +128,13 @@ export const runGraph = (arg: RunGraphArg): Promise<RunGraphReturn> => {
/**
* We will use the origin to handle events from the graph. Ideally we'd just use the queue item's id, but there's a
* race condition:
* - The queue item id is not available until the graph is enqueued.
* - The graph may complete before we get a response back from enqueuing, so our listeners would miss the event.
* race condition for fast-running graphs:
* - We enqueue the batch and wait for the respose from the network request, which will include the queue item id.
* - The queue item is executed.
* - We get status change events for the queue item, but we don't have the queue item id yet, so we miss the event.
*
* The origin is the only unique identifier that we can set before enqueuing the graph, so we use it to filter
* queue item status change events.
* The origin is the only unique identifier that we can set before enqueuing the graph. We set it to something
* unique and use it to filter for events relevant to this graph.
*/
const origin = getPrefixedId(graph.id);
@@ -148,15 +149,20 @@ export const runGraph = (arg: RunGraphArg): Promise<RunGraphReturn> => {
};
/**
* Flag to indicate whether the graph has already been resolved. This is used to prevent multiple resolutions.
* Flag to indicate whether the promise is settled (resolved or rejected). This is used to prevent multiple
* resolutions. This flag must be set to true before the promise is resolved or rejected.
*/
let isResolved = false;
let isSettled = false;
/**
* The queue item id is set to null initially, but will be updated once the graph is enqueued.
* The queue item id is set to null initially, but will be updated once the graph is enqueued. It will be used to
* retrieve the queue item.
*/
let queueItemId: number | null = null;
/**
* Set of cleanup functions for listeners, timeouts, etc that need to be called when the graph is settled.
*/
const cleanupFunctions: Set<() => void> = new Set();
const cleanup = () => {
for (const func of cleanupFunctions) {
@@ -168,14 +174,18 @@ export const runGraph = (arg: RunGraphArg): Promise<RunGraphReturn> => {
}
};
// If a timeout value is provided, we create a timer to reject the promise.
if (timeout !== undefined) {
const timeoutId = setTimeout(() => {
if (isResolved) {
if (isSettled) {
return;
}
isSettled = true;
log.trace('Graph canceled by timeout');
cleanup();
if (queueItemId !== null) {
// It's possible the cancelation will fail, but we have no way to handle that gracefully. Log a warning
// and move on to reject.
dependencies.executor.cancelQueueItem(queueItemId).catch((error) => {
log.warn({ error: parseify(error) }, 'Failed to cancel queue item during timeout');
});
@@ -188,14 +198,18 @@ export const runGraph = (arg: RunGraphArg): Promise<RunGraphReturn> => {
});
}
// If a signal is provided, we add an abort handler to reject the promise if the signal is aborted.
if (signal !== undefined) {
const abortHandler = () => {
if (isResolved) {
if (isSettled) {
return;
}
isSettled = true;
log.trace('Graph canceled by signal');
cleanup();
if (queueItemId !== null) {
// It's possible the cancelation will fail, but we have no way to handle that gracefully. Log a warning
// and move on to reject.
dependencies.executor.cancelQueueItem(queueItemId).catch((error) => {
log.warn({ error: parseify(error) }, 'Failed to cancel queue item during abort');
});
@@ -209,8 +223,9 @@ export const runGraph = (arg: RunGraphArg): Promise<RunGraphReturn> => {
});
}
// Handle the queue item status change events.
const onQueueItemStatusChanged = async (event: S['QueueItemStatusChangedEvent']) => {
if (isResolved) {
if (isSettled) {
return;
}
@@ -224,10 +239,11 @@ export const runGraph = (arg: RunGraphArg): Promise<RunGraphReturn> => {
return;
}
// The queue item is finished
isResolved = true;
// The queue item is finished - retrieve it, extract results and resolve or reject the promise
isSettled = true;
cleanup();
// We need to handle any errors, including retrieving the queue item
const queueItemResult = await withResultAsync(() => dependencies.executor.getQueueItem(event.item_id));
if (queueItemResult.isErr()) {
reject(queueItemResult.error);
@@ -283,11 +299,14 @@ export const runGraph = (arg: RunGraphArg): Promise<RunGraphReturn> => {
queueItemId = data.item_ids[0];
})
.catch((error) => {
if (!isResolved) {
isResolved = true;
cleanup();
reject(error);
if (isSettled) {
// Not sure how it could happen that we are settled at this point, but if it does, we don't want to
// reject the promise again.
return;
}
isSettled = true;
cleanup();
reject(error);
});
});
@@ -315,8 +334,7 @@ class NodeNotFoundError extends Error {
nodeId: string;
constructor(nodeId: string, session: S['SessionQueueItem']['session']) {
const availableNodes = Object.keys(session.source_prepared_mapping);
super(`Node '${nodeId}' not found in session. Available nodes: ${availableNodes.join(', ')}`);
super(`Node '${nodeId}' not found in session.`);
this.name = this.constructor.name;
this.session = session;
this.nodeId = nodeId;
@@ -328,8 +346,7 @@ class ResultNotFoundError extends Error {
nodeId: string;
constructor(nodeId: string, session: S['SessionQueueItem']['session']) {
const availableResults = Object.keys(session.results);
super(`Result for node '${nodeId}' not found in session. Available results: ${availableResults.join(', ')}`);
super(`Result for node '${nodeId}' not found in session.`);
this.name = this.constructor.name;
this.session = session;
this.nodeId = nodeId;