From 4f4e5342eb6767f3837d43689f9618b46345e8a1 Mon Sep 17 00:00:00 2001 From: Naomi Seyfer Date: Mon, 23 Sep 2013 17:30:34 -0700 Subject: [PATCH] Remove code to keep around sessions after they are disconnected. We will deal with reconnects with session resumption someday, but it will not be using this dead code which has bitrotted in our repository for months on end. It was never fully implemented, and what we had was a sketch that causes bugs (and extra cpu usage). --- packages/livedata/livedata_server.js | 223 ++++++++------------------- 1 file changed, 63 insertions(+), 160 deletions(-) diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index 1ecea732b4..11bd6752a5 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -212,7 +212,7 @@ _.extend(SessionCollectionView.prototype, { /* Session */ /******************************************************************************/ -var Session = function (server, version) { +var Session = function (server, version, socket) { var self = this; self.id = Random.id(); @@ -220,18 +220,11 @@ var Session = function (server, version) { self.version = version; self.initialized = false; - self.socket = null; - self.last_connect_time = 0; - self.last_detach_time = +(new Date); + self.socket = socket; - self.in_queue = []; + self.inQueue = []; self.blocked = false; - self.worker_running = false; - - self.out_queue = []; - - // id of invocation => {result or error, when} - self.result_cache = {}; + self.workerRunning = false; // Sub objects for active subscriptions self._namedSubs = {}; @@ -257,6 +250,13 @@ var Session = function (server, version) { // when we are rerunning subscriptions, any ready messages // we want to buffer up for when we are done rerunning subscriptions self._pendingReady = []; + + socket.send(stringifyDDP({msg: 'connected', + session: self.id})); + // On initial connect, spin up all the universal publishers. + Fiber(function () { + self.startUniversalSubs(); + }).run(); }; _.extend(Session.prototype, { @@ -340,32 +340,7 @@ _.extend(Session.prototype, { var view = self.getCollectionView(collectionName); view.changed(subscriptionHandle, id, fields); }, - // Connect a new socket to this session, displacing (and closing) - // any socket that was previously connected - connect: function (socket) { - var self = this; - if (self.socket) { - self.socket.close(); - self.detach(self.socket); - } - self.socket = socket; - self.last_connect_time = +(new Date); - _.each(self.out_queue, function (msg) { - if (Meteor._printSentDDP) - Meteor._debug("Sent DDP", stringifyDDP(msg)); - self.socket.send(stringifyDDP(msg)); - }); - self.out_queue = []; - - // On initial connect, spin up all the universal publishers. - if (!self.initialized) { - self.initialized = true; - Fiber(function () { - self.startUniversalSubs(); - }).run(); - } - }, startUniversalSubs: function () { var self = this; @@ -378,70 +353,33 @@ _.extend(Session.prototype, { }); }, - // If 'socket' is the socket currently connected to this session, - // detach it (the session will then have no socket -- it will - // continue running and queue up its messages.) If 'socket' isn't - // the currently connected socket, just clean up the pointer that - // may have led us to believe otherwise. - detach: function (socket) { - var self = this; - if (socket === self.socket) { - self.socket = null; - self.last_detach_time = +(new Date); - } - if (socket.meteor_session === self) - socket.meteor_session = null; - }, - - // Should be called periodically to prune the method invocation - // replay cache. - cleanup: function () { - var self = this; - // Only prune if we're connected, and we've been connected for at - // least five minutes. That seems like enough time for the client - // to finish its reconnection. Then, keep five minutes of - // history. That seems like enough time for the client to receive - // our responses, or else for us to notice that the connection is - // gone. - var now = +(new Date); - if (!(self.socket && (now - self.last_connect_time) > 5 * 60 * 1000)) - return; // not connected, or not connected long enough - - var kill = []; - _.each(self.result_cache, function (info, id) { - if (now - info.when > 5 * 60 * 1000) - kill.push(id); - }); - _.each(kill, function (id) { - delete self.result_cache[id]; - }); - }, - // Destroy this session. Stop all processing and tear everything // down. If a socket was attached, close it. destroy: function () { var self = this; if (self.socket) { self.socket.close(); - self.detach(self.socket); + self.socket._meteorSession = null; } - self._deactivateAllSubscriptions(); + Meteor.defer(function () { + // stop callbacks can yield, so we defer this on destroy. + // see also _closeAllForTokens and its desire to destroy things in a loop. + self._deactivateAllSubscriptions(); + }); // Drop the merge box data immediately. self.collectionViews = {}; - self.in_queue = []; - self.out_queue = []; + self.inQueue = null; }, - // Send a message (queueing it if no socket is connected right now.) + // Send a message (doing nothing if no socket is connected right now.) // It should be a JSON object (it will be stringified.) send: function (msg) { var self = this; - if (Meteor._printSentDDP) - Meteor._debug("Sent DDP", stringifyDDP(msg)); - if (self.socket) + if (self.socket) { + if (Meteor._printSentDDP) + Meteor._debug("Sent DDP", stringifyDDP(msg)); self.socket.send(stringifyDDP(msg)); - else - self.out_queue.push(msg); + } }, // Send a connection error. @@ -468,20 +406,20 @@ _.extend(Session.prototype, { // way, but it's the easiest thing that's correct. (unsub needs to // be ordered against sub, methods need to be ordered against each // other.) - processMessage: function (msg_in, socket) { + processMessage: function (msg_in) { var self = this; - if (socket !== self.socket) + if (!self.inQueue) // we have been destroyed. return; - self.in_queue.push(msg_in); - if (self.worker_running) + self.inQueue.push(msg_in); + if (self.workerRunning) return; - self.worker_running = true; + self.workerRunning = true; var processNext = function () { - var msg = self.in_queue.shift(); + var msg = self.inQueue && self.inQueue.shift(); if (!msg) { - self.worker_running = false; + self.workerRunning = false; return; } @@ -569,18 +507,6 @@ _.extend(Session.prototype, { msg: 'updated', methods: [msg.id]}); }); - // check for a replayed method (this is important during - // reconnect) - if (_.has(self.result_cache, msg.id)) { - // found -- just resend whatever we sent last time - var payload = _.clone(self.result_cache[msg.id]); - delete payload.when; - self.send( - _.extend({msg: 'result', id: msg.id}, payload)); - fence.arm(); - return; - } - // find the handler var handler = self.server.method_handlers[msg.method]; if (!handler) { @@ -628,7 +554,6 @@ _.extend(Session.prototype, { var payload = exception ? {error: exception} : (result !== undefined ? {result: result} : {}); - self.result_cache[msg.id] = _.extend({when: +(new Date)}, payload); self.send(_.extend({msg: 'result', id: msg.id}, payload)); } }, @@ -728,6 +653,7 @@ _.extend(Session.prototype, { } }); + // XXX figure out the login token that was just used, and set up an observe // on the user doc so that deleting the user or the login token disconnects // the session. For now, if you want to make sure that your deleted users @@ -1050,7 +976,7 @@ Server = function () { self.stream_server.register(function (socket) { // socket implements the SockJSConnection interface - socket.meteor_session = null; + socket._meteorSession = null; var sendError = function (reason, offendingMessage) { var msg = {msg: 'error', reason: reason}; @@ -1076,7 +1002,7 @@ Server = function () { } if (msg.msg === 'connect') { - if (socket.meteor_session) { + if (socket._meteorSession) { sendError("Already connected", msg); return; } @@ -1084,11 +1010,11 @@ Server = function () { return; } - if (!socket.meteor_session) { + if (!socket._meteorSession) { sendError('Must connect first', msg); return; } - socket.meteor_session.processMessage(msg, socket); + socket._meteorSession.processMessage(msg); } catch (e) { // XXX print stack nicely Meteor._debug("Internal exception while processing message", msg, @@ -1097,36 +1023,13 @@ Server = function () { }); socket.on('close', function () { - if (socket.meteor_session) - socket.meteor_session.detach(socket); - }); - }); - - // Every minute, clean up sessions that have been abandoned for a - // minute. Also run result cache cleanup. - // XXX at scale, we'll want to have a separate timer for each - // session, and stagger them - // XXX when we get resume working again, we might keep sessions - // open longer (but stop running their diffs!) - Meteor.setInterval(function () { - var now = +(new Date); - var destroyedIds = []; - _.each(self.sessions, function (s, id) { - s.cleanup(); - if (!s.socket && (now - s.last_detach_time) > 60 * 1000) { - s.destroy(); - destroyedIds.push(id); + if (socket._meteorSession) { + Fiber(function () { + self._destroySession(socket._meteorSession); + }).run(); } }); - _.each(destroyedIds, function (id) { - var session = self.sessions[id]; - self.sessionsByLoginToken[session.loginToken] = _.without( - self.sessionsByLoginToken[session.loginToken], - id - ); - delete self.sessions[id]; - }); - }, 1 * 60 * 1000); + }); }; _.extend(Server.prototype, { @@ -1134,19 +1037,13 @@ _.extend(Server.prototype, { _handleConnect: function (socket, msg) { var self = this; // In the future, handle session resumption: something like: - // socket.meteor_session = self.sessions[msg.session] + // socket._meteorSession = self.sessions[msg.session] var version = calculateVersion(msg.support, SUPPORTED_DDP_VERSIONS); if (msg.version === version) { // Creating a new session - socket.meteor_session = new Session(self, version); - self.sessions[socket.meteor_session.id] = socket.meteor_session; - - - socket.send(stringifyDDP({msg: 'connected', - session: socket.meteor_session.id})); - // will kick off previous connection, if any - socket.meteor_session.connect(socket); + socket._meteorSession = new Session(self, version, socket); + self.sessions[socket._meteorSession.id] = socket._meteorSession; } else if (!msg.version) { // connect message without a version. This means an old (pre-pre1) // client is trying to connect. If we just disconnect the @@ -1240,6 +1137,21 @@ _.extend(Server.prototype, { } }, + _destroySession: function (session) { + var self = this; + delete self.sessions[session.id]; + if (session.sessionData.loginToken) { + self.sessionsByLoginToken[session.sessionData.loginToken] = _.without( + self.sessionsByLoginToken[session.sessionData.loginToken], + session.id + ); + if (_.isEmpty(self.sessionsByLoginToken[session.sessionData.loginToken])) { + delete self.sessionsByLoginToken[session.sessionData.loginToken]; + } + } + session.destroy(); + }, + methods: function (methods) { var self = this; _.each(methods, function (func, name) { @@ -1349,6 +1261,8 @@ _.extend(Server.prototype, { self.sessionsByLoginToken[oldToken], session.id ); + if (_.isEmpty(self.sessionsByLoginToken[oldToken])) + delete self.sessionsByLoginToken[oldToken]; } if (! _.has(self.sessionsByLoginToken, newToken)) self.sessionsByLoginToken[newToken] = []; @@ -1361,25 +1275,14 @@ _.extend(Server.prototype, { var self = this; _.each(tokens, function (token) { if (_.has(self.sessionsByLoginToken, token)) { - var destroyedIds = []; - _.each(self.sessionsByLoginToken[token], function (sessionId) { + // _destroySession modifies sessionsByLoginToken, so we clone it. + _.each(EJSON.clone(self.sessionsByLoginToken[token]), function (sessionId) { // Destroy session and remove from self.sessions. var session = self.sessions[sessionId]; if (session) { - session.cleanup(); - session.destroy(); - delete self.sessions[sessionId]; + self._destroySession(session); } - destroyedIds.push(sessionId); }); - - // Remove destroyed sessions from self.sessionsByLoginToken. - self.sessionsByLoginToken[token] = _.filter( - self.sessionsByLoginToken[token], - function (sessionId) { - return _.indexOf(destroyedIds, sessionId) === -1; - } - ); } }); }