- fix receive data and buffering data

This commit is contained in:
denihs
2023-01-24 15:52:02 -04:00
parent 368e059e0b
commit d590d20e19
5 changed files with 48 additions and 19 deletions

View File

@@ -623,9 +623,9 @@
"integrity": "sha512-LEdx+3A7wV8XeTFWrfkOu0PtX+Vjgl28b4mW8bsM3lZPWIqx0PXTfpbXfDy61MgmmYXR7/cWjVJ8xqgSunF3Mg==",
"dependencies": {
"acorn": {
"version": "8.8.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz",
"integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA=="
"version": "8.8.2",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.2.tgz",
"integrity": "sha512-xjIYgE8HBrkpd/sJqOGNspf8uHG+NOHGOw6a/Urj8taM2EXfdNAH2oFcPeIFfsv3+kz/mJrS5VuMqbNLjCa2vw=="
},
"semver": {
"version": "5.7.1",
@@ -644,6 +644,11 @@
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.2.tgz",
"integrity": "sha512-xjIYgE8HBrkpd/sJqOGNspf8uHG+NOHGOw6a/Urj8taM2EXfdNAH2oFcPeIFfsv3+kz/mJrS5VuMqbNLjCa2vw=="
},
"acorn-dynamic-import": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/acorn-dynamic-import/-/acorn-dynamic-import-4.0.0.tgz",
"integrity": "sha512-d3OEjQV4ROpoflsnUA8HozoIR504TFxNivYEUi6uwz0IYhBkTDXGuWlNdMtybRt3nqVx/L6XqMt0FxkXuWKZhw=="
},
"ansi-colors": {
"version": "3.2.3",
"resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-3.2.3.tgz",

View File

@@ -1262,7 +1262,7 @@ export class Connection {
self._bufferedWritesFlushAt =
new Date().valueOf() + self._bufferedWritesMaxAge;
} else if (self._bufferedWritesFlushAt < new Date().valueOf()) {
self._flushBufferedWrites();
await self._flushBufferedWrites();
return;
}
@@ -1270,7 +1270,13 @@ export class Connection {
clearTimeout(self._bufferedWritesFlushHandle);
}
self._bufferedWritesFlushHandle = setTimeout(
self.__flushBufferedWrites,
() => {
// __flushBufferedWrites is a promise, so with this we can wait the promise to finish
// before doing something
self._liveDataWritesPromise = self
.__flushBufferedWrites()
.finally(() => (self._liveDataWritesPromise = undefined));
},
self._bufferedWritesInterval
);
}
@@ -1586,14 +1592,14 @@ export class Connection {
}
}
_livedata_result(msg) {
async _livedata_result(msg) {
// id, result or error. error has error (code), reason, details
const self = this;
// Lets make sure there are no buffered writes before returning result.
if (! isEmpty(self._bufferedWrites)) {
self._flushBufferedWrites();
await self._flushBufferedWrites();
}
// find the outstanding request
@@ -1785,7 +1791,7 @@ export class Connection {
} else if (msg.msg === 'nosub') {
await this._livedata_nosub(msg);
} else if (msg.msg === 'result') {
this._livedata_result(msg);
await this._livedata_result(msg);
} else if (msg.msg === 'error') {
this._livedata_error(msg);
} else {

View File

@@ -96,10 +96,10 @@ Tinytest.addAsync('livedata stub - receive data', async function(test) {
// options works.
const coll = new Mongo.Collection(coll_name, conn);
await coll._settingUpReplicationPromise;
// queue has been emptied and doc is in db.
test.isUndefined(conn._updatesForUnknownStores[coll_name]);
console.log('conn._updatesForUnknownStores[coll_name]', conn._updatesForUnknownStores[coll_name], coll);
return
test.equal(coll.find({}).fetch(), [{ _id: '1234', a: 1 }]);
// second message. applied directly to the db.
@@ -132,6 +132,11 @@ Tinytest.addAsync('livedata stub - buffering data', async function(test) {
const testDocCount = async count => test.equal(await coll.find({}).count(), count);
const testIsLiveDataWritesPromiseUndefined = isUndefined =>
isUndefined
? test.isUndefined(conn._liveDataWritesPromise)
: test.isNotUndefined(conn._liveDataWritesPromise);
const addDoc = async () => {
await stream.receive({
msg: 'added',
@@ -144,16 +149,24 @@ Tinytest.addAsync('livedata stub - buffering data', async function(test) {
// Starting at 0 ticks. At this point we haven't advanced the fake clock at all.
await addDoc(); // 1st Doc
testIsLiveDataWritesPromiseUndefined(true); // make sure _liveDataWritesPromise is not set
await testDocCount(0); // No doc been recognized yet because it's buffered, waiting for more.
tick(6); // 6 total ticks
testIsLiveDataWritesPromiseUndefined(true);// make sure _liveDataWritesPromise is not set
await testDocCount(0); // Ensure that the doc still hasn't shown up, despite the clock moving forward.
tick(4); // 10 total ticks, 1st buffer interval
testIsLiveDataWritesPromiseUndefined(false); // make sure _liveDataWritesPromise is set
await conn._liveDataWritesPromise; // wait for _liveDataWritesPromise to finish
await testDocCount(1); // No other docs have arrived, so we 'see' the 1st doc.
await addDoc(); // 2nd doc
testIsLiveDataWritesPromiseUndefined(true);
tick(1); // 11 total ticks (1 since last flush)
testIsLiveDataWritesPromiseUndefined(true);
await testDocCount(1); // Again, second doc hasn't arrived because we're waiting for more...
tick(9); // 20 total ticks (10 ticks since last flush & the 2nd 10-tick interval)
testIsLiveDataWritesPromiseUndefined(false);
await conn._liveDataWritesPromise;
await testDocCount(2); // Now we're here and got the second document.
// Add several docs, frequently enough that we buffer multiple times before the next flush.
@@ -169,8 +182,11 @@ Tinytest.addAsync('livedata stub - buffering data', async function(test) {
tick(9); // 53 ticks (33 since last flush)
await addDoc(); // 8 docs
tick(9); // 62 ticks! (42 ticks since last flush, over max-age - next interval triggers flush)
testIsLiveDataWritesPromiseUndefined(true);
await testDocCount(2); // Still at 2 from before! (Just making sure)
tick(1); // Ok, 63 ticks (10 since last doc, so this should cause the flush of all the docs)
testIsLiveDataWritesPromiseUndefined(false);
await conn._liveDataWritesPromise;
await testDocCount(8); // See all the docs.
// Put things back how they were.

View File

@@ -118,7 +118,9 @@ Mongo.Collection = function Collection(name, options) {
this._name = name;
this._driver = options._driver;
this._maybeSetUpReplication(name, options);
// TODO[fibers]: _maybeSetUpReplication is now async. Let's watch how not waiting for this function to finish
// will affect everything
this._settingUpReplicationPromise = this._maybeSetUpReplication(name, options);
// XXX don't define these until allow or deny is actually used for this
// collection. Could be hard if the security rules are only defined on the
@@ -152,7 +154,7 @@ Mongo.Collection = function Collection(name, options) {
};
Object.assign(Mongo.Collection.prototype, {
_maybeSetUpReplication(name, { _suppressSameNameError = false }) {
async _maybeSetUpReplication(name, { _suppressSameNameError = false }) {
const self = this;
if (!(self._connection && self._connection.registerStore)) {
return;
@@ -161,7 +163,7 @@ Object.assign(Mongo.Collection.prototype, {
// OK, we're going to be a slave, replicating some remote
// database, except possibly with some temporary divergence while
// we have unacknowledged RPC's.
const ok = self._connection.registerStore(name, {
const ok = await self._connection.registerStore(name, {
// Called at the beginning of a batch of updates. batchSize is the number
// of update calls to expect.
//

View File

@@ -17,9 +17,9 @@
"integrity": "sha512-Yv0k4bXGOH+8a+7bELd2PqHQsuiANB+A8a4gnQrkRWzrkKlb6KHaVvyXhqs04sVW/OWlbPyYxRgYlIXLfrufMQ=="
},
"@types/express-serve-static-core": {
"version": "4.17.32",
"resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.32.tgz",
"integrity": "sha512-aI5h/VOkxOF2Z1saPy0Zsxs5avets/iaiAJYznQFm5By/pamU31xWKL//epiF4OfUA2qTOc9PV6tCUjhO8wlZA=="
"version": "4.17.33",
"resolved": "https://registry.npmjs.org/@types/express-serve-static-core/-/express-serve-static-core-4.17.33.tgz",
"integrity": "sha512-TPBqmR/HRYI3eC2E5hmiivIzv+bidAfXofM+sbonAGvyDhySGw9/PQZFt2BLOrjUUR++4eJVpx6KnLQK1Fk9tA=="
},
"@types/mime": {
"version": "3.0.1",
@@ -245,9 +245,9 @@
"integrity": "sha512-yIovAzMX49sF8Yl58fSCWJ5svSLuaibPxXQJFLmBObTuCr0Mf1KiPopGM9NiFjiYBCbfaa2Fh6breQ6ANVTI0A=="
},
"get-intrinsic": {
"version": "1.1.3",
"resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.1.3.tgz",
"integrity": "sha512-QJVz1Tj7MS099PevUG5jvnt9tSkXN8K14dxQlikJuPt4uD9hHAHjLyLBiLR5zELelBdD9QNRAXZzsJx0WaDL9A=="
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/get-intrinsic/-/get-intrinsic-1.2.0.tgz",
"integrity": "sha512-L049y6nFOuom5wGyRc3/gdTLO94dySVKRACj1RmJZBQXlbTMhtNIgkWkUHq+jYmZvKf14EW1EoJnnjbmoHij0Q=="
},
"get-stream": {
"version": "6.0.1",