From e6c87236a6286664e43b75235be419ef0229ebcc Mon Sep 17 00:00:00 2001 From: David Glasser Date: Mon, 16 Dec 2013 15:14:43 -0800 Subject: [PATCH] Fix "drop collection" test failure There was a race condition in the manipulation of the write fence by the drop collection code. Specifically, when seeing a "drop collection" oplog entry, OplogObserveDriver took no immediate action and instead deferred some "should I re-poll now or later?" logic. This allowed the write fence's oplogHandle.waitUntilCaughtUp code to think that everything was "caught up" even though the effects of the "drop collection" entry had not yet affected the cursor's state (by transitioning it away from the STEADY state). The fix is to ensure that the state-change aspects of processing the entry occur immediately; the actual re-query is still deferred in order to not block the oplog tailer from continuing. Reported by: @awwx (The previous failure can made consistently reproducible by replacing the `Meteor.defer` that is removed by this commit with a `Meteor.setTimeout(,200)`.) --- .../mongo-livedata/oplog_observe_driver.js | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index dbd44c310b..e4bbaccb73 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -55,11 +55,10 @@ OplogObserveDriver = function (options) { trigger, function (notification) { var op = notification.op; if (notification.dropCollection) { - // Defer because it may block on "wait for oplog to catch up", which - // isn't kosher for an oplog entry handler (will cause deadlock). - Meteor.defer(function () { - self._needToPollQuery(); - }); + // Note: this call is not allowed to block on anything (especially on + // waiting for oplog entries to catch up) because that will block + // onOplogEntry! + self._needToPollQuery(); } else { // All other operators should be handled depending on phase if (self._phase === PHASE.QUERYING) @@ -295,6 +294,9 @@ _.extend(OplogObserveDriver.prototype, { // In various circumstances, we may just want to stop processing the oplog and // re-run the initial query, just as if we were a PollingObserveDriver. // + // This function may not block, because it is called from an oplog entry + // handler. + // // XXX We should call this when we detect that we've been in FETCHING for "too // long". // @@ -315,18 +317,27 @@ _.extend(OplogObserveDriver.prototype, { ++self._fetchGeneration; // ignore any in-flight fetches self._phase = PHASE.QUERYING; - // subtle note: _published does not contain _id fields, but newResults does - var newResults = new LocalCollection._IdMap; - var cursor = self._cursorForQuery(); - cursor.forEach(function (doc) { - newResults.set(doc._id, doc); + // Defer so that we don't block. + Meteor.defer(function () { + // subtle note: _published does not contain _id fields, but newResults + // does + var newResults = new LocalCollection._IdMap; + var cursor = self._cursorForQuery(); + cursor.forEach(function (doc) { + newResults.set(doc._id, doc); + }); + + self._publishNewResults(newResults); + + self._doneQuerying(); }); - - self._publishNewResults(newResults); - - self._doneQuerying(); }, + // Transitions to QUERYING and runs another query, or (if already in QUERYING) + // ensures that we will query again later. + // + // This function may not block, because it is called from an oplog entry + // handler. _needToPollQuery: function () { var self = this; if (self._stopped)