Don't yield in oplog entry handler

This commit is contained in:
David Glasser
2014-01-23 21:45:57 -08:00
parent 8eca012c32
commit d868325b83

View File

@@ -51,19 +51,21 @@ OplogObserveDriver = function (options) {
forEachTrigger(self._cursorDescription, function (trigger) { forEachTrigger(self._cursorDescription, function (trigger) {
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry( self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
trigger, function (notification) { trigger, function (notification) {
var op = notification.op; Meteor._noYieldsAllowed(function () {
if (notification.dropCollection) { var op = notification.op;
// Note: this call is not allowed to block on anything (especially on if (notification.dropCollection) {
// waiting for oplog entries to catch up) because that will block // Note: this call is not allowed to block on anything (especially
// onOplogEntry! // on waiting for oplog entries to catch up) because that will block
self._needToPollQuery(); // onOplogEntry!
} else { self._needToPollQuery();
// All other operators should be handled depending on phase } else {
if (self._phase === PHASE.QUERYING) // All other operators should be handled depending on phase
self._handleOplogEntryQuerying(op); if (self._phase === PHASE.QUERYING)
else self._handleOplogEntryQuerying(op);
self._handleOplogEntrySteadyOrFetching(op); else
} self._handleOplogEntrySteadyOrFetching(op);
}
});
} }
)); ));
}); });
@@ -156,50 +158,55 @@ _.extend(OplogObserveDriver.prototype, {
_fetchModifiedDocuments: function () { _fetchModifiedDocuments: function () {
var self = this; var self = this;
self._registerPhaseChange(PHASE.FETCHING); self._registerPhaseChange(PHASE.FETCHING);
while (!self._stopped && !self._needToFetch.empty()) { // Defer, because nothing called from the oplog entry handler may yield, but
if (self._phase !== PHASE.FETCHING) // fetch() yields.
throw new Error("phase in fetchModifiedDocuments: " + self._phase); Meteor.defer(function () {
while (!self._stopped && !self._needToFetch.empty()) {
if (self._phase !== PHASE.FETCHING)
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
self._currentlyFetching = self._needToFetch; self._currentlyFetching = self._needToFetch;
var thisGeneration = ++self._fetchGeneration; var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap; self._needToFetch = new LocalCollection._IdMap;
var waiting = 0; var waiting = 0;
var anyError = null; var anyError = null;
var fut = new Future; var fut = new Future;
// This loop is safe, because _currentlyFetching will not be updated // This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated). // during this loop (in fact, it is never mutated).
self._currentlyFetching.forEach(function (cacheKey, id) { self._currentlyFetching.forEach(function (cacheKey, id) {
waiting++; waiting++;
self._mongoHandle._docFetcher.fetch( self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, cacheKey, self._cursorDescription.collectionName, id, cacheKey,
function (err, doc) { function (err, doc) {
if (err) { if (err) {
if (!anyError) if (!anyError)
anyError = err; anyError = err;
} else if (!self._stopped && self._phase === PHASE.FETCHING } else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) { && self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit // We re-check the generation in case we've had an explicit
// _pollQuery call which should effectively cancel this round of // _pollQuery call which should effectively cancel this round of
// fetches. (_pollQuery increments the generation.) // fetches. (_pollQuery increments the generation.)
self._handleDoc(id, doc); self._handleDoc(id, doc);
} }
waiting--; waiting--;
// Because fetch() never calls its callback synchronously, this is // Because fetch() never calls its callback synchronously, this is
// safe (ie, we won't call fut.return() before the forEach is done). // safe (ie, we won't call fut.return() before the forEach is
if (waiting === 0) // done).
fut.return(); if (waiting === 0)
}); fut.return();
}); });
fut.wait(); });
// XXX do this even if we've switched to PHASE.QUERYING? fut.wait();
if (anyError) // XXX do this even if we've switched to PHASE.QUERYING?
throw anyError; if (anyError)
// Exit now if we've had a _pollQuery call. throw anyError;
if (self._phase === PHASE.QUERYING) // Exit now if we've had a _pollQuery call.
return; if (self._phase === PHASE.QUERYING)
self._currentlyFetching = null; return;
} self._currentlyFetching = null;
self._beSteady(); }
self._beSteady();
});
}, },
_beSteady: function () { _beSteady: function () {
var self = this; var self = this;