diff --git a/packages/mongo/oplog_observe_driver.js b/packages/mongo/oplog_observe_driver.js index 5b0afa2ab1..3e13047ba3 100644 --- a/packages/mongo/oplog_observe_driver.js +++ b/packages/mongo/oplog_observe_driver.js @@ -192,25 +192,20 @@ _.extend(OplogObserveDriver.prototype, { } } }); - }); - } - )); - - // When Mongo fails over, we need to repoll the query, in case we processed an - // oplog entry that got rolled back. - self._stopHandles.push(self._mongoHandle._onFailover(finishIfNeedToPollQuery( - function () { - self._needToPollQuery(); - }))); - - // Give _observeChanges a chance to add the new ObserveHandle to our - // multiplexer, so that the added calls get streamed. - Meteor.defer(finishIfNeedToPollQuery(function () { - self._runInitialQuery(); - })); -}; - -_.extend(OplogObserveDriver.prototype, { + } + )); + + // When Mongo fails over, we need to repoll the query, in case we processed an + // oplog entry that got rolled back. + self._addStopHandles(self._mongoHandle._onFailover(finishIfNeedToPollQuery( + function () { + return self._needToPollQuery(); + }))); + + // Give _observeChanges a chance to add the new ObserveHandle to our + // multiplexer, so that the added calls get streamed. + return self._runInitialQuery(); + }, _addPublished: function (id, doc) { var self = this; Meteor._noYieldsAllowed(function () { @@ -584,15 +579,17 @@ _.extend(OplogObserveDriver.prototype, { }, _beSteady: async function () { var self = this; - Meteor._noYieldsAllowed(function () { - self._registerPhaseChange(PHASE.STEADY); - var writes = self._writesToCommitWhenWeReachSteady; - self._writesToCommitWhenWeReachSteady = []; - self._multiplexer.onFlush(function () { - _.each(writes, function (w) { - w.committed(); - }); - }); + self._registerPhaseChange(PHASE.STEADY); + var writes = self._writesToCommitWhenWeReachSteady || []; + self._writesToCommitWhenWeReachSteady = []; + await self._multiplexer.onFlush(async function () { + try { + for (const w of writes) { + await w.committed(); + } + } catch (e) { + console.error("_beSteady error", {writes}, e); + } }); }, _handleOplogEntryQuerying: function (op) { @@ -956,18 +953,15 @@ _.extend(OplogObserveDriver.prototype, { if (self._stopped) return; self._stopped = true; - _.each(self._stopHandles, function (handle) { - handle.stop(); - }); // Note: we *don't* use multiplexer.onFlush here because this stop // callback is actually invoked by the multiplexer itself when it has // determined that there are no handles left. So nothing is actually going // to get flushed (and it's probably not valid to call methods on the // dying multiplexer). - _.each(self._writesToCommitWhenWeReachSteady, function (w) { - w.committed(); // maybe yields? - }); + for (const w of self._writesToCommitWhenWeReachSteady) { + await w.committed(); + } self._writesToCommitWhenWeReachSteady = null; // Proactively drop references to potentially big things.