feat: add some utility methods

This commit adds the following methods:

- fetchSockets: return the matching socket instances
- addSockets: make the matching socket instances join the specified rooms
- delSockets: make the matching socket instances leave the specified rooms
- disconnectSockets: disconnect the matching socket instances

Those methods will then be exposed by the Socket.IO server:

```js
// clear room
io.socketsLeave("room1");

// disconnect all sockets in room
io.in("room2").disconnectSockets();

// fetch socket instances in room
io.in("room3").fetchSockets();
```

This feature will also be extended in the Redis adapter to handle
multiple Socket.IO servers.
This commit is contained in:
Damien Arrachequesne
2021-02-27 01:37:53 +01:00
parent 985bb41fa2
commit 1c9827ec11
2 changed files with 140 additions and 55 deletions

View File

@@ -125,53 +125,19 @@ export class Adapter extends EventEmitter {
* @public
*/
public broadcast(packet: any, opts: BroadcastOptions): void {
const rooms = opts.rooms;
const flags = opts.flags || {};
const packetOpts = {
preEncoded: true,
volatile: flags.volatile,
compress: flags.compress
};
const ids = new Set();
let except = opts.except || new Set();
packet.nsp = this.nsp.name;
const encodedPackets = this.encoder.encode(packet);
// Allow ids in `except` to be room ids.
if (except.size > 0) {
const exclude = except;
except = new Set(except);
for (const id of exclude) {
if (!this.rooms.has(id)) continue;
for (const sid of this.rooms.get(id)) {
if (sid !== id) {
except.add(sid);
}
}
}
}
if (rooms.size) {
for (const room of rooms) {
if (!this.rooms.has(room)) continue;
for (const id of this.rooms.get(room)) {
if (ids.has(id) || except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) {
socket.packet(encodedPackets, packetOpts);
ids.add(id);
}
}
}
} else {
for (const [id] of this.sids) {
if (except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) socket.packet(encodedPackets, packetOpts);
}
}
this.apply(opts, socket => {
socket.packet(encodedPackets, packetOpts);
});
}
/**
@@ -182,21 +148,9 @@ export class Adapter extends EventEmitter {
public sockets(rooms: Set<Room>): Promise<Set<SocketId>> {
const sids = new Set<SocketId>();
if (rooms.size) {
for (const room of rooms) {
if (!this.rooms.has(room)) continue;
for (const id of this.rooms.get(room)) {
if (this.nsp.sockets.has(id)) {
sids.add(id);
}
}
}
} else {
for (const [id] of this.sids) {
if (this.nsp.sockets.has(id)) sids.add(id);
}
}
this.apply({ rooms }, socket => {
sids.add(socket.id);
});
return Promise.resolve(sids);
}
@@ -209,4 +163,94 @@ export class Adapter extends EventEmitter {
public socketRooms(id: SocketId): Set<Room> | undefined {
return this.sids.get(id);
}
/**
* Returns the matching socket instances
*
* @param opts - the filters to apply
*/
public fetchSockets(opts: BroadcastOptions): Promise<any[]> {
const sockets = [];
this.apply(opts, socket => {
sockets.push(socket);
});
return Promise.resolve(sockets);
}
/**
* Makes the matching socket instances join the specified rooms
*
* @param opts - the filters to apply
* @param rooms - the rooms to join
*/
public addSockets(opts: BroadcastOptions, rooms: Room[]): void {
this.apply(opts, socket => {
socket.join(rooms);
});
}
/**
* Makes the matching socket instances leave the specified rooms
*
* @param opts - the filters to apply
* @param rooms - the rooms to leave
*/
public delSockets(opts: BroadcastOptions, rooms: Room[]): void {
this.apply(opts, socket => {
rooms.forEach(room => socket.leave(room));
});
}
/**
* Makes the matching socket instances disconnect
*
* @param opts - the filters to apply
* @param close - whether to close the underlying connection
*/
public disconnectSockets(opts: BroadcastOptions, close: boolean): void {
this.apply(opts, socket => {
socket.disconnect(close);
});
}
private apply(opts: BroadcastOptions, callback: (socket) => void): void {
const rooms = opts.rooms;
const except = this.computeExceptSids(opts.except);
if (rooms.size) {
const ids = new Set();
for (const room of rooms) {
if (!this.rooms.has(room)) continue;
for (const id of this.rooms.get(room)) {
if (ids.has(id) || except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) {
callback(socket);
ids.add(id);
}
}
}
} else {
for (const [id] of this.sids) {
if (except.has(id)) continue;
const socket = this.nsp.sockets.get(id);
if (socket) callback(socket);
}
}
}
private computeExceptSids(exceptRooms?: Set<Room>) {
const exceptSids = new Set();
if (exceptRooms && exceptRooms.size > 0) {
for (const room of exceptRooms) {
if (this.rooms.has(room)) {
this.rooms.get(room).forEach(sid => exceptSids.add(sid));
}
}
}
return exceptSids;
}
}

View File

@@ -30,9 +30,9 @@ describe("socket.io-adapter", () => {
const adapter = new Adapter({
server: { encoder: null },
sockets: new Map([
["s1", true],
["s2", true],
["s3", true]
["s1", { id: "s1" }],
["s2", { id: "s2" }],
["s3", { id: "s3" }]
])
});
adapter.addAll("s1", new Set(["r1", "r2"]));
@@ -124,6 +124,47 @@ describe("socket.io-adapter", () => {
expect(ids).to.eql(["s3"]);
});
describe("utility methods", () => {
let adapter;
before(() => {
adapter = new Adapter({
server: { encoder: null },
sockets: new Map([
["s1", { id: "s1" }],
["s2", { id: "s2" }],
["s3", { id: "s3" }]
])
});
});
describe("fetchSockets", () => {
it("returns the matching socket instances", async () => {
adapter.addAll("s1", new Set(["s1"]));
adapter.addAll("s2", new Set(["s2"]));
adapter.addAll("s3", new Set(["s3"]));
const matchingSockets = await adapter.fetchSockets({
rooms: new Set()
});
expect(matchingSockets).to.be.an(Array);
expect(matchingSockets.length).to.be(3);
});
it("returns the matching socket instances within room", async () => {
adapter.addAll("s1", new Set(["r1", "r2"]));
adapter.addAll("s2", new Set(["r1"]));
adapter.addAll("s3", new Set(["r2"]));
const matchingSockets = await adapter.fetchSockets({
rooms: new Set(["r1"]),
except: new Set(["r2"])
});
expect(matchingSockets).to.be.an(Array);
expect(matchingSockets.length).to.be(1);
expect(matchingSockets[0].id).to.be("s2");
});
});
});
describe("events", () => {
it("should emit a 'create-room' event", done => {
const adapter = new Adapter({ server: { encoder: null } });