Make sure to not clear queue completely

Expand on some comments
This commit is contained in:
David Glasser
2015-02-09 21:40:18 -08:00
parent 56f21fc82e
commit cf363d8530

View File

@@ -42,7 +42,6 @@ OplogHandle = function (oplogUrl, dbName) {
self._crossbar = new DDPServer._Crossbar({
factPackage: "mongo-livedata", factName: "oplog-watchers"
});
self._lastProcessedTS = null;
self._baseOplogSelector = {
ns: new RegExp('^' + quotemeta(self._dbName) + '\\.'),
$or: [
@@ -50,9 +49,27 @@ OplogHandle = function (oplogUrl, dbName) {
// If it is not db.collection.drop(), ignore it
{ op: 'c', 'o.drop': { $exists: true } }]
};
// XXX doc
// Data structures to support waitUntilCaughtUp(). Each oplog entry has a
// MongoTimestamp object on it (which is not the same as a Date --- it's a
// combination of time and an incrementing counter; see
// http://docs.mongodb.org/manual/reference/bson-types/#timestamps).
//
// _catchingUpFutures is an array of {ts: MongoTimestamp, future: Future}
// objects, sorted by ascending timestamp. _lastProcessedTS is the
// MongoTimestamp of the last oplog entry we've processed.
//
// Each time we call waitUntilCaughtUp, we take a peek at the final oplog
// entry in the db. If we've already processed it (ie, it is not greater than
// _lastProcessedTS), waitUntilCaughtUp immediately returns. Otherwise,
// waitUntilCaughtUp makes a new Future and inserts it along with the final
// timestamp entry that it read, into _catchingUpFutures. waitUntilCaughtUp
// then waits on that future, which is resolved once _lastProcessedTS is
// incremented to be past its timestamp by the worker fiber.
//
// XXX use a priority queue or something else that's faster than an array
self._catchingUpFutures = [];
self._lastProcessedTS = null;
self._nextOnSkippedEntriesCallbackId = 1;
self._onSkippedEntriesCallbacks = {};
@@ -249,7 +266,15 @@ _.extend(OplogHandle.prototype, {
// Are we too far behind? Just tell our observers that they need to
// repoll, and drop our queue.
if (self._entryQueue.length > TOO_FAR_BEHIND) {
// Instead of dropping all the way down to zero, we leave the final
// one on it. This ensures that after calling the skipped entries
// callbacks, we'll still process at least one more oplog entry
// immediately and thus examine any relevant items in
// _catchingUpFutures.
var last = self._entryQueue.pop();
self._entryQueue.clear();
self._entryQueue.push(last);
_.each(self._onSkippedEntriesCallbacks, function (cb, id) {
// Call the onSkippedEntries callbacks, but double-check that they
// weren't *just* stopped before calling.
@@ -257,6 +282,7 @@ _.extend(OplogHandle.prototype, {
cb();
}
});
continue;
}