diff --git a/packages/livedata/crossbar.js b/packages/livedata/crossbar.js index 1600b4379b..631fefe6f6 100644 --- a/packages/livedata/crossbar.js +++ b/packages/livedata/crossbar.js @@ -1,6 +1,4 @@ // A "crossbar" is a class that provides structured notification registration. -// The "invalidation crossbar" is a specific instance used by the DDP server to -// implement write fence notifications. DDPServer._Crossbar = function (options) { var self = this; @@ -17,10 +15,8 @@ DDPServer._Crossbar = function (options) { _.extend(DDPServer._Crossbar.prototype, { // Listen for notification that match 'trigger'. A notification // matches if it has the key-value pairs in trigger as a - // subset. When a notification matches, call 'callback', passing two - // arguments, the actual notification and an acknowledgement - // function. The callback should call the acknowledgement function - // when it is finished processing the notification. + // subset. When a notification matches, call 'callback', passing + // the actual notification. // // Returns a listen handle, which is an object with a method // stop(). Call stop() to stop listening. @@ -48,14 +44,13 @@ _.extend(DDPServer._Crossbar.prototype, { // Fire the provided 'notification' (an object whose attribute // values are all JSON-compatibile) -- inform all matching listeners - // (registered with listen()), and once they have all acknowledged - // the notification, call onComplete with no arguments. + // (registered with listen()). // // If fire() is called inside a write fence, then each of the // listener callbacks will be called inside the write fence as well. // // The listeners may be invoked in parallel, rather than serially. - fire: function (notification, onComplete) { + fire: function (notification) { var self = this; var callbacks = []; // XXX consider refactoring to "index" on "collection" @@ -64,22 +59,10 @@ _.extend(DDPServer._Crossbar.prototype, { callbacks.push(l.callback); }); - if (onComplete) - onComplete = Meteor.bindEnvironment( - onComplete, - "Crossbar fire complete callback"); - - var outstanding = callbacks.length; - if (!outstanding) - onComplete && onComplete(); - else { - _.each(callbacks, function (c) { - c(notification, function () { - if (--outstanding === 0) - onComplete && onComplete(); - }); - }); - } + _.each(callbacks, function (c) { + // XXX don't call c if it's been stopped already + c(notification); + }); }, // A notification matches a trigger if all keys that exist in both are equal. @@ -107,6 +90,11 @@ _.extend(DDPServer._Crossbar.prototype, { } }); +// The "invalidation crossbar" is a specific instance used by the DDP server to +// implement write fence notifications. Listener callbacks on this crossbar +// should call beginWrite on the current write fence before they return, if they +// want to delay the write fence from firing (ie, the DDP method-data-updated +// message from being sent). DDPServer._InvalidationCrossbar = new DDPServer._Crossbar({ factName: "invalidation-crossbar-listeners" }); diff --git a/packages/livedata/server_convenience.js b/packages/livedata/server_convenience.js index ac20e68fae..1e482b95f0 100644 --- a/packages/livedata/server_convenience.js +++ b/packages/livedata/server_convenience.js @@ -10,16 +10,7 @@ if (Package.webapp) { Meteor.server = new Server; Meteor.refresh = function (notification) { - var fence = DDPServer._CurrentWriteFence.get(); - if (fence) { - // Block the write fence until all of the invalidations have - // landed. - var proxy_write = fence.beginWrite(); - } - DDPServer._InvalidationCrossbar.fire(notification, function () { - if (proxy_write) - proxy_write.committed(); - }); + DDPServer._InvalidationCrossbar.fire(notification); }; // Proxy the public methods of Meteor.server so they can diff --git a/packages/livedata/writefence.js b/packages/livedata/writefence.js index 3315cbfe3c..359fe32a99 100644 --- a/packages/livedata/writefence.js +++ b/packages/livedata/writefence.js @@ -53,6 +53,8 @@ _.extend(DDPServer._WriteFence.prototype, { // uncommitted writes, it will activate. arm: function () { var self = this; + if (self === DDPServer._CurrentWriteFence.get()) + throw Error("Can't arm the current fence"); self.armed = true; self._maybeFire(); }, diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 9f23ab84e3..363ac59277 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -72,13 +72,11 @@ OplogObserveDriver = function (options) { // XXX ordering w.r.t. everything else? self._stopHandles.push(listenAll( - self._cursorDescription, function (notification, complete) { + self._cursorDescription, function (notification) { // If we're not in a write fence, we don't have to do anything. var fence = DDPServer._CurrentWriteFence.get(); - if (!fence) { - complete(); + if (!fence) return; - } var write = fence.beginWrite(); // This write cannot complete until we've caught up to "this point" in the // oplog, and then made it back to the steady state. @@ -98,7 +96,6 @@ OplogObserveDriver = function (options) { self._writesToCommitWhenWeReachSteady.push(write); } }); - complete(); } )); diff --git a/packages/mongo-livedata/oplog_tailing.js b/packages/mongo-livedata/oplog_tailing.js index a04fb0eee9..b8c044ce3e 100644 --- a/packages/mongo-livedata/oplog_tailing.js +++ b/packages/mongo-livedata/oplog_tailing.js @@ -74,13 +74,9 @@ _.extend(OplogHandle.prototype, { self._readyFuture.wait(); var originalCallback = callback; - callback = Meteor.bindEnvironment(function (notification, onComplete) { + callback = Meteor.bindEnvironment(function (notification) { // XXX can we avoid this clone by making oplog.js careful? - try { - originalCallback(EJSON.clone(notification)); - } finally { - onComplete(); - } + originalCallback(EJSON.clone(notification)); }, function (err) { Meteor._debug("Error in oplog callback", err.stack); }); @@ -208,9 +204,7 @@ _.extend(OplogHandle.prototype, { trigger.id = idForOp(doc); } - var f = new Future; - self._crossbar.fire(trigger, f.resolver()); - f.wait(); + self._crossbar.fire(trigger); // Now that we've processed this operation, process pending sequencers. if (!doc.ts) diff --git a/packages/mongo-livedata/polling_observe_driver.js b/packages/mongo-livedata/polling_observe_driver.js index aa17477c33..c1c700d0c1 100644 --- a/packages/mongo-livedata/polling_observe_driver.js +++ b/packages/mongo-livedata/polling_observe_driver.js @@ -34,7 +34,7 @@ PollingObserveDriver = function (options) { self._taskQueue = new Meteor._SynchronousQueue(); var listenersHandle = listenAll( - self._cursorDescription, function (notification, complete) { + self._cursorDescription, function (notification) { // When someone does a transaction that might affect us, schedule a poll // of the database. If that transaction happens inside of a write fence, // block the fence until we've polled and notified observers. @@ -46,7 +46,6 @@ PollingObserveDriver = function (options) { // lead to us calling it unnecessarily in 50ms). if (self._pollsScheduledButNotStarted === 0) self._ensurePollIsScheduled(); - complete(); } ); self._stopCallbacks.push(function () { listenersHandle.stop(); });