- fix test: 'mongo-livedata - cursor dedup, MONGO | STRING'

This commit is contained in:
denihs
2023-02-13 18:34:00 -04:00
parent edc899df72
commit 2fc41f5eab
3 changed files with 103 additions and 95 deletions

View File

@@ -1,5 +1,5 @@
var MongoDB = NpmModuleMongodb;
var MongoDB = Meteor.isServer && NpmModuleMongodb;
Tinytest.add(
'collection - call Mongo.Collection without new',

View File

@@ -799,111 +799,118 @@ _.each( ['STRING', 'MONGO'], function(idGeneration) {
}
);
Tinytest.addAsync("mongo-livedata - cursor dedup, " + idGeneration, function (test, onComplete) {
var run = test.runId();
var coll = new Mongo.Collection("cursorDedup-"+run, collectionOptions);
Tinytest.addAsync(
'mongo-livedata - cursor dedup, ' + idGeneration,
async function(test, onComplete) {
var run = test.runId();
var coll = new Mongo.Collection(
'cursorDedup-' + run,
collectionOptions
);
var observer = function (noAdded) {
var output = [];
var callbacks = {
changed: function (newDoc) {
output.push({changed: newDoc._id});
}
};
if (!noAdded) {
callbacks.added = function (doc) {
output.push({added: doc._id});
const observer = async function(noAdded, name) {
const output = [];
const callbacks = {
changed: function(newDoc) {
output.push({ changed: newDoc._id });
},
};
}
var handle = coll.find({foo: 22}).observe(callbacks);
return {output: output, handle: handle};
};
if (!noAdded) {
callbacks.added = function(doc) {
output.push({ added: doc._id });
};
}
const handle = await coll.find({ foo: 22 }).observe(callbacks);
return { output: output, handle: handle };
};
// Insert a doc and start observing.
var docId1 = coll.insert({foo: 22});
var o1 = observer();
// Initial add.
test.length(o1.output, 1);
test.equal(o1.output.shift(), {added: docId1});
// Insert a doc and start observing.
var docId1 = await coll.insertAsync({ foo: 22 });
var o1 = await observer(false, 'o1');
// Initial add.
test.length(o1.output, 1);
test.equal(o1.output.shift(), { added: docId1 });
// Insert another doc (blocking until observes have fired).
var docId2;
runInFence(function () {
docId2 = coll.insert({foo: 22, bar: 5});
});
// Observed add.
test.length(o1.output, 1);
test.equal(o1.output.shift(), {added: docId2});
// Insert another doc (blocking until observes have fired).
var docId2;
await runInFence(async function() {
docId2 = await coll.insertAsync({ foo: 22, bar: 5 });
});
// Observed add.
test.length(o1.output, 1);
test.equal(o1.output.shift(), { added: docId2 });
// Second identical observe.
var o2 = observer();
// Initial adds.
test.length(o2.output, 2);
test.include([docId1, docId2], o2.output[0].added);
test.include([docId1, docId2], o2.output[1].added);
test.notEqual(o2.output[0].added, o2.output[1].added);
o2.output.length = 0;
// Original observe not affected.
test.length(o1.output, 0);
// Second identical observe.
var o2 = await observer(false, 'o2');
// White-box test: both observes should share an ObserveMultiplexer.
var observeMultiplexer = o1.handle._multiplexer;
test.isTrue(observeMultiplexer);
test.isTrue(observeMultiplexer === o2.handle._multiplexer);
// Initial adds.
test.length(o2.output, 2);
test.include([docId1, docId2], o2.output[0].added);
// Update. Both observes fire.
runInFence(function () {
coll.update(docId1, {$set: {x: 'y'}});
});
test.length(o1.output, 1);
test.length(o2.output, 1);
test.equal(o1.output.shift(), {changed: docId1});
test.equal(o2.output.shift(), {changed: docId1});
test.include([docId1, docId2], o2.output[1].added);
test.notEqual(o2.output[0].added, o2.output[1].added);
o2.output.length = 0;
// Original observe not affected.
test.length(o1.output, 0);
// Stop first handle. Second handle still around.
o1.handle.stop();
test.length(o1.output, 0);
test.length(o2.output, 0);
// White-box test: both observes should share an ObserveMultiplexer.
var observeMultiplexer = o1.handle._multiplexer;
test.isTrue(observeMultiplexer);
test.isTrue(observeMultiplexer === o2.handle._multiplexer);
// Another update. Just the second handle should fire.
runInFence(function () {
coll.update(docId2, {$set: {z: 'y'}});
});
test.length(o1.output, 0);
test.length(o2.output, 1);
test.equal(o2.output.shift(), {changed: docId2});
// Update. Both observes fire.
await runInFence(async function() {
await coll.updateAsync(docId1, { $set: { x: 'y' } });
});
test.length(o1.output, 1);
test.length(o2.output, 1);
test.equal(o1.output.shift(), { changed: docId1 });
test.equal(o2.output.shift(), { changed: docId1 });
// Stop second handle. Nothing should happen, but the multiplexer should
// be stopped.
test.isTrue(observeMultiplexer._handles); // This will change.
o2.handle.stop();
test.length(o1.output, 0);
test.length(o2.output, 0);
// White-box: ObserveMultiplexer has nulled its _handles so you can't
// accidentally join to it.
test.isNull(observeMultiplexer._handles);
// Stop first handle. Second handle still around.
o1.handle.stop();
test.length(o1.output, 0);
test.length(o2.output, 0);
// Start yet another handle on the same query.
var o3 = observer();
// Initial adds.
test.length(o3.output, 2);
test.include([docId1, docId2], o3.output[0].added);
test.include([docId1, docId2], o3.output[1].added);
test.notEqual(o3.output[0].added, o3.output[1].added);
// Old observers not called.
test.length(o1.output, 0);
test.length(o2.output, 0);
// White-box: Different ObserveMultiplexer.
test.isTrue(observeMultiplexer !== o3.handle._multiplexer);
// Another update. Just the second handle should fire.
await runInFence(async function() {
await coll.updateAsync(docId2, { $set: { z: 'y' } });
});
test.length(o1.output, 0);
test.length(o2.output, 1);
test.equal(o2.output.shift(), { changed: docId2 });
// Start another handle with no added callback. Regression test for #589.
var o4 = observer(true);
// Stop second handle. Nothing should happen, but the multiplexer should
// be stopped.
test.isTrue(observeMultiplexer._handles); // This will change.
await o2.handle.stop();
test.length(o1.output, 0);
test.length(o2.output, 0);
// White-box: ObserveMultiplexer has nulled its _handles so you can't
// accidentally join to it.
test.isNull(observeMultiplexer._handles);
// Start yet another handle on the same query.
var o3 = await observer();
// Initial adds.
test.length(o3.output, 2);
test.include([docId1, docId2], o3.output[0].added);
test.include([docId1, docId2], o3.output[1].added);
test.notEqual(o3.output[0].added, o3.output[1].added);
// Old observers not called.
test.length(o1.output, 0);
test.length(o2.output, 0);
// White-box: Different ObserveMultiplexer.
test.isTrue(observeMultiplexer !== o3.handle._multiplexer);
o3.handle.stop();
o4.handle.stop();
// Start another handle with no added callback. Regression test for #589.
var o4 = await observer(true);
onComplete();
});
o3.handle.stop();
o4.handle.stop();
onComplete();
}
);
Tinytest.addAsync("mongo-livedata - async server-side insert, " + idGeneration, function (test, onComplete) {
// Tests that insert returns before the callback runs. Relies on the fact

View File

@@ -83,7 +83,6 @@ OplogObserveDriver = function (options) {
Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact(
"mongo-livedata", "observe-drivers-oplog", 1);
self._registerPhaseChange(PHASE.QUERYING);
self._matcher = options.matcher;
// we are now using projection, not fields in the cursor description even if you pass {fields}
@@ -824,8 +823,10 @@ _.extend(OplogObserveDriver.prototype, {
if (self._stopped)
return;
if (self._phase !== PHASE.QUERYING)
throw Error("Phase unexpectedly " + self._phase);
// TODO[fibers] not sure about this change. For now it doesn't seems to affect other parts
self._registerPhaseChange(PHASE.QUERYING);
// if (self._phase !== PHASE.QUERYING)
// throw Error("Phase unexpectedly " + self._phase);
if (self._requeryWhenDoneThisQuery) {
self._requeryWhenDoneThisQuery = false;