From 751fe0e144a01ed5733c44a823c61d1ec38a19c5 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 10 Jan 2013 12:36:34 -0800 Subject: [PATCH] Fixes to runtime universal sub creation and concurrency with re-running subs. Meteor.publish(null, f) now starts universal subscriptions for all existing sessions (as well as future sessions). Fixes #583. Care is taken when a session is re-running its subs. Re-running subs is now implemented by "deactivating" the _LivedataSubscription object and re-creating a new one instead of "resetting" it in place. This allows us to ensure that old publish handlers have no access to the merge boxes (SessionCollectionViews) because deactivated _LivedataSubscriptions ignore callbacks. Thus, we can be assured that after deactivating all subscription, the merge box really cannot be affected by old publishers continuing to run. Note: if a publish function has not even finished its original handler run (eg, if running setUserId in parallel to a universal subscription, which runs in a separate fiber from the message-handling loop), the old publish function *WILL* continue to run (both before and after this commit). However, with this commit, any attempt by the old publish function to call publisher methods will be ignored instead of being perhaps-inconsistently interpreted. If a publisher really needs to know if it has been stopped before it has finished running the handler, it can set an onStop callback before it does anything that can yield. --- packages/livedata/livedata_server.js | 98 ++++++++++++++++++---- packages/livedata/livedata_test_service.js | 12 +++ packages/livedata/livedata_tests.js | 12 +++ 3 files changed, 107 insertions(+), 15 deletions(-) diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index bf29cce159..9be8a29674 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -254,7 +254,15 @@ Meteor._LivedataSession = function (server, version) { self.collectionViews = {}; + // Set this to false to not send messages when collectionViews are + // modified. This is done when rerunning subs in _setUserId and those messages + // are calculated via a diff instead. self._isSending = true; + + // If this is true, don't start a newly-created universal publisher on this + // session. The session will take care of starting it when appropriate. + self._dontStartNewUniversalSubs = false; + // when we are rerunning subscriptions, any completion messages // we want to buffer up for when we are done rerunning subscriptions self._pendingCompletions = []; @@ -361,13 +369,22 @@ _.extend(Meteor._LivedataSession.prototype, { if (!self.initialized) { self.initialized = true; Fiber(function () { - _.each(self.server.universal_publish_handlers, function (handler) { - self._startSubscription(handler); - }); + self.startUniversalSubs(); }).run(); } }, + startUniversalSubs: function () { + var self = this; + // Make a shallow copy of the set of universal handlers and start them. If + // additional universal publishers start while we're running them (due to + // yielding), they will run separately as part of _LivedataServer.publish. + var handlers = _.clone(self.server.universal_publish_handlers); + _.each(handlers, function (handler) { + self._startSubscription(handler); + }); + }, + // If 'socket' is the socket currently connected to this session, // detach it (the session will then have no socket -- it will // continue running and queue up its messages.) If 'socket' isn't @@ -651,13 +668,15 @@ _.extend(Meteor._LivedataSession.prototype, { _setUserId: function(userId) { var self = this; - // Call stop callbacks for each sub, and reset their internal states to - // empty. This may yield. + self._dontStartNewUniversalSubs = true; + + // Prevent current subs from updating our collectionViews and call their + // stop callbacks. This may yield. self._eachSub(function (sub) { - sub._resetSubscription(); + sub._deactivate(); }); - // All subs should now be inactive. Stop sending messages to the client, + // All subs should now be deactivated. Stop sending messages to the client, // save the state of the published collections, reset to an empty view, and // update the userId. self._isSending = false; @@ -665,12 +684,22 @@ _.extend(Meteor._LivedataSession.prototype, { self.collectionViews = {}; self.userId = userId; - // Restart each sub, now with the new userId. - self._eachSub(function (sub) { - sub.userId = self.userId; - sub._runHandler(); + // Save the old named subs, and reset to having no subscriptions. + var oldNamedSubs = self._namedSubs; + self._namedSubs = {}; + self._universalSubs = []; + + _.each(oldNamedSubs, function (sub, subscriptionId) { + self._namedSubs[subscriptionId] = sub._recreate(); + self._namedSubs[subscriptionId]._runHandler(); }); + // Allow newly-created universal subs to be started on our connection in + // parallel with the ones we're spinning up here, and spin up universal + // subs. + self._dontStartNewUniversalSubs = false; + self.startUniversalSubs(); + // Start sending messages again, beginning with the diff from the previous // state of the world to the current state. No yields are allowed during // this diff, so that other changes cannot interleave. @@ -766,7 +795,6 @@ Meteor._LivedataSubscription = function ( // an opinion about self._documents = {}; - // remember if we are complete. self._complete = false; @@ -821,13 +849,34 @@ _.extend(Meteor._LivedataSubscription.prototype, { stop: function () { var self = this; + Meteor._noYieldsAllowed(function () { + self._removeAllDocuments(); + }); + self._deactivate(); + }, + // This is called by setUserId to deactivate the sub (prevent its handler from + // updating the SessionCollectionViews and call its stop callbacks) without + // producing "removed" messages for every document. The expectation is that + // you will start another _LivedataSubscription with the same handler, + // subscriptionId, and params, and diff collection views afterwards. + _deactivate: function() { + var self = this; if (self._stopped) return; self._stopped = true; - self._callStopCallbacks(); - self._removeAllDocuments(); + }, + + // Returns a new _LivedataSubscription for the same session with the same + // initial creation parameters. This isn't a clone: it doesn't have the same + // _documents cache, stopped state or callbacks; may have a different + // _subscriptionHandle, and gets its userId from the session, not from this + // object. + _recreate: function () { + var self = this; + return new Meteor._LivedataSubscription( + self._session, self._handler, self._subscriptionId, self._params); }, // This is meant to be used for a subscription that is about to be rerun. @@ -848,6 +897,8 @@ _.extend(Meteor._LivedataSubscription.prototype, { added: function (collectionName, id, fields) { var self = this; + if (self._stopped) + return; id = self._idFilter.idStringify(id); Meteor._ensure(self._documents, collectionName)[id] = true; self._session.added(self._subscriptionHandle, collectionName, id, fields); @@ -855,12 +906,16 @@ _.extend(Meteor._LivedataSubscription.prototype, { changed: function (collectionName, id, fields) { var self = this; + if (self._stopped) + return; id = self._idFilter.idStringify(id); self._session.changed(self._subscriptionHandle, collectionName, id, fields); }, removed: function (collectionName, id) { var self = this; + if (self._stopped) + return; id = self._idFilter.idStringify(id); // We don't bother to delete sets of things in a collection if the // collection is empty. It could break _removeAllDocuments. @@ -870,6 +925,8 @@ _.extend(Meteor._LivedataSubscription.prototype, { complete: function () { var self = this; + if (self._stopped) + return; if (!self._subscriptionId) return; // unnecessary but ignored for universal sub if (!self._complete) { @@ -1077,8 +1134,19 @@ _.extend(Meteor._LivedataServer.prototype, { if (name) self.publish_handlers[name] = handler; - else + else { self.universal_publish_handlers.push(handler); + // Spin up the new publisher on any existing session too. Run each + // session's subscription in a new Fiber, so that there's no change for + // self.sessions to change while we're running this loop. + _.each(self.sessions, function (session) { + if (!session._dontStartNewUniversalSubs) { + Fiber(function() { + session._startSubscription(handler); + }).run(); + } + }); + } }, methods: function (methods) { diff --git a/packages/livedata/livedata_test_service.js b/packages/livedata/livedata_test_service.js index 7f398fed3a..c021e7a3fb 100644 --- a/packages/livedata/livedata_test_service.js +++ b/packages/livedata/livedata_test_service.js @@ -203,5 +203,17 @@ if (Meteor.isServer) { })(); } +/// Helper for "livedata - runtime universal sub creation" + +if (Meteor.isServer) { + Meteor.methods({ + runtimeUniversalSubCreation: function (token) { + Meteor.publish(null, function () { + this.added("runtimeSubCreation", token, {}); + }); + } + }); +} + })(); diff --git a/packages/livedata/livedata_tests.js b/packages/livedata/livedata_tests.js index ba5ea8b3f9..747cf1398d 100644 --- a/packages/livedata/livedata_tests.js +++ b/packages/livedata/livedata_tests.js @@ -409,6 +409,18 @@ if (Meteor.isClient) { })); } ]); + + testAsyncMulti("livedata - runtime universal sub creation", [ + function (test, expect) { + var coll = new Meteor.Collection("runtimeSubCreation"); + var token = Meteor.uuid(); + test.isFalse(coll.findOne(token)); + Meteor.call("runtimeUniversalSubCreation", token, expect(function (err) { + test.isFalse(err); + test.isTrue(coll.findOne(token)); + })); + } + ]); } // XXX some things to test in greater detail: