Compare commits

...

4 Commits

Author SHA1 Message Date
Damien Arrachequesne
7521ac227b chore(release): @socket.io/cluster-engine@0.1.0 2024-07-17 09:53:34 +02:00
Damien Arrachequesne
b00124b65a feat: implement cluster-friendly engine 2024-07-17 09:27:12 +02:00
Damien Arrachequesne
b7da542890 chore: normalize repository URLs
In order to address the following warning when publishing:

> npm warn publish npm auto-corrected some errors in your package.json when publishing.  Please run "npm pkg fix" to address these errors.
> npm warn publish errors corrected:
> npm warn publish "repository.url" was normalized to "git+https://github.com/socketio/socket.io.git"
2024-07-11 14:59:00 +02:00
Damien Arrachequesne
0692bed462 chore: fix the publish workflow
It seems the "registry-url" variable is mandatory: https://github.com/npm/cli/issues/6184
2024-07-11 13:45:44 +02:00
29 changed files with 2184 additions and 9 deletions

View File

@@ -21,6 +21,17 @@ jobs:
- 18
- 20
services:
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- name: Checkout repository
uses: actions/checkout@v4

View File

@@ -23,6 +23,7 @@ jobs:
uses: actions/setup-node@v4
with:
node-version: 20
registry-url: 'https://registry.npmjs.org'
- name: Install dependencies
run: npm ci

View File

@@ -10,5 +10,6 @@ Here are the detailed changelogs for each package in this monorepo:
| `socket.io` | [link](packages/socket.io/CHANGELOG.md) |
| `socket.io-adapter` | [link](packages/socket.io-adapter/CHANGELOG.md) |
| `socket.io-client` | [link](packages/socket.io-client/CHANGELOG.md) |
| `@socket.io/cluster-engine` | [link](packages/socket.io-cluster-engine/CHANGELOG.md) |
| `@socket.io/component-emitter` | [link](packages/socket.io-component-emitter/History.md) |
| `socket.io-parser` | [link](packages/socket.io-parser/CHANGELOG.md) |

View File

