fix: wait for publication handler to resolve before processing next message in queue (except when unblocked explicitily)

This commit is contained in:
Renan Castro
2024-01-18 11:09:19 -03:00
parent f06bdb925a
commit 913687cbd2

View File

@@ -671,7 +671,7 @@ Object.assign(Session.prototype, {
},
protocol_handlers: {
sub: function (msg, unblock) {
sub: async function (msg, unblock) {
var self = this;
// cacheUnblock temporarly, so we can capture it later
@@ -730,7 +730,7 @@ Object.assign(Session.prototype, {
var handler = self.server.publish_handlers[msg.name];
self._startSubscription(handler, msg.id, msg.params, msg.name);
await self._startSubscription(handler, msg.id, msg.params, msg.name);
// cleaning cached unblock
self.cachedUnblock = null;
@@ -988,7 +988,7 @@ Object.assign(Session.prototype, {
else
self._universalSubs.push(sub);
sub._runHandler();
return sub._runHandler();
},
// Tear down specified subscription
@@ -1162,7 +1162,7 @@ var Subscription = function (
};
Object.assign(Subscription.prototype, {
_runHandler: function() {
_runHandler: async function() {
// XXX should we unblock() here? Either before running the publish
// function, or before running _publishCursor.
//
@@ -1202,17 +1202,17 @@ Object.assign(Subscription.prototype, {
// Both conventional and async publish handler functions are supported.
// If an object is returned with a then() function, it is either a promise
// or thenable and will be resolved asynchronously.
const isThenable =
const isThenable =
resultOrThenable && typeof resultOrThenable.then === 'function';
if (isThenable) {
Promise.resolve(resultOrThenable).then(
(...args) => self._publishHandlerResult.bind(self)(...args),
e => self.error(e)
);
try {
self._publishHandlerResult(await resultOrThenable);
} catch(e) {
self.error(e)
}
} else {
self._publishHandlerResult(resultOrThenable);
}
},
_publishHandlerResult: function (res) {