Compare commits

...

6 Commits

19 changed files with 394 additions and 60 deletions

View File

@@ -1,3 +1,11 @@
## [3.1.2](https://github.com/socketio/socket.io/compare/3.1.1...3.1.2) (2021-02-26)
### Bug Fixes
* ignore packets received after disconnection ([494c64e](https://github.com/socketio/socket.io/commit/494c64e44f645cbd24c645f1186d203789e84af0))
## [3.1.1](https://github.com/socketio/socket.io/compare/3.1.0...3.1.1) (2021-02-03)

View File

@@ -1,5 +1,5 @@
/*!
* Socket.IO v3.1.1
* Socket.IO v3.1.2
* (c) 2014-2021 Guillermo Rauch
* Released under the MIT License.
*/
@@ -12,7 +12,7 @@
exports["io"] = factory();
else
root["io"] = factory();
})(window, function() {
})(this, function() {
return /******/ (function(modules) { // webpackBootstrap
/******/ // The module cache
/******/ var installedModules = {};
@@ -2433,6 +2433,17 @@ var Socket = /*#__PURE__*/function (_Emitter) {
_this.pingTimeoutTimer = null;
if (typeof addEventListener === "function") {
addEventListener("beforeunload", function () {
if (_this.transport) {
// silently close the transport
_this.transport.removeAllListeners();
_this.transport.close();
}
}, false);
}
_this.open();
return _this;
@@ -3280,11 +3291,6 @@ var rEscapedNewline = /\\n/g;
*/
var callbacks;
/**
* Noop.
*/
function empty() {}
var JSONPPolling = /*#__PURE__*/function (_Polling) {
_inherits(JSONPPolling, _Polling);
@@ -3320,14 +3326,7 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
self.onData(msg);
}); // append to query string
_this.query.j = _this.index; // prevent spurious errors from being emitted when the window is unloaded
if (typeof addEventListener === "function") {
addEventListener("beforeunload", function () {
if (self.script) self.script.onerror = empty;
}, false);
}
_this.query.j = _this.index;
return _this;
}
/**
@@ -3345,6 +3344,9 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
*/
value: function doClose() {
if (this.script) {
// prevent spurious errors from being emitted when the window is unloaded
this.script.onerror = function () {};
this.script.parentNode.removeChild(this.script);
this.script = null;
}
@@ -4426,6 +4428,7 @@ var WS = /*#__PURE__*/function (_Transport) {
value: function doClose() {
if (typeof this.ws !== "undefined") {
this.ws.close();
this.ws = null;
}
}
/**

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,31 @@
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");
const WORKERS_COUNT = 4;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}
cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});
const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}

View File

@@ -0,0 +1,7 @@
version: "3"
services:
redis:
image: redis:5
ports:
- "6379:6379"

View File

@@ -1,52 +1,125 @@
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});
io.use((socket, next) => {
const { setupWorker } = require("@socket.io/sticky");
const crypto = require("crypto");
const randomId = () => crypto.randomBytes(8).toString("hex");
const { RedisSessionStore } = require("./sessionStore");
const sessionStore = new RedisSessionStore(redisClient);
const { RedisMessageStore } = require("./messageStore");
const messageStore = new RedisMessageStore(redisClient);
io.use(async (socket, next) => {
const sessionID = socket.handshake.auth.sessionID;
if (sessionID) {
const session = await sessionStore.findSession(sessionID);
if (session) {
socket.sessionID = sessionID;
socket.userID = session.userID;
socket.username = session.username;
return next();
}
}
const username = socket.handshake.auth.username;
if (!username) {
return next(new Error("invalid username"));
}
socket.sessionID = randomId();
socket.userID = randomId();
socket.username = username;
next();
});
io.on("connection", (socket) => {
io.on("connection", async (socket) => {
// persist session
sessionStore.saveSession(socket.sessionID, {
userID: socket.userID,
username: socket.username,
connected: true,
});
// emit session details
socket.emit("session", {
sessionID: socket.sessionID,
userID: socket.userID,
});
// join the "userID" room
socket.join(socket.userID);
// fetch existing users
const users = [];
for (let [id, socket] of io.of("/").sockets) {
const [messages, sessions] = await Promise.all([
messageStore.findMessagesForUser(socket.userID),
sessionStore.findAllSessions(),
]);
const messagesPerUser = new Map();
messages.forEach((message) => {
const { from, to } = message;
const otherUser = socket.userID === from ? to : from;
if (messagesPerUser.has(otherUser)) {
messagesPerUser.get(otherUser).push(message);
} else {
messagesPerUser.set(otherUser, [message]);
}
});
sessions.forEach((session) => {
users.push({
userID: id,
username: socket.username,
userID: session.userID,
username: session.username,
connected: session.connected,
messages: messagesPerUser.get(session.userID) || [],
});
}
});
socket.emit("users", users);
// notify existing users
socket.broadcast.emit("user connected", {
userID: socket.id,
userID: socket.userID,
username: socket.username,
connected: true,
messages: [],
});
// forward the private message to the right recipient
// forward the private message to the right recipient (and to other tabs of the sender)
socket.on("private message", ({ content, to }) => {
socket.to(to).emit("private message", {
const message = {
content,
from: socket.id,
});
from: socket.userID,
to,
};
socket.to(to).to(socket.userID).emit("private message", message);
messageStore.saveMessage(message);
});
// notify users upon disconnection
socket.on("disconnect", () => {
socket.broadcast.emit("user disconnected", socket.id);
socket.on("disconnect", async () => {
const matchingSockets = await io.in(socket.userID).allSockets();
const isDisconnected = matchingSockets.size === 0;
if (isDisconnected) {
// notify other users
socket.broadcast.emit("user disconnected", socket.userID);
// update the connection status of the session
sessionStore.saveSession(socket.sessionID, {
userID: socket.userID,
username: socket.username,
connected: false,
});
}
});
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
setupWorker(io);

View File

@@ -0,0 +1,54 @@
/* abstract */ class MessageStore {
saveMessage(message) {}
findMessagesForUser(userID) {}
}
class InMemoryMessageStore extends MessageStore {
constructor() {
super();
this.messages = [];
}
saveMessage(message) {
this.messages.push(message);
}
findMessagesForUser(userID) {
return this.messages.filter(
({ from, to }) => from === userID || to === userID
);
}
}
const CONVERSATION_TTL = 24 * 60 * 60;
class RedisMessageStore extends MessageStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}
module.exports = {
InMemoryMessageStore,
RedisMessageStore,
};

View File

@@ -4,11 +4,14 @@
"description": "",
"main": "index.js",
"scripts": {
"start": "node index.js"
"start": "node cluster.js"
},
"author": "Damien Arrachequesne <damien.arrachequesne@gmail.com>",
"license": "MIT",
"dependencies": {
"socket.io": "^3.1.1"
"@socket.io/sticky": "^1.0.0",
"ioredis": "^4.22.0",
"socket.io": "^3.1.1",
"socket.io-redis": "^6.0.1"
}
}

View File

@@ -0,0 +1,89 @@
/* abstract */ class SessionStore {
findSession(id) {}
saveSession(id, session) {}
findAllSessions() {}
}
class InMemorySessionStore extends SessionStore {
constructor() {
super();
this.sessions = new Map();
}
findSession(id) {
return this.sessions.get(id);
}
saveSession(id, session) {
this.sessions.set(id, session);
}
findAllSessions() {
return [...this.sessions.values()];
}
}
const SESSION_TTL = 24 * 60 * 60;
const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;
class RedisSessionStore extends SessionStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(
`session:${id}`,
"userID",
userID,
"username",
username,
"connected",
connected
)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
async findAllSessions() {
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}
module.exports = {
InMemorySessionStore,
RedisSessionStore,
};

View File

@@ -32,6 +32,23 @@ export default {
},
},
created() {
const sessionID = localStorage.getItem("sessionID");
if (sessionID) {
this.usernameAlreadySelected = true;
socket.auth = { sessionID };
socket.connect();
}
socket.on("session", ({ sessionID, userID }) => {
// attach the session ID to the next reconnection attempts
socket.auth = { sessionID };
// store it in the localStorage
localStorage.setItem("sessionID", sessionID);
// save the ID of the user
socket.userID = userID;
});
socket.on("connect_error", (err) => {
if (err.message === "invalid username") {
this.usernameAlreadySelected = false;

View File

@@ -68,18 +68,28 @@ export default {
});
const initReactiveProperties = (user) => {
user.connected = true;
user.messages = [];
user.hasNewMessages = false;
};
socket.on("users", (users) => {
users.forEach((user) => {
user.self = user.userID === socket.id;
user.messages.forEach((message) => {
message.fromSelf = message.from === socket.userID;
});
for (let i = 0; i < this.users.length; i++) {
const existingUser = this.users[i];
if (existingUser.userID === user.userID) {
existingUser.connected = user.connected;
existingUser.messages = user.messages;
return;
}
}
user.self = user.userID === socket.userID;
initReactiveProperties(user);
this.users.push(user);
});
// put the current user first, and sort by username
this.users = users.sort((a, b) => {
this.users.sort((a, b) => {
if (a.self) return -1;
if (b.self) return 1;
if (a.username < b.username) return -1;
@@ -88,6 +98,13 @@ export default {
});
socket.on("user connected", (user) => {
for (let i = 0; i < this.users.length; i++) {
const existingUser = this.users[i];
if (existingUser.userID === user.userID) {
existingUser.connected = true;
return;
}
}
initReactiveProperties(user);
this.users.push(user);
});
@@ -102,13 +119,14 @@ export default {
}
});
socket.on("private message", ({ content, from }) => {
socket.on("private message", ({ content, from, to }) => {
for (let i = 0; i < this.users.length; i++) {
const user = this.users[i];
if (user.userID === from) {
const fromSelf = socket.userID === from;
if (user.userID === (fromSelf ? to : from)) {
user.messages.push({
content,
fromSelf: false,
fromSelf,
});
if (user !== this.selectedUser) {
user.hasNewMessages = true;

View File

@@ -537,7 +537,11 @@ export class Socket extends EventEmitter {
if (err) {
return this._onerror(err);
}
super.emit.apply(this, event);
if (this.connected) {
super.emit.apply(this, event);
} else {
debug("ignore packet received after disconnection");
}
});
});
}

14
package-lock.json generated
View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "3.1.1",
"version": "3.1.2",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -874,9 +874,9 @@
}
},
"engine.io-client": {
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-4.1.1.tgz",
"integrity": "sha512-iYasV/EttP/2pLrdowe9G3zwlNIFhwny8VSIh+vPlMnYZqSzLsTzSLa9hFy015OrH1s4fzoYxeHjVkO8hSFKwg==",
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-4.1.2.tgz",
"integrity": "sha512-1mwvwKYMa0AaCy+sPgvJ/SnKyO5MJZ1HEeXfA3Rm/KHkHGiYD5bQVq8QzvIrkI01FuVtOdZC5lWdRw1BGXB2NQ==",
"dev": true,
"requires": {
"base64-arraybuffer": "0.1.4",
@@ -2258,9 +2258,9 @@
"integrity": "sha512-+vDov/aTsLjViYTwS9fPy5pEtTkrbEKsw2M+oVSoFGw6OD1IpvlV1VPhUzNbofCQ8oyMbdYJqDtGdmHQK6TdPg=="
},
"socket.io-client": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-3.1.1.tgz",
"integrity": "sha512-BLgIuCjI7Sf3mDHunKddX9zKR/pbkP7IACM3sJS3jha+zJ6/pGKRV6Fz5XSBHCfUs9YzT8kYIqNwOOuFNLtnYA==",
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-3.1.2.tgz",
"integrity": "sha512-fXhF8plHrd7U14A7K0JPOmZzpmGkLpIS6623DzrBZqYzI/yvlP4fA3LnxwthEVgiHmn2uJ4KjdnQD8A03PuBWQ==",
"dev": true,
"requires": {
"@types/component-emitter": "^1.2.10",

View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "3.1.1",
"version": "3.1.2",
"description": "node.js realtime framework server",
"keywords": [
"realtime",
@@ -45,7 +45,7 @@
"dependencies": {
"@types/cookie": "^0.4.0",
"@types/cors": "^2.8.8",
"@types/node": "^14.14.10",
"@types/node": ">=10.0.0",
"accepts": "~1.3.4",
"base64id": "~2.0.0",
"debug": "~4.3.1",
@@ -63,7 +63,7 @@
"nyc": "^15.1.0",
"prettier": "^2.2.0",
"rimraf": "^3.0.2",
"socket.io-client": "3.1.1",
"socket.io-client": "3.1.2",
"socket.io-client-v2": "npm:socket.io-client@^2.4.0",
"superagent": "^6.1.0",
"supertest": "^6.0.1",

View File

@@ -1787,6 +1787,33 @@ describe("socket.io", () => {
});
});
it("should ignore a packet received after disconnection", (done) => {
const srv = createServer();
const sio = new Server(srv);
srv.listen(() => {
const clientSocket = client(srv);
const success = () => {
clientSocket.close();
sio.close();
done();
};
sio.on("connection", (socket) => {
socket.on("test", () => {
done(new Error("should not happen"));
});
socket.on("disconnect", success);
});
clientSocket.on("connect", () => {
clientSocket.emit("test", Buffer.alloc(10));
clientSocket.disconnect();
});
});
});
describe("onAny", () => {
it("should call listener", (done) => {
const srv = createServer();