@@ -78,6 +78,7 @@ This repository is a [monorepo](https://en.wikipedia.org/wiki/Monorepo) which co
| `socket.io` | The server-side implementation of the bidirectional channel, built on top of the `engine.io` package. |
| `socket.io-adapter` | An extensible component responsible for broadcasting a packet to all connected clients, used by the `socket.io` package. |
| `socket.io-client` | The client-side implementation of the bidirectional channel, built on top of the `engine.io-client` package. |
| `@socket.io/cluster-engine` | A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions) |
| `@socket.io/component-emitter` | An `EventEmitter` implementation, similar to the one provided by [Node.js](https://nodejs.org/api/events.html) but for all platforms. |
| `socket.io-parser` | The parser responsible for encoding and decoding Socket.IO packets, used by both the `socket.io` and `socket.io-client` packages. |

217
package-lock.json generated
View File

@@ -8,6 +8,7 @@
"packages/socket.io-component-emitter",
"packages/engine.io-parser",
"packages/engine.io",
"packages/socket.io-cluster-engine",
"packages/engine.io-client",
"packages/socket.io-adapter",
"packages/socket.io-parser",
@@ -46,10 +47,12 @@
"express-session": "^1.18.0",
"has-cors": "^1.1.0",
"helmet": "^7.1.0",
"ioredis": "^5.4.1",
"mocha": "^10.6.0",
"node-forge": "^1.3.1",
"nyc": "^17.0.0",
"prettier": "^2.8.8",
"redis": "^4.6.15",
"rimraf": "^6.0.0",
"rollup": "^2.79.1",
"rollup-plugin-terser": "^7.0.2",
@@ -1846,6 +1849,12 @@
"node": ">=16"
}
},
"node_modules/@ioredis/commands": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz",
"integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==",
"dev": true
},
"node_modules/@isaacs/cliui": {
"version": "8.0.2",
"resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz",
@@ -2235,6 +2244,14 @@
"node": ">= 0.4"
}
},
"node_modules/@msgpack/msgpack": {
"version": "2.8.0",
"resolved": "https://registry.npmjs.org/@msgpack/msgpack/-/msgpack-2.8.0.tgz",
"integrity": "sha512-h9u4u/jiIRKbq25PM+zymTyW6bhTzELvOoUd+AvYriWOAKpLGnIamaET3pnHYoI5iYphAHBI4ayx0MehR+VVPQ==",
"engines": {
"node": ">= 10"
}
},
"node_modules/@nodelib/fs.scandir": {
"version": "2.1.5",
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
@@ -2359,6 +2376,71 @@
"streamx": "^2.15.0"
}
},
"node_modules/@redis/bloom": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz",
"integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==",
"dev": true,
"peerDependencies": {
"@redis/client": "^1.0.0"
}
},
"node_modules/@redis/client": {
"version": "1.5.17",
"resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.17.tgz",
"integrity": "sha512-IPvU9A31qRCZ7lds/x+ksuK/UMndd0EASveAvCvEtFFKIZjZ+m/a4a0L7S28KEWoR5ka8526hlSghDo4Hrc2Hg==",
"dev": true,
"dependencies": {
"cluster-key-slot": "1.1.2",
"generic-pool": "3.9.0",
"yallist": "4.0.0"
},
"engines": {
"node": ">=14"
}
},
"node_modules/@redis/client/node_modules/yallist": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz",
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==",
"dev": true
},
"node_modules/@redis/graph": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.1.tgz",
"integrity": "sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==",
"dev": true,
"peerDependencies": {
"@redis/client": "^1.0.0"
}
},
"node_modules/@redis/json": {
"version": "1.0.6",
"resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.6.tgz",
"integrity": "sha512-rcZO3bfQbm2zPRpqo82XbW8zg4G/w4W3tI7X8Mqleq9goQjAGLL7q/1n1ZX4dXEAmORVZ4s1+uKLaUOg7LrUhw==",
"dev": true,
"peerDependencies": {
"@redis/client": "^1.0.0"
}
},
"node_modules/@redis/search": {
"version": "1.1.6",
"resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.6.tgz",
"integrity": "sha512-mZXCxbTYKBQ3M2lZnEddwEAks0Kc7nauire8q20oA0oA/LoA+E/b5Y5KZn232ztPb1FkIGqo12vh3Lf+Vw5iTw==",
"dev": true,
"peerDependencies": {
"@redis/client": "^1.0.0"
}
},
"node_modules/@redis/time-series": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.5.tgz",
"integrity": "sha512-IFjIgTusQym2B5IZJG3XKr5llka7ey84fw/NOYqESP5WUfQs9zz1ww/9+qoz4ka/S6KcGBodzlCeZ5UImKbscg==",
"dev": true,
"peerDependencies": {
"@redis/client": "^1.0.0"
}
},
"node_modules/@rollup/plugin-alias": {
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/@rollup/plugin-alias/-/plugin-alias-5.1.0.tgz",
@@ -2640,6 +2722,10 @@
"@sinonjs/commons": "^3.0.0"
}
},
"node_modules/@socket.io/cluster-engine": {
"resolved": "packages/socket.io-cluster-engine",
"link": true
},
"node_modules/@socket.io/component-emitter": {
"resolved": "packages/socket.io-component-emitter",
"link": true
@@ -5349,6 +5435,15 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/cluster-key-slot": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz",
"integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==",
"dev": true,
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/color-convert": {
"version": "1.9.3",
"resolved": "https://registry.npmjs.org/color-convert/-/color-convert-1.9.3.tgz",
@@ -5975,6 +6070,15 @@
"node": ">=0.4.0"
}
},
"node_modules/denque": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz",
"integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==",
"dev": true,
"engines": {
"node": ">=0.10"
}
},
"node_modules/depd": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz",
@@ -7645,6 +7749,15 @@
"node": "^16.13.0 || >=18.0.0"
}
},
"node_modules/generic-pool": {
"version": "3.9.0",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz",
"integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==",
"dev": true,
"engines": {
"node": ">= 4"
}
},
"node_modules/gensync": {
"version": "1.0.0-beta.2",
"resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz",
@@ -8373,6 +8486,30 @@
"node": ">=8"
}
},
"node_modules/ioredis": {
"version": "5.4.1",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.4.1.tgz",
"integrity": "sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==",
"dev": true,
"dependencies": {
"@ioredis/commands": "^1.1.1",
"cluster-key-slot": "^1.1.0",
"debug": "^4.3.4",
"denque": "^2.1.0",
"lodash.defaults": "^4.2.0",
"lodash.isarguments": "^3.1.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0",
"standard-as-callback": "^2.1.0"
},
"engines": {
"node": ">=12.22.0"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/ioredis"
}
},
"node_modules/ip": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/ip/-/ip-2.0.1.tgz",
@@ -9480,12 +9617,24 @@
"integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==",
"dev": true
},
"node_modules/lodash.defaults": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz",
"integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==",
"dev": true
},
"node_modules/lodash.flattendeep": {
"version": "4.4.0",
"resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz",
"integrity": "sha512-uHaJFihxmJcEX3kT4I23ABqKKalJ/zDrDg0lsFtc1h+3uw49SIJ5beyhx5ExVRti3AvKoOJngIj7xz3oylPdWQ==",
"dev": true
},
"node_modules/lodash.isarguments": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz",
"integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==",
"dev": true
},
"node_modules/lodash.isequal": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/lodash.isequal/-/lodash.isequal-4.5.0.tgz",
@@ -11857,6 +12006,41 @@
"node": ">=8"
}
},
"node_modules/redis": {
"version": "4.6.15",
"resolved": "https://registry.npmjs.org/redis/-/redis-4.6.15.tgz",
"integrity": "sha512-2NtuOpMW3tnYzBw6S8mbXSX7RPzvVFCA2wFJq9oErushO2UeBkxObk+uvo7gv7n0rhWeOj/IzrHO8TjcFlRSOg==",
"dev": true,
"dependencies": {
"@redis/bloom": "1.2.0",
"@redis/client": "1.5.17",
"@redis/graph": "1.1.1",
"@redis/json": "1.0.6",
"@redis/search": "1.1.6",
"@redis/time-series": "1.0.5"
}
},
"node_modules/redis-errors": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz",
"integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==",
"dev": true,
"engines": {
"node": ">=4"
}
},
"node_modules/redis-parser": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz",
"integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==",
"dev": true,
"dependencies": {
"redis-errors": "^1.0.0"
},
"engines": {
"node": ">=4"
}
},
"node_modules/regenerate": {
"version": "1.4.2",
"resolved": "https://registry.npmjs.org/regenerate/-/regenerate-1.4.2.tgz",
@@ -12997,6 +13181,12 @@
"node": ">=8"
}
},
"node_modules/standard-as-callback": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz",
"integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==",
"dev": true
},
"node_modules/statuses": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz",
@@ -15008,7 +15198,7 @@
}
},
"packages/engine.io-parser": {
"version": "5.2.2",
"version": "5.2.3",
"license": "MIT",
"devDependencies": {
"prettier": "^3.3.2"
@@ -15113,6 +15303,31 @@
"xmlhttprequest-ssl": "~2.0.0"
}
},
"packages/socket.io-cluster-engine": {
"version": "0.1.0",
"license": "MIT",
"dependencies": {
"@msgpack/msgpack": "~2.8.0",
"debug": "~4.3.3",
"engine.io": "~6.6.0",
"engine.io-parser": "~5.2.3"
},
"engines": {
"node": ">=10.2.0"
}
},
"packages/socket.io-clustered-engine": {
"name": "@socket.io/clustered-engine",
"version": "0.0.1",
"extraneous": true,
"license": "MIT",
"dependencies": {
"@msgpack/msgpack": "~2.8.0",
"debug": "~4.3.3",
"engine.io": "~6.6.0",
"engine.io-parser": "~5.2.3"
}
},
"packages/socket.io-component-emitter": {
"name": "@socket.io/component-emitter",
"version": "3.1.2",

View File

@@ -4,6 +4,7 @@
"packages/socket.io-component-emitter",
"packages/engine.io-parser",
"packages/engine.io",
"packages/socket.io-cluster-engine",
"packages/engine.io-client",
"packages/socket.io-adapter",
"packages/socket.io-parser",
@@ -47,10 +48,12 @@
"express-session": "^1.18.0",
"has-cors": "^1.1.0",
"helmet": "^7.1.0",
"ioredis": "^5.4.1",
"mocha": "^10.6.0",
"node-forge": "^1.3.1",
"nyc": "^17.0.0",
"prettier": "^2.8.8",
"redis": "^4.6.15",
"rimraf": "^6.0.0",
"rollup": "^2.79.1",
"rollup-plugin-terser": "^7.0.2",

View File

@@ -82,7 +82,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/engine.io-client#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"

View File

@@ -25,7 +25,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/engine.io-parser#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"

View File

@@ -56,7 +56,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/engine.io#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"

View File

@@ -5,7 +5,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-adapter#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"

View File

@@ -82,7 +82,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-client#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"

View File

@@ -0,0 +1,11 @@
# History
| Version | Release date |
|--------------------------|--------------|
| [0.1.0](#010-2024-07-17) | July 2024 |
# Release notes
## `0.1.0` (2024-07-17)
Initial release!

View File

@@ -0,0 +1,22 @@
(The MIT License)
Copyright (c) 2024-present Guillermo Rauch and Socket.IO contributors
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -0,0 +1,194 @@
# Socket.IO cluster engine
A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions).
**Table of contents**
<!-- TOC -->
* [Installation](#installation)
* [Usage](#usage)
* [Node.js cluster](#nodejs-cluster)
* [Redis](#redis)
* [Node.js cluster & Redis](#nodejs-cluster--redis)
* [Options](#options)
* [How it works](#how-it-works)
* [License](#license)
<!-- TOC -->
## Installation
```
npm i @socket.io/cluster-engine
```
NPM: https://npmjs.com/package/@socket.io/cluster-engine
## Usage
### Node.js cluster
```js
import cluster from "node:cluster";
import process from "node:process";
import { availableParallelism } from "node:os";
import { setupPrimary, NodeClusterEngine } from "@socket.io/cluster-engine";
import { createServer } from "node:http";
import { Server } from "socket.io";
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
const numCPUs = availableParallelism();
// fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
// setup connection within the cluster
setupPrimary();
// needed for packets containing Buffer objects (you can ignore it if you only send plaintext objects)
cluster.setupPrimary({
serialization: "advanced",
});
cluster.on("exit", (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
const httpServer = createServer((req, res) => {
res.writeHead(404).end();
});
const engine = new NodeClusterEngine();
engine.attach(httpServer, {
path: "/socket.io/"
});
const io = new Server();
io.bind(engine);
// workers will share the same port
httpServer.listen(3000);
console.log(`Worker ${process.pid} started`);
}
```
### Redis
```js
import { createServer } from "node:http";
import { createClient } from "redis";
import { RedisEngine } from "@socket.io/cluster-engine";
import { Server } from "socket.io";
const httpServer = createServer((req, res) => {
res.writeHead(404).end();
});
const pubClient = createClient();
const subClient = pubClient.duplicate();
await Promise.all([
pubClient.connect(),
subClient.connect(),
]);
const engine = new RedisEngine(pubClient, subClient);
engine.attach(httpServer, {
path: "/socket.io/"
});
const io = new Server();
io.bind(engine);
httpServer.listen(3000);
```
### Node.js cluster & Redis
```js
import cluster from "node:cluster";
import process from "node:process";
import { availableParallelism } from "node:os";
import { createClient } from "redis";
import { setupPrimaryWithRedis, NodeClusterEngine } from "@socket.io/cluster-engine";
import { createServer } from "node:http";
import { Server } from "socket.io";
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
const numCPUs = availableParallelism();
// fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
const pubClient = createClient();
const subClient = pubClient.duplicate();
await Promise.all([
pubClient.connect(),
subClient.connect(),
]);
// setup connection between and within the clusters
setupPrimaryWithRedis(pubClient, subClient);
// needed for packets containing Buffer objects (you can ignore it if you only send plaintext objects)
cluster.setupPrimary({
serialization: "advanced",
});
cluster.on("exit", (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
const httpServer = createServer((req, res) => {
res.writeHead(404).end();
});
const engine = new NodeClusterEngine();
engine.attach(httpServer, {
path: "/socket.io/"
});
const io = new Server();
io.bind(engine);
// workers will share the same port
httpServer.listen(3000);
console.log(`Worker ${process.pid} started`);
}
```
## Options
| Name | Description | Default value |
|----------------------------|-----------------------------------------------------------------------|---------------|
| `responseTimeout` | The maximum waiting time for responses from other nodes, in ms. | `1000 ms` |
| `noopUpgradeInterval` | The delay between two "noop" packets when the client upgrades, in ms. | `200 ms` |
| `delayedConnectionTimeout` | The maximum waiting time for a successful upgrade, in ms. | `300 ms` |
## How it works
This engine extends the one provided by the `engine.io` package, so that sticky sessions are not required when scaling horizontally.
The Node.js workers communicate via the IPC channel (or via Redis pub/sub) to check whether the Engine.IO session exists on another worker. In that case, the packets are forwarded to the worker which owns the session.
Additionally, when a client starts with HTTP long-polling, the connection is delayed to allow the client to upgrade, so that the WebSocket connection ends up on the worker which owns the session.
## License
[MIT](LICENSE)

View File

@@ -0,0 +1,5 @@
services:
redis:
image: redis:7
ports:
- "6379:6379"

View File

@@ -0,0 +1,65 @@
import cluster from "node:cluster";
import { type ServerOptions } from "engine.io";
import { ClusterEngine, type Message } from "./engine";
import debugModule from "debug";
const debug = debugModule("engine:cluster");

// Marker added to every IPC message so that unrelated messages on the same
// channel can be ignored by both the primary and the workers.
const MESSAGE_SOURCE = "_eio";

// Symbol used to remember the engine node id on a cluster Worker object.
const kNodeId = Symbol("nodeId");

// No-op callback passed to send() so that delivery failures (e.g. a worker
// that just died) are silently ignored.
function ignoreError() {}
/**
 * Relays engine messages between the workers of a Node.js cluster.
 *
 * The primary process listens for messages tagged with the engine source
 * marker, remembers which worker owns which node id, then either forwards a
 * message to the worker matching its `recipientId`, or broadcasts it to
 * every worker except the sender.
 */
export function setupPrimary() {
  cluster.on("message", (sourceWorker, message: { _source?: string }) => {
    if (message._source !== MESSAGE_SOURCE) {
      debug("ignore message from unknown source");
      return;
    }
    // remember the node id of the sending worker (first message wins)
    if (!sourceWorker[kNodeId]) {
      sourceWorker[kNodeId] = (message as Message).senderId;
    }
    const workers = Object.values(cluster.workers);
    // @ts-expect-error recipientId is not defined for all messages
    const recipientId = (message as Message).recipientId;
    if (recipientId) {
      const target = workers.find((worker) => worker[kNodeId] === recipientId);
      if (target) {
        debug("forward message to worker %d", target.id);
        target.send(message, null, ignoreError);
        return;
      }
    }
    // no recipient (or recipient not found yet): broadcast to the siblings
    debug("forward message to all other workers");
    workers
      .filter((worker) => worker.id !== sourceWorker.id)
      .forEach((worker) => worker.send(message, null, ignoreError));
  });
}
/**
 * An engine that runs in a Node.js cluster worker and exchanges messages
 * with its sibling workers through the primary process (IPC channel).
 *
 * Requires setupPrimary() to be called in the primary process so that the
 * messages are relayed between workers.
 */
export class NodeClusterEngine extends ClusterEngine {
  constructor(opts?: ServerOptions) {
    super(opts);
    const onWorkerMessage = (message: Message & { _source?: string }) => {
      // only handle messages tagged by a sibling engine
      if (message._source !== MESSAGE_SOURCE) {
        debug("ignore message from unknown source");
        return;
      }
      debug("received message: %j", message);
      this.onMessage(message);
    };
    process.on("message", onWorkerMessage);
  }

  override publishMessage(message: Message & { _source?: string }) {
    // tag the message so other listeners on the IPC channel can ignore it
    message._source = MESSAGE_SOURCE;
    debug("send message to primary");
    process.send(message, null, { swallowErrors: true }, ignoreError);
  }
}

View File

@@ -0,0 +1,687 @@
import { Server, type ServerOptions, Socket, type Transport } from "engine.io";
import { randomBytes } from "node:crypto";
import { setTimeout, clearTimeout } from "node:timers";
import { type IncomingMessage } from "node:http";
import { type Packet } from "engine.io-parser";
import debugModule from "debug";
const debug = debugModule("engine");

// Symbols used to attach cluster-specific state to engine.io objects
// without clashing with their own properties.
const kDelayed = Symbol("delayed"); // connection is delayed, awaiting a possible upgrade
const kDelayedTimer = Symbol("delayedTimer"); // timer bounding the delayed connection
const kBuffer = Symbol("buffer"); // packets buffered while the connection is delayed
const kPacketListener = Symbol("packetListener");
const kNoopTimer = Symbol("noopTimer"); // timer sending "noop" packets during an upgrade
const kSenderId = Symbol("senderId"); // id of the remote node that owns the session

// Branded aliases so the various string/number identifiers cannot be
// accidentally mixed up by the compiler.
type Brand<K, T> = K & { __brand: T };
type NodeId = Brand<string, "NodeId">;
type SessionId = Brand<string, "SessionId">;
type RequestId = Brand<number, "RequestId">;
/**
 * Generates a short random identifier (6 lowercase hexadecimal characters),
 * used to identify a node within the cluster.
 */
function randomId() {
  const buf = randomBytes(3);
  return buf.toString("hex");
}
/**
 * The types of the messages exchanged between the nodes of the cluster.
 */
enum MessageType {
  // ask the other nodes whether one of them owns a given session
  ACQUIRE_LOCK = 0,
  // answer to an ACQUIRE_LOCK request (sent by the session owner)
  ACQUIRE_LOCK_RESPONSE,
  // packets flushed by the session owner, forwarded to the node serving the client request
  DRAIN,
  // a packet received from the client, forwarded to the session owner
  PACKET,
  // notify the session owner of the outcome of a transport upgrade
  UPGRADE,
  // answer to an UPGRADE message (whether the session is handed over)
  UPGRADE_RESPONSE,
  // notify the session owner that the client connection was closed
  CLOSE,
}
/**
 * A message exchanged between the nodes of the cluster.
 *
 * Every message carries the id of the sending node; depending on its type it
 * may also carry a recipient id (direct message) and/or a request id (used to
 * pair a request with its response).
 */
export type Message = {
  senderId: NodeId;
} & (
  // broadcast: ask the owner of the session for a read/write lock
  | {
      requestId: RequestId;
      type: MessageType.ACQUIRE_LOCK;
      data: {
        sid: SessionId;
        transportName: string;
        type: "read" | "write";
      };
    }
  // direct: whether the lock was granted by the session owner
  | {
      recipientId: NodeId;
      requestId: RequestId;
      type: MessageType.ACQUIRE_LOCK_RESPONSE;
      data: {
        success: boolean;
      };
    }
  // direct: packets flushed by the owner, to be written to the local transport
  | {
      recipientId: NodeId;
      type: MessageType.DRAIN;
      data: {
        sid: SessionId;
        packets: Packet[];
      };
    }
  // direct: a packet received from the client, forwarded to the session owner
  | {
      recipientId: NodeId;
      type: MessageType.PACKET;
      data: {
        sid: SessionId;
        packet: Packet;
      };
    }
  // direct: notify the session owner of the outcome of an upgrade attempt
  | {
      requestId: RequestId;
      recipientId: NodeId;
      type: MessageType.UPGRADE;
      data: {
        sid: SessionId;
        success: boolean;
      };
    }
  // direct: whether the owner hands the session over (with buffered packets)
  | {
      requestId: RequestId;
      recipientId: NodeId;
      type: MessageType.UPGRADE_RESPONSE;
      data: {
        takeOver: boolean;
        packets: Packet[];
      };
    }
  // direct: notify the session owner that the client connection was closed
  | {
      recipientId: NodeId;
      type: MessageType.CLOSE;
      data: {
        sid: SessionId;
        reason: string;
      };
    }
);
/**
 * A pending request awaiting a response from another node of the cluster.
 *
 * Exactly one of the callbacks is eventually invoked: the timer (bounded by
 * the `responseTimeout` option) triggers `onError` when no response arrives
 * in time.
 */
type ClusterRequest = {
  // timeout that discards the request and triggers onError
  timer: NodeJS.Timer;
  onSuccess: (...args: any[]) => void;
  onError: () => void;
};
/**
 * Checks whether a remote node may take a lock on the given client for the
 * given transport.
 *
 * - "polling": the client must currently be on HTTP long-polling; a read
 *   lock additionally requires that the transport is not writable
 *   (presumably meaning no GET request is currently attached locally —
 *   TODO confirm against the engine.io polling transport).
 * - "websocket" / "webtransport": the client must still be on HTTP
 *   long-polling and must not have started or completed an upgrade.
 *
 * @param client - the local client that owns the session
 * @param transportName - the transport requested by the remote node
 * @param lockType - whether the remote node wants to read or write
 * @returns true when the lock can be granted
 */
function isClientLockable(
  client: Socket,
  transportName: string,
  lockType: "read" | "write"
): boolean {
  switch (transportName) {
    case "polling":
      return (
        client.transport.name === "polling" &&
        (lockType === "write" || !client.transport.writable)
      );
    case "websocket":
    case "webtransport":
      return (
        client.transport.name === "polling" &&
        !client.upgrading &&
        !client.upgraded
      );
    default:
      // previously fell off the end and returned `undefined` for unknown
      // transport names; make the refusal explicit (and satisfy
      // noImplicitReturns)
      return false;
  }
}
/**
 * Checks that the given value looks like an Engine.IO session id
 * (a 20-character string).
 */
function isValidSessionId(str: string) {
  if (typeof str !== "string") {
    return false;
  }
  return str.length === 20;
}
/**
 * Additional options accepted by the cluster engine, on top of the engine.io
 * server options.
 */
interface ClusterEngineOptions {
  /**
   * The maximum waiting time for responses from other nodes, in ms.
   *
   * @default 1000
   */
  responseTimeout?: number;
  /**
   * The delay between two "noop" packets when the client upgrades, in ms.
   *
   * @default 200
   */
  noopUpgradeInterval?: number;
  /**
   * The maximum waiting time for a successful upgrade, in ms.
   *
   * @default 300
   */
  delayedConnectionTimeout?: number;
}
/**
 * A cluster-friendly engine built on top of the engine.io server: the nodes
 * exchange messages (via an implementation-specific transport, see
 * publishMessage()/onMessage()) so that sessions do not require sticky
 * sessions when scaling horizontally.
 */
// @ts-expect-error onWebSocket() method is private in parent class
export abstract class ClusterEngine extends Server {
  // cluster-specific options, with defaults applied
  private readonly _opts: Required<ClusterEngineOptions>;
  // unique identifier of this node within the cluster
  protected readonly _nodeId = randomId() as NodeId;
  // pending requests awaiting responses from other nodes, keyed by request id
  private readonly _requests = new Map<RequestId, ClusterRequest>();
  // local transports serving sessions owned by other nodes, keyed by session id
  private readonly _remoteTransports = new Map<SessionId, Transport>();
  // monotonically increasing counter used to generate request ids
  private _requestCount = 0;
constructor(opts?: ServerOptions & ClusterEngineOptions) {
super(opts);
this._opts = Object.assign(
{
responseTimeout: 1000,
noopUpgradeInterval: 200,
delayedConnectionTimeout: 300,
},
opts
);
}
  /**
   * Handles a message received from another node of the cluster.
   *
   * @param message - the decoded message
   */
  protected onMessage(message: Message) {
    // ignore our own messages (the underlying transport may broadcast to
    // every node, including the sender)
    if (message.senderId === this._nodeId) {
      return;
    }
    debug("received: %j", message);
    switch (message.type) {
      // another node asks whether we own the session and can grant a lock
      case MessageType.ACQUIRE_LOCK: {
        const sid = message.data.sid;
        const client = this.clients[sid];
        if (!client) {
          // not our session: stay silent, the requester will time out
          return;
        }
        const transportName = message.data.transportName;
        const success = isClientLockable(
          client,
          transportName,
          message.data.type
        );
        this.publishMessage({
          requestId: message.requestId,
          senderId: this._nodeId,
          recipientId: message.senderId,
          type: MessageType.ACQUIRE_LOCK_RESPONSE,
          data: {
            success,
          },
        });
        // NOTE(review): the side effects below run even when `success` is
        // false — confirm this is intended
        switch (transportName) {
          case "polling": {
            if (message.data.type === "read") {
              // the remote node now holds the GET request: forward our
              // flushed packets to it
              this._forwardFlushWhenPolling(client, sid, message.senderId);
            }
            break;
          }
          case "websocket":
          case "webtransport": {
            // the client is probing an upgraded transport on the remote node
            client.upgrading = true;
            client[kNoopTimer] = setTimeout(() => {
              debug("writing a noop packet to polling for fast upgrade");
              // @ts-expect-error sendPacket() is private
              client.sendPacket("noop");
            }, this._opts.noopUpgradeInterval);
          }
        }
        break;
      }
      // response to one of our ACQUIRE_LOCK requests
      case MessageType.ACQUIRE_LOCK_RESPONSE: {
        const requestId = message.requestId;
        const request = this._requests.get(requestId);
        if (!request) {
          // unknown or already timed-out request
          return;
        }
        this._requests.delete(requestId);
        clearTimeout(request.timer);
        if (message.data.success) {
          request.onSuccess(message.senderId);
        } else {
          request.onError();
        }
        break;
      }
      // packets flushed by the session owner, to be written to the local
      // transport serving the client request
      case MessageType.DRAIN: {
        const transport = this._remoteTransports.get(message.data.sid);
        if (!transport) {
          return;
        }
        if (transport.name === "polling") {
          // HTTP long-polling can only be drained once
          this._remoteTransports.delete(message.data.sid);
        }
        transport.send(message.data.packets);
        break;
      }
      // a packet received from the client on another node, forwarded to us
      // (the session owner)
      case MessageType.PACKET: {
        const client = this.clients[message.data.sid];
        if (!client) {
          return;
        }
        if (client[kDelayed]) {
          // connection not fully established yet: buffer the packet
          client[kBuffer].push(message.data.packet);
        } else {
          // @ts-expect-error onPacket() is private
          client.onPacket(message.data.packet);
        }
        break;
      }
      // the client has completed (or failed) a transport upgrade on another node
      case MessageType.UPGRADE: {
        const sid = message.data.sid;
        const client = this.clients[sid];
        if (!client) {
          return;
        }
        // NOTE(review): the timer was created with setTimeout();
        // clearInterval() also cancels it in Node.js, but clearTimeout()
        // would be clearer
        clearInterval(client[kNoopTimer]);
        client.upgrading = false;
        if (message.data.success) {
          client.upgraded = true;
          client.emit("upgrade");
          if (client[kDelayed]) {
            // the connection was delayed waiting for this upgrade: hand the
            // session over to the remote node, along with the buffered packets
            client[kDelayed] = false;
            clearTimeout(client[kDelayedTimer]);
            client.close(true);
            delete this.clients[sid];
            this.publishMessage({
              requestId: message.requestId,
              senderId: this._nodeId,
              recipientId: message.senderId,
              type: MessageType.UPGRADE_RESPONSE,
              data: {
                takeOver: true,
                packets: client[kBuffer],
              },
            });
          } else {
            // we keep the session: forward our flushed packets to the remote
            // node that holds the upgraded connection
            this._forwardFlushWhenWebSocket(client, sid, message.senderId);
            this.publishMessage({
              requestId: message.requestId,
              senderId: this._nodeId,
              recipientId: message.senderId,
              type: MessageType.UPGRADE_RESPONSE,
              data: {
                takeOver: false,
                packets: [],
              },
            });
          }
        }
        break;
      }
      // response to one of our UPGRADE messages
      case MessageType.UPGRADE_RESPONSE: {
        const requestId = message.requestId;
        const request = this._requests.get(requestId);
        if (!request) {
          return;
        }
        this._requests.delete(requestId);
        clearTimeout(request.timer);
        request.onSuccess(message.data.takeOver, message.data.packets);
        break;
      }
      // the client connection was closed on the node serving the request
      case MessageType.CLOSE: {
        const client = this.clients[message.data.sid];
        if (!client) {
          return;
        }
        // presumably completes a delayed connection before closing it;
        // _doConnect() is defined outside this excerpt — verify
        this._doConnect(client);
        // @ts-expect-error onClose() is private
        client.onClose(message.data.reason);
        break;
      }
    }
  }
  /**
   * Temporarily redirects the flush of the local client so that the packets
   * are forwarded (as a DRAIN message) to the remote node currently holding
   * the HTTP long-polling GET request for this session.
   *
   * @param client - the local client that owns the session
   * @param sid - the session id
   * @param senderId - the id of the remote node holding the GET request
   */
  private _forwardFlushWhenPolling(
    client: Socket,
    sid: SessionId,
    senderId: NodeId
  ) {
    // make the transport look like it has a pending GET request so that
    // flush() proceeds (truthy placeholder — the real request lives on the
    // remote node; TODO confirm against the polling transport internals)
    // @ts-expect-error req is private
    client.transport.req = true;
    client.transport.writable = true;
    const oldSend = client.transport.send;
    client.transport.send = (packets) => {
      this.publishMessage({
        senderId: this._nodeId,
        recipientId: senderId,
        type: MessageType.DRAIN,
        data: {
          sid,
          packets,
        },
      });
      // a long-polling request can only be answered once: restore the
      // transport to its normal state after this single forwarded flush
      // @ts-expect-error req is private
      client.transport.req = null;
      client.transport.writable = false;
      client.transport.send = oldSend;
    };
    // @ts-expect-error flush() is private
    client.flush();
  }
  /**
   * Redirects the flush of the local client so that the packets are forwarded
   * (as DRAIN messages) to the remote node holding the upgraded connection
   * for this session.
   *
   * Unlike the polling variant, the send() override is not restored: the
   * remote connection can be written to repeatedly.
   *
   * @param client - the local client that owns the session
   * @param sid - the session id
   * @param senderId - the id of the remote node holding the connection
   */
  private _forwardFlushWhenWebSocket(
    client: Socket,
    sid: SessionId,
    senderId: NodeId
  ) {
    client.transport.writable = true;
    client.transport.send = (packets) => {
      this.publishMessage({
        senderId: this._nodeId,
        recipientId: senderId,
        type: MessageType.DRAIN,
        data: {
          sid,
          packets,
        },
      });
    };
    // @ts-expect-error flush() is private
    client.flush();
  }
override verify(
req: IncomingMessage & { _query: Record<string, string> },
upgrade: boolean,
fn: (errorCode?: number, context?: any) => void
): void {
super.verify(req, upgrade, (errorCode: number, errorContext: any) => {
if (errorCode !== Server.errors.UNKNOWN_SID) {
return fn(errorCode, errorContext);
}
const sid = req._query.sid as SessionId;
if (!isValidSessionId(sid)) {
return fn(errorCode, errorContext);
}
const transportName = req._query.transport;
const lockType = req.method === "GET" ? "read" : "write";
const onSuccess = async (senderId: NodeId) => {
if (upgrade) {
req[kSenderId] = senderId;
fn();
} else {
const transport = this.createTransport(transportName, req);
this._hookTransport(sid, transport, lockType, senderId);
transport.onRequest(req);
}
};
this._acquireLock(sid, transportName, lockType, onSuccess, () =>
fn(errorCode, errorContext)
);
});
}
private _acquireLock(
sid: SessionId,
transportName: string,
lockType: "read" | "write",
onSuccess: (senderId: NodeId) => void,
onError: () => void
) {
const requestId = ++this._requestCount as RequestId;
const timer = setTimeout(() => {
this._requests.delete(requestId);
onError();
}, this._opts.responseTimeout);
this._requests.set(requestId, {
timer,
onSuccess,
onError,
});
this.publishMessage({
requestId,
senderId: this._nodeId,
type: MessageType.ACQUIRE_LOCK,
data: {
sid,
transportName,
type: lockType,
},
});
}
private _hookTransport(
sid: SessionId,
transport: Transport,
lockType: "read" | "write",
senderId: NodeId
) {
if (lockType === "read") {
this._remoteTransports.set(sid, transport);
}
transport.on("packet", async (packet: Packet) =>
this._onPacket(sid, senderId, packet)
);
transport.once("error", () =>
this._onClose(sid, senderId, "transport error")
);
transport.once("close", () =>
this._onClose(sid, senderId, "transport close")
);
}
/**
 * Performs the upgrade dance with the client (probe ping/pong, then
 * "upgrade" packet), for a session that is owned by another node.
 *
 * Fix: the upgrade timer was previously only cleared on the success path, so
 * after a failed upgrade (error/close/unexpected packet) the timer would
 * still fire later and invoke onError() a second time. The timer is now
 * cleared on every terminal path, and listeners are detached before closing
 * so that close() cannot re-trigger the error path.
 *
 * @param transport - the new (WebSocket) transport
 * @param onSuccess - called when the client sends the "upgrade" packet
 * @param onError - called upon timeout, protocol violation, error or close
 */
private _tryUpgrade(
  transport: Transport,
  onSuccess: () => void,
  onError: () => void
) {
  debug("starting upgrade process");
  const upgradeTimeoutTimer = setTimeout(() => {
    debug("client did not complete upgrade - closing transport");
    // detach first so the "close" event triggered by close() does not call
    // onError() a second time
    transport.removeAllListeners();
    transport.close();
    onError();
  }, this.opts.upgradeTimeout);

  const fail = () => {
    clearTimeout(upgradeTimeoutTimer);
    transport.removeAllListeners();
    onError();
  };

  transport.on("packet", (packet) => {
    if (packet.type === "ping" && packet.data === "probe") {
      debug("got probe ping packet, sending pong");
      transport.send([{ type: "pong", data: "probe" }]);
    } else if (packet.type === "upgrade") {
      clearTimeout(upgradeTimeoutTimer);
      transport.removeAllListeners();
      onSuccess();
    } else {
      // any other packet during the probe is a protocol violation
      clearTimeout(upgradeTimeoutTimer);
      transport.removeAllListeners();
      transport.close();
      onError();
    }
  });
  transport.on("error", fail);
  transport.on("close", fail);
}
/**
 * Forwards a packet received on a local transport to the node that owns the
 * session.
 */
private _onPacket(sid: SessionId, senderId: NodeId, packet: Packet) {
  const message = {
    senderId: this._nodeId,
    recipientId: senderId,
    type: MessageType.PACKET,
    data: { sid, packet },
  };
  this.publishMessage(message);
}
/**
 * Notifies the node that owns the session that the local transport was
 * closed (either abruptly or cleanly).
 */
private _onClose(
  sid: SessionId,
  senderId: NodeId,
  reason: "transport error" | "transport close"
) {
  const message = {
    senderId: this._nodeId,
    recipientId: senderId,
    type: MessageType.CLOSE,
    data: { sid, reason },
  };
  this.publishMessage(message);
}
/**
 * Handles a new WebSocket connection.
 *
 * When the session id is unknown locally, the session may be owned by another
 * node: we run the upgrade dance ourselves and then ask the owner to hand the
 * session over (see _onUpgradeSuccess()).
 */
override onWebSocket(req: any, socket: any, websocket: any) {
  const sid = req._query.sid;
  if (!sid || this.clients[sid]) {
    // no session id (fresh connection) or locally-owned session: default behavior
    // @ts-expect-error onWebSocket() is private
    return super.onWebSocket(req, socket, websocket);
  }
  websocket.on("error", () => {});
  req.websocket = websocket;
  const transport = this.createTransport(req._query.transport, req);
  // the id of the node that owns the session (stored in verify())
  const senderId = req[kSenderId];
  this._tryUpgrade(
    transport,
    () => this._onUpgradeSuccess(sid, transport, req, senderId),
    () => {
      debug("upgrade failure");
    }
  );
}
/**
 * Called when the client has completed the upgrade dance on this node while
 * the session was owned by another node: asks the owner whether this node can
 * take the session over.
 *
 * @param sid - the session id
 * @param transport - the upgraded (WebSocket) transport
 * @param req - the initial request
 * @param senderId - the id of the node that currently owns the session
 */
private _onUpgradeSuccess(
  sid: SessionId,
  transport: Transport,
  req: any,
  senderId: NodeId
) {
  debug("upgrade success");
  this._hookTransport(sid, transport, "read", senderId);
  const requestId = ++this._requestCount as RequestId;
  // invoked when the owner node answers the UPGRADE request
  const onSuccess = (takeOver: boolean, packets: Packet[]) => {
    if (takeOver) {
      this._remoteTransports.delete(sid);
      // temporarily disable send() so that the Socket constructor does not
      // write anything to the client — NOTE(review): presumably because the
      // handshake was already sent by the original owner node; confirm
      const send = transport.send;
      transport.send = () => {};
      const socket = new Socket(sid, this, transport, req, 4);
      transport.send = send;
      this.clients[sid] = socket;
      this.clientsCount++;
      socket.once("close", () => {
        delete this.clients[sid];
        this.clientsCount--;
      });
      super.emit("connection", socket);
      socket.emit("upgrade");
      // replay the packets that were buffered by the previous owner
      for (const packet of packets) {
        // @ts-expect-error onPacket() is private
        socket.onPacket(packet);
      }
    }
  };
  const onError = () => {
    transport.close();
  };
  // close the transport if the owner node does not answer in time
  const timer = setTimeout(() => {
    this._requests.delete(requestId);
    onError();
  }, this._opts.responseTimeout);
  this._requests.set(requestId, {
    timer,
    onSuccess,
    onError,
  });
  this.publishMessage({
    requestId,
    senderId: this._nodeId,
    recipientId: senderId,
    type: MessageType.UPGRADE,
    data: {
      sid,
      success: true,
    },
  });
}
/**
 * Emits an event. The "connection" event of HTTP long-polling clients is
 * delayed, so that an upgrade performed on another node can take the session
 * over before any listener runs; the packets received in the meantime are
 * buffered and replayed in _doConnect().
 *
 * Fix: the method is declared to return a boolean but the delayed path fell
 * through and returned undefined; it now explicitly returns false (no
 * listener has been invoked yet), which preserves the previous falsy value.
 *
 * @returns whether a "connection" listener was invoked
 */
override emit(ev: string, ...args: any[]): boolean {
  if (ev !== "connection") {
    return super.emit(ev, ...args);
  }
  const socket = args[0] as Socket;
  if (socket.transport.name === "websocket") {
    // a WebSocket connection cannot be taken over, no need to delay
    return super.emit(ev, ...args);
  }
  debug("delaying connection");
  socket[kDelayed] = true;
  // buffer the packets received while the connection is delayed
  socket[kBuffer] = [];
  socket[kPacketListener] = (packet: Packet) => {
    socket[kBuffer].push(packet);
  };
  socket.on("packet", socket[kPacketListener]);
  socket[kDelayedTimer] = setTimeout(
    () => this._doConnect(socket),
    this._opts.delayedConnectionTimeout
  );
  // the "connection" event was not forwarded (yet), so no listener ran
  return false;
}
/**
 * Completes a delayed connection: emits the "connection" event on this node
 * and replays the packets that were buffered in the meantime.
 *
 * No-op if the connection was already completed (e.g. taken over by another
 * node) or if the socket was closed in between.
 */
private _doConnect(socket: Socket) {
  if (!socket[kDelayed] || socket.readyState !== "open") {
    return;
  }
  debug(
    "the client has not upgraded yet, so the connection process is completed here"
  );
  socket[kDelayed] = false;
  socket.off("packet", socket[kPacketListener]);
  clearTimeout(socket[kDelayedTimer]);
  super.emit("connection", socket);
  // replay the packets received while the connection was delayed
  socket[kBuffer].forEach((packet: Packet) => {
    // @ts-expect-error onPacket() method is private
    socket.onPacket(packet);
  });
  delete socket[kBuffer];
  if (socket.upgraded) {
    socket.emit("upgrade");
  }
}
/**
 * Publishes a message to the other nodes of the cluster. Implemented by each
 * concrete engine (e.g. Redis Pub/Sub, Node.js cluster IPC).
 */
abstract publishMessage(message: Message): void;
}

View File

@@ -0,0 +1,2 @@
// Node.js cluster flavor (inter-process communication through the primary)
export { setupPrimary, NodeClusterEngine } from "./cluster";
// Redis Pub/Sub flavor
export { setupPrimaryWithRedis, RedisEngine } from "./redis";

View File

@@ -0,0 +1,200 @@
import { ClusterEngine, type Message } from "./engine";
import { encode, decode } from "@msgpack/msgpack";
import { type ServerOptions } from "engine.io";
import cluster from "node:cluster";
import { randomUUID } from "node:crypto";
import debugModule from "debug";
const debug = debugModule("engine:redis");
// marker used to filter out unrelated messages on the channels
const MESSAGE_SOURCE = "_eio";

// stores the node id of a worker on the primary side
const kNodeId = Symbol("nodeId");

// no-op callback for worker.send(): delivery errors are deliberately ignored
function ignoreError() {}

interface PrimaryWithRedisOptions {
  /**
   * The prefix for the Redis Pub/Sub channels.
   *
   * @default "engine.io"
   */
  channelPrefix?: string;
}
/**
 * Computes the name of a Redis Pub/Sub channel.
 *
 * @param prefix - the common channel prefix
 * @param nodeId - when provided, the channel targets this node only;
 * otherwise the channel is the broadcast channel
 */
function channelName(prefix: string, nodeId?: string) {
  return nodeId ? `${prefix}#${nodeId}#` : `${prefix}#`;
}
/**
 * Sets up the primary process so that messages are routed between the local
 * workers (over the cluster IPC channel) and the other primaries (over Redis
 * Pub/Sub).
 *
 * Fix: the message published to Redis was the raw msgpack Uint8Array; it is
 * now wrapped in a Buffer, consistent with RedisEngine#publishMessage and
 * with the `string | Buffer` payloads expected by the Redis clients.
 *
 * @param pubClient - a Redis client used to publish messages (from either
 * the "redis" or the "ioredis" package)
 * @param subClient - a Redis client used to receive messages
 * @param opts
 */
export function setupPrimaryWithRedis(
  pubClient: any,
  subClient: any,
  opts?: PrimaryWithRedisOptions
) {
  const primaryId = randomUUID();
  const prefix = opts?.channelPrefix || "engine.io";
  // one broadcast channel plus one channel specific to this primary
  const channels = [channelName(prefix), channelName(prefix, primaryId)];

  debug("subscribing to redis channels: %s", channels);
  SUBSCRIBE(subClient, channels, (buffer: Buffer) => {
    let message: Message & { _source?: string; _primaryId?: string };
    try {
      message = decode(buffer) as Message;
    } catch (e) {
      debug("ignore malformed buffer");
      return;
    }
    if (message._source !== MESSAGE_SOURCE) {
      debug("ignore message from unknown source");
      return;
    }
    if (message._primaryId === primaryId) {
      // the message was published by this very process
      debug("ignore message from self");
      return;
    }
    debug("received message: %j", message);
    // @ts-expect-error recipientId is not defined for all messages
    const recipientId = (message as Message).recipientId;
    if (recipientId) {
      for (const worker of Object.values(cluster.workers)) {
        if (worker[kNodeId] === recipientId) {
          debug("forward message to worker %d", worker.id);
          worker.send(message, null, ignoreError);
          return;
        }
      }
    }
    debug("forward message to all workers");
    for (const worker of Object.values(cluster.workers)) {
      worker.send(message, null, ignoreError);
    }
  });

  cluster.on(
    "message",
    (
      sourceWorker,
      message: Message & { _source?: string; _primaryId?: string }
    ) => {
      if (message._source !== MESSAGE_SOURCE) {
        debug("ignore message from unknown source");
        return;
      }
      if (!sourceWorker[kNodeId]) {
        // the first message of a worker tells us its node id
        sourceWorker[kNodeId] = (message as Message).senderId;
      }
      // @ts-expect-error recipientId is not defined for all messages
      let recipientId = message.recipientId;
      if (recipientId) {
        for (const worker of Object.values(cluster.workers)) {
          if (worker[kNodeId] === recipientId) {
            debug("forward message to worker %d", worker.id);
            worker.send(message, null, ignoreError);
            // the recipient is a local worker, no need to publish to Redis
            return;
          }
        }
      }
      debug("forward message to all other workers");
      for (const worker of Object.values(cluster.workers)) {
        if (worker.id !== sourceWorker.id) {
          worker.send(message, null, ignoreError);
        }
      }
      // @ts-expect-error recipientId is not defined for all messages
      const channel = channelName(prefix, message.recipientId);
      message._primaryId = primaryId;
      debug("publish message to channel %s", channel);
      // encode() returns a Uint8Array: wrap it in a Buffer for the Redis
      // client (consistent with RedisEngine#publishMessage)
      pubClient.publish(channel, Buffer.from(encode(message)));
    }
  );
}
interface RedisEngineOptions extends ServerOptions {
  /**
   * The prefix for the Redis Pub/Sub channels.
   *
   * @default "engine.io"
   */
  channelPrefix?: string;
}

/**
 * A cluster-friendly engine which relies on Redis Pub/Sub to communicate
 * with the other nodes of the cluster.
 */
export class RedisEngine extends ClusterEngine {
  private readonly _pubClient: any;
  private readonly _channelPrefix: string;

  /**
   * @param pubClient - a Redis client used to publish messages (from either
   * the "redis" or the "ioredis" package)
   * @param subClient - a Redis client used to receive messages
   * @param opts
   */
  constructor(pubClient: any, subClient: any, opts?: RedisEngineOptions) {
    super(opts);
    this._pubClient = pubClient;
    this._channelPrefix = opts?.channelPrefix || "engine.io";
    // one broadcast channel plus one channel specific to this node
    const channels = [
      channelName(this._channelPrefix),
      channelName(this._channelPrefix, this._nodeId),
    ];
    debug("subscribing to redis channels: %s", channels);
    SUBSCRIBE(subClient, channels, (buffer: Buffer) => {
      let message: Message & { _source?: string; _primaryId?: string };
      try {
        message = decode(buffer) as Message;
      } catch (e) {
        debug("ignore malformed buffer");
        return;
      }
      if (message._source !== MESSAGE_SOURCE) {
        debug("ignore message from unknown source");
        return;
      }
      debug("received message: %j", message);
      this.onMessage(message);
    });
  }

  /**
   * Publishes a message to the other nodes of the cluster through Redis
   * Pub/Sub.
   */
  publishMessage(message: Message & { _source?: string }): void {
    // @ts-expect-error recipientId is not defined for all messages
    const channel = channelName(this._channelPrefix, message.recipientId);
    message._source = MESSAGE_SOURCE;
    debug("publish message to channel %s", channel);
    this._pubClient.publish(channel, Buffer.from(encode(message)));
  }
}
// subscribe in binary mode, since the payload is a msgpack-encoded buffer
const RETURN_BUFFERS = true;

/**
 * Subscribes to the given channels, with either a "redis" or an "ioredis"
 * client.
 *
 * @param redisClient - the Redis client used to subscribe
 * @param channels - the channels to subscribe to
 * @param listener - called with the raw payload of each message
 */
function SUBSCRIBE(
  redisClient: any,
  channels: string[],
  listener: (message: Buffer) => void
) {
  if (isRedisClient(redisClient)) {
    // "redis" package: the listener is passed directly to subscribe()
    redisClient.subscribe(channels, listener, RETURN_BUFFERS);
    return;
  }
  // "ioredis" package: the messages are received through an event
  redisClient.subscribe(channels);
  redisClient.on("messageBuffer", (_channel: Buffer, message: Buffer) => {
    listener(message);
  });
}

/**
 * Whether the redis client comes from the 'redis' or the 'ioredis' package
 * @param redisClient
 */
function isRedisClient(redisClient: any) {
  return typeof redisClient.sSubscribe === "function";
}

View File

@@ -0,0 +1,41 @@
{
"name": "@socket.io/cluster-engine",
"version": "0.1.0",
"description": "A cluster-friendly engine to share load between multiple Node.js processes (without sticky sessions)",
"type": "commonjs",
"license": "MIT",
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-cluster-engine#readme",
"repository": {
"type": "git",
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"
},
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"files": [
"dist/"
],
"dependencies": {
"@msgpack/msgpack": "~2.8.0",
"debug": "~4.3.3",
"engine.io": "~6.6.0",
"engine.io-parser": "~5.2.3"
},
"scripts": {
"compile": "rimraf ./dist && tsc",
"test": "npm run format:check && npm run compile && npm run test:unit",
"test:unit": "mocha --require ts-node/register test/*.ts",
"format:check": "prettier --check \"lib/**/*.ts\" \"test/**/*.ts\"",
"format:fix": "prettier --write \"lib/**/*.ts\" \"test/**/*.ts\"",
"prepack": "npm run compile"
},
"engines": {
"node": ">=10.2.0"
},
"keywords": [
"socket.io",
"cluster"
]
}

View File

@@ -0,0 +1,83 @@
import cluster from "node:cluster";
import expect = require("expect.js");
import { handshake, url } from "./util";
import { setupPrimary } from "../lib";
// fork workers running ./test/worker.js (see that file for the engine setup)
cluster.setupPrimary({
  exec: "./test/worker.js",
  // @ts-expect-error
  serialization: "advanced", // needed for packets with Buffer objects
});

// route the engine messages between the workers
setupPrimary();
describe("cluster", () => {
  beforeEach((done) => {
    // fork 3 workers, all attached to port 3000 (see worker.js)
    for (let i = 0; i < 3; i++) {
      const worker = cluster.fork();
      if (i === 2) {
        worker.on("listening", () => done());
      }
    }
  });

  afterEach((done) => {
    for (const worker of Object.values(cluster.workers)) {
      worker.kill();
    }
    // wait for every worker to actually exit before the next test
    function onExit() {
      if (Object.keys(cluster.workers).length === 0) {
        cluster.off("exit", onExit);
        done();
      }
    }
    cluster.on("exit", onExit);
  });

  it("should ping/pong", (done) => {
    (async () => {
      const sid = await handshake(3000);

      for (let i = 0; i < 10; i++) {
        const pollRes = await fetch(url(3000, sid));
        expect(pollRes.status).to.eql(200);
        const body = await pollRes.text();
        expect(body).to.eql("2");

        const dataRes = await fetch(url(3000, sid), {
          method: "POST",
          body: "3",
        });
        expect(dataRes.status).to.eql(200);
      }

      done();
    })();
  });

  it("should send and receive binary", (done) => {
    (async () => {
      const sid = await handshake(3000);

      const dataRes = await fetch(url(3000, sid), {
        method: "POST",
        body: "bAQIDBA==", // buffer <01 02 03 04> encoded as base64
      });
      expect(dataRes.status).to.eql(200);

      // poll until the buffer comes back (ping packets may be interleaved)
      for (let i = 0; i < 100; i++) {
        const pollRes = await fetch(url(3000, sid));
        expect(pollRes.status).to.eql(200);
        const body = await pollRes.text();
        if (body === "bAQIDBA==") {
          done();
          break;
        } else {
          // ping packet
        }
      }
    })();
  });
});

View File

@@ -0,0 +1,363 @@
import { EventEmitter } from "node:events";
import { createServer, Server } from "node:http";
import expect = require("expect.js");
import { WebSocket } from "ws";
import { ClusterEngine, type Message } from "../lib/engine";
import { type ServerOptions } from "engine.io";
import { url, handshake } from "./util";
/**
 * An engine implementation where the nodes communicate through a shared
 * in-memory EventEmitter (test-only transport).
 */
class InMemoryEngine extends ClusterEngine {
  constructor(readonly eventBus: EventEmitter, opts?: ServerOptions) {
    super(opts);
    eventBus.on("message", (message) => this.onMessage(message));
  }

  publishMessage(message: Message) {
    this.eventBus.emit("message", message);
  }
}
describe("in-memory", () => {
  let engine1: ClusterEngine,
    httpServer1: Server,
    engine2: ClusterEngine,
    httpServer2: Server,
    engine3: ClusterEngine,
    httpServer3: Server;

  beforeEach(() => {
    // three engines, one per HTTP server, all sharing the same event bus
    const eventBus = new EventEmitter();

    httpServer1 = createServer();
    engine1 = new InMemoryEngine(eventBus);
    engine1.attach(httpServer1);
    httpServer1.listen(3000);

    httpServer2 = createServer();
    engine2 = new InMemoryEngine(eventBus);
    engine2.attach(httpServer2);
    httpServer2.listen(3001);

    httpServer3 = createServer();
    // low pingInterval to speed up the ping/pong test
    engine3 = new InMemoryEngine(eventBus, {
      pingInterval: 50,
    });
    engine3.attach(httpServer3);
    httpServer3.listen(3002);
  });

  afterEach(() => {
    engine1.close();
    engine2.close();
    engine3.close();
    httpServer1.close();
    httpServer1.closeAllConnections();
    httpServer2.close();
    httpServer2.closeAllConnections();
    httpServer3.close();
    httpServer3.closeAllConnections();
  });

  it("should work (read)", (done) => {
    engine1.on("connection", (socket) => {
      socket.send("hello");
    });

    (async () => {
      // handshake on one node, drain the packet through another
      const sid = await handshake(3000);
      const res = await fetch(url(3001, sid));
      expect(res.status).to.eql(200);
      const body = await res.text();
      expect(body).to.eql("4hello");
      done();
    })();
  });

  it("should work (read - deferred)", (done) => {
    engine1.on("connection", (socket) => {
      // send after the remote polling request is already pending
      setTimeout(() => {
        socket.send("hello");
      }, 200);
    });

    (async () => {
      const sid = await handshake(3000);
      const res = await fetch(url(3001, sid));
      expect(res.status).to.eql(200);
      const body = await res.text();
      expect(body).to.eql("4hello");
      done();
    })();
  });

  it("should work (write)", (done) => {
    engine1.on("connection", (socket) => {
      socket.on("message", (data) => {
        expect(data).to.eql("hello");
        done();
      });
    });

    (async () => {
      const sid = await handshake(3000);
      // push a packet through another node
      const res = await fetch(url(3001, sid), {
        method: "POST",
        body: "4hello",
      });
      expect(res.status).to.eql(200);
    })();
  });

  it("should work (write - multiple)", (done) => {
    engine1.on("connection", (socket) => {
      let packets = [];
      socket.on("message", (data) => {
        packets.push(data);
        if (packets.length === 6) {
          // the packets must arrive in order, whichever node received them
          expect(packets).to.eql(["1", "2", "3", "4", "5", "6"]);
          done();
        }
      });
    });

    (async () => {
      const sid = await handshake(3000);
      const res1 = await fetch(url(3001, sid), {
        method: "POST",
        body: "41\x1e42\x1e43",
      });
      expect(res1.status).to.eql(200);
      const res2 = await fetch(url(3000, sid), {
        method: "POST",
        body: "44\x1e45",
      });
      expect(res2.status).to.eql(200);
      const res3 = await fetch(url(3001, sid), {
        method: "POST",
        body: "46",
      });
      expect(res3.status).to.eql(200);
    })();
  });

  it("should acquire read lock (different process)", (done) => {
    (async () => {
      const sid = await handshake(3000);
      const controller = new AbortController();
      // first GET holds the read lock...
      fetch(url(3000, sid), {
        signal: controller.signal,
      });
      // ...so a concurrent GET from another node must be rejected
      const res = await fetch(url(3001, sid));
      expect(res.status).to.eql(400);
      controller.abort();
      done();
    })();
  });

  it("should acquire read lock (same process)", (done) => {
    (async () => {
      const sid = await handshake(3000);
      const controller = new AbortController();
      fetch(url(3001, sid), {
        signal: controller.signal,
      });
      const res = await fetch(url(3000, sid));
      expect(res.status).to.eql(400);
      controller.abort();
      done();
    })();
  });

  it("should handle close from main process", (done) => {
    engine1.on("connection", (socket) => {
      setTimeout(() => {
        socket.close();
      }, 100);
    });

    (async () => {
      const sid = await handshake(3000);
      const res = await fetch(url(3001, sid));
      expect(res.status).to.eql(200);
      const body = await res.text();
      // "1" is the "close" packet
      expect(body).to.eql("1");
      done();
    })();
  });

  it("should handle close from client", (done) => {
    engine1.on("connection", (socket) => {
      socket.on("close", (reason) => {
        expect(reason).to.eql("transport error");
        done();
      });
    });

    (async () => {
      const sid = await handshake(3000);
      const controller = new AbortController();
      // abort the pending polling request on the remote node
      fetch(url(3001, sid), {
        signal: controller.signal,
      });
      setTimeout(() => {
        controller.abort();
      }, 100);
    })();
  });

  it("should ping/pong", function (done) {
    (async () => {
      const sid = await handshake(3002);

      // round-robin the heartbeat over the three nodes
      for (let i = 0; i < 10; i++) {
        const port1 = [3000, 3001, 3002][i % 3];
        const res1 = await fetch(url(port1, sid));
        expect(res1.status).to.eql(200);
        const body1 = await res1.text();
        expect(body1).to.eql("2");

        const port2 = [3000, 3001, 3002][(i + 1) % 3];
        const res2 = await fetch(url(port2, sid), {
          method: "POST",
          body: "3",
        });
        expect(res2.status).to.eql(200);
      }

      // no leftover pending requests or remote transports on any node
      // @ts-expect-error
      expect(engine1._requests.size).to.eql(0);
      // @ts-expect-error
      expect(engine2._requests.size).to.eql(0);
      // @ts-expect-error
      expect(engine3._requests.size).to.eql(0);
      // @ts-expect-error
      expect(engine1._remoteTransports.size).to.eql(0);
      // @ts-expect-error
      expect(engine2._remoteTransports.size).to.eql(0);
      // @ts-expect-error
      expect(engine3._remoteTransports.size).to.eql(0);

      done();
    })();
  });

  it("should reject an invalid id", (done) => {
    (async () => {
      const res = await fetch(url(3001, "01234567890123456789"));
      expect(res.status).to.eql(400);
      done();
    })();
  });

  it("should upgrade", (done) => {
    engine2.on("connection", (socket) => {
      socket.on("upgrade", () => {
        socket.send("hello");
      });
      socket.on("message", (val) => {
        expect(val).to.eql("hi");
        socket.close();
        done();
      });
    });

    (async () => {
      // handshake over HTTP long-polling on one node, then upgrade to
      // WebSocket on another node
      const sid = await handshake(3000);

      const socket = new WebSocket(
        `ws://localhost:3001/engine.io/?EIO=4&transport=websocket&sid=${sid}`
      );
      socket.onopen = () => {
        socket.send("2probe");
      };
      let i = 0;
      socket.onmessage = ({ data }) => {
        switch (i++) {
          case 0:
            expect(data).to.eql("3probe");
            socket.send("5");
            break;
          case 1:
            expect(data).to.eql("4hello");
            socket.send("4hi");
            break;
        }
      };
    })();
  });

  it("should upgrade and send buffered messages", (done) => {
    engine2.on("connection", (socket) => {
      socket.on("upgrade", () => {
        socket.send("hello");
      });
      socket.on("message", (val) => {
        expect(val).to.eql("hi");
        socket.close();
        done();
      });
    });

    (async () => {
      const sid = await handshake(3000);

      // this packet is buffered until the connection is completed
      const res = await fetch(url(3001, sid), {
        method: "POST",
        body: "4hi",
      });
      expect(res.status).to.eql(200);

      const socket = new WebSocket(
        `ws://localhost:3001/engine.io/?EIO=4&transport=websocket&sid=${sid}`
      );
      socket.onopen = () => {
        socket.send("2probe");
      };
      let i = 0;
      socket.onmessage = ({ data }) => {
        switch (i++) {
          case 0:
            expect(data).to.eql("3probe");
            socket.send("5");
            break;
          case 1:
            expect(data).to.eql("4hello");
            break;
        }
      };
    })();
  });
});

View File

@@ -0,0 +1,225 @@
import expect = require("expect.js");
import { createServer } from "node:http";
import { createClient } from "redis";
import { handshake, url } from "./util";
import { type ClusterEngine } from "../lib/engine";
import { RedisEngine } from "../lib";
import Redis from "ioredis";
describe("redis", () => {
  let engine1: ClusterEngine,
    engine2: ClusterEngine,
    engine3: ClusterEngine,
    cleanup: () => Promise<void>;

  describe("redis package", () => {
    beforeEach(async () => {
      // one pub client shared by the three engines, one sub client each
      const pubClient = createClient();
      const subClient1 = pubClient.duplicate();
      const subClient2 = pubClient.duplicate();
      const subClient3 = pubClient.duplicate();

      await Promise.all([
        pubClient.connect(),
        subClient1.connect(),
        subClient2.connect(),
        subClient3.connect(),
      ]);

      const httpServer1 = createServer();
      engine1 = new RedisEngine(pubClient, subClient1);
      engine1.attach(httpServer1);
      httpServer1.listen(3000);

      const httpServer2 = createServer();
      engine2 = new RedisEngine(pubClient, subClient2);
      engine2.attach(httpServer2);
      httpServer2.listen(3001);

      const httpServer3 = createServer();
      // low pingInterval to speed up the ping/pong test
      engine3 = new RedisEngine(pubClient, subClient3, {
        pingInterval: 50,
      });
      engine3.attach(httpServer3);
      httpServer3.listen(3002);

      cleanup = () => {
        engine1.close();
        engine2.close();
        engine3.close();
        httpServer1.close();
        httpServer1.closeAllConnections();
        httpServer2.close();
        httpServer2.closeAllConnections();
        httpServer3.close();
        httpServer3.closeAllConnections();
        return Promise.all([
          pubClient.disconnect(),
          subClient1.disconnect(),
          subClient2.disconnect(),
          subClient3.disconnect(),
        ]).then();
      };
    });

    afterEach(() => {
      return cleanup();
    });

    it("should ping/pong", (done) => {
      (async () => {
        const sid = await handshake(3002);

        // round-robin the heartbeat over the three nodes
        for (let i = 0; i < 10; i++) {
          const pollPort = [3000, 3001, 3002][i % 3];
          const pollRes = await fetch(url(pollPort, sid));
          expect(pollRes.status).to.eql(200);
          const body = await pollRes.text();
          expect(body).to.eql("2");

          const dataPort = [3000, 3001, 3002][(i + 1) % 3];
          const dataRes = await fetch(url(dataPort, sid), {
            method: "POST",
            body: "3",
          });
          expect(dataRes.status).to.eql(200);
        }

        done();
      })();
    });

    it("should send and receive binary", (done) => {
      engine1.on("connection", (socket) => {
        // echo
        socket.on("message", (val: any) => {
          socket.send(val);
        });
      });

      (async () => {
        const sid = await handshake(3000);

        const dataRes = await fetch(url(3001, sid), {
          method: "POST",
          body: "bAQIDBA==", // buffer <01 02 03 04> encoded as base64
        });
        expect(dataRes.status).to.eql(200);

        // poll until the buffer comes back (ping packets may be interleaved)
        while (true) {
          const pollRes = await fetch(url(3002, sid));
          expect(pollRes.status).to.eql(200);
          const body = await pollRes.text();
          if (body === "bAQIDBA==") {
            done();
            break;
          } else {
            // ping packet
          }
        }
      })();
    });
  });

  describe("ioredis package", () => {
    beforeEach(async () => {
      // ioredis connects lazily, no explicit connect() needed
      const pubClient = new Redis();
      const subClient1 = pubClient.duplicate();
      const subClient2 = pubClient.duplicate();
      const subClient3 = pubClient.duplicate();

      const httpServer1 = createServer();
      engine1 = new RedisEngine(pubClient, subClient1);
      engine1.attach(httpServer1);
      httpServer1.listen(3000);

      const httpServer2 = createServer();
      engine2 = new RedisEngine(pubClient, subClient2);
      engine2.attach(httpServer2);
      httpServer2.listen(3001);

      const httpServer3 = createServer();
      // low pingInterval to speed up the ping/pong test
      engine3 = new RedisEngine(pubClient, subClient3, {
        pingInterval: 50,
      });
      engine3.attach(httpServer3);
      httpServer3.listen(3002);

      cleanup = async () => {
        engine1.close();
        engine2.close();
        engine3.close();
        httpServer1.close();
        httpServer1.closeAllConnections();
        httpServer2.close();
        httpServer2.closeAllConnections();
        httpServer3.close();
        httpServer3.closeAllConnections();
        pubClient.disconnect();
        subClient1.disconnect();
        subClient2.disconnect();
        subClient3.disconnect();
      };
    });

    afterEach(() => {
      return cleanup();
    });

    it("should ping/pong", (done) => {
      (async () => {
        const sid = await handshake(3002);

        for (let i = 0; i < 10; i++) {
          const pollPort = [3000, 3001, 3002][i % 3];
          const pollRes = await fetch(url(pollPort, sid));
          expect(pollRes.status).to.eql(200);
          const body = await pollRes.text();
          expect(body).to.eql("2");

          const dataPort = [3000, 3001, 3002][(i + 1) % 3];
          const dataRes = await fetch(url(dataPort, sid), {
            method: "POST",
            body: "3",
          });
          expect(dataRes.status).to.eql(200);
        }

        done();
      })();
    });

    it("should send and receive binary", (done) => {
      engine1.on("connection", (socket) => {
        // echo
        socket.on("message", (val: any) => {
          socket.send(val);
        });
      });

      (async () => {
        const sid = await handshake(3000);

        const dataRes = await fetch(url(3001, sid), {
          method: "POST",
          body: "bAQIDBA==", // buffer <01 02 03 04> encoded as base64
        });
        expect(dataRes.status).to.eql(200);

        // poll until the buffer comes back (ping packets may be interleaved)
        while (true) {
          const pollRes = await fetch(url(3002, sid));
          expect(pollRes.status).to.eql(200);
          const body = await pollRes.text();
          if (body === "bAQIDBA==") {
            done();
            break;
          } else {
            // ping packet
          }
        }
      })();
    });
  });
});

View File

@@ -0,0 +1,17 @@
import expect = require("expect.js");
/**
 * Builds the HTTP long-polling URL for the given port, optionally including
 * a session id.
 */
export function url(port: number, sid?: string) {
  const base = `http://localhost:${port}/engine.io/?EIO=4&transport=polling`;
  return sid ? `${base}&sid=${sid}` : base;
}
/**
 * Opens an Engine.IO session over HTTP long-polling on the given port and
 * returns the session id.
 */
export async function handshake(port: number) {
  const res = await fetch(url(port));
  expect(res.status).to.eql(200);
  const body = await res.text();
  // the handshake payload is "0" followed by a JSON object containing the sid
  return JSON.parse(body.substring(1)).sid;
}

View File

@@ -0,0 +1,16 @@
// Worker entry point for the cluster tests: each forked worker runs one
// engine, and the messages are routed through the primary (see setupPrimary()
// in the test file).
const { createServer } = require("node:http");
const { NodeClusterEngine } = require("../dist/cluster");

const httpServer = createServer();
// low pingInterval to speed up the ping/pong test
const engine = new NodeClusterEngine({
  pingInterval: 50
});

// echo every message back to the sender
engine.on("connection", socket => {
  socket.on("message", (val) => {
    socket.send(val);
  });
});

engine.attach(httpServer);

// every worker listens on port 3000
httpServer.listen(3000);

View File

@@ -0,0 +1,12 @@
{
"compilerOptions": {
"outDir": "./dist",
"module": "node16",
"target": "ES2022",
"declaration": true,
"strict": false
},
"include": [
"./lib/**/*"
]
}

View File

@@ -14,7 +14,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-component-emitter#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"

View File

@@ -5,7 +5,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io-client#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"

View File

@@ -35,7 +35,7 @@
"homepage": "https://github.com/socketio/socket.io/tree/main/packages/socket.io#readme",
"repository": {
"type": "git",
"url": "https://github.com/socketio/socket.io.git"
"url": "git+https://github.com/socketio/socket.io.git"
},
"bugs": {
"url": "https://github.com/socketio/socket.io/issues"