diff --git a/packages/minimongo/local_collection.js b/packages/minimongo/local_collection.js index 740ac469e6..2c7f5a923a 100644 --- a/packages/minimongo/local_collection.js +++ b/packages/minimongo/local_collection.js @@ -345,11 +345,6 @@ export default class LocalCollection { // XXX atomicity: if multi is true, and one modification fails, do // we rollback the whole operation, or what? async update(selector, mod, options) { - // if (! callback && options instanceof Function) { - // callback = options; - // options = null; - // } - if (!options) { options = {}; } @@ -486,17 +481,11 @@ export default class LocalCollection { // A convenience wrapper on update. LocalCollection.upsert(sel, mod) is // equivalent to LocalCollection.update(sel, mod, {upsert: true, // _returnObject: true}). - upsert(selector, mod, options, callback) { - if (!callback && typeof options === 'function') { - callback = options; - options = {}; - } - + async upsert(selector, mod, options) { return this.update( selector, mod, - Object.assign({}, options, {upsert: true, _returnObject: true}), - callback + Object.assign({}, options, {upsert: true, _returnObject: true}) ); } @@ -739,9 +728,9 @@ LocalCollection._CachingChangeObserver = class _CachingChangeObserver { DiffSequence.applyChanges(doc, fields); }; - this.applyChange.removed = id => { + this.applyChange.removed = async id => { if (callbacks.removed) { - callbacks.removed.call(this, id); + await callbacks.removed.call(this, id); } this.docs.remove(id); @@ -1472,7 +1461,7 @@ LocalCollection._observeFromObserveChangesNoFibers = async (cursor, observeCallb }, removed(id) { if (observeCallbacks.removed) { - observeCallbacks.removed(transform(this.docs.get(id))); + return observeCallbacks.removed(transform(this.docs.get(id))); } }, }; diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index 57f59cba9d..ce742fe60f 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -684,31 +684,17 @@ Object.assign(Mongo.Collection.prototype, { * @memberof Mongo.Collection * @instance * @param {MongoSelector} selector Specifies which documents to remove - * @param {Function} [callback] Optional. If present, called with an error object as its argument. */ - remove(selector, callback) { + async remove(selector) { selector = Mongo.Collection._rewriteSelector(selector); - const wrappedCallback = wrapCallback(callback); - if (this._isRemoteCollection()) { return this._callMutatorMethodAsync('remove', [selector]); } // it's my collection. descend into the collection1 object // and propagate any exception. - try { - // If the user provided a callback and the collection implements this - // operation asynchronously, then queryRet will be undefined, and the - // result will be returned through the callback instead. - return this._collection.remove(selector, wrappedCallback); - } catch (e) { - if (callback) { - callback(e); - return null; - } - throw e; - } + return this._collection.remove(selector); }, // Determine if this collection is simply a minimongo representation of a real @@ -728,14 +714,8 @@ Object.assign(Mongo.Collection.prototype, { * @param {MongoModifier} modifier Specifies how to modify the documents * @param {Object} [options] * @param {Boolean} options.multi True to modify all matching documents; false to only modify one of the matching documents (the default). - * @param {Function} [callback] Optional. If present, called with an error object as the first argument and, if no error, the number of affected documents as the second. */ - upsert(selector, modifier, options, callback) { - if (!callback && typeof options === 'function') { - callback = options; - options = {}; - } - + async upsert(selector, modifier, options) { return this.update( selector, modifier, @@ -743,9 +723,7 @@ Object.assign(Mongo.Collection.prototype, { ...options, _returnObject: true, upsert: true, - }, - callback - ); + }); }, // We'll actually design an index API later. For now, we just pass through to diff --git a/packages/mongo/doc_fetcher_tests.js b/packages/mongo/doc_fetcher_tests.js index 86c1164a69..39f6fff944 100644 --- a/packages/mongo/doc_fetcher_tests.js +++ b/packages/mongo/doc_fetcher_tests.js @@ -14,15 +14,14 @@ testAsyncMulti("mongo-livedata - doc fetcher", [ // Test basic operation. const fakeOp1 = {}; const fakeOp2 = {}; - fetcher.fetch(collName, id1, fakeOp1, expect(null, {_id: id1, x: 1})); - fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null)); + await fetcher.fetch(collName, id1, fakeOp1).then(() => expect(null, {_id: id1, x: 1})); + await fetcher.fetch(collName, "nonexistent!", fakeOp2, expect(null, null)); var fetched = false; var fakeOp3 = {}; var expected = {_id: id2, y: 2}; - fetcher.fetch(collName, id2, fakeOp3, expect(function (e, d) { + await fetcher.fetch(collName, id2, fakeOp3).then(d => expect(function () { fetched = true; - test.isFalse(e); test.equal(d, expected); })); // The fetcher yields. @@ -31,8 +30,7 @@ testAsyncMulti("mongo-livedata - doc fetcher", [ // Now ask for another document with the same op reference. Because a // fetch for that op is in flight, we will get the other fetch's // document, not this random document. - fetcher.fetch(collName, Random.id(), fakeOp3, expect(function (e, d) { - test.isFalse(e); + await fetcher.fetch(collName, Random.id(), fakeOp3).then(d => expect(function () { test.equal(d, expected); })); } diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 2ba0c7208e..8da5ebb163 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -527,23 +527,15 @@ MongoConnection.prototype._update = async function (collection_name, selector, m // - The id is defined by query or mod we can just add it to the replacement doc // - The user did not specify any id preference and the id is a Mongo ObjectId, // then we can just let Mongo generate the id - - simulateUpsertWithInsertedId( - collection, mongoSelector, mongoMod, options, - // This callback does not need to be bindEnvironment'ed because - // simulateUpsertWithInsertedId() wraps it and then passes it through - // bindEnvironmentForWrite. - function (error, result) { - // If we got here via a upsert() call, then options._returnObject will - // be set and we should return the whole object. Otherwise, we should - // just return the number of affected docs to match the mongo API. - if (result && ! options._returnObject) { - callback(error, result.numberAffected); - } else { - callback(error, result); - } - } - ); + return await simulateUpsertWithInsertedId(collection, mongoSelector, mongoMod, options) + .then(result => { + refresh(); + if (result && ! options._returnObject) { + return result.numberAffected; + } else { + return result; + } + }); } else { if (options.upsert && !knownId && options.insertedId && isModify) { if (!mongoMod.hasOwnProperty('$setOnInsert')) { diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 7a360cb06c..4f37df1930 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -83,26 +83,13 @@ var compareResults = function (test, skipIds, actual, expected) { test.equal(actual, expected); }; -var upsert = async function (coll, useUpdate, query, mod, options, callback) { - if (! callback && typeof options === "function") { - callback = options; - options = {}; - } - +var upsert = async function (coll, useUpdate, query, mod, options) { if (!useUpdate) { - return await coll.upsert(query, mod, options, callback); + return await coll.upsert(query, mod, options); } - if (callback) { - await coll.update(query, mod, - _.extend({ upsert: true }, options)) - .then(result => { - callback(null, {numberAffected: result}); - }); - } - - return await Promise.resolve(coll.update(query, mod, - _.extend({ upsert: true }, options))).then(r => ({numberAffected: r})); + return coll.update(query, mod, _.extend({ upsert: true }, options)) + .then(r => ({numberAffected: r})); }; var upsertTestMethod = "livedata_upsert_test_method"; @@ -1465,13 +1452,12 @@ _.each( ['STRING'], function(idGeneration) { var self = this; self.coll = new Mongo.Collection(self.collectionName, self.collectionOptions); var obs; - var expectAdd = function (doc) { + var expectAdd = expect(function (doc) { test.equal(doc.seconds(), 50); - }; - var expectRemove = async function (doc) { + }); + var expectRemove = expect(function (doc) { test.equal(doc.seconds(), 50); - await obs.stop(); - }; + }); const id = await runAndThrowIfNeeded(() => self.coll.insert({d: new Date(1356152390004)}), test, false); test.isTrue(id); var cursor = self.coll.find(); @@ -1479,15 +1465,15 @@ _.each( ['STRING'], function(idGeneration) { added: expectAdd, removed: expectRemove }); - // test.equal(await cursor.count(), 1); - // test.equal((await cursor.fetch())[0].seconds(), 50); - // test.equal((await self.coll.findOne()).seconds(), 50); - // test.equal((await self.coll.findOne({}, {transform: null})).seconds, undefined); - // test.equal((await self.coll.findOne({}, { - // transform: function (doc) {return {seconds: doc.d.getSeconds()};} - // })).seconds, 50); + test.equal(await cursor.count(), 1); + test.equal((await cursor.fetch())[0].seconds(), 50); + test.equal((await self.coll.findOne()).seconds(), 50); + test.equal((await self.coll.findOne({}, {transform: null})).seconds, undefined); + test.equal((await self.coll.findOne({}, { + transform: function (doc) {return {seconds: doc.d.getSeconds()};} + })).seconds, 50); await self.coll.remove(id); - expect(); + obs.stop(); }, async function (test) { var self = this; @@ -1518,7 +1504,7 @@ _.each( ['STRING'], function(idGeneration) { Meteor.subscribe('c-' + this.collectionName, expect()); } }, - async function (test) { + async function (test, expect) { var self = this; self.coll = new Mongo.Collection(this.collectionName, collectionOptions); const id = await runAndThrowIfNeeded(() => self.coll.insert({}), test); @@ -1545,7 +1531,7 @@ _.each( ['STRING'], function(idGeneration) { await Meteor.callAsync('createInsecureCollection', this.collectionName, collectionOptions); Meteor.subscribe('c-' + this.collectionName, expect()); } - }, async function (test) { + }, async function (test, expect) { const coll = new Mongo.Collection(this.collectionName, collectionOptions); const id = await runAndThrowIfNeeded(() => coll.insert({b: bin}), test); test.isTrue(id); @@ -1566,7 +1552,7 @@ _.each( ['STRING'], function(idGeneration) { } }, - async function (test) { + async function (test, expect) { var self = this; self.coll = new Mongo.Collection(this.collectionName, collectionOptions); var docId; @@ -2263,7 +2249,7 @@ testAsyncMulti('mongo-livedata - specified _id', [ await Meteor.callAsync('createInsecureCollection', this.collectionName); Meteor.subscribe('c-' + this.collectionName, expect()); } - }, async function (test) { + }, async function (test, expect) { var coll = new Mongo.Collection(this.collectionName); const id1 = await runAndThrowIfNeeded(() => coll.insert({ _id: "foo", name: "foo" }), test); test.equal(id1, "foo"); @@ -2276,6 +2262,7 @@ testAsyncMulti('mongo-livedata - specified _id', [ console.log({id1, id2}, await coll.find({}).fetch()); const doc2 = await coll.findOne(); test.equal(doc2.name, "foo"); + expect(); } ]); @@ -2477,6 +2464,7 @@ testAsyncMulti('mongo-livedata - empty string _id', [ var self = this; var docs = await self.coll.find().fetch(); test.equal(docs, [{_id: "realid", f: "bar"}]); + expect(); }, async function (test, expect) { var self = this; @@ -2484,6 +2472,7 @@ testAsyncMulti('mongo-livedata - empty string _id', [ await self.coll._collection.insert({_id: "", f: "baz"}); test.equal((await self.coll.find().fetch()).length, 2); } + expect(); } ]); @@ -3149,6 +3138,7 @@ testAsyncMulti("mongo-livedata - undefined find options", [ skip: undefined }); test.equal(result, self.doc); + expect(); } ]); @@ -3197,6 +3187,7 @@ Meteor.isServer && testAsyncMulti("mongo-livedata - observe limit bug", [ return self.coll.remove({toDelete: true}); }); test.equal(_.keys(state), [self.id1]); + expect(); } ]); @@ -3218,6 +3209,7 @@ Meteor.isServer && testAsyncMulti("mongo-livedata - update with replace forbidde return c.update(id, { foo3: "bar3", $set: { blah: 1 } }); }, "cannot have both modifier and non-modifier fields"); test.equal(await c.findOne(id), { _id: id, foo2: "bar2" }); + expect(); } ]); diff --git a/packages/mongo/observe_multiplex.js b/packages/mongo/observe_multiplex.js index 123070a4db..350e36aad6 100644 --- a/packages/mongo/observe_multiplex.js +++ b/packages/mongo/observe_multiplex.js @@ -127,9 +127,9 @@ ObserveMultiplexer = class { // Calls "cb" once the effects of all "ready", "addHandleAndSendInitialAdds" // and observe callbacks which came before this call have been propagated to // all handles. "ready" must have already been called on this multiplexer. - onFlush(cb) { + async onFlush(cb) { var self = this; - return this._queue.queueTask(async function () { + return await this._queue.queueTask(async function () { if (!self._ready()) throw Error("only call onFlush on a multiplexer that will be ready"); await cb(); diff --git a/packages/mongo/oplog_observe_driver.js b/packages/mongo/oplog_observe_driver.js index b9d18fd0ee..7c7e504725 100644 --- a/packages/mongo/oplog_observe_driver.js +++ b/packages/mongo/oplog_observe_driver.js @@ -480,78 +480,76 @@ _.extend(OplogObserveDriver.prototype, { }, _fetchModifiedDocuments: function () { var self = this; - Meteor._noYieldsAllowed(function () { - self._registerPhaseChange(PHASE.FETCHING); - // Defer, because nothing called from the oplog entry handler may yield, - // but fetch() yields. - Meteor.defer(finishIfNeedToPollQuery(async function () { - while (!self._stopped && !self._needToFetch.empty()) { - if (self._phase === PHASE.QUERYING) { - // While fetching, we decided to go into QUERYING mode, and then we - // saw another oplog entry, so _needToFetch is not empty. But we - // shouldn't fetch these documents until AFTER the query is done. - break; - } - - // Being in steady phase here would be surprising. - if (self._phase !== PHASE.FETCHING) - throw new Error("phase in fetchModifiedDocuments: " + self._phase); - - self._currentlyFetching = self._needToFetch; - var thisGeneration = ++self._fetchGeneration; - self._needToFetch = new LocalCollection._IdMap; - var waiting = 0; - - let promiseResolver = null; - const awaitablePromise = new Promise(r => promiseResolver = r); - // This loop is safe, because _currentlyFetching will not be updated - // during this loop (in fact, it is never mutated). - self._currentlyFetching.forEach(function (op, id) { - waiting++; - self._mongoHandle._docFetcher.fetch( - self._cursorDescription.collectionName, id, op, - finishIfNeedToPollQuery(function (err, doc) { - try { - if (err) { - Meteor._debug("Got exception while fetching documents", - err); - // If we get an error from the fetcher (eg, trouble - // connecting to Mongo), let's just abandon the fetch phase - // altogether and fall back to polling. It's not like we're - // getting live updates anyway. - if (self._phase !== PHASE.QUERYING) { - self._needToPollQuery(); - } - } else if (!self._stopped && self._phase === PHASE.FETCHING - && self._fetchGeneration === thisGeneration) { - // We re-check the generation in case we've had an explicit - // _pollQuery call (eg, in another fiber) which should - // effectively cancel this round of fetches. (_pollQuery - // increments the generation.) - self._handleDoc(id, doc); - } - } finally { - waiting--; - // Because fetch() never calls its callback synchronously, - // this is safe (ie, we won't call fut.return() before the - // forEach is done). - if (waiting === 0) - promiseResolver(); - } - })); - }); - await awaitablePromise; - // Exit now if we've had a _pollQuery call (here or in another fiber). - if (self._phase === PHASE.QUERYING) - return; - self._currentlyFetching = null; + self._registerPhaseChange(PHASE.FETCHING); + // Defer, because nothing called from the oplog entry handler may yield, + // but fetch() yields. + Meteor.defer(finishIfNeedToPollQuery(async function () { + while (!self._stopped && !self._needToFetch.empty()) { + if (self._phase === PHASE.QUERYING) { + // While fetching, we decided to go into QUERYING mode, and then we + // saw another oplog entry, so _needToFetch is not empty. But we + // shouldn't fetch these documents until AFTER the query is done. + break; } - // We're done fetching, so we can be steady, unless we've had a - // _pollQuery call (here or in another fiber). - if (self._phase !== PHASE.QUERYING) - await self._beSteady(); - })); - }); + + // Being in steady phase here would be surprising. + if (self._phase !== PHASE.FETCHING) + throw new Error("phase in fetchModifiedDocuments: " + self._phase); + + self._currentlyFetching = self._needToFetch; + var thisGeneration = ++self._fetchGeneration; + self._needToFetch = new LocalCollection._IdMap; + var waiting = 0; + + let promiseResolver = null; + const awaitablePromise = new Promise(r => promiseResolver = r); + // This loop is safe, because _currentlyFetching will not be updated + // during this loop (in fact, it is never mutated). + self._currentlyFetching.forEach(function (op, id) { + waiting++; + self._mongoHandle._docFetcher.fetch( + self._cursorDescription.collectionName, id, op, + finishIfNeedToPollQuery(function (err, doc) { + try { + if (err) { + Meteor._debug("Got exception while fetching documents", + err); + // If we get an error from the fetcher (eg, trouble + // connecting to Mongo), let's just abandon the fetch phase + // altogether and fall back to polling. It's not like we're + // getting live updates anyway. + if (self._phase !== PHASE.QUERYING) { + self._needToPollQuery(); + } + } else if (!self._stopped && self._phase === PHASE.FETCHING + && self._fetchGeneration === thisGeneration) { + // We re-check the generation in case we've had an explicit + // _pollQuery call (eg, in another fiber) which should + // effectively cancel this round of fetches. (_pollQuery + // increments the generation.) + self._handleDoc(id, doc); + } + } finally { + waiting--; + // Because fetch() never calls its callback synchronously, + // this is safe (ie, we won't call fut.return() before the + // forEach is done). + if (waiting === 0) + promiseResolver(); + } + })); + }); + await awaitablePromise; + // Exit now if we've had a _pollQuery call (here or in another fiber). + if (self._phase === PHASE.QUERYING) + return; + self._currentlyFetching = null; + } + // We're done fetching, so we can be steady, unless we've had a + // _pollQuery call (here or in another fiber). + if (self._phase !== PHASE.QUERYING) + await self._beSteady(); + })); }, _beSteady: async function () { var self = this;