diff --git a/packages/ddp-server/livedata_server.js b/packages/ddp-server/livedata_server.js index ce1d76f679..de06084b45 100644 --- a/packages/ddp-server/livedata_server.js +++ b/packages/ddp-server/livedata_server.js @@ -671,7 +671,7 @@ Object.assign(Session.prototype, { }, protocol_handlers: { - sub: function (msg, unblock) { + sub: async function (msg, unblock) { var self = this; // cacheUnblock temporarly, so we can capture it later @@ -730,7 +730,7 @@ Object.assign(Session.prototype, { var handler = self.server.publish_handlers[msg.name]; - self._startSubscription(handler, msg.id, msg.params, msg.name); + await self._startSubscription(handler, msg.id, msg.params, msg.name); // cleaning cached unblock self.cachedUnblock = null; @@ -988,7 +988,7 @@ Object.assign(Session.prototype, { else self._universalSubs.push(sub); - sub._runHandler(); + return sub._runHandler(); }, // Tear down specified subscription @@ -1162,7 +1162,7 @@ var Subscription = function ( }; Object.assign(Subscription.prototype, { - _runHandler: function() { + _runHandler: async function() { // XXX should we unblock() here? Either before running the publish // function, or before running _publishCursor. // @@ -1202,17 +1202,17 @@ Object.assign(Subscription.prototype, { // Both conventional and async publish handler functions are supported. // If an object is returned with a then() function, it is either a promise // or thenable and will be resolved asynchronously. - const isThenable = + const isThenable = resultOrThenable && typeof resultOrThenable.then === 'function'; if (isThenable) { - Promise.resolve(resultOrThenable).then( - (...args) => self._publishHandlerResult.bind(self)(...args), - e => self.error(e) - ); + try { + self._publishHandlerResult(await resultOrThenable); + } catch(e) { + self.error(e) + } } else { self._publishHandlerResult(resultOrThenable); } - }, _publishHandlerResult: function (res) { diff --git a/packages/ddp-server/livedata_server_tests.js b/packages/ddp-server/livedata_server_tests.js index cbdf4b632e..eb84c78fae 100644 --- a/packages/ddp-server/livedata_server_tests.js +++ b/packages/ddp-server/livedata_server_tests.js @@ -350,16 +350,35 @@ Meteor.methods({ }, }); +Meteor.publish("livedata_server_test_sub_chain", async function () { + await new Promise((r) => setTimeout(r, 2000)); + this.ready(); + return null; +}); + +Tinytest.addAsync( + "livedata server - waiting for subscription chain", + (test, onComplete) => + makeTestConnection(test, async (clientConn, serverConn) => { + const handlers = []; + for (let i = 0; i < 10; i++) { + handlers.push(clientConn.subscribe("livedata_server_test_sub_chain")); + } + await new Promise((r) => setTimeout(r, 3000)); + test.equal( + handlers.map((sub) => sub.ready()).filter((o) => o).length === 1, + true + ); + onComplete(); + }) +); Tinytest.addAsync("livedata server - waiting for Promise", (test, onComplete) => makeTestConnection(test, async (clientConn, serverConn) => { const testResolvedPromiseResult = await clientConn.callAsync( "testResolvedPromise", "clientConn.call" ); - test.equal( - testResolvedPromiseResult, - "clientConn.call after waiting" - ); + test.equal(testResolvedPromiseResult, "clientConn.call after waiting"); const clientCallPromise = new Promise((resolve, reject) => clientConn.call(