diff --git a/packages/mongo-livedata/oplog_observe_driver.js b/packages/mongo-livedata/oplog_observe_driver.js index 6f5411a272..9a4e5e9cc6 100644 --- a/packages/mongo-livedata/oplog_observe_driver.js +++ b/packages/mongo-livedata/oplog_observe_driver.js @@ -156,30 +156,29 @@ _.extend(OplogObserveDriver.prototype, { self._currentlyFetching = self._needToFetch; self._needToFetch = new LocalCollection._IdMap; var waiting = 0; - var error = null; + var anyError = null; var fut = new Future; - Fiber(function () { - self._currentlyFetching.forEach(function (cacheKey, id) { - // currentlyFetching will not be updated during this loop. - waiting++; - self._mongoHandle._docFetcher.fetch( - self._cursorDescription.collectionName, id, cacheKey, - function (err, doc) { - if (err) { - if (!error) - error = err; - } else if (!self._stopped) { - self._handleDoc(id, doc); - } - waiting--; - if (waiting == 0) - fut.return(); - }); - }); - }).run(); + // This loop is safe, because _currentlyFetching will not be updated + // during this loop (in fact, it is never mutated). + self._currentlyFetching.forEach(function (cacheKey, id) { + waiting++; + self._mongoHandle._docFetcher.fetch( + self._cursorDescription.collectionName, id, cacheKey, + function (err, doc) { + if (err) { + if (!anyError) + anyError = err; + } else if (!self._stopped) { + self._handleDoc(id, doc); + } + waiting--; + if (waiting == 0) + fut.return(); + }); + }); fut.wait(); - if (error) - throw error; + if (anyError) + throw anyError; self._currentlyFetching = new LocalCollection._IdMap; } self._beSteady();