From aeac87285e95f08e1a9ca88db94e8841f022bdd8 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Tue, 26 Nov 2013 17:16:24 -0800 Subject: [PATCH] rename to {Mongo,Oplog}ObserveDriver Give a consistent constructor API betweent the two. --- packages/livedata/crossbar.js | 3 -- packages/mongo-livedata/mongo_driver.js | 36 +++++++++---------- .../mongo-livedata/mongo_livedata_tests.js | 8 ++--- packages/mongo-livedata/oplog.js | 33 +++++++++-------- packages/mongo-livedata/package.js | 2 +- .../{mongo_pollster.js => polling.js} | 25 +++++++------ 6 files changed, 52 insertions(+), 55 deletions(-) rename packages/mongo-livedata/{mongo_pollster.js => polling.js} (92%) diff --git a/packages/livedata/crossbar.js b/packages/livedata/crossbar.js index 342a5e0ae9..358ddd5a22 100644 --- a/packages/livedata/crossbar.js +++ b/packages/livedata/crossbar.js @@ -27,9 +27,6 @@ _.extend(DDPServer._Crossbar.prototype, { // // XXX It should be legal to call fire() from inside a listen() // callback? - // - // Note: the MongoPollster constructor assumes that a call to listen() never - // yields. listen: function (trigger, callback) { var self = this; var id = self.nextId++; diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index e40d4cc3e2..8145371a33 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -840,12 +840,12 @@ MongoConnection.prototype._dropIndex = function (collectionName, index) { // reference to an ObserveMultiplexer. // // ObserveMultiplexer allows multiple identical ObserveHandles to be driven by a -// single low-level observe process such as a MongoPollster. +// single observe driver. // -// There are two "observe implementations" which drive ObserveMultiplexers: -// - MongoPollster caches the results of a query and reruns it when +// There are two "observe drivers" which drive ObserveMultiplexers: +// - PollingObserveDriver caches the results of a query and reruns it when // necessary. -// - OplogTailer follows the Mongo operation log to directly observe +// - OplogObserveDriver follows the Mongo operation log to directly observe // database changes. // Both implementations follow the same simple interface: when you create them, // they start sending observeChanges callbacks (and a ready() invocation) to @@ -1145,7 +1145,7 @@ MongoConnection.prototype._observeChanges = function ( var observeKey = JSON.stringify( _.extend({ordered: ordered}, cursorDescription)); - var multiplexer, observeImplementation; + var multiplexer, observeDriver; var firstHandle = false; // Find a matching ObserveMultiplexer, or create a new one. This next block is @@ -1155,40 +1155,38 @@ MongoConnection.prototype._observeChanges = function ( if (_.has(self._observeMultiplexers, observeKey)) { multiplexer = self._observeMultiplexers[observeKey]; } else { + firstHandle = true; // Create a new ObserveMultiplexer. multiplexer = new ObserveMultiplexer({ ordered: ordered, onStop: function () { - observeImplementation.stop(); + observeDriver.stop(); delete self._observeMultiplexers[observeKey]; } }); self._observeMultiplexers[observeKey] = multiplexer; - firstHandle = true; } }); var observeHandle = new ObserveHandle(multiplexer, callbacks); if (firstHandle) { + var driverClass = PollingObserveDriver; if (self._oplogHandle && !ordered && !callbacks._testOnlyPollCallback && cursorSupportedByOplogTailing(cursorDescription)) { - // Can yield! - observeImplementation = new OplogTailer( - cursorDescription, self, multiplexer); - } else { - // Start polling. - observeImplementation = new MongoPollster( - cursorDescription, - self, - multiplexer, - ordered, - callbacks._testOnlyPollCallback); + driverClass = OplogObserveDriver; } + observeDriver = new driverClass({ + cursorDescription: cursorDescription, + mongoHandle: self, + multiplexer: multiplexer, + ordered: ordered, + _testOnlyPollCallback: callbacks._testOnlyPollCallback + }); // This field is only set for the first ObserveHandle in an // ObserveMultiplexer. It is only there for use by one test. - observeHandle._observeImplementation = observeImplementation; + observeHandle._observeDriver = observeDriver; } // Blocks until the initial adds have been sent. diff --git a/packages/mongo-livedata/mongo_livedata_tests.js b/packages/mongo-livedata/mongo_livedata_tests.js index ce32bb54e9..5c12cbfb10 100644 --- a/packages/mongo-livedata/mongo_livedata_tests.js +++ b/packages/mongo-livedata/mongo_livedata_tests.js @@ -387,8 +387,8 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test, // run. if (Meteor.isServer) { // For now, has to be polling (not oplog). - test.isTrue(obs._observeImplementation); - test.isTrue(obs._observeImplementation._suspendPolling); + test.isTrue(obs._observeDriver); + test.isTrue(obs._observeDriver._suspendPolling); } var step = 0; @@ -423,7 +423,7 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test, finishObserve(function () { if (Meteor.isServer) - obs._observeImplementation._suspendPolling(); + obs._observeDriver._suspendPolling(); // Do a batch of 1-10 operations var batch_count = rnd(10) + 1; @@ -456,7 +456,7 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test, } } if (Meteor.isServer) - obs._observeImplementation._resumePolling(); + obs._observeDriver._resumePolling(); }); diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index a7016581dd..f603152a85 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -7,17 +7,19 @@ var PHASE = { STEADY: 3 }; -// OplogTailer is an alternative to MongoPollster which follows the Mongo -// operation log instead of just re-polling the query. It obeys the same simple -// interface: constructing it starts sending observeChanges callbacks (and a -// ready() invocation) to the ObserveMultiplexer, and you stop it by calling -// the stop() method. -OplogTailer = function (cursorDescription, mongoHandle, multiplexer) { +// OplogObserveDriver is an alternative to PollingObserveDriver which follows +// the Mongo operation log instead of just re-polling the query. It obeys the +// same simple interface: constructing it starts sending observeChanges +// callbacks (and a ready() invocation) to the ObserveMultiplexer, and you stop +// it by calling the stop() method. +OplogObserveDriver = function (options) { var self = this; - self._cursorDescription = cursorDescription; - self._mongoHandle = mongoHandle; - self._multiplexer = multiplexer; + self._cursorDescription = options.cursorDescription; + self._mongoHandle = options.mongoHandle; + self._multiplexer = options.multiplexer; + if (options.ordered) + throw Error("OplogObserveDriver only supports unordered observeChanges"); self._stopped = false; self._stopHandles = []; @@ -28,9 +30,10 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) { self._phase = PHASE.INITIALIZING; self._published = new LocalCollection._IdMap; - var selector = cursorDescription.selector; - self._selectorFn = LocalCollection._compileSelector(selector); - var projection = cursorDescription.options.fields || {}; + var selector = self._cursorDescription.selector; + self._selectorFn = LocalCollection._compileSelector( + self._cursorDescription.selector); + var projection = self._cursorDescription.options.fields || {}; self._projectionFn = LocalCollection._compileProjection(projection); // Projection function, result of combining important fields for selector and // existing fields projection @@ -44,7 +47,7 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) { self._writesToCommitWhenWeReachSteady = []; - forEachTrigger(cursorDescription, function (trigger) { + forEachTrigger(self._cursorDescription, function (trigger) { self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry( trigger, function (notification) { var op = notification.op; @@ -67,7 +70,7 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) { // XXX ordering w.r.t. everything else? self._stopHandles.push(listenAll( - cursorDescription, function (notification, complete) { + self._cursorDescription, function (notification, complete) { // If we're not in a write fence, we don't have to do anything. var fence = DDPServer._CurrentWriteFence.get(); if (!fence) { @@ -102,7 +105,7 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) { }); }; -_.extend(OplogTailer.prototype, { +_.extend(OplogObserveDriver.prototype, { _add: function (doc) { var self = this; var id = doc._id; diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index 2b0061ef15..5553acc731 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -43,7 +43,7 @@ Package.on_use(function (api) { api.export('MongoTest', 'server', {testOnly: true}); api.add_files(['doc_fetcher.js', 'mongo_driver.js', 'observe_multiplex.js', - 'mongo_pollster.js', 'oplog.js'], 'server'); + 'polling.js', 'oplog.js'], 'server'); api.add_files('local_collection_driver.js', ['client', 'server']); api.add_files('remote_collection_driver.js', 'server'); api.add_files('collection.js', ['client', 'server']); diff --git a/packages/mongo-livedata/mongo_pollster.js b/packages/mongo-livedata/polling.js similarity index 92% rename from packages/mongo-livedata/mongo_pollster.js rename to packages/mongo-livedata/polling.js index 6c81e1188d..938798e519 100644 --- a/packages/mongo-livedata/mongo_pollster.js +++ b/packages/mongo-livedata/polling.js @@ -1,11 +1,10 @@ -MongoPollster = function (cursorDescription, mongoHandle, multiplexer, - ordered, testOnlyPollCallback) { +PollingObserveDriver = function (options) { var self = this; - self._cursorDescription = cursorDescription; - self._mongoHandle = mongoHandle; - self._ordered = ordered; - self._multiplexer = multiplexer; + self._cursorDescription = options.cursorDescription; + self._mongoHandle = options.mongoHandle; + self._ordered = options.ordered; + self._multiplexer = options.multiplexer; self._stopCallbacks = []; self._stopped = false; @@ -26,8 +25,8 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer, self._pollsScheduledButNotStarted = 0; self._pendingWrites = []; // people to notify when polling completes - // Make sure to create a separately throttled function for each MongoPollster - // object. + // Make sure to create a separately throttled function for each + // PollingObserveDriver object. self._ensurePollIsScheduled = _.throttle( self._unthrottledEnsurePollIsScheduled, 50 /* ms */); @@ -35,7 +34,7 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer, self._taskQueue = new Meteor._SynchronousQueue(); var listenersHandle = listenAll( - cursorDescription, function (notification, complete) { + self._cursorDescription, function (notification, complete) { // When someone does a transaction that might affect us, schedule a poll // of the database. If that transaction happens inside of a write fence, // block the fence until we've polled and notified observers. @@ -59,8 +58,8 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer, // For testing, there's an undocumented callback argument to observeChanges // which disables time-based polling and gets called at the beginning of each // poll. - if (testOnlyPollCallback) { - self._testOnlyPollCallback = testOnlyPollCallback; + if (options._testOnlyPollCallback) { + self._testOnlyPollCallback = options._testOnlyPollCallback; } else { var intervalHandle = Meteor.setInterval( _.bind(self._ensurePollIsScheduled, self), 10 * 1000); @@ -76,7 +75,7 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer, "mongo-livedata", "mongo-pollsters", 1); }; -_.extend(MongoPollster.prototype, { +_.extend(PollingObserveDriver.prototype, { // This is always called through _.throttle (except once at startup). _unthrottledEnsurePollIsScheduled: function () { var self = this; @@ -131,7 +130,7 @@ _.extend(MongoPollster.prototype, { if (!self._results) { first = true; // XXX maybe use _IdMap/OrderedDict instead? - self._results = self.ordered ? [] : {}; + self._results = self._ordered ? [] : {}; } self._testOnlyPollCallback && self._testOnlyPollCallback();