Compare commits

...

4 Commits

17 changed files with 231 additions and 50 deletions

View File

@@ -1,3 +1,11 @@
## [3.1.2](https://github.com/socketio/socket.io/compare/3.1.1...3.1.2) (2021-02-26)
### Bug Fixes
* ignore packets received after disconnection ([494c64e](https://github.com/socketio/socket.io/commit/494c64e44f645cbd24c645f1186d203789e84af0))
## [3.1.1](https://github.com/socketio/socket.io/compare/3.1.0...3.1.1) (2021-02-03)

View File

@@ -1,5 +1,5 @@
/*!
* Socket.IO v3.1.1
* Socket.IO v3.1.2
* (c) 2014-2021 Guillermo Rauch
* Released under the MIT License.
*/
@@ -12,7 +12,7 @@
exports["io"] = factory();
else
root["io"] = factory();
})(window, function() {
})(this, function() {
return /******/ (function(modules) { // webpackBootstrap
/******/ // The module cache
/******/ var installedModules = {};
@@ -2433,6 +2433,17 @@ var Socket = /*#__PURE__*/function (_Emitter) {
_this.pingTimeoutTimer = null;
if (typeof addEventListener === "function") {
addEventListener("beforeunload", function () {
if (_this.transport) {
// silently close the transport
_this.transport.removeAllListeners();
_this.transport.close();
}
}, false);
}
_this.open();
return _this;
@@ -3280,11 +3291,6 @@ var rEscapedNewline = /\\n/g;
*/
var callbacks;
/**
* Noop.
*/
function empty() {}
var JSONPPolling = /*#__PURE__*/function (_Polling) {
_inherits(JSONPPolling, _Polling);
@@ -3320,14 +3326,7 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
self.onData(msg);
}); // append to query string
_this.query.j = _this.index; // prevent spurious errors from being emitted when the window is unloaded
if (typeof addEventListener === "function") {
addEventListener("beforeunload", function () {
if (self.script) self.script.onerror = empty;
}, false);
}
_this.query.j = _this.index;
return _this;
}
/**
@@ -3345,6 +3344,9 @@ var JSONPPolling = /*#__PURE__*/function (_Polling) {
*/
value: function doClose() {
if (this.script) {
// prevent spurious errors from being emitted when the window is unloaded
this.script.onerror = function () {};
this.script.parentNode.removeChild(this.script);
this.script = null;
}
@@ -4426,6 +4428,7 @@ var WS = /*#__PURE__*/function (_Transport) {
value: function doClose() {
if (typeof this.ws !== "undefined") {
this.ws.close();
this.ws = null;
}
}
/**

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,31 @@
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");
const WORKERS_COUNT = 4;
if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);
for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}
cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});
const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}

View File

@@ -0,0 +1,7 @@
version: "3"
services:
redis:
image: redis:5
ports:
- "6379:6379"

View File

@@ -1,23 +1,30 @@
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});
const { setupWorker } = require("@socket.io/sticky");
const crypto = require("crypto");
const randomId = () => crypto.randomBytes(8).toString("hex");
const { InMemorySessionStore } = require("./sessionStore");
const sessionStore = new InMemorySessionStore();
const { RedisSessionStore } = require("./sessionStore");
const sessionStore = new RedisSessionStore(redisClient);
const { InMemoryMessageStore } = require("./messageStore");
const messageStore = new InMemoryMessageStore();
const { RedisMessageStore } = require("./messageStore");
const messageStore = new RedisMessageStore(redisClient);
io.use((socket, next) => {
io.use(async (socket, next) => {
const sessionID = socket.handshake.auth.sessionID;
if (sessionID) {
const session = sessionStore.findSession(sessionID);
const session = await sessionStore.findSession(sessionID);
if (session) {
socket.sessionID = sessionID;
socket.userID = session.userID;
@@ -35,7 +42,7 @@ io.use((socket, next) => {
next();
});
io.on("connection", (socket) => {
io.on("connection", async (socket) => {
// persist session
sessionStore.saveSession(socket.sessionID, {
userID: socket.userID,
@@ -54,8 +61,12 @@ io.on("connection", (socket) => {
// fetch existing users
const users = [];
const [messages, sessions] = await Promise.all([
messageStore.findMessagesForUser(socket.userID),
sessionStore.findAllSessions(),
]);
const messagesPerUser = new Map();
messageStore.findMessagesForUser(socket.userID).forEach((message) => {
messages.forEach((message) => {
const { from, to } = message;
const otherUser = socket.userID === from ? to : from;
if (messagesPerUser.has(otherUser)) {
@@ -64,7 +75,8 @@ io.on("connection", (socket) => {
messagesPerUser.set(otherUser, [message]);
}
});
sessionStore.findAllSessions().forEach((session) => {
sessions.forEach((session) => {
users.push({
userID: session.userID,
username: session.username,
@@ -110,8 +122,4 @@ io.on("connection", (socket) => {
});
});
const PORT = process.env.PORT || 3000;
httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
setupWorker(io);

View File

@@ -20,6 +20,35 @@ class InMemoryMessageStore extends MessageStore {
}
}
const CONVERSATION_TTL = 24 * 60 * 60;
class RedisMessageStore extends MessageStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}
module.exports = {
InMemoryMessageStore,
RedisMessageStore,
};

View File

@@ -4,11 +4,14 @@
"description": "",
"main": "index.js",
"scripts": {
"start": "node index.js"
"start": "node cluster.js"
},
"author": "Damien Arrachequesne <damien.arrachequesne@gmail.com>",
"license": "MIT",
"dependencies": {
"socket.io": "^3.1.1"
"@socket.io/sticky": "^1.0.0",
"ioredis": "^4.22.0",
"socket.io": "^3.1.1",
"socket.io-redis": "^6.0.1"
}
}

View File

@@ -23,6 +23,67 @@ class InMemorySessionStore extends SessionStore {
}
}
const SESSION_TTL = 24 * 60 * 60;
const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;
class RedisSessionStore extends SessionStore {
constructor(redisClient) {
super();
this.redisClient = redisClient;
}
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(
`session:${id}`,
"userID",
userID,
"username",
username,
"connected",
connected
)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
async findAllSessions() {
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}
module.exports = {
InMemorySessionStore
InMemorySessionStore,
RedisSessionStore,
};

View File

@@ -537,7 +537,11 @@ export class Socket extends EventEmitter {
if (err) {
return this._onerror(err);
}
super.emit.apply(this, event);
if (this.connected) {
super.emit.apply(this, event);
} else {
debug("ignore packet received after disconnection");
}
});
});
}

14
package-lock.json generated
View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "3.1.1",
"version": "3.1.2",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -874,9 +874,9 @@
}
},
"engine.io-client": {
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-4.1.1.tgz",
"integrity": "sha512-iYasV/EttP/2pLrdowe9G3zwlNIFhwny8VSIh+vPlMnYZqSzLsTzSLa9hFy015OrH1s4fzoYxeHjVkO8hSFKwg==",
"version": "4.1.2",
"resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-4.1.2.tgz",
"integrity": "sha512-1mwvwKYMa0AaCy+sPgvJ/SnKyO5MJZ1HEeXfA3Rm/KHkHGiYD5bQVq8QzvIrkI01FuVtOdZC5lWdRw1BGXB2NQ==",
"dev": true,
"requires": {
"base64-arraybuffer": "0.1.4",
@@ -2258,9 +2258,9 @@
"integrity": "sha512-+vDov/aTsLjViYTwS9fPy5pEtTkrbEKsw2M+oVSoFGw6OD1IpvlV1VPhUzNbofCQ8oyMbdYJqDtGdmHQK6TdPg=="
},
"socket.io-client": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-3.1.1.tgz",
"integrity": "sha512-BLgIuCjI7Sf3mDHunKddX9zKR/pbkP7IACM3sJS3jha+zJ6/pGKRV6Fz5XSBHCfUs9YzT8kYIqNwOOuFNLtnYA==",
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-3.1.2.tgz",
"integrity": "sha512-fXhF8plHrd7U14A7K0JPOmZzpmGkLpIS6623DzrBZqYzI/yvlP4fA3LnxwthEVgiHmn2uJ4KjdnQD8A03PuBWQ==",
"dev": true,
"requires": {
"@types/component-emitter": "^1.2.10",

View File

@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "3.1.1",
"version": "3.1.2",
"description": "node.js realtime framework server",
"keywords": [
"realtime",
@@ -45,7 +45,7 @@
"dependencies": {
"@types/cookie": "^0.4.0",
"@types/cors": "^2.8.8",
"@types/node": "^14.14.10",
"@types/node": ">=10.0.0",
"accepts": "~1.3.4",
"base64id": "~2.0.0",
"debug": "~4.3.1",
@@ -63,7 +63,7 @@
"nyc": "^15.1.0",
"prettier": "^2.2.0",
"rimraf": "^3.0.2",
"socket.io-client": "3.1.1",
"socket.io-client": "3.1.2",
"socket.io-client-v2": "npm:socket.io-client@^2.4.0",
"superagent": "^6.1.0",
"supertest": "^6.0.1",

View File

@@ -1787,6 +1787,33 @@ describe("socket.io", () => {
});
});
it("should ignore a packet received after disconnection", (done) => {
const srv = createServer();
const sio = new Server(srv);
srv.listen(() => {
const clientSocket = client(srv);
const success = () => {
clientSocket.close();
sio.close();
done();
};
sio.on("connection", (socket) => {
socket.on("test", () => {
done(new Error("should not happen"));
});
socket.on("disconnect", success);
});
clientSocket.on("connect", () => {
clientSocket.emit("test", Buffer.alloc(10));
clientSocket.disconnect();
});
});
});
describe("onAny", () => {
it("should call listener", (done) => {
const srv = createServer();