Keep the oplog observe driver and the oplog tailer alive when Mongo throws
(e.g. during a failover): fetch errors are logged and trigger a re-poll,
and failed queries are logged and retried after 100 ms instead of crashing.
The retry loop in _runQuery gives up once the driver has been stopped.

NOTE(review): leading whitespace inside the hunks was re-created from the
code's nesting; if context lines do not match the target tree, apply with
`git apply --ignore-whitespace`. The post-image blob id on the first
`index` line predates the stop check added to _runQuery.

diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js
index c153c96a51..ec6f508db4 100644
--- a/packages/mongo-livedata/oplog_observe_driver.js
+++ b/packages/mongo-livedata/oplog_observe_driver.js
@@ -424,7 +424,6 @@ _.extend(OplogObserveDriver.prototype, {
         var thisGeneration = ++self._fetchGeneration;
         self._needToFetch = new LocalCollection._IdMap;
         var waiting = 0;
-        var anyError = null;
         var fut = new Future;
         // This loop is safe, because _currentlyFetching will not be updated
         // during this loop (in fact, it is never mutated).
@@ -435,8 +434,11 @@ _.extend(OplogObserveDriver.prototype, {
             finishIfNeedToPollQuery(function (err, doc) {
               try {
                 if (err) {
-                  if (!anyError)
-                    anyError = err;
+                  Meteor._debug("Got exception while fetching documents: " +
+                                err);
+                  if (self._phase !== PHASE.QUERYING) {
+                    self._needToPollQuery();
+                  }
                 } else if (!self._stopped && self._phase === PHASE.FETCHING
                            && self._fetchGeneration === thisGeneration) {
                   // We re-check the generation in case we've had an explicit
@@ -456,9 +458,6 @@ _.extend(OplogObserveDriver.prototype, {
             }));
         });
         fut.wait();
-        // XXX do this even if we've switched to PHASE.QUERYING?
-        if (anyError)
-          throw anyError;
         // Exit now if we've had a _pollQuery call (here or in another fiber).
         if (self._phase === PHASE.QUERYING)
           return;
@@ -599,22 +598,42 @@ _.extend(OplogObserveDriver.prototype, {
   _runQuery: function () {
     var self = this;
-    var newResults = new LocalCollection._IdMap;
-    var newBuffer = new LocalCollection._IdMap;
+    var newResults, newBuffer;
 
-    // Query 2x documents as the half excluded from the original query will go
-    // into unpublished buffer to reduce additional Mongo lookups in cases when
-    // documents are removed from the published set and need a replacement.
-    // XXX needs more thought on non-zero skip
-    // XXX 2 is a "magic number" meaning there is an extra chunk of docs for
-    // buffer if such is needed.
-    var cursor = self._cursorForQuery({ limit: self._limit * 2 });
-    cursor.forEach(function (doc, i) {
-      if (!self._limit || i < self._limit)
-        newResults.set(doc._id, doc);
-      else
-        newBuffer.set(doc._id, doc);
-    });
+    while (true) {
+      // A stopped driver has nobody left to publish to, so bail out rather
+      // than retrying forever while Mongo stays unreachable.
+      // NOTE(review): callers then see no _publishNewResults call; confirm
+      // they re-check self._stopped after _runQuery returns.
+      if (self._stopped)
+        return;
+
+      newResults = new LocalCollection._IdMap;
+      newBuffer = new LocalCollection._IdMap;
+
+      // Query 2x documents as the half excluded from the original query will go
+      // into unpublished buffer to reduce additional Mongo lookups in cases
+      // when documents are removed from the published set and need a
+      // replacement.
+      // XXX needs more thought on non-zero skip
+      // XXX 2 is a "magic number" meaning there is an extra chunk of docs for
+      // buffer if such is needed.
+      var cursor = self._cursorForQuery({ limit: self._limit * 2 });
+      try {
+        cursor.forEach(function (doc, i) {
+          if (!self._limit || i < self._limit)
+            newResults.set(doc._id, doc);
+          else
+            newBuffer.set(doc._id, doc);
+        });
+        break;
+      } catch (e) {
+        // During failover (eg) if we get an exception we should log and retry
+        // instead of crashing.
+        Meteor._debug("Got exception while polling query: " + e);
+        Meteor._sleepForMs(100);
+      }
+    }
 
     self._publishNewResults(newResults, newBuffer);
   },
 
diff --git a/packages/mongo-livedata/oplog_tailing.js b/packages/mongo-livedata/oplog_tailing.js
index b203bfa412..b8dab965e7 100644
--- a/packages/mongo-livedata/oplog_tailing.js
+++ b/packages/mongo-livedata/oplog_tailing.js
@@ -101,12 +101,22 @@ _.extend(OplogHandle.prototype, {
     // be ready.
     self._readyFuture.wait();
 
-    // We need to make the selector at least as restrictive as the actual
-    // tailing selector (ie, we need to specify the DB name) or else we might
-    // find a TS that won't show up in the actual tail stream.
-    var lastEntry = self._oplogLastEntryConnection.findOne(
-      OPLOG_COLLECTION, self._baseOplogSelector,
-      {fields: {ts: 1}, sort: {$natural: -1}});
+    while (true) {
+      // We need to make the selector at least as restrictive as the actual
+      // tailing selector (ie, we need to specify the DB name) or else we might
+      // find a TS that won't show up in the actual tail stream.
+      try {
+        var lastEntry = self._oplogLastEntryConnection.findOne(
+          OPLOG_COLLECTION, self._baseOplogSelector,
+          {fields: {ts: 1}, sort: {$natural: -1}});
+        break;
+      } catch (e) {
+        // During failover (eg) if we get an exception we should log and retry
+        // instead of crashing.
+        Meteor._debug("Got exception while reading last entry: " + e);
+        Meteor._sleepForMs(100);
+      }
+    }
 
     if (!lastEntry) {
       // Really, nothing in the oplog? Well, we've processed everything.