Compare commits

...

5 Commits
4.0.2 ... 4.1.0

Author SHA1 Message Date
Damien Arrachequesne
fb6b0efec9 chore(release): 4.1.0
Diff: https://github.com/socketio/socket.io/compare/4.0.2...4.1.0
2021-05-11 09:27:52 +02:00
Damien Arrachequesne
95d9e4a42f test: fix randomly failing test 2021-05-11 00:06:03 +02:00
Damien Arrachequesne
499c89250d feat: notify upon namespace creation
A "new_namespace" event will be emitted when a new namespace is created:

```js
io.on("new_namespace", (namespace) => {
  // ...
});
```

This could be used for example for registering the same middleware for
each namespace.

See https://github.com/socketio/socket.io/issues/3851
2021-05-11 00:09:18 +02:00
Damien Arrachequesne
93cce05fb3 feat: add support for inter-server communication
Syntax:

```js
// server A
io.serverSideEmit("hello", "world");

// server B
io.on("hello", (arg) => {
  console.log(arg); // prints "world"
});
```

With acknowledgements:

```js
// server A
io.serverSideEmit("hello", "world", (err, responses) => {
  console.log(responses); // prints ["hi"]
});

// server B
io.on("hello", (arg, callback) => {
  callback("hi");
});
```

This feature replaces the customHook/customRequest API from the Redis
adapter: https://github.com/socketio/socket.io-redis/issues/370
2021-05-11 00:07:20 +02:00
Damien Arrachequesne
dc381b72c6 perf: add support for the "wsPreEncoded" writing option
Packets that are sent to multiple clients will now be pre-encoded for
the WebSocket transport (which means simply prepending "4" - which is
the "message" packet type in Engine.IO).

Note: buffers are not pre-encoded, since they are sent without
modification over the WebSocket connection

See also: 7706b123df

engine.io diff: https://github.com/socketio/engine.io/compare/5.0.0...5.1.0
2021-05-11 00:06:03 +02:00
16 changed files with 518 additions and 276 deletions

View File

