Remove code to keep around sessions after they are disconnected.

We will deal with reconnects with session resumption someday, but it will not be
using this dead code which has bitrotted in our repository for months on end.
It was never fully implemented, and what we had was a sketch that causes bugs
(and extra cpu usage).
This commit is contained in:
Naomi Seyfer
2013-09-23 17:30:34 -07:00
parent 35e8a04834
commit 4f4e5342eb

View File

@@ -212,7 +212,7 @@ _.extend(SessionCollectionView.prototype, {
/* Session */ /* Session */
/******************************************************************************/ /******************************************************************************/
var Session = function (server, version) { var Session = function (server, version, socket) {
var self = this; var self = this;
self.id = Random.id(); self.id = Random.id();
@@ -220,18 +220,11 @@ var Session = function (server, version) {
self.version = version; self.version = version;
self.initialized = false; self.initialized = false;
self.socket = null; self.socket = socket;
self.last_connect_time = 0;
self.last_detach_time = +(new Date);
self.in_queue = []; self.inQueue = [];
self.blocked = false; self.blocked = false;
self.worker_running = false; self.workerRunning = false;
self.out_queue = [];
// id of invocation => {result or error, when}
self.result_cache = {};
// Sub objects for active subscriptions // Sub objects for active subscriptions
self._namedSubs = {}; self._namedSubs = {};
@@ -257,6 +250,13 @@ var Session = function (server, version) {
// when we are rerunning subscriptions, any ready messages // when we are rerunning subscriptions, any ready messages
// we want to buffer up for when we are done rerunning subscriptions // we want to buffer up for when we are done rerunning subscriptions
self._pendingReady = []; self._pendingReady = [];
socket.send(stringifyDDP({msg: 'connected',
session: self.id}));
// On initial connect, spin up all the universal publishers.
Fiber(function () {
self.startUniversalSubs();
}).run();
}; };
_.extend(Session.prototype, { _.extend(Session.prototype, {
@@ -340,32 +340,7 @@ _.extend(Session.prototype, {
var view = self.getCollectionView(collectionName); var view = self.getCollectionView(collectionName);
view.changed(subscriptionHandle, id, fields); view.changed(subscriptionHandle, id, fields);
}, },
// Connect a new socket to this session, displacing (and closing)
// any socket that was previously connected
connect: function (socket) {
var self = this;
if (self.socket) {
self.socket.close();
self.detach(self.socket);
}
self.socket = socket;
self.last_connect_time = +(new Date);
_.each(self.out_queue, function (msg) {
if (Meteor._printSentDDP)
Meteor._debug("Sent DDP", stringifyDDP(msg));
self.socket.send(stringifyDDP(msg));
});
self.out_queue = [];
// On initial connect, spin up all the universal publishers.
if (!self.initialized) {
self.initialized = true;
Fiber(function () {
self.startUniversalSubs();
}).run();
}
},
startUniversalSubs: function () { startUniversalSubs: function () {
var self = this; var self = this;
@@ -378,70 +353,33 @@ _.extend(Session.prototype, {
}); });
}, },
// If 'socket' is the socket currently connected to this session,
// detach it (the session will then have no socket -- it will
// continue running and queue up its messages.) If 'socket' isn't
// the currently connected socket, just clean up the pointer that
// may have led us to believe otherwise.
detach: function (socket) {
var self = this;
if (socket === self.socket) {
self.socket = null;
self.last_detach_time = +(new Date);
}
if (socket.meteor_session === self)
socket.meteor_session = null;
},
// Should be called periodically to prune the method invocation
// replay cache.
cleanup: function () {
var self = this;
// Only prune if we're connected, and we've been connected for at
// least five minutes. That seems like enough time for the client
// to finish its reconnection. Then, keep five minutes of
// history. That seems like enough time for the client to receive
// our responses, or else for us to notice that the connection is
// gone.
var now = +(new Date);
if (!(self.socket && (now - self.last_connect_time) > 5 * 60 * 1000))
return; // not connected, or not connected long enough
var kill = [];
_.each(self.result_cache, function (info, id) {
if (now - info.when > 5 * 60 * 1000)
kill.push(id);
});
_.each(kill, function (id) {
delete self.result_cache[id];
});
},
// Destroy this session. Stop all processing and tear everything // Destroy this session. Stop all processing and tear everything
// down. If a socket was attached, close it. // down. If a socket was attached, close it.
destroy: function () { destroy: function () {
var self = this; var self = this;
if (self.socket) { if (self.socket) {
self.socket.close(); self.socket.close();
self.detach(self.socket); self.socket._meteorSession = null;
} }
self._deactivateAllSubscriptions(); Meteor.defer(function () {
// stop callbacks can yield, so we defer this on destroy.
// see also _closeAllForTokens and its desire to destroy things in a loop.
self._deactivateAllSubscriptions();
});
// Drop the merge box data immediately. // Drop the merge box data immediately.
self.collectionViews = {}; self.collectionViews = {};
self.in_queue = []; self.inQueue = null;
self.out_queue = [];
}, },
// Send a message (queueing it if no socket is connected right now.) // Send a message (doing nothing if no socket is connected right now.)
// It should be a JSON object (it will be stringified.) // It should be a JSON object (it will be stringified.)
send: function (msg) { send: function (msg) {
var self = this; var self = this;
if (Meteor._printSentDDP) if (self.socket) {
Meteor._debug("Sent DDP", stringifyDDP(msg)); if (Meteor._printSentDDP)
if (self.socket) Meteor._debug("Sent DDP", stringifyDDP(msg));
self.socket.send(stringifyDDP(msg)); self.socket.send(stringifyDDP(msg));
else }
self.out_queue.push(msg);
}, },
// Send a connection error. // Send a connection error.
@@ -468,20 +406,20 @@ _.extend(Session.prototype, {
// way, but it's the easiest thing that's correct. (unsub needs to // way, but it's the easiest thing that's correct. (unsub needs to
// be ordered against sub, methods need to be ordered against each // be ordered against sub, methods need to be ordered against each
// other.) // other.)
processMessage: function (msg_in, socket) { processMessage: function (msg_in) {
var self = this; var self = this;
if (socket !== self.socket) if (!self.inQueue) // we have been destroyed.
return; return;
self.in_queue.push(msg_in); self.inQueue.push(msg_in);
if (self.worker_running) if (self.workerRunning)
return; return;
self.worker_running = true; self.workerRunning = true;
var processNext = function () { var processNext = function () {
var msg = self.in_queue.shift(); var msg = self.inQueue && self.inQueue.shift();
if (!msg) { if (!msg) {
self.worker_running = false; self.workerRunning = false;
return; return;
} }
@@ -569,18 +507,6 @@ _.extend(Session.prototype, {
msg: 'updated', methods: [msg.id]}); msg: 'updated', methods: [msg.id]});
}); });
// check for a replayed method (this is important during
// reconnect)
if (_.has(self.result_cache, msg.id)) {
// found -- just resend whatever we sent last time
var payload = _.clone(self.result_cache[msg.id]);
delete payload.when;
self.send(
_.extend({msg: 'result', id: msg.id}, payload));
fence.arm();
return;
}
// find the handler // find the handler
var handler = self.server.method_handlers[msg.method]; var handler = self.server.method_handlers[msg.method];
if (!handler) { if (!handler) {
@@ -628,7 +554,6 @@ _.extend(Session.prototype, {
var payload = var payload =
exception ? {error: exception} : (result !== undefined ? exception ? {error: exception} : (result !== undefined ?
{result: result} : {}); {result: result} : {});
self.result_cache[msg.id] = _.extend({when: +(new Date)}, payload);
self.send(_.extend({msg: 'result', id: msg.id}, payload)); self.send(_.extend({msg: 'result', id: msg.id}, payload));
} }
}, },
@@ -728,6 +653,7 @@ _.extend(Session.prototype, {
} }
}); });
// XXX figure out the login token that was just used, and set up an observe // XXX figure out the login token that was just used, and set up an observe
// on the user doc so that deleting the user or the login token disconnects // on the user doc so that deleting the user or the login token disconnects
// the session. For now, if you want to make sure that your deleted users // the session. For now, if you want to make sure that your deleted users
@@ -1050,7 +976,7 @@ Server = function () {
self.stream_server.register(function (socket) { self.stream_server.register(function (socket) {
// socket implements the SockJSConnection interface // socket implements the SockJSConnection interface
socket.meteor_session = null; socket._meteorSession = null;
var sendError = function (reason, offendingMessage) { var sendError = function (reason, offendingMessage) {
var msg = {msg: 'error', reason: reason}; var msg = {msg: 'error', reason: reason};
@@ -1076,7 +1002,7 @@ Server = function () {
} }
if (msg.msg === 'connect') { if (msg.msg === 'connect') {
if (socket.meteor_session) { if (socket._meteorSession) {
sendError("Already connected", msg); sendError("Already connected", msg);
return; return;
} }
@@ -1084,11 +1010,11 @@ Server = function () {
return; return;
} }
if (!socket.meteor_session) { if (!socket._meteorSession) {
sendError('Must connect first', msg); sendError('Must connect first', msg);
return; return;
} }
socket.meteor_session.processMessage(msg, socket); socket._meteorSession.processMessage(msg);
} catch (e) { } catch (e) {
// XXX print stack nicely // XXX print stack nicely
Meteor._debug("Internal exception while processing message", msg, Meteor._debug("Internal exception while processing message", msg,
@@ -1097,36 +1023,13 @@ Server = function () {
}); });
socket.on('close', function () { socket.on('close', function () {
if (socket.meteor_session) if (socket._meteorSession) {
socket.meteor_session.detach(socket); Fiber(function () {
}); self._destroySession(socket._meteorSession);
}); }).run();
// Every minute, clean up sessions that have been abandoned for a
// minute. Also run result cache cleanup.
// XXX at scale, we'll want to have a separate timer for each
// session, and stagger them
// XXX when we get resume working again, we might keep sessions
// open longer (but stop running their diffs!)
Meteor.setInterval(function () {
var now = +(new Date);
var destroyedIds = [];
_.each(self.sessions, function (s, id) {
s.cleanup();
if (!s.socket && (now - s.last_detach_time) > 60 * 1000) {
s.destroy();
destroyedIds.push(id);
} }
}); });
_.each(destroyedIds, function (id) { });
var session = self.sessions[id];
self.sessionsByLoginToken[session.loginToken] = _.without(
self.sessionsByLoginToken[session.loginToken],
id
);
delete self.sessions[id];
});
}, 1 * 60 * 1000);
}; };
_.extend(Server.prototype, { _.extend(Server.prototype, {
@@ -1134,19 +1037,13 @@ _.extend(Server.prototype, {
_handleConnect: function (socket, msg) { _handleConnect: function (socket, msg) {
var self = this; var self = this;
// In the future, handle session resumption: something like: // In the future, handle session resumption: something like:
// socket.meteor_session = self.sessions[msg.session] // socket._meteorSession = self.sessions[msg.session]
var version = calculateVersion(msg.support, SUPPORTED_DDP_VERSIONS); var version = calculateVersion(msg.support, SUPPORTED_DDP_VERSIONS);
if (msg.version === version) { if (msg.version === version) {
// Creating a new session // Creating a new session
socket.meteor_session = new Session(self, version); socket._meteorSession = new Session(self, version, socket);
self.sessions[socket.meteor_session.id] = socket.meteor_session; self.sessions[socket._meteorSession.id] = socket._meteorSession;
socket.send(stringifyDDP({msg: 'connected',
session: socket.meteor_session.id}));
// will kick off previous connection, if any
socket.meteor_session.connect(socket);
} else if (!msg.version) { } else if (!msg.version) {
// connect message without a version. This means an old (pre-pre1) // connect message without a version. This means an old (pre-pre1)
// client is trying to connect. If we just disconnect the // client is trying to connect. If we just disconnect the
@@ -1240,6 +1137,21 @@ _.extend(Server.prototype, {
} }
}, },
_destroySession: function (session) {
var self = this;
delete self.sessions[session.id];
if (session.sessionData.loginToken) {
self.sessionsByLoginToken[session.sessionData.loginToken] = _.without(
self.sessionsByLoginToken[session.sessionData.loginToken],
session.id
);
if (_.isEmpty(self.sessionsByLoginToken[session.sessionData.loginToken])) {
delete self.sessionsByLoginToken[session.sessionData.loginToken];
}
}
session.destroy();
},
methods: function (methods) { methods: function (methods) {
var self = this; var self = this;
_.each(methods, function (func, name) { _.each(methods, function (func, name) {
@@ -1349,6 +1261,8 @@ _.extend(Server.prototype, {
self.sessionsByLoginToken[oldToken], self.sessionsByLoginToken[oldToken],
session.id session.id
); );
if (_.isEmpty(self.sessionsByLoginToken[oldToken]))
delete self.sessionsByLoginToken[oldToken];
} }
if (! _.has(self.sessionsByLoginToken, newToken)) if (! _.has(self.sessionsByLoginToken, newToken))
self.sessionsByLoginToken[newToken] = []; self.sessionsByLoginToken[newToken] = [];
@@ -1361,25 +1275,14 @@ _.extend(Server.prototype, {
var self = this; var self = this;
_.each(tokens, function (token) { _.each(tokens, function (token) {
if (_.has(self.sessionsByLoginToken, token)) { if (_.has(self.sessionsByLoginToken, token)) {
var destroyedIds = []; // _destroySession modifies sessionsByLoginToken, so we clone it.
_.each(self.sessionsByLoginToken[token], function (sessionId) { _.each(EJSON.clone(self.sessionsByLoginToken[token]), function (sessionId) {
// Destroy session and remove from self.sessions. // Destroy session and remove from self.sessions.
var session = self.sessions[sessionId]; var session = self.sessions[sessionId];
if (session) { if (session) {
session.cleanup(); self._destroySession(session);
session.destroy();
delete self.sessions[sessionId];
} }
destroyedIds.push(sessionId);
}); });
// Remove destroyed sessions from self.sessionsByLoginToken.
self.sessionsByLoginToken[token] = _.filter(
self.sessionsByLoginToken[token],
function (sessionId) {
return _.indexOf(destroyedIds, sessionId) === -1;
}
);
} }
}); });
} }