From 78b280ef88aef02d96691b5ecf514ecbf6123957 Mon Sep 17 00:00:00 2001 From: David Glasser Date: Fri, 16 May 2014 12:40:01 -0700 Subject: [PATCH] Fix PollingObserveDriver error handling The getRawObjects call can throw (eg, if you can't connect to the mongo server for too long). A few pieces of state were being corrupted in that case: - self._results was being set too early, leading to 'first' not being set on future _pollMongo calls, and_multiplexer.ready() never being called. This had two effects: - The observe (and thus any subscription) would never become ready(). Due to deduping, *no observe on this query* would ever become ready either. This also implies that the observeChanges that are part of _publishCursor would never return, so the sub.onStop would never get called, so the observeHandle would never stop, leading not only to leaks, but for an inability for that query to ever stop being deduped with the corrupted PollingObserveDriver! - The onFlush calls would throw a "not ready" error instead of calling the callback, so (a) errors would be logged and (b) write fences would never be closed Fixed this by not writing to self._results at the top of the function. - writesForCycle was being lost, so those write fences would never close. Fixed this by pushing writesForCycle back onto _pendingWrites if getRawObjects throws. --- .../mongo-livedata/polling_observe_driver.js | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/packages/mongo-livedata/polling_observe_driver.js b/packages/mongo-livedata/polling_observe_driver.js index c1c700d0c1..9bfca7ec44 100644 --- a/packages/mongo-livedata/polling_observe_driver.js +++ b/packages/mongo-livedata/polling_observe_driver.js @@ -126,10 +126,11 @@ _.extend(PollingObserveDriver.prototype, { --self._pollsScheduledButNotStarted; var first = false; - if (!self._results) { + var oldResults = self._results; + if (!oldResults) { first = true; // XXX maybe use OrderedDict instead? - self._results = self._ordered ? [] : new LocalCollection._IdMap; + oldResults = self._ordered ? [] : new LocalCollection._IdMap; } self._testOnlyPollCallback && self._testOnlyPollCallback(); @@ -138,25 +139,39 @@ _.extend(PollingObserveDriver.prototype, { var writesForCycle = self._pendingWrites; self._pendingWrites = []; - // Get the new query results. (These calls can yield.) - if (!first) - self._synchronousCursor.rewind(); - var newResults = self._synchronousCursor.getRawObjects(self._ordered); - var oldResults = self._results; + // Always rewind the cursor; it's a no-op the first time, but better safe + // than sorry (eg, if the first call to getRawObjects throws, the cursor + // needs rewinding even though 'first' is true). + self._synchronousCursor.rewind(); - // Run diffs. (This can yield too.) + // Get the new query results. (This yields.) + try { + var newResults = self._synchronousCursor.getRawObjects(self._ordered); + } catch (e) { + // getRawObjects can throw if we're having trouble talking to the + // database. That's fine --- we will repoll later anyway. But we should + // make sure not to lose track of this cycle's writes. + Array.prototype.push.apply(self._pendingWrites, writesForCycle); + throw e; + } + + // Run diffs. if (!self._stopped) { LocalCollection._diffQueryChanges( self._ordered, oldResults, newResults, self._multiplexer); } - // Replace self._results atomically. - self._results = newResults; - - // Signals the multiplexer to call all initial adds. + // Signals the multiplexer to allow all observeChanges calls that share this + // multiplexer to return. (This happens asynchronously, via the + // multiplexer's queue.) if (first) self._multiplexer.ready(); + // Replace self._results atomically. (This assignment is what makes `first` + // stay through on the next cycle, so we've waited until after we've + // committed to ready-ing the multiplexer.) + self._results = newResults; + // Once the ObserveMultiplexer has processed everything we've done in this // round, mark all the writes which existed before this call as // commmitted. (If new writes have shown up in the meantime, there'll