Re-poll all OplogObserveDrivers on mongo failover

This commit is contained in:
David Glasser
2014-04-09 17:52:55 -07:00
parent 42bfb46e46
commit 24a0006c14
3 changed files with 48 additions and 8 deletions

View File

@@ -113,6 +113,7 @@ MongoConnection = function (url, options) {
options = options || {};
self._connectCallbacks = [];
self._observeMultiplexers = {};
self._onFailoverHook = new Hook;
var mongoOptions = {db: {safe: true}, server: {}, replSet: {}};
@@ -144,18 +145,42 @@ MongoConnection = function (url, options) {
mongoOptions.replSet.poolSize = options.poolSize;
}
MongoDB.connect(url, mongoOptions, function(err, db) {
MongoDB.connect(url, mongoOptions, Meteor.bindEnvironment(function(err, db) {
if (err)
throw err;
self.db = db;
// We keep track of the ReplSet's primary, so that we can trigger hooks when
// it changes. The Node driver's joined callback seems to fire way too
// often, which is why we need to track it ourselves.
self._primary = null;
// First, figure out what the current primary is, if any.
if (self.db.serverConfig._state.master)
self._primary = self.db.serverConfig._state.master.name;
self.db.serverConfig.on(
'joined', Meteor.bindEnvironment(function (kind, doc) {
if (kind === 'primary') {
if (doc.primary !== self._primary) {
self._primary = doc.primary;
self._onFailoverHook.each(function (callback) {
callback();
return true;
});
}
} else if (doc.me === self._primary) {
// The thing we thought was primary is now something other than
// primary. Forget that we thought it was primary. (This means that
// if a server stops being primary and then starts being primary again
// without another server becoming primary in the middle, we'll
// correctly count it as a failover.)
self._primary = null;
}
}));
Fiber(function () {
// drain queue of pending callbacks
_.each(self._connectCallbacks, function (c) {
c(db);
});
}).run();
});
// drain queue of pending callbacks
_.each(self._connectCallbacks, function (c) {
c(db);
});
}));
self._docFetcher = new DocFetcher(self);
self._oplogHandle = null;
@@ -229,6 +254,12 @@ MongoConnection.prototype._maybeBeginWrite = function () {
return {committed: function () {}};
};
// Internal interface: adds a callback which is called when the Mongo primary
// changes. Returns a stop handle.
MongoConnection.prototype._onFailover = function (callback) {
return this._onFailoverHook.register(callback);
};
//////////// Public API //////////

View File

@@ -151,6 +151,13 @@ OplogObserveDriver = function (options) {
}
));
// When Mongo fails over, we need to repoll the query, in case we processed an
// oplog entry that got rolled back.
self._stopHandles.push(self._mongoHandle._onFailover(finishIfNeedToPollQuery(
function () {
self._needToPollQuery();
})));
// Give _observeChanges a chance to add the new ObserveHandle to our
// multiplexer, so that the added calls get streamed.
Meteor.defer(finishIfNeedToPollQuery(function () {

View File

@@ -44,6 +44,8 @@ Package.on_use(function (api) {
// If the facts package is loaded, publish some statistics.
api.use('facts', 'server', {weak: true});
api.use('callback-hook', 'server');
// Stuff that should be exposed via a real API, but we haven't yet.
api.export('MongoInternals', 'server');
// For tests only.