Merge pull request #12972 from renanccastro/fix/subscription-handler-wait

Meteor 3.0 - FIX - publication handler is not being waited on
This commit is contained in:
Denilson
2024-01-23 11:58:31 -04:00
committed by GitHub
2 changed files with 33 additions and 14 deletions

View File

@@ -671,7 +671,7 @@ Object.assign(Session.prototype, {
},
protocol_handlers: {
sub: function (msg, unblock) {
sub: async function (msg, unblock) {
var self = this;
// cacheUnblock temporarly, so we can capture it later
@@ -730,7 +730,7 @@ Object.assign(Session.prototype, {
var handler = self.server.publish_handlers[msg.name];
self._startSubscription(handler, msg.id, msg.params, msg.name);
await self._startSubscription(handler, msg.id, msg.params, msg.name);
// cleaning cached unblock
self.cachedUnblock = null;
@@ -988,7 +988,7 @@ Object.assign(Session.prototype, {
else
self._universalSubs.push(sub);
sub._runHandler();
return sub._runHandler();
},
// Tear down specified subscription
@@ -1162,7 +1162,7 @@ var Subscription = function (
};
Object.assign(Subscription.prototype, {
_runHandler: function() {
_runHandler: async function() {
// XXX should we unblock() here? Either before running the publish
// function, or before running _publishCursor.
//
@@ -1202,17 +1202,17 @@ Object.assign(Subscription.prototype, {
// Both conventional and async publish handler functions are supported.
// If an object is returned with a then() function, it is either a promise
// or thenable and will be resolved asynchronously.
const isThenable =
const isThenable =
resultOrThenable && typeof resultOrThenable.then === 'function';
if (isThenable) {
Promise.resolve(resultOrThenable).then(
(...args) => self._publishHandlerResult.bind(self)(...args),
e => self.error(e)
);
try {
self._publishHandlerResult(await resultOrThenable);
} catch(e) {
self.error(e)
}
} else {
self._publishHandlerResult(resultOrThenable);
}
},
_publishHandlerResult: function (res) {

View File

@@ -350,16 +350,35 @@ Meteor.methods({
},
});
Meteor.publish("livedata_server_test_sub_chain", async function () {
await new Promise((r) => setTimeout(r, 2000));
this.ready();
return null;
});
Tinytest.addAsync(
"livedata server - waiting for subscription chain",
(test, onComplete) =>
makeTestConnection(test, async (clientConn, serverConn) => {
const handlers = [];
for (let i = 0; i < 10; i++) {
handlers.push(clientConn.subscribe("livedata_server_test_sub_chain"));
}
await new Promise((r) => setTimeout(r, 3000));
test.equal(
handlers.map((sub) => sub.ready()).filter((o) => o).length === 1,
true
);
onComplete();
})
);
Tinytest.addAsync("livedata server - waiting for Promise", (test, onComplete) =>
makeTestConnection(test, async (clientConn, serverConn) => {
const testResolvedPromiseResult = await clientConn.callAsync(
"testResolvedPromise",
"clientConn.call"
);
test.equal(
testResolvedPromiseResult,
"clientConn.call after waiting"
);
test.equal(testResolvedPromiseResult, "clientConn.call after waiting");
const clientCallPromise = new Promise((resolve, reject) =>
clientConn.call(