From d590d20e1948c3fa8c3fab204aec4fae554bbc21 Mon Sep 17 00:00:00 2001 From: denihs Date: Tue, 24 Jan 2023 15:52:02 -0400 Subject: [PATCH] - fix `receive data` and `buffering data` --- .../.npm/package/npm-shrinkwrap.json | 11 +++++++--- .../ddp-client/common/livedata_connection.js | 16 ++++++++++----- .../test/livedata_connection_tests.js | 20 +++++++++++++++++-- packages/mongo/collection.js | 8 +++++--- .../webapp/.npm/package/npm-shrinkwrap.json | 12 +++++------ 5 files changed, 48 insertions(+), 19 deletions(-) diff --git a/packages/babel-compiler/.npm/package/npm-shrinkwrap.json b/packages/babel-compiler/.npm/package/npm-shrinkwrap.json index ef860ee67d..4d2e9918e8 100644 --- a/packages/babel-compiler/.npm/package/npm-shrinkwrap.json +++ b/packages/babel-compiler/.npm/package/npm-shrinkwrap.json @@ -623,9 +623,9 @@ "integrity": "sha512-LEdx+3A7wV8XeTFWrfkOu0PtX+Vjgl28b4mW8bsM3lZPWIqx0PXTfpbXfDy61MgmmYXR7/cWjVJ8xqgSunF3Mg==", "dependencies": { "acorn": { - "version": "8.8.1", - "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz", - "integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA==" + "version": "8.8.2", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.2.tgz", + "integrity": "sha512-xjIYgE8HBrkpd/sJqOGNspf8uHG+NOHGOw6a/Urj8taM2EXfdNAH2oFcPeIFfsv3+kz/mJrS5VuMqbNLjCa2vw==" }, "semver": { "version": "5.7.1", @@ -644,6 +644,11 @@ "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.2.tgz", "integrity": "sha512-xjIYgE8HBrkpd/sJqOGNspf8uHG+NOHGOw6a/Urj8taM2EXfdNAH2oFcPeIFfsv3+kz/mJrS5VuMqbNLjCa2vw==" }, + "acorn-dynamic-import": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/acorn-dynamic-import/-/acorn-dynamic-import-4.0.0.tgz", + "integrity": "sha512-d3OEjQV4ROpoflsnUA8HozoIR504TFxNivYEUi6uwz0IYhBkTDXGuWlNdMtybRt3nqVx/L6XqMt0FxkXuWKZhw==" + }, "ansi-colors": { "version": "3.2.3", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-3.2.3.tgz", diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 3d18074f69..0580519eec 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -1262,7 +1262,7 @@ export class Connection { self._bufferedWritesFlushAt = new Date().valueOf() + self._bufferedWritesMaxAge; } else if (self._bufferedWritesFlushAt < new Date().valueOf()) { - self._flushBufferedWrites(); + await self._flushBufferedWrites(); return; } @@ -1270,7 +1270,13 @@ export class Connection { clearTimeout(self._bufferedWritesFlushHandle); } self._bufferedWritesFlushHandle = setTimeout( - self.__flushBufferedWrites, + () => { + // __flushBufferedWrites is a promise, so with this we can wait the promise to finish + // before doing something + self._liveDataWritesPromise = self + .__flushBufferedWrites() + .finally(() => (self._liveDataWritesPromise = undefined)); + }, self._bufferedWritesInterval ); } @@ -1586,14 +1592,14 @@ export class Connection { } } - _livedata_result(msg) { + async _livedata_result(msg) { // id, result or error. error has error (code), reason, details const self = this; // Lets make sure there are no buffered writes before returning result. if (! isEmpty(self._bufferedWrites)) { - self._flushBufferedWrites(); + await self._flushBufferedWrites(); } // find the outstanding request @@ -1785,7 +1791,7 @@ export class Connection { } else if (msg.msg === 'nosub') { await this._livedata_nosub(msg); } else if (msg.msg === 'result') { - this._livedata_result(msg); + await this._livedata_result(msg); } else if (msg.msg === 'error') { this._livedata_error(msg); } else { diff --git a/packages/ddp-client/test/livedata_connection_tests.js b/packages/ddp-client/test/livedata_connection_tests.js index ea79979131..58fc7faca9 100644 --- a/packages/ddp-client/test/livedata_connection_tests.js +++ b/packages/ddp-client/test/livedata_connection_tests.js @@ -96,10 +96,10 @@ Tinytest.addAsync('livedata stub - receive data', async function(test) { // options works. const coll = new Mongo.Collection(coll_name, conn); + await coll._settingUpReplicationPromise; + // queue has been emptied and doc is in db. test.isUndefined(conn._updatesForUnknownStores[coll_name]); - console.log('conn._updatesForUnknownStores[coll_name]', conn._updatesForUnknownStores[coll_name], coll); - return test.equal(coll.find({}).fetch(), [{ _id: '1234', a: 1 }]); // second message. applied directly to the db. @@ -132,6 +132,11 @@ Tinytest.addAsync('livedata stub - buffering data', async function(test) { const testDocCount = async count => test.equal(await coll.find({}).count(), count); + const testIsLiveDataWritesPromiseUndefined = isUndefined => + isUndefined + ? test.isUndefined(conn._liveDataWritesPromise) + : test.isNotUndefined(conn._liveDataWritesPromise); + const addDoc = async () => { await stream.receive({ msg: 'added', @@ -144,16 +149,24 @@ Tinytest.addAsync('livedata stub - buffering data', async function(test) { // Starting at 0 ticks. At this point we haven't advanced the fake clock at all. await addDoc(); // 1st Doc + testIsLiveDataWritesPromiseUndefined(true); // make sure _liveDataWritesPromise is not set await testDocCount(0); // No doc been recognized yet because it's buffered, waiting for more. tick(6); // 6 total ticks + testIsLiveDataWritesPromiseUndefined(true);// make sure _liveDataWritesPromise is not set await testDocCount(0); // Ensure that the doc still hasn't shown up, despite the clock moving forward. tick(4); // 10 total ticks, 1st buffer interval + testIsLiveDataWritesPromiseUndefined(false); // make sure _liveDataWritesPromise is set + await conn._liveDataWritesPromise; // wait for _liveDataWritesPromise to finish await testDocCount(1); // No other docs have arrived, so we 'see' the 1st doc. await addDoc(); // 2nd doc + testIsLiveDataWritesPromiseUndefined(true); tick(1); // 11 total ticks (1 since last flush) + testIsLiveDataWritesPromiseUndefined(true); await testDocCount(1); // Again, second doc hasn't arrived because we're waiting for more... tick(9); // 20 total ticks (10 ticks since last flush & the 2nd 10-tick interval) + testIsLiveDataWritesPromiseUndefined(false); + await conn._liveDataWritesPromise; await testDocCount(2); // Now we're here and got the second document. // Add several docs, frequently enough that we buffer multiple times before the next flush. @@ -169,8 +182,11 @@ Tinytest.addAsync('livedata stub - buffering data', async function(test) { tick(9); // 53 ticks (33 since last flush) await addDoc(); // 8 docs tick(9); // 62 ticks! (42 ticks since last flush, over max-age - next interval triggers flush) + testIsLiveDataWritesPromiseUndefined(true); await testDocCount(2); // Still at 2 from before! (Just making sure) tick(1); // Ok, 63 ticks (10 since last doc, so this should cause the flush of all the docs) + testIsLiveDataWritesPromiseUndefined(false); + await conn._liveDataWritesPromise; await testDocCount(8); // See all the docs. // Put things back how they were. diff --git a/packages/mongo/collection.js b/packages/mongo/collection.js index f85f8c3ebe..d1d2a64e5a 100644 --- a/packages/mongo/collection.js +++ b/packages/mongo/collection.js @@ -118,7 +118,9 @@ Mongo.Collection = function Collection(name, options) { this._name = name; this._driver = options._driver; - this._maybeSetUpReplication(name, options); + // TODO[fibers]: _maybeSetUpReplication is now async. Let's watch how not waiting for this function to finish + // will affect everything + this._settingUpReplicationPromise = this._maybeSetUpReplication(name, options); // XXX don't define these until allow or deny is actually used for this // collection. Could be hard if the security rules are only defined on the @@ -152,7 +154,7 @@ Mongo.Collection = function Collection(name, options) { }; Object.assign(Mongo.Collection.prototype, { - _maybeSetUpReplication(name, { _suppressSameNameError = false }) { + async _maybeSetUpReplication(name, { _suppressSameNameError = false }) { const self = this; if (!(self._connection && self._connection.registerStore)) { return; @@ -161,7 +163,7 @@ Object.assign(Mongo.Collection.prototype, { // OK, we're going to be a slave, replicating some remote // database, except possibly with some temporary divergence while // we have unacknowledged RPC's. - const ok = self._connection.registerStore(name, { + const ok = await self._connection.registerStore(name, { // Called at the beginning of a batch of updates. batchSize is the number // of update calls to expect. // diff --git a/packages/webapp/.npm/package/npm-shrinkwrap.json b/packages/webapp/.npm/package/npm-shrinkwrap.json index 56f9e25aaf..d76c33b1dc 100644 --- a/packages/webapp/.npm/package/npm-shrinkwrap.json +++ b/packages/webapp/.npm/package/npm-shrinkwrap.json @@ -17,9 +17,9 @@ "integrity": "sha512-Yv0k4bXGOH+8a+7bELd2PqHQsuiANB+A8a4gnQrkRWzrkKlb6KHaVvyXhqs04sVW/OWlbPyYxRgYlIXLfrufMQ==" }, "@types/express-serve-static-core": { - "version": "4.17.32", - "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.32.tgz", - "integrity": "sha512-aI5h/VOkxOF2Z1saPy0Zsxs5avets/iaiAJYznQFm5By/pamU31xWKL//epiF4OfUA2qTOc9PV6tCUjhO8wlZA==" + "version": "4.17.33", + "resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.33.tgz", + "integrity": "sha512-TPBqmR/HRYI3eC2E5hmiivIzv+bidAfXofM+sbonAGvyDhySGw9/PQZFt2BLOrjUUR++4eJVpx6KnLQK1Fk9tA==" }, "@types/mime": { "version": "3.0.1", @@ -245,9 +245,9 @@ "integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A==" }, "get-intrinsic": { - "version": "1.1.3", - "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.1.3.tgz", - "integrity": "sha512-QJVz1Tj7MS099PevUG5jvnt9tSkXN8K14dxQlikJuPt4uD9hHAHjLyLBiLR5zELelBdD9QNRAXZzsJx0WaDL9A==" + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.0.tgz", + "integrity": "sha512-L049y6nFOuom5wGyRc3/gdTLO94dySVKRACj1RmJZBQXlbTMhtNIgkWkUHq+jYmZvKf14EW1EoJnnjbmoHij0Q==" }, "get-stream": { "version": "6.0.1",