From 913687cbd2d76c193362fe67ede16bcbc9855e19 Mon Sep 17 00:00:00 2001 From: Renan Castro Date: Thu, 18 Jan 2024 11:09:19 -0300 Subject: [PATCH 1/3] fix: wait for publication handler to resolve before processing next message in queue (except when unblocked explicitily) --- packages/ddp-server/livedata_server.js | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index ce1d76f679..de06084b45 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -671,7 +671,7 @@ Object.assign(Session.prototype, { }, protocol_handlers: { - sub: function (msg, unblock) { + sub: async function (msg, unblock) { var self = this; // cacheUnblock temporarly, so we can capture it later @@ -730,7 +730,7 @@ Object.assign(Session.prototype, { var handler = self.server.publish_handlers[msg.name]; - self._startSubscription(handler, msg.id, msg.params, msg.name); + await self._startSubscription(handler, msg.id, msg.params, msg.name); // cleaning cached unblock self.cachedUnblock = null; @@ -988,7 +988,7 @@ Object.assign(Session.prototype, { else self._universalSubs.push(sub); - sub._runHandler(); + return sub._runHandler(); }, // Tear down specified subscription @@ -1162,7 +1162,7 @@ var Subscription = function ( }; Object.assign(Subscription.prototype, { - _runHandler: function() { + _runHandler: async function() { // XXX should we unblock() here? Either before running the publish // function, or before running _publishCursor. // @@ -1202,17 +1202,17 @@ Object.assign(Subscription.prototype, { // Both conventional and async publish handler functions are supported. // If an object is returned with a then() function, it is either a promise // or thenable and will be resolved asynchronously. - const isThenable = + const isThenable = resultOrThenable && typeof resultOrThenable.then === 'function'; if (isThenable) { - Promise.resolve(resultOrThenable).then( - (...args) => self._publishHandlerResult.bind(self)(...args), - e => self.error(e) - ); + try { + self._publishHandlerResult(await resultOrThenable); + } catch(e) { + self.error(e) + } } else { self._publishHandlerResult(resultOrThenable); } - }, _publishHandlerResult: function (res) { From 9e0990ee87729ec75f74edcdd8f3075dfd625d8a Mon Sep 17 00:00:00 2001 From: Renan Castro Date: Thu, 18 Jan 2024 22:47:31 -0300 Subject: [PATCH 2/3] fix: add regression test --- packages/ddp-server/livedata_server_tests.js | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index cbdf4b632e..867283fa4f 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -350,6 +350,21 @@ Meteor.methods({ }, }); +Meteor.publish("livedata_server_test_sub_chain", async function () { + await new Promise(r => setTimeout(r, 2000)); + this.ready(); + return null; +}); + +Tinytest.addAsync("livedata server - waiting for subscription chain", (test, onComplete) => makeTestConnection(test, async (clientConn, serverConn) => { + const handlers = []; + for(let i=0; i< 10; i++){ + handlers.push(clientConn.subscribe('livedata_server_test_sub_chain')); + } + await new Promise(r => setTimeout(r, 3000)); + test.equal(handlers.map(sub => sub.ready()).filter(o=>o).length === 1, true); + onComplete(); +})); Tinytest.addAsync("livedata server - waiting for Promise", (test, onComplete) => makeTestConnection(test, async (clientConn, serverConn) => { const testResolvedPromiseResult = await clientConn.callAsync( From c5105436d924b6ce03e5370d01e5b7119454610c Mon Sep 17 00:00:00 2001 From: Renan Castro Date: Fri, 19 Jan 2024 09:03:08 -0300 Subject: [PATCH 3/3] fix: formatting --- packages/ddp-server/livedata_server_tests.js | 32 +++++++++++--------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index 867283fa4f..eb84c78fae 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -351,30 +351,34 @@ Meteor.methods({ }); Meteor.publish("livedata_server_test_sub_chain", async function () { - await new Promise(r => setTimeout(r, 2000)); + await new Promise((r) => setTimeout(r, 2000)); this.ready(); return null; }); -Tinytest.addAsync("livedata server - waiting for subscription chain", (test, onComplete) => makeTestConnection(test, async (clientConn, serverConn) => { - const handlers = []; - for(let i=0; i< 10; i++){ - handlers.push(clientConn.subscribe('livedata_server_test_sub_chain')); - } - await new Promise(r => setTimeout(r, 3000)); - test.equal(handlers.map(sub => sub.ready()).filter(o=>o).length === 1, true); - onComplete(); -})); +Tinytest.addAsync( + "livedata server - waiting for subscription chain", + (test, onComplete) => + makeTestConnection(test, async (clientConn, serverConn) => { + const handlers = []; + for (let i = 0; i < 10; i++) { + handlers.push(clientConn.subscribe("livedata_server_test_sub_chain")); + } + await new Promise((r) => setTimeout(r, 3000)); + test.equal( + handlers.map((sub) => sub.ready()).filter((o) => o).length === 1, + true + ); + onComplete(); + }) +); Tinytest.addAsync("livedata server - waiting for Promise", (test, onComplete) => makeTestConnection(test, async (clientConn, serverConn) => { const testResolvedPromiseResult = await clientConn.callAsync( "testResolvedPromise", "clientConn.call" ); - test.equal( - testResolvedPromiseResult, - "clientConn.call after waiting" - ); + test.equal(testResolvedPromiseResult, "clientConn.call after waiting"); const clientCallPromise = new Promise((resolve, reject) => clientConn.call(