replace sequencer writes with reads.

This commit is contained in:
David Glasser
2013-10-09 18:13:11 -07:00
parent f24be0684f
commit aae9fc23db
2 changed files with 75 additions and 60 deletions

View File

@@ -219,13 +219,6 @@ MongoConnection.prototype._maybeBeginWrite = function () {
var OPLOG_COLLECTION = 'oplog.rs';
var SEQUENCE_COLLECTION = 'meteor_livedata_Sequencer';
// XXX This is problematic if our RNG isn't seeded well enough.
var myServerId = Random.id();
var nextSequenceId = 1;
// XXX doc
var pendingSequences = [];
// Like Perl's quotemeta: quotes all regexp metacharacters. See
// https://github.com/substack/quotemeta/blob/master/index.js
// XXX this is duplicated with accounts_server.js
@@ -233,34 +226,23 @@ var quotemeta = function (str) {
return String(str).replace(/(\W)/g, '\\$1');
};
// Calls `callback` once the oplog has been processed up to a point that is
// roughly "now". Specifically, it does a dummy write which is then detected
// by the connection's oplog tailer.
// XXX This could be a read instead of a write, getting the last `ts`
// in oplog?
MongoConnection.prototype._callWhenOplogProcessed = function (callback) {
var self = this;
var sequenceId = nextSequenceId++;
pendingSequences.push({sequenceId: sequenceId,
callback: callback});
// Use direct write to Node Mongo driver so we don't end up with recursive
// fence stuff. Need to disable 'safe' because we aren't providing a callback.
var writeCollection = self._getCollection(SEQUENCE_COLLECTION);
writeCollection.update({_id: myServerId}, {$set: {sequence: sequenceId}},
{upsert: true, safe: false});
};
MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
var self = this;
var stopped = false;
var oplogConnection = null;
var tailHandle = null;
var readyFuture = new Future();
var nextId = 0;
var callbacksByCollection = {};
var lastProcessedTS = null;
var baseOplogSelector = {
ns: new RegExp('^' + quotemeta(dbName) + '\\.'),
// XXX also handle drop collection, etc
op: {$in: ['i', 'u', 'd']}
};
// XXX doc
var pendingSequencers = [];
self._oplogHandle = {
stop: function () {
@@ -290,47 +272,75 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
delete callbacksByCollection[collectionName][callbackId];
}
};
},
// Calls `callback` once the oplog has been processed up to a point that is
// roughly "now": specifically, once we've processed all ops that are
// currently visible.
// XXX become convinced that this is actually safe even if oplogConnection
// is some kind of pool
callWhenProcessedLatest: function (callback) {
if (stopped)
throw new Error("Called callWhenProcessedLatest on stopped handle!");
// Calling onOplogEntry requries us to wait for the oplog connection to be
// ready.
readyFuture.wait();
// Except for during startup, we DON'T block.
Fiber(function () {
// We need to make the selector at least as restrictive as the actual
// tailing selector (ie, we need to specify the DB name) or else we
// might find a TS that won't show up in the actual tail stream.
var lastEntry = oplogConnection.findOne(
OPLOG_COLLECTION, baseOplogSelector, {sort: {$natural: -1}});
if (!lastEntry) {
// Really, nothing in the oplog? Well, we've processed everything.
callback();
return;
}
var ts = lastEntry.ts;
if (!ts)
throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry));
if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) {
// We've already caught up to here.
callback();
return;
}
if (!_.isEmpty(pendingSequencers)
&& _.last(pendingSequencers).ts.greaterThan(ts)) {
throw Error("found misordered oplog");
}
pendingSequencers.push({ts: ts,
callback: callback});
}).run();
}
};
// Actually setting up the connection and tail blocks, so we do it "later".
Meteor.defer(function () {
var oplogConnection = new MongoConnection(oplogUrl);
oplogConnection = new MongoConnection(oplogUrl);
// Find the last oplog entry. Blocks until the connection is ready.
var lastOplogEntry = oplogConnection.findOne(
OPLOG_COLLECTION, {}, {sort: {$natural: -1}});
var oplogSelector = {
ns: new RegExp('^' + quotemeta(dbName) + '\\.'),
// XXX also handle drop collection, etc
op: {$in: ['i', 'u', 'd']}
};
if (lastOplogEntry)
var oplogSelector = _.clone(baseOplogSelector);
if (lastOplogEntry) {
// Start after the last entry that currently exists.
oplogSelector.ts = {$gt: lastOplogEntry.ts};
// If there are any calls to callWhenProcessedLatest before any other
// oplog entries show up, allow callWhenProcessedLatest to call its
// callback immediately.
lastProcessedTS = lastOplogEntry.ts;
}
var cursorDescription = new CursorDescription(
OPLOG_COLLECTION, oplogSelector, {tailable: true});
var processSequence = function (doc) {
if (doc.op !== 'i' && doc.op !== 'u')
return;
var serverId = (doc.op === 'i' ? doc.o._id : doc.o2._id);
if (serverId !== myServerId)
return;
var sequenceId =
(doc.op === 'i' ? doc.o.sequence :
(doc.o.$set && doc.o.$set.sequence));
if (typeof sequenceId !== 'number')
return;
// Process all sequence points up to this point.
while (!_.isEmpty(pendingSequences)
&& pendingSequences[0].sequenceId <= sequenceId) {
var sequence = pendingSequences.shift();
sequence.callback();
}
};
tailHandle = oplogConnection.tail(cursorDescription, function (doc) {
if (!(doc.ns && doc.ns.length > dbName.length + 1 &&
doc.ns.substr(0, dbName.length + 1) === (dbName + '.')))
@@ -338,14 +348,19 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl, dbName) {
var collectionName = doc.ns.substr(dbName.length + 1);
if (collectionName === SEQUENCE_COLLECTION) {
processSequence(doc);
return;
}
_.each(callbacksByCollection[collectionName], function (callback) {
callback(EJSON.clone(doc));
});
// Now that we've processed this operation, process pending sequencers.
if (!doc.ts)
throw Error("oplog entry without ts: " + EJSON.stringify(doc));
lastProcessedTS = doc.ts;
while (!_.isEmpty(pendingSequencers)
&& pendingSequencers[0].ts.lessThanOrEqual(lastProcessedTS)) {
var sequencer = pendingSequencers.shift();
sequencer.callback();
}
});
readyFuture.return();
});

View File

@@ -172,7 +172,7 @@ MongoConnection.prototype._observeChangesWithOplog = function (
}
var write = fence.beginWrite();
// XXX this also has to wait for steady!!!
self._callWhenOplogProcessed(function () {
self._oplogHandle.callWhenProcessedLatest(function () {
write.committed();
});
complete();
@@ -185,7 +185,7 @@ MongoConnection.prototype._observeChangesWithOplog = function (
});
var catchUpFuture = new Future;
self._callWhenOplogProcessed(catchUpFuture.resolver());
self._oplogHandle.callWhenProcessedLatest(catchUpFuture.resolver());
catchUpFuture.wait();
if (phase !== PHASE.INITIALIZING)