diff --git a/packages/livedata/livedata_server.js b/packages/livedata/livedata_server.js index bf29cce159..9be8a29674 100644 --- a/packages/livedata/livedata_server.js +++ b/packages/livedata/livedata_server.js @@ -254,7 +254,15 @@ Meteor._LivedataSession = function (server, version) { self.collectionViews = {}; + // Set this to false to not send messages when collectionViews are + // modified. This is done when rerunning subs in _setUserId and those messages + // are calculated via a diff instead. self._isSending = true; + + // If this is true, don't start a newly-created universal publisher on this + // session. The session will take care of starting it when appropriate. + self._dontStartNewUniversalSubs = false; + // when we are rerunning subscriptions, any completion messages // we want to buffer up for when we are done rerunning subscriptions self._pendingCompletions = []; @@ -361,13 +369,22 @@ _.extend(Meteor._LivedataSession.prototype, { if (!self.initialized) { self.initialized = true; Fiber(function () { - _.each(self.server.universal_publish_handlers, function (handler) { - self._startSubscription(handler); - }); + self.startUniversalSubs(); }).run(); } }, + startUniversalSubs: function () { + var self = this; + // Make a shallow copy of the set of universal handlers and start them. If + // additional universal publishers start while we're running them (due to + // yielding), they will run separately as part of _LivedataServer.publish. + var handlers = _.clone(self.server.universal_publish_handlers); + _.each(handlers, function (handler) { + self._startSubscription(handler); + }); + }, + // If 'socket' is the socket currently connected to this session, // detach it (the session will then have no socket -- it will // continue running and queue up its messages.) If 'socket' isn't @@ -651,13 +668,15 @@ _.extend(Meteor._LivedataSession.prototype, { _setUserId: function(userId) { var self = this; - // Call stop callbacks for each sub, and reset their internal states to - // empty. This may yield. + self._dontStartNewUniversalSubs = true; + + // Prevent current subs from updating our collectionViews and call their + // stop callbacks. This may yield. self._eachSub(function (sub) { - sub._resetSubscription(); + sub._deactivate(); }); - // All subs should now be inactive. Stop sending messages to the client, + // All subs should now be deactivated. Stop sending messages to the client, // save the state of the published collections, reset to an empty view, and // update the userId. self._isSending = false; @@ -665,12 +684,22 @@ _.extend(Meteor._LivedataSession.prototype, { self.collectionViews = {}; self.userId = userId; - // Restart each sub, now with the new userId. - self._eachSub(function (sub) { - sub.userId = self.userId; - sub._runHandler(); + // Save the old named subs, and reset to having no subscriptions. + var oldNamedSubs = self._namedSubs; + self._namedSubs = {}; + self._universalSubs = []; + + _.each(oldNamedSubs, function (sub, subscriptionId) { + self._namedSubs[subscriptionId] = sub._recreate(); + self._namedSubs[subscriptionId]._runHandler(); }); + // Allow newly-created universal subs to be started on our connection in + // parallel with the ones we're spinning up here, and spin up universal + // subs. + self._dontStartNewUniversalSubs = false; + self.startUniversalSubs(); + // Start sending messages again, beginning with the diff from the previous // state of the world to the current state. No yields are allowed during // this diff, so that other changes cannot interleave. @@ -766,7 +795,6 @@ Meteor._LivedataSubscription = function ( // an opinion about self._documents = {}; - // remember if we are complete. self._complete = false; @@ -821,13 +849,34 @@ _.extend(Meteor._LivedataSubscription.prototype, { stop: function () { var self = this; + Meteor._noYieldsAllowed(function () { + self._removeAllDocuments(); + }); + self._deactivate(); + }, + // This is called by setUserId to deactivate the sub (prevent its handler from + // updating the SessionCollectionViews and call its stop callbacks) without + // producing "removed" messages for every document. The expectation is that + // you will start another _LivedataSubscription with the same handler, + // subscriptionId, and params, and diff collection views afterwards. + _deactivate: function() { + var self = this; if (self._stopped) return; self._stopped = true; - self._callStopCallbacks(); - self._removeAllDocuments(); + }, + + // Returns a new _LivedataSubscription for the same session with the same + // initial creation parameters. This isn't a clone: it doesn't have the same + // _documents cache, stopped state or callbacks; may have a different + // _subscriptionHandle, and gets its userId from the session, not from this + // object. + _recreate: function () { + var self = this; + return new Meteor._LivedataSubscription( + self._session, self._handler, self._subscriptionId, self._params); }, // This is meant to be used for a subscription that is about to be rerun. @@ -848,6 +897,8 @@ _.extend(Meteor._LivedataSubscription.prototype, { added: function (collectionName, id, fields) { var self = this; + if (self._stopped) + return; id = self._idFilter.idStringify(id); Meteor._ensure(self._documents, collectionName)[id] = true; self._session.added(self._subscriptionHandle, collectionName, id, fields); @@ -855,12 +906,16 @@ _.extend(Meteor._LivedataSubscription.prototype, { changed: function (collectionName, id, fields) { var self = this; + if (self._stopped) + return; id = self._idFilter.idStringify(id); self._session.changed(self._subscriptionHandle, collectionName, id, fields); }, removed: function (collectionName, id) { var self = this; + if (self._stopped) + return; id = self._idFilter.idStringify(id); // We don't bother to delete sets of things in a collection if the // collection is empty. It could break _removeAllDocuments. @@ -870,6 +925,8 @@ _.extend(Meteor._LivedataSubscription.prototype, { complete: function () { var self = this; + if (self._stopped) + return; if (!self._subscriptionId) return; // unnecessary but ignored for universal sub if (!self._complete) { @@ -1077,8 +1134,19 @@ _.extend(Meteor._LivedataServer.prototype, { if (name) self.publish_handlers[name] = handler; - else + else { self.universal_publish_handlers.push(handler); + // Spin up the new publisher on any existing session too. Run each + // session's subscription in a new Fiber, so that there's no change for + // self.sessions to change while we're running this loop. + _.each(self.sessions, function (session) { + if (!session._dontStartNewUniversalSubs) { + Fiber(function() { + session._startSubscription(handler); + }).run(); + } + }); + } }, methods: function (methods) { diff --git a/packages/livedata/livedata_test_service.js b/packages/livedata/livedata_test_service.js index 7f398fed3a..c021e7a3fb 100644 --- a/packages/livedata/livedata_test_service.js +++ b/packages/livedata/livedata_test_service.js @@ -203,5 +203,17 @@ if (Meteor.isServer) { })(); } +/// Helper for "livedata - runtime universal sub creation" + +if (Meteor.isServer) { + Meteor.methods({ + runtimeUniversalSubCreation: function (token) { + Meteor.publish(null, function () { + this.added("runtimeSubCreation", token, {}); + }); + } + }); +} + })(); diff --git a/packages/livedata/livedata_tests.js b/packages/livedata/livedata_tests.js index ba5ea8b3f9..747cf1398d 100644 --- a/packages/livedata/livedata_tests.js +++ b/packages/livedata/livedata_tests.js @@ -409,6 +409,18 @@ if (Meteor.isClient) { })); } ]); + + testAsyncMulti("livedata - runtime universal sub creation", [ + function (test, expect) { + var coll = new Meteor.Collection("runtimeSubCreation"); + var token = Meteor.uuid(); + test.isFalse(coll.findOne(token)); + Meteor.call("runtimeUniversalSubCreation", token, expect(function (err) { + test.isFalse(err); + test.isTrue(coll.findOne(token)); + })); + } + ]); } // XXX some things to test in greater detail: