docs: add examples with @socket.io/cluster-engine

[skip ci]
This commit is contained in:
Damien Arrachequesne
2024-07-19 08:42:38 +02:00
parent 1f09a3e979
commit 8b0a40fd4a
9 changed files with 252 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
# Example with `@socket.io/cluster-engine` and Node.js cluster
## How to use
```bash
# run the server
$ node server.js
# run the client
$ node client.js
```
## Explanation
The `server.js` script will create one Socket.IO server per core, each listening on the same port (`3000`).
With the default engine (provided by the `engine.io` package), sticky sessions would be required, so that each HTTP request of the same Engine.IO session reaches the same worker.
The `NodeClusterEngine` is a custom engine which takes care of the synchronization between the servers by using [the IPC channel](https://nodejs.org/api/cluster.html#workersendmessage-sendhandle-options-callback) and removes the need for sticky sessions when scaling horizontally.

View File

@@ -0,0 +1,26 @@
import { io } from "socket.io-client";
const CLIENTS_COUNT = 3;
for (let i = 0; i < CLIENTS_COUNT; i++) {
const socket = io("ws://localhost:3000/", {
// transports: ["polling"],
// transports: ["websocket"],
});
socket.on("connect", () => {
console.log(`connected as ${socket.id}`);
});
socket.on("disconnect", (reason) => {
console.log(`disconnected due to ${reason}`);
});
socket.on("hello", (socketId, workerId) => {
console.log(`received "hello" from ${socketId} (worker: ${workerId})`);
});
setInterval(() => {
socket.emit("hello");
}, 2000);
}

View File

@@ -0,0 +1,12 @@
{
"private": true,
"name": "cluster-engine-node-cluster",
"version": "0.0.1",
"type": "module",
"dependencies": {
"@socket.io/cluster-adapter": "^0.2.2",
"@socket.io/cluster-engine": "^0.1.0",
"socket.io": "^4.7.5",
"socket.io-client": "^4.7.5"
}
}

View File

@@ -0,0 +1,63 @@
import cluster from "node:cluster";
import process from "node:process";
import { availableParallelism } from "node:os";
import {
setupPrimary as setupPrimaryEngine,
NodeClusterEngine,
} from "@socket.io/cluster-engine";
import {
setupPrimary as setupPrimaryAdapter,
createAdapter,
} from "@socket.io/cluster-adapter";
import { createServer } from "node:http";
import { Server } from "socket.io";
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
const numCPUs = availableParallelism();
// fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
setupPrimaryEngine();
setupPrimaryAdapter();
// needed for packets containing Buffer objects (you can ignore it if you only send plaintext objects)
cluster.setupPrimary({
serialization: "advanced",
});
cluster.on("exit", (worker, code, signal) => {
console.log(`worker ${worker.process.pid} died`);
});
} else {
const httpServer = createServer((req, res) => {
res.writeHead(404).end();
});
const engine = new NodeClusterEngine();
engine.attach(httpServer, {
path: "/socket.io/",
});
const io = new Server({
adapter: createAdapter(),
});
io.bind(engine);
io.on("connection", (socket) => {
socket.on("hello", () => {
socket.broadcast.emit("hello", socket.id, process.pid);
});
});
// workers will share the same port
httpServer.listen(3000);
console.log(`Worker ${process.pid} started`);
}

View File

@@ -0,0 +1,22 @@
# Example with `@socket.io/cluster-engine` and Redis
## How to use
```bash
# start the redis server
$ docker compose up -d
# run the server
$ node server.js
# run the client
$ node client.js
```
## Explanation
The `server.js` script will create 3 Socket.IO servers, each listening on a distinct port (`3001`, `3002` and `3003`), and a proxy server listening on port `3000` which randomly redirects to one of those servers.
With the default engine (provided by the `engine.io` package), sticky sessions would be required, so that each HTTP request of the same Engine.IO session reaches the same server.
The `RedisEngine` is a custom engine which takes care of the synchronization between the servers by using [Redis pub/sub](https://redis.io/docs/latest/develop/interact/pubsub/) and removes the need for sticky sessions when scaling horizontally.

View File

@@ -0,0 +1,26 @@
import { io } from "socket.io-client";
const CLIENTS_COUNT = 3;
for (let i = 0; i < CLIENTS_COUNT; i++) {
const socket = io("ws://localhost:3000/", {
// transports: ["polling"],
// transports: ["websocket"],
});
socket.on("connect", () => {
console.log(`connected as ${socket.id}`);
});
socket.on("disconnect", (reason) => {
console.log(`disconnected due to ${reason}`);
});
socket.on("hello", (socketId, workerId) => {
console.log(`received "hello" from ${socketId} (worker: ${workerId})`);
});
setInterval(() => {
socket.emit("hello");
}, 2000);
}

View File

@@ -0,0 +1,5 @@
services:
redis:
image: redis:7
ports:
- "6379:6379"

View File

@@ -0,0 +1,14 @@
{
"private": true,
"name": "cluster-engine-redis",
"version": "0.0.1",
"type": "module",
"dependencies": {
"@socket.io/cluster-engine": "^0.1.0",
"@socket.io/redis-adapter": "^8.3.0",
"http-proxy": "^1.18.1",
"redis": "^4.6.15",
"socket.io": "^4.7.5",
"socket.io-client": "^4.7.5"
}
}

View File

@@ -0,0 +1,65 @@
import { RedisEngine } from "@socket.io/cluster-engine";
import { createServer } from "node:http";
import { createClient } from "redis";
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import proxyModule from "http-proxy";
const { createProxyServer } = proxyModule;
async function initServer(port) {
const httpServer = createServer((req, res) => {
res.writeHead(404).end();
});
const pubClient = createClient();
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
const engine = new RedisEngine(pubClient, subClient);
engine.attach(httpServer, {
path: "/socket.io/",
});
const io = new Server({
adapter: createAdapter(pubClient, subClient),
});
io.bind(engine);
io.on("connection", (socket) => {
socket.on("hello", () => {
socket.broadcast.emit("hello", socket.id, port);
});
});
httpServer.listen(port);
}
function initProxy() {
const proxy = createProxyServer();
function randomTarget() {
return [
"http://localhost:3001",
"http://localhost:3002",
"http://localhost:3003",
][Math.floor(Math.random() * 3)];
}
const httpServer = createServer((req, res) => {
proxy.web(req, res, { target: randomTarget() });
});
httpServer.on("upgrade", function (req, socket, head) {
proxy.ws(req, socket, head, { target: randomTarget() });
});
httpServer.listen(3000);
}
await Promise.all([initServer(3001), initServer(3002), initServer(3003)]);
initProxy();