Ensure that oplog observe driver retries on error

This commit is contained in:
David Glasser
2014-04-07 22:24:23 -07:00
parent 1d3b7f9c69
commit be4d306288
2 changed files with 49 additions and 27 deletions

View File

@@ -424,7 +424,6 @@ _.extend(OplogObserveDriver.prototype, {
var thisGeneration = ++self._fetchGeneration; var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap; self._needToFetch = new LocalCollection._IdMap;
var waiting = 0; var waiting = 0;
var anyError = null;
var fut = new Future; var fut = new Future;
// This loop is safe, because _currentlyFetching will not be updated // This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated). // during this loop (in fact, it is never mutated).
@@ -435,8 +434,11 @@ _.extend(OplogObserveDriver.prototype, {
finishIfNeedToPollQuery(function (err, doc) { finishIfNeedToPollQuery(function (err, doc) {
try { try {
if (err) { if (err) {
if (!anyError) Meteor._debug("Got exception while fetching documents: " +
anyError = err; err);
if (self._phase !== PHASE.QUERYING) {
self._needToPollQuery();
}
} else if (!self._stopped && self._phase === PHASE.FETCHING } else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) { && self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit // We re-check the generation in case we've had an explicit
@@ -456,9 +458,6 @@ _.extend(OplogObserveDriver.prototype, {
})); }));
}); });
fut.wait(); fut.wait();
// XXX do this even if we've switched to PHASE.QUERYING?
if (anyError)
throw anyError;
// Exit now if we've had a _pollQuery call (here or in another fiber). // Exit now if we've had a _pollQuery call (here or in another fiber).
if (self._phase === PHASE.QUERYING) if (self._phase === PHASE.QUERYING)
return; return;
@@ -599,22 +598,35 @@ _.extend(OplogObserveDriver.prototype, {
_runQuery: function () { _runQuery: function () {
var self = this; var self = this;
var newResults = new LocalCollection._IdMap; var newResults, newBuffer;
var newBuffer = new LocalCollection._IdMap;
// Query 2x documents as the half excluded from the original query will go while (true) {
// into unpublished buffer to reduce additional Mongo lookups in cases when newResults = new LocalCollection._IdMap;
// documents are removed from the published set and need a replacement. newBuffer = new LocalCollection._IdMap;
// XXX needs more thought on non-zero skip
// XXX 2 is a "magic number" meaning there is an extra chunk of docs for // Query 2x documents as the half excluded from the original query will go
// buffer if such is needed. // into unpublished buffer to reduce additional Mongo lookups in cases
var cursor = self._cursorForQuery({ limit: self._limit * 2 }); // when documents are removed from the published set and need a
cursor.forEach(function (doc, i) { // replacement.
if (!self._limit || i < self._limit) // XXX needs more thought on non-zero skip
newResults.set(doc._id, doc); // XXX 2 is a "magic number" meaning there is an extra chunk of docs for
else // buffer if such is needed.
newBuffer.set(doc._id, doc); var cursor = self._cursorForQuery({ limit: self._limit * 2 });
}); try {
cursor.forEach(function (doc, i) {
if (!self._limit || i < self._limit)
newResults.set(doc._id, doc);
else
newBuffer.set(doc._id, doc);
});
break;
} catch (e) {
// During failover (eg) if we get an exception we should log and retry
// instead of crashing.
Meteor._debug("Got exception while polling query: " + e);
Meteor._sleepForMs(100);
}
}
self._publishNewResults(newResults, newBuffer); self._publishNewResults(newResults, newBuffer);
}, },

View File

@@ -101,12 +101,22 @@ _.extend(OplogHandle.prototype, {
// be ready. // be ready.
self._readyFuture.wait(); self._readyFuture.wait();
// We need to make the selector at least as restrictive as the actual while (true) {
// tailing selector (ie, we need to specify the DB name) or else we might // We need to make the selector at least as restrictive as the actual
// find a TS that won't show up in the actual tail stream. // tailing selector (ie, we need to specify the DB name) or else we might
var lastEntry = self._oplogLastEntryConnection.findOne( // find a TS that won't show up in the actual tail stream.
OPLOG_COLLECTION, self._baseOplogSelector, try {
{fields: {ts: 1}, sort: {$natural: -1}}); var lastEntry = self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, self._baseOplogSelector,
{fields: {ts: 1}, sort: {$natural: -1}});
break;
} catch (e) {
// During failover (eg) if we get an exception we should log and retry
// instead of crashing.
Meteor._debug("Got exception while reading last entry: " + e);
Meteor._sleepForMs(100);
}
}
if (!lastEntry) { if (!lastEntry) {
// Really, nothing in the oplog? Well, we've processed everything. // Really, nothing in the oplog? Well, we've processed everything.