mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Re-poll all OplogObserveDrivers on mongo failover
This commit is contained in:
@@ -113,6 +113,7 @@ MongoConnection = function (url, options) {
|
|||||||
options = options || {};
|
options = options || {};
|
||||||
self._connectCallbacks = [];
|
self._connectCallbacks = [];
|
||||||
self._observeMultiplexers = {};
|
self._observeMultiplexers = {};
|
||||||
|
self._onFailoverHook = new Hook;
|
||||||
|
|
||||||
var mongoOptions = {db: {safe: true}, server: {}, replSet: {}};
|
var mongoOptions = {db: {safe: true}, server: {}, replSet: {}};
|
||||||
|
|
||||||
@@ -144,18 +145,42 @@ MongoConnection = function (url, options) {
|
|||||||
mongoOptions.replSet.poolSize = options.poolSize;
|
mongoOptions.replSet.poolSize = options.poolSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
MongoDB.connect(url, mongoOptions, function(err, db) {
|
MongoDB.connect(url, mongoOptions, Meteor.bindEnvironment(function(err, db) {
|
||||||
if (err)
|
if (err)
|
||||||
throw err;
|
throw err;
|
||||||
self.db = db;
|
self.db = db;
|
||||||
|
// We keep track of the ReplSet's primary, so that we can trigger hooks when
|
||||||
|
// it changes. The Node driver's joined callback seems to fire way too
|
||||||
|
// often, which is why we need to track it ourselves.
|
||||||
|
self._primary = null;
|
||||||
|
// First, figure out what the current primary is, if any.
|
||||||
|
if (self.db.serverConfig._state.master)
|
||||||
|
self._primary = self.db.serverConfig._state.master.name;
|
||||||
|
self.db.serverConfig.on(
|
||||||
|
'joined', Meteor.bindEnvironment(function (kind, doc) {
|
||||||
|
if (kind === 'primary') {
|
||||||
|
if (doc.primary !== self._primary) {
|
||||||
|
self._primary = doc.primary;
|
||||||
|
self._onFailoverHook.each(function (callback) {
|
||||||
|
callback();
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else if (doc.me === self._primary) {
|
||||||
|
// The thing we thought was primary is now something other than
|
||||||
|
// primary. Forget that we thought it was primary. (This means that
|
||||||
|
// if a server stops being primary and then starts being primary again
|
||||||
|
// without another server becoming primary in the middle, we'll
|
||||||
|
// correctly count it as a failover.)
|
||||||
|
self._primary = null;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
Fiber(function () {
|
// drain queue of pending callbacks
|
||||||
// drain queue of pending callbacks
|
_.each(self._connectCallbacks, function (c) {
|
||||||
_.each(self._connectCallbacks, function (c) {
|
c(db);
|
||||||
c(db);
|
});
|
||||||
});
|
}));
|
||||||
}).run();
|
|
||||||
});
|
|
||||||
|
|
||||||
self._docFetcher = new DocFetcher(self);
|
self._docFetcher = new DocFetcher(self);
|
||||||
self._oplogHandle = null;
|
self._oplogHandle = null;
|
||||||
@@ -229,6 +254,12 @@ MongoConnection.prototype._maybeBeginWrite = function () {
|
|||||||
return {committed: function () {}};
|
return {committed: function () {}};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Internal interface: adds a callback which is called when the Mongo primary
|
||||||
|
// changes. Returns a stop handle.
|
||||||
|
MongoConnection.prototype._onFailover = function (callback) {
|
||||||
|
return this._onFailoverHook.register(callback);
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
//////////// Public API //////////
|
//////////// Public API //////////
|
||||||
|
|
||||||
|
|||||||
@@ -151,6 +151,13 @@ OplogObserveDriver = function (options) {
|
|||||||
}
|
}
|
||||||
));
|
));
|
||||||
|
|
||||||
|
// When Mongo fails over, we need to repoll the query, in case we processed an
|
||||||
|
// oplog entry that got rolled back.
|
||||||
|
self._stopHandles.push(self._mongoHandle._onFailover(finishIfNeedToPollQuery(
|
||||||
|
function () {
|
||||||
|
self._needToPollQuery();
|
||||||
|
})));
|
||||||
|
|
||||||
// Give _observeChanges a chance to add the new ObserveHandle to our
|
// Give _observeChanges a chance to add the new ObserveHandle to our
|
||||||
// multiplexer, so that the added calls get streamed.
|
// multiplexer, so that the added calls get streamed.
|
||||||
Meteor.defer(finishIfNeedToPollQuery(function () {
|
Meteor.defer(finishIfNeedToPollQuery(function () {
|
||||||
|
|||||||
@@ -44,6 +44,8 @@ Package.on_use(function (api) {
|
|||||||
// If the facts package is loaded, publish some statistics.
|
// If the facts package is loaded, publish some statistics.
|
||||||
api.use('facts', 'server', {weak: true});
|
api.use('facts', 'server', {weak: true});
|
||||||
|
|
||||||
|
api.use('callback-hook', 'server');
|
||||||
|
|
||||||
// Stuff that should be exposed via a real API, but we haven't yet.
|
// Stuff that should be exposed via a real API, but we haven't yet.
|
||||||
api.export('MongoInternals', 'server');
|
api.export('MongoInternals', 'server');
|
||||||
// For tests only.
|
// For tests only.
|
||||||
|
|||||||
Reference in New Issue
Block a user