From 1c9827ec1136e24094295907efaf4d4e6c2fef2f Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Sat, 27 Feb 2021 01:37:53 +0100 Subject: [PATCH] feat: add some utility methods This commit adds the following methods: - fetchSockets: return the matching socket instances - addSockets: make the matching socket instances join the specified rooms - delSockets: make the matching socket instances leave the specified rooms - disconnectSockets: disconnect the matching socket instances Those methods will then be exposed by the Socket.IO server: ```js // clear room io.socketsLeave("room1"); // disconnect all sockets in room io.in("room2").disconnectSockets(); // fetch socket instances in room io.in("room3").fetchSockets(); ``` This feature will also be extended in the Redis adapter to handle multiple Socket.IO servers. --- lib/index.ts | 148 ++++++++++++++++++++++++++++++++------------------ test/index.js | 47 +++++++++++++++- 2 files changed, 140 insertions(+), 55 deletions(-) diff --git a/lib/index.ts b/lib/index.ts index 87b430b3..a8a76757 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -125,53 +125,19 @@ export class Adapter extends EventEmitter { * @public */ public broadcast(packet: any, opts: BroadcastOptions): void { - const rooms = opts.rooms; const flags = opts.flags || {}; const packetOpts = { preEncoded: true, volatile: flags.volatile, compress: flags.compress }; - const ids = new Set(); - let except = opts.except || new Set(); packet.nsp = this.nsp.name; const encodedPackets = this.encoder.encode(packet); - // Allow ids in `except` to be room ids. - if (except.size > 0) { - const exclude = except; - except = new Set(except); - for (const id of exclude) { - if (!this.rooms.has(id)) continue; - for (const sid of this.rooms.get(id)) { - if (sid !== id) { - except.add(sid); - } - } - } - } - - if (rooms.size) { - for (const room of rooms) { - if (!this.rooms.has(room)) continue; - - for (const id of this.rooms.get(room)) { - if (ids.has(id) || except.has(id)) continue; - const socket = this.nsp.sockets.get(id); - if (socket) { - socket.packet(encodedPackets, packetOpts); - ids.add(id); - } - } - } - } else { - for (const [id] of this.sids) { - if (except.has(id)) continue; - const socket = this.nsp.sockets.get(id); - if (socket) socket.packet(encodedPackets, packetOpts); - } - } + this.apply(opts, socket => { + socket.packet(encodedPackets, packetOpts); + }); } /** @@ -182,21 +148,9 @@ export class Adapter extends EventEmitter { public sockets(rooms: Set): Promise> { const sids = new Set(); - if (rooms.size) { - for (const room of rooms) { - if (!this.rooms.has(room)) continue; - - for (const id of this.rooms.get(room)) { - if (this.nsp.sockets.has(id)) { - sids.add(id); - } - } - } - } else { - for (const [id] of this.sids) { - if (this.nsp.sockets.has(id)) sids.add(id); - } - } + this.apply({ rooms }, socket => { + sids.add(socket.id); + }); return Promise.resolve(sids); } @@ -209,4 +163,94 @@ export class Adapter extends EventEmitter { public socketRooms(id: SocketId): Set | undefined { return this.sids.get(id); } + + /** + * Returns the matching socket instances + * + * @param opts - the filters to apply + */ + public fetchSockets(opts: BroadcastOptions): Promise { + const sockets = []; + + this.apply(opts, socket => { + sockets.push(socket); + }); + + return Promise.resolve(sockets); + } + + /** + * Makes the matching socket instances join the specified rooms + * + * @param opts - the filters to apply + * @param rooms - the rooms to join + */ + public addSockets(opts: BroadcastOptions, rooms: Room[]): void { + this.apply(opts, socket => { + socket.join(rooms); + }); + } + + /** + * Makes the matching socket instances leave the specified rooms + * + * @param opts - the filters to apply + * @param rooms - the rooms to leave + */ + public delSockets(opts: BroadcastOptions, rooms: Room[]): void { + this.apply(opts, socket => { + rooms.forEach(room => socket.leave(room)); + }); + } + + /** + * Makes the matching socket instances disconnect + * + * @param opts - the filters to apply + * @param close - whether to close the underlying connection + */ + public disconnectSockets(opts: BroadcastOptions, close: boolean): void { + this.apply(opts, socket => { + socket.disconnect(close); + }); + } + + private apply(opts: BroadcastOptions, callback: (socket) => void): void { + const rooms = opts.rooms; + const except = this.computeExceptSids(opts.except); + + if (rooms.size) { + const ids = new Set(); + for (const room of rooms) { + if (!this.rooms.has(room)) continue; + + for (const id of this.rooms.get(room)) { + if (ids.has(id) || except.has(id)) continue; + const socket = this.nsp.sockets.get(id); + if (socket) { + callback(socket); + ids.add(id); + } + } + } + } else { + for (const [id] of this.sids) { + if (except.has(id)) continue; + const socket = this.nsp.sockets.get(id); + if (socket) callback(socket); + } + } + } + + private computeExceptSids(exceptRooms?: Set) { + const exceptSids = new Set(); + if (exceptRooms && exceptRooms.size > 0) { + for (const room of exceptRooms) { + if (this.rooms.has(room)) { + this.rooms.get(room).forEach(sid => exceptSids.add(sid)); + } + } + } + return exceptSids; + } } diff --git a/test/index.js b/test/index.js index af79272b..d9767713 100644 --- a/test/index.js +++ b/test/index.js @@ -30,9 +30,9 @@ describe("socket.io-adapter", () => { const adapter = new Adapter({ server: { encoder: null }, sockets: new Map([ - ["s1", true], - ["s2", true], - ["s3", true] + ["s1", { id: "s1" }], + ["s2", { id: "s2" }], + ["s3", { id: "s3" }] ]) }); adapter.addAll("s1", new Set(["r1", "r2"])); @@ -124,6 +124,47 @@ describe("socket.io-adapter", () => { expect(ids).to.eql(["s3"]); }); + describe("utility methods", () => { + let adapter; + + before(() => { + adapter = new Adapter({ + server: { encoder: null }, + sockets: new Map([ + ["s1", { id: "s1" }], + ["s2", { id: "s2" }], + ["s3", { id: "s3" }] + ]) + }); + }); + + describe("fetchSockets", () => { + it("returns the matching socket instances", async () => { + adapter.addAll("s1", new Set(["s1"])); + adapter.addAll("s2", new Set(["s2"])); + adapter.addAll("s3", new Set(["s3"])); + const matchingSockets = await adapter.fetchSockets({ + rooms: new Set() + }); + expect(matchingSockets).to.be.an(Array); + expect(matchingSockets.length).to.be(3); + }); + + it("returns the matching socket instances within room", async () => { + adapter.addAll("s1", new Set(["r1", "r2"])); + adapter.addAll("s2", new Set(["r1"])); + adapter.addAll("s3", new Set(["r2"])); + const matchingSockets = await adapter.fetchSockets({ + rooms: new Set(["r1"]), + except: new Set(["r2"]) + }); + expect(matchingSockets).to.be.an(Array); + expect(matchingSockets.length).to.be(1); + expect(matchingSockets[0].id).to.be("s2"); + }); + }); + }); + describe("events", () => { it("should emit a 'create-room' event", done => { const adapter = new Adapter({ server: { encoder: null } });