Fix tests for Oplog

This commit is contained in:
Edimar Cardoso
2023-01-06 10:49:44 -03:00
parent 1634bf4519
commit 8a5631cb12
4 changed files with 47 additions and 41 deletions

View File

@@ -62,6 +62,10 @@ export class DocFetcher {
} finally {
// XXX consider keeping the doc around for a period of time before
// removing from the cache
const evEmmiter = self._callbacksForOp.get(op);
if (evEmmiter && evEmmiter.removeAllListeners) {
evEmmiter.removeAllListeners();
}
self._callbacksForOp.delete(op);
}
}

View File

@@ -226,7 +226,7 @@ _.each( ['STRING'], function(idGeneration) {
);
Tinytest.addAsync("mongo-livedata - basics, " + idGeneration, async function (test) {
Tinytest.addAsync("mongo-livedata - basics, " + idGeneration, async function (test, onComplete) {
var run = test.runId();
var coll, coll2;
if (Meteor.isClient) {
@@ -270,8 +270,8 @@ _.each( ['STRING'], function(idGeneration) {
var expectObserve = async function (expected, f) {
if (!(expected instanceof Array))
expected = [expected];
test.include(expected, await captureObserve(f));
const currentValue = await captureObserve(f);
test.include(expected, currentValue);
};
test.equal(await coll.find({run: run}).count(), 0);
@@ -350,8 +350,8 @@ _.each( ['STRING'], function(idGeneration) {
await expectObserve('c(3,0,1)c(6,1,4)', async function () {
var count = await coll.update({run: run}, {$inc: {x: 2}}, {multi: true});
test.equal(count, 2);
test.equal(_.pluck(await coll.find({run: run}, {sort: {x: -1}}).fetch(), "x"),
[6, 3]);
const res = _.pluck(await coll.find({run: run}, {sort: {x: -1}}).fetch(), 'x');
test.equal(res, [6, 3]);
});
await expectObserve(['c(13,0,3)m(13,0,1)', 'm(6,1,0)c(13,1,3)',
@@ -863,7 +863,7 @@ _.each( ['STRING'], function(idGeneration) {
// TODO -> Also uses oplog
// This test mainly checks the correctness of oplog code dealing with limited
// queries. Compitablity with poll-diff is added as well.
Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, async function (test) {
Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, async function (test, onComplete) {
var run = test.runId();
var coll = new Mongo.Collection("observeLimit-"+run, collectionOptions);
@@ -1025,7 +1025,7 @@ _.each( ['STRING'], function(idGeneration) {
// Remove first 4 docs (3, 1, 2, 4) forcing buffer to become empty and
// schedule a repoll.
await rem({ bar: { $lt: 10 } });
await rem({ bar: { $lt: 10 } }, '{ bar: { $lt: 10 } }');
// State: [ 17:8 18:7 19:6 | ]!
// XXX the oplog code analyzes the events one by one: one remove after
@@ -1130,6 +1130,7 @@ _.each( ['STRING'], function(idGeneration) {
testSafeAppendToBufferFlag(false);
await o.handle.stop();
onComplete();
});
// TODO -> Also uses oplog
Tinytest.addAsync("mongo-livedata - observe sorted, limited, sort fields " + idGeneration, async function (test) {
@@ -1487,9 +1488,7 @@ _.each( ['STRING'], function(idGeneration) {
testAsyncMulti('mongo-livedata - transform sets _id if not present, ' + idGeneration, [
async function (test, expect) {
var self = this;
var justId = function (doc) {
console.log({doc});
return _.omit(doc, '_id');
};
TRANSFORMS["justId"] = justId;
@@ -1589,14 +1588,11 @@ _.each( ['STRING'], function(idGeneration) {
var self = this;
await self.coll.update(
self.docId, new Dog("rover", "orange")).then(id => {
console.log({id});
expect();
}).catch(err => {
console.log({err});
test.isTrue(err);
expect();
});
//console.log({expect});
}
]);

View File

@@ -127,11 +127,11 @@ ObserveMultiplexer = class {
// all handles. "ready" must have already been called on this multiplexer.
async onFlush(cb) {
var self = this;
return await this._queue.queueTask(async function () {
//return await this._queue.queueTask(async function () {
if (!self._ready())
throw Error("only call onFlush on a multiplexer that will be ready");
await cb();
});
//});
}
callbackNames() {
if (this._ordered)
@@ -142,9 +142,9 @@ ObserveMultiplexer = class {
_ready() {
return !!this._isReady;
}
_applyCallback(callbackName, args) {
async _applyCallback(callbackName, args) {
const self = this;
this._queue.queueTask(async function () {
//this._queue.queueTask(async function () {
// If we stopped in the meantime, do nothing.
if (!self._handles)
return;
@@ -174,7 +174,7 @@ ObserveMultiplexer = class {
});
await Promise.all(toAwait);
});
//});
}
// Sends initial adds to a handle. It should only be called from within a task

View File

@@ -505,15 +505,31 @@ _.extend(OplogObserveDriver.prototype, {
const awaitablePromise = new Promise(r => promiseResolver = r);
// This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated).
self._currentlyFetching.forEach(function (op, id) {
await self._currentlyFetching.forEachAsync(async function (op, id) {
waiting++;
self._mongoHandle._docFetcher.fetch(
self._cursorDescription.collectionName, id, op,
finishIfNeedToPollQuery(function (err, doc) {
try {
if (err) {
await self._mongoHandle._docFetcher.fetch(self._cursorDescription.collectionName, id, op)
.then(finishIfNeedToPollQuery(function (doc) {
try {
if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call (eg, in another fiber) which should
// effectively cancel this round of fetches. (_pollQuery
// increments the generation.)
self._handleDoc(id, doc);
}
} finally {
waiting--;
// Because fetch() never calls its callback synchronously,
// this is safe (ie, we won't call fut.return() before the
// forEach is done).
if (waiting === 0)
promiseResolver();
}
})).catch(err => {
Meteor._debug("Got exception while fetching documents",
err);
err);
// If we get an error from the fetcher (eg, trouble
// connecting to Mongo), let's just abandon the fetch phase
// altogether and fall back to polling. It's not like we're
@@ -521,23 +537,13 @@ _.extend(OplogObserveDriver.prototype, {
if (self._phase !== PHASE.QUERYING) {
self._needToPollQuery();
}
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
// _pollQuery call (eg, in another fiber) which should
// effectively cancel this round of fetches. (_pollQuery
// increments the generation.)
self._handleDoc(id, doc);
}
} finally {
waiting--;
// Because fetch() never calls its callback synchronously,
// this is safe (ie, we won't call fut.return() before the
// forEach is done).
if (waiting === 0)
promiseResolver();
}
}));
waiting--;
// Because fetch() never calls its callback synchronously,
// this is safe (ie, we won't call fut.return() before the
// forEach is done).
if (waiting === 0)
promiseResolver();
});
});
await awaitablePromise;
// Exit now if we've had a _pollQuery call (here or in another fiber).