docs(examples): 4th and final part of the "private messaging" example

See also: https://socket.io/get-started/private-messaging-part-4/
This commit is contained in:
Damien Arrachequesne
2021-02-17 00:24:23 +01:00
parent 7247b4051f
commit 7467216e02
6 changed files with 156 additions and 17 deletions

View File

@@ -0,0 +1,31 @@
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");
const WORKERS_COUNT = 4;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}
cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});
const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}

View File

@@ -0,0 +1,7 @@
version: "3"
services:
redis:
image: redis:5
ports:
- "6379:6379"

View File

@@ -1,23 +1,30 @@
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});
const { setupWorker } = require("@socket.io/sticky");
const crypto = require("crypto");
const randomId = () => crypto.randomBytes(8).toString("hex");
const { InMemorySessionStore } = require("./sessionStore");
const sessionStore = new InMemorySessionStore();
const { RedisSessionStore } = require("./sessionStore");
const sessionStore = new RedisSessionStore(redisClient);
const { InMemoryMessageStore } = require("./messageStore");
const messageStore = new InMemoryMessageStore();
const { RedisMessageStore } = require("./messageStore");
const messageStore = new RedisMessageStore(redisClient);
io.use((socket, next) => {
io.use(async (socket, next) => {
const sessionID = socket.handshake.auth.sessionID;
if (sessionID) {
const session = sessionStore.findSession(sessionID);
const session = await sessionStore.findSession(sessionID);
if (session) {
socket.sessionID = sessionID;
socket.userID = session.userID;
@@ -35,7 +42,7 @@ io.use((socket, next) => {
next();
});
io.on("connection", (socket) => {
io.on("connection", async (socket) => {
// persist session
sessionStore.saveSession(socket.sessionID, {
userID: socket.userID,
@@ -54,8 +61,12 @@ io.on("connection", (socket) => {
// fetch existing users
const users = [];
const [messages, sessions] = await Promise.all([
messageStore.findMessagesForUser(socket.userID),
sessionStore.findAllSessions(),
]);
const messagesPerUser = new Map();
messageStore.findMessagesForUser(socket.userID).forEach((message) => {
messages.forEach((message) => {
const { from, to } = message;
const otherUser = socket.userID === from ? to : from;
if (messagesPerUser.has(otherUser)) {
@@ -64,7 +75,8 @@ io.on("connection", (socket) => {
messagesPerUser.set(otherUser, [message]);
}
});
sessionStore.findAllSessions().forEach((session) => {
sessions.forEach((session) => {
users.push({
userID: session.userID,
username: session.username,
@@ -110,8 +122,4 @@ io.on("connection", (socket) => {
});
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
setupWorker(io);

View File

@@ -20,6 +20,35 @@ class InMemoryMessageStore extends MessageStore {
}
}
const CONVERSATION_TTL = 24 * 60 * 60;
class RedisMessageStore extends MessageStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}
module.exports = {
InMemoryMessageStore,
RedisMessageStore,
};

View File

@@ -4,11 +4,14 @@
"description": "",
"main": "index.js",
"scripts": {
"start": "node index.js"
"start": "node cluster.js"
},
"author": "Damien Arrachequesne <damien.arrachequesne@gmail.com>",
"license": "MIT",
"dependencies": {
"socket.io": "^3.1.1"
"@socket.io/sticky": "^1.0.0",
"ioredis": "^4.22.0",
"socket.io": "^3.1.1",
"socket.io-redis": "^6.0.1"
}
}

View File

@@ -23,6 +23,67 @@ class InMemorySessionStore extends SessionStore {
}
}
const SESSION_TTL = 24 * 60 * 60;
const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;
class RedisSessionStore extends SessionStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(
`session:${id}`,
"userID",
userID,
"username",
username,
"connected",
connected
)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
async findAllSessions() {
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}
module.exports = {
InMemorySessionStore
InMemorySessionStore,
RedisSessionStore,
};