13599 - Removed the waiting variable in favor of using promises to keep track of fetch requests. This was done to mitigate a race condition where a single fetch could increment waiting while the callback could be called twice, potentially decrementing waiting to -1 leaving the driver stuck in PHASE.FETCHING

This commit is contained in:
Gavin Buerk
2025-02-03 09:50:19 -05:00
parent 46dd3d4cc5
commit 451f42901d

View File

@@ -525,37 +525,30 @@ Object.assign(OplogObserveDriver.prototype, {
self._currentlyFetching = self._needToFetch;
var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap;
var waiting = 0;
let promiseResolver = null;
const awaitablePromise = new Promise(r => promiseResolver = r);
// This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated).
await self._currentlyFetching.forEachAsync(async function (op, id) {
waiting++;
await self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName,
id,
op,
finishIfNeedToPollQuery(function(err, doc) {
if (err) {
Meteor._debug('Got exception while fetching documents', err);
// If we get an error from the fetcher (eg, trouble
// connecting to Mongo), let's just abandon the fetch phase
// altogether and fall back to polling. It's not like we're
// getting live updates anyway.
if (self._phase !== PHASE.QUERYING) {
self._needToPollQuery();
// Create an array of promises for all the fetch operations
const fetchPromises = [];
self._currentlyFetching.forEach(function (op, id) {
const fetchPromise = new Promise((resolve, reject) => {
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName,
id,
op,
finishIfNeedToPollQuery(function(err, doc) {
if (err) {
Meteor._debug('Got exception while fetching documents', err);
// If we get an error from the fetcher (eg, trouble
// connecting to Mongo), let's just abandon the fetch phase
// altogether and fall back to polling. It's not like we're
// getting live updates anyway.
if (self._phase !== PHASE.QUERYING) {
self._needToPollQuery();
}
reject(err);
return;
}
waiting--;
// Because fetch() never calls its callback synchronously,
// this is safe (ie, we won't call fut.return() before the
// forEach is done).
if (waiting === 0) promiseResolver();
return;
}
try {
if (
!self._stopped &&
self._phase === PHASE.FETCHING &&
@@ -565,20 +558,26 @@ Object.assign(OplogObserveDriver.prototype, {
// _pollQuery call (eg, in another fiber) which should
// effectively cancel this round of fetches. (_pollQuery
// increments the generation.)
self._handleDoc(id, doc);
try {
self._handleDoc(id, doc);
resolve();
} catch (err) {
reject(err);
}
} else {
resolve();
}
} finally {
waiting--;
// Because fetch() never calls its callback synchronously,
// this is safe (ie, we won't call fut.return() before the
// forEach is done).
if (waiting === 0) promiseResolver();
}
})
);
})
)
})
fetchPromises.push(fetchPromise);
});
await awaitablePromise;
// Wait for all fetch operations to complete
try {
await Promise.all(fetchPromises);
} catch (err) {
Meteor._debug('Got an exception in a fetch query', err);
}
// Exit now if we've had a _pollQuery call (here or in another fiber).
if (self._phase === PHASE.QUERYING)
return;