Remove fibers future usage from oplog_tailing.js.

This commit is contained in:
Matheus Castro
2022-10-28 11:42:50 -03:00
parent 484e15a0b8
commit e6ff9c754b

View File

@@ -1,5 +1,3 @@
var Future = Npm.require('fibers/future');
import { NpmModuleMongodb } from "meteor/npm-mongo";
const { Long } = NpmModuleMongodb;
@@ -8,10 +6,6 @@ OPLOG_COLLECTION = 'oplog.rs';
var TOO_FAR_BEHIND = process.env.METEOR_OPLOG_TOO_FAR_BEHIND || 2000;
var TAIL_TIMEOUT = +process.env.METEOR_OPLOG_TAIL_TIMEOUT || 30000;
var showTS = function (ts) {
return "Timestamp(" + ts.getHighBits() + ", " + ts.getLowBits() + ")";
};
idForOp = function (op) {
if (op.op === 'd')
return op.o._id;
@@ -35,8 +29,8 @@ OplogHandle = function (oplogUrl, dbName) {
self._oplogTailConnection = null;
self._stopped = false;
self._tailHandle = null;
self._readyFuture = new Future();
self._isReady = false;
self._readyPromiseResolver = null;
self._readyPromise = new Promise(r => self._readyPromiseResolver = r);
self._crossbar = new DDPServer._Crossbar({
factPackage: "mongo-livedata", factName: "oplog-watchers"
});
@@ -73,7 +67,7 @@ OplogHandle = function (oplogUrl, dbName) {
// incremented to be past its timestamp by the worker fiber.
//
// XXX use a priority queue or something else that's faster than an array
self._catchingUpFutures = [];
self._catchingUpResolvers = [];
self._lastProcessedTS = null;
self._onSkippedEntriesHook = new Hook({
@@ -83,7 +77,10 @@ OplogHandle = function (oplogUrl, dbName) {
self._entryQueue = new Meteor._DoubleEndedQueue();
self._workerActive = false;
self._startTailing();
const shouldAwait = self._startTailing();
if (Meteor._isFibersEnabled) {
Promise.await(shouldAwait);
}
};
Object.assign(OplogHandle.prototype, {
@@ -96,35 +93,13 @@ Object.assign(OplogHandle.prototype, {
self._tailHandle.stop();
// XXX should close connections too
},
_onOplogEntryFibers: function(trigger, callback) {
_onOplogEntry: async function(trigger, callback) {
var self = this;
if (self._stopped)
throw new Error("Called onOplogEntry on stopped handle!");
// Calling onOplogEntry requires us to wait for the tailing to be ready.
self._readyFuture.wait();
var originalCallback = callback;
callback = Meteor.bindEnvironment(function (notification) {
originalCallback(notification);
}, function (err) {
Meteor._debug("Error in oplog callback", err);
});
var listenHandle = self._crossbar.listen(trigger, callback);
return {
stop: function () {
listenHandle.stop();
}
};
},
async _onOplogEntryNoFibers(trigger, callback) {
var self = this;
if (self._stopped)
throw new Error("Called onOplogEntry on stopped handle!");
while (!self._isReady) {
await Meteor._sleepForMs(100);
}
await self._readyPromise;
var originalCallback = callback;
callback = Meteor.bindEnvironment(function (notification) {
@@ -140,7 +115,7 @@ Object.assign(OplogHandle.prototype, {
};
},
onOplogEntry: function (trigger, callback) {
return Meteor._isFibersEnabled ? this._onOplogEntryFibers(trigger, callback) : this._onOplogEntryNoFibers(trigger, callback);
return Meteor._isFibersEnabled ? Promise.await(this._onOplogEntry(trigger, callback)) : this._onOplogEntry(trigger, callback);
},
// Register a callback to be invoked any time we skip oplog entries (eg,
// because we are too far behind).
@@ -151,75 +126,14 @@ Object.assign(OplogHandle.prototype, {
return self._onSkippedEntriesHook.register(callback);
},
_waitUntilCaughtUpFibers() {
async _waitUntilCaughtUp() {
var self = this;
if (self._stopped)
throw new Error("Called waitUntilCaughtUp on stopped handle!");
// Calling waitUntilCaughtUp requries us to wait for the oplog connection to
// be ready.
self._readyFuture.wait();
var lastEntry;
while (!self._stopped) {
// We need to make the selector at least as restrictive as the actual
// tailing selector (ie, we need to specify the DB name) or else we might
// find a TS that won't show up in the actual tail stream.
try {
lastEntry = self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, self._baseOplogSelector,
{fields: {ts: 1}, sort: {$natural: -1}});
break;
} catch (e) {
// During failover (eg) if we get an exception we should log and retry
// instead of crashing.
Meteor._debug("Got exception while reading last entry", e);
Meteor._sleepForMs(100);
}
}
if (self._stopped)
return;
if (!lastEntry) {
// Really, nothing in the oplog? Well, we've processed everything.
return;
}
var ts = lastEntry.ts;
if (!ts)
throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry));
if (self._lastProcessedTS && ts.lessThanOrEqual(self._lastProcessedTS)) {
// We've already caught up to here.
return;
}
// Insert the future into our list. Almost always, this will be at the end,
// but it's conceivable that if we fail over from one primary to another,
// the oplog entries we see will go backwards.
var insertAfter = self._catchingUpFutures.length;
while (insertAfter - 1 > 0 && self._catchingUpFutures[insertAfter - 1].ts.greaterThan(ts)) {
insertAfter--;
}
var f = new Future;
self._catchingUpFutures.splice(insertAfter, 0, {ts: ts, future: f});
f.wait();
},
async _waitUntilCaughtUpNoFibers() {
var self = this;
if (self._stopped)
throw new Error("Called waitUntilCaughtUp on stopped handle!");
// TODO -> Should we wait? Is waiting needed?
// Calling waitUntilCaughtUp requries us to wait for the oplog connection to
// be ready.
while (!this._isReady) {
await Meteor._sleepForMs(100);
}
await self._readyPromise;
var lastEntry;
while (!self._stopped) {
@@ -255,25 +169,37 @@ Object.assign(OplogHandle.prototype, {
// We've already caught up to here.
return;
}
},
// Insert the future into our list. Almost always, this will be at the end,
// but it's conceivable that if we fail over from one primary to another,
// the oplog entries we see will go backwards.
var insertAfter = self._catchingUpResolvers.length;
while (insertAfter - 1 > 0 && self._catchingUpResolvers[insertAfter - 1].ts.greaterThan(ts)) {
insertAfter--;
}
let promiseResolver = null;
const promiseToAwait = new Promise(r => promiseResolver = r);
self._catchingUpResolvers.splice(insertAfter, 0, {ts: ts, resolver: promiseResolver});
await promiseToAwait;
},
// Calls `callback` once the oplog has been processed up to a point that is
// roughly "now": specifically, once we've processed all ops that are
// currently visible.
// XXX become convinced that this is actually safe even if oplogConnection
// is some kind of pool
waitUntilCaughtUp: function () {
return Meteor._isFibersEnabled ? this._waitUntilCaughtUpFibers() : this._waitUntilCaughtUpNoFibers()
return Meteor._isFibersEnabled ? Promise.await(this._waitUntilCaughtUp()) : this._waitUntilCaughtUp();
},
_startTailing: function () {
_startTailing: async function () {
var self = this;
// First, make sure that we're talking to the local database.
var mongodbUri = Npm.require('mongodb-uri');
if (mongodbUri.parse(self._oplogUrl).database !== 'local') {
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " +
"a Mongo replica set");
"a Mongo replica set");
}
// We make two separate connections to Mongo. The Node Mongo driver
@@ -288,70 +214,17 @@ Object.assign(OplogHandle.prototype, {
// The tail connection will only ever be running a single tail command, so
// it only needs to make one underlying TCP connection.
self._oplogTailConnection = new MongoConnection(
self._oplogUrl, {maxPoolSize: 1});
self._oplogUrl, {maxPoolSize: 1});
// XXX better docs, but: it's to get monotonic results
// XXX is it safe to say "if there's an in flight query, just use its
// results"? I don't think so but should consider that
self._oplogLastEntryConnection = new MongoConnection(
self._oplogUrl, {maxPoolSize: 1});
self._oplogUrl, {maxPoolSize: 1});
if (Meteor._isFibersEnabled) {
return this._startTailingFibers();
}
self._oplogLastEntryConnection.db.admin().command(
{ ismaster: 1 }, function(_, isMasterDoc) {
if (!(isMasterDoc && isMasterDoc.setName)) {
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " +
"a Mongo replica set");
}
// Find the last oplog entry.
var lastOplogEntry = self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, {}, {sort: {$natural: -1}, fields: {ts: 1}});
var oplogSelector = Object.assign({}, self._baseOplogSelector);
if (lastOplogEntry) {
// Start after the last entry that currently exists.
oplogSelector.ts = {$gt: lastOplogEntry.ts};
// If there are any calls to callWhenProcessedLatest before any other
// oplog entries show up, allow callWhenProcessedLatest to call its
// callback immediately.
self._lastProcessedTS = lastOplogEntry.ts;
}
var cursorDescription = new CursorDescription(
OPLOG_COLLECTION, oplogSelector, {tailable: true});
// Start tailing the oplog.
//
// We restart the low-level oplog query every 30 seconds if we didn't get a
// doc. This is a workaround for #8598: the Node Mongo driver has at least
// one bug that can lead to query callbacks never getting called (even with
// an error) when leadership failover occur.
self._tailHandle = self._oplogTailConnection.tail(
cursorDescription,
function (doc) {
self._entryQueue.push(doc);
self._maybeStartWorker();
},
TAIL_TIMEOUT
);
self._isReady = true;
});
},
_startTailingFibers: function() {
const self = this;
// Now, make sure that there actually is a repl set here. If not, oplog
// tailing won't ever find anything!
// More on the isMasterDoc
// https://docs.mongodb.com/manual/reference/command/isMaster/
var f = new Future;
self._oplogLastEntryConnection.db.admin().command(
{ ismaster: 1 }, f.resolver());
var isMasterDoc = f.wait();
const isMasterDoc = await Meteor.promisify((cb) => {
self._oplogLastEntryConnection.db.admin().command({ismaster: 1}, cb);
})();
if (!(isMasterDoc && isMasterDoc.setName)) {
throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of " +
@@ -359,10 +232,10 @@ Object.assign(OplogHandle.prototype, {
}
// Find the last oplog entry.
var lastOplogEntry = self._oplogLastEntryConnection.findOne(
var lastOplogEntry = await self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, {}, {sort: {$natural: -1}, fields: {ts: 1}});
var oplogSelector = _.clone(self._baseOplogSelector);
var oplogSelector = Object.assign({}, self._baseOplogSelector);
if (lastOplogEntry) {
// Start after the last entry that currently exists.
oplogSelector.ts = {$gt: lastOplogEntry.ts};
@@ -389,7 +262,8 @@ Object.assign(OplogHandle.prototype, {
},
TAIL_TIMEOUT
);
self._readyFuture.return();
self._readyPromiseResolver();
},
_maybeStartWorker: function () {
@@ -493,9 +367,9 @@ Object.assign(OplogHandle.prototype, {
_setLastProcessedTS: function (ts) {
var self = this;
self._lastProcessedTS = ts;
while (!_.isEmpty(self._catchingUpFutures) && self._catchingUpFutures[0].ts.lessThanOrEqual(self._lastProcessedTS)) {
var sequencer = self._catchingUpFutures.shift();
sequencer.future.return();
while (!_.isEmpty(self._catchingUpResolvers) && self._catchingUpResolvers[0].ts.lessThanOrEqual(self._lastProcessedTS)) {
var sequencer = self._catchingUpResolvers.shift();
sequencer.resolver();
}
},