move more oplog stuff out of mongo_driver.js

This commit is contained in:
David Glasser
2013-11-26 17:32:38 -08:00
parent aeac87285e
commit 4ff3004e3c
6 changed files with 255 additions and 252 deletions

View File

@@ -57,3 +57,5 @@ _.extend(DocFetcher.prototype, {
}).run();
}
});
MongoTest.DocFetcher = DocFetcher;

View File

@@ -13,6 +13,7 @@ var Fiber = Npm.require('fibers');
var Future = Npm.require(path.join('fibers', 'future'));
MongoInternals = {};
MongoTest = {};
var replaceNames = function (filter, thing) {
if (typeof thing === "object") {
@@ -228,216 +229,6 @@ MongoConnection.prototype._maybeBeginWrite = function () {
return {committed: function () {}};
};
var OPLOG_COLLECTION = 'oplog.rs';
// Equivalent of Perl's quotemeta: returns `str` (coerced to a string) with a
// backslash placed in front of every non-word character, so the result can be
// embedded in a RegExp source and match literally. See
// https://github.com/substack/quotemeta/blob/master/index.js
// XXX this is duplicated with accounts_server.js
var quotemeta = function (str) {
  var text = String(str);
  return text.replace(/\W/g, function (metachar) {
    return '\\' + metachar;
  });
};
// Renders a timestamp object as "Timestamp(<high bits>, <low bits>)" for use
// in error messages. `ts` must provide getHighBits() and getLowBits().
var showTS = function (ts) {
  var highBits = ts.getHighBits();
  var lowBits = ts.getLowBits();
  return "Timestamp(" + highBits + ", " + lowBits + ")";
};
// Starts tailing the oplog reachable at `oplogUrl` and installs
// `self._oplogHandle`, the object through which callers subscribe to oplog
// entries and wait for the tail to catch up.
//
// - oplogUrl: URL passed to the two MongoConnections created below.
// - dbNameFuture: Future whose value is the name of the database we care
//   about; only oplog entries whose `ns` starts with "<dbName>." are tailed.
//
// This function does not block: connecting and starting the tail happen in a
// Meteor.defer callback. The handle's methods wait on `readyFuture` until that
// setup has finished.
MongoConnection.prototype._startOplogTailing = function (oplogUrl,
                                                         dbNameFuture) {
  var self = this;

  var oplogLastEntryConnection = null;
  var oplogTailConnection = null;
  var stopped = false;
  var tailHandle = null;
  // Resolved at the very end of the deferred setup below.
  var readyFuture = new Future();
  var crossbar = new DDPServer._Crossbar({
    factPackage: "mongo-livedata", factName: "oplog-watchers"
  });
  // `ts` of the newest oplog entry we have finished processing (or of the
  // newest entry that existed when tailing started).
  var lastProcessedTS = null;
  // Lazily calculate the basic selector. Don't call baseOplogSelector() at the
  // top level of this function, because we don't want this function to block.
  var baseOplogSelector = _.once(function () {
    return {
      ns: new RegExp('^' + quotemeta(dbNameFuture.wait()) + '\\.'),
      $or: [
        { op: {$in: ['i', 'u', 'd']} },
        // If it is not db.collection.drop(), ignore it
        { op: 'c', 'o.drop': { $exists: true } }]
    };
  });
  // Pending waitUntilCaughtUp calls, as {ts, future} records ordered by
  // ascending ts. The tail callback resolves and removes records from the
  // front once lastProcessedTS has reached their ts.
  var catchingUpFutures = [];

  self._oplogHandle = {
    // Stops the tail. Idempotent.
    stop: function () {
      if (stopped)
        return;
      stopped = true;
      if (tailHandle)
        tailHandle.stop();
      // XXX should close connections too
    },

    // Registers `callback` to be invoked with a clone of each notification
    // fired on the crossbar that matches `trigger`. Returns an object whose
    // stop() removes the listener. Throws if the handle has been stopped.
    onOplogEntry: function (trigger, callback) {
      if (stopped)
        throw new Error("Called onOplogEntry on stopped handle!");

      // Calling onOplogEntry requires us to wait for the tailing to be ready.
      readyFuture.wait();

      var originalCallback = callback;
      callback = Meteor.bindEnvironment(function (notification, onComplete) {
        // XXX can we avoid this clone by making oplog.js careful?
        try {
          originalCallback(EJSON.clone(notification));
        } finally {
          // Always signal completion, even if the callback threw, so that
          // whoever fired the notification is not left waiting.
          onComplete();
        }
      }, function (err) {
        Meteor._debug("Error in oplog callback", err.stack);
      });
      var listenHandle = crossbar.listen(trigger, callback);
      return {
        stop: function () {
          listenHandle.stop();
        }
      };
    },

    // Blocks (by waiting on a Future) until the oplog has been processed up
    // to a point that is roughly "now": specifically, until we've processed
    // all ops that are currently visible. Returns immediately if there is
    // nothing to catch up on. Throws if the handle has been stopped.
    // XXX become convinced that this is actually safe even if oplogConnection
    //     is some kind of pool
    waitUntilCaughtUp: function () {
      if (stopped)
        throw new Error("Called waitUntilCaughtUp on stopped handle!");

      // Calling waitUntilCaughtUp requires us to wait for the oplog connection
      // to be ready.
      readyFuture.wait();

      // We need to make the selector at least as restrictive as the actual
      // tailing selector (ie, we need to specify the DB name) or else we
      // might find a TS that won't show up in the actual tail stream.
      var lastEntry = oplogLastEntryConnection.findOne(
        OPLOG_COLLECTION, baseOplogSelector(),
        {fields: {ts: 1}, sort: {$natural: -1}});

      if (!lastEntry) {
        // Really, nothing in the oplog? Well, we've processed everything.
        return;
      }

      var ts = lastEntry.ts;
      if (!ts)
        throw new Error("oplog entry without ts: " + EJSON.stringify(lastEntry));

      if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) {
        // We've already caught up to here.
        return;
      }

      // Find the position that keeps catchingUpFutures ordered by ts. The
      // scan must include index 0; stopping at index 1 would let a new entry
      // be placed after a single pending entry with a larger ts without the
      // misorder check below noticing.
      var insertAfter = catchingUpFutures.length;
      while (insertAfter > 0
             && catchingUpFutures[insertAfter - 1].ts.greaterThan(ts)) {
        insertAfter--;
      }

      // XXX this can occur if we fail over from one primary to another.  so
      // this check needs to be removed before we merge oplog.  that said, it
      // has been helpful so far at proving that we are properly using
      // poolSize 1. Also, we could keep something like it if we could
      // actually detect failover; see
      // https://github.com/mongodb/node-mongodb-native/issues/1120
      if (insertAfter !== catchingUpFutures.length) {
        throw new Error("found misordered oplog: "
                        + showTS(_.last(catchingUpFutures).ts) + " vs "
                        + showTS(ts));
      }
      var f = new Future;
      catchingUpFutures.splice(insertAfter, 0, {ts: ts, future: f});
      // Resolved by the tail callback once lastProcessedTS reaches ts.
      f.wait();
    }
  };

  // Setting up the connections and tail handler is a blocking operation, so we
  // do it "later".
  Meteor.defer(function () {
    // We make two separate connections to Mongo. The Node Mongo driver
    // implements a naive round-robin connection pool: each "connection" is a
    // pool of several (5 by default) TCP connections, and each request is
    // rotated through the pools. Tailable cursor queries block on the server
    // until there is some data to return (or until a few seconds have
    // passed). So if the connection pool used for tailing cursors is the same
    // pool used for other queries, the other queries will be delayed by seconds
    // 1/5 of the time.
    //
    // The tail connection will only ever be running a single tail command, so
    // it only needs to make one underlying TCP connection.
    oplogTailConnection = new MongoConnection(oplogUrl, {poolSize: 1});
    // XXX better docs, but: it's to get monotonic results
    // XXX is it safe to say "if there's an in flight query, just use its
    //     results"? I don't think so but should consider that
    oplogLastEntryConnection = new MongoConnection(oplogUrl, {poolSize: 1});

    // Find the last oplog entry. Blocks until the connection is ready.
    var lastOplogEntry = oplogLastEntryConnection.findOne(
      OPLOG_COLLECTION, {}, {sort: {$natural: -1}});

    var dbName = dbNameFuture.wait();

    // Clone because baseOplogSelector() is memoized and we add `ts` below.
    var oplogSelector = _.clone(baseOplogSelector());
    if (lastOplogEntry) {
      // Start after the last entry that currently exists.
      oplogSelector.ts = {$gt: lastOplogEntry.ts};
      // If there are any calls to waitUntilCaughtUp before any other oplog
      // entries show up, allow waitUntilCaughtUp to return immediately.
      lastProcessedTS = lastOplogEntry.ts;
    }

    var cursorDescription = new CursorDescription(
      OPLOG_COLLECTION, oplogSelector, {tailable: true});

    tailHandle = oplogTailConnection.tail(cursorDescription, function (doc) {
      // The selector should only yield namespaces of the form
      // "<dbName>.<something>"; anything else is a bug.
      if (!(doc.ns && doc.ns.length > dbName.length + 1 &&
            doc.ns.substr(0, dbName.length + 1) === (dbName + '.')))
        throw new Error("Unexpected ns");

      var trigger = {collection: doc.ns.substr(dbName.length + 1),
                     dropCollection: false,
                     op: doc};

      // Is it a special command and the collection name is hidden somewhere in
      // operator?
      if (trigger.collection === "$cmd") {
        trigger.collection = doc.o.drop;
        trigger.dropCollection = true;
        trigger.id = null;
      } else {
        // All other ops have an id.
        trigger.id = idForOp(doc);
      }

      // Fire the notification and wait for the crossbar to invoke the
      // resolver (presumably once every listener has called onComplete --
      // confirm against DDPServer._Crossbar.fire).
      var f = new Future;
      crossbar.fire(trigger, f.resolver());
      f.wait();

      // Now that we've processed this operation, process pending sequencers.
      if (!doc.ts)
        throw new Error("oplog entry without ts: " + EJSON.stringify(doc));
      lastProcessedTS = doc.ts;
      while (!_.isEmpty(catchingUpFutures)
             && catchingUpFutures[0].ts.lessThanOrEqual(lastProcessedTS)) {
        var sequencer = catchingUpFutures.shift();
        sequencer.future.return();
      }
    });
    readyFuture.return();
  });
};
//////////// Public API //////////
@@ -851,7 +642,7 @@ MongoConnection.prototype._dropIndex = function (collectionName, index) {
// they start sending observeChanges callbacks (and a ready() invocation) to
// their ObserveMultiplexer, and you stop them by calling their stop() method.
var CursorDescription = function (collectionName, selector, options) {
CursorDescription = function (collectionName, selector, options) {
var self = this;
self.collectionName = collectionName;
self.selector = Meteor.Collection._rewriteSelector(selector);
@@ -1173,7 +964,7 @@ MongoConnection.prototype._observeChanges = function (
if (firstHandle) {
var driverClass = PollingObserveDriver;
if (self._oplogHandle && !ordered && !callbacks._testOnlyPollCallback
&& cursorSupportedByOplogTailing(cursorDescription)) {
&& OplogObserveDriver.cursorSupported(cursorDescription)) {
driverClass = OplogObserveDriver;
}
observeDriver = new driverClass({
@@ -1289,38 +1080,6 @@ MongoConnection.prototype._observeChangesTailable = function (
});
};
// Reports whether the oplog tailing code can handle this cursor. For now we
// are deliberately conservative: only simple equality selectors with simple
// options are accepted.
var cursorSupportedByOplogTailing = function (cursorDescription) {
  var opts = cursorDescription.options;

  // These options (which are mostly used for sorted cursors) require us to
  // figure out where a given document fits in an order to know if it's
  // included or not, and we don't track that information when doing oplog
  // tailing.
  if (opts.limit || opts.skip)
    return false;

  // Accepts one selector entry only when the key is a plain field name and
  // the value is a scalar we can compare for equality.
  var isPlainEqualityClause = function (value, field) {
    // No logical operators like $and.
    if (field.substr(0, 1) === '$')
      return false;

    // We only allow scalars, not sub-documents or $operators or RegExp.
    // XXX Date would be easy too, though I doubt anyone is doing equality
    // lookups on dates
    switch (typeof value) {
    case "string":
    case "number":
    case "boolean":
      return true;
    }
    return value === null ||
      value instanceof Meteor.Collection.ObjectID;
  };

  // For now, we're just dealing with equality queries: no $operators, regexps,
  // or $and/$or/$where/etc clauses. We can expand the scope of what we're
  // comfortable processing later. ($where will get pretty scary since it will
  // allow selector processing to yield!)
  return _.all(cursorDescription.selector, isPlainEqualityClause);
};
// XXX We probably need to find a better way to expose this. Right now
// it's only used by tests, but in fact you need it in normal
// operation to interact with capped collections (eg, Galaxy uses it).
@@ -1328,8 +1087,3 @@ MongoInternals.MongoTimestamp = MongoDB.Timestamp;
MongoInternals.Connection = MongoConnection;
MongoInternals.NpmModule = MongoDB;
MongoTest = {
cursorSupportedByOplogTailing: cursorSupportedByOplogTailing,
DocFetcher: DocFetcher
};

View File

@@ -310,6 +310,38 @@ _.extend(OplogObserveDriver.prototype, {
}
});
// Reports whether the oplog tailing code can handle this cursor. For now we
// are deliberately conservative: only simple equality selectors with simple
// options are accepted.
// (This is a "static method".)
OplogObserveDriver.cursorSupported = function (cursorDescription) {
  var opts = cursorDescription.options;

  // These options (which are mostly used for sorted cursors) require us to
  // figure out where a given document fits in an order to know if it's
  // included or not, and we don't track that information when doing oplog
  // tailing.
  if (opts.limit || opts.skip)
    return false;

  // Accepts one selector entry only when the key is a plain field name and
  // the value is a scalar we can compare for equality.
  var isPlainEqualityClause = function (value, field) {
    // No logical operators like $and.
    if (field.substr(0, 1) === '$')
      return false;

    // We only allow scalars, not sub-documents or $operators or RegExp.
    // XXX Date would be easy too, though I doubt anyone is doing equality
    // lookups on dates
    switch (typeof value) {
    case "string":
    case "number":
    case "boolean":
      return true;
    }
    return value === null ||
      value instanceof Meteor.Collection.ObjectID;
  };

  // For now, we're just dealing with equality queries: no $operators, regexps,
  // or $and/$or/$where/etc clauses. We can expand the scope of what we're
  // comfortable processing later. ($where will get pretty scary since it will
  // allow selector processing to yield!)
  return _.all(cursorDescription.selector, isPlainEqualityClause);
};
idForOp = function (op) {
if (op.op === 'd')
return op.o._id;
@@ -323,3 +355,5 @@ idForOp = function (op) {
else
throw Error("Unknown op: " + EJSON.stringify(op));
};
MongoTest.OplogObserveDriver = OplogObserveDriver;

View File

@@ -0,0 +1,211 @@
var Future = Npm.require('fibers/future');
var OPLOG_COLLECTION = 'oplog.rs';
// Equivalent of Perl's quotemeta: returns `str` (coerced to a string) with a
// backslash placed in front of every non-word character, so the result can be
// embedded in a RegExp source and match literally. See
// https://github.com/substack/quotemeta/blob/master/index.js
// XXX this is duplicated with accounts_server.js
var quotemeta = function (str) {
  var text = String(str);
  return text.replace(/\W/g, function (metachar) {
    return '\\' + metachar;
  });
};
// Renders a timestamp object as "Timestamp(<high bits>, <low bits>)" for use
// in error messages. `ts` must provide getHighBits() and getLowBits().
var showTS = function (ts) {
  var highBits = ts.getHighBits();
  var lowBits = ts.getLowBits();
  return "Timestamp(" + highBits + ", " + lowBits + ")";
};
// Starts tailing the oplog reachable at `oplogUrl` and installs
// `self._oplogHandle`, the object through which callers subscribe to oplog
// entries and wait for the tail to catch up.
//
// - oplogUrl: URL passed to the two MongoConnections created below.
// - dbNameFuture: Future whose value is the name of the database we care
//   about; only oplog entries whose `ns` starts with "<dbName>." are tailed.
//
// This function does not block: connecting and starting the tail happen in a
// Meteor.defer callback. The handle's methods wait on `readyFuture` until that
// setup has finished.
MongoConnection.prototype._startOplogTailing = function (oplogUrl,
                                                         dbNameFuture) {
  var self = this;

  var oplogLastEntryConnection = null;
  var oplogTailConnection = null;
  var stopped = false;
  var tailHandle = null;
  // Resolved at the very end of the deferred setup below.
  var readyFuture = new Future();
  var crossbar = new DDPServer._Crossbar({
    factPackage: "mongo-livedata", factName: "oplog-watchers"
  });
  // `ts` of the newest oplog entry we have finished processing (or of the
  // newest entry that existed when tailing started).
  var lastProcessedTS = null;
  // Lazily calculate the basic selector. Don't call baseOplogSelector() at the
  // top level of this function, because we don't want this function to block.
  var baseOplogSelector = _.once(function () {
    return {
      ns: new RegExp('^' + quotemeta(dbNameFuture.wait()) + '\\.'),
      $or: [
        { op: {$in: ['i', 'u', 'd']} },
        // If it is not db.collection.drop(), ignore it
        { op: 'c', 'o.drop': { $exists: true } }]
    };
  });
  // Pending waitUntilCaughtUp calls, as {ts, future} records ordered by
  // ascending ts. The tail callback resolves and removes records from the
  // front once lastProcessedTS has reached their ts.
  var catchingUpFutures = [];

  self._oplogHandle = {
    // Stops the tail. Idempotent.
    stop: function () {
      if (stopped)
        return;
      stopped = true;
      if (tailHandle)
        tailHandle.stop();
      // XXX should close connections too
    },

    // Registers `callback` to be invoked with a clone of each notification
    // fired on the crossbar that matches `trigger`. Returns an object whose
    // stop() removes the listener. Throws if the handle has been stopped.
    onOplogEntry: function (trigger, callback) {
      if (stopped)
        throw new Error("Called onOplogEntry on stopped handle!");

      // Calling onOplogEntry requires us to wait for the tailing to be ready.
      readyFuture.wait();

      var originalCallback = callback;
      callback = Meteor.bindEnvironment(function (notification, onComplete) {
        // XXX can we avoid this clone by making oplog.js careful?
        try {
          originalCallback(EJSON.clone(notification));
        } finally {
          // Always signal completion, even if the callback threw, so that
          // whoever fired the notification is not left waiting.
          onComplete();
        }
      }, function (err) {
        Meteor._debug("Error in oplog callback", err.stack);
      });
      var listenHandle = crossbar.listen(trigger, callback);
      return {
        stop: function () {
          listenHandle.stop();
        }
      };
    },

    // Blocks (by waiting on a Future) until the oplog has been processed up
    // to a point that is roughly "now": specifically, until we've processed
    // all ops that are currently visible. Returns immediately if there is
    // nothing to catch up on. Throws if the handle has been stopped.
    // XXX become convinced that this is actually safe even if oplogConnection
    //     is some kind of pool
    waitUntilCaughtUp: function () {
      if (stopped)
        throw new Error("Called waitUntilCaughtUp on stopped handle!");

      // Calling waitUntilCaughtUp requires us to wait for the oplog connection
      // to be ready.
      readyFuture.wait();

      // We need to make the selector at least as restrictive as the actual
      // tailing selector (ie, we need to specify the DB name) or else we
      // might find a TS that won't show up in the actual tail stream.
      var lastEntry = oplogLastEntryConnection.findOne(
        OPLOG_COLLECTION, baseOplogSelector(),
        {fields: {ts: 1}, sort: {$natural: -1}});

      if (!lastEntry) {
        // Really, nothing in the oplog? Well, we've processed everything.
        return;
      }

      var ts = lastEntry.ts;
      if (!ts)
        throw new Error("oplog entry without ts: " + EJSON.stringify(lastEntry));

      if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) {
        // We've already caught up to here.
        return;
      }

      // Find the position that keeps catchingUpFutures ordered by ts. The
      // scan must include index 0; stopping at index 1 would let a new entry
      // be placed after a single pending entry with a larger ts without the
      // misorder check below noticing.
      var insertAfter = catchingUpFutures.length;
      while (insertAfter > 0
             && catchingUpFutures[insertAfter - 1].ts.greaterThan(ts)) {
        insertAfter--;
      }

      // XXX this can occur if we fail over from one primary to another.  so
      // this check needs to be removed before we merge oplog.  that said, it
      // has been helpful so far at proving that we are properly using
      // poolSize 1. Also, we could keep something like it if we could
      // actually detect failover; see
      // https://github.com/mongodb/node-mongodb-native/issues/1120
      if (insertAfter !== catchingUpFutures.length) {
        throw new Error("found misordered oplog: "
                        + showTS(_.last(catchingUpFutures).ts) + " vs "
                        + showTS(ts));
      }
      var f = new Future;
      catchingUpFutures.splice(insertAfter, 0, {ts: ts, future: f});
      // Resolved by the tail callback once lastProcessedTS reaches ts.
      f.wait();
    }
  };

  // Setting up the connections and tail handler is a blocking operation, so we
  // do it "later".
  Meteor.defer(function () {
    // We make two separate connections to Mongo. The Node Mongo driver
    // implements a naive round-robin connection pool: each "connection" is a
    // pool of several (5 by default) TCP connections, and each request is
    // rotated through the pools. Tailable cursor queries block on the server
    // until there is some data to return (or until a few seconds have
    // passed). So if the connection pool used for tailing cursors is the same
    // pool used for other queries, the other queries will be delayed by seconds
    // 1/5 of the time.
    //
    // The tail connection will only ever be running a single tail command, so
    // it only needs to make one underlying TCP connection.
    oplogTailConnection = new MongoConnection(oplogUrl, {poolSize: 1});
    // XXX better docs, but: it's to get monotonic results
    // XXX is it safe to say "if there's an in flight query, just use its
    //     results"? I don't think so but should consider that
    oplogLastEntryConnection = new MongoConnection(oplogUrl, {poolSize: 1});

    // Find the last oplog entry. Blocks until the connection is ready.
    var lastOplogEntry = oplogLastEntryConnection.findOne(
      OPLOG_COLLECTION, {}, {sort: {$natural: -1}});

    var dbName = dbNameFuture.wait();

    // Clone because baseOplogSelector() is memoized and we add `ts` below.
    var oplogSelector = _.clone(baseOplogSelector());
    if (lastOplogEntry) {
      // Start after the last entry that currently exists.
      oplogSelector.ts = {$gt: lastOplogEntry.ts};
      // If there are any calls to waitUntilCaughtUp before any other oplog
      // entries show up, allow waitUntilCaughtUp to return immediately.
      lastProcessedTS = lastOplogEntry.ts;
    }

    var cursorDescription = new CursorDescription(
      OPLOG_COLLECTION, oplogSelector, {tailable: true});

    tailHandle = oplogTailConnection.tail(cursorDescription, function (doc) {
      // The selector should only yield namespaces of the form
      // "<dbName>.<something>"; anything else is a bug.
      if (!(doc.ns && doc.ns.length > dbName.length + 1 &&
            doc.ns.substr(0, dbName.length + 1) === (dbName + '.')))
        throw new Error("Unexpected ns");

      var trigger = {collection: doc.ns.substr(dbName.length + 1),
                     dropCollection: false,
                     op: doc};

      // Is it a special command and the collection name is hidden somewhere in
      // operator?
      if (trigger.collection === "$cmd") {
        trigger.collection = doc.o.drop;
        trigger.dropCollection = true;
        trigger.id = null;
      } else {
        // All other ops have an id.
        trigger.id = idForOp(doc);
      }

      // Fire the notification and wait for the crossbar to invoke the
      // resolver (presumably once every listener has called onComplete --
      // confirm against DDPServer._Crossbar.fire).
      var f = new Future;
      crossbar.fire(trigger, f.resolver());
      f.wait();

      // Now that we've processed this operation, process pending sequencers.
      if (!doc.ts)
        throw new Error("oplog entry without ts: " + EJSON.stringify(doc));
      lastProcessedTS = doc.ts;
      while (!_.isEmpty(catchingUpFutures)
             && catchingUpFutures[0].ts.lessThanOrEqual(lastProcessedTS)) {
        var sequencer = catchingUpFutures.shift();
        sequencer.future.return();
      }
    });
    readyFuture.return();
  });
};

View File

@@ -1,10 +1,10 @@
var OplogCollection = new Meteor.Collection("oplog-" + Random.id());
Tinytest.add("mongo-livedata - oplog - cursorSupportedByOplogTailing", function (test) {
Tinytest.add("mongo-livedata - oplog - cursorSupported", function (test) {
var supported = function (expected, selector) {
var cursor = OplogCollection.find(selector);
test.equal(
MongoTest.cursorSupportedByOplogTailing(cursor._cursorDescription),
MongoTest.OplogObserveDriver.cursorSupported(cursor._cursorDescription),
expected);
};

View File

@@ -42,7 +42,9 @@ Package.on_use(function (api) {
// For tests only.
api.export('MongoTest', 'server', {testOnly: true});
api.add_files(['doc_fetcher.js', 'mongo_driver.js', 'observe_multiplex.js',
api.add_files(['mongo_driver.js', 'oplog_tailing.js',
'observe_multiplex.js', 'doc_fetcher.js',
// XXX rename to have _observe_driver
'polling.js', 'oplog.js'], 'server');
api.add_files('local_collection_driver.js', ['client', 'server']);
api.add_files('remote_collection_driver.js', 'server');