From 16cc4edc07cae007b4240500a90cc87c5255179e Mon Sep 17 00:00:00 2001 From: David Glasser Date: Tue, 26 Nov 2013 17:05:43 -0800 Subject: [PATCH] make OplogTailer a real class --- packages/mongo-livedata/mongo_driver.js | 14 +- packages/mongo-livedata/mongo_pollster.js | 4 +- packages/mongo-livedata/oplog.js | 580 +++++++++++----------- 3 files changed, 312 insertions(+), 286 deletions(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index e4010e9740..e40d4cc3e2 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -842,8 +842,14 @@ MongoConnection.prototype._dropIndex = function (collectionName, index) { // ObserveMultiplexer allows multiple identical ObserveHandles to be driven by a // single low-level observe process such as a MongoPollster. // -// A MongoPollster caches the results of a query and reruns it when necessary. -// It is hooked up to an ObserveMultiplexer. +// There are two "observe implementations" which drive ObserveMultiplexers: +// - MongoPollster caches the results of a query and reruns it when +// necessary. +// - OplogTailer follows the Mongo operation log to directly observe +// database changes. +// Both implementations follow the same simple interface: when you create them, +// they start sending observeChanges callbacks (and a ready() invocation) to +// their ObserveMultiplexer, and you stop them by calling their stop() method. var CursorDescription = function (collectionName, selector, options) { var self = this; @@ -1168,15 +1174,15 @@ MongoConnection.prototype._observeChanges = function ( if (self._oplogHandle && !ordered && !callbacks._testOnlyPollCallback && cursorSupportedByOplogTailing(cursorDescription)) { // Can yield! - observeImplementation = observeChangesWithOplog( + observeImplementation = new OplogTailer( cursorDescription, self, multiplexer); } else { // Start polling. observeImplementation = new MongoPollster( cursorDescription, self, - ordered, multiplexer, + ordered, callbacks._testOnlyPollCallback); } diff --git a/packages/mongo-livedata/mongo_pollster.js b/packages/mongo-livedata/mongo_pollster.js index 8152678b20..6c81e1188d 100644 --- a/packages/mongo-livedata/mongo_pollster.js +++ b/packages/mongo-livedata/mongo_pollster.js @@ -1,5 +1,5 @@ -MongoPollster = function (cursorDescription, mongoHandle, ordered, - multiplexer, testOnlyPollCallback) { +MongoPollster = function (cursorDescription, mongoHandle, multiplexer, + ordered, testOnlyPollCallback) { var self = this; self._cursorDescription = cursorDescription; diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index 9d967d104b..a7016581dd 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -7,6 +7,306 @@ var PHASE = { STEADY: 3 }; +// OplogTailer is an alternative to MongoPollster which follows the Mongo +// operation log instead of just re-polling the query. It obeys the same simple +// interface: constructing it starts sending observeChanges callbacks (and a +// ready() invocation) to the ObserveMultiplexer, and you stop it by calling +// the stop() method. +OplogTailer = function (cursorDescription, mongoHandle, multiplexer) { + var self = this; + + self._cursorDescription = cursorDescription; + self._mongoHandle = mongoHandle; + self._multiplexer = multiplexer; + + self._stopped = false; + self._stopHandles = []; + + Package.facts && Package.facts.Facts.incrementServerFact( + "mongo-livedata", "oplog-observers", 1); + + self._phase = PHASE.INITIALIZING; + + self._published = new LocalCollection._IdMap; + var selector = cursorDescription.selector; + self._selectorFn = LocalCollection._compileSelector(selector); + var projection = cursorDescription.options.fields || {}; + self._projectionFn = LocalCollection._compileProjection(projection); + // Projection function, result of combining important fields for selector and + // existing fields projection + var sharedProjection = LocalCollection._combineSelectorAndProjection( + selector, projection); + self._sharedProjectionFn = LocalCollection._compileProjection( + sharedProjection); + + self._needToFetch = new LocalCollection._IdMap; + self._currentlyFetching = new LocalCollection._IdMap; + + self._writesToCommitWhenWeReachSteady = []; + + forEachTrigger(cursorDescription, function (trigger) { + self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry( + trigger, function (notification) { + var op = notification.op; + if (op.op === 'c') { + // XXX actually, drop collection needs to be handled by doing a + // re-query + self._published.forEach(function (fields, id) { + self._remove(id); + }); + } else { + // All other operators should be handled depending on phase + if (self._phase === PHASE.INITIALIZING) + self._handleOplogEntryInitializing(op); + else + self._handleOplogEntrySteadyOrFetching(op); + } + } + )); + }); + + // XXX ordering w.r.t. everything else? + self._stopHandles.push(listenAll( + cursorDescription, function (notification, complete) { + // If we're not in a write fence, we don't have to do anything. + var fence = DDPServer._CurrentWriteFence.get(); + if (!fence) { + complete(); + return; + } + var write = fence.beginWrite(); + // This write cannot complete until we've caught up to "this point" in the + // oplog, and then made it back to the steady state. + Meteor.defer(complete); + self._mongoHandle._oplogHandle.waitUntilCaughtUp(); + if (self._stopped) { + // We're stopped, so just immediately commit. + write.committed(); + } else if (self._phase === PHASE.STEADY) { + // Make sure that all of the callbacks have made it through the + // multiplexer and been delivered to ObserveHandles before committing + // writes. + self._multiplexer.onFlush(function () { + write.committed(); + }); + } else { + self._writesToCommitWhenWeReachSteady.push(write); + } + } + )); + + // Give _observeChanges a chance to add the new ObserveHandle to our + // multiplexer, so that the added calls get streamed. + Meteor.defer(function () { + self._runInitialQuery(); + }); +}; + +_.extend(OplogTailer.prototype, { + _add: function (doc) { + var self = this; + var id = doc._id; + var fields = _.clone(doc); + delete fields._id; + if (self._published.has(id)) + throw Error("tried to add something already published " + id); + self._published.set(id, self._sharedProjectionFn(fields)); + self._multiplexer.added(id, self._projectionFn(fields)); + }, + _remove: function (id) { + var self = this; + if (!self._published.has(id)) + throw Error("tried to remove something unpublished " + id); + self._published.remove(id); + self._multiplexer.removed(id); + }, + _handleDoc: function (id, newDoc) { + var self = this; + newDoc = _.clone(newDoc); + var matchesNow = newDoc && self._selectorFn(newDoc); + var matchedBefore = self._published.has(id); + if (matchesNow && !matchedBefore) { + self._add(newDoc); + } else if (matchedBefore && !matchesNow) { + self._remove(id); + } else if (matchesNow) { + var oldDoc = self._published.get(id); + if (!oldDoc) + throw Error("thought that " + id + " was there!"); + delete newDoc._id; + self._published.set(id, self._sharedProjectionFn(newDoc)); + var changed = LocalCollection._makeChangedFields(_.clone(newDoc), oldDoc); + changed = self._projectionFn(changed); + if (!_.isEmpty(changed)) + self._multiplexer.changed(id, changed); + } + }, + _fetchModifiedDocuments: function () { + var self = this; + self._phase = PHASE.FETCHING; + while (!self._stopped && !self._needToFetch.empty()) { + if (self._phase !== PHASE.FETCHING) + throw new Error("phase in fetchModifiedDocuments: " + self._phase); + + self._currentlyFetching = self._needToFetch; + self._needToFetch = new LocalCollection._IdMap; + var waiting = 0; + var error = null; + var fut = new Future; + Fiber(function () { + self._currentlyFetching.forEach(function (cacheKey, id) { + // currentlyFetching will not be updated during this loop. + waiting++; + self._mongoHandle._docFetcher.fetch( + self._cursorDescription.collectionName, id, cacheKey, + function (err, doc) { + if (err) { + if (!error) + error = err; + } else if (!self._stopped) { + self._handleDoc(id, doc); + } + waiting--; + if (waiting == 0) + fut.return(); + }); + }); + }).run(); + fut.wait(); + if (error) + throw error; + self._currentlyFetching = new LocalCollection._IdMap; + } + self._beSteady(); + }, + _beSteady: function () { + var self = this; + self._phase = PHASE.STEADY; + var writes = self._writesToCommitWhenWeReachSteady; + self._writesToCommitWhenWeReachSteady = []; + self._multiplexer.onFlush(function () { + _.each(writes, function (w) { + w.committed(); + }); + }); + }, + _handleOplogEntryInitializing: function (op) { + var self = this; + self._needToFetch.set(idForOp(op), op.ts.toString()); + }, + _handleOplogEntrySteadyOrFetching: function (op) { + var self = this; + var id = idForOp(op); + // If we're already fetching this one, or about to, we can't optimize; make + // sure that we fetch it again if necessary. + if (self._currentlyFetching.has(id) || self._needToFetch.has(id)) { + if (self._phase !== PHASE.FETCHING) + throw Error("map not empty during steady phase"); + self._needToFetch.set(id, op.ts.toString()); + return; + } + + if (op.op === 'd') { + if (self._published.has(id)) + self._remove(id); + } else if (op.op === 'i') { + if (self._published.has(id)) + throw new Error("insert found for already-existing ID"); + + // XXX what if selector yields? for now it can't but later it could have + // $where + if (self._selectorFn(op.o)) + self._add(op.o); + } else if (op.op === 'u') { + // Is this a modifier ($set/$unset, which may require us to poll the + // database to figure out if the whole document matches the selector) or a + // replacement (in which case we can just directly re-evaluate the + // selector)? + var isReplace = !_.has(op.o, '$set') && !_.has(op.o, '$unset'); + + if (isReplace) { + self._handleDoc(id, _.extend({_id: id}, op.o)); + } else if (self._published.has(id)) { + // Oh great, we actually know what the document is, so we can apply + // this directly. + var newDoc = EJSON.clone(self._published.get(id)); + newDoc._id = id; + LocalCollection._modify(newDoc, op.o); + self._handleDoc(id, self._sharedProjectionFn(newDoc)); + } else if (LocalCollection._canSelectorBecomeTrueByModifier( + self._cursorDescription.selector, op.o)) { + self._needToFetch.set(id, op.ts.toString()); + if (self._phase === PHASE.STEADY) + self._fetchModifiedDocuments(); + } + } else { + throw Error("XXX SURPRISING OPERATION: " + op); + } + }, + _runInitialQuery: function () { + var self = this; + if (self._stopped) + throw new Error("oplog stopped surprisingly early"); + + var initialCursor = new Cursor(self._mongoHandle, self._cursorDescription); + initialCursor.forEach(function (initialDoc) { + self._add(initialDoc); + }); + if (self._stopped) + throw new Error("oplog stopped quite early"); + // Allow observeChanges calls to return. (After this, it's possible for + // stop() to be called.) + self._multiplexer.ready(); + + if (self._stopped) + return; + self._mongoHandle._oplogHandle.waitUntilCaughtUp(); + + if (self._stopped) + return; + if (self._phase !== PHASE.INITIALIZING) + throw Error("Phase unexpectedly " + self._phase); + + if (self._needToFetch.empty()) { + self._beSteady(); + } else { + self._fetchModifiedDocuments(); + } + }, + // This stop function is invoked from the onStop of the ObserveMultiplexer, so + // it shouldn't actually be possible to call it until the multiplexer is + // ready. + stop: function () { + var self = this; + if (self._stopped) + return; + self._stopped = true; + _.each(self._stopHandles, function (handle) { + handle.stop(); + }); + + // Note: we *don't* use multiplexer.onFlush here because this stop + // callback is actually invoked by the multiplexer itself when it has + // determined that there are no handles left. So nothing is actually going + // to get flushed (and it's probably not valid to call methods on the + // dying multiplexer). + _.each(self._writesToCommitWhenWeReachSteady, function (w) { + w.committed(); + }); + self._writesToCommitWhenWeReachSteady = null; + + // Proactively drop references to potentially big things. + self._published = null; + self._needToFetch = null; + self._currentlyFetching = null; + self._oplogEntryHandle = null; + self._listenersHandle = null; + + Package.facts && Package.facts.Facts.incrementServerFact( + "mongo-livedata", "oplog-observers", -1); + } +}); + idForOp = function (op) { if (op.op === 'd') return op.o._id; @@ -20,283 +320,3 @@ idForOp = function (op) { else throw Error("Unknown op: " + EJSON.stringify(op)); }; - -observeChangesWithOplog = function (cursorDescription, - mongoHandle, - multiplexer) { - var stopped = false; - var stopHandles = []; - - Package.facts && Package.facts.Facts.incrementServerFact( - "mongo-livedata", "oplog-observers", 1); - - var phase = PHASE.INITIALIZING; - - var published = new LocalCollection._IdMap; - var selector = cursorDescription.selector; - var selectorFn = LocalCollection._compileSelector(selector); - var projection = cursorDescription.options.fields || {}; - var projectionFn = LocalCollection._compileProjection(projection); - // Projection function, result of combining important fields for selector and - // existing fields projection - var sharedProjection = LocalCollection._combineSelectorAndProjection(selector, projection); - var sharedProjectionFn = LocalCollection._compileProjection(sharedProjection); - - var needToFetch = new LocalCollection._IdMap; - var currentlyFetching = new LocalCollection._IdMap; - - var add = function (doc) { - var id = doc._id; - var fields = _.clone(doc); - delete fields._id; - if (published.has(id)) - throw Error("tried to add something already published " + id); - published.set(id, sharedProjectionFn(fields)); - multiplexer.added(id, projectionFn(fields)); - }; - - var remove = function (id) { - if (!published.has(id)) - throw Error("tried to remove something unpublished " + id); - published.remove(id); - multiplexer.removed(id); - }; - - var handleDoc = function (id, newDoc) { - newDoc = _.clone(newDoc); - var matchesNow = newDoc && selectorFn(newDoc); - var matchedBefore = published.has(id); - if (matchesNow && !matchedBefore) { - add(newDoc); - } else if (matchedBefore && !matchesNow) { - remove(id); - } else if (matchesNow) { - var oldDoc = published.get(id); - if (!oldDoc) - throw Error("thought that " + id + " was there!"); - delete newDoc._id; - published.set(id, sharedProjectionFn(newDoc)); - var changed = LocalCollection._makeChangedFields( - _.clone(newDoc), oldDoc); - changed = projectionFn(changed); - if (!_.isEmpty(changed)) - multiplexer.changed(id, changed); - } - }; - - var fetchModifiedDocuments = function () { - phase = PHASE.FETCHING; - while (!stopped && !needToFetch.empty()) { - if (phase !== PHASE.FETCHING) - throw new Error("Surprising phase in fetchModifiedDocuments: " + phase); - - currentlyFetching = needToFetch; - needToFetch = new LocalCollection._IdMap; - var waiting = 0; - var error = null; - var fut = new Future; - Fiber(function () { - currentlyFetching.forEach(function (cacheKey, id) { - // currentlyFetching will not be updated during this loop. - waiting++; - mongoHandle._docFetcher.fetch(cursorDescription.collectionName, id, cacheKey, function (err, doc) { - if (err) { - if (!error) - error = err; - } else if (!stopped) { - handleDoc(id, doc); - } - waiting--; - if (waiting == 0) - fut.return(); - }); - }); - }).run(); - fut.wait(); - if (error) - throw error; - currentlyFetching = new LocalCollection._IdMap; - } - beSteady(); - }; - - var writesToCommitWhenWeReachSteady = []; - var beSteady = function () { - phase = PHASE.STEADY; - var writes = writesToCommitWhenWeReachSteady; - writesToCommitWhenWeReachSteady = []; - multiplexer.onFlush(function () { - _.each(writes, function (w) { - w.committed(); - }); - }); - }; - - var oplogEntryHandlers = {}; - oplogEntryHandlers[PHASE.INITIALIZING] = function (op) { - needToFetch.set(idForOp(op), op.ts.toString()); - }; - // We can use the same handler for STEADY and FETCHING; the main difference is - // that FETCHING has non-empty currentlyFetching and/or needToFetch. - oplogEntryHandlers[PHASE.STEADY] = function (op) { - var id = idForOp(op); - // If we're already fetching this one, or about to, we can't optimize; make - // sure that we fetch it again if necessary. - if (currentlyFetching.has(id) || needToFetch.has(id)) { - if (phase !== PHASE.FETCHING) - throw Error("map not empty during steady phase"); - needToFetch.set(id, op.ts.toString()); - return; - } - - if (op.op === 'd') { - if (published.has(id)) - remove(id); - } else if (op.op === 'i') { - if (published.has(id)) - throw new Error("insert found for already-existing ID"); - - // XXX what if selector yields? for now it can't but later it could have - // $where - if (selectorFn(op.o)) - add(op.o); - } else if (op.op === 'u') { - // Is this a modifier ($set/$unset, which may require us to poll the - // database to figure out if the whole document matches the selector) or a - // replacement (in which case we can just directly re-evaluate the - // selector)? - var isReplace = !_.has(op.o, '$set') && !_.has(op.o, '$unset'); - - if (isReplace) { - handleDoc(id, _.extend({_id: id}, op.o)); - } else if (published.has(id)) { - // Oh great, we actually know what the document is, so we can apply - // this directly. - var newDoc = EJSON.clone(published.get(id)); - newDoc._id = id; - LocalCollection._modify(newDoc, op.o); - handleDoc(id, sharedProjectionFn(newDoc)); - } else if (LocalCollection._canSelectorBecomeTrueByModifier( - cursorDescription.selector, op.o)) { - needToFetch.set(id, op.ts.toString()); - if (phase === PHASE.STEADY) - fetchModifiedDocuments(); - return; - } - } else { - throw Error("XXX SURPRISING OPERATION: " + op); - } - }; - oplogEntryHandlers[PHASE.FETCHING] = oplogEntryHandlers[PHASE.STEADY]; - - forEachTrigger(cursorDescription, function (trigger) { - stopHandles.push(mongoHandle._oplogHandle.onOplogEntry( - trigger, function (notification) { - var op = notification.op; - if (op.op === 'c') { - // XXX actually, drop collection needs to be handled by doing a - // re-query - published.forEach(function (fields, id) { - remove(id); - }); - } else { - // All other operators should be handled depending on phase - oplogEntryHandlers[phase](op); - } - } - )); - }); - - // XXX ordering w.r.t. everything else? - stopHandles.push(listenAll( - cursorDescription, function (notification, complete) { - // If we're not in a write fence, we don't have to do anything. - var fence = DDPServer._CurrentWriteFence.get(); - if (!fence) { - complete(); - return; - } - var write = fence.beginWrite(); - // This write cannot complete until we've caught up to "this point" in the - // oplog, and then made it back to the steady state. - Meteor.defer(complete); - mongoHandle._oplogHandle.waitUntilCaughtUp(); - // Make sure that all of the callbacks have made it through the - // multiplexer and been delivered to ObserveHandles before committing - // writes. - if (stopped || phase === PHASE.STEADY) { - multiplexer.onFlush(function () { - write.committed(); - }); - } else { - writesToCommitWhenWeReachSteady.push(write); - } - } - )); - - // Give _observeChanges a chance to add the new ObserveHandle to our - // multiplexer, so that the added calls get streamed. - Meteor.defer(function () { - if (stopped) - throw new Error("oplog stopped surprisingly early"); - - var initialCursor = new Cursor(mongoHandle, cursorDescription); - initialCursor.forEach(function (initialDoc) { - add(initialDoc); - }); - if (stopped) - throw new Error("oplog stopped quite early"); - // Allow observeChanges calls to return. - multiplexer.ready(); - - if (stopped) - return; - mongoHandle._oplogHandle.waitUntilCaughtUp(); - - if (stopped) - return; - if (phase !== PHASE.INITIALIZING) - throw Error("Phase unexpectedly " + phase); - - if (needToFetch.empty()) { - beSteady(); - } else { - fetchModifiedDocuments(); - } - }); - - return { - // This stop function is invoked from the onStop of the ObserveMultiplexer, - // so it shouldn't actually be possible to call it until the multiplexer is - // ready. - stop: function () { - if (stopped) - return; - stopped = true; - _.each(stopHandles, function (handle) { - handle.stop(); - }); - - published = null; - selector = null; - needToFetch = null; - currentlyFetching = null; - - // Note: we *don't* use multiplexer.onFlush here because this stop - // callback is actually invoked by the multiplexer itself when it has - // determined that there are no handles left. So nothing is actually going - // to get flushed (and it's probably not valid to call methods on the - // dying multiplexer). - _.each(writesToCommitWhenWeReachSteady, function (w) { - w.committed(); - }); - writesToCommitWhenWeReachSteady = null; - - oplogEntryHandle = null; - listenersHandle = null; - - Package.facts && Package.facts.Facts.incrementServerFact( - "mongo-livedata", "oplog-observers", -1); - } - }; -};