diff --git a/examples/private-messaging/server/cluster.js b/examples/private-messaging/server/cluster.js new file mode 100644 index 00000000..e3bad09f --- /dev/null +++ b/examples/private-messaging/server/cluster.js @@ -0,0 +1,31 @@ +const cluster = require("cluster"); +const http = require("http"); +const { setupMaster } = require("@socket.io/sticky"); + +const WORKERS_COUNT = 4; + +if (cluster.isMaster) { + console.log(`Master ${process.pid} is running`); + + for (let i = 0; i < WORKERS_COUNT; i++) { + cluster.fork(); + } + + cluster.on("exit", (worker) => { + console.log(`Worker ${worker.process.pid} died`); + cluster.fork(); + }); + + const httpServer = http.createServer(); + setupMaster(httpServer, { + loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection" + }); + const PORT = process.env.PORT || 3000; + + httpServer.listen(PORT, () => + console.log(`server listening at http://localhost:${PORT}`) + ); +} else { + console.log(`Worker ${process.pid} started`); + require("./index"); +} diff --git a/examples/private-messaging/server/docker-compose.yml b/examples/private-messaging/server/docker-compose.yml new file mode 100644 index 00000000..4845950c --- /dev/null +++ b/examples/private-messaging/server/docker-compose.yml @@ -0,0 +1,7 @@ +version: "3" + +services: + redis: + image: redis:5 + ports: + - "6379:6379" diff --git a/examples/private-messaging/server/index.js b/examples/private-messaging/server/index.js index cec69501..1ab99be5 100644 --- a/examples/private-messaging/server/index.js +++ b/examples/private-messaging/server/index.js @@ -1,23 +1,30 @@ const httpServer = require("http").createServer(); +const Redis = require("ioredis"); +const redisClient = new Redis(); const io = require("socket.io")(httpServer, { cors: { origin: "http://localhost:8080", }, + adapter: require("socket.io-redis")({ + pubClient: redisClient, + subClient: redisClient.duplicate(), + }), }); +const { setupWorker } = require("@socket.io/sticky"); const crypto = require("crypto"); const randomId = () => crypto.randomBytes(8).toString("hex"); -const { InMemorySessionStore } = require("./sessionStore"); -const sessionStore = new InMemorySessionStore(); +const { RedisSessionStore } = require("./sessionStore"); +const sessionStore = new RedisSessionStore(redisClient); -const { InMemoryMessageStore } = require("./messageStore"); -const messageStore = new InMemoryMessageStore(); +const { RedisMessageStore } = require("./messageStore"); +const messageStore = new RedisMessageStore(redisClient); -io.use((socket, next) => { +io.use(async (socket, next) => { const sessionID = socket.handshake.auth.sessionID; if (sessionID) { - const session = sessionStore.findSession(sessionID); + const session = await sessionStore.findSession(sessionID); if (session) { socket.sessionID = sessionID; socket.userID = session.userID; @@ -35,7 +42,7 @@ io.use((socket, next) => { next(); }); -io.on("connection", (socket) => { +io.on("connection", async (socket) => { // persist session sessionStore.saveSession(socket.sessionID, { userID: socket.userID, @@ -54,8 +61,12 @@ io.on("connection", (socket) => { // fetch existing users const users = []; + const [messages, sessions] = await Promise.all([ + messageStore.findMessagesForUser(socket.userID), + sessionStore.findAllSessions(), + ]); const messagesPerUser = new Map(); - messageStore.findMessagesForUser(socket.userID).forEach((message) => { + messages.forEach((message) => { const { from, to } = message; const otherUser = socket.userID === from ? to : from; if (messagesPerUser.has(otherUser)) { @@ -64,7 +75,8 @@ io.on("connection", (socket) => { messagesPerUser.set(otherUser, [message]); } }); - sessionStore.findAllSessions().forEach((session) => { + + sessions.forEach((session) => { users.push({ userID: session.userID, username: session.username, @@ -110,8 +122,4 @@ io.on("connection", (socket) => { }); }); -const PORT = process.env.PORT || 3000; - -httpServer.listen(PORT, () => - console.log(`server listening at http://localhost:${PORT}`) -); +setupWorker(io); diff --git a/examples/private-messaging/server/messageStore.js b/examples/private-messaging/server/messageStore.js index 38d14464..60ab0f6f 100644 --- a/examples/private-messaging/server/messageStore.js +++ b/examples/private-messaging/server/messageStore.js @@ -20,6 +20,35 @@ class InMemoryMessageStore extends MessageStore { } } +const CONVERSATION_TTL = 24 * 60 * 60; + +class RedisMessageStore extends MessageStore { + constructor(redisClient) { + super(); + this.redisClient = redisClient; + } + + saveMessage(message) { + const value = JSON.stringify(message); + this.redisClient + .multi() + .rpush(`messages:${message.from}`, value) + .rpush(`messages:${message.to}`, value) + .expire(`messages:${message.from}`, CONVERSATION_TTL) + .expire(`messages:${message.to}`, CONVERSATION_TTL) + .exec(); + } + + findMessagesForUser(userID) { + return this.redisClient + .lrange(`messages:${userID}`, 0, -1) + .then((results) => { + return results.map((result) => JSON.parse(result)); + }); + } +} + module.exports = { InMemoryMessageStore, + RedisMessageStore, }; diff --git a/examples/private-messaging/server/package.json b/examples/private-messaging/server/package.json index 0c98b9ff..422d1ba1 100644 --- a/examples/private-messaging/server/package.json +++ b/examples/private-messaging/server/package.json @@ -4,11 +4,14 @@ "description": "", "main": "index.js", "scripts": { - "start": "node index.js" + "start": "node cluster.js" }, "author": "Damien Arrachequesne ", "license": "MIT", "dependencies": { - "socket.io": "^3.1.1" + "@socket.io/sticky": "^1.0.0", + "ioredis": "^4.22.0", + "socket.io": "^3.1.1", + "socket.io-redis": "^6.0.1" } } diff --git a/examples/private-messaging/server/sessionStore.js b/examples/private-messaging/server/sessionStore.js index 42dd294d..0ace3f4e 100644 --- a/examples/private-messaging/server/sessionStore.js +++ b/examples/private-messaging/server/sessionStore.js @@ -23,6 +23,67 @@ class InMemorySessionStore extends SessionStore { } } +const SESSION_TTL = 24 * 60 * 60; +const mapSession = ([userID, username, connected]) => + userID ? { userID, username, connected: connected === "true" } : undefined; + +class RedisSessionStore extends SessionStore { + constructor(redisClient) { + super(); + this.redisClient = redisClient; + } + + findSession(id) { + return this.redisClient + .hmget(`session:${id}`, "userID", "username", "connected") + .then(mapSession); + } + + saveSession(id, { userID, username, connected }) { + this.redisClient + .multi() + .hset( + `session:${id}`, + "userID", + userID, + "username", + username, + "connected", + connected + ) + .expire(`session:${id}`, SESSION_TTL) + .exec(); + } + + async findAllSessions() { + const keys = new Set(); + let nextIndex = 0; + do { + const [nextIndexAsStr, results] = await this.redisClient.scan( + nextIndex, + "MATCH", + "session:*", + "COUNT", + "100" + ); + nextIndex = parseInt(nextIndexAsStr, 10); + results.forEach((s) => keys.add(s)); + } while (nextIndex !== 0); + const commands = []; + keys.forEach((key) => { + commands.push(["hmget", key, "userID", "username", "connected"]); + }); + return this.redisClient + .multi(commands) + .exec() + .then((results) => { + return results + .map(([err, session]) => (err ? undefined : mapSession(session))) + .filter((v) => !!v); + }); + } +} module.exports = { - InMemorySessionStore + InMemorySessionStore, + RedisSessionStore, };