Don't yield in oplog entry handler

This commit is contained in:
David Glasser
2014-01-23 21:45:57 -08:00
parent 8eca012c32
commit d868325b83

View File

@@ -51,19 +51,21 @@ OplogObserveDriver = function (options) {
forEachTrigger(self._cursorDescription, function (trigger) {
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) {
var op = notification.op;
if (notification.dropCollection) {
// Note: this call is not allowed to block on anything (especially on
// waiting for oplog entries to catch up) because that will block
// onOplogEntry!
self._needToPollQuery();
} else {
// All other operators should be handled depending on phase
if (self._phase === PHASE.QUERYING)
self._handleOplogEntryQuerying(op);
else
self._handleOplogEntrySteadyOrFetching(op);
}
Meteor._noYieldsAllowed(function () {
var op = notification.op;
if (notification.dropCollection) {
// Note: this call is not allowed to block on anything (especially
// on waiting for oplog entries to catch up) because that will block
// onOplogEntry!
self._needToPollQuery();
} else {
// All other operators should be handled depending on phase
if (self._phase === PHASE.QUERYING)
self._handleOplogEntryQuerying(op);
else
self._handleOplogEntrySteadyOrFetching(op);
}
});
}
));
});
@@ -156,50 +158,55 @@ _.extend(OplogObserveDriver.prototype, {
_fetchModifiedDocuments: function () {
var self = this;
self._registerPhaseChange(PHASE.FETCHING);
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
// Defer, because nothing called from the oplog entry handler may yield, but
// fetch() yields.
Meteor.defer(function () {
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
self._currentlyFetching = self._needToFetch;
var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap;
var waiting = 0;
var anyError = null;
var fut = new Future;
// This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated).
self._currentlyFetching.forEach(function (cacheKey, id) {
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, cacheKey,
function (err, doc) {
if (err) {
if (!anyError)
anyError = err;
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call which should effectively cancel this round of
// fetches. (_pollQuery increments the generation.)
self._handleDoc(id, doc);
}
waiting--;
// Because fetch() never calls its callback synchronously, this is
// safe (ie, we won't call fut.return() before the forEach is done).
if (waiting === 0)
fut.return();
});
});
fut.wait();
// XXX do this even if we've switched to PHASE.QUERYING?
if (anyError)
throw anyError;
// Exit now if we've had a _pollQuery call.
if (self._phase === PHASE.QUERYING)
return;
self._currentlyFetching = null;
}
self._beSteady();
self._currentlyFetching = self._needToFetch;
var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap;
var waiting = 0;
var anyError = null;
var fut = new Future;
// This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated).
self._currentlyFetching.forEach(function (cacheKey, id) {
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, cacheKey,
function (err, doc) {
if (err) {
if (!anyError)
anyError = err;
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call which should effectively cancel this round of
// fetches. (_pollQuery increments the generation.)
self._handleDoc(id, doc);
}
waiting--;
// Because fetch() never calls its callback synchronously, this is
// safe (ie, we won't call fut.return() before the forEach is
// done).
if (waiting === 0)
fut.return();
});
});
fut.wait();
// XXX do this even if we've switched to PHASE.QUERYING?
if (anyError)
throw anyError;
// Exit now if we've had a _pollQuery call.
if (self._phase === PHASE.QUERYING)
return;
self._currentlyFetching = null;
}
self._beSteady();
});
},
_beSteady: function () {
var self = this;