mirror of
https://github.com/meteor/meteor.git
synced 2026-05-02 03:01:46 -04:00
Fix PollingObserveDriver error handling
The getRawObjects call can throw (eg, if you can't connect to the mongo
server for too long). A few pieces of state were being corrupted in
that case:
- self._results was being set too early, leading to 'first' not being
set on future _pollMongo calls, and_multiplexer.ready() never being
called. This had two effects:
- The observe (and thus any subscription) would never become
ready(). Due to deduping, *no observe on this query* would
ever become ready either. This also implies that the
observeChanges that are part of _publishCursor would never return,
so the sub.onStop would never get called, so the observeHandle
would never stop, leading not only to leaks, but for an inability
for that query to ever stop being deduped with the corrupted
PollingObserveDriver!
- The onFlush calls would throw a "not ready" error instead of
calling the callback, so (a) errors would be logged and (b) write
fences would never be closed
Fixed this by not writing to self._results at the top of the function.
- writesForCycle was being lost, so those write fences would never
close. Fixed this by pushing writesForCycle back onto _pendingWrites
if getRawObjects throws.
This commit is contained in:
@@ -126,10 +126,11 @@ _.extend(PollingObserveDriver.prototype, {
|
||||
--self._pollsScheduledButNotStarted;
|
||||
|
||||
var first = false;
|
||||
if (!self._results) {
|
||||
var oldResults = self._results;
|
||||
if (!oldResults) {
|
||||
first = true;
|
||||
// XXX maybe use OrderedDict instead?
|
||||
self._results = self._ordered ? [] : new LocalCollection._IdMap;
|
||||
oldResults = self._ordered ? [] : new LocalCollection._IdMap;
|
||||
}
|
||||
|
||||
self._testOnlyPollCallback && self._testOnlyPollCallback();
|
||||
@@ -138,25 +139,39 @@ _.extend(PollingObserveDriver.prototype, {
|
||||
var writesForCycle = self._pendingWrites;
|
||||
self._pendingWrites = [];
|
||||
|
||||
// Get the new query results. (These calls can yield.)
|
||||
if (!first)
|
||||
self._synchronousCursor.rewind();
|
||||
var newResults = self._synchronousCursor.getRawObjects(self._ordered);
|
||||
var oldResults = self._results;
|
||||
// Always rewind the cursor; it's a no-op the first time, but better safe
|
||||
// than sorry (eg, if the first call to getRawObjects throws, the cursor
|
||||
// needs rewinding even though 'first' is true).
|
||||
self._synchronousCursor.rewind();
|
||||
|
||||
// Run diffs. (This can yield too.)
|
||||
// Get the new query results. (This yields.)
|
||||
try {
|
||||
var newResults = self._synchronousCursor.getRawObjects(self._ordered);
|
||||
} catch (e) {
|
||||
// getRawObjects can throw if we're having trouble talking to the
|
||||
// database. That's fine --- we will repoll later anyway. But we should
|
||||
// make sure not to lose track of this cycle's writes.
|
||||
Array.prototype.push.apply(self._pendingWrites, writesForCycle);
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Run diffs.
|
||||
if (!self._stopped) {
|
||||
LocalCollection._diffQueryChanges(
|
||||
self._ordered, oldResults, newResults, self._multiplexer);
|
||||
}
|
||||
|
||||
// Replace self._results atomically.
|
||||
self._results = newResults;
|
||||
|
||||
// Signals the multiplexer to call all initial adds.
|
||||
// Signals the multiplexer to allow all observeChanges calls that share this
|
||||
// multiplexer to return. (This happens asynchronously, via the
|
||||
// multiplexer's queue.)
|
||||
if (first)
|
||||
self._multiplexer.ready();
|
||||
|
||||
// Replace self._results atomically. (This assignment is what makes `first`
|
||||
// stay through on the next cycle, so we've waited until after we've
|
||||
// committed to ready-ing the multiplexer.)
|
||||
self._results = newResults;
|
||||
|
||||
// Once the ObserveMultiplexer has processed everything we've done in this
|
||||
// round, mark all the writes which existed before this call as
|
||||
// commmitted. (If new writes have shown up in the meantime, there'll
|
||||
|
||||
Reference in New Issue
Block a user