mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Don't yield in oplog entry handler
This commit is contained in:
@@ -51,19 +51,21 @@ OplogObserveDriver = function (options) {
|
|||||||
forEachTrigger(self._cursorDescription, function (trigger) {
|
forEachTrigger(self._cursorDescription, function (trigger) {
|
||||||
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
|
self._stopHandles.push(self._mongoHandle._oplogHandle.onOplogEntry(
|
||||||
trigger, function (notification) {
|
trigger, function (notification) {
|
||||||
var op = notification.op;
|
Meteor._noYieldsAllowed(function () {
|
||||||
if (notification.dropCollection) {
|
var op = notification.op;
|
||||||
// Note: this call is not allowed to block on anything (especially on
|
if (notification.dropCollection) {
|
||||||
// waiting for oplog entries to catch up) because that will block
|
// Note: this call is not allowed to block on anything (especially
|
||||||
// onOplogEntry!
|
// on waiting for oplog entries to catch up) because that will block
|
||||||
self._needToPollQuery();
|
// onOplogEntry!
|
||||||
} else {
|
self._needToPollQuery();
|
||||||
// All other operators should be handled depending on phase
|
} else {
|
||||||
if (self._phase === PHASE.QUERYING)
|
// All other operators should be handled depending on phase
|
||||||
self._handleOplogEntryQuerying(op);
|
if (self._phase === PHASE.QUERYING)
|
||||||
else
|
self._handleOplogEntryQuerying(op);
|
||||||
self._handleOplogEntrySteadyOrFetching(op);
|
else
|
||||||
}
|
self._handleOplogEntrySteadyOrFetching(op);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
));
|
));
|
||||||
});
|
});
|
||||||
@@ -156,50 +158,55 @@ _.extend(OplogObserveDriver.prototype, {
|
|||||||
_fetchModifiedDocuments: function () {
|
_fetchModifiedDocuments: function () {
|
||||||
var self = this;
|
var self = this;
|
||||||
self._registerPhaseChange(PHASE.FETCHING);
|
self._registerPhaseChange(PHASE.FETCHING);
|
||||||
while (!self._stopped && !self._needToFetch.empty()) {
|
// Defer, because nothing called from the oplog entry handler may yield, but
|
||||||
if (self._phase !== PHASE.FETCHING)
|
// fetch() yields.
|
||||||
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
|
Meteor.defer(function () {
|
||||||
|
while (!self._stopped && !self._needToFetch.empty()) {
|
||||||
|
if (self._phase !== PHASE.FETCHING)
|
||||||
|
throw new Error("phase in fetchModifiedDocuments: " + self._phase);
|
||||||
|
|
||||||
self._currentlyFetching = self._needToFetch;
|
self._currentlyFetching = self._needToFetch;
|
||||||
var thisGeneration = ++self._fetchGeneration;
|
var thisGeneration = ++self._fetchGeneration;
|
||||||
self._needToFetch = new LocalCollection._IdMap;
|
self._needToFetch = new LocalCollection._IdMap;
|
||||||
var waiting = 0;
|
var waiting = 0;
|
||||||
var anyError = null;
|
var anyError = null;
|
||||||
var fut = new Future;
|
var fut = new Future;
|
||||||
// This loop is safe, because _currentlyFetching will not be updated
|
// This loop is safe, because _currentlyFetching will not be updated
|
||||||
// during this loop (in fact, it is never mutated).
|
// during this loop (in fact, it is never mutated).
|
||||||
self._currentlyFetching.forEach(function (cacheKey, id) {
|
self._currentlyFetching.forEach(function (cacheKey, id) {
|
||||||
waiting++;
|
waiting++;
|
||||||
self._mongoHandle._docFetcher.fetch(
|
self._mongoHandle._docFetcher.fetch(
|
||||||
self._cursorDescription.collectionName, id, cacheKey,
|
self._cursorDescription.collectionName, id, cacheKey,
|
||||||
function (err, doc) {
|
function (err, doc) {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (!anyError)
|
if (!anyError)
|
||||||
anyError = err;
|
anyError = err;
|
||||||
} else if (!self._stopped && self._phase === PHASE.FETCHING
|
} else if (!self._stopped && self._phase === PHASE.FETCHING
|
||||||
&& self._fetchGeneration === thisGeneration) {
|
&& self._fetchGeneration === thisGeneration) {
|
||||||
// We re-check the generation in case we've had an explicit
|
// We re-check the generation in case we've had an explicit
|
||||||
// _pollQuery call which should effectively cancel this round of
|
// _pollQuery call which should effectively cancel this round of
|
||||||
// fetches. (_pollQuery increments the generation.)
|
// fetches. (_pollQuery increments the generation.)
|
||||||
self._handleDoc(id, doc);
|
self._handleDoc(id, doc);
|
||||||
}
|
}
|
||||||
waiting--;
|
waiting--;
|
||||||
// Because fetch() never calls its callback synchronously, this is
|
// Because fetch() never calls its callback synchronously, this is
|
||||||
// safe (ie, we won't call fut.return() before the forEach is done).
|
// safe (ie, we won't call fut.return() before the forEach is
|
||||||
if (waiting === 0)
|
// done).
|
||||||
fut.return();
|
if (waiting === 0)
|
||||||
});
|
fut.return();
|
||||||
});
|
});
|
||||||
fut.wait();
|
});
|
||||||
// XXX do this even if we've switched to PHASE.QUERYING?
|
fut.wait();
|
||||||
if (anyError)
|
// XXX do this even if we've switched to PHASE.QUERYING?
|
||||||
throw anyError;
|
if (anyError)
|
||||||
// Exit now if we've had a _pollQuery call.
|
throw anyError;
|
||||||
if (self._phase === PHASE.QUERYING)
|
// Exit now if we've had a _pollQuery call.
|
||||||
return;
|
if (self._phase === PHASE.QUERYING)
|
||||||
self._currentlyFetching = null;
|
return;
|
||||||
}
|
self._currentlyFetching = null;
|
||||||
self._beSteady();
|
}
|
||||||
|
self._beSteady();
|
||||||
|
});
|
||||||
},
|
},
|
||||||
_beSteady: function () {
|
_beSteady: function () {
|
||||||
var self = this;
|
var self = this;
|
||||||
|
|||||||
Reference in New Issue
Block a user