better way of not using more fibers

This commit is contained in:
David Glasser
2013-10-25 00:29:37 -07:00
parent 60f7aa75cd
commit 70a28a6229
2 changed files with 46 additions and 52 deletions

View File

@@ -305,61 +305,58 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl,
// currently visible.
// XXX become convinced that this is actually safe even if oplogConnection
// is some kind of pool
callWhenProcessedLatest: function (callback) {
waitUntilProcessedLatest: function () {
if (stopped)
throw new Error("Called callWhenProcessedLatest on stopped handle!");
throw new Error("Called waitUntilProcessedLatest on stopped handle!");
// Calling onOplogEntry requries us to wait for the oplog connection to be
// ready.
readyFuture.wait();
var coll = oplogLastEntryConnection._getCollection(OPLOG_COLLECTION);
// We need to make the selector at least as restrictive as the actual
// tailing selector (ie, we need to specify the DB name) or else we
// might find a TS that won't show up in the actual tail stream.
coll.findOne(baseOplogSelector(), {fields: {ts: 1}, sort: {$natural: -1}}, function (err, lastEntry) {
if (err) {
console.log("OH NO ERROR", err)
// call callback anyway, I guess
callback();
return;
}
//
// We don't want to block here: the whole point is to call callback
// asynchronously!
var lastEntry = oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, baseOplogSelector(),
{fields: {ts: 1}, sort: {$natural: -1}});
if (!lastEntry) {
// Really, nothing in the oplog? Well, we've processed everything.
callback();
return;
}
var ts = lastEntry.ts;
if (!ts)
throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry));
if (!lastEntry) {
// Really, nothing in the oplog? Well, we've processed everything.
return;
}
if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) {
// We've already caught up to here.
callback();
return;
}
var ts = lastEntry.ts;
if (!ts)
throw Error("oplog entry without ts: " + EJSON.stringify(lastEntry));
var insertAfter = pendingSequencers.length;
while (insertAfter - 1 > 0
&& pendingSequencers[insertAfter - 1].ts.greaterThan(ts)) {
insertAfter--;
}
if (lastProcessedTS && ts.lessThanOrEqual(lastProcessedTS)) {
// We've already caught up to here.
return;
}
// XXX this can occur if we fail over from one primary to another. so
// this check needs to be removed before we merge oplog. that said, it
// has been helpful so far at proving that we are properly using
// poolSize 1. Also, we could keep something like it if we could
// actually detect failover; see
// https://github.com/mongodb/node-mongodb-native/issues/1120
if (insertAfter !== pendingSequencers.length) {
throw Error("found misordered oplog: "
+ showTS(_.last(pendingSequencers).ts) + " vs "
+ showTS(ts));
}
var insertAfter = pendingSequencers.length;
while (insertAfter - 1 > 0
&& pendingSequencers[insertAfter - 1].ts.greaterThan(ts)) {
insertAfter--;
}
pendingSequencers.splice(insertAfter, 0, {ts: ts, callback: callback});
});
// XXX this can occur if we fail over from one primary to another. so
// this check needs to be removed before we merge oplog. that said, it
// has been helpful so far at proving that we are properly using
// poolSize 1. Also, we could keep something like it if we could
// actually detect failover; see
// https://github.com/mongodb/node-mongodb-native/issues/1120
if (insertAfter !== pendingSequencers.length) {
throw Error("found misordered oplog: "
+ showTS(_.last(pendingSequencers).ts) + " vs "
+ showTS(ts));
}
var f = new Future;
pendingSequencers.splice(insertAfter, 0, {ts: ts, future: f});
f.wait();
}
};
@@ -420,7 +417,7 @@ MongoConnection.prototype._startOplogTailing = function (oplogUrl,
while (!_.isEmpty(pendingSequencers)
&& pendingSequencers[0].ts.lessThanOrEqual(lastProcessedTS)) {
var sequencer = pendingSequencers.shift();
sequencer.callback();
sequencer.future.return();
}
});
readyFuture.return();

View File

@@ -206,13 +206,12 @@ MongoConnection.prototype._observeChangesWithOplog = function (
var write = fence.beginWrite();
// This write cannot complete until we've caught up to "this point" in the
// oplog, and then made it back to the steady state.
self._oplogHandle.callWhenProcessedLatest(function () {
if (stopped || phase === PHASE.STEADY)
write.committed();
else
writesToCommitWhenWeReachSteady.push(write);
});
complete();
Meteor.defer(complete);
self._oplogHandle.waitUntilProcessedLatest();
if (stopped || phase === PHASE.STEADY)
write.committed();
else
writesToCommitWhenWeReachSteady.push(write);
}
);
@@ -221,9 +220,7 @@ MongoConnection.prototype._observeChangesWithOplog = function (
add(initialDoc);
});
var catchUpFuture = new Future;
self._oplogHandle.callWhenProcessedLatest(catchUpFuture.resolver());
catchUpFuture.wait();
self._oplogHandle.waitUntilProcessedLatest();
if (phase !== PHASE.INITIALIZING)
throw Error("Phase unexpectedly " + phase);