mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Don't yield in oplog entry handler
This commit is contained in:
@@ -51,19 +51,21 @@ OplogObserveDriver = function (options) {
|
||||
forEachTrigger(self._cursorDescription, function (trigger) {
|
||||
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
|
||||
trigger, function (notification) {
|
||||
var op = notification.op;
|
||||
if (notification.dropCollection) {
|
||||
// Note: this call is not allowed to block on anything (especially on
|
||||
// waiting for oplog entries to catch up) because that will block
|
||||
// onOplogEntry!
|
||||
self._needToPollQuery();
|
||||
} else {
|
||||
// All other operators should be handled depending on phase
|
||||
if (self._phase === PHASE.QUERYING)
|
||||
self._handleOplogEntryQuerying(op);
|
||||
else
|
||||
self._handleOplogEntrySteadyOrFetching(op);
|
||||
}
|
||||
Meteor._noYieldsAllowed(function () {
|
||||
var op = notification.op;
|
||||
if (notification.dropCollection) {
|
||||
// Note: this call is not allowed to block on anything (especially
|
||||
// on waiting for oplog entries to catch up) because that will block
|
||||
// onOplogEntry!
|
||||
self._needToPollQuery();
|
||||
} else {
|
||||
// All other operators should be handled depending on phase
|
||||
if (self._phase === PHASE.QUERYING)
|
||||
self._handleOplogEntryQuerying(op);
|
||||
else
|
||||
self._handleOplogEntrySteadyOrFetching(op);
|
||||
}
|
||||
});
|
||||
}
|
||||
));
|
||||
});
|
||||
@@ -156,50 +158,55 @@ _.extend(OplogObserveDriver.prototype, {
|
||||
_fetchModifiedDocuments: function () {
|
||||
var self = this;
|
||||
self._registerPhaseChange(PHASE.FETCHING);
|
||||
while (!self._stopped && !self._needToFetch.empty()) {
|
||||
if (self._phase !== PHASE.FETCHING)
|
||||
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
|
||||
// Defer, because nothing called from the oplog entry handler may yield, but
|
||||
// fetch() yields.
|
||||
Meteor.defer(function () {
|
||||
while (!self._stopped && !self._needToFetch.empty()) {
|
||||
if (self._phase !== PHASE.FETCHING)
|
||||
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
|
||||
|
||||
self._currentlyFetching = self._needToFetch;
|
||||
var thisGeneration = ++self._fetchGeneration;
|
||||
self._needToFetch = new LocalCollection._IdMap;
|
||||
var waiting = 0;
|
||||
var anyError = null;
|
||||
var fut = new Future;
|
||||
// This loop is safe, because _currentlyFetching will not be updated
|
||||
// during this loop (in fact, it is never mutated).
|
||||
self._currentlyFetching.forEach(function (cacheKey, id) {
|
||||
waiting++;
|
||||
self._mongoHandle._docFetcher.fetch(
|
||||
self._cursorDescription.collectionName, id, cacheKey,
|
||||
function (err, doc) {
|
||||
if (err) {
|
||||
if (!anyError)
|
||||
anyError = err;
|
||||
} else if (!self._stopped && self._phase === PHASE.FETCHING
|
||||
&& self._fetchGeneration === thisGeneration) {
|
||||
// We re-check the generation in case we've had an explicit
|
||||
// _pollQuery call which should effectively cancel this round of
|
||||
// fetches. (_pollQuery increments the generation.)
|
||||
self._handleDoc(id, doc);
|
||||
}
|
||||
waiting--;
|
||||
// Because fetch() never calls its callback synchronously, this is
|
||||
// safe (ie, we won't call fut.return() before the forEach is done).
|
||||
if (waiting === 0)
|
||||
fut.return();
|
||||
});
|
||||
});
|
||||
fut.wait();
|
||||
// XXX do this even if we've switched to PHASE.QUERYING?
|
||||
if (anyError)
|
||||
throw anyError;
|
||||
// Exit now if we've had a _pollQuery call.
|
||||
if (self._phase === PHASE.QUERYING)
|
||||
return;
|
||||
self._currentlyFetching = null;
|
||||
}
|
||||
self._beSteady();
|
||||
self._currentlyFetching = self._needToFetch;
|
||||
var thisGeneration = ++self._fetchGeneration;
|
||||
self._needToFetch = new LocalCollection._IdMap;
|
||||
var waiting = 0;
|
||||
var anyError = null;
|
||||
var fut = new Future;
|
||||
// This loop is safe, because _currentlyFetching will not be updated
|
||||
// during this loop (in fact, it is never mutated).
|
||||
self._currentlyFetching.forEach(function (cacheKey, id) {
|
||||
waiting++;
|
||||
self._mongoHandle._docFetcher.fetch(
|
||||
self._cursorDescription.collectionName, id, cacheKey,
|
||||
function (err, doc) {
|
||||
if (err) {
|
||||
if (!anyError)
|
||||
anyError = err;
|
||||
} else if (!self._stopped && self._phase === PHASE.FETCHING
|
||||
&& self._fetchGeneration === thisGeneration) {
|
||||
// We re-check the generation in case we've had an explicit
|
||||
// _pollQuery call which should effectively cancel this round of
|
||||
// fetches. (_pollQuery increments the generation.)
|
||||
self._handleDoc(id, doc);
|
||||
}
|
||||
waiting--;
|
||||
// Because fetch() never calls its callback synchronously, this is
|
||||
// safe (ie, we won't call fut.return() before the forEach is
|
||||
// done).
|
||||
if (waiting === 0)
|
||||
fut.return();
|
||||
});
|
||||
});
|
||||
fut.wait();
|
||||
// XXX do this even if we've switched to PHASE.QUERYING?
|
||||
if (anyError)
|
||||
throw anyError;
|
||||
// Exit now if we've had a _pollQuery call.
|
||||
if (self._phase === PHASE.QUERYING)
|
||||
return;
|
||||
self._currentlyFetching = null;
|
||||
}
|
||||
self._beSteady();
|
||||
});
|
||||
},
|
||||
_beSteady: function () {
|
||||
var self = this;
|
||||
|
||||
Reference in New Issue
Block a user