revert: fix: cleanup pending acks on timeout to prevent memory leak

This reverts commit da04267ffc.

The reverted fix was incorrect because the rooms might have changed between the emit() and the timeout.
This commit is contained in:
Damien Arrachequesne
2026-03-11 17:51:03 +01:00
parent 84c2fb7821
commit ba9cd6900d
4 changed files with 2 additions and 21 deletions

View File

@@ -508,7 +508,7 @@ export abstract class ClusterAdapter extends Adapter {
}, opts.flags!.timeout);
}
return super.broadcastWithAck(packet, opts, clientCountCallback, ack);
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
}
override async addSockets(opts: BroadcastOptions, rooms: Room[]) {

View File

@@ -229,14 +229,6 @@ export class Adapter extends EventEmitter {
});
clientCountCallback(clientCount);
return {
cleanup: () => {
this.apply(opts, (socket) => {
socket.acks.delete(packet.id);
});
},
};
}
private _encode(packet: unknown, packetOpts: Record<string, unknown>) {

View File

@@ -232,11 +232,9 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
const ack = data.pop() as (...args: any[]) => void;
let timedOut = false;
let responses: any[] = [];
let cleanupPendingAcks: (() => void) | undefined;
const timer = setTimeout(() => {
timedOut = true;
cleanupPendingAcks?.();
ack.apply(this, [
new Error("operation has timed out"),
this.flags.expectSingleResponse ? null : responses,
@@ -261,7 +259,7 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
}
};
const result = this.adapter.broadcastWithAck(
this.adapter.broadcastWithAck(
packet,
{
rooms: this.rooms,
@@ -281,10 +279,6 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
},
);
if (result && typeof result.cleanup === "function") {
cleanupPendingAcks = result.cleanup;
}
this.adapter.serverCount().then((serverCount) => {
expectedServerCount = serverCount;
checkCompleteness();

View File

@@ -534,11 +534,6 @@ describe("messaging many", () => {
// @ts-ignore
expect(err.responses).to.contain(1, 2);
for (const [, serverSocket] of io.of("/").sockets) {
// @ts-ignore accessing private acks map to verify cleanup
expect(serverSocket.acks.size).to.be(0);
}
success(done, io, socket1, socket2, socket3);
}
});