mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Re-poll all OplogObserveDrivers on mongo failover
This commit is contained in:
@@ -113,6 +113,7 @@ MongoConnection = function (url, options) {
|
||||
options = options || {};
|
||||
self._connectCallbacks = [];
|
||||
self._observeMultiplexers = {};
|
||||
self._onFailoverHook = new Hook;
|
||||
|
||||
var mongoOptions = {db: {safe: true}, server: {}, replSet: {}};
|
||||
|
||||
@@ -144,18 +145,42 @@ MongoConnection = function (url, options) {
|
||||
mongoOptions.replSet.poolSize = options.poolSize;
|
||||
}
|
||||
|
||||
MongoDB.connect(url, mongoOptions, function(err, db) {
|
||||
MongoDB.connect(url, mongoOptions, Meteor.bindEnvironment(function(err, db) {
|
||||
if (err)
|
||||
throw err;
|
||||
self.db = db;
|
||||
// We keep track of the ReplSet's primary, so that we can trigger hooks when
|
||||
// it changes. The Node driver's joined callback seems to fire way too
|
||||
// often, which is why we need to track it ourselves.
|
||||
self._primary = null;
|
||||
// First, figure out what the current primary is, if any.
|
||||
if (self.db.serverConfig._state.master)
|
||||
self._primary = self.db.serverConfig._state.master.name;
|
||||
self.db.serverConfig.on(
|
||||
'joined', Meteor.bindEnvironment(function (kind, doc) {
|
||||
if (kind === 'primary') {
|
||||
if (doc.primary !== self._primary) {
|
||||
self._primary = doc.primary;
|
||||
self._onFailoverHook.each(function (callback) {
|
||||
callback();
|
||||
return true;
|
||||
});
|
||||
}
|
||||
} else if (doc.me === self._primary) {
|
||||
// The thing we thought was primary is now something other than
|
||||
// primary. Forget that we thought it was primary. (This means that
|
||||
// if a server stops being primary and then starts being primary again
|
||||
// without another server becoming primary in the middle, we'll
|
||||
// correctly count it as a failover.)
|
||||
self._primary = null;
|
||||
}
|
||||
}));
|
||||
|
||||
Fiber(function () {
|
||||
// drain queue of pending callbacks
|
||||
_.each(self._connectCallbacks, function (c) {
|
||||
c(db);
|
||||
});
|
||||
}).run();
|
||||
});
|
||||
// drain queue of pending callbacks
|
||||
_.each(self._connectCallbacks, function (c) {
|
||||
c(db);
|
||||
});
|
||||
}));
|
||||
|
||||
self._docFetcher = new DocFetcher(self);
|
||||
self._oplogHandle = null;
|
||||
@@ -229,6 +254,12 @@ MongoConnection.prototype._maybeBeginWrite = function () {
|
||||
return {committed: function () {}};
|
||||
};
|
||||
|
||||
// Internal interface: adds a callback which is called when the Mongo primary
|
||||
// changes. Returns a stop handle.
|
||||
MongoConnection.prototype._onFailover = function (callback) {
|
||||
return this._onFailoverHook.register(callback);
|
||||
};
|
||||
|
||||
|
||||
//////////// Public API //////////
|
||||
|
||||
|
||||
@@ -151,6 +151,13 @@ OplogObserveDriver = function (options) {
|
||||
}
|
||||
));
|
||||
|
||||
// When Mongo fails over, we need to repoll the query, in case we processed an
|
||||
// oplog entry that got rolled back.
|
||||
self._stopHandles.push(self._mongoHandle._onFailover(finishIfNeedToPollQuery(
|
||||
function () {
|
||||
self._needToPollQuery();
|
||||
})));
|
||||
|
||||
// Give _observeChanges a chance to add the new ObserveHandle to our
|
||||
// multiplexer, so that the added calls get streamed.
|
||||
Meteor.defer(finishIfNeedToPollQuery(function () {
|
||||
|
||||
@@ -44,6 +44,8 @@ Package.on_use(function (api) {
|
||||
// If the facts package is loaded, publish some statistics.
|
||||
api.use('facts', 'server', {weak: true});
|
||||
|
||||
api.use('callback-hook', 'server');
|
||||
|
||||
// Stuff that should be exposed via a real API, but we haven't yet.
|
||||
api.export('MongoInternals', 'server');
|
||||
// For tests only.
|
||||
|
||||
Reference in New Issue
Block a user