From 0c431243e28913fdd2a4a3de3e67a9f38d67a3aa Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 16 Oct 2025 19:16:20 +0200 Subject: [PATCH] refactor: use the ClusterAdapter class from socket.io-adapter package The ClusterAdapter class has been moved to [1], so that this adapter only needs to implement the pub/sub mechanism. Also, [2] should reduce the number of "timeout reached: only x responses received out of y" errors, since the fetchSockets() requests will now succeed even if a server leaves the cluster. [1]: https://github.com/socketio/socket.io/tree/main/packages/socket.io-adapter [2]: https://github.com/socketio/socket.io/commit/0e23ff0cc671e3186510f7cfb8a4c1147457296f --- lib/index.ts | 599 +++++----------------------------------------- package-lock.json | 34 ++- package.json | 2 +- test/index.ts | 4 +- test/worker.js | 4 +- 5 files changed, 93 insertions(+), 550 deletions(-) diff --git a/lib/index.ts b/lib/index.ts index 0b984195..49d660bb 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,553 +1,80 @@ import cluster from "node:cluster"; -import { Adapter, BroadcastOptions, Room } from "socket.io-adapter"; -import { randomBytes } from "node:crypto"; - -const randomId = () => randomBytes(8).toString("hex"); -const debug = require("debug")("socket.io-cluster-adapter"); +import { + ClusterAdapterWithHeartbeat, + ClusterAdapterOptions, + ClusterMessage, + ServerId, + ClusterResponse, + MessageType, +} from "socket.io-adapter"; +import debugModule from "debug"; +const debug = debugModule("socket.io-cluster-adapter"); const MESSAGE_SOURCE = "_sio_adapter"; const hasOwnProperty = Object.prototype.hasOwnProperty; -/** - * Event types, for messages between nodes - */ - -enum EventType { - WORKER_INIT = 1, - WORKER_PING, - WORKER_EXIT, - BROADCAST, - SOCKETS_JOIN, - SOCKETS_LEAVE, - DISCONNECT_SOCKETS, - FETCH_SOCKETS, - FETCH_SOCKETS_RESPONSE, - SERVER_SIDE_EMIT, - SERVER_SIDE_EMIT_RESPONSE, - BROADCAST_CLIENT_COUNT, - BROADCAST_ACK, -} - -interface 
Request { - type: EventType; - resolve: Function; - timeout: NodeJS.Timeout; - expected: number; - current: number; - responses: any[]; -} - -interface AckRequest { - type: EventType.BROADCAST; - clientCountCallback: (clientCount: number) => void; - ack: (...args: any[]) => void; -} - -export interface ClusterAdapterOptions { - /** - * after this timeout the adapter will stop waiting from responses to request - * @default 5000 - */ - requestsTimeout: number; -} - function ignoreError() {} /** - * Returns a function that will create a ClusterAdapter instance. + * Returns a function that will create a NodeClusterAdapter instance. * * @param opts - additional options * * @public + * @see https://nodejs.org/api/cluster.html */ export function createAdapter(opts: Partial = {}) { - return function (nsp) { - return new ClusterAdapter(nsp, opts); + return function (nsp: any) { + return new NodeClusterAdapter(nsp, opts); }; } -export class ClusterAdapter extends Adapter { - public requestsTimeout: number; +export class NodeClusterAdapter extends ClusterAdapterWithHeartbeat { + constructor(nsp: any, opts: ClusterAdapterOptions = {}) { + super(nsp, opts); + process.on("message", (message: any) => { + const isValidSource = message?.source === MESSAGE_SOURCE; + if (!isValidSource) { + debug("[%s] ignore unknown source", this.uid); + return; + } - private workerIds: Set = new Set(); - private requests: Map = new Map(); - private ackRequests: Map = new Map(); + // note: this check should be done in the onMessage() handler + if (message.nsp !== this.nsp.name) { + debug("[%s] ignore other namespace", this.uid); + return; + } - /** - * Adapter constructor. 
- * - * @param nsp - the namespace - * @param opts - additional options - * - * @public - */ - constructor(nsp: any, opts: Partial = {}) { - super(nsp); - this.requestsTimeout = opts.requestsTimeout || 5000; - - this.publish({ - type: EventType.WORKER_INIT, - data: cluster.worker.id, + this.onMessage(message); }); - process.on("message", this.onMessage.bind(this)); + // until https://github.com/socketio/socket.io/commit/f3e1f5ebdf59158d0c8d1e20f8230275617fb355 is released + this.init(); } - public async onMessage(message: any) { - const isValidSource = message?.source === MESSAGE_SOURCE; - if (!isValidSource) { - return; - } - - if (message.type === EventType.WORKER_EXIT) { - this.workerIds.delete(message.data); - debug("workers count is now %d", this.workerIds.size); - return; - } - - if (message.nsp !== this.nsp.name) { - debug("ignore other namespace"); - return; - } - - switch (message.type) { - case EventType.WORKER_INIT: - this.workerIds.add(message.data); - debug("workers count is now %d", this.workerIds.size); - this.publish({ - type: EventType.WORKER_PING, - data: cluster.worker.id, - }); - break; - case EventType.WORKER_PING: - this.workerIds.add(message.data); - debug("workers count is now %d", this.workerIds.size); - break; - case EventType.BROADCAST: { - debug("broadcast with opts %j", message.data.opts); - - const withAck = message.data.requestId !== undefined; - if (withAck) { - super.broadcastWithAck( - message.data.packet, - ClusterAdapter.deserializeOptions(message.data.opts), - (clientCount) => { - debug("waiting for %d client acknowledgements", clientCount); - this.publish({ - type: EventType.BROADCAST_CLIENT_COUNT, - data: { - requestId: message.data.requestId, - clientCount, - }, - }); - }, - (arg) => { - debug("received acknowledgement with value %j", arg); - this.publish({ - type: EventType.BROADCAST_ACK, - data: { - requestId: message.data.requestId, - packet: arg, - }, - }); - }, - ); - } else { - super.broadcast( - message.data.packet, - 
ClusterAdapter.deserializeOptions(message.data.opts), - ); - } - break; - } - - case EventType.BROADCAST_CLIENT_COUNT: { - const request = this.ackRequests.get(message.data.requestId); - request?.clientCountCallback(message.data.clientCount); - break; - } - - case EventType.BROADCAST_ACK: { - const request = this.ackRequests.get(message.data.requestId); - request?.ack(message.data.packet); - break; - } - - case EventType.SOCKETS_JOIN: { - debug("calling addSockets with opts %j", message.data.opts); - super.addSockets( - ClusterAdapter.deserializeOptions(message.data.opts), - message.data.rooms, - ); - break; - } - case EventType.SOCKETS_LEAVE: { - debug("calling delSockets with opts %j", message.data.opts); - super.delSockets( - ClusterAdapter.deserializeOptions(message.data.opts), - message.data.rooms, - ); - break; - } - case EventType.DISCONNECT_SOCKETS: { - debug("calling disconnectSockets with opts %j", message.data.opts); - super.disconnectSockets( - ClusterAdapter.deserializeOptions(message.data.opts), - message.data.close, - ); - break; - } - case EventType.FETCH_SOCKETS: { - debug("calling fetchSockets with opts %j", message.data.opts); - const localSockets = await super.fetchSockets( - ClusterAdapter.deserializeOptions(message.data.opts), - ); - - this.publish({ - type: EventType.FETCH_SOCKETS_RESPONSE, - data: { - requestId: message.data.requestId, - workerId: message.data.workerId, - sockets: localSockets.map((socket) => ({ - id: socket.id, - handshake: socket.handshake, - rooms: [...socket.rooms], - data: socket.data, - })), - }, - }); - break; - } - case EventType.FETCH_SOCKETS_RESPONSE: { - const request = this.requests.get(message.data.requestId); - - if (!request) { - return; - } - - request.current++; - message.data.sockets.forEach((socket: any) => - request.responses.push(socket), - ); - - if (request.current === request.expected) { - clearTimeout(request.timeout); - request.resolve(request.responses); - 
this.requests.delete(message.data.requestId); - } - break; - } - case EventType.SERVER_SIDE_EMIT: { - const packet = message.data.packet; - const withAck = message.data.requestId !== undefined; - if (!withAck) { - this.nsp._onServerSideEmit(packet); - return; - } - let called = false; - const callback = (arg: any) => { - // only one argument is expected - if (called) { - return; - } - called = true; - debug("calling acknowledgement with %j", arg); - this.publish({ - type: EventType.SERVER_SIDE_EMIT_RESPONSE, - data: { - requestId: message.data.requestId, - workerId: message.data.workerId, - packet: arg, - }, - }); - }; - - packet.push(callback); - this.nsp._onServerSideEmit(packet); - break; - } - case EventType.SERVER_SIDE_EMIT_RESPONSE: { - const request = this.requests.get(message.data.requestId); - - if (!request) { - return; - } - - request.current++; - request.responses.push(message.data.packet); - - if (request.current === request.expected) { - clearTimeout(request.timeout); - request.resolve(null, request.responses); - this.requests.delete(message.data.requestId); - } - } - } - } - - private async publish(message: any) { - // to be able to ignore unrelated messages on the cluster message bus + protected override doPublish(message: ClusterMessage & { source: string }) { message.source = MESSAGE_SOURCE; - // to be able to ignore messages from other namespaces - message.nsp = this.nsp.name; - - debug( - "publish event of type %s for namespace %s", - message.type, - message.nsp, - ); process.send(message, null, {}, ignoreError); + + return Promise.resolve(""); // connection state recovery is not supported } - /** - * Transform ES6 Set into plain arrays. 
- * - * Note: we manually serialize ES6 Sets so that using `serialization: "advanced"` is not needed when using plaintext - * packets (reference: https://nodejs.org/api/child_process.html#child_process_advanced_serialization) - */ - private static serializeOptions(opts: BroadcastOptions) { - return { - rooms: [...opts.rooms], - except: opts.except ? [...opts.except] : [], - flags: opts.flags, - }; - } - - private static deserializeOptions(opts: any): BroadcastOptions { - return { - rooms: new Set(opts.rooms), - except: new Set(opts.except), - flags: opts.flags, - }; - } - - public broadcast(packet: any, opts: BroadcastOptions) { - const onlyLocal = opts?.flags?.local; - if (!onlyLocal) { - this.publish({ - type: EventType.BROADCAST, - data: { - packet, - opts: ClusterAdapter.serializeOptions(opts), - }, - }); - } - - // packets with binary contents are modified by the broadcast method, hence the nextTick() - process.nextTick(() => { - super.broadcast(packet, opts); - }); - } - - public broadcastWithAck( - packet: any, - opts: BroadcastOptions, - clientCountCallback: (clientCount: number) => void, - ack: (...args: any[]) => void, + protected override doPublishResponse( + requesterUid: ServerId, + response: ClusterResponse & { source: string; requesterUid: string }, ) { - const onlyLocal = opts?.flags?.local; - if (!onlyLocal) { - const requestId = randomId(); + response.source = MESSAGE_SOURCE; + response.requesterUid = requesterUid; - this.publish({ - type: EventType.BROADCAST, - data: { - packet, - requestId, - opts: ClusterAdapter.serializeOptions(opts), - }, - }); + process.send(response, null, {}, ignoreError); - this.ackRequests.set(requestId, { - type: EventType.BROADCAST, - clientCountCallback, - ack, - }); - - // we have no way to know at this level whether the server has received an acknowledgement from each client, so we - // will simply clean up the ackRequests map after the given delay - setTimeout(() => { - this.ackRequests.delete(requestId); - }, 
opts.flags!.timeout); - } - - // packets with binary contents are modified by the broadcast method, hence the nextTick() - process.nextTick(() => { - super.broadcastWithAck(packet, opts, clientCountCallback, ack); - }); - } - - public serverCount(): Promise { - return Promise.resolve(1 + this.workerIds.size); - } - - addSockets(opts: BroadcastOptions, rooms: Room[]) { - super.addSockets(opts, rooms); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.publish({ - type: EventType.SOCKETS_JOIN, - data: { - opts: ClusterAdapter.serializeOptions(opts), - rooms, - }, - }); - } - - delSockets(opts: BroadcastOptions, rooms: Room[]) { - super.delSockets(opts, rooms); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.publish({ - type: EventType.SOCKETS_LEAVE, - data: { - opts: ClusterAdapter.serializeOptions(opts), - rooms, - }, - }); - } - - disconnectSockets(opts: BroadcastOptions, close: boolean) { - super.disconnectSockets(opts, close); - - const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; - } - - this.publish({ - type: EventType.DISCONNECT_SOCKETS, - data: { - opts: ClusterAdapter.serializeOptions(opts), - close, - }, - }); - } - - private getExpectedResponseCount() { - return this.workerIds.size; - } - - async fetchSockets(opts: BroadcastOptions): Promise { - const localSockets = await super.fetchSockets(opts); - const expectedResponseCount = this.getExpectedResponseCount(); - - if (opts.flags?.local || expectedResponseCount === 0) { - return localSockets; - } - - const requestId = randomId(); - - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - const storedRequest = this.requests.get(requestId); - if (storedRequest) { - reject( - new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`, - ), - ); - this.requests.delete(requestId); - } - }, this.requestsTimeout); - - const storedRequest = { - type: 
EventType.FETCH_SOCKETS, - resolve, - timeout, - current: 0, - expected: expectedResponseCount, - responses: localSockets, - }; - this.requests.set(requestId, storedRequest); - - this.publish({ - type: EventType.FETCH_SOCKETS, - data: { - requestId, - workerId: cluster.worker.id, - opts: ClusterAdapter.serializeOptions(opts), - }, - }); - }); - } - - public serverSideEmit(packet: any[]): void { - const withAck = typeof packet[packet.length - 1] === "function"; - - if (withAck) { - this.serverSideEmitWithAck(packet).catch(() => { - // ignore errors - }); - return; - } - - this.publish({ - type: EventType.SERVER_SIDE_EMIT, - data: { - packet, - }, - }); - } - - private async serverSideEmitWithAck(packet: any[]) { - const ack = packet.pop(); - const expectedResponseCount = this.getExpectedResponseCount(); - - debug( - 'waiting for %d responses to "serverSideEmit" request', - expectedResponseCount, - ); - - if (expectedResponseCount <= 0) { - return ack(null, []); - } - - const requestId = randomId(); - - const timeout = setTimeout(() => { - const storedRequest = this.requests.get(requestId); - if (storedRequest) { - ack( - new Error( - `timeout reached: only ${storedRequest.current} responses received out of ${storedRequest.expected}`, - ), - storedRequest.responses, - ); - this.requests.delete(requestId); - } - }, this.requestsTimeout); - - const storedRequest = { - type: EventType.FETCH_SOCKETS, - resolve: ack, - timeout, - current: 0, - expected: expectedResponseCount, - responses: [], - }; - this.requests.set(requestId, storedRequest); - - this.publish({ - type: EventType.SERVER_SIDE_EMIT, - data: { - requestId, // the presence of this attribute defines whether an acknowledgement is needed - workerId: cluster.worker.id, - packet, - }, - }); + return Promise.resolve(); } } +const UIDS = Symbol("uids"); + export function setupPrimary() { cluster.on("message", (worker, message) => { const isValidSource = message?.source === MESSAGE_SOURCE; @@ -555,17 +82,26 @@ export 
function setupPrimary() { return; } + // store the requester's uids (one per namespace) so that the response can be sent specifically to them + worker[UIDS] = worker[UIDS] || new Set(); + worker[UIDS].add(message.uid); + switch (message.type) { - case EventType.FETCH_SOCKETS_RESPONSE: - case EventType.SERVER_SIDE_EMIT_RESPONSE: - const workerId = message.data.workerId; - // emit back to the requester - if (hasOwnProperty.call(cluster.workers, workerId)) { - cluster.workers[workerId].send(message, null, ignoreError); + case MessageType.FETCH_SOCKETS_RESPONSE: + case MessageType.SERVER_SIDE_EMIT_RESPONSE: + const requesterUid = message.requesterUid; + for (const workerId in cluster.workers) { + if ( + hasOwnProperty.call(cluster.workers, workerId) && + cluster.workers[workerId][UIDS]?.has(requesterUid) + ) { + cluster.workers[workerId].send(message, null, ignoreError); + break; + } } break; default: - const emitterIdAsString = "" + worker.id; + const emitterIdAsString = String(worker.id); // emit to all workers but the requester for (const workerId in cluster.workers) { if ( @@ -577,21 +113,4 @@ export function setupPrimary() { } } }); - - cluster.on("exit", (worker) => { - // notify all active workers - for (const workerId in cluster.workers) { - if (hasOwnProperty.call(cluster.workers, workerId)) { - cluster.workers[workerId].send( - { - source: MESSAGE_SOURCE, - type: EventType.WORKER_EXIT, - data: worker.id, - }, - null, - ignoreError, - ); - } - } - }); } diff --git a/package-lock.json b/package-lock.json index a539227f..8e018981 100644 --- a/package-lock.json +++ b/package-lock.json @@ -28,7 +28,7 @@ "node": ">=10.0.0" }, "peerDependencies": { - "socket.io-adapter": "^2.4.0" + "socket.io-adapter": "~2.5.5" } }, "node_modules/@ampproject/remapping": { @@ -2838,11 +2838,34 @@ } }, "node_modules/socket.io-adapter": { - "version": "2.5.2", - "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz", - "integrity": 
"sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==", + "version": "2.5.5", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.5.tgz", + "integrity": "sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==", + "license": "MIT", "dependencies": { - "ws": "~8.11.0" + "debug": "~4.3.4", + "ws": "~8.17.1" + } + }, + "node_modules/socket.io-adapter/node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } } }, "node_modules/socket.io-client": { @@ -3194,6 +3217,7 @@ "version": "8.11.0", "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz", "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==", + "dev": true, "engines": { "node": ">=10.0.0" }, diff --git a/package.json b/package.json index df945fd0..9b7daea3 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "debug": "~4.3.1" }, "peerDependencies": { - "socket.io-adapter": "^2.4.0" + "socket.io-adapter": "~2.5.5" }, "devDependencies": { "@types/expect.js": "^0.3.29", diff --git a/test/index.ts b/test/index.ts index 720f3242..205bde02 100644 --- a/test/index.ts +++ b/test/index.ts @@ -349,7 +349,9 @@ describe("@socket.io/cluster-adapter", () => { }); }); - it("sends an event but timeout if one server does not respond", (done) => { + it("sends an event but timeout if one server does not respond", function (done) { + this.timeout(6000); // currently not possible to configure the timeout delay + workers[0].send( "sends 
an event but timeout if one server does not respond (1)", ); diff --git a/test/worker.js b/test/worker.js index d6a2c50a..ac9c447a 100644 --- a/test/worker.js +++ b/test/worker.js @@ -150,11 +150,9 @@ process.on("message", async (msg) => { }); break; case "sends an event but timeout if one server does not respond (1)": - io.of("/").adapter.requestsTimeout = 200; - io.serverSideEmit("hello with ack", (err, response) => { expect(err.message).to.be( - "timeout reached: only 1 responses received out of 2" + "timeout reached: missing 1 responses" ); expect(response).to.be.an(Array); expect(response).to.contain(2);