@@ -1,3 +1,19 @@
# [4.1.0](https://github.com/socketio/socket.io/compare/4.0.2...4.1.0) (2021-05-11)
### Features
* add support for inter-server communication ([93cce05](https://github.com/socketio/socket.io/commit/93cce05fb3faf91f21fa71212275c776aa161107))
* notify upon namespace creation ([499c892](https://github.com/socketio/socket.io/commit/499c89250d2db1ab7725ab2b74840e188c267c46))
* add a "connection_error" event ([7096e98](https://github.com/socketio/engine.io/commit/7096e98a02295a62c8ea2aa56461d4875887092d), from `engine.io`)
* add the "initial_headers" and "headers" events ([2527543](https://github.com/socketio/engine.io/commit/252754353a0e88eb036ebb3082e9d6a9a5f497db), from `engine.io`)
### Performance Improvements
* add support for the "wsPreEncoded" writing option ([dc381b7](https://github.com/socketio/socket.io/commit/dc381b72c6b2f8172001dedd84116122e4cc95b3))
## [4.0.2](https://github.com/socketio/socket.io/compare/4.0.1...4.0.2) (2021-05-06)

View File

@@ -1,5 +1,5 @@
/*!
* Socket.IO v4.0.2
* Socket.IO v4.1.0
* (c) 2014-2021 Guillermo Rauch
* Released under the MIT License.
*/
@@ -2571,7 +2571,8 @@ var Socket = /*#__PURE__*/function (_Emitter) {
perMessageDeflate: {
threshold: 1024
},
transportOptions: {}
transportOptions: {},
closeOnBeforeunload: true
}, opts);
_this.opts.path = _this.opts.path.replace(/\/$/, "") + "/";
@@ -2588,14 +2589,19 @@ var Socket = /*#__PURE__*/function (_Emitter) {
_this.pingTimeoutTimer = null;
if (typeof addEventListener === "function") {
addEventListener("beforeunload", function () {
if (_this.transport) {
// silently close the transport
_this.transport.removeAllListeners();
if (_this.opts.closeOnBeforeunload) {
// Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener
// ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is
// closed/reloaded)
addEventListener("beforeunload", function () {
if (_this.transport) {
// silently close the transport
_this.transport.removeAllListeners();
_this.transport.close();
}
}, false);
_this.transport.close();
}
}, false);
}
if (_this.hostname !== "localhost") {
_this.offlineEventListener = function () {
@@ -2651,15 +2657,16 @@ var Socket = /*#__PURE__*/function (_Emitter) {
}, {
key: "open",
value: function open() {
var _this2 = this;
var transport;
if (this.opts.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf("websocket") !== -1) {
transport = "websocket";
} else if (0 === this.transports.length) {
// Emit error on next tick so it can be listened to
var self = this;
setTimeout(function () {
self.emit("error", "No transports available");
_this2.emit("error", "No transports available");
}, 0);
return;
} else {
@@ -2689,8 +2696,9 @@ var Socket = /*#__PURE__*/function (_Emitter) {
}, {
key: "setTransport",
value: function setTransport(transport) {
var _this3 = this;
debug("setting transport %s", transport.name);
var self = this;
if (this.transport) {
debug("clearing existing transport %s", this.transport.name);
@@ -2700,14 +2708,8 @@ var Socket = /*#__PURE__*/function (_Emitter) {
this.transport = transport; // set up transport listeners
transport.on("drain", function () {
self.onDrain();
}).on("packet", function (packet) {
self.onPacket(packet);
}).on("error", function (e) {
self.onError(e);
}).on("close", function () {
self.onClose("transport close");
transport.on("drain", this.onDrain.bind(this)).on("packet", this.onPacket.bind(this)).on("error", this.onError.bind(this)).on("close", function () {
_this3.onClose("transport close");
});
}
/**
@@ -2720,20 +2722,16 @@ var Socket = /*#__PURE__*/function (_Emitter) {
}, {
key: "probe",
value: function probe(name) {
var _this4 = this;
debug('probing transport "%s"', name);
var transport = this.createTransport(name, {
probe: 1
});
var failed = false;
var self = this;
Socket.priorWebsocketSuccess = false;
function onTransportOpen() {
if (self.onlyBinaryUpgrades) {
var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
failed = failed || upgradeLosesBinary;
}
var onTransportOpen = function onTransportOpen() {
if (failed) return;
debug('probe transport "%s" opened', name);
transport.send([{
@@ -2745,33 +2743,42 @@ var Socket = /*#__PURE__*/function (_Emitter) {
if ("pong" === msg.type && "probe" === msg.data) {
debug('probe transport "%s" pong', name);
self.upgrading = true;
self.emit("upgrading", transport);
_this4.upgrading = true;
_this4.emit("upgrading", transport);
if (!transport) return;
Socket.priorWebsocketSuccess = "websocket" === transport.name;
debug('pausing current transport "%s"', self.transport.name);
self.transport.pause(function () {
debug('pausing current transport "%s"', _this4.transport.name);
_this4.transport.pause(function () {
if (failed) return;
if ("closed" === self.readyState) return;
if ("closed" === _this4.readyState) return;
debug("changing transport and sending upgrade packet");
cleanup();
self.setTransport(transport);
_this4.setTransport(transport);
transport.send([{
type: "upgrade"
}]);
self.emit("upgrade", transport);
_this4.emit("upgrade", transport);
transport = null;
self.upgrading = false;
self.flush();
_this4.upgrading = false;
_this4.flush();
});
} else {
debug('probe transport "%s" failed', name);
var err = new Error("probe error");
err.transport = transport.name;
self.emit("upgradeError", err);
_this4.emit("upgradeError", err);
}
});
}
};
function freezeTransport() {
if (failed) return; // Any callback called by transport should be ignored since now
@@ -2783,13 +2790,14 @@ var Socket = /*#__PURE__*/function (_Emitter) {
} // Handle any error that happens while probing
function onerror(err) {
var onerror = function onerror(err) {
var error = new Error("probe error: " + err);
error.transport = transport.name;
freezeTransport();
debug('probe transport "%s" failed because of error: %s', name, err);
self.emit("upgradeError", error);
}
_this4.emit("upgradeError", error);
};
function onTransportClose() {
onerror("transport closed");
@@ -2809,13 +2817,15 @@ var Socket = /*#__PURE__*/function (_Emitter) {
} // Remove all listeners on the transport and on self
function cleanup() {
var cleanup = function cleanup() {
transport.removeListener("open", onTransportOpen);
transport.removeListener("error", onerror);
transport.removeListener("close", onTransportClose);
self.removeListener("close", onclose);
self.removeListener("upgrading", onupgrade);
}
_this4.removeListener("close", onclose);
_this4.removeListener("upgrading", onupgrade);
};
transport.once("open", onTransportOpen);
transport.once("error", onerror);
@@ -2921,11 +2931,11 @@ var Socket = /*#__PURE__*/function (_Emitter) {
}, {
key: "resetPingTimeout",
value: function resetPingTimeout() {
var _this2 = this;
var _this5 = this;
clearTimeout(this.pingTimeoutTimer);
this.pingTimeoutTimer = setTimeout(function () {
_this2.onClose("ping timeout");
_this5.onClose("ping timeout");
}, this.pingInterval + this.pingTimeout);
if (this.opts.autoUnref) {
@@ -3041,14 +3051,37 @@ var Socket = /*#__PURE__*/function (_Emitter) {
}, {
key: "close",
value: function close() {
var self = this;
var _this6 = this;
var close = function close() {
_this6.onClose("forced close");
debug("socket closing - telling transport to close");
_this6.transport.close();
};
var cleanupAndClose = function cleanupAndClose() {
_this6.removeListener("upgrade", cleanupAndClose);
_this6.removeListener("upgradeError", cleanupAndClose);
close();
};
var waitForUpgrade = function waitForUpgrade() {
// wait for upgrade to finish since we can't send packets while pausing a transport
_this6.once("upgrade", cleanupAndClose);
_this6.once("upgradeError", cleanupAndClose);
};
if ("opening" === this.readyState || "open" === this.readyState) {
this.readyState = "closing";
if (this.writeBuffer.length) {
this.once("drain", function () {
if (this.upgrading) {
if (_this6.upgrading) {
waitForUpgrade();
} else {
close();
@@ -3061,24 +3094,6 @@ var Socket = /*#__PURE__*/function (_Emitter) {
}
}
function close() {
self.onClose("forced close");
debug("socket closing - telling transport to close");
self.transport.close();
}
function cleanupAndClose() {
self.removeListener("upgrade", cleanupAndClose);
self.removeListener("upgradeError", cleanupAndClose);
close();
}
function waitForUpgrade() {
// wait for upgrade to finish since we can't send packets while pausing a transport
self.once("upgrade", cleanupAndClose);
self.once("upgradeError", cleanupAndClose);
}
return this;
}
/**
@@ -3105,8 +3120,7 @@ var Socket = /*#__PURE__*/function (_Emitter) {
key: "onClose",
value: function onClose(reason, desc) {
if ("opening" === this.readyState || "open" === this.readyState || "closing" === this.readyState) {
debug('socket close with reason: "%s"', reason);
var self = this; // clear timers
debug('socket close with reason: "%s"', reason); // clear timers
clearTimeout(this.pingIntervalTimer);
clearTimeout(this.pingTimeoutTimer); // stop event from firing again for transport
@@ -3129,8 +3143,8 @@ var Socket = /*#__PURE__*/function (_Emitter) {
this.emit("close", reason, desc); // clean buffers after, so users can still
// grab the buffers on `close` event
self.writeBuffer = [];
self.prevBufferLen = 0;
this.writeBuffer = [];
this.prevBufferLen = 0;
}
}
/**
@@ -3494,11 +3508,7 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
_this.index = callbacks.length; // add callback to jsonp global
var self = _assertThisInitialized(_this);
callbacks.push(function (msg) {
self.onData(msg);
}); // append to query string
callbacks.push(_this.onData.bind(_assertThisInitialized(_this))); // append to query string
_this.query.j = _this.index;
return _this;
@@ -3542,7 +3552,8 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
}, {
key: "doPoll",
value: function doPoll() {
var self = this;
var _this2 = this;
var script = document.createElement("script");
if (this.script) {
@@ -3554,7 +3565,7 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
script.src = this.uri();
script.onerror = function (e) {
self.onError("jsonp poll error", e);
_this2.onError("jsonp poll error", e);
};
var insertAt = document.getElementsByTagName("script")[0];
@@ -3587,7 +3598,8 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
}, {
key: "doWrite",
value: function doWrite(data, fn) {
var self = this;
var _this3 = this;
var iframe;
if (!this.form) {
@@ -3615,29 +3627,31 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
fn();
}
function initIframe() {
if (self.iframe) {
var initIframe = function initIframe() {
if (_this3.iframe) {
try {
self.form.removeChild(self.iframe);
_this3.form.removeChild(_this3.iframe);
} catch (e) {
self.onError("jsonp polling iframe removal error", e);
_this3.onError("jsonp polling iframe removal error", e);
}
}
try {
// ie6 dynamic iframes with target="" support (thanks Chris Lambacher)
var html = '<iframe src="javascript:0" name="' + self.iframeId + '">';
var html = '<iframe src="javascript:0" name="' + _this3.iframeId + '">';
iframe = document.createElement(html);
} catch (e) {
iframe = document.createElement("iframe");
iframe.name = self.iframeId;
iframe.name = _this3.iframeId;
iframe.src = "javascript:0";
}
iframe.id = self.iframeId;
self.form.appendChild(iframe);
self.iframe = iframe;
}
iframe.id = _this3.iframeId;
_this3.form.appendChild(iframe);
_this3.iframe = iframe;
};
initIframe(); // escape \n to prevent it from being converted into \r\n by some UAs
// double escaping is required for escaped new lines because unescaping of new lines can be done safely on server-side
@@ -3651,7 +3665,7 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
if (this.iframe.attachEvent) {
this.iframe.onreadystatechange = function () {
if (self.iframe.readyState === "complete") {
if (_this3.iframe.readyState === "complete") {
complete();
}
};
@@ -3800,14 +3814,15 @@ var XHR = /*#__PURE__*/function (_Polling) {
}, {
key: "doWrite",
value: function doWrite(data, fn) {
var _this2 = this;
var req = this.request({
method: "POST",
data: data
});
var self = this;
req.on("success", fn);
req.on("error", function (err) {
self.onError("xhr post error", err);
_this2.onError("xhr post error", err);
});
}
/**
@@ -3819,14 +3834,13 @@ var XHR = /*#__PURE__*/function (_Polling) {
}, {
key: "doPoll",
value: function doPoll() {
var _this3 = this;
debug("xhr poll");
var req = this.request();
var self = this;
req.on("data", function (data) {
self.onData(data);
});
req.on("data", this.onData.bind(this));
req.on("error", function (err) {
self.onError("xhr poll error", err);
_this3.onError("xhr poll error", err);
});
this.pollXhr = req;
}
@@ -3847,20 +3861,20 @@ var Request = /*#__PURE__*/function (_Emitter) {
* @api public
*/
function Request(uri, opts) {
var _this2;
var _this4;
_classCallCheck(this, Request);
_this2 = _super2.call(this);
_this2.opts = opts;
_this2.method = opts.method || "GET";
_this2.uri = uri;
_this2.async = false !== opts.async;
_this2.data = undefined !== opts.data ? opts.data : null;
_this4 = _super2.call(this);
_this4.opts = opts;
_this4.method = opts.method || "GET";
_this4.uri = uri;
_this4.async = false !== opts.async;
_this4.data = undefined !== opts.data ? opts.data : null;
_this2.create();
_this4.create();
return _this2;
return _this4;
}
/**
* Creates the XHR object and sends the request.
@@ -3872,11 +3886,12 @@ var Request = /*#__PURE__*/function (_Emitter) {
_createClass(Request, [{
key: "create",
value: function create() {
var _this5 = this;
var opts = pick(this.opts, "agent", "enablesXDR", "pfx", "key", "passphrase", "cert", "ca", "ciphers", "rejectUnauthorized", "autoUnref");
opts.xdomain = !!this.opts.xd;
opts.xscheme = !!this.opts.xs;
var xhr = this.xhr = new XMLHttpRequest(opts);
var self = this;
try {
debug("xhr open %s: %s", this.method, this.uri);
@@ -3915,23 +3930,23 @@ var Request = /*#__PURE__*/function (_Emitter) {
if (this.hasXDR()) {
xhr.onload = function () {
self.onLoad();
_this5.onLoad();
};
xhr.onerror = function () {
self.onError(xhr.responseText);
_this5.onError(xhr.responseText);
};
} else {
xhr.onreadystatechange = function () {
if (4 !== xhr.readyState) return;
if (200 === xhr.status || 1223 === xhr.status) {
self.onLoad();
_this5.onLoad();
} else {
// make sure the `error` event handler that's user-set
// does not throw in the same tick and gets caught here
setTimeout(function () {
self.onError(typeof xhr.status === "number" ? xhr.status : 0);
_this5.onError(typeof xhr.status === "number" ? xhr.status : 0);
}, 0);
}
};
@@ -3944,7 +3959,7 @@ var Request = /*#__PURE__*/function (_Emitter) {
// and thus the 'error' event can only be only bound *after* this exception
// occurs. Therefore, also, we cannot throw here at all.
setTimeout(function () {
self.onError(e);
_this5.onError(e);
}, 0);
return;
}
@@ -4167,14 +4182,15 @@ var Polling = /*#__PURE__*/function (_Transport) {
}, {
key: "pause",
value: function pause(onPause) {
var self = this;
var _this = this;
this.readyState = "pausing";
function pause() {
var pause = function pause() {
debug("paused");
self.readyState = "paused";
_this.readyState = "paused";
onPause();
}
};
if (this.polling || !this.writable) {
var total = 0;
@@ -4223,23 +4239,25 @@ var Polling = /*#__PURE__*/function (_Transport) {
}, {
key: "onData",
value: function onData(data) {
var self = this;
var _this2 = this;
debug("polling got data %s", data);
var callback = function callback(packet, index, total) {
var callback = function callback(packet) {
// if its the first message we consider the transport open
if ("opening" === self.readyState && packet.type === "open") {
self.onOpen();
if ("opening" === _this2.readyState && packet.type === "open") {
_this2.onOpen();
} // if its a close packet, we close the ongoing requests
if ("close" === packet.type) {
self.onClose();
_this2.onClose();
return false;
} // otherwise bypass onData and handle the message
self.onPacket(packet);
_this2.onPacket(packet);
}; // decode payload
@@ -4266,14 +4284,15 @@ var Polling = /*#__PURE__*/function (_Transport) {
}, {
key: "doClose",
value: function doClose() {
var self = this;
var _this3 = this;
function close() {
var close = function close() {
debug("writing close packet");
self.write([{
_this3.write([{
type: "close"
}]);
}
};
if ("open" === this.readyState) {
debug("transport open - closing");
@@ -4296,14 +4315,14 @@ var Polling = /*#__PURE__*/function (_Transport) {
}, {
key: "write",
value: function write(packets) {
var _this = this;
var _this4 = this;
this.writable = false;
parser.encodePayload(packets, function (data) {
_this.doWrite(data, function () {
_this.writable = true;
_this4.doWrite(data, function () {
_this4.writable = true;
_this.emit("drain");
_this4.emit("drain");
});
});
}
@@ -4525,61 +4544,60 @@ var WS = /*#__PURE__*/function (_Transport) {
}, {
key: "write",
value: function write(packets) {
var self = this;
var _this3 = this;
this.writable = false; // encodePacket efficient as it uses WS framing
// no need for encodePayload
var total = packets.length;
var i = 0;
var l = total;
var _loop = function _loop(i) {
var packet = packets[i];
var lastPacket = i === packets.length - 1;
parser.encodePacket(packet, _this3.supportsBinary, function (data) {
// always create a new object (GH-437)
var opts = {};
for (; i < l; i++) {
(function (packet) {
parser.encodePacket(packet, self.supportsBinary, function (data) {
// always create a new object (GH-437)
var opts = {};
if (!usingBrowserWebSocket) {
if (packet.options) {
opts.compress = packet.options.compress;
}
if (self.opts.perMessageDeflate) {
var len = "string" === typeof data ? Buffer.byteLength(data) : data.length;
if (len < self.opts.perMessageDeflate.threshold) {
opts.compress = false;
}
}
} // Sometimes the websocket has already been closed but the browser didn't
// have a chance of informing us about it yet, in that case send will
// throw an error
try {
if (usingBrowserWebSocket) {
// TypeError is thrown when passing the second argument on Safari
self.ws.send(data);
} else {
self.ws.send(data, opts);
}
} catch (e) {
debug("websocket closed before onclose event");
if (!usingBrowserWebSocket) {
if (packet.options) {
opts.compress = packet.options.compress;
}
--total || done();
});
})(packets[i]);
}
if (_this3.opts.perMessageDeflate) {
var len = "string" === typeof data ? Buffer.byteLength(data) : data.length;
function done() {
self.emit("flush"); // fake drain
// defer to next tick to allow Socket to clear writeBuffer
if (len < _this3.opts.perMessageDeflate.threshold) {
opts.compress = false;
}
}
} // Sometimes the websocket has already been closed but the browser didn't
// have a chance of informing us about it yet, in that case send will
// throw an error
setTimeout(function () {
self.writable = true;
self.emit("drain");
}, 0);
try {
if (usingBrowserWebSocket) {
// TypeError is thrown when passing the second argument on Safari
_this3.ws.send(data);
} else {
_this3.ws.send(data, opts);
}
} catch (e) {
debug("websocket closed before onclose event");
}
if (lastPacket) {
// fake drain
// defer to next tick to allow Socket to clear writeBuffer
setTimeout(function () {
_this3.writable = true;
_this3.emit("drain");
}, 0);
}
});
};
for (var i = 0; i < packets.length; i++) {
_loop(i);
}
}
/**

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -10,18 +10,31 @@ import type { SocketId } from "socket.io-adapter";
const debug = debugModule("socket.io:client");
interface WriteOptions {
compress?: boolean;
volatile?: boolean;
wsPreEncoded?: string;
}
export class Client<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
> {
public readonly conn;
private readonly id: string;
private readonly server: Server<ListenEvents, EmitEvents>;
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly encoder: Encoder;
private readonly decoder: Decoder;
private sockets: Map<SocketId, Socket<ListenEvents, EmitEvents>> = new Map();
private nsps: Map<string, Socket<ListenEvents, EmitEvents>> = new Map();
private sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private nsps: Map<
string,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private connectTimeout?: NodeJS.Timeout;
/**
@@ -31,7 +44,10 @@ export class Client<
* @param conn
* @package
*/
constructor(server: Server<ListenEvents, EmitEvents>, conn: any) {
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
conn: any
) {
this.server = server;
this.conn = conn;
this.encoder = server.encoder;
@@ -92,7 +108,11 @@ export class Client<
this.server._checkNamespace(
name,
auth,
(dynamicNspName: Namespace<ListenEvents, EmitEvents> | false) => {
(
dynamicNspName:
| Namespace<ListenEvents, EmitEvents, ServerSideEvents>
| false
) => {
if (dynamicNspName) {
debug("dynamic namespace %s was created", dynamicNspName);
this.doConnect(name, auth);
@@ -150,7 +170,7 @@ export class Client<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents>): void {
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
if (this.sockets.has(socket.id)) {
const nsp = this.sockets.get(socket.id)!.nsp.name;
this.sockets.delete(socket.id);
@@ -180,31 +200,28 @@ export class Client<
* @param {Object} opts
* @private
*/
_packet(packet: Packet, opts?: any): void {
opts = opts || {};
const self = this;
// this writes to the actual connection
function writeToEngine(encodedPackets: any) {
// TODO clarify this.
if (opts.volatile && !self.conn.transport.writable) return;
for (let i = 0; i < encodedPackets.length; i++) {
self.conn.write(encodedPackets[i], { compress: opts.compress });
}
}
if ("open" === this.conn.readyState) {
debug("writing packet %j", packet);
if (!opts.preEncoded) {
// not broadcasting, need to encode
writeToEngine(this.encoder.encode(packet)); // encode, then write results to engine
} else {
// a broadcast pre-encodes a packet
writeToEngine(packet);
}
} else {
_packet(packet: Packet, opts: WriteOptions = {}): void {
if (this.conn.readyState !== "open") {
debug("ignoring packet write %j", packet);
return;
}
const encodedPackets = this.encoder.encode(packet);
for (const encodedPacket of encodedPackets) {
this.writeToEngine(encodedPacket, opts);
}
}
private writeToEngine(
encodedPacket: String | Buffer,
opts: WriteOptions
): void {
if (opts.volatile && !this.conn.transport.writable) {
debug(
"volatile packet is discarded since the transport is not currently writable"
);
return;
}
this.conn.write(encodedPacket, opts);
}
/**

View File

@@ -7,11 +7,7 @@ import path = require("path");
import engine = require("engine.io");
import { Client } from "./client";
import { EventEmitter } from "events";
import {
ExtendedError,
Namespace,
NamespaceReservedEventsMap,
} from "./namespace";
import { ExtendedError, Namespace, ServerReservedEventsMap } from "./namespace";
import { ParentNamespace } from "./parent-namespace";
import { Adapter, Room, SocketId } from "socket.io-adapter";
import * as parser from "socket.io-parser";
@@ -26,6 +22,7 @@ import {
DefaultEventsMap,
EventParams,
StrictEventEmitter,
EventNames,
} from "./typed-events";
const debug = debugModule("socket.io:server");
@@ -170,13 +167,18 @@ interface ServerOptions extends EngineAttachOptions {
export class Server<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends StrictEventEmitter<
{},
ServerSideEvents,
EmitEvents,
NamespaceReservedEventsMap<ListenEvents, EmitEvents>
ServerReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
> {
public readonly sockets: Namespace<ListenEvents, EmitEvents>;
public readonly sockets: Namespace<
ListenEvents,
EmitEvents,
ServerSideEvents
>;
/**
* A reference to the underlying Engine.IO server.
*
@@ -197,10 +199,13 @@ export class Server<
/**
* @private
*/
_nsps: Map<string, Namespace<ListenEvents, EmitEvents>> = new Map();
_nsps: Map<
string,
Namespace<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private parentNsps: Map<
ParentNspNameMatchFn,
ParentNamespace<ListenEvents, EmitEvents>
ParentNamespace<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private _adapter?: typeof Adapter;
private _serveClient: boolean;
@@ -280,7 +285,9 @@ export class Server<
_checkNamespace(
name: string,
auth: { [key: string]: any },
fn: (nsp: Namespace<ListenEvents, EmitEvents> | false) => void
fn: (
nsp: Namespace<ListenEvents, EmitEvents, ServerSideEvents> | false
) => void
): void {
if (this.parentNsps.size === 0) return fn(false);
@@ -295,7 +302,12 @@ export class Server<
if (err || !allow) {
run();
} else {
fn(this.parentNsps.get(nextFn.value)!.createChild(name));
const namespace = this.parentNsps
.get(nextFn.value)!
.createChild(name);
// @ts-ignore
this.sockets.emitReserved("new_namespace", namespace);
fn(namespace);
}
});
};
@@ -589,8 +601,8 @@ export class Server<
*/
public of(
name: string | RegExp | ParentNspNameMatchFn,
fn?: (socket: Socket<ListenEvents, EmitEvents>) => void
): Namespace<ListenEvents, EmitEvents> {
fn?: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
if (typeof name === "function" || name instanceof RegExp) {
const parentNsp = new ParentNamespace(this);
debug("initializing parent namespace %s", parentNsp.name);
@@ -616,6 +628,10 @@ export class Server<
debug("initializing namespace %s", name);
nsp = new Namespace(this, name);
this._nsps.set(name, nsp);
if (name !== "/") {
// @ts-ignore
this.sockets.emitReserved("new_namespace", nsp);
}
}
if (fn) nsp.on("connect", fn);
return nsp;
@@ -649,7 +665,7 @@ export class Server<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
next: (err?: ExtendedError) => void
) => void
): this {
@@ -686,7 +702,9 @@ export class Server<
* @return self
* @public
*/
public except(name: Room | Room[]): Server<ListenEvents, EmitEvents> {
public except(
name: Room | Room[]
): Server<ListenEvents, EmitEvents, ServerSideEvents> {
this.sockets.except(name);
return this;
}
@@ -713,6 +731,20 @@ export class Server<
return this;
}
/**
* Emit a packet to other Socket.IO servers
*
* @param ev - the event name
* @param args - an array of arguments, which may include an acknowledgement callback at the end
* @public
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): boolean {
return this.sockets.serverSideEmit(ev, ...args);
}
/**
* Gets a list of socket ids.
*

View File

@@ -20,35 +20,57 @@ export interface ExtendedError extends Error {
export interface NamespaceReservedEventsMap<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
> {
connect: (socket: Socket<ListenEvents, EmitEvents>) => void;
connection: (socket: Socket<ListenEvents, EmitEvents>) => void;
connect: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void;
connection: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>
) => void;
}
export interface ServerReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents
> extends NamespaceReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents
> {
new_namespace: (
namespace: Namespace<ListenEvents, EmitEvents, ServerSideEvents>
) => void;
}
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
keyof ServerReservedEventsMap<never, never, never>
>(<const>["connect", "connection", "new_namespace"]);
export class Namespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends StrictEventEmitter<
{},
ServerSideEvents,
EmitEvents,
NamespaceReservedEventsMap<ListenEvents, EmitEvents>
NamespaceReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
> {
public readonly name: string;
public readonly sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
public adapter: Adapter;
/** @private */
readonly server: Server<ListenEvents, EmitEvents>;
readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
/** @private */
_fns: Array<
(
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
next: (err?: ExtendedError) => void
) => void
> = [];
@@ -62,7 +84,10 @@ export class Namespace<
* @param server instance
* @param name
*/
constructor(server: Server<ListenEvents, EmitEvents>, name: string) {
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
name: string
) {
super();
this.server = server;
this.name = name;
@@ -88,7 +113,7 @@ export class Namespace<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
next: (err?: ExtendedError) => void
) => void
): this {
@@ -104,7 +129,7 @@ export class Namespace<
* @private
*/
private run(
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
fn: (err: ExtendedError | null) => void
) {
const fns = this._fns.slice(0);
@@ -166,10 +191,10 @@ export class Namespace<
* @private
*/
_add(
client: Client<ListenEvents, EmitEvents>,
client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
query,
fn?: () => void
): Socket<ListenEvents, EmitEvents> {
): Socket<ListenEvents, EmitEvents, ServerSideEvents> {
debug("adding socket to nsp %s", this.name);
const socket = new Socket(this, client, query);
this.run(socket, (err) => {
@@ -212,7 +237,7 @@ export class Namespace<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents>): void {
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
if (this.sockets.has(socket.id)) {
this.sockets.delete(socket.id);
} else {
@@ -255,6 +280,37 @@ export class Namespace<
return this;
}
/**
* Emit a packet to other Socket.IO servers
*
* @param ev - the event name
* @param args - an array of arguments, which may include an acknowledgement callback at the end
* @public
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): boolean {
if (RESERVED_EVENTS.has(ev)) {
throw new Error(`"${ev}" is a reserved event name`);
}
args.unshift(ev);
this.adapter.serverSideEmit(args);
return true;
}
/**
* Called when a packet is received from another Socket.IO server
*
* @param args - an array of arguments, which may include an acknowledgement callback at the end
*
* @private
*/
_onServerSideEmit(args: any[]) {
const event = args.shift();
this.emitUntyped(event, args);
}
/**
* Gets a list of clients.
*

View File

@@ -10,12 +10,15 @@ import type { BroadcastOptions } from "socket.io-adapter";
export class ParentNamespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
> extends Namespace<ListenEvents, EmitEvents> {
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
private static count: number = 0;
private children: Set<Namespace<ListenEvents, EmitEvents>> = new Set();
private children: Set<
Namespace<ListenEvents, EmitEvents, ServerSideEvents>
> = new Set();
constructor(server: Server<ListenEvents, EmitEvents>) {
constructor(server: Server<ListenEvents, EmitEvents, ServerSideEvents>) {
super(server, "/_" + ParentNamespace.count++);
}
@@ -43,7 +46,9 @@ export class ParentNamespace<
return true;
}
createChild(name: string): Namespace<ListenEvents, EmitEvents> {
createChild(
name: string
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
const namespace = new Namespace(this.server, name);
namespace._fns = this._fns.slice(0);
this.listeners("connect").forEach((listener) =>

View File

@@ -46,7 +46,7 @@ export interface EventEmitterReservedEventsMap {
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
| ClientReservedEvents
| keyof NamespaceReservedEventsMap<never, never>
| keyof NamespaceReservedEventsMap<never, never, never>
| keyof SocketReservedEventsMap
| keyof EventEmitterReservedEventsMap
>(<const>[
@@ -110,7 +110,8 @@ export interface Handshake {
export class Socket<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends StrictEventEmitter<
ListenEvents,
EmitEvents,
@@ -126,7 +127,7 @@ export class Socket<
public connected: boolean;
public disconnected: boolean;
private readonly server: Server<ListenEvents, EmitEvents>;
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly adapter: Adapter;
private acks: Map<number, () => void> = new Map();
private fns: Array<
@@ -144,8 +145,8 @@ export class Socket<
* @package
*/
constructor(
readonly nsp: Namespace<ListenEvents, EmitEvents>,
readonly client: Client<ListenEvents, EmitEvents>,
readonly nsp: Namespace<ListenEvents, EmitEvents, ServerSideEvents>,
readonly client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
auth: object
) {
super();

28
package-lock.json generated
View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "4.0.2",
"version": "4.1.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -1206,9 +1206,9 @@
}
},
"engine.io": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/engine.io/-/engine.io-5.0.0.tgz",
"integrity": "sha512-BATIdDV3H1SrE9/u2BAotvsmjJg0t1P4+vGedImSs1lkFAtQdvk4Ev1y4LDiPF7BPWgXWEG+NDY+nLvW3UrMWw==",
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/engine.io/-/engine.io-5.1.0.tgz",
"integrity": "sha512-A2i4kVvOA3qezQLlMz+FayGFdqOo0LP3fYrb0VqXMDXKoXcbgM0KxcEYnsdVzOMJQErIAb1GIStRj7UWFoiqlQ==",
"requires": {
"accepts": "~1.3.4",
"base64id": "2.0.0",
@@ -1220,9 +1220,9 @@
}
},
"engine.io-client": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-5.0.1.tgz",
"integrity": "sha512-CQtGN3YwfvbxVwpPugcsHe5rHT4KgT49CEcQppNtu9N7WxbPN0MAG27lGaem7bvtCFtGNLSL+GEqXsFSz36jTg==",
"version": "5.1.1",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-5.1.1.tgz",
"integrity": "sha512-jPFpw2HLL0lhZ2KY0BpZhIJdleQcUO9W1xkIpo0h3d6s+5D6+EV/xgQw9qWOmymszv2WXef/6KUUehyxEKomlQ==",
"dev": true,
"requires": {
"base64-arraybuffer": "0.1.4",
@@ -3330,21 +3330,21 @@
}
},
"socket.io-adapter": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.2.0.tgz",
"integrity": "sha512-rG49L+FwaVEwuAdeBRq49M97YI3ElVabJPzvHT9S6a2CWhDKnjSFasvwAwSYPRhQzfn4NtDIbCaGYgOCOU/rlg=="
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.3.0.tgz",
"integrity": "sha512-jdIbSFRWOkaZpo5mXy8T7rXEN6qo3bOFuq4nVeX1ZS7AtFlkbk39y153xTXEIW7W94vZfhVOux1wTU88YxcM1w=="
},
"socket.io-client": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.0.2.tgz",
"integrity": "sha512-z6Hw9Cs+cc7BEkSPlDrtHFpAI++xXMklG9iEEyPJdK4WcFcVrhrXcczYVDZmV3GIpFed5hL3LEXhpnmMy8DqDg==",
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.1.0.tgz",
"integrity": "sha512-L0lCCPTb5xz+KQ3Wrq0077XjJwpaYpjagRvqE5Sg9aXWekfrEqPFvICCUWs7pJqPv7QVN09KoaPKPkVOOetmbw==",
"dev": true,
"requires": {
"@types/component-emitter": "^1.2.10",
"backo2": "~1.0.2",
"component-emitter": "~1.3.0",
"debug": "~4.3.1",
"engine.io-client": "~5.0.0",
"engine.io-client": "~5.1.1",
"parseuri": "0.0.6",
"socket.io-parser": "~4.0.4"
},

View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "4.0.2",
"version": "4.1.0",
"description": "node.js realtime framework server",
"keywords": [
"realtime",
@@ -51,8 +51,8 @@
"accepts": "~1.3.4",
"base64id": "~2.0.0",
"debug": "~4.3.1",
"engine.io": "~5.0.0",
"socket.io-adapter": "~2.2.0",
"engine.io": "~5.1.0",
"socket.io-adapter": "~2.3.0",
"socket.io-parser": "~4.0.3"
},
"devDependencies": {
@@ -65,7 +65,7 @@
"nyc": "^15.1.0",
"prettier": "^2.2.0",
"rimraf": "^3.0.2",
"socket.io-client": "4.0.2",
"socket.io-client": "4.1.0",
"socket.io-client-v2": "npm:socket.io-client@^2.4.0",
"superagent": "^6.1.0",
"supertest": "^6.0.1",

View File

@@ -229,4 +229,48 @@ describe("server", () => {
});
});
});
describe("listen and emit event maps", () => {
interface ClientToServerEvents {
helloFromClient: (message: string) => void;
}
interface ServerToClientEvents {
helloFromServer: (message: string, x: number) => void;
}
interface InterServerEvents {
helloFromServerToServer: (message: string, x: number) => void;
}
describe("on", () => {
it("infers correct types for listener parameters", () => {
const srv = createServer();
const sio = new Server<
ClientToServerEvents,
ServerToClientEvents,
InterServerEvents
>(srv);
expectType<
Server<ClientToServerEvents, ServerToClientEvents, InterServerEvents>
>(sio);
srv.listen(() => {
sio.serverSideEmit("helloFromServerToServer", "hello", 10);
sio
.of("/test")
.serverSideEmit("helloFromServerToServer", "hello", 10);
sio.on("helloFromServerToServer", (message, x) => {
expectType<string>(message);
expectType<number>(x);
});
sio.of("/test").on("helloFromServerToServer", (message, x) => {
expectType<string>(message);
expectType<number>(x);
});
});
});
});
});
});

View File

@@ -812,7 +812,7 @@ describe("socket.io", () => {
});
});
it("should close a client without namespace", (done) => {
it("should close a client without namespace (2)", (done) => {
const srv = createServer();
const sio = new Server(srv, {
connectTimeout: 100,
@@ -886,6 +886,17 @@ describe("socket.io", () => {
});
});
it("should emit an 'new_namespace' event", (done) => {
const sio = new Server();
sio.on("new_namespace", (namespace) => {
expect(namespace.name).to.eql("/nsp");
done();
});
sio.of("/nsp");
});
describe("dynamic namespaces", () => {
it("should allow connections to dynamic namespaces with a regex", (done) => {
const srv = createServer();
@@ -942,6 +953,24 @@ describe("socket.io", () => {
});
});
});
it("should emit an 'new_namespace' event for a dynamic namespace", (done) => {
const srv = createServer();
const sio = new Server(srv);
srv.listen(() => {
sio.of(/^\/dynamic-\d+$/);
sio.on("new_namespace", (namespace) => {
expect(namespace.name).to.be("/dynamic-101");
socket.disconnect();
srv.close();
done();
});
const socket = client(srv, "/dynamic-101");
});
});
});
});
@@ -955,7 +984,9 @@ describe("socket.io", () => {
clientSocket.off("connect", init);
clientSocket.io.engine.close();
clientSocket.connect();
process.nextTick(() => {
clientSocket.connect();
});
clientSocket.on("connect", () => {
done();
});
@@ -2385,6 +2416,28 @@ describe("socket.io", () => {
});
});
});
it("should pre encode a broadcast packet", (done) => {
const srv = createServer();
const sio = new Server(srv);
srv.listen(() => {
const clientSocket = client(srv, { multiplex: false });
sio.on("connection", (socket) => {
socket.conn.on("packetCreate", (packet) => {
expect(packet.data).to.eql('2["hello","world"]');
expect(packet.options.wsPreEncoded).to.eql('42["hello","world"]');
clientSocket.close();
sio.close();
done();
});
sio.emit("hello", "world");
});
});
});
});
describe("middleware", () => {