Fix oplog_observe_driver

This commit is contained in:
harryadel
2024-08-24 13:40:28 +03:00
parent fa10502b30
commit 277cae2e1a

View File

@@ -192,25 +192,20 @@ _.extend(OplogObserveDriver.prototype, {
}
}
});
});
}
));
// When Mongo fails over, we need to repoll the query, in case we processed an
// oplog entry that got rolled back.
self._stopHandles.push(self._mongoHandle._onFailover(finishIfNeedToPollQuery(
function () {
self._needToPollQuery();
})));
// Give _observeChanges a chance to add the new ObserveHandle to our
// multiplexer, so that the added calls get streamed.
Meteor.defer(finishIfNeedToPollQuery(function () {
self._runInitialQuery();
}));
};
_.extend(OplogObserveDriver.prototype, {
}
));
// When Mongo fails over, we need to repoll the query, in case we processed an
// oplog entry that got rolled back.
self._addStopHandles(self._mongoHandle._onFailover(finishIfNeedToPollQuery(
function () {
return self._needToPollQuery();
})));
// Give _observeChanges a chance to add the new ObserveHandle to our
// multiplexer, so that the added calls get streamed.
return self._runInitialQuery();
},
_addPublished: function (id, doc) {
var self = this;
Meteor._noYieldsAllowed(function () {
@@ -584,15 +579,17 @@ _.extend(OplogObserveDriver.prototype, {
},
_beSteady: async function () {
var self = this;
Meteor._noYieldsAllowed(function () {
self._registerPhaseChange(PHASE.STEADY);
var writes = self._writesToCommitWhenWeReachSteady;
self._writesToCommitWhenWeReachSteady = [];
self._multiplexer.onFlush(function () {
_.each(writes, function (w) {
w.committed();
});
});
self._registerPhaseChange(PHASE.STEADY);
var writes = self._writesToCommitWhenWeReachSteady || [];
self._writesToCommitWhenWeReachSteady = [];
await self._multiplexer.onFlush(async function () {
try {
for (const w of writes) {
await w.committed();
}
} catch (e) {
console.error("_beSteady error", {writes}, e);
}
});
},
_handleOplogEntryQuerying: function (op) {
@@ -956,18 +953,15 @@ _.extend(OplogObserveDriver.prototype, {
if (self._stopped)
return;
self._stopped = true;
_.each(self._stopHandles, function (handle) {
handle.stop();
});
// Note: we *don't* use multiplexer.onFlush here because this stop
// callback is actually invoked by the multiplexer itself when it has
// determined that there are no handles left. So nothing is actually going
// to get flushed (and it's probably not valid to call methods on the
// dying multiplexer).
_.each(self._writesToCommitWhenWeReachSteady, function (w) {
w.committed(); // maybe yields?
});
for (const w of self._writesToCommitWhenWeReachSteady) {
await w.committed();
}
self._writesToCommitWhenWeReachSteady = null;
// Proactively drop references to potentially big things.