Compare commits

..

13 Commits
4.3.0 ... 4.4.0

Author SHA1 Message Date
Damien Arrachequesne
0f11c4745f chore(release): 4.4.0
Diff: https://github.com/socketio/socket.io/compare/4.3.2...4.4.0
2021-11-18 14:10:19 +01:00
Damien Arrachequesne
b839a3b400 fix: prevent double ack when emitting with a timeout
The ack was not properly removed upon timeout, and could be called
twice.

Related: f0ed42f18c
2021-11-18 14:03:07 +01:00
Damien Arrachequesne
f0ed42f18c feat: add timeout feature
Usage:

```js
socket.timeout(5000).emit("my-event", (err) => {
  if (err) {
    // the client did not acknowledge the event in the given delay
  }
});
```
2021-11-16 20:07:53 +01:00
Damien Arrachequesne
b7213e71e4 test: fix flaky test
`srv.close()` only closes the underlying HTTP server, but this does not
terminate the existing WebSocket connections.

Reference: https://nodejs.org/api/http.html#serverclosecallback
2021-11-16 15:58:55 +01:00
Damien Arrachequesne
2da82103d2 test: add test for volatile packet with binary
See also: 88eee5948a
2021-11-16 15:57:32 +01:00
Damien Arrachequesne
02b0f73e2c fix: only set 'connected' to true after middleware execution
The Socket instance is only considered connected when the "connection"
event is emitted, and not during the middleware(s) execution.

```js
io.use((socket, next) => {
  console.log(socket.connected); // prints "false"
  next();
});

io.on("connection", (socket) => {
  console.log(socket.connected); // prints "true"
});
```

Related: https://github.com/socketio/socket.io/issues/4129
2021-11-12 07:31:52 +01:00
Damien Arrachequesne
c0d8c5ab23 feat: add an implementation based on uWebSockets.js
Usage:

```js
const { App } = require("uWebSockets.js");
const { Server } = require("socket.io");

const app = new App();
const server = new Server();

server.attachApp(app);

app.listen(3000);
```

The Adapter prototype is updated so we can benefit from the publish
functionality of uWebSockets.js, so this will apply to all adapters
extending the default adapter.

Reference: https://github.com/uNetworking/uWebSockets.js

Related:

