From 7ae8141f1652f8bb187cd027cb8040fa696b4c80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Wed, 21 Feb 2024 17:14:41 +0100 Subject: [PATCH 1/8] keep observeChanges return a query handler and implement observeChangesAsync --- packages/accounts-base/accounts_server.js | 3 +-- packages/minimongo/cursor.js | 7 +++++++ packages/mongo/collection.js | 2 +- packages/mongo/mongo_driver.js | 13 ++++++++++--- packages/mongo/observe_multiplex.js | 8 ++++++++ packages/mongo/oplog_tests.js | 4 ++-- 6 files changed, 29 insertions(+), 8 deletions(-) diff --git a/packages/accounts-base/accounts_server.js b/packages/accounts-base/accounts_server.js index e10a896478..437e305359 100644 --- a/packages/accounts-base/accounts_server.js +++ b/packages/accounts-base/accounts_server.js @@ -999,7 +999,7 @@ export class AccountsServer extends AccountsCommon { const observe = await this.users.find({ _id: userId, 'services.resume.loginTokens.hashedToken': newToken - }, { fields: { _id: 1 } }).observeChanges({ + }, { fields: { _id: 1 } }).observeChangesAsync({ added: () => { foundMatchingUser = true; }, @@ -1860,4 +1860,3 @@ const generateCasePermutationsForString = string => { } return permutations; } - diff --git a/packages/minimongo/cursor.js b/packages/minimongo/cursor.js index 4db04d9734..3192797985 100644 --- a/packages/minimongo/cursor.js +++ b/packages/minimongo/cursor.js @@ -373,6 +373,13 @@ export default class Cursor { return handle; } + observeChangesAsync(options) { + return new Promise((resolve) => { + const handle = this.observeChanges(options); + handle.isReadyPromise.then(() => resolve(handle)); + }); + } + // XXX Maybe we need a version of observe that just calls a callback if // anything changed. _depend(changers, _allow_unordered) { diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index 498e1608e1..2a9a859db2 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -565,7 +565,7 @@ Object.assign(Mongo.Collection.prototype, { Object.assign(Mongo.Collection, { async _publishCursor(cursor, sub, collection) { - var observeHandle = await cursor.observeChanges( + var observeHandle = await cursor.observeChangesAsync( { added: function(id, fields) { sub.added(collection, id, fields); diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 0ac003a35b..0c6c44a710 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -965,6 +965,13 @@ Cursor.prototype.observeChanges = function (callbacks, options = {}) { self._cursorDescription, ordered, callbacks, options.nonMutatingCallbacks); }; +Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) { + var self = this; + var handler = self.observeChanges(callbacks, options); + await handler.ready(); + return handler; +}; + MongoConnection.prototype._createSynchronousCursor = function( cursorDescription, options) { var self = this; @@ -1436,7 +1443,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo }; Object.assign(MongoConnection.prototype, { - _observeChanges: async function ( + _observeChanges: function ( cursorDescription, ordered, callbacks, nonMutatingCallbacks) { var self = this; @@ -1531,7 +1538,7 @@ Object.assign(MongoConnection.prototype, { }); if (observeDriver._init) { - await observeDriver._init(); + observeHandle.initObserver = observeDriver._init(); } // This field is only set for use in tests. @@ -1539,7 +1546,7 @@ Object.assign(MongoConnection.prototype, { } self._observeMultiplexers[observeKey] = multiplexer; // Blocks until the initial adds have been sent. - await multiplexer.addHandleAndSendInitialAdds(observeHandle); + observeHandle.initHandler = multiplexer.addHandleAndSendInitialAdds(observeHandle); return observeHandle; }, diff --git a/packages/mongo/observe_multiplex.js b/packages/mongo/observe_multiplex.js index b138eb9937..8c7d3bd41a 100644 --- a/packages/mongo/observe_multiplex.js +++ b/packages/mongo/observe_multiplex.js @@ -222,11 +222,19 @@ ObserveHandle = class { this._stopped = false; this._id = nextObserveHandleId++; this.nonMutatingCallbacks = nonMutatingCallbacks; + this.initObserver = undefined; + this.initHandler = undefined; + } + + async ready() { + if (this.initObserver) await this.initObserver; + if (this.initHandler) await this.initHandler; } async stop() { if (this._stopped) return; this._stopped = true; + await this.ready(); await this._multiplexer.removeHandle(this._id); } }; diff --git a/packages/mongo/oplog_tests.js b/packages/mongo/oplog_tests.js index 455dd9f57e..66ea2b3f6c 100644 --- a/packages/mongo/oplog_tests.js +++ b/packages/mongo/oplog_tests.js @@ -8,7 +8,7 @@ Tinytest.addAsync('mongo-livedata - oplog - cursorSupported', async function( var supported = async function(expected, selector, options) { var cursor = OplogCollection.find(selector, options); - var handle = await cursor.observeChanges({ added: function() {} }); + var handle = await cursor.observeChangesAsync({ added: function() {} }); // If there's no oplog at all, we shouldn't ever use it. if (!oplogEnabled) expected = false; test.equal(!!handle._multiplexer._observeDriver._usesOplog, expected); @@ -125,7 +125,7 @@ process.env.MONGO_OPLOG_URL && species: 'dog', color: 'blue', }) - .observeChanges({ + .observeChangesAsync({ added(id, fields) { if (fields.name === 'dog 5') { blueDog5Id = id; From 8d35ccffbf75bb03f8d13d12d9de2ca247a39ca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Wed, 21 Feb 2024 17:57:04 +0100 Subject: [PATCH 2/8] fix tests to use observeChangesAsync version --- packages/mongo/mongo_livedata_tests.js | 24 +++++++-------- packages/mongo/observe_changes_tests.js | 30 +++++++++---------- .../failover-test/server/failover-test.js | 2 +- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 4e50d5d507..0bb6507047 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -700,7 +700,7 @@ _.each( [ 'MONGO', 'STRING'], function(idGeneration) { }, }); - const handle2 = await coll.find({ run }).observeChanges( + const handle2 = await coll.find({ run }).observeChangesAsync( { added: expectNotMutatable, changed: function(id, o) { @@ -2017,7 +2017,7 @@ _.each( [ 'MONGO', 'STRING'], function(idGeneration) { let polls = {}; const handlesToStop = []; const observe = async function(name, query) { - const handle = await coll.find(query).observeChanges({ + const handle = await coll.find(query).observeChangesAsync({ // Make sure that we only poll on invalidation, not due to time, and // keep track of when we do. Note: this option disables the use of // oplogs (which admittedly is somewhat irrelevant to this feature). @@ -3149,7 +3149,7 @@ if (Meteor.isServer) { async function (test, expect) { var self = this; if (self.miniC) { - self.obs = await self.miniC.find().observeChanges({ + self.obs = await self.miniC.find().observeChangesAsync({ added: async function (id, fields) { self.events.push({evt: "a", id: id}); await Meteor._sleepForMs(200); @@ -3476,13 +3476,13 @@ Meteor.isServer && if (MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle) { var observeWithOplog = await coll .find({ x: 5 }) - .observeChanges({ added: function() {} }); + .observeChangesAsync({ added: function() {} }); test.isTrue(observeWithOplog._multiplexer._observeDriver._usesOplog); await observeWithOplog.stop(); } var observeWithoutOplog = await coll .find({ x: 6 }, { _disableOplog: true }) - .observeChanges({ added: function() {} }); + .observeChangesAsync({ added: function() {} }); test.isFalse(observeWithoutOplog._multiplexer._observeDriver._usesOplog); await observeWithoutOplog.stop(); }); @@ -3506,7 +3506,7 @@ Meteor.isServer && var output = []; var handle = await coll .find({ a: 1, b: 2 }, { fields: { c: 1 } }) - .observeChanges({ + .observeChangesAsync({ added: function(id, fields) { output.push(['added', id, fields]); }, @@ -3559,7 +3559,7 @@ Meteor.isServer && ); var changesOutput = []; - var changesHandle = await cursor.observeChanges({ + var changesHandle = await cursor.observeChangesAsync({ added: function(id, fields) { changesOutput.push(['added', fields]); }, @@ -3604,7 +3604,7 @@ Meteor.isServer && var tmp; var output = []; - var handle = await coll.find({ a: 'foo' }).observeChanges({ + var handle = await coll.find({ a: 'foo' }).observeChangesAsync({ added: function(id, fields) { output.push(['added', id, fields]); }, @@ -3717,7 +3717,7 @@ testAsyncMulti('mongo-livedata - oplog - update EJSON', [ async function(test, expect) { var self = this; self.changes = []; - self.handle = await self.collection.find({}).observeChanges({ + self.handle = await self.collection.find({}).observeChangesAsync({ added: function(id, fields) { self.changes.push(['a', id, fields]); }, @@ -3819,7 +3819,7 @@ Meteor.isServer && _.times(100, async function() { await coll.insertAsync({ foo: 'baz' }); }); - var handler = await coll.find({}).observeChanges({ + var handler = await coll.find({}).observeChangesAsync({ added: async function(id) { await coll.updateAsync(id, { $set: { foo: 'bar' } }); }, @@ -4050,7 +4050,7 @@ if (Meteor.isClient) { futuresByNonce[nonce] = new Promise(r => (resolver = r)); var observe = await fenceOnBeforeFireErrorCollection .find({ nonce: nonce }) - .observeChanges({ added: function() {} }); + .observeChangesAsync({ added: function() {} }); Meteor.setTimeout(async function() { try { await fenceOnBeforeFireErrorCollection.insertAsync({ nonce }); @@ -4214,7 +4214,7 @@ if (Meteor.isServer) { resolve(); } - const observeHandle = await Collection.find().observeChanges({ + const observeHandle = await Collection.find().observeChangesAsync({ async changed(id, fields) { let expectedValue; diff --git a/packages/mongo/observe_changes_tests.js b/packages/mongo/observe_changes_tests.js index 30004d4b1d..6fff210a9e 100644 --- a/packages/mongo/observe_changes_tests.js +++ b/packages/mongo/observe_changes_tests.js @@ -25,7 +25,7 @@ _.each ([{added: 'added', forceOrdered: true}, var barid = await c.insertAsync({ thing: 'stuff' }); var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); - var handle = await c.find(fooid).observeChanges(logger); + var handle = await c.find(fooid).observeChangesAsync(logger); if (added === 'added') { await logger.expectResult(added, [ fooid, @@ -60,7 +60,7 @@ _.each ([{added: 'added', forceOrdered: true}, const badCursor = c.find({}, { fields: { noodles: 1, _id: false } }); await test.throwsAsync(async function() { - await badCursor.observeChanges(logger); + await badCursor.observeChangesAsync(logger); }); onComplete(); @@ -81,10 +81,10 @@ Tinytest.addAsync('observeChanges - callback isolation', async function( async function(logger) { var handles = []; var cursor = c.find(); - handles.push(await cursor.observeChanges(logger)); + handles.push(await cursor.observeChangesAsync(logger)); // fields-tampering observer handles.push( - await cursor.observeChanges({ + await cursor.observeChangesAsync({ added: function(id, fields) { fields.apples = 'green'; }, @@ -122,7 +122,7 @@ Tinytest.addAsync('observeChanges - single id - initial adds', async function( Meteor.isServer, async function(logger) { var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); - var handle = await c.find(fooid).observeChanges(logger); + var handle = await c.find(fooid).observeChangesAsync(logger); await logger.expectResult('added', [ fooid, { noodles: 'good', bacon: 'bad', apples: 'ok' }, @@ -148,7 +148,7 @@ Tinytest.addAsync('observeChanges - unordered - initial adds', async function( async function(logger) { var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); var barid = await c.insertAsync({ noodles: 'good', bacon: 'weird', apples: 'ok' }); - var handle = await c.find().observeChanges(logger); + var handle = await c.find().observeChangesAsync(logger); await logger.expectResultUnordered([ { callback: 'added', @@ -176,7 +176,7 @@ Tinytest.addAsync('observeChanges - unordered - basics', async function( ['added', 'changed', 'removed'], Meteor.isServer, async function(logger) { - var handle = await c.find().observeChanges(logger); + var handle = await c.find().observeChangesAsync(logger); var barid = await c.insertAsync({ thing: 'stuff' }); await logger.expectResultOnly('added', [barid, { thing: 'stuff' }]); @@ -228,7 +228,7 @@ if (Meteor.isServer) { async function(logger) { var handle = await c .find({}, { fields: { noodles: 1, bacon: 1 } }) - .observeChanges(logger); + .observeChangesAsync(logger); var barid = await c.insertAsync({ thing: 'stuff' }); await logger.expectResultOnly('added', [barid, {}]); @@ -281,7 +281,7 @@ if (Meteor.isServer) { { mac: 1, cheese: 2 }, { fields: { noodles: 1, bacon: 1, eggs: 1 } } ) - .observeChanges(logger); + .observeChangesAsync(logger); var barid = await c.insertAsync({ thing: 'stuff', mac: 1, cheese: 2 }); await logger.expectResultOnly('added', [barid, {}]); @@ -360,7 +360,7 @@ Tinytest.addAsync( { mac: 1, cheese: 2 }, { fields: { noodles: 1, bacon: 1, eggs: 1 } } ) - .observeChanges(logger); + .observeChangesAsync(logger); var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', @@ -402,7 +402,7 @@ Tinytest.addAsync( async function(logger) { var handle = await c .find({}, { fields: { 'type.name': 1 } }) - .observeChanges(logger); + .observeChangesAsync(logger); var id = await c.insertAsync({ type: { name: 'foobar' } }); await logger.expectResultOnly('added', [id, { type: { name: 'foobar' } }]); @@ -428,7 +428,7 @@ Tinytest.addAsync( ['added', 'changed', 'removed'], Meteor.isServer, async function(logger) { - var handle = await c.find({ noodles: 'good' }).observeChanges(logger); + var handle = await c.find({ noodles: 'good' }).observeChangesAsync(logger); var barid = await c.insertAsync({ thing: 'stuff' }); var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); @@ -492,7 +492,7 @@ if (Meteor.isServer) { self.expects.push(resolver); var cursor = coll.find({ y: { $ne: 7 } }, { tailable: true }); - self.handle = await cursor.observeChanges({ + self.handle = await cursor.observeChangesAsync({ added: function(id, fields) { self.xs.push(fields.x); test.notEqual(self.expects.length, 0); @@ -550,7 +550,7 @@ testAsyncMulti("observeChanges - bad query", [ var c = makeCollection(); var observeThrows = async function () { await test.throwsAsync(async function () { - await c.find({__id: {$in: null}}).observeChanges({ + await c.find({__id: {$in: null}}).observeChangesAsync({ added: function () { test.fail("added shouldn't be called"); } @@ -588,7 +588,7 @@ if (Meteor.isServer) { await environmentVariable.withValue(true, async function() { var handle = await c .find({}, { fields: { 'type.name': 1 } }) - .observeChanges({ + .observeChangesAsync({ added: function() { test.isTrue(environmentVariable.get()); handle.stop(); diff --git a/tools/tests/apps/failover-test/server/failover-test.js b/tools/tests/apps/failover-test/server/failover-test.js index 81b41117aa..e2e705b128 100644 --- a/tools/tests/apps/failover-test/server/failover-test.js +++ b/tools/tests/apps/failover-test/server/failover-test.js @@ -63,7 +63,7 @@ steps.steppedDown = function () { process.exit(0); }; -C.find().observeChanges({ +C.find().observeChangesAsync({ added: function (id, fields) { if (nextStepTimeout) { Meteor.clearTimeout(nextStepTimeout); From 13483b8e9d2afa9b9b0e743dcf8fdae2dd429c1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Thu, 22 Feb 2024 15:40:23 +0100 Subject: [PATCH 3/8] remain observeChanges async but implement observeChangesAsync for migration --- packages/accounts-base/accounts_server.js | 3 +- packages/minimongo/cursor.js | 7 ----- packages/mongo/collection.js | 2 +- packages/mongo/mongo_driver.js | 10 +++---- packages/mongo/mongo_livedata_tests.js | 24 +++++++-------- packages/mongo/observe_changes_tests.js | 30 +++++++++---------- packages/mongo/observe_multiplex.js | 8 ----- packages/mongo/oplog_tests.js | 4 +-- .../failover-test/server/failover-test.js | 2 +- 9 files changed, 37 insertions(+), 53 deletions(-) diff --git a/packages/accounts-base/accounts_server.js b/packages/accounts-base/accounts_server.js index 437e305359..e10a896478 100644 --- a/packages/accounts-base/accounts_server.js +++ b/packages/accounts-base/accounts_server.js @@ -999,7 +999,7 @@ export class AccountsServer extends AccountsCommon { const observe = await this.users.find({ _id: userId, 'services.resume.loginTokens.hashedToken': newToken - }, { fields: { _id: 1 } }).observeChangesAsync({ + }, { fields: { _id: 1 } }).observeChanges({ added: () => { foundMatchingUser = true; }, @@ -1860,3 +1860,4 @@ const generateCasePermutationsForString = string => { } return permutations; } + diff --git a/packages/minimongo/cursor.js b/packages/minimongo/cursor.js index 3192797985..4db04d9734 100644 --- a/packages/minimongo/cursor.js +++ b/packages/minimongo/cursor.js @@ -373,13 +373,6 @@ export default class Cursor { return handle; } - observeChangesAsync(options) { - return new Promise((resolve) => { - const handle = this.observeChanges(options); - handle.isReadyPromise.then(() => resolve(handle)); - }); - } - // XXX Maybe we need a version of observe that just calls a callback if // anything changed. _depend(changers, _allow_unordered) { diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index 2a9a859db2..498e1608e1 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -565,7 +565,7 @@ Object.assign(Mongo.Collection.prototype, { Object.assign(Mongo.Collection, { async _publishCursor(cursor, sub, collection) { - var observeHandle = await cursor.observeChangesAsync( + var observeHandle = await cursor.observeChanges( { added: function(id, fields) { sub.added(collection, id, fields); diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 0c6c44a710..5abb327e6e 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -967,9 +967,7 @@ Cursor.prototype.observeChanges = function (callbacks, options = {}) { Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) { var self = this; - var handler = self.observeChanges(callbacks, options); - await handler.ready(); - return handler; + return self.observeChanges(callbacks, options); }; MongoConnection.prototype._createSynchronousCursor = function( @@ -1443,7 +1441,7 @@ MongoConnection.prototype.tail = function (cursorDescription, docCallback, timeo }; Object.assign(MongoConnection.prototype, { - _observeChanges: function ( + _observeChanges: async function ( cursorDescription, ordered, callbacks, nonMutatingCallbacks) { var self = this; @@ -1538,7 +1536,7 @@ Object.assign(MongoConnection.prototype, { }); if (observeDriver._init) { - observeHandle.initObserver = observeDriver._init(); + await observeDriver._init(); } // This field is only set for use in tests. @@ -1546,7 +1544,7 @@ Object.assign(MongoConnection.prototype, { } self._observeMultiplexers[observeKey] = multiplexer; // Blocks until the initial adds have been sent. - observeHandle.initHandler = multiplexer.addHandleAndSendInitialAdds(observeHandle); + await multiplexer.addHandleAndSendInitialAdds(observeHandle); return observeHandle; }, diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 0bb6507047..4e50d5d507 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -700,7 +700,7 @@ _.each( [ 'MONGO', 'STRING'], function(idGeneration) { }, }); - const handle2 = await coll.find({ run }).observeChangesAsync( + const handle2 = await coll.find({ run }).observeChanges( { added: expectNotMutatable, changed: function(id, o) { @@ -2017,7 +2017,7 @@ _.each( [ 'MONGO', 'STRING'], function(idGeneration) { let polls = {}; const handlesToStop = []; const observe = async function(name, query) { - const handle = await coll.find(query).observeChangesAsync({ + const handle = await coll.find(query).observeChanges({ // Make sure that we only poll on invalidation, not due to time, and // keep track of when we do. Note: this option disables the use of // oplogs (which admittedly is somewhat irrelevant to this feature). @@ -3149,7 +3149,7 @@ if (Meteor.isServer) { async function (test, expect) { var self = this; if (self.miniC) { - self.obs = await self.miniC.find().observeChangesAsync({ + self.obs = await self.miniC.find().observeChanges({ added: async function (id, fields) { self.events.push({evt: "a", id: id}); await Meteor._sleepForMs(200); @@ -3476,13 +3476,13 @@ Meteor.isServer && if (MongoInternals.defaultRemoteCollectionDriver().mongo._oplogHandle) { var observeWithOplog = await coll .find({ x: 5 }) - .observeChangesAsync({ added: function() {} }); + .observeChanges({ added: function() {} }); test.isTrue(observeWithOplog._multiplexer._observeDriver._usesOplog); await observeWithOplog.stop(); } var observeWithoutOplog = await coll .find({ x: 6 }, { _disableOplog: true }) - .observeChangesAsync({ added: function() {} }); + .observeChanges({ added: function() {} }); test.isFalse(observeWithoutOplog._multiplexer._observeDriver._usesOplog); await observeWithoutOplog.stop(); }); @@ -3506,7 +3506,7 @@ Meteor.isServer && var output = []; var handle = await coll .find({ a: 1, b: 2 }, { fields: { c: 1 } }) - .observeChangesAsync({ + .observeChanges({ added: function(id, fields) { output.push(['added', id, fields]); }, @@ -3559,7 +3559,7 @@ Meteor.isServer && ); var changesOutput = []; - var changesHandle = await cursor.observeChangesAsync({ + var changesHandle = await cursor.observeChanges({ added: function(id, fields) { changesOutput.push(['added', fields]); }, @@ -3604,7 +3604,7 @@ Meteor.isServer && var tmp; var output = []; - var handle = await coll.find({ a: 'foo' }).observeChangesAsync({ + var handle = await coll.find({ a: 'foo' }).observeChanges({ added: function(id, fields) { output.push(['added', id, fields]); }, @@ -3717,7 +3717,7 @@ testAsyncMulti('mongo-livedata - oplog - update EJSON', [ async function(test, expect) { var self = this; self.changes = []; - self.handle = await self.collection.find({}).observeChangesAsync({ + self.handle = await self.collection.find({}).observeChanges({ added: function(id, fields) { self.changes.push(['a', id, fields]); }, @@ -3819,7 +3819,7 @@ Meteor.isServer && _.times(100, async function() { await coll.insertAsync({ foo: 'baz' }); }); - var handler = await coll.find({}).observeChangesAsync({ + var handler = await coll.find({}).observeChanges({ added: async function(id) { await coll.updateAsync(id, { $set: { foo: 'bar' } }); }, @@ -4050,7 +4050,7 @@ if (Meteor.isClient) { futuresByNonce[nonce] = new Promise(r => (resolver = r)); var observe = await fenceOnBeforeFireErrorCollection .find({ nonce: nonce }) - .observeChangesAsync({ added: function() {} }); + .observeChanges({ added: function() {} }); Meteor.setTimeout(async function() { try { await fenceOnBeforeFireErrorCollection.insertAsync({ nonce }); @@ -4214,7 +4214,7 @@ if (Meteor.isServer) { resolve(); } - const observeHandle = await Collection.find().observeChangesAsync({ + const observeHandle = await Collection.find().observeChanges({ async changed(id, fields) { let expectedValue; diff --git a/packages/mongo/observe_changes_tests.js b/packages/mongo/observe_changes_tests.js index 6fff210a9e..30004d4b1d 100644 --- a/packages/mongo/observe_changes_tests.js +++ b/packages/mongo/observe_changes_tests.js @@ -25,7 +25,7 @@ _.each ([{added: 'added', forceOrdered: true}, var barid = await c.insertAsync({ thing: 'stuff' }); var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); - var handle = await c.find(fooid).observeChangesAsync(logger); + var handle = await c.find(fooid).observeChanges(logger); if (added === 'added') { await logger.expectResult(added, [ fooid, @@ -60,7 +60,7 @@ _.each ([{added: 'added', forceOrdered: true}, const badCursor = c.find({}, { fields: { noodles: 1, _id: false } }); await test.throwsAsync(async function() { - await badCursor.observeChangesAsync(logger); + await badCursor.observeChanges(logger); }); onComplete(); @@ -81,10 +81,10 @@ Tinytest.addAsync('observeChanges - callback isolation', async function( async function(logger) { var handles = []; var cursor = c.find(); - handles.push(await cursor.observeChangesAsync(logger)); + handles.push(await cursor.observeChanges(logger)); // fields-tampering observer handles.push( - await cursor.observeChangesAsync({ + await cursor.observeChanges({ added: function(id, fields) { fields.apples = 'green'; }, @@ -122,7 +122,7 @@ Tinytest.addAsync('observeChanges - single id - initial adds', async function( Meteor.isServer, async function(logger) { var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); - var handle = await c.find(fooid).observeChangesAsync(logger); + var handle = await c.find(fooid).observeChanges(logger); await logger.expectResult('added', [ fooid, { noodles: 'good', bacon: 'bad', apples: 'ok' }, @@ -148,7 +148,7 @@ Tinytest.addAsync('observeChanges - unordered - initial adds', async function( async function(logger) { var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); var barid = await c.insertAsync({ noodles: 'good', bacon: 'weird', apples: 'ok' }); - var handle = await c.find().observeChangesAsync(logger); + var handle = await c.find().observeChanges(logger); await logger.expectResultUnordered([ { callback: 'added', @@ -176,7 +176,7 @@ Tinytest.addAsync('observeChanges - unordered - basics', async function( ['added', 'changed', 'removed'], Meteor.isServer, async function(logger) { - var handle = await c.find().observeChangesAsync(logger); + var handle = await c.find().observeChanges(logger); var barid = await c.insertAsync({ thing: 'stuff' }); await logger.expectResultOnly('added', [barid, { thing: 'stuff' }]); @@ -228,7 +228,7 @@ if (Meteor.isServer) { async function(logger) { var handle = await c .find({}, { fields: { noodles: 1, bacon: 1 } }) - .observeChangesAsync(logger); + .observeChanges(logger); var barid = await c.insertAsync({ thing: 'stuff' }); await logger.expectResultOnly('added', [barid, {}]); @@ -281,7 +281,7 @@ if (Meteor.isServer) { { mac: 1, cheese: 2 }, { fields: { noodles: 1, bacon: 1, eggs: 1 } } ) - .observeChangesAsync(logger); + .observeChanges(logger); var barid = await c.insertAsync({ thing: 'stuff', mac: 1, cheese: 2 }); await logger.expectResultOnly('added', [barid, {}]); @@ -360,7 +360,7 @@ Tinytest.addAsync( { mac: 1, cheese: 2 }, { fields: { noodles: 1, bacon: 1, eggs: 1 } } ) - .observeChangesAsync(logger); + .observeChanges(logger); var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', @@ -402,7 +402,7 @@ Tinytest.addAsync( async function(logger) { var handle = await c .find({}, { fields: { 'type.name': 1 } }) - .observeChangesAsync(logger); + .observeChanges(logger); var id = await c.insertAsync({ type: { name: 'foobar' } }); await logger.expectResultOnly('added', [id, { type: { name: 'foobar' } }]); @@ -428,7 +428,7 @@ Tinytest.addAsync( ['added', 'changed', 'removed'], Meteor.isServer, async function(logger) { - var handle = await c.find({ noodles: 'good' }).observeChangesAsync(logger); + var handle = await c.find({ noodles: 'good' }).observeChanges(logger); var barid = await c.insertAsync({ thing: 'stuff' }); var fooid = await c.insertAsync({ noodles: 'good', bacon: 'bad', apples: 'ok' }); @@ -492,7 +492,7 @@ if (Meteor.isServer) { self.expects.push(resolver); var cursor = coll.find({ y: { $ne: 7 } }, { tailable: true }); - self.handle = await cursor.observeChangesAsync({ + self.handle = await cursor.observeChanges({ added: function(id, fields) { self.xs.push(fields.x); test.notEqual(self.expects.length, 0); @@ -550,7 +550,7 @@ testAsyncMulti("observeChanges - bad query", [ var c = makeCollection(); var observeThrows = async function () { await test.throwsAsync(async function () { - await c.find({__id: {$in: null}}).observeChangesAsync({ + await c.find({__id: {$in: null}}).observeChanges({ added: function () { test.fail("added shouldn't be called"); } @@ -588,7 +588,7 @@ if (Meteor.isServer) { await environmentVariable.withValue(true, async function() { var handle = await c .find({}, { fields: { 'type.name': 1 } }) - .observeChangesAsync({ + .observeChanges({ added: function() { test.isTrue(environmentVariable.get()); handle.stop(); diff --git a/packages/mongo/observe_multiplex.js b/packages/mongo/observe_multiplex.js index 8c7d3bd41a..b138eb9937 100644 --- a/packages/mongo/observe_multiplex.js +++ b/packages/mongo/observe_multiplex.js @@ -222,19 +222,11 @@ ObserveHandle = class { this._stopped = false; this._id = nextObserveHandleId++; this.nonMutatingCallbacks = nonMutatingCallbacks; - this.initObserver = undefined; - this.initHandler = undefined; - } - - async ready() { - if (this.initObserver) await this.initObserver; - if (this.initHandler) await this.initHandler; } async stop() { if (this._stopped) return; this._stopped = true; - await this.ready(); await this._multiplexer.removeHandle(this._id); } }; diff --git a/packages/mongo/oplog_tests.js b/packages/mongo/oplog_tests.js index 66ea2b3f6c..455dd9f57e 100644 --- a/packages/mongo/oplog_tests.js +++ b/packages/mongo/oplog_tests.js @@ -8,7 +8,7 @@ Tinytest.addAsync('mongo-livedata - oplog - cursorSupported', async function( var supported = async function(expected, selector, options) { var cursor = OplogCollection.find(selector, options); - var handle = await cursor.observeChangesAsync({ added: function() {} }); + var handle = await cursor.observeChanges({ added: function() {} }); // If there's no oplog at all, we shouldn't ever use it. if (!oplogEnabled) expected = false; test.equal(!!handle._multiplexer._observeDriver._usesOplog, expected); @@ -125,7 +125,7 @@ process.env.MONGO_OPLOG_URL && species: 'dog', color: 'blue', }) - .observeChangesAsync({ + .observeChanges({ added(id, fields) { if (fields.name === 'dog 5') { blueDog5Id = id; diff --git a/tools/tests/apps/failover-test/server/failover-test.js b/tools/tests/apps/failover-test/server/failover-test.js index e2e705b128..81b41117aa 100644 --- a/tools/tests/apps/failover-test/server/failover-test.js +++ b/tools/tests/apps/failover-test/server/failover-test.js @@ -63,7 +63,7 @@ steps.steppedDown = function () { process.exit(0); }; -C.find().observeChangesAsync({ +C.find().observeChanges({ added: function (id, fields) { if (nextStepTimeout) { Meteor.clearTimeout(nextStepTimeout); From 14efff200b772bf1d733519013573e459caf68d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Thu, 22 Feb 2024 15:59:54 +0100 Subject: [PATCH 4/8] implement observeChangesAsync on the client --- packages/minimongo/cursor.js | 7 +++++++ packages/mongo/mongo_driver.js | 3 +-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/minimongo/cursor.js b/packages/minimongo/cursor.js index 4db04d9734..3192797985 100644 --- a/packages/minimongo/cursor.js +++ b/packages/minimongo/cursor.js @@ -373,6 +373,13 @@ export default class Cursor { return handle; } + observeChangesAsync(options) { + return new Promise((resolve) => { + const handle = this.observeChanges(options); + handle.isReadyPromise.then(() => resolve(handle)); + }); + } + // XXX Maybe we need a version of observe that just calls a callback if // anything changed. _depend(changers, _allow_unordered) { diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 5abb327e6e..3037e07b6b 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -966,8 +966,7 @@ Cursor.prototype.observeChanges = function (callbacks, options = {}) { }; Cursor.prototype.observeChangesAsync = async function (callbacks, options = {}) { - var self = this; - return self.observeChanges(callbacks, options); + return this.observeChanges(callbacks, options); }; MongoConnection.prototype._createSynchronousCursor = function( From 4aef1e3ba628c74afa2d805296360a8fb7428ad3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Thu, 22 Feb 2024 16:37:14 +0100 Subject: [PATCH 5/8] implement observeChangesAsync test --- packages/mongo/mongo_livedata_tests.js | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 4e50d5d507..3e04440537 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -4302,3 +4302,19 @@ Tinytest.addAsync('mongo-livedata - maintained isomorphism using resolverType co test.equal(items, []); }); + +Tinytest.addAsync("mongo-livedata - support observeChangesAsync on client and server to keep isomorphism", async (test) => { + const Collection = new Mongo.Collection(`observe_changes_async${test.runId()}`, { resolverType: 'stub' }); + const id = 'a'; + await Collection.insertAsync({ _id: id, foo: { bar: 123 } }); + + return new Promise(async resolve => { + const obs = await Collection.find(id).observeChangesAsync({ + async changed() { + resolve(); + await obs.stop(); + }, + }); + await Collection.updateAsync(id, { $set: { 'foo.bar': 456 } }); + }); +}); From e7c051a70dba120c6783ce3092e21f7556206489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Thu, 22 Feb 2024 16:38:22 +0100 Subject: [PATCH 6/8] resolve after stop observer --- packages/mongo/mongo_livedata_tests.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 3e04440537..087fdffc24 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -4303,7 +4303,7 @@ Tinytest.addAsync('mongo-livedata - maintained isomorphism using resolverType co test.equal(items, []); }); -Tinytest.addAsync("mongo-livedata - support observeChangesAsync on client and server to keep isomorphism", async (test) => { +Tinytest.addAsync("mongo-livedata - support observeChangesAsync to keep isomorphism on client and server", async (test) => { const Collection = new Mongo.Collection(`observe_changes_async${test.runId()}`, { resolverType: 'stub' }); const id = 'a'; await Collection.insertAsync({ _id: id, foo: { bar: 123 } }); @@ -4311,8 +4311,8 @@ Tinytest.addAsync("mongo-livedata - support observeChangesAsync on client and se return new Promise(async resolve => { const obs = await Collection.find(id).observeChangesAsync({ async changed() { - resolve(); await obs.stop(); + resolve(); }, }); await Collection.updateAsync(id, { $set: { 'foo.bar': 456 } }); From 889e40f664cb83b29b62a8db71f122a99c290795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Thu, 22 Feb 2024 16:47:49 +0100 Subject: [PATCH 7/8] implement observeChangesAsync test --- packages/mongo/mongo_livedata_tests.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index 087fdffc24..af7d97dfb6 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -4310,9 +4310,11 @@ Tinytest.addAsync("mongo-livedata - support observeChangesAsync to keep isomorph return new Promise(async resolve => { const obs = await Collection.find(id).observeChangesAsync({ - async changed() { + async changed(_id, fields) { await obs.stop(); resolve(); + test.equal(_id, id); + test.equal(fields?.foo?.bar, 456); }, }); await Collection.updateAsync(id, { $set: { 'foo.bar': 456 } }); From d9f820d2523bf3d3b22f3904b039718cef8ddec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nacho=20Codo=C3=B1er?= Date: Mon, 26 Feb 2024 15:13:13 +0100 Subject: [PATCH 8/8] implement observeAsync and test --- packages/minimongo/cursor.js | 16 +++++++++ packages/mongo/mongo_driver.js | 4 +++ packages/mongo/mongo_livedata_tests.js | 49 ++++++++++++++++++-------- 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/packages/minimongo/cursor.js b/packages/minimongo/cursor.js index 3192797985..1cdd2c88c1 100644 --- a/packages/minimongo/cursor.js +++ b/packages/minimongo/cursor.js @@ -216,6 +216,16 @@ export default class Cursor { return LocalCollection._observeFromObserveChanges(this, options); } + /** + * @summary observe async version + * @locus Anywhere + * @memberOf Promise + * @instance + */ + observeAsync(options) { + return new Promise(resolve => resolve(this.observe(options))); + } + /** * @summary Watch a query. Receive callbacks as the result set changes. Only * the differences between the old and new documents are passed to @@ -373,6 +383,12 @@ export default class Cursor { return handle; } + /** + * @summary observeChanges async version + * @locus Anywhere + * @memberOf Promise + * @instance + */ observeChangesAsync(options) { return new Promise((resolve) => { const handle = this.observeChanges(options); diff --git a/packages/mongo/mongo_driver.js b/packages/mongo/mongo_driver.js index 3037e07b6b..e92c40cf7c 100644 --- a/packages/mongo/mongo_driver.js +++ b/packages/mongo/mongo_driver.js @@ -940,6 +940,10 @@ Cursor.prototype.observe = function (callbacks) { return LocalCollection._observeFromObserveChanges(self, callbacks); }; +Cursor.prototype.observeAsync = function (callbacks) { + return new Promise(resolve => resolve(this.observe(callbacks))); +}; + Cursor.prototype.observeChanges = function (callbacks, options = {}) { var self = this; var methods = [ diff --git a/packages/mongo/mongo_livedata_tests.js b/packages/mongo/mongo_livedata_tests.js index af7d97dfb6..413548caa1 100644 --- a/packages/mongo/mongo_livedata_tests.js +++ b/packages/mongo/mongo_livedata_tests.js @@ -4303,20 +4303,39 @@ Tinytest.addAsync('mongo-livedata - maintained isomorphism using resolverType co test.equal(items, []); }); -Tinytest.addAsync("mongo-livedata - support observeChangesAsync to keep isomorphism on client and server", async (test) => { - const Collection = new Mongo.Collection(`observe_changes_async${test.runId()}`, { resolverType: 'stub' }); - const id = 'a'; - await Collection.insertAsync({ _id: id, foo: { bar: 123 } }); +testAsyncMulti("mongo-livedata - support observeChangesAsync and observeAsync to keep isomorphism on client and server", [ + async (test) => { + const Collection = new Mongo.Collection(`observe_changes_async${test.runId()}`, { resolverType: 'stub' }); + const id = 'a'; + await Collection.insertAsync({ _id: id, foo: { bar: 123 } }); - return new Promise(async resolve => { - const obs = await Collection.find(id).observeChangesAsync({ - async changed(_id, fields) { - await obs.stop(); - resolve(); - test.equal(_id, id); - test.equal(fields?.foo?.bar, 456); - }, + return new Promise(async resolve => { + const obs = await Collection.find(id).observeChangesAsync({ + async changed(_id, fields) { + await obs.stop(); + resolve(); + test.equal(_id, id); + test.equal(fields?.foo?.bar, 456); + }, + }); + await Collection.updateAsync(id, { $set: { 'foo.bar': 456 } }); }); - await Collection.updateAsync(id, { $set: { 'foo.bar': 456 } }); - }); -}); + }, + async (test) => { + const Collection = new Mongo.Collection(`observe_async${test.runId()}`, { resolverType: 'stub' }); + const id = 'a'; + await Collection.insertAsync({ _id: id, foo: { bar: 123 } }); + + return new Promise(async resolve => { + const obs = await Collection.find(id).observeAsync({ + async changed(newDocument) { + await obs.stop(); + test.equal(newDocument._id, id); + test.equal(newDocument?.foo?.bar, 456); + resolve(); + }, + }); + await Collection.updateAsync(id, { $set: { 'foo.bar': 456 } }); + }); + } +]);