fix(frontend): Fix unreliable websocket connection for node execution update (#9666)

The current execution update is unreliable, once you lose WebSocket
connection, you will receive no updates.

### Changes 🏗️

Fix web socket re-connection logic.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [x] Run the app and execute an agent, then restart the API server, and
re-execute the app without refreshing the page.
This commit is contained in:
Zamil Majdy
2025-03-21 18:50:20 +07:00
committed by GitHub
parent a1ac7b18f9
commit b85f6196aa
2 changed files with 34 additions and 22 deletions

View File

@@ -114,10 +114,19 @@ export default function useAgentGraph(
});
if (flowID && flowVersion) {
api.subscribeToExecution(flowID, flowVersion);
console.debug(
`Subscribed to execution events for ${flowID} v.${flowVersion}`,
);
api
.subscribeToExecution(flowID, flowVersion)
.then(() =>
console.debug(
`Subscribed to execution events for ${flowID} v.${flowVersion}`,
),
)
.catch((error) =>
console.error(
`Failed to subscribe to execution events for ${flowID} v.${flowVersion}:`,
error,
),
);
}
}, [api, flowID, flowVersion, flowExecutionID]);

View File

@@ -827,7 +827,7 @@ export default class BackendAPI {
this.webSocket.onclose = (event) => {
console.warn("WebSocket connection closed", event);
this.stopHeartbeat(); // Stop heartbeat when connection closes
this.webSocket = null;
this.wsConnecting = null;
// Attempt to reconnect after a delay
setTimeout(() => this.connectWebSocket(), 1000);
};
@@ -835,6 +835,7 @@ export default class BackendAPI {
this.webSocket.onerror = (error) => {
console.error("WebSocket error:", error);
this.stopHeartbeat(); // Stop heartbeat on error
this.wsConnecting = null;
reject(error);
};
@@ -868,26 +869,28 @@ export default class BackendAPI {
this.webSocket.close();
}
}
sendWebSocketMessage<M extends keyof WebsocketMessageTypeMap>(
async sendWebSocketMessage<M extends keyof WebsocketMessageTypeMap>(
method: M,
data: WebsocketMessageTypeMap[M],
callCount = 0,
) {
callCountLimit = 4,
): Promise<void> {
if (this.webSocket && this.webSocket.readyState === WebSocket.OPEN) {
this.webSocket.send(JSON.stringify({ method, data }));
} else {
this.connectWebSocket().then(() => {
callCount == 0
? this.sendWebSocketMessage(method, data, callCount + 1)
: setTimeout(
() => {
this.sendWebSocketMessage(method, data, callCount + 1);
},
2 ** (callCount - 1) * 1000,
);
});
const result = this.webSocket.send(JSON.stringify({ method, data }));
return;
}
if (callCount >= callCountLimit) {
throw new Error(
`WebSocket connection not open after ${callCountLimit} attempts`,
);
}
await this.connectWebSocket();
if (callCount === 0) {
return this.sendWebSocketMessage(method, data, callCount + 1);
}
const delayMs = 2 ** (callCount - 1) * 1000;
await new Promise((res) => setTimeout(res, delayMs));
return this.sendWebSocketMessage(method, data, callCount + 1);
}
onWebSocketMessage<M extends keyof WebsocketMessageTypeMap>(
@@ -901,8 +904,8 @@ export default class BackendAPI {
return () => this.wsMessageHandlers[method].delete(handler);
}
subscribeToExecution(graphId: string, graphVersion: number) {
this.sendWebSocketMessage("subscribe", {
async subscribeToExecution(graphId: string, graphVersion: number) {
await this.sendWebSocketMessage("subscribe", {
graph_id: graphId,
graph_version: graphVersion,
});