diff --git a/packages/mongo/doc_fetcher.js b/packages/mongo/doc_fetcher.js index c7e20f8a41..e4cf514f48 100644 --- a/packages/mongo/doc_fetcher.js +++ b/packages/mongo/doc_fetcher.js @@ -62,6 +62,10 @@ export class DocFetcher { } finally { // XXX consider keeping the doc around for a period of time before // removing from the cache + const evEmmiter = self._callbacksForOp.get(op); + if (evEmmiter && evEmmiter.removeAllListeners) { + evEmmiter.removeAllListeners(); + } self._callbacksForOp.delete(op); } } diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index c6879be816..fb1afa7cb3 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -226,7 +226,7 @@ _.each( ['STRING'], function(idGeneration) { ); - Tinytest.addAsync("mongo-livedata - basics, " + idGeneration, async function (test) { + Tinytest.addAsync("mongo-livedata - basics, " + idGeneration, async function (test, onComplete) { var run = test.runId(); var coll, coll2; if (Meteor.isClient) { @@ -270,8 +270,8 @@ _.each( ['STRING'], function(idGeneration) { var expectObserve = async function (expected, f) { if (!(expected instanceof Array)) expected = [expected]; - - test.include(expected, await captureObserve(f)); + const currentValue = await captureObserve(f); + test.include(expected, currentValue); }; test.equal(await coll.find({run: run}).count(), 0); @@ -350,8 +350,8 @@ _.each( ['STRING'], function(idGeneration) { await expectObserve('c(3,0,1)c(6,1,4)', async function () { var count = await coll.update({run: run}, {$inc: {x: 2}}, {multi: true}); test.equal(count, 2); - test.equal(_.pluck(await coll.find({run: run}, {sort: {x: -1}}).fetch(), "x"), - [6, 3]); + const res = _.pluck(await coll.find({run: run}, {sort: {x: -1}}).fetch(), 'x'); + test.equal(res, [6, 3]); }); await expectObserve(['c(13,0,3)m(13,0,1)', 'm(6,1,0)c(13,1,3)', @@ -863,7 +863,7 @@ _.each( ['STRING'], function(idGeneration) { // TODO -> Also uses oplog // This test mainly checks the correctness of oplog code dealing with limited // queries. Compitablity with poll-diff is added as well. - Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, async function (test) { + Tinytest.addAsync("mongo-livedata - observe sorted, limited " + idGeneration, async function (test, onComplete) { var run = test.runId(); var coll = new Mongo.Collection("observeLimit-"+run, collectionOptions); @@ -1025,7 +1025,7 @@ _.each( ['STRING'], function(idGeneration) { // Remove first 4 docs (3, 1, 2, 4) forcing buffer to become empty and // schedule a repoll. - await rem({ bar: { $lt: 10 } }); + await rem({ bar: { $lt: 10 } }, '{ bar: { $lt: 10 } }'); // State: [ 17:8 18:7 19:6 | ]! // XXX the oplog code analyzes the events one by one: one remove after @@ -1130,6 +1130,7 @@ _.each( ['STRING'], function(idGeneration) { testSafeAppendToBufferFlag(false); await o.handle.stop(); + onComplete(); }); // TODO -> Also uses oplog Tinytest.addAsync("mongo-livedata - observe sorted, limited, sort fields " + idGeneration, async function (test) { @@ -1487,9 +1488,7 @@ _.each( ['STRING'], function(idGeneration) { testAsyncMulti('mongo-livedata - transform sets _id if not present, ' + idGeneration, [ async function (test, expect) { - var self = this; var justId = function (doc) { - console.log({doc}); return _.omit(doc, '_id'); }; TRANSFORMS["justId"] = justId; @@ -1589,14 +1588,11 @@ _.each( ['STRING'], function(idGeneration) { var self = this; await self.coll.update( self.docId, new Dog("rover", "orange")).then(id => { - console.log({id}); expect(); }).catch(err => { - console.log({err}); test.isTrue(err); expect(); }); - //console.log({expect}); } ]); diff --git a/packages/mongo/observe_multiplex.js b/packages/mongo/observe_multiplex.js index d86b2040f1..6c4319a18f 100644 --- a/packages/mongo/observe_multiplex.js +++ b/packages/mongo/observe_multiplex.js @@ -127,11 +127,11 @@ ObserveMultiplexer = class { // all handles. "ready" must have already been called on this multiplexer. async onFlush(cb) { var self = this; - return await this._queue.queueTask(async function () { + //return await this._queue.queueTask(async function () { if (!self._ready()) throw Error("only call onFlush on a multiplexer that will be ready"); await cb(); - }); + //}); } callbackNames() { if (this._ordered) @@ -142,9 +142,9 @@ ObserveMultiplexer = class { _ready() { return !!this._isReady; } - _applyCallback(callbackName, args) { + async _applyCallback(callbackName, args) { const self = this; - this._queue.queueTask(async function () { + //this._queue.queueTask(async function () { // If we stopped in the meantime, do nothing. if (!self._handles) return; @@ -174,7 +174,7 @@ ObserveMultiplexer = class { }); await Promise.all(toAwait); - }); + //}); } // Sends initial adds to a handle. It should only be called from within a task diff --git a/packages/mongo/oplog_observe_driver.js b/packages/mongo/oplog_observe_driver.js index 7c7e504725..e22c885dff 100644 --- a/packages/mongo/oplog_observe_driver.js +++ b/packages/mongo/oplog_observe_driver.js @@ -505,15 +505,31 @@ _.extend(OplogObserveDriver.prototype, { const awaitablePromise = new Promise(r => promiseResolver = r); // This loop is safe, because _currentlyFetching will not be updated // during this loop (in fact, it is never mutated). - self._currentlyFetching.forEach(function (op, id) { + await self._currentlyFetching.forEachAsync(async function (op, id) { waiting++; - self._mongoHandle._docFetcher.fetch( - self._cursorDescription.collectionName, id, op, - finishIfNeedToPollQuery(function (err, doc) { - try { - if (err) { + await self._mongoHandle._docFetcher.fetch(self._cursorDescription.collectionName, id, op) + .then(finishIfNeedToPollQuery(function (doc) { + try { + if (!self._stopped && self._phase === PHASE.FETCHING + && self._fetchGeneration === thisGeneration) { + // We re-check the generation in case we've had an explicit + // _pollQuery call (eg, in another fiber) which should + // effectively cancel this round of fetches. (_pollQuery + // increments the generation.) + + self._handleDoc(id, doc); + } + } finally { + waiting--; + // Because fetch() never calls its callback synchronously, + // this is safe (ie, we won't call fut.return() before the + // forEach is done). + if (waiting === 0) + promiseResolver(); + } + })).catch(err => { Meteor._debug("Got exception while fetching documents", - err); + err); // If we get an error from the fetcher (eg, trouble // connecting to Mongo), let's just abandon the fetch phase // altogether and fall back to polling. It's not like we're @@ -521,23 +537,13 @@ _.extend(OplogObserveDriver.prototype, { if (self._phase !== PHASE.QUERYING) { self._needToPollQuery(); } - } else if (!self._stopped && self._phase === PHASE.FETCHING - && self._fetchGeneration === thisGeneration) { - // We re-check the generation in case we've had an explicit - // _pollQuery call (eg, in another fiber) which should - // effectively cancel this round of fetches. (_pollQuery - // increments the generation.) - self._handleDoc(id, doc); - } - } finally { - waiting--; - // Because fetch() never calls its callback synchronously, - // this is safe (ie, we won't call fut.return() before the - // forEach is done). - if (waiting === 0) - promiseResolver(); - } - })); + waiting--; + // Because fetch() never calls its callback synchronously, + // this is safe (ie, we won't call fut.return() before the + // forEach is done). + if (waiting === 0) + promiseResolver(); + }); }); await awaitablePromise; // Exit now if we've had a _pollQuery call (here or in another fiber).