diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index dbd44c310b..e4bbaccb73 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -55,11 +55,10 @@ OplogObserveDriver = function (options) { trigger, function (notification) { var op = notification.op; if (notification.dropCollection) { - // Defer because it may block on "wait for oplog to catch up", which - // isn't kosher for an oplog entry handler (will cause deadlock). - Meteor.defer(function () { - self._needToPollQuery(); - }); + // Note: this call is not allowed to block on anything (especially on + // waiting for oplog entries to catch up) because that will block + // onOplogEntry! + self._needToPollQuery(); } else { // All other operators should be handled depending on phase if (self._phase === PHASE.QUERYING) @@ -295,6 +294,9 @@ _.extend(OplogObserveDriver.prototype, { // In various circumstances, we may just want to stop processing the oplog and // re-run the initial query, just as if we were a PollingObserveDriver. // + // This function may not block, because it is called from an oplog entry + // handler. + // // XXX We should call this when we detect that we've been in FETCHING for "too // long". // @@ -315,18 +317,27 @@ _.extend(OplogObserveDriver.prototype, { ++self._fetchGeneration; // ignore any in-flight fetches self._phase = PHASE.QUERYING; - // subtle note: _published does not contain _id fields, but newResults does - var newResults = new LocalCollection._IdMap; - var cursor = self._cursorForQuery(); - cursor.forEach(function (doc) { - newResults.set(doc._id, doc); + // Defer so that we don't block. + Meteor.defer(function () { + // subtle note: _published does not contain _id fields, but newResults + // does + var newResults = new LocalCollection._IdMap; + var cursor = self._cursorForQuery(); + cursor.forEach(function (doc) { + newResults.set(doc._id, doc); + }); + + self._publishNewResults(newResults); + + self._doneQuerying(); }); - - self._publishNewResults(newResults); - - self._doneQuerying(); }, + // Transitions to QUERYING and runs another query, or (if already in QUERYING) + // ensures that we will query again later. + // + // This function may not block, because it is called from an oplog entry + // handler. _needToPollQuery: function () { var self = this; if (self._stopped)