diff --git a/packages/mongo/oplog_observe_driver.js b/packages/mongo/oplog_observe_driver.js index cb67ac7fc5..e9b7bcebeb 100644 --- a/packages/mongo/oplog_observe_driver.js +++ b/packages/mongo/oplog_observe_driver.js @@ -525,37 +525,30 @@ Object.assign(OplogObserveDriver.prototype, { self._currentlyFetching = self._needToFetch; var thisGeneration = ++self._fetchGeneration; self._needToFetch = new LocalCollection._IdMap; - var waiting = 0; - let promiseResolver = null; - const awaitablePromise = new Promise(r => promiseResolver = r); - // This loop is safe, because _currentlyFetching will not be updated - // during this loop (in fact, it is never mutated). - await self._currentlyFetching.forEachAsync(async function (op, id) { - waiting++; - await self._mongoHandle._docFetcher.fetch( - self._cursorDescription.collectionName, - id, - op, - finishIfNeedToPollQuery(function(err, doc) { - if (err) { - Meteor._debug('Got exception while fetching documents', err); - // If we get an error from the fetcher (eg, trouble - // connecting to Mongo), let's just abandon the fetch phase - // altogether and fall back to polling. It's not like we're - // getting live updates anyway. - if (self._phase !== PHASE.QUERYING) { - self._needToPollQuery(); + // Create an array of promises for all the fetch operations + const fetchPromises = []; + + self._currentlyFetching.forEach(function (op, id) { + const fetchPromise = new Promise((resolve, reject) => { + self._mongoHandle._docFetcher.fetch( + self._cursorDescription.collectionName, + id, + op, + finishIfNeedToPollQuery(function(err, doc) { + if (err) { + Meteor._debug('Got exception while fetching documents', err); + // If we get an error from the fetcher (eg, trouble + // connecting to Mongo), let's just abandon the fetch phase + // altogether and fall back to polling. It's not like we're + // getting live updates anyway. + if (self._phase !== PHASE.QUERYING) { + self._needToPollQuery(); + } + reject(err); + return; } - waiting--; - // Because fetch() never calls its callback synchronously, - // this is safe (ie, we won't call fut.return() before the - // forEach is done). - if (waiting === 0) promiseResolver(); - return; - } - try { if ( !self._stopped && self._phase === PHASE.FETCHING && @@ -565,20 +558,26 @@ Object.assign(OplogObserveDriver.prototype, { // _pollQuery call (eg, in another fiber) which should // effectively cancel this round of fetches. (_pollQuery // increments the generation.) - - self._handleDoc(id, doc); + try { + self._handleDoc(id, doc); + resolve(); + } catch (err) { + reject(err); + } + } else { + resolve(); } - } finally { - waiting--; - // Because fetch() never calls its callback synchronously, - // this is safe (ie, we won't call fut.return() before the - // forEach is done). - if (waiting === 0) promiseResolver(); - } - }) - ); + }) + ) + }) + fetchPromises.push(fetchPromise); }); - await awaitablePromise; + // Wait for all fetch operations to complete + try { + await Promise.all(fetchPromises); + } catch (err) { + Meteor._debug('Got an exception in a fetch query', err); + } // Exit now if we've had a _pollQuery call (here or in another fiber). if (self._phase === PHASE.QUERYING) return;