From 913687cbd2d76c193362fe67ede16bcbc9855e19 Mon Sep 17 00:00:00 2001 From: Renan Castro Date: Thu, 18 Jan 2024 11:09:19 -0300 Subject: [PATCH] fix: wait for publication handler to resolve before processing next message in queue (except when unblocked explicitily) --- packages/ddp-server/livedata_server.js | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index ce1d76f679..de06084b45 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -671,7 +671,7 @@ Object.assign(Session.prototype, { }, protocol_handlers: { - sub: function (msg, unblock) { + sub: async function (msg, unblock) { var self = this; // cacheUnblock temporarly, so we can capture it later @@ -730,7 +730,7 @@ Object.assign(Session.prototype, { var handler = self.server.publish_handlers[msg.name]; - self._startSubscription(handler, msg.id, msg.params, msg.name); + await self._startSubscription(handler, msg.id, msg.params, msg.name); // cleaning cached unblock self.cachedUnblock = null; @@ -988,7 +988,7 @@ Object.assign(Session.prototype, { else self._universalSubs.push(sub); - sub._runHandler(); + return sub._runHandler(); }, // Tear down specified subscription @@ -1162,7 +1162,7 @@ var Subscription = function ( }; Object.assign(Subscription.prototype, { - _runHandler: function() { + _runHandler: async function() { // XXX should we unblock() here? Either before running the publish // function, or before running _publishCursor. // @@ -1202,17 +1202,17 @@ Object.assign(Subscription.prototype, { // Both conventional and async publish handler functions are supported. // If an object is returned with a then() function, it is either a promise // or thenable and will be resolved asynchronously. - const isThenable = + const isThenable = resultOrThenable && typeof resultOrThenable.then === 'function'; if (isThenable) { - Promise.resolve(resultOrThenable).then( - (...args) => self._publishHandlerResult.bind(self)(...args), - e => self.error(e) - ); + try { + self._publishHandlerResult(await resultOrThenable); + } catch(e) { + self.error(e) + } } else { self._publishHandlerResult(resultOrThenable); } - }, _publishHandlerResult: function (res) {