From 76ac0ca6eead9ddc4ecd7fb367fe5956fc4a96ce Mon Sep 17 00:00:00 2001 From: David Glasser Date: Fri, 6 Oct 2017 17:13:16 -0700 Subject: [PATCH] mongo: restart underlying oplog tail every 30 seconds without a doc Workaround for #8598: the Node Mongo driver has at least one bug that can lead to query callbacks never getting called (even with an error) when leadership failover occur. --- packages/mongo/mongo_driver.js | 75 +++++++++++++++++++++++++++------ packages/mongo/oplog_tailing.js | 13 +++++- 2 files changed, 73 insertions(+), 15 deletions(-) diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 73f96fee20..b407ef8bed 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -1001,21 +1001,33 @@ var SynchronousCursor = function (dbCursor, cursorDescription, options) { self._transform = null; } - // Need to specify that the callback is the first argument to nextObject, - // since otherwise when we try to call it with no args the driver will - // interpret "undefined" first arg as an options hash and crash. - self._synchronousNextObject = Future.wrap( - dbCursor.nextObject.bind(dbCursor), 0); self._synchronousCount = Future.wrap(dbCursor.count.bind(dbCursor)); self._visitedIds = new LocalCollection._IdMap; }; _.extend(SynchronousCursor.prototype, { - _nextObject: function () { + // Returns a Promise for the next object from the underlying cursor (before + // the Mongo->Meteor type replacement). + _rawNextObjectPromise: function () { + const self = this; + return new Promise((resolve, reject) => { + self._dbCursor.next((err, doc) => { + if (err) { + reject(err); + } else { + resolve(doc); + } + }); + }); + }, + + // Returns a Promise for the next object from the cursor, skipping those whose + // IDs we've already seen and replacing Mongo atoms with Meteor atoms. 
+ _nextObjectPromise: async function () { var self = this; while (true) { - var doc = self._synchronousNextObject().wait(); + var doc = await self._rawNextObjectPromise(); if (!doc) return null; doc = replaceTypes(doc, replaceMongoAtomWithMeteor); @@ -1038,6 +1050,36 @@ _.extend(SynchronousCursor.prototype, { } }, + // Returns a promise which is resolved with the next object (like with + // _nextObjectPromise) or rejected if the cursor doesn't return within + // timeoutMS ms. + _nextObjectPromiseWithTimeout: function (timeoutMS) { + const self = this; + if (!timeoutMS) { + return self._nextObjectPromise(); + } + const nextObjectPromise = self._nextObjectPromise(); + const timeoutErr = new Error('Client-side timeout waiting for next object'); + const timeoutPromise = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject(timeoutErr); + }, timeoutMS); + }); + return Promise.race([nextObjectPromise, timeoutPromise]) + .catch((err) => { + if (err === timeoutErr) { + console.log("hit client-side timeout"); + self.close(); + } + throw err; + }); + }, + + _nextObject: function () { + var self = this; + return self._nextObjectPromise().await(); + }, + forEach: function (callback, thisArg) { var self = this; @@ -1124,7 +1166,13 @@ SynchronousCursor.prototype[Symbol.iterator] = function () { }; }; -MongoConnection.prototype.tail = function (cursorDescription, docCallback) { +// Tails the cursor described by cursorDescription, most likely on the +// oplog. Calls docCallback with each document found. Ignores errors and just +// restarts the tail on error. +// +// If timeoutMS is set, then if we don't get a new document every timeoutMS, +// kill and restart the cursor. This is primarily a workaround for #8598. 
+MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeoutMS) { var self = this; if (!cursorDescription.options.tailable) throw new Error("Can only tail a tailable cursor"); @@ -1139,14 +1187,15 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback) { if (stopped) return; try { - doc = cursor._nextObject(); + doc = cursor._nextObjectPromiseWithTimeout(timeoutMS).await(); } catch (err) { - // There's no good way to figure out if this was actually an error - // from Mongo. Ah well. But either way, we need to retry the cursor - // (unless the failure was because the observe got stopped). + // There's no good way to figure out if this was actually an error from + // Mongo, or just client-side (including our own timeout error). Ah + // well. But either way, we need to retry the cursor (unless the failure + // was because the observe got stopped). doc = null; } - // Since cursor._nextObject can yield, we need to check again to see if + // Since we awaited a promise above, we need to check again to see if // we've been stopped before calling the callback. if (stopped) return; diff --git a/packages/mongo/oplog_tailing.js b/packages/mongo/oplog_tailing.js index eb7f59d74b..2a5abb69e4 100644 --- a/packages/mongo/oplog_tailing.js +++ b/packages/mongo/oplog_tailing.js @@ -3,6 +3,7 @@ var Future = Npm.require('fibers/future'); OPLOG_COLLECTION = 'oplog.rs'; var TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000; +var TAIL_TIMEOUT = +process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000; var showTS = function (ts) { return "Timestamp(" + ts.getHighBits() + ", " + ts.getLowBits() + ")"; @@ -236,11 +237,19 @@ _.extend(OplogHandle.prototype, { var cursorDescription = new CursorDescription( OPLOG_COLLECTION, oplogSelector, {tailable: true}); + // Start tailing the oplog. + // + // We restart the low-level oplog query every 30 seconds if we didn't get a + // doc. 
This is a workaround for #8598: the Node Mongo driver has at least + // one bug that can lead to query callbacks never getting called (even with + // an error) when leadership failover occurs. self._tailHandle = self._oplogTailConnection.tail( - cursorDescription, function (doc) { + cursorDescription, + function (doc) { self._entryQueue.push(doc); self._maybeStartWorker(); - } + }, + TAIL_TIMEOUT ); self._readyFuture.return(); },