diff --git a/packages/livedata/crossbar.js b/packages/livedata/crossbar.js index e6db276cac..342a5e0ae9 100644 --- a/packages/livedata/crossbar.js +++ b/packages/livedata/crossbar.js @@ -2,13 +2,16 @@ // The "invalidation crossbar" is a specific instance used by the DDP server to // implement write fence notifications. -DDPServer._Crossbar = function () { +DDPServer._Crossbar = function (options) { var self = this; + options = options || {}; - self.next_id = 1; + self.nextId = 1; // map from listener id to object. each object has keys 'trigger', // 'callback'. self.listeners = {}; + self.factPackage = options.factPackage || "livedata"; + self.factName = options.factName || null; }; _.extend(DDPServer._Crossbar.prototype, { @@ -29,14 +32,18 @@ _.extend(DDPServer._Crossbar.prototype, { // yields. listen: function (trigger, callback) { var self = this; - var id = self.next_id++; + var id = self.nextId++; self.listeners[id] = {trigger: EJSON.clone(trigger), callback: callback}; - Package.facts && Package.facts.Facts.incrementServerFact( - "livedata", "crossbar-listeners", 1); + if (self.factName && Package.facts) { + Package.facts.Facts.incrementServerFact( + self.factPackage, self.factName, 1); + } return { stop: function () { - Package.facts && Package.facts.Facts.incrementServerFact( - "livedata", "crossbar-listeners", -1); + if (self.factName && Package.facts) { + Package.facts.Facts.incrementServerFact( + self.factPackage, self.factName, -1); + } delete self.listeners[id]; } }; @@ -54,6 +61,7 @@ _.extend(DDPServer._Crossbar.prototype, { fire: function (notification, onComplete) { var self = this; var callbacks = []; + // XXX consider refactoring to "index" on "collection" _.each(self.listeners, function (l) { if (self._matches(notification, l.trigger)) callbacks.push(l.callback); @@ -103,4 +111,6 @@ _.extend(DDPServer._Crossbar.prototype, { } }); -DDPServer._InvalidationCrossbar = new DDPServer._Crossbar; +DDPServer._InvalidationCrossbar = new DDPServer._Crossbar({ + factName: "invalidation-crossbar-listeners" +}); diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index ddb01a35de..06cd8d5ea6 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -250,8 +250,9 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, var stopped = false; var tailHandle = null; var readyFuture = new Future(); - var nextId = 0; - var callbacksByCollection = {}; + var crossbar = new DDPServer._Crossbar({ + factPackage: "mongo-livedata", factName: "oplog-watchers" + }); var lastProcessedTS = null; // Lazily calculate the basic selector. Don't call baseOplogSelector() at the // top level of this function, because we don't want this function to block. @@ -277,27 +278,28 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, // XXX should close connections too }, - onOplogEntry: function (collectionName, callback) { + onOplogEntry: function (trigger, callback) { if (stopped) throw new Error("Called onOplogEntry on stopped handle!"); // Calling onOplogEntry requires us to wait for the tailing to be ready. readyFuture.wait(); - callback = Meteor.bindEnvironment(callback, function (err) { + var originalCallback = callback; + callback = Meteor.bindEnvironment(function (notification, onComplete) { + // XXX can we avoid this clone by making oplog.js careful? + try { + originalCallback(EJSON.clone(notification)); + } finally { + onComplete(); + } + }, function (err) { Meteor._debug("Error in oplog callback", err.stack); }); - if (!_.has(callbacksByCollection, collectionName)) - callbacksByCollection[collectionName] = {}; - var callbackId = nextId++; - Package.facts && Package.facts.Facts.incrementServerFact( - "mongo-livedata", "oplog-watchers", 1); - callbacksByCollection[collectionName][callbackId] = callback; + var listenHandle = crossbar.listen(trigger, callback); return { stop: function () { - delete callbacksByCollection[collectionName][callbackId]; - Package.facts && Package.facts.Facts.incrementServerFact( - "mongo-livedata", "oplog-watchers", -1); + listenHandle.stop(); } }; }, @@ -311,8 +313,8 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, if (stopped) throw new Error("Called waitUntilCaughtUp on stopped handle!"); - // Calling onOplogEntry requries us to wait for the oplog connection to be - // ready. + // Calling waitUntilCaughtUp requries us to wait for the oplog connection + // to be ready. readyFuture.wait(); // We need to make the selector at least as restrictive as the actual @@ -403,16 +405,21 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, doc.ns.substr(0, dbName.length + 1) === (dbName + '.'))) throw new Error("Unexpected ns"); - var collectionName = doc.ns.substr(dbName.length + 1); + var trigger = {collection: doc.ns.substr(dbName.length + 1), op: doc}; // Is it a special command and the collection name is hidden somewhere in // operator? - if (collectionName === "$cmd") - collectionName = doc.o.drop; + if (trigger.collection === "$cmd") { + trigger.collection = doc.o.drop; + trigger.dropCollection = true; + } else { + // All other ops have an id. + trigger.id = idForOp(doc); + } - _.each(callbacksByCollection[collectionName], function (callback) { - callback(EJSON.clone(doc)); - }); + var f = new Future; + crossbar.fire(trigger, f.resolver()); + f.wait(); // Now that we've processed this operation, process pending sequencers. if (!doc.ts) @@ -1181,23 +1188,17 @@ MongoConnection.prototype._observeChanges = function ( // here, so that updates to different specific IDs don't cause us to poll. // listenCallback is the same kind of (notification, complete) callback passed // to InvalidationCrossbar.listen. + listenAll = function (cursorDescription, listenCallback) { var listeners = []; - var listenOnTrigger = function (trigger) { + forEachTrigger(cursorDescription, function (trigger) { + // The "drop collection" event is used by the oplog crossbar, not the + // invalidation crossbar. + if (trigger.dropCollection) + return; listeners.push(DDPServer._InvalidationCrossbar.listen( trigger, listenCallback)); - }; - - var key = {collection: cursorDescription.collectionName}; - var specificIds = LocalCollection._idsMatchedBySelector( - cursorDescription.selector); - if (specificIds) { - _.each(specificIds, function (id) { - listenOnTrigger(_.extend({id: id}, key)); - }); - } else { - listenOnTrigger(key); - } + }); return { stop: function () { @@ -1208,6 +1209,20 @@ listenAll = function (cursorDescription, listenCallback) { }; }; +forEachTrigger = function (cursorDescription, triggerCallback) { + var key = {collection: cursorDescription.collectionName}; + var specificIds = LocalCollection._idsMatchedBySelector( + cursorDescription.selector); + if (specificIds) { + _.each(specificIds, function (id) { + triggerCallback(_.extend({id: id}, key)); + }); + triggerCallback(_.extend({dropCollection: true}, key)); + } else { + triggerCallback(key); + } +}; + var MongoPollster = function (cursorDescription, mongoHandle, ordered, multiplexer, testOnlyPollCallback) { var self = this; diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index 5f03ec06fa..f9d988f5d5 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -7,7 +7,7 @@ var PHASE = { STEADY: 3 }; -var idForOp = function (op) { +idForOp = function (op) { if (op.op === 'd') return op.o._id; else if (op.op === 'i') @@ -25,6 +25,7 @@ observeChangesWithOplog = function (cursorDescription, mongoHandle, multiplexer) { var stopped = false; + var stopHandles = []; Package.facts && Package.facts.Facts.incrementServerFact( "mongo-livedata", "oplog-observers", 1); @@ -186,22 +187,26 @@ observeChangesWithOplog = function (cursorDescription, }; oplogEntryHandlers[PHASE.FETCHING] = oplogEntryHandlers[PHASE.STEADY]; - - var oplogEntryHandle = mongoHandle._oplogHandle.onOplogEntry( - cursorDescription.collectionName, function (op) { - if (op.op === 'c') { - published.forEach(function (fields, id) { - remove(id); - }); - } else { - // All other operators should be handled depending on phase - oplogEntryHandlers[phase](op); + forEachTrigger(cursorDescription, function (trigger) { + stopHandles.push(mongoHandle._oplogHandle.onOplogEntry( + trigger, function (notification) { + var op = notification.op; + if (op.op === 'c') { + // XXX actually, drop collection needs to be handled by doing a + // re-query + published.forEach(function (fields, id) { + remove(id); + }); + } else { + // All other operators should be handled depending on phase + oplogEntryHandlers[phase](op); + } } - } - ); + )); + }); // XXX ordering w.r.t. everything else? - var listenersHandle = listenAll( + stopHandles.push(listenAll( cursorDescription, function (notification, complete) { // If we're not in a write fence, we don't have to do anything. var fence = DDPServer._CurrentWriteFence.get(); @@ -225,7 +230,7 @@ observeChangesWithOplog = function (cursorDescription, } }); } - ); + )); // observeChangesWithOplog cannot yield (because the manipulation of // mongoHandle._observeMultiplexers needs to be yield-free); calling @@ -268,8 +273,9 @@ observeChangesWithOplog = function (cursorDescription, if (stopped) return; stopped = true; - listenersHandle.stop(); - oplogEntryHandle.stop(); + _.each(stopHandles, function (handle) { + handle.stop(); + }); published = null; selector = null;