mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Ensure that oplog observe driver retries on error
This commit is contained in:
@@ -424,7 +424,6 @@ _.extend(OplogObserveDriver.prototype, {
|
|||||||
var thisGeneration = ++self._fetchGeneration;
|
var thisGeneration = ++self._fetchGeneration;
|
||||||
self._needToFetch = new LocalCollection._IdMap;
|
self._needToFetch = new LocalCollection._IdMap;
|
||||||
var waiting = 0;
|
var waiting = 0;
|
||||||
var anyError = null;
|
|
||||||
var fut = new Future;
|
var fut = new Future;
|
||||||
// This loop is safe, because _currentlyFetching will not be updated
|
// This loop is safe, because _currentlyFetching will not be updated
|
||||||
// during this loop (in fact, it is never mutated).
|
// during this loop (in fact, it is never mutated).
|
||||||
@@ -435,8 +434,11 @@ _.extend(OplogObserveDriver.prototype, {
|
|||||||
finishIfNeedToPollQuery(function (err, doc) {
|
finishIfNeedToPollQuery(function (err, doc) {
|
||||||
try {
|
try {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (!anyError)
|
Meteor._debug("Got exception while fetching documents: " +
|
||||||
anyError = err;
|
err);
|
||||||
|
if (self._phase !== PHASE.QUERYING) {
|
||||||
|
self._needToPollQuery();
|
||||||
|
}
|
||||||
} else if (!self._stopped && self._phase === PHASE.FETCHING
|
} else if (!self._stopped && self._phase === PHASE.FETCHING
|
||||||
&& self._fetchGeneration === thisGeneration) {
|
&& self._fetchGeneration === thisGeneration) {
|
||||||
// We re-check the generation in case we've had an explicit
|
// We re-check the generation in case we've had an explicit
|
||||||
@@ -456,9 +458,6 @@ _.extend(OplogObserveDriver.prototype, {
|
|||||||
}));
|
}));
|
||||||
});
|
});
|
||||||
fut.wait();
|
fut.wait();
|
||||||
// XXX do this even if we've switched to PHASE.QUERYING?
|
|
||||||
if (anyError)
|
|
||||||
throw anyError;
|
|
||||||
// Exit now if we've had a _pollQuery call (here or in another fiber).
|
// Exit now if we've had a _pollQuery call (here or in another fiber).
|
||||||
if (self._phase === PHASE.QUERYING)
|
if (self._phase === PHASE.QUERYING)
|
||||||
return;
|
return;
|
||||||
@@ -599,22 +598,35 @@ _.extend(OplogObserveDriver.prototype, {
|
|||||||
|
|
||||||
_runQuery: function () {
|
_runQuery: function () {
|
||||||
var self = this;
|
var self = this;
|
||||||
var newResults = new LocalCollection._IdMap;
|
var newResults, newBuffer;
|
||||||
var newBuffer = new LocalCollection._IdMap;
|
|
||||||
|
|
||||||
// Query 2x documents as the half excluded from the original query will go
|
while (true) {
|
||||||
// into unpublished buffer to reduce additional Mongo lookups in cases when
|
newResults = new LocalCollection._IdMap;
|
||||||
// documents are removed from the published set and need a replacement.
|
newBuffer = new LocalCollection._IdMap;
|
||||||
// XXX needs more thought on non-zero skip
|
|
||||||
// XXX 2 is a "magic number" meaning there is an extra chunk of docs for
|
// Query 2x documents as the half excluded from the original query will go
|
||||||
// buffer if such is needed.
|
// into unpublished buffer to reduce additional Mongo lookups in cases
|
||||||
var cursor = self._cursorForQuery({ limit: self._limit * 2 });
|
// when documents are removed from the published set and need a
|
||||||
cursor.forEach(function (doc, i) {
|
// replacement.
|
||||||
if (!self._limit || i < self._limit)
|
// XXX needs more thought on non-zero skip
|
||||||
newResults.set(doc._id, doc);
|
// XXX 2 is a "magic number" meaning there is an extra chunk of docs for
|
||||||
else
|
// buffer if such is needed.
|
||||||
newBuffer.set(doc._id, doc);
|
var cursor = self._cursorForQuery({ limit: self._limit * 2 });
|
||||||
});
|
try {
|
||||||
|
cursor.forEach(function (doc, i) {
|
||||||
|
if (!self._limit || i < self._limit)
|
||||||
|
newResults.set(doc._id, doc);
|
||||||
|
else
|
||||||
|
newBuffer.set(doc._id, doc);
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
} catch (e) {
|
||||||
|
// During failover (eg) if we get an exception we should log and retry
|
||||||
|
// instead of crashing.
|
||||||
|
Meteor._debug("Got exception while polling query: " + e);
|
||||||
|
Meteor._sleepForMs(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self._publishNewResults(newResults, newBuffer);
|
self._publishNewResults(newResults, newBuffer);
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -101,12 +101,22 @@ _.extend(OplogHandle.prototype, {
|
|||||||
// be ready.
|
// be ready.
|
||||||
self._readyFuture.wait();
|
self._readyFuture.wait();
|
||||||
|
|
||||||
// We need to make the selector at least as restrictive as the actual
|
while (true) {
|
||||||
// tailing selector (ie, we need to specify the DB name) or else we might
|
// We need to make the selector at least as restrictive as the actual
|
||||||
// find a TS that won't show up in the actual tail stream.
|
// tailing selector (ie, we need to specify the DB name) or else we might
|
||||||
var lastEntry = self._oplogLastEntryConnection.findOne(
|
// find a TS that won't show up in the actual tail stream.
|
||||||
OPLOG_COLLECTION, self._baseOplogSelector,
|
try {
|
||||||
{fields: {ts: 1}, sort: {$natural: -1}});
|
var lastEntry = self._oplogLastEntryConnection.findOne(
|
||||||
|
OPLOG_COLLECTION, self._baseOplogSelector,
|
||||||
|
{fields: {ts: 1}, sort: {$natural: -1}});
|
||||||
|
break;
|
||||||
|
} catch (e) {
|
||||||
|
// During failover (eg) if we get an exception we should log and retry
|
||||||
|
// instead of crashing.
|
||||||
|
Meteor._debug("Got exception while reading last entry: " + e);
|
||||||
|
Meteor._sleepForMs(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!lastEntry) {
|
if (!lastEntry) {
|
||||||
// Really, nothing in the oplog? Well, we've processed everything.
|
// Really, nothing in the oplog? Well, we've processed everything.
|
||||||
|
|||||||
Reference in New Issue
Block a user