feat: broadcast and expect multiple acks

This feature was added in `socket.io@4.5.0`:

```js
io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});
```

Thanks to this change, it will now work with multiple Socket.IO
servers.

Related: https://github.com/socketio/socket.io/issues/4163
This commit is contained in:
Damien Arrachequesne
2022-04-28 15:53:29 +02:00
parent 6397c1bdfd
commit 055b7840d8
5 changed files with 3175 additions and 50 deletions

View File

@@ -24,6 +24,8 @@ enum EventType {
FETCH_SOCKETS_RESPONSE,
SERVER_SIDE_EMIT,
SERVER_SIDE_EMIT_RESPONSE,
BROADCAST_CLIENT_COUNT,
BROADCAST_ACK,
}
interface Request {
@@ -35,6 +37,12 @@ interface Request {
responses: any[];
}
interface AckRequest {
type: EventType.BROADCAST;
clientCountCallback: (clientCount: number) => void;
ack: (...args: any[]) => void;
}
export interface ClusterAdapterOptions {
/**
* after this timeout the adapter will stop waiting from responses to request
@@ -61,6 +69,7 @@ export class ClusterAdapter extends Adapter {
private workerIds: Set<number> = new Set();
private requests: Map<string, Request> = new Map();
private ackRequests: Map<string, AckRequest> = new Map();
/**
* Adapter constructor.
@@ -114,12 +123,54 @@ export class ClusterAdapter extends Adapter {
break;
case EventType.BROADCAST: {
debug("broadcast with opts %j", message.data.opts);
super.broadcast(
message.data.packet,
ClusterAdapter.deserializeOptions(message.data.opts)
);
const withAck = message.data.requestId !== undefined;
if (withAck) {
super.broadcastWithAck(
message.data.packet,
ClusterAdapter.deserializeOptions(message.data.opts),
(clientCount) => {
debug("waiting for %d client acknowledgements", clientCount);
this.publish({
type: EventType.BROADCAST_CLIENT_COUNT,
data: {
requestId: message.data.requestId,
clientCount,
},
});
},
(arg) => {
debug("received acknowledgement with value %j", arg);
this.publish({
type: EventType.BROADCAST_ACK,
data: {
requestId: message.data.requestId,
packet: arg,
},
});
}
);
} else {
super.broadcast(
message.data.packet,
ClusterAdapter.deserializeOptions(message.data.opts)
);
}
break;
}
case EventType.BROADCAST_CLIENT_COUNT: {
const request = this.ackRequests.get(message.data.requestId);
request?.clientCountCallback(message.data.clientCount);
break;
}
case EventType.BROADCAST_ACK: {
const request = this.ackRequests.get(message.data.requestId);
request?.ack(message.data.packet);
break;
}
case EventType.SOCKETS_JOIN: {
debug("calling addSockets with opts %j", message.data.opts);
super.addSockets(
@@ -287,6 +338,48 @@ export class ClusterAdapter extends Adapter {
});
}
public broadcastWithAck(
packet: any,
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void
) {
const onlyLocal = opts?.flags?.local;
if (!onlyLocal) {
const requestId = randomId();
this.publish({
type: EventType.BROADCAST,
data: {
packet,
requestId,
opts: ClusterAdapter.serializeOptions(opts),
},
});
this.ackRequests.set(requestId, {
type: EventType.BROADCAST,
clientCountCallback,
ack,
});
// we have no way to know at this level whether the server has received an acknowledgement from each client, so we
// will simply clean up the ackRequests map after the given delay
setTimeout(() => {
this.ackRequests.delete(requestId);
}, opts.flags!.timeout);
}
// packets with binary contents are modified by the broadcast method, hence the nextTick()
process.nextTick(() => {
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
});
}
public serverCount(): Promise<number> {
return Promise.resolve(1 + this.workerIds.size);
}
addSockets(opts: BroadcastOptions, rooms: Room[]) {
super.addSockets(opts, rooms);

2990
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -20,7 +20,7 @@
},
"dependencies": {
"debug": "~4.3.1",
"socket.io-adapter": "~2.3.0"
"socket.io-adapter": "~2.4.0"
},
"devDependencies": {
"@types/expect.js": "^0.3.29",
@@ -30,7 +30,7 @@
"mocha": "^8.4.0",
"nyc": "^15.1.0",
"prettier": "^2.1.2",
"socket.io": "^4.1.1",
"socket.io": "^4.5.0",
"socket.io-client": "^4.1.1",
"ts-node": "^9.1.1",
"typescript": "^4.0.5"

View File

@@ -144,6 +144,84 @@ describe("@socket.io/cluster-adapter", () => {
workers[0].send("broadcasts to local clients only");
});
it("broadcasts with multiple acknowledgements", (done) => {
clientSockets[0].on("test", (cb) => {
cb(1);
});
clientSockets[1].on("test", (cb) => {
cb(2);
});
clientSockets[2].on("test", (cb) => {
cb(3);
});
workers[0].send("broadcasts with multiple acknowledgements");
workers[0].on("message", (result) => {
if (result === "ok") {
done();
}
});
});
it("broadcasts with multiple acknowledgements (binary content)", (done) => {
clientSockets[0].on("test", (cb) => {
cb(Buffer.from([1]));
});
clientSockets[1].on("test", (cb) => {
cb(Buffer.from([2]));
});
clientSockets[2].on("test", (cb) => {
cb(Buffer.from([3]));
});
workers[0].send(
"broadcasts with multiple acknowledgements (binary content)"
);
workers[0].on("message", (result) => {
if (result === "ok") {
done();
}
});
});
it("broadcasts with multiple acknowledgements (no client)", (done) => {
workers[0].send("broadcasts with multiple acknowledgements (no client)");
workers[0].on("message", (result) => {
if (result === "ok") {
done();
}
});
});
it("broadcasts with multiple acknowledgements (timeout)", (done) => {
clientSockets[0].on("test", (cb) => {
cb(1);
});
clientSockets[1].on("test", (cb) => {
cb(2);
});
clientSockets[2].on("test", (cb) => {
// do nothing
});
workers[0].send("broadcasts with multiple acknowledgements (timeout)");
workers[0].on("message", (result) => {
if (result === "ok") {
done();
}
});
});
});
describe("socketsJoin", () => {

View File

@@ -43,6 +43,58 @@ process.on("message", async (msg) => {
io.local.emit("test");
break;
case "broadcasts with multiple acknowledgements": {
io.timeout(500).emit("test", (err, responses) => {
expect(err).to.be(null);
expect(responses).to.contain(1);
expect(responses).to.contain(2);
expect(responses).to.contain(3);
setTimeout(() => {
expect(io.of("/").adapter.ackRequests.size).to.eql(0);
process.send("ok");
}, 500);
});
break;
}
case "broadcasts with multiple acknowledgements (binary content)": {
io.timeout(500).emit("test", (err, responses) => {
expect(err).to.be(null);
responses.forEach((response) => {
expect(Buffer.isBuffer(response)).to.be(true);
});
process.send("ok");
});
break;
}
case "broadcasts with multiple acknowledgements (no client)": {
io
.to("abc")
.timeout(500)
.emit("test", (err, responses) => {
expect(err).to.be(null);
expect(responses).to.eql([]);
process.send("ok");
});
break;
}
case "broadcasts with multiple acknowledgements (timeout)": {
io.timeout(500).emit("test", (err, responses) => {
expect(err).to.be.an(Error);
expect(responses).to.contain(1);
expect(responses).to.contain(2);
process.send("ok");
});
break;
}
case "get rooms":
process.send(serverSocket.rooms);
break;