diff --git a/packages/mongo/collection_tests.js b/packages/mongo/collection_tests.js index 055aebb8b6..e8cf45143d 100644 --- a/packages/mongo/collection_tests.js +++ b/packages/mongo/collection_tests.js @@ -1,5 +1,5 @@ -var MongoDB = NpmModuleMongodb; +var MongoDB = Meteor.isServer && NpmModuleMongodb; Tinytest.add( 'collection - call Mongo.Collection without new', diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index d458565588..118d16bb1b 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -799,111 +799,118 @@ _.each( ['STRING', 'MONGO'], function(idGeneration) { } ); - Tinytest.addAsync("mongo-livedata - cursor dedup, " + idGeneration, function (test, onComplete) { - var run = test.runId(); - var coll = new Mongo.Collection("cursorDedup-"+run, collectionOptions); + Tinytest.addAsync( + 'mongo-livedata - cursor dedup, ' + idGeneration, + async function(test, onComplete) { + var run = test.runId(); + var coll = new Mongo.Collection( + 'cursorDedup-' + run, + collectionOptions + ); - var observer = function (noAdded) { - var output = []; - var callbacks = { - changed: function (newDoc) { - output.push({changed: newDoc._id}); - } - }; - if (!noAdded) { - callbacks.added = function (doc) { - output.push({added: doc._id}); + const observer = async function(noAdded, name) { + const output = []; + const callbacks = { + changed: function(newDoc) { + output.push({ changed: newDoc._id }); + }, }; - } - var handle = coll.find({foo: 22}).observe(callbacks); - return {output: output, handle: handle}; - }; + if (!noAdded) { + callbacks.added = function(doc) { + output.push({ added: doc._id }); + }; + } + const handle = await coll.find({ foo: 22 }).observe(callbacks); + return { output: output, handle: handle }; + }; - // Insert a doc and start observing. - var docId1 = coll.insert({foo: 22}); - var o1 = observer(); - // Initial add. - test.length(o1.output, 1); - test.equal(o1.output.shift(), {added: docId1}); + // Insert a doc and start observing. + var docId1 = await coll.insertAsync({ foo: 22 }); + var o1 = await observer(false, 'o1'); + // Initial add. + test.length(o1.output, 1); + test.equal(o1.output.shift(), { added: docId1 }); - // Insert another doc (blocking until observes have fired). - var docId2; - runInFence(function () { - docId2 = coll.insert({foo: 22, bar: 5}); - }); - // Observed add. - test.length(o1.output, 1); - test.equal(o1.output.shift(), {added: docId2}); + // Insert another doc (blocking until observes have fired). + var docId2; + await runInFence(async function() { + docId2 = await coll.insertAsync({ foo: 22, bar: 5 }); + }); + // Observed add. + test.length(o1.output, 1); + test.equal(o1.output.shift(), { added: docId2 }); - // Second identical observe. - var o2 = observer(); - // Initial adds. - test.length(o2.output, 2); - test.include([docId1, docId2], o2.output[0].added); - test.include([docId1, docId2], o2.output[1].added); - test.notEqual(o2.output[0].added, o2.output[1].added); - o2.output.length = 0; - // Original observe not affected. - test.length(o1.output, 0); + // Second identical observe. + var o2 = await observer(false, 'o2'); - // White-box test: both observes should share an ObserveMultiplexer. - var observeMultiplexer = o1.handle._multiplexer; - test.isTrue(observeMultiplexer); - test.isTrue(observeMultiplexer === o2.handle._multiplexer); + // Initial adds. + test.length(o2.output, 2); + test.include([docId1, docId2], o2.output[0].added); - // Update. Both observes fire. - runInFence(function () { - coll.update(docId1, {$set: {x: 'y'}}); - }); - test.length(o1.output, 1); - test.length(o2.output, 1); - test.equal(o1.output.shift(), {changed: docId1}); - test.equal(o2.output.shift(), {changed: docId1}); + test.include([docId1, docId2], o2.output[1].added); + test.notEqual(o2.output[0].added, o2.output[1].added); + o2.output.length = 0; + // Original observe not affected. + test.length(o1.output, 0); - // Stop first handle. Second handle still around. - o1.handle.stop(); - test.length(o1.output, 0); - test.length(o2.output, 0); + // White-box test: both observes should share an ObserveMultiplexer. + var observeMultiplexer = o1.handle._multiplexer; + test.isTrue(observeMultiplexer); + test.isTrue(observeMultiplexer === o2.handle._multiplexer); - // Another update. Just the second handle should fire. - runInFence(function () { - coll.update(docId2, {$set: {z: 'y'}}); - }); - test.length(o1.output, 0); - test.length(o2.output, 1); - test.equal(o2.output.shift(), {changed: docId2}); + // Update. Both observes fire. + await runInFence(async function() { + await coll.updateAsync(docId1, { $set: { x: 'y' } }); + }); + test.length(o1.output, 1); + test.length(o2.output, 1); + test.equal(o1.output.shift(), { changed: docId1 }); + test.equal(o2.output.shift(), { changed: docId1 }); - // Stop second handle. Nothing should happen, but the multiplexer should - // be stopped. - test.isTrue(observeMultiplexer._handles); // This will change. - o2.handle.stop(); - test.length(o1.output, 0); - test.length(o2.output, 0); - // White-box: ObserveMultiplexer has nulled its _handles so you can't - // accidentally join to it. - test.isNull(observeMultiplexer._handles); + // Stop first handle. Second handle still around. + o1.handle.stop(); + test.length(o1.output, 0); + test.length(o2.output, 0); - // Start yet another handle on the same query. - var o3 = observer(); - // Initial adds. - test.length(o3.output, 2); - test.include([docId1, docId2], o3.output[0].added); - test.include([docId1, docId2], o3.output[1].added); - test.notEqual(o3.output[0].added, o3.output[1].added); - // Old observers not called. - test.length(o1.output, 0); - test.length(o2.output, 0); - // White-box: Different ObserveMultiplexer. - test.isTrue(observeMultiplexer !== o3.handle._multiplexer); + // Another update. Just the second handle should fire. + await runInFence(async function() { + await coll.updateAsync(docId2, { $set: { z: 'y' } }); + }); + test.length(o1.output, 0); + test.length(o2.output, 1); + test.equal(o2.output.shift(), { changed: docId2 }); - // Start another handle with no added callback. Regression test for #589. - var o4 = observer(true); + // Stop second handle. Nothing should happen, but the multiplexer should + // be stopped. + test.isTrue(observeMultiplexer._handles); // This will change. + await o2.handle.stop(); + test.length(o1.output, 0); + test.length(o2.output, 0); + // White-box: ObserveMultiplexer has nulled its _handles so you can't + // accidentally join to it. + test.isNull(observeMultiplexer._handles); + // Start yet another handle on the same query. + var o3 = await observer(); + // Initial adds. + test.length(o3.output, 2); + test.include([docId1, docId2], o3.output[0].added); + test.include([docId1, docId2], o3.output[1].added); + test.notEqual(o3.output[0].added, o3.output[1].added); + // Old observers not called. + test.length(o1.output, 0); + test.length(o2.output, 0); + // White-box: Different ObserveMultiplexer. + test.isTrue(observeMultiplexer !== o3.handle._multiplexer); - o3.handle.stop(); - o4.handle.stop(); + // Start another handle with no added callback. Regression test for #589. + var o4 = await observer(true); - onComplete(); - }); + o3.handle.stop(); + o4.handle.stop(); + + onComplete(); + } + ); Tinytest.addAsync("mongo-livedata - async server-side insert, " + idGeneration, function (test, onComplete) { // Tests that insert returns before the callback runs. Relies on the fact diff --git a/packages/mongo/oplog_observe_driver.js b/packages/mongo/oplog_observe_driver.js index 13fbb4df55..482fae7133 100644 --- a/packages/mongo/oplog_observe_driver.js +++ b/packages/mongo/oplog_observe_driver.js @@ -83,7 +83,6 @@ OplogObserveDriver = function (options) { Package['facts-base'] && Package['facts-base'].Facts.incrementServerFact( "mongo-livedata", "observe-drivers-oplog", 1); - self._registerPhaseChange(PHASE.QUERYING); self._matcher = options.matcher; // we are now using projection, not fields in the cursor description even if you pass {fields} @@ -824,8 +823,10 @@ _.extend(OplogObserveDriver.prototype, { if (self._stopped) return; - if (self._phase !== PHASE.QUERYING) - throw Error("Phase unexpectedly " + self._phase); + // TODO[fibers] not sure about this change. For now it doesn't seems to affect other parts + self._registerPhaseChange(PHASE.QUERYING); + // if (self._phase !== PHASE.QUERYING) + // throw Error("Phase unexpectedly " + self._phase); if (self._requeryWhenDoneThisQuery) { self._requeryWhenDoneThisQuery = false;