From fca2966676e45e126d36534c9541c58527fa3efc Mon Sep 17 00:00:00 2001 From: David Glasser Date: Thu, 23 Jan 2014 23:13:42 -0800 Subject: [PATCH] Simplify crossbar by making it synchronous All existing listener callbacks were already calling complete() synchronously, so this should not be a functional change. This allows us to also eliminate the callback from crossbar.fire(). This in turn allows us to eliminate the `proxy_write` in Meteor.refresh. The only purpose of that write was to keep the current write fence open until fire's async callback got called; now that fire works synchronously, it's not necessary! (This relies on the fact that write fences never get armed while they are the current write fence, which now has an assertion.) --- packages/livedata/crossbar.js | 38 +++++++------------ packages/livedata/server_convenience.js | 11 +----- packages/livedata/writefence.js | 2 + .../mongo-livedata/oplog_observe_driver.js | 7 +--- packages/mongo-livedata/oplog_tailing.js | 12 ++---- .../mongo-livedata/polling_observe_driver.js | 3 +- 6 files changed, 22 insertions(+), 51 deletions(-) diff --git a/packages/livedata/crossbar.js b/packages/livedata/crossbar.js index 1600b4379b..631fefe6f6 100644 --- a/packages/livedata/crossbar.js +++ b/packages/livedata/crossbar.js @@ -1,6 +1,4 @@ // A "crossbar" is a class that provides structured notification registration. -// The "invalidation crossbar" is a specific instance used by the DDP server to -// implement write fence notifications. DDPServer._Crossbar = function (options) { var self = this; @@ -17,10 +15,8 @@ DDPServer._Crossbar = function (options) { _.extend(DDPServer._Crossbar.prototype, { // Listen for notification that match 'trigger'. A notification // matches if it has the key-value pairs in trigger as a - // subset. When a notification matches, call 'callback', passing two - // arguments, the actual notification and an acknowledgement - // function. The callback should call the acknowledgement function - // when it is finished processing the notification. + // subset. When a notification matches, call 'callback', passing + // the actual notification. // // Returns a listen handle, which is an object with a method // stop(). Call stop() to stop listening. @@ -48,14 +44,13 @@ _.extend(DDPServer._Crossbar.prototype, { // Fire the provided 'notification' (an object whose attribute // values are all JSON-compatibile) -- inform all matching listeners - // (registered with listen()), and once they have all acknowledged - // the notification, call onComplete with no arguments. + // (registered with listen()). // // If fire() is called inside a write fence, then each of the // listener callbacks will be called inside the write fence as well. // // The listeners may be invoked in parallel, rather than serially. - fire: function (notification, onComplete) { + fire: function (notification) { var self = this; var callbacks = []; // XXX consider refactoring to "index" on "collection" @@ -64,22 +59,10 @@ _.extend(DDPServer._Crossbar.prototype, { callbacks.push(l.callback); }); - if (onComplete) - onComplete = Meteor.bindEnvironment( - onComplete, - "Crossbar fire complete callback"); - - var outstanding = callbacks.length; - if (!outstanding) - onComplete && onComplete(); - else { - _.each(callbacks, function (c) { - c(notification, function () { - if (--outstanding === 0) - onComplete && onComplete(); - }); - }); - } + _.each(callbacks, function (c) { + // XXX don't call c if it's been stopped already + c(notification); + }); }, // A notification matches a trigger if all keys that exist in both are equal. @@ -107,6 +90,11 @@ _.extend(DDPServer._Crossbar.prototype, { } }); +// The "invalidation crossbar" is a specific instance used by the DDP server to +// implement write fence notifications. Listener callbacks on this crossbar +// should call beginWrite on the current write fence before they return, if they +// want to delay the write fence from firing (ie, the DDP method-data-updated +// message from being sent). DDPServer._InvalidationCrossbar = new DDPServer._Crossbar({ factName: "invalidation-crossbar-listeners" }); diff --git a/packages/livedata/server_convenience.js b/packages/livedata/server_convenience.js index ac20e68fae..1e482b95f0 100644 --- a/packages/livedata/server_convenience.js +++ b/packages/livedata/server_convenience.js @@ -10,16 +10,7 @@ if (Package.webapp) { Meteor.server = new Server; Meteor.refresh = function (notification) { - var fence = DDPServer._CurrentWriteFence.get(); - if (fence) { - // Block the write fence until all of the invalidations have - // landed. - var proxy_write = fence.beginWrite(); - } - DDPServer._InvalidationCrossbar.fire(notification, function () { - if (proxy_write) - proxy_write.committed(); - }); + DDPServer._InvalidationCrossbar.fire(notification); }; // Proxy the public methods of Meteor.server so they can diff --git a/packages/livedata/writefence.js b/packages/livedata/writefence.js index 3315cbfe3c..359fe32a99 100644 --- a/packages/livedata/writefence.js +++ b/packages/livedata/writefence.js @@ -53,6 +53,8 @@ _.extend(DDPServer._WriteFence.prototype, { // uncommitted writes, it will activate. arm: function () { var self = this; + if (self === DDPServer._CurrentWriteFence.get()) + throw Error("Can't arm the current fence"); self.armed = true; self._maybeFire(); }, diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 9f23ab84e3..363ac59277 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -72,13 +72,11 @@ OplogObserveDriver = function (options) { // XXX ordering w.r.t. everything else? self._stopHandles.push(listenAll( - self._cursorDescription, function (notification, complete) { + self._cursorDescription, function (notification) { // If we're not in a write fence, we don't have to do anything. var fence = DDPServer._CurrentWriteFence.get(); - if (!fence) { - complete(); + if (!fence) return; - } var write = fence.beginWrite(); // This write cannot complete until we've caught up to "this point" in the // oplog, and then made it back to the steady state. @@ -98,7 +96,6 @@ OplogObserveDriver = function (options) { self._writesToCommitWhenWeReachSteady.push(write); } }); - complete(); } )); diff --git a/packages/mongo-livedata/oplog_tailing.js b/packages/mongo-livedata/oplog_tailing.js index a04fb0eee9..b8c044ce3e 100644 --- a/packages/mongo-livedata/oplog_tailing.js +++ b/packages/mongo-livedata/oplog_tailing.js @@ -74,13 +74,9 @@ _.extend(OplogHandle.prototype, { self._readyFuture.wait(); var originalCallback = callback; - callback = Meteor.bindEnvironment(function (notification, onComplete) { + callback = Meteor.bindEnvironment(function (notification) { // XXX can we avoid this clone by making oplog.js careful? - try { - originalCallback(EJSON.clone(notification)); - } finally { - onComplete(); - } + originalCallback(EJSON.clone(notification)); }, function (err) { Meteor._debug("Error in oplog callback", err.stack); }); @@ -208,9 +204,7 @@ _.extend(OplogHandle.prototype, { trigger.id = idForOp(doc); } - var f = new Future; - self._crossbar.fire(trigger, f.resolver()); - f.wait(); + self._crossbar.fire(trigger); // Now that we've processed this operation, process pending sequencers. if (!doc.ts) diff --git a/packages/mongo-livedata/polling_observe_driver.js b/packages/mongo-livedata/polling_observe_driver.js index aa17477c33..c1c700d0c1 100644 --- a/packages/mongo-livedata/polling_observe_driver.js +++ b/packages/mongo-livedata/polling_observe_driver.js @@ -34,7 +34,7 @@ PollingObserveDriver = function (options) { self._taskQueue = new Meteor._SynchronousQueue(); var listenersHandle = listenAll( - self._cursorDescription, function (notification, complete) { + self._cursorDescription, function (notification) { // When someone does a transaction that might affect us, schedule a poll // of the database. If that transaction happens inside of a write fence, // block the fence until we've polled and notified observers. @@ -46,7 +46,6 @@ PollingObserveDriver = function (options) { // lead to us calling it unnecessarily in 50ms). if (self._pollsScheduledButNotStarted === 0) self._ensurePollIsScheduled(); - complete(); } ); self._stopCallbacks.push(function () { listenersHandle.stop(); });