Create Meteor._runAsync helper.

This commit is contained in:
Matheus Castro
2022-08-29 20:30:46 -03:00
parent bd61ad58ba
commit 8d5983d2bc
2 changed files with 26 additions and 49 deletions

View File

@@ -1,7 +1,5 @@
DDPServer = {};
var Fiber = Npm.require('fibers');
const ASL = global.asyncLocalStorage;
// Publication strategies define how we handle data from published cursors at the collection level
// This allows someone to:
// - Choose a trade-off between client-server bandwidth and server memory usage
@@ -330,15 +328,9 @@ var Session = function (server, version, socket, options) {
self.send({ msg: 'connected', session: self.id });
// On initial connect, spin up all the universal publishers.
if (Meteor._isFibersEnabled) {
Fiber(function() {
self.startUniversalSubs();
}).run();
} else {
ASL.run(Meteor._getAslStore, function () {
Meteor._runAsync(function() {
self.startUniversalSubs();
});
}
if (version !== 'pre1' && options.heartbeatInterval !== 0) {
// We no longer need the low level timeout because we have heartbeats.
@@ -562,15 +554,11 @@ Object.assign(Session.prototype, {
//
// Any message counts as receiving a pong, as it demonstrates that
// the client is still alive.
if (Meteor._isFibersEnabled && self.heartbeat) {
Fiber(function() {
self.heartbeat.messageReceived();
}).run();
} else if (self.heartbeat) {
ASL.run(Meteor._getAslStore, function () {
if (self.heartbeat) {
Meteor._runAsync(function() {
self.heartbeat.messageReceived();
});
}
};
if (self.version !== 'pre1' && msg_in.msg === 'ping') {
if (self._respondToPings)
@@ -616,12 +604,7 @@ Object.assign(Session.prototype, {
unblock(); // in case the handler didn't already do it
}
if (Meteor._isFibersEnabled) {
Fiber(runHandlers).run();
return;
}
ASL.run(Meteor._getAslStore, runHandlers);
Meteor._runAsync(runHandlers);
};
processNext();
@@ -1505,15 +1488,9 @@ Server = function (options = {}) {
return;
}
if (Meteor._isFibersEnabled) {
Fiber(function() {
Meteor._runAsync(function() {
self._handleConnect(socket, msg);
}).run();
} else {
ASL.run(Meteor._getAslStore, function () {
self._handleConnect(socket, msg);
});
}
})
return;
}
@@ -1531,14 +1508,7 @@ Server = function (options = {}) {
socket.on('close', function () {
if (socket._meteorSession) {
if (Meteor._isFibersEnabled) {
Fiber(function() {
socket._meteorSession.close()
}).run();
return;
}
ASL.run(Meteor._getAslStore, function () {
Meteor._runAsync(function() {
socket._meteorSession.close();
});
}
@@ -1718,14 +1688,7 @@ Object.assign(Server.prototype, {
// self.sessions to change while we're running this loop.
self.sessions.forEach(function (session) {
if (!session._dontStartNewUniversalSubs) {
if (Meteor._isFibersEnabled) {
Fiber(function() {
session._startSubscription(handler);
}).run();
return;
}
ASL.run(Meteor._getAslStore, function() {
Meteor._runAsync(function() {
session._startSubscription(handler);
});
}

View File

@@ -6,3 +6,17 @@ Meteor._isFibersEnabled = !process.env.DISABLE_FIBERS && Meteor.isServer;
Meteor._getAslStore = getAslStore;
Meteor._getValueFromAslStore = getValueFromAslStore;
Meteor._updateAslStore = updateAslStore;
Meteor._runAsync = (fn, ctx) => {
if (Meteor._isFibersEnabled) {
const Fiber = Npm.require('fibers');
return Fiber(() => {
fn.call(ctx);
}).run();
}
global.asyncLocalStorage.run(Meteor._getAslStore(), () => {
fn.call(ctx);
});
};