From 4e8c7cf1815298da77f206c2b94d9b52f12e5e60 Mon Sep 17 00:00:00 2001 From: Leonardo Venturini Date: Tue, 5 Nov 2024 18:40:50 -0400 Subject: [PATCH] fix potential race condition dropping messages --- .../ddp-client/common/livedata_connection.js | 119 ++++++++---------- 1 file changed, 50 insertions(+), 69 deletions(-) diff --git a/packages/ddp-client/common/livedata_connection.js b/packages/ddp-client/common/livedata_connection.js index 31081f7bcb..75fe2ae7cf 100644 --- a/packages/ddp-client/common/livedata_connection.js +++ b/packages/ddp-client/common/livedata_connection.js @@ -941,7 +941,7 @@ export class Connection { // documents. _saveOriginals() { if (! this._waitingForQuiescence()) { - this._flushBufferedWritesClient(); + this._flushBufferedWrites(); } Object.values(this._stores).forEach((store) => { @@ -1332,61 +1332,40 @@ export class Connection { return writes; } - async _flushBufferedWritesServer() { - const self = this; - const writes = self._prepareBuffersToFlush(); - await self._performWritesServer(writes); - } - _flushBufferedWritesClient() { - const self = this; - const writes = self._prepareBuffersToFlush(); - self._performWritesClient(writes); - } - _flushBufferedWrites() { - const self = this; - return Meteor.isClient - ? self._flushBufferedWritesClient() - : self._flushBufferedWritesServer(); - } + /** + * Server-side store updates handled asynchronously + * @private + */ async _performWritesServer(updates) { const self = this; - if (self._resetStores || ! isEmpty(updates)) { - // Begin a transactional update of each store. - - for (const [storeName, store] of Object.entries(self._stores)) { + if (self._resetStores || !isEmpty(updates)) { + // Start all store updates + for (const store of Object.values(self._stores)) { await store.beginUpdate( - hasOwn.call(updates, storeName) - ? updates[storeName].length - : 0, + updates[store._name]?.length || 0, self._resetStores ); } self._resetStores = false; - for (const [storeName, updateMessages] of Object.entries(updates)) { + // Process each store's updates + for (const [storeName, messages] of Object.entries(updates)) { const store = self._stores[storeName]; if (store) { - for (const updateMessage of updateMessages) { - await store.update(updateMessage); + for (const msg of messages) { + await store.update(msg); } } else { - // Nobody's listening for this data. Queue it up until - // someone wants it. - // XXX memory use will grow without bound if you forget to - // create a collection or just don't care about it... going - // to have to do something about that. - const updates = self._updatesForUnknownStores; - - if (! hasOwn.call(updates, storeName)) { - updates[storeName] = []; - } - - updates[storeName].push(...updateMessages); + // Queue updates for uninitialized stores + self._updatesForUnknownStores[storeName] = + self._updatesForUnknownStores[storeName] || []; + self._updatesForUnknownStores[storeName].push(...messages); } } - // End update transaction. + + // Complete all updates for (const store of Object.values(self._stores)) { await store.endUpdate(); } @@ -1394,53 +1373,55 @@ export class Connection { self._runAfterUpdateCallbacks(); } + + /** + * Client-side store updates handled synchronously for optimistic UI + * @private + */ _performWritesClient(updates) { const self = this; - if (self._resetStores || ! isEmpty(updates)) { - // Begin a transactional update of each store. - - for (const [storeName, store] of Object.entries(self._stores)) { + if (self._resetStores || !isEmpty(updates)) { + // Synchronous store updates for client + Object.values(self._stores).forEach(store => { store.beginUpdate( - hasOwn.call(updates, storeName) - ? updates[storeName].length - : 0, + updates[store._name]?.length || 0, self._resetStores ); - } + }); self._resetStores = false; - for (const [storeName, updateMessages] of Object.entries(updates)) { + Object.entries(updates).forEach(([storeName, messages]) => { const store = self._stores[storeName]; if (store) { - for (const updateMessage of updateMessages) { - store.update(updateMessage); - } + messages.forEach(msg => store.update(msg)); } else { - // Nobody's listening for this data. Queue it up until - // someone wants it. - // XXX memory use will grow without bound if you forget to - // create a collection or just don't care about it... going - // to have to do something about that. - const updates = self._updatesForUnknownStores; - - if (! hasOwn.call(updates, storeName)) { - updates[storeName] = []; - } - - updates[storeName].push(...updateMessages); + self._updatesForUnknownStores[storeName] = + self._updatesForUnknownStores[storeName] || []; + self._updatesForUnknownStores[storeName].push(...messages); } - } - // End update transaction. - for (const store of Object.values(self._stores)) { - store.endUpdate(); - } + }); + + Object.values(self._stores).forEach(store => store.endUpdate()); } self._runAfterUpdateCallbacks(); } + /** + * Executes buffered writes either synchronously (client) or async (server) + * @private + */ + _flushBufferedWrites() { + const self = this; + const writes = self._prepareBuffersToFlush(); + + return Meteor.isClient + ? self._performWritesClient(writes) + : self._performWritesServer(writes); + } + // Call any callbacks deferred with _runWhenAllServerDocsAreFlushed whose // relevant docs have been flushed, as well as dataVisible callbacks at // reconnect-quiescence time.