Mirror of https://github.com/socketio/socket.io.git (synced 2026-01-09 15:08:12 -05:00)
refactor: use the ClusterAdapter class from socket.io-adapter package
The ClusterAdapter class has been moved to [1], so that this adapter
only needs to implement the pub/sub mechanism.
Also, [2] should reduce the number of "timeout reached: only x
responses received out of y" errors, since the fetchSockets() requests
will now succeed even if a server leaves the cluster.
[1]: https://github.com/socketio/socket.io/tree/main/packages/socket.io-adapter
[2]: 0e23ff0cc6
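For context, a typical deployment of this adapter looks something like the sketch below. This is illustrative only, not part of the commit: it assumes the package is consumed as "@socket.io/cluster-adapter" (the name used in the test suite), and that sticky sessions for HTTP long-polling are handled separately (e.g. with @socket.io/sticky).

import cluster from "node:cluster";
import { createServer } from "node:http";
import { Server } from "socket.io";
import { createAdapter, setupPrimary } from "@socket.io/cluster-adapter";

if (cluster.isPrimary) {
  // the primary process only relays adapter messages between the workers
  setupPrimary();

  for (let i = 0; i < 3; i++) {
    cluster.fork();
  }
} else {
  const httpServer = createServer();

  const io = new Server(httpServer, {
    // the factory returned by createAdapter() is called once per namespace
    adapter: createAdapter(),
  });

  // the workers share the port thanks to the cluster module
  httpServer.listen(3000);
}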
lib/index.ts (599 changed lines)
@@ -1,553 +1,80 @@

Removed: the local EventType enum, the Request and AckRequest interfaces, the local ClusterAdapterOptions interface (requestsTimeout), the WORKER_INIT / WORKER_PING / WORKER_EXIT handshake and the workerIds / requests / ackRequests bookkeeping, the onMessage() dispatcher, publish(), serializeOptions() / deserializeOptions(), and the broadcast(), broadcastWithAck(), serverCount(), addSockets(), delSockets(), disconnectSockets(), fetchSockets(), serverSideEmit() and serverSideEmitWithAck() overrides of the former ClusterAdapter class. All of this logic now lives in the ClusterAdapterWithHeartbeat base class of the socket.io-adapter package. In its place:

import cluster from "node:cluster";
import {
  ClusterAdapterWithHeartbeat,
  ClusterAdapterOptions,
  ClusterMessage,
  ServerId,
  ClusterResponse,
  MessageType,
} from "socket.io-adapter";
import debugModule from "debug";

const debug = debugModule("socket.io-cluster-adapter");

const MESSAGE_SOURCE = "_sio_adapter";

const hasOwnProperty = Object.prototype.hasOwnProperty;

function ignoreError() {}

/**
 * Returns a function that will create a NodeClusterAdapter instance.
 *
 * @param opts - additional options
 *
 * @public
 * @see https://nodejs.org/api/cluster.html
 */
export function createAdapter(opts: Partial<ClusterAdapterOptions> = {}) {
  return function (nsp: any) {
    return new NodeClusterAdapter(nsp, opts);
  };
}

export class NodeClusterAdapter extends ClusterAdapterWithHeartbeat {
  constructor(nsp: any, opts: ClusterAdapterOptions = {}) {
    super(nsp, opts);

    process.on("message", (message: any) => {
      const isValidSource = message?.source === MESSAGE_SOURCE;
      if (!isValidSource) {
        debug("[%s] ignore unknown source", this.uid);
        return;
      }

      // note: this check should be done in the onMessage() handler
      if (message.nsp !== this.nsp.name) {
        debug("[%s] ignore other namespace", this.uid);
        return;
      }

      this.onMessage(message);
    });

    // until https://github.com/socketio/socket.io/commit/f3e1f5ebdf59158d0c8d1e20f8230275617fb355 is released
    this.init();
  }

  protected override doPublish(message: ClusterMessage & { source: string }) {
    message.source = MESSAGE_SOURCE;

    process.send(message, null, {}, ignoreError);

    return Promise.resolve(""); // connection state recovery is not supported
  }

  protected override doPublishResponse(
    requesterUid: ServerId,
    response: ClusterResponse & { source: string; requesterUid: string },
  ) {
    response.source = MESSAGE_SOURCE;
    response.requesterUid = requesterUid;

    process.send(response, null, {}, ignoreError);

    return Promise.resolve();
  }
}

const UIDS = Symbol("uids");

export function setupPrimary() {
  cluster.on("message", (worker, message) => {
    const isValidSource = message?.source === MESSAGE_SOURCE;
@@ -555,17 +82,26 @@ export function setupPrimary() {
       return;
     }
 
+    // store the requester's uids (one per namespace) so that the response can be sent specifically to them
+    worker[UIDS] = worker[UIDS] || new Set();
+    worker[UIDS].add(message.uid);
+
     switch (message.type) {
-      case EventType.FETCH_SOCKETS_RESPONSE:
-      case EventType.SERVER_SIDE_EMIT_RESPONSE:
-        const workerId = message.data.workerId;
-        // emit back to the requester
-        if (hasOwnProperty.call(cluster.workers, workerId)) {
-          cluster.workers[workerId].send(message, null, ignoreError);
+      case MessageType.FETCH_SOCKETS_RESPONSE:
+      case MessageType.SERVER_SIDE_EMIT_RESPONSE:
+        const requesterUid = message.requesterUid;
+        for (const workerId in cluster.workers) {
+          if (
+            hasOwnProperty.call(cluster.workers, workerId) &&
+            cluster.workers[workerId][UIDS]?.has(requesterUid)
+          ) {
+            cluster.workers[workerId].send(message, null, ignoreError);
+            break;
+          }
         }
         break;
       default:
-        const emitterIdAsString = "" + worker.id;
+        const emitterIdAsString = String(worker.id);
         // emit to all workers but the requester
         for (const workerId in cluster.workers) {
           if (
@@ -577,21 +113,4 @@ export function setupPrimary() {
         }
     }
   });
-
-  cluster.on("exit", (worker) => {
-    // notify all active workers
-    for (const workerId in cluster.workers) {
-      if (hasOwnProperty.call(cluster.workers, workerId)) {
-        cluster.workers[workerId].send(
-          {
-            source: MESSAGE_SOURCE,
-            type: EventType.WORKER_EXIT,
-            data: worker.id,
-          },
-          null,
-          ignoreError,
-        );
-      }
-    }
-  });
 }
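To illustrate the request/response flow that the requester-targeted routing above serves, here is a hedged sketch of the two cross-worker operations involved, fetchSockets() and serverSideEmit() (both standard Socket.IO server APIs). The `io` instance is assumed to be the one created with createAdapter() in the earlier sketch, the "config:update" event name is arbitrary, and the error message mirrors the new wording asserted in the test changes further down.

import { Server } from "socket.io";

declare const io: Server; // the worker's server instance from the previous sketch

async function inspectCluster() {
  // aggregated across all workers; with the heartbeat-based adapter, a worker
  // that has left the cluster no longer counts toward the expected responses
  const sockets = await io.fetchSockets();
  console.log("total connected sockets:", sockets.length);

  io.serverSideEmit("config:update", { debug: true }, (err, responses) => {
    if (err) {
      // e.g. "timeout reached: missing 1 responses" if a worker never answered
      console.error(err.message);
    } else {
      console.log("acknowledged by %d other workers", responses.length);
    }
  });
}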
package-lock.json (generated, 34 changed lines)
@@ -28,7 +28,7 @@
         "node": ">=10.0.0"
       },
       "peerDependencies": {
-        "socket.io-adapter": "^2.4.0"
+        "socket.io-adapter": "~2.5.5"
       }
     },
     "node_modules/@ampproject/remapping": {

@@ -2838,11 +2838,34 @@
       }
     },
     "node_modules/socket.io-adapter": {
-      "version": "2.5.2",
-      "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.2.tgz",
-      "integrity": "sha512-87C3LO/NOMc+eMcpcxUBebGjkpMDkNBS9tf7KJqcDsmL936EChtVva71Dw2q4tQcuVC+hAUy4an2NO/sYXmwRA==",
+      "version": "2.5.5",
+      "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.5.tgz",
+      "integrity": "sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==",
+      "license": "MIT",
       "dependencies": {
-        "ws": "~8.11.0"
+        "debug": "~4.3.4",
+        "ws": "~8.17.1"
+      }
+    },
+    "node_modules/socket.io-adapter/node_modules/ws": {
+      "version": "8.17.1",
+      "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz",
+      "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==",
+      "license": "MIT",
+      "engines": {
+        "node": ">=10.0.0"
+      },
+      "peerDependencies": {
+        "bufferutil": "^4.0.1",
+        "utf-8-validate": ">=5.0.2"
+      },
+      "peerDependenciesMeta": {
+        "bufferutil": {
+          "optional": true
+        },
+        "utf-8-validate": {
+          "optional": true
+        }
       }
     },
     "node_modules/socket.io-client": {

@@ -3194,6 +3217,7 @@
       "version": "8.11.0",
       "resolved": "https://registry.npmjs.org/ws/-/ws-8.11.0.tgz",
       "integrity": "sha512-HPG3wQd9sNQoT9xHyNCXoDUa+Xw/VevmY9FoHyQ+g+rrMn4j6FB4np7Z0OhdTgjx6MgQLK7jwSy1YecU1+4Asg==",
+      "dev": true,
       "engines": {
         "node": ">=10.0.0"
       },
package.json

@@ -22,7 +22,7 @@
     "debug": "~4.3.1"
   },
   "peerDependencies": {
-    "socket.io-adapter": "^2.4.0"
+    "socket.io-adapter": "~2.5.5"
   },
   "devDependencies": {
     "@types/expect.js": "^0.3.29",
@@ -349,7 +349,9 @@ describe("@socket.io/cluster-adapter", () => {
       });
     });
 
-    it("sends an event but timeout if one server does not respond", (done) => {
+    it("sends an event but timeout if one server does not respond", function (done) {
+      this.timeout(6000); // currently not possible to configure the timeout delay
+
       workers[0].send(
         "sends an event but timeout if one server does not respond (1)",
       );
@@ -150,11 +150,9 @@ process.on("message", async (msg) => {
         });
         break;
       case "sends an event but timeout if one server does not respond (1)":
-        io.of("/").adapter.requestsTimeout = 200;
-
         io.serverSideEmit("hello with ack", (err, response) => {
           expect(err.message).to.be(
-            "timeout reached: only 1 responses received out of 2"
+            "timeout reached: missing 1 responses"
           );
           expect(response).to.be.an(Array);
           expect(response).to.contain(2);