diff --git a/packages/accounts-base/accounts_server.js b/packages/accounts-base/accounts_server.js index e10a896478..437e305359 100644 --- a/packages/accounts-base/accounts_server.js +++ b/packages/accounts-base/accounts_server.js @@ -999,7 +999,7 @@ export class AccountsServer extends AccountsCommon { const observe = await this.users.find({ _id: userId, 'services.resume.loginTokens.hashedToken': newToken - }, { fields: { _id: 1 } }).observeChanges({ + }, { fields: { _id: 1 } }).observeChangesAsync({ added: () => { foundMatchingUser = true; }, @@ -1860,4 +1860,3 @@ const generateCasePermutationsForString = string => { } return permutations; } - diff --git a/packages/minimongo/cursor.js b/packages/minimongo/cursor.js index 4db04d9734..3192797985 100644 --- a/packages/minimongo/cursor.js +++ b/packages/minimongo/cursor.js @@ -373,6 +373,13 @@ export default class Cursor { return handle; } + observeChangesAsync(options) { + return new Promise((resolve) => { + const handle = this.observeChanges(options); + handle.isReadyPromise.then(() => resolve(handle)); + }); + } + // XXX Maybe we need a version of observe that just calls a callback if // anything changed. _depend(changers, _allow_unordered) { diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index 498e1608e1..2a9a859db2 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -565,7 +565,7 @@ Object.assign(Mongo.Collection.prototype, { Object.assign(Mongo.Collection, { async _publishCursor(cursor, sub, collection) { - var observeHandle = await cursor.observeChanges( + var observeHandle = await cursor.observeChangesAsync( { added: function(id, fields) { sub.added(collection, id, fields); diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 0ac003a35b..0c6c44a710 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -965,6 +965,13 @@ Cursor.prototype.observeChanges = function (callbacks, options = {}) { self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks); }; +Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) { + var self = this; + var handler = self.observeChanges(callbacks, options); + await handler.ready(); + return handler; +}; + MongoConnection.prototype._createSynchronousCursor = function( cursorDescription, options) { var self = this; @@ -1436,7 +1443,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo }; Object.assign(MongoConnection.prototype, { - _observeChanges: async function ( + _observeChanges: function ( cursorDescription, ordered, callbacks, nonMutatingCallbacks) { var self = this; @@ -1531,7 +1538,7 @@ Object.assign(MongoConnection.prototype, { }); if (observeDriver._init) { - await observeDriver._init(); + observeHandle.initObserver = observeDriver._init(); } // This field is only set for use in tests. @@ -1539,7 +1546,7 @@ Object.assign(MongoConnection.prototype, { } self._observeMultiplexers[observeKey] = multiplexer; // Blocks until the initial adds have been sent. - await multiplexer.addHandleAndSendInitialAdds(observeHandle); + observeHandle.initHandler = multiplexer.addHandleAndSendInitialAdds(observeHandle); return observeHandle; }, diff --git a/packages/mongo/observe_multiplex.js b/packages/mongo/observe_multiplex.js index b138eb9937..8c7d3bd41a 100644 --- a/packages/mongo/observe_multiplex.js +++ b/packages/mongo/observe_multiplex.js @@ -222,11 +222,19 @@ ObserveHandle = class { this._stopped = false; this._id = nextObserveHandleId++; this.nonMutatingCallbacks = nonMutatingCallbacks; + this.initObserver = undefined; + this.initHandler = undefined; + } + + async ready() { + if (this.initObserver) await this.initObserver; + if (this.initHandler) await this.initHandler; } async stop() { if (this._stopped) return; this._stopped = true; + await this.ready(); await this._multiplexer.removeHandle(this._id); } }; diff --git a/packages/mongo/oplog_tests.js b/packages/mongo/oplog_tests.js index 455dd9f57e..66ea2b3f6c 100644 --- a/packages/mongo/oplog_tests.js +++ b/packages/mongo/oplog_tests.js @@ -8,7 +8,7 @@ Tinytest.addAsync('mongo-livedata - oplog - cursorSupported', async function( var supported = async function(expected, selector, options) { var cursor = OplogCollection.find(selector, options); - var handle = await cursor.observeChanges({ added: function() {} }); + var handle = await cursor.observeChangesAsync({ added: function() {} }); // If there's no oplog at all, we shouldn't ever use it. if (!oplogEnabled) expected = false; test.equal(!!handle._multiplexer._observeDriver._usesOplog, expected); @@ -125,7 +125,7 @@ process.env.MONGO_OPLOG_URL && species: 'dog', color: 'blue', }) - .observeChanges({ + .observeChangesAsync({ added(id, fields) { if (fields.name === 'dog 5') { blueDog5Id = id;