Simplify crossbar by making it synchronous

All existing listener callbacks were already calling complete()
synchronously, so this should not be a functional change.

This allows us to also eliminate the callback from crossbar.fire().

This in turn allows us to eliminate the `proxy_write` in
Meteor.refresh. The only purpose of that write was to keep the current
write fence open until fire's async callback got called; now that fire
works synchronously, it's not necessary!  (This relies on the fact that
write fences never get armed while they are the current write fence,
which now has an assertion.)
This commit is contained in:
David Glasser
2014-01-23 23:13:42 -08:00
parent d868325b83
commit fca2966676
6 changed files with 22 additions and 51 deletions

View File

@@ -1,6 +1,4 @@
// A "crossbar" is a class that provides structured notification registration.
// The "invalidation crossbar" is a specific instance used by the DDP server to
// implement write fence notifications.
DDPServer._Crossbar = function (options) {
var self = this;
@@ -17,10 +15,8 @@ DDPServer._Crossbar = function (options) {
_.extend(DDPServer._Crossbar.prototype, {
// Listen for notification that match 'trigger'. A notification
// matches if it has the key-value pairs in trigger as a
// subset. When a notification matches, call 'callback', passing two
// arguments, the actual notification and an acknowledgement
// function. The callback should call the acknowledgement function
// when it is finished processing the notification.
// subset. When a notification matches, call 'callback', passing
// the actual notification.
//
// Returns a listen handle, which is an object with a method
// stop(). Call stop() to stop listening.
@@ -48,14 +44,13 @@ _.extend(DDPServer._Crossbar.prototype, {
// Fire the provided 'notification' (an object whose attribute
// values are all JSON-compatibile) -- inform all matching listeners
// (registered with listen()), and once they have all acknowledged
// the notification, call onComplete with no arguments.
// (registered with listen()).
//
// If fire() is called inside a write fence, then each of the
// listener callbacks will be called inside the write fence as well.
//
// The listeners may be invoked in parallel, rather than serially.
fire: function (notification, onComplete) {
fire: function (notification) {
var self = this;
var callbacks = [];
// XXX consider refactoring to "index" on "collection"
@@ -64,22 +59,10 @@ _.extend(DDPServer._Crossbar.prototype, {
callbacks.push(l.callback);
});
if (onComplete)
onComplete = Meteor.bindEnvironment(
onComplete,
"Crossbar fire complete callback");
var outstanding = callbacks.length;
if (!outstanding)
onComplete && onComplete();
else {
_.each(callbacks, function (c) {
c(notification, function () {
if (--outstanding === 0)
onComplete && onComplete();
});
});
}
_.each(callbacks, function (c) {
// XXX don't call c if it's been stopped already
c(notification);
});
},
// A notification matches a trigger if all keys that exist in both are equal.
@@ -107,6 +90,11 @@ _.extend(DDPServer._Crossbar.prototype, {
}
});
// The "invalidation crossbar" is a specific instance used by the DDP server to
// implement write fence notifications. Listener callbacks on this crossbar
// should call beginWrite on the current write fence before they return, if they
// want to delay the write fence from firing (ie, the DDP method-data-updated
// message from being sent).
DDPServer._InvalidationCrossbar = new DDPServer._Crossbar({
factName: "invalidation-crossbar-listeners"
});

View File

@@ -10,16 +10,7 @@ if (Package.webapp) {
Meteor.server = new Server;
Meteor.refresh = function (notification) {
var fence = DDPServer._CurrentWriteFence.get();
if (fence) {
// Block the write fence until all of the invalidations have
// landed.
var proxy_write = fence.beginWrite();
}
DDPServer._InvalidationCrossbar.fire(notification, function () {
if (proxy_write)
proxy_write.committed();
});
DDPServer._InvalidationCrossbar.fire(notification);
};
// Proxy the public methods of Meteor.server so they can

View File

@@ -53,6 +53,8 @@ _.extend(DDPServer._WriteFence.prototype, {
// uncommitted writes, it will activate.
arm: function () {
var self = this;
if (self === DDPServer._CurrentWriteFence.get())
throw Error("Can't arm the current fence");
self.armed = true;
self._maybeFire();
},

View File

@@ -72,13 +72,11 @@ OplogObserveDriver = function (options) {
// XXX ordering w.r.t. everything else?
self._stopHandles.push(listenAll(
self._cursorDescription, function (notification, complete) {
self._cursorDescription, function (notification) {
// If we're not in a write fence, we don't have to do anything.
var fence = DDPServer._CurrentWriteFence.get();
if (!fence) {
complete();
if (!fence)
return;
}
var write = fence.beginWrite();
// This write cannot complete until we've caught up to "this point" in the
// oplog, and then made it back to the steady state.
@@ -98,7 +96,6 @@ OplogObserveDriver = function (options) {
self._writesToCommitWhenWeReachSteady.push(write);
}
});
complete();
}
));

View File

@@ -74,13 +74,9 @@ _.extend(OplogHandle.prototype, {
self._readyFuture.wait();
var originalCallback = callback;
callback = Meteor.bindEnvironment(function (notification, onComplete) {
callback = Meteor.bindEnvironment(function (notification) {
// XXX can we avoid this clone by making oplog.js careful?
try {
originalCallback(EJSON.clone(notification));
} finally {
onComplete();
}
originalCallback(EJSON.clone(notification));
}, function (err) {
Meteor._debug("Error in oplog callback", err.stack);
});
@@ -208,9 +204,7 @@ _.extend(OplogHandle.prototype, {
trigger.id = idForOp(doc);
}
var f = new Future;
self._crossbar.fire(trigger, f.resolver());
f.wait();
self._crossbar.fire(trigger);
// Now that we've processed this operation, process pending sequencers.
if (!doc.ts)

View File

@@ -34,7 +34,7 @@ PollingObserveDriver = function (options) {
self._taskQueue = new Meteor._SynchronousQueue();
var listenersHandle = listenAll(
self._cursorDescription, function (notification, complete) {
self._cursorDescription, function (notification) {
// When someone does a transaction that might affect us, schedule a poll
// of the database. If that transaction happens inside of a write fence,
// block the fence until we've polled and notified observers.
@@ -46,7 +46,6 @@ PollingObserveDriver = function (options) {
// lead to us calling it unnecessarily in 50ms).
if (self._pollsScheduledButNotStarted === 0)
self._ensurePollIsScheduled();
complete();
}
);
self._stopCallbacks.push(function () { listenersHandle.stop(); });