From 24a0006c1452bdba877e0cca7d591f6cc02ed50e Mon Sep 17 00:00:00 2001 From: David Glasser Date: Wed, 9 Apr 2014 17:52:55 -0700 Subject: [PATCH] Re-poll all OplogObserveDrivers on mongo failover --- packages/mongo-livedata/mongo_driver.js | 47 +++++++++++++++---- .../mongo-livedata/oplog_observe_driver.js | 7 +++ packages/mongo-livedata/package.js | 2 + 3 files changed, 48 insertions(+), 8 deletions(-) diff --git a/packages/mongo-livedata/mongo_driver.js b/packages/mongo-livedata/mongo_driver.js index e43e00d68c..a7e9535888 100644 --- a/packages/mongo-livedata/mongo_driver.js +++ b/packages/mongo-livedata/mongo_driver.js @@ -113,6 +113,7 @@ MongoConnection = function (url, options) { options = options || {}; self._connectCallbacks = []; self._observeMultiplexers = {}; + self._onFailoverHook = new Hook; var mongoOptions = {db: {safe: true}, server: {}, replSet: {}}; @@ -144,18 +145,42 @@ MongoConnection = function (url, options) { mongoOptions.replSet.poolSize = options.poolSize; } - MongoDB.connect(url, mongoOptions, function(err, db) { + MongoDB.connect(url, mongoOptions, Meteor.bindEnvironment(function(err, db) { if (err) throw err; self.db = db; + // We keep track of the ReplSet's primary, so that we can trigger hooks when + // it changes. The Node driver's joined callback seems to fire way too + // often, which is why we need to track it ourselves. + self._primary = null; + // First, figure out what the current primary is, if any. + if (self.db.serverConfig._state.master) + self._primary = self.db.serverConfig._state.master.name; + self.db.serverConfig.on( + 'joined', Meteor.bindEnvironment(function (kind, doc) { + if (kind === 'primary') { + if (doc.primary !== self._primary) { + self._primary = doc.primary; + self._onFailoverHook.each(function (callback) { + callback(); + return true; + }); + } + } else if (doc.me === self._primary) { + // The thing we thought was primary is now something other than + // primary. Forget that we thought it was primary. (This means that + // if a server stops being primary and then starts being primary again + // without another server becoming primary in the middle, we'll + // correctly count it as a failover.) + self._primary = null; + } + })); - Fiber(function () { - // drain queue of pending callbacks - _.each(self._connectCallbacks, function (c) { - c(db); - }); - }).run(); - }); + // drain queue of pending callbacks + _.each(self._connectCallbacks, function (c) { + c(db); + }); + })); self._docFetcher = new DocFetcher(self); self._oplogHandle = null; @@ -229,6 +254,12 @@ MongoConnection.prototype._maybeBeginWrite = function () { return {committed: function () {}}; }; +// Internal interface: adds a callback which is called when the Mongo primary +// changes. Returns a stop handle. +MongoConnection.prototype._onFailover = function (callback) { + return this._onFailoverHook.register(callback); +}; + //////////// Public API ////////// diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index ec6f508db4..bf3627c9cf 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -151,6 +151,13 @@ OplogObserveDriver = function (options) { } )); + // When Mongo fails over, we need to repoll the query, in case we processed an + // oplog entry that got rolled back. + self._stopHandles.push(self._mongoHandle._onFailover(finishIfNeedToPollQuery( + function () { + self._needToPollQuery(); + }))); + // Give _observeChanges a chance to add the new ObserveHandle to our // multiplexer, so that the added calls get streamed. Meteor.defer(finishIfNeedToPollQuery(function () { diff --git a/packages/mongo-livedata/package.js b/packages/mongo-livedata/package.js index 14283a6692..67f6581b59 100644 --- a/packages/mongo-livedata/package.js +++ b/packages/mongo-livedata/package.js @@ -44,6 +44,8 @@ Package.on_use(function (api) { // If the facts package is loaded, publish some statistics. api.use('facts', 'server', {weak: true}); + api.use('callback-hook', 'server'); + // Stuff that should be exposed via a real API, but we haven't yet. api.export('MongoInternals', 'server'); // For tests only.