rename to {Mongo,Oplog}ObserveDriver

Give a consistent constructor API betweent the two.
This commit is contained in:
David Glasser
2013-11-26 17:16:24 -08:00
parent 16cc4edc07
commit aeac87285e
6 changed files with 52 additions and 55 deletions

View File

@@ -27,9 +27,6 @@ _.extend(DDPServer._Crossbar.prototype, {
//
// XXX It should be legal to call fire() from inside a listen()
// callback?
//
// Note: the MongoPollster constructor assumes that a call to listen() never
// yields.
listen: function (trigger, callback) {
var self = this;
var id = self.nextId++;

View File

@@ -840,12 +840,12 @@ MongoConnection.prototype._dropIndex = function (collectionName, index) {
// reference to an ObserveMultiplexer.
//
// ObserveMultiplexer allows multiple identical ObserveHandles to be driven by a
// single low-level observe process such as a MongoPollster.
// single observe driver.
//
// There are two "observe implementations" which drive ObserveMultiplexers:
// - MongoPollster caches the results of a query and reruns it when
// There are two "observe drivers" which drive ObserveMultiplexers:
// - PollingObserveDriver caches the results of a query and reruns it when
// necessary.
// - OplogTailer follows the Mongo operation log to directly observe
// - OplogObserveDriver follows the Mongo operation log to directly observe
// database changes.
// Both implementations follow the same simple interface: when you create them,
// they start sending observeChanges callbacks (and a ready() invocation) to
@@ -1145,7 +1145,7 @@ MongoConnection.prototype._observeChanges = function (
var observeKey = JSON.stringify(
_.extend({ordered: ordered}, cursorDescription));
var multiplexer, observeImplementation;
var multiplexer, observeDriver;
var firstHandle = false;
// Find a matching ObserveMultiplexer, or create a new one. This next block is
@@ -1155,40 +1155,38 @@ MongoConnection.prototype._observeChanges = function (
if (_.has(self._observeMultiplexers, observeKey)) {
multiplexer = self._observeMultiplexers[observeKey];
} else {
firstHandle = true;
// Create a new ObserveMultiplexer.
multiplexer = new ObserveMultiplexer({
ordered: ordered,
onStop: function () {
observeImplementation.stop();
observeDriver.stop();
delete self._observeMultiplexers[observeKey];
}
});
self._observeMultiplexers[observeKey] = multiplexer;
firstHandle = true;
}
});
var observeHandle = new ObserveHandle(multiplexer, callbacks);
if (firstHandle) {
var driverClass = PollingObserveDriver;
if (self._oplogHandle && !ordered && !callbacks._testOnlyPollCallback
&& cursorSupportedByOplogTailing(cursorDescription)) {
// Can yield!
observeImplementation = new OplogTailer(
cursorDescription, self, multiplexer);
} else {
// Start polling.
observeImplementation = new MongoPollster(
cursorDescription,
self,
multiplexer,
ordered,
callbacks._testOnlyPollCallback);
driverClass = OplogObserveDriver;
}
observeDriver = new driverClass({
cursorDescription: cursorDescription,
mongoHandle: self,
multiplexer: multiplexer,
ordered: ordered,
_testOnlyPollCallback: callbacks._testOnlyPollCallback
});
// This field is only set for the first ObserveHandle in an
// ObserveMultiplexer. It is only there for use by one test.
observeHandle._observeImplementation = observeImplementation;
observeHandle._observeDriver = observeDriver;
}
// Blocks until the initial adds have been sent.

View File

@@ -387,8 +387,8 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test,
// run.
if (Meteor.isServer) {
// For now, has to be polling (not oplog).
test.isTrue(obs._observeImplementation);
test.isTrue(obs._observeImplementation._suspendPolling);
test.isTrue(obs._observeDriver);
test.isTrue(obs._observeDriver._suspendPolling);
}
var step = 0;
@@ -423,7 +423,7 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test,
finishObserve(function () {
if (Meteor.isServer)
obs._observeImplementation._suspendPolling();
obs._observeDriver._suspendPolling();
// Do a batch of 1-10 operations
var batch_count = rnd(10) + 1;
@@ -456,7 +456,7 @@ Tinytest.addAsync("mongo-livedata - fuzz test, " + idGeneration, function(test,
}
}
if (Meteor.isServer)
obs._observeImplementation._resumePolling();
obs._observeDriver._resumePolling();
});

View File

@@ -7,17 +7,19 @@ var PHASE = {
STEADY: 3
};
// OplogTailer is an alternative to MongoPollster which follows the Mongo
// operation log instead of just re-polling the query. It obeys the same simple
// interface: constructing it starts sending observeChanges callbacks (and a
// ready() invocation) to the ObserveMultiplexer, and you stop it by calling
// the stop() method.
OplogTailer = function (cursorDescription, mongoHandle, multiplexer) {
// OplogObserveDriver is an alternative to PollingObserveDriver which follows
// the Mongo operation log instead of just re-polling the query. It obeys the
// same simple interface: constructing it starts sending observeChanges
// callbacks (and a ready() invocation) to the ObserveMultiplexer, and you stop
// it by calling the stop() method.
OplogObserveDriver = function (options) {
var self = this;
self._cursorDescription = cursorDescription;
self._mongoHandle = mongoHandle;
self._multiplexer = multiplexer;
self._cursorDescription = options.cursorDescription;
self._mongoHandle = options.mongoHandle;
self._multiplexer = options.multiplexer;
if (options.ordered)
throw Error("OplogObserveDriver only supports unordered observeChanges");
self._stopped = false;
self._stopHandles = [];
@@ -28,9 +30,10 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) {
self._phase = PHASE.INITIALIZING;
self._published = new LocalCollection._IdMap;
var selector = cursorDescription.selector;
self._selectorFn = LocalCollection._compileSelector(selector);
var projection = cursorDescription.options.fields || {};
var selector = self._cursorDescription.selector;
self._selectorFn = LocalCollection._compileSelector(
self._cursorDescription.selector);
var projection = self._cursorDescription.options.fields || {};
self._projectionFn = LocalCollection._compileProjection(projection);
// Projection function, result of combining important fields for selector and
// existing fields projection
@@ -44,7 +47,7 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) {
self._writesToCommitWhenWeReachSteady = [];
forEachTrigger(cursorDescription, function (trigger) {
forEachTrigger(self._cursorDescription, function (trigger) {
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) {
var op = notification.op;
@@ -67,7 +70,7 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) {
// XXX ordering w.r.t. everything else?
self._stopHandles.push(listenAll(
cursorDescription, function (notification, complete) {
self._cursorDescription, function (notification, complete) {
// If we're not in a write fence, we don't have to do anything.
var fence = DDPServer._CurrentWriteFence.get();
if (!fence) {
@@ -102,7 +105,7 @@ OplogTailer = function (cursorDescription, mongoHandle, multiplexer) {
});
};
_.extend(OplogTailer.prototype, {
_.extend(OplogObserveDriver.prototype, {
_add: function (doc) {
var self = this;
var id = doc._id;

View File

@@ -43,7 +43,7 @@ Package.on_use(function (api) {
api.export('MongoTest', 'server', {testOnly: true});
api.add_files(['doc_fetcher.js', 'mongo_driver.js', 'observe_multiplex.js',
'mongo_pollster.js', 'oplog.js'], 'server');
'polling.js', 'oplog.js'], 'server');
api.add_files('local_collection_driver.js', ['client', 'server']);
api.add_files('remote_collection_driver.js', 'server');
api.add_files('collection.js', ['client', 'server']);

View File

@@ -1,11 +1,10 @@
MongoPollster = function (cursorDescription, mongoHandle, multiplexer,
ordered, testOnlyPollCallback) {
PollingObserveDriver = function (options) {
var self = this;
self._cursorDescription = cursorDescription;
self._mongoHandle = mongoHandle;
self._ordered = ordered;
self._multiplexer = multiplexer;
self._cursorDescription = options.cursorDescription;
self._mongoHandle = options.mongoHandle;
self._ordered = options.ordered;
self._multiplexer = options.multiplexer;
self._stopCallbacks = [];
self._stopped = false;
@@ -26,8 +25,8 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer,
self._pollsScheduledButNotStarted = 0;
self._pendingWrites = []; // people to notify when polling completes
// Make sure to create a separately throttled function for each MongoPollster
// object.
// Make sure to create a separately throttled function for each
// PollingObserveDriver object.
self._ensurePollIsScheduled = _.throttle(
self._unthrottledEnsurePollIsScheduled, 50 /* ms */);
@@ -35,7 +34,7 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer,
self._taskQueue = new Meteor._SynchronousQueue();
var listenersHandle = listenAll(
cursorDescription, function (notification, complete) {
self._cursorDescription, function (notification, complete) {
// When someone does a transaction that might affect us, schedule a poll
// of the database. If that transaction happens inside of a write fence,
// block the fence until we've polled and notified observers.
@@ -59,8 +58,8 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer,
// For testing, there's an undocumented callback argument to observeChanges
// which disables time-based polling and gets called at the beginning of each
// poll.
if (testOnlyPollCallback) {
self._testOnlyPollCallback = testOnlyPollCallback;
if (options._testOnlyPollCallback) {
self._testOnlyPollCallback = options._testOnlyPollCallback;
} else {
var intervalHandle = Meteor.setInterval(
_.bind(self._ensurePollIsScheduled, self), 10 * 1000);
@@ -76,7 +75,7 @@ MongoPollster = function (cursorDescription, mongoHandle, multiplexer,
"mongo-livedata", "mongo-pollsters", 1);
};
_.extend(MongoPollster.prototype, {
_.extend(PollingObserveDriver.prototype, {
// This is always called through _.throttle (except once at startup).
_unthrottledEnsurePollIsScheduled: function () {
var self = this;
@@ -131,7 +130,7 @@ _.extend(MongoPollster.prototype, {
if (!self._results) {
first = true;
// XXX maybe use _IdMap/OrderedDict instead?
self._results = self.ordered ? [] : {};
self._results = self._ordered ? [] : {};
}
self._testOnlyPollCallback && self._testOnlyPollCallback();