mirror of
https://github.com/socketio/socket.io.git
synced 2026-04-30 03:00:39 -04:00
fix: cleanup pending acks on timeout to prevent memory leak (#5442)
When using `emitWithAck` with a timeout, if clients didn't respond and the timeout triggers, the ack callbacks remained in `socket.acks` Map indefinitely, causing a memory leak. Related: https://github.com/socketio/socket.io/issues/4984
This commit is contained in:
committed by
Damien Arrachequesne
parent
74599a6b9e
commit
da04267ffc
@@ -508,7 +508,7 @@ export abstract class ClusterAdapter extends Adapter {
|
||||
}, opts.flags!.timeout);
|
||||
}
|
||||
|
||||
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
|
||||
return super.broadcastWithAck(packet, opts, clientCountCallback, ack);
|
||||
}
|
||||
|
||||
override async addSockets(opts: BroadcastOptions, rooms: Room[]) {
|
||||
|
||||
@@ -229,6 +229,14 @@ export class Adapter extends EventEmitter {
|
||||
});
|
||||
|
||||
clientCountCallback(clientCount);
|
||||
|
||||
return {
|
||||
cleanup: () => {
|
||||
this.apply(opts, (socket) => {
|
||||
socket.acks.delete(packet.id);
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
private _encode(packet: unknown, packetOpts: Record<string, unknown>) {
|
||||
|
||||
@@ -232,9 +232,11 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
|
||||
const ack = data.pop() as (...args: any[]) => void;
|
||||
let timedOut = false;
|
||||
let responses: any[] = [];
|
||||
let cleanupPendingAcks: (() => void) | undefined;
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
timedOut = true;
|
||||
cleanupPendingAcks?.();
|
||||
ack.apply(this, [
|
||||
new Error("operation has timed out"),
|
||||
this.flags.expectSingleResponse ? null : responses,
|
||||
@@ -259,7 +261,7 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
|
||||
}
|
||||
};
|
||||
|
||||
this.adapter.broadcastWithAck(
|
||||
const result = this.adapter.broadcastWithAck(
|
||||
packet,
|
||||
{
|
||||
rooms: this.rooms,
|
||||
@@ -279,6 +281,10 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
|
||||
},
|
||||
);
|
||||
|
||||
if (result && typeof result.cleanup === "function") {
|
||||
cleanupPendingAcks = result.cleanup;
|
||||
}
|
||||
|
||||
this.adapter.serverCount().then((serverCount) => {
|
||||
expectedServerCount = serverCount;
|
||||
checkCompleteness();
|
||||
|
||||
@@ -534,6 +534,11 @@ describe("messaging many", () => {
|
||||
// @ts-ignore
|
||||
expect(err.responses).to.contain(1, 2);
|
||||
|
||||
for (const [, serverSocket] of io.of("/").sockets) {
|
||||
// @ts-ignore accessing private acks map to verify cleanup
|
||||
expect(serverSocket.acks.size).to.be(0);
|
||||
}
|
||||
|
||||
success(done, io, socket1, socket2, socket3);
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user