Re-poll all OplogObserveDrivers on mongo failover

This commit is contained in:
David Glasser
2014-04-09 17:52:55 -07:00
parent 42bfb46e46
commit 24a0006c14
3 changed files with 48 additions and 8 deletions

View File

@@ -113,6 +113,7 @@ MongoConnection = function (url, options) {
options = options || {}; options = options || {};
self._connectCallbacks = []; self._connectCallbacks = [];
self._observeMultiplexers = {}; self._observeMultiplexers = {};
self._onFailoverHook = new Hook;
var mongoOptions = {db: {safe: true}, server: {}, replSet: {}}; var mongoOptions = {db: {safe: true}, server: {}, replSet: {}};
@@ -144,18 +145,42 @@ MongoConnection = function (url, options) {
mongoOptions.replSet.poolSize = options.poolSize; mongoOptions.replSet.poolSize = options.poolSize;
} }
MongoDB.connect(url, mongoOptions, function(err, db) { MongoDB.connect(url, mongoOptions, Meteor.bindEnvironment(function(err, db) {
if (err) if (err)
throw err; throw err;
self.db = db; self.db = db;
// We keep track of the ReplSet's primary, so that we can trigger hooks when
// it changes. The Node driver's joined callback seems to fire way too
// often, which is why we need to track it ourselves.
self._primary = null;
// First, figure out what the current primary is, if any.
if (self.db.serverConfig._state.master)
self._primary = self.db.serverConfig._state.master.name;
self.db.serverConfig.on(
'joined', Meteor.bindEnvironment(function (kind, doc) {
if (kind === 'primary') {
if (doc.primary !== self._primary) {
self._primary = doc.primary;
self._onFailoverHook.each(function (callback) {
callback();
return true;
});
}
} else if (doc.me === self._primary) {
// The thing we thought was primary is now something other than
// primary. Forget that we thought it was primary. (This means that
// if a server stops being primary and then starts being primary again
// without another server becoming primary in the middle, we'll
// correctly count it as a failover.)
self._primary = null;
}
}));
Fiber(function () { // drain queue of pending callbacks
// drain queue of pending callbacks _.each(self._connectCallbacks, function (c) {
_.each(self._connectCallbacks, function (c) { c(db);
c(db); });
}); }));
}).run();
});
self._docFetcher = new DocFetcher(self); self._docFetcher = new DocFetcher(self);
self._oplogHandle = null; self._oplogHandle = null;
@@ -229,6 +254,12 @@ MongoConnection.prototype._maybeBeginWrite = function () {
return {committed: function () {}}; return {committed: function () {}};
}; };
// Internal interface: adds a callback which is called when the Mongo primary
// changes. Returns a stop handle.
MongoConnection.prototype._onFailover = function (callback) {
return this._onFailoverHook.register(callback);
};
//////////// Public API ////////// //////////// Public API //////////

View File

@@ -151,6 +151,13 @@ OplogObserveDriver = function (options) {
} }
)); ));
// When Mongo fails over, we need to repoll the query, in case we processed an
// oplog entry that got rolled back.
self._stopHandles.push(self._mongoHandle._onFailover(finishIfNeedToPollQuery(
function () {
self._needToPollQuery();
})));
// Give _observeChanges a chance to add the new ObserveHandle to our // Give _observeChanges a chance to add the new ObserveHandle to our
// multiplexer, so that the added calls get streamed. // multiplexer, so that the added calls get streamed.
Meteor.defer(finishIfNeedToPollQuery(function () { Meteor.defer(finishIfNeedToPollQuery(function () {

View File

@@ -44,6 +44,8 @@ Package.on_use(function (api) {
// If the facts package is loaded, publish some statistics. // If the facts package is loaded, publish some statistics.
api.use('facts', 'server', {weak: true}); api.use('facts', 'server', {weak: true});
api.use('callback-hook', 'server');
// Stuff that should be exposed via a real API, but we haven't yet. // Stuff that should be exposed via a real API, but we haven't yet.
api.export('MongoInternals', 'server'); api.export('MongoInternals', 'server');
// For tests only. // For tests only.