mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
mongo: restart underlying oplog tail every 30 seconds without a doc
Workaround for #8598: the Node Mongo driver has at least one bug that can lead to query callbacks never getting called (even with an error) when a leadership failover occurs.
This commit is contained in:
committed by
Jesse Rosenberger
parent
1d0a0c6201
commit
76ac0ca6ee
@@ -1001,21 +1001,33 @@ var SynchronousCursor = function (dbCursor, cursorDescription, options) {
|
||||
self._transform = null;
|
||||
}
|
||||
|
||||
// Need to specify that the callback is the first argument to nextObject,
|
||||
// since otherwise when we try to call it with no args the driver will
|
||||
// interpret "undefined" first arg as an options hash and crash.
|
||||
self._synchronousNextObject = Future.wrap(
|
||||
dbCursor.nextObject.bind(dbCursor), 0);
|
||||
self._synchronousCount = Future.wrap(dbCursor.count.bind(dbCursor));
|
||||
self._visitedIds = new LocalCollection._IdMap;
|
||||
};
|
||||
|
||||
_.extend(SynchronousCursor.prototype, {
|
||||
_nextObject: function () {
|
||||
// Returns a Promise for the next object from the underlying cursor (before
|
||||
// the Mongo->Meteor type replacement).
|
||||
_rawNextObjectPromise: function () {
|
||||
const self = this;
|
||||
return new Promise((resolve, reject) => {
|
||||
self._dbCursor.next((err, doc) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
resolve(doc);
|
||||
}
|
||||
});
|
||||
});
|
||||
},
|
||||
|
||||
// Returns a Promise for the next object from the cursor, skipping those whose
|
||||
// IDs we've already seen and replacing Mongo atoms with Meteor atoms.
|
||||
_nextObjectPromise: async function () {
|
||||
var self = this;
|
||||
|
||||
while (true) {
|
||||
var doc = self._synchronousNextObject().wait();
|
||||
var doc = await self._rawNextObjectPromise();
|
||||
|
||||
if (!doc) return null;
|
||||
doc = replaceTypes(doc, replaceMongoAtomWithMeteor);
|
||||
@@ -1038,6 +1050,36 @@ _.extend(SynchronousCursor.prototype, {
|
||||
}
|
||||
},
|
||||
|
||||
// Returns a promise which is resolved with the next object (like with
|
||||
// _nextObjectPromise) or rejected if the cursor doesn't return within
|
||||
// timeoutMS ms.
|
||||
_nextObjectPromiseWithTimeout: function (timeoutMS) {
|
||||
const self = this;
|
||||
if (!timeoutMS) {
|
||||
return self._nextObjectPromise();
|
||||
}
|
||||
const nextObjectPromise = self._nextObjectPromise();
|
||||
const timeoutErr = new Error('Client-side timeout waiting for next object');
|
||||
const timeoutPromise = new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
reject(timeoutErr);
|
||||
}, timeoutMS);
|
||||
});
|
||||
return Promise.race([nextObjectPromise, timeoutPromise])
|
||||
.catch((err) => {
|
||||
if (err === timeoutErr) {
|
||||
console.log("hit client-side timeout");
|
||||
self.close();
|
||||
}
|
||||
throw err;
|
||||
});
|
||||
},
|
||||
|
||||
_nextObject: function () {
|
||||
var self = this;
|
||||
return self._nextObjectPromise().await();
|
||||
},
|
||||
|
||||
forEach: function (callback, thisArg) {
|
||||
var self = this;
|
||||
|
||||
@@ -1124,7 +1166,13 @@ SynchronousCursor.prototype[Symbol.iterator] = function () {
|
||||
};
|
||||
};
|
||||
|
||||
MongoConnection.prototype.tail = function (cursorDescription, docCallback) {
|
||||
// Tails the cursor described by cursorDescription, most likely on the
|
||||
// oplog. Calls docCallback with each document found. Ignores errors and just
|
||||
// restarts the tail on error.
|
||||
//
|
||||
// If timeoutMS is set, then if we don't get a new document every timeoutMS,
|
||||
// kill and restart the cursor. This is primarily a workaround for #8598.
|
||||
MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeoutMS) {
|
||||
var self = this;
|
||||
if (!cursorDescription.options.tailable)
|
||||
throw new Error("Can only tail a tailable cursor");
|
||||
@@ -1139,14 +1187,15 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback) {
|
||||
if (stopped)
|
||||
return;
|
||||
try {
|
||||
doc = cursor._nextObject();
|
||||
doc = cursor._nextObjectPromiseWithTimeout(timeoutMS).await();
|
||||
} catch (err) {
|
||||
// There's no good way to figure out if this was actually an error
|
||||
// from Mongo. Ah well. But either way, we need to retry the cursor
|
||||
// (unless the failure was because the observe got stopped).
|
||||
// There's no good way to figure out if this was actually an error from
|
||||
// Mongo, or just client-side (including our own timeout error). Ah
|
||||
// well. But either way, we need to retry the cursor (unless the failure
|
||||
// was because the observe got stopped).
|
||||
doc = null;
|
||||
}
|
||||
// Since cursor._nextObject can yield, we need to check again to see if
|
||||
// Since we awaited a promise above, we need to check again to see if
|
||||
// we've been stopped before calling the callback.
|
||||
if (stopped)
|
||||
return;
|
||||
|
||||
@@ -3,6 +3,7 @@ var Future = Npm.require('fibers/future');
|
||||
OPLOG_COLLECTION = 'oplog.rs';
|
||||
|
||||
var TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000;
|
||||
var TAIL_TIMEOUT = +process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000;
|
||||
|
||||
var showTS = function (ts) {
|
||||
return "Timestamp(" + ts.getHighBits() + ", " + ts.getLowBits() + ")";
|
||||
@@ -236,11 +237,19 @@ _.extend(OplogHandle.prototype, {
|
||||
var cursorDescription = new CursorDescription(
|
||||
OPLOG_COLLECTION, oplogSelector, {tailable: true});
|
||||
|
||||
// Start tailing the oplog.
|
||||
//
|
||||
// We restart the low-level oplog query every 30 seconds if we didn't get a
|
||||
// doc. This is a workaround for #8598: the Node Mongo driver has at least
|
||||
// one bug that can lead to query callbacks never getting called (even with
|
||||
// an error) when a leadership failover occurs.
|
||||
self._tailHandle = self._oplogTailConnection.tail(
|
||||
cursorDescription, function (doc) {
|
||||
cursorDescription,
|
||||
function (doc) {
|
||||
self._entryQueue.push(doc);
|
||||
self._maybeStartWorker();
|
||||
}
|
||||
},
|
||||
TAIL_TIMEOUT
|
||||
);
|
||||
self._readyFuture.return();
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user