diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index 851e7c05be..16bc502549 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -305,61 +305,58 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, // currently visible. // XXX become convinced that this is actually safe even if oplogConnection // is some kind of pool - callWhenProcessedLatest: function (callback) { + waitUntilProcessedLatest: function () { if (stopped) - throw new Error("Called callWhenProcessedLatest on stopped handle!"); + throw new Error("Called waitUntilProcessedLatest on stopped handle!"); // Calling onOplogEntry requries us to wait for the oplog connection to be // ready. readyFuture.wait(); - var coll = oplogLastEntryConnection._getCollection(OPLOG_COLLECTION); // We need to make the selector at least as restrictive as the actual // tailing selector (ie, we need to specify the DB name) or else we // might find a TS that won't show up in the actual tail stream. - coll.findOne(baseOplogSelector(), {fields: {ts: 1}, sort: {$natural: -1}}, function (err, lastEntry) { - if (err) { - console.log("OH NO ERROR", err) - // call callback anyway, I guess - callback(); - return; - } + // + // We don't want to block here: the whole point is to call callback + // asynchronously! + var lastEntry = oplogLastEntryConnection.findOne( + OPLOG_COLLECTION, baseOplogSelector(), + {fields: {ts: 1}, sort: {$natural: -1}}); - if (!lastEntry) { - // Really, nothing in the oplog? Well, we've processed everything. - callback(); - return; - } - var ts = lastEntry.ts; - if (!ts) - throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry)); + if (!lastEntry) { + // Really, nothing in the oplog? Well, we've processed everything. + return; + } - if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) { - // We've already caught up to here. - callback(); - return; - } + var ts = lastEntry.ts; + if (!ts) + throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry)); - var insertAfter = pendingSequencers.length; - while (insertAfter - 1 > 0 - && pendingSequencers[insertAfter - 1].ts.greaterThan(ts)) { - insertAfter--; - } + if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) { + // We've already caught up to here. + return; + } - // XXX this can occur if we fail over from one primary to another. so - // this check needs to be removed before we merge oplog. that said, it - // has been helpful so far at proving that we are properly using - // poolSize 1. Also, we could keep something like it if we could - // actually detect failover; see - // https://github.com/mongodb/node-mongodb-native/issues/1120 - if (insertAfter !== pendingSequencers.length) { - throw Error("found misordered oplog: " - + showTS(_.last(pendingSequencers).ts) + " vs " - + showTS(ts)); - } + var insertAfter = pendingSequencers.length; + while (insertAfter - 1 > 0 + && pendingSequencers[insertAfter - 1].ts.greaterThan(ts)) { + insertAfter--; + } - pendingSequencers.splice(insertAfter, 0, {ts: ts, callback: callback}); - }); + // XXX this can occur if we fail over from one primary to another. so + // this check needs to be removed before we merge oplog. that said, it + // has been helpful so far at proving that we are properly using + // poolSize 1. Also, we could keep something like it if we could + // actually detect failover; see + // https://github.com/mongodb/node-mongodb-native/issues/1120 + if (insertAfter !== pendingSequencers.length) { + throw Error("found misordered oplog: " + + showTS(_.last(pendingSequencers).ts) + " vs " + + showTS(ts)); + } + var f = new Future; + pendingSequencers.splice(insertAfter, 0, {ts: ts, future: f}); + f.wait(); } }; @@ -420,7 +417,7 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, while (!_.isEmpty(pendingSequencers) && pendingSequencers[0].ts.lessThanOrEqual(lastProcessedTS)) { var sequencer = pendingSequencers.shift(); - sequencer.callback(); + sequencer.future.return(); } }); readyFuture.return(); diff --git a/packages/mongo-livedata/oplog.js b/packages/mongo-livedata/oplog.js index 01b34e520c..5796600c66 100644 --- a/packages/mongo-livedata/oplog.js +++ b/packages/mongo-livedata/oplog.js @@ -206,13 +206,12 @@ MongoConnection.prototype._observeChangesWithOplog = function ( var write = fence.beginWrite(); // This write cannot complete until we've caught up to "this point" in the // oplog, and then made it back to the steady state. - self._oplogHandle.callWhenProcessedLatest(function () { - if (stopped || phase === PHASE.STEADY) - write.committed(); - else - writesToCommitWhenWeReachSteady.push(write); - }); - complete(); + Meteor.defer(complete); + self._oplogHandle.waitUntilProcessedLatest(); + if (stopped || phase === PHASE.STEADY) + write.committed(); + else + writesToCommitWhenWeReachSteady.push(write); } ); @@ -221,9 +220,7 @@ MongoConnection.prototype._observeChangesWithOplog = function ( add(initialDoc); }); - var catchUpFuture = new Future; - self._oplogHandle.callWhenProcessedLatest(catchUpFuture.resolver()); - catchUpFuture.wait(); + self._oplogHandle.waitUntilProcessedLatest(); if (phase !== PHASE.INITIALIZING) throw Error("Phase unexpectedly " + phase);