fix potential race condition dropping messages

This commit is contained in:
Leonardo Venturini
2024-11-05 18:40:50 -04:00
parent 104cede0e1
commit 4e8c7cf181

View File

@@ -941,7 +941,7 @@ export class Connection {
// documents.
_saveOriginals() {
if (! this._waitingForQuiescence()) {
this._flushBufferedWritesClient();
this._flushBufferedWrites();
}
Object.values(this._stores).forEach((store) => {
@@ -1332,61 +1332,40 @@ export class Connection {
return writes;
}
async _flushBufferedWritesServer() {
const self = this;
const writes = self._prepareBuffersToFlush();
await self._performWritesServer(writes);
}
_flushBufferedWritesClient() {
const self = this;
const writes = self._prepareBuffersToFlush();
self._performWritesClient(writes);
}
_flushBufferedWrites() {
const self = this;
return Meteor.isClient
? self._flushBufferedWritesClient()
: self._flushBufferedWritesServer();
}
/**
* Server-side store updates handled asynchronously
* @private
*/
async _performWritesServer(updates) {
const self = this;
if (self._resetStores || ! isEmpty(updates)) {
// Begin a transactional update of each store.
for (const [storeName, store] of Object.entries(self._stores)) {
if (self._resetStores || !isEmpty(updates)) {
// Start all store updates
for (const store of Object.values(self._stores)) {
await store.beginUpdate(
hasOwn.call(updates, storeName)
? updates[storeName].length
: 0,
updates[store._name]?.length || 0,
self._resetStores
);
}
self._resetStores = false;
for (const [storeName, updateMessages] of Object.entries(updates)) {
// Process each store's updates
for (const [storeName, messages] of Object.entries(updates)) {
const store = self._stores[storeName];
if (store) {
for (const updateMessage of updateMessages) {
await store.update(updateMessage);
for (const msg of messages) {
await store.update(msg);
}
} else {
// Nobody's listening for this data. Queue it up until
// someone wants it.
// XXX memory use will grow without bound if you forget to
// create a collection or just don't care about it... going
// to have to do something about that.
const updates = self._updatesForUnknownStores;
if (! hasOwn.call(updates, storeName)) {
updates[storeName] = [];
}
updates[storeName].push(...updateMessages);
// Queue updates for uninitialized stores
self._updatesForUnknownStores[storeName] =
self._updatesForUnknownStores[storeName] || [];
self._updatesForUnknownStores[storeName].push(...messages);
}
}
// End update transaction.
// Complete all updates
for (const store of Object.values(self._stores)) {
await store.endUpdate();
}
@@ -1394,53 +1373,55 @@ export class Connection {
self._runAfterUpdateCallbacks();
}
/**
* Client-side store updates handled synchronously for optimistic UI
* @private
*/
_performWritesClient(updates) {
const self = this;
if (self._resetStores || ! isEmpty(updates)) {
// Begin a transactional update of each store.
for (const [storeName, store] of Object.entries(self._stores)) {
if (self._resetStores || !isEmpty(updates)) {
// Synchronous store updates for client
Object.values(self._stores).forEach(store => {
store.beginUpdate(
hasOwn.call(updates, storeName)
? updates[storeName].length
: 0,
updates[store._name]?.length || 0,
self._resetStores
);
}
});
self._resetStores = false;
for (const [storeName, updateMessages] of Object.entries(updates)) {
Object.entries(updates).forEach(([storeName, messages]) => {
const store = self._stores[storeName];
if (store) {
for (const updateMessage of updateMessages) {
store.update(updateMessage);
}
messages.forEach(msg => store.update(msg));
} else {
// Nobody's listening for this data. Queue it up until
// someone wants it.
// XXX memory use will grow without bound if you forget to
// create a collection or just don't care about it... going
// to have to do something about that.
const updates = self._updatesForUnknownStores;
if (! hasOwn.call(updates, storeName)) {
updates[storeName] = [];
}
updates[storeName].push(...updateMessages);
self._updatesForUnknownStores[storeName] =
self._updatesForUnknownStores[storeName] || [];
self._updatesForUnknownStores[storeName].push(...messages);
}
}
// End update transaction.
for (const store of Object.values(self._stores)) {
store.endUpdate();
}
});
Object.values(self._stores).forEach(store => store.endUpdate());
}
self._runAfterUpdateCallbacks();
}
/**
* Executes buffered writes either synchronously (client) or async (server)
* @private
*/
_flushBufferedWrites() {
const self = this;
const writes = self._prepareBuffersToFlush();
return Meteor.isClient
? self._performWritesClient(writes)
: self._performWritesServer(writes);
}
// Call any callbacks deferred with _runWhenAllServerDocsAreFlushed whose
// relevant docs have been flushed, as well as dataVisible callbacks at
// reconnect-quiescence time.