diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index 1ecea732b4..11bd6752a5 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -212,7 +212,7 @@ _.extend(SessionCollectionView.prototype, { /* Session */ /******************************************************************************/ -var Session = function (server, version) { +var Session = function (server, version, socket) { var self = this; self.id = Random.id(); @@ -220,18 +220,11 @@ var Session = function (server, version) { self.version = version; self.initialized = false; - self.socket = null; - self.last_connect_time = 0; - self.last_detach_time = +(new Date); + self.socket = socket; - self.in_queue = []; + self.inQueue = []; self.blocked = false; - self.worker_running = false; - - self.out_queue = []; - - // id of invocation => {result or error, when} - self.result_cache = {}; + self.workerRunning = false; // Sub objects for active subscriptions self._namedSubs = {}; @@ -257,6 +250,13 @@ var Session = function (server, version) { // when we are rerunning subscriptions, any ready messages // we want to buffer up for when we are done rerunning subscriptions self._pendingReady = []; + + socket.send(stringifyDDP({msg: 'connected', + session: self.id})); + // On initial connect, spin up all the universal publishers. + Fiber(function () { + self.startUniversalSubs(); + }).run(); }; _.extend(Session.prototype, { @@ -340,32 +340,7 @@ _.extend(Session.prototype, { var view = self.getCollectionView(collectionName); view.changed(subscriptionHandle, id, fields); }, - // Connect a new socket to this session, displacing (and closing) - // any socket that was previously connected - connect: function (socket) { - var self = this; - if (self.socket) { - self.socket.close(); - self.detach(self.socket); - } - self.socket = socket; - self.last_connect_time = +(new Date); - _.each(self.out_queue, function (msg) { - if (Meteor._printSentDDP) - Meteor._debug("Sent DDP", stringifyDDP(msg)); - self.socket.send(stringifyDDP(msg)); - }); - self.out_queue = []; - - // On initial connect, spin up all the universal publishers. - if (!self.initialized) { - self.initialized = true; - Fiber(function () { - self.startUniversalSubs(); - }).run(); - } - }, startUniversalSubs: function () { var self = this; @@ -378,70 +353,33 @@ _.extend(Session.prototype, { }); }, - // If 'socket' is the socket currently connected to this session, - // detach it (the session will then have no socket -- it will - // continue running and queue up its messages.) If 'socket' isn't - // the currently connected socket, just clean up the pointer that - // may have led us to believe otherwise. - detach: function (socket) { - var self = this; - if (socket === self.socket) { - self.socket = null; - self.last_detach_time = +(new Date); - } - if (socket.meteor_session === self) - socket.meteor_session = null; - }, - - // Should be called periodically to prune the method invocation - // replay cache. - cleanup: function () { - var self = this; - // Only prune if we're connected, and we've been connected for at - // least five minutes. That seems like enough time for the client - // to finish its reconnection. Then, keep five minutes of - // history. That seems like enough time for the client to receive - // our responses, or else for us to notice that the connection is - // gone. - var now = +(new Date); - if (!(self.socket && (now - self.last_connect_time) > 5 * 60 * 1000)) - return; // not connected, or not connected long enough - - var kill = []; - _.each(self.result_cache, function (info, id) { - if (now - info.when > 5 * 60 * 1000) - kill.push(id); - }); - _.each(kill, function (id) { - delete self.result_cache[id]; - }); - }, - // Destroy this session. Stop all processing and tear everything // down. If a socket was attached, close it. destroy: function () { var self = this; if (self.socket) { self.socket.close(); - self.detach(self.socket); + self.socket._meteorSession = null; } - self._deactivateAllSubscriptions(); + Meteor.defer(function () { + // stop callbacks can yield, so we defer this on destroy. + // see also _closeAllForTokens and its desire to destroy things in a loop. + self._deactivateAllSubscriptions(); + }); // Drop the merge box data immediately. self.collectionViews = {}; - self.in_queue = []; - self.out_queue = []; + self.inQueue = null; }, - // Send a message (queueing it if no socket is connected right now.) + // Send a message (doing nothing if no socket is connected right now.) // It should be a JSON object (it will be stringified.) send: function (msg) { var self = this; - if (Meteor._printSentDDP) - Meteor._debug("Sent DDP", stringifyDDP(msg)); - if (self.socket) + if (self.socket) { + if (Meteor._printSentDDP) + Meteor._debug("Sent DDP", stringifyDDP(msg)); self.socket.send(stringifyDDP(msg)); - else - self.out_queue.push(msg); + } }, // Send a connection error. @@ -468,20 +406,20 @@ _.extend(Session.prototype, { // way, but it's the easiest thing that's correct. (unsub needs to // be ordered against sub, methods need to be ordered against each // other.) - processMessage: function (msg_in, socket) { + processMessage: function (msg_in) { var self = this; - if (socket !== self.socket) + if (!self.inQueue) // we have been destroyed. return; - self.in_queue.push(msg_in); - if (self.worker_running) + self.inQueue.push(msg_in); + if (self.workerRunning) return; - self.worker_running = true; + self.workerRunning = true; var processNext = function () { - var msg = self.in_queue.shift(); + var msg = self.inQueue && self.inQueue.shift(); if (!msg) { - self.worker_running = false; + self.workerRunning = false; return; } @@ -569,18 +507,6 @@ _.extend(Session.prototype, { msg: 'updated', methods: [msg.id]}); }); - // check for a replayed method (this is important during - // reconnect) - if (_.has(self.result_cache, msg.id)) { - // found -- just resend whatever we sent last time - var payload = _.clone(self.result_cache[msg.id]); - delete payload.when; - self.send( - _.extend({msg: 'result', id: msg.id}, payload)); - fence.arm(); - return; - } - // find the handler var handler = self.server.method_handlers[msg.method]; if (!handler) { @@ -628,7 +554,6 @@ _.extend(Session.prototype, { var payload = exception ? {error: exception} : (result !== undefined ? {result: result} : {}); - self.result_cache[msg.id] = _.extend({when: +(new Date)}, payload); self.send(_.extend({msg: 'result', id: msg.id}, payload)); } }, @@ -728,6 +653,7 @@ _.extend(Session.prototype, { } }); + // XXX figure out the login token that was just used, and set up an observe // on the user doc so that deleting the user or the login token disconnects // the session. For now, if you want to make sure that your deleted users @@ -1050,7 +976,7 @@ Server = function () { self.stream_server.register(function (socket) { // socket implements the SockJSConnection interface - socket.meteor_session = null; + socket._meteorSession = null; var sendError = function (reason, offendingMessage) { var msg = {msg: 'error', reason: reason}; @@ -1076,7 +1002,7 @@ Server = function () { } if (msg.msg === 'connect') { - if (socket.meteor_session) { + if (socket._meteorSession) { sendError("Already connected", msg); return; } @@ -1084,11 +1010,11 @@ Server = function () { return; } - if (!socket.meteor_session) { + if (!socket._meteorSession) { sendError('Must connect first', msg); return; } - socket.meteor_session.processMessage(msg, socket); + socket._meteorSession.processMessage(msg); } catch (e) { // XXX print stack nicely Meteor._debug("Internal exception while processing message", msg, @@ -1097,36 +1023,13 @@ Server = function () { }); socket.on('close', function () { - if (socket.meteor_session) - socket.meteor_session.detach(socket); - }); - }); - - // Every minute, clean up sessions that have been abandoned for a - // minute. Also run result cache cleanup. - // XXX at scale, we'll want to have a separate timer for each - // session, and stagger them - // XXX when we get resume working again, we might keep sessions - // open longer (but stop running their diffs!) - Meteor.setInterval(function () { - var now = +(new Date); - var destroyedIds = []; - _.each(self.sessions, function (s, id) { - s.cleanup(); - if (!s.socket && (now - s.last_detach_time) > 60 * 1000) { - s.destroy(); - destroyedIds.push(id); + if (socket._meteorSession) { + Fiber(function () { + self._destroySession(socket._meteorSession); + }).run(); } }); - _.each(destroyedIds, function (id) { - var session = self.sessions[id]; - self.sessionsByLoginToken[session.loginToken] = _.without( - self.sessionsByLoginToken[session.loginToken], - id - ); - delete self.sessions[id]; - }); - }, 1 * 60 * 1000); + }); }; _.extend(Server.prototype, { @@ -1134,19 +1037,13 @@ _.extend(Server.prototype, { _handleConnect: function (socket, msg) { var self = this; // In the future, handle session resumption: something like: - // socket.meteor_session = self.sessions[msg.session] + // socket._meteorSession = self.sessions[msg.session] var version = calculateVersion(msg.support, SUPPORTED_DDP_VERSIONS); if (msg.version === version) { // Creating a new session - socket.meteor_session = new Session(self, version); - self.sessions[socket.meteor_session.id] = socket.meteor_session; - - - socket.send(stringifyDDP({msg: 'connected', - session: socket.meteor_session.id})); - // will kick off previous connection, if any - socket.meteor_session.connect(socket); + socket._meteorSession = new Session(self, version, socket); + self.sessions[socket._meteorSession.id] = socket._meteorSession; } else if (!msg.version) { // connect message without a version. This means an old (pre-pre1) // client is trying to connect. If we just disconnect the @@ -1240,6 +1137,21 @@ _.extend(Server.prototype, { } }, + _destroySession: function (session) { + var self = this; + delete self.sessions[session.id]; + if (session.sessionData.loginToken) { + self.sessionsByLoginToken[session.sessionData.loginToken] = _.without( + self.sessionsByLoginToken[session.sessionData.loginToken], + session.id + ); + if (_.isEmpty(self.sessionsByLoginToken[session.sessionData.loginToken])) { + delete self.sessionsByLoginToken[session.sessionData.loginToken]; + } + } + session.destroy(); + }, + methods: function (methods) { var self = this; _.each(methods, function (func, name) { @@ -1349,6 +1261,8 @@ _.extend(Server.prototype, { self.sessionsByLoginToken[oldToken], session.id ); + if (_.isEmpty(self.sessionsByLoginToken[oldToken])) + delete self.sessionsByLoginToken[oldToken]; } if (! _.has(self.sessionsByLoginToken, newToken)) self.sessionsByLoginToken[newToken] = []; @@ -1361,25 +1275,14 @@ _.extend(Server.prototype, { var self = this; _.each(tokens, function (token) { if (_.has(self.sessionsByLoginToken, token)) { - var destroyedIds = []; - _.each(self.sessionsByLoginToken[token], function (sessionId) { + // _destroySession modifies sessionsByLoginToken, so we clone it. + _.each(EJSON.clone(self.sessionsByLoginToken[token]), function (sessionId) { // Destroy session and remove from self.sessions. var session = self.sessions[sessionId]; if (session) { - session.cleanup(); - session.destroy(); - delete self.sessions[sessionId]; + self._destroySession(session); } - destroyedIds.push(sessionId); }); - - // Remove destroyed sessions from self.sessionsByLoginToken. - self.sessionsByLoginToken[token] = _.filter( - self.sessionsByLoginToken[token], - function (sessionId) { - return _.indexOf(destroyedIds, sessionId) === -1; - } - ); } }); }