diff --git a/packages/mongo/oplog_tailing.js b/packages/mongo/oplog_tailing.js index 0728b121a0..a50e3f5a1e 100644 --- a/packages/mongo/oplog_tailing.js +++ b/packages/mongo/oplog_tailing.js @@ -1,5 +1,3 @@ -var Future = Npm.require('fibers/future'); - import { NpmModuleMongodb } from "meteor/npm-mongo"; const { Long } = NpmModuleMongodb; @@ -8,10 +6,6 @@ OPLOG_COLLECTION = 'oplog.rs'; var TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000; var TAIL_TIMEOUT = +process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000; -var showTS = function (ts) { - return "Timestamp(" + ts.getHighBits() + ", " + ts.getLowBits() + ")"; -}; - idForOp = function (op) { if (op.op === 'd') return op.o._id; @@ -35,8 +29,8 @@ OplogHandle = function (oplogUrl, dbName) { self._oplogTailConnection = null; self._stopped = false; self._tailHandle = null; - self._readyFuture = new Future(); - self._isReady = false; + self._readyPromiseResolver = null; + self._readyPromise = new Promise(r => self._readyPromiseResolver = r); self._crossbar = new DDPServer._Crossbar({ factPackage: "mongo-livedata", factName: "oplog-watchers" }); @@ -73,7 +67,7 @@ OplogHandle = function (oplogUrl, dbName) { // incremented to be past its timestamp by the worker fiber. // // XXX use a priority queue or something else that's faster than an array - self._catchingUpFutures = []; + self._catchingUpResolvers = []; self._lastProcessedTS = null; self._onSkippedEntriesHook = new Hook({ @@ -83,7 +77,10 @@ OplogHandle = function (oplogUrl, dbName) { self._entryQueue = new Meteor._DoubleEndedQueue(); self._workerActive = false; - self._startTailing(); + const shouldAwait = self._startTailing(); + if (Meteor._isFibersEnabled) { + Promise.await(shouldAwait); + } }; Object.assign(OplogHandle.prototype, { @@ -96,35 +93,13 @@ Object.assign(OplogHandle.prototype, { self._tailHandle.stop(); // XXX should close connections too }, - _onOplogEntryFibers: function(trigger, callback) { + _onOplogEntry: async function(trigger, callback) { var self = this; if (self._stopped) throw new Error("Called onOplogEntry on stopped handle!"); // Calling onOplogEntry requires us to wait for the tailing to be ready. - self._readyFuture.wait(); - - var originalCallback = callback; - callback = Meteor.bindEnvironment(function (notification) { - originalCallback(notification); - }, function (err) { - Meteor._debug("Error in oplog callback", err); - }); - var listenHandle = self._crossbar.listen(trigger, callback); - return { - stop: function () { - listenHandle.stop(); - } - }; - }, - async _onOplogEntryNoFibers(trigger, callback) { - var self = this; - if (self._stopped) - throw new Error("Called onOplogEntry on stopped handle!"); - - while (!self._isReady) { - await Meteor._sleepForMs(100); - } + await self._readyPromise; var originalCallback = callback; callback = Meteor.bindEnvironment(function (notification) { @@ -140,7 +115,7 @@ Object.assign(OplogHandle.prototype, { }; }, onOplogEntry: function (trigger, callback) { - return Meteor._isFibersEnabled ? this._onOplogEntryFibers(trigger, callback) : this._onOplogEntryNoFibers(trigger, callback); + return Meteor._isFibersEnabled ? Promise.await(this._onOplogEntry(trigger, callback)) : this._onOplogEntry(trigger, callback); }, // Register a callback to be invoked any time we skip oplog entries (eg, // because we are too far behind). @@ -151,75 +126,14 @@ Object.assign(OplogHandle.prototype, { return self._onSkippedEntriesHook.register(callback); }, - _waitUntilCaughtUpFibers() { + async _waitUntilCaughtUp() { var self = this; if (self._stopped) throw new Error("Called waitUntilCaughtUp on stopped handle!"); // Calling waitUntilCaughtUp requries us to wait for the oplog connection to // be ready. - self._readyFuture.wait(); - var lastEntry; - - while (!self._stopped) { - // We need to make the selector at least as restrictive as the actual - // tailing selector (ie, we need to specify the DB name) or else we might - // find a TS that won't show up in the actual tail stream. - try { - lastEntry = self._oplogLastEntryConnection.findOne( - OPLOG_COLLECTION, self._baseOplogSelector, - {fields: {ts: 1}, sort: {$natural: -1}}); - break; - } catch (e) { - // During failover (eg) if we get an exception we should log and retry - // instead of crashing. - Meteor._debug("Got exception while reading last entry", e); - Meteor._sleepForMs(100); - } - } - - if (self._stopped) - return; - - if (!lastEntry) { - // Really, nothing in the oplog? Well, we've processed everything. - return; - } - - var ts = lastEntry.ts; - if (!ts) - throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry)); - - if (self._lastProcessedTS && ts.lessThanOrEqual(self._lastProcessedTS)) { - // We've already caught up to here. - return; - } - - - // Insert the future into our list. Almost always, this will be at the end, - // but it's conceivable that if we fail over from one primary to another, - // the oplog entries we see will go backwards. - var insertAfter = self._catchingUpFutures.length; - while (insertAfter - 1 > 0 && self._catchingUpFutures[insertAfter - 1].ts.greaterThan(ts)) { - insertAfter--; - } - var f = new Future; - self._catchingUpFutures.splice(insertAfter, 0, {ts: ts, future: f}); - f.wait(); - }, - - async _waitUntilCaughtUpNoFibers() { - var self = this; - if (self._stopped) - throw new Error("Called waitUntilCaughtUp on stopped handle!"); - - // TODO -> Should we wait? Is waiting needed? - // Calling waitUntilCaughtUp requries us to wait for the oplog connection to - // be ready. - while (!this._isReady) { - await Meteor._sleepForMs(100); - } - + await self._readyPromise; var lastEntry; while (!self._stopped) { @@ -255,25 +169,37 @@ Object.assign(OplogHandle.prototype, { // We've already caught up to here. return; } - }, + // Insert the future into our list. Almost always, this will be at the end, + // but it's conceivable that if we fail over from one primary to another, + // the oplog entries we see will go backwards. + var insertAfter = self._catchingUpResolvers.length; + while (insertAfter - 1 > 0 && self._catchingUpResolvers[insertAfter - 1].ts.greaterThan(ts)) { + insertAfter--; + } + let promiseResolver = null; + const promiseToAwait = new Promise(r => promiseResolver = r); + self._catchingUpResolvers.splice(insertAfter, 0, {ts: ts, resolver: promiseResolver}); + await promiseToAwait; + }, + // Calls `callback` once the oplog has been processed up to a point that is // roughly "now": specifically, once we've processed all ops that are // currently visible. // XXX become convinced that this is actually safe even if oplogConnection // is some kind of pool waitUntilCaughtUp: function () { - return Meteor._isFibersEnabled ? this._waitUntilCaughtUpFibers() : this._waitUntilCaughtUpNoFibers() + return Meteor._isFibersEnabled ? Promise.await(this._waitUntilCaughtUp()) : this._waitUntilCaughtUp(); }, - _startTailing: function () { + _startTailing: async function () { var self = this; // First, make sure that we're talking to the local database. var mongodbUri = Npm.require('mongodb-uri'); if (mongodbUri.parse(self._oplogUrl).database !== 'local') { throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " + - "a Mongo replica set"); + "a Mongo replica set"); } // We make two separate connections to Mongo. The Node Mongo driver @@ -288,70 +214,17 @@ Object.assign(OplogHandle.prototype, { // The tail connection will only ever be running a single tail command, so // it only needs to make one underlying TCP connection. self._oplogTailConnection = new MongoConnection( - self._oplogUrl, {maxPoolSize: 1}); + self._oplogUrl, {maxPoolSize: 1}); // XXX better docs, but: it's to get monotonic results // XXX is it safe to say "if there's an in flight query, just use its // results"? I don't think so but should consider that self._oplogLastEntryConnection = new MongoConnection( - self._oplogUrl, {maxPoolSize: 1}); + self._oplogUrl, {maxPoolSize: 1}); - if (Meteor._isFibersEnabled) { - return this._startTailingFibers(); - } - self._oplogLastEntryConnection.db.admin().command( - { ismaster: 1 }, function(_, isMasterDoc) { - if (!(isMasterDoc && isMasterDoc.setName)) { - throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " + - "a Mongo replica set"); - } - - // Find the last oplog entry. - var lastOplogEntry = self._oplogLastEntryConnection.findOne( - OPLOG_COLLECTION, {}, {sort: {$natural: -1}, fields: {ts: 1}}); - - var oplogSelector = Object.assign({}, self._baseOplogSelector); - if (lastOplogEntry) { - // Start after the last entry that currently exists. - oplogSelector.ts = {$gt: lastOplogEntry.ts}; - // If there are any calls to callWhenProcessedLatest before any other - // oplog entries show up, allow callWhenProcessedLatest to call its - // callback immediately. - self._lastProcessedTS = lastOplogEntry.ts; - } - - var cursorDescription = new CursorDescription( - OPLOG_COLLECTION, oplogSelector, {tailable: true}); - - // Start tailing the oplog. - // - // We restart the low-level oplog query every 30 seconds if we didn't get a - // doc. This is a workaround for #8598: the Node Mongo driver has at least - // one bug that can lead to query callbacks never getting called (even with - // an error) when leadership failover occur. - self._tailHandle = self._oplogTailConnection.tail( - cursorDescription, - function (doc) { - self._entryQueue.push(doc); - self._maybeStartWorker(); - }, - TAIL_TIMEOUT - ); - - self._isReady = true; - }); - }, - - _startTailingFibers: function() { - const self = this; - // Now, make sure that there actually is a repl set here. If not, oplog - // tailing won't ever find anything! - // More on the isMasterDoc - // https://docs.mongodb.com/manual/reference/command/isMaster/ - var f = new Future; - self._oplogLastEntryConnection.db.admin().command( - { ismaster: 1 }, f.resolver()); - var isMasterDoc = f.wait(); + const isMasterDoc = await Meteor.promisify((cb) => { + self._oplogLastEntryConnection.db.admin().command({ismaster: 1}, cb); + })(); if (!(isMasterDoc && isMasterDoc.setName)) { throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " + @@ -359,10 +232,10 @@ Object.assign(OplogHandle.prototype, { } // Find the last oplog entry. - var lastOplogEntry = self._oplogLastEntryConnection.findOne( + var lastOplogEntry = await self._oplogLastEntryConnection.findOne( OPLOG_COLLECTION, {}, {sort: {$natural: -1}, fields: {ts: 1}}); - var oplogSelector = _.clone(self._baseOplogSelector); + var oplogSelector = Object.assign({}, self._baseOplogSelector); if (lastOplogEntry) { // Start after the last entry that currently exists. oplogSelector.ts = {$gt: lastOplogEntry.ts}; @@ -389,7 +262,8 @@ Object.assign(OplogHandle.prototype, { }, TAIL_TIMEOUT ); - self._readyFuture.return(); + + self._readyPromiseResolver(); }, _maybeStartWorker: function () { @@ -493,9 +367,9 @@ Object.assign(OplogHandle.prototype, { _setLastProcessedTS: function (ts) { var self = this; self._lastProcessedTS = ts; - while (!_.isEmpty(self._catchingUpFutures) && self._catchingUpFutures[0].ts.lessThanOrEqual(self._lastProcessedTS)) { - var sequencer = self._catchingUpFutures.shift(); - sequencer.future.return(); + while (!_.isEmpty(self._catchingUpResolvers) && self._catchingUpResolvers[0].ts.lessThanOrEqual(self._lastProcessedTS)) { + var sequencer = self._catchingUpResolvers.shift(); + sequencer.resolver(); } },