diff --git a/packages/mongo/oplog_tailing.js b/packages/mongo/oplog_tailing.js index a2f6191425..9ddcc0b696 100644 --- a/packages/mongo/oplog_tailing.js +++ b/packages/mongo/oplog_tailing.js @@ -42,7 +42,6 @@ OplogHandle = function (oplogUrl, dbName) { self._crossbar = new DDPServer._Crossbar({ factPackage: "mongo-livedata", factName: "oplog-watchers" }); - self._lastProcessedTS = null; self._baseOplogSelector = { ns: new RegExp('^' + quotemeta(self._dbName) + '\\.'), $or: [ @@ -50,9 +49,27 @@ OplogHandle = function (oplogUrl, dbName) { // If it is not db.collection.drop(), ignore it { op: 'c', 'o.drop': { $exists: true } }] }; - // XXX doc + + // Data structures to support waitUntilCaughtUp(). Each oplog entry has a + // MongoTimestamp object on it (which is not the same as a Date --- it's a + // combination of time and an incrementing counter; see + // http://docs.mongodb.org/manual/reference/bson-types/#timestamps). + // + // _catchingUpFutures is an array of {ts: MongoTimestamp, future: Future} + // objects, sorted by ascending timestamp. _lastProcessedTS is the + // MongoTimestamp of the last oplog entry we've processed. + // + // Each time we call waitUntilCaughtUp, we take a peek at the final oplog + // entry in the db. If we've already processed it (ie, it is not greater than + // _lastProcessedTS), waitUntilCaughtUp immediately returns. Otherwise, + // waitUntilCaughtUp makes a new Future and inserts it along with the final + // timestamp entry that it read, into _catchingUpFutures. waitUntilCaughtUp + // then waits on that future, which is resolved once _lastProcessedTS is + // incremented to be past its timestamp by the worker fiber. + // // XXX use a priority queue or something else that's faster than an array self._catchingUpFutures = []; + self._lastProcessedTS = null; self._nextOnSkippedEntriesCallbackId = 1; self._onSkippedEntriesCallbacks = {}; @@ -249,7 +266,15 @@ _.extend(OplogHandle.prototype, { // Are we too far behind? Just tell our observers that they need to // repoll, and drop our queue. if (self._entryQueue.length > TOO_FAR_BEHIND) { + // Instead of dropping all the way down to zero, we leave the final + // one on it. This ensures that after calling the skipped entries + // callbacks, we'll still process at least one more oplog entry + // immediately and thus examine any relevant items in + // _catchingUpFutures. + var last = self._entryQueue.pop(); self._entryQueue.clear(); + self._entryQueue.push(last); + _.each(self._onSkippedEntriesCallbacks, function (cb, id) { // Call the onSkippedEntries callbacks, but double-check that they // weren't *just* stopped before calling. @@ -257,6 +282,7 @@ _.extend(OplogHandle.prototype, { cb(); } }); + continue; }