diff --git a/packages/socket.io-adapter/lib/cluster-adapter.ts b/packages/socket.io-adapter/lib/cluster-adapter.ts index c53a48b4..64604b99 100644 --- a/packages/socket.io-adapter/lib/cluster-adapter.ts +++ b/packages/socket.io-adapter/lib/cluster-adapter.ts @@ -508,7 +508,7 @@ export abstract class ClusterAdapter extends Adapter { }, opts.flags!.timeout); } - super.broadcastWithAck(packet, opts, clientCountCallback, ack); + return super.broadcastWithAck(packet, opts, clientCountCallback, ack); } override async addSockets(opts: BroadcastOptions, rooms: Room[]) { diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index cf178170..9ba553cf 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -229,6 +229,14 @@ export class Adapter extends EventEmitter { }); clientCountCallback(clientCount); + + return { + cleanup: () => { + this.apply(opts, (socket) => { + socket.acks.delete(packet.id); + }); + }, + }; } private _encode(packet: unknown, packetOpts: Record) { diff --git a/packages/socket.io/lib/broadcast-operator.ts b/packages/socket.io/lib/broadcast-operator.ts index 6a229dae..10d8a360 100644 --- a/packages/socket.io/lib/broadcast-operator.ts +++ b/packages/socket.io/lib/broadcast-operator.ts @@ -232,9 +232,11 @@ export class BroadcastOperator const ack = data.pop() as (...args: any[]) => void; let timedOut = false; let responses: any[] = []; + let cleanupPendingAcks: (() => void) | undefined; const timer = setTimeout(() => { timedOut = true; + cleanupPendingAcks?.(); ack.apply(this, [ new Error("operation has timed out"), this.flags.expectSingleResponse ? null : responses, @@ -259,7 +261,7 @@ export class BroadcastOperator } }; - this.adapter.broadcastWithAck( + const result = this.adapter.broadcastWithAck( packet, { rooms: this.rooms, @@ -279,6 +281,10 @@ export class BroadcastOperator }, ); + if (result && typeof result.cleanup === "function") { + cleanupPendingAcks = result.cleanup; + } + this.adapter.serverCount().then((serverCount) => { expectedServerCount = serverCount; checkCompleteness(); diff --git a/packages/socket.io/test/messaging-many.ts b/packages/socket.io/test/messaging-many.ts index 0a518150..87407a2f 100644 --- a/packages/socket.io/test/messaging-many.ts +++ b/packages/socket.io/test/messaging-many.ts @@ -534,6 +534,11 @@ describe("messaging many", () => { // @ts-ignore expect(err.responses).to.contain(1, 2); + for (const [, serverSocket] of io.of("/").sockets) { + // @ts-ignore accessing private acks map to verify cleanup + expect(serverSocket.acks.size).to.be(0); + } + success(done, io, socket1, socket2, socket3); } });