- https://github.com/socketio/socket.io/issues/3601
- https://github.com/socketio/engine.io/issues/578
2021-11-12 07:01:55 +01:00
Nikita Kolmogorov
fe8730ca0f feat: add type information to socket.data (#4159)
Usage:

```js
interface SocketData {
  name: string;
  age: number;
}

const io = new Server<ClientToServerEvents, ServerToClientEvents, InterServerEvents, SocketData>();

io.on("connection", (socket) => {
  socket.data.name = "john";
  socket.data.age = 42;
});
```
2021-11-08 15:21:48 +01:00
Damien Arrachequesne
ed8483da4d chore(release): 4.3.2
Diff: https://github.com/socketio/socket.io/compare/4.3.1...4.3.2
2021-11-08 06:39:20 +01:00
Sebastiaan Marynissen
9d86397243 fix: fix race condition in dynamic namespaces (#4137)
Using an async operation with `io.use()` could lead to the creation of
several instances of a same namespace, each of them overriding the
previous one.

Example:

```js
io.use(async (nsp, auth, next) => {
  await anOperationThatTakesSomeTime();
  next();
});
```

Related: https://github.com/socketio/socket.io/pull/4136
2021-10-24 07:46:29 +02:00
Naseem
44e20ba5bf refactor: add event type for use() (#4138) 2021-10-24 07:19:43 +02:00
Damien Arrachequesne
ccc5ec39a8 chore(release): 4.3.1
Diff: https://github.com/socketio/socket.io/compare/4.3.0...4.3.1
2021-10-17 00:02:16 +02:00
Josh Field
0ef2a4d02c fix: fix server attachment (#4127)
The check excluded an HTTPS server from being properly attached.

Related: https://github.com/socketio/socket.io/issues/4124
2021-10-16 23:58:55 +02:00
21 changed files with 920 additions and 189 deletions

View File

@@ -1,3 +1,35 @@
# [4.4.0](https://github.com/socketio/socket.io/compare/4.3.2...4.4.0) (2021-11-18)
### Bug Fixes
* only set 'connected' to true after middleware execution ([02b0f73](https://github.com/socketio/socket.io/commit/02b0f73e2c64b09c72c5fbf7dc5f059557bdbe50))
### Features
* add an implementation based on uWebSockets.js ([c0d8c5a](https://github.com/socketio/socket.io/commit/c0d8c5ab234d0d2bef0d0dec472973cc9662f647))
* add timeout feature ([f0ed42f](https://github.com/socketio/socket.io/commit/f0ed42f18cabef20ad976aeec37077b6bf3837a5))
* add type information to `socket.data` ([#4159](https://github.com/socketio/socket.io/issues/4159)) ([fe8730c](https://github.com/socketio/socket.io/commit/fe8730ca0f15bc92d5de81cf934c89c76d6af329))
## [4.3.2](https://github.com/socketio/socket.io/compare/4.3.1...4.3.2) (2021-11-08)
### Bug Fixes
* fix race condition in dynamic namespaces ([#4137](https://github.com/socketio/socket.io/issues/4137)) ([9d86397](https://github.com/socketio/socket.io/commit/9d86397243bcbb5775a29d96e5ef03e17148a8e7))
## [4.3.1](https://github.com/socketio/socket.io/compare/4.3.0...4.3.1) (2021-10-16)
### Bug Fixes
* fix server attachment ([#4127](https://github.com/socketio/socket.io/issues/4127)) ([0ef2a4d](https://github.com/socketio/socket.io/commit/0ef2a4d02c9350aff163df9cb61aece89c4dac0f))
# [4.3.0](https://github.com/socketio/socket.io/compare/4.2.0...4.3.0) (2021-10-14)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,5 +1,5 @@
/*!
* Socket.IO v4.3.0
* Socket.IO v4.4.0
* (c) 2014-2021 Guillermo Rauch
* Released under the MIT License.
*/
@@ -3148,8 +3148,12 @@
packet.options.compress = this.flags.compress !== false; // event ack callback
if ("function" === typeof args[args.length - 1]) {
this.acks[this.ids] = args.pop();
packet.id = this.ids++;
var id = this.ids++;
var ack = args.pop();
this._registerAckCallback(id, ack);
packet.id = id;
}
var isTransportWritable = this.io.engine && this.io.engine.transport && this.io.engine.transport.writable;
@@ -3164,6 +3168,46 @@
this.flags = {};
return this;
}
/**
* @private
*/
}, {
key: "_registerAckCallback",
value: function _registerAckCallback(id, ack) {
var _this2 = this;
var timeout = this.flags.timeout;
if (timeout === undefined) {
this.acks[id] = ack;
return;
} // @ts-ignore
var timer = this.io.setTimeoutFn(function () {
delete _this2.acks[id];
for (var i = 0; i < _this2.sendBuffer.length; i++) {
if (_this2.sendBuffer[i].id === id) {
_this2.sendBuffer.splice(i, 1);
}
}
ack.call(_this2, new Error("operation has timed out"));
}, timeout);
this.acks[id] = function () {
// @ts-ignore
_this2.io.clearTimeoutFn(timer);
for (var _len3 = arguments.length, args = new Array(_len3), _key3 = 0; _key3 < _len3; _key3++) {
args[_key3] = arguments[_key3];
}
ack.apply(_this2, [null].concat(args));
};
}
/**
* Sends a packet.
*
@@ -3187,11 +3231,11 @@
}, {
key: "onopen",
value: function onopen() {
var _this2 = this;
var _this3 = this;
if (typeof this.auth == "function") {
this.auth(function (data) {
_this2.packet({
_this3.packet({
type: PacketType.CONNECT,
data: data
});
@@ -3277,6 +3321,7 @@
break;
case PacketType.CONNECT_ERROR:
this.destroy();
var err = new Error(packet.data.message); // @ts-ignore
err.data = packet.data.data;
@@ -3345,8 +3390,8 @@
if (sent) return;
sent = true;
for (var _len3 = arguments.length, args = new Array(_len3), _key3 = 0; _key3 < _len3; _key3++) {
args[_key3] = arguments[_key3];
for (var _len4 = arguments.length, args = new Array(_len4), _key4 = 0; _key4 < _len4; _key4++) {
args[_key4] = arguments[_key4];
}
self.packet({
@@ -3397,14 +3442,14 @@
}, {
key: "emitBuffered",
value: function emitBuffered() {
var _this3 = this;
var _this4 = this;
this.receiveBuffer.forEach(function (args) {
return _this3.emitEvent(args);
return _this4.emitEvent(args);
});
this.receiveBuffer = [];
this.sendBuffer.forEach(function (packet) {
return _this3.packet(packet);
return _this4.packet(packet);
});
this.sendBuffer = [];
}
@@ -3507,6 +3552,28 @@
this.flags["volatile"] = true;
return this;
}
/**
* Sets a modifier for a subsequent event emission that the callback will be called with an error when the
* given number of milliseconds have elapsed without an acknowledgement from the server:
*
* ```
* socket.timeout(5000).emit("my-event", (err) => {
* if (err) {
* // the server did not acknowledge the event in the given delay
* }
* });
* ```
*
* @returns self
* @public
*/
}, {
key: "timeout",
value: function timeout(_timeout) {
this.flags.timeout = _timeout;
return this;
}
/**
* Adds a listener that will be fired when any event is emitted. The event name is passed as the first argument to the
* callback.
@@ -4020,15 +4087,7 @@
value: function _close() {
this.skipReconnect = true;
this._reconnecting = false;
if ("opening" === this._readyState) {
// `onclose` will not fire because
// an open event never happened
this.cleanup();
}
this.backoff.reset();
this._readyState = "closed";
this.onclose("forced close");
if (this.engine) this.engine.close();
}
/**
@@ -4164,7 +4223,16 @@
}
return io.socket(parsed.path, opts);
}
} // so that "lookup" can be used both as a function (e.g. `io(...)`) and as a
// namespace (e.g. `io.connect(...)`), for backward compatibility
_extends(lookup, {
Manager: Manager,
Socket: Socket,
io: lookup,
connect: lookup
});
return lookup;

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -21,21 +21,27 @@ interface WriteOptions {
export class Client<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
ServerSideEvents extends EventsMap,
SocketData = any
> {
public readonly conn: RawSocket;
private readonly id: string;
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly server: Server<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>;
private readonly encoder: Encoder;
private readonly decoder: Decoder;
private sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Map();
private nsps: Map<
string,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Map();
private connectTimeout?: NodeJS.Timeout;
@@ -47,7 +53,7 @@ export class Client<
* @package
*/
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
server: Server<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
conn: any
) {
this.server = server;
@@ -112,11 +118,10 @@ export class Client<
auth,
(
dynamicNspName:
| Namespace<ListenEvents, EmitEvents, ServerSideEvents>
| Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
| false
) => {
if (dynamicNspName) {
debug("dynamic namespace %s was created", dynamicNspName);
this.doConnect(name, auth);
} else {
debug("creation of namespace %s was denied", name);
@@ -172,7 +177,9 @@ export class Client<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
_remove(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
): void {
if (this.sockets.has(socket.id)) {
const nsp = this.sockets.get(socket.id)!.nsp.name;
this.sockets.delete(socket.id);

View File

@@ -9,6 +9,7 @@ import {
Server as Engine,
ServerOptions as EngineOptions,
AttachOptions,
uServer,
} from "engine.io";
import { Client } from "./client";
import { EventEmitter } from "events";
@@ -27,6 +28,7 @@ import {
StrictEventEmitter,
EventNames,
} from "./typed-events";
import { patchAdapter, restoreAdapter, serveFile } from "./uws.js";
const debug = debugModule("socket.io:server");
@@ -72,16 +74,23 @@ interface ServerOptions extends EngineOptions, AttachOptions {
export class Server<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends StrictEventEmitter<
ServerSideEvents,
EmitEvents,
ServerReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
ServerReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>
> {
public readonly sockets: Namespace<
ListenEvents,
EmitEvents,
ServerSideEvents
ServerSideEvents,
SocketData
>;
/**
* A reference to the underlying Engine.IO server.
@@ -156,9 +165,7 @@ export class Server<
this.adapter(opts.adapter || Adapter);
this.sockets = this.of("/");
this.opts = opts;
if (srv instanceof http.Server || typeof srv === "number") {
this.attach(srv);
}
if (srv || typeof srv == "number") this.attach(srv as http.Server | number);
}
/**
@@ -204,15 +211,18 @@ export class Server<
}
nextFn.value(name, auth, (err, allow) => {
if (err || !allow) {
run();
} else {
const namespace = this.parentNsps
.get(nextFn.value)!
.createChild(name);
// @ts-ignore
this.sockets.emitReserved("new_namespace", namespace);
fn(namespace);
return run();
}
if (this._nsps.has(name)) {
// the namespace was created in the meantime
debug("dynamic namespace %s already exists", name);
return fn(this._nsps.get(name) as Namespace);
}
const namespace = this.parentNsps.get(nextFn.value)!.createChild(name);
debug("dynamic namespace %s was created", name);
// @ts-ignore
this.sockets.emitReserved("new_namespace", namespace);
fn(namespace);
});
};
@@ -336,6 +346,69 @@ export class Server<
return this;
}
public attachApp(app /*: TemplatedApp */, opts: Partial<ServerOptions> = {}) {
// merge the options passed to the Socket.IO server
Object.assign(opts, this.opts);
// set engine.io path to `/socket.io`
opts.path = opts.path || this._path;
// initialize engine
debug("creating uWebSockets.js-based engine with opts %j", opts);
const engine = new uServer(opts);
engine.attach(app, opts);
// bind to engine events
this.bind(engine);
if (this._serveClient) {
// attach static file serving
app.get(`${this._path}/*`, (res, req) => {
if (!this.clientPathRegex.test(req.getUrl())) {
req.setYield(true);
return;
}
const filename = req
.getUrl()
.replace(this._path, "")
.replace(/\?.*$/, "")
.replace(/^\//, "");
const isMap = dotMapRegex.test(filename);
const type = isMap ? "map" : "source";
// Per the standard, ETags must be quoted:
// https://tools.ietf.org/html/rfc7232#section-2.3
const expectedEtag = '"' + clientVersion + '"';
const weakEtag = "W/" + expectedEtag;
const etag = req.getHeader("if-none-match");
if (etag) {
if (expectedEtag === etag || weakEtag === etag) {
debug("serve client %s 304", type);
res.writeStatus("304 Not Modified");
res.end();
return;
}
}
debug("serve client %s", type);
res.writeHeader("cache-control", "public, max-age=0");
res.writeHeader(
"content-type",
"application/" + (isMap ? "json" : "javascript")
);
res.writeHeader("etag", expectedEtag);
const filepath = path.join(__dirname, "../client-dist/", filename);
serveFile(res, filepath);
});
}
patchAdapter(app);
}
/**
* Initialize engine
*
@@ -503,7 +576,9 @@ export class Server<
*/
public of(
name: string | RegExp | ParentNspNameMatchFn,
fn?: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void
fn?: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
if (typeof name === "function" || name instanceof RegExp) {
const parentNsp = new ParentNamespace(this);
@@ -552,6 +627,9 @@ export class Server<
this.engine.close();
// restore the Adapter prototype
restoreAdapter();
if (this.httpServer) {
this.httpServer.close(fn);
} else {
@@ -567,7 +645,7 @@ export class Server<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
next: (err?: ExtendedError) => void
) => void
): this {

View File

@@ -21,56 +21,72 @@ export interface ExtendedError extends Error {
export interface NamespaceReservedEventsMap<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
ServerSideEvents extends EventsMap,
SocketData
> {
connect: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void;
connect: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void;
connection: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void;
}
export interface ServerReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents
ServerSideEvents,
SocketData
> extends NamespaceReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents
ServerSideEvents,
SocketData
> {
new_namespace: (
namespace: Namespace<ListenEvents, EmitEvents, ServerSideEvents>
namespace: Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void;
}
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
keyof ServerReservedEventsMap<never, never, never>
keyof ServerReservedEventsMap<never, never, never, never>
>(<const>["connect", "connection", "new_namespace"]);
export class Namespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends StrictEventEmitter<
ServerSideEvents,
EmitEvents,
NamespaceReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
NamespaceReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>
> {
public readonly name: string;
public readonly sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Map();
public adapter: Adapter;
/** @private */
readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
readonly server: Server<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>;
/** @private */
_fns: Array<
(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
next: (err?: ExtendedError) => void
) => void
> = [];
@@ -85,7 +101,7 @@ export class Namespace<
* @param name
*/
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
server: Server<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
name: string
) {
super();
@@ -114,7 +130,7 @@ export class Namespace<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
next: (err?: ExtendedError) => void
) => void
): this {
@@ -130,7 +146,7 @@ export class Namespace<
* @private
*/
private run(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
fn: (err: ExtendedError | null) => void
) {
const fns = this._fns.slice(0);
@@ -195,7 +211,7 @@ export class Namespace<
client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
query,
fn?: () => void
): Socket<ListenEvents, EmitEvents, ServerSideEvents> {
): Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData> {
debug("adding socket to nsp %s", this.name);
const socket = new Socket(this, client, query);
this.run(socket, (err) => {
@@ -238,7 +254,9 @@ export class Namespace<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
_remove(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
): void {
if (this.sockets.has(socket.id)) {
this.sockets.delete(socket.id);
} else {

View File

@@ -11,13 +11,17 @@ import type { BroadcastOptions } from "socket.io-adapter";
export class ParentNamespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
> extends Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData> {
private static count: number = 0;
private children: Set<Namespace<ListenEvents, EmitEvents, ServerSideEvents>> =
new Set();
private children: Set<
Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Set();
constructor(server: Server<ListenEvents, EmitEvents, ServerSideEvents>) {
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) {
super(server, "/_" + ParentNamespace.count++);
}
@@ -47,7 +51,7 @@ export class ParentNamespace<
createChild(
name: string
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
): Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData> {
const namespace = new Namespace(this.server, name);
namespace._fns = this._fns.slice(0);
this.listeners("connect").forEach((listener) =>

View File

@@ -1,5 +1,4 @@
import { Packet, PacketType } from "socket.io-parser";
import url = require("url");
import debugModule from "debug";
import type { Server } from "./index";
import {
@@ -46,7 +45,7 @@ export interface EventEmitterReservedEventsMap {
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
| ClientReservedEvents
| keyof NamespaceReservedEventsMap<never, never, never>
| keyof NamespaceReservedEventsMap<never, never, never, never>
| keyof SocketReservedEventsMap
| keyof EventEmitterReservedEventsMap
>(<const>[
@@ -108,10 +107,13 @@ export interface Handshake {
auth: { [key: string]: any };
}
type Event = [eventName: string, ...args: any[]];
export class Socket<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends StrictEventEmitter<
ListenEvents,
EmitEvents,
@@ -122,17 +124,20 @@ export class Socket<
/**
* Additional information that can be attached to the Socket instance and which will be used in the fetchSockets method
*/
public data: any = {};
public data: Partial<SocketData> = {};
public connected: boolean;
public disconnected: boolean;
public connected: boolean = false;
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly server: Server<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>;
private readonly adapter: Adapter;
private acks: Map<number, () => void> = new Map();
private fns: Array<(event: Array<any>, next: (err?: Error) => void) => void> =
[];
private flags: BroadcastFlags = {};
private fns: Array<(event: Event, next: (err?: Error) => void) => void> = [];
private flags: BroadcastFlags & { timeout?: number } = {};
private _anyListeners?: Array<(...args: any[]) => void>;
/**
@@ -157,8 +162,6 @@ export class Socket<
} else {
this.id = base64id.generateId(); // don't reuse the Engine.IO id because it's sensitive information
}
this.connected = true;
this.disconnected = false;
this.handshake = this.buildHandshake(auth);
}
@@ -177,7 +180,8 @@ export class Socket<
secure: !!this.request.connection.encrypted,
issued: +new Date(),
url: this.request.url!,
query: url.parse(this.request.url!, true).query,
// @ts-ignore
query: this.request._query,
auth,
};
}
@@ -203,9 +207,11 @@ export class Socket<
// access last argument to see if it's an ACK callback
if (typeof data[data.length - 1] === "function") {
debug("emitting packet with ack id %d", this.nsp._ids);
this.acks.set(this.nsp._ids, data.pop());
packet.id = this.nsp._ids++;
const id = this.nsp._ids++;
debug("emitting packet with ack id %d", id);
this.registerAckCallback(id, data.pop());
packet.id = id;
}
const flags = Object.assign({}, this.flags);
@@ -216,6 +222,28 @@ export class Socket<
return true;
}
/**
* @private
*/
private registerAckCallback(id: number, ack: (...args: any[]) => void): void {
const timeout = this.flags.timeout;
if (timeout === undefined) {
this.acks.set(id, ack);
return;
}
const timer = setTimeout(() => {
debug("event with ack id %d has timed out after %d ms", id, timeout);
this.acks.delete(id);
ack.call(this, new Error("operation has timed out"));
}, timeout);
this.acks.set(id, (...args) => {
clearTimeout(timer);
ack.apply(this, [null, ...args]);
});
}
/**
* Targets a room when broadcasting.
*
@@ -335,6 +363,7 @@ export class Socket<
*/
_onconnect(): void {
debug("socket connected - writing packet");
this.connected = true;
this.join(this.id);
if (this.conn.protocol === 3) {
this.packet({ type: PacketType.CONNECT });
@@ -482,7 +511,6 @@ export class Socket<
this.nsp._remove(this);
this.client._remove(this);
this.connected = false;
this.disconnected = true;
this.emitReserved("disconnect", reason);
return;
}
@@ -563,13 +591,33 @@ export class Socket<
return this.newBroadcastOperator().local;
}
/**
* Sets a modifier for a subsequent event emission that the callback will be called with an error when the
* given number of milliseconds have elapsed without an acknowledgement from the client:
*
* ```
* socket.timeout(5000).emit("my-event", (err) => {
* if (err) {
* // the client did not acknowledge the event in the given delay
* }
* });
* ```
*
* @returns self
* @public
*/
public timeout(timeout: number): this {
this.flags.timeout = timeout;
return this;
}
/**
* Dispatch incoming event to socket listeners.
*
* @param {Array} event - event that will get emitted
* @private
*/
private dispatch(event: [eventName: string, ...args: any[]]): void {
private dispatch(event: Event): void {
debug("dispatching an event %j", event);
this.run(event, (err) => {
process.nextTick(() => {
@@ -592,9 +640,7 @@ export class Socket<
* @return {Socket} self
* @public
*/
public use(
fn: (event: Array<any>, next: (err?: Error) => void) => void
): this {
public use(fn: (event: Event, next: (err?: Error) => void) => void): this {
this.fns.push(fn);
return this;
}
@@ -606,10 +652,7 @@ export class Socket<
* @param {Function} fn - last fn call in the middleware
* @private
*/
private run(
event: [eventName: string, ...args: any[]],
fn: (err: Error | null) => void
): void {
private run(event: Event, fn: (err: Error | null) => void): void {
const fns = this.fns.slice(0);
if (!fns.length) return fn(null);
@@ -629,6 +672,13 @@ export class Socket<
run(0);
}
/**
* Whether the socket is currently disconnected
*/
public get disconnected() {
return !this.connected;
}
/**
* A reference to the request that originated the underlying Engine.IO Socket.
*

162
lib/uws.ts Normal file
View File

@@ -0,0 +1,162 @@
import { Adapter, Room } from "socket.io-adapter";
import type { WebSocket } from "uWebSockets.js";
import type { Socket } from "./socket.js";
import { createReadStream, statSync } from "fs";
import debugModule from "debug";
const debug = debugModule("socket.io:adapter-uws");
const SEPARATOR = "\x1f"; // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text
const { addAll, del, broadcast } = Adapter.prototype;
export function patchAdapter(app /* : TemplatedApp */) {
Adapter.prototype.addAll = function (id, rooms) {
const isNew = !this.sids.has(id);
addAll.call(this, id, rooms);
const socket: Socket = this.nsp.sockets.get(id);
if (!socket) {
return;
}
if (socket.conn.transport.name === "websocket") {
subscribe(this.nsp.name, socket, isNew, rooms);
return;
}
if (isNew) {
socket.conn.on("upgrade", () => {
const rooms = this.sids.get(id);
subscribe(this.nsp.name, socket, isNew, rooms);
});
}
};
Adapter.prototype.del = function (id, room) {
del.call(this, id, room);
const socket: Socket = this.nsp.sockets.get(id);
if (socket && socket.conn.transport.name === "websocket") {
// @ts-ignore
const sessionId = socket.conn.id;
// @ts-ignore
const websocket: WebSocket = socket.conn.transport.socket;
const topic = `${this.nsp.name}${SEPARATOR}${room}`;
debug("unsubscribe connection %s from topic %s", sessionId, topic);
websocket.unsubscribe(topic);
}
};
Adapter.prototype.broadcast = function (packet, opts) {
const useFastPublish = opts.rooms.size <= 1 && opts.except!.size === 0;
if (!useFastPublish) {
broadcast.call(this, packet, opts);
return;
}
const flags = opts.flags || {};
const basePacketOpts = {
preEncoded: true,
volatile: flags.volatile,
compress: flags.compress,
};
packet.nsp = this.nsp.name;
const encodedPackets = this.encoder.encode(packet);
const topic =
opts.rooms.size === 0
? this.nsp.name
: `${this.nsp.name}${SEPARATOR}${opts.rooms.keys().next().value}`;
debug("fast publish to %s", topic);
// fast publish for clients connected with WebSocket
encodedPackets.forEach((encodedPacket) => {
const isBinary = typeof encodedPacket !== "string";
// "4" being the message type in the Engine.IO protocol, see https://github.com/socketio/engine.io-protocol
app.publish(
topic,
isBinary ? encodedPacket : "4" + encodedPacket,
isBinary
);
});
this.apply(opts, (socket) => {
if (socket.conn.transport.name !== "websocket") {
// classic publish for clients connected with HTTP long-polling
socket.client.writeToEngine(encodedPackets, basePacketOpts);
}
});
};
}
function subscribe(
namespaceName: string,
socket: Socket,
isNew: boolean,
rooms: Set<Room>
) {
// @ts-ignore
const sessionId = socket.conn.id;
// @ts-ignore
const websocket: WebSocket = socket.conn.transport.socket;
if (isNew) {
debug("subscribe connection %s to topic %s", sessionId, namespaceName);
websocket.subscribe(namespaceName);
}
rooms.forEach((room) => {
const topic = `${namespaceName}${SEPARATOR}${room}`; // '#' can be used as wildcard
debug("subscribe connection %s to topic %s", sessionId, topic);
websocket.subscribe(topic);
});
}
export function restoreAdapter() {
Adapter.prototype.addAll = addAll;
Adapter.prototype.del = del;
Adapter.prototype.broadcast = broadcast;
}
const toArrayBuffer = (buffer: Buffer) => {
const { buffer: arrayBuffer, byteOffset, byteLength } = buffer;
return arrayBuffer.slice(byteOffset, byteOffset + byteLength);
};
// imported from https://github.com/kolodziejczak-sz/uwebsocket-serve
export function serveFile(res /* : HttpResponse */, filepath: string) {
const { size } = statSync(filepath);
const readStream = createReadStream(filepath);
const destroyReadStream = () => !readStream.destroyed && readStream.destroy();
const onError = (error: Error) => {
destroyReadStream();
throw error;
};
const onDataChunk = (chunk: Buffer) => {
const arrayBufferChunk = toArrayBuffer(chunk);
const lastOffset = res.getWriteOffset();
const [ok, done] = res.tryEnd(arrayBufferChunk, size);
if (!done && !ok) {
readStream.pause();
res.onWritable((offset) => {
const [ok, done] = res.tryEnd(
arrayBufferChunk.slice(offset - lastOffset),
size
);
if (!done && ok) {
readStream.resume();
}
return ok;
});
}
};
res.onAborted(destroyReadStream);
readStream
.on("data", onDataChunk)
.on("error", onError)
.on("end", destroyReadStream);
}

62
package-lock.json generated
View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "4.3.0",
"version": "4.4.0",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -442,9 +442,9 @@
"dev": true
},
"@types/node": {
"version": "16.7.6",
"resolved": "https://registry.npmjs.org/@types/node/-/node-16.7.6.tgz",
"integrity": "sha512-VESVNFoa/ahYA62xnLBjo5ur6gPsgEE5cNRy8SrdnkZ2nwJSW0kJ4ufbFr2zuU9ALtHM8juY53VcRoTA7htXSg=="
"version": "16.11.7",
"resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.7.tgz",
"integrity": "sha512-QB5D2sqfSjCmTuWcBWyJ+/44bcjO7VbjSbOE0ucoVbAsSNQc4Lt6QkgkVXkTDwkL4z/beecZNDvVX15D4P8Jbw=="
},
"@types/normalize-package-data": {
"version": "2.4.0",
@@ -888,9 +888,9 @@
"dev": true
},
"engine.io": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.0.0.tgz",
"integrity": "sha512-Ui7yl3JajEIaACg8MOUwWvuuwU7jepZqX3BKs1ho7NQRuP4LhN4XIykXhp8bEy+x/DhA0LBZZXYSCkZDqrwMMg==",
"version": "6.1.0",
"resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.1.0.tgz",
"integrity": "sha512-ErhZOVu2xweCjEfYcTdkCnEYUiZgkAcBBAhW4jbIvNG8SLU3orAqoJCiytZjYF7eTpVmmCrLDjLIEaPlUAs1uw==",
"requires": {
"@types/cookie": "^0.4.1",
"@types/cors": "^2.8.12",
@@ -904,19 +904,6 @@
"ws": "~8.2.3"
},
"dependencies": {
"base64-arraybuffer": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-1.0.1.tgz",
"integrity": "sha512-vFIUq7FdLtjZMhATwDul5RZWv2jpXQ09Pd6jcVEOvIsqCWTRFD/ONHNfyOS8dA/Ippi5dsIgpyKWKZaAKZltbA=="
},
"engine.io-parser": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.0.0.tgz",
"integrity": "sha512-wn6QavHEqXoM+cg+x8uUG7GhxLBCfKEKNEsCNc7V2ugj3gB3lK91l1MuZiy6xFB2V9D1eew0aWkmpiT/aBb/KA==",
"requires": {
"base64-arraybuffer": "~1.0.1"
}
},
"ws": {
"version": "8.2.3",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.2.3.tgz",
@@ -925,9 +912,9 @@
}
},
"engine.io-client": {
"version": "6.0.1",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.0.1.tgz",
"integrity": "sha512-/7xmDxU2i9aYmqz7ekseB2jUfUYvJ5s3+OJcEvObloQNB31wd+0j68kmLVAoZG7Qu0FzB+EN46hsc3bY/kjgSw==",
"version": "6.1.1",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.1.1.tgz",
"integrity": "sha512-V05mmDo4gjimYW+FGujoGmmmxRaDsrVr7AXA3ZIfa04MWM1jOfZfUwou0oNqhNwy/votUDvGDt4JA4QF4e0b4g==",
"dev": true,
"requires": {
"@socket.io/component-emitter": "~3.0.0",
@@ -956,10 +943,9 @@
}
},
"engine.io-parser": {
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.0.0.tgz",
"integrity": "sha512-wn6QavHEqXoM+cg+x8uUG7GhxLBCfKEKNEsCNc7V2ugj3gB3lK91l1MuZiy6xFB2V9D1eew0aWkmpiT/aBb/KA==",
"dev": true,
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.0.1.tgz",
"integrity": "sha512-j4p3WwJrG2k92VISM0op7wiq60vO92MlF3CRGxhKHy9ywG1/Dkc72g0dXeDQ+//hrcDn8gqQzoEkdO9FN0d9AA==",
"requires": {
"base64-arraybuffer": "~1.0.1"
},
@@ -967,8 +953,7 @@
"base64-arraybuffer": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-1.0.1.tgz",
"integrity": "sha512-vFIUq7FdLtjZMhATwDul5RZWv2jpXQ09Pd6jcVEOvIsqCWTRFD/ONHNfyOS8dA/Ippi5dsIgpyKWKZaAKZltbA==",
"dev": true
"integrity": "sha512-vFIUq7FdLtjZMhATwDul5RZWv2jpXQ09Pd6jcVEOvIsqCWTRFD/ONHNfyOS8dA/Ippi5dsIgpyKWKZaAKZltbA=="
}
}
},
@@ -2446,20 +2431,20 @@
"dev": true
},
"socket.io-adapter": {
"version": "2.3.2",
"resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.3.2.tgz",
"integrity": "sha512-PBZpxUPYjmoogY0aoaTmo1643JelsaS1CiAwNjRVdrI0X9Seuc19Y2Wife8k88avW6haG8cznvwbubAZwH4Mtg=="
"version": "2.3.3",
"resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.3.3.tgz",
"integrity": "sha512-Qd/iwn3VskrpNO60BeRyCyr8ZWw9CPZyitW4AQwmRZ8zCiyDiL+znRnWX6tDHXnWn1sJrM1+b6Mn6wEDJJ4aYQ=="
},
"socket.io-client": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.3.0.tgz",
"integrity": "sha512-Cg3N7UWGTT2kYFZ1yShbGyrYDcqiMMv53cLNvOvweGJojwZ3N7x4fB+fyfNaq64tRUjV0Qljyj7nMV8+myBgqg==",
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.4.0.tgz",
"integrity": "sha512-g7riSEJXi7qCFImPow98oT8X++MSsHz6MMFRXkWNJ6uEROSHOa3kxdrsYWMq85dO+09CFMkcqlpjvbVXQl4z6g==",
"dev": true,
"requires": {
"@socket.io/component-emitter": "~3.0.0",
"backo2": "~1.0.2",
"debug": "~4.3.2",
"engine.io-client": "~6.0.1",
"engine.io-client": "~6.1.1",
"parseuri": "0.0.6",
"socket.io-parser": "~4.1.1"
},
@@ -2849,6 +2834,11 @@
"integrity": "sha512-gzP+t5W4hdy4c+68bfcv0t400HVJMMd2+H9B7gae1nQlBzCqvrXX+6GL/b3GAgyTH966pzrZ70/fRjwAtZksSQ==",
"dev": true
},
"uWebSockets.js": {
"version": "github:uNetworking/uWebSockets.js#4558ee00f9f1f686fffe1accbfc2e85b1af9c50f",
"from": "github:uNetworking/uWebSockets.js#v20.0.0",
"dev": true
},
"util-deprecate": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz",

View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "4.3.0",
"version": "4.4.0",
"description": "node.js realtime framework server",
"keywords": [
"realtime",
@@ -48,8 +48,8 @@
"accepts": "~1.3.4",
"base64id": "~2.0.0",
"debug": "~4.3.2",
"engine.io": "~6.0.0",
"socket.io-adapter": "~2.3.2",
"engine.io": "~6.1.0",
"socket.io-adapter": "~2.3.3",
"socket.io-parser": "~4.0.4"
},
"devDependencies": {
@@ -59,13 +59,14 @@
"nyc": "^15.1.0",
"prettier": "^2.3.2",
"rimraf": "^3.0.2",
"socket.io-client": "4.3.0",
"socket.io-client": "4.4.0",
"socket.io-client-v2": "npm:socket.io-client@^2.4.0",
"superagent": "^6.1.0",
"supertest": "^6.1.6",
"ts-node": "^10.2.1",
"tsd": "^0.17.0",
"typescript": "^4.4.2"
"typescript": "^4.4.2",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.0.0"
},
"contributors": [
{

57
test/socket-timeout.ts Normal file
View File

@@ -0,0 +1,57 @@
import { Server } from "..";
import { createClient, success } from "./support/util";
import expect from "expect.js";
describe("timeout", () => {
it("should timeout if the client does not acknowledge the event", (done) => {
const io = new Server(0);
const client = createClient(io, "/");
io.on("connection", (socket) => {
socket.timeout(50).emit("unknown", (err) => {
expect(err).to.be.an(Error);
success(done, io, client);
});
});
});
it("should timeout if the client does not acknowledge the event in time", (done) => {
const io = new Server(0);
const client = createClient(io, "/");
client.on("echo", (arg, cb) => {
cb(arg);
});
let count = 0;
io.on("connection", (socket) => {
socket.timeout(0).emit("echo", 42, (err) => {
expect(err).to.be.an(Error);
count++;
});
});
setTimeout(() => {
expect(count).to.eql(1);
success(done, io, client);
}, 200);
});
it("should not timeout if the client does acknowledge the event", (done) => {
const io = new Server(0);
const client = createClient(io, "/");
client.on("echo", (arg, cb) => {
cb(arg);
});
io.on("connection", (socket) => {
socket.timeout(50).emit("echo", 42, (err, value) => {
expect(err).to.be(null);
expect(value).to.be(42);
success(done, io, client);
});
});
});
});

View File

@@ -14,6 +14,9 @@ import { io as ioc, Socket as ClientSocket } from "socket.io-client";
import "./support/util";
import "./utility-methods";
import "./uws";
type callback = (err: Error | null, success: boolean) => void;
// Creates a socket.io client for the given server
function client(srv, nsp?: string | object, opts?: object): ClientSocket {
@@ -818,29 +821,6 @@ describe("socket.io", () => {
});
});
it("should close a client without namespace (2)", (done) => {
const srv = createServer();
const sio = new Server(srv, {
connectTimeout: 100,
});
sio.use((_, next) => {
next(new Error("nope"));
});
srv.listen(() => {
const socket = client(srv);
const success = () => {
socket.close();
sio.close();
done();
};
socket.on("disconnect", success);
});
});
it("should exclude a specific socket when emitting", (done) => {
const srv = createServer();
const io = new Server(srv);
@@ -998,6 +978,46 @@ describe("socket.io", () => {
const socket = client(srv, "/dynamic-101");
});
});
it("should handle race conditions with dynamic namespaces (#4136)", (done) => {
const srv = createServer();
const sio = new Server(srv);
const counters = {
connected: 0,
created: 0,
events: 0,
};
const buffer: callback[] = [];
sio.on("new_namespace", (namespace) => {
counters.created++;
});
srv.listen(() => {
const handler = () => {
if (++counters.events === 2) {
expect(counters.created).to.equal(1);
done();
}
};
sio
.of((name, query, next) => {
buffer.push(next);
if (buffer.length === 2) {
buffer.forEach((next) => next(null, true));
}
})
.on("connection", (socket) => {
if (++counters.connected === 2) {
sio.of("/dynamic-101").emit("message");
}
});
let one = client(srv, "/dynamic-101");
let two = client(srv, "/dynamic-101");
one.on("message", handler);
two.on("message", handler);
});
});
});
});
@@ -1030,7 +1050,7 @@ describe("socket.io", () => {
reconnectionDelay: 100,
});
clientSocket.on("connect", () => {
srv.close();
sio.close();
});
clientSocket.io.on("reconnect_failed", () => {
@@ -1410,6 +1430,32 @@ describe("socket.io", () => {
}, 200);
});
it("should broadcast only one consecutive volatile event with binary (ws)", (done) => {
const srv = createServer();
const sio = new Server(srv, { transports: ["websocket"] });
let counter = 0;
srv.listen(() => {
sio.on("connection", (s) => {
// Wait to make sure there are no packets being sent for opening the connection
setTimeout(() => {
sio.volatile.emit("ev", Buffer.from([1, 2, 3]));
sio.volatile.emit("ev", Buffer.from([4, 5, 6]));
}, 20);
});
const socket = client(srv, { transports: ["websocket"] });
socket.on("ev", () => {
counter++;
});
});
setTimeout(() => {
expect(counter).to.be(1);
done();
}, 200);
});
it("should emit regular events after trying a failed volatile event (polling)", (done) => {
const srv = createServer();
const sio = new Server(srv, { transports: ["polling"] });
@@ -2473,28 +2519,6 @@ describe("socket.io", () => {
});
});
});
it("should pre encode a broadcast packet", (done) => {
const srv = createServer();
const sio = new Server(srv);
srv.listen(() => {
const clientSocket = client(srv, { multiplex: false });
sio.on("connection", (socket) => {
socket.conn.on("packetCreate", (packet) => {
expect(packet.data).to.eql('2["hello","world"]');
expect(packet.options.wsPreEncoded).to.eql('42["hello","world"]');
clientSocket.close();
sio.close();
done();
});
sio.emit("hello", "world");
});
});
});
});
describe("middleware", () => {
@@ -2683,6 +2707,25 @@ describe("socket.io", () => {
if (++count === 2) done();
});
});
it("should only set `connected` to true after the middleware execution", (done) => {
const httpServer = createServer();
const io = new Server(httpServer);
const clientSocket = client(httpServer, "/");
io.use((socket, next) => {
expect(socket.connected).to.be(false);
expect(socket.disconnected).to.be(true);
next();
});
io.on("connection", (socket) => {
expect(socket.connected).to.be(true);
expect(socket.disconnected).to.be(false);
success(io, clientSocket, done);
});
});
});
describe("socket middleware", () => {
@@ -2822,4 +2865,6 @@ describe("socket.io", () => {
});
});
});
require("./socket-timeout");
});

View File

@@ -1,3 +1,11 @@
import type { Server } from "../..";
import {
io as ioc,
ManagerOptions,
Socket as ClientSocket,
SocketOptions,
} from "socket.io-client";
const expect = require("expect.js");
const i = expect.stringify;
@@ -20,3 +28,19 @@ expect.Assertion.prototype.contain = function (...args) {
}
return contain.apply(this, args);
};
export function createClient(
io: Server,
nsp: string,
opts?: ManagerOptions & SocketOptions
): ClientSocket {
// @ts-ignore
const port = io.httpServer.address().port;
return ioc(`http://localhost:${port}${nsp}`, opts);
}
export function success(done: Function, io: Server, client: ClientSocket) {
io.close();
client.disconnect();
done();
}

195
test/uws.ts Normal file
View File

@@ -0,0 +1,195 @@
import { App, us_socket_local_port } from "uWebSockets.js";
import { Server } from "..";
import { io as ioc, Socket as ClientSocket } from "socket.io-client";
import request from "supertest";
import expect from "expect.js";
const createPartialDone = (done: (err?: Error) => void, count: number) => {
let i = 0;
return () => {
if (++i === count) {
done();
} else if (i > count) {
done(new Error(`partialDone() called too many times: ${i} > ${count}`));
}
};
};
const shouldNotHappen = (done) => () => done(new Error("should not happen"));
describe("socket.io with uWebSocket.js-based engine", () => {
let io: Server,
port: number,
client: ClientSocket,
clientWSOnly: ClientSocket,
clientPollingOnly: ClientSocket,
clientCustomNamespace: ClientSocket;
beforeEach((done) => {
const app = App();
io = new Server();
io.attachApp(app);
io.of("/custom");
app.listen(0, (listenSocket) => {
port = us_socket_local_port(listenSocket);
client = ioc(`http://localhost:${port}`);
clientWSOnly = ioc(`http://localhost:${port}`, {
transports: ["websocket"],
});
clientPollingOnly = ioc(`http://localhost:${port}`, {
transports: ["polling"],
});
clientCustomNamespace = ioc(`http://localhost:${port}/custom`);
});
const partialDone = createPartialDone(done, 4);
io.on("connection", partialDone);
io.of("/custom").on("connection", partialDone);
});
afterEach(() => {
io.close();
client.disconnect();
clientWSOnly.disconnect();
clientPollingOnly.disconnect();
clientCustomNamespace.disconnect();
});
it("should broadcast", (done) => {
const partialDone = createPartialDone(done, 3);
client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));
io.emit("hello");
});
it("should broadcast in a namespace", (done) => {
client.on("hello", shouldNotHappen(done));
clientWSOnly.on("hello", shouldNotHappen(done));
clientPollingOnly.on("hello", shouldNotHappen(done));
clientCustomNamespace.on("hello", done);
io.of("/custom").emit("hello");
});
it("should broadcast in a dynamic namespace", (done) => {
const dynamicNamespace = io.of(/\/dynamic-\d+/);
const dynamicClient = clientWSOnly.io.socket("/dynamic-101");
dynamicClient.on("connect", () => {
dynamicNamespace.emit("hello");
});
dynamicClient.on("hello", () => {
dynamicClient.disconnect();
done();
});
});
it("should broadcast binary content", (done) => {
const partialDone = createPartialDone(done, 3);
client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));
io.emit("hello", Buffer.from([1, 2, 3]));
});
it("should broadcast volatile packet with binary content", (done) => {
const partialDone = createPartialDone(done, 3);
client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));
// wait to make sure there are no packets being sent for opening the connection
setTimeout(() => {
io.volatile.emit("hello", Buffer.from([1, 2, 3]));
}, 20);
});
it("should broadcast in a room", (done) => {
const partialDone = createPartialDone(done, 2);
client.on("hello", shouldNotHappen(done));
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));
io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room1");
io.to("room1").emit("hello");
});
it("should broadcast in multiple rooms", (done) => {
const partialDone = createPartialDone(done, 2);
client.on("hello", shouldNotHappen(done));
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));
io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room2");
io.to(["room1", "room2"]).emit("hello");
});
it("should broadcast in all but a given room", (done) => {
const partialDone = createPartialDone(done, 2);
client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", shouldNotHappen(done));
clientCustomNamespace.on("hello", shouldNotHappen(done));
io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room2");
io.except("room2").emit("hello");
});
it("should work even after leaving room", (done) => {
const partialDone = createPartialDone(done, 2);
client.on("hello", partialDone);
clientWSOnly.on("hello", shouldNotHappen(done));
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));
io.of("/").sockets.get(client.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room1");
io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientWSOnly.id)!.leave("room1");
io.to("room1").emit("hello");
});
it("should serve static files", (done) => {
const clientVersion = require("socket.io-client/package.json").version;
request(`http://localhost:${port}`)
.get("/socket.io/socket.io.js")
.buffer(true)
.end((err, res) => {
if (err) return done(err);
expect(res.headers["content-type"]).to.be("application/javascript");
expect(res.headers.etag).to.be('"' + clientVersion + '"');
expect(res.headers["x-sourcemap"]).to.be(undefined);
expect(res.text).to.match(/engine\.io/);
expect(res.status).to.be(200);
done();
});
